In [0]:
%pip install openpyxl

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import (
    col, regexp_extract, regexp_replace, when, lit, array, explode, trim, upper,
    to_date, round as spark_round, year, array_remove, concat_ws, size, expr
)
import pandas as pd
from typing import Tuple
import logging

logger = logging.getLogger("etl")
logger.setLevel(logging.INFO)

# ======================================================
#                    READERS
# ======================================================

def read_customers(spark: SparkSession, path: str) -> DataFrame:
    """
    Reads customer data from Excel (or CSV) using pandas
    and converts it into a Spark DataFrame.
    """
    pdf = pd.read_excel(path, engine="openpyxl")
    pdf["phone"] = pdf["phone"].astype(str)   # ensure phone column is string
    return spark.createDataFrame(pdf)


def read_products(spark: SparkSession, path: str) -> DataFrame:
    """Reads product CSV into Spark DataFrame."""
    pdf = pd.read_csv(path)
    return spark.createDataFrame(pdf)


def read_orders(spark: SparkSession, path: str) -> DataFrame:
    """Reads orders JSON into Spark DataFrame."""
    pdf = pd.read_json(path)
    return spark.createDataFrame(pdf)


# ======================================================
#                 CLEANING LOGIC
# ======================================================

def clean_customers(df: DataFrame) -> Tuple[DataFrame, DataFrame]:
    """
    Cleans the customer dataset and generates a data-quality issues table.

    - Removes null/duplicate Customer ID/null customer name
    - Standardizes trimming/casting
    - Validates:
        * Customer Name (Unicode letters + space)
        * Email format
        * Phone format (4 strict patterns)
        * US postal code (ZIP / ZIP+4)
        * Region (North/East/South/West/Central)
    - Issues column contains array of issue codes (no nulls)
    """

    # --- Drop invalid ID rows ---
    df = df.dropna(subset=["Customer ID"]).dropDuplicates(["Customer ID"])

    # --- Normalize string fields ---
    string_cols = ["Customer Name", "email", "phone", "Postal Code", "Region"]
    for c in string_cols:
        df = df.withColumn(c, trim(col(c).cast("string")))

    # --- Build validation issues array ---
    df = df.withColumn(
        "Issues",
        array(
            when(col("Customer Name").isNull() | (col("Customer Name") == ""), lit("missing_name")),
            when(~col("Customer Name").rlike(r"^[\p{L} ]+$"), lit("invalid_name")),
            when(~col("email").rlike(r"^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$"), lit("invalid_email")),
            when(
                ~col("phone").rlike(
                    r"^(\d{10}|\(\d{3}\)-\d{3}-\d{4}|\d{3}\.\d{3}\.\d{4}|\d{3}-\d{3}-\d{4})$"
                ),
                lit("invalid_phone")
            ),
            when(~col("Postal Code").rlike(r"^\d{5}(-\d{4})?$"), lit("invalid_postal")),
            when(~upper(col("Region")).isin(["NORTH", "SOUTH", "EAST", "WEST", "CENTRAL"]), lit("invalid_region"))
        )
    )

    # --- Remove null entries from Issues array ---
    df = df.withColumn("Issues", expr("filter(Issues, x -> x IS NOT NULL)"))

    # --- Build issue report subset ---
    issue_report = (
        df.select(
            col("Customer ID").alias("CustomerID"),
            col("Customer Name").alias("CustomerName"),
            col("email").alias("Email"),
            col("phone").alias("Phone"),
            col("Postal Code").alias("PostalCode"),
            col("Region"),
            col("Issues")
        )
        .where(size(col("Issues")) > 0)
    )

    # impute missing value for "Customer Name" from email column 
    df = df.withColumn(
        "Customer Name",
        when(
            col("Customer Name").isNull() | (trim(col("Customer Name")) == ""),
            concat_ws(
                "",
                lit("IMPUTED_"),
                regexp_extract(col("email"), r"^([^@]+)", 1)
            )
        ).otherwise(col("Customer Name"))
    )

    return df, issue_report


