In [None]:
"""
Data Preparation & H5 to Parquet Conversion

This notebook initializes the Spark environment, installs dependencies, and converts raw HDF5 traffic data (2017–2021) into optimized Parquet format,
partitioning it by year and time chunks for efficient downstream processing.
"""

In [1]:
"""Dependencies"""

pip install pyspark==3.5.0 pyarrow h5py pandas numpy tables

Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


In [2]:
"""Imports & Setup"""

import sys
import os
import h5py
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq

# Make the parent of the current working directory importable so that the
# project-local `utils` module can be found (presumably it lives there).
# The membership check keeps the cell idempotent: re-running it no longer
# appends a duplicate entry to sys.path each time.
PROJECT_ROOT = os.path.abspath('..')
if PROJECT_ROOT not in sys.path:
    sys.path.append(PROJECT_ROOT)
from utils import get_spark_session, PATHS

# Spark session shared by every cell below.
spark = get_spark_session("LargeST-DataPrep")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/01/09 11:13:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
"""Load & Inspect Metadata"""

# Configure the CSV reader first: the first row holds the column names and the
# column types are inferred from the data rather than left as strings.
metadata_reader = spark.read.option("header", True).option("inferSchema", True)
station_metadata_df = metadata_reader.csv(PATHS["meta_csv"])

# Quick look at the inferred schema and the first five rows.
station_metadata_df.printSchema()
station_metadata_df.show(5)

                                                                                

root
 |-- ID: integer (nullable = true)
 |-- Lat: double (nullable = true)
 |-- Lng: double (nullable = true)
 |-- District: integer (nullable = true)
 |-- County: string (nullable = true)
 |-- Fwy: string (nullable = true)
 |-- Lanes: integer (nullable = true)
 |-- Type: string (nullable = true)
 |-- Direction: string (nullable = true)
 |-- ID2: integer (nullable = true)

+------+---------+-----------+--------+----------+----+-----+--------+---------+---+
|    ID|      Lat|        Lng|District|    County| Fwy|Lanes|    Type|Direction|ID2|
+------+---------+-----------+--------+----------+----+-----+--------+---------+---+
|317802|38.389811|-121.479587|       3|Sacramento|I5-N|    2|Mainline|        N|  0|
|312134|38.412564|-121.484319|       3|Sacramento|I5-N|    2|Mainline|        N|  1|
|312133| 38.42863|-121.487657|       3|Sacramento|I5-N|    3|Mainline|        N|  2|
|313159|38.450246|-121.492176|       3|Sacramento|I5-N|    3|Mainline|        N|  3|
|319767|38.465539| -121.49645

In [4]:
"""Inspect H5 Structure (Local Check)"""

RAW_H5_DIR = "/home/ubuntu/data/largeST"
sample_h5 = os.path.join(RAW_H5_DIR, "ca_his_raw_2019.h5")


def print_structure(name, obj):
    """Print an HDF5 member's path followed by its shape (blank if it has none)."""
    shape = getattr(obj, "shape", "")
    print(name, shape)


# Walk every member of the sample file and list it with its shape.
with h5py.File(sample_h5, "r") as h5_file:
    h5_file.visititems(print_structure)

t 
t/axis0 (8600,)
t/axis1 (105120,)
t/block0_items (8600,)
t/block0_values (105120, 8600)


In [5]:
"""Convert Raw H5 to Parquet (Heavy Processing)"""

OUT_BASE = "/home/ubuntu/data/largeST_parquet"
os.makedirs(OUT_BASE, exist_ok=True)

# Size of one output file: at most TIME_CHUNK rows of the time axis by
# SENSOR_CHUNK columns of the sensor axis.
TIME_CHUNK = 2880
SENSOR_CHUNK = 200


def _labels_to_str(labels):
    """Return axis labels as a unicode array when they are stored as strings.

    Fixed-width byte arrays (dtype kind "S") and object arrays are converted to
    ``str``; any other dtype (e.g. integer timestamps) is returned unchanged.

    Bytes are decoded explicitly as UTF-8. A plain ``astype(str)`` on an object
    array holding ``bytes`` would call ``str()`` on each element and produce
    values such as ``"b'1118894'"`` instead of ``"1118894"``.
    """
    kind = labels.dtype.kind
    if kind == "S":
        return np.char.decode(labels, "utf-8")
    if kind == "O":
        return np.array(
            [x.decode("utf-8") if isinstance(x, bytes) else str(x) for x in labels],
            dtype=str,
        )
    return labels


def _block_to_table(ts, sensors, block):
    """Flatten one (time x sensor) block into a long-format Arrow table.

    ``block.reshape(-1)`` walks the block row by row, so each timestamp is
    repeated once per sensor and the sensor list is tiled once per timestamp
    to stay aligned with the flattened values.
    """
    return pa.Table.from_arrays(
        [
            pa.array(np.repeat(ts, len(sensors))),
            pa.array(np.tile(sensors, len(ts))),
            pa.array(block.reshape(-1)),
        ],
        names=["timestamp", "sensor_id", "value"],
    )


