#### Rohan Bhatt, Shubhang Srikoti 
##### MSML605 -  Investigating the Impact of Storage Formats
Problem statement: How does the choice of storage format (CSV, Parquet, HDF5) impact the overall performance of a machine learning pipeline and its processes (data ingestion, memory overhead, time-to-train, and more).

NOTE: THIS NOTEBOOK IS THE 2GB VARIANT OF THE PARQUET




In [None]:
import kagglehub

path = kagglehub.dataset_download("jtbontinck/amex-parquet-file")

print("Path to dataset files:", path)

Resuming download from 9063890944 bytes (592794126 bytes left)...
Resuming download from https://www.kaggle.com/api/v1/datasets/download/jtbontinck/amex-parquet-file?dataset_version_number=1 (9063890944/9656685070) bytes left.


100%|██████████| 8.99G/8.99G [00:14<00:00, 41.6MB/s]

Extracting files...





Path to dataset files: /Users/rohan/.cache/kagglehub/datasets/jtbontinck/amex-parquet-file/versions/1


In [None]:
#script to convert 10gb parquet to 2gb parquet
import pyarrow.parquet as pq, pyarrow as pa, math
from pathlib import Path

SRC = Path("data.parquet")          # 16-GB file
DST = Path("data_2gb.parquet")

pq_src   = pq.ParquetFile(SRC)
n_rg     = pq_src.num_row_groups

# gather row-group sizes (compressed bytes on disk)
rg_sizes = [pq_src.metadata.row_group(i).total_byte_size for i in range(n_rg)]

target_bytes = 2 * 1024**3          # 2 GB
keep_rg      = []
cum          = 0
for i, sz in enumerate(rg_sizes):
    if cum + sz > target_bytes:
        break
    keep_rg.append(i)
    cum += sz

print(f"Keeping {len(keep_rg)} row-groups  →  ~{cum/1024**3:.2f} GB")

# read & write subset
tables = [pq_src.read_row_group(i) for i in keep_rg]
subset = pa.concat_tables(tables)
pq.write_table(subset, DST, compression="snappy")   # or "zstd"

print(f"Subset rows: {subset.num_rows:,}")
print("Wrote:", DST)

Keeping 169 row-groups  →  ~0.00 GB
Subset rows: 16,895,213
Wrote: data_2gb.parquet


In [6]:
#sanity check of parquet file
pq_file = pq.ParquetFile("data_2gb.parquet")
print("Rows in file:", pq_file.metadata.num_rows)
print("Columns in file:", pq_file.metadata.num_columns)
print("Schema:", pq_file.schema)

