In [1]:
import uproot
import numpy as np
import pandas as pd
# Read the "alignment/sensors" entry of the ROOT file; library="pd" asks uproot
# for pandas output. The result is later passed to convert_pid_to_location,
# which reads the columns id, v{x,y,z}, row{x,y,z} and col{x,y,z} from it.
# NOTE(review): the sensor table is taken from the background-run file and is
# used for both the background and the signal sample further down.
with uproot.open("mu3e_root_data/run42_bg.root") as file:
    sensor_positions = file["alignment/sensors"].arrays(library="pd")


In [2]:

def load_ragged_csv_to_ndarray(file_name: str, delimiter: str = ",", fill_value = -1, max_cols = 256, dtype = int) -> np.ndarray:
    """
    Load a CSV file whose lines have varying numbers of fields into a padded 2D array.

    Each line becomes one row. Fields are stripped of surrounding whitespace and
    empty fields (including whitespace-only ones) are ignored. Rows shorter than
    `max_cols` are right-padded with `fill_value`.

    Args:
        file_name: Path of the text file to read.
        delimiter: Field separator used within a line.
        fill_value: Value written to the unused trailing columns of each row.
        max_cols: Number of columns of the returned array. Lines with more than
            `max_cols` fields are skipped entirely (not truncated).
        dtype: NumPy dtype the fields are converted to and of the returned array.

    Returns:
        Array of shape (number of kept lines, max_cols). A blank line yields a
        row consisting only of `fill_value`.

    Raises:
        ValueError: If a non-empty field cannot be converted to `dtype`.

    NOTE(review): this notebook defines this function in two cells; whichever
    cell ran last is the definition in effect.
    """
    rows = []
    with open(file_name, 'r') as file:
        for line in file:
            # Strip every field first and only then drop the empty ones, so that
            # whitespace-only fields ("1, ,2") are ignored instead of reaching
            # the dtype conversion as '' and raising.
            fields = [value.strip() for value in line.strip().split(delimiter)]
            row = np.array([value for value in fields if value != ''], dtype=dtype)
            # Lines that do not fit into max_cols columns are dropped.
            if len(row) > max_cols:
                continue
            rows.append(row)
    # Pre-fill with the padding value, then copy each row into its leading columns.
    ragged_array = np.full((len(rows), max_cols), fill_value, dtype=dtype)
    for i, row in enumerate(rows):
        ragged_array[i, :len(row)] = row
    return ragged_array


import numpy as np

def reorder_nla(nla: np.ndarray, padding_value: int = -1) -> np.ndarray:
    """
    Reorders the NLA array to ensure that non-padded entries are at the beginning.
    Assumes padding is identifiable via `nla[:, :, 0] == padding_value`.

    Args:
        nla: 3D array indexed as (row, entry, component).
        padding_value: Marker value. An entry counts as padding when its
            component 0 equals this value; the other components are not checked.

    Returns:
        A new array with the same shape and dtype as `nla`. In every row the
        non-padding entries appear first, in their original relative order, and
        all remaining entries have every component set to `padding_value`.
        An input with zero rows yields an equally shaped empty result.
    """
    # Identify valid entries
    valid_mask = nla[:, :, 0] != padding_value

    # Number of valid entries per row
    counts = valid_mask.sum(axis=1)

    # Coordinates of the valid entries, in row-major order: row_ids is
    # non-decreasing and source_pos ascends within each row.
    row_ids, source_pos = np.nonzero(valid_mask)

    # Rank of each row's first valid entry within that row-major listing
    start_idx = np.cumsum(counts) - counts

    # Target slot of every valid entry inside its own row: its global rank minus
    # the rank at which its row starts. Fully vectorised, and well-defined for
    # zero rows (everything is simply empty).
    insert_pos = np.arange(row_ids.size) - np.repeat(start_idx, counts)

    # Allocate output and scatter the valid entries to the front of each row
    reordered_nla = np.full_like(nla, padding_value)
    reordered_nla[row_ids, insert_pos] = nla[row_ids, source_pos]

    return reordered_nla



