In [4]:
from datetime import date
from typing import NamedTuple, List

class Action(NamedTuple):
    date: date
    action: str

class Run(NamedTuple):
    date: date
    run: int
    precursor: str
    co_reactant: str
    co_absorbate: str | None
    cycles: int
    process: str
    sequence: str
    T: int
    P: int

ACTIONS: List[Action] = []
RUNS: List[Run] = []

In [5]:
from datetime import datetime
import io
from parse import parse, Result, with_pattern
import string
from typing import cast, Callable, Tuple, Any

class Separator(NamedTuple):
    tile: str
    threshold: int
           
def find_parse_line(buffer: io.TextIOBase,
                    parsing_patterns: List[str],
                    before_separator: Separator | None = None,
                    accept_ws_pad: bool = False,
                    accept_case_insens: bool = True) -> dict:
    checkpoint = buffer.tell()

    def callback(line: str) -> Tuple[bool, Any]:
        result_dict = parse_to_dict_aliases(
            line=line.strip(),
            patterns=parsing_patterns,
            accept_ws_pad=accept_ws_pad,
            accept_case_insens=accept_case_insens)
        match = len(result_dict) != 0
        if match:
            result_dict["__buffer_position__"] = buffer.tell()
        
        return match, result_dict 

    line, result_dict = find_read_line(buffer, callback=callback, before_separator=before_separator)
    if not line or not result_dict:
        buffer.seek(checkpoint)
        print(f"{checkpoint}: Could not find line matching the patterns: {parsing_patterns}")
        return {}
    
    return result_dict

def occurs_ahead(buffer: io.TextIOBase, key: str, before_separator: Separator | None = None) -> bool:
    checkpoint = buffer.tell()

    def callback(line: str) -> Tuple[bool, Any]:
        return key in line, None

    line, _ = find_read_line(buffer, callback=callback, before_separator=before_separator)
    buffer.seek(checkpoint)
    
    if not line:
        if before_separator:
            print(f"{checkpoint}: Reached {before_separator} or EOF before finding key: {key}")
        else:
            print(f"{checkpoint}: Reached EOF before finding key: {key}")
        return False

    return True

def find_read_separator(buffer: io.TextIOBase, separator: Separator) -> int:
    checkpoint = buffer.tell()
    threshold_str = separator.tile * (separator.threshold - 1)

    def callback(line: str) -> Tuple[bool, Any]:
        buffer_position = -1
        match = line.startswith(threshold_str)
        if match:
            buffer_position = buffer.tell()
  
        return match, buffer_position

    line, buffer_position = find_read_line(buffer, callback=callback, before_separator=None)
    if not line or not buffer_position or buffer_position == -1:
        buffer.seek(checkpoint)
        print(f"{checkpoint}: Reached EOF before finding {separator}")
        return -1
    
    return buffer_position
    
def find_read_line(buffer: io.TextIOBase,
                   callback: Callable[[str], Tuple[bool, Any]],
                   before_separator: Separator | None = None) -> Tuple[str | None, Any]:
    checkpoint = buffer.tell()

    while True:
        line = read_nonws_line(buffer, before_separator=before_separator)
        if not line:
            buffer.seek(checkpoint)
            return None, None
        
        match, data = callback(line)
        if not match:
            continue 
        
        return line, data

def read_nonws_line(buffer: io.TextIOBase, before_separator: Separator | None = None) -> str | None: 
    checkpoint = buffer.tell()
    
    threshold_str = ""
    if before_separator:
        threshold_str = before_separator.tile * (before_separator.threshold - 1)

    while True:
        line = buffer.readline()
        if not line:
            print(f"{checkpoint}: Hit EOF when reading for a non-whitespace line")
            buffer.seek(checkpoint)
            return None

        if not line.strip():
            continue
        line = line.strip()

        if before_separator and line.startswith(threshold_str):
            print(f"{checkpoint}: Hit {before_separator} when reading for a non-whitespace line")
            buffer.seek(checkpoint)
            return None

        return line

