# Part 3: Spark SQL & DataFrames (Google Colab Notebook)
This notebook implements Part 3 of the graded assessment: 
- Load CSVs (customers, orders, products)
- Run Spark SQL queries (customers whose total spend exceeds a threshold X, monthly revenue trends, top-selling product category)
- Save results as Parquet and JSON


In [None]:
!pip -q install pyspark

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Build the notebook's Spark session; getOrCreate() hands back an already
# active session when one exists instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("RetailAnalytics-Part3")
    .getOrCreate()
)

# Bare expression: the cell displays the session object as its output.
spark


In [None]:
# Locations of the three input files.
customers_path = "/mnt/data/customers.csv"
orders_path = "/mnt/data/orders.csv"
products_path = "/mnt/data/products.csv"

# All three files are read the same way: the first row holds the column names
# and Spark infers each column's type from the data.
csv_read_kwargs = {"header": True, "inferSchema": True}

customers_df = spark.read.csv(customers_path, **csv_read_kwargs)
orders_df = spark.read.csv(orders_path, **csv_read_kwargs)
products_df = spark.read.csv(products_path, **csv_read_kwargs)

# Print the header of each file so the column detection below can be checked by eye.
for label, frame in (
    ("Customers", customers_df),
    ("Orders", orders_df),
    ("Products", products_df),
):
    print(f"{label} columns:", frame.columns)


In [None]:
def first_present(df, candidates):
    """Return the name of the first candidate column that exists in ``df``.

    Candidates are tried in order. An exact (case-sensitive) match always wins,
    so any lookup that succeeded before returns the same name. Only when no
    candidate matches exactly is a case-insensitive comparison attempted, so
    that headers such as ``Order_Date`` or ``QTY`` are still recognised.

    Args:
        df: Any object exposing a ``columns`` sequence of column-name strings.
        candidates: Column names to look for, in priority order.

    Returns:
        The column name exactly as it is spelled in ``df.columns``, or ``None``
        when no candidate matches.
    """
    columns = list(df.columns)

    # Pass 1: exact spelling, in candidate priority order.
    exact = set(columns)
    for candidate in candidates:
        if candidate in exact:
            return candidate

    # Pass 2: ignore case. Keep the first column seen for each folded name so
    # the result is deterministic when two headers differ only by case.
    by_folded = {}
    for name in columns:
        by_folded.setdefault(name.casefold(), name)
    for candidate in candidates:
        match = by_folded.get(candidate.casefold())
        if match is not None:
            return match

    return None

# Work out which of the known header spellings the orders file actually uses.
ord_qty_col = first_present(orders_df, ["quantity", "qty", "Quantity"])
ord_unit_price_col = first_present(orders_df, ["unit_price", "UnitPrice", "unitprice", "price_per_unit"])
ord_total_amt_col = first_present(orders_df, ["total_amount", "TotalAmount", "amount", "Amount", "order_amount"])
ord_price_col = first_present(orders_df, ["price", "Price", "unit_price", "UnitPrice"])
ord_date_col = first_present(orders_df, ["order_date", "OrderDate", "date", "Date", "order_datetime", "timestamp"])

# Every later query needs a date, so stop here if none was found.
if ord_date_col is None:
    raise ValueError("Could not find an order date column in orders file.")

# Parse the detected column as a timestamp, keep only its date part under the
# fixed name "order_date", and discard the temporary timestamp column.
parsed_ts = F.to_timestamp(F.col(ord_date_col))
orders_with_date = (
    orders_df
    .withColumn("_order_ts", parsed_ts)
    .withColumn("order_date", F.to_date(F.col("_order_ts")))
    .drop("_order_ts")
)

# Choose the revenue formula by column availability, in this order:
#   1. quantity * unit price
#   2. a ready-made total amount
#   3. quantity * generic price
if ord_qty_col and ord_unit_price_col:
    revenue_expr = F.col(ord_qty_col).cast("double") * F.col(ord_unit_price_col).cast("double")
elif ord_total_amt_col:
    revenue_expr = F.col(ord_total_amt_col).cast("double")
elif ord_qty_col and ord_price_col:
    revenue_expr = F.col(ord_qty_col).cast("double") * F.col(ord_price_col).cast("double")
else:
    raise ValueError("Could not derive revenue.")

orders_clean = orders_with_date.withColumn("revenue", revenue_expr)

# Quick look at the two derived columns.
orders_clean.select("order_date", "revenue").show(5, truncate=False)


In [None]:
# Make the three frames queryable from Spark SQL under plain table names.
for view_name, frame in (
    ("customers", customers_df),
    ("orders", orders_clean),
    ("products", products_df),
):
    frame.createOrReplaceTempView(view_name)


In [None]:
# --- Customers whose total spend exceeds X ---------------------------------

# Spellings under which the customer key may appear in either file.
customer_key_candidates = ["customer_id", "cust_id", "CustomerID", "Customer_Id"]

cust_key = first_present(customers_df, customer_key_candidates)
ord_cust_key = first_present(orders_clean, customer_key_candidates)

