Skip to content

Commit

Permalink
Merge pull request #257 from nlesc-nano/rdf
Browse files Browse the repository at this point in the history
ENH: Add the new `atom_pairs` keyword to `init_rdf` and `init_adf`
  • Loading branch information
BvB93 committed Oct 28, 2021
2 parents 3c893ee + 1cc3824 commit 6d2a241
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 203 deletions.
242 changes: 86 additions & 156 deletions FOX/classes/multi_mol.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
)
from typing import (
Sequence, Optional, Union, List, Hashable, Callable, Iterable, Dict, Tuple, Any, Mapping,
overload, TypeVar, Type, Container, cast, TYPE_CHECKING, Sized, Iterator
overload, TypeVar, Type, Container, cast, TYPE_CHECKING, Sized, Iterator, NoReturn
)

import numpy as np
Expand Down Expand Up @@ -87,6 +87,30 @@ def __getitem__(self, key: object) -> None:
return None


def _parse_atom_pairs(
mol: MultiMolecule,
atom_pairs: Iterable[tuple[str, str]],
) -> dict[str, list[npt.NDArray[np.intp]]]:
"""Helper function for translating atom-pairs into a dict of indice-array-pairs."""
pair_dict = {}
for atoms in atom_pairs:
key = " ".join(atoms)

idx_list = []
try:
for at in atoms:
idx = mol.atoms.get(at)
if idx is None:
super_at, slc = mol.atoms_alias[at]
idx = mol.atoms[super_at][slc]
idx_list.append(idx)
except KeyError as ex:
raise ValueError(f"Unknown atom type: {ex}") from None

pair_dict[key] = idx_list
return pair_dict


class MultiMolecule(_MultiMolecule):
"""A class designed for handling a and manipulating large numbers of molecules.
Expand Down Expand Up @@ -1217,158 +1241,36 @@ def _get_loop(self, atom_subset): # noqa: E301

"""############################# Determining shell structures ##########################"""

def init_shell_search(self, mol_subset: MolSubset = None,
atom_subset: AtomSubset = None,
rdf_cutoff: float = 0.5
) -> Tuple[pd.DataFrame, pd.Series, pd.DataFrame]:
def init_shell_search(
self,
mol_subset: MolSubset = None,
atom_subset: AtomSubset = None,
rdf_cutoff: float = 0.5
) -> NoReturn:
"""Calculate and return properties which can help determining shell structures.
The following two properties are calculated and returned:
* The mean distance (per atom) with respect to the center of mass (*i.e.* a modified RMSF).
* A series mapping abritrary atomic indices in the RMSF to the actual atomic indices.
* The radial distribution function (RDF) with respect to the center of mass.
Parameters
----------
mol_subset : :class:`slice`, optional
Perform the calculation on a subset of molecules in this instance, as
determined by their moleculair index.
Include all :math:`m` molecules in this instance if :data:`None`.
atom_subset : :class:`Sequence[str] <collections.abc.Sequence>`, optional
Perform the calculation on a subset of atoms in this instance, as
determined by their atomic index or atomic symbol.
Include all :math:`n` atoms per molecule in this instance if :data:`None`.
rdf_cutoff : :class:`float`
Remove all values in the RDF below this value (Angstrom).
Usefull for dealing with divergence as the "inter-atomic" distance approaches 0.0 A.
Returns
Warning
-------
:class:`pd.DataFrame <pandas.DataFrame>`, :class:`pd.Series <pandas.Series>` and :class:`pd.DataFrame <pandas.DataFrame>`
Returns the following items:
* A dataframe holding the mean distance of all atoms with respect the to center
of mass.
* A series mapping the indices from 1. to the actual atomic indices.
* A dataframe holding the RDF with respect to the center of mass.
Depercated.
""" # noqa: E501
def _get_mean_dist(mol_cp, at):
ret = np.linalg.norm(mol_cp[:, mol_cp.atoms[at]], axis=2).mean(axis=0)
at_idx = np.argsort(ret)
return at_idx, sorted(ret)

# Prepare slices
i = self._get_mol_subset(mol_subset)
at_subset = atom_subset or tuple(self.atoms.keys())

