In [1]:
import json
import gzip
from pathlib import Path
from typing import Union

import awkward as ak
import numpy as np

import matplotlib.pyplot as plt

import xtrack as xt

In [2]:
def smart_open(filepath, mode="rt"):
    """Open a plain or gzip-compressed file, chosen by the file suffix.

    Parameters
    ----------
    filepath : str or Path
        File to open. A final suffix of ``.gz`` selects ``gzip.open``;
        anything else goes to the builtin ``open``.
    mode : str
        Mode passed unchanged to the selected opener (text read by default).

    Returns
    -------
    The file object returned by the selected opener.
    """
    target = Path(filepath)  # accept plain strings as well as Path objects
    opener = gzip.open if target.suffix == ".gz" else open
    return opener(target, mode)

In [3]:
def _coerce_token(token):
    """Return ``token`` as int, else float, else the unchanged string.

    Trying ``float`` on every non-integer token (rather than only on tokens
    containing a ".") also converts exponent-only notation such as ``1e-05``.
    """
    for cast in (int, float):
        try:
            return cast(token)
        except ValueError:
            pass
    return token


def parse_custom_data_file(filename):
    """Parse a text data file made of ``@`` metadata lines and ``*`` tables.

    Line handling (after stripping surrounding whitespace):

    * blank lines and lines starting with ``#`` are skipped;
    * ``@ KEY TYPE VALUE`` lines fill the metadata dict. ``%d`` values are
      converted to int, ``%f`` to float, ``%s`` has surrounding double quotes
      removed; any other type specifier keeps the raw string. Lines with
      fewer than four fields carry no value and are skipped;
    * a line starting with ``*`` holds the column names and starts a new
      data block;
    * every other line is a data row of the current block. Rows seen before
      the first ``*`` line are ignored, and tokens beyond the number of
      column names are dropped.

    Parameters
    ----------
    filename : str or Path
        File to read; opened through ``smart_open``.

    Returns
    -------
    tuple
        ``(metadata, blocks)`` where ``metadata`` is a dict and ``blocks`` is
        a list with one ``ak.Array`` of records per ``*`` header, in file
        order.

    Raises
    ------
    IndexError
        If a data row has fewer fields than the current header.
    """
    metadata = {}
    blocks = []
    headers = []
    current_block = None

    with smart_open(filename) as f:
        for line_number, raw_line in enumerate(f, start=1):
            line = raw_line.strip()
            if not line or line.startswith("#"):
                continue

            if line.startswith("@"):
                fields = line.split(maxsplit=3)
                if len(fields) < 4:
                    # No value to convert; skipping avoids int(None) and
                    # None.strip() failures on incomplete metadata lines.
                    continue
                _, key, type_spec, value = fields
                if type_spec == "%d":
                    metadata[key] = int(value)
                elif type_spec == "%f":
                    metadata[key] = float(value)
                elif type_spec == "%s":
                    metadata[key] = value.strip('"')
                else:
                    # Unknown specifier: keep the raw text instead of
                    # silently dropping the entry.
                    metadata[key] = value
            elif line.startswith("*"):
                # Start of a new data block
                headers = line[1:].split()
                current_block = []
                blocks.append(current_block)
            elif current_block is not None:
                parts = line.split()
                if len(parts) < len(headers):
                    raise IndexError(
                        f"{filename}: line {line_number} has {len(parts)} fields, "
                        f"expected at least {len(headers)}"
                    )
                # zip stops at the last header, so surplus tokens are ignored.
                current_block.append(
                    {header: _coerce_token(token) for header, token in zip(headers, parts)}
                )

    # Convert to awkward arrays
    return metadata, [ak.Array(block) for block in blocks]

In [5]:
# Machine-specific data directory. Alternative location kept from the original comment:
# Path('/eos/project-c/collimation-team/acquisition_data/SPS/20250430')
directory_coll = Path("/Users/lisepauwels/sps_simulations/MD_analysis/20250430")

# The nested unpacking requires the file to contain exactly two data blocks.
metadata, (monitor_data, corrector_data) = parse_custom_data_file(
    directory_coll / "ORBIT_SPSRING_2025-04-30_18-12-49_MD5_CY58_TR0.data.gz"
)

