diff --git a/atomisticparsers/utils/mdanalysis.py b/atomisticparsers/utils/mdanalysis.py index 954d9061..abdff89b 100644 --- a/atomisticparsers/utils/mdanalysis.py +++ b/atomisticparsers/utils/mdanalysis.py @@ -19,6 +19,7 @@ import numpy as np import os + try: import MDAnalysis import MDAnalysis.analysis.rdf as MDA_RDF @@ -34,12 +35,9 @@ from nomad.units import ureg from nomad.parsing.file_parser import FileParser -from nomad.atomutils import ( - BeadGroup, - shifted_correlation_average -) +from nomad.atomutils import BeadGroup, shifted_correlation_average -MOL = 6.022140857e+23 +MOL = 6.022140857e23 class MDAnalysisParser(FileParser): @@ -71,25 +69,31 @@ def options(self, value): def universe(self): if self._file_handler is None: try: - self._file_handler = MDAnalysis.Universe(self.mainfile, *self.auxilliary_files, **self.options) + self._file_handler = MDAnalysis.Universe( + self.mainfile, *self.auxilliary_files, **self.options + ) except Exception as e: - self.logger.error('Error creating MDAnalysis universe.', exc_info=e) + self.logger.error("Error creating MDAnalysis universe.", exc_info=e) return self._file_handler @property def bead_groups(self): - atoms_moltypes = self.get('atoms_info', {}).get('moltypes', []) + atoms_moltypes = self.get("atoms_info", {}).get("moltypes", []) moltypes = np.unique(atoms_moltypes) bead_groups = {} - compound = 'fragments' + compound = "fragments" for moltype in moltypes: - if hasattr(self.universe.atoms, 'moltypes'): - ags_by_moltype = self.universe.select_atoms('moltype ' + moltype) + if hasattr(self.universe.atoms, "moltypes"): + ags_by_moltype = self.universe.select_atoms("moltype " + moltype) else: # this is easier than adding something to the universe - selection = ' '.join([str(i) for i in np.where(atoms_moltypes == moltype)[0]]) - selection = f'index {selection}' + selection = " ".join( + [str(i) for i in np.where(atoms_moltypes == moltype)[0]] + ) + selection = f"index {selection}" ags_by_moltype = self.universe.select_atoms(selection) - ags_by_moltype = ags_by_moltype[ags_by_moltype.masses > abs(1e-2)] # remove any virtual/massless sites (needed for, e.g., 4-bead water models) + ags_by_moltype = ags_by_moltype[ + ags_by_moltype.masses > abs(1e-2) + ] # remove any virtual/massless sites (needed for, e.g., 4-bead water models) bead_groups[moltype] = BeadGroup(ags_by_moltype, compound=compound) return bead_groups @@ -103,64 +107,84 @@ def parse(self, quantity_key: str = None, **kwargs): atoms = list(self.universe.atoms) - name_map = {'mass': 'masses'} - unit_map = {'mass': ureg.amu, 'charge': ureg.elementary_charge} - self._results['atoms_info'] = dict() - for key in ['name', 'charge', 'mass', 'resid', 'resname', 'molnum', 'moltype', 'type', 'segid', 'element']: + name_map = {"mass": "masses"} + unit_map = {"mass": ureg.amu, "charge": ureg.elementary_charge} + self._results["atoms_info"] = dict() + for key in [ + "name", + "charge", + "mass", + "resid", + "resname", + "molnum", + "moltype", + "type", + "segid", + "element", + ]: try: value = [getattr(atom, key) for atom in atoms] except Exception: continue value = value * unit_map.get(key, 1) if value is not None else value - self._results['atoms_info'][name_map.get(key, f'{key}s')] = value + self._results["atoms_info"][name_map.get(key, f"{key}s")] = value # if atom name is not identified, set it to 'X' - if self._results['atoms_info'].get('names') is None: - self._results['atoms_info']['names'] = ['X'] * self.universe.atoms.n_atoms - self._results['n_atoms'] = self.universe.atoms.n_atoms - self._results['n_frames'] = len(self.universe.trajectory) + if self._results["atoms_info"].get("names") is None: + self._results["atoms_info"]["names"] = ["X"] * self.universe.atoms.n_atoms + self._results["n_atoms"] = self.universe.atoms.n_atoms + self._results["n_frames"] = len(self.universe.trajectory) # make substitutions based on available atom info - if self._results['atoms_info'].get('moltypes') is None: - if hasattr(self.universe.atoms, 'fragments'): - self._results['atoms_info']['moltypes'] = self.get_fragtypes() + if self._results["atoms_info"].get("moltypes") is None: + if hasattr(self.universe.atoms, "fragments"): + self._results["atoms_info"]["moltypes"] = self.get_fragtypes() - if self._results['atoms_info'].get('molnums') is None: + if self._results["atoms_info"].get("molnums") is None: try: - value = getattr(self.universe.atoms, 'fragindices') - self._results['atoms_info']['molnums'] = value + value = getattr(self.universe.atoms, "fragindices") + self._results["atoms_info"]["molnums"] = value except Exception: pass - if self._results['atoms_info'].get('resnames') is None: + if self._results["atoms_info"].get("resnames") is None: try: - self._results['atoms_info']['resnames'] = self._results['atoms_info']['resids'] + self._results["atoms_info"]["resnames"] = self._results["atoms_info"][ + "resids" + ] except Exception: pass - if self._results['atoms_info'].get('names') is None: + if self._results["atoms_info"].get("names") is None: try: - self._results['atoms_info']['names'] = self._results['atoms_info']['types'] + self._results["atoms_info"]["names"] = self._results["atoms_info"][ + "types" + ] except Exception: pass - if self._results['atoms_info'].get('elements') is None: + if self._results["atoms_info"].get("elements") is None: try: - self._results['atoms_info']['elements'] = self._results['atoms_info']['names'] + self._results["atoms_info"]["elements"] = self._results["atoms_info"][ + "names" + ] except Exception: pass def get_fragtypes(self): # TODO put description otherwise, make private or put under parse method - ''' - ''' + """ """ atoms_fragtypes = np.empty(self.universe.atoms.types.shape, dtype=str) ctr_fragtype = 0 atoms_fragtypes[self.universe.atoms.fragments[0].ix] = ctr_fragtype - frag_unique_atomtypes = [self.universe.atoms.types[self.universe.atoms.fragments[0].ix]] + frag_unique_atomtypes = [ + self.universe.atoms.types[self.universe.atoms.fragments[0].ix] + ] ctr_fragtype += 1 for i_frag in range(1, self.universe.atoms.n_fragments): - types_i_frag = self.universe.atoms.types[self.universe.atoms.fragments[i_frag].ix] + types_i_frag = self.universe.atoms.types[ + self.universe.atoms.fragments[i_frag].ix + ] flag_fragtype_exists = False for j_frag in range(len(frag_unique_atomtypes) - 1, -1, -1): types_j_frag = frag_unique_atomtypes[j_frag] @@ -171,39 +195,71 @@ def get_fragtypes(self): flag_fragtype_exists = True if not flag_fragtype_exists: atoms_fragtypes[self.universe.atoms.fragments[i_frag].ix] = ctr_fragtype - frag_unique_atomtypes.append(self.universe.atoms.types[self.universe.atoms.fragments[i_frag].ix]) + frag_unique_atomtypes.append( + self.universe.atoms.types[self.universe.atoms.fragments[i_frag].ix] + ) ctr_fragtype += 1 return atoms_fragtypes - def calc_molecular_rdf(self, n_traj_split=10, n_prune=1, interval_indices=None, n_bins=200, n_smooth=2, max_mols=5000): - ''' + def calc_molecular_rdf( + self, + n_traj_split=10, + n_prune=1, + interval_indices=None, + n_bins=200, + n_smooth=2, + max_mols=5000, + ): + """ Calculates the radial distribution functions between for each unique pair of molecule types as a function of their center of mass distance. interval_indices: 2D array specifying the groups of the n_traj_split intervals to be averaged max_mols: the maximum number of molecules per bead group for calculating the rdf, for efficiency purposes. 5k was set after > 50k was giving problems. Should do further testing to see where the appropriate limit should be set. - ''' + """ + def get_rdf_avg(rdf_results_tmp, rdf_results, interval_indices, n_frames_split): - split_weights = n_frames_split[np.array(interval_indices)] / np.sum(n_frames_split[np.array(interval_indices)]) + split_weights = n_frames_split[np.array(interval_indices)] / np.sum( + n_frames_split[np.array(interval_indices)] + ) assert abs(np.sum(split_weights) - 1.0) < 1e-6 - rdf_values_avg = split_weights[0] * rdf_results_tmp['value'][interval_indices[0]] + rdf_values_avg = ( + split_weights[0] * rdf_results_tmp["value"][interval_indices[0]] + ) for i_interval, interval in enumerate(interval_indices[1:]): - assert (rdf_results_tmp['types'][interval] == rdf_results_tmp['types'][interval - 1]) - assert (rdf_results_tmp['variables_name'][interval] == rdf_results_tmp['variables_name'][interval - 1]) - assert (rdf_results_tmp['bins'][interval] == rdf_results_tmp['bins'][interval - 1]).all() - rdf_values_avg += split_weights[i_interval + 1] * rdf_results_tmp['value'][interval] - rdf_results['types'].append(rdf_results_tmp['types'][interval_indices[0]]) - rdf_results['variables_name'].append(rdf_results_tmp['variables_name'][interval_indices[0]]) - rdf_results['bins'].append(rdf_results_tmp['bins'][interval_indices[0]]) - rdf_results['value'].append(rdf_values_avg) - rdf_results['frame_start'].append(int(rdf_results_tmp['frame_start'][interval_indices[0]])) - rdf_results['frame_end'].append(int(rdf_results_tmp['frame_end'][interval_indices[-1]])) + assert ( + rdf_results_tmp["types"][interval] + == rdf_results_tmp["types"][interval - 1] + ) + assert ( + rdf_results_tmp["variables_name"][interval] + == rdf_results_tmp["variables_name"][interval - 1] + ) + assert ( + rdf_results_tmp["bins"][interval] + == rdf_results_tmp["bins"][interval - 1] + ).all() + rdf_values_avg += ( + split_weights[i_interval + 1] * rdf_results_tmp["value"][interval] + ) + rdf_results["types"].append(rdf_results_tmp["types"][interval_indices[0]]) + rdf_results["variables_name"].append( + rdf_results_tmp["variables_name"][interval_indices[0]] + ) + rdf_results["bins"].append(rdf_results_tmp["bins"][interval_indices[0]]) + rdf_results["value"].append(rdf_values_avg) + rdf_results["frame_start"].append( + int(rdf_results_tmp["frame_start"][interval_indices[0]]) + ) + rdf_results["frame_end"].append( + int(rdf_results_tmp["frame_end"][interval_indices[-1]]) + ) if self.universe is None: return trajectory = self.universe.trajectory[0] if self.universe.trajectory else None - dimensions = getattr(trajectory, 'dimensions', None) if trajectory else None + dimensions = getattr(trajectory, "dimensions", None) if trajectory else None if dimensions is None: return @@ -232,147 +288,173 @@ def get_rdf_avg(rdf_results_tmp, rdf_results, interval_indices, n_frames_split): for i_moltype, moltype in enumerate(moltypes): if bead_groups[moltype]._nbeads > max_mols: del_list.append(i_moltype) - self.logger.warning('The number of molecules of exceeds the maximum of for calculating the rdf. Skipping this molecule type.') + self.logger.warning( + "The number of molecules of exceeds the maximum of for calculating the rdf. Skipping this molecule type." + ) moltypes = np.delete(moltypes, del_list) min_box_dimension = np.min(self.universe.trajectory[0].dimensions[:3]) max_rdf_dist = min_box_dimension / 2 rdf_results = {} - rdf_results['n_smooth'] = n_smooth - rdf_results['types'] = [] - rdf_results['variables_name'] = [] - rdf_results['bins'] = [] - rdf_results['value'] = [] - rdf_results['frame_start'] = [] - rdf_results['frame_end'] = [] + rdf_results["n_smooth"] = n_smooth + rdf_results["types"] = [] + rdf_results["variables_name"] = [] + rdf_results["bins"] = [] + rdf_results["value"] = [] + rdf_results["frame_start"] = [] + rdf_results["frame_end"] = [] for i, moltype_i in enumerate(moltypes): for j, moltype_j in enumerate(moltypes): if j > i: continue - elif i == j and bead_groups[moltype_i].positions.shape[0] == 1: # skip if only 1 mol in group + elif ( + i == j and bead_groups[moltype_i].positions.shape[0] == 1 + ): # skip if only 1 mol in group continue if i == j: exclusion_block = (1, 1) # remove self-distance else: exclusion_block = None - pair_type = moltype_i + '-' + moltype_j + pair_type = moltype_i + "-" + moltype_j rdf_results_tmp = {} - rdf_results_tmp['types'] = [] - rdf_results_tmp['variables_name'] = [] - rdf_results_tmp['bins'] = [] - rdf_results_tmp['value'] = [] - rdf_results_tmp['frame_start'] = [] - rdf_results_tmp['frame_end'] = [] + rdf_results_tmp["types"] = [] + rdf_results_tmp["variables_name"] = [] + rdf_results_tmp["bins"] = [] + rdf_results_tmp["value"] = [] + rdf_results_tmp["frame_start"] = [] + rdf_results_tmp["frame_end"] = [] for i_interval in range(n_traj_split): - rdf_results_tmp['types'].append(pair_type) - rdf_results_tmp['variables_name'].append(['distance']) - rdf = MDA_RDF.InterRDF(bead_groups[moltype_i], bead_groups[moltype_j], - range=(0, max_rdf_dist), exclusion_block=exclusion_block, - nbins=n_bins).run(frames_start[i_interval], frames_end[i_interval], n_prune) - rdf_results_tmp['frame_start'].append(frames_start[i_interval]) - rdf_results_tmp['frame_end'].append(frames_end[i_interval]) - - rdf_results_tmp['bins'].append(rdf.results.bins[int(n_smooth / 2):-int(n_smooth / 2)] * ureg.angstrom) - rdf_results_tmp['value'].append(np.convolve( - rdf.results.rdf, np.ones((n_smooth,)) / n_smooth, - mode='same')[int(n_smooth / 2):-int(n_smooth / 2)]) + rdf_results_tmp["types"].append(pair_type) + rdf_results_tmp["variables_name"].append(["distance"]) + rdf = MDA_RDF.InterRDF( + bead_groups[moltype_i], + bead_groups[moltype_j], + range=(0, max_rdf_dist), + exclusion_block=exclusion_block, + nbins=n_bins, + ).run(frames_start[i_interval], frames_end[i_interval], n_prune) + rdf_results_tmp["frame_start"].append(frames_start[i_interval]) + rdf_results_tmp["frame_end"].append(frames_end[i_interval]) + + rdf_results_tmp["bins"].append( + rdf.results.bins[int(n_smooth / 2) : -int(n_smooth / 2)] + * ureg.angstrom + ) + rdf_results_tmp["value"].append( + np.convolve( + rdf.results.rdf, + np.ones((n_smooth,)) / n_smooth, + mode="same", + )[int(n_smooth / 2) : -int(n_smooth / 2)] + ) for interval_group in interval_indices: - get_rdf_avg(rdf_results_tmp, rdf_results, interval_group, n_frames_split) + get_rdf_avg( + rdf_results_tmp, rdf_results, interval_group, n_frames_split + ) return rdf_results @property def with_trajectory(self): - ''' + """ True if trajectory is present. - ''' - return self.universe.trajectory is not None and len(self.universe.trajectory) > 0 + """ + return ( + self.universe.trajectory is not None and len(self.universe.trajectory) > 0 + ) def get_frame(self, frame_index): - ''' + """ Returns the frame in the trajectory with index frame_index. - ''' + """ try: return self.universe.trajectory[frame_index] except Exception as e: - self.logger.warning('Error accessing frame.', exc_info=e) + self.logger.warning("Error accessing frame.", exc_info=e) def get_n_atoms(self, frame_index): - ''' + """ Returns the number of atoms of the frame with index frame_index. - ''' + """ frame = self.get_frame(frame_index) return len(frame) if frame is not None else None def get_atom_labels(self, frame_index): - ''' + """ Returns the number of atoms of the frame with index frame_index. - ''' + """ # MDAnalysis assumes no change in atom configuration - return [guess_atom_element(name).title() for name in self.get('atoms_info', {}).get('names', [])] + return [ + guess_atom_element(name).title() + for name in self.get("atoms_info", {}).get("names", []) + ] def get_time(self, frame_index): - ''' + """ Returns the elapsed simulated physical time since the start of the simulation for index frame_index. - ''' + """ frame = self.get_frame(frame_index) return frame.time * ureg.picosecond if frame is not None else None def get_step(self, frame_index): - ''' + """ Returns the step of the frame with index frame_index. - ''' + """ frame = self.get_frame(frame_index) - dt = frame.dt if frame.dt else getattr(self.universe.trajectory, 'dt') + dt = frame.dt if frame.dt else getattr(self.universe.trajectory, "dt") if not dt: return if frame: return round(frame.time / dt) def get_lattice_vectors(self, frame_index): - ''' + """ Returns the lattice vectors of the frame with index frame_index. - ''' + """ lattice_vectors = self.get_frame(frame_index).triclinic_dimensions return lattice_vectors * ureg.angstrom if lattice_vectors is not None else None def get_pbc(self, frame_index): - ''' + """ Returns the lattice periodicity of the frame with index frame_index. - ''' + """ lattice_vectors = self.get_lattice_vectors(frame_index) return [True] * 3 if lattice_vectors is not None else [False] * 3 def get_positions(self, frame_index): - ''' + """ Returns the positions of the atoms of the frame with index frame_index. - ''' + """ frame = self.get_frame(frame_index) return frame.positions * ureg.angstrom if frame.has_positions else None def get_velocities(self, frame_index): - ''' + """ Returns the velocities of the atoms of the frame with index frame_index. - ''' + """ frame = self.get_frame(frame_index) - return frame.velocities * ureg.angstrom / ureg.ps if frame.has_velocities else None + return ( + frame.velocities * ureg.angstrom / ureg.ps if frame.has_velocities else None + ) def get_forces(self, frame_index): - ''' + """ Returns the forces on the atoms of the frame with index frame_index. - ''' + """ frame = self.get_frame(frame_index) - return frame.forces * ureg.kJ / (MOL * ureg.angstrom) if frame.has_forces else None + return ( + frame.forces * ureg.kJ / (MOL * ureg.angstrom) if frame.has_forces else None + ) def get_interactions(self): - interactions = self.get('interactions', None) + interactions = self.get("interactions", None) if interactions is not None: return interactions - interaction_types = ['angles', 'bonds', 'dihedrals', 'impropers'] + interaction_types = ["angles", "bonds", "dihedrals", "impropers"] interactions = [] for interaction_type in interaction_types: try: @@ -383,55 +465,60 @@ def get_interactions(self): for i in range(len(interaction)): interactions.append( dict( - atom_labels=list(interaction[i].type), parameters=float(interaction[i].value()), - atom_indices=interaction[i].indices, type=interaction[i].btype + atom_labels=list(interaction[i].type), + parameters=float(interaction[i].value()), + atom_indices=interaction[i].indices, + type=interaction[i].btype, ) ) - self._results['interactions'] = interactions + self._results["interactions"] = interactions return interactions def __calc_diffusion_constant(self, times: NDArray, values: NDArray, dim: int = 3): - ''' + """ Determines the diffusion constant from a fit of the mean squared displacement vs. time according to the Einstein relation. - ''' + """ linear_model = linregress(times, values) slope = linear_model.slope error = linear_model.rvalue return slope * 1 / (2 * dim), error def calc_molecular_mean_squared_displacements(self, max_mols=5000): - ''' + """ Calculates the mean squared displacement for the center of mass of each molecule type. max_mols: the maximum number of molecules per bead group for calculating the msd, for efficiency purposes. 1M is arbitrary, 50k was tested and is very fast and does not seem to have any memory issues. - ''' + """ def mean_squared_displacement(start: np.ndarray, current: np.ndarray): """ Calculates mean square displacement between current and initial (start) coordinates. """ vec = start - current - return (vec ** 2).sum(axis=1).mean() + return (vec**2).sum(axis=1).mean() if self.universe is None: return trajectory = self.universe.trajectory[0] if self.universe.trajectory else None - dimensions = getattr(trajectory, 'dimensions', None) if trajectory else None + dimensions = getattr(trajectory, "dimensions", None) if trajectory else None if dimensions is None: return n_frames = self.universe.trajectory.n_frames if n_frames < 50: - self.logger.warning('At least 50 frames required to calculate molecular' # noqa: PLE1205 - ' mean squared displacements, skipping.', UserWarning) + self.logger.warning( + "At least 50 frames required to calculate molecular" # noqa: PLE1205 + " mean squared displacements, skipping.", + UserWarning, + ) return - dt = getattr(self.universe.trajectory, 'dt') + dt = getattr(self.universe.trajectory, "dt") if dt is None: return times = np.arange(n_frames) * dt @@ -446,42 +533,66 @@ def mean_squared_displacement(start: np.ndarray, current: np.ndarray): if bead_groups[moltype]._nbeads > max_mols: try: # select max_mols nr. of rnd molecules from this moltype - moltype_indices = np.array([atom._ix for atom in bead_groups[moltype]._atoms]) + moltype_indices = np.array( + [atom._ix for atom in bead_groups[moltype]._atoms] + ) molnums = self.universe.atoms.molnums[moltype_indices] molnum_types = np.unique(molnums) - molnum_types_rnd = np.sort(np.random.choice(molnum_types, size=max_mols)) - atom_indices_rnd = moltype_indices[np.concatenate([np.where(molnums == molnum)[0] for molnum in molnum_types_rnd])] - selection = ' '.join([str(i) for i in atom_indices_rnd]) - selection = f'index {selection}' + molnum_types_rnd = np.sort( + np.random.choice(molnum_types, size=max_mols) + ) + atom_indices_rnd = moltype_indices[ + np.concatenate( + [ + np.where(molnums == molnum)[0] + for molnum in molnum_types_rnd + ] + ) + ] + selection = " ".join([str(i) for i in atom_indices_rnd]) + selection = f"index {selection}" ags_moltype_rnd = self.universe.select_atoms(selection) - bead_groups[moltype] = BeadGroup(ags_moltype_rnd, compound='fragments') - self.logger.warning('Maximum number of molecules for calculating the msd has been reached.' - ' Will make a random selection for calculation.') + bead_groups[moltype] = BeadGroup( + ags_moltype_rnd, compound="fragments" + ) + self.logger.warning( + "Maximum number of molecules for calculating the msd has been reached." + " Will make a random selection for calculation." + ) except Exception: - self.logger.warning('Tried to select random molecules for large group when calculating msd, but something went wrong. Skipping this molecule type.') + self.logger.warning( + "Tried to select random molecules for large group when calculating msd, but something went wrong. Skipping this molecule type." + ) del_list.append(i_moltype) moltypes = np.delete(moltypes, del_list) msd_results = {} - msd_results['value'] = [] - msd_results['times'] = [] - msd_results['diffusion_constant'] = [] - msd_results['error_diffusion_constant'] = [] + msd_results["value"] = [] + msd_results["times"] = [] + msd_results["diffusion_constant"] = [] + msd_results["error_diffusion_constant"] = [] for moltype in moltypes: positions = self.get_nojump_positions(bead_groups[moltype]) - results = shifted_correlation_average(mean_squared_displacement, times, positions) - msd_results['value'].append(results[1]) - msd_results['times'].append(results[0]) + results = shifted_correlation_average( + mean_squared_displacement, times, positions + ) + msd_results["value"].append(results[1]) + msd_results["times"].append(results[0]) diffusion_constant, error = self.__calc_diffusion_constant(*results) - msd_results['diffusion_constant'].append(diffusion_constant) - msd_results['error_diffusion_constant'].append(error) - - msd_results['types'] = moltypes - msd_results['times'] = np.array(msd_results['times']) * ureg.picosecond - msd_results['value'] = np.array(msd_results['value']) * ureg.angstrom**2 - msd_results['diffusion_constant'] = (np.array( - msd_results['diffusion_constant']) * ureg.angstrom**2 / ureg.picosecond) - msd_results['error_diffusion_constant'] = np.array(msd_results['error_diffusion_constant']) + msd_results["diffusion_constant"].append(diffusion_constant) + msd_results["error_diffusion_constant"].append(error) + + msd_results["types"] = moltypes + msd_results["times"] = np.array(msd_results["times"]) * ureg.picosecond + msd_results["value"] = np.array(msd_results["value"]) * ureg.angstrom**2 + msd_results["diffusion_constant"] = ( + np.array(msd_results["diffusion_constant"]) + * ureg.angstrom**2 + / ureg.picosecond + ) + msd_results["error_diffusion_constant"] = np.array( + msd_results["error_diffusion_constant"] + ) return msd_results @@ -489,11 +600,11 @@ def parse_jumps(self, selection): __ = self.universe.trajectory[0] prev = np.array(selection.positions) box = self.universe.trajectory[0].dimensions[:3] - sparse_data = namedtuple('SparseData', ['data', 'row', 'col']) + sparse_data = namedtuple("SparseData", ["data", "row", "col"]) jump_data = ( - sparse_data(data=array('b'), row=array('l'), col=array('l')), - sparse_data(data=array('b'), row=array('l'), col=array('l')), - sparse_data(data=array('b'), row=array('l'), col=array('l')) + sparse_data(data=array("b"), row=array("l"), col=array("l")), + sparse_data(data=array("b"), row=array("l"), col=array("l")), + sparse_data(data=array("b"), row=array("l"), col=array("l")), ) for i_frame, _ in enumerate(self.universe.trajectory[1:]): @@ -501,7 +612,7 @@ def parse_jumps(self, selection): delta = ((curr - prev) / box).round().astype(np.int8) prev = np.array(curr) for d in range(3): - col, = np.where(delta[:, d] != 0) + (col,) = np.where(delta[:, d] != 0) jump_data[d].col.extend(col) jump_data[d].row.extend([i_frame] * len(col)) jump_data[d].data.extend(delta[col, d]) @@ -514,7 +625,8 @@ def generate_nojump_matrices(self, selection): M = selection.positions.shape[0] nojump_matrices = tuple( - sparse.csr_matrix((np.array(m.data), (m.row, m.col)), shape=(N, M)) for m in jump_data + sparse.csr_matrix((np.array(m.data), (m.row, m.col)), shape=(N, M)) + for m in jump_data ) return nojump_matrices @@ -524,14 +636,19 @@ def get_nojump_positions(self, selection): nojump_positions = [] for i_frame, __ in enumerate(self.universe.trajectory): - delta = np.array(np.vstack( - [m[:i_frame, :].sum(axis=0) for m in nojump_matrices] - ).T) * box + delta = ( + np.array( + np.vstack([m[:i_frame, :].sum(axis=0) for m in nojump_matrices]).T + ) + * box + ) nojump_positions.append(selection.positions - delta) return np.array(nojump_positions) def clean(self): for name in os.listdir(self.maindir): - if name.startswith('.') and (name.endswith('.lock') or name.endswith('.npz')): + if name.startswith(".") and ( + name.endswith(".lock") or name.endswith(".npz") + ): os.remove(os.path.join(self.maindir, name))