# 🏆 IFCO Data Engineering Challenge
**Medallion Architecture implemented in PySpark**

This notebook solves the technical challenge by applying Data Engineering best practices. The processing is divided into three logical layers (Bronze, Silver and Gold) to ensure traceability, cleansing and data quality, using **Test-Driven Development (TDD)** methodologies for the required validations.

# 🥉 1. Bronze Layer (Raw Data)
**Objective:** Ingest the raw data (`orders.csv` and `invoicing_data.json`) into the Databricks environment with minimal alteration.

**Actions performed:**
* Reading the local Workspace files with Pandas on the *Driver* node to work around cluster security restrictions.
* Choosing the correct separator (`;`) for the orders CSV.
* Automatic sanitization of column names to comply with **Delta Lake** format restrictions.
* Storing the data as received (apart from the sanitized column names) in the Delta tables `default.bronze_orders` and `default.bronze_invoicing`.
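The orders file is `;`-delimited, and its `contact_data` and `salesowners` fields themselves contain commas, so the default `,` separator would mis-read it. The plain-Python sketch below uses the standard `csv` module on a single made-up row shaped like the first order. The quoting style of the real `orders.csv` is an assumption here.

```python
import csv
import io

# Hypothetical sample shaped like orders.csv (the quoting style is an assumption)
SAMPLE = (
    "order_id;date;company_id;company_name;crate_type;contact_data;salesowners\n"
    "f47ac10b-58cc-4372-a567-0e02b2c3d479;29.01.22;1e2b47e6-499e-41c6-91d3-09d12dddfbbd;"
    'Fresh Fruits Co;Plastic;"[{ ""contact_name"":""Curtis"", ""contact_surname"":""Jackson""}]";'
    '"Leonard Cohen, Luke Skywalker, Ammy Winehouse"\n'
)

def parse_orders(text: str, sep: str) -> list[dict]:
    """Parse the sample with the given separator into a list of row dicts."""
    return list(csv.DictReader(io.StringIO(text), delimiter=sep))

rows = parse_orders(SAMPLE, ";")
print(len(rows[0]))            # 7 columns
print(rows[0]["salesowners"])  # Leonard Cohen, Luke Skywalker, Ammy Winehouse

# With ',' the header contains no separator at all and collapses into one column
header_with_comma = next(csv.reader(io.StringIO(SAMPLE), delimiter=","))
print(len(header_with_comma))  # 1
```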

In [0]:
# ==========================================
# 0. CATALOG CLEANUP (Environment Reset)
# ==========================================
print("🧹 Cleaning 'default' catalog to ensure a fresh Medallion execution...")

# Retrieve the list of all tables and views in the default database
try:
    tables = spark.catalog.listTables("default")

    for table in tables:
        # DROP TABLE raises an error on a view (and DROP VIEW on a table),
        # so choose the statement from the catalog's tableType
        object_kind = "VIEW" if table.tableType.upper() == "VIEW" else "TABLE"
        print(f"   Dropping {table.tableType.lower()}: {table.name}")
        spark.sql(f"DROP {object_kind} IF EXISTS default.`{table.name}`")

    print("✨ Catalog is now empty. Ready for Bronze, Silver, and Gold processing.\n")

except Exception as e:
    print(f"⚠️ Note: Could not clear catalog (might be empty or missing permissions). Details: {e}")

🧹 Cleaning 'default' catalog to ensure a fresh Medallion execution...
   Dropping managed: bronze_invoicing
   Dropping managed: bronze_orders
   Dropping managed: gold_companies_salesowners
   Dropping managed: gold_sales_commissions
   Dropping managed: silver_invoicing
   Dropping managed: silver_orders
✨ Catalog is now empty. Ready for Bronze, Silver, and Gold processing.



In [0]:
import pandas as pd
import os
import re

# 1. Define paths based on your Workspace structure
base_path = "/Workspace/Users/oriolds@icloud.com/IFCO Data Engineering Challenge/"
csv_path = os.path.join(base_path, "orders.csv")
json_path = os.path.join(base_path, "invoicing_data.json")

# 2. Read the files with Pandas on the driver to work around cluster restrictions on Workspace paths
print("1/4 Reading files with Pandas...")
pdf_orders = pd.read_csv(csv_path, sep=";") 
pdf_invoicing = pd.read_json(json_path) 

# 3. Convert to Spark DataFrames
print("2/4 Converting to Spark...")
df_orders = spark.createDataFrame(pdf_orders)
df_invoicing = spark.createDataFrame(pdf_invoicing)

# 4. Sanitize column names for Delta Lake compatibility
print("3/4 Sanitizing names...")
def sanitize_column_names(df):
    for col_name in df.columns:
        # Replace characters Delta Lake rejects in column names with '_' (\s already covers \n and \t)
        clean_name = re.sub(r'[\s,;{}()=]', '_', col_name).strip('_')
        df = df.withColumnRenamed(col_name, clean_name)
    return df

df_orders_bronze = sanitize_column_names(df_orders)
df_invoicing_bronze = sanitize_column_names(df_invoicing)

# 5. Save to Bronze tables with explicit schema overwriting
print("4/4 Saving Bronze tables...")
df_orders_bronze.write.format("delta").mode("overwrite") \
    .option("overwriteSchema", "true").saveAsTable("default.bronze_orders")

df_invoicing_bronze.write.format("delta").mode("overwrite") \
    .option("overwriteSchema", "true").saveAsTable("default.bronze_invoicing")

print("Bronze Layer completed!")

# --- Visual verification ---
print("\nTable: default.bronze_orders")
display(spark.table("default.bronze_orders").limit(5))

print("\nTable: default.bronze_invoicing")
display(spark.table("default.bronze_invoicing").limit(5))

1/4 Reading files with Pandas...
2/4 Converting to Spark...
3/4 Sanitizing names...
4/4 Saving Bronze tables...
Bronze Layer completed!

Table: default.bronze_orders


order_id,date,company_id,company_name,crate_type,contact_data,salesowners
f47ac10b-58cc-4372-a567-0e02b2c3d479,29.01.22,1e2b47e6-499e-41c6-91d3-09d12dddfbbd,Fresh Fruits Co,Plastic,"[{ ""contact_name"":""Curtis"", ""contact_surname"":""Jackson"", ""city"":""Chicago"", ""cp"": ""12345""}]","Leonard Cohen, Luke Skywalker, Ammy Winehouse"
f47ac10b-58cc-4372-a567-0e02b2c3d480,21.02.22,0f05a8f1-2bdf-4be7-8c82-4c9b58f04898,Veggies Inc,Wood,"[{ ""contact_name"":""Maria"", ""contact_surname"":""Theresa"", ""city"":""Calcutta""}]","Luke Skywalker, David Goliat, Leon Leonov"
f47ac10b-58cc-4372-a567-0e02b2c3d481,03.04.22,1e2b47e6-499e-41c6-91d3-09d12dddfbbd,Fresh Fruits c.o,Metal,"[{ ""contact_name"":""Para"", ""contact_surname"":""Cetamol"", ""city"":""Frankfurt am Oder"", ""cp"": 3934}]",Luke Skywalker
f47ac10b-58cc-4372-a567-0e02b2c3d482,14.07.21,1c4b0b50-1d5d-463a-b56e-1a6fd3aeb7d6,Seafood Supplier,Plastic,,"David Goliat, Leonard Cohen"
f47ac10b-58cc-4372-a567-0e02b2c3d483,23.10.22,34538e39-cd2e-4641-8d24-3c94146e6f16,Meat Packers Ltd,Plastic,,"Chris Pratt, David Henderson, Marianov Merschik, Leon Leonov"



Table: default.bronze_invoicing


data
"List(List(1e2b47e6-499e-41c6-91d3-09d12dddfbbd, 324222, e1e1e1e1-e1e1-e1e1-e1e1-e1e1e1e1e1e1, f47ac10b-58cc-4372-a567-0e02b2c3d479, 0), List(0f05a8f1-2bdf-4be7-8c82-4c9b58f04898, 193498, e2e2e2e2-e2e2-e2e2-e2e2-e2e2e2e2e2e2, f47ac10b-58cc-4372-a567-0e02b2c3d480, 19), List(1e2b47e6-499e-41c6-91d3-09d12dddfbbd, 345498, e3e3e3e3-e3e3-e3e3-e3e3-e3e3e3e3e3e3, f47ac10b-58cc-4372-a567-0e02b2c3d481, 21), List(1c4b0b50-1d5d-463a-b56e-1a6fd3aeb7d6, 245412, e4e4e4e4-e4e4-e4e4-e4e4-e4e4e4e4e4e4, f47ac10b-58cc-4372-a567-0e02b2c3d482, 34), List(34538e39-cd2e-4641-8d24-3c94146e6f16, 145467, e5e5e5e5-e5e5-e5e5-e5e5-e5e5e5e5e5e5, f47ac10b-58cc-4372-a567-0e02b2c3d483, 0), List(fa14c3ed-3c48-49f4-bd69-4d7f5b5f4b1b, 581530, e6e6e6e6-e6e6-e6e6-e6e6-e6e6e6e6e6e6, f47ac10b-58cc-4372-a567-0e02b2c3d484, 19), List(2e90f2b1-d237-47a6-96e8-6d01c0d78c3e, 45100, e7e7e7e7-e7e7-e7e7-e7e7-e7e7e7e7e7e7, f47ac10b-58cc-4372-a567-0e02b2c3d485, 19), List(acdb6f30-764f-404e-8b8e-7e7e3e6fa1a9, 565210, e8e8e8e8-e8e8-e8e8-e8e8-e8e8e8e8e8e8, f47ac10b-58cc-4372-a567-0e02b2c3d486, 21), List(5f0bdbdf-1d84-4c23-957c-8bb8c0ddc89d, 345310, e9e9e9e9-e9e9-e9e9-e9e9-e9e9e9e9e9e9, f47ac10b-58cc-4372-a567-0e02b2c3d487, 34), List(5f0bdbdf-1d84-4c23-957c-8bb8c0ddc89d, 345310, ea9ea9ea-9ea9-9ea9-9ea9-9ea9ea9ea9ea, f47ac10b-58cc-4372-a567-0e02b2c3d487, 34), List(27c59f76-5d26-4b82-a89b-59f8dfd2e9a7, 341315, eb0eb0eb-0eb0-0eb0-0eb0-0eb0eb0eb0eb, f47ac10b-58cc-4372-a567-0e02b2c3d488, 21), List(20dfef10-8f4e-45a1-82fc-123f4ab2a4a5, 291315, ec1ec1ec-1ec1-1ec1-1ec1-1ec1ec1ec1ec, f47ac10b-58cc-4372-a567-0e02b2c3d489, 0))"
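The column-name rule used by `sanitize_column_names` can be checked without a cluster. This plain-Python sketch applies the same substitution to single names; `sanitize_name` and the sample names are hypothetical and exist only for illustration.

```python
import re

# Same character class as sanitize_column_names(); \s already matches \n and \t
INVALID_DELTA_CHARS = re.compile(r"[\s,;{}()=]")

def sanitize_name(name: str) -> str:
    """Replace characters Delta Lake rejects with '_' and trim leading/trailing underscores."""
    return INVALID_DELTA_CHARS.sub("_", name).strip("_")

for raw in ["order id", "gross(value)", "vat=rate", " crate_type\t"]:
    print(repr(raw), "->", sanitize_name(raw))
```

One caveat of this design: because of the final `strip('_')`, two different raw names (for example `a(` and `a`) can collapse to the same sanitized name.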


### Transformation Strategy: Moving from Bronze to Silver and Gold

After successfully ingesting the raw data into the **Bronze Layer**, the pipeline follows a structured evolution to ensure data quality and business relevance. This approach separates technical data cleansing from high-level business logic.

#### 🥈 Silver Layer: Technical Cleansing and Enrichment (Tests 2 & 3)
The next stage focuses on transforming the raw data into a reliable, "clean version of the truth".
* **Focus**: Data normalization, schema enforcement, and structural extraction.
* **Implementation**: This layer addresses **Test 2 (Full Name Extraction)** and **Test 3 (Address Formatting)**.
* **Justification**: These tasks involve parsing complex JSON strings and applying data integrity rules, such as the placeholders "John Doe" and "Unknown". By resolving these at the Silver level, we ensure that all downstream processes use standardized contact information.

#### 🥇 Gold Layer: Business Logic and Aggregations (Tests 1, 4 & 5)
Once the data is cleaned and enriched, it moves to the **Gold Layer**, which is optimized for analytics and decision-making.
* **Focus**: Complex business rules, financial calculations, and performance metrics.
* **Implementation**: This layer executes **Test 1 (Crate Distribution)**, **Test 4 (Commission Calculations)**, and **Test 5 (Customer Catalog Consolidation)**.
* **Justification**: These requirements involve multi-table joins, specific financial logic (calculating net values and percentage tiers), and entity resolution to handle duplicate companies. Separating these into Gold ensures that business KPIs remain flexible and independent of the underlying data cleaning logic.

# 🥈 2. Silver Layer (Cleansed Data)
**Objective:** Filter, cleanse and standardise the Bronze layer data to prepare it for joining and analysis. In this layer we enrich the tables and define the correct schemas (data types).

## 2.1 Invoicing Data Standardisation (`invoicing_data`)
**Actions performed:**
* **Flattening:** `explode` unpacks the nested JSON array so that each invoice becomes its own relational row with its own columns.
* **Data Casting:** The financial fields (`grossValue`, `vat`) are cast from text (`string`) to numeric values (`double`) so they can be used in later arithmetic.
* **Idempotency:** `CREATE OR REPLACE TABLE` lets the pipeline be re-executed without errors.
* **Plain SQL:** The cell is written in SQL to avoid `REFRESH TABLE` limitations on the Databricks free tier.
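What `explode` plus the casts do to each record can be mirrored in plain Python. This sketch assumes the Bronze `data` column holds a list of invoice records with the fields the SQL references (`companyId`, `grossValue`, `id`, `orderId`, `vat`). The two records are copied from the Bronze output, and their numeric values are written as strings here only to illustrate the cast.

```python
# Two invoice records taken from the Bronze output (string values illustrate the cast)
bronze_data = [
    {"companyId": "1e2b47e6-499e-41c6-91d3-09d12dddfbbd", "grossValue": "324222",
     "id": "e1e1e1e1-e1e1-e1e1-e1e1-e1e1e1e1e1e1",
     "orderId": "f47ac10b-58cc-4372-a567-0e02b2c3d479", "vat": "0"},
    {"companyId": "0f05a8f1-2bdf-4be7-8c82-4c9b58f04898", "grossValue": "193498",
     "id": "e2e2e2e2-e2e2-e2e2-e2e2-e2e2e2e2e2e2",
     "orderId": "f47ac10b-58cc-4372-a567-0e02b2c3d480", "vat": "19"},
]

def flatten_invoices(data: list[dict]) -> list[dict]:
    """One output row per array element (like explode), renaming id and casting money fields."""
    return [
        {
            "companyId": inv["companyId"],
            "grossValue": float(inv["grossValue"]),  # CAST(invoice.grossValue AS DOUBLE)
            "invoice_id": inv["id"],                 # invoice.id AS invoice_id
            "orderId": inv["orderId"],
            "vat": float(inv["vat"]),                # CAST(invoice.vat AS DOUBLE)
        }
        for inv in data
    ]

silver_invoicing = flatten_invoices(bronze_data)
print(silver_invoicing[1]["grossValue"], silver_invoicing[1]["vat"])  # 193498.0 19.0
```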

In [0]:
%sql
-- 1. Create the Silver table by flattening the 'data' array and casting types
CREATE OR REPLACE TABLE default.silver_invoicing AS
SELECT 
    invoice.companyId,
    CAST(invoice.grossValue AS DOUBLE) AS grossValue,
    invoice.id AS invoice_id,
    invoice.orderId,
    CAST(invoice.vat AS DOUBLE) AS vat
FROM (
    -- 'explode' turns the array into individual rows
    SELECT explode(data) AS invoice 
    FROM default.bronze_invoicing
);

-- 2. Verify the schema and data
DESCRIBE TABLE default.silver_invoicing;

SELECT * FROM default.silver_invoicing LIMIT 5;

companyId,grossValue,invoice_id,orderId,vat
1e2b47e6-499e-41c6-91d3-09d12dddfbbd,324222.0,e1e1e1e1-e1e1-e1e1-e1e1-e1e1e1e1e1e1,f47ac10b-58cc-4372-a567-0e02b2c3d479,0.0
0f05a8f1-2bdf-4be7-8c82-4c9b58f04898,193498.0,e2e2e2e2-e2e2-e2e2-e2e2-e2e2e2e2e2e2,f47ac10b-58cc-4372-a567-0e02b2c3d480,19.0
1e2b47e6-499e-41c6-91d3-09d12dddfbbd,345498.0,e3e3e3e3-e3e3-e3e3-e3e3-e3e3e3e3e3e3,f47ac10b-58cc-4372-a567-0e02b2c3d481,21.0
1c4b0b50-1d5d-463a-b56e-1a6fd3aeb7d6,245412.0,e4e4e4e4-e4e4-e4e4-e4e4-e4e4e4e4e4e4,f47ac10b-58cc-4372-a567-0e02b2c3d482,34.0
34538e39-cd2e-4641-8d24-3c94146e6f16,145467.0,e5e5e5e5-e5e5-e5e5-e5e5-e5e5e5e5e5e5,f47ac10b-58cc-4372-a567-0e02b2c3d483,0.0


## 2.2 Orders Enrichment (`orders`)
In this section we extract valuable information from nested JSON structures stored as plain text within the `contact_data` column, thereby fulfilling the format requirements for contacts and addresses.

### Test 2: Contact Name Extraction
**Objective:** Create a DataFrame (`df_1`) with `order_id` and `contact_full_name`.
* The contact name and surname are extracted from the first element of the JSON array with Spark's native `get_json_object` function.
* Null or empty values are handled by applying the placeholder `"John Doe"`.
* **Validation:** A structured *Unit Test* is included to ensure the JSON parser works correctly before applying it to real data.
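To reason about the rule outside Spark, here is a plain-Python sketch of the same per-row logic. It is an illustration, not part of the pipeline; the helper `contact_full_name` is hypothetical, and it assumes (as `get_json_object` does) that malformed JSON behaves like missing data.

```python
import json
from typing import Optional

PLACEHOLDER_NAME = "John Doe"

def contact_full_name(contact_data: Optional[str]) -> str:
    """Plain-Python mirror of get_contact_full_name() for a single contact_data value."""
    if not contact_data:                      # None or empty string
        return PLACEHOLDER_NAME
    try:
        contacts = json.loads(contact_data)
    except json.JSONDecodeError:              # get_json_object returns NULL for malformed JSON
        return PLACEHOLDER_NAME
    if not isinstance(contacts, list) or not contacts or not isinstance(contacts[0], dict):
        return PLACEHOLDER_NAME               # '$[0]' finds nothing
    first = contacts[0]
    parts = [first.get("contact_name"), first.get("contact_surname")]
    # concat_ws(" ", ...) skips NULLs; trim() removes stray spaces
    full_name = " ".join(str(p) for p in parts if p is not None).strip()
    return full_name or PLACEHOLDER_NAME

for value in ['[{"contact_name":"Curtis", "contact_surname":"Jackson"}]',
              '[{"contact_name":"Maria"}]', None, "not json"]:
    print(repr(value), "->", contact_full_name(value))
```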

In [0]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, get_json_object, concat_ws, when, trim, lit

# ==========================================
# 1. FUNCTION DEFINITION (Silver Logic)
# ==========================================
def get_contact_full_name(df: DataFrame) -> DataFrame:
    """
    Extracts the first name and surname from the JSON in the 'contact_data' column.
    If the field is null or cannot be extracted, returns 'John Doe'.
    """
    # 1. Use get_json_object to navigate the JSON string.
    # $[0] accesses the first array element, and .contact_name the key.
    name = get_json_object(col("contact_data"), "$[0].contact_name")
    surname = get_json_object(col("contact_data"), "$[0].contact_surname")
    
    # 2. Concatenate with a space in between and strip leading/trailing whitespace (trim)
    full_name_raw = trim(concat_ws(" ", name, surname))
    
    # 3. If the result is empty (because it was null), apply the placeholder 'John Doe'
    final_full_name = when(
        (full_name_raw == "") | full_name_raw.isNull(), 
        lit("John Doe")
    ).otherwise(full_name_raw)
    
    # 4. Return only the requested columns
    return df.select(
        col("order_id"), 
        final_full_name.alias("contact_full_name")
    )

# ==========================================
# 2. VERBOSE UNIT TESTING (Demonstrating reliability)
# ==========================================
def test_get_contact_full_name():
    print("="*60)
    print("🧪 STARTING UNIT TEST: Name Extraction (Test 2)")
    print("="*60)
    
    print("Step 1: Creating simulated data (Mock Data)...")
    print("  Simulated scenario:")
    print("  - Order 1: Has valid JSON with Name and Surname.")
    print("  - Order 2: Has valid JSON but only Name.")
    print("  - Order 3: Is 'null' (No contact data).")
    
    mock_data = [
        ("id-1", '[{"contact_name":"Curtis", "contact_surname":"Jackson"}]'),
        ("id-2", '[{"contact_name":"Maria"}]'),
        ("id-3", None)
    ]
    mock_df = spark.createDataFrame(mock_data, ["order_id", "contact_data"])
    
    print("\nStep 2: Processing data with 'get_contact_full_name'...")
    result_df = get_contact_full_name(mock_df)
    results = result_df.collect()
    
    print("\nStep 3: Running validations (Asserts)...")
    
    # Validation 1: Name and Surname
    val1 = [row.contact_full_name for row in results if row.order_id == "id-1"][0]
    assert val1 == "Curtis Jackson", f"Error in id-1. Expected: Curtis Jackson, Got: {val1}"
    print("  -> ✅ Order 1 correct: Extracted and concatenated 'Curtis Jackson'.")
    
    # Validation 2: Name only (incomplete JSON)
    val2 = [row.contact_full_name for row in results if row.order_id == "id-2"][0]
    assert val2 == "Maria", f"Error in id-2. Expected: Maria, Got: {val2}"
    print("  -> ✅ Order 2 correct: Handled the missing surname leaving only 'Maria'.")
    
    # Validation 3: Placeholder (John Doe)
    val3 = [row.contact_full_name for row in results if row.order_id == "id-3"][0]
    assert val3 == "John Doe", f"Error in id-3. Expected: John Doe, Got: {val3}"
    print("  -> ✅ Order 3 correct: Applied the placeholder 'John Doe' upon detecting a null.")
    
    print("\n🏆 RESULT: Unit Test passed! The JSON parser works perfectly.")
    print("="*60 + "\n")

# Run the test
test_get_contact_full_name()

# ==========================================
# 3. APPLYING TO REAL DATA (Creating df_1)
# ==========================================
print("🚀 Applying the extraction to the real orders table...\n")

# Load the original orders table (located in the Bronze layer)
df_orders = spark.table("default.bronze_orders")

# This is the df_1 requested by the exercise (Silver Layer)
df_1 = get_contact_full_name(df_orders)

# Visualise the final result
display(df_1.limit(10))

# ==========================================
# 4. PERSISTING THE SILVER LAYER
# ==========================================
print("🚀 Consolidating enriched data into the Silver Layer...")

# NOTE: get_contact_address() is defined in the Test 3 cell below. On a fresh
# "Run all" the next lines raise a NameError unless that cell has already run;
# moving this block after the Test 3 cell removes the ordering dependency.
df_bronze = spark.table("default.bronze_orders")
df_contact_names = get_contact_full_name(df_bronze)     # order_id, contact_full_name
df_contact_addresses = get_contact_address(df_bronze)   # order_id, contact_address

# Join the enrichments back onto the Bronze columns to build the Silver Orders table
df_silver_orders = (df_bronze
    .join(df_contact_names, "order_id")
    .join(df_contact_addresses, "order_id"))

# Save physically to the catalog (overwriteSchema keeps re-runs idempotent)
df_silver_orders.write.format("delta").mode("overwrite") \
    .option("overwriteSchema", "true").saveAsTable("default.silver_orders")

print("✅ Table 'default.silver_orders' is now available in the catalog.")

🧪 STARTING UNIT TEST: Name Extraction (Test 2)
Step 1: Creating simulated data (Mock Data)...
  Simulated scenario:
  - Order 1: Has valid JSON with Name and Surname.
  - Order 2: Has valid JSON but only Name.
  - Order 3: Is 'null' (No contact data).

Step 2: Processing data with 'get_contact_full_name'...

Step 3: Running validations (Asserts)...
  -> ✅ Order 1 correct: Extracted and concatenated 'Curtis Jackson'.
  -> ✅ Order 2 correct: Handled the missing surname leaving only 'Maria'.
  -> ✅ Order 3 correct: Applied the placeholder 'John Doe' upon detecting a null.

🏆 RESULT: Unit Test passed! The JSON parser works perfectly.

🚀 Applying the extraction to the real orders table...



order_id,contact_full_name
f47ac10b-58cc-4372-a567-0e02b2c3d479,Curtis Jackson
f47ac10b-58cc-4372-a567-0e02b2c3d480,Maria Theresa
f47ac10b-58cc-4372-a567-0e02b2c3d481,Para Cetamol
f47ac10b-58cc-4372-a567-0e02b2c3d482,John Doe
f47ac10b-58cc-4372-a567-0e02b2c3d483,John Doe
f47ac10b-58cc-4372-a567-0e02b2c3d484,John Krasinski
f47ac10b-58cc-4372-a567-0e02b2c3d485,John Doe
f47ac10b-58cc-4372-a567-0e02b2c3d486,Jennifer Lopez
f47ac10b-58cc-4372-a567-0e02b2c3d487,John Doe
f47ac10b-58cc-4372-a567-0e02b2c3d488,Curtis Jackson


🚀 Consolidating enriched data into the Silver Layer...
✅ Table 'default.silver_orders' is now available in the catalog.


### Test 3: Address Extraction and Formatting
**Objective:** Create a DataFrame (`df_2`) with `order_id` and `contact_address`.
* The city (`city`) and postal code (`cp`) fields are extracted from inside the JSON.
* The format is standardised by combining them as `"city, postal_code"`.
* Data quality rules are applied using specific placeholders (`"Unknown"` for city, `"UNK00"` for postal code) when the source information is null or missing.
* **Validation:** The *Unit Test* covers all four combinations: both fields present, only the postal code missing, only the city missing, and both missing.
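The same placeholder rules can be sketched per value in plain Python. The helper `contact_address` is hypothetical; it assumes malformed JSON is treated like missing data and that a numeric postal code (such as `3934` in the real data) is rendered as text, which matches what `get_json_object` returns.

```python
import json
from typing import Optional

def contact_address(contact_data: Optional[str]) -> str:
    """Plain-Python mirror of get_contact_address(): 'city, cp' with placeholders."""
    first = {}
    if contact_data:
        try:
            parsed = json.loads(contact_data)
            if isinstance(parsed, list) and parsed and isinstance(parsed[0], dict):
                first = parsed[0]
        except json.JSONDecodeError:
            pass  # malformed JSON behaves like missing data
    city = first.get("city")
    cp = first.get("cp")
    city_clean = "Unknown" if city in (None, "") else str(city)
    cp_clean = "UNK00" if cp in (None, "") else str(cp)  # numeric cp 3934 -> "3934"
    return f"{city_clean}, {cp_clean}"

print(contact_address('[{"city":"Frankfurt am Oder", "cp": 3934}]'))  # Frankfurt am Oder, 3934
print(contact_address(None))                                         # Unknown, UNK00
```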

In [0]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, get_json_object, concat_ws, when, lit

# ==========================================
# 1. FUNCTION DEFINITION (Silver Logic)
# ==========================================
def get_contact_address(df: DataFrame) -> DataFrame:
    """
    Extracts the city and postal code from the JSON in 'contact_data'.
    Applies placeholders 'Unknown' and 'UNK00' when data is missing.
    Returns the format: 'city, postal_code'
    """
    # 1. Extract city and postal code from the JSON
    city_raw = get_json_object(col("contact_data"), "$[0].city")
    cp_raw = get_json_object(col("contact_data"), "$[0].cp")
    
    # 2. Apply placeholder logic (when null or empty string)
    city_clean = when(city_raw.isNull() | (city_raw == ""), lit("Unknown")).otherwise(city_raw)
    cp_clean = when(cp_raw.isNull() | (cp_raw == ""), lit("UNK00")).otherwise(cp_raw)
    
    # 3. Concatenate with comma and space
    formatted_address = concat_ws(", ", city_clean, cp_clean)
    
    # 4. Return only the required columns
    return df.select(
        col("order_id"), 
        formatted_address.alias("contact_address")
    )

# ==========================================
# 2. VERBOSE UNIT TESTING
# ==========================================
def test_get_contact_address():
    print("="*60)
    print("🧪 STARTING UNIT TEST: Address Extraction (Test 3)")
    print("="*60)
    
    print("Step 1: Creating simulated data (Mock Data)...")
    print("  Cases to evaluate:")
    print("  - Case A: Complete data (City and Postal Code)")
    print("  - Case B: Missing Postal Code")
    print("  - Case C: Missing City")
    print("  - Case D: Both missing (Complete Null)")
    
    mock_data = [
        ("id-A", '[{"city":"Chicago", "cp":"12345"}]'),
        ("id-B", '[{"city":"Calcutta"}]'),
        ("id-C", '[{"cp":"3934"}]'),
        ("id-D", None)
    ]
    mock_df = spark.createDataFrame(mock_data, ["order_id", "contact_data"])
    
    print("\nStep 2: Processing data with 'get_contact_address'...")
    result_df = get_contact_address(mock_df)
    results = result_df.collect()
    
    print("\nStep 3: Running validations (Asserts)...")
    
    # Validation A: Both fields present
    valA = [row.contact_address for row in results if row.order_id == "id-A"][0]
    assert valA == "Chicago, 12345", f"Error in id-A. Got: {valA}"
    print("  -> ✅ Case A: Correct formatting of both fields ('Chicago, 12345').")
    
    # Validation B: Missing postal code
    valB = [row.contact_address for row in results if row.order_id == "id-B"][0]
    assert valB == "Calcutta, UNK00", f"Error in id-B. Got: {valB}"
    print("  -> ✅ Case B: Correctly injected 'UNK00' when postal code was missing.")
    
    # Validation C: Missing city
    valC = [row.contact_address for row in results if row.order_id == "id-C"][0]
    assert valC == "Unknown, 3934", f"Error in id-C. Got: {valC}"
    print("  -> ✅ Case C: Correctly injected 'Unknown' when city was missing.")
    
    # Validation D: Everything null
    valD = [row.contact_address for row in results if row.order_id == "id-D"][0]
    assert valD == "Unknown, UNK00", f"Error in id-D. Got: {valD}"
    print("  -> ✅ Case D: Handled the complete null returning 'Unknown, UNK00'.")
    
    print("\n🏆 RESULT: Unit Test passed! The cleansing and formatting logic is correct.")
    print("="*60 + "\n")

# Run the test
test_get_contact_address()

# ==========================================
# 3. APPLYING TO REAL DATA (Creating df_2)
# ==========================================
print("🚀 Applying the extraction to the real orders table...\n")

df_orders = spark.table("default.bronze_orders")

# This is the df_2 requested by the exercise (Silver Layer)
df_2 = get_contact_address(df_orders)

# Visualise the result
display(df_2.limit(10))

🧪 STARTING UNIT TEST: Address Extraction (Test 3)
Step 1: Creating simulated data (Mock Data)...
  Cases to evaluate:
  - Case A: Complete data (City and Postal Code)
  - Case B: Missing Postal Code
  - Case C: Missing City
  - Case D: Both missing (Complete Null)

Step 2: Processing data with 'get_contact_address'...

Step 3: Running validations (Asserts)...
  -> ✅ Case A: Correct formatting of both fields ('Chicago, 12345').
  -> ✅ Case B: Correctly injected 'UNK00' when postal code was missing.
  -> ✅ Case C: Correctly injected 'Unknown' when city was missing.
  -> ✅ Case D: Handled the complete null returning 'Unknown, UNK00'.

🏆 RESULT: Unit Test passed! The cleansing and formatting logic is correct.

🚀 Applying the extraction to the real orders table...



order_id,contact_address
f47ac10b-58cc-4372-a567-0e02b2c3d479,"Chicago, 12345"
f47ac10b-58cc-4372-a567-0e02b2c3d480,"Calcutta, UNK00"
f47ac10b-58cc-4372-a567-0e02b2c3d481,"Frankfurt am Oder, 3934"
f47ac10b-58cc-4372-a567-0e02b2c3d482,"Unknown, UNK00"
f47ac10b-58cc-4372-a567-0e02b2c3d483,"Unknown, UNK00"
f47ac10b-58cc-4372-a567-0e02b2c3d484,"New York, 1203"
f47ac10b-58cc-4372-a567-0e02b2c3d485,"Unknown, UNK00"
f47ac10b-58cc-4372-a567-0e02b2c3d486,"Esplugues de Llobregat, UNK00"
f47ac10b-58cc-4372-a567-0e02b2c3d487,"Unknown, UNK00"
f47ac10b-58cc-4372-a567-0e02b2c3d488,"Chicago, 12345"


# 🥇 3. Gold Layer (Business Aggregations)
**Objective:** Consume the cleaned and enriched data from the Silver layer to generate summary tables, business metrics, and KPIs. These tables are designed for direct consumption by business teams, BI tools (Power BI, Tableau), or Machine Learning models.

In this layer, we address specific business analytical requirements:
* **Test 1:** Distribution of crate types per company.
* **Test 4:** Financial calculation of commissions for the sales team.
* **Test 5:** Consolidation of the customer catalog and their assigned sales representatives.

## 3.1 Test 1: Crate Type Distribution per Company (Relocated)

> **Architectural Assumption:** This section fulfills the requirement for **Test 1**. Although it appears first in the challenge, it is implemented here in the **Gold Layer** to keep the Medallion Architecture consistent: business aggregations are computed on the cleansed Silver data rather than on the raw Bronze tables.

The goal of this analysis is to determine the operational footprint of each client by calculating the volume and variety of equipment used. Specifically, we are:
* **Aggregating** total orders by grouping unique company names with their respective crate categories (e.g., Plastic, Wood, Metal).
* **Quantifying** the distribution to identify which equipment types are most prevalent for each business partner.
* **Refining** the output to provide a clean, sorted dataset ready for the final Executive Dashboard (Test 6).
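The aggregation is a count per `(company_name, crate_type)` pair followed by a sort. This plain-Python sketch reuses the mock rows from the unit test; the helper `crate_distribution` is hypothetical and only mirrors the `groupBy().count().orderBy()` chain.

```python
from collections import Counter

# Mock rows reused from the unit test: (company_name, crate_type), one row per order
orders = [
    ("Acme Corp", "Wood"),
    ("Acme Corp", "Wood"),
    ("Acme Corp", "Plastic"),
    ("Globex", "Metal"),
]

def crate_distribution(rows):
    """Count orders per (company, crate type); sort by company, then count descending."""
    counts = Counter(rows)
    return sorted(
        ((company, crate, n) for (company, crate), n in counts.items()),
        key=lambda t: (t[0], -t[2]),
    )

for company, crate, n in crate_distribution(orders):
    print(company, crate, n)
# Acme Corp Wood 2
# Acme Corp Plastic 1
# Globex Metal 1
```

Since every input row is one order, each count is a number of orders per crate type rather than a number of physical crates.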

In [0]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, desc

# ==========================================
# 1. FUNCTION DEFINITION (Gold Logic)
# ==========================================
def calculate_crate_distribution(df_orders_silver: DataFrame) -> DataFrame:
    """
    Groups orders by company and crate type and counts them.
    Each input row is one order, so 'total_crates' holds the number of orders
    per (company_name, crate_type), sorted by company and descending count.
    """
    return (df_orders_silver
        .groupBy("company_name", "crate_type")
        .count()
        .withColumnRenamed("count", "total_crates")
        .orderBy("company_name", desc("total_crates"))
    )

# ==========================================
# 2. VERBOSE UNIT TESTING
# ==========================================
def test_calculate_crate_distribution():
    print("="*60)
    print("🧪 STARTING UNIT TEST: Crate Distribution (Test 1)")
    print("="*60)
    
    print("Step 1: Creating mock data...")
    print("  Scenario details:")
    print("  - Acme Corp: 2 Wood crates, 1 Plastic crate.")
    print("  - Globex: 1 Metal crate.")
    
    mock_data = [
        ("Acme Corp", "Wood"),
        ("Acme Corp", "Wood"),
        ("Acme Corp", "Plastic"),
        ("Globex", "Metal")
    ]
    mock_schema = ["company_name", "crate_type"]
    mock_df = spark.createDataFrame(mock_data, mock_schema)
    
    print("\nStep 2: Processing data through 'calculate_crate_distribution'...")
    result_df = calculate_crate_distribution(mock_df)
    
    # Convert result to a dictionary for easier assertion: {(company, type): count}
    results = { (row.company_name, row.crate_type): row.total_crates for row in result_df.collect() }
    
    print("\nStep 3: Running assertions...")
    
    # Check Acme Corp Wood
    acme_wood = results.get(("Acme Corp", "Wood"))
    assert acme_wood == 2, f"Error for Acme Wood. Expected 2, got: {acme_wood}"
    print(f"  -> ✅ Acme Corp - Wood: Count validated ({acme_wood}).")
    
    # Check Acme Corp Plastic
    acme_plastic = results.get(("Acme Corp", "Plastic"))
    assert acme_plastic == 1, f"Error for Acme Plastic. Expected 1, got: {acme_plastic}"
    print(f"  -> ✅ Acme Corp - Plastic: Count validated ({acme_plastic}).")
    
    # Check Globex Metal
    globex_metal = results.get(("Globex", "Metal"))
    assert globex_metal == 1, f"Error for Globex Metal. Expected 1, got: {globex_metal}"
    print(f"  -> ✅ Globex - Metal: Count validated ({globex_metal}).")
    
    print("\n🏆 RESULT: Unit Test passed! Distribution logic is correct.")
    print("="*60 + "\n")

# Execute the test
test_calculate_crate_distribution()

# ==========================================
# 3. PRODUCTION EXECUTION
# ==========================================
print("🚀 Running the crate distribution pipeline on Silver data...\n")

# Load the cleansed data from the Silver layer
df_orders_silver = spark.table("default.silver_orders")

# Apply the logic
df_crate_distribution = calculate_crate_distribution(df_orders_silver)

print("Test 1: Crate distribution calculated successfully from Silver data.")

# Create the Gold table for Test 1 (reusing the DataFrame computed above instead of recomputing it)
df_gold_distribution = df_crate_distribution

df_gold_distribution.write.mode("overwrite").format("delta") \
    .saveAsTable("default.gold_crate_distribution")

🧪 STARTING UNIT TEST: Crate Distribution (Test 1)
Step 1: Creating mock data...
  Scenario details:
  - Acme Corp: 2 Wood crates, 1 Plastic crate.
  - Globex: 1 Metal crate.

Step 2: Processing data through 'calculate_crate_distribution'...

Step 3: Running assertions...
  -> ✅ Acme Corp - Wood: Count validated (2).
  -> ✅ Acme Corp - Plastic: Count validated (1).
  -> ✅ Globex - Metal: Count validated (1).

🏆 RESULT: Unit Test passed! Distribution logic is correct.

🚀 Running the crate distribution pipeline on Silver data...

Test 1: Crate distribution calculated successfully from Silver data.



## 3.2 Test 4: Sales Team Commission Calculation
**Objective:** Calculate the commissions in euros for each sales representative based on their level of participation in the sale.
* **Cross-Layer JOIN:** The `orders` table is joined with `invoicing_data` on `order_id` = `orderId`.
* **Financial Calculations:** The net value is obtained by removing VAT from the gross value, `net = grossValue / (1 + vat / 100)`, and the commissions are converted from cents to euros.
* **Hierarchy Handling:** The `posexplode` function is used to unpack the list of sales representatives while preserving their index (position) to apply the correct percentages (6%, 2.5% and 0.95%).
* **Validation:** The *Unit Test* joins simulated data and verifies that the arithmetic and rounding are correct (down to the cent).
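The arithmetic can be checked by hand with a plain-Python sketch. The helper `commissions_euros` is hypothetical; it takes `(grossValue in cents, vat in %, salesowners)` tuples for already-joined orders and, like the Spark code, sums each owner's commission in cents across orders before converting to euros and rounding.

```python
from collections import defaultdict

# Commission rate by 0-based position in the salesowners list (as in the Spark code)
RATES = {0: 0.06, 1: 0.025, 2: 0.0095}

def commissions_euros(orders: list[tuple[float, float, str]]) -> dict[str, float]:
    """orders: (grossValue in cents, vat in %, 'Owner A, Owner B, ...') per invoiced order."""
    totals_cents = defaultdict(float)
    for gross_cents, vat_pct, salesowners in orders:
        net_cents = gross_cents / (1 + vat_pct / 100)          # remove VAT from gross
        for position, raw_name in enumerate(salesowners.split(",")):
            totals_cents[raw_name.strip()] += net_cents * RATES.get(position, 0.0)
    # Sum per owner first, then convert cents -> euros and round to the cent
    return {name: round(cents / 100, 2) for name, cents in totals_cents.items()}

print(commissions_euros([(11900.0, 19.0, "Luke, Leia, Han, Chewbacca")]))
# {'Luke': 6.0, 'Leia': 2.5, 'Han': 0.95, 'Chewbacca': 0.0}
```

Rounding only after summing avoids accumulating per-order rounding errors for owners who appear on many orders.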


In [0]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, split, posexplode, when, trim
# Alias the aggregates so they don't shadow Python's built-in sum() and round()
# for every later cell in the notebook
from pyspark.sql.functions import sum as spark_sum, round as spark_round

# ==========================================
# 1. FUNCTION DEFINITION (Gold Logic)
# ==========================================
def calculate_sales_commissions(df_orders: DataFrame, df_invoicing: DataFrame) -> DataFrame:
    """
    Joins orders and invoicing data, calculates net value, and distributes commissions 
    based on the salesowner's position in the list.
    """
    # 1. Join between orders and invoicing_data using order_id
    df_joined = df_orders.join(
        df_invoicing, 
        df_orders.order_id == df_invoicing.orderId, 
        "inner"
    )
    
    # 2. Calculate net value in cents: Net = Gross / (1 + VAT/100)
    df_net = df_joined.withColumn(
        "net_value_cents", 
        col("grossValue") / (1 + (col("vat") / 100))
    )
    
    # 3. Explode salesowners while preserving their position (0-based index)
    df_exploded = df_net.select(
        "order_id",
        "net_value_cents",
        posexplode(split(col("salesowners"), ",")).alias("position", "salesowner_name")
    )
    
    # Strip the whitespace left after splitting on ',' so names group correctly
    df_exploded = df_exploded.withColumn("salesowner_name", trim(col("salesowner_name")))
    
    # 4. Assign commission percentages according to rank (position)
    df_commissions = df_exploded.withColumn("commission_cents",
        when(col("position") == 0, col("net_value_cents") * 0.06)      # Main Owner: 6%
        .when(col("position") == 1, col("net_value_cents") * 0.025)    # Co-owner 1: 2.5%
        .when(col("position") == 2, col("net_value_cents") * 0.0095)   # Co-owner 2: 0.95%
        .otherwise(0.0)                                                # Others: 0%
    )
    
    # 5. Group by salesowner, sum, convert cents to euros (/100), and round
    df_final = (df_commissions
        .groupBy("salesowner_name")
        .agg(spark_sum("commission_cents").alias("total_commission_cents"))
        .withColumn("commission_euros", spark_round(col("total_commission_cents") / 100, 2))
        .select("salesowner_name", "commission_euros")
        .orderBy(col("commission_euros").desc())  # Sort by highest earners first
    )
    
    return df_final

# ==========================================
# 2. VERBOSE UNIT TESTING
# ==========================================
def test_calculate_sales_commissions():
    print("="*60)
    print("🧪 STARTING UNIT TEST: Sales Commissions (Test 4)")
    print("="*60)
    
    print("Step 1: Creating mock data...")
    print("  Scenario details:")
    print("  - Order 'ORD-1' with Gross: 11,900 cents and 19% VAT.")
    print("    -> Calculated Net: 10,000 cents (100.00 Euros).")
    print("  - Salesowners: 'Luke, Leia, Han, Chewbacca'.")
    print("    -> Luke (Rank 0) should earn 6.00 € (6%)")
    print("    -> Leia (Rank 1) should earn 2.50 € (2.5%)")
    print("    -> Han (Rank 2) should earn 0.95 € (0.95%)")
    print("    -> Chewbacca (Rank 3) should earn 0.00 € (0%)")
    
    mock_orders = spark.createDataFrame(
        [("ORD-1", "Luke, Leia, Han, Chewbacca")], 
        ["order_id", "salesowners"]
    )
    mock_invoicing = spark.createDataFrame(
        [("ORD-1", 11900.0, 19.0)], 
        ["orderId", "grossValue", "vat"]
    )
    
    print("\nStep 2: Processing data through 'calculate_sales_commissions'...")
    result_df = calculate_sales_commissions(mock_orders, mock_invoicing)
    results = {row.salesowner_name: row.commission_euros for row in result_df.collect()}
    
    print("\nStep 3: Running mathematical assertions...")
    
    assert results.get("Luke") == 6.00, f"Error for Luke. Expected 6.00, got: {results.get('Luke')}"
    print("  -> ✅ Main Owner (Luke): Exact commission validated (6.00 €).")
    
    assert results.get("Leia") == 2.50, f"Error for Leia. Expected 2.50, got: {results.get('Leia')}"
    print("  -> ✅ Co-owner 1 (Leia): Exact commission validated (2.50 €).")
    
    assert results.get("Han") == 0.95, f"Error for Han. Expected 0.95, got: {results.get('Han')}"
    print("  -> ✅ Co-owner 2 (Han): Exact commission validated (0.95 €).")
    
    assert results.get("Chewbacca") == 0.00, f"Error for Chewbacca. Expected 0.00, got: {results.get('Chewbacca')}"
    print("  -> ✅ Others (Chewbacca): Correctly excluded from commission (0.00 €).")
    
    print("\n🏆 RESULT: Unit Test passed! Financial logic is 100% correct.")
    print("="*60 + "\n")

# Execute the test
test_calculate_sales_commissions()

# ==========================================
# 3. PRODUCTION EXECUTION
# ==========================================
print("🚀 Running the commission pipeline on Silver tables...\n")

df_orders = spark.table("default.silver_orders")
df_invoicing = spark.table("default.silver_invoicing")

# Reuse the same function validated by the unit test above
df_commission_report = calculate_sales_commissions(df_orders, df_invoicing)

# Persist the result as a Gold Delta table (overwritten on every run)
df_commission_report.write.mode("overwrite").format("delta").saveAsTable("default.gold_sales_commissions")

print("🏆 Sales commission report generated successfully.")
# Re-apply the sort when reading back: a table does not guarantee row order
display(spark.table("default.gold_sales_commissions").orderBy("commission_euros", ascending=False).limit(10))

🧪 STARTING UNIT TEST: Sales Commissions (Test 4)
Step 1: Creating mock data...
  Scenario details:
  - Order 'ORD-1' with Gross: 11,900 cents and 19% VAT.
    -> Calculated Net: 10,000 cents (100.00 Euros).
  - Salesowners: 'Luke, Leia, Han, Chewbacca'.
    -> Luke (Rank 0) should earn 6.00 € (6%)
    -> Leia (Rank 1) should earn 2.50 € (2.5%)
    -> Han (Rank 2) should earn 0.95 € (0.95%)
    -> Chewbacca (Rank 3) should earn 0.00 € (0%)

Step 2: Processing data through 'calculate_sales_commissions'...

Step 3: Running mathematical assertions...
  -> ✅ Main Owner (Luke): Exact commission validated (6.00 €).
  -> ✅ Co-owner 1 (Leia): Exact commission validated (2.50 €).
  -> ✅ Co-owner 2 (Han): Exact commission validated (0.95 €).
  -> ✅ Others (Chewbacca): Correctly excluded from commission (0.00 €).

🏆 RESULT: Unit Test passed! Financial logic is 100% correct.

🚀 Running the commission pipeline on Silver tables...

🏆 Sales commission report generated successfully.


salesowner_name,commission_euros
Leonard Cohen,650.31
David Henderson,487.77
Luke Skywalker,377.61
Yuri Gagarin,309.23
David Goliat,279.38
Ammy Winehouse,209.52
Marianov Merschik,188.61
Chris Pratt,114.08
Vladimir Chukov,72.83
Marie Curie,70.52
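To cross-check the Spark results without a cluster, the same commission rule can be restated in plain Python as a reference oracle. For the mock order, Net = 11,900 / (1 + 19/100) = 10,000 cents, so the main owner earns 10,000 × 0.06 = 600 cents = 6.00 €. This is a minimal sketch, not part of the pipeline: `reference_commissions` and its tuple input format are hypothetical, and `Decimal` with `ROUND_HALF_UP` is used on the assumption that it mirrors the HALF_UP rounding of Spark's `round()`.

```python
from collections import defaultdict
from decimal import Decimal, ROUND_HALF_UP

# Commission rate by position in the salesowners list (rank 0, 1, 2); everyone else earns 0%
COMMISSION_RATES = (0.06, 0.025, 0.0095)

def reference_commissions(orders):
    """Plain-Python oracle for calculate_sales_commissions (hypothetical helper).

    `orders` is an iterable of (gross_cents, vat_percent, salesowners_csv) tuples.
    Returns {salesowner_name: commission_euros}.
    """
    totals_cents = defaultdict(float)
    for gross_cents, vat_percent, salesowners in orders:
        net_cents = gross_cents / (1 + vat_percent / 100)  # Net = Gross / (1 + VAT/100)
        for position, name in enumerate(salesowners.split(",")):
            rate = COMMISSION_RATES[position] if position < len(COMMISSION_RATES) else 0.0
            totals_cents[name.strip()] += net_cents * rate
    # Convert cents to euros, then round half-up to 2 decimals
    return {
        name: float(Decimal(str(cents / 100)).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP))
        for name, cents in totals_cents.items()
    }

print(reference_commissions([(11900.0, 19.0, "Luke, Leia, Han, Chewbacca")]))
```

Summing per owner before rounding matches the Spark function, which aggregates `commission_cents` and only then divides by 100 and rounds.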


## 3.3 Test 5: Customer and Assigned Sales Representatives Consolidation
**Objective:** Generate a DataFrame (`df_3`) with a unique company catalogue and a sorted list of all sales representatives who have worked with each company.
* **Duplicate Consolidation (Data Quality):** To address the requirement regarding duplicate customers, company names are normalised (lowercased, with spaces and special characters removed) and the result is used as the grouping key. This merges identical companies that appear under multiple IDs.
* **Array Handling:** `explode` separates the sales representatives, `collect_set` gathers the unique representatives per company, `array_sort` guarantees alphabetical order, and `array_join` renders the list as a comma-separated string.
* **Validation:** The *Unit Test* simulates a duplicate company under two different IDs, validates the deduplication of repeated sales representatives and checks the strict alphabetical order of the final list.
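The consolidation rules above can be restated in plain Python as a Spark-free oracle for the unit test. This is a minimal sketch: `normalize_company_name` and `reference_companies` are hypothetical helpers. Unlike the notebook, it keeps the smallest `(company_id, company_name)` pair as the representative so the choice is deterministic; Spark's `first()` is documented as non-deterministic after a shuffle, so an aggregate such as `min` would be one possible Spark-side alternative.

```python
import re
from collections import defaultdict

def normalize_company_name(name):
    # Lowercase, then drop every character that is not a-z or 0-9 (same regex as the Spark code)
    return re.sub(r"[^a-z0-9]", "", name.lower())

def reference_companies(rows):
    """Plain-Python oracle for get_companies_with_salesowners (hypothetical helper).

    `rows` is an iterable of (company_id, company_name, salesowners_csv) tuples.
    Returns a list of (company_id, company_name, list_salesowners) tuples.
    """
    groups = defaultdict(lambda: {"companies": [], "owners": set()})
    for company_id, company_name, salesowners in rows:
        group = groups[normalize_company_name(company_name)]
        group["companies"].append((company_id, company_name))
        # Equivalent of explode + trim + collect_set
        group["owners"].update(owner.strip() for owner in salesowners.split(","))
    result = []
    for group in groups.values():
        # Deterministic representative: the smallest (id, name) pair in the group
        company_id, company_name = min(group["companies"])
        # Equivalent of array_sort + array_join(", ")
        result.append((company_id, company_name, ", ".join(sorted(group["owners"]))))
    return result

print(reference_companies([("ID-1", "Veggie Inc", "Zack, Alice"), ("ID-2", "veggie inc.", "Bob, Alice")]))
```

Keeping ID and name as one pair also avoids pairing an ID from one duplicate with the name spelling of another.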

In [0]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, split, explode, trim, collect_set, array_sort, array_join, first, lower, regexp_replace

# ==========================================
# 1. FUNCTION DEFINITION (Gold Logic)
# ==========================================
def get_companies_with_salesowners(df_orders: DataFrame) -> DataFrame:
    """
    Consolidates companies by resolving duplicates and generates a unique, 
    comma-separated list of their salesowners sorted alphabetically.
    """
    # 1. Sales representative cleaning: Spark does not allow a generator such as
    #    explode() to be nested inside another expression (e.g. trim(explode(...))),
    #    so we explode first and trim in a separate step
    df_exploded = (df_orders
        .withColumn("salesowner_raw", explode(split(col("salesowners"), ",")))
        .withColumn("salesowner", trim(col("salesowner_raw")))
    )
    
    # 2. Company name normalisation (grouping key used to merge duplicate companies):
    #    lowercase, then strip every character that is not a-z or 0-9
    df_norm = df_exploded.withColumn(
        "normalized_name", 
        regexp_replace(lower(col("company_name")), "[^a-z0-9]", "")
    )
    
    # 3. Grouping and Consolidation
    df_grouped = df_norm.groupBy("normalized_name").agg(
        first("company_id").alias("company_id"),      # first() is non-deterministic: any merged ID may be kept
        first("company_name").alias("company_name"),  # likewise, any spelling variant of the name may be kept
        array_sort(collect_set("salesowner")).alias("unique_salesowners") 
    )
    
    # 4. Format the resulting array as a comma-separated String
    df_final = df_grouped.withColumn(
        "list_salesowners", 
        array_join(col("unique_salesowners"), ", ")
    ).select("company_id", "company_name", "list_salesowners")
    
    return df_final

# ==========================================
# 2. VERBOSE UNIT TESTING
# ==========================================
def test_get_companies_with_salesowners():
    print("="*60)
    print("🧪 STARTING UNIT TEST: Company Catalogue (Test 5)")
    print("="*60)
    
    print("Step 1: Creating simulated data (Mock Data)...")
    print("  Simulated scenario:")
    print("  - Record 1: 'Veggie Inc' (ID-1) served by 'Zack, Alice'")
    print("  - Record 2: 'veggie inc.' (ID-2) served by 'Bob, Alice' -> IT IS A DUPLICATE!")
    print("    -> Expected Result: A single consolidated row.")
    print("    -> Expected Sales Reps: 'Alice, Bob, Zack' (Unique and in alphabetical order).")
    
    mock_data = [
        ("ID-1", "Veggie Inc", "Zack, Alice"),
        ("ID-2", "veggie inc.", "Bob, Alice")
    ]
    mock_df = spark.createDataFrame(mock_data, ["company_id", "company_name", "salesowners"])
    
    print("\nStep 2: Processing data with 'get_companies_with_salesowners'...")
    result_df = get_companies_with_salesowners(mock_df)
    results = result_df.collect()
    
    print("\nStep 3: Running validations (Asserts)...")
    
    # Validation 1: Duplicate consolidation
    num_companies = len(results)
    assert num_companies == 1, f"Error: Expected 1 consolidated company, got {num_companies}"
    print("  -> ✅ Duplicate Consolidation: Name variations and IDs were merged into 1 row.")
    
    # Validation 2: Deduplication and Alphabetical Order
    final_list = results[0].list_salesowners
    assert final_list == "Alice, Bob, Zack", f"Error in the list. Got: {final_list}"
    print("  -> ✅ Sales Rep Handling: The list removed duplicates and sorted perfectly (Alice, Bob, Zack).")
    
    print("\n🏆 RESULT: Unit Test passed! The cleansing and consolidation logic is top-notch.")
    print("="*60 + "\n")

# Run the test
test_get_companies_with_salesowners()

# ==========================================
# 3. APPLICATION TO REAL DATA
# ==========================================
print("🚀 Running the consolidation on the real database...\n")

df_orders = spark.table("default.silver_orders")

# This is the df_3 requested by the exercise
df_3 = get_companies_with_salesowners(df_orders)

# Visualise the result
display(df_3.limit(10))

# ==========================================
# 4. PERSISTING THE GOLD TABLE
# ==========================================
print("🚀 Running the consolidation and saving to Gold Layer...\n")

# Reuse df_3 (built from Silver above) instead of repeating the read and transformation code
df_gold_companies_owners = df_3

df_gold_companies_owners.write.format("delta").mode("overwrite") \
    .saveAsTable("default.gold_companies_salesowners")

print("🏆 Table 'default.gold_companies_salesowners' created successfully.")


🧪 STARTING UNIT TEST: Company Catalogue (Test 5)
Step 1: Creating simulated data (Mock Data)...
  Simulated scenario:
  - Record 1: 'Veggie Inc' (ID-1) served by 'Zack, Alice'
  - Record 2: 'veggie inc.' (ID-2) served by 'Bob, Alice' -> IT IS A DUPLICATE!
    -> Expected Result: A single consolidated row.
    -> Expected Sales Reps: 'Alice, Bob, Zack' (Unique and in alphabetical order).

Step 2: Processing data with 'get_companies_with_salesowners'...

Step 3: Running validations (Asserts)...
  -> ✅ Duplicate Consolidation: Name variations and IDs were merged into 1 row.
  -> ✅ Sales Rep Handling: The list removed duplicates and sorted perfectly (Alice, Bob, Zack).

🏆 RESULT: Unit Test passed! The cleansing and consolidation logic is top-notch.

🚀 Running the consolidation on the real database...



company_id,company_name,list_salesowners
5c17d142-4b21-4293-8a34-8dcd2bc24f82,Fresh Farms Ltd,"Chris Pratt, David Goliat, Marie Curie"
20dfef10-8f4e-45a1-82fc-123f4ab2a4a5,healthy snacks c.o.,"Luke Skywalker, Marianov Merschik, Vladimir Chukov"
7d4b212e-29e5-4f2a-9b28-745a3c7f0b60,Organic Veggies Ltd,"Ammy Winehouse, David Henderson, Leonard Cohen"
8f1c5d4a-9045-4be5-bb38-7f587f478a92,Farm Fresh Co,"Ammy Winehouse, Chris Pratt, Leonard Cohen"
f712dc3d-4681-4ec6-9b76-bf47b4ccf5b2,Green Organic Co,"Chris Pratt, Leon Leonov"
4a7561b1-1de1-420a-93ed-2c12a5bbd1ab,Farms Global Co,"David Goliat, Leonard Cohen"
9b31b19f-69a2-4aeb-8f6e-f4b8d2f9c12a,Veggies Unlimited,"Ammy Winehouse, Leon Leonov"
34538e39-cd2e-4641-8d24-3c94146e6f16,Meat Packers Ltd,"Chris Pratt, David Henderson, Leon Leonov, Marianov Merschik"
83df789a-b30c-4a1b-8e67-1f512bfa20c7,Tropical Fresh Co,"Ammy Winehouse, David Henderson, Leonard Cohen"
0d09ae2b-d9a5-4d67-bb97-963be9379b4e,Healthy Eats Ltd,"Ammy Winehouse, Yuri Gagarin"


🚀 Running the consolidation and saving to Gold Layer...

🏆 Table 'default.gold_companies_salesowners' created successfully.


# 📊 4. Consumption and Visualisation (Test 6)
**Objective:** Develop an interactive dashboard for sales-team *stakeholders* to analyse sales representative performance and the distribution of crate types.

To fulfil the challenge requirement of providing a **reproducible execution environment**, the visualisation solution is decoupled from this processing notebook using the following architecture:

1. **Data Extraction:** We will export a *snapshot* of the clean data from our Databricks database to a portable format (CSV/Parquet).
2. **Frontend Development:** We will build an interactive analytical web application using **Streamlit** and **Plotly** (outside of Databricks).
3. **Containerisation (Docker):** We will package the web application and its dependencies in a **Docker** container. This ensures that any reviewer can launch the dashboard on their local machine with a single command, without worrying about library versions or environment configurations.
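Step 1 can be sketched with the standard library alone. This is a minimal sketch under stated assumptions: `export_snapshot` is a hypothetical helper, and it assumes the Gold tables are small enough to collect to the driver. Spark `Row` objects are tuples, so the output of `df.collect()` can be passed straight to `csv.writer`.

```python
import csv
from pathlib import Path

def export_snapshot(rows, columns, path):
    """Write collected rows (e.g. from df.collect()) to a CSV snapshot for the dashboard.

    Hypothetical helper: assumes the data fits comfortably in driver memory.
    """
    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)  # create the export folder if needed
    with path.open("w", newline="", encoding="utf-8") as handle:
        writer = csv.writer(handle)
        writer.writerow(columns)  # header row
        writer.writerows(rows)    # one line per record
    return path
```

In the notebook it could be called as `export_snapshot(df.collect(), df.columns, target_path)` with `df = spark.table("default.gold_sales_commissions")` and a target path of your choice. For larger tables, Spark's own `df.write.parquet(...)` avoids collecting everything to the driver.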