### **Northwind** (Silver Transformation)

In [1]:
# Import Requirements
import json
import re
from notebookutils import mssparkutils
from pyspark.sql import SparkSession, DataFrame, functions as F, types as T
from delta.tables import DeltaTable
from typing import Sequence, Optional
from pyspark.sql import DataFrame

StatementMeta(, e13babbf-b4f9-453a-9ea6-e7a864ca07a4, 3, Finished, Available, Finished)

#### **Get Bronze Metadata**
Receives Bronze Table Metadata from Copy Activity via _Data Factory Pipeline_

In [3]:
# Parameterized value (supplied by the Data Factory pipeline)
# bronze_metadata = ""

StatementMeta(, c60f3891-3d52-47e3-bfea-06ecbbdf6fa6, 5, Finished, Available, Finished)

In [4]:
def clean_json(payload: str) -> list:
    """Parse the incoming metadata payload into Python objects.

    The payload may be plain JSON text or JSON that was serialized a second
    time, i.e. wrapped in double quotes with its inner quotes escaped as ``\\"``.

    Args:
        payload: JSON text, optionally double-encoded as a JSON string.

    Returns:
        The decoded JSON value.

    Raises:
        json.JSONDecodeError: if the payload cannot be decoded as JSON.
    """
    raw = payload.strip()
    try:
        records = json.loads(raw)
    except json.JSONDecodeError:
        # Fallback for quoted payloads that are not valid JSON strings themselves
        # (e.g. they contain raw line breaks): strip the outer quotes and turn
        # every \" back into ".
        if len(raw) >= 2 and raw.startswith('"') and raw.endswith('"'):
            return json.loads(raw[1:-1].replace('\\"', '"'))
        raise
    # A double-encoded payload decodes to a string first; decoding it properly
    # (instead of a textual replace) also handles nested \" and \\ sequences.
    if isinstance(records, str):
        records = json.loads(records)
    return records


StatementMeta(, c60f3891-3d52-47e3-bfea-06ecbbdf6fa6, 6, Finished, Available, Finished)

In [None]:
# STEP 1: Decode the metadata string handed to the notebook
parsed = clean_json(bronze_metadata)

# STEP 2: Keep only the nested "BronzeMetadata" object of each entry;
# entries without that key are left out.
records = [
    item["BronzeMetadata"]
    for item in parsed
    if "BronzeMetadata" in item
]


In [6]:
#In case debugging is needed 
# print(parsed)
# print('//////////////////////////////////////////////////////////')
# print(records)

StatementMeta(, c60f3891-3d52-47e3-bfea-06ecbbdf6fa6, 8, Finished, Available, Finished)

#### **Data Transformation**
Custom Cleaning Functions for Each Table

In [10]:
def clean_categories(raw_df):
    """Clean the Categories table.

    Applies Spark's ``initcap`` to 'Description' (upper-cases the first letter
    of every word) and removes the 'Picture' column.
    """
    description_cased = F.initcap(F.col('Description'))
    return raw_df.withColumn('Description', description_cased).drop('Picture')

def clean_customers(raw_df):
    """Clean the Customers table.

    Applies Spark's ``initcap`` (first letter of every word upper-cased) to
    'CompanyName' and then to 'Address'.
    """
    cleaned = raw_df
    for column_name in ('CompanyName', 'Address'):
        cleaned = cleaned.withColumn(column_name, F.initcap(F.col(column_name)))
    return cleaned

def clean_employees(raw_df):
    """Clean the Employees table.

    - casts 'BirthDate' and 'HireDate' to Spark timestamps
    - renames 'ReportsTo' to 'Manager'
    - removes the 'PhotoPath' and 'Photo' columns
    """
    timestamp_type = T.TimestampType()
    return (
        raw_df
        .withColumn('BirthDate', F.col('BirthDate').cast(timestamp_type))
        .withColumn('HireDate', F.col('HireDate').cast(timestamp_type))
        .withColumnRenamed('ReportsTo', 'Manager')
        .drop('PhotoPath', 'Photo')
    )

def clean_employee_territories(raw_df):
    """Clean the EmployeeTerritories table: cast 'TerritoryID' to a 64-bit integer (LongType)."""
    territory_id = F.col('TerritoryID').cast(T.LongType())
    return raw_df.withColumn('TerritoryID', territory_id)

def clean_order_details(raw_df):
    """Clean the OrderDetails table: cast 'UnitPrice' to double and round it to 2 decimals."""
    unit_price = F.col('UnitPrice').cast(T.DoubleType())
    return raw_df.withColumn('UnitPrice', F.round(unit_price, 2))

