Skip to content

Commit

Permalink
Added generic model and corresponding tests
Browse files Browse the repository at this point in the history
Refactored other models and tests

Signed-off-by: Timothy Click <tcthepoet@yahoo.com>
  • Loading branch information
Timothy Click authored and Timothy Click committed Jan 23, 2019
1 parent 5a3c8a6 commit ca9d9d3
Show file tree
Hide file tree
Showing 19 changed files with 1,234 additions and 1,187 deletions.
135 changes: 72 additions & 63 deletions src/fluctmatch/models/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
Atomids, Atomnames, Atomtypes, Charges, Masses, Radii, Resids, Resnames,
Resnums, Segids, TopologyAttr, Angles, Dihedrals, Impropers)
from MDAnalysis.core.topologyobjects import TopologyGroup
from MDAnalysis.coordinates.base import ProtoReader
from MDAnalysis.coordinates.memory import MemoryReader
from MDAnalysis.topology import base as topbase
from MDAnalysis.topology import guessers
Expand Down Expand Up @@ -212,8 +211,7 @@ def create_topology(self, universe: mda.Universe):
self._add_charges(universe)

def generate_bonds(self):
"""Generate connectivity information for the new system.
"""
"""Generate connectivity information for the new system."""
if not hasattr(self, "universe"):
raise AttributeError("Topologies need to be created before bonds "
"can be added.")
Expand All @@ -238,9 +236,8 @@ def add_trajectory(self, universe: mda.Universe):
if not hasattr(universe, "trajectory"):
raise AttributeError("The provided universe does not have "
"coordinates defined.")
univ_traj: ProtoReader = universe.trajectory
univ_traj.rewind()
universe.trajectory.rewind()
trajectory = universe.trajectory
trajectory.rewind()

selections: Generator = itertools.product(universe.residues,
self._mapping.values())
Expand All @@ -254,50 +251,55 @@ def add_trajectory(self, universe: mda.Universe):
bead: mda.AtomGroup = residue.atoms.select_atoms(selection)
beads.append(bead)

coordinate_array: List[np.ndarray] = []
trajectory2 = self.universe.trajectory
trajectory2.ts.has_positions = trajectory.ts.has_positions
trajectory2.ts.has_velocities = trajectory.ts.has_velocities
trajectory2.ts.has_forces = trajectory.ts.has_forces

position_array: List[np.ndarray] = []
velocity_array: List[np.ndarray] = []
force_array: List[np.ndarray] = []
dimensions_array: List[np.ndarray] = []
for ts in univ_traj:
dimensions_array.append(ts._unitcell)
dimension_array: List[np.ndarray] = []
for ts in trajectory:
dimension_array.append(ts.dimensions)

