diff --git a/src/Visualization/Python/TransformVolumeData.py b/src/Visualization/Python/TransformVolumeData.py
index a8848196981d..5e237f0b090a 100644
--- a/src/Visualization/Python/TransformVolumeData.py
+++ b/src/Visualization/Python/TransformVolumeData.py
@@ -7,7 +7,7 @@
 import re
 from dataclasses import dataclass
 from pydoc import locate
-from typing import Dict, Iterable, List, Optional, Sequence, Type, Union
+from typing import Dict, Iterable, List, Optional, Sequence, Tuple, Type, Union
 
 import click
 import numpy as np
@@ -226,6 +226,92 @@ def one_index_constraint(
                          arg.name, snake_case_to_camel_case(arg.name)))
 
 
+def parse_kernel_output(output, output_name: str,
+                        num_points: int) -> Dict[str, Tensor]:
+    """Transform the return value of a kernel to a dict of tensor datasets
+
+    The following return values of kernels are supported:
+
+    - Any Tensor. By default, the name of the kernel function transformed to
+      CamelCase will be used as dataset name.
+    - A DataVector: will be treated as a scalar.
+    - A Numpy array: will be treated as a scalar or vector.
+    - An int or float: will be expanded over the grid as a constant scalar.
+    - A dict of the above: can be used to return multiple datasets, and to
+      assign names to them. The keys of the dictionary are used as dataset
+      names.
+    - A list of the above: can be used to visualize output from C++ bindings
+      that return a `std::array` or `std::vector`. For example, a C++ binding
+      named `truncation_error` that returns 3 numbers (one per dimension) per
+      element can be visualized with this. The three numbers get expanded as
+      constants over the volume of the element and returned as three datasets
+      named 'TruncationError(_1,_2,_3)'.
+
+    Arguments:
+      output: The return value of a kernel function.
+      output_name: The dataset name. Unused if 'output' is a dict.
+      num_points: Number of grid points.
+
+    Returns: A mapping from dataset names to tensors.
+    """
+    # Single tensor: return directly
+    try:
+        output.rank
+        return {output_name: output}
+    except AttributeError:
+        pass
+    # DataVector: parse as scalar
+    if isinstance(output, DataVector):
+        assert len(output) == num_points, (
+            f"DataVector output named '{output_name}' has size "
+            f"{len(output)}, but expected size {num_points}.")
+        return {output_name: Scalar[DataVector]([output])}
+    # Numpy array: parse as scalar or vector
+    if isinstance(output, np.ndarray):
+        assert output.shape[-1] == num_points, (
+            f"Array output named '{output_name}' has shape {output.shape}, "
+            f"but expected {num_points} points in last dimension.")
+        if output.ndim == 1 or (output.ndim == 2 and len(output) == 1):
+            return {output_name: Scalar[DataVector](output)}
+        elif output.ndim == 2 and len(output) in [1, 2, 3]:
+            dim = len(output)
+            return {output_name: tnsr.I[DataVector, dim](output)}
+        else:
+            raise ValueError(
+                "Kernels may only return Numpy arrays representing "
+                "scalars or vectors. For higher-rank tensors, construct and "
+                "return the Tensor. "
+                f"Kernel output named '{output_name}' has shape "
+                f"{output.shape}.")
+    # Single number: parse as constant scalar
+    if isinstance(output, (int, float)):
+        return {
+            output_name: Scalar[DataVector](num_points=num_points, fill=output)
+        }
+    # Dict or other map type: parse recursively, and use keys as dataset names
+    try:
+        result = {}
+        for name, value in output.items():
+            result.update(parse_kernel_output(value, name, num_points))
+        return result
+    except AttributeError:
+        pass
+    # List or other sequence: parse recursively, and enumerate dataset names
+    try:
+        result = {}
+        for i, value in enumerate(output):
+            result.update(
+                parse_kernel_output(value, output_name + "_" + str(i + 1),
+                                    num_points))
+        return result
+    except AttributeError:
+        pass
+    raise ValueError(f"Unsupported kernel output type '{type(output)}' "
+                     f"(named '{output_name}'). "
+                     "See 'TransformVolumeData.parse_kernel_output' "
+                     "for supported kernel output types.")
+
+
 class Kernel:
     def __init__(self,
                  callable,
@@ -239,11 +325,15 @@ def __init__(self,
             (from 'spectre.DataStructures.Tensor') or a limited set of structural
             information such as the mesh, coordinates, and Jacobians. See the
             'parse_kernel_arg' function for all supported argument
-            types.
+            types. The function should return a single tensor, a dictionary that
+            maps dataset names to tensors, or one of the other supported types
+            listed in the 'parse_kernel_output' function.
           map_input_names: A map of argument names to dataset names (optional).
             By default the argument name is transformed to CamelCase.
           output_name: Name of the output dataset (optional). By default the
-            function name is transformed to CamelCase.
+            function name is transformed to CamelCase. Output names for multiple
+            datasets can be specified by returning a 'Dict[str, Tensor]' from
+            the 'callable'.
           elementwise: Call this kernel for each element. The default is to
             call the kernel with all data in the volume file at once, unless
             element-specific data such as a Mesh or Jacobian is requested.
@@ -293,20 +383,15 @@ def __init__(self,
         # Use provided output name, or transform function name to CamelCase
         self.output_name = (output_name
                             or snake_case_to_camel_case(callable.__name__))
-        self.output_type = signature.return_annotation
-        try:
-            self.output_type.rank
-        except AttributeError:
-            raise ValueError(
-                "The return annotation must be a tensor type in the "
-                f"kernel function '{callable}'. Instead, it is "
-                f"'{self.output_type}'.")
 
     def __call__(self, all_tensor_data: Dict[str, Tensor],
-                 element: Optional[Element]):
+                 element: Optional[Element]) -> Dict[str, Tensor]:
         output = self.callable(*(arg.get(all_tensor_data, element)
                                  for arg in self.args))
-        return output
+        # Get the number of grid points from the element or from any tensor
+        num_points = (element.mesh.number_of_grid_points() if element else len(
+            next(iter(all_tensor_data.values()))[0]))
+        return parse_kernel_output(output, self.output_name, num_points)
 
 
 def transform_volume_data(
@@ -351,15 +436,13 @@ def transform_volume_data(
             else:
                 all_tensors[tensor_name] = tensor_arg
     logger.debug("Input datasets: " + str(list(all_tensors.keys())))
-    # Logging output datasets as INFO when they are written back into the volume
-    # files so the user can find them
-    logger.log(
-        logging.DEBUG if integrate else logging.INFO,
-        "Output datasets: " + str([kernel.output_name for kernel in kernels]))
 
-    # We collect integrals in this dict and return it
     if integrate:
+        # We collect integrals in this dict and return it
        integrals: Dict[str, Sequence[float]] = {}
+    else:
+        # We collect output dataset names for logging so the user can find them
+        output_names = set()
 
     if isinstance(volfiles, spectre_h5.H5Vol):
         volfiles = [volfiles]
@@ -398,15 +481,17 @@ def transform_volume_data(
                             element.det_jacobian.get(), element.mesh)
                     # Integrate kernels
                     for kernel in kernels:
-                        transformed_tensor = kernel(all_tensor_data, element)
-                        for i, component in enumerate(transformed_tensor):
-                            component_integral = integrals.setdefault(
-                                kernel.output_name +
-                                transformed_tensor.component_suffix(i),
-                                np.zeros(num_obs))
-                            component_integral[i_obs] += definite_integral(
-                                element.det_jacobian.get() * component,
-                                element.mesh)
+                        transformed_tensors = kernel(all_tensor_data, element)
+                        for output_name, transformed_tensor in (
+                                transformed_tensors.items()):
+                            for i, component in enumerate(transformed_tensor):
+                                component_integral = integrals.setdefault(
+                                    output_name +
+                                    transformed_tensor.component_suffix(i),
+                                    np.zeros(num_obs))
+                                component_integral[i_obs] += definite_integral(
+                                    element.det_jacobian.get() * component,
+                                    element.mesh)
                 continue
 
             # Apply kernels
@@ -414,28 +499,47 @@ def transform_volume_data(
                 # Elementwise kernels need to slice the tensor data into
                 # elements, and reassemble the result into a contiguous dataset
                 if kernel.elementwise:
-                    transformed_tensor_data = np.zeros(
-                        (kernel.output_type.size, total_num_points))
+                    transformed_tensors_data: Dict[str,
+                                                   Tuple[np.ndarray,
+                                                         Type[Tensor]]] = {}
                     for element in iter_elements(volfile, obs_id):
-                        transformed_tensor = kernel(all_tensor_data, element)
-                        for i, component in enumerate(transformed_tensor):
-                            transformed_tensor_data[
-                                i, element.data_slice] = component
-                    transformed_tensor = kernel.output_type(
-                        transformed_tensor_data, copy=False)
+                        transformed_tensors = kernel(all_tensor_data, element)
+                        for output_name, transformed_tensor in (
+                                transformed_tensors.items()):
+                            transformed_tensor_data = (
+                                transformed_tensors_data.setdefault(
+                                    output_name,
+                                    (np.zeros((transformed_tensor.size,
+                                               total_num_points)),
+                                     type(transformed_tensor))))[0]
+                            for i, component in enumerate(transformed_tensor):
+                                transformed_tensor_data[
+                                    i, element.data_slice] = component
+                    transformed_tensors = {
+                        output_name: tensor_type(transformed_tensor_data,
+                                                 copy=False)
+                        for output_name, (
+                            transformed_tensor_data,
+                            tensor_type) in transformed_tensors_data.items()
+                    }
                 else:
-                    transformed_tensor = kernel(all_tensor_data, None)
+                    transformed_tensors = kernel(all_tensor_data, None)
                 # Write result back into volfile
-                for i, component in enumerate(transformed_tensor):
-                    volfile.write_tensor_component(
-                        obs_id,
-                        component_name=(
-                            kernel.output_name +
-                            transformed_tensor.component_suffix(i)),
-                        contiguous_tensor_data=component)
+                for output_name, transformed_tensor in (
+                        transformed_tensors.items()):
+                    output_names.add(output_name)
+                    for i, component in enumerate(transformed_tensor):
+                        volfile.write_tensor_component(
+                            obs_id,
+                            component_name=(
+                                output_name +
+                                transformed_tensor.component_suffix(i)),
+                            contiguous_tensor_data=component)
 
     if integrate:
         return integrals
+    else:
+        logger.info(f"Output datasets: {output_names}")
 
 
 def parse_input_names(ctx, param, all_values):
@@ -538,7 +642,8 @@ def shift_magnitude(
     Any pybind11 binding of a C++ function will also work, as long as it takes
     only supported types as arguments. Supported types are tensors, as well
     structural information such as the mesh, coordinates, and Jacobians. See the
-    'parse_kernel_arg' function for all supported argument types.
+    'parse_kernel_arg' function for all supported argument types, and
+    'parse_kernel_output' for all supported return types.
 
     The kernels can be loaded from any available Python module, such as
     'spectre.PointwiseFunctions'. You can also execute a Python file that
diff --git a/tests/Unit/Visualization/Python/Test_TransformVolumeData.py b/tests/Unit/Visualization/Python/Test_TransformVolumeData.py
index 0dfe0e72b16b..453bed5f989b 100644
--- a/tests/Unit/Visualization/Python/Test_TransformVolumeData.py
+++ b/tests/Unit/Visualization/Python/Test_TransformVolumeData.py
@@ -31,10 +31,9 @@ def psi_squared(psi: Scalar[DataVector]) -> Scalar[DataVector]:
     return Scalar[DataVector](np.array(psi)**2)
 
 
-def coordinate_radius(
-        inertial_coordinates: tnsr.I[DataVector, 3]) -> Scalar[DataVector]:
-    return Scalar[DataVector](np.linalg.norm(np.array(inertial_coordinates),
-                                             axis=0))
+def coordinate_radius(inertial_coordinates: tnsr.I[DataVector, 3]):
+    # Return a Numpy array
+    return np.linalg.norm(np.array(inertial_coordinates), axis=0)
 
 
 def deriv_coords(
@@ -52,8 +51,18 @@ def sinusoid(x: tnsr.I[DataVector, 3]) -> Scalar[DataVector]:
                                       axis=0))
 
 
-def square_component(component: DataVector) -> Scalar[DataVector]:
-    return Scalar[DataVector]([component**2])
+def square_component(component: DataVector):
+    # Return a DataVector
+    return component**2
+
+
+def abs_and_max(component: DataVector):
+    # Return multiple datasets
+    return {
+        "Abs": component.abs(),
+        # Single number, should get expanded over the volume as constant scalar
+        "Max": component.max(),
+    }
 
 
 class TestTransformVolumeData(unittest.TestCase):
@@ -92,6 +101,7 @@ def test_transform_volume_data(self):
             Kernel(deriv_coords),
             Kernel(square_component,
                    map_input_names={"component": "InertialCoordinates_x"}),
+            Kernel(abs_and_max, map_input_names={"component": "Psi"}),
         ]
 
         transform_volume_data(volfiles=open_volfiles, kernels=kernels)
@@ -124,6 +134,11 @@ def test_transform_volume_data(self):
         result_square_component = open_volfiles[0].get_tensor_component(
             obs_id, "SquareComponent").data
         npt.assert_allclose(np.array(result_square_component), x**2)
+        result_abs = open_volfiles[0].get_tensor_component(obs_id, "Abs").data
+        npt.assert_allclose(np.array(result_abs), np.abs(np.array(psi)))
+        result_max = open_volfiles[0].get_tensor_component(obs_id, "Max").data
+        npt.assert_allclose(np.array(result_max),
+                            np.ones(len(radius)) * np.max(np.array(psi)))
 
     def test_integrate(self):
         open_h5_files = [spectre_h5.H5File(self.h5_filename, "a")]
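
Usage sketch (not part of the diff): with this change a kernel may return a dict to produce several named datasets in one pass, like 'abs_and_max' in the test above. The sketch below assumes the import paths of the SpECTRE Python bindings, a volume file "VolumeData.h5" with an '/element_data' subfile, and an existing "Psi" dataset; these names are placeholders to adapt to your data.

    import spectre.IO.H5 as spectre_h5
    from spectre.DataStructures import DataVector
    from spectre.Visualization.TransformVolumeData import (Kernel,
                                                           transform_volume_data)


    def psi_abs_and_max(psi: DataVector):
        # Returning a dict writes one dataset per key: "PsiAbs" and "PsiMax".
        # The float from max() is expanded to a constant scalar over the grid.
        return {"PsiAbs": psi.abs(), "PsiMax": psi.max()}


    # Open the volume file in append mode so the new datasets can be written
    h5file = spectre_h5.H5File("VolumeData.h5", "a")
    volfile = h5file.get_vol("/element_data")
    transform_volume_data(volfiles=[volfile],
                          kernels=[
                              Kernel(psi_abs_and_max,
                                     map_input_names={"psi": "Psi"})
                          ])
    h5file.close()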