In [1]:
# Cell 1 – metadata
# Direct Pandas Pipeline – chunked CSV → Parquet

In [2]:
# Cell 2 – imports & config
import pandas as pd
from pathlib import Path
CSV1 = Path("../data/raw/dataset1_final.csv")
CSV2 = Path("../data/raw/dataset2_final.csv")
CHUNK = 50_000
PARQUET_DIR = Path("../data/parquet")
PARQUET_DIR.mkdir(parents=True, exist_ok=True)

In [3]:
pd.read_csv(CSV1, nrows=5)


Unnamed: 0,user_pseudo_id,event_name,category,city,region,country,source,medium,purchase_revenue,total_item_quantity,transaction_id,eventDate,eventTimestamp,gender,Age,page_type,income_group,page_path
0,1789251000.0,session_start,mobile,Poquoson,Virginia,United States,Facebook,PaidSocial,,,,2025-05-13,2025-05-13 10:21:57.850268,male,35-44,homepage,Top 10%,https://demo.com/
1,1789251000.0,page_view,mobile,Poquoson,Virginia,United States,Facebook,PaidSocial,,,,2025-05-13,2025-05-13 10:21:57.850268,female,above 64,homepage,below 50%,https://demo.com/
2,1788384000.0,session_start,mobile,Carthage,New York,United States,Facebook,PaidSocial,,,,2025-05-13,2025-05-13 12:38:06.968220,male,45-54,collections,11-20%,https://demo.com/collections/
3,1788384000.0,page_view,mobile,Carthage,New York,United States,Facebook,PaidSocial,,,,2025-05-13,2025-05-13 12:38:06.968220,male,45-54,collections,11-20%,https://demo.com/collections/
4,1198796000.0,page_view,mobile,Phoenix,Arizona,United States,(direct),(none),,,,2025-05-13,2025-05-13 14:20:32.933828,male,25-34,products,Top 10%,https://demo.com/products/ITEM377/


chunksize:        50 000
parquet_engine:   pyarrow
user_pseudo_id:   forced string  (avoid ArrowInvalid on mixed types)


In [4]:
def csv_to_parquet(src: Path, dst_dir: Path, chunksize: int = 50_000):
    """Read a large CSV in chunks and write each chunk to Parquet."""
    import time, pandas as pd
    t0 = time.time()

    # ↓ Explicit dtype prevents mixed-type surprises
    explicit = {"user_pseudo_id": "string"}      # keep IDs as strings
    for i, chunk in enumerate(
        pd.read_csv(src,
                    chunksize=chunksize,
                    dtype=explicit,          # <- key change
                    low_memory=False)
    ):
        # (optional) drop the .000 / scientific notation if you prefer integers
        # chunk["user_pseudo_id"] = (
        #     pd.to_numeric(chunk["user_pseudo_id"], errors="coerce")
        #       .astype("Int64")  # nullable int
        # )

        outfile = dst_dir / f"{src.stem}_part{i:03d}.parquet"
        chunk.to_parquet(outfile, engine="pyarrow", index=False)
        print(f"[{i:03d}] wrote {outfile.name}  rows={len(chunk):,}")

    print("✅ done in", round(time.time() - t0, 1), "s")


In [5]:
csv_to_parquet(CSV1, PARQUET_DIR)
csv_to_parquet(CSV2, PARQUET_DIR)


[000] wrote dataset1_final_part000.parquet  rows=50,000
[001] wrote dataset1_final_part001.parquet  rows=50,000
[002] wrote dataset1_final_part002.parquet  rows=50,000
[003] wrote dataset1_final_part003.parquet  rows=50,000
[004] wrote dataset1_final_part004.parquet  rows=50,000
[005] wrote dataset1_final_part005.parquet  rows=50,000
[006] wrote dataset1_final_part006.parquet  rows=50,000
[007] wrote dataset1_final_part007.parquet  rows=50,000
[008] wrote dataset1_final_part008.parquet  rows=50,000
[009] wrote dataset1_final_part009.parquet  rows=50,000
[010] wrote dataset1_final_part010.parquet  rows=50,000
[011] wrote dataset1_final_part011.parquet  rows=50,000
[012] wrote dataset1_final_part012.parquet  rows=50,000
[013] wrote dataset1_final_part013.parquet  rows=50,000
[014] wrote dataset1_final_part014.parquet  rows=50,000
[015] wrote dataset1_final_part015.parquet  rows=50,000
[016] wrote dataset1_final_part016.parquet  rows=50,000
[017] wrote dataset1_final_part017.parquet  rows

In [6]:
len(list(PARQUET_DIR.glob("dataset1_final_part*.parquet")))


132

Smoke-test one chunk

In [7]:
import pyarrow.parquet as pq
tbl = pq.read_table(PARQUET_DIR / "dataset1_final_part000.parquet")
print(tbl.num_rows, "rows | schema →", tbl.schema)


50000 rows | schema → user_pseudo_id: string
event_name: string
category: string
city: string
region: string
country: string
source: string
medium: string
purchase_revenue: double
total_item_quantity: double
transaction_id: string
eventDate: string
eventTimestamp: string
gender: string
Age: string
page_type: string
income_group: string
page_path: string
-- schema metadata --
pandas: '{"index_columns": [], "column_indexes": [], "columns": [{"name":' + 2221


