In [None]:
#| default_exp velocitygroups

# velocitygroups
> Create and manipulate a list of bins of the atomic velocity distribution for use in the LGS model

## Imports -

In [None]:
#| hide
from fastcore.test import test_close

In [None]:
#| export
from pylgs.imports import *
from pylgs.utilities.nbdev import DictTbl, AttributeTbl
from pylgs.utilities.testing import test_array
from pylgs.utilities.sparse import sparse_kronecker_matrix, sparse_toeplitz, sparse_identity, sparse_diag, sparse
from pylgs.utilities.numpy import sym_range
from pylgs.pymor.parameters import *
from pylgs.pymor.vectorarrays import *
from pylgs.pymor.operators import *
from pylgs.pymor.grids import *
from pymor.vectorarrays.interface import VectorArray
from pymor.operators.interface import Operator

In [None]:
#| hide
np.set_printoptions(formatter={'float': lambda x: f'{x:^ 8.2}' if x else f'{0:^ 8}'}, linewidth=140)

## Velocity groups -

In [None]:
#| export
class VelocityGroups(dict):
    """`VelocityGroups` contains informations about the centers, edges, and relative densities of a list of velocity group bins."""
    def __init__(self, 
                 bins:Iterable|int=6 # Iterable of velocity group edges or int specifying number of evenly spaced velocity groups
                ):
        super().__init__()
        if isinstance(bins, VelocityGroups):
            self.edges = bins.edges
            for k, v in bins.items():
                self[k] = v
            return
        self.edges = np.linspace(-3, 3, bins + 1) if isinstance(bins, int) else bins
        self['VGCenter'] = np.mean(np.array(list(zip(self.edges[:-1], self.edges[1:]))), axis=1)
        # self['VGBins'] = np.array(list(zip(bins[:-1], bins[1:])))
        # self['VGCenter'] = np.mean(self['VGBins'], axis=1)
        self['VGDensity'] = np.diff(erf(self.edges))/2
        self['VGWidth'] = np.diff(self.edges)
        # self['VGEdges'] = bins
        self['VGInverseWidth'] = 1/self['VGWidth']
        self['VGNumber'] = len(self['VGCenter'])
        self['velocity_groups'] = np.ones(self['VGDensity'].shape)

Create a list of two evenly spaced velocity groups:

In [None]:
vg = VelocityGroups(2)
vg

{'VGCenter': array([  -1.5  ,    1.5  ]),
 'VGDensity': array([   0.5  ,    0.5  ]),
 'VGWidth': array([   3.0  ,    3.0  ]),
 'VGInverseWidth': array([  0.33  ,   0.33  ]),
 'VGNumber': 2,
 'velocity_groups': array([   1.0  ,    1.0  ])}

### subdivide -


In [None]:
#| export
@patch
def subdivide(
    self:VelocityGroups, 
    indices:int|ndarray|slice=slice(None) # Indices, boolean array, or slice object specifying velocity groups to subdivide
):
    """Divide the velocity groups with indices `indices` in two."""
    return VelocityGroups(np.union1d(self.edges, self["VGCenter"][indices]))

Return a new `VelocityGroups` object with the groups at index 1 divided in two:

In [None]:
vg.subdivide([1])

{'VGCenter': array([  -1.5  ,   0.75  ,    2.2  ]),
 'VGDensity': array([   0.5  ,   0.48  ,   0.017 ]),
 'VGWidth': array([   3.0  ,    1.5  ,    1.5  ]),
 'VGInverseWidth': array([  0.33  ,   0.67  ,   0.67  ]),
 'VGNumber': 3,
 'velocity_groups': array([   1.0  ,    1.0  ,    1.0  ])}

### _velocity_space -


In [None]:
#| export
def _velocity_space(vg, ext=''):
    return XarrayVectorSpace({'Atomic velocity' + ext: vg['VGCenter']})

### _vg_identity -

In [None]:
#| export
def _vg_identity(vg):
    return XarrayMatrixOperator(
        DataArray(sparse_identity(vg['VGNumber']), coords=[('Atomic velocity (range)', vg['VGCenter']), ('Atomic velocity', vg['VGCenter'])])
    )

### _vg_diagonal -

In [None]:
#| export
def _vg_diagonal(vg, diags):
    return XarrayMatrixOperator(
        DataArray(sparse_diag(diags), coords=[('Atomic velocity (range)', vg['VGCenter']), ('Atomic velocity', vg['VGCenter'])])
    )