def clean_products(df: DataFrame) -> Tuple[DataFrame, DataFrame]:
    """Cleans product data and returns (clean_df, issues_df)."""

    df = df.dropna(subset=["Product ID"]).dropDuplicates(["Product ID"])

    # Normalize price column name
    for c in df.columns:
        if c.lower().strip() in ("price per product", "price", "unitprice", "unit_price"):
            df = df.withColumnRenamed(c, "UnitPrice")
            break

    df = df.withColumn("UnitPrice", col("UnitPrice").cast("double"))

    # Identify invalid prices
    df_issues = df.filter(col("UnitPrice") < 0).withColumn(
        "Issues", array(lit("negative_unit_price"))
    )

    df_clean = df.filter(col("UnitPrice") >= 0)
    return df_clean, df_issues


def clean_orders(df_orders: DataFrame, df_customers: DataFrame, df_products: DataFrame) -> Tuple[DataFrame, DataFrame]:
    """Clean orders dataset, detect invalid Product/Customer IDs, and generate issue report."""

    # Drop rows with nulls in required columns
    df = df_orders.dropna(
        subset=["Order ID", "Customer ID", "Product ID", "Profit", "Order Date"]
    )

    # Drop duplicate Order ID
    df = df.dropDuplicates(["Order ID"])
    
    # Convert dates
    df = df.withColumn("OrderDate", to_date(col("Order Date").cast("string"), "d/M/yyyy"))
    df = df.withColumn("ShipDate", to_date(col("Ship Date").cast("string"), "d/M/yyyy"))

    # Get valid IDs from input DataFrames
    valid_customer_ids = [row["Customer ID"] for row in df_customers.select("Customer ID").distinct().collect()]
    valid_product_ids = [row["Product ID"] for row in df_products.select("Product ID").distinct().collect()]

    # Add Issues array
    df = df.withColumn(
        "Issues",
        array(
            when(~col("Customer ID").isin(valid_customer_ids), lit("invalid_customer_id")),
            when(~col("Product ID").isin(valid_product_ids), lit("invalid_product_id"))
        )
    )
    df = df.withColumn("Issues", expr("filter(Issues, x -> x IS NOT NULL)"))

    # Issue report: only rows with issues
    issue_report = (
        df.select(
            col("Order ID").alias("OrderID"),
            col("Customer ID").alias("CustomerID"),
            col("Product ID").alias("ProductID"),
            col("OrderDate"),
            col("ShipDate"),
            col("Issues")
        )
        .where(size(col("Issues")) > 0)
    )

    # Only keep valid orders
    df_clean = df.where(size(col("Issues")) == 0)

    return df_clean, issue_report


# ======================================================
#         DIMENSION TABLES & ENRICHMENT
# ======================================================

def build_customer_dim(df_customers: DataFrame) -> DataFrame:
    """Builds Customer dimension table."""

    for c in df_customers.columns:
        cc = c.strip().lower()
        if cc in ("customer id", "customer_id"):
            id_col = c
        if cc in ("customer name", "name", "customer_name"):
            name_col = c
        if cc in ("country", "country_name"):
            country_col = c

    return df_customers.select(
        col(id_col).alias("CustomerID"),
        col(name_col).alias("CustomerName"),
        col(country_col).alias("Country")
    )


def build_product_dim(df_products: DataFrame) -> DataFrame:
    """Builds Product dimension table."""
    return df_products.select(
        col("Product ID").alias("ProductID"),
        col("Category"),
        col("Sub-Category").alias("SubCategory")
    )


def build_orders_enriched(df_orders, df_cust_dim, df_prod_dim):
    """Joins orders with customer & product dimensions."""

    df = df_orders.withColumn(
        "ProfitRounded", spark_round(col("Profit").cast("double"), 2)
    )

    df = df.withColumnRenamed("Customer ID", "CustomerID") \
           .withColumnRenamed("Product ID", "ProductID")

    if "OrderDate" not in df.columns:
        df = df.withColumn(
            "OrderDate", to_date(col("Order Date").cast("string"), "d/M/yyyy")
        )

    df_joined = (
        df.join(df_cust_dim, "CustomerID", "left")
          .join(df_prod_dim, "ProductID", "left")
    )

    df_joined = df_joined.withColumn("Year", year(col("OrderDate")))

    select_cols = [
        "Order ID", "OrderDate", "ShipDate", "Ship Mode",
        "CustomerID", "CustomerName", "Country",
        "ProductID", "Category", "SubCategory",
        "Quantity", "Price", "Discount",
        "ProfitRounded", "Year"
    ]

    existing_cols = [c for c in select_cols if c in df_joined.columns]
    return df_joined.select(*existing_cols)