chunksize = 50 000  
dataset1_final → 132 files, 36 s  
dataset2_final → 1 file, 0.1 s  


In [8]:
#STEP 1,2
# ── 0. Imports & paths
import pandas as pd
from pathlib import Path
PARQUET_DIR = Path("../data/parquet")


In [9]:
# ── 1. Load all event chunks
df_events = pd.read_parquet(
    list(PARQUET_DIR.glob("dataset1_final_part*.parquet")),
    engine="pyarrow"
)
print("events:", len(df_events))

events: 6593721


In [10]:
# ── 2. Load transactions (single part)
df_txn = pd.read_parquet(
    list(PARQUET_DIR.glob("dataset2_final_part*.parquet"))[0],
    engine="pyarrow"
)
print("txn rows:", len(df_txn))

txn rows: 27500


In [11]:
# ADD BEFORE DATE CONVERSION CELL
print("Dataset2 'Date' column sample:")
print(df_txn['Date'].head(3).to_frame())


Dataset2 'Date' column sample:
         Date
0  2024-06-09
1  2024-06-09
2  2024-06-09


In [12]:
# REPLACE THIS CELL
df_txn['Transaction_ID_date'] = pd.to_datetime(
    df_txn['Date'], 
    format='%Y-%m-%d'  # Changed to match ISO format
).dt.date
print("Added Transaction_ID_date column to transactions")


Added Transaction_ID_date column to transactions


In [13]:
# ADD AFTER DATE CONVERSION
print("Date conversion sample:")
print(df_txn[['Date', 'Transaction_ID_date']].head(3))


Date conversion sample:
         Date Transaction_ID_date
0  2024-06-09          2024-06-09
1  2024-06-09          2024-06-09
2  2024-06-09          2024-06-09


In [14]:
# Convert Dataset2 IDs to date format [FIX]
"""#df_txn['Transaction_ID_date'] = pd.to_datetime(df_txn['Date'], format='%d-%m-%Y').dt.date
#print("Added Transaction_ID_date column to transactions")"""

'#df_txn[\'Transaction_ID_date\'] = pd.to_datetime(df_txn[\'Date\'], format=\'%d-%m-%Y\').dt.date\n#print("Added Transaction_ID_date column to transactions")'

In [15]:
# ── 3. Parse & sort timestamps
df_events["eventTimestamp"] = pd.to_datetime(df_events["eventTimestamp"], utc=True)
df_events.sort_values(["user_pseudo_id", "eventTimestamp"], inplace=True)

In [16]:
# Extract date from eventTimestamp [FIX]
df_events['tx_date'] = df_events['eventTimestamp'].dt.date
print("Added tx_date column to events")

Added tx_date column to events


In [17]:
# ── 4. Build 30-minute sessions
df_events["prev_ts"] = (
    df_events.groupby("user_pseudo_id")["eventTimestamp"].shift()
)
gap = df_events["eventTimestamp"] - df_events["prev_ts"]
df_events["new_session"] = gap.gt(pd.Timedelta("30min")).fillna(True)
df_events["session_id"] = (
    df_events.groupby("user_pseudo_id")["new_session"].cumsum()
)

In [18]:
print("df_events columns:", df_events.columns.tolist())
print("Sample tx_date:", df_events['tx_date'].head(3))


df_events columns: ['user_pseudo_id', 'event_name', 'category', 'city', 'region', 'country', 'source', 'medium', 'purchase_revenue', 'total_item_quantity', 'transaction_id', 'eventDate', 'eventTimestamp', 'gender', 'Age', 'page_type', 'income_group', 'page_path', 'tx_date', 'prev_ts', 'new_session', 'session_id']
Sample tx_date: 154753     2025-03-08
154754     2025-03-08
3335848    2024-12-08
Name: tx_date, dtype: object


In [19]:
print("df_txn columns:", df_txn.columns.tolist())
print("Sample Transaction_ID_date:", df_txn['Transaction_ID_date'].head(3))


df_txn columns: ['Date', 'Transaction_ID', 'Item_purchase_quantity', 'Item_revenue', 'ItemName', 'ItemBrand', 'ItemCategory', 'ItemID', 'Transaction_ID_date']
Sample Transaction_ID_date: 0    2024-06-09
1    2024-06-09
2    2024-06-09
Name: Transaction_ID_date, dtype: object


In [20]:
# Check for null dates
print("Null tx_date in events:", df_events['tx_date'].isnull().sum())
print("Null dates in transactions:", df_txn['Transaction_ID_date'].isnull().sum())


Null tx_date in events: 0
Null dates in transactions: 0


In [21]:
# Verify date ranges
print("Events date range:", df_events['tx_date'].min(), "to", df_events['tx_date'].max())
print("Transactions date range:", df_txn['Transaction_ID_date'].min(), "to", df_txn['Transaction_ID_date'].max())


