# 1. DataClass

In [1]:
import re
import numpy as np
from typing import List
from abc import abstractmethod, ABCMeta


class DataClass(metaclass=ABCMeta):
    _test_line: str = "none"
    is_request_control: bool = False
    _fmt = None
    _slice = None

    @property
    @abstractmethod
    def patterns(self) -> List[re.Pattern]:
        pass

    @property
    @abstractmethod
    def data(self):
        pass

    def decode_line(self, line: str):
        if self.is_request_control:
            if self._check_is_endline(line=line):
                self.is_request_control = False
                self.data = np.array(self._tmp_data).astype(self._fmt)
                del self._tmp_data
            else:
                self._tmp_data.append(line.split()[self._slice])
        else:
            for pattern in self.patterns:
                if data := pattern.match(line):
                    self._inner_match_patterns(data=data)
                    break

    @abstractmethod
    def _inner_match_patterns(self, data):
        pass

    def _check_is_endline(self, line: str) -> bool:
        pass

In [2]:
class Cell(DataClass):
    patterns = [re.compile(r"\s+[^0-9]+\|\s+Cell lengths\s+\[ang\]\s+(?P<x>\S+)\s+(?P<y>\S+)\s+(?P<z>\S+)\s+")]
    data = np.zeros(3, dtype=float)
    _fmt = float

    def _inner_match_patterns(self, data):
        self.data = np.array([data["x"], data["y"], data["z"]]).astype(self._fmt)

In [3]:
class Energy(DataClass):
    patterns = [re.compile(r"\s+ENERGY\|\s+Total\s+FORCE_EVAL\s+\(\s+QS\s+\)\s+energy\s+\[a.u.\]\:\s+(?P<energy>\S+)\s+")]
    data = 0.0
    _fmt = float

    def _inner_match_patterns(self, data):
        self.data = np.array([data["energy"]]).astype(self._fmt)

In [4]:
class Atom(DataClass):
    patterns = [re.compile(r"\s+Atom\s+Kind\s+Element\s+X\s+Y\s+Z\s+Z\(eff\)\s+")]
    data = np.zeros([10], dtype=float)
    _fmt = "<U4"
    _slice = slice(2, 3)

    def _inner_match_patterns(self, data):
        self.is_request_control = True
        self._tmp_data = []

    def _check_is_endline(self, line: str) -> bool:
        return line.strip() == ""

In [5]:
class Force(DataClass):
    patterns = [re.compile("\s+\#\s+Atom\s+Kind\s+Element\s+X\s+Y\s+")]
    data = np.zeros([1, 3], dtype=float)
    _fmt = float
    _slice = slice(3, 6)

    def _inner_match_patterns(self, data):
        self.is_request_control = True
        self._tmp_data = []

    def _check_is_endline(self, line: str) -> bool:
        return line.startswith(" SUM OF ATOMIC")

In [6]:
class Stress(DataClass):
    patterns = [re.compile("\s+STRESS\|\s+x\s+y\s+")]
    data = np.zeros([1, 3], dtype=float)
    _fmt = float
    _slice = slice(2, 5)

    def _inner_match_patterns(self, data):
        self.is_request_control = True
        self._tmp_data = []

    def _check_is_endline(self, line: str) -> bool:
        return line.startswith(" STRESS| 1/3 Trace")

# 2. Server

In [7]:
class LogOpener:
    _end_patterns = ["\s+Extrapolation method:\s+ASPC\s+", "\s+\-\s+DBCSR STATISTICS\s+\-\s+"]

    def __init__(self, logfile: str) -> None:
        self._frame = -1
        self.logdata_generator = self._generate_data_from_logfile(logfile=logfile)
        self.nextframe()

    @property
    def data(self):
        return {key: dataclass.data for key, dataclass in self.dataclasses.items()}

    @property
    def end_patterns(self) -> List[re.Pattern]:
        return [re.compile(ep) for ep in self._end_patterns]

    @end_patterns.setter
    def end_patterns(self, patterns):
        self._end_patterns = patterns

    @property
    def frame(self) -> int:
        return self._frame

    def _generate_data_from_logfile(self, logfile: str):
        self.reset_dataclasses()
        with open(logfile, "r") as f:
            while this_line := f.readline():
                for end_pattern in self.end_patterns:
                    if end_pattern.match(this_line):
                        yield self.dataclasses
                for key, dataclass in self.dataclasses.items():
                    dataclass.decode_line(line=this_line)

    def reset_dataclasses(self) -> dict[str, DataClass]:
        self.dataclasses = {"cell": Cell(), "energy": Energy(), "atom": Atom(), "force": Force(), "stress": Stress()}

    def nextframe(self):
        self.dataclasses = next(self.logdata_generator)
        self._frame += 1

# 3. Trj Opener

In [8]:
from abc import ABCMeta, abstractmethod
from typing import TextIO


