# NPZ Converter

This notebook converts legacy data to `.duckdb` databases to make easier to manipulate the data. The dataset is expected to have the following structure
- /dataset_name
    - file1.npz
    -  file2.npz
    - /references
        - file1.pic.gz
        - file2.pic.gz

The data in the npz files is mapped to the `data` table and the `refrences` files are mapped to the `model_refrences` table.

The `.npz` files must have the following structure:
- `etBins`: Array of floats with the et binning limits
- `etaBins`: Array of floats with the eta binning limits
- `etBinIdx`: Index of the bin used for this data
- `etaBinIdx`: Index of the bin used for this data
- `ordered_features`: Ignored
- `data_float`: 2D array of floats with the data
- `data_bool`: 2D array of bools with the data
- `data_int`: 2D array of ints with the data
- `data_object`: 2D array of objects with the data
- `features_float`: Array with the column names in `data_float`
- `features_bool`: Array with the column names in `data_bool`
- `features_int`: Array with the column names in `data_int`
- `features_object`: Array with the column names in `data_object`
- `target`: Array with the target
- `protocol`: Ignored
- `allow_pickle`: Ignored

The `.pic.gz` files must have the following structure:
- `class`: Ignored
- `__module`: Ignored
- `etBinIdx`: Array with 1 element representing the et bin used for this data
- `etBins`: Array of floats with the et binning limits
- `etaBinIdx`: Array with 1 element representing the eta bin used for this data
- `etaBins`: Array of floats with the eta binning limits
- `__version`: Ignored
- `bkgRef`: Dict
- `sgnRef`: Dict

In [1]:
from pathlib import Path
import pickle
import numpy as np
import polars as pl
import duckdb
import gzip
from io import BytesIO
from collections import defaultdict
from datetime import datetime

In [2]:
data_dir = Path.home() / 'data' / 'cern'
dataset_name = 'data17_13TeV.AllPeriods.sgn.probes_lhvloose_EGAM1.bkg.vprobes_vlhvloose_EGAM7.GRL_v97.25bins'
dataset_dir = data_dir / dataset_name
output_file = data_dir / f'{dataset_name}.duckdb'
references_dir = dataset_dir / 'references'
feature_types = ['int', 'float', 'object', 'bool']

In [3]:
def check_table_exists(con, table_name):
    """
    Checks if a table exists in the DuckDB database.

    Args:
        con: A DuckDB connection object.
        table_name: The name of the table to check.

    Returns:
        True if the table exists, False otherwise.
    """
    query = f"SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = '{table_name}')"
    result = con.execute(query).fetchone()[0]
    return result

In [4]:
dataframes = []
with duckdb.connect(str(output_file)) as con:
    for i, filepath in enumerate(dataset_dir.glob('*.npz')):
        print(f'{datetime.now().isoformat()} - Processing file {i + 1}: {filepath}')
        npz_file = np.load(filepath)
        feature_dfs = []
        for feature_type in feature_types:
            array_key = f'data_{feature_type}'
            schema_key = f'features_{feature_type}'
            feature_dfs.append(
                pl.from_numpy(
                    npz_file[array_key],
                    schema=npz_file[schema_key].tolist(),
                orient='row'
                )
            )
        aux_df = pl.concat(feature_dfs, how='horizontal')
        if i ==0 and not check_table_exists(con, 'data'):
            con.execute("CREATE TABLE IF NOT EXISTS data AS SELECT * FROM aux_df")
        else:
            con.execute("INSERT INTO data SELECT * FROM aux_df")
        del aux_df

2025-08-17T23:21:14.581434 - Processing file 1: /root/data/cern/data17_13TeV.AllPeriods.sgn.probes_lhvloose_EGAM1.bkg.vprobes_vlhvloose_EGAM7.GRL_v97.25bins/data17_13TeV.AllPeriods.sgn.probes_lhvloose_EGAM1.bkg.vprobes_vlhvloose_EGAM7.GRL_v97.25bins_et2_eta4.npz
2025-08-17T23:21:17.431725 - Processing file 2: /root/data/cern/data17_13TeV.AllPeriods.sgn.probes_lhvloose_EGAM1.bkg.vprobes_vlhvloose_EGAM7.GRL_v97.25bins/data17_13TeV.AllPeriods.sgn.probes_lhvloose_EGAM1.bkg.vprobes_vlhvloose_EGAM7.GRL_v97.25bins_et1_eta3.npz
2025-08-17T23:21:36.787462 - Processing file 3: /root/data/cern/data17_13TeV.AllPeriods.sgn.probes_lhvloose_EGAM1.bkg.vprobes_vlhvloose_EGAM7.GRL_v97.25bins/data17_13TeV.AllPeriods.sgn.probes_lhvloose_EGAM1.bkg.vprobes_vlhvloose_EGAM7.GRL_v97.25bins_et1_eta4.npz
2025-08-17T23:21:44.372246 - Processing file 4: /root/data/cern/data17_13TeV.AllPeriods.sgn.probes_lhvloose_EGAM1.bkg.vprobes_vlhvloose_EGAM7.GRL_v97.25bins/data17_13TeV.AllPeriods.sgn.probes_lhvloose_EGAM1.bkg.

: 

In [None]:
reference_schema = pl.Schema({
    'et_bin_lower': pl.Float32(),
    'et_bin_upper': pl.Float32(),
    'eta_bin_lower': pl.Float32(),
    'eta_bin_upper': pl.Float32(),
    'pid': pl.String(),
    'label': pl.UInt8(),
    'total': pl.Int64(),
    'passed': pl.Int64(),
    'reference': pl.String(),
})

In [None]:
table_data = defaultdict(list)
for i, filepath in enumerate(references_dir.glob('*.pic.gz')):
    # if i > 0:
    #     break
    print(f'Processing reference file {i + 1}: {filepath}')
    with gzip.open(filepath, 'rb') as f:
        data = pickle.load(BytesIO(f.read()))
    et_bin_idx = data['etBinIdx']
    eta_bin_idx = data['etaBinIdx']
    for label, data_type in enumerate(['bkgRef', 'sgnRef']):
        for pid, pid_data in data[data_type].items():
            table_data['et_bin_lower'].append(float(data['etBins'][et_bin_idx]))
            table_data['et_bin_upper'].append(float(data['etBins'][et_bin_idx + 1]))
            table_data['eta_bin_lower'].append(float(data['etaBins'][eta_bin_idx]))
            table_data['eta_bin_upper'].append(float(data['etaBins'][eta_bin_idx + 1]))
            table_data['pid'].append(str(pid))
            table_data['label'].append(int(label))
            table_data['total'].append(int(pid_data['total']))
            table_data['passed'].append(int(pid_data['passed']))
            table_data['reference'].append(str(pid_data['reference']))
reference_df = pl.from_dict(
    dict(table_data),
    schema=reference_schema
)
reference_df

In [None]:
with duckdb.connect(str(output_file)) as con:
    if not check_table_exists(con, 'model_references'):
        con.execute("CREATE TABLE IF NOT EXISTS model_references AS SELECT * FROM reference_df")
    else:
        con.execute("INSERT INTO model_references SELECT * FROM reference_df")