Skip to content

Commit

Permalink
Merge pull request #220 from nlesc-nano/adf
Browse files Browse the repository at this point in the history
ENH: Add basic initial support for periodic ADF calculations
  • Loading branch information
BvB93 committed Feb 25, 2021
2 parents 986bb86 + 6b6edbe commit 40a1cce
Show file tree
Hide file tree
Showing 5 changed files with 205 additions and 148 deletions.
200 changes: 68 additions & 132 deletions FOX/classes/multi_mol.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import numpy as np
import pandas as pd
from scipy import constants
from scipy.spatial import cKDTree
from scipy.fftpack import fft
from scipy.spatial.distance import cdist

Expand All @@ -43,7 +42,7 @@
from ..io.read_kf import read_kf
from ..io.read_xyz import read_multi_xyz
from ..functions.rdf import get_rdf, get_rdf_df
from ..functions.adf import get_adf, get_adf_df
from ..functions.adf import get_adf_df, _adf_inner_cdktree, _adf_inner
from ..functions.molecule_utils import fix_bond_orders, separate_mod

if TYPE_CHECKING:
Expand All @@ -58,6 +57,7 @@
_warn.__cause__ = ex
warnings.warn(_warn)
del _warn
DASK_EX = Exception()

try:
from ase import Atoms
Expand Down Expand Up @@ -1628,17 +1628,26 @@ def _get_at_iterable(self, atom_subset: AtomSubset) -> Iterable[Tuple[Hashable,

"""############################ Angular Distribution Functions ##########################"""

def init_adf(self, mol_subset: MolSubset = None,
atom_subset: AtomSubset = None,
r_max: Union[float, str] = 8.0,
weight: Callable[[np.ndarray], np.ndarray] = neg_exp) -> pd.DataFrame:
def init_adf(
self,
mol_subset: MolSubset = None,
atom_subset: AtomSubset = None,
r_max: Union[float, str] = 8.0,
weight: Callable[[np.ndarray], np.ndarray] = neg_exp,
periodic: None | Iterable[Literal["x", "y", "z"]] = None,
) -> pd.DataFrame:
r"""Initialize the calculation of distance-weighted angular distribution functions (ADFs).
ADFs are calculated for all possible atom-pairs in **atom_subset** and returned as a
dataframe.
.. _DASK: https://dask.org/
Note
----
Periodic ADF calculations (see the **periodic** parameter) are currently
only supported for systems with cuboid latices and ``r_max != inf``.
Parameters
----------
mol_subset : :class:`slice`, optional
Expand All @@ -1659,6 +1668,10 @@ def init_adf(self, mol_subset: MolSubset = None,
Given an angle :math:`\phi_{ijk}`, to the distance :math:`r_{ijk}` is defined
as :math:`max[r_{ij}, r_{jk}]`.
Set to :data:`None` to disable distance weighting.
periodic : :class:`str`, optional
If specified, correct for the systems periodicity if
:attr:`self.lattice is not None <MultiMolecule.lattice>`.
Accepts ``"x"``, ``"y"`` and/or ``"z"``.
Returns
-------
Expand Down Expand Up @@ -1701,143 +1714,66 @@ def init_adf(self, mol_subset: MolSubset = None,
v_new.append(bool_ar)
atom_pairs[k] = v_new

# Periodic calculations
if periodic is not None:
if not r_max:
raise NotImplementedError(
"periodic calculations are not supported for `r_max=inf`"
)

# Validate the parameters
periodic_set = {i.lower() for i in periodic}
lattice = self.lattice
if not periodic_set.issubset("xyz"):
raise ValueError("periodic expected `x`, `y` and/or `z`; "
f"observed value: {periodic!r}")
elif lattice is None:
raise TypeError("cannot perform periodic calculations if the "
"molecules `lattice` is None")

# Scipy's `cKDTree` only supports cuboid lattices
is0 = np.abs(lattice - 0) < 1e-8
if not (np.count_nonzero(is0, axis=-1) == 2).all():
raise NotImplementedError("non-cuboid lattices are not supported")

# Set the vector-length of all absent axes to `inf`
xyz_dct = {"x": 0, "y": 1, "z": 2}
slc = [xyz_dct[i] for i in sorted(xyz_dct.keys() - periodic_set)]
lattice[..., slc, :] = np.inf

# Perform a translation to remove negative elements, as `cKDTree` cannot
# handle the latter if `boxsize` is specified
mol -= mol.min(axis=1)[..., None, :]
if lattice.ndim == 2:
boxsize_iter = repeat(np.linalg.norm(lattice, axis=-1))
else:
boxsize_iter = (i for i in np.linalg.norm(lattice, axis=-1))
else:
boxsize_iter = repeat(None)

# Construct the angular distribution function
# Perform the task in parallel (with dask) if possible
if not DASK_EX and r_max_:
func = dask.delayed(MultiMolecule._adf_inner_cdktree)
jobs = [func(m, n, r_max_, atom_pairs.values(), weight) for m in mol]
if DASK_EX is None and r_max_:
func = dask.delayed(_adf_inner_cdktree)
jobs = [func(m, n, r_max_, atom_pairs.values(), b, weight) for m, b in
zip(mol, boxsize_iter)]
results = dask.compute(*jobs)
elif not DASK_EX and not r_max_:
func = dask.delayed(MultiMolecule._adf_inner)
elif DASK_EX is None and not r_max_:
func = dask.delayed(_adf_inner)
jobs = [func(m, atom_pairs.values(), weight) for m in mol]
results = dask.compute(*jobs)
elif DASK_EX and r_max_:
func = MultiMolecule._adf_inner_cdktree
results = [func(m, n, r_max_, atom_pairs.values(), weight) for m in mol]
elif DASK_EX and not r_max_:
func = MultiMolecule._adf_inner
elif DASK_EX is not None and r_max_:
func = _adf_inner_cdktree
results = [func(m, n, r_max_, atom_pairs.values(), b, weight) for m, b in
zip(mol, boxsize_iter)]
elif DASK_EX is not None and not r_max_:
func = _adf_inner
results = [func(m, atom_pairs.values(), weight) for m in mol]

df = get_adf_df(atom_pairs)
df.loc[:, :] = np.array(results).mean(axis=0).T
return df

@staticmethod
def _adf_inner_cdktree(m: MultiMolecule, n: int, r_max: float,
idx_list: Iterable[Tuple[np.ndarray, np.ndarray, np.ndarray]],
weight: Callable[[np.ndarray], np.ndarray]) -> List[np.ndarray]:
"""Perform the loop of :meth:`.init_adf` with a distance cutoff."""
# Construct slices and a distance matrix
tree = cKDTree(m)
dist, idx = tree.query(m, n, distance_upper_bound=r_max, p=2)
dist[dist == np.inf] = 0.0
idx[idx == m.shape[0]] = 0

# Slice the Cartesian coordinates
coords13 = m[idx]
coords2 = m[..., None, :]

# Construct (3D) angle- and distance-matrices
with np.errstate(divide='ignore', invalid='ignore'):
vec = ((coords13 - coords2) / dist[..., None])
ang = np.arccos(np.einsum('jkl,jml->jkm', vec, vec))
dist = np.maximum(dist[..., None], dist[..., None, :])
ang[np.isnan(ang)] = 0.0
ang = np.degrees(ang).astype(int) # Radian (float) to degrees (int)

# Construct and return the ADF
ret: List[np.ndarray] = []
ret_append = ret.append
for i, j, k in idx_list:
ijk = j[:, None, None] & i[idx][..., None] & k[idx][..., None, :]
weights = weight(dist[ijk]) if weight is not None else None
ret_append(get_adf(ang[ijk], weights=weights))
return ret

@staticmethod
def _adf_inner(m: MultiMolecule,
idx_list: Iterable[Tuple[np.ndarray, np.ndarray, np.ndarray]],
weight: Callable[[np.ndarray], np.ndarray]) -> List[np.ndarray]:
"""Perform the loop of :meth:`.init_adf` without a distance cutoff."""
# Construct a distance matrix
dist = cdist(m, m)

# Slice the Cartesian coordinates
coords13 = m
coords2 = m[..., None, :]

# Construct (3D) angle- and distance-matrices
with np.errstate(divide='ignore', invalid='ignore'):
vec = ((coords13 - coords2) / dist[..., None])
ang = np.arccos(np.einsum('jkl,jml->jkm', vec, vec))
dist = np.maximum(dist[..., None], dist[..., None, :])
ang[np.isnan(ang)] = 0.0
ang = np.degrees(ang).astype(int) # Radian (float) to degrees (int)

# Construct and return the ADF
ret: List[np.ndarray] = []
ret_append = ret.append
for i, j, k in idx_list:
ijk = j[:, None, None] & i[..., None] & k[..., None, :]
weights = weight(dist[ijk]) if weight is not None else None
ret_append(get_adf(ang[ijk], weights=weights))
return ret

@overload
def get_angle_mat(self, mol_subset: MolSubset = ..., atom_subset: Tuple[AtomSubset, AtomSubset, AtomSubset] = ..., get_r_max: Literal[False] = ...) -> np.ndarray: ... # noqa: E501
@overload
def get_angle_mat(self, mol_subset: MolSubset = ..., atom_subset: Tuple[AtomSubset, AtomSubset, AtomSubset] = ..., get_r_max: Literal[True] = ...) -> Tuple[np.ndarray, float]: ... # noqa: E501
def get_angle_mat(self, mol_subset=0, atom_subset=(None, None, None), get_r_max=False): # noqa: E301, E501
"""Create and return an angle matrix for all molecules and atoms in this instance.
Parameters
----------
mol_subset : :class:`slice`, optional
Perform the calculation on a subset of molecules in this instance, as
determined by their moleculair index.
Include all :math:`m` molecules in this instance if :data:`None`.
atom_subset : :class:`Sequence[str] <collections.abc.Sequence>`, optional
Perform the calculation on a subset of atoms in this instance, as
determined by their atomic index or atomic symbol.
Include all :math:`n` atoms per molecule in this instance if :data:`None`.
get_r_max : :class:`bool`
Whether or not the maximum distance should be returned or not.
Returns
-------
:class:`np.ndarray[np.float64] <numpy.ndarray>`, shape :math:`(m, n, k, l)`
A 4D angle matrix of :math:`m` molecules, created out of three sets of :math:`n`,
:math:`k` and :math:`l` atoms.
If **get_r_max** = :data:`True`, also return the maximum distance.
"""
# Define array slices
m_subset = self._get_mol_subset(mol_subset)
i = self._get_atom_subset(atom_subset[0])
j = self._get_atom_subset(atom_subset[1])
k = self._get_atom_subset(atom_subset[2])

# Slice and broadcast the XYZ array
A = self[m_subset][:, i][..., None, :]
B = self[m_subset][:, j][:, None, ...]
C = self[m_subset][:, k][:, None, ...]

# Temporary ignore RuntimeWarnings related to dividing by zero
with np.errstate(divide='ignore', invalid='ignore'):
# Prepare the unit vectors
kwarg1 = {'atom_subset': [atom_subset[0], atom_subset[1]], 'mol_subset': m_subset}
kwarg2 = {'atom_subset': [atom_subset[0], atom_subset[2]], 'mol_subset': m_subset}
dist_mat1 = self.get_dist_mat(**kwarg1)[..., None]
dist_mat2 = self.get_dist_mat(**kwarg2)[..., None]
r_max = max(dist_mat1.max(), dist_mat2.max())
unit_vec1 = (B - A) / dist_mat1
unit_vec2 = (C - A) / dist_mat2

# Create and return the angle matrix
if get_r_max:
return np.arccos(np.einsum('ijkl,ijml->ijkm', unit_vec1, unit_vec2)), r_max
return np.arccos(np.einsum('ijkl,ijml->ijkm', unit_vec1, unit_vec2))

@overload
def _get_atom_subset(self, atom_subset: AtomSubset, as_array: Literal[False] = ...) -> Union[slice, np.ndarray[Any, np.dtype[np.intp]]]: ... # noqa: E501
@overload
Expand Down
104 changes: 99 additions & 5 deletions FOX/functions/adf.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,105 @@
"""

from typing import Sequence, Hashable, Optional
from __future__ import annotations

from typing import (
Sequence,
Hashable,
Iterable,
Callable,
TypeVar,
Tuple,
List,
Any,
TYPE_CHECKING,
)

import numpy as np
import pandas as pd
from scipy.spatial import cKDTree
from scipy.spatial.distance import cdist

if TYPE_CHECKING:
_T = TypeVar("_T")
_SCT = TypeVar("_SCT", bound=np.generic)

_3Tuple = Tuple[_T, _T, _T]
NDArray = np.ndarray[Any, np.dtype[_SCT]]

__all__ = ['get_adf_df', 'get_adf']


def _adf_inner_cdktree(
m: NDArray[np.float64],
n: int,
r_max: float,
idx_list: Iterable[_3Tuple[NDArray[np.integer[Any]]]],
boxsize: None | NDArray[np.float64],
weight: None | Callable[[NDArray[np.float64]], NDArray[np.float64]] = None,
) -> List[NDArray[np.float64]]:
"""Perform the loop of :meth:`.init_adf` with a distance cutoff."""
# Construct slices and a distance matrix
tree = cKDTree(m, boxsize=boxsize)
dist, idx = tree.query(m, n, distance_upper_bound=r_max, p=2) # type: NDArray[np.float64], NDArray[np.intp] # noqa: E501
dist[dist == np.inf] = 0.0
idx[idx == m.shape[0]] = 0

# Slice the Cartesian coordinates
coords13: NDArray[np.float64] = m[idx]
coords2: NDArray[np.float64] = m[..., None, :]

# Construct (3D) angle- and distance-matrices
with np.errstate(divide='ignore', invalid='ignore'):
vec: NDArray[np.float64] = ((coords13 - coords2) / dist[..., None])
ang: NDArray[np.float64] = np.arccos(np.einsum('jkl,jml->jkm', vec, vec))
dist = np.maximum(dist[..., None], dist[..., None, :])
ang[np.isnan(ang)] = 0.0

# Radian (float) to degrees (int)
ang_int: NDArray[np.int64] = np.degrees(ang).astype(np.int64)

# Construct and return the ADF
ret = []
for i, j, k in idx_list:
ijk: NDArray[np.bool_] = j[:, None, None] & i[idx][..., None] & k[idx][..., None, :]
weights = weight(dist[ijk]) if weight is not None else None
ret.append(get_adf(ang_int[ijk], weights=weights))
return ret


def _adf_inner(
m: NDArray[np.float64],
idx_list: Iterable[_3Tuple[NDArray[np.integer[Any]]]],
weight: None | Callable[[NDArray[np.float64]], NDArray[np.float64]] = None,
) -> List[NDArray[np.float64]]:
"""Perform the loop of :meth:`.init_adf` without a distance cutoff."""
# Construct a distance matrix
dist: NDArray[np.float64] = cdist(m, m)

# Slice the Cartesian coordinates
coords13: NDArray[np.float64] = m
coords2: NDArray[np.float64] = m[..., None, :]

# Construct (3D) angle- and distance-matrices
with np.errstate(divide='ignore', invalid='ignore'):
vec: NDArray[np.float64] = ((coords13 - coords2) / dist[..., None])
ang: NDArray[np.float64] = np.arccos(np.einsum('jkl,jml->jkm', vec, vec))
dist = np.maximum(dist[..., None], dist[..., None, :])
ang[np.isnan(ang)] = 0.0

# Radian (float) to degrees (int)
ang_int: NDArray[np.int64] = np.degrees(ang).astype(np.int64)

# Construct and return the ADF
ret = []
for i, j, k in idx_list:
ijk: NDArray[np.bool_] = j[:, None, None] & i[..., None] & k[..., None, :]
weights = weight(dist[ijk]) if weight is not None else None
ret.append(get_adf(ang_int[ijk], weights=weights))
return ret


def get_adf_df(atom_pairs: Sequence[Hashable]) -> pd.DataFrame:
"""Construct and return a pandas dataframe filled to hold angular distribution functions.
Expand All @@ -43,7 +134,10 @@ def get_adf_df(atom_pairs: Sequence[Hashable]) -> pd.DataFrame:
return df


def get_adf(ang: np.ndarray, weights: Optional[np.ndarray] = None) -> np.ndarray:
def get_adf(
ang: NDArray[np.integer[Any]],
weights: None | NDArray[np.number[Any]] = None,
) -> NDArray[np.float64]:
r"""Calculate and return the angular distribution function (ADF).
Parameters
Expand All @@ -65,14 +159,14 @@ def get_adf(ang: np.ndarray, weights: Optional[np.ndarray] = None) -> np.ndarray
"""
# Calculate and normalize the density
denominator = len(ang) / 180
at_count = np.bincount(ang, minlength=181)[1:181]
dens = at_count / denominator
at_count: NDArray[np.int64] = np.bincount(ang, minlength=181)[1:181]
dens: NDArray[np.float64] = at_count / denominator

if weights is None:
return dens

# Weight (and re-normalize) the density based on the distance matrix **dist**
area = dens.sum()
area: np.float64 = dens.sum()
with np.errstate(divide='ignore', invalid='ignore'):
dens *= np.bincount(ang, weights=weights, minlength=181)[1:181] / at_count
dens *= area / np.nansum(dens)
Expand Down
Binary file added tests/test_files/adf_periodic_2d.npy
Binary file not shown.
Binary file added tests/test_files/adf_periodic_3d.npy
Binary file not shown.

0 comments on commit 40a1cce

Please sign in to comment.