# Calculate the mean distance (per atom) with respect to the center of mass
# Conceptually similar an RMSF, the "fluctuation" being with respect to the center of mass
mol_cp = self.copy()[i]
mol_cp -= mol_cp.get_center_of_mass()[:, None, :]
at_idx, dist_mean = zip(*[_get_mean_dist(mol_cp, at) for at in at_subset])

# Create Series containing the actual atomic indices
at_idx = list(chain.from_iterable(at_idx))
idx_series = pd.Series(np.arange(0, self.shape[1]), name='Actual atomic index')
idx_series.loc[0:len(at_idx)-1] = at_idx
idx_series.index.name = 'Arbitrary atomic index'

# Cast the modified RMSF results in a dataframe
index = np.arange(0, self.shape[1])
kwargs = {'loop': True, 'atom_subset': at_subset}
columns, data = mol_cp._get_rmsf_columns(dist_mean, index, **kwargs)
rmsf = pd.DataFrame(data, columns=columns, index=index)
rmsf.columns.name = 'Distance from origin\n / Ångström'
rmsf.index.name = 'Arbitrary atomic index'

# Calculate the RDF with respect to the center of mass
at_dummy = np.zeros_like(mol_cp[:, 0, :])[:, None, :]
mol_cp = MultiMolecule(np.hstack((mol_cp, at_dummy)), atoms=mol_cp.atoms)
mol_cp._atoms['origin'] = np.array([mol_cp.shape[1] - 1], dtype=np.intp)
at_subset2 = ('origin',) + at_subset
with np.errstate(divide='ignore', invalid='ignore'):
rdf = mol_cp.init_rdf(atom_subset=at_subset2)
del rdf['origin origin']
rdf = rdf.loc[rdf.index >= rdf_cutoff, [i for i in rdf.columns if 'origin' in i]]

return rmsf, idx_series, rdf
cls = type(self)
raise DeprecationWarning(f"`{cls.__name__}.init_shell_search` is deprecated")

@staticmethod
def get_at_idx(rmsf: pd.DataFrame,
idx_series: pd.Series,
dist_dict: Dict[str, List[float]]) -> Dict[str, List[int]]:
def get_at_idx(
rmsf: pd.DataFrame,
idx_series: pd.Series,
dist_dict: Dict[str, List[float]],
) -> NoReturn:
"""Create subsets of atomic indices.
The subset is created (using **rmsf** and **idx_series**) based on
distance criteria in **dist_dict**.
For example, ``dist_dict = {'Cd': [3.0, 6.5]}`` will create and return a dictionary with
three keys: One for all atoms whose RMSF is smaller than 3.0, one where the RMSF is
between 3.0 and 6.5, and finally one where the RMSF is larger than 6.5.
Examples
--------
.. code:: python
>>> dist_dict = {'Cd': [3.0, 6.5]}
>>> idx_series = pd.Series(np.arange(12))
>>> rmsf = pd.DataFrame({'Cd': np.arange(12, dtype=float)})
>>> get_at_idx(rmsf, idx_series, dist_dict)
{'Cd_1': [0, 1, 2],
'Cd_2': [3, 4, 5],
'Cd_3': [7, 8, 9, 10, 11]
}
Parameters
----------
rmsf : :class:`pd.DataFrame <pandas.DataFrame>`
A dataframe holding the results of an RMSF calculation.
idx_series : :class:`pd.DataFrame <pandas.Series>`
A series mapping the indices from **rmsf** to actual atomic indices.
dist_dict : :class:`dict[str, list[float]] <dict>`
A dictionary with atomic symbols (see **rmsf.columns**)
and a list of interatomic distances.
Returns
Warning
-------
:class:`dict[str, list[int]] <dict>`
A dictionary with atomic symbols as keys, and matching atomic indices as values.
Raises
------
KeyError
Raised if a key in **dist_dict** is absent from **rmsf**.
Depercated.
"""
# Double check if all keys in **dist_dict** are available in **rmsf.columns**
for key in dist_dict:
if key not in rmsf:
err = "'{}' was found in 'dist_dict' yet is absent from 'rmsf'"
raise KeyError(err.format(key))