class TrjOpener(metaclass=ABCMeta):
    skip_head: int = 0

    def __init__(self, trjfile: str) -> None:
        self._frame = -1
        self._energy = None
        self._coords = None
        self.trjdata_generator = self._generate_trjdata(trjfile=trjfile)
        self.nextframe()

    @property
    def natoms(self) -> int:
        return int(self._natoms)

    @property
    def coords(self):
        return np.array(self._coords).astype(float)

    @property
    def energy(self):
        return np.array([self._energy]).astype(float)

    @property
    def frame(self) -> int:
        return self._frame

    def nextframe(self) -> None:
        self._frame += 1
        next(self.trjdata_generator)

    def _generate_trjdata(self, trjfile: str):
        with open(trjfile, "r") as f:
            [f.readline() for _ in range(self.skip_head)]
            while True:
                try:
                    yield self._inner_generate_trjdata(file=f)
                except:
                    break

    @abstractmethod
    def _inner_generate_trjdata(self, file: TextIO):
        pass

In [9]:
class XYZOpener(TrjOpener):
    def _inner_generate_trjdata(self, file):
        self._natoms = int(file.readline().strip())
        self._energy = file.readline().split()[-1]
        self._coords = [file.readline().split()[1:4] for _ in range(self._natoms)]

In [10]:
class PDBOpener(TrjOpener):
    skip_head = 2

    def _inner_generate_trjdata(self, file):
        self._energy = file.readline().split()[-1]
        file.readline()
        coords = []
        while this_line := file.readline():
            if this_line.startswith("END"):
                break
            coords.append(this_line.split()[3:6])
        self._coords = coords
        self._natoms = len(coords)
        del coords

# -1. TEST

In [11]:
import os
from tqdm import tqdm

trjopener: dict[str, type[TrjOpener]] = {
    "xyz": XYZOpener,
    "pdb": PDBOpener,
}


class CP2kBrewer(object):
    def __init__(self, logfile: str, trjfile: str, *, trjformat: str = "auto") -> None:
        self.log_opener = LogOpener(logfile=logfile)
        self.trjformat = self._check_trjformat(trjfile=trjfile, trjformat=trjformat)
        self.trj_opener = trjopener[self.trjformat](trjfile=trjfile)

    def _check_trjformat(self, trjfile, trjformat):
        if trjformat == "auto":
            return trjfile.split(".")[-1]
        return trjformat

    @property
    def frame(self) -> int:
        assert (
            self.log_opener.frame == self.trj_opener.frame
        ), f"Frame of Openers are different, LOG({self.log_opener.frame}) != TRJ({self.trj_opener.frame})"
        return self.log_opener.frame

    @property
    def data(self):
        _data = self.log_opener.data
        _data["coords"] = self.trj_opener.coords
        assert (
            abs(_data["energy"] - self.trj_opener.energy) < 1e-7
        ), f"Energy are different at Frame {self.frame}, LOG({_data['energy']}) != TRJ({self.trj_opener.energy})"
        return _data

    @property
    def gathered_data(self):
        if not hasattr(self, "_gathered_data"):
            self.gather()
        return self._gathered_data

    def gather(self, *, verbose: bool = True):
        if verbose:
            pbar = tqdm()
        _data = {key: [val] for key, val in self.data.items()}
        while True:
            try:
                self.nextframe()
                for key, val in self.data.items():
                    _data[key].append(val)
                if verbose:
                    pbar.update(n=1)
            except:
                break
        _data = {key: np.array(val) for key, val in _data.items()}
        self._gathered_data = _data
        return self._gathered_data

    def nextframe(self):
        self.log_opener.nextframe()
        self.trj_opener.nextframe()

In [12]:
if __name__ == "__main__":
    logfile = "/Users/minu/local_git/cp2kbrew/src/2022.2/npt/out.log"
    trjfile = "/Users/minu/local_git/cp2kbrew/src/2022.2/npt/TEST-pos-1.xyz"
    pdbfile = "../src/trj/test.pdb"

    cp2kbrewer = CP2kBrewer(logfile=logfile, trjfile=trjfile)
    print(f"FMT: {cp2kbrewer.trjformat}")
    print(f"TRJ OPENER: {cp2kbrewer.trj_opener}")
    print(f"TRJ OPENER nAtoms: {cp2kbrewer.trj_opener.natoms}")

FMT: xyz
TRJ OPENER: <__main__.XYZOpener object at 0x118f6e770>
TRJ OPENER nAtoms: 192


In [13]:
cp2kbrewer = CP2kBrewer(logfile=logfile, trjfile=trjfile)
data = cp2kbrewer.gather(verbose=True)

500it [00:01, 449.06it/s]


In [14]:
for key, val in data.items():
    print(f"{key:<8s}: {val.shape}")

cell    : (501, 3)
energy  : (501, 1)
atom    : (501, 192, 1)
force   : (501, 192, 3)
stress  : (501, 3, 3)
coords  : (501, 192, 3)