def parse_to_dict_aliases(patterns: List[str], line: str, accept_ws_pad: bool = False, accept_case_insens: bool = True) -> dict:
    for pattern in patterns:
        result = parse_to_dict(line, pattern, accept_ws_pad, accept_case_insens)
        if len(result) != 0:
            return result
    return {}

def parse_to_dict(line: str, pattern: str, accept_ws_pad: bool = False, accept_case_insens: bool = True) -> dict:
    @with_pattern(r"\s*")
    def whitespace(text):
        return text
    
    @with_pattern(r".*?")
    def anything(text):
        return text

    if (accept_ws_pad):
        pattern = "{:ws}" + pattern

    result = parse(
        format=pattern,
        string=line,
        extra_types={"ws": whitespace, "any": anything},
        evaluate_result=True,
        case_sensitive=(not accept_case_insens)
    )
    if not result:
        return {}
    
    result = cast(Result, result) 
    
    formatter = string.Formatter()
    field_names = [fname for _, fname, _, _ in formatter.parse(pattern) if fname]

    # Check that all named fields are non-empty
    if not all(result.named.get(name) not in [None, ""] for name in field_names):
        print(f"A parameter was missing that fit {pattern}")
        return {}

    result_dict = result.named
    result_dict["__match_pattern__"] = pattern
    return result_dict