In [7]:
# Load the line from its JSON description (path is relative to the notebook's
# working directory).
line = xt.Line.from_json('../../injection_lines/sps_with_aperture_inj_q20_beam_sagitta.json')
# Element table; later cells query it by name pattern and read its `name` and `s` columns.
tt = line.get_table()
# Twiss result for the loaded line.
tw = line.twiss()

Loading line from dict:   0%|          | 0/36381 [00:00<?, ?it/s]

Done loading line from dict.           


In [None]:
# h_pos = []
# h_rms = []
# h_name = []
# h_s = []
# for arr in monitor_data:
#     if arr['PLANE']=='H':
#         name = arr['NAME'].lower()
#         if name in line.element_dict:
#             h_pos.append(arr['POS'])
#             h_rms.append(arr['RMS'])
#             h_name.append(arr['NAME'])

#             rows = tt.rows[f'.*{name}.*']

#             if name in rows.name:
#                 h_s.append(rows.s[np.where(rows.name==name)[0][0]])
#             else:
#                 h_s.append(rows.s[np.where(rows.name==name+'..0')[0][0]])
#         else:
#             print(name)

bph.41608


In [8]:
from typing import Optional, Tuple, Union, Set, List
import re

class BPMBehavior(ak.Record):
    """Empty ``ak.Record`` subclass; it adds no attributes or methods."""

class BPMCollectionBehavior(ak.Array):
    """Array behavior adding convenience accessors to BPM orbit records.

    Registered at the end of this cell for records named ``BPMCollection``.
    The accessors read the record fields ``name``, ``plane``, ``position``
    and ``rms``.
    """

    @property
    def index(self):
        """Dict mapping each BPM name to its row index, built on first access.

        If a name occurs in more than one row, the last row wins.
        """
        if not hasattr(self, '_index'):
            self._build_index()
        return self._index

    @property
    def bpms_hor(self):
        """Names of the rows whose ``plane`` field is ``'H'``, in row order."""
        return self._field_in_plane('name', 'H')

    @property
    def bpms_ver(self):
        """Names of the rows whose ``plane`` field is ``'V'``, in row order."""
        return self._field_in_plane('name', 'V')

    def orbit_plane(self, plane: str = "H"):
        """Return the ``position`` values of all rows in the given plane.

        Raises ``ValueError`` if ``plane`` is neither ``'H'`` nor ``'V'``.
        """
        return self._field_in_plane('position', plane)

    def rms_plane(self, plane: str = "H"):
        """Return the ``rms`` values of all rows in the given plane.

        Raises ``ValueError`` if ``plane`` is neither ``'H'`` nor ``'V'``.
        """
        return self._field_in_plane('rms', plane)

    def orbit_position(self, name: str):
        """Return the ``position`` of the row indexed under ``name``.

        Raises ``KeyError`` if the name is not in the collection.
        """
        return self['position'][self.index[name]]

    def restr_orbit_plane(self, bpm_list_restr):
        """Return the ``position`` values for the given BPM names, in the given order."""
        return ak.Array([self['position'][self.index[bpm]] for bpm in bpm_list_restr])

    def restr_rms_plane(self, bpm_list_restr):
        """Return the ``rms`` values for the given BPM names, in the given order."""
        return ak.Array([self['rms'][self.index[bpm]] for bpm in bpm_list_restr])

    def _field_in_plane(self, field, plane):
        """Collect ``field`` from every row whose ``plane`` equals ``plane``.

        Rows are selected directly by their own ``plane`` value instead of
        going through the name index, so the result stays correct when a name
        appears in more than one row.
        """
        if plane not in ('H', 'V'):
            # Previously the exception was built but never raised, so an
            # invalid plane silently produced None.
            raise ValueError('No valid plane was given !')
        return ak.Array([
            value
            for value, row_plane in zip(self[field], self['plane'])
            if row_plane == plane
        ])

    def _build_index(self):
        """Create the name -> row index lookup used by the name-based accessors."""
        self._index = {bpm: i for i, bpm in enumerate(self['name'])}


