In [73]:
from lark import Lark, Token, Tree, Visitor, Transformer
from pathlib import Path
from shutil import which

from topchem.gromacs.top.parser import parse_topology, _top_parser
from collections import defaultdict, namedtuple

In [2]:
def void_factory():
    """Return a new zero-argument callable that does nothing.

    Every call builds a fresh function object; invoking that function
    returns ``None``. Usable, for example, as a ``defaultdict`` default
    factory so that unknown keys map to a harmless no-op.
    """
    def _noop():
        return None

    return _noop

In [74]:
def ignore(*args, **kwargs):
    """Accept any positional and keyword arguments, do nothing, return ``None``."""
    return None

SyntaxError: incomplete input (398721952.py, line 3)

In [4]:
# Scratch check: a defaultdict whose missing keys are filled by calling void_factory().
d = defaultdict(void_factory)

In [5]:
# "a" is absent, so the lookup stores the no-op built by void_factory(); calling it returns None.
d["a"]()

In [6]:
def tokens_with_level(tree, level=0):
    """Flatten a parse tree into a list of ``(level, kind, value)`` triples.

    Args:
        tree: a ``Token``, a ``Tree``, or any other object found among a
            tree's children.
        level: nesting depth assigned to ``tree`` itself; children of a
            ``Tree`` get ``level + 1``.

    Returns:
        list[tuple]: depth-first, pre-order list of triples:
            * ``Token``  -> ``(level, token.type, token.value)``
            * ``Tree``   -> ``(level, "+", tree.data)`` followed by the
              triples of each child
            * anything else -> ``(level, str(type(obj)), obj)``
    """
    # Leaf: a lexer token carries its own type and value.
    if isinstance(tree, Token):
        return [(level, tree.type, tree.value)]

    # Leaf: unknown object, reported with the string form of its Python type.
    if not isinstance(tree, Tree):
        return [(level, str(type(tree)), tree)]

    # Inner node: "+" marks a subtree header, then recurse one level deeper.
    flattened = [(level, "+", tree.data)]
    for child in tree.children:
        flattened += tokens_with_level(child, level + 1)
    return flattened

In [312]:
def show_tree(tree):
    """Render a parse tree as indented text, one node or token per line.

    Each triple produced by ``tokens_with_level(tree)`` becomes one line:
    two spaces of indentation per nesting level, then the kind, a space and
    the value. Subtree headers keep their bare ``+`` marker; every other
    kind is left-aligned in a 12-character column.

    Returns:
        str: the rendered text; every line ends with a newline.
    """
    rendered = []
    for depth, kind, value in tokens_with_level(tree):
        label = kind if kind == "+" else f"{kind:<12}"
        rendered.append(f"{'  ' * depth}{label} {value}\n")
    return "".join(rendered)

In [313]:
# Parse a single force-field file so the raw parse tree can be inspected below.
# NOTE(review): absolute, machine-specific path -- this cell only runs where that
# GROMACS build exists; consider deriving it from a configurable base directory.
tree = parse_topology("/Users/iakovlevda/_pmx/gromacs-2021/build/package/gromacs/share/gromacs/top/amber99sb-ildn.ff/ffnonbonded.itp")

In [314]:
# Take the first child of the 20th top-level record as a sample node to inspect.
t = tree.children[19].children[0]

In [315]:
# Display the sample node.
t

Token('ID', 'HC')

In [316]:
# Display the sample node's token type.
t.type

'ID'

In [317]:
# A Token can be built by hand from a type name and a value.
Token(type="COMMENT", value=";sdf")

Token('COMMENT', ';sdf')

In [318]:
# Render the whole parse tree as indented text (the cell displays the string's repr).
show_tree(tree)

'+ topology\n  + section_header\n    CNAME        atomtypes\n  + block_comment\n    COMMENT      ; name      at.num  mass     charge ptype  sigma      epsilon\n  + entry\n    ID           Br\n    SIGNED_INT   35\n    SIGNED_FLOAT 79.90\n    SIGNED_FLOAT 0.0000\n    ID           A\n    SIGNED_FLOAT 3.95559e-01\n    SIGNED_FLOAT 1.33888e+00\n    COMMENT      ; Converted from parm99.dat \n  + entry\n    ID           C\n    SIGNED_INT   6\n    SIGNED_FLOAT 12.01\n    SIGNED_FLOAT 0.0000\n    ID           A\n    SIGNED_FLOAT 3.39967e-01\n    SIGNED_FLOAT 3.59824e-01\n  + entry\n    ID           CA\n    SIGNED_INT   6\n    SIGNED_FLOAT 12.01\n    SIGNED_FLOAT 0.0000\n    ID           A\n    SIGNED_FLOAT 3.39967e-01\n    SIGNED_FLOAT 3.59824e-01\n  + entry\n    ID           CB\n    SIGNED_INT   6\n    SIGNED_FLOAT 12.01\n    SIGNED_FLOAT 0.0000\n    ID           A\n    SIGNED_FLOAT 3.39967e-01\n    SIGNED_FLOAT 3.59824e-01\n  + entry\n    ID           CC\n    SIGNED_INT   6\n    SIGNED_FLOAT 

