In [None]:
#intalar nuevos paquetes
pip install awkward vector


# Para objetos tipo Lorentz vector, usar:
"""
import vector
v = vector.obj(px=1, py=2, pz=3, E=4)
print(v.pt, v.eta, v.phi, v.mass)
"""

In [None]:
# Optional installs, left disabled; only `vector` is installed by this cell.
#%pip install pandas
#%pip install awkward
#%pip install uproot-methods
%pip install vector

Collecting vector
  Downloading vector-1.6.3-py3-none-any.whl.metadata (16 kB)
Downloading vector-1.6.3-py3-none-any.whl (179 kB)
Installing collected packages: vector
Successfully installed vector-1.6.3
Note: you may need to restart the kernel to use updated packages.


In [1]:
import os
import pandas as pd
import numpy as np
import awkward as ak
import vector
import pyarrow, pyarrow.parquet as pq

vector.register_awkward()  # enables the .pt/.eta/.phi/.mass/.deltaR methods on Awkward arrays



In [2]:
import logging

# Configure the root logger: DEBUG threshold, messages prefixed with a
# timestamp and the level name.
logging.basicConfig(
    format='[%(asctime)s] %(levelname)s: %(message)s',
    level=logging.DEBUG,
)


In [3]:
def _transform(dataframe, start=0, stop=-1, jet_size=0.8):
    """Build per-jet and per-particle features from rows ``start:stop`` of ``dataframe``.

    The frame must provide the columns ``PX_i``, ``PY_i``, ``PZ_i`` and ``E_i``
    for ``i = 0 .. 199`` as well as ``is_signal_new`` and ``ttv``. Only particle
    slots with ``E_i > 0`` are kept, so every row yields a variable-length list
    of particles.

    Parameters
    ----------
    dataframe : pandas.DataFrame
        Input table, one jet per row.
    start, stop : int
        Positional bounds passed to ``dataframe.iloc[start:stop]``. Note that
        the default ``stop=-1`` therefore leaves out the last row; pass
        ``len(dataframe)`` to include it.
    jet_size : float
        Not used by the current implementation; kept so existing callers keep
        working.

    Returns
    -------
    collections.OrderedDict
        Keys, in insertion order:

        * ``label`` -- ``(n, 2)`` array stacking ``is_signal_new`` and
          ``1 - is_signal_new``.
        * ``train_val_test`` -- the ``ttv`` column.
        * ``jet_pt``, ``jet_eta``, ``jet_phi``, ``jet_mass``, ``n_parts`` --
          NumPy arrays, one value per jet (the jet is the sum of its particles).
        * ``part_px``, ``part_py``, ``part_pz``, ``part_energy`` and the derived
          ``part_*`` quantities -- Awkward arrays, one list per jet.
    """
    from collections import OrderedDict
    v = OrderedDict()

    df = dataframe.iloc[start:stop]

    def _col_list(prefix, max_particles=200):
        # Column names are '<prefix>_<i>' for i = 0 .. max_particles - 1.
        return ['%s_%d' % (prefix, i) for i in range(max_particles)]

    _px = df[_col_list('PX')].values
    _py = df[_col_list('PY')].values
    _pz = df[_col_list('PZ')].values
    _e = df[_col_list('E')].values

    # Keep only the slots with positive energy and count them per row.
    mask = _e > 0
    n_particles = np.sum(mask, axis=1)

    # Boolean indexing flattens row by row, so unflattening with the per-row
    # counts rebuilds one variable-length list per jet (the Awkward 2
    # counterpart of the old JaggedArray.fromcounts).
    px = ak.unflatten(_px[mask], n_particles)
    py = ak.unflatten(_py[mask], n_particles)
    pz = ak.unflatten(_pz[mask], n_particles)
    energy = ak.unflatten(_e[mask], n_particles)

    # Four-vectors via vector + awkward; this provides .pt/.eta/.phi/.mass
    # and helpers such as .deltaR.
    p4 = vector.zip({"px": px, "py": py, "pz": pz, "E": energy})
    pt = p4.pt
    jet_p4 = ak.sum(p4, axis=1)  # per-jet sum of the particle four-vectors

    # outputs
    _label = df['is_signal_new'].values
    v['label'] = np.stack((_label, 1 - _label), axis=-1)
    v['train_val_test'] = df['ttv'].values

    v['jet_pt'] = ak.to_numpy(jet_p4.pt)
    v['jet_eta'] = ak.to_numpy(jet_p4.eta)
    v['jet_phi'] = ak.to_numpy(jet_p4.phi)
    v['jet_mass'] = ak.to_numpy(jet_p4.mass)
    v['n_parts'] = ak.to_numpy(ak.num(px, axis=1))

    v['part_px'] = px
    v['part_py'] = py
    v['part_pz'] = pz
    v['part_energy'] = energy

    v['part_pt_log'] = np.log(pt)

    v['part_ptrel'] = pt / v['jet_pt']
    v['part_logptrel'] = np.log(v['part_ptrel'])

    v['part_e_log'] = np.log(energy)
    v['part_erel'] = energy / jet_p4.energy
    v['part_logerel'] = np.log(v['part_erel'])

    v['part_raw_etarel'] = (p4.eta - jet_p4.eta)

    # Multiply the eta offsets by the sign of the jet eta; a jet eta of
    # exactly 0 is given sign +1 so the offsets are not zeroed out.
    _jet_etasign = np.sign(v['jet_eta'])
    _jet_etasign[_jet_etasign == 0] = 1
    v['part_etarel'] = v['part_raw_etarel'] * _jet_etasign

    # Azimuthal offset and angular distance of each particle with respect to
    # its jet, computed by vector.
    v['part_phirel'] = p4.deltaphi(jet_p4)
    v['part_deltaR'] = p4.deltaR(jet_p4)

    def _make_image(var_img, rec, n_pixels = 64, img_ranges = [[-0.8, 0.8], [-0.8, 0.8]]):
        """Histogram ``rec[var_img]`` on the (part_etarel, part_phirel) plane.

        Returns an array of shape ``(len(rec[var_img]), n_pixels, n_pixels)``.
        Currently unused: the call that would store it under 'img' is
        commented out below.
        """
        wgt = rec[var_img]
        x = rec['part_etarel']
        y = rec['part_phirel']
        img = np.zeros(shape=(len(wgt), n_pixels, n_pixels))
        for i in range(len(wgt)):
            hist2d, xedges, yedges = np.histogram2d(x[i], y[i], bins=[n_pixels, n_pixels], range=img_ranges, weights=wgt[i])
            img[i] = hist2d
        return img

    # v['img'] = _make_image('part_ptrel', v)

    return v