def clean_orders(raw_df):
    """Clean the Orders table.

    - casts 'OrderDate', 'RequiredDate' and 'ShippedDate' to Spark timestamps
    - renames 'Freight' to 'FreightCosts', casts it to double and rounds to 2 decimals
    - applies Spark's ``initcap`` (first letter of every word upper-cased) to
      'ShipName' and 'ShipAddress'
    """
    cleaned = raw_df

    # Date-like columns become timestamps
    for date_column in ('OrderDate', 'RequiredDate', 'ShippedDate'):
        cleaned = cleaned.withColumn(date_column, F.col(date_column).cast(T.TimestampType()))

    # Freight: rename first, then normalise the numeric value under the new name
    freight_costs = F.round(F.col('FreightCosts').cast(T.DoubleType()), 2)
    cleaned = (
        cleaned
        .withColumnRenamed('Freight', 'FreightCosts')
        .withColumn('FreightCosts', freight_costs)
    )

    # Shipping text columns
    for text_column in ('ShipName', 'ShipAddress'):
        cleaned = cleaned.withColumn(text_column, F.initcap(F.col(text_column)))

    return cleaned

def clean_products(raw_df):
    """Clean the Products table.

    Applies Spark's ``initcap`` (first letter of every word upper-cased) to
    'ProductName' and casts 'UnitPrice' to double.
    """
    return (
        raw_df
        .withColumn('ProductName', F.initcap(F.col('ProductName')))
        .withColumn('UnitPrice', F.col('UnitPrice').cast(T.DoubleType()))
    )

def clean_region(raw_df):
    """Region needs no cleaning; the input DataFrame is returned unchanged."""
    return raw_df

def clean_shippers(raw_df):
    """Shippers needs no cleaning; the input DataFrame is returned unchanged."""
    return raw_df

def clean_suppliers(raw_df):
    """Clean the Suppliers table.

    Applies Spark's ``initcap`` (first letter of every word upper-cased) to
    'Address' and removes the 'HomePage' column.
    """
    address_cased = F.initcap(F.col('Address'))
    return raw_df.withColumn('Address', address_cased).drop('HomePage')

def clean_territories(raw_df):
    """Clean the Territories table: cast 'TerritoryID' to a 64-bit integer (LongType)."""
    territory_id = F.col('TerritoryID').cast(T.LongType())
    return raw_df.withColumn('TerritoryID', territory_id)

StatementMeta(, ea8d1e65-c08b-44cb-a810-194b6d1f5fe0, 12, Finished, Available, Finished)

In [8]:
# Dispatch table: table name -> cleaning function for that table.
cleaning_functions = dict(
    Categories=clean_categories,
    Customers=clean_customers,
    Employees=clean_employees,
    EmployeeTerritories=clean_employee_territories,
    OrderDetails=clean_order_details,
    Orders=clean_orders,
    Products=clean_products,
    Region=clean_region,
    Shippers=clean_shippers,
    Suppliers=clean_suppliers,
    Territories=clean_territories,
)


StatementMeta(, c60f3891-3d52-47e3-bfea-06ecbbdf6fa6, 10, Finished, Available, Finished)

#### **Data Loading Function**

In [9]:
def upsert(dataframe: DataFrame, table_name: str, key_cols: list[str], partition_cols: Optional[list[str]] = None, schema_name: Optional[str] = None):
    """
    Upsert into a Delta table in Microsoft Fabric Lakehouse.

    The DataFrame is de-duplicated on ``key_cols`` first. If a Delta table
    already exists at the target path, the rows are merged into it (matched
    rows are updated, unmatched rows are inserted). Otherwise the table is
    created from the DataFrame.

    If schema_name is provided, uses schema.table_name format and ensures the schema exists.

    Args:
        dataframe: Rows to write.
        table_name: Target table name.
        key_cols: Column name(s) identifying a row; used for de-duplication and
            as the merge condition. A single name may be passed as a string.
        partition_cols: Columns to partition by. Only applied when the table is
            created; an existing table is merged into as-is.
        schema_name: Optional schema (database) that holds the table.

    Raises:
        ValueError: if no key column is given.
    """

    # Accept a single column name and reject an empty key list up front: an
    # empty list would otherwise produce an empty (invalid) merge condition.
    if isinstance(key_cols, str):
        key_cols = [key_cols]
    key_cols = list(key_cols) if key_cols else []
    if not key_cols:
        raise ValueError(f"upsert into '{table_name}' requires at least one key column")

    # Compose full name and path
    if schema_name:
        full_table_name = f"{schema_name}.{table_name}"
        table_path = f"Tables/{schema_name}/{table_name}"
        # NOTE(review): schema_name is interpolated into SQL as-is; it should
        # come from trusted metadata only.
        spark.sql(f"CREATE DATABASE IF NOT EXISTS {schema_name}")
    else:
        full_table_name = table_name
        table_path = f"Tables/{table_name}"

    # Drop Duplicates
    dataframe = dataframe.dropDuplicates(key_cols)

    if DeltaTable.isDeltaTable(spark, table_path):
        delta_table = DeltaTable.forPath(spark, table_path)

        def _quoted(column_name: str) -> str:
            # Backtick-quote identifiers so names containing spaces, reserved
            # words or other special characters are valid in the condition.
            return "`" + column_name.replace("`", "``") + "`"

        merge_condition = " AND ".join(
            f"target.{_quoted(key)} = source.{_quoted(key)}" for key in key_cols
        )

        delta_table.alias("target") \
            .merge(
                source=dataframe.alias("source"),
                condition=merge_condition
            ) \
            .whenMatchedUpdateAll() \
            .whenNotMatchedInsertAll() \
            .execute()

        print(f"Upserted into existing table: {full_table_name}")
    else:
        writer = dataframe.write.format("delta").mode("overwrite")
        if partition_cols:
            writer = writer.partitionBy(*partition_cols)

        writer.saveAsTable(full_table_name)
        print(f"Created new table: {full_table_name}")


