# 00_data_check_and_parquet

## Notebook Purpose
- Load raw events CSV from `data/raw/events.csv`
- Run basic quality checks (schema, event types, time range, price sanity)
- Export optimized Parquet to `data/processed/events_full_optimized.parquet`
- Export monthly Parquet files to `data/processed/events_by_month/`
- Export a stable 10% user sample to `data/processed/events_full_sample10pct.parquet`
- Save QC summaries to `artifacts/reports/00_events_*.csv`

## Context
- Shared inputs/outputs and execution conventions are documented in the project README.

In [3]:
# Raw Data Check(.csv)

import pandas as pd

# Absolute location of the raw events export that this preview cell reads.
file_path = r"C:\Users\seony\Desktop\personal_project\purchase_prediction\data\raw\events.csv"
df = pd.read_csv(file_path)

# Print, in order: the column index, the first ten rows, and the per-column
# count of missing values.
for report in (df.columns, df.head(10), df.isna().sum()):
    print(report)

Index(['event_time', 'event_type', 'product_id', 'category_id',
       'category_code', 'brand', 'price', 'user_id', 'user_session'],
      dtype='object')
                event_time event_type  product_id          category_id  \
0  2020-09-24 11:57:06 UTC       view     1996170  2144415922528452715   
1  2020-09-24 11:57:26 UTC       view      139905  2144415926932472027   
2  2020-09-24 11:57:27 UTC       view      215454  2144415927158964449   
3  2020-09-24 11:57:33 UTC       view      635807  2144415923107266682   
4  2020-09-24 11:57:36 UTC       view     3658723  2144415921169498184   
5  2020-09-24 11:57:59 UTC       view      664325  2144415951611757447   
6  2020-09-24 11:58:23 UTC       view     3791349  2144415935086199225   
7  2020-09-24 11:58:24 UTC       view      716611  2144415923694469257   
8  2020-09-24 11:58:25 UTC       view      657859  2144415939431498289   
9  2020-09-24 11:58:31 UTC       view      716611  2144415923694469257   

                   category_c

In [10]:
# ============ Common PATH ============

from pathlib import Path
import numpy as np
import pandas as pd

# Root folder of the project; every other path in this cell hangs off it.
PROJECT_ROOT = Path(r"C:\Users\seony\Desktop\personal_project\purchase_prediction")

# Data tree: raw inputs and processed outputs.
DATA_DIR = PROJECT_ROOT.joinpath("data")
RAW_DIR = DATA_DIR.joinpath("raw")
PROCESSED_DIR = DATA_DIR.joinpath("processed")

# Artifact tree: one sub-folder per kind of artifact.
ARTIFACTS_DIR = PROJECT_ROOT.joinpath("artifacts")
MODELS_DIR, PRED_DIR, REPORTS_DIR, METRICS_DIR, FIGURES_DIR = (
    ARTIFACTS_DIR.joinpath(leaf)
    for leaf in ("models", "predictions", "reports", "metrics", "figures")
)

# Create every leaf folder (and any missing parents); already-existing folders are left alone.
for d in (RAW_DIR, PROCESSED_DIR, MODELS_DIR, PRED_DIR, REPORTS_DIR, METRICS_DIR, FIGURES_DIR):
    d.mkdir(parents=True, exist_ok=True)

# Echo the three top-level locations.
for label, location in (
    ("PROJECT_ROOT:", PROJECT_ROOT),
    ("PROCESSED_DIR:", PROCESSED_DIR),
    ("ARTIFACTS_DIR:", ARTIFACTS_DIR),
):
    print(label, location)

PROJECT_ROOT: C:\Users\seony\Desktop\personal_project\purchase_prediction
PROCESSED_DIR: C:\Users\seony\Desktop\personal_project\purchase_prediction\data\processed
ARTIFACTS_DIR: C:\Users\seony\Desktop\personal_project\purchase_prediction\artifacts


In [None]:
# Paths / Inputs / Outputs

# Input: the raw event log.
RAW_EVENTS_CSV = RAW_DIR.joinpath("events.csv")

# Outputs: the full Parquet export and the 10% sample export.
OUT_FULL_PARQUET = PROCESSED_DIR.joinpath("events_full_optimized.parquet")
OUT_SAMPLE10_PARQUET = PROCESSED_DIR.joinpath("events_full_sample10pct.parquet")