def aggregate_profit_by(df_enriched):
    """Aggregates total profit by Year, Category, SubCategory, CustomerName."""
    return (
        df_enriched.groupBy("Year", "Category", "SubCategory", "CustomerName")
            .sum("ProfitRounded")
            .withColumnRenamed("sum(ProfitRounded)", "TotalProfit")
            .withColumn("TotalProfit", spark_round(col("TotalProfit"), 2))
    )


# ======================================================
#                     WRITERS
# ======================================================

def write_parquet(df, path: str, mode: str = "overwrite"):
    """Writes a DataFrame to Parquet."""
    df.write.mode(mode).parquet(path)


# ======================================================
#                     MAIN ETL
# ======================================================

def run_etl(
    spark, customers_path, products_path, orders_path, output_base_location
):
    logger.info("Reading raw files...")
    df_customers = read_customers(spark, customers_path)
    df_products = read_products(spark, products_path)
    df_orders = read_orders(spark, orders_path)

    logger.info("Cleaning...")
    df_customers_clean, df_customer_issues = clean_customers(df_customers)
    df_products_clean, df_product_issues = clean_products(df_products)
    df_orders_clean, df_order_issues = clean_orders(
        df_orders, df_customers_clean, df_products_clean
    )

    logger.info("Building dimensions...")
    df_customer_dim = build_customer_dim(df_customers_clean)
    df_product_dim = build_product_dim(df_products_clean)

    logger.info("Enriching orders...")
    df_orders_enriched = build_orders_enriched(
        df_orders_clean, df_customer_dim, df_product_dim
    )

    logger.info("Writing cleaned outputs...")
    write_parquet(df_customer_issues, f"{output_base_location}/customer_data_issues")
    write_parquet(df_product_issues, f"{output_base_location}/product_data_issues")
    write_parquet(df_order_issues, f"{output_base_location}/order_data_issues")
    write_parquet(df_customer_dim, f"{output_base_location}/customers")
    write_parquet(df_product_dim, f"{output_base_location}/products")
    write_parquet(df_orders_enriched, f"{output_base_location}/orders_enriched")

    logger.info("Aggregating...")
    df_agg = aggregate_profit_by(df_orders_enriched)
    write_parquet(df_agg, f"{output_base_location}/aggregates")

    return {
        "customers": df_customer_dim,
        "products": df_product_dim,
        "orders_enriched": df_orders_enriched,
        "customer_data_issues": df_customer_issues,
        "product_data_issues": df_product_issues,
        "order_data_issues": df_order_issues,
        "aggregates": df_agg
    }

In [0]:
customers_file = "/Workspace/Users/kalpitjoshi19@gmail.com/PEI_test/input_files/Customer.xlsx"
products_file = "/Workspace/Users/kalpitjoshi19@gmail.com/PEI_test/input_files/Products.csv"
orders_file = "/Workspace/Users/kalpitjoshi19@gmail.com/PEI_test/input_files/Orders.json"
output_base_location = "/Volumes/pei/default/output"

In [0]:
run_etl(spark, customers_file, products_file, orders_file, output_base_location)