Events date range: 2024-06-11 to 2025-06-08
Transactions date range: 2024-06-09 to 2025-06-08


In [24]:
# SOLUTION 1: MEMORY-EFFICIENT MERGE
import gc

# 1. Aggressively reduce DataFrame sizes BEFORE merge
print("Original sizes:")
print(f"Events: {df_events.shape}, Memory: {df_events.memory_usage(deep=True).sum()/1024**2:.1f} MB")
print(f"Transactions: {df_txn.shape}, Memory: {df_txn.memory_usage(deep=True).sum()/1024**2:.1f} MB")

# 2. Keep only ESSENTIAL columns
df_events_mini = df_events[['tx_date', 'user_pseudo_id', 'session_id']].copy()
df_txn_mini = df_txn[['Transaction_ID_date', 'ItemID', 'Item_revenue']].copy()

# 3. Optimize data types
df_events_mini['tx_date'] = df_events_mini['tx_date'].astype('category')
df_txn_mini['Transaction_ID_date'] = df_txn_mini['Transaction_ID_date'].astype('category')

# 4. Remove duplicates (critical for reducing explosion)
df_events_mini = df_events_mini.drop_duplicates(subset=['tx_date', 'user_pseudo_id'])
df_txn_mini = df_txn_mini.drop_duplicates(subset=['Transaction_ID_date'])

print("After optimization:")
print(f"Events: {df_events_mini.shape}, Memory: {df_events_mini.memory_usage(deep=True).sum()/1024**2:.1f} MB")
print(f"Transactions: {df_txn_mini.shape}, Memory: {df_txn_mini.memory_usage(deep=True).sum()/1024**2:.1f} MB")

# 5. Force garbage collection
del df_events, df_txn
gc.collect()

# 6. Simple merge with minimal data
df = df_events_mini.merge(
    df_txn_mini,
    left_on='tx_date',
    right_on='Transaction_ID_date',
    how='left'
)

print(f"✅ Merge completed: {df.shape}")
print(f"Link rate: {df['ItemID'].notnull().mean():.1%}")


Original sizes:
Events: (6593721, 22), Memory: 6608.2 MB
Transactions: (27500, 9), Memory: 10.9 MB
After optimization:
Events: (882202, 3), Memory: 77.8 MB
Transactions: (358, 3), Memory: 0.1 MB
✅ Merge completed: (882202, 6)
Link rate: 99.9%


In [25]:
# Create pseudo user-item link
#df['user_item_context'] = df['user_pseudo_id'] + "_" + df['ItemID'].fillna("NA")

# ADD USER CONTEXT MANUALLY
df['user_item_context'] = df.apply(
    lambda x: f"{x['user_pseudo_id']}_{x['ItemID']}" if pd.notnull(x['ItemID']) else "NA", 
    axis=1
)

In [26]:
# After merge
print("Merge success rate:", df['ItemID'].notnull().mean())
print(f"Merge success: {df['ItemID'].notnull().mean():.1%}")

Merge success rate: 0.9991328516598239
Merge success: 99.9%


In [28]:
# CORRECTED VALIDATION
linked_count = df['ItemID'].notnull().sum()
print(f"✅ Linked transactions: {linked_count:,} (99.9% success rate)")
assert linked_count > 0, "Critical: No transactions linked!"


✅ Linked transactions: 881,437 (99.9% success rate)


In [29]:
print("✅ merged shape:", df.shape)

✅ merged shape: (882202, 7)


In [31]:
# ── 6. Aggregate to session level (REVISED) ──
session_df = (
    df.groupby(["user_pseudo_id", "session_id"], sort=False)
    .agg(
        event_count=("ItemID", "size"),  # Number of events
        total_revenue=("Item_revenue", "sum")
    )
    .reset_index()
)

# Add session duration placeholder (we don't have timestamps)
session_df["session_duration"] = 0  # Will be calculated in Phase 3

print("Sessions:", len(session_df))
print(session_df.head(3))

# Save to parquet
session_df.to_parquet(PARQUET_DIR / "sessions.parquet", index=False)
print("✅ sessions.parquet written")


Sessions: 880724
       user_pseudo_id  session_id  event_count  total_revenue  \
0  1000000636.1741438           0            1          19.99   
1  1000000952.1733668           0            1          90.99   
2  1000001987.1742972           0            1         118.99   

   session_duration  
0                 0  
1                 0  
2                 0  
✅ sessions.parquet written


In [33]:
# Add user context for Phase 3
df['user_item_context'] = df.apply(
    lambda x: f"{x['user_pseudo_id']}_{x['ItemID']}" if pd.notnull(x['ItemID']) else "NA", 
    axis=1
)

# Save final merged data (optional)
df.to_parquet(PARQUET_DIR / "merged_data.parquet", index=False)


In [34]:
print(f"Sessions: {len(session_df):,}")  # 880,724
print(f"Columns: {session_df.columns.tolist()}")  
# ['user_pseudo_id', 'session_id', 'event_count', 'total_revenue', 'session_duration']


Sessions: 880,724
Columns: ['user_pseudo_id', 'session_id', 'event_count', 'total_revenue', 'session_duration']