# Folder for the per-month Parquet files; created here if it does not exist yet.
MONTHLY_DIR = PROCESSED_DIR.joinpath("events_by_month")
MONTHLY_DIR.mkdir(parents=True, exist_ok=True)

# QC report files, all written under REPORTS_DIR.
QC_SUMMARY_OUT, QC_EVENTTYPE_OUT, QC_MONTH_OUT = (
    REPORTS_DIR.joinpath(report_name)
    for report_name in (
        "00_events_qc_summary.csv",
        "00_events_event_type_counts.csv",
        "00_events_month_counts.csv",
    )
)

# Echo the main input/output locations.
for label, target in (
    ("RAW_EVENTS_CSV:", RAW_EVENTS_CSV),
    ("OUT_FULL_PARQUET:", OUT_FULL_PARQUET),
    ("OUT_SAMPLE10_PARQUET:", OUT_SAMPLE10_PARQUET),
    ("MONTHLY_DIR:", MONTHLY_DIR),
):
    print(label, target)

RAW_EVENTS_CSV: C:\Users\seony\Desktop\personal_project\purchase_prediction\data\raw\events.csv
OUT_FULL_PARQUET: C:\Users\seony\Desktop\personal_project\purchase_prediction\data\processed\events_full_optimized.parquet
OUT_SAMPLE10_PARQUET: C:\Users\seony\Desktop\personal_project\purchase_prediction\data\processed\events_full_sample10pct.parquet
MONTHLY_DIR: C:\Users\seony\Desktop\personal_project\purchase_prediction\data\processed\events_by_month


In [12]:
# Config + helpers

import pyarrow as pa
import pyarrow.parquet as pq

# Number of CSV rows per chunk (passed as `chunksize` to pd.read_csv).
CHUNKSIZE = 2_000_000
# Fraction and seed handed to stable_user_sample_mask for the user-level sample.
SAMPLE_FRAC = 0.10
HASH_SEED = 42

# Columns read from the raw CSV (passed as `usecols`).
REQ_COLS = [
    "event_time",
    "event_type",
    "product_id",
    "category_id",
    "category_code",
    "brand",
    "price",
    "user_id",
    "user_session",
]

# Explicit dtypes for every column except event_time, which is read as-is and
# parsed separately by parse_event_time.
DTYPE = {
    **dict.fromkeys(("event_type", "category_code", "brand", "user_session"), "string"),
    **dict.fromkeys(("product_id", "category_id", "user_id"), "int64"),
    "price": "float32",
}

def stable_user_sample_mask(user_id: pd.Series, frac: float, seed: int) -> np.ndarray:
    """Return a boolean mask selecting rows whose user_id falls in the sample.

    Each user_id is hashed, XOR-ed with ``seed`` and reduced to one of
    1,000,000 buckets; a row is kept when its bucket number is below
    ``frac * 1_000_000`` (truncated to an integer). The decision depends only
    on the user_id value and the seed, so the same user gets the same answer
    in every chunk and every run.

    Args:
        user_id: Series of user identifiers; cast to int64 before hashing.
        frac: Target fraction of buckets to keep (0.0 keeps none, 1.0 keeps all).
        seed: Integer mixed into the hash; it is converted with ``np.uint64``.

    Returns:
        A numpy boolean array with one entry per row of ``user_id``.
    """
    n_buckets = np.uint64(1_000_000)
    cutoff = np.uint64(int(frac * 1_000_000))

    ids = user_id.astype("int64")
    # index=False: hash the values only, so the row position never influences the result.
    hashed = pd.util.hash_pandas_object(ids, index=False).astype("uint64").to_numpy()
    salted = hashed ^ np.uint64(seed)
    bucket = salted % n_buckets
    return bucket < cutoff

def parse_event_time(s: pd.Series) -> pd.Series:
    """Parse raw event_time strings into timezone-aware UTC timestamps.

    The raw values often carry a literal " UTC" suffix, for example
    "2020-09-24 11:57:06 UTC". That text is removed before parsing, and the
    result is returned as UTC. Values that cannot be parsed become NaT
    instead of raising.

    Args:
        s: Series of raw timestamp values; cast to the pandas string dtype first.

    Returns:
        A Series of UTC datetimes aligned with ``s``, with NaT for bad values.
    """
    as_text = s.astype("string")
    # regex=False: treat " UTC" as plain text rather than a pattern.
    without_suffix = as_text.str.replace(" UTC", "", regex=False)
    parsed = pd.to_datetime(without_suffix, utc=True, errors="coerce")
    return parsed