In [4]:
def convert(source, destdir, basename, step=None, limit=None, columns=None, chunksize=100_000):
    """Stream the ``'table'`` key of an HDF5 file and write one Parquet file per chunk.

    Parameters
    ----------
    source : str
        Path of the HDF5 input file.
    destdir : str
        Output directory; created if it does not exist.
    basename : str
        Output files are named ``<basename>_<idx>.parquet`` where ``idx`` is
        the chunk index.
    step : int, optional
        Legacy name for the number of rows per output file. When given it
        overrides ``chunksize`` (it used to be silently ignored).
    limit : int, optional
        Maximum total number of rows to convert; the last chunk is trimmed so
        the limit is not exceeded.
    columns : list, optional
        Columns to read, to reduce I/O.
    chunksize : int
        Number of rows read (and written) per chunk; tune to the available RAM.

    Notes
    -----
    Chunks whose output file already exists are skipped (their rows still count
    towards ``limit``). Each file is first written under a temporary ``.tmp``
    name and then renamed, so an interrupted run cannot leave behind a partial
    ``.parquet`` file that a later run would skip as if it were complete.
    """
    os.makedirs(destdir, exist_ok=True)

    if step is not None:
        chunksize = step

    total = 0
    # The context manager closes the HDF5 file even when the loop stops early
    # because `limit` was reached or an exception is raised.
    with pd.HDFStore(source, mode='r') as store:
        chunks = store.select('table', columns=columns, chunksize=chunksize)
        for idx, df_chunk in enumerate(chunks):
            if limit is not None:
                if total >= limit:
                    break
                # Trim the last chunk if it would go past the limit.
                df_chunk = df_chunk.iloc[:limit - total]
            total += len(df_chunk)

            output = os.path.join(destdir, f"{basename}_{idx}.parquet")
            if os.path.exists(output):
                logging.warning("... file already exists: %s (skip)", output)
                continue

            logging.info("Chunk %d -> rows=%d -> %s", idx, len(df_chunk), output)
            v = _transform(df_chunk, start=0, stop=len(df_chunk))

            # Pack the dict into an Awkward record array and write it as
            # Parquet (requires pyarrow).
            rec = ak.zip(v, depth_limit=1)
            tmp_output = output + ".tmp"
            ak.to_parquet(rec, tmp_output)
            os.replace(tmp_output, output)