StatementMeta(, c60f3891-3d52-47e3-bfea-06ecbbdf6fa6, 11, Finished, Available, Finished)

#### **Silver Ingestion**

In [None]:
# Promote every successfully landed Bronze table to Silver. Each record is
# updated in place with "SilverStatus" (and "SilverFolderPath" on success).
for rec in records:

    # Tables whose Bronze load did not succeed are not processed
    if rec.get("BronzeStatus") != "Success":
        rec["SilverStatus"] = "Skipped"
        continue

    try:
        # Metadata
        DatabaseName = rec["DatabaseName"].replace("_", "").replace("-", "")
        TableName = rec["TableName"].strip().replace(" ", "") #Example: Remove Space in "Order Details"
        TableSchema = rec["TableSchema"]
        KeyColumn = rec["KeyColumn"]
        DataDomain = str(rec["DataDomain"]).lower()

        # Key columns arrive as a single "|"-delimited string. Parse them once,
        # trimming whitespace and ignoring empty segments, so "A | B" or a
        # trailing "|" does not produce invalid column names.
        key_cols = [name.strip() for name in KeyColumn.split("|") if name.strip()]
        if not key_cols:
            raise ValueError(f"No key columns defined for {TableName}")

        # Read raw file
        df = spark.read.parquet(f"Files/{rec['BronzeFolderPath']}")
        df = df.dropDuplicates(key_cols)

        # Apply matching cleaning function
        cleaning_fn = cleaning_functions.get(TableName)
        if cleaning_fn:
            df = cleaning_fn(df)  # Apply it only if exists
        else:
            print(f"⚠️ No cleaning function found for {TableName}, using raw DataFrame.")

        # Partition logic
        partition_cols = None
        if rec.get("PartitionColumn"):
            part_col = rec["PartitionColumn"]
            part_type = str(rec.get("PartitionType", "")).strip().lower()

            if part_type == "date":
                # The partition column is converted to a date and split into
                # Year / Month / Day columns, which become the partition keys.
                df = df.withColumn(part_col, F.to_date(F.col(part_col)))
                df = df.withColumn("Year", F.year(F.col(part_col)))
                df = df.withColumn("Month", F.month(F.col(part_col)))
                df = df.withColumn("Day", F.dayofmonth(F.col(part_col)))
                partition_cols = ["Year", "Month", "Day"]

            elif part_type == "categorical":
                partition_cols = [part_col]

            # Any other partition type leaves the table unpartitioned.

        # Upsert to Silver
        upsert(
            dataframe=df,
            table_name=TableName,
            key_cols=key_cols,
            partition_cols=partition_cols,
            schema_name=DataDomain
        )

        rec["SilverStatus"] = "Success"
        rec["SilverFolderPath"] = f"Tables/{DataDomain}/{TableName}"

    except Exception as e:
        # A failure is recorded on the record instead of aborting the loop, so
        # the remaining tables are still processed and the status is returned.
        rec["SilverStatus"] = f"Failed: {str(e)}"


#### **Notebook Return Output**

In [None]:
# Serialize the records once; the same text is printed and returned.
payload = json.dumps(records, indent=2)

# Optional: pretty-print the output
print("Returning payload →", payload)

# Return full JSON for pipeline/logging use
mssparkutils.notebook.exit(payload)