def _convert_h5_file(h5_path, out_dir, time_chunk=TIME_CHUNK, sensor_chunk=SENSOR_CHUNK):
    """Write the ``t/block0_values`` matrix of one HDF5 file as Parquet chunks.

    Args:
        h5_path: Path of the HDF5 file to read.
        out_dir: Existing directory that receives the Parquet files.
        time_chunk: Maximum number of time rows per output file.
        sensor_chunk: Maximum number of sensor columns per output file.

    Raises:
        ValueError: If the axis label lengths do not match the data shape.
    """
    with h5py.File(h5_path, "r") as f:
        data = f["t/block0_values"]  # sliced lazily below, one block at a time
        axis0 = _labels_to_str(f["t/axis0"][:])  # column (sensor) labels
        axis1 = _labels_to_str(f["t/axis1"][:])  # row (time) labels
        T, N = data.shape

        # Fail early with a clear message instead of writing misaligned rows.
        if (len(axis1), len(axis0)) != (T, N):
            raise ValueError(
                f"{h5_path}: axis lengths ({len(axis1)}, {len(axis0)}) "
                f"do not match data shape ({T}, {N})"
            )

        for t0 in range(0, T, time_chunk):
            t1 = min(T, t0 + time_chunk)
            ts = axis1[t0:t1]

            for s0 in range(0, N, sensor_chunk):
                s1 = min(N, s0 + sensor_chunk)
                table = _block_to_table(ts, axis0[s0:s1], data[t0:t1, s0:s1])

                # The file name encodes the half-open row/column ranges it holds.
                part_name = f"t{t0:07d}_{t1:07d}_s{s0:05d}_{s1:05d}.parquet"
                pq.write_table(table, os.path.join(out_dir, part_name), compression="snappy")

            print(f"  finished time slice {t0}-{t1}")


for fname in sorted(os.listdir(RAW_H5_DIR)):
    if not fname.endswith(".h5"):
        continue

    h5_path = os.path.join(RAW_H5_DIR, fname)
    # The year is the last "_"-separated token of the name, e.g. "ca_his_raw_2019.h5".
    year = fname.split("_")[-1].replace(".h5", "")
    # Hive-style "year=<value>" directory; the verification output shows Spark
    # reading it back as a `year` column.
    out_dir = os.path.join(OUT_BASE, f"year={year}")
    os.makedirs(out_dir, exist_ok=True)

    print(f"\nProcessing {fname}")
    _convert_h5_file(h5_path, out_dir)
    print(f"Completed year {year}")


Processing ca_his_raw_2017.h5
  finished time slice 0-2880
  finished time slice 2880-5760
  finished time slice 5760-8640
  finished time slice 8640-11520
  finished time slice 11520-14400
  finished time slice 14400-17280
  finished time slice 17280-20160
  finished time slice 20160-23040
  finished time slice 23040-25920
  finished time slice 25920-28800
  finished time slice 28800-31680
  finished time slice 31680-34560
  finished time slice 34560-37440
  finished time slice 37440-40320
  finished time slice 40320-43200
  finished time slice 43200-46080
  finished time slice 46080-48960
  finished time slice 48960-51840
  finished time slice 51840-54720
  finished time slice 54720-57600
  finished time slice 57600-60480
  finished time slice 60480-63360
  finished time slice 63360-66240
  finished time slice 66240-69120
  finished time slice 69120-72000
  finished time slice 72000-74880
  finished time slice 74880-77760
  finished time slice 77760-80640
  finished time slice 80640

In [6]:
"""Verification"""

# NOTE(review): the conversion cell writes to OUT_BASE, while this cell reads
# PATHS["raw_parquet"] — confirm both refer to the same directory.
raw_parquet_path = PATHS["raw_parquet"]
df_check = spark.read.parquet(raw_parquet_path)

# Show the resulting schema and the first 20 rows without truncating values.
df_check.printSchema()
df_check.show(n=20, truncate=False)

                                                                                

root
 |-- timestamp: long (nullable = true)
 |-- sensor_id: string (nullable = true)
 |-- value: double (nullable = true)
 |-- year: integer (nullable = true)



[Stage 4:>                                                          (0 + 1) / 1]

+-------------------+---------+-----+----+
|timestamp          |sensor_id|value|year|
+-------------------+---------+-----+----+
|1580428800000000000|1118894  |167.0|2020|
|1580428800000000000|1119383  |19.0 |2020|
|1580428800000000000|1123210  |114.0|2020|
|1580428800000000000|1111543  |123.0|2020|
|1580428800000000000|1111542  |106.0|2020|
|1580428800000000000|1115921  |140.0|2020|
|1580428800000000000|1108391  |131.0|2020|
|1580428800000000000|1115929  |121.0|2020|
|1580428800000000000|1115937  |127.0|2020|
|1580428800000000000|1108393  |122.0|2020|
|1580428800000000000|1115946  |105.0|2020|
|1580428800000000000|1111570  |106.0|2020|
|1580428800000000000|1115246  |104.0|2020|
|1580428800000000000|1117857  |108.0|2020|
|1580428800000000000|1118013  |126.0|2020|
|1580428800000000000|1118957  |120.0|2020|
|1580428800000000000|1108398  |105.0|2020|
|1580428800000000000|1108410  |123.0|2020|
|1580428800000000000|1108764  |121.0|2020|
|1580428800000000000|1108413  |91.0 |2020|
+----------

                                                                                

In [8]:
"""Cleaning"""

# Stop the Spark session that was created in the setup cell.
spark.stop()