# Fail with a clear message instead of passing None to withColumnRenamed.
if cust_key is None:
    raise ValueError("Could not find a customer id column in customers file.")
if ord_cust_key is None:
    raise ValueError("Could not find a customer id column in orders file.")

# Give both sides the same key name so the SQL below does not depend on the
# spelling used in the input files.
customers_std = customers_df.withColumnRenamed(cust_key, "customer_id_std")
orders_std = orders_clean.withColumnRenamed(ord_cust_key, "customer_id_std")
customers_std.createOrReplaceTempView("customers_std")
orders_std.createOrReplaceTempView("orders_std")

# Spend threshold: only customers whose summed revenue is strictly greater
# than X are returned.
X = 500.0

# Orders are aggregated per customer first and only then joined to the
# customer attributes. This avoids `GROUP BY c.*` (star expansion in a
# grouping list is not something Spark SQL reliably supports) and keeps the
# join input to one row per customer. The threshold is applied to the
# unrounded sum; rounding is for display only.
sql_customers_over_X = f"""
WITH customer_spend AS (
  SELECT
    customer_id_std,
    SUM(revenue) AS raw_spend
  FROM orders_std
  GROUP BY customer_id_std
  HAVING SUM(revenue) > {X}
)
SELECT
  c.*,
  ROUND(s.raw_spend, 2) AS total_spend
FROM customers_std c
JOIN customer_spend s
  ON c.customer_id_std = s.customer_id_std
ORDER BY total_spend DESC
"""

customers_over_X_df = spark.sql(sql_customers_over_X)
customers_over_X_df.show(20, truncate=False)


In [None]:
# Monthly revenue trend: bucket the rows of orders_std by the calendar month of
# order_date (formatted as 'yyyy-MM'), then report for each bucket the summed
# revenue rounded to 2 decimals and the number of order rows, sorted by the
# month label.
sql_monthly_trend = """
SELECT
  DATE_FORMAT(order_date, 'yyyy-MM') AS year_month,
  ROUND(SUM(revenue), 2) AS monthly_revenue,
  COUNT(*) AS order_count
FROM orders_std
GROUP BY DATE_FORMAT(order_date, 'yyyy-MM')
ORDER BY year_month
"""

# Run the query and print up to 50 rows without truncating cell values.
monthly_trend_df = spark.sql(sql_monthly_trend)
monthly_trend_df.show(50, truncate=False)


In [None]:
# --- Revenue per product category, best-selling first ----------------------

# Spellings under which the product key may appear in either file.
product_key_candidates = ["product_id", "prod_id", "ProductID", "Product_Id"]

prod_key_orders = first_present(orders_std, product_key_candidates)
prod_key_products = first_present(products_df, product_key_candidates)
cat_col = first_present(products_df, ["category", "Category", "category_name", "CategoryName"])

# Fail with a clear message instead of passing None to withColumnRenamed.
if prod_key_orders is None:
    raise ValueError("Could not find a product id column in orders file.")
if prod_key_products is None:
    raise ValueError("Could not find a product id column in products file.")
if cat_col is None:
    raise ValueError("Could not find a category column in products file.")

# Standardise the join key and category names on both sides. The chain is
# wrapped in parentheses so it can span lines without backslash continuations.
products_std = (
    products_df
    .withColumnRenamed(prod_key_products, "product_id_std")
    .withColumnRenamed(cat_col, "category_std")
)
orders_w_prod = orders_std.withColumnRenamed(prod_key_orders, "product_id_std")

products_std.createOrReplaceTempView("products_std")
orders_w_prod.createOrReplaceTempView("orders_w_prod")

# One row per category with its summed revenue (rounded to 2 decimals) and the
# number of joined order rows; the highest-revenue category comes first.
sql_top_category = """
SELECT
  p.category_std AS category,
  ROUND(SUM(o.revenue), 2) AS total_revenue,
  COUNT(*) AS order_rows
FROM orders_w_prod o
JOIN products_std p
  ON o.product_id_std = p.product_id_std
GROUP BY p.category_std
ORDER BY total_revenue DESC
"""

top_category_df = spark.sql(sql_top_category)
top_category_df.show(20, truncate=False)


In [None]:
output_base = "/content/output/retail_analytics"

# Export plan: (result frame, output name stem, collapse to one partition first?).
# Only the customers result is coalesced to a single partition before writing;
# the other two are written with whatever partitioning they already have.
export_plan = (
    (customers_over_X_df, "customers_over_X", True),
    (monthly_trend_df, "monthly_trend", False),
    (top_category_df, "top_category", False),
)

# Each result is written twice, Parquet first and then JSON, replacing any
# output left by a previous run.
for result_df, stem, single_partition in export_plan:
    to_write = result_df.coalesce(1) if single_partition else result_df
    to_write.write.mode("overwrite").parquet(f"{output_base}/{stem}_parquet")
    to_write.write.mode("overwrite").json(f"{output_base}/{stem}_json")

print("Wrote results under:", output_base)