---

In [359]:
def gromacs_include():
    """Locate the GROMACS topology directory that ships next to ``gmx``.

    The ``gmx`` executable is looked up on ``PATH``; the directory returned
    is ``share/gromacs/top`` taken two levels above the executable.

    Returns:
        pathlib.Path: absolute, resolved path of the ``share/gromacs/top``
        directory.

    Raises:
        FileNotFoundError: if ``gmx`` is not found on ``PATH``, or if the
            derived ``share/gromacs/top`` directory does not exist.
    """
    gmx_location = which("gmx")
    if gmx_location is None:
        # shutil.which returns None when nothing matches; Path(None) would
        # otherwise fail with a TypeError that says nothing about GROMACS.
        raise FileNotFoundError("GROMACS executable 'gmx' was not found on PATH")

    gromacs_tops = (Path(gmx_location).parent.parent / "share/gromacs/top").absolute().resolve()
    if not gromacs_tops.exists():
        raise FileNotFoundError(f"GROMACS share path {gromacs_tops.as_posix()} does not exist")
    return gromacs_tops

# State of one level of conditional nesting:
#   keep       -- truthy while records of the current branch are emitted
#   skip_until -- record kinds that are still dispatched while skipping;
#                 an empty list means nothing is skipped
IfState = namedtuple("IfState", "keep skip_until")
    