In [6]:
def parse_experiment_contents(buffer: io.TextIOBase, global_date: date):
    global_T: int
    global_P: int

    buffer.seek(0)
    
    print(f"{buffer.tell()}: PARSING GLOBALS")
    _result = find_parse_line(buffer,
        parsing_patterns=["T{:ws}={:ws}{T:g}{:ws}{T_unit:l}"],
        before_separator=Separator("_", 28),   
    )
    if not "T" in _result:
        raise ValueError(f"{buffer.tell()}: Could not find global temperature!")
    global_T = _result["T"]
    
    _result = find_parse_line(buffer,
        parsing_patterns=[
            "P{:ws}={:ws}{P:g}{:ws}{P_unit:l}",
            "Pressure{:ws}={:ws}{P:g}{:ws}{P_unit:l}"
        ],
        before_separator=Separator("_", 28),  
    )
    if not "P" in _result:
        raise ValueError(f"{buffer.tell()}: Could not find global pressure!")
    global_P = _result["P"]

    print(f"{buffer.tell()}: PARSING ACTIONS")
    while True:
        _checkpoint_pre_action = buffer.tell()
        
        if not find_read_separator(buffer, separator=Separator("_", 28)):
            raise ValueError(f"{_checkpoint_pre_action}: Could not find separator to action section!")

        _action_data_remains: bool = occurs_ahead(buffer, key="loaded", before_separator=None)
        _action_data_in_section: bool = occurs_ahead(buffer, key="loaded", before_separator=Separator("_", 28))
        _run_data_in_section: bool = occurs_ahead(buffer, key="cycles", before_separator=Separator("_", 28))

        if (not _action_data_remains):     
            buffer.seek(_checkpoint_pre_action)
            print(f"{_checkpoint_pre_action}: Couldn't find action data in rest of buffer,",
                  "moving on to re-parse this section as a run section")
            break 
            
        if (not _action_data_in_section):     
            print(f"{_checkpoint_pre_action}: Action data remains but couldn't find within section,",
                  "parsing next section as an action section")
            continue

        if (_run_data_in_section):
            buffer.seek(_checkpoint_pre_action)
            print(f"{_checkpoint_pre_action}: Conflicting tells for action and run data within section,"
                  "moving on to re-parse this section as a run section")
            break
            
        action_label = read_nonws_line(buffer, before_separator=Separator("_", 28))
        action_label = str(action_label) # cannot be None as action data remains
        # Parse load date
        if "loaded" in action_label:
            print(f"{buffer.tell()}:",
                  f"Detected load date in action label '{action_label}'")

            _label_segments = action_label.partition("loaded")
            action_label = _label_segments[0]
            _action_date_str = _label_segments[2].strip()
            
            _result = parse_to_dict_aliases(
                line=_action_date_str,
                patterns=[
                    "{month:d}/{day:d}/{year:d}",
                    "{month:d}/{day:d}",
                    "{year:4d}{month:2d}{day:2d}",
                ],
                accept_ws_pad=True,
            )
        else:
            _result = find_parse_line(buffer,
                parsing_patterns=[
                    "loaded{:ws}{month:d}/{day:d}/{year:d}",
                    "loaded{:ws}{month:d}/{day:d}",
                    "loaded{:ws}{year:4d}{month:2d}{day:2d}",
                ],
                before_separator=Separator("_", 28),
                accept_ws_pad=True,
            )
        
        if len(_result) == 0:
            raise ValueError(f"{buffer.tell()}, Action '{action_label}':",
                             "Failed to parse load date!")
        
        if "year" not in _result:
            _result["year"] = global_date.year

        action_date: date = datetime(
            year=_result["year"],
            month=_result["month"],
            day=_result["day"],
        )
        # Register Action
        this_action = Action(
            date=action_date,
            action=f"{action_label} loaded"
        )
        ACTIONS.append(this_action)
            
    print(f"{buffer.tell()}: PARSING RUNS")
    while True:
        _checkpoint_pre_run = buffer.tell()

        if find_read_separator(buffer, Separator("_", 28)) == -1:
            print(f"{_checkpoint_pre_run}: Couldn't find separator to run section, moving on")
            break 
        
        _run_data_remains: bool = occurs_ahead(buffer, key="cycles", before_separator=None)
        _run_data_in_section: bool = occurs_ahead(buffer, key="cycles", before_separator=Separator("_", 28))

        if (not _run_data_remains):   
            buffer.seek(_checkpoint_pre_run)
            print(f"{_checkpoint_pre_run}: Couldn't find run data in rest of buffer, moving on")
            break 
    
        if (not _run_data_in_section):     
            print(f"{_checkpoint_pre_run}: Run data remains but couldn't find within section,",
                "skipping to next section and parsing it as a run section")
            continue
    
        _result = find_parse_line(buffer,
            parsing_patterns=[
                "run{:ws}{number:d}",
                "run{:ws}#{number:d}"
            ],
            before_separator=Separator("_", 28),
        )
        if not "number" in _result:
            raise ValueError("Could not find run number for section containing run data")

        run_number = _result["number"]
        print(f"{buffer.tell()}: Parsing Run #{run_number}")

        _result = find_parse_line(buffer, 
            parsing_patterns=[
                "{cycles:d}{:ws}cycles{:ws}{precursor}|{co_reactant}",
                "{cycles:d}{:ws}cycles{:ws}{precursor}{:ws}and{:ws}{co_reactant}{:s}{co_absorbate}",
            ],
            before_separator=Separator("_", 28),
        )
        if len(_result) == 0:
            raise ValueError(f"{buffer.tell()}, Run #{run_number}:",
                             "Run supplied a run number but no recognizable cycles/precursor/coreactant")

        run_cycles = _result["cycles"]
        run_precursor = _result["precursor"]
        run_co_reactant = _result["co_reactant"]
        run_co_absorbate = None
        
        if "co_absorbate" in _result:
            run_co_absorbate = _result["co_absorbate"]

        run_process = read_nonws_line(buffer, before_separator=Separator("_", 28))
        if not run_process:
            raise ValueError(f"{buffer.tell()}, Run #{run_number}:",
                             "Did not find a line for run process as expected")
        
        _run_proc_partitions = run_process.count("|")
        if _run_proc_partitions == 0:
            raise ValueError(f"{buffer.tell()}, Run #{run_number}:",
                             f"Did not find a '|'-deliniated run process, rather '{run_process}'")

        run_sequence = read_nonws_line(buffer, before_separator=Separator("_", 28))
        if not run_sequence:
            raise ValueError(f"{buffer.tell()}, Run #{run_number}:",
                             "Did not find a line for run_sequence as expected")
        
        _run_seq_partitions = run_sequence.count("|")
        if _run_seq_partitions == 0:
            raise ValueError(f"{buffer.tell()}, Run #{run_number}:",
                             f"Did not find a '|'-deliniated run process, rather '{run_sequence}")

        if (_run_proc_partitions != _run_seq_partitions):
            raise ValueError(f"{buffer.tell()}, Run #{run_number}:",
                             f"Mismatch in # partitions between run process '{run_process}'",
                             f"and run sequence '{run_sequence}'")
        # Register Run
        this_run = Run(
            date=global_date,
            run=run_number,
            precursor=run_precursor,
            co_reactant=run_co_reactant,
            co_absorbate=run_co_absorbate,
            cycles=run_cycles,
            process=run_process,
            sequence=run_sequence,
            T=global_T,
            P=global_P,
        )
        RUNS.append(this_run)