{'customers': DataFrame[CustomerID: string, CustomerName: string, Country: string],
 'products': DataFrame[ProductID: string, Category: string, SubCategory: string],
 'orders_enriched': DataFrame[Order ID: string, OrderDate: date, ShipDate: date, Ship Mode: string, CustomerID: string, CustomerName: string, Country: string, ProductID: string, Category: string, SubCategory: string, Quantity: bigint, Price: double, Discount: double, ProfitRounded: double, Year: int],
 'customer_data_issues': DataFrame[CustomerID: string, CustomerName: string, Email: string, Phone: string, PostalCode: string, Region: string, Issues: array<string>],
 'product_data_issues': DataFrame[Product ID: string, Category: string, Sub-Category: string, Product Name: string, State: string, UnitPrice: double, Issues: array<string>],
 'order_data_issues': DataFrame[OrderID: string, CustomerID: string, ProductID: string, OrderDate: date, ShipDate: date, Issues: array<string>],
 'aggregates': DataFrame[Year: int, Category:

In [0]:
# List files in output directory
dbutils.fs.ls("/Volumes/pei/default/output")

# Data Issue Tables
df_customer_data_issues = spark.read.parquet("/Volumes/pei/default/output/customer_data_issues")
displayHTML(f"<h2>Customer Data Issues (Sample) - Total: {df_customer_data_issues.count()}</h2>")
display(df_customer_data_issues.limit(5))

df_product_data_issues = spark.read.parquet("/Volumes/pei/default/output/product_data_issues")
displayHTML(f"<h2>Product Data Issues (Sample) - Total: {df_product_data_issues.count()}</h2>")
display(df_product_data_issues.limit(5))

df_order_data_issues = spark.read.parquet("/Volumes/pei/default/output/order_data_issues")
displayHTML(f"<h2>Order Data Issues (Sample) - Total: {df_order_data_issues.count()}</h2>")
display(df_order_data_issues.limit(5))

# Dimension Tables
df_customers_dim = spark.read.parquet("/Volumes/pei/default/output/customers")
displayHTML(f"<h2>Customer Dimension (Sample) - Total: {df_customers_dim.count()}</h2>")
display(df_customers_dim.limit(5))

df_products_dim = spark.read.parquet("/Volumes/pei/default/output/products")
displayHTML(f"<h2>Product Dimension (Sample) - Total: {df_products_dim.count()}</h2>")
display(df_products_dim.limit(5))

# Fact Table
df_orders_enriched = spark.read.parquet("/Volumes/pei/default/output/orders_enriched")
displayHTML(f"<h2>Orders Enriched (Sample) - Total: {df_orders_enriched.count()}</h2>")
display(df_orders_enriched.limit(5))

# Aggregate Table
df_agg = spark.read.parquet("/Volumes/pei/default/output/aggregates")
displayHTML(f"<h2>Aggregates (Sample) - Total: {df_agg.count()}</h2>")
display(df_agg.limit(5))

CustomerID,CustomerName,Email,Phone,PostalCode,Region,Issues
RD-19585,Rob Dowd,danielleware947@gmail.com,#ERROR!,52001,Central,List(invalid_phone)
MM-18055,Michelle Moray,andrewhays420@gmail.com,#ERROR!,94110,West,List(invalid_phone)
FO-14305,Frank Olsen,kendraholder796@gmail.com,#ERROR!,19143,East,List(invalid_phone)
MP-17470,Mark Packer,nicholasrobinson191@gmail.com,001-806-411-6730x95406,10035,East,List(invalid_phone)
CM-12115,Chad McGuire,sharonwarner980@gmail.com,033.934.6095x127,10011,East,List(invalid_phone)


Product ID,Category,Sub-Category,Product Name,State,UnitPrice,Issues


OrderID,CustomerID,ProductID,OrderDate,ShipDate,Issues
CA-2017-144365,CS-11950,OFF-FA-10000735,2017-10-24,2017-10-30,List(invalid_product_id)
CA-2017-146269,MH-17455,OFF-AR-10004790,2017-10-06,2017-10-06,List(invalid_product_id)
CA-2017-128335,JA-15970,OFF-EN-10001539,2017-09-29,2017-10-05,List(invalid_product_id)
CA-2015-164441,AC-10450,OFF-BI-10001922,2015-11-08,2015-11-13,List(invalid_product_id)
CA-2017-115602,DJ-13630,OFF-AR-10002280,2017-12-18,2017-12-24,List(invalid_product_id)


CustomerID,CustomerName,Country
RD-19585,Rob Dowd,United States
MM-18055,Michelle Moray,United States
FO-14305,Frank Olsen,United States
MP-17470,Mark Packer,United States
CM-12115,Chad McGuire,United States


ProductID,Category,SubCategory
OFF-ST-10001128,Office Supplies,Storage
OFF-BI-10001249,Office Supplies,Binders
OFF-PA-10004243,Office Supplies,Paper
OFF-PA-10001800,Office Supplies,Paper
OFF-PA-10001790,Office Supplies,Paper


Order ID,OrderDate,ShipDate,Ship Mode,CustomerID,CustomerName,Country,ProductID,Category,SubCategory,Quantity,Price,Discount,ProfitRounded,Year
CA-2017-135692,2017-04-27,2017-05-01,Standard Class,CV-12805,Cynthia Voltz,United States,OFF-LA-10001158,Office Supplies,Labels,4,33.12,0.2,11.59,2017
CA-2016-124016,2016-09-23,2016-09-26,Second Class,JS-15940,Joni Sundaresam,United States,OFF-PA-10002586,Office Supplies,Paper,3,11.95,0.2,4.03,2016
CA-2017-104864,2017-11-18,2017-11-23,Second Class,JS-15685,Jim Sink,United States,OFF-BI-10001636,Office Supplies,Binders,8,20.23,0.7000000000000001,-16.19,2017
CA-2014-107181,2014-02-04,2014-02-08,Standard Class,DB-13270,Deborah Brumfield,United States,OFF-PA-10000350,Office Supplies,Paper,4,34.24,0.0,16.09,2014
CA-2015-109470,2015-12-31,2016-01-03,Second Class,KC-16255,Karen Carlisle,United States,OFF-BI-10003707,Office Supplies,Binders,5,76.0,0.0,38.15,2015


Year,Category,SubCategory,CustomerName,TotalProfit
2014,Technology,Accessories,Raymond Messe,7.62
2017,Furniture,Bookcases,Christine Sundaresam,9.09
2015,Office Supplies,Storage,Trudy Brown,2.44
2014,Office Supplies,Binders,Tony Chapman,-16.47
2017,Office Supplies,Art,Caroline Jumper,25.9


In [0]:
# Register the aggregate DataFrame as a SQL view
df_agg.createOrReplaceTempView("aggregates")

# Profit by Year
displayHTML("<h2 style='color:navy;font-size:24px;'>Profit by Year (Sample)</h2>")
display(
    spark.sql("""
        SELECT Year, ROUND(SUM(TotalProfit), 2) AS ProfitByYear
        FROM aggregates
        GROUP BY Year
        ORDER BY Year
    """).limit(5)
)

# Profit by Year + Product Category
displayHTML("<h2 style='color:navy;font-size:24px;'>Profit by Year and Product Category (Sample)</h2>")
display(
    spark.sql("""
        SELECT Year, Category, ROUND(SUM(TotalProfit), 2) AS ProfitByYearCategory
        FROM aggregates
        GROUP BY Year, Category
        ORDER BY Year, Category
    """).limit(5)
)

# Profit by Customer
displayHTML("<h2 style='color:navy;font-size:24px;'>Profit by Customer (Sample)</h2>")
display(
    spark.sql("""
        SELECT CustomerName, ROUND(SUM(TotalProfit), 2) AS ProfitByCustomer
        FROM aggregates
        GROUP BY CustomerName
        ORDER BY ProfitByCustomer DESC
    """).limit(5)
)

# Profit by Customer + Year
displayHTML("<h2 style='color:navy;font-size:24px;'>Profit by Customer and Year (Sample)</h2>")
display(
    spark.sql("""
        SELECT CustomerName, Year, ROUND(SUM(TotalProfit), 2) AS ProfitByCustomerYear
        FROM aggregates
        GROUP BY CustomerName, Year
        ORDER BY Year, CustomerName DESC
    """).limit(5)
)

Year,ProfitByYear
2014,24294.27
2015,26843.68
2016,45620.11
2017,34256.83


Year,Category,ProfitByYearCategory
2014,Furniture,2479.73
2014,Office Supplies,9244.02
2014,Technology,12570.52
2015,Furniture,3061.7
2015,Office Supplies,9692.4


CustomerName,ProfitByCustomer
Tamara Chand,8443.12
Adrian Barton,5492.89
Hunter Lopez,5185.18
Sanjit Engle,2316.24
Tom Boeckenhauer,2283.8


CustomerName,Year,ProfitByCustomerYear
_Mike Vitt 12313orini,2014,6.9
Zuschuss Donatelli,2014,16.01
Zuschuss Carroll,2014,-48.09
Yoseph Carroll,2014,792.27
Xylona Preis,2014,112.34