print("Config loaded.")


Config loaded.


In [13]:
# Stream CSV → write parquet (full + monthly + 10% sample) + QC
#
# RAW_EVENTS_CSV is read CHUNKSIZE rows at a time. For each chunk:
#   1. rows whose event_time cannot be parsed are dropped (counted in rows_dropped_bad_time);
#   2. rows with a negative price are dropped (counted in rows_dropped_bad_price);
#      rows with a missing price are kept;
#   3. the remaining rows are appended to OUT_FULL_PARQUET, to one file per
#      calendar month under MONTHLY_DIR, and (for hash-sampled users only) to
#      OUT_SAMPLE10_PARQUET.
# The counters, min/max time and per-category tallies built here are read by
# the QC report cell.

# Remove existing outputs for a clean rebuild
for p in [OUT_FULL_PARQUET, OUT_SAMPLE10_PARQUET]:
    if p.exists():
        p.unlink()

# Materialise the glob first so files are not deleted while the folder is still being listed.
for p in list(MONTHLY_DIR.glob("events_*.parquet")):
    p.unlink()

# Writers (created on first write, because the schema comes from the first table)
full_writer = None
sample_writer = None
month_writers = {}

# QC accumulators
total_rows_in = 0
rows_written_full = 0
rows_written_sample = 0
rows_dropped_bad_time = 0
rows_dropped_bad_price = 0

min_time = None
max_time = None

event_type_counts = {}
month_counts = {}

# try/finally: a Parquet file is only finalised when its writer is closed, and
# an unclosed writer keeps its file handle open (which can make the unlink()
# calls above fail on the next run). Closing in `finally` releases every handle
# even when a chunk raises. The exception still propagates; files left behind by
# a failed run hold only the chunks written so far and are rebuilt on re-run.
try:
    # The chunked reader is used as a context manager so the CSV handle is
    # released as well if iteration stops early.
    with pd.read_csv(
        RAW_EVENTS_CSV,
        usecols=REQ_COLS,
        dtype=DTYPE,
        chunksize=CHUNKSIZE,
    ) as reader:
        for i, chunk in enumerate(reader, start=1):
            total_rows_in += len(chunk)

            # Parse time; unparseable values come back as NaT and are dropped
            t = parse_event_time(chunk["event_time"])
            bad_time = t.isna()
            rows_dropped_bad_time += int(bad_time.sum())
            chunk = chunk.loc[~bad_time].copy()
            t = t.loc[~bad_time]

            if len(chunk) == 0:
                continue

            # Basic price QC (negative -> drop; missing prices are kept)
            bad_price = chunk["price"].notna() & (chunk["price"] < 0)
            rows_dropped_bad_price += int(bad_price.sum())
            chunk = chunk.loc[~bad_price].copy()
            # Keep the parsed times aligned with the rows that survived
            t = t.loc[~bad_price]

            if len(chunk) == 0:
                continue

            # Attach parsed time (keep as UTC)
            chunk["event_time"] = t

            # Normalize event_type
            chunk["event_type"] = chunk["event_type"].astype("string")

            # Month key ("YYYY-MM"); helper column, removed before every write
            month_key = chunk["event_time"].dt.strftime("%Y-%m")
            chunk["_month"] = month_key

            # Update QC min/max
            cmin = chunk["event_time"].min()
            cmax = chunk["event_time"].max()
            min_time = cmin if (min_time is None or cmin < min_time) else min_time
            max_time = cmax if (max_time is None or cmax > max_time) else max_time

            # Update counts (dropna=False so missing categories are tallied too)
            et = chunk["event_type"].value_counts(dropna=False)
            for k, v in et.items():
                event_type_counts[k] = event_type_counts.get(k, 0) + int(v)

            mc = chunk["_month"].value_counts(dropna=False)
            for k, v in mc.items():
                month_counts[k] = month_counts.get(k, 0) + int(v)

            # Drop helper from parquet
            out_full = chunk.drop(columns=["_month"])

            # Write FULL parquet
            table_full = pa.Table.from_pandas(out_full, preserve_index=False)
            if full_writer is None:
                full_writer = pq.ParquetWriter(OUT_FULL_PARQUET, table_full.schema, compression="snappy")
            full_writer.write_table(table_full)
            rows_written_full += len(out_full)

            # Write MONTHLY parquet (one writer per month, reused across chunks)
            for mk in chunk["_month"].unique():
                part = chunk.loc[chunk["_month"] == mk].drop(columns=["_month"])
                table_m = pa.Table.from_pandas(part, preserve_index=False)
                out_path = MONTHLY_DIR / f"events_{mk}_optimized.parquet"
                if mk not in month_writers:
                    month_writers[mk] = pq.ParquetWriter(out_path, table_m.schema, compression="snappy")
                month_writers[mk].write_table(table_m)

            # Write SAMPLE 10% parquet (stable by user_id)
            keep = stable_user_sample_mask(chunk["user_id"], SAMPLE_FRAC, HASH_SEED)
            part_s = chunk.loc[keep].drop(columns=["_month"])
            if len(part_s) > 0:
                table_s = pa.Table.from_pandas(part_s, preserve_index=False)
                if sample_writer is None:
                    sample_writer = pq.ParquetWriter(OUT_SAMPLE10_PARQUET, table_s.schema, compression="snappy")
                sample_writer.write_table(table_s)
                rows_written_sample += len(part_s)

            # Progress line every third chunk
            if i % 3 == 0:
                print(f"Chunks processed: {i} | rows_in: {total_rows_in:,} | full_written: {rows_written_full:,} | sample_written: {rows_written_sample:,}")
