In [None]:
# ============================================================
# AVRO ingestion -> all_data.pkl
#
# - One Spark session per patient
# - AVRO files are processed in sorted (lexicographic) path order; this is
#   chronological only if the file paths embed sortable timestamps
# - All rows from rawData are extracted per AVRO
# ============================================================

import os
import gc
import pickle
import hashlib
from pyspark.sql import SparkSession
from pyspark.sql.functions import col


# ------------------------------------------------------------
# Base path containing all patient folders
# ------------------------------------------------------------
base_path = "/srv/timebase/EmbracePlus2"

# Every immediate sub-directory of base_path is treated as one patient folder.
# os.listdir() returns entries in arbitrary order, so the names are sorted to
# make the patient processing order (and therefore the key order of all_data)
# reproducible between runs.
patient_dirs = [
    os.path.join(base_path, d)
    for d in sorted(os.listdir(base_path))
    if os.path.isdir(os.path.join(base_path, d))
]

# Final container: patient_id -> list of rawData blocks
all_data = {}


# ------------------------------------------------------------
# Iterate over all patients
# ------------------------------------------------------------
for patient_path in patient_dirs:
    # The folder name doubles as the patient identifier / key in all_data.
    patient_id = os.path.basename(patient_path)
    print(f"[INFO] Processing patient {patient_id}...")

    # --------------------------------------------------------
    # Locate AVRO files under raw_data/v6
    # --------------------------------------------------------
    # os.walk() yields entries in arbitrary order, so the full paths are
    # sorted to obtain a deterministic (lexicographic) processing order.
    avro_files = sorted(
        os.path.join(root, name)
        for root, _, names in os.walk(patient_path)
        if "raw_data/v6" in root
        for name in names
        if name.endswith(".avro")
    )

    if not avro_files:
        # Patients without AVRO files are skipped and get no all_data entry.
        print(f"[WARN] No AVRO files found for {patient_id}")
        continue

    # --------------------------------------------------------
    # Create one Spark session per patient
    # --------------------------------------------------------
    spark = (
        SparkSession.builder
        .appName(f"AVRO_{patient_id}")
        .master("local")
        .config("spark.driver.memory", "8G")
        .config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.5.0")
        .getOrCreate()
    )

    blocks = []
    n_total_rows = 0

    # try/finally guarantees the session is stopped even if processing is
    # interrupted or an unexpected error escapes the per-file handling.
    try:
        # ----------------------------------------------------
        # Read each AVRO file sequentially
        # ----------------------------------------------------
        for i, avro_path in enumerate(avro_files, start=1):
            file_name = os.path.basename(avro_path)
            print(f"[INFO] Reading file {i}/{len(avro_files)}: {file_name}")

            # Spark evaluates lazily: load() may succeed while the actual
            # read only fails at collect(). Both are therefore guarded so a
            # single bad file is logged and skipped instead of aborting the
            # whole run.
            try:
                df = spark.read.format("avro").load(avro_path)

                # Check expected column
                if "rawData" not in df.columns:
                    print(f"[WARN] Column 'rawData' not found in {file_name}")
                    continue

                # Collect all rawData rows onto the driver
                rows = df.select(col("rawData")).collect()
            except Exception as e:
                print(f"[ERROR] Failed to read {file_name}: {e}")
                continue

            if not rows:
                print(f"[WARN] Empty AVRO file: {file_name}")
                continue

            # Keep only rows whose rawData value is not null.
            kept = [rd for rd in (row["rawData"] for row in rows) if rd is not None]
            blocks.extend(kept)
            n_total_rows += len(kept)

            # Explicit cleanup to reduce memory footprint
            del df, rows, kept
            gc.collect()
    finally:
        # ----------------------------------------------------
        # Stop Spark session for this patient
        # ----------------------------------------------------
        spark.stop()
        print(f"[OK] Spark session closed for {patient_id}")

    # Store patient data
    all_data[patient_id] = blocks
    print(f"[OK] Patient {patient_id} added with {len(blocks)} blocks (rows collected: {n_total_rows})")

    # Drop the loop-local names; the list itself stays alive through all_data.
    del blocks, n_total_rows
    gc.collect()


# ------------------------------------------------------------
# Persist full dataset to disk
# ------------------------------------------------------------
output_path = os.path.expanduser("~/TFM/all_data.pkl")

# Make sure the target directory exists: without it open() would raise and
# the whole in-memory dataset would be lost at the very last step.
os.makedirs(os.path.dirname(output_path), exist_ok=True)

# Serialize the complete patient_id -> blocks mapping into a single pickle.
with open(output_path, "wb") as f:
    pickle.dump(all_data, f)

print(f"[OK] all_data.pkl saved with {len(all_data)} patients")