ret = {}
for key, value in rmsf.items():
try:
dist_range = sorted(dist_dict[key])
except KeyError:
dist_range = [np.inf]
dist_min = 0.0
name = key + '_{:d}'

for i, dist_max in enumerate(dist_range, 1):
idx = rmsf[(value >= dist_min) & (value < dist_max)].index
if idx.any():
ret[name.format(i)] = sorted(idx_series[idx].values.tolist())
dist_min = dist_max

idx = rmsf[(rmsf[key] > dist_max)].index
if idx.any():
ret[name.format(i+1)] = sorted(idx_series[idx].values.tolist())
return ret
raise DeprecationWarning("`MultiMolecule.get_at_idx` is deprecated")

"""############################# Radial Distribution Functions ##########################"""

Expand All @@ -1380,6 +1282,7 @@ def init_rdf(
dr: float = 0.05,
r_max: float = 12.0,
periodic: None | Sequence[Literal["x", "y", "z"]] | Sequence[Literal[0, 1, 2]] = None,
atom_pairs: None | Iterable[tuple[str, str]] = None,
) -> pd.DataFrame:
"""Initialize the calculation of radial distribution functions (RDFs).
Expand All @@ -1405,6 +1308,9 @@ def init_rdf(
If specified, correct for the systems periodicity if
:attr:`self.lattice is not None <MultiMolecule.lattice>`.
Accepts ``"x"``, ``"y"`` and/or ``"z"``.
atom_pairs : :class:`Iterable[tuple[str, str]] <collections.abc.Iterable>`
An explicit list of atom-pairs for the to-be calculated distances.
Note that **atom_pairs** and **atom_subset** are mutually exclusive.
Returns
-------
Expand All @@ -1415,14 +1321,18 @@ def init_rdf(
Radii are used as index.
"""
# If **atom_subset** is None: extract atomic symbols from they keys of **self.atoms**
if atom_subset is not None:
atom_pairs = self.get_pair_dict(atom_subset, r=2)
if atom_subset is not None and atom_pairs is not None:
raise TypeError("`atom_subset` and `atom_pairs` are mutually exclusive")
elif atom_pairs is not None:
pair_dict = _parse_atom_pairs(self, atom_pairs)
elif atom_subset is not None:
pair_dict = self.get_pair_dict(atom_subset, r=2)
else:
atom_pairs = self.get_pair_dict(sorted(self.atoms, key=str), r=2)
# If **atom_subset** is None: extract atomic symbols from they keys of **self.atoms**
pair_dict = self.get_pair_dict(sorted(self.atoms, key=str), r=2)

# Construct an empty dataframe with appropiate dimensions, indices and keys
df = get_rdf_df(atom_pairs, dr, r_max)
df = get_rdf_df(pair_dict, dr, r_max)