Rows in file: 16895213
Columns in file: 193
Schema: <pyarrow._parquet.ParquetSchema object at 0x1108bad40>
required group field_id=-1 schema {
  optional fixed_len_byte_array(16) field_id=-1 line_ID;
  optional binary field_id=-1 customer_ID (String);
  optional int64 field_id=-1 date (Timestamp(isAdjustedToUTC=false, timeUnit=microseconds, is_from_converted_type=false, force_set_converted_type=false));
  optional float field_id=-1 P_2;
  optional float field_id=-1 D_39;
  optional float field_id=-1 B_1;
  optional float field_id=-1 B_2;
  optional float field_id=-1 R_1;
  optional float field_id=-1 S_3;
  optional float field_id=-1 D_41;
  optional float field_id=-1 B_3;
  optional float field_id=-1 D_42;
  optional float field_id=-1 D_43;
  optional float field_id=-1 D_44;
  optional float field_id=-1 B_4;
  optional float field_id=-1 D_45;
  optional float field_id=-1 B_5;
  optional float field_id=-1 R_2;
  optional float field_id=-1 D_46;
  optional float field_id=-1 D_47;
  optio

In [3]:
#imports
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import pyarrow.parquet as pq
import pyarrow as pa
# import tables # for hdf5
import time, datetime, os, psutil
import xgboost as xgb
from pathlib import Path, PureWindowsPath
import gc


Converting 2Gb parquet -> csv

In [9]:
# in/out file paths
IN_FILE  = Path("data_2gb.parquet")
OUT_CSV  = Path("data.csv") # final single file

#opening the parquet file
pq_file = pq.ParquetFile(IN_FILE, memory_map=True)
n_rg = pq_file.num_row_groups
print(f"Row groups in file: {n_rg}")

# write loop
first_chunk = True
for rg in range(n_rg):
    # load one row group into Arrow Table (stays off heap)
    table = pq_file.read_row_group(rg)
    # convert to pandas
    df = table.to_pandas(types_mapper=pd.ArrowDtype)
    # write / append
    if first_chunk:
        df.to_csv(OUT_CSV, index=False, mode="w", header=True)
        first_chunk = False
    else:
        df.to_csv(OUT_CSV, index=False, mode="a", header=False)
    
    # free memory
    del df, table
    gc.collect()
    print(f"row-group {rg+1}/{n_rg} appended")

print("All done →", OUT_CSV) #10 min - 23 rows 

Row groups in file: 17
row-group 1/17 appended
row-group 2/17 appended
row-group 3/17 appended
row-group 4/17 appended
row-group 5/17 appended
row-group 6/17 appended
row-group 7/17 appended
row-group 8/17 appended
row-group 9/17 appended
row-group 10/17 appended
row-group 11/17 appended
row-group 12/17 appended
row-group 13/17 appended
row-group 14/17 appended
row-group 15/17 appended
row-group 16/17 appended
row-group 17/17 appended
All done → data.csv


In [None]:
#verifying both files before hdf5 conversion
df_pq = pq.read_table("data_2gb.parquet").to_pandas().head(5)
df_csv = pd.read_csv("data.csv", nrows=5)

print(df_pq.head())
print(df_csv.head())

                                             line_ID  \
0  b'\xb6a\x82\x86f#F\x1d\x8c\x94\x7f\x8d\x944\xd...   
1        b'L\xa8+-\xa8\x8dM\xa9\x96g\xed0I\x95\x1e$'   
2        b']s_\x87\xaf B\xec\xbeEg\xb5\x1e\xb2\xaed'   
3     b'\xfb^\xd4{Q\xb5HO\xa8\xb6\xf6\xca\xb1]@\x99'   
4  b'`\xa5\x96\xf6\x1b\rG\x8d\xab\\\x16\x8d\xe1\x...   

                                         customer_ID       date       P_2  \
0  d00b98b2401d26197fa1d6102cdc1c9bbed7c066b8aaa9... 2018-03-06  0.366254   
1  d00bc5e66e3aac9eae7c9e94621b36d196566d61ef7a32... 2018-03-25  0.312623   
2  d00bd125cf6fa463a6c57b9959b8a4197f6f79fb154fee... 2018-03-28  0.395606   
3  d00bfbdee3081206258a4b4fb2ef2eb311697f37056bfb... 2018-03-01  0.977543   
4  d00c0dd295ada176c4e697d4cc1cd2f0d572870f770859... 2018-03-26  0.934237   

       D_39       B_1       B_2       R_1       S_3      D_41  ...     D_138  \
0  0.003860  0.009151  0.818901  0.008979  0.143153  0.005497  ...  0.500092   
1  0.179014  0.560108  0.029272  0.75639

In [1]:
import numpy
import tables
print("NumPy :", numpy.__version__)
print("PyTables :", tables.__version__)


NumPy : 1.24.4
PyTables : 3.9.2


In [7]:
import pyarrow.parquet as pq, pandas as pd, numpy as np, gc, time
from pathlib import Path

IN_PARQUET = Path("data_2gb.parquet")
OUT_H5     = Path("data_2gb.h5")

pq_file = pq.ParquetFile(IN_PARQUET, memory_map=True)
n_rg    = pq_file.num_row_groups

t0 = time.time()
with pd.HDFStore(OUT_H5, "w", complib="zlib", complevel=6) as store:
    for i in range(n_rg):
        df = pq_file.read_row_group(i).to_pandas()

        # bytes → hex-strings
        for col in df.select_dtypes("object"):
            if isinstance(df[col].iloc[0], (bytes, bytearray)):
                df[col] = df[col].apply(lambda b: b.hex())

        # force consistent NumPy int8 for label columns
        for col in ["target", "test"]:
            if col in df.columns:
                df[col] = df[col].fillna(-1).astype(np.int8)

        store.append("train", df, data_columns=True, index=False)
        del df; gc.collect()

        elapsed = time.time() - t0
        print(f"✓ row-group {i+1}/{n_rg}  |  elapsed {elapsed/60:.1f} min")

total = time.time() - t0
print(f"\nParquet → HDF5 completed in {total/60:.1f} minutes")


✓ row-group 1/17  |  elapsed 0.4 min
✓ row-group 2/17  |  elapsed 2.7 min
✓ row-group 3/17  |  elapsed 5.4 min
✓ row-group 4/17  |  elapsed 7.6 min
✓ row-group 5/17  |  elapsed 13.1 min
✓ row-group 6/17  |  elapsed 71.3 min
✓ row-group 7/17  |  elapsed 73.4 min
✓ row-group 8/17  |  elapsed 75.6 min
✓ row-group 9/17  |  elapsed 77.8 min
✓ row-group 10/17  |  elapsed 230.3 min
✓ row-group 11/17  |  elapsed 335.3 min
✓ row-group 12/17  |  elapsed 361.5 min
✓ row-group 13/17  |  elapsed 363.7 min
✓ row-group 14/17  |  elapsed 366.0 min
✓ row-group 15/17  |  elapsed 368.3 min
✓ row-group 16/17  |  elapsed 370.8 min
✓ row-group 17/17  |  elapsed 373.0 min

Parquet → HDF5 completed in 373.0 minutes


In [8]:
#sanity check of hdf5 file
import pandas as pd, os, time

start = time.time()
df_head = pd.read_hdf("data_2gb.h5", key="train", stop=5)
print(df_head.head())
print("\nHDF5 quick read time:", time.time()-start, "sec")
print("HDF5 size on disk:", os.path.getsize("data_2gb.h5")/1024**3, "GB")

                            line_ID  \
0  b66182866623461d8c947f8d9434d7b6   
1  4ca82b2da88d4da99667ed3049951e24   
2  5d735f87af2042ecbe4567b51eb2ae64   
3  fb5ed47b51b5484fa8b6f6cab15d4099   
4  60a596f61b0d478dab5c168de1c6f6be   

                                         customer_ID                    date  \
0  d00b98b2401d26197fa1d6102cdc1c9bbed7c066b8aaa9... 1970-01-18 14:18:14.400   
1  d00bc5e66e3aac9eae7c9e94621b36d196566d61ef7a32... 1970-01-18 14:45:36.000   
2  d00bd125cf6fa463a6c57b9959b8a4197f6f79fb154fee... 1970-01-18 14:49:55.200   
3  d00bfbdee3081206258a4b4fb2ef2eb311697f37056bfb... 1970-01-18 14:11:02.400   
4  d00c0dd295ada176c4e697d4cc1cd2f0d572870f770859... 1970-01-18 14:47:02.400   

        P_2      D_39       B_1       B_2       R_1       S_3      D_41  ...  \
0  0.366254  0.003860  0.009151  0.818901  0.008979  0.143153  0.005497  ...   
1  0.312623  0.179014  0.560108  0.029272  0.756391  0.091940  0.005489  ...   
2  0.395606  1.066026  0.731072  0.019496  0

##### Actual benchmarks of each format
(now that we've converted each file)

In [1]:
import psutil, time, pandas as pd, pyarrow.parquet as pq
import xgboost as xgb
import csv
from pathlib import Path
RESULT_FILE = "benchmark_results.csv"

def load_csv():
    return pd.read_csv("data.csv")

def load_parquet():
    return pq.read_table("data_2gb.parquet").to_pandas()

def load_hdf5():
    return pd.read_hdf("data_2gb.h5", key="train")

def log_result(record):
    header = record.keys()
    write_header = not Path(RESULT_FILE).exists()
    with open(RESULT_FILE, "a", newline="") as f:
        w = csv.DictWriter(f, fieldnames=header)
        if write_header:
            w.writeheader()
        w.writerow(record)

def run_case(tag, loader):
    proc = psutil.Process()
    mem0 = proc.memory_info().rss
    t0   = time.perf_counter()
    df   = loader()
    load_sec = time.perf_counter() - t0
    mem_peak = proc.memory_info().rss - mem0

    # very light preprocessing example (fillna 0)
    X = df.drop(columns=["target", "test"])
    y = df["target"].fillna(0).astype("int8")

    dtrain = xgb.DMatrix(X, label=y)
    t1 = time.perf_counter()
    xgb.train({"objective":"binary:logistic", "tree_method":"hist", "nthread": 4},
              dtrain, num_boost_round=50)
    train_sec = time.perf_counter() - t1
    log_result({"tag": tag, "load_sec": load_sec, "mem_peak": mem_peak/1024**3, "train_sec": train_sec})
    return tag, load_sec, mem_peak/1024**3, train_sec
for tag, loader in [("CSV", load_csv),
                    ("Parquet", load_parquet),
                    ("HDF5", load_hdf5)]:
    tag, load_s, mem_gb, train_s = run_case(tag, loader)
    display({"format": tag, "load": load_s, "train": train_s, "peakGB": mem_gb})
    time.sleep(2)          # brief pause so macOS frees RAM

# results = [run_case("CSV", load_csv),
#            run_case("Parquet", load_parquet),
#            run_case("HDF5", load_hdf5)]
# for r in results:
#     print(f"{r[0]:7s}  load={r[1]:6.1f}s  peakRAM={r[2]:4.1f} GB  train={r[3]:5.1f}s")


: 