# PySpark e-commerce demo project

## Imports

In [0]:
from pyspark.sql.functions import avg, col, count, when, isnan, isnull, year, \
    month, sum, count, countDistinct
from pyspark.sql import functions as F

## Data Ingestion


In [0]:
# Load the raw customers table. The header row supplies column names, and
# inferSchema asks Spark to derive column types from the data.
customers_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("/Volumes/pyspark_proj_ecommerce/default/raw_data/customers.csv")
)
display(customers_df, n=10)

In [0]:
# Print the column names and the types Spark inferred for the customers CSV.
customers_df.printSchema()

In [0]:
# Number of rows in the raw customers table. It is left as the cell's final
# expression so the notebook echoes the value.
customer_row_count = customers_df.count()
customer_row_count

In [0]:
# Load the raw orders and order-item tables. Each uses its header row for
# column names and lets Spark infer column types.
orders_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("/Volumes/pyspark_proj_ecommerce/default/raw_data/orders.csv")
)
items_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("/Volumes/pyspark_proj_ecommerce/default/raw_data/order_items.csv")
)

## Data quality check

Count null values in each column

In [0]:
# Build one output column per customers column, holding that column's null count.
# when(...) yields a non-null value only on rows where the column is null, and
# count() skips nulls, so each count equals the number of nulls.
null_count_exprs = [
    F.count(F.when(F.col(name).isNull(), name)).alias(name)
    for name in customers_df.columns
]
null_counts = customers_df.select(null_count_exprs)
display(null_counts)
# The same check would apply to orders and items, but since the dataset is
# artificially generated we know it contains no nulls.

Identify any duplicate order_ids

In [0]:
# Any order_id that appears on more than one row of orders_df is a duplicate.
order_id_counts = orders_df.groupBy("order_id").count()
duplicate_order_ids = order_id_counts.where(F.col("count") > 1)
display(duplicate_order_ids)

Check whether any order items reference non-existent order IDs

In [0]:
# The left-anti join keeps only item rows whose order_id has no match in
# orders_df, i.e. items that reference orders that do not exist.
non_existent_order_ids = items_df.join(orders_df, "order_id", "left_anti")
display(non_existent_order_ids)

## Data Transformation

In [0]:
# Attach customer attributes to each completed order and add year/month
# columns for time-based grouping. The inner join drops any order whose
# customer_id is absent from customers_df.
enriched_columns = [
    "order_id", "customer_id", "customer_name", "country",
    "order_date", "order_status", "total_amount",
]
orders_with_customers = orders_df.join(customers_df, on="customer_id", how="inner")
completed_orders = orders_with_customers.where(F.col("order_status") == "Completed")
orders_enriched_df = (
    completed_orders
    .select(*enriched_columns)
    .withColumn("order_year", F.year(F.col("order_date")))
    .withColumn("order_month", F.month(F.col("order_date")))
)
display(orders_enriched_df, n=5)

## Analysis

### Revenue by country

In [0]:
# Completed-order revenue per country, with order count and mean order value,
# ordered from highest to lowest revenue.
country_totals = orders_enriched_df.groupBy("country").agg(
    F.sum("total_amount").alias("total_revenue"),
    F.count("order_id").alias("number_of_orders"),
)
revenue_by_country = (
    country_totals
    .withColumn(
        "average_order_value",
        F.col("total_revenue") / F.col("number_of_orders"),
    )
    .orderBy(F.col("total_revenue").desc())
)
display(revenue_by_country)

### Monthly Trends

In [0]:
# Completed-order revenue and order count for each (year, month), in
# chronological order.
monthly_trends_df = (
    orders_enriched_df
    .groupBy("order_year", "order_month")
    .agg(
        F.sum("total_amount").alias("monthly_revenue"),
        F.count("order_id").alias("number_of_orders"),
    )
    .orderBy("order_year", "order_month")
)
display(monthly_trends_df)

### Product Category Analysis