In [4]:
def convert(source, destdir, basename, step=None, limit=None):
    """Load a whole HDF5 table into memory and write it as Parquet in slices of ``step`` rows.

    NOTE(review): this definition reuses the name ``convert`` of the chunked,
    streaming version defined earlier in this notebook. Whichever cell runs
    last wins, so on a Restart & Run All this in-memory version replaces the
    streaming one. Consider renaming or removing one of the two.

    Parameters
    ----------
    source : str
        Path of the HDF5 input file; the ``'table'`` key is read in full.
    destdir : str
        Output directory; created if it does not exist.
    basename : str
        Output files are named ``<basename>_<idx>.parquet``.
    step : int, optional
        Number of rows per output file. ``None`` writes all rows to one file.
    limit : int, optional
        Only the first ``limit`` rows are converted.

    Raises
    ------
    ValueError
        If ``step`` is given but not positive (this used to loop forever).

    Notes
    -----
    Slices whose output file already exists are skipped. Each file is written
    under a temporary ``.tmp`` name and renamed afterwards, so an interrupted
    run cannot leave a partial ``.parquet`` file behind.
    """
    df = pd.read_hdf(source, key='table')
    logging.info('Total events: %s', df.shape[0])
    if limit is not None:
        df = df.iloc[0:limit]
        logging.info('Restricting to the first %s events:', df.shape[0])

    n_rows = df.shape[0]
    if step is None:
        step = n_rows
    elif step <= 0:
        raise ValueError('step must be a positive integer, got %r' % (step,))
    if n_rows == 0:
        # Nothing to convert; as before, no directory or file is created.
        return

    os.makedirs(destdir, exist_ok=True)
    for idx, start in enumerate(range(0, n_rows, step)):
        output = os.path.join(destdir, '%s_%d.parquet' % (basename, idx))
        logging.info(output)
        if os.path.exists(output):
            logging.warning('... file already exist: continue ...')
            continue
        v = _transform(df, start=start, stop=start + step)
        rec = ak.zip(v, depth_limit=1)
        tmp_output = output + '.tmp'
        ak.to_parquet(rec, tmp_output)
        os.replace(tmp_output, output)


In [5]:
# Directory holding the input HDF5 files (train.h5, val.h5, test.h5).
srcDir = 'original'
# Directory that receives the converted Parquet files.
destDir = 'converted'

In [6]:
# Convert the training file.
convert(os.path.join(srcDir, 'train.h5'), destdir=destDir, basename='train_file')

[2025-08-08 20:31:56,572] INFO: Chunk 1 -> rows=100000 -> converted/train_file_1.parquet
[2025-08-08 20:31:58,881] INFO: Chunk 2 -> rows=100000 -> converted/train_file_2.parquet
[2025-08-08 20:32:01,129] INFO: Chunk 3 -> rows=100000 -> converted/train_file_3.parquet
[2025-08-08 20:32:03,381] INFO: Chunk 4 -> rows=100000 -> converted/train_file_4.parquet
[2025-08-08 20:32:05,582] INFO: Chunk 5 -> rows=100000 -> converted/train_file_5.parquet
[2025-08-08 20:32:07,810] INFO: Chunk 6 -> rows=100000 -> converted/train_file_6.parquet
[2025-08-08 20:32:10,050] INFO: Chunk 7 -> rows=100000 -> converted/train_file_7.parquet
[2025-08-08 20:32:12,235] INFO: Chunk 8 -> rows=100000 -> converted/train_file_8.parquet
[2025-08-08 20:32:14,415] INFO: Chunk 9 -> rows=100000 -> converted/train_file_9.parquet
[2025-08-08 20:32:16,610] INFO: Chunk 10 -> rows=100000 -> converted/train_file_10.parquet
[2025-08-08 20:32:18,817] INFO: Chunk 11 -> rows=100000 -> converted/train_file_11.parquet
[2025-08-08 20:32

In [7]:
# Convert the validation file.
convert(os.path.join(srcDir, 'val.h5'), destdir=destDir, basename='val_file')

[2025-08-08 20:32:53,664] INFO: Chunk 0 -> rows=100000 -> converted/val_file_0.parquet
[2025-08-08 20:32:55,920] INFO: Chunk 1 -> rows=100000 -> converted/val_file_1.parquet
[2025-08-08 20:32:58,189] INFO: Chunk 2 -> rows=100000 -> converted/val_file_2.parquet
[2025-08-08 20:33:00,437] INFO: Chunk 3 -> rows=100000 -> converted/val_file_3.parquet
[2025-08-08 20:33:02,437] INFO: Chunk 4 -> rows=3000 -> converted/val_file_4.parquet


In [8]:
# Convert the testing file.
convert(os.path.join(srcDir, 'test.h5'), destdir=destDir, basename='test_file')

[2025-08-08 20:33:19,739] INFO: Chunk 0 -> rows=100000 -> converted/test_file_0.parquet
[2025-08-08 20:33:21,969] INFO: Chunk 1 -> rows=100000 -> converted/test_file_1.parquet
[2025-08-08 20:33:24,193] INFO: Chunk 2 -> rows=100000 -> converted/test_file_2.parquet
[2025-08-08 20:33:26,380] INFO: Chunk 3 -> rows=100000 -> converted/test_file_3.parquet
[2025-08-08 20:33:28,376] INFO: Chunk 4 -> rows=4000 -> converted/test_file_4.parquet