finally:
    # Close writers (runs on success and on failure)
    for w in (full_writer, sample_writer, *month_writers.values()):
        if w is not None:
            w.close()

print("\nDone.")
print("Total rows in:", f"{total_rows_in:,}")
print("Dropped bad time:", f"{rows_dropped_bad_time:,}")
print("Dropped bad price:", f"{rows_dropped_bad_price:,}")
print("Full parquet written:", f"{rows_written_full:,}")
print("Sample parquet written:", f"{rows_written_sample:,}")
print("Min time:", min_time)
print("Max time:", max_time)
print("Monthly parquet files:", len(list(MONTHLY_DIR.glob("events_*.parquet"))))


Done.
Total rows in: 885,129
Dropped bad time: 0
Dropped bad price: 0
Full parquet written: 885,129
Sample parquet written: 88,644
Min time: 2020-09-24 11:57:06+00:00
Max time: 2021-02-28 23:59:09+00:00
Monthly parquet files: 6


In [14]:
# Save QC reports
#
# Writes three CSV files from the accumulators filled by the streaming cell:
# a one-row run summary, the per-event-type row counts, and the per-month row
# counts. The files are then read back and displayed.

qc_summary = pd.DataFrame([{
    "raw_csv": str(RAW_EVENTS_CSV),
    "total_rows_in": int(total_rows_in),
    "rows_written_full": int(rows_written_full),
    "rows_written_sample10pct": int(rows_written_sample),
    "rows_dropped_bad_time": int(rows_dropped_bad_time),
    "rows_dropped_bad_price": int(rows_dropped_bad_price),
    "min_time_utc": str(min_time),
    "max_time_utc": str(max_time),
    "n_month_files": int(len(list(MONTHLY_DIR.glob("events_*.parquet")))),
}])

# Sort inside pandas instead of calling sorted() on the dict items. The counts
# are collected with value_counts(dropna=False), so a missing event_type shows
# up as a pd.NA key, and comparing pd.NA with a string inside sorted() raises
# TypeError. sort_values puts such a row last; with no missing keys the row
# order is the same as before, because the keys are unique.
event_type_table = pd.DataFrame(
    list(event_type_counts.items()), columns=["event_type", "count"]
).sort_values("event_type", na_position="last")
month_table = pd.DataFrame(
    list(month_counts.items()), columns=["month", "count"]
).sort_values("month", na_position="last")

event_type_table.to_csv(QC_EVENTTYPE_OUT, index=False)
month_table.to_csv(QC_MONTH_OUT, index=False)
qc_summary.to_csv(QC_SUMMARY_OUT, index=False)

