In [0]:
# Day 2: Data Ingestion and Basic Transformations
#
# Objective:
# - Ingest raw ecommerce events data
# - Apply basic cleaning and type normalization
# - Prepare a combined dataset for downstream analysis

In [0]:
from pyspark.sql.functions import col, to_timestamp

In [0]:
# Locations of the raw event CSV files read by this notebook, one per month.
# NOTE(review): file names suggest October and November 2019 exports — confirm
# against the contents of the volume.
OCT_PATH = "/Volumes/workspace/ecommerce/ecommerce_data/2019-Oct.csv"
NOV_PATH = "/Volumes/workspace/ecommerce/ecommerce_data/2019-Nov.csv"

In [0]:
# Read raw data.
# Both files are loaded with identical reader options: the first line is used
# as the header, and column types are inferred from the data.
# NOTE: inferSchema makes Spark take an additional pass over each file to
# determine column types.
oct_raw = spark.read.options(header=True, inferSchema=True).csv(OCT_PATH)
nov_raw = spark.read.options(header=True, inferSchema=True).csv(NOV_PATH)

In [0]:
# Standardize columns
def standardize_events(df):
    """Return a copy of ``df`` with normalized timestamp and price columns.

    The returned DataFrame differs from the input as follows:

    * ``price`` is cast to ``double`` (the column keeps its position).
    * ``event_ts`` is added, holding ``to_timestamp`` applied to ``event_time``.
    * ``event_time`` is removed.

    Args:
        df: A Spark DataFrame containing ``event_time`` and ``price`` columns.

    Returns:
        A new Spark DataFrame; ``df`` itself is not modified.
    """
    price_as_double = col("price").cast("double")
    parsed_event_time = to_timestamp(col("event_time"))

    with_typed_price = df.withColumn("price", price_as_double)
    with_event_ts = with_typed_price.withColumn("event_ts", parsed_event_time)
    return with_event_ts.drop("event_time")

# Apply the same standardization to each monthly frame (October first, then November).
oct_std, nov_std = map(standardize_events, (oct_raw, nov_raw))

In [0]:
# Combine datasets
# unionByName matches columns by name rather than by position.
events_all = oct_std.unionByName(nov_std)

# count() is a Spark action: it triggers evaluation of the combined frame.
total_event_count = events_all.count()
print(f"Total events (Oct + Nov): {total_event_count:,}")

In [0]:
# Basic cleaning rules
# Keep only rows whose price is present and non-negative.
events_clean = events_all.where(
    col("price").isNotNull() & (col("price") >= 0)
)

In [0]:
# Validation checks
# 1) Print the column names and types of the cleaned dataset.
events_clean.printSchema()

# 2) Show the number of rows per event_type.
(
    events_clean
    .groupBy("event_type")
    .count()
    .show()
)