<a href="https://colab.research.google.com/github/lakshanaat99/Assignment_01/blob/main/BigData_Assignment2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Assignment 2 - Group 8

# TASK 1: Environment Setup & Data Ingestion

### 1.1 Install & Configure PySpark

In [None]:
# Install PySpark (pinned to match the mongo-spark-connector Scala 2.12 build used below)
# and the MongoDB Python driver. %pip (rather than !pip) installs into the environment of
# the running kernel; -q keeps the cell output short.
# TODO: pin pymongo/dnspython versions for full reproducibility.
%pip install -q pyspark==3.5.1
%pip install -q pymongo
%pip install -q dnspython

Collecting pyspark==3.5.1
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.7 (from pyspark==3.5.1)
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl.metadata (1.5 kB)
Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m200.5/200.5 kB[0m [31m22.3 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488493 sha256=bc1b876849c97bff10b032443a0176f293a27f5f53279089f38bc5f2791f65d0
  Stored in directory: /root/.cache/pip/wheels/b1/91/5f/283b53010a8016a4ff1c4a1edd99bbe73afacb099645b5471b
Successfully built pyspark
Installing collected packages: py4j, pyspark
  

In [None]:
# Entry point for all Spark work in this notebook.
# spark.jars.packages pulls in the MongoDB Spark connector (Scala 2.12 build, version 10.4.1)
# so the "mongodb" data source used for the Gold-layer writes is available.
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("ECommerce-BigData-Pipeline") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:10.4.1") \
    .getOrCreate()

In [None]:
# MongoDB config
# Reads the Atlas connection string and database name from Colab secrets, exports them as
# environment variables for later cells (Spark writer options, PyMongo indexing), and pings
# the server to verify the credentials.
from google.colab import userdata
import os
from urllib.parse import urlsplit
from pymongo import MongoClient

# 1. Read secrets (both must have 'Notebook access' enabled in the Colab sidebar).
#    There is no fallback: a missing secret stops the notebook here.
try:
    mongo_uri = userdata.get('MONGO_URI')
    mongo_db = userdata.get('MONGO_DATABASE')
except Exception:
    print("Error reading secrets! Make sure 'Notebook access' is ON in the sidebar.")
    raise  # bare raise keeps the original traceback

# Debug aid: print only the username parsed from the URI, never the password.
# (Parsed outside the try above so a malformed URI is not misreported as a secrets error.)
print(f"Attempting to connect with User: {urlsplit(mongo_uri).username}...")

# 2. Export for later cells, which read os.environ["MONGO_URI"] / ["MONGO_DATABASE"].
os.environ["MONGO_URI"] = mongo_uri
os.environ["MONGO_DATABASE"] = mongo_db

# 3. Verify connection. The test client is closed on exit; later cells open their own.
#    Failures are reported but deliberately do not stop the notebook.
try:
    with MongoClient(os.environ["MONGO_URI"]) as client:
        client.admin.command('ping')

    print("Authentication Successful! Connected to MongoDB Atlas.")
    print(f"Environment variables set for database: {os.environ['MONGO_DATABASE']}")
    print("Ready for Spark integration.")
except Exception as e:
    print(f"Connection failed: {e}")

Error reading secrets! Make sure 'Notebook access' is ON in the sidebar.


SecretNotFoundError: Secret MONGO_URI does not exist.

### 1.2 Load Dataset

In [None]:
# Mount Google Drive so files under MyDrive are reachable at /content/drive/MyDrive/...
from google.colab import drive

drive.mount("/content/drive")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Read the raw Online Retail CSV into a Spark DataFrame (input to the Bronze layer).
# `spark` was created in the setup cell, so SparkSession is not re-imported here.
# NOTE(review): this folder name contains a space before "/Online retail.csv"
# ("Bigdata Assignment 2 /"), while the DQ-report export cell writes to
# "Bigdata Assignment 2" — confirm both are meant to be the same Drive folder.
RAW_CSV_PATH = "/content/drive/MyDrive/Bigdata Assignment 2 /Online retail.csv"

raw_df = (
    spark.read
    .option("header", "true")       # first line holds the column names
    .option("inferSchema", "true")  # Quantity/Price/Customer ID become numeric; InvoiceDate stays a string
    .csv(RAW_CSV_PATH)
)

In [None]:
# Repeats the Drive mount from the first cell of step 1.2. When Drive is already mounted,
# this only prints an "already mounted" notice.
from google.colab import drive
drive.mount("/content/drive")

### 1.3 Display schema and record counts

In [None]:
# Print the inferred column types, then display the raw row count (last expression is shown).
raw_df.printSchema()

raw_df.count()

root
 |-- Invoice: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- Price: double (nullable = true)
 |-- Customer ID: integer (nullable = true)
 |-- Country: string (nullable = true)



541910

### 1.4 Store Bronze data as partitioned Parquet

In [None]:
from pyspark.sql.functions import year, month, col, to_timestamp

# Bronze layer: parse the string InvoiceDate ("M/d/yyyy H:mm") into a timestamp and derive
# year/month columns that serve as Parquet partition keys.
bronze_df = (
    raw_df
    .withColumn("InvoiceDate", to_timestamp(col("InvoiceDate"), "M/d/yyyy H:mm"))
    .withColumn("year", year("InvoiceDate"))
    .withColumn("month", month("InvoiceDate"))
)

In [None]:
# Persist the Bronze layer as Parquet with one directory per year=/month= value,
# replacing any previous output at /content/bronze.
(bronze_df.write
    .partitionBy("year", "month")
    .mode("overwrite")
    .parquet("/content/bronze"))

# Task 2: Data Cleaning & Quality Management

### 2.1 Missing CustomerID Handling

In [None]:
from pyspark.sql.functions import count, when

# Rows without a Customer ID cannot be grouped per customer (RFM, dim_customers),
# so count them for the data-quality report before they are removed.
null_customer = bronze_df.where(col("Customer ID").isNull()).count()
print(f"Null CustomerID records: {null_customer:,}")

Null CustomerID records: 135,080


In [None]:
# Keep only rows that carry a Customer ID.
df1 = bronze_df.where(~col("Customer ID").isNull())
print(f"After removing null CustomerID: {df1.count():,}")

After removing null CustomerID: 406,830


### 2.2 Negative quantities (returns)

In [None]:
# Remove returns. The filter keeps only Quantity > 0, so it also drops zero and missing
# quantities. Counting with the exact complement of that filter keeps the DQ report's
# "Negative Quantity Removed" figure equal to the number of rows actually dropped.
negative_qty_count = df1.filter(
    col("Quantity").isNull() | (col("Quantity") <= 0)
).count()
print(f"Non-positive/missing Quantity records: {negative_qty_count:,}")

df2 = df1.filter(col("Quantity") > 0)
print(f"After removing returns: {df2.count():,}")

Negative Quantity records: 8,905
After removing returns: 397,925


### 2.3 Cancelled invoices

In [None]:
# Cancelled invoices are identified by a "C" prefix on the invoice number.
# NOTE: this step runs after the Quantity > 0 filter, so the count here only covers
# cancellations that survived it (0 in the recorded run). Presumably cancellation lines
# carry negative quantities and were already dropped above — confirm if the report
# should attribute them to this step instead.
# (cancelled_df holds an int count, not a DataFrame; the name is used by the DQ report.)
cancelled_df = df2.where(col("Invoice").startswith("C")).count()
print(f"Cancelled invoices (C prefix): {cancelled_df:,}")

df3 = df2.where(~col("Invoice").startswith("C"))
print(f"After removing cancellations: {df3.count():,}")

Cancelled invoices (C prefix): 0
After removing cancellations: 397,925


### 2.4 Invalid or extreme prices

In [None]:
# Prices must be strictly positive; zero or negative prices are treated as invalid.
# NOTE: only this lower bound is enforced — no upper limit for "extreme" prices is applied.
invalid_price_df = df3.where(col("Price") <= 0)
invalid_price_count = invalid_price_df.count()
print(f"Invalid prices (<=0): {invalid_price_count:,}")

df4 = df3.where(col("Price") > 0)
print(f"After price filter: {df4.count():,}")

Invalid prices (<=0): 40
After price filter: 397,885


### 2.5 Duplicate records

In [None]:
# Remove exact duplicate rows (all columns equal). The drop in row count is the number of
# duplicates removed; the deduplicated frame is the Silver layer.
before = df4.count()
silver_df = df4.distinct()
after = silver_df.count()

duplicates_removed = before - after
print(f"Duplicates removed: {duplicates_removed:,}")
print(f"Final Silver records: {after:,}")

Duplicates removed: 5,192
Final Silver records: 392,693


### 2.6 Data Quality Report

In [None]:
# Data-quality summary: how many rows each cleaning step removed.
# Rows are (Metric, Count) tuples so the column names ["Metric", "Count"] apply in the
# order written. When the rows are dicts, PySpark infers the fields in sorted key order
# (Count, Metric), so the names end up on the wrong values.
dq_report_df = spark.createDataFrame(
    [
        ("Total Raw Records", raw_df.count()),
        ("Null CustomerID Removed", null_customer),
        ("Negative Quantity Removed", negative_qty_count),
        ("Cancelled Invoices Removed", cancelled_df),
        ("Invalid Prices Removed", invalid_price_count),
        ("Duplicates Removed", duplicates_removed),
        ("FINAL SILVER Records", after),
    ],
    ["Metric", "Count"],
)

print("\n DATA QUALITY REPORT")
dq_report_df.show(truncate=False)


 DATA QUALITY REPORT
+------+--------------------------+
|Metric|Count                     |
+------+--------------------------+
|541910|Total Raw Records         |
|135080|Null CustomerID Removed   |
|8905  |Negative Quantity Removed |
|0     |Cancelled Invoices Removed|
|40    |Invalid Prices Removed    |
|5192  |Duplicates Removed        |
|392693|FINAL SILVER Records      |
+------+--------------------------+



In [None]:
import os
import pandas as pd

# Export the (small) data-quality report to Google Drive as a CSV file.
# NOTE(review): the raw CSV is read from "Bigdata Assignment 2 /" (space before the slash),
# while this writes to "Bigdata Assignment 2" — confirm both should be the same folder.
folder_path = '/content/drive/MyDrive/Bigdata Assignment 2'
output_path = os.path.join(folder_path, 'data_quality_report.csv')

# exist_ok=True makes this a no-op when the folder is already there.
os.makedirs(folder_path, exist_ok=True)

# Best effort: a failed export is reported but does not stop the notebook.
try:
    dq_report_df.toPandas().to_csv(output_path, index=False)
except Exception as e:
    print(f"\nCould not save to Drive. Error: {e}")
else:
    print(f"\nData Quality Report saved as '{output_path}'")



Data Quality Report saved as '/content/drive/MyDrive/Bigdata Assignment 2/data_quality_report.csv'


# Task 3: Feature Engineering

### 3.1 Load Silver Layer

In [None]:
# Persist the cleaned Silver data, then read it back so feature engineering starts from
# the stored layer rather than the in-memory cleaning lineage.
silver_df.write.format("parquet").mode("overwrite").save("/content/silver/online_retail")

df_silver = spark.read.format("parquet").load("/content/silver/online_retail")
df_silver.printSchema()

### 3.2 Revenue calculation

In [None]:
from pyspark.sql.functions import col

# Line-item revenue = Quantity * Price (the price column in this dataset is "Price").
df_feat = df_silver.withColumn("revenue", col("Quantity") * col("Price"))

### 3.3 Time-based features (hour, weekday, month)

In [None]:
from pyspark.sql.functions import hour, dayofweek, month

# Time-of-purchase features derived from InvoiceDate:
#   invoice_hour  - hour of the day
#   weekday       - Spark dayofweek (1 = Sunday ... 7 = Saturday)
#   invoice_month - month number
df_feat = (
    df_feat
    .withColumn("invoice_hour", hour("InvoiceDate"))
    .withColumn("weekday", dayofweek("InvoiceDate"))
    .withColumn("invoice_month", month("InvoiceDate"))
)

### 3.4 Basket-level metrics

In [None]:
# Spark functions are used through the F namespace so Python's built-in sum() is not shadowed.
import pyspark.sql.functions as F

# Basket (invoice) level metrics: total revenue and number of distinct products per invoice.
basket_df = df_feat.groupBy("Invoice").agg(
    F.sum("revenue").alias("basket_revenue"),
    F.countDistinct("StockCode").alias("items_per_basket"),
)

# Attach the basket metrics to every line item of the same invoice.
df_feat = df_feat.join(basket_df, on="Invoice", how="left")

### 3.5 Customer RFM features

In [None]:
# Spark functions via the F namespace so Python's built-in max()/sum() are not shadowed.
import pyspark.sql.functions as F
from datetime import timedelta

# Recency is measured against a snapshot date derived from the data: the day after the last
# invoice. With current_date(), recency depended on the day the notebook was run, so the
# values changed between runs and reflected elapsed calendar time, not customer behaviour.
last_invoice_ts = df_feat.agg(F.max("InvoiceDate")).first()[0]
if last_invoice_ts is None:
    raise ValueError("df_feat has no InvoiceDate values; cannot compute recency.")
snapshot_date = last_invoice_ts.date() + timedelta(days=1)

# Per-customer RFM metrics:
#   recency   - days between the snapshot date and the customer's last purchase
#   frequency - number of distinct invoices
#   monetary  - total revenue
rfm_df = df_feat.groupBy("Customer ID").agg(
    F.datediff(F.lit(snapshot_date), F.max("InvoiceDate")).alias("recency"),
    F.countDistinct("Invoice").alias("frequency"),
    F.sum("revenue").alias("monetary"),
)

# Attach the customer-level RFM features to every row of that customer.
df_feat = df_feat.join(rfm_df, on="Customer ID", how="left")

### 3.6 Window-Based Feature

In [None]:
from pyspark.sql.window import Window
import pyspark.sql.functions as F

# Cumulative spend per customer in chronological order.
# rangeBetween (instead of rowsBetween) treats all rows that share the same InvoiceDate as a
# single step: each receives the total up to and including that timestamp. With rowsBetween,
# the order among tied timestamps (e.g. line items of one invoice) is arbitrary, so
# individual running totals could differ between runs.
customer_window = (
    Window.partitionBy("Customer ID")
    .orderBy("InvoiceDate")
    .rangeBetween(Window.unboundedPreceding, Window.currentRow)
)

df_feat = df_feat.withColumn(
    "running_customer_spend",
    F.sum("revenue").over(customer_window),
)

### 3.7 Save Feature-Engineered Data

In [None]:
# Persist the feature-engineered DataFrame as Parquet (overwriting previous output) so
# Task 4 can reload it without recomputing the features.
df_feat.write.mode("overwrite").parquet("/content/feature_engineered/online_retail")

# Task 4: MongoDB Data Modeling

### 4.1 Start Spark with MongoDB Connector

In [None]:
from pyspark.sql import SparkSession
import os

# Get the Spark session and register the MongoDB write target (URI + database) as
# session config. A session already exists from Task 1, so getOrCreate() returns it; the
# connector JAR itself comes from spark.jars.packages set there.
spark = (
    SparkSession.builder
    .appName("BigData_Task4_MongoDB_GoldLayer")
    .config("spark.mongodb.write.connection.uri", os.environ["MONGO_URI"])
    .config("spark.mongodb.write.database", os.environ["MONGO_DATABASE"])
    .getOrCreate()
)

print("Spark Session started successfully with MongoDB connector.")

### 4.2 Load Feature-Engineered Data

In [None]:
# Reload the Task 3 output from Parquet; this is the input for all Gold-layer collections.
df_feat = spark.read.format("parquet").load("/content/feature_engineered/online_retail")

### 4.3 COLLECTION 1 - fact_invoices

Create invoice-level structure

In [None]:
from pyspark.sql.functions import first, sum, collect_list, struct

# One document per invoice. Header fields are taken with first() on the assumption that
# they are constant within an invoice (presumably true for this dataset — confirm).
# Line items are embedded as an array of structs.
fact_invoices_df = df_feat.groupBy("Invoice").agg(
    first("Customer ID").alias("CustomerID"),           # renamed: no space in the field name
    first("InvoiceDate").alias("InvoiceDate"),
    first("Country").alias("Country"),
    sum("revenue").alias("total_invoice_revenue"),      # sum of line-item revenue
    collect_list(
        struct("StockCode", "Description", "Quantity", "Price", "revenue")
    ).alias("line_items"),
)

***Schema Definition (fact_invoices)***


{

   _id: ObjectId,

   Invoice: String,

   CustomerID: Integer,

   InvoiceDate: Date,

   Country: String,

   total_invoice_revenue: Double,

   line_items: [
     
      {
        StockCode: String,
        Description: String,
        Quantity: Integer,
        Price: Double,
        revenue: Double
      }
     ]

}

### 4.4 COLLECTION 2 - dim_customers

Create customer summary

In [None]:
# first and sum are imported here too so this cell does not depend on an earlier cell's imports.
from pyspark.sql.functions import countDistinct, col, when, first, sum

# One row per customer. recency/frequency/monetary were computed per Customer ID in
# step 3.5 and joined onto every row, so they are identical within a customer and first()
# simply picks that value. total_invoices/total_revenue are recomputed here with the same
# aggregations as frequency/monetary.
dim_customers_df = df_feat.groupBy("Customer ID").agg(
    first("recency").alias("recency"),          # days since last purchase
    first("frequency").alias("frequency"),      # number of distinct invoices
    first("monetary").alias("monetary"),        # total spend
    countDistinct("Invoice").alias("total_invoices"),
    sum("revenue").alias("total_revenue"),
)

Add customer segmentation

In [None]:
# Spend thresholds for the customer value segments, named so they can be tuned in one place.
HIGH_VALUE_MIN_SPEND = 5000
MEDIUM_VALUE_MIN_SPEND = 2000

# Segment customers by monetary value: >= HIGH -> High, >= MEDIUM -> Medium, else Low
# (a null monetary also falls through to "Low Value").
dim_customers_df = dim_customers_df.withColumn(
    "customer_segment",
    when(col("monetary") >= HIGH_VALUE_MIN_SPEND, "High Value")
    .when(col("monetary") >= MEDIUM_VALUE_MIN_SPEND, "Medium Value")
    .otherwise("Low Value"),
)

***Schema Definition (dim_customers)***

{

  _id: ObjectId,

  "Customer ID": Integer,

  recency: Integer,

  frequency: Integer,

  monetary: Double,

  total_invoices: Integer,

  total_revenue: Double,

  customer_segment: String
  
}

### 4.5 COLLECTION 3 - dim_products

Create product summary

In [None]:
import pyspark.sql.functions as F
from pyspark.sql.functions import collect_set

# One document per product (StockCode). Grouping by (StockCode, Description) would split a
# product into several documents if its description text varies, under-reporting its totals.
# F.max picks a single description deterministically (the lexicographically largest one).
dim_products_df = df_feat.groupBy("StockCode").agg(
    F.max("Description").alias("Description"),
    F.sum("Quantity").alias("total_quantity_sold"),       # total units sold
    F.sum("revenue").alias("total_product_revenue"),      # total revenue of the product
    collect_set("Country").alias("countries_sold"),       # distinct countries where it sold
)

***Schema Definition (dim_products)***

{

  _id: ObjectId,

  StockCode: String,

  Description: String,

  total_quantity_sold: Integer,

  total_product_revenue: Double,

  countries_sold: [String]
  
}

# Task 5: MongoDB Indexing & Write Optimization

### 5.1 Write Gold datasets to MongoDB

In [None]:
# Write the invoice-level Gold collection to MongoDB via the v10 connector ("mongodb" format).
# mode("overwrite") replaces the existing collection contents.
# os.environ[...] (rather than .get) fails immediately with a KeyError if the secrets cell
# did not run, instead of passing None to the connector.
fact_invoices_df.write \
    .format("mongodb") \
    .mode("overwrite") \
    .option("spark.mongodb.connection.uri", os.environ["MONGO_URI"]) \
    .option("spark.mongodb.database", os.environ["MONGO_DATABASE"]) \
    .option("spark.mongodb.collection", "fact_invoices") \
    .save()

print("fact_invoices data successfully written to MongoDB!")

In [None]:
# Write the customer dimension to MongoDB via the v10 connector ("mongodb" format).
# mode("overwrite") replaces the existing collection contents.
# os.environ[...] fails fast with a KeyError if the secrets cell did not run.
dim_customers_df.write \
    .format("mongodb") \
    .mode("overwrite") \
    .option("spark.mongodb.connection.uri", os.environ["MONGO_URI"]) \
    .option("spark.mongodb.database", os.environ["MONGO_DATABASE"]) \
    .option("spark.mongodb.collection", "dim_customers") \
    .save()

print("dim_customers data successfully written to MongoDB!")

In [None]:
# Write the product dimension to MongoDB via the v10 connector ("mongodb" format).
# mode("overwrite") replaces the existing collection contents.
# os.environ[...] fails fast with a KeyError if the secrets cell did not run.
dim_products_df.write \
    .format("mongodb") \
    .mode("overwrite") \
    .option("spark.mongodb.connection.uri", os.environ["MONGO_URI"]) \
    .option("spark.mongodb.database", os.environ["MONGO_DATABASE"]) \
    .option("spark.mongodb.collection", "dim_products") \
    .save()

print("dim_products data successfully written to MongoDB!")

### 5.2 Create at least 4 indexes

In [None]:
from pymongo import MongoClient, ASCENDING, DESCENDING
import os

# PyMongo handle to the Gold database (same URI/database as the Spark writes), used for
# index creation and the query/aggregation checks that follow.
client = MongoClient(os.environ["MONGO_URI"])
db = client.get_database(os.environ["MONGO_DATABASE"])

print("Creating Indexes...")

In [None]:
# Index 1: fact_invoices.CustomerID ascending — serves equality lookups of a customer's
# invoices (the query examined in 5.4).
db["fact_invoices"].create_index([("CustomerID", ASCENDING)])
print("Index 1 Created: fact_invoices.CustomerID (ASC)")

In [None]:
# Index 2: fact_invoices.InvoiceDate descending — for date-range filters and newest-first sorts.
db["fact_invoices"].create_index([("InvoiceDate", DESCENDING)])
print("Index 2 Created: fact_invoices.InvoiceDate (DESC)")

In [None]:
# Index 3: dim_customers.customer_segment ascending — for filtering customers by segment.
db["dim_customers"].create_index([("customer_segment", ASCENDING)])
print("Index 3 Created: dim_customers.customer_segment (ASC)")

In [None]:
# Index 4: dim_products.total_product_revenue descending — matches the "top products by
# revenue" sort used in the analytics pipelines.
db["dim_products"].create_index([("total_product_revenue", DESCENDING)])
print("Index 4 Created: dim_products.total_product_revenue (DESC)")

### 5.3 Justify each index

The justification for each index is given in the written report.

### 5.4 Demonstrate query performance improvement

In [None]:
import json  # kept from the original cell


def _plan_stages(plan):
    """Collect every ``stage`` name in an explain() plan tree, outermost first.

    Walks ``inputStage``/``inputStages`` (classic query engine), the ``queryPlan`` wrapper
    emitted for slot-based-engine plans, and per-shard ``winningPlan`` entries, so an IXSCAN
    is found however deeply it is nested (e.g. under FETCH).
    """
    if not isinstance(plan, dict):
        return []
    stages = [plan["stage"]] if "stage" in plan else []
    children = [plan.get("queryPlan"), plan.get("inputStage")]
    children += plan.get("inputStages", [])
    children += [shard.get("winningPlan") for shard in plan.get("shards", [])]
    for child in children:
        stages += _plan_stages(child)
    return stages


def _docs_examined(explain_doc):
    """Return executionStats.totalDocsExamined, or None if the explain output lacks it."""
    return explain_doc.get("executionStats", {}).get("totalDocsExamined")


print("Testing Performance on CustomerID Query...")

# sample query (finding invoices for customer 17850)
sample_query = {"CustomerID": 17850}

# Plan chosen by the optimizer (free to use the CustomerID index) ...
explanation = db.fact_invoices.find(sample_query).explain()
# ... versus a forced full collection scan: the $natural hint bypasses every index.
explanation_no_index = db.fact_invoices.find(sample_query).hint([("$natural", 1)]).explain()

plan_stages = _plan_stages(explanation["queryPlanner"]["winningPlan"])
if "IXSCAN" in plan_stages:
    winning_stage = "IXSCAN"
else:
    winning_stage = plan_stages[-1] if plan_stages else "UNKNOWN"

print(f"Execution Strategy Used: {winning_stage} (plan: {' -> '.join(plan_stages)})")
print(f"Documents examined with index:    {_docs_examined(explanation)}")
print(f"Documents examined without index: {_docs_examined(explanation_no_index)}")

if winning_stage == "IXSCAN":
    print("SUCCESS! MongoDB utilized the Index (IXSCAN).")
else:
    print("Warning: Index was not used.")

# Task 6: Analytics & Insights

### 6.1 Spark-based analytics queries

In [None]:
from pyspark.sql.functions import sum, desc, count, col, countDistinct

# 1. Monthly Revenue Trends
print("1. Monthly Revenue Trends")
monthly_revenue_spark = df_feat.groupBy("year", "month") \
    .agg(sum("revenue").alias("Total_Revenue")) \
    .orderBy("year", "month")
monthly_revenue_spark.show()

# 2. Top Customers by Spend
print("2. Top 5 Customers by Spend")
top_customers_spark = dim_customers_df \
    .orderBy(desc("monetary")) \
    .select("Customer ID", "monetary", "total_invoices", "customer_segment")
top_customers_spark.show(5)

# 3. Top Products by Revenue
print("3. Top 5 Products by Revenue")
top_products_spark = dim_products_df \
    .orderBy(desc("total_product_revenue")) \
    .select("StockCode", "Description", "total_product_revenue", "total_quantity_sold")
top_products_spark.show(5)

# 4. Country-Level Sales Analysis
# df_feat has one row per line item, so count("Invoice") would count line items.
# countDistinct counts actual transactions (invoices), matching the per-invoice count
# of the MongoDB country pipeline.
print("4. Top 5 Countries by Sales Revenue")
country_sales_spark = df_feat.groupBy("Country") \
    .agg(sum("revenue").alias("Total_Revenue"), countDistinct("Invoice").alias("Total_Transactions")) \
    .orderBy(desc("Total_Revenue"))
country_sales_spark.show(5)

# 5. Return or Cancellation Patterns (Using Bronze Data)
print("5. Most Frequently Returned/Cancelled Products")
# We use bronze_df here because Silver/Gold layers filtered out returns!
returns_spark = bronze_df.filter(col("Quantity") < 0) \
    .groupBy("Description") \
    .agg(sum("Quantity").alias("Total_Returned_Qty"), count("Invoice").alias("Return_Count")) \
    .orderBy("Total_Returned_Qty")  # ascending: most negative (largest returns) first
returns_spark.show(5, truncate=False)


### 6.2 MongoDB aggregation pipelines

In [None]:
def _show_rows(rows, flatten=False):
    """Print aggregation results as a plain-text table.

    flatten=True expands nested fields (e.g. _id.year / _id.month) into separate columns.
    """
    table = pd.json_normalize(rows) if flatten else pd.DataFrame(rows)
    print(table.to_string(index=False), "\n")


# 1. Monthly revenue: invoice totals summed per calendar year/month (fact_invoices)
print("1. Monthly Revenue Trends (MongoDB)")
pipeline_monthly = [
    {"$group": {
        "_id": {"year": {"$year": "$InvoiceDate"}, "month": {"$month": "$InvoiceDate"}},
        "total_revenue": {"$sum": "$total_invoice_revenue"},
    }},
    {"$sort": {"_id.year": 1, "_id.month": 1}},
]
res_monthly = list(db.fact_invoices.aggregate(pipeline_monthly))
_show_rows(res_monthly, flatten=True)

# 2. Five highest-spending customers (dim_customers)
print("2. Top 5 Customers by Spend (MongoDB)")
pipeline_top_cust = [
    {"$sort": {"monetary": -1}},
    {"$limit": 5},
    {"$project": {"Customer ID": 1, "monetary": 1, "customer_segment": 1, "_id": 0}},
]
res_top_cust = list(db.dim_customers.aggregate(pipeline_top_cust))
_show_rows(res_top_cust)

# 3. Five highest-revenue products (dim_products)
print("3. Top 5 Products by Revenue (MongoDB)")
pipeline_top_prod = [
    {"$sort": {"total_product_revenue": -1}},
    {"$limit": 5},
    {"$project": {"Description": 1, "total_product_revenue": 1, "total_quantity_sold": 1, "_id": 0}},
]
res_top_prod = list(db.dim_products.aggregate(pipeline_top_prod))
_show_rows(res_top_prod)

# 4. Sales and invoice count per country, top five by sales (fact_invoices)
print("4. Top 5 Countries by Sales (MongoDB)")
pipeline_country = [
    {"$group": {
        "_id": "$Country",
        "total_sales": {"$sum": "$total_invoice_revenue"},
        "invoice_count": {"$sum": 1},
    }},
    {"$sort": {"total_sales": -1}},
    {"$limit": 5},
]
res_country = list(db.fact_invoices.aggregate(pipeline_country))
_show_rows(res_country)

# 5. Customer count and total spend per segment (dim_customers)
print("5. Customer Segmentation Value (MongoDB)")
pipeline_segments = [
    {"$group": {
        "_id": "$customer_segment",
        "customer_count": {"$sum": 1},
        "total_segment_revenue": {"$sum": "$monetary"},
    }},
    {"$sort": {"total_segment_revenue": -1}},
]
res_segments = list(db.dim_customers.aggregate(pipeline_segments))
_show_rows(res_segments)

# Task 7: Performance Optimization

### 7.1 Partitioning strategies

In [None]:
from pyspark.sql.functions import col

# The Bronze layer was already written partitioned by year/month in step 1.4, so it is
# not rewritten here. Instead, read it back and filter on the partition columns: Spark then
# scans only the matching year=/month= directories, which shows up as PartitionFilters in
# the physical plan. (2011/12 is an example partition — adjust as needed.)
bronze_partitioned = spark.read.parquet("/content/bronze")
bronze_dec_2011 = bronze_partitioned.filter((col("year") == 2011) & (col("month") == 12))
bronze_dec_2011.explain()

### 7.2 Caching and persistence

In [None]:
from pyspark import StorageLevel

# Cache df_feat for reuse, keeping partitions in memory and spilling to disk when memory is
# short, so repeated analytics do not recompute it from Parquet each time.
df_feat.persist(storageLevel=StorageLevel.MEMORY_AND_DISK)

# persist() is lazy: running an action (count) materializes the cache right away.
df_feat.count()


### 7.3 Broadcast join (shuffle reduction)

In [None]:
from pyspark.sql.functions import broadcast

# Broadcast join: basket_df has one row per invoice, while df_feat has one row per line
# item. Wrapping the smaller side in broadcast() ships it to every executor, so the large
# side is joined in place instead of being shuffled. broadcast() must actually be applied
# for the hint to take effect.
# df_feat (reloaded from Parquet) already contains basket_revenue/items_per_basket from
# step 3.4; drop them first so the join does not produce duplicate, ambiguous column names.
df_feat = (
    df_feat
    .drop("basket_revenue", "items_per_basket")
    .join(broadcast(basket_df), on="Invoice", how="left")
)

# The physical plan should show a BroadcastHashJoin rather than a SortMergeJoin.
df_feat.explain()