class TopologyPreprocessor:
    """Resolve preprocessor-style records in a parsed topology tree.

    ``preprocess_topology`` parses a file and walks its top-level records.
    ``include`` records are expanded recursively, ``define``/``undef`` update
    the macro table, ``ifdef``/``ifndef``/``else``/``endif`` select which
    records survive, and everything that survives is appended to
    ``preprocessed_tree`` (a ``Tree`` whose ``data`` is ``"topology"``).
    """

    # Token types that carry data in ``define`` and ``entry`` records.
    _VALUE_TOKEN_TYPES = ("ID", "SIGNED_INT", "SIGNED_FLOAT")

    def __init__(self, include_paths: list[str] = None, strict: bool = True, defines: dict = None, verbose: bool = True):
        """Set up an empty output tree and the initial preprocessor state.

        Args:
            include_paths: extra directories searched for included files.
                The list is copied (the caller's list is not modified) and
                the directory returned by ``gromacs_include()`` is appended.
            strict: stored on the instance; not consulted by this class.
            defines: initial macro table, name -> list of replacement tokens.
                Stored by reference and updated in place by ``define`` /
                ``undef`` records. Defaults to an empty dict.
            verbose: when true, comments are kept and include markers are
                written into the output tree.
        """
        # Copy instead of appending to the caller's list; Path() lets str and
        # Path entries be mixed freely.
        self.include_paths = [Path(prefix) for prefix in (include_paths or [])]
        self.include_paths.append(gromacs_include())
        self.strict = strict
        self.defines = {} if defines is None else defines
        self.verbose = verbose
        self.preprocessed_tree = Tree(
            data="topology",
            children=[],
        )
        # Level 0 is the unconditional top level: keep everything, skip nothing.
        self._if_state = [IfState(True, [])]
        self._if_level = 0

    def _append_tree(self, tree):
        """Append all children of another ``topology`` tree to the output."""
        assert tree.data == "topology"
        self.preprocessed_tree.children.extend(tree.children)

    def _comment(self, comment: str):
        """Add a one-line comment to the output tree (verbose mode only).

        Line breaks in ``comment`` are replaced by spaces. The comment is
        appended to the trailing ``block_comment`` subtree when there is one,
        otherwise a new ``block_comment`` subtree is created.
        """
        if not self.verbose:
            return
        comment_token = Token(
            type="COMMENT",
            value=f"; {' '.join(comment.splitlines())}",
        )
        children = self.preprocessed_tree.children
        # The output may still be empty (e.g. the very first record is an
        # include), so guard the look-back at the last child.
        if children and children[-1].data == "block_comment":
            children[-1].children.append(comment_token)
        else:
            children.append(
                Tree(
                    data="block_comment",
                    children=[comment_token],
                )
            )

    def _process_include(self, record):
        """Locate the file named by an ``include`` record and preprocess it.

        The path is tried as given first, then relative to each entry of
        ``self.include_paths``; the first existing candidate is used.

        Raises:
            FileNotFoundError: if no candidate exists.
        """
        path_token, *_ = record.children
        assert path_token.type == "PATH"

        # Drop the first and last character of the token (presumably the
        # surrounding quotes -- the grammar is not visible here).
        include_suffix = Path(path_token[1:-1])
        include_candidates = [include_suffix] + [
            include_prefix / include_suffix
            for include_prefix in self.include_paths
        ]
        for include_path in include_candidates:
            if include_path.exists():
                break  # First occurrence of the file to include
        else:
            searched = ", ".join(candidate.as_posix() for candidate in include_candidates)
            raise FileNotFoundError(
                f"Cannot resolve include '{include_suffix.as_posix()}'; searched: {searched}"
            )

        self._comment(f">>> Included from {include_path.as_posix()}")
        print(f"> Include {include_path.as_posix()}")
        self.preprocess_topology(include_path)
        self._comment(f"<<< Included from {include_path.as_posix()}")

    def _process_define(self, record):
        """Store the replacement tokens of a ``define`` record.

        Only value tokens are kept (plus comments in verbose mode).
        """
        assert record.data == "define"
        key, *values = record.children
        keep_tokens = list(self._VALUE_TOKEN_TYPES)
        if self.verbose:
            keep_tokens.append("COMMENT")
        self.defines[key.value] = [
            token for token in values if token.type in keep_tokens
        ]

    def _process_undef(self, record):
        """Remove a macro; undefining an unknown name is a no-op."""
        assert record.data == "undef"
        key, *_ = record.children
        self.defines.pop(key.value, None)

    def _process_if(self, record):
        """Open a new conditional level for an ``ifdef`` / ``ifndef`` record.

        The new branch is kept only when the enclosing level is kept AND the
        condition on the macro name holds.
        """
        assert record.data in ["ifdef", "ifndef"]

        name = record.children[0].value
        defined = name in self.defines
        condition = defined if record.data == "ifdef" else not defined
        # Explicit grouping: ``a and b or c`` parses as ``(a and b) or c``,
        # which would keep an ``ifndef`` branch inside a skipped region.
        keep = bool(self._if_state[self._if_level].keep) and condition
        print(f"Check {name}, {keep}")
        self._if_level += 1
        if keep:  # condition satisfied
            self._if_state.append(IfState(keep, []))
        else:
            # While skipping, conditional records must still be dispatched so
            # that nesting stays balanced.
            self._if_state.append(
                IfState(keep, ["endif", "else", "ifdef", "ifndef"])
            )

    def _process_else(self, record):
        """Switch the current conditional level to its ``else`` branch.

        Raises:
            ValueError: if there is no open ``ifdef`` / ``ifndef``.
        """
        assert record.data == "else"
        if self._if_level < 1:
            raise ValueError("'else' record without a matching 'ifdef'/'ifndef'")
        # The else branch is live only if the if-branch was not and the
        # enclosing level is itself kept.
        keep = not self._if_state[self._if_level].keep and self._if_state[self._if_level - 1].keep
        if keep:  # condition satisfied
            self._if_state[self._if_level] = IfState(keep, [])
        else:
            self._if_state[self._if_level] = IfState(keep, ["endif", "ifdef", "ifndef"])

    def _process_header(self, record):
        """Emit a ``section_header``; non-verbose mode keeps only its first child."""
        assert record.data == "section_header"
        values = record.children
        if not self.verbose:
            record.children = values[:1]
        self.preprocessed_tree.children.append(record)

    def _process_entry(self, record):
        """Emit an ``entry`` record with macros expanded.

        ``ID`` tokens whose value is a defined macro are replaced by the
        macro's tokens. In non-verbose mode, tokens that are not value tokens
        are dropped. The record is modified in place.
        """
        assert record.data == "entry"
        processed_values = []
        for token in record.children:
            if not (self.verbose or token.type in self._VALUE_TOKEN_TYPES):
                continue
            if token.type == "ID" and token.value in self.defines:
                processed_values.extend(self.defines[token.value])
            else:
                processed_values.append(token)
        record.children = processed_values
        self.preprocessed_tree.children.append(record)

    def _process_endif(self, record):
        """Close the innermost conditional level.

        Raises:
            ValueError: if there is no open ``ifdef`` / ``ifndef``.
        """
        assert record.data == "endif"
        if self._if_level < 1:
            raise ValueError("'endif' record without a matching 'ifdef'/'ifndef'")
        self._if_state.pop(self._if_level)
        self._if_level -= 1

    def _keep_subtree(self, record):
        """Copy a record to the output unchanged."""
        self.preprocessed_tree.children.append(record)

    def _keep_if_verbose(self, record):
        """Copy a record to the output only in verbose mode."""
        if self.verbose:
            self.preprocessed_tree.children.append(record)

    @property
    def entry_handlers(self):
        """Mapping from record kind to handler; unknown kinds are kept as-is."""
        handlers = defaultdict(
            lambda: self._keep_subtree,
            {
                "include": self._process_include,
                "block_comment": self._keep_if_verbose,
                "preceeding": ignore,
                "define": self._process_define,
                "undef": self._process_undef,
                "ifndef": self._process_if,
                "ifdef": self._process_if,
                "else": self._process_else,
                "endif": self._process_endif,
                "section_header": self._process_header,
                "entry": self._process_entry,
            }
        )
        return handlers

    def preprocess_topology(self, path):
        """Parse ``path`` and feed its records through the handlers.

        While the file is being processed its own directory is searched first
        for includes; the previous search path is restored afterwards so that
        directories of already-processed files do not leak into later lookups.

        Args:
            path: location of the topology file (``str`` or ``pathlib.Path``).
        """
        path = Path(path)
        tree = parse_topology(path)
        assert tree.data == "topology"

        outer_include_paths = self.include_paths
        self.include_paths = [path.parent] + outer_include_paths
        # Build the dispatch table once per file rather than once per record.
        handlers = self.entry_handlers
        try:
            for record in tree.children:
                if self._if_level:
                    no_skip = self._if_state[self._if_level].skip_until
                    # A non-empty list means "skipping": only the listed
                    # record kinds are still dispatched.
                    if no_skip and record.data not in no_skip:
                        continue
                handlers[record.data](record)
        finally:
            self.include_paths = outer_include_paths


