In [None]:
%%bash
# Download combined_data.h5 into DATA_DIR once; later runs reuse the cached copy.
set -eu

DATA_DIR=/tmp/lhcf-cnn
DATA_FILE="$DATA_DIR/combined_data.h5"
DATA_URL=https://minio.131.154.99.37.myip.cloud.infn.it/hackathon-data/lhcf-cnn/combined_data.h5

# -p makes this a no-op when the directory already exists.
mkdir -p "$DATA_DIR"

if [ ! -f "$DATA_FILE" ]; then
  # Download under a temporary name and rename only on success: writing
  # directly to the final path would leave an empty/truncated file behind on
  # failure, and the existence check above would then skip the download forever.
  if wget "$DATA_URL" -O "$DATA_FILE.part" &> .log; then
    mv "$DATA_FILE.part" "$DATA_FILE"
  else
    rm -f "$DATA_FILE.part"
    echo "Download of $DATA_URL failed; see .log for details" >&2
    exit 1
  fi
fi

ls -lrth "$DATA_FILE"

In [None]:
import os
import h5py
import shutil
import numpy as np
import tensorflow as tf

from sklearn.model_selection import train_test_split
from multiprocessing import Pool, Manager
from tqdm import tqdm

In [None]:
# Working directory: the input combined_data.h5 is read from here and the
# generated TFRecord files are written below it.
PATH = "/tmp/lhcf-cnn"
# Upper bound on the number of samples converted; only the first REDU_SIZE
# indices of the dataset are kept before the train/validation split.
REDU_SIZE = 1000

In [None]:
# Build a single TFRecord example
def create_tfrecord_example(posdE_01xy, posdE_23x, posdE_23y, dE, label):
    """Pack four arrays and one integer label into a ``tf.train.Example``.

    Each array argument is flattened with ``reshape(-1)`` and stored as a
    float list under the key equal to its parameter name. ``label`` is stored
    as a one-element int64 list under the key ``"label"``.

    Returns:
        The assembled ``tf.train.Example``.
    """
    def _flat_float_feature(array):
        # Flatten to 1-D because FloatList holds a flat sequence of values.
        return tf.train.Feature(float_list=tf.train.FloatList(value=array.reshape(-1)))

    feature_map = {
        "posdE_01xy": _flat_float_feature(posdE_01xy),
        "posdE_23x": _flat_float_feature(posdE_23x),
        "posdE_23y": _flat_float_feature(posdE_23y),
        "dE": _flat_float_feature(dE),
        "label": tf.train.Feature(int64_list=tf.train.Int64List(value=[label])),
    }
    example_features = tf.train.Features(feature=feature_map)
    return tf.train.Example(features=example_features)


# Write one batch of examples to a TFRecord file, reporting progress as it goes
def write_tfrecord_batch(h5_file_path, indices, tfrecord_file_path, progress_queue):
    """Convert the HDF5 rows listed in ``indices`` into one TFRecord file.

    Args:
        h5_file_path: Path of the HDF5 file, opened read-only.
        indices: Iterable of row indices to convert, processed in the given order.
        tfrecord_file_path: Destination TFRecord file.
        progress_queue: Queue that receives the value ``1`` after each record
            is written, so a listener can advance a progress bar.
    """
    position_keys = ("posdE_01xy", "posdE_23x", "posdE_23y")

    with h5py.File(h5_file_path, "r") as h5_in, tf.io.TFRecordWriter(tfrecord_file_path) as record_out:
        for row in indices:
            positions = [h5_in[key][row].astype("float32") for key in position_keys]
            # Index 0 on the last axis drops the extra dimension.
            energy = h5_in["dE"][row, :, 0].astype("float32")
            event_label = int(h5_in["ID"][row])

            record = create_tfrecord_example(*positions, energy, event_label)
            record_out.write(record.SerializeToString())
            # One token per written record drives the progress bar.
            progress_queue.put(1)


# Display conversion progress
def progress_listener(total_samples, progress_queue):
    """Advance a tqdm bar by one for each item taken from ``progress_queue``.

    Blocks on ``progress_queue.get()`` and returns once ``total_samples``
    items have been consumed.
    """
    bar = tqdm(total=total_samples, desc="Conversione in TFRecord")
    with bar:
        remaining = total_samples
        while remaining > 0:
            progress_queue.get()
            bar.update(1)
            remaining -= 1


