diff --git a/physical_validation/data/ensemble_data.py b/physical_validation/data/ensemble_data.py index a4b0069..9cd00f4 100644 --- a/physical_validation/data/ensemble_data.py +++ b/physical_validation/data/ensemble_data.py @@ -14,6 +14,7 @@ Data structures carrying simulation data. """ import warnings +from typing import Tuple from ..util import error as pv_error @@ -33,8 +34,8 @@ class EnsembleData(object): """ @staticmethod - def ensembles(): - return ("NVE", "NVT", "NPT", "muVT") + def ensembles() -> Tuple[str, str, str, str]: + return "NVE", "NVT", "NPT", "muVT" def __init__( self, @@ -100,41 +101,41 @@ def __init__( self.__t = temperature @property - def ensemble(self): + def ensemble(self) -> str: """Get ensemble""" return self.__ensemble @property - def natoms(self): + def natoms(self) -> int: """Get natoms""" return self.__n @property - def mu(self): + def mu(self) -> float: """Get mu""" return self.__mu @property - def volume(self): + def volume(self) -> float: """Get volume""" return self.__v @property - def pressure(self): + def pressure(self) -> float: """Get pressure""" return self.__p @property - def energy(self): + def energy(self) -> float: """Get energy""" return self.__e @property - def temperature(self): + def temperature(self) -> float: """Get temperature""" return self.__t - def __eq__(self, other): + def __eq__(self, other) -> bool: if type(other) is not type(self): return False return ( diff --git a/physical_validation/data/flatfile_parser.py b/physical_validation/data/flatfile_parser.py index d96fff8..4934bb4 100644 --- a/physical_validation/data/flatfile_parser.py +++ b/physical_validation/data/flatfile_parser.py @@ -13,7 +13,17 @@ r""" flatfile_parser.py """ -from . import ObservableData, SimulationData, TrajectoryData, parser +from typing import List, Optional + +from . import ( + EnsembleData, + ObservableData, + SimulationData, + SystemData, + TrajectoryData, + UnitData, + parser, +) class FlatfileParser(parser.Parser): @@ -26,20 +36,20 @@ def __init__(self): def get_simulation_data( self, - units=None, - ensemble=None, - system=None, - dt=None, - position_file=None, - velocity_file=None, - kinetic_ene_file=None, - potential_ene_file=None, - total_ene_file=None, - volume_file=None, - pressure_file=None, - temperature_file=None, - const_of_mot_file=None, - ): + units: Optional[UnitData] = None, + ensemble: Optional[EnsembleData] = None, + system: Optional[SystemData] = None, + dt: Optional[float] = None, + position_file: Optional[str] = None, + velocity_file: Optional[str] = None, + kinetic_ene_file: Optional[str] = None, + potential_ene_file: Optional[str] = None, + total_ene_file: Optional[str] = None, + volume_file: Optional[str] = None, + pressure_file: Optional[str] = None, + temperature_file: Optional[str] = None, + const_of_mot_file: Optional[str] = None, + ) -> SimulationData: r"""Read simulation data from flat files Returns a SimulationData object created from (optionally) provided UnitData, EnsembleData @@ -136,7 +146,7 @@ def get_simulation_data( return result @staticmethod - def __read_xyz(filename): + def __read_xyz(filename: str) -> List[List[List[float]]]: result = [] with open(filename) as f: frame = [] @@ -159,7 +169,7 @@ def __read_xyz(filename): return result @staticmethod - def __read_1d(filename): + def __read_1d(filename: str) -> List[float]: result = [] with open(filename) as f: for line in f: diff --git a/physical_validation/data/gromacs_parser.py b/physical_validation/data/gromacs_parser.py index 6a5b2d3..f3f5b04 100644 --- a/physical_validation/data/gromacs_parser.py +++ b/physical_validation/data/gromacs_parser.py @@ -14,6 +14,7 @@ gromacs_parser.py """ import warnings +from typing import List, Optional, Union import numpy as np @@ -37,7 +38,7 @@ class GromacsParser(parser.Parser): """ @staticmethod - def units(): + def units() -> UnitData: # Gromacs uses kJ/mol return UnitData( kb=8.314462435405199e-3, @@ -55,7 +56,9 @@ def units(): time_conversion=1.0, ) - def __init__(self, exe=None, includepath=None): + def __init__( + self, exe: Optional[str] = None, includepath: Union[str, List[str]] = None + ): r""" Create a GromacsParser object @@ -86,7 +89,14 @@ def __init__(self, exe=None, includepath=None): "constant_of_motion": "Conserved-En.", } - def get_simulation_data(self, mdp=None, top=None, edr=None, trr=None, gro=None): + def get_simulation_data( + self, + mdp: Optional[str] = None, + top: Optional[str] = None, + edr: Optional[str] = None, + trr: Optional[str] = None, + gro: Optional[str] = None, + ) -> SimulationData: r""" Parameters @@ -134,7 +144,9 @@ def get_simulation_data(self, mdp=None, top=None, edr=None, trr=None, gro=None): ).any(): raise NotImplementedError("Triclinic boxes not implemented.") else: - box = RectangularBox([np.diag(b) for b in trajectory_dict["box"]]) + box = RectangularBox( + np.array([np.diag(b) for b in trajectory_dict["box"]]) + ) else: raise RuntimeError("Unknown box shape.") trajectory_dict["box"] = box @@ -335,7 +347,7 @@ def get_simulation_data(self, mdp=None, top=None, edr=None, trr=None, gro=None): if edr is not None: observable_dict = self.__interface.get_quantities( - edr, self.__gmx_energy_names.values(), args=["-dp"] + edr, list(self.__gmx_energy_names.values()), args=["-dp"] ) # constant volume simulations don't write out the volume in .edr file diff --git a/physical_validation/data/lammps_parser.py b/physical_validation/data/lammps_parser.py index 4aa0efd..6299e1b 100644 --- a/physical_validation/data/lammps_parser.py +++ b/physical_validation/data/lammps_parser.py @@ -13,10 +13,13 @@ r""" lammps_parser.py """ +from typing import Dict, List, Optional, Union + import numpy as np from ..util import error as pv_error from . import ( + EnsembleData, ObservableData, SimulationData, SystemData, @@ -86,20 +89,25 @@ def __init__(self): ) def get_simulation_data( - self, ensemble=None, in_file=None, log_file=None, data_file=None, dump_file=None - ): + self, + ensemble: Optional[EnsembleData] = None, + in_file: Optional[str] = None, + log_file: Optional[str] = None, + data_file: Optional[str] = None, + dump_file: Optional[str] = None, + ) -> SimulationData: """ Parameters ---------- - ensemble: EnsembleData, optional - in_file: str, optional - log_file: str, optional - data_file: str, optional - dump_file: str, optional + ensemble + in_file + log_file + data_file + dump_file Returns ------- - result: SimulationData + SimulationData """ @@ -195,7 +203,9 @@ def get_simulation_data( return result @staticmethod - def __read_input_file(name): + def __read_input_file( + name: str, + ) -> Dict[str, Union[str, List[str], List[Dict[str, Union[str, List[str]]]]]]: # parse input file input_dict = {} with open(name) as f: @@ -235,7 +245,16 @@ def __read_input_file(name): return input_dict @staticmethod - def __read_data_file(name): + def __read_data_file( + name: str, + ) -> Dict[ + str, + Union[ + Dict[str, Union[float, List[float]]], + Dict[int, List[Union[str, float]]], + List[Dict[str, Union[int, float]]], + ], + ]: # > available blocks blocks = [ "Header", # 0 @@ -298,7 +317,9 @@ def __read_data_file(name): ] header_double = ["xlo xhi", "ylo yhi", "zlo zhi"] # default values + # Dict[str, float] data_dict[block] = {hs: 0 for hs in header_single} + # Dict[str, List[float]] data_dict[block].update({hd: [0.0, 0.0] for hd in header_double}) # read out for line in file_blocks[block]: @@ -322,6 +343,7 @@ def __read_data_file(name): data_dict[block] = {} for line in file_blocks[block]: line = line.split() + # Dict[int, List[Union[str, float]]] data_dict[block][int(line[0])] = [line[1]] + [ float(c) for c in line[2:] ] @@ -332,6 +354,7 @@ def __read_data_file(name): data_dict[block] = [] for line in file_blocks[block]: line = line.split() + # List[Dict[str, Union[int, float]]] if len(line) == 7: data_dict[block].append( { @@ -392,9 +415,9 @@ def __read_data_file(name): return data_dict @staticmethod - def __read_log_file(name): + def __read_log_file(name: str) -> Dict[str, List[float]]: # parse log file - def start_single(line1, line2): + def start_single(line1: str, line2: str) -> bool: if not line1.split(): return False if len(line1.split()) != len(line2.split()): @@ -405,7 +428,7 @@ def start_single(line1, line2): return False return True - def end_single(line, length): + def end_single(line: str, length: int) -> bool: if len(line.split()) != length: return True try: @@ -414,12 +437,12 @@ def end_single(line, length): return True return False - def start_multi(line): + def start_multi(line: str) -> bool: if "---- Step" in line and "- CPU =" in line: return True return False - def end_multi(line): + def end_multi(line: str) -> bool: line = line.split() # right length (is it actually always 9??) if len(line) == 0 or len(line) % 3 != 0: @@ -491,13 +514,13 @@ def end_multi(line): return ene_traj @staticmethod - def __read_dump_file(name): + def __read_dump_file(name: str) -> Dict[str, List[List[Union[float, List[float]]]]]: # parse dump file # the dictionary to be filled dump_dict = {"position": [], "velocity": [], "box": []} # helper function checking line items - def check_item(line_str, item): + def check_item(line_str: str, item: str) -> str: item = "ITEM: " + item if not line_str.startswith(item): raise pv_error.FileFormatError(name, "dump file: was expecting " + item) diff --git a/physical_validation/data/observable_data.py b/physical_validation/data/observable_data.py index 1d8bea1..7b39025 100644 --- a/physical_validation/data/observable_data.py +++ b/physical_validation/data/observable_data.py @@ -14,7 +14,7 @@ Data structures carrying simulation data. """ import warnings -from typing import Optional +from typing import Any, List, Optional import numpy as np @@ -41,7 +41,7 @@ class ObservableData(object): """ @staticmethod - def observables(): + def observables() -> List[str]: return [ "kinetic_energy", "potential_energy", @@ -54,13 +54,13 @@ def observables(): def __init__( self, - kinetic_energy=None, - potential_energy=None, - total_energy=None, - volume=None, - pressure=None, - temperature=None, - constant_of_motion=None, + kinetic_energy: Any = None, + potential_energy: Any = None, + total_energy: Any = None, + volume: Any = None, + pressure: Any = None, + temperature: Any = None, + constant_of_motion: Any = None, ): self.__kinetic_energy = None self.__potential_energy = None @@ -103,17 +103,17 @@ def __init__( self.temperature = temperature self.constant_of_motion = constant_of_motion - def __getitem__(self, key): + def __getitem__(self, key: str) -> np.ndarray: if key not in self.observables(): raise KeyError return self.__getters[key](self) - def __setitem__(self, key, value): + def __setitem__(self, key: str, value: Any) -> None: if key not in self.observables(): raise KeyError self.__setters[key](self, value) - def __check_value(self, value, key: str) -> Optional[np.ndarray]: + def __check_value(self, value: Any, key: str) -> Optional[np.ndarray]: if value is None: return None value = np.array(value) @@ -126,81 +126,81 @@ def __check_value(self, value, key: str) -> Optional[np.ndarray]: return value @property - def kinetic_energy(self): + def kinetic_energy(self) -> np.ndarray: """Get kinetic_energy""" return self.__kinetic_energy @kinetic_energy.setter - def kinetic_energy(self, kinetic_energy): + def kinetic_energy(self, kinetic_energy: Any) -> None: """Set kinetic_energy""" self.__kinetic_energy = self.__check_value(kinetic_energy, "kinetic_energy") @property - def potential_energy(self): + def potential_energy(self) -> np.ndarray: """Get potential_energy""" return self.__potential_energy @potential_energy.setter - def potential_energy(self, potential_energy): + def potential_energy(self, potential_energy: Any) -> None: """Set potential_energy""" self.__potential_energy = self.__check_value( potential_energy, "potential_energy" ) @property - def total_energy(self): + def total_energy(self) -> np.ndarray: """Get total_energy""" return self.__total_energy @total_energy.setter - def total_energy(self, total_energy): + def total_energy(self, total_energy: Any) -> None: """Set total_energy""" self.__total_energy = self.__check_value(total_energy, "total_energy") @property - def volume(self): + def volume(self) -> np.ndarray: """Get volume""" return self.__volume @volume.setter - def volume(self, volume): + def volume(self, volume: Any) -> None: """Set volume""" self.__volume = self.__check_value(volume, "volume") @property - def pressure(self): + def pressure(self) -> np.ndarray: """Get pressure""" return self.__pressure @pressure.setter - def pressure(self, pressure): + def pressure(self, pressure: Any) -> None: """Set pressure""" self.__pressure = self.__check_value(pressure, "pressure") @property - def temperature(self): + def temperature(self) -> np.ndarray: """Get temperature""" return self.__temperature @temperature.setter - def temperature(self, temperature): + def temperature(self, temperature: Any) -> None: """Set temperature""" self.__temperature = self.__check_value(temperature, "temperature") @property - def constant_of_motion(self): + def constant_of_motion(self) -> np.ndarray: """Get constant_of_motion""" return self.__constant_of_motion @constant_of_motion.setter - def constant_of_motion(self, constant_of_motion): + def constant_of_motion(self, constant_of_motion: Any) -> None: """Set constant_of_motion""" self.__constant_of_motion = self.__check_value( constant_of_motion, "constant_of_motion" ) @property - def nframes(self): + def nframes(self) -> Optional[int]: """Get number of frames""" frames = None for observable in ObservableData.observables(): @@ -216,16 +216,16 @@ def nframes(self): return frames @property - def kinetic_energy_per_molecule(self): + def kinetic_energy_per_molecule(self) -> Optional[np.ndarray]: """Get kinetic_energy per molecule - used internally""" return self.__kinetic_energy_per_molec @kinetic_energy_per_molecule.setter - def kinetic_energy_per_molecule(self, kinetic_energy): + def kinetic_energy_per_molecule(self, kinetic_energy: Optional[np.ndarray]) -> None: """Set kinetic_energy per molecule - used internally""" self.__kinetic_energy_per_molec = kinetic_energy - def __eq__(self, other): + def __eq__(self, other) -> bool: if type(other) is not type(self): return False diff --git a/physical_validation/data/parser.py b/physical_validation/data/parser.py index 0ebd64e..b0c2913 100644 --- a/physical_validation/data/parser.py +++ b/physical_validation/data/parser.py @@ -14,6 +14,7 @@ Parsers read output files from MD simulation packages and create SimulationData objects with their contents. """ +from . import SimulationData class Parser(object): @@ -21,5 +22,5 @@ class Parser(object): Parser base class """ - def get_simulation_data(self): + def get_simulation_data(self) -> SimulationData: raise NotImplementedError diff --git a/physical_validation/data/simulation_data.py b/physical_validation/data/simulation_data.py index 53eb792..cea8a62 100644 --- a/physical_validation/data/simulation_data.py +++ b/physical_validation/data/simulation_data.py @@ -13,6 +13,8 @@ r""" Data structures carrying simulation data. """ +from typing import Optional + from ..util import error as pv_error from . import EnsembleData, ObservableData, SystemData, TrajectoryData, UnitData @@ -29,7 +31,7 @@ class SimulationData(object): """ @staticmethod - def compatible(data_1, data_2): + def compatible(data_1, data_2) -> bool: r"""Checks whether two simulations are compatible for common validation. Parameters @@ -78,7 +80,7 @@ def __init__( self.trajectory = trajectory @property - def ensemble(self): + def ensemble(self) -> EnsembleData: r"""EnsembleData: Information on the sampled ensemble Returns @@ -88,7 +90,7 @@ def ensemble(self): return self.__ensemble @ensemble.setter - def ensemble(self, ensemble): + def ensemble(self, ensemble: EnsembleData) -> None: if not isinstance(ensemble, EnsembleData): raise TypeError( "No known conversion from " + str(type(ensemble)) + "to EnsembleData" @@ -96,7 +98,7 @@ def ensemble(self, ensemble): self.__ensemble = ensemble @property - def units(self): + def units(self) -> UnitData: r"""UnitsData: Information on the sampled units Returns @@ -106,7 +108,7 @@ def units(self): return self.__units @units.setter - def units(self, units): + def units(self, units: UnitData) -> None: if not isinstance(units, UnitData): raise TypeError( "No known conversion from " + str(type(units)) + "to UnitData" @@ -114,7 +116,7 @@ def units(self, units): self.__units = units @property - def observables(self): + def observables(self) -> ObservableData: r"""ObservableData: Observables collected during the simulation Returns @@ -124,7 +126,7 @@ def observables(self): return self.__observables @observables.setter - def observables(self, observables): + def observables(self, observables: ObservableData) -> None: if not isinstance(observables, ObservableData): raise TypeError( "No known conversion from " @@ -134,7 +136,7 @@ def observables(self, observables): self.__observables = observables @property - def trajectory(self): + def trajectory(self) -> TrajectoryData: r"""TrajectoryData: Trajectories collected during the simulation Returns @@ -144,7 +146,7 @@ def trajectory(self): return self.__trajectory @trajectory.setter - def trajectory(self, trajectory): + def trajectory(self, trajectory: TrajectoryData) -> None: if not isinstance(trajectory, TrajectoryData): raise TypeError( "No known conversion from " @@ -154,7 +156,7 @@ def trajectory(self, trajectory): self.__trajectory = trajectory @property - def system(self): + def system(self) -> SystemData: r"""SystemData: Information on the system's system Returns @@ -164,7 +166,7 @@ def system(self): return self.__system @system.setter - def system(self, system): + def system(self, system: SystemData) -> None: if not isinstance(system, SystemData): raise TypeError( "No known conversion from " + str(type(system)) + "to SystemData" @@ -172,7 +174,7 @@ def system(self, system): self.__system = system @property - def dt(self): + def dt(self) -> float: r"""The timestep of the simulation run. Returns @@ -182,25 +184,25 @@ def dt(self): return self.__dt @dt.setter - def dt(self, dt): + def dt(self, dt: float) -> None: dt = float(dt) self.__dt = dt def set_ensemble( self, - ensemble, - natoms=None, - mu=None, - volume=None, - pressure=None, - energy=None, - temperature=None, - ): + ensemble: str, + natoms: Optional[float] = None, + mu: Optional[float] = None, + volume: Optional[float] = None, + pressure: Optional[float] = None, + energy: Optional[float] = None, + temperature: Optional[float] = None, + ) -> None: self.__ensemble = EnsembleData( ensemble, natoms, mu, volume, pressure, energy, temperature ) - def __eq__(self, other): + def __eq__(self, other) -> bool: if type(other) is not type(self): return False return ( diff --git a/physical_validation/data/system_data.py b/physical_validation/data/system_data.py index 2dbd1bd..f0aeca1 100644 --- a/physical_validation/data/system_data.py +++ b/physical_validation/data/system_data.py @@ -14,6 +14,7 @@ Data structure carrying information on the simulated system. """ import warnings +from typing import Dict, List, Optional import numpy as np @@ -67,13 +68,13 @@ class SystemData(object): def __init__( self, - natoms=None, - nconstraints=None, - ndof_reduction_tra=None, - ndof_reduction_rot=None, - mass=None, - molecule_idx=None, - nconstraints_per_molecule=None, + natoms: Optional[int] = None, + nconstraints: Optional[float] = None, + ndof_reduction_tra: Optional[float] = None, + ndof_reduction_rot: Optional[float] = None, + mass: Optional[np.ndarray] = None, + molecule_idx: Optional[np.ndarray] = None, + nconstraints_per_molecule: Optional[np.ndarray] = None, ): self.__natoms = None self.__nconstraints = None @@ -102,16 +103,16 @@ def __init__( self.nconstraints_per_molecule = nconstraints_per_molecule @property - def natoms(self): + def natoms(self) -> int: """int: Number of atoms in the system""" return self.__natoms @natoms.setter - def natoms(self, natoms): + def natoms(self, natoms: int) -> None: self.__natoms = int(natoms) @property - def nconstraints(self): + def nconstraints(self) -> float: """float: Total number of constraints in the system Does not include the reduction of degrees of freedom in the absence of @@ -121,11 +122,11 @@ def nconstraints(self): return self.__nconstraints @nconstraints.setter - def nconstraints(self, nconstraints): + def nconstraints(self, nconstraints: float) -> None: self.__nconstraints = float(nconstraints) @property - def ndof_reduction_tra(self): + def ndof_reduction_tra(self) -> float: """float: Number of translational degrees of freedom deducted from 3*[# of molecules] @@ -133,11 +134,11 @@ def ndof_reduction_tra(self): return self.__ndof_reduction_tra @ndof_reduction_tra.setter - def ndof_reduction_tra(self, ndof_reduction_tra): + def ndof_reduction_tra(self, ndof_reduction_tra: float) -> None: self.__ndof_reduction_tra = float(ndof_reduction_tra) @property - def ndof_reduction_rot(self): + def ndof_reduction_rot(self) -> float: """float: Number of rotational degrees of freedom deducted from 3*[# of molecules] @@ -145,11 +146,11 @@ def ndof_reduction_rot(self): return self.__ndof_reduction_rot @ndof_reduction_rot.setter - def ndof_reduction_rot(self, ndof_reduction_rot): + def ndof_reduction_rot(self, ndof_reduction_rot: float) -> None: self.__ndof_reduction_rot = float(ndof_reduction_rot) @property - def mass(self): + def mass(self) -> np.ndarray: """nd-array: Mass vector for the atoms Setter accepts array-like objects. @@ -158,7 +159,7 @@ def mass(self): return self.__mass @mass.setter - def mass(self, mass): + def mass(self, mass: np.ndarray) -> None: mass = np.asarray(mass) if mass.ndim != 1: raise pv_error.InputError("mass", "Expected 1-dimensional array.") @@ -171,7 +172,7 @@ def mass(self, mass): self.__mass = mass @property - def molecule_idx(self): + def molecule_idx(self) -> np.ndarray: """nd-array: List of index of first atom of each molecule Setter accepts array-like objects. @@ -180,7 +181,7 @@ def molecule_idx(self): return self.__molecule_idx @molecule_idx.setter - def molecule_idx(self, molecule_idx): + def molecule_idx(self, molecule_idx: np.ndarray) -> None: molecule_idx = np.asarray(molecule_idx) if molecule_idx.ndim != 1: raise pv_error.InputError("molecule_idx", "Expected 1-dimensional array.") @@ -198,7 +199,7 @@ def molecule_idx(self, molecule_idx): self.__molecule_idx = molecule_idx @property - def nconstraints_per_molecule(self): + def nconstraints_per_molecule(self) -> np.ndarray: """nd-array: List of number of constraints per molecule Setter accepts array-like objects. @@ -207,7 +208,7 @@ def nconstraints_per_molecule(self): return self.__nconstraints_per_molecule @nconstraints_per_molecule.setter - def nconstraints_per_molecule(self, nconstraints_per_molecule): + def nconstraints_per_molecule(self, nconstraints_per_molecule: np.ndarray) -> None: nconstraints_per_molecule = np.array(nconstraints_per_molecule) if nconstraints_per_molecule.ndim != 1: raise pv_error.InputError( @@ -224,38 +225,36 @@ def nconstraints_per_molecule(self, nconstraints_per_molecule): self.__nconstraints_per_molecule = nconstraints_per_molecule @property - def ndof_per_molecule(self): - """nd-array: List of number of degrees of freedom per molecule - - Setter accepts array-like objects. - - """ + def ndof_per_molecule(self) -> Optional[List[Dict[str, float]]]: + """nd-array: List of number of degrees of freedom per molecule""" return self.__ndof_per_molecule @ndof_per_molecule.setter - def ndof_per_molecule(self, ndof_per_molecule): + def ndof_per_molecule( + self, ndof_per_molecule: Optional[List[Dict[str, float]]] + ) -> None: # used internally - check for consistency? self.__ndof_per_molecule = ndof_per_molecule @property - def bonds(self): + def bonds(self) -> List[List[int]]: """List[List[int]]: List of bonds per molecule""" return self.__bonds @bonds.setter - def bonds(self, bonds): + def bonds(self, bonds: List[List[int]]) -> None: self.__bonds = bonds @property - def constrained_bonds(self): + def constrained_bonds(self) -> List[List[int]]: """List[List[int]]: List of constrained bonds per molecule""" return self.__constrained_bonds @constrained_bonds.setter - def constrained_bonds(self, constrained_bonds): + def constrained_bonds(self, constrained_bonds: List[List[int]]) -> None: self.__constrained_bonds = constrained_bonds - def __eq__(self, other): + def __eq__(self, other) -> bool: if type(other) is not type(self): return False return ( diff --git a/physical_validation/data/trajectory_data.py b/physical_validation/data/trajectory_data.py index dbb5fa9..5250de7 100644 --- a/physical_validation/data/trajectory_data.py +++ b/physical_validation/data/trajectory_data.py @@ -13,7 +13,7 @@ r""" Data structures carrying simulation data. """ -from typing import Optional +from typing import Any, List, Optional, Tuple import numpy as np @@ -22,31 +22,27 @@ class RectangularBox: - def __init__(self, box=None): + def __init__(self, box: np.ndarray): self.__box = None self.__nframes = 0 - if box is not None: - self.box = box + assert 0 < box.ndim < 3 + if box.ndim == 1: + assert box.size == 3 + self.__box = box + self.__nframes = 1 + elif box.ndim == 2: + assert box.shape[1] == 3 + self.__box = box + self.__nframes = box.shape[0] @property def box(self): return self.__box - @box.setter - def box(self, b): - b = np.array(b) - assert 0 < b.ndim < 3 - if b.ndim == 1: - assert b.size == 3 - self.__box = b - self.__nframes = 1 - elif b.ndim == 2: - assert b.shape[1] == 3 - self.__box = b - self.__nframes = b.shape[0] - - def gather(self, positions, bonds, molec_idx): + def gather( + self, positions: np.ndarray, bonds: List[List[int]], molec_idx: List[int] + ): bonds = np.array(bonds) if bonds.size == 0: return positions @@ -62,9 +58,9 @@ def gather(self, positions, bonds, molec_idx): for f in range(nframes): p = positions[f] if self.__nframes > 1: - box = self.box[f] + box = self.__box[f] else: - box = self.box[0] + box = self.__box[0] assert len(bonds) == len(molec_idx) for mbonds, idx in zip(bonds, molec_idx): for b in mbonds: @@ -96,10 +92,10 @@ class TrajectoryData(object): """ @staticmethod - def trajectories(): + def trajectories() -> Tuple[str, str]: return "position", "velocity" - def __init__(self, position=None, velocity=None): + def __init__(self, position: Optional[Any] = None, velocity: Optional[Any] = None): self.__position = None self.__velocity = None self.__nframes = None @@ -124,17 +120,17 @@ def __init__(self, position=None, velocity=None): if velocity is not None: self.velocity = velocity - def __getitem__(self, key): + def __getitem__(self, key: str): if key not in self.trajectories(): raise KeyError return self.__getters[key](self) - def __setitem__(self, key, value): + def __setitem__(self, key: str, value: Any): if key not in self.trajectories(): raise KeyError self.__setters[key](self, value) - def __check_value(self, value, key: str) -> Optional[np.ndarray]: + def __check_value(self, value: Any, key: str) -> Optional[np.ndarray]: value = np.array(value) if value.ndim == 2: # create 3-dimensional array @@ -160,26 +156,26 @@ def __check_value(self, value, key: str) -> Optional[np.ndarray]: return value @property - def position(self): + def position(self) -> np.ndarray: """Get position""" return self.__position @position.setter - def position(self, pos): + def position(self, pos: Any) -> None: """Set position""" self.__position = self.__check_value(pos, "position") @property - def velocity(self): + def velocity(self) -> np.ndarray: """Get velocity""" return self.__velocity @velocity.setter - def velocity(self, vel): + def velocity(self, vel: Any) -> None: """Set velocity""" self.__velocity = self.__check_value(vel, "velocity") - def __eq__(self, other): + def __eq__(self, other) -> bool: if type(other) is not type(self): return False diff --git a/physical_validation/data/unit_data.py b/physical_validation/data/unit_data.py index d4e9ed6..e692637 100644 --- a/physical_validation/data/unit_data.py +++ b/physical_validation/data/unit_data.py @@ -13,6 +13,7 @@ r""" Data structures carrying simulation data. """ +from typing import Optional class UnitData(object): @@ -32,19 +33,19 @@ class UnitData(object): def __init__( self, - kb, - energy_conversion, - length_conversion, - volume_conversion, - temperature_conversion, - pressure_conversion, - time_conversion, - energy_str="ENE", - length_str="LEN", - volume_str="VOL", - temperature_str="TEMP", - pressure_str="PRESS", - time_str="TIME", + kb: float, + energy_conversion: float, + length_conversion: float, + volume_conversion: float, + temperature_conversion: float, + pressure_conversion: float, + time_conversion: float, + energy_str: str = "ENE", + length_str: str = "LEN", + volume_str: str = "VOL", + temperature_str: str = "TEMP", + pressure_str: str = "PRESS", + time_str: str = "TIME", ): self.__kb = float(kb) @@ -68,7 +69,7 @@ def __parsers(): return {"GROMACS": GromacsParser} @classmethod - def units(cls, name=None): + def units(cls, name: Optional[str] = None): if name is None: return cls.__parsers().keys() @@ -77,7 +78,7 @@ def units(cls, name=None): else: raise KeyError("Name " + name + " does not match a registred unit type.") - def __eq__(self, other): + def __eq__(self, other) -> bool: if type(other) is not type(self): return False @@ -92,66 +93,66 @@ def __eq__(self, other): ) @property - def kb(self): + def kb(self) -> float: """float: The value of the Boltzmann constant""" return self.__kb @property - def energy_str(self): + def energy_str(self) -> str: """str: Energy unit""" return self.__energy_str @property - def length_str(self): + def length_str(self) -> str: """str: Length unit""" return self.__length_str @property - def volume_str(self): + def volume_str(self) -> str: """str: Volume unit""" return self.__volume_str @property - def temperature_str(self): + def temperature_str(self) -> str: """str: Temperature unit""" return self.__temperature_str @property - def pressure_str(self): + def pressure_str(self) -> str: """str: Pressure unit""" return self.__pressure_str @property - def time_str(self): + def time_str(self) -> str: """str: Time unit""" return self.__time_str @property - def energy_conversion(self): + def energy_conversion(self) -> float: """float: Energy conversion factor, 1 energy_unit = energy_conversion * kJ/mol""" return self.__energy_conversion @property - def length_conversion(self): + def length_conversion(self) -> float: """float: Length conversion factor, 1 length_unit = length_conversion * nm""" return self.__length_conversion @property - def volume_conversion(self): + def volume_conversion(self) -> float: """float: Volume conversion factor, 1 volume_unit = volume_conversion * nm^3""" return self.__volume_conversion @property - def temperature_conversion(self): + def temperature_conversion(self) -> float: """float: Temperature conversion factor, 1 temperature_unit = temperature_conversion * K""" return self.__temperature_conversion @property - def pressure_conversion(self): + def pressure_conversion(self) -> float: """float: Pressure conversion factor, 1 pressure_unit = pressure_conversion * bar""" return self.__pressure_conversion @property - def time_conversion(self): + def time_conversion(self) -> float: """float: Time conversion factor, 1 time_unit = time_conversion * ps""" return self.__time_conversion diff --git a/physical_validation/ensemble.py b/physical_validation/ensemble.py index 74872f4..a3d3d51 100644 --- a/physical_validation/ensemble.py +++ b/physical_validation/ensemble.py @@ -18,8 +18,9 @@ Users should cite the JCTC paper: Shirts, M. R. "Simple Quantitative Tests to Validate Sampling from Thermodynamic Ensembles", J. Chem. Theory Comput., 2013, 9 (2), pp 909-926, -http://dx.doi.org/10.1021/ct300688p +https://dx.doi.org/10.1021/ct300688p """ +from typing import Dict, List, Optional import numpy as np @@ -29,45 +30,50 @@ def check( - data_sim_one, - data_sim_two, - total_energy=False, - bootstrap_error=False, - bootstrap_repetitions=200, - bootstrap_seed=None, - screen=False, - filename=None, - verbosity=1, - data_is_uncorrelated=False, -): + data_sim_one: SimulationData, + data_sim_two: SimulationData, + total_energy: bool = False, + bootstrap_error: bool = False, + bootstrap_repetitions: int = 200, + bootstrap_seed: Optional[int] = None, + screen: bool = False, + filename: Optional[str] = None, + verbosity: int = 1, + data_is_uncorrelated: bool = False, +) -> List[float]: r""" Check the ensemble. The correct check is inferred from the simulation data given. Parameters ---------- - data_sim_one : SimulationData - data_sim_two : SimulationData - total_energy : bool - bootstrap_error : bool + data_sim_one + Simulation data object of first simulation + data_sim_two + Simulation data object of second simulation differing in + its state point from the first + total_energy + Whether to use the total energy for the calculation + Default: False, use potential energy only + bootstrap_error Calculate the standard error via bootstrap resampling Default: False - bootstrap_repetitions : int + bootstrap_repetitions Number of bootstrap repetitions drawn Default: 200 - bootstrap_seed : int + bootstrap_seed Sets the random number seed for bootstrapping. If set, bootstrapping will be reproducible. Default: None, bootstrapping is non-reproducible. - screen : bool + screen Plot distributions on screen. Default: False. - filename : string + filename Plot distributions to `filename`. - Default: None, no plotting to file. - verbosity : int + Default: `None`, no plotting to file. + verbosity Level of verbosity, from 0 (quiet) to 3 (very verbose). Default: 1 - data_is_uncorrelated : bool, optional + data_is_uncorrelated Whether the provided data is uncorrelated. If this option is set, the equilibration, decorrelation and tail pruning of the trajectory is skipped. This can speed up the analysis, @@ -76,7 +82,7 @@ def check( Returns ------- - quantiles : List[float] + quantiles The number of quantiles the computed result is off the analytical one. """ @@ -122,6 +128,9 @@ def check( quantiles = None + num_bins_for_analysis = 40 + tail_cutoff = 0.001 # 0.1% + if sampled_ensemble == "NVT": quantiles = ensemble.check_1d( traj1=e1, @@ -132,6 +141,11 @@ def check( quantity=eneq, dtemp=True, dpress=False, + dmu=False, + temp=None, + pvconvert=None, + nbins=num_bins_for_analysis, + cutoff=tail_cutoff, bootstrap_seed=bootstrap_seed, bootstrap_error=bootstrap_error, bootstrap_repetitions=bootstrap_repetitions, @@ -193,6 +207,11 @@ def check( quantity=eneq, dtemp=True, dpress=False, + dmu=False, + temp=None, + pvconvert=None, + nbins=num_bins_for_analysis, + cutoff=tail_cutoff, bootstrap_seed=bootstrap_seed, bootstrap_error=bootstrap_error, bootstrap_repetitions=bootstrap_repetitions, @@ -213,6 +232,9 @@ def check( quantity="V", dtemp=False, dpress=True, + dmu=False, + nbins=num_bins_for_analysis, + cutoff=tail_cutoff, bootstrap_seed=bootstrap_seed, temp=temperatures[0], pvconvert=pvconvert, @@ -239,6 +261,8 @@ def check( pvconvert=pvconvert, quantity=[eneq, "V"], dtempdpress=True, + dtempdmu=False, + cutoff=tail_cutoff, bootstrap_seed=bootstrap_seed, bootstrap_error=bootstrap_error, bootstrap_repetitions=bootstrap_repetitions, @@ -252,8 +276,11 @@ def check( def estimate_interval( - data, verbosity=1, total_energy=False, data_is_uncorrelated=False -): + data: SimulationData, + verbosity: int = 1, + total_energy: bool = False, + data_is_uncorrelated: bool = False, +) -> Dict[str, float]: r""" In order to perform an ensemble check, two simulations at distinct state point are needed. Choosing two state points too far apart will result @@ -269,26 +296,27 @@ def estimate_interval( Parameters ---------- - data : SimulationData + data The performed simulation. - verbosity : int, optional + verbosity If 0, no output is printed on screen. If 1, estimated intervals are printed. If larger, additional information during calculation are printed. Default: 1 - total_energy : bool, optional + total_energy Use total energy instead of potential energy only. Default: False - data_is_uncorrelated : bool, optional + data_is_uncorrelated Whether the provided data is uncorrelated. If this option is set, the equilibration, decorrelation and tail pruning of the trajectory is skipped. This can speed up the analysis, but note that if the provided data is correlated, the results of the physical validation checks might be invalid. + Default: False Returns ------- - intervals : Dict + intervals If `data` was performed under NVT conditions, `intervals` contains only one entry: @@ -308,14 +336,21 @@ def estimate_interval( else: ene = data.observables.potential_energy + tail_cutoff = 0.001 # 0.1% + if data.ensemble.ensemble == "NVT": result = ensemble.estimate_interval( ens_string="NVT", ens_temp=data.ensemble.temperature, energy=ene, kb=data.units.kb, + ens_press=None, + volume=None, + pvconvert=None, verbosity=verbosity, + cutoff=tail_cutoff, tunit=data.units.temperature_str, + punit="", data_is_uncorrelated=data_is_uncorrelated, ) elif data.ensemble.ensemble == "NPT": @@ -331,6 +366,7 @@ def estimate_interval( volume=data.observables.volume, pvconvert=pvconvert, verbosity=verbosity, + cutoff=tail_cutoff, tunit=data.units.temperature_str, punit=data.units.pressure_str, data_is_uncorrelated=data_is_uncorrelated, diff --git a/physical_validation/integrator.py b/physical_validation/integrator.py index 1123fb3..8c052d3 100644 --- a/physical_validation/integrator.py +++ b/physical_validation/integrator.py @@ -14,42 +14,43 @@ The `integratorconvergence` module is part of the physical_validation package, and consists of checks of the convergence of the MD integrator. """ +from typing import List, Optional + from .data import SimulationData from .util import error as pv_error from .util import integrator as util_integ def convergence( - simulations, - convergence_test="max_deviation", - verbose=True, - slope=False, - screen=False, - filename=None, -): + simulations: List[SimulationData], + convergence_test: str = "max_deviation", + verbose: bool = True, + screen: bool = False, + filename: Optional[str] = None, +) -> float: r""" Compares the convergence of the fluctuations of conserved quantities with decreasing simulation time step to theoretical expectations. Parameters ---------- - simulations : list of SimulationData + simulations The (otherwise identical) simulations performed using different - timesteps - convergence_test : str, optional + time steps + convergence_test A function defining the convergence test. Currently, only one test is implemented: - `max_deviation` - verbose : bool, optional - If True, print more detailed output. - screen : bool + `max_deviation`, which is chosen by default + verbose + If True, print more detailed output. Default: False. + screen Plot convergence on screen. Default: False. - filename : string + filename Plot convergence to `filename`. Default: None, no plotting to file. Returns ------- - result : float + The largest deviation from the expected ratio of fluctuations. Notes ----- @@ -101,7 +102,6 @@ def convergence( constant_of_motion, convergence_test=convergence_test, verbose=verbose, - slope=slope, screen=screen, filename=filename, ) diff --git a/physical_validation/kinetic_energy.py b/physical_validation/kinetic_energy.py index b52550e..a4603b1 100644 --- a/physical_validation/kinetic_energy.py +++ b/physical_validation/kinetic_energy.py @@ -15,44 +15,48 @@ consists of checks of the kinetic energy distribution and its equipartition. """ -from .data import ObservableData +from typing import List, Optional, Tuple, Union + +import numpy as np + +from .data import ObservableData, SimulationData from .util import kinetic_energy as util_kin def distribution( - data, - strict=False, - verbosity=2, - screen=False, - filename=None, - bs_repetitions=200, - bootstrap_seed=None, - data_is_uncorrelated=False, -): + data: SimulationData, + strict: bool = False, + verbosity: int = 2, + screen: bool = False, + filename: Optional[str] = None, + bs_repetitions: int = 200, + bootstrap_seed: Optional[int] = None, + data_is_uncorrelated: bool = False, +) -> Union[float, Tuple[float, float]]: r"""Checks the distribution of a kinetic energy trajectory. Parameters ---------- - data : SimulationData + data Simulation data object - strict : bool + strict If True, check full kinetic energy distribution via K-S test. Otherwise, check mean and width of kinetic energy distribution. Default: False - verbosity : int, optional + verbosity Verbosity level, where 0 is quiet and 3 shows full details. Default: 2. - screen : bool, optional + screen Plot distributions on screen. Default: False. - filename : string, optional + filename Plot distributions to `filename`. Default: None, no plotting to file. - bs_repetitions : int + bs_repetitions Number of bootstrap samples used for error estimate (if strict=False). Default: 200. - bootstrap_seed : int + bootstrap_seed Sets the random number seed for bootstrapping (if strict=False). If set, bootstrapping will be reproducible. - Default: None, bootstrapping is non-reproducible. - data_is_uncorrelated : bool, optional + Default: `None`, bootstrapping is non-reproducible. + data_is_uncorrelated Whether the provided data is uncorrelated. If this option is set, the equilibration, decorrelation and tail pruning of the trajectory is skipped. This can speed up the analysis, @@ -61,7 +65,7 @@ def distribution( Returns ------- - result : float or Tuple[float] + result If `strict=True`: The p value of the test. If `strict=False`: Distance of the estimated T(mu) and T(sigma) from the expected temperature, measured in standard deviations of the @@ -161,54 +165,54 @@ def distribution( def equipartition( - data, - strict=False, - molec_groups=None, - random_divisions=0, - random_groups=0, - random_division_seed=None, - verbosity=2, - screen=False, - filename=None, - bootstrap_seed=None, - data_is_uncorrelated=False, -): + data: SimulationData, + strict: bool = False, + molec_groups: Optional[List[np.ndarray]] = None, + random_divisions: int = 0, + random_groups: int = 0, + random_division_seed: Optional[int] = None, + verbosity: int = 2, + screen: bool = False, + filename: Optional[str] = None, + bootstrap_seed: Optional[int] = None, + data_is_uncorrelated: bool = False, +) -> Union[List[float], List[Tuple[float, float]]]: r"""Checks the equipartition of a simulation trajectory. Parameters ---------- - data : SimulationData + data Simulation data object - strict : bool, optional + strict If True, check full kinetic energy distribution via K-S test. Otherwise, check mean and width of kinetic energy distribution. Default: False - molec_groups : list of array-like (ngroups x ?), optional - List of 1d arrays containing molecule indeces defining groups. Useful to pre-define + molec_groups + List of 1d arrays containing molecule indices defining groups. Useful to pre-define groups of molecules (e.g. solute / solvent, liquid mixture species, ...). If None, no pre-defined molecule groups will be tested. Default: None. *Note:* If an empty 1d array is found as last element in the list, the remaining molecules are collected in this array. This allows, for example, to only specify the solute, and indicate the solvent by giving an empty array. - random_divisions : int, optional + random_divisions Number of random division tests attempted. Default: 0 (random division tests off). - random_groups : int, optional + random_groups Number of groups the system is randomly divided in. Default: 2. - random_division_seed : int, optional + random_division_seed Seed making the random divisions reproducible. Default: None, random divisions not reproducible - verbosity : int, optional + verbosity Verbosity level, where 0 is quiet and 3 very chatty. Default: 2. - screen : bool + screen Plot distributions on screen. Default: False. - filename : string + filename Plot distributions to `filename`. Default: None, no plotting to file - bootstrap_seed : int + bootstrap_seed Sets the random number seed for bootstrapping (if strict=False). If set, bootstrapping will be reproducible. Default: None, bootstrapping is non-reproducible. - data_is_uncorrelated : bool, optional + data_is_uncorrelated Whether the provided data is uncorrelated. If this option is set, the equilibration, decorrelation and tail pruning of the trajectory is skipped. This can speed up the analysis, @@ -217,7 +221,7 @@ def equipartition( Returns ------- - result : List[float] or List[Tuple[float]] + result If `strict=True`: The p value for every tests. If `strict=False`: Distance of the estimated T(mu) and T(sigma) from the expected temperature, measured in standard deviations of the diff --git a/physical_validation/tests/test_kinetic_energy_regression/test_kinetic_energy_regression_equipartition.txt b/physical_validation/tests/test_kinetic_energy_regression/test_kinetic_energy_regression_equipartition.txt index b4ca700..df339c3 100644 --- a/physical_validation/tests/test_kinetic_energy_regression/test_kinetic_energy_regression_equipartition.txt +++ b/physical_validation/tests/test_kinetic_energy_regression/test_kinetic_energy_regression_equipartition.txt @@ -6,103 +6,103 @@ Equipartition: Testing group-wise kinetic energies (strict) * total: Equilibration, decorrelation and tail pruning was skipped on user request. Note that if the provided trajectory is statistically correlated, the results of the physical validation checks might be invalid. -p = 0.746394 +p = 0.748702 * translational: Equilibration, decorrelation and tail pruning was skipped on user request. Note that if the provided trajectory is statistically correlated, the results of the physical validation checks might be invalid. -p = 0.688716 +p = 0.688002 * rotational and internal: Equilibration, decorrelation and tail pruning was skipped on user request. Note that if the provided trajectory is statistically correlated, the results of the physical validation checks might be invalid. -p = 0.644841 +p = 0.646918 * rotational: Equilibration, decorrelation and tail pruning was skipped on user request. Note that if the provided trajectory is statistically correlated, the results of the physical validation checks might be invalid. -p = 0.865841 +p = 0.865427 * internal: Equilibration, decorrelation and tail pruning was skipped on user request. Note that if the provided trajectory is statistically correlated, the results of the physical validation checks might be invalid. -p = 0.824824 +p = 0.826656 Equipartition: Testing group-wise kinetic energies (strict) * total: Equilibration, decorrelation and tail pruning was skipped on user request. Note that if the provided trajectory is statistically correlated, the results of the physical validation checks might be invalid. -p = 0.746394 +p = 0.748702 * translational: Equilibration, decorrelation and tail pruning was skipped on user request. Note that if the provided trajectory is statistically correlated, the results of the physical validation checks might be invalid. -p = 0.688716 +p = 0.688002 * rotational and internal: Equilibration, decorrelation and tail pruning was skipped on user request. Note that if the provided trajectory is statistically correlated, the results of the physical validation checks might be invalid. -p = 0.644841 +p = 0.646918 * rotational: Equilibration, decorrelation and tail pruning was skipped on user request. Note that if the provided trajectory is statistically correlated, the results of the physical validation checks might be invalid. -p = 0.865841 +p = 0.865427 * internal: Equilibration, decorrelation and tail pruning was skipped on user request. Note that if the provided trajectory is statistically correlated, the results of the physical validation checks might be invalid. -p = 0.824824 +p = 0.826656 Testing randomly divided group 0 Equipartition: Testing group-wise kinetic energies (strict) * total: Equilibration, decorrelation and tail pruning was skipped on user request. Note that if the provided trajectory is statistically correlated, the results of the physical validation checks might be invalid. -p = 0.96101 +p = 0.960578 * translational: Equilibration, decorrelation and tail pruning was skipped on user request. Note that if the provided trajectory is statistically correlated, the results of the physical validation checks might be invalid. -p = 0.082419 +p = 0.0825405 * rotational and internal: Equilibration, decorrelation and tail pruning was skipped on user request. Note that if the provided trajectory is statistically correlated, the results of the physical validation checks might be invalid. -p = 0.387798 +p = 0.386946 * rotational: Equilibration, decorrelation and tail pruning was skipped on user request. Note that if the provided trajectory is statistically correlated, the results of the physical validation checks might be invalid. -p = 0.785126 +p = 0.785605 * internal: Equilibration, decorrelation and tail pruning was skipped on user request. Note that if the provided trajectory is statistically correlated, the results of the physical validation checks might be invalid. -p = 0.772364 +p = 0.771477 Testing randomly divided group 1 Equipartition: Testing group-wise kinetic energies (strict) * total: Equilibration, decorrelation and tail pruning was skipped on user request. Note that if the provided trajectory is statistically correlated, the results of the physical validation checks might be invalid. -p = 0.810328 +p = 0.811302 * translational: Equilibration, decorrelation and tail pruning was skipped on user request. Note that if the provided trajectory is statistically correlated, the results of the physical validation checks might be invalid. -p = 0.159323 +p = 0.159093 * rotational and internal: Equilibration, decorrelation and tail pruning was skipped on user request. Note that if the provided trajectory is statistically correlated, the results of the physical validation checks might be invalid. -p = 0.913282 +p = 0.913783 * rotational: Equilibration, decorrelation and tail pruning was skipped on user request. Note that if the provided trajectory is statistically correlated, the results of the physical validation checks might be invalid. -p = 0.655927 +p = 0.656605 * internal: Equilibration, decorrelation and tail pruning was skipped on user request. Note that if the provided trajectory is statistically correlated, the results of the physical validation checks might be invalid. -p = 0.703896 +p = 0.704896 Testing predifined divided group 0 Equipartition: Testing group-wise kinetic energies (strict) * total: Equilibration, decorrelation and tail pruning was skipped on user request. Note that if the provided trajectory is statistically correlated, the results of the physical validation checks might be invalid. -p = 0.810328 +p = 0.811302 * translational: Equilibration, decorrelation and tail pruning was skipped on user request. Note that if the provided trajectory is statistically correlated, the results of the physical validation checks might be invalid. -p = 0.159323 +p = 0.159093 * rotational and internal: Equilibration, decorrelation and tail pruning was skipped on user request. Note that if the provided trajectory is statistically correlated, the results of the physical validation checks might be invalid. -p = 0.913282 +p = 0.913783 * rotational: Equilibration, decorrelation and tail pruning was skipped on user request. Note that if the provided trajectory is statistically correlated, the results of the physical validation checks might be invalid. -p = 0.655927 +p = 0.656605 * internal: Equilibration, decorrelation and tail pruning was skipped on user request. Note that if the provided trajectory is statistically correlated, the results of the physical validation checks might be invalid. -p = 0.703896 +p = 0.704896 Testing predifined divided group 1 Equipartition: Testing group-wise kinetic energies (strict) * total: Equilibration, decorrelation and tail pruning was skipped on user request. Note that if the provided trajectory is statistically correlated, the results of the physical validation checks might be invalid. -p = 0.96101 +p = 0.960578 * translational: Equilibration, decorrelation and tail pruning was skipped on user request. Note that if the provided trajectory is statistically correlated, the results of the physical validation checks might be invalid. -p = 0.082419 +p = 0.0825405 * rotational and internal: Equilibration, decorrelation and tail pruning was skipped on user request. Note that if the provided trajectory is statistically correlated, the results of the physical validation checks might be invalid. -p = 0.387798 +p = 0.386946 * rotational: Equilibration, decorrelation and tail pruning was skipped on user request. Note that if the provided trajectory is statistically correlated, the results of the physical validation checks might be invalid. -p = 0.785126 +p = 0.785605 * internal: Equilibration, decorrelation and tail pruning was skipped on user request. Note that if the provided trajectory is statistically correlated, the results of the physical validation checks might be invalid. -p = 0.772364 +p = 0.771477 ## Validating kinetic energy equipartition (non-strict) Equipartition: Testing group-wise kinetic energies (non-strict) diff --git a/physical_validation/tests/test_kinetic_energy_regression/test_kinetic_energy_regression_equipartition.yml b/physical_validation/tests/test_kinetic_energy_regression/test_kinetic_energy_regression_equipartition.yml index 392d2c4..c6aef05 100644 --- a/physical_validation/tests/test_kinetic_energy_regression/test_kinetic_energy_regression_equipartition.yml +++ b/physical_validation/tests/test_kinetic_energy_regression/test_kinetic_energy_regression_equipartition.yml @@ -1,2 +1,2 @@ non-strict: '[(0.013067,0.796377),(0.406843,0.588891),(0.154599,0.128358),(0.004732,0.312300),(0.169101,0.353289),(0.165436,0.121112),(0.584908,0.247993),(0.405995,0.191838),(0.303447,0.017071),(0.571374,0.852889),(0.196023,0.605645),(0.008894,1.070967),(0.212210,0.346322),(0.311499,0.649264),(0.366991,0.737305),(0.196023,0.605645),(0.008894,1.070967),(0.212210,0.346322),(0.311499,0.649264),(0.366991,0.737305),(0.165436,0.121112),(0.584908,0.247993),(0.405995,0.191838),(0.303447,0.017071),(0.571374,0.852889)]' -strict: '[0.746394,0.688716,0.644841,0.865841,0.824824,0.961010,0.082419,0.387798,0.785126,0.772364,0.810328,0.159323,0.913282,0.655927,0.703896,0.810328,0.159323,0.913282,0.655927,0.703896,0.961010,0.082419,0.387798,0.785126,0.772364]' +strict: '[0.748702,0.688002,0.646918,0.865427,0.826656,0.960578,0.082541,0.386946,0.785605,0.771477,0.811302,0.159093,0.913783,0.656605,0.704896,0.811302,0.159093,0.913783,0.656605,0.704896,0.960578,0.082541,0.386946,0.785605,0.771477]' diff --git a/physical_validation/tests/test_util_kinetic_energy.py b/physical_validation/tests/test_util_kinetic_energy.py index 149bc13..2fa7830 100644 --- a/physical_validation/tests/test_util_kinetic_energy.py +++ b/physical_validation/tests/test_util_kinetic_energy.py @@ -13,12 +13,7 @@ r""" This file contains tests for the `physical_validation.util.kinetic_energy` module. """ -import itertools - -import pytest - -from ..util import error as pv_error -from ..util.kinetic_energy import is_close, temperature +from ..util.kinetic_energy import is_close class TestIsClose: @@ -39,15 +34,15 @@ def test_significantly_different_values_are_not_close(): @staticmethod def test_relative_tolerance_works(): - assert is_close(1 + 9.99e-10, 1, rel_tol=1e-9) - assert not is_close(1 + 1.01e-9, 1, rel_tol=1e-9) - assert is_close(1e3 + 9.99e-10, 1e3, rel_tol=1e-12) - assert not is_close(1e3 + 1.01e-9, 1e3, rel_tol=1e-12) + assert is_close(1 + 9.99e-10, 1, relative_tolerance=1e-9) + assert not is_close(1 + 1.01e-9, 1, relative_tolerance=1e-9) + assert is_close(1e3 + 9.99e-10, 1e3, relative_tolerance=1e-12) + assert not is_close(1e3 + 1.01e-9, 1e3, relative_tolerance=1e-12) @staticmethod def test_absolute_tolerance_works(): - assert is_close(1e-8, 1.099e-8, abs_tol=1e-9) - assert not is_close(1e-8, 1.101e-8, abs_tol=1e-9) + assert is_close(1e-8, 1.099e-8, absolute_tolerance=1e-9) + assert not is_close(1e-8, 1.101e-8, absolute_tolerance=1e-9) @staticmethod def test_small_numbers_work(): @@ -55,50 +50,3 @@ def test_small_numbers_work(): assert is_close(-9.9e-10, 0) assert not is_close(2e-9, 0) assert not is_close(-1.1e-9, 0) - - -class TestTemperature: - r""" - Tests the `temperature(kin, ndof, kb)` function - """ - - @staticmethod - def test_zero_works(): - assert temperature(kin=0, ndof=457.4) == 0 - assert temperature(kin=0, ndof=457.4, kb=8.314e-3) == 0 - - @staticmethod - def test_zero_values_throw(): - with pytest.raises(pv_error.InputError): - temperature(kin=234.1, ndof=0) - with pytest.raises(pv_error.InputError): - temperature(kin=234.1, ndof=0, kb=8.314e-3) - with pytest.raises(pv_error.InputError): - temperature(kin=234.1, ndof=6234.9, kb=0) - - @staticmethod - def test_negative_values_throw(): - with pytest.raises(pv_error.InputError): - temperature(kin=234.1, ndof=-1e-12) - temperature(kin=234.1, ndof=-1e-5) - with pytest.raises(pv_error.InputError): - temperature(kin=234.1, ndof=-13.6, kb=8.314e-3) - with pytest.raises(pv_error.InputError): - temperature(kin=234.1, ndof=6234.9, kb=-8.314e-3) - - @staticmethod - def test_default_kb(): - # Changing the default would likely yield some ugly surprises - # TODO: Remove the default - if we support different units, - # we should force users to be specific! - assert temperature(kin=12.634, ndof=76.8, kb=8.314e-3) == temperature( - kin=12.634, ndof=76.8 - ) - - @staticmethod - def test_formula(): - kinetic_energy = [3468.7, 1.60923e-8, 5.7923e15] - num_dof = [1, 0.002, 9.6234e13] - kb_values = [8.314e-3, 1, 1.632e2] - for [k, n, kb] in itertools.product(kinetic_energy, num_dof, kb_values): - assert temperature(k, n, kb) == 2 * k / (n * kb) diff --git a/physical_validation/util/ensemble.py b/physical_validation/util/ensemble.py index fa837c3..83ad628 100644 --- a/physical_validation/util/ensemble.py +++ b/physical_validation/util/ensemble.py @@ -16,6 +16,8 @@ serves as the low-level functionality of the high-level module :mod:`physical_validation.ensemble`. """ +from typing import Dict, List, Optional, Tuple, Union + import numpy as np import pymbar import scipy.optimize @@ -24,7 +26,9 @@ from . import plot, trajectory -def generate_histograms(traj1, traj2, g1, g2, bins): +def generate_histograms( + traj1: np.ndarray, traj2: np.ndarray, g1: float, g2: float, bins: np.ndarray +) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: n1 = np.size(traj1) n2 = np.size(traj2) @@ -38,19 +42,19 @@ def generate_histograms(traj1, traj2, g1, g2, bins): def do_linear_fit( - traj1, - traj2, - g1, - g2, - bins, - screen=False, - filename=None, - trueslope=0.0, - trueoffset=0.0, - units=None, - xlabel="Energy", - ylabel=r"$\log\frac{P_2(E)}{P_1(E)}$", -): + traj1: np.ndarray, + traj2: np.ndarray, + g1: float, + g2: float, + bins: np.ndarray, + screen: bool, + filename: Optional[str], + trueslope: float, + trueoffset: float, + units: Optional[str], + xlabel: str, + ylabel: str, +) -> Tuple[np.ndarray, np.ndarray]: h1, h2, dh1, dh2 = generate_histograms(traj1, traj2, g1, g2, bins) @@ -117,7 +121,14 @@ def do_linear_fit( return a, da -def do_max_likelihood_fit(traj1, traj2, g1, g2, init_params=None, verbose=False): +def do_max_likelihood_fit( + traj1: np.ndarray, + traj2: np.ndarray, + g1: Union[float, np.ndarray], + g2: Union[float, np.ndarray], + init_params: np.ndarray, + verbose: bool, +) -> Tuple[np.ndarray, np.ndarray]: # ============================================================= # # Define (negative) log-likelihood function and its derivatives # @@ -236,11 +247,6 @@ def hess_log_likelihood(a, ene1, ene2): # ==================================================== # # Minimize the negative of the log likelihood function # # ==================================================== # - if init_params is None: - init_params = np.zeros(traj1.ndim + 1) - else: - init_params = np.array(init_params) - min_res = checkensemble_solver( fun=log_likelihood, x0=init_params, @@ -301,7 +307,9 @@ def hess_log_likelihood(a, ene1, ene2): return final_params, final_error -def checkensemble_solver(fun, x0, args, jac, hess, tol=1e-10, maxiter=20): +def checkensemble_solver( + fun, x0, args, jac, hess, tol=1e-10, maxiter=20 +) -> scipy.optimize.OptimizeResult: # This is the solver used in checkensemble, unchanged except for some # modernization / adaptation to coding style success = False @@ -350,7 +358,7 @@ def checkensemble_solver(fun, x0, args, jac, hess, tol=1e-10, maxiter=20): ) -def check_bins(traj1, traj2, bins): +def check_bins(traj1: np.ndarray, traj2: np.ndarray, bins: np.ndarray) -> np.ndarray: # check for empty bins h1, _ = np.histogram(traj1, bins=bins) h2, _ = np.histogram(traj2, bins=bins) @@ -374,21 +382,21 @@ def check_bins(traj1, traj2, bins): def print_stats( - title, - fitvals, - dfitvals, - kb, - param1, - param2, - trueslope, - temp=None, - pvconvert=None, - dtemp=False, - dpress=False, - dmu=False, - dtempdpress=False, - dtempdmu=False, -): + title: str, + fitvals: np.ndarray, + dfitvals: Optional[np.ndarray], + kb: float, + param1: Union[float, np.ndarray], + param2: Union[float, np.ndarray], + trueslope: Union[float, np.ndarray], + temp: Optional[float], + pvconvert: Optional[float], + dtemp: bool, + dpress: bool, + dmu: bool, + dtempdpress: bool, + dtempdmu: bool, +) -> None: # if simple 1d: # fitvals = [df, slope] # dfitvals = [ddf, dslope] @@ -481,19 +489,19 @@ def print_stats( def estimate_interval( - ens_string, - ens_temp, - energy, - kb, - ens_press=None, - volume=None, - pvconvert=None, - verbosity=1, - cutoff=0.001, - tunit="", - punit="", - data_is_uncorrelated=False, -): + ens_string: str, + ens_temp: float, + energy: np.ndarray, + kb: float, + ens_press: Optional[float], + volume: Optional[np.ndarray], + pvconvert: Optional[float], + verbosity: int, + cutoff: float, + tunit: str, + punit: str, + data_is_uncorrelated: bool, +) -> Dict[str, float]: result = {} if ens_string == "NVT": # Discard burn-in period and time-correlated frames @@ -580,29 +588,29 @@ def estimate_interval( def check_1d( - traj1, - traj2, - param1, - param2, - kb, - quantity, - dtemp=False, - dpress=False, - dmu=False, - temp=None, - pvconvert=None, - nbins=40, - cutoff=0.001, - bootstrap_seed=None, - bootstrap_error=True, - bootstrap_repetitions=200, - verbosity=1, - screen=False, - filename=None, - xlabel="Energy", - xunit=None, - data_is_uncorrelated=False, -): + traj1: np.ndarray, + traj2: np.ndarray, + param1: float, + param2: float, + kb: float, + quantity: str, + dtemp: bool, + dpress: bool, + dmu: bool, + temp: Optional[float], + pvconvert: Optional[float], + nbins: int, + cutoff: float, + bootstrap_seed: Optional[int], + bootstrap_error: bool, + bootstrap_repetitions: int, + verbosity: int, + screen: bool, + filename: Union[str], + xlabel: str, + xunit: Optional[str], + data_is_uncorrelated: bool, +) -> List[float]: r""" Checks whether the energy trajectories of two simulation performed at different temperatures have sampled distributions at the analytically @@ -610,7 +618,7 @@ def check_1d( Parameters ---------- - traj1 : array-like + traj1 Trajectory of the first simulation If dtemp: @@ -621,7 +629,7 @@ def check_1d( * NPT: Volume V - traj2 : array-like + traj2 Trajectory of the second simulation If dtemp: @@ -632,60 +640,49 @@ def check_1d( * NPT: Volume V - param1 : float + param1 Target temperature or pressure of the first simulation - param2 : float + param2 Target temperature or pressure of the second simulation - kb : float + kb Boltzmann constant in same units as the energy trajectories - quantity : str + quantity Name of quantity analyzed (used for printing only) - dtemp : bool, optional + dtemp Set to True if trajectories were simulated at different temperature - Default: False. - dpress : bool, optional + dpress Set to True if trajectories were simulated at different pressure - Default: False. - temp : float, optional - The temperature in equal temperature, differring pressure NPT simulations. + temp + The temperature in equal temperature, differing pressure NPT simulations. Needed to print optimal dP. - pvconvert : float, optional + pvconvert Conversion from pressure * volume to energy units. Needed to print optimal dP. - dmu : bool, optional + dmu Set to True if trajectories were simulated at different chemical potential - Default: False. - nbins : int, optional + nbins Number of bins used to assess distributions of the trajectories - Default: 40 - cutoff : float, optional + cutoff Tail cutoff of distributions. - Default: 0.001 (0.1%) - bootstrap_seed : int, optional + bootstrap_seed + Sets the random number seed for bootstrapping. If set, bootstrapping will be reproducible. - Default: None, bootstrapping non-reproducible. - bootstrap_error : bool + If `None`, bootstrapping is non-reproducible. + bootstrap_error Calculate the standard error via bootstrap resampling - Default: True - bootstrap_repetitions : int + bootstrap_repetitions Number of bootstrap repetitions drawn - Default: 200 - verbosity : int, optional + verbosity Verbosity level. - Default: 1 (only most important output) - screen : bool, optional + screen Plot distributions on screen. - Default: False. - filename : string, optional - Plot distributions to `filename`. - Default: None. - xlabel : string, optional + filename + Plot distributions to `filename`. If `None`, no plotting. + xlabel x-axis label used for plotting - Default: 'Energy' - xunit : string, optional + xunit x-axis label unit used for plotting - Default: None - data_is_uncorrelated : bool, optional + data_is_uncorrelated Whether the provided data is uncorrelated. If this option is set, the equilibration, decorrelation and tail pruning of the trajectory is skipped. This can speed up the analysis, @@ -694,6 +691,7 @@ def check_1d( Returns ------- + The number of quantiles the computed result is off the analytical one. """ @@ -877,6 +875,8 @@ def check_1d( dtemp=dtemp, dpress=dpress, dmu=dmu, + dtempdmu=False, + dtempdpress=False, ) # ================== # @@ -886,7 +886,12 @@ def check_1d( print("Computing the maximum likelihood parameters") fitvals, dfitvals = do_max_likelihood_fit( - traj1, traj2, g1, g2, init_params=[df, trueslope], verbose=(verbosity > 1) + traj1, + traj2, + g1, + g2, + init_params=np.array([df, trueslope]), + verbose=(verbosity > 1), ) slope = fitvals[1] @@ -906,6 +911,8 @@ def check_1d( dtemp=dtemp, dpress=dpress, dmu=dmu, + dtempdmu=False, + dtempdpress=False, ) if not bootstrap_error: @@ -939,7 +946,12 @@ def check_1d( g2 = pymbar.timeseries.statisticalInefficiency(t2) # calculate max_likelihood fit fv, _ = do_max_likelihood_fit( - t1, t2, g1, g2, init_params=[df, trueslope], verbose=(verbosity > 2) + t1, + t2, + g1, + g2, + init_params=np.array([df, trueslope]), + verbose=(verbosity > 2), ) bs_fitvals.append(fv) # print progress @@ -969,30 +981,32 @@ def check_1d( dtemp=dtemp, dpress=dpress, dmu=dmu, + dtempdmu=False, + dtempdpress=False, ) return quant["bootstrap"] def check_2d( - traj1, - traj2, - param1, - param2, - kb, - pvconvert, - quantity, - dtempdpress=False, - dtempdmu=False, - cutoff=0.001, - bootstrap_seed=None, - bootstrap_error=True, - bootstrap_repetitions=200, - verbosity=1, - screen=False, - filename=None, - data_is_uncorrelated=False, -): + traj1: np.ndarray, + traj2: np.ndarray, + param1: np.ndarray, + param2: np.ndarray, + kb: float, + pvconvert: float, + quantity: List[str], + dtempdpress: bool, + dtempdmu: bool, + cutoff: float, + bootstrap_seed: Optional[int], + bootstrap_error: bool, + bootstrap_repetitions: int, + verbosity: int, + screen: bool, + filename: Optional[str], + data_is_uncorrelated: bool, +) -> List[float]: r""" Checks whether the energy trajectories of two simulation performed at different temperatures have sampled distributions at the analytically @@ -1000,60 +1014,53 @@ def check_2d( Parameters ---------- - traj1 : array-like, 2d + traj1 Trajectory of the first simulation If dtempdpress: * traj[0,:]: Potential energy U or total energy E = U + K * traj[1,:]: Volume V - traj2 : array-like, 2d + traj2 Trajectory of the second simulation If dtempdpress: * traj[0,:]: Potential energy U or total energy E = U + K * traj[1,:]: Volume V - param1 : array-like + param1 If dtempdpress: Target temperature and pressure of the first simulation - param2 : array-like + param2 If dtempdpress: Target temperature and pressure of the first simulation - kb : float + kb Boltzmann constant in same units as the energy trajectories - pvconvert : float + pvconvert Conversion from pressure * volume to energy units - quantity : List[str] + quantity Names of quantities analyzed (used for printing only) - dtempdpress : bool, optional + dtempdpress Set to True if trajectories were simulated at different temperature and pressure - Default: False. - dtempdmu : bool, optional + dtempdmu Set to True if trajectories were simulated at different temperature and chemical potential - Default: False. - cutoff : float + cutoff Tail cutoff of distributions. - Default: 0.001 (0.1%) - bootstrap_seed : int + bootstrap_seed + Sets the random number seed for bootstrapping. If set, bootstrapping will be reproducible. - Default: None, bootstrapping non-reproducible. - bootstrap_error : bool + If `None`, bootstrapping is non-reproducible. + bootstrap_error Calculate the standard error via bootstrap resampling - Default: True - bootstrap_repetitions : int + bootstrap_repetitions Number of bootstrap repetitions drawn - Default: 200 - verbosity : int + verbosity Verbosity level. - Default: 1 (only most important output) - screen : bool, optional + screen Plot distributions on screen. - Default: False. - filename : string, optional - Plot distributions to `filename`. - Default: None. - data_is_uncorrelated : bool, optional + filename + Plot distributions to `filename`. If `None`, no plotting. + data_is_uncorrelated Whether the provided data is uncorrelated. If this option is set, the equilibration, decorrelation and tail pruning of the trajectory is skipped. This can speed up the analysis, @@ -1063,6 +1070,7 @@ def check_2d( Returns ------- + The number of quantiles the computed result is off the analytical one. """ @@ -1232,7 +1240,7 @@ def check_2d( traj2, g1, g2, - init_params=[df, trueslope[0], trueslope[1]], + init_params=np.array([df, trueslope[0], trueslope[1]]), verbose=(verbosity > 1), ) @@ -1248,7 +1256,11 @@ def check_2d( param1=param1, param2=param2, trueslope=trueslope, + temp=None, pvconvert=pvconvert, + dtemp=False, + dpress=False, + dmu=False, dtempdpress=dtempdpress, dtempdmu=dtempdmu, ) @@ -1291,7 +1303,7 @@ def check_2d( t2, g1, g2, - init_params=[df, trueslope[0], trueslope[1]], + init_params=np.array([df, trueslope[0], trueslope[1]]), verbose=(verbosity > 2), ) bs_fitvals.append(fv) @@ -1309,7 +1321,11 @@ def check_2d( param1=param1, param2=param2, trueslope=trueslope, + temp=None, pvconvert=pvconvert, + dtemp=False, + dpress=False, + dmu=False, dtempdpress=dtempdpress, dtempdmu=dtempdmu, ) diff --git a/physical_validation/util/gromacs_interface.py b/physical_validation/util/gromacs_interface.py index a1374bd..71e3f06 100644 --- a/physical_validation/util/gromacs_interface.py +++ b/physical_validation/util/gromacs_interface.py @@ -24,21 +24,29 @@ import subprocess import sys import warnings +from typing import IO, Any, Dict, List, Optional, Tuple, Union import numpy as np +from ..data.trajectory_data import RectangularBox from ..util import error as pv_error +Pipe = Union[None, int, IO[Any]] + class GromacsInterface(object): - def __init__(self, exe=None, dp=None, includepath=None): + def __init__( + self, + exe: str = None, + dp: bool = False, + includepath: Optional[Union[str, List[str]]] = None, + ): self._exe = None self._dp = False self._includepath = None - if dp is not None: - self.dp = dp + self.double = dp if exe is None: # check whether 'gmx' / 'gmx_d' is in the path @@ -59,12 +67,12 @@ def __init__(self, exe=None, dp=None, includepath=None): self.includepath = includepath @property - def exe(self): + def exe(self) -> str: """exe is a string pointing to the gmx executable.""" return self._exe @exe.setter - def exe(self, exe): + def exe(self, exe: str) -> None: # expand '~/bin' to '/home/user/bin' exe = os.path.expanduser(exe) if self._check_exe(exe=exe): @@ -73,29 +81,35 @@ def exe(self, exe): self._exe = exe @property - def double(self): + def double(self) -> bool: """double is a bool defining whether the simulation was ran at double precision""" return self._dp @double.setter - def double(self, dp): + def double(self, dp: bool) -> None: assert isinstance(dp, bool) self._dp = dp @property - def includepath(self): + def includepath(self) -> List[str]: """includepath defines a path the parser looks for system files""" return self._includepath @includepath.setter - def includepath(self, path): + def includepath(self, path: Union[str, List[str]]) -> None: if isinstance(path, str): path = [path] self._includepath = path def get_quantities( - self, edr, quantities, cwd=None, begin=None, end=None, args=None - ): + self, + edr: str, + quantities: List[str], + cwd: Optional[str] = None, + begin: Optional[float] = None, + end: Optional[float] = None, + args: Optional[List[str]] = None, + ) -> Dict[str, np.ndarray]: if args is None: args = [] @@ -135,7 +149,9 @@ def get_quantities( return q_dict - def read_trr(self, trr): + def read_trr( + self, trr: str + ) -> Dict[str, Optional[Union[np.ndarray, RectangularBox]]]: tmp_dump = "gmxpy_" + os.path.basename(trr).replace(".trr", "") + ".dump" with open(tmp_dump, "w") as dump_file: proc = self._run( @@ -214,7 +230,7 @@ def read_trr(self, trr): return result @staticmethod - def read_gro(gro): + def read_gro(gro: str) -> Dict[str, Optional[Union[np.ndarray, RectangularBox]]]: with open(gro) as conf: x = [] v = [] @@ -243,7 +259,7 @@ def read_gro(gro): return result @staticmethod - def read_mdp(mdp): + def read_mdp(mdp: str) -> Dict[str, str]: result = {} with open(mdp) as f: for line in f: @@ -261,12 +277,14 @@ def read_mdp(mdp): return result @staticmethod - def write_mdp(options, mdp): + def write_mdp(options: Dict[str, str], mdp: str): with open(mdp, "w") as f: for key, value in options.items(): f.write("{:24s} = {:s}\n".format(key, value)) - def read_system_from_top(self, top, define=None, include=None): + def read_system_from_top( + self, top: str, define: Optional[str] = None, include: Optional[str] = None + ) -> List[Dict[str, Any]]: if not define: define = [] else: @@ -393,16 +411,16 @@ def read_system_from_top(self, top, define=None, include=None): def grompp( self, - mdp, - top, - gro, - tpr=None, - cwd=".", - args=None, - stdin=None, - stdout=None, - stderr=None, - ): + mdp: str, + top: str, + gro: str, + tpr: Optional[str] = None, + cwd: str = ".", + args: Optional[List[str]] = None, + stdin: Optional[Pipe] = None, + stdout: Optional[Pipe] = None, + stderr: Optional[Pipe] = None, + ) -> int: cwd = os.path.abspath(cwd) assert os.path.exists(os.path.join(cwd, mdp)) assert os.path.exists(os.path.join(cwd, top)) @@ -425,16 +443,16 @@ def grompp( def mdrun( self, - tpr, - edr=None, - deffnm=None, - cwd=".", - args=None, - stdin=None, - stdout=None, - stderr=None, - mpicmd=None, - ): + tpr: str, + edr: Optional[str] = None, + deffnm: Optional[str] = None, + cwd: str = ".", + args: Optional[List[str]] = None, + stdin: Optional[Pipe] = None, + stdout: Optional[Pipe] = None, + stderr: Optional[Pipe] = None, + mpicmd: Optional[str] = None, + ) -> int: cwd = os.path.abspath(cwd) tpr = os.path.join(cwd, tpr) assert os.path.exists(cwd) @@ -461,7 +479,7 @@ def mdrun( proc.wait() return proc.returncode - def _check_exe(self, quiet=False, exe=None): + def _check_exe(self, quiet: bool = False, exe: Optional[str] = None) -> bool: if exe is None: exe = self._exe try: @@ -477,11 +495,18 @@ def _check_exe(self, quiet=False, exe=None): else: raise e # check that output is as expected - return re.search(br":-\) GROMACS - gmx.* \(-:", exe_out) + return bool(re.search(br":-\) GROMACS - gmx.* \(-:", exe_out)) def _run( - self, cmd, args, cwd=None, stdin=None, stdout=None, stderr=None, mpicmd=None - ): + self, + cmd: str, + args: Optional[List[str]] = None, + cwd: Optional[str] = None, + stdin: Optional[Pipe] = None, + stdout: Optional[Pipe] = None, + stderr: Optional[Pipe] = None, + mpicmd: Optional[str] = None, + ) -> subprocess.Popen: if self.exe is None: raise RuntimeError( "Tried to use GromacsParser before setting gmx executable." @@ -496,8 +521,15 @@ def _run( ) def _create_xvg( - self, edr, xvg, quantities, cwd=None, begin=None, end=None, args=None - ): + self, + edr: str, + xvg: str, + quantities: List[str], + cwd: Optional[str] = None, + begin: Optional[float] = None, + end: Optional[float] = None, + args: Optional[List[str]] = None, + ) -> Tuple[int, List[str]]: assert os.path.exists(edr) assert os.path.exists(os.path.abspath(os.path.dirname(xvg))) @@ -539,7 +571,9 @@ def _create_xvg( return proc.wait(), not_found - def _read_top(self, filehandler, include, define): + def _read_top( + self, filehandler: IO, include: List[str], define: List[str] + ) -> List[str]: read = [True] content = [] include_dirs = include diff --git a/physical_validation/util/integrator.py b/physical_validation/util/integrator.py index 8251e04..51a846d 100644 --- a/physical_validation/util/integrator.py +++ b/physical_validation/util/integrator.py @@ -16,50 +16,43 @@ generally not be called directly. Please use the high-level functions from `physical_validation.integrator`. """ +from typing import Callable, Dict, Optional, Tuple + import numpy as np from . import plot -def calculate_rmsd(data, time=None, slope=False): +def calculate_rmsd( + data: np.ndarray, time: Optional[np.ndarray] +) -> Tuple[float, float, float]: assert isinstance(data, np.ndarray) and data.ndim == 1 assert time is None or isinstance(time, np.ndarray) and time.ndim == 1 - avg = data.mean() + avg = float(data.mean()) if time is None: time = np.arange(data.size) fit = np.polyfit(time, data, 1) + rmsd = float(data.std()) - def f(x): - return fit[0] * x + fit[1] - - if slope: - rmsd = 0 - for t, d in zip(time, data): - rmsd += (d - f(t)) ** 2 - rmsd = np.sqrt(rmsd / data.size) - else: - rmsd = data.std() - - return avg, rmsd, fit[0] + return avg, rmsd, float(fit[0]) -def max_deviation(dts, rmsds): +def max_deviation(dts: np.ndarray, rmsds: np.ndarray) -> float: dt_ratio_2 = (dts[:-1] / dts[1:]) ** 2 rmsds = rmsds[:-1] / rmsds[1:] return np.max(np.abs(1 - rmsds / dt_ratio_2)) def check_convergence( - const_traj, - convergence_test=max_deviation, - verbose=True, - slope=False, - screen=False, - filename=None, -): + const_traj: Dict[str, np.ndarray], + convergence_test: Callable[[np.ndarray, np.ndarray], float], + verbose: bool, + screen: bool, + filename: Optional[str], +) -> float: assert isinstance(const_traj, dict) assert len(const_traj) >= 2 @@ -89,7 +82,7 @@ def check_convergence( data = traj[1] time = traj[0] - results[dt] = calculate_rmsd(data, time, slope) + results[dt] = calculate_rmsd(data, time) if verbose: if prev is None: diff --git a/physical_validation/util/kinetic_energy.py b/physical_validation/util/kinetic_energy.py index 2f63175..31c0eb4 100644 --- a/physical_validation/util/kinetic_energy.py +++ b/physical_validation/util/kinetic_energy.py @@ -18,66 +18,42 @@ """ import warnings from multiprocessing.pool import ThreadPool +from typing import Dict, Iterable, List, Optional, Tuple, Union import numpy as np import scipy.stats as stats -from ..util import error as pv_error from ..util import trajectory from . import plot -def is_close(a, b, rel_tol=1e-09, abs_tol=1e-09): - return abs(a - b) <= max(rel_tol * max(abs(a), abs(b)), abs_tol) - - -def temperature(kin, ndof, kb=8.314e-3): +def is_close( + value1: float, + value2: float, + relative_tolerance: float = 1e-09, + absolute_tolerance: float = 1e-09, +) -> bool: r""" - Calculates the temperature acccording to the equipartition theorem. - - .. math:: - T(K) = \frac{2K}{N k_B} - - Parameters - ---------- - kin : float - Kinetic energy. - ndof : float - Number of degrees of freedom. - kb : float - Boltzmann constant :math:`k_B`. Default: 8.314e-3 (kJ/mol). - - Returns - ------- - temperature : float - Calculated temperature. + Whether two float values are close, as defined by a given relative and + absolute tolerance. """ - if ndof <= 0: - raise pv_error.InputError( - "ndof", - "Temperature cannot be calculated with zero or negative degrees of freedom.", - ) - if kb <= 0: - raise pv_error.InputError( - "kb", - "Temperature cannot be calculated with zero or negative Boltzmann constant.", - ) - - return 2 * float(kin) / (float(ndof) * float(kb)) + return abs(value1 - value2) <= max( + relative_tolerance * max(abs(value1), abs(value2)), absolute_tolerance + ) def check_distribution( - kin, - temp, - ndof, - kb=8.314e-3, - verbosity=2, - screen=False, - filename=None, - ene_unit=None, - temp_unit=None, - data_is_uncorrelated=False, -): + kin: np.ndarray, + temp: float, + ndof: float, + kb: float, + verbosity: int, + screen: bool, + filename: Optional[str], + ene_unit: Optional[str], + temp_unit: Optional[str], + data_is_uncorrelated: bool, +) -> float: r""" Checks if a kinetic energy trajectory is Maxwell-Boltzmann distributed. @@ -90,31 +66,30 @@ def check_distribution( Parameters ---------- - kin : array-like + kin Kinetic energy snapshots of the system. - temp : float + temp Target temperature of the system. Used to construct the Maxwell-Boltzmann distribution. - ndof : float + ndof Number of degrees of freedom in the system. Used to construct the Maxwell-Boltzmann distribution. - kb : float - Boltzmann constant :math:`k_B`. Default: 8.314e-3 (kJ/mol). - verbosity : int + kb + Boltzmann constant :math:`k_B`. + verbosity 0: Silent. 1: Print minimal information. 2: Print result details. 3: Print additional information. - Default: 2. - screen : bool - Plot distributions on screen. Default: False. - filename : string - Plot distributions to `filename`. Default: None. - ene_unit : string + screen + Plot distributions on screen. + filename + Plot distributions to `filename`. If `None`, no plotting. + ene_unit Energy unit - used for output only. - temp_unit : string + temp_unit Temperature unit - used for output only. - data_is_uncorrelated : bool, optional + data_is_uncorrelated Whether the provided data is uncorrelated. If this option is set, the equilibration, decorrelation and tail pruning of the trajectory is skipped. This can speed up the analysis, @@ -123,7 +98,7 @@ def check_distribution( Returns ------- - result : float + result The p value of the test. See Also @@ -223,19 +198,19 @@ def check_distribution( def check_mean_std( - kin, - temp, - ndof, - kb, - verbosity=2, - bs_repetitions=200, - bootstrap_seed=None, - screen=False, - filename=None, - ene_unit=None, - temp_unit=None, - data_is_uncorrelated=False, -): + kin: np.ndarray, + temp: float, + ndof: float, + kb: float, + verbosity: int, + bs_repetitions: int, + bootstrap_seed: Optional[int], + screen: bool, + filename: Optional[str], + ene_unit: Optional[str], + temp_unit: Optional[str], + data_is_uncorrelated: bool, +) -> Tuple[float, float]: r""" Calculates the mean and standard deviation of a trajectory (+ bootstrap error estimates), and compares them to the theoretically expected values. @@ -249,37 +224,36 @@ def check_mean_std( Parameters ---------- - kin : array-like + kin Kinetic energy snapshots of the system. - temp : float + temp Target temperature of the system. Used to construct the Maxwell-Boltzmann distribution. - ndof : float + ndof Number of degrees of freedom in the system. Used to construct the Maxwell-Boltzmann distribution. - kb : float + kb Boltzmann constant :math:`k_B`. - verbosity : int + verbosity 0: Silent. 1: Print minimal information. 2: Print result details. 3: Print additional information. - Default: 2. - bs_repetitions : int - Number of bootstrap samples used for error estimate. Default: 200. - bootstrap_seed : int + bs_repetitions + Number of bootstrap samples used for error estimate. + bootstrap_seed Sets the random number seed for bootstrapping. If set, bootstrapping will be reproducible. - Default: None, bootstrapping is non-reproducible. - screen : bool - Plot distributions on screen. Default: False. - filename : string - Plot distributions to `filename`. Default: None. - ene_unit : string + If `None`, bootstrapping is non-reproducible. + screen + Plot distributions on screen. + filename + Plot distributions to `filename`. If `None`, no plotting. + ene_unit Energy unit - used for output only. - temp_unit : string + temp_unit Temperature unit - used for output only. - data_is_uncorrelated : bool, optional + data_is_uncorrelated Whether the provided data is uncorrelated. If this option is set, the equilibration, decorrelation and tail pruning of the trajectory is skipped. This can speed up the analysis, @@ -288,7 +262,7 @@ def check_mean_std( Returns ------- - result : Tuple[float] + result Distance of the estimated T(mu) and T(sigma) from the expected temperature, measured in standard deviations of the estimates. @@ -446,32 +420,36 @@ def check_mean_std( def check_equipartition( - positions, - velocities, - masses, - molec_idx, - molec_nbonds, - natoms, - nmolecs, - temp, - kb, - strict, - ndof_reduction_tra=0, - ndof_reduction_rot=0, - molec_groups=None, - random_divisions=0, - random_groups=2, - random_division_seed=None, - ndof_molec=None, - kin_molec=None, - verbosity=2, - screen=False, - filename=None, - ene_unit=None, - temp_unit=None, - bootstrap_seed=None, - data_is_uncorrelated=False, -): + positions: np.ndarray, + velocities: np.ndarray, + masses: np.ndarray, + molec_idx: np.ndarray, + molec_nbonds: np.ndarray, + natoms: int, + nmolecs: int, + temp: float, + kb: float, + strict: bool, + ndof_reduction_tra: float, + ndof_reduction_rot: float, + molec_groups: Optional[List[np.ndarray]], + random_divisions: int, + random_groups: int, + random_division_seed: Optional[int], + ndof_molec: Optional[List[Dict[str, float]]], + kin_molec: Optional[List[List[Dict[str, float]]]], + verbosity: int, + screen: bool, + filename: Optional[str], + ene_unit: Optional[str], + temp_unit: Optional[str], + bootstrap_seed: Optional[int], + data_is_uncorrelated: bool, +) -> Tuple[ + Union[List[float], List[Tuple[float, float]]], + List[Dict[str, float]], + List[List[Dict[str, float]]], +]: r""" Checks the equipartition of a simulation trajectory. @@ -494,61 +472,59 @@ def check_equipartition( Index of first atom for every molecule molec_nbonds : array-like (nmolecs x 1) Number of bonds for every molecule - natoms : int + natoms Number of atoms in the system - nmolecs : int + nmolecs Number of molecules in the system - temp : float + temp Target temperature of the simulation. - kb : float + kb Boltzmann constant :math:`k_B`. - strict : bool + strict If True, check full kinetic energy distribution via K-S test. Otherwise, check mean and width of kinetic energy distribution. - ndof_reduction_tra : int, optional + ndof_reduction_tra Number of center-of-mass translational degrees of freedom to - remove. Default: 0. - ndof_reduction_rot : int, optional + remove. + ndof_reduction_rot Number of center-of-mass rotational degrees of freedom to remove. - Default: 0. molec_groups : List[array-like] (ngroups x ?), optional List of 1d arrays containing molecule indeces defining groups. Useful to pre-define groups of molecules (e.g. solute / solvent, liquid mixture species, ...). If None, no pre-defined molecule - groups will be tested. Default: None. + groups will be tested. *Note:* If an empty 1d array is found as last element in the list, the remaining molecules are collected in this array. This allows, for example, to only specify the solute, and indicate the solvent by giving an empty array. - random_divisions : int, optional - Number of random division tests attempted. Default: 0 (random - division tests off). - random_groups : int, optional - Number of groups the system is randomly divided in. Default: 2. - random_division_seed : int, optional + random_divisions + Number of random division tests attempted. + random_groups + Number of groups the system is randomly divided in. + random_division_seed Seed making the random divisions reproducible. - Default: None, random divisions not reproducible - ndof_molec : List[dict], optional + If `None`, random divisions not reproducible + ndof_molec Pass in the degrees of freedom per molecule. Slightly increases speed of repeated analysis of the same simulation run. - kin_molec : List[List[dict]], optional + kin_molec Pass in the kinetic energy per molecule. Greatly increases speed of repeated analysis of the same simulation run. - verbosity : int, optional - Verbosity level, where 0 is quiet and 3 very chatty. Default: 2. - screen : bool + verbosity + Verbosity level, where 0 is quiet and 3 very chatty. + screen Plot distributions on screen. Default: False. - filename : string - Plot distributions to `filename`. Default: None. - ene_unit : string + filename + Plot distributions to `filename`. If `None`, no plotting. + ene_unit Energy unit - used for output only. - temp_unit : string + temp_unit Temperature unit - used for output only. - bootstrap_seed : int + bootstrap_seed Sets the random number seed for bootstrapping (if strict=False). If set, bootstrapping will be reproducible. - Default: None, bootstrapping is non-reproducible. - data_is_uncorrelated : bool, optional + If `None`, bootstrapping is non-reproducible. + data_is_uncorrelated Whether the provided data is uncorrelated. If this option is set, the equilibration, decorrelation and tail pruning of the trajectory is skipped. This can speed up the analysis, @@ -558,7 +534,7 @@ def check_equipartition( Returns ------- result : List[float] or List[Tuple[float]] - If `strict=True`: The p value for every tests. + If `strict=True`: The p value for every test. If `strict=False`: Distance of the estimated T(mu) and T(sigma) from the expected temperature, measured in standard deviations of the respective estimate, for every test. @@ -624,6 +600,7 @@ def check_equipartition( kb=kb, dict_keys=dict_keys, strict=strict, + group=None, verbosity=verbosity, screen=screen, filename=filename, @@ -689,11 +666,10 @@ def check_equipartition( if last_empty: # last group is [] -> insert remaining molecules - molec_groups.append([]) combined = [] for group in molec_groups: combined.extend(group) - molec_groups[-1] = [m for m in range(nmolecs) if m not in combined] + molec_groups.append(np.array([m for m in range(nmolecs) if m not in combined])) for mg, group in enumerate(molec_groups): if verbosity > 0: @@ -724,32 +700,35 @@ def check_equipartition( def calc_ndof( - natoms, nmolecs, molec_idx, molec_nbonds, ndof_reduction_tra, ndof_reduction_rot -): + natoms: int, + nmolecs: int, + molec_idx: np.ndarray, + molec_nbonds: np.ndarray, + ndof_reduction_tra: float, + ndof_reduction_rot: float, +) -> List[Dict[str, float]]: r""" Calculates the total / translational / rotational & internal / rotational / internal degrees of freedom per molecule. Parameters ---------- - natoms : int + natoms Total number of atoms in the system - nmolecs : int + nmolecs Total number of molecules in the system - molec_idx : List[int] + molec_idx Index of first atom for every molecule - molec_nbonds : List[int] + molec_nbonds Number of bonds for every molecule - ndof_reduction_tra : int - Number of center-of-mass translational degrees of freedom to - remove. Default: 0. - ndof_reduction_rot : int + ndof_reduction_tra + Number of center-of-mass translational degrees of freedom tobremove. + ndof_reduction_rot Number of center-of-mass rotational degrees of freedom to remove. - Default: 0. Returns ------- - ndof_molec : List[dict] + ndof_molec List of dictionaries containing the degrees of freedom for each molecule Keys: ['total', 'translational', 'rotational and internal', 'rotational', 'internal'] """ @@ -775,7 +754,7 @@ def calc_ndof( ndof_rni = ndof_tot - ndof_tra ndof_rot = 3 - ndof_com_rot_pm ndof_int = ndof_tot - ndof_tra - ndof_rot - if is_close(ndof_int, 0, abs_tol=1e-09): + if is_close(ndof_int, 0, absolute_tolerance=1e-09): ndof_int = 0 if natoms == 1: ndof_tot = 3 - ndof_com_tra_pm @@ -796,7 +775,14 @@ def calc_ndof( return ndof_molec -def calc_molec_kinetic_energy(pos, vel, masses, molec_idx, natoms, nmolecs): +def calc_molec_kinetic_energy( + pos: np.ndarray, + vel: np.ndarray, + masses: np.ndarray, + molec_idx: np.ndarray, + natoms: int, + nmolecs: int, +) -> Dict[str, np.ndarray]: r""" Calculates the total / translational / rotational & internal / rotational / internal kinetic energy per molecule. @@ -811,15 +797,15 @@ def calc_molec_kinetic_energy(pos, vel, masses, molec_idx, natoms, nmolecs): 1d array containing the masses of all atoms molec_idx : nd-array (nmolecs x 1) Index of first atom for every molecule - natoms : int + natoms Total number of atoms in the system - nmolecs : int + nmolecs Total number of molecules in the system Returns ------- - kin : List[dict] - List of dictionaries containing the kinetic energies for each molecule + kin + Dictionary of lists containing the kinetic energies for each molecule Keys: ['total', 'translational', 'rotational and internal', 'rotational', 'internal'] """ # add last idx to molec_idx to ease looping @@ -910,20 +896,22 @@ def calc_molec_kinetic_energy(pos, vel, masses, molec_idx, natoms, nmolecs): } -def group_kinetic_energy(kin_molec, nmolecs, molec_group=None): +def group_kinetic_energy( + kin_molec: Dict[str, np.ndarray], nmolecs: int, molec_group=Optional[Iterable[int]] +) -> Dict[str, float]: r""" Sums up the partitioned kinetic energy for a given group or the entire system. Parameters ---------- - kin_molec : List[dict] + kin_molec Partitioned kinetic energies per molecule. - nmolecs : int + nmolecs Total number of molecules in the system. - molec_group : iterable - Indeces of the group to be summed up. None defaults to all molecules - in the system. Default: None. + molec_group + Indices of the group to be summed up. `None` defaults to all molecules + in the system. Returns ------- @@ -949,20 +937,24 @@ def group_kinetic_energy(kin_molec, nmolecs, molec_group=None): return kin -def group_ndof(ndof_molec, nmolecs, molec_group=None): +def group_ndof( + ndof_molec: List[Dict[str, float]], + nmolecs: int, + molec_group=Optional[Iterable[int]], +) -> Dict[str, float]: r""" Sums up the partitioned degrees of freedom for a given group or the entire system. Parameters ---------- - ndof_molec : List[dict] + ndof_molec Partitioned degrees of freedom per molecule. - nmolecs : int + nmolecs Total number of molecules in the system. - molec_group : iterable + molec_group Indeces of the group to be summed up. None defaults to all molecules - in the system. Default: None. + in the system. Returns ------- @@ -989,22 +981,22 @@ def group_ndof(ndof_molec, nmolecs, molec_group=None): def test_group( - kin_molec, - ndof_molec, - nmolecs, - temp, - kb, - dict_keys, - strict=False, - group=None, - verbosity=0, - screen=False, - filename=None, - ene_unit=None, - temp_unit=None, - bootstrap_seed=None, - data_is_uncorrelated=False, -): + kin_molec: List[Dict[str, np.ndarray]], + ndof_molec: List[Dict[str, float]], + nmolecs: int, + temp: float, + kb: float, + dict_keys: List[str], + strict: bool, + group: Optional[Iterable[int]], + verbosity: int, + screen: bool, + filename: Optional[str], + ene_unit: Optional[str], + temp_unit: Optional[str], + bootstrap_seed: Optional[int], + data_is_uncorrelated: bool, +) -> Union[List[float], List[Tuple[float, float]]]: r""" Tests if the partitioned kinetic energy trajectory of a group (or, if group is None, of the entire system) are separately Maxwell-Boltzmann @@ -1012,41 +1004,40 @@ def test_group( Parameters ---------- - kin_molec : List[List[dict]] + kin_molec Partitioned kinetic energies per molecule for every frame. - ndof_molec : List[dict] + ndof_molec Partitioned degrees of freedom per molecule. - nmolecs : int + nmolecs Total number of molecules in the system. - temp : float + temp Target temperature of the simulation. - kb : float + kb Boltzmann constant :math:`k_B`. - dict_keys : List[str] + dict_keys List of dictionary keys representing the partitions of the degrees of freedom. - strict : bool, optional + strict If True, check full kinetic energy distribution via K-S test. Otherwise, check mean and width of kinetic energy distribution. - Default: False - group : iterable - Indeces of the group to be tested. None defaults to all molecules - in the system. Default: None. - verbosity : int - Verbosity level, where 0 is quiet and 3 very chatty. Default: 0. - screen : bool - Plot distributions on screen. Default: False. - filename : string - Plot distributions to `filename`. Default: None. - ene_unit : string + group + Indices of the group to be tested. `None` defaults to all molecules + in the system. + verbosity + Verbosity level, where 0 is quiet and 3 very chatty. + screen + Plot distributions on screen. + filename + Plot distributions to `filename`. If `None`, no plotting. + ene_unit Energy unit - used for output only. - temp_unit : string + temp_unit Temperature unit - used for output only. - bootstrap_seed : int + bootstrap_seed Sets the random number seed for bootstrapping (if strict=False). If set, bootstrapping will be reproducible. - Default: None, bootstrapping is non-reproducible. - data_is_uncorrelated : bool, optional + If `None`, bootstrapping is non-reproducible. + data_is_uncorrelated Whether the provided data is uncorrelated. If this option is set, the equilibration, decorrelation and tail pruning of the trajectory is skipped. This can speed up the analysis, @@ -1055,7 +1046,7 @@ def test_group( Returns ------- - result : List[float] or List[Tuple[Float]] + result p value for every partition (strict) or tuple of distance of estimated T(mu) and T(sigma) for every partition (non-strict) """ @@ -1092,10 +1083,12 @@ def test_group( kin=group_kin[key], temp=temp, ndof=ndof[key], + kb=kb, verbosity=int(verbosity > 1), screen=screen, filename=fn, ene_unit=ene_unit, + temp_unit=temp_unit, data_is_uncorrelated=data_is_uncorrelated, ) else: @@ -1109,6 +1102,7 @@ def test_group( filename=fn, ene_unit=ene_unit, temp_unit=temp_unit, + bs_repetitions=200, bootstrap_seed=bootstrap_seed, data_is_uncorrelated=data_is_uncorrelated, ) diff --git a/physical_validation/util/plot.py b/physical_validation/util/plot.py index 84a994f..9632aff 100644 --- a/physical_validation/util/plot.py +++ b/physical_validation/util/plot.py @@ -12,28 +12,29 @@ ########################################################################### import warnings +from typing import Dict, List, Optional, Tuple, Union import numpy as np def plot( - res, - legend=None, - title=None, - xlabel=None, - ylabel=None, - xlim=None, - ylim=None, - inv_x=False, - inv_y=False, - sci_x=False, - sci_y=False, - axtext=None, - annotation_location=None, - percent=False, - filename=None, - screen=True, -): + res: List[Dict[str, Union[np.ndarray, str, Dict]]], + legend: Optional[str] = None, + title: Optional[str] = None, + xlabel: Optional[str] = None, + ylabel: Optional[str] = None, + xlim: Optional[Tuple[float, float]] = None, + ylim: Optional[Tuple[float, float]] = None, + inv_x: bool = False, + inv_y: bool = False, + sci_x: bool = False, + sci_y: bool = False, + axtext: Optional[str] = None, + annotation_location: Optional[str] = None, + percent: bool = False, + filename: Optional[str] = None, + screen: bool = True, +) -> None: try: import matplotlib as mpl diff --git a/physical_validation/util/trajectory.py b/physical_validation/util/trajectory.py index 4d44651..fe6958b 100644 --- a/physical_validation/util/trajectory.py +++ b/physical_validation/util/trajectory.py @@ -12,6 +12,7 @@ ########################################################################### import warnings +from typing import Iterator, Optional, Tuple import numpy as np from pymbar import timeseries @@ -20,7 +21,7 @@ from . import error as pv_error -def equilibrate(traj): +def equilibrate(traj: np.ndarray) -> np.ndarray: traj = np.array(traj) if traj.ndim == 1: t0, g, n_eff = timeseries.detectEquilibration(traj) @@ -57,7 +58,7 @@ def equilibrate(traj): return res -def decorrelate(traj): +def decorrelate(traj: np.ndarray) -> np.ndarray: traj = np.array(traj) if traj.ndim == 1: idx = timeseries.subsampleCorrelatedData(traj) @@ -86,7 +87,7 @@ def decorrelate(traj): return res -def cut_tails(traj, cut): +def cut_tails(traj: np.ndarray, cut: float) -> np.ndarray: traj = np.array(traj) dc = 100 * cut if traj.ndim == 1: @@ -120,12 +121,18 @@ def cut_tails(traj, cut): return t -def prepare(traj, cut=None, verbosity=1, name=None, skip_preparation=False): +def prepare( + traj: np.ndarray, + cut: Optional[float] = None, + verbosity: int = 1, + name: Optional[str] = None, + skip_preparation: bool = False, +) -> np.ndarray: traj = np.array(traj) if not name: - name = "Trajectory" + name = "Traectory" - def traj_length(t): + def traj_length(t: np.ndarray) -> int: if t.ndim == 1: return t.size else: @@ -190,7 +197,9 @@ def traj_length(t): return res -def overlap(traj1, traj2, cut=None): +def overlap( + traj1: np.ndarray, traj2: np.ndarray, cut=None +) -> Tuple[np.ndarray, np.ndarray, Optional[float], Optional[float]]: traj1 = np.array(traj1) traj2 = np.array(traj2) if traj1.ndim == traj2.ndim and traj2.ndim == 1: @@ -201,10 +210,10 @@ def overlap(traj1, traj2, cut=None): max2 = stats.scoreatpercentile(traj2, 100 - dc) min2 = stats.scoreatpercentile(traj2, dc) else: - max1 = traj1.max() - min1 = traj1.min() - max2 = traj2.max() - min2 = traj2.min() + max1 = np.max(traj1) + min1 = np.min(traj1) + max2 = np.max(traj2) + min2 = np.min(traj2) tmin = max(min1, min2) tmax = min(max1, max2) @@ -224,10 +233,10 @@ def overlap(traj1, traj2, cut=None): max2 = stats.scoreatpercentile(traj2, 100 - dc, axis=1) min2 = stats.scoreatpercentile(traj2, dc, axis=1) else: - max1 = traj1.max(axis=1) - min1 = traj1.min(axis=1) - max2 = traj2.max(axis=1) - min2 = traj2.min(axis=1) + max1 = np.max(traj1, axis=1) + min1 = np.min(traj1, axis=1) + max2 = np.max(traj2, axis=1) + min2 = np.min(traj2, axis=1) tmin = np.max([min1, min2], axis=0) tmax = np.min([max1, max2], axis=0) @@ -262,7 +271,7 @@ def overlap(traj1, traj2, cut=None): return t1, t2, tmin, tmax -def bootstrap(traj, n_samples): +def bootstrap(traj: np.ndarray, n_samples: int) -> Iterator[np.ndarray]: traj = np.array(traj) if traj.ndim == 1: n_traj = traj.size diff --git a/physical_validation/util/util.py b/physical_validation/util/util.py index b37dcb3..50ef7d8 100644 --- a/physical_validation/util/util.py +++ b/physical_validation/util/util.py @@ -20,7 +20,7 @@ def array_equal_shape_and_close( array1: Optional[np.ndarray], array2: Optional[np.ndarray] -): +) -> bool: r""" Tests whether two arrays have the same shape and all values are close