print("Saved QC:")
print(" -", QC_SUMMARY_OUT)
print(" -", QC_EVENTTYPE_OUT)
print(" -", QC_MONTH_OUT)

# Show the summary, then re-read the two count files from disk so the display
# reflects what was actually written.
display(qc_summary)
display(pd.read_csv(QC_EVENTTYPE_OUT))
display(pd.read_csv(QC_MONTH_OUT).sort_values("month"))

Saved QC:
 - C:\Users\seony\Desktop\personal_project\purchase_prediction\artifacts\reports\00_events_qc_summary.csv
 - C:\Users\seony\Desktop\personal_project\purchase_prediction\artifacts\reports\00_events_event_type_counts.csv
 - C:\Users\seony\Desktop\personal_project\purchase_prediction\artifacts\reports\00_events_month_counts.csv


Unnamed: 0,raw_csv,total_rows_in,rows_written_full,rows_written_sample10pct,rows_dropped_bad_time,rows_dropped_bad_price,min_time_utc,max_time_utc,n_month_files
0,C:\Users\seony\Desktop\personal_project\purcha...,885129,885129,88644,0,0,2020-09-24 11:57:06+00:00,2021-02-28 23:59:09+00:00,6


Unnamed: 0,event_type,count
0,cart,54035
1,purchase,37346
2,view,793748


Unnamed: 0,month,count
0,2020-09,28074
1,2020-10,161544
2,2020-11,188225
3,2020-12,152720
4,2021-01,187587
5,2021-02,166979


In [15]:
# Quick sanity read (parquet)

# Load the full export back from disk and show its shape and first ten rows.
dfp = pd.read_parquet(OUT_FULL_PARQUET, engine="pyarrow")
print("Parquet loaded:", dfp.shape)
display(dfp.head(10))

# Convert event_time to UTC timestamps (unparseable values become NaT) and
# report the earliest and latest value.
t = pd.to_datetime(dfp["event_time"], utc=True, errors="coerce")
earliest, latest = t.min(), t.max()
print("Time range:", earliest, "->", latest)

# Row counts for up to ten event types, most frequent first.
top_event_types = dfp["event_type"].value_counts().iloc[:10].to_dict()
print("Event types:", top_event_types)

Parquet loaded: (885129, 9)


Unnamed: 0,event_time,event_type,product_id,category_id,category_code,brand,price,user_id,user_session
0,2020-09-24 11:57:06+00:00,view,1996170,2144415922528452715,electronics.telephone,,31.9,1515915625519388267,LJuJVLEjPT
1,2020-09-24 11:57:26+00:00,view,139905,2144415926932472027,computers.components.cooler,zalman,17.16,1515915625519380411,tdicluNnRY
2,2020-09-24 11:57:27+00:00,view,215454,2144415927158964449,,,9.81,1515915625513238515,4TMArHtXQy
3,2020-09-24 11:57:33+00:00,view,635807,2144415923107266682,computers.peripherals.printer,pantum,113.809998,1515915625519014356,aGFYrNgC08
4,2020-09-24 11:57:36+00:00,view,3658723,2144415921169498184,,cameronsino,15.87,1515915625510743344,aa4mmk0kwQ
5,2020-09-24 11:57:59+00:00,view,664325,2144415951611757447,construction.tools.saw,carver,52.330002,1515915625519388062,vnkdP81DDW
6,2020-09-24 11:58:23+00:00,view,3791349,2144415935086199225,computers.desktop,,215.410004,1515915625519388877,J1t6sIYXiV
7,2020-09-24 11:58:24+00:00,view,716611,2144415923694469257,computers.network.router,d-link,53.139999,1515915625519388882,kVBeYDPcBw
8,2020-09-24 11:58:25+00:00,view,657859,2144415939431498289,,,34.169998,1515915625519320570,HEl15U7JVy
9,2020-09-24 11:58:31+00:00,view,716611,2144415923694469257,computers.network.router,d-link,53.139999,1515915625519388929,F3VB9LYp39


Time range: 2020-09-24 11:57:06+00:00 -> 2021-02-28 23:59:09+00:00
Event types: {'view': 793748, 'cart': 54035, 'purchase': 37346}