In [None]:
#| hide
ScaleOperator(vg['VGCenter'], space=_velocity_space(vg))

### identity -

In [None]:
#| export
@patch
def identity(self:VelocityGroups)->Operator:
    """The velocity-space identity operator."""
    # Want to use IdentityOperator for this, but first need to extend XarrayMatrixOperator so that source and range can be the same space. 
    # return IdentityOperator(_velocity_space(vg))
    return XarrayMatrixOperator(
            sparse_identity(self['VGNumber']), 
            range=_velocity_space(self, ' (range)'), 
            source=_velocity_space(self)
        )

In [None]:
vg.identity()

In [None]:
_.to_numpy()

array([[   1.0  ,     0   ],
       [    0   ,    1.0  ]])

In [None]:
#| hide
test_array('core', 'VelocityGroups_identity', _)

### velocity_diagonal -

In [None]:
#| export
@patch
def velocity_diagonal(self:VelocityGroups)->Operator:
    """A velocity-space operator with the velocity on the diagonal."""
    return _vg_diagonal(self, self['VGCenter'])

In [None]:
vg.velocity_diagonal()

In [None]:
_.to_numpy()

array([[  -1.5  ,     0   ],
       [    0   ,    1.5  ]])

In [None]:
#| hide
test_array('core', 'VelocityGroups_velocity_digonal', _)

### velocity_density_vector -

In [None]:
#| export
def velocity_density_vector(vg):
    return _velocity_space(vg).from_numpy(vg['VGDensity'])

### n_times_1 -

In [None]:
#| export
@patch
def n_times_1(self:VelocityGroups)->Operator:
    """Operator that sums over all velocity groups then scales by the Maxwell-Boltzmann distribution."""
    return XarrayMatrixOperator(
        sparse(self['VGDensity'][:, None] * np.ones((self['VGNumber']))),
        range=_velocity_space(self, ' (range)'), 
        source=_velocity_space(self)
    )

In [None]:
vg.n_times_1()

In [None]:
_.to_numpy()

array([[   0.5  ,    0.5  ],
       [   0.5  ,    0.5  ]])

In [None]:
#| hide
test_array('core', 'VelocityGroups_n_times_1', _)

In [None]:
#| hide
# This is really a product operator of velocity_sum followed by velocity_maxwell. Should refactor
test_close(_, (_velocity_space(vg, ' (range)').ones() * velocity_density_vector(vg)).array.data)

### drho_dv -

In [None]:
#| export
@patch
def drho_dv(self:VelocityGroups)->Operator:
    """Derivative with respect to velocity operator.$"""
    return XarrayMatrixOperator(
        self['VGInverseWidth'] * (sparse_kronecker_matrix(self['VGNumber'], 0) - sparse_kronecker_matrix(self['VGNumber'], -1)), 
        range=_velocity_space(self, ' (range)'), 
        source=_velocity_space(self)
    )

In [None]:
vg.drho_dv() 

In [None]:
_.to_numpy()

array([[  0.33  ,     0   ],
       [ -0.33  ,   0.33  ]])

In [None]:
#| hide
test_array('core', 'VelocityGroups_drho_dv', _)

In [None]:
#| hide
vg.drho_dv().matrix.sel({'Atomic velocity (range)': 1.5, 'Atomic velocity': -1.5}).data.item()

-0.3333333333333333

### _vg_da -


In [None]:
#| export
def _vg_da(a, vg):
    range = ('Atomic velocity (range)', vg['VGCenter']) if a.shape[0] == vg['VGNumber'] else ("none", ["none"])
    source = ('Atomic velocity', vg['VGCenter']) if a.shape[1] == vg['VGNumber'] else ("none", ["none"])
    return DataArray(a, coords=[range, source])

### normalize -


In [None]:
#| export
@patch
def normalize(self:VelocityGroups)->Operator:
    """Returns the operator that normalizes a vector by dividing each component by the width of the corresponding velocity group."""
    return ScaleOperator(DataArray(self['VGInverseWidth'], {'Atomic velocity': self['VGCenter']}))

In [None]:
vg.normalize()

### sum -


In [None]:
#| export
@patch
def sum(self:VelocityGroups)->Operator:
    """Returns the operator that sums a vector over velocity groups."""
    return SumOperator({'Atomic velocity': self['VGCenter']})

In [None]:
vg.sum()

## Export -

In [None]:
#| hide
import nbdev; nbdev.nbdev_export()