diff --git a/kwave/kmedium.py b/kwave/kmedium.py index 8f554db7..7b620886 100644 --- a/kwave/kmedium.py +++ b/kwave/kmedium.py @@ -1,10 +1,9 @@ import logging from dataclasses import dataclass -from typing import List, Optional, Union +from typing import List, Optional, Sequence, Union import numpy as np -import kwave.utils.checks from kwave.enums import AlphaMode @@ -16,49 +15,51 @@ def _to_alpha_mode(value): return AlphaMode(value) except (ValueError, TypeError): raise ValueError( - f"medium.alpha_mode must be an AlphaMode enum value or one of " f"'no_absorption', 'no_dispersion', 'stokes', got {value!r}" + f"medium.alpha_mode must be an AlphaMode enum value or one of 'no_absorption', 'no_dispersion', 'stokes', got {value!r}" ) from None @dataclass class kWaveMedium(object): + """ + Medium properties for k-Wave simulations. + + Note: For heterogeneous medium parameters, medium.sound_speed and medium.density + must be given in matrix form with the same dimensions as kgrid. For homogeneous + medium parameters, these can be given as single numeric values. If the medium is + homogeneous and velocity inputs or outputs are not required, it is not necessary + to specify medium.density. + """ + # sound speed distribution within the acoustic medium [m/s] | required to be defined - sound_speed: np.array + sound_speed: Union[float, int, np.ndarray] # reference sound speed used within the k-space operator (phase correction term) [m/s] - sound_speed_ref: np.array = None + sound_speed_ref: Optional[Union[float, int, np.ndarray]] = None # density distribution within the acoustic medium [kg/m^3] - density: np.array = None + density: Optional[Union[float, int, np.ndarray]] = None # power law absorption coefficient [dB/(MHz^y cm)] - alpha_coeff: np.array = None + alpha_coeff: Optional[Union[float, int, np.ndarray]] = None # power law absorption exponent - alpha_power: np.array = None + alpha_power: Optional[Union[float, int, np.ndarray]] = None # optional input to force either the absorption or dispersion terms in the equation of state to be excluded; # valid inputs are AlphaMode.NO_ABSORPTION, AlphaMode.NO_DISPERSION, or the equivalent strings alpha_mode: Optional[Union[AlphaMode, str]] = None # frequency domain filter applied to the absorption and dispersion terms in the equation of state - alpha_filter: np.array = None + alpha_filter: Optional[np.ndarray] = None # two element array used to control the sign of absorption and dispersion terms in the equation of state - alpha_sign: np.array = None + alpha_sign: Optional[np.ndarray] = None # parameter of nonlinearity - BonA: np.array = None + BonA: Optional[Union[float, int, np.ndarray]] = None # is the medium absorbing? absorbing: bool = False # is the medium absorbing stokes? stokes: bool = False - # """ - # Note: For heterogeneous medium parameters, medium.sound_speed and - # medium.density must be given in matrix form with the same dimensions as - # kgrid. For homogeneous medium parameters, these can be given as single - # numeric values. If the medium is homogeneous and velocity inputs or - # outputs are not required, it is not necessary to specify medium.density. - # """ - def __post_init__(self): self.sound_speed = np.atleast_1d(self.sound_speed) self.alpha_mode = _to_alpha_mode(self.alpha_mode) - def check_fields(self, kgrid_shape: np.ndarray) -> None: + def check_fields(self, kgrid_shape: Sequence[int]) -> None: """ Check whether the given properties are valid @@ -72,17 +73,17 @@ def check_fields(self, kgrid_shape: np.ndarray) -> None: self.alpha_mode = _to_alpha_mode(self.alpha_mode) # check the absorption filter input is valid - if self.alpha_filter is not None and not (self.alpha_filter.shape == kgrid_shape).all(): + if self.alpha_filter is not None and self.alpha_filter.shape != tuple(kgrid_shape): raise ValueError("medium.alpha_filter must be the same size as the computational grid.") # check the absorption sign input is valid - if self.alpha_sign is not None and (not kwave.utils.checkutils.is_number(self.alpha_sign) or (self.alpha_sign.size != 2)): - raise ValueError( - "medium.alpha_sign must be given as a " "2 element numerical array controlling absorption and dispersion, respectively." - ) + if self.alpha_sign is not None: + alpha_sign_arr = np.atleast_1d(self.alpha_sign) + if alpha_sign_arr.size != 2 or not np.issubdtype(alpha_sign_arr.dtype, np.number): + raise ValueError("medium.alpha_sign must be a 2 element numeric array controlling absorption and dispersion, respectively.") # check alpha_coeff is non-negative and real - if not np.all(np.isreal(self.alpha_coeff)) or np.any(self.alpha_coeff < 0): + if self.alpha_coeff is not None and (not np.all(np.isreal(self.alpha_coeff)) or np.any(self.alpha_coeff < 0)): raise ValueError("medium.alpha_coeff must be non-negative and real.") def is_defined(self, *fields) -> List[bool]: @@ -111,7 +112,7 @@ def ensure_defined(self, *fields) -> None: None """ for f in fields: - assert getattr(self, f) is not None, f"The field {f} must be not be None" + assert getattr(self, f) is not None, f"The field {f} must not be None" def is_nonlinear(self) -> bool: """ @@ -150,13 +151,14 @@ def _check_absorbing_without_stokes(self) -> None: # enforce both absorption parameters self.ensure_defined("alpha_coeff", "alpha_power") - # check y is a scalar - assert np.isscalar(self.alpha_power), "medium.alpha_power must be scalar." + # check y is a scalar (np.isscalar rejects 0-d ndarrays — accept np.array(1.5) too) + is_scalar = np.isscalar(self.alpha_power) or (isinstance(self.alpha_power, np.ndarray) and self.alpha_power.size == 1) + assert is_scalar, "medium.alpha_power must be scalar." # check y is real and within 0 to 3 - assert ( - np.all(np.isreal(self.alpha_coeff)) and 0 <= self.alpha_power < 3 - ), "medium.alpha_power must be a real number between 0 and 3." + assert np.all(np.isreal(self.alpha_coeff)) and 0 <= self.alpha_power < 3, ( + "medium.alpha_power must be a real number between 0 and 3." + ) # display warning if y is close to 1 and the dispersion term has not been set to zero if self.alpha_mode != "no_dispersion": @@ -176,8 +178,10 @@ def _check_absorbing_with_stokes(self): self.ensure_defined("alpha_coeff") # give warning if y is specified - if self.alpha_power is not None and (self.alpha_power.size != 1 or self.alpha_power != 2): - logging.log(logging.WARN, "the axisymmetric code and stokes absorption assume alpha_power = 2, user value ignored.") + if self.alpha_power is not None: + ap = np.asarray(self.alpha_power) + if ap.size != 1 or not np.isclose(ap.item(), 2.0): + logging.warning("the axisymmetric code and stokes absorption assume alpha_power = 2, user value ignored.") # overwrite y value self.alpha_power = 2 @@ -185,12 +189,12 @@ def _check_absorbing_with_stokes(self): # don't allow medium.alpha_mode with the axisymmetric code if self.alpha_mode is not None and (self.alpha_mode in ["no_absorption", "no_dispersion"]): raise NotImplementedError( - "Input option medium.alpha_mode is not supported with the axisymmetric code " "or medium.alpha_mode = " "stokes" "." + "Input option medium.alpha_mode is not supported with the axisymmetric code or medium.alpha_mode = stokes." ) # don't allow alpha_filter with stokes absorption (no variables are applied in k-space) assert self.alpha_filter is None, ( - "Input option medium.alpha_filter is not supported with the axisymmetric code " "or medium.alpha_mode = 'stokes'. " + "Input option medium.alpha_filter is not supported with the axisymmetric code or medium.alpha_mode = 'stokes'. " ) ########################################## diff --git a/tests/test_kmedium.py b/tests/test_kmedium.py index cbfdff60..959212f5 100644 --- a/tests/test_kmedium.py +++ b/tests/test_kmedium.py @@ -1,8 +1,13 @@ -"""Tests for kWaveMedium alpha_mode normalization and validation.""" +"""Tests for kWaveMedium: alpha_mode normalization and broader coverage.""" + +from unittest.mock import patch + import numpy as np import pytest +from kwave.data import Vector from kwave.enums import AlphaMode +from kwave.kgrid import kWaveGrid from kwave.kmedium import kWaveMedium @@ -44,3 +49,242 @@ def test_string_comparison_still_works(self): m = kWaveMedium(sound_speed=1500, alpha_mode="no_dispersion") assert m.alpha_mode == "no_dispersion" assert m.alpha_mode in ["no_absorption", "no_dispersion"] + + +def test_elastic_properties_access(): + """Test access to elastic code related properties (should raise NotImplementedError)""" + medium = kWaveMedium(sound_speed=1500) + + with pytest.raises(NotImplementedError, match="Elastic simulation"): + _ = medium.sound_speed_shear + + with pytest.raises(NotImplementedError, match="Elastic simulation"): + _ = medium.sound_speed_ref_shear + + with pytest.raises(NotImplementedError, match="Elastic simulation"): + _ = medium.sound_speed_compression + + with pytest.raises(NotImplementedError, match="Elastic simulation"): + _ = medium.sound_speed_ref_compression + + with pytest.raises(NotImplementedError, match="Elastic simulation"): + _ = medium.alpha_coeff_compression + + with pytest.raises(NotImplementedError, match="Elastic simulation"): + _ = medium.alpha_coeff_shear + + +def test_is_defined_method(): + """Test is_defined method with various scenarios""" + medium = kWaveMedium(sound_speed=1500, density=1000, alpha_coeff=0.75) + + # Test single field + assert medium.is_defined("sound_speed") == [True] + assert medium.is_defined("density") == [True] + assert medium.is_defined("alpha_coeff") == [True] + assert medium.is_defined("alpha_power") == [False] + assert medium.is_defined("BonA") == [False] + + # Test multiple fields + result = medium.is_defined("sound_speed", "density", "alpha_power", "BonA") + assert result == [True, True, False, False] + + +def test_ensure_defined_method(): + """Test ensure_defined method""" + medium = kWaveMedium(sound_speed=1500, density=1000) + + # Test defined fields + medium.ensure_defined("sound_speed", "density") # Should not raise exception + + # Test undefined fields + with pytest.raises(AssertionError, match="alpha_coeff must not be None"): + medium.ensure_defined("alpha_coeff") + + with pytest.raises(AssertionError, match="alpha_power must not be None"): + medium.ensure_defined("alpha_power") + + +def test_is_nonlinear_method(): + """Test is_nonlinear method""" + # Linear medium + medium1 = kWaveMedium(sound_speed=1500) + assert not medium1.is_nonlinear() + + # Nonlinear medium + medium2 = kWaveMedium(sound_speed=1500, BonA=6.0) + assert medium2.is_nonlinear() + + +def test_stokes_mode_alpha_power_none(): + """Test Stokes mode when alpha_power is None""" + medium = kWaveMedium(sound_speed=1500, alpha_coeff=0.75) + + # When alpha_power is None, setting Stokes mode should set it to 2 + medium.set_absorbing(is_absorbing=True, is_stokes=True) + assert medium.alpha_power == 2 + + +def test_stokes_mode_alpha_power_array(): + """Test Stokes mode when alpha_power is an array""" + # Test multi-element array + medium1 = kWaveMedium(sound_speed=1500, alpha_coeff=0.75, alpha_power=np.array([1.5, 1.8])) + + with patch("logging.warning") as mock_warning: + medium1.set_absorbing(is_absorbing=True, is_stokes=True) + mock_warning.assert_called_once() + assert "alpha_power = 2" in mock_warning.call_args[0][0] + + assert medium1.alpha_power == 2 + + +def test_absorbing_without_stokes_alpha_power_accepts_0d_ndarray(): + """The new type hint allows np.ndarray; a 0-d array must be accepted as scalar.""" + medium = kWaveMedium(sound_speed=1500, alpha_coeff=0.75, alpha_power=np.array(1.5)) + medium.set_absorbing(is_absorbing=True, is_stokes=False) + assert medium.absorbing + assert not medium.stokes + + +def test_absorbing_without_stokes_alpha_power_validation(): + """Test alpha_power validation in non-Stokes absorbing mode""" + # Test alpha_power must be scalar + medium1 = kWaveMedium(sound_speed=1500, alpha_coeff=0.75, alpha_power=np.array([1.5, 1.8])) + + with pytest.raises(AssertionError, match="must be scalar"): + medium1.set_absorbing(is_absorbing=True, is_stokes=False) + + # Test alpha_power must be in range 0-3 + medium2 = kWaveMedium(sound_speed=1500, alpha_coeff=0.75, alpha_power=-0.5) + + with pytest.raises(AssertionError, match="between 0 and 3"): + medium2.set_absorbing(is_absorbing=True, is_stokes=False) + + medium3 = kWaveMedium(sound_speed=1500, alpha_coeff=0.75, alpha_power=3.5) + + with pytest.raises(AssertionError, match="between 0 and 3"): + medium3.set_absorbing(is_absorbing=True, is_stokes=False) + + +def test_alpha_mode_validation_edge_cases(): + """Test alpha_mode validation edge cases""" + kgrid = kWaveGrid(Vector([64, 64]), Vector([0.1e-3, 0.1e-3])) + + # Test None value (should pass) + medium1 = kWaveMedium(sound_speed=1500, alpha_mode=None) + medium1.check_fields(kgrid.N) # Should not raise exception + + # Empty string is rejected at construction by _to_alpha_mode (added in #705) + with pytest.raises(ValueError, match="must be an AlphaMode"): + kWaveMedium(sound_speed=1500, alpha_mode="") + + +def test_alpha_filter_none(): + """Test when alpha_filter is None""" + kgrid = kWaveGrid(Vector([64, 64]), Vector([0.1e-3, 0.1e-3])) + + medium = kWaveMedium(sound_speed=1500, alpha_filter=None) + medium.check_fields(kgrid.N) # Should not raise exception + + +def test_alpha_sign_none(): + """Test when alpha_sign is None""" + kgrid = kWaveGrid(Vector([64, 64]), Vector([0.1e-3, 0.1e-3])) + + medium = kWaveMedium(sound_speed=1500, alpha_sign=None) + medium.check_fields(kgrid.N) # Should not raise exception + + +def test_alpha_sign_wrong_size_raises(): + kgrid = kWaveGrid(Vector([64, 64]), Vector([0.1e-3, 0.1e-3])) + medium = kWaveMedium(sound_speed=1500, alpha_sign=np.array([1.0])) + with pytest.raises(ValueError, match="2 element numeric array"): + medium.check_fields(kgrid.N) + + +def test_alpha_sign_non_numeric_raises(): + kgrid = kWaveGrid(Vector([64, 64]), Vector([0.1e-3, 0.1e-3])) + medium = kWaveMedium(sound_speed=1500, alpha_sign=np.array(["a", "b"], dtype=object)) + with pytest.raises(ValueError, match="2 element numeric array"): + medium.check_fields(kgrid.N) + + +def test_alpha_coeff_none(): + """Test when alpha_coeff is None""" + kgrid = kWaveGrid(Vector([64, 64]), Vector([0.1e-3, 0.1e-3])) + + medium = kWaveMedium(sound_speed=1500, alpha_coeff=None) + medium.check_fields(kgrid.N) # Should not raise exception + + +def test_alpha_coeff_array_validation(): + """Test alpha_coeff array validation""" + kgrid = kWaveGrid(Vector([64, 64]), Vector([0.1e-3, 0.1e-3])) + + # Valid array + medium1 = kWaveMedium(sound_speed=1500, alpha_coeff=np.array([0.5, 0.6, 0.7])) + medium1.check_fields(kgrid.N) # Should not raise exception + + # Array with negative values + medium2 = kWaveMedium(sound_speed=1500, alpha_coeff=np.array([0.5, -0.1, 0.7])) + with pytest.raises(ValueError, match="non-negative and real"): + medium2.check_fields(kgrid.N) + + # Array with complex values + medium3 = kWaveMedium(sound_speed=1500, alpha_coeff=np.array([0.5, 0.6 + 0.1j, 0.7])) + with pytest.raises(ValueError, match="non-negative and real"): + medium3.check_fields(kgrid.N) + + +def test_post_init_sound_speed_conversion(): + """Test sound_speed conversion in __post_init__""" + # Scalar input + medium1 = kWaveMedium(sound_speed=1500) + assert isinstance(medium1.sound_speed, np.ndarray) + assert medium1.sound_speed.shape == (1,) + + # Array input + medium2 = kWaveMedium(sound_speed=np.array([1500, 1600])) + assert isinstance(medium2.sound_speed, np.ndarray) + assert medium2.sound_speed.shape == (2,) + + +def test_stokes_mode_alpha_mode_restrictions(): + """Test alpha_mode restrictions in Stokes mode""" + # Test no_absorption mode + medium1 = kWaveMedium(sound_speed=1500, alpha_coeff=0.75, alpha_mode="no_absorption") + with pytest.raises(NotImplementedError, match="not supported with the axisymmetric code"): + medium1.set_absorbing(is_absorbing=True, is_stokes=True) + + # Test no_dispersion mode + medium2 = kWaveMedium(sound_speed=1500, alpha_coeff=0.75, alpha_mode="no_dispersion") + with pytest.raises(NotImplementedError, match="not supported with the axisymmetric code"): + medium2.set_absorbing(is_absorbing=True, is_stokes=True) + + +def test_absorbing_flags(): + """Test setting of absorbing and stokes flags""" + medium = kWaveMedium(sound_speed=1500, alpha_coeff=0.75, alpha_power=1.5) + + # Initial state + assert not medium.absorbing + assert not medium.stokes + + # Set to non-Stokes absorbing + medium.set_absorbing(is_absorbing=True, is_stokes=False) + assert medium.absorbing + assert not medium.stokes + + # Reset + medium.absorbing = False + medium.stokes = False + + # Set to Stokes absorbing + medium.set_absorbing(is_absorbing=True, is_stokes=True) + assert medium.absorbing + assert medium.stokes + + # Set to non-absorbing + medium.set_absorbing(is_absorbing=False, is_stokes=False) + assert not medium.absorbing + assert not medium.stokes