# MADSC102-MUC07322 Final Exam Project: End-to-End Data Analytics on Databricks

**Student Name:** VENKAT SAI MOTHE
**Date:** 14/12/2025
**Platform:** Databricks
**Data Source:** Kaggle CSV File (Online Retail Dataset)


## Public CSV Link
The dataset is read from the following public URL:

**CSV URL:** `https://files.manuscdn.com/user_upload_by_module/session_file/310419663030150399/rsxNxRySzwJGsLbc.csv`

## Phase 1: Data Ingestion (CSV)

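We will download the 'Online Retail' CSV file from the public URL and load it into a PySpark DataFrame.

In [0]:
import urllib.request

from pyspark.sql.functions import col, trim, to_timestamp

CSV_URL = "https://files.manuscdn.com/user_upload_by_module/session_file/310419663030150399/rsxNxRySzwJGsLbc.csv"

# Spark's file readers typically cannot open http(s) URLs, so the file is
# downloaded to the driver first. LOCAL_PATH is an assumed location; on a
# multi-node cluster, use storage that every executor can read instead.
LOCAL_PATH = "/tmp/online_retail.csv"
urllib.request.urlretrieve(CSV_URL, LOCAL_PATH)

raw_df = spark.read.csv(
    f"file:{LOCAL_PATH}",
    header=True,
    inferSchema=True,
    sep=",",
)

print(f"Raw DataFrame loaded with {raw_df.count()} records.")
raw_df.printSchema()
display(raw_df.limit(5))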
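
Later phases depend on specific column names, so it can help to confirm the header before cleaning. The sketch below is a plain-Python illustration and not part of the original notebook: `REQUIRED_COLUMNS` is taken from the columns selected in Phase 2, while the `missing_columns` helper and the sample header text are hypothetical.

```python
import csv
import io

# Columns that the Phase 2 select() relies on.
REQUIRED_COLUMNS = [
    "InvoiceNo", "StockCode", "Description", "Quantity",
    "InvoiceDate", "UnitPrice", "CustomerID", "Country",
]


def missing_columns(csv_text, required=REQUIRED_COLUMNS):
    """Return the required column names that are absent from the CSV header row."""
    header = next(csv.reader(io.StringIO(csv_text)), [])
    present = {name.strip() for name in header}
    return [name for name in required if name not in present]


# Hypothetical sample: a header that lacks the Country column.
sample = "InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID\n"
print(missing_columns(sample))
```

In the notebook, a similar check could compare `REQUIRED_COLUMNS` against `raw_df.columns`.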

## Phase 2: Data Cleaning, Preparation, and Delta Storage

We clean the data with the steps below, then save the result as a Delta table:
1.  Filtering out cancelled transactions (`InvoiceNo` starting with 'C').
2.  Dropping rows with a missing `CustomerID` (required for customer analysis).
3.  Calculating a new `Sales` column (`Quantity * UnitPrice`).
4.  Parsing `InvoiceDate` into a proper timestamp.
5.  Trimming surrounding whitespace from `Description`.
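
The row-level rules listed above can be sketched in plain Python to make their effect concrete. This is only an illustration under stated assumptions: the sample rows and the `clean` helper are hypothetical, and the `strptime` pattern is a Python approximation of the `M/d/yyyy H:mm` pattern used in the Spark code, which itself assumes that is how the file stores dates.

```python
from datetime import datetime

# Hypothetical sample rows in the shape of the raw CSV (values are made up).
rows = [
    {"InvoiceNo": "536365", "Quantity": 6, "UnitPrice": 2.55,
     "InvoiceDate": "12/1/2010 8:26", "CustomerID": "17850"},
    {"InvoiceNo": "C536379", "Quantity": -1, "UnitPrice": 27.50,
     "InvoiceDate": "12/1/2010 9:41", "CustomerID": "14527"},
    {"InvoiceNo": "536414", "Quantity": 56, "UnitPrice": 0.0,
     "InvoiceDate": "12/1/2010 11:52", "CustomerID": None},
]


def clean(row):
    """Apply the cleaning rules to one row; return None if the row is dropped."""
    if row["InvoiceNo"].startswith("C"):  # cancelled invoice
        return None
    if row["CustomerID"] is None:  # missing customer
        return None
    cleaned = dict(row)
    # Line-level revenue, rounded here only to keep the printed value tidy.
    cleaned["Sales"] = round(row["Quantity"] * row["UnitPrice"], 2)
    # Month/day/year hour:minute, accepting non-padded values such as "8:26".
    cleaned["InvoiceDate"] = datetime.strptime(row["InvoiceDate"], "%m/%d/%Y %H:%M")
    return cleaned


cleaned_rows = [c for c in map(clean, rows) if c is not None]
for c in cleaned_rows:
    print(c["InvoiceNo"], c["Sales"], c["InvoiceDate"].isoformat())
```

Only the first sample row survives: the second is a cancellation and the third has no customer.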

In [0]:
# Drop cancelled invoices (InvoiceNo starts with 'C') and rows without a CustomerID.
df_cleaned = (
    raw_df
    .filter(~col("InvoiceNo").startswith("C"))
    .filter(col("CustomerID").isNotNull())
)

# Add line-level revenue and parse InvoiceDate; the pattern assumes values
# such as "12/1/2010 8:26".
df_final = (
    df_cleaned
    .withColumn("Sales", col("Quantity") * col("UnitPrice"))
    .withColumn("InvoiceTimestamp", to_timestamp(col("InvoiceDate"), "M/d/yyyy H:mm"))
    .select(
        col("InvoiceNo"),
        col("StockCode"),
        trim(col("Description")).alias("Description"),
        col("Quantity"),
        col("InvoiceTimestamp").alias("InvoiceDate"),
        col("UnitPrice"),
        col("CustomerID"),
        col("Country"),
        col("Sales"),
    )
)

print(f"Cleaned DataFrame has {df_final.count()} records.")
df_final.printSchema()
display(df_final.limit(5))

DATABASE_NAME = "final_exam_db"
TABLE_NAME = "online_retail_data"

spark.sql(f"CREATE DATABASE IF NOT EXISTS {DATABASE_NAME}")

df_final.write \
        .format("delta") \
        .mode("overwrite") \
        .option("overwriteSchema", "true") \
        .saveAsTable(f"{DATABASE_NAME}.{TABLE_NAME}")

print(f"Data successfully written to Delta table: {DATABASE_NAME}.{TABLE_NAME}")

## Phase 3: SQL Analysis (for Dashboarding)

In [0]:
%sql
SELECT
  Description,
  SUM(Sales) AS TotalSales
FROM
  final_exam_db.online_retail_data
GROUP BY
  Description
ORDER BY
  TotalSales DESC
LIMIT 10

In [0]:
%sql
SELECT
  Country,
  SUM(Sales) AS TotalSales,
  COUNT(DISTINCT CustomerID) AS UniqueCustomers
FROM
  final_exam_db.online_retail_data
GROUP BY
  Country
ORDER BY
  TotalSales DESC

In [None]:
%sql
SELECT
  DATE_FORMAT(InvoiceDate, 'yyyy-MM') AS YearMonth,
  SUM(Sales) AS MonthlySalesRevenue,
  COUNT(DISTINCT InvoiceNo) AS OrderVolume
FROM final_exam_db.online_retail_data
GROUP BY DATE_FORMAT(InvoiceDate, 'yyyy-MM')
ORDER BY YearMonth;
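
The monthly query above buckets rows by year and month, sums `Sales`, and counts distinct invoices. A minimal plain-Python sketch of the same bucketing, using hypothetical rows, shows why two line items from one invoice count as a single order:

```python
from collections import defaultdict
from datetime import datetime

# Hypothetical cleaned rows: (InvoiceNo, InvoiceDate, Sales).
rows = [
    ("536365", datetime(2010, 12, 1, 8, 26), 15.30),
    ("536365", datetime(2010, 12, 1, 8, 26), 20.34),
    ("540001", datetime(2011, 1, 4, 10, 0), 10.00),
]

sales = defaultdict(float)
invoices = defaultdict(set)
for invoice_no, invoice_date, amount in rows:
    key = invoice_date.strftime("%Y-%m")  # plays the role of DATE_FORMAT(..., 'yyyy-MM')
    sales[key] += amount
    invoices[key].add(invoice_no)  # a set mimics COUNT(DISTINCT InvoiceNo)

# One tuple per month: (YearMonth, MonthlySalesRevenue, OrderVolume).
monthly = [(k, round(sales[k], 2), len(invoices[k])) for k in sorted(sales)]
print(monthly)
```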

In [None]:
%sql
SELECT
  Country,
  ROUND(SUM(Sales) / COUNT(DISTINCT CustomerID), 2) AS AvgSalesPerCustomer
FROM final_exam_db.online_retail_data
GROUP BY Country
ORDER BY AvgSalesPerCustomer DESC;

In [None]:
%sql
SELECT
  Country,
  COUNT(*) AS TotalTransactions
FROM final_exam_db.online_retail_data
GROUP BY Country
ORDER BY TotalTransactions DESC;
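
Note that each row of the table is one invoice line, so `COUNT(*)` counts line items rather than invoices. The sketch below illustrates the difference using Python's built-in `sqlite3` as a stand-in for Databricks SQL; the table contents are hypothetical, and the `LineItems` and `Invoices` aliases are introduced only for this example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE online_retail_data (InvoiceNo TEXT, StockCode TEXT, Country TEXT)"
)
# Hypothetical rows: one UK invoice with two line items, one French invoice with one.
conn.executemany(
    "INSERT INTO online_retail_data VALUES (?, ?, ?)",
    [
        ("536365", "85123A", "United Kingdom"),
        ("536365", "71053", "United Kingdom"),
        ("536370", "22728", "France"),
    ],
)

counts = conn.execute(
    """
    SELECT Country,
           COUNT(*) AS LineItems,
           COUNT(DISTINCT InvoiceNo) AS Invoices
    FROM online_retail_data
    GROUP BY Country
    ORDER BY LineItems DESC
    """
).fetchall()
print(counts)
conn.close()
```

If the dashboard is meant to report orders rather than line items, `COUNT(DISTINCT InvoiceNo)` may be the more suitable measure.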

## Phase 4: Notebook Analysis (PySpark)


In [0]:
retail_df = spark.table("final_exam_db.online_retail_data")
print("Loaded Delta table for analysis.")

print("\nDescriptive Statistics for Quantity, UnitPrice, and Sales:")
retail_df.select('Quantity', 'UnitPrice', 'Sales').describe().show()

from pyspark.sql import functions as F  # avoids shadowing Python's built-in sum

top_customers = (
    retail_df.groupBy("CustomerID")
    .agg(F.sum("Sales").alias("TotalSpend"))
    .orderBy(F.col("TotalSpend").desc())
)

print("\nTop 5 Customers by Total Spend:")
display(top_customers.limit(5))
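
For readers checking the `describe()` output in the cell above by hand, the sketch below computes the same five summary fields in plain Python. It assumes that Spark's `stddev` row is the sample (n - 1) standard deviation, which is our understanding of its behavior; the helper name and the input values are hypothetical.

```python
import statistics


def describe(values):
    """Return count, mean, sample standard deviation, min, and max."""
    return {
        "count": len(values),
        "mean": statistics.mean(values),
        "stddev": statistics.stdev(values),  # sample (n - 1) standard deviation
        "min": min(values),
        "max": max(values),
    }


# Hypothetical Quantity values.
summary = describe([2, 4, 4, 6])
print(summary)
```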

## Phase 5: Dashboard Instructions

**Note:** The actual Databricks SQL Dashboard must be created in the Databricks workspace outside of this notebook. 

**Instructions for Dashboard Creation:**

1.  **Create SQL Queries:** Save the first two SQL queries from Phase 3 (top 10 products and sales by country) as Databricks SQL Queries (e.g., `Q1_Top_10_Products` and `Q2_Sales_by_Country`).
2.  **Create Dashboard:** Create a new Databricks SQL Dashboard.
3.  **Add Visualizations:**
    *   Use `Q1_Top_10_Products` to create a **Bar Chart** (X-axis: `Description`, Y-axis: `TotalSales`).
    *   Use `Q2_Sales_by_Country` to create a **Map Visualization** or a **Pie Chart** (Group by: `Country`, Value: `TotalSales`).
4.  **Add Filters:** Add a **Text Parameter** to the dashboard and link it to the `Country` column of the underlying table so viewers can filter the visualizations by country.
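
A text parameter only affects results if the query itself references it. The sketch below illustrates the idea with Python's built-in `sqlite3` as a stand-in: the `:country` placeholder is SQLite's syntax and is not necessarily what Databricks SQL expects, so the workspace documentation should be checked for the exact form. The table contents and the `top_products` helper are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE online_retail_data (Description TEXT, Country TEXT, Sales REAL)"
)
# Hypothetical rows for illustration only.
conn.executemany(
    "INSERT INTO online_retail_data VALUES (?, ?, ?)",
    [
        ("MUG", "France", 10.0),
        ("MUG", "Germany", 4.0),
        ("LAMP", "France", 25.0),
    ],
)


def top_products(country, limit=10):
    """Top products by sales for one country, using bound query parameters."""
    query = """
        SELECT Description, SUM(Sales) AS TotalSales
        FROM online_retail_data
        WHERE Country = :country
        GROUP BY Description
        ORDER BY TotalSales DESC
        LIMIT :limit
    """
    return conn.execute(query, {"country": country, "limit": limit}).fetchall()


print(top_products("France"))
```

Binding the value as a parameter, rather than pasting it into the SQL string, also keeps user input from altering the query.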