In [363]:
# Non-verbose preprocessor with POSRES and POSRES_WATER pre-defined as empty macros.
preprocessor = TopologyPreprocessor(verbose=False, defines={"POSRES": [], "POSRES_WATER": []})

In [364]:
%%time
# Expand topol.top (relative to the working directory) into preprocessor.preprocessed_tree.
preprocessor.preprocess_topology(Path("topol.top"))

> Include /Users/iakovlevda/_pmx/gromacs-2021/build/package/gromacs/share/gromacs/top/amber99sb-ildn.ff/forcefield.itp
> Include /Users/iakovlevda/_pmx/gromacs-2021/build/package/gromacs/share/gromacs/top/amber99sb-ildn.ff/ffnonbonded.itp
> Include /Users/iakovlevda/_pmx/gromacs-2021/build/package/gromacs/share/gromacs/top/amber99sb-ildn.ff/ffbonded.itp
> Include topol_Protein_chain_A.itp
Check POSRES, True
> Include posre_Protein_chain_A.itp
> Include topol_DNA_chain_B.itp
Check POSRES, True
> Include posre_DNA_chain_B.itp
> Include topol_Ion_chain_A2.itp
Check POSRES, True
> Include posre_Ion_chain_A2.itp
> Include /Users/iakovlevda/_pmx/gromacs-2021/build/package/gromacs/share/gromacs/top/amber99sb-ildn.ff/tip3p.itp
Check FLEXIBLE, True
Check POSRES_WATER, True
> Include /Users/iakovlevda/_pmx/gromacs-2021/build/package/gromacs/share/gromacs/top/amber99sb-ildn.ff/ions.itp
CPU times: user 1.85 s, sys: 53.5 ms, total: 1.91 s
Wall time: 1.95 s


In [365]:
# Write the indented text rendering of the preprocessed tree to tree.dump for inspection.
with open("tree.dump", "w") as dump:
    dump.write(show_tree(preprocessor.preprocessed_tree))