ak.behavior["*", "BPMCollection"] = BPMCollectionBehavior


In [9]:
class CorrectorsCollectionBehavior(ak.Array):
    """Array behavior adding convenience accessors to corrector records.

    Registered at the end of this cell for records named
    ``CorrectorsCollection``. The accessors read the record fields ``name``,
    ``plane`` and ``kick``.
    """

    @property
    def index(self):
        """Dict mapping each corrector name to its row index, built on first access.

        If a name occurs in more than one row, the last row wins.
        """
        if not hasattr(self, '_index'):
            self._build_index()
        return self._index

    @property
    def corr_hor(self):
        """Names of the rows whose ``plane`` field is ``'H'``, in row order."""
        return self._field_in_plane('name', 'H')

    @property
    def corr_ver(self):
        """Names of the rows whose ``plane`` field is ``'V'``, in row order."""
        return self._field_in_plane('name', 'V')

    def kick_plane(self, plane: str = "H"):
        """Return the ``kick`` values of all rows in the given plane.

        Raises ``ValueError`` if ``plane`` is neither ``'H'`` nor ``'V'``.
        """
        return self._field_in_plane('kick', plane)

    def kick_position(self, name: str):
        """Return the ``kick`` of the row indexed under the corrector ``name``.

        Raises ``KeyError`` if the name is not in the collection.
        """
        return self['kick'][self.index[name]]

    def _field_in_plane(self, field, plane):
        """Collect ``field`` from every row whose ``plane`` equals ``plane``.

        Rows are selected directly by their own ``plane`` value instead of
        going through the name index, so the result stays correct when a name
        appears in more than one row.
        """
        if plane not in ('H', 'V'):
            # Previously the exception was built but never raised, so an
            # invalid plane silently produced None.
            raise ValueError('No valid plane was given !')
        return ak.Array([
            value
            for value, row_plane in zip(self[field], self['plane'])
            if row_plane == plane
        ])

    def _build_index(self):
        """Create the name -> row index lookup used by ``kick_position``."""
        self._index = {corr: i for i, corr in enumerate(self['name'])}

ak.behavior["*", "CorrectorsCollection"] = CorrectorsCollectionBehavior

In [10]:
def _infer_scalar(token):
    """Return ``token`` as int, else float, else the unchanged string.

    Trying ``float`` on every non-integer token (rather than only on tokens
    containing a ".") also converts exponent-only notation such as ``1e-05``.
    """
    for cast in (int, float):
        try:
            return cast(token)
        except ValueError:
            pass
    return token


def _parse_columns(rows, columns, section):
    """Split whitespace-delimited ``rows`` into one ``ak.Array`` per column.

    String values of the ``name`` column are lower-cased; every other value
    goes through ``_infer_scalar`` only. ``section`` is used in the assertion
    message.
    """
    table = {column: [] for column in columns}
    for row in rows:
        fields = row.split()
        assert len(fields) == len(columns), (
            f"{section} row has {len(fields)} fields, expected {len(columns)}: {row!r}"
        )
        for column, token in zip(columns, fields):
            value = _infer_scalar(token)
            if column == 'name' and isinstance(value, str):
                value = value.lower()
            table[column].append(value)
    return {column: ak.Array(values) for column, values in table.items()}