In [None]:
def convert_pid_to_location(pixel_id: np.ndarray, sensor_positions : pd.DataFrame, padding_value: float = -1, sensor_fault_rate = 0) -> np.ndarray:
    """
    Convert packed pixel IDs to 3D positions using a per-sensor geometry table.

    A pixel ID is decoded as `chip_id * 2**16 + column * 2**8 + row`. For a hit
    on sensor `s` the position is

        v_s + row_s * (row + 0.5) + col_s * (column + 0.5)

    where `v_s`, `row_s` and `col_s` are the (vx, vy, vz), (rowx, rowy, rowz)
    and (colx, coly, colz) entries of that sensor.

    Args:
        pixel_id: Integer array of packed pixel IDs, any shape.
        sensor_positions: Table with the columns id, vx, vy, vz, rowx, rowy,
            rowz, colx, coly and colz; one line per sensor.
        padding_value: Value of all three coordinates for entries that are not
            matched to a used sensor (padding IDs, unknown chips, skipped sensors).
        sensor_fault_rate: Probability with which each sensor is skipped. One
            number is drawn from NumPy's global random state per table line, so
            results are only reproducible if that state is seeded by the caller.

    Returns:
        float64 array of shape `pixel_id.shape + (3,)`.

    Raises:
        ValueError: If `sensor_positions` is empty or lacks a required column.
    """
    if sensor_positions.empty:
        raise ValueError("sensor_positions DataFrame is empty. Please load the sensor positions data first.")
    # Check column presence explicitly; indexing a missing column would raise
    # KeyError before any of these messages could be produced.
    required_columns = (
        (("id",), "sensor_positions DataFrame does not contain the id column."),
        (("vx", "vy", "vz"), "sensor_positions DataFrame does not contain position columns (vx, vy, vz)."),
        (("rowx", "rowy", "rowz"), "sensor_positions DataFrame does not contain row columns (rowx, rowy, rowz)."),
        (("colx", "coly", "colz"), "sensor_positions DataFrame does not contain column columns (colx, coly, colz)."),
    )
    for column_names, message in required_columns:
        if any(name not in sensor_positions.columns for name in column_names):
            raise ValueError(message)

    pixel_id = np.asarray(pixel_id)
    hit_chip_id = pixel_id // 2**16
    hit_col_id = (pixel_id // 2**8) % 2**8
    hit_row_id = pixel_id % 2**8
    location = np.full((*pixel_id.shape, 3), padding_value, dtype=np.float64)

    # Pull the geometry out of the DataFrame once instead of per sensor and axis.
    sensor_ids = sensor_positions["id"].to_numpy()
    origin = sensor_positions[["vx", "vy", "vz"]].to_numpy(dtype=np.float64)
    row_step = sensor_positions[["rowx", "rowy", "rowz"]].to_numpy(dtype=np.float64)
    col_step = sensor_positions[["colx", "coly", "colz"]].to_numpy(dtype=np.float64)

    for sensor_index, sensor_id in enumerate(sensor_ids):
        # Drawn for every table line (before any other check) so the number of
        # random draws does not depend on the data.
        if np.random.rand() < sensor_fault_rate:
            continue
        # Only sensors whose ID is below 2**12 are used.
        if sensor_id // 2**12 != 0:
            continue
        mask = hit_chip_id == sensor_id
        if not np.any(mask):
            continue
        # The half-pixel offset belongs to the pixel index (pixel centre), not
        # to the step vector: adding 0.5 to every step-vector component would
        # shift each hit by 0.5 per pixel index along x, y and z alike.
        # NOTE(review): confirm against the geometry convention of the sensor table.
        row_center = hit_row_id[mask][:, np.newaxis] + 0.5
        col_center = hit_col_id[mask][:, np.newaxis] + 0.5
        location[mask] = origin[sensor_index] + row_center * row_step[sensor_index] + col_center * col_step[sensor_index]
    return location

In [4]:
# Locations of the CSV inputs; DATA_DIR is also the output directory of the
# np.save calls further down.
DATA_DIR = "mu3e_trigger_data"
SIGNAL_DATA_FILE = f"{DATA_DIR}/run42_sig.csv"
BACKGROUND_DATA_FILE = f"{DATA_DIR}/run42_bg.csv"

# One array row per CSV line, integer dtype, right-padded with -1 to 256
# columns. Lines with more than 256 fields are skipped by the loader.
bg_data = load_ragged_csv_to_ndarray(BACKGROUND_DATA_FILE, delimiter=",", fill_value=-1, max_cols=256, dtype=int)
sig_data = load_ragged_csv_to_ndarray(SIGNAL_DATA_FILE, delimiter=",", fill_value=-1, max_cols=256, dtype=int)

In [5]:
# Per row of bg_data, count the entries whose value floor-divided by 2**16 and
# then by 2**12 is 0 (convert_pixel_id_to_nla in this notebook calls these two
# quotients chip ID and station). The -1 padding floor-divides to -1 and is
# therefore not counted.
hit_number = ((bg_data // 2**16 // 2**12) == 0).sum(axis=-1)

In [6]:
# Translate the padded pixel-ID arrays into position arrays with a trailing
# axis of length 3. Entries that are not matched to a used sensor (including
# the -1 padding) come back with all three coordinates equal to -1.
bg_data_positions = convert_pid_to_location(bg_data, sensor_positions, padding_value=-1)
sig_data_positions = convert_pid_to_location(sig_data, sensor_positions, padding_value=-1)


In [10]:
# Move the non-padding entries of every row to the front. An entry is treated
# as padding when its first coordinate equals -1, so a real position whose
# first coordinate is exactly -1 would be dropped as well.
# NOTE(review): reorder_nla is defined in two cells of this notebook; which
# definition runs here depends on the cell execution order. The names are
# rebound in place, so the un-reordered arrays are no longer available afterwards.
bg_data_positions = reorder_nla(bg_data_positions, padding_value=-1)
sig_data_positions = reorder_nla(sig_data_positions, padding_value=-1)

In [11]:
# Write the position arrays as .npy files into DATA_DIR.
np.save(f"{DATA_DIR}/run42_bg_positions.npy", bg_data_positions)
np.save(f"{DATA_DIR}/run42_sig_positions.npy", sig_data_positions)

In [9]:
def convert_pixel_id_to_nla(pixel_id: np.ndarray, padding_value: int = -1) -> np.ndarray:
    """
    Unpack pixel IDs into (station, layer, phi, z) integer quadruples.

    The chip ID is `pixel_id // 2**16`. From it:
      * station = chip ID // 2**12
      * layer   = two-bit field above bit 10, plus 1 (range 1..4)
      * phi     = five-bit field above bit 5, plus 1
      * z       = lowest five bits, reduced by 7 on layer 3 and by 6 on layer 4

    Args:
        pixel_id: Integer array of packed pixel IDs, any shape.
        padding_value: Marker for unused input entries; also the fill value of
            the output.

    Returns:
        int32 array of shape `pixel_id.shape + (4,)`. Entries whose input equals
        `padding_value` or whose station is not 0 hold `padding_value` in all
        four components.
    """
    sensor = pixel_id // 2**16

    # Bit fields of the chip ID
    station_field = sensor // 2**12
    layer_field = (sensor // 2**10) % 4 + 1
    phi_field = (sensor // 2**5) % 2**5 + 1
    z_raw = sensor % 2**5

    # Layer-dependent shift of the z index
    z_shift = np.where(layer_field == 3, 7, np.where(layer_field == 4, 6, 0))
    z_field = z_raw - z_shift

    # Keep real (non-padding) entries of station 0 only
    keep = (pixel_id != padding_value) & (station_field == 0)

    unpacked = np.full((*pixel_id.shape, 4), padding_value, dtype=np.int32)
    for component, values in enumerate((station_field, layer_field, phi_field, z_field)):
        unpacked[keep, component] = values[keep]

    return unpacked

def reorder_nla(nla: np.ndarray, padding_value: int = -1) -> np.ndarray:
    """
    Pack the non-padding entries of every row to the front of that row.

    An entry of the 3D input (row, entry, component) is padding when its
    component 0 equals `padding_value`. The relative order of the kept entries
    is preserved; all slots behind them are filled with `padding_value` in
    every component. The input is left untouched.
    """
    keep = nla[:, :, 0] != padding_value
    packed = np.full_like(nla, padding_value, dtype=nla.dtype)
    # Iterating over `packed` yields views, so writing to `target` fills `packed`.
    for source, target, row_keep in zip(nla, packed, keep):
        kept_entries = source[row_keep]
        target[:kept_entries.shape[0]] = kept_entries
    return packed


def convert_nla_to_location(nla: np.ndarray, padding_value: float = -1) -> np.ndarray:
    """
    Map (station, layer, phi, z) index quadruples to x/y/z coordinates.

    Every layer is described by a radius, a length, a number of z segments and
    a number of phi segments. For an entry on such a layer:

        angle = phi / n_phi * 2 * pi
        x = radius * cos(angle)
        y = radius * sin(angle)
        z = (z_index / n_z - 0.5) * length

    Args:
        nla: 3D array (row, entry, component); components 1, 2 and 3 are read
            as layer, phi index and z index.
        padding_value: Coordinate value for entries whose layer is not 1..4.

    Returns:
        float64 array of shape `nla.shape[:-1] + (3,)`.
    """
    # (layer, radius, length, number of z segments, number of phi segments)
    layer_table = (
        (1, 23.3, 124.7, 6, 8),
        (2, 29.8, 124.7, 6, 10),
        (3, 73.9, 351.9, 17, 24),
        (4, 86.3, 372.6, 18, 28),
    )

    layer_index = nla[:, :, 1]
    phi_index = nla[:, :, 2]
    z_index = nla[:, :, 3]

    coords = np.full((*nla.shape[:-1], 3), padding_value, dtype=np.float64)
    for layer_number, radius, length, n_z, n_phi in layer_table:
        on_layer = layer_index == layer_number
        angle = phi_index[on_layer] / n_phi * 2 * np.pi
        coords[on_layer, 0] = radius * np.cos(angle)
        coords[on_layer, 1] = radius * np.sin(angle)
        coords[on_layer, 2] = (z_index[on_layer] / n_z - 0.5) * length

    return coords


def load_ragged_csv_to_ndarray(file_name: str, delimiter: str = ",", fill_value = -1, max_cols = 256, dtype = int) -> np.ndarray:
    """
    Read a delimited text file with a varying number of fields per line into a
    rectangular array padded with `fill_value`.

    Args:
        file_name: Path of the text file to read.
        delimiter: Field separator used within a line.
        fill_value: Value written to the unused trailing columns of each row.
        max_cols: Column count of the result. A line with more than `max_cols`
            fields is skipped as a whole rather than truncated.
        dtype: NumPy dtype used for the field conversion and the result.

    Returns:
        Array of shape (number of kept lines, max_cols). Fields are stripped of
        surrounding whitespace; empty and whitespace-only fields are ignored, so
        a blank line produces a row holding only `fill_value`.

    Raises:
        ValueError: If a non-empty field cannot be converted to `dtype`.

    NOTE(review): this notebook defines this function in two cells; whichever
    cell ran last is the definition in effect.
    """
    rows = []
    with open(file_name, 'r') as file:
        for line in file:
            # Strip before filtering: a whitespace-only field ("1, ,2") must be
            # dropped here, otherwise it reaches the conversion below as ''.
            stripped_values = (value.strip() for value in line.strip().split(delimiter))
            row = np.array([value for value in stripped_values if value != ''], dtype=dtype)
            # Skip lines that do not fit into max_cols columns.
            if len(row) > max_cols:
                continue
            rows.append(row)
    # Start from an all-padding array and copy each row into its leading columns.
    ragged_array = np.full((len(rows), max_cols), fill_value, dtype=dtype)
    for i, row in enumerate(rows):
        ragged_array[i, :len(row)] = row
    return ragged_array