# Positions
if self.universe.trajectory.ts.has_positions and ts.has_positions:
coordinates = [
try:
positions = [
bead.center_of_mass()
if self._com
else bead.center_of_geometry()
for bead in beads
if bead
]
coordinate_array.append(np.asarray(coordinates))
position_array.append(np.asarray(positions))
except (AttributeError, mda.NoDataError):
pass

# Velocities
if self.universe.trajectory.ts.has_velocities and ts.has_velocities:
try:
velocities = [bead.velocities for bead in beads if bead]
velocity_array.append(np.asarray(velocities))
except ValueError:
pass
try:
velocities = [bead.velocities for bead in beads if bead]
velocity_array.append(np.asarray(velocities))
except (AttributeError, mda.NoDataError):
pass

# Forces
if self.universe.trajectory.ts.has_forces and ts.has_forces:
try:
forces = [bead.velocities for bead in beads if bead]
force_array.append(np.asarray(forces))
except ValueError:
pass

self.universe.trajectory.dimensions_array: np.ndarray = np.asarray(
dimensions_array)
if self.universe.trajectory.ts.has_positions:
coordinate_array: np.ndarray = np.asarray(coordinate_array)
self.universe.load_new(coordinate_array, format=MemoryReader)
if self.universe.trajectory.ts.has_velocities:
self.universe.trajectory.velocity_array: np.ndarray = np.asarray(
try:
forces = [bead.velocities for bead in beads if bead]
force_array.append(np.asarray(forces))
except (AttributeError, mda.NoDataError):
pass

trajectory2.dimensions_array: np.ndarray = np.asarray(dimension_array)
if trajectory2.ts.has_positions:
position_array: np.ndarray = np.asarray(position_array)
self.universe.load_new(position_array, format=MemoryReader,
dimensions=dimension_array)
if trajectory2.ts.has_velocities:
trajectory2.velocity_array: np.ndarray = np.asarray(
velocity_array)
if self.universe.trajectory.ts.has_forces:
self.universe.trajectory.force_array: np.ndarray = np.asarray(
if trajectory2.ts.has_forces:
trajectory2.force_array: np.ndarray = np.asarray(
force_array)
universe.trajectory.rewind()

Expand Down Expand Up @@ -331,11 +333,16 @@ def _add_masses(self, universe: mda.Universe):
select_residues: Generator = itertools.product(residues,
self._selection.values())

masses: np.ndarray = np.fromiter([
res.select_atoms(selection).total_mass()
for res, selection in select_residues
if res.select_atoms(selection)
], dtype=np.float32)
try:
masses: np.ndarray = np.fromiter([
res.select_atoms(selection).total_mass()
for res, selection in select_residues
if res.select_atoms(selection)
], dtype=np.float32)
except (AttributeError, mda.NoDataError):
masses: np.ndarray = np.zeros(self.universe.atoms.n_atoms,
dtype=np.float32)

self.universe.add_TopologyAttr(Masses(masses))

def _add_charges(self, universe: mda.Universe):
Expand All @@ -349,7 +356,7 @@ def _add_charges(self, universe: mda.Universe):
for res, selection in select_residues
if res.select_atoms(selection)
], dtype=np.float32)
except AttributeError:
except (AttributeError, mda.NoDataError):
charges: np.ndarray = np.zeros(self.universe.atoms.n_atoms,
dtype=np.float32)

Expand Down Expand Up @@ -400,62 +407,64 @@ def Merge(*args: MDUniverse) -> mda.Universe:

# Merge universes
universe: mda.Universe = mda.Merge(*[u.atoms for u in args])
trajectory: mda._READERS[universe.trajectory.format] = universe.trajectory

# Merge coordinates
for u in args:
u.trajectory.rewind()

universe1: mda.Universe = args[0]
universe.trajectory.ts.has_velocities: bool = (
universe1.trajectory.ts.has_velocities
trajectory1 = universe1.trajectory
trajectory.ts.has_velocities: bool = (
trajectory1.ts.has_velocities
)
universe.trajectory.ts.has_forces: bool = universe1.trajectory.ts.has_forces
trajectory.ts.has_forces: bool = trajectory1.ts.has_forces
frames: np.ndarray = np.fromiter(
[u.trajectory.n_frames == universe1.trajectory.n_frames for u in args],
[u.trajectory.n_frames == trajectory1.n_frames for u in args],
dtype=bool)
if not all(frames):
msg: str = "The trajectories are not the same length."
logger.error(msg)
raise ValueError(msg)

dimensions_array: np.ndarray = (np.mean(
universe1.trajectory.dimensions_array, axis=0) if hasattr(
universe1.trajectory, "dimensions_array") else np.asarray(
[ts.triclinic_dimensions for ts in universe1.trajectory]))
dimensions: np.ndarray = (
trajectory1.dimensions_array
if hasattr(trajectory1, "dimensions_array")
else np.asarray([ts.dimensions for ts in trajectory1]))

universe1.universe.trajectory.rewind()
if universe1.universe.trajectory.n_frames > 1:
coordinates: List[List[np.ndarray]] = []
trajectory1.rewind()
if trajectory1.n_frames > 1:
positions: List[List[np.ndarray]] = []
velocities: List[List[np.ndarray]] = []
forces: List[List[np.ndarray]] = []

# Accumulate coordinates, velocities, and forces.
for u in args:
coordinates.append(
positions.append(
[ts.positions for ts in u.trajectory if ts.has_positions])
velocities.append(
[ts.velocities for ts in u.trajectory if ts.has_velocities])
forces.append([ts.forces for ts in u.trajectory if ts.has_forces])

if universe.trajectory.ts.has_positions:
coordinates: np.ndarray = np.concatenate(coordinates, axis=1)
if universe.atoms.n_atoms != coordinates.shape[1]:
if trajectory.ts.has_positions:
positions: np.ndarray = np.concatenate(positions, axis=1)
if universe.atoms.n_atoms != positions.shape[1]:
msg = ("The number of sites does not match the number of "
"coordinates.")
logger.error(msg)
raise RuntimeError(msg)
n_frames, n_beads, _ = coordinates.shape
n_frames, n_beads, _ = positions.shape
logger.info(f"The new universe has {n_beads:d} beads in "
f"{n_frames:d} frames.")
universe.load_new(positions, format=MemoryReader,
dimensions=dimensions)

universe.load_new(coordinates, format=MemoryReader)
universe.trajectory.dimensions_array = dimensions_array.copy()
if universe.trajectory.ts.has_velocities:
if trajectory.ts.has_velocities:
velocities: np.ndarray = np.concatenate(velocities, axis=1)
universe.trajectory.velocity_array: np.ndarray = velocities.copy()
if universe.trajectory.ts.has_forces:
trajectory.velocity_array: np.ndarray = velocities.copy()
if trajectory.ts.has_forces:
forces: np.ndarray = np.concatenate(forces, axis=1)
universe.trajectory.force_array: np.ndarray = forces.copy()
trajectory.force_array: np.ndarray = forces.copy()

return universe

Expand Down
71 changes: 44 additions & 27 deletions src/fluctmatch/models/core.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,50 @@
# -*- Mode: python; tab-width: 4; indent-tabs-mode:nil; coding: utf-8 -*-
# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4
# -*- coding: utf-8 -*-
#
# fluctmatch --- https://github.com/tclick/python-fluctmatch
# Copyright (c) 2013-2017 The fluctmatch Development Team and contributors
# (see the file AUTHORS for the full list of names)
# python-fluctmatch -
# Copyright (c) 2019 Timothy H. Click, Ph.D.
#
# Released under the New BSD license.
# All rights reserved.
#
# Please cite your use of fluctmatch in published work:
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# Timothy H. Click, Nixon Raj, and Jhih-Wei Chu.
# Calculation of Enzyme Fluctuograms from All-Atom Molecular Dynamics
# Simulation. Meth Enzymology. 578 (2016), 327-342,
# doi:10.1016/bs.mie.2016.05.024.
# Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# Neither the name of the author nor the names of its contributors may be used
# to endorse or promote products derived from this software without specific
# prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS “AS IS”
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR
# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# Timothy H. Click, Nixon Raj, and Jhih-Wei Chu.
# Simulation. Meth Enzymology. 578 (2016), 327-342,
# Calculation of Enzyme Fluctuograms from All-Atom Molecular Dynamics
# doi:10.1016/bs.mie.2016.05.024.

import logging
import traceback
from typing import List

import MDAnalysis as mda

from .. import _MODELS
from .base import Merge
from .base import Merge, ModelBase

logger = logging.getLogger(__name__)
logger: logging.Logger = logging.getLogger(__name__)


def modeller(*args, **kwargs) -> mda.Universe:
Expand All @@ -41,29 +62,25 @@ def modeller(*args, **kwargs) -> mda.Universe:
models: List[str] = [_.upper() for _ in kwargs.pop("model", ["polar",])]
try:
if "ENM" in models:
logger.warning(
"ENM model detected. All other models are being ignored."
)
universe = _MODELS["ENM"](*args, **kwargs)
return universe
logger.warning("ENM model detected. All other models are "
"being ignored.")
model: ModelBase = _MODELS["ENM"]()
return model.transform(mda.Universe(*args, **kwargs))
except Exception as exc:
logger.exception(
"An error occurred while trying to create the universe."
)
logger.exception("An error occurred while trying to create "
"the universe.")
raise RuntimeError from exc

try:
universe: List[mda.Universe] = [
_MODELS[_](*args, **kwargs)
_MODELS[_]().transform(mda.Universe(*args, **kwargs))
for _ in models
]
except KeyError:
tb: List[str] = traceback.format_exc()
msg = (
f"One of the models is not implemented. Please try {_MODELS.keys()}"
)
msg = (f"One of the models is not implemented. "
f"Please try {_MODELS.keys()}")
logger.exception(msg)
raise KeyError(msg).with_traceback(tb)
else:
universe: mda.Universe = Merge(*universe) if len(universe) > 1 else universe[0]
return universe
return Merge(*universe)
18 changes: 7 additions & 11 deletions src/fluctmatch/models/enm.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
# doi:10.1016/bs.mie.2016.05.024.
"""Class for elastic network model."""

from typing import List, Optional, Tuple
from typing import ClassVar, List, Optional, Tuple

import MDAnalysis as mda
from MDAnalysis.core.topologyattrs import Atomtypes, Charges, Bonds
Expand Down Expand Up @@ -90,16 +90,12 @@ class Enm(ModelBase):
The transformed universe
"""
model: str = "ENM"
describe: str = "Elastic network model"

def __init__(self,
xplor: bool = True,
extended: bool = True,
com: bool = True,
guess_angles: bool = False,
cutoff: float = 10.0,
min_cutoff: Optional[float] = None,
model: ClassVar[str] = "ENM"
describe: ClassVar[str] = "Elastic network model"

def __init__(self, xplor: bool = True, extended: bool = True,
com: bool = True, guess_angles: bool = False,
cutoff: float = 10.0, min_cutoff: Optional[float] = None,
charges: bool = False):
super().__init__(xplor, extended, com, guess_angles, cutoff)

Expand Down
Loading

0 comments on commit ca9d9d3

Please sign in to comment.