def load_bpm_file_data(path: Union[str, Path]) -> dict:
    """
    Load a .data or .data.gz orbit file into Awkward Arrays.

    The file is read in three sections: everything before the first line
    starting with ``# MONITOR`` is the header, rows after it are orbit
    readings, and rows after a line starting with ``# CORRECTOR`` are
    corrector settings. Inside the two data sections, blank lines and lines
    starting with ``*`` or ``#`` are skipped.

    Parameters
    ----------
    path : str or Path
        File to read; a ``.gz`` suffix selects gzip decompression.

    Returns
    -------
    dict
        ``"orbit"``: ``ak.Array`` named ``BPMCollection`` with fields name,
        plane, beam, position, rms, sum, hw-status, status, status_tag.
        ``"correctors"``: ``ak.Array`` named ``CorrectorsCollection`` with
        fields name, plane, beam, strength_name, kick, rt_kick.
        ``"meta"``: one-record ``ak.Array`` built from the ``@ KEY TYPE VALUE``
        header lines.

    Raises
    ------
    AssertionError
        If a data row does not have exactly one field per column.
    """
    orbit_columns = (
        "name", "plane", "beam", "position",
        "rms", "sum", 'hw-status', 'status', "status_tag",
    )
    corrector_columns = (
        "name", "plane", "beam", 'strength_name', "kick", "rt_kick",
    )

    #file opening with different compression
    path = Path(path)
    opener = gzip.open if path.suffix == ".gz" else open

    #Sections of file
    header_lines = []
    orbit_lines = []
    corrector_lines = []
    sections = {
        "header": header_lines,
        "orbit": orbit_lines,
        "corrector": corrector_lines,
    }

    mode = "header"
    with opener(path, "rt") as f:
        for line in f:
            if line.startswith("# MONITOR"):
                mode = "orbit"
                continue
            if line.startswith("# CORRECTOR"):
                mode = "corrector"
                continue
            # Column-title, comment and blank lines carry no data; letting
            # them through would trip the field-count assertion.
            if mode != "header" and (not line.strip() or line.startswith(("*", "#"))):
                continue
            sections[mode].append(line)

    #Metadata
    meta = {}
    for line in header_lines:
        if not line.startswith('@'):
            continue
        parts = line.strip().split(maxsplit=3)
        if len(parts) < 4:
            continue
        _, key, dtype, value = parts
        if dtype == "%d":
            value = int(value)
        elif dtype == "%f":
            value = float(value)
        elif dtype == "%s":
            value = value.strip('"')
        # Each entry is wrapped in a length-one array so that the whole
        # header becomes a single record.
        meta[key] = ak.Array([value])

    orbit = _parse_columns(orbit_lines, orbit_columns, "orbit")
    correctors = _parse_columns(corrector_lines, corrector_columns, "corrector")

    # The three sections are separate arrays, so they are returned together
    # in a plain dict rather than as one array.
    data = {
        "orbit": ak.Array(orbit, with_name='BPMCollection'),
        "correctors": ak.Array(correctors, with_name='CorrectorsCollection'),
        "meta": ak.Array(meta),
    }

    return data


In [11]:
# Acquisition used for the checks in the following cells.
data_test = load_bpm_file_data(
    directory_coll / "ORBIT_SPSRING_2025-04-30_12-53-55_MD5_CY2_TR0.data.gz"
)

In [12]:
# Longitudinal position of every BPM found in the line table, split by plane.
# BPMs with no matching table row are printed and left out of both lists.
s_arr_H = []
s_arr_V = []
for i, bpm in enumerate(data_test['orbit']['name']):
    # Run the name-pattern query once per BPM; it was previously repeated
    # for every use of its result.
    rows = tt.rows[f'.*{bpm}.*']
    if len(rows.name) == 0:
        print(bpm)
        continue

    if bpm in rows.name:
        idx = np.where(rows.name == bpm)[0][0]
        s_bpm = rows.s[idx]
    else:
        # No exact match: use the '<name>..0' row and add the length of its
        # parent element. NOTE(review): presumably this moves the reference
        # from the start of the first slice to the end of the full element;
        # confirm. Raises IndexError if the '..0' row does not exist either.
        idx = np.where(rows.name == bpm+'..0')[0][0]
        s_bpm = rows.s[idx] + line[f'{bpm}..0']._parent.length

    if data_test['orbit']['plane'][i]=='H':
        s_arr_H.append(s_bpm)
    else:
        s_arr_V.append(s_bpm)

bph.41608


In [None]:
# Show the dict returned by load_bpm_file_data (keys: 'orbit', 'correctors', 'meta').
data_test

{'orbit': <BPMCollectionBehavior [{name: 'bph.10208', ...}, ..., {...}] type='227 * B...'>,
 'correctors': <CorrectorsCollectionBehavior [{name: 'mdh.10207', ...}, ..., {...}] type='...'>,
 'meta': <Array [{YASP: 'V3.0', TYPE: 'POSITION', ...}] type='1 * {YASP: string, TYP...'>}