In [None]:
import numpy as np
from pathlib import Path

def read_timestep(path: str | Path) -> np.ndarray:
    reader = iter(open(path, "r"))
    outs = []
    while True:
        l = next(reader, None)
        if l is None:
            break
        if "ITEM: TIMESTEP" in l:
            t = int(next(reader).strip())
            outs.append(t)
    return np.array(outs)

In [None]:
import re
def read_timestep_2(path: str | Path):
    with open(path, "r") as f:
        l = f.read()
        t = re.findall(r"ITEM: TIMESTEP\s+(\d+)", l)
        return np.array(t, dtype = int)

print(read_timestep_2("/Users/supercgor/Library/CloudStorage/OneDrive-個人/Documents/Data/simu/disordered-pbc/t120-traj1.pos"))

%timeit read_timestep_2("/Users/supercgor/Library/CloudStorage/OneDrive-個人/Documents/Data/simu/disordered-pbc/t120-traj1.pos")

In [5]:
import re
from pathlib import Path
from io import StringIO
from shppy.io import read_lammps_dump_text
# from ase.io.lammpsrun import read_lammps_dump_text
from ase.io.extxyz import read_xyz
path = "/Users/supercgor/Library/CloudStorage/OneDrive-個人/Documents/Data/simu/disordered-pbc/t120-traj1.xyz"

%timeit list(read_xyz(open(path, "r"), index = slice(0,-1, 2)))
# print(len(read_lammps_dump_text(path, slice(0,-1, 10))))
# print(read_lammps_dump_text(path, index = -1))

path = "/Users/supercgor/Library/CloudStorage/OneDrive-個人/Documents/Data/simu/disordered-pbc/t120-traj1.pos"

# read_lammps_dump_text(path, index = slice(0,-1, 100), sort_by_id = False)
%timeit read_lammps_dump_text(path, index = slice(0,-1, 2), sort_by_id = False)


11.4 s ± 174 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
6.83 s ± 93.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [5]:
import numpy as np
from pathlib import Path
from shppy.atom import Atoms