In [7]:
import pandas as pd

def update_experiment_df(df: pd.DataFrame):
    for action in ACTIONS:
        action_row = pd.DataFrame([{
            'Date': int(action.date.strftime("%Y%m%d")),
            'Run/Action': action.action,
            'Precursor': None,
            'Co-reactant': None,
            'Co-absorbate': None,
            '# Cycles': None,
            'Process': None,
            'Sequence': None,
            'Furnace T (°C)': None,
            'P (Torr)': None,
            'Crystal substrate': None,
        }])

        df = pd.concat([df, action_row], ignore_index=True)

    for run in RUNS:
        _run_co_absorbate = ""
        if run.co_absorbate:
            _run_co_absorbate = " " + _run_co_absorbate

        run_row = pd.DataFrame([{
            'Date': int(run.date.strftime("%Y%m%d")),
            'Run/Action': run.run,
            'Precursor': run.precursor,
            'Co-reactant': run.co_reactant,
            'Co-absorbate': "-",
            '# Cycles': None,
            'Process': f"{run.cycles} cycles {run.process}{_run_co_absorbate}",
            'Sequence': run.sequence,
            'Furnace T (°C)': run.T,
            'P (Torr)': run.P,
            'Crystal substrate': None,
        }])

        df = pd.concat([df, run_row], ignore_index=True)

    df = df.sort_values(by=['Date', 'Run/Action'], ascending=[True, True])
    return df

In [8]:
from datetime import datetime
import os

def parse_experiment_file(path):
    f = open(path, 'r')
    fn = os.path.basename(path)
    
    date_str = fn[0:8]
    experiment_date: date = datetime.strptime(date_str, "%Y%m%d")
    
    parse_experiment_contents(f, experiment_date)

In [9]:
# parse_experiment_file('data/20240123.txt')
# parse_experiment_file('data/20240124.txt')
# parse_experiment_file('data/20240125.txt')
# parse_experiment_file('data/20220928_notes.txt')

import glob

for filename in glob.glob(os.path.join('data', '*.txt')):
   print(filename)
   parse_experiment_file(filename)

df = pd.DataFrame()
df = update_experiment_df(df)
df.to_excel('output.xlsx', index=False)

data\20220926_notes.txt
0: PARSING GLOBALS
56: PARSING ACTIONS
224: Hit Separator(tile='_', threshold=28) when reading for a non-whitespace line
186: Reached Separator(tile='_', threshold=28) or EOF before finding key: cycles
309: Hit Separator(tile='_', threshold=28) when reading for a non-whitespace line
256: Reached Separator(tile='_', threshold=28) or EOF before finding key: cycles
1513: Hit EOF when reading for a non-whitespace line
341: Reached EOF before finding key: loaded
688: Hit Separator(tile='_', threshold=28) when reading for a non-whitespace line
341: Reached Separator(tile='_', threshold=28) or EOF before finding key: loaded
309: Couldn't find action data in rest of buffer, moving on to re-parse this section as a run section
309: PARSING RUNS
349: Parsing Run #1
728: Parsing Run #2
1106: Parsing Run #3
1513: Hit EOF when reading for a non-whitespace line
1513: Reached EOF before finding key: cycles
1513: Hit EOF when reading for a non-whitespace line
1513: Reached Separ

  df = pd.concat([df, run_row], ignore_index=True)