In [0]:
# Per-category sales for completed orders:
#   - units sold,
#   - revenue computed from item-level price_per_unit * quantity,
#   - the number of distinct orders that contained the category.
# Results are ordered by revenue, highest first.
completed_orders_items_df = (
    orders_df
    .join(items_df, on="order_id", how="inner")
    .filter(col("order_status") == "Completed")
)
category_df = (
    completed_orders_items_df
    .groupBy("category")
    .agg(
        F.sum("quantity").alias("total_quantity_sold"),
        F.sum(col("price_per_unit") * col("quantity")).alias("total_category_revenue"),
        # After the join there is one row per order item, and an order can
        # contain several items of the same category. Count distinct order_ids;
        # a plain count() would count item rows instead of orders.
        F.countDistinct("order_id").alias("number_of_orders"),
    )
    .sort(col("total_category_revenue").desc())
)
display(category_df)


### Top customers

In [0]:
# The ten customers with the highest completed-order spend, together with
# their order count and average order value.
top_customers = (
    orders_enriched_df
    .groupBy("customer_id", "customer_name")
    .agg(
        F.sum("total_amount").alias("total_spent"),
        F.count("order_id").alias("number_of_orders"),
        F.avg("total_amount").alias("average_order_value"),
    )
    # customer_id breaks ties so limit(10) selects the same rows on every run.
    # Without it, customers tied at the cut-off can vary between executions,
    # and this result is persisted to Parquet.
    .sort(col("total_spent").desc(), col("customer_id").asc())
    .limit(10)
)
display(top_customers)

## Write parquet

In [0]:
# Persist the enriched orders and two analysis results as Parquet. Mode
# "overwrite" replaces whatever already exists at each path.
parquet_outputs = [
    (orders_enriched_df, "/Volumes/pyspark_proj_ecommerce/default/processed/orders_enriched"),
    (revenue_by_country, "/Volumes/pyspark_proj_ecommerce/default/processed/revenue_by_country"),
    (top_customers, "/Volumes/pyspark_proj_ecommerce/default/processed/top_customers"),
]
for frame, target_path in parquet_outputs:
    frame.write.mode("overwrite").parquet(target_path)

## Read parquet to verify

In [0]:
# Read the top_customers output back from Parquet to confirm the write succeeded.
top_customers_from_pq = (
    spark.read
    .format("parquet")
    .load("/Volumes/pyspark_proj_ecommerce/default/processed/top_customers")
)
display(top_customers_from_pq)

## Comprehensive report

In [0]:
# One-row summary of the dataset and of the analyses above.
# Every order-level statistic is computed in a single aggregation, so orders_df
# is scanned by one Spark job instead of one job per metric.
order_stats = orders_df.agg(
    F.countDistinct("order_id").alias("total_orders"),
    # when() yields null for non-completed rows and countDistinct ignores nulls,
    # so this equals filtering to "Completed" first and counting distinct ids.
    F.countDistinct(
        F.when(col("order_status") == "Completed", col("order_id"))
    ).alias("total_completed_orders"),
    # NOTE(review): this sums total_amount across ALL order statuses, unlike
    # the per-country/category/customer figures, which use completed orders
    # only. Confirm this is intended.
    F.sum("total_amount").alias("overall_revenue"),
    F.min("order_date").alias("min_order_date"),
    F.max("order_date").alias("max_order_date"),
).first()

report_data = [{
    "total_customers": customers_df.agg(F.countDistinct("customer_id")).first()[0],
    "total_orders": order_stats["total_orders"],
    "total_completed_orders": order_stats["total_completed_orders"],
    "overall_revenue": order_stats["overall_revenue"],
    # Both DataFrames are sorted by revenue descending, so first() is the top entry.
    "top_country": revenue_by_country.first()["country"],
    "top_category": category_df.first()["category"],
    "min_order_date": order_stats["min_order_date"],
    "max_order_date": order_stats["max_order_date"],
}]

report_df = spark.createDataFrame(report_data)
display(report_df)