# Main entry point: split the dataset and write the TFRecord parts in parallel with a progress bar
def split_and_parallel_write(h5_file_path, train_tfrecord_path, val_tfrecord_path, train_ratio=0.8, num_processes=4):
    """Split the dataset into train/validation and write TFRecord parts in parallel.

    The number of samples is taken from the ``"ID"`` dataset of the HDF5 file
    and capped to the first ``REDU_SIZE`` indices (module-level constant). The
    indices are shuffled and split with a fixed seed, each split is divided
    into ``num_processes`` batches, and every batch is written by a worker
    process to ``<train_tfrecord_path>_part<i>`` or ``<val_tfrecord_path>_part<i>``.

    Args:
        h5_file_path: Path of the source HDF5 file.
        train_tfrecord_path: Path prefix for the training part files.
        val_tfrecord_path: Path prefix for the validation part files.
        train_ratio: Fraction of the samples assigned to the training split.
        num_processes: Number of writer processes, and of parts per split.

    Raises:
        Any exception raised by a writer process is re-raised here after the
        progress listener has been terminated.
    """
    with h5py.File(h5_file_path, "r") as f:
        n_samples = f["ID"].shape[0]
    indices = np.arange(n_samples)[:REDU_SIZE]
    train_indices, val_indices = train_test_split(indices, test_size=1 - train_ratio, random_state=42, shuffle=True)

    # Split the indices into batches, one per writer process
    train_batches = np.array_split(train_indices, num_processes)
    val_batches = np.array_split(val_indices, num_processes)
    total_samples = len(train_indices) + len(val_indices)

    # The context manager shuts the manager's server process down on exit;
    # without it every call would leave one behind in a long-lived kernel.
    with Manager() as manager:
        progress_queue = manager.Queue()

        # The listener runs as the initializer of a one-process pool and
        # returns once it has consumed `total_samples` progress tokens.
        progress_process = Pool(1, progress_listener, (total_samples, progress_queue))
        try:
            # Parallel write of the TFRecord part files
            with Pool(num_processes) as pool:
                pool.starmap(write_tfrecord_batch, [(h5_file_path, batch, f"{train_tfrecord_path}_part{i}", progress_queue) for i, batch in enumerate(train_batches)])
                pool.starmap(write_tfrecord_batch, [(h5_file_path, batch, f"{val_tfrecord_path}_part{i}", progress_queue) for i, batch in enumerate(val_batches)])
        except BaseException:
            # A failed writer never sends its remaining tokens, so the listener
            # would block on the queue forever; kill it instead of waiting.
            progress_process.terminate()
            raise
        else:
            # All tokens have been queued; let the listener drain them and exit.
            progress_process.close()
        finally:
            progress_process.join()

In [None]:
src_fname = f"{PATH}/combined_data.h5"
exp_dirname = f"{PATH}/Train_and_Validation"

# Always start from an empty output directory: remove any previous run first.
if os.path.exists(exp_dirname):
    shutil.rmtree(exp_dirname)
os.makedirs(exp_dirname)

# Run the conversion with several processes and a progress bar
split_and_parallel_write(
    h5_file_path=src_fname,
    train_tfrecord_path=f"{exp_dirname}/train.tfrecord",
    val_tfrecord_path=f"{exp_dirname}/validation.tfrecord",
    num_processes=8,
)

In [None]:
# Concatenate TFRecord files while showing a progress bar
def concatenate_tfrecords(input_files, output_file):
    """Copy every record of ``input_files``, in order, into ``output_file``.

    The inputs are read twice: a first pass counts the records so the progress
    bar has a total, a second pass copies them. A confirmation message is
    printed when the output file has been written.
    """
    # First pass: count records for the progress-bar total.
    total_records = 0
    for part_path in input_files:
        for _ in tf.data.TFRecordDataset(part_path):
            total_records += 1

    # Second pass: copy the serialized records unchanged.
    with tf.io.TFRecordWriter(output_file) as merged, tqdm(total=total_records, desc=f"Concatenazione {output_file}") as bar:
        for part_path in input_files:
            for serialized in tf.data.TFRecordDataset(part_path):
                merged.write(serialized.numpy())
                bar.update(1)
    print(f"File TFRecord concatenato salvato: {output_file}")

# Collect the train and validation TFRecord part files, sorted by path
train_files = sorted(
    os.path.join(exp_dirname, entry)
    for entry in os.listdir(exp_dirname)
    if entry.startswith("train.tfrecord_part")
)
validation_files = sorted(
    os.path.join(exp_dirname, entry)
    for entry in os.listdir(exp_dirname)
    if entry.startswith("validation.tfrecord_part")
)

# Merge the parts into one TFRecord file per split, with a progress bar
concatenate_tfrecords(train_files, f"{PATH}/train.tfrecord")
concatenate_tfrecords(validation_files, f"{PATH}/validation.tfrecord")

In [None]:
# List the working directory to check that the merged train/validation TFRecord files were written.
ls -lrth /tmp/lhcf-cnn