def read_pos(path: str | Path, index: int | slice = 0, sort_by_id: bool = True, type_map: dict[int, int] | None = None):
    reader = iter(open(path, "r"))
    
    end_det = (isinstance(index, int) and index < 0) or (isinstance(index, slice) and index.stop < 0)
    
    if not end_det and isinstance(index, slice):
        estimate_frames = len(range(index.start or 0, index.stop, index.step or 1))
        frames_ind = set(range(index.start or 0, index.stop, index.step or 1))
    elif not end_det and isinstance(index, int):
        estimate_frames = 1
        frames_ind = {index}
    elif end_det:
        estimate_frames = -1
        frames_ind = set()
    else:
        raise ValueError(f"Invalid index type: {type(index)}")
    
    frames = []
    
    now_index = 0
    
    while True:
        if not end_det and len(frames) >= estimate_frames:
            break
                
        l = next(reader, None)
        
        if l is None:
            break
        
        if "ITEM: TIMESTEP" in l:
            t = int(next(reader).strip())
        
        elif "ITEM: NUMBER OF ATOMS" in l:
            n_atoms = int(next(reader).strip())
        
        elif "ITEM: BOX BOUNDS" in l:
            bounds = l.strip()[17:].split()
            box_params = list(filter(lambda x: "xy" in x or "xz" in x or "yz" in x, bounds))
            box_bounds = list(filter(lambda x: "p" in x or "f" in x or "s" in x, bounds))
            cell = np.zeros((3, 3))
            pbcs = []
            if len(box_params) == 0:
                xlo, xhi = map(float, next(reader).strip().split())
                ylo, yhi = map(float, next(reader).strip().split())
                zlo, zhi = map(float, next(reader).strip().split())
                celldisp = np.array([xlo, ylo, zlo])
                cell = np.diag([xhi - xlo, yhi - ylo, zhi - zlo])
                
            elif len(box_params) == 3:
                xlo, xhi, xy = map(float, next(reader).strip().split())
                ylo, yhi, xz = map(float, next(reader).strip().split())
                zlo, zhi, yz = map(float, next(reader).strip().split())
                celldisp = np.array([xlo, ylo, zlo])
                cell = np.array([[xhi - xlo, 0, 0], 
                                 [xy, yhi - ylo, 0], 
                                 [xz, yz, zhi - zlo]])
            
            else:
                raise ValueError(f"Invalid box params: {box_params}")
            
            if len(box_bounds) == 3:
                for i in range(3):
                    if "p" in box_bounds[i]:
                        pbcs.append(True)
                    else:
                        pbcs.append(False)
            
            else:
                raise ValueError(f"Invalid box bounds: {box_bounds}")
        
        elif "ITEM: ATOMS" in l:
            if not end_det and now_index not in frames_ind:
                for _ in range(n_atoms):
                    next(reader)
                now_index += 1
                continue

            keys = l.strip()[12:].split()
            data = np.loadtxt(reader, dtype=str, max_rows=n_atoms)
            
            if "id" in keys:
                ids = data[:, keys.index("id")].astype(int)
                if sort_by_id:
                    sort_order = np.argsort(ids)
                    ids = ids[sort_order]
                    data = data[sort_order]
            else:
                ids = np.arange(n_atoms)
            
            if "type" in keys:
                types = data[:, keys.index("type")].astype(int)
                if type_map is not None:
                    types = np.vectorize(type_map.get)(types)
            else:
                types = np.zeros(n_atoms, dtype=int)

            if "x" in keys:
                positions = data[:, (keys.index("x"), keys.index("y"), keys.index("z"))].astype(float)
                scaled_positions = None
            elif "xu" in keys:
                positions = data[:, (keys.index("xu"), keys.index("yu"), keys.index("zu"))].astype(float)
                scaled_positions = None
            elif "xs" in keys:
                positions = None
                scaled_positions = data[:, (keys.index("xs"), keys.index("ys"), keys.index("zs"))].astype(float)
            elif "xsu" in keys:
                positions = None
                scaled_positions = data[:, (keys.index("xsu"), keys.index("ysu"), keys.index("zsu"))].astype(float)
            else:
                raise ValueError(f"Invalid keys: {keys}")
            
            if "vx" in keys:
                velocities = data[:, (keys.index("vx"), keys.index("vy"), keys.index("vz"))].astype(float)
            else:
                velocities = None
            if "q" in keys:
                charges = data[:, keys.index("q")].astype(float)
            else:
                charges = None
            if "fx" in keys:
                forces = data[:, (keys.index("fx"), keys.index("fy"), keys.index("fz"))].astype(float)
            else:
                forces = None
            if "c_q[1]" in keys:
                quaternions = data[:, (keys.index("c_q[1]"), keys.index("c_q[2]"), keys.index("c_q[3]"), keys.index("c_q[4]"))]
            else:
                quaternions = None
            
            atoms = Atoms(numbers=types,
                          positions=positions, 
                          cell=cell, 
                          pbc=pbcs, 
                          celldisp=celldisp, 
                          velocities=velocities, 
                          charges=charges,
                          scaled_positions=scaled_positions,
                        )
            atoms.info["timestep"] = t
            atoms.set_array("id", ids)

            if forces is not None:
                atoms.set_array("forces", forces)
            if quaternions is not None:
                atoms.set_array("quaternions", quaternions)
            frames.append(atoms)
            now_index += 1
        
        else:
            raise ValueError(f"Invalid line: {l}")
    
    if end_det:
        return frames[index]
    elif isinstance(index, int):
        return frames[0]
    else:
        return frames

read_pos(path, slice(-1))

[Atoms(symbols='H9216He18432', pbc=True, cell=[107.71446018869236, 93.28362572400246, 79.86611492255423], id=...),
 Atoms(symbols='H9216He18432', pbc=True, cell=[107.71446018869236, 93.28362572400246, 79.86611492255423], id=...),
 Atoms(symbols='H9216He18432', pbc=True, cell=[107.71446018869236, 93.28362572400246, 79.86611492255423], id=...),
 Atoms(symbols='H9216He18432', pbc=True, cell=[107.71446018869236, 93.28362572400246, 79.86611492255423], id=...),
 Atoms(symbols='H9216He18432', pbc=True, cell=[107.71446018869236, 93.28362572400246, 79.86611492255423], id=...),
 Atoms(symbols='H9216He18432', pbc=True, cell=[107.71446018869236, 93.28362572400246, 79.86611492255423], id=...),
 Atoms(symbols='H9216He18432', pbc=True, cell=[107.71446018869236, 93.28362572400246, 79.86611492255423], id=...),
 Atoms(symbols='H9216He18432', pbc=True, cell=[107.71446018869236, 93.28362572400246, 79.86611492255423], id=...),
 Atoms(symbols='H9216He18432', pbc=True, cell=[107.71446018869236, 93.2836257240