Support multiple outputs per kernel in TransformVolumeData
Also removes the requirement that kernels annotate their return value.
nilsvu committed Feb 8, 2023
1 parent 96d25ed commit 2b38b51
Showing 2 changed files with 171 additions and 51 deletions.
195 changes: 150 additions & 45 deletions src/Visualization/Python/TransformVolumeData.py
@@ -7,7 +7,7 @@
import re
from dataclasses import dataclass
from pydoc import locate
from typing import Dict, Iterable, List, Optional, Sequence, Tuple, Type, Union

import click
import numpy as np
@@ -226,6 +226,92 @@ def one_index_constraint(
arg.name, snake_case_to_camel_case(arg.name)))


def parse_kernel_output(output, output_name: str,
num_points: int) -> Dict[str, Tensor]:
"""Transform the return value of a kernel to a dict of tensor datasets
The following return values of kernels are supported:
- Any Tensor. By default, the name of the kernel function transformed to
CamelCase will be used as dataset name.
- A DataVector: will be treated as a scalar.
- A Numpy array: will be treated as a scalar or vector.
- An int or float: will be expanded over the grid as a constant scalar.
- A dict of the above: can be used to return multiple datasets, and to
assign names to them. The keys of the dictionary are used as dataset
names.
- A list of the above: can be used to visualize output from C++ bindings
that return a `std::array` or `std::vector`. For example, a C++ binding
named `truncation_error` that returns 3 numbers (one per dimension) per
element can be visualized with this. The three numbers get expanded as
constants over the volume of the element and returned as three datasets
named 'TruncationError(_1,_2,_3)'.
Arguments:
output: The return value of a kernel function.
output_name: The dataset name. Unused if 'output' is a dict.
num_points: Number of grid points.
Returns: A mapping from dataset names to tensors.
"""
# Single tensor: return directly
try:
output.rank
return {output_name: output}
except AttributeError:
pass
# DataVector: parse as scalar
if isinstance(output, DataVector):
assert len(output) == num_points, (
f"DataVector output named '{output_name}' has size "
f"{len(output)}, but expected size {num_points}.")
return {output_name: Scalar[DataVector]([output])}
# Numpy array: parse as scalar or vector
if isinstance(output, np.ndarray):
assert output.shape[-1] == num_points, (
f"Array output named '{output_name}' has shape {output.shape}, "
f"but expected {num_points} points in last dimension.")
if output.ndim == 1 or (output.ndim == 2 and len(output) == 1):
return {output_name: Scalar[DataVector](output)}
elif output.ndim == 2 and len(output) in [1, 2, 3]:
dim = len(output)
return {output_name: tnsr.I[DataVector, dim](output)}
else:
raise ValueError(
"Kernels may only return Numpy arrays representing "
"scalars or vectors. For higher-rank tensors, construct and "
"return the Tensor. "
f"Kernel output named '{output_name}' has shape "
f"{output.shape}.")
# Single number: parse as constant scalar
if isinstance(output, (int, float)):
return {
output_name: Scalar[DataVector](num_points=num_points, fill=output)
}
# Dict or other map type: parse recursively, and use keys as dataset names
try:
result = {}
for name, value in output.items():
result.update(parse_kernel_output(value, name, num_points))
return result
except AttributeError:
pass
# List or other sequence: parse recursively, and enumerate dataset names
try:
result = {}
for i, value in enumerate(output):
result.update(
parse_kernel_output(value, output_name + "_" + str(i + 1),
num_points))
return result
except AttributeError:
pass
raise ValueError(f"Unsupported kernel output type '{type(output)}' "
f"(named '{output_name}'). "
"See 'TransformVolumeData.parse_kernel_output' "
"for supported kernel output types.")


class Kernel:
def __init__(self,
callable,
@@ -239,11 +325,15 @@ def __init__(self,
(from 'spectre.DataStructures.Tensor') or a limited set of
structural information such as the mesh, coordinates, and Jacobians.
See the 'parse_kernel_arg' function for all supported argument
types. The function should return a single tensor, a dictionary that
maps dataset names to tensors, or one of the other supported types
listed in the 'parse_kernel_output' function.
map_input_names: A map of argument names to dataset names (optional).
By default the argument name is transformed to CamelCase.
output_name: Name of the output dataset (optional). By default the
function name is transformed to CamelCase. Output names for multiple
datasets can be specified by returning a 'Dict[str, Tensor]' from
the 'callable'.
elementwise: Call this kernel for each element. The default is to
call the kernel with all data in the volume file at once, unless
element-specific data such as a Mesh or Jacobian is requested.
@@ -293,20 +383,15 @@ def __init__(self,
# Use provided output name, or transform function name to CamelCase
self.output_name = (output_name
or snake_case_to_camel_case(callable.__name__))

def __call__(self, all_tensor_data: Dict[str, Tensor],
element: Optional[Element]) -> Dict[str, Tensor]:
output = self.callable(*(arg.get(all_tensor_data, element)
for arg in self.args))
# Get the number of grid points from the element or from any tensor
num_points = (element.mesh.number_of_grid_points() if element else len(
next(iter(all_tensor_data.values()))[0]))
return parse_kernel_output(output, self.output_name, num_points)
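
As a usage sketch (the kernel and dataset names below are illustrative, mirroring the test kernels in this commit), a kernel that returns multiple named datasets can be registered like this:

def abs_and_square(psi: DataVector):
    # Returning a dict assigns an explicit dataset name to each output
    return {"AbsPsi": psi.abs(), "PsiSquared": psi**2}

# Map the argument name 'psi' to the 'Psi' dataset in the volume files
kernel = Kernel(abs_and_square, map_input_names={"psi": "Psi"})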


def transform_volume_data(
@@ -351,15 +436,13 @@ def transform_volume_data(
else:
all_tensors[tensor_name] = tensor_arg
logger.debug("Input datasets: " + str(list(all_tensors.keys())))

if integrate:
# We collect integrals in this dict and return it
integrals: Dict[str, Sequence[float]] = {}
else:
# We collect output dataset names for logging so the user can find them
output_names = set()

if isinstance(volfiles, spectre_h5.H5Vol):
volfiles = [volfiles]
@@ -398,44 +481,65 @@ def transform_volume_data(
element.det_jacobian.get(), element.mesh)
# Integrate kernels
for kernel in kernels:
transformed_tensors = kernel(all_tensor_data, element)
for output_name, transformed_tensor in (
transformed_tensors.items()):
for i, component in enumerate(transformed_tensor):
component_integral = integrals.setdefault(
output_name +
transformed_tensor.component_suffix(i),
np.zeros(num_obs))
component_integral[i_obs] += definite_integral(
element.det_jacobian.get() * component,
element.mesh)
continue

# Apply kernels
for kernel in kernels:
# Elementwise kernels need to slice the tensor data into
# elements, and reassemble the result into a contiguous dataset
if kernel.elementwise:
transformed_tensors_data: Dict[str,
Tuple[np.ndarray,
Type[Tensor]]] = {}
for element in iter_elements(volfile, obs_id):
transformed_tensors = kernel(all_tensor_data, element)
for output_name, transformed_tensor in (
transformed_tensors.items()):
transformed_tensor_data = (
transformed_tensors_data.setdefault(
output_name,
(np.zeros((transformed_tensor.size,
total_num_points)),
type(transformed_tensor))))[0]
for i, component in enumerate(transformed_tensor):
transformed_tensor_data[
i, element.data_slice] = component
transformed_tensors = {
output_name: tensor_type(transformed_tensor_data,
copy=False)
for output_name, (
transformed_tensor_data,
tensor_type) in transformed_tensors_data.items()
}
else:
transformed_tensors = kernel(all_tensor_data, None)

# Write result back into volfile
for output_name, transformed_tensor in (
transformed_tensors.items()):
output_names.add(output_name)
for i, component in enumerate(transformed_tensor):
volfile.write_tensor_component(
obs_id,
component_name=(
output_name +
transformed_tensor.component_suffix(i)),
contiguous_tensor_data=component)
if integrate:
return integrals
else:
logger.info(f"Output datasets: {output_names}")


def parse_input_names(ctx, param, all_values):
Expand Down Expand Up @@ -538,7 +642,8 @@ def shift_magnitude(
Any pybind11 binding of a C++ function will also work, as long as it takes
only supported types as arguments. Supported types are tensors, as well as
structural information such as the mesh, coordinates, and Jacobians. See the
'parse_kernel_arg' function for all supported argument types, and
'parse_kernel_output' for all supported return types.
The kernels can be loaded from any available Python module, such as
'spectre.PointwiseFunctions'. You can also execute a Python file that
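
For example (a sketch; the kernel path is hypothetical), a kernel callable can be resolved from such a module path with 'pydoc.locate', which this module imports at the top:

from pydoc import locate

kernel_callable = locate("spectre.PointwiseFunctions.some_kernel")
if kernel_callable is None:
    raise ValueError("Kernel not found")
kernel = Kernel(kernel_callable)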
27 changes: 21 additions & 6 deletions tests/Unit/Visualization/Python/Test_TransformVolumeData.py
@@ -31,10 +31,9 @@ def psi_squared(psi: Scalar[DataVector]) -> Scalar[DataVector]:
return Scalar[DataVector](np.array(psi)**2)


def coordinate_radius(inertial_coordinates: tnsr.I[DataVector, 3]):
# Return a Numpy array
return np.linalg.norm(np.array(inertial_coordinates), axis=0)


def deriv_coords(
@@ -52,8 +51,18 @@ def sinusoid(x: tnsr.I[DataVector, 3]) -> Scalar[DataVector]:
axis=0))


def square_component(component: DataVector):
# Return a DataVector
return component**2


def abs_and_max(component: DataVector):
# Return multiple datasets
return {
"Abs": component.abs(),
# Single number, should get expanded over the volume as constant scalar
"Max": component.max(),
}


class TestTransformVolumeData(unittest.TestCase):
@@ -92,6 +101,7 @@ def test_transform_volume_data(self):
Kernel(deriv_coords),
Kernel(square_component,
map_input_names={"component": "InertialCoordinates_x"}),
Kernel(abs_and_max, map_input_names={"component": "Psi"}),
]

transform_volume_data(volfiles=open_volfiles, kernels=kernels)
@@ -124,6 +134,11 @@ def test_transform_volume_data(self):
result_square_component = open_volfiles[0].get_tensor_component(
obs_id, "SquareComponent").data
npt.assert_allclose(np.array(result_square_component), x**2)
result_abs = open_volfiles[0].get_tensor_component(obs_id, "Abs").data
npt.assert_allclose(np.array(result_abs), np.abs(np.array(psi)))
result_max = open_volfiles[0].get_tensor_component(obs_id, "Max").data
npt.assert_allclose(np.array(result_max),
np.ones(len(radius)) * np.max(np.array(psi)))

def test_integrate(self):
open_h5_files = [spectre_h5.H5File(self.h5_filename, "a")]