# Define the subset
m_subset = self._get_mol_subset(mol_subset)
Expand All @@ -1443,7 +1353,7 @@ def init_rdf(

# Fill the dataframe with RDF's, averaged over all conformations in this instance
n_mol = len(m_self)
for key, (i, j) in atom_pairs.items():
for key, (i, j) in pair_dict.items():
shape = n_mol, len(i), len(j)
iterator = slice_iter(shape, m_self.dtype.itemsize)
for slc in iterator:
Expand Down Expand Up @@ -1737,9 +1647,11 @@ def init_adf(
self,
mol_subset: MolSubset = None,
atom_subset: AtomSubset = None,
*,
r_max: Union[float, str] = 8.0,
weight: Callable[[np.ndarray], np.ndarray] = neg_exp,
periodic: None | Sequence[Literal["x", "y", "z"]] | Sequence[Literal[0, 1, 2]] = None,
atom_pairs: None | Iterable[tuple[str, str]] = None,
) -> pd.DataFrame:
r"""Initialize the calculation of distance-weighted angular distribution functions (ADFs).
Expand Down Expand Up @@ -1772,6 +1684,9 @@ def init_adf(
If specified, correct for the systems periodicity if
:attr:`self.lattice is not None <MultiMolecule.lattice>`.
Accepts ``"x"``, ``"y"`` and/or ``"z"``.
atom_pairs : :class:`Iterable[tuple[str, str, str]] <collections.abc.Iterable>`
An explicit list of atom-triples for the to-be calculated angles.
Note that **atom_pairs** and **atom_subset** are mutually exclusive.
Returns
-------
Expand All @@ -1798,21 +1713,36 @@ def init_adf(

# Identify atom and molecule subsets
m_subset = self._get_mol_subset(mol_subset)
at_subset = self._get_atom_subset(atom_subset, as_array=True)
if atom_subset is not None and atom_pairs is not None:
raise TypeError("`atom_subset` and `atom_pairs` are mutually exclusive")
elif atom_pairs is not None:
if not isinstance(atom_pairs, abc.Collection):
atom_pairs = list(atom_pairs)
at_subset = self._get_atom_subset(list(chain.from_iterable(atom_pairs)), as_array=True)
else:
at_subset = self._get_atom_subset(atom_subset, as_array=True)

# Slice this MultiMolecule instance based on **atom_subset** and **mol_subset**
del_atom = np.ones(self.shape[1], dtype=bool)
del_atom[at_subset] = False
mol = self.delete_atoms(del_atom)[m_subset]

atom_pairs = mol.get_pair_dict(atom_subset or sorted(mol.atoms, key=str), r=3)
for k, v in atom_pairs.items():
if atom_pairs is not None:
at_pairs = _parse_atom_pairs(self, atom_pairs)
elif atom_subset is not None:
at_pairs = mol.get_pair_dict(atom_subset or sorted(mol.atoms, key=str), r=3)
else:
at_pairs = mol.get_pair_dict(sorted(mol.atoms, key=str), r=3)

for k, v in at_pairs.items():
v_new = []
for at in v:
bool_ar = np.zeros(mol.shape[1], dtype=bool)
bool_ar[mol.atoms[at] if isinstance(at, str) else at] = True
v_new.append(bool_ar)
atom_pairs[k] = v_new
at_pairs[k] = v_new

# import pdb; pdb.set_trace()

# Periodic calculations
if periodic is not None:
Expand Down Expand Up @@ -1840,24 +1770,24 @@ def init_adf(
# Perform the task in parallel (with dask) if possible
if DASK_EX is None and r_max_:
func = dask.delayed(_adf_inner_cdktree)
jobs = [func(m, n, r_max_, atom_pairs.values(), l, p, weight) for m, l, p in
jobs = [func(m, n, r_max_, at_pairs.values(), l, p, weight) for m, l, p in
zip(mol, lattice_iter, periodic_iter)]
results = dask.compute(*jobs)
elif DASK_EX is None and not r_max_:
func = dask.delayed(_adf_inner)
jobs = [func(m, atom_pairs.values(), l, p, weight) for m, l, p in
jobs = [func(m, at_pairs.values(), l, p, weight) for m, l, p in
zip(mol, lattice_iter, periodic_iter)]
results = dask.compute(*jobs)
elif DASK_EX is not None and r_max_:
func = _adf_inner_cdktree
results = [func(m, n, r_max_, atom_pairs.values(), l, p, weight) for m, l, p in
results = [func(m, n, r_max_, at_pairs.values(), l, p, weight) for m, l, p in
zip(mol, lattice_iter, periodic_iter)]
elif DASK_EX is not None and not r_max_:
func = _adf_inner
results = [func(m, atom_pairs.values(), l, p, weight) for m, l, p in
results = [func(m, at_pairs.values(), l, p, weight) for m, l, p in
zip(mol, lattice_iter, periodic_iter)]

df = get_adf_df(atom_pairs)
df = get_adf_df(at_pairs)
df.loc[:, :] = np.array(results).mean(axis=0).T
return df

Expand Down
Binary file added tests/test_files/adf_atom_pairs.npy
Binary file not shown.
Binary file added tests/test_files/rdf_atom_pairs.npy
Binary file not shown.

0 comments on commit 6d2a241

Please sign in to comment.