# Advanced Data Engineering Tutorial Series

## Introduction

Welcome to Module 2! In this notebook, you'll tackle business questions for a hypothetical retail store using synthetic datasets. Through hands-on tasks, you'll clean and enrich data to uncover insights about customer behavior, product sales, and store performance. Each exercise is designed to simulate common challenges faced by data engineers, preparing you to solve practical problems in analytics and reporting.

You'll also work through the medallion architecture, a layered approach often used in data engineering to organize data pipelines into bronze (raw), silver (cleaned/enriched), and gold (analytics-ready) tables. This layering helps ensure data quality, scalability, and efficient analytics.

## What You'll Do in This Notebook

- Load and inspect raw synthetic datasets with intentional data issues
- Identify and resolve common data quality problems: nulls, duplicates, and inconsistencies
- Add computed columns for deeper analysis (e.g., `total_price`)
- Summarize and prepare cleaned data for downstream analytics
- Work through the medallion architecture, transforming data from bronze (raw) to gold (dashboard-ready)

## Learning Objectives

- Clean and validate complex datasets
- Perform multi-step transformations
- Apply joins, window functions, and aggregations



## 1. Synthetic Datasets Generated with Faker

The following synthetic datasets are created using the `Faker` Python package to simulate business scenarios for data cleaning and validation. Each dataset contains intentional data issues (nulls, duplicates, inconsistencies) for hands-on practice.

---

### **Dataset 1: Transactions**

Simulates customer purchase transactions, including details about the transaction, customer, and purchase location. Contains intentional nulls and duplicate records.

**Columns:**
- `transaction_id`
- `customer_id`
- `transaction_date`
- `location`

---

### **Dataset 2: Transaction Products**

Represents the products associated with each transaction, allowing for multi-product purchases per transaction. Includes nulls and duplicates for cleaning exercises.

**Columns:**
- `transaction_id`
- `product_id`
- `quantity`

---

### **Dataset 3: Product Catalog Reference**

A clean product catalog used to enrich transactions with product category and price information.

**Columns:**
- `product_id`
- `product_name`
- `category`
- `standard_price`

---

### **Dataset 4: Customer Demographics**

Contains customer demographic information for advanced enrichment. Includes intentional null values, invalid email addresses, and duplicate entries.

**Columns:**
- `customer_id`
- `customer_name`
- `email`
- `date_of_birth`
- `gender`
- `signup_date`
- `country`

---

### **Summary**

- All datasets are generated with Faker and pandas.
- Data issues are intentionally introduced for cleaning and validation practice.
- The Transactions and Transaction Products datasets together model multi-product purchases and customer activity.
- Use these datasets for exercises in null handling, deduplication, and enrichment.

In [0]:
%pip install faker pandas

from faker import Faker
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import random

fake = Faker()
Faker.seed(42)
np.random.seed(42)
random.seed(42)

# --- Supermarket Product Names and Categories ---
supermarket_products = {
    "Produce": [
        "Bananas", "Apples", "Carrots", "Lettuce"
    ],
    "Dairy": [
        "Milk", "Cheddar Cheese", "Yogurt", "Butter"
    ],
    "Bakery": [
        "Whole Wheat Bread", "Bagels", "Croissants", "Muffins"
    ],
    "Meat & Seafood": [
        "Chicken Breast", "Salmon Fillet", "Ground Beef", "Pork Chops"
    ],
    "Pantry": [
        "Pasta", "Rice", "Canned Beans", "Olive Oil"
    ]
}

product_ids = [f"P{str(i).zfill(4)}" for i in range(1, 21)]
product_names = []
categories = []
for cat, names in supermarket_products.items():
    product_names.extend(names)
    categories.extend([cat] * len(names))

locations = [fake.city() for _ in range(5)]
# Skew location probabilities (e.g., first location is much more common)
location_probs = [0.4, 0.2, 0.15, 0.15, 0.1]

# --- Datasets 1 & 2: Transactions and Transaction Products (normalized) ---
n_transactions = 10000

# Track locations per customer
customer_locations = {}

transactions = []
transaction_products = []
for i in range(n_transactions):
    # Introduce missing customer_id for some transactions
    customer_id = f"C{random.randint(1, 2000):04d}" if random.random() > 0.01 else None
    transaction_id = f"T{10000 + i}"
    transaction_date = fake.date_between(start_date='-900d', end_date='today')
    # Assign location, allowing at most 4 distinct locations per customer
    if customer_id is not None:
        visited = customer_locations.setdefault(customer_id, set())
        # sorted() gives a stable order, so the seeded draws are reproducible across runs
        available_locations = sorted(set(locations) - visited)
        if len(visited) >= 4 or not available_locations:
            # Cap reached: reuse one of the customer's existing locations
            location = random.choice(sorted(visited))
        else:
            # Skewed location selection for new locations
            probs = [location_probs[locations.index(loc)] for loc in available_locations]
            probs = np.array(probs) / np.sum(probs)
            location = np.random.choice(available_locations, p=probs)
            visited.add(location)
    else:
        # Skewed location selection for missing customer_id
        location = np.random.choice(locations, p=location_probs)
    n_products = np.random.choice([1, 2, 3, 4], p=[0.5, 0.3, 0.15, 0.05])
    product_indices = random.sample(range(len(product_ids)), n_products)
    transactions.append({
        "transaction_id": transaction_id,
        "customer_id": customer_id,
        "transaction_date": transaction_date,
        "location": location
    })
    for product_idx in product_indices:
        # Introduce missing product_id and wrong product_id
        if random.random() < 0.01:
            pid = None
        elif random.random() < 0.01:
            pid = "P9999"  # Non-existent product_id
        else:
            pid = product_ids[product_idx]
        # Introduce missing quantity and negative quantity
        if random.random() < 0.01:
            qty = None
        elif random.random() < 0.01:
            qty = -1 * np.random.choice([1, 2, 3, 4, 5])
        else:
            qty = np.random.choice([1, 2, 3, 4, 5, None], p=[0.3, 0.25, 0.2, 0.15, 0.08, 0.02])
        transaction_products.append({
            "transaction_id": transaction_id,
            "product_id": pid,
            "quantity": qty
        })

df_transactions = pd.DataFrame(transactions)
df_transaction_products = pd.DataFrame(transaction_products)

# --- Dataset 3: Product Catalog Reference (normalized) ---
catalog = []
for pid, pname, cat in zip(product_ids, product_names, categories):
    entry = {
        "product_id": pid,
        "product_name": pname,
        "category": cat,
        "standard_price": round(np.random.uniform(5, 15), 2)
    }
    catalog.append(entry)
df_catalog = pd.DataFrame(catalog)

# --- Dataset 4: Customer Demographics (normalized) ---
n_customers = 2000
customer_ids = [f"C{str(i).zfill(4)}" for i in range(1, n_customers + 1)]
demographics = []
for cid in customer_ids:
    dob = fake.date_of_birth(minimum_age=18, maximum_age=80)
    # Introduce missing name and wrong gender
    name = fake.name() if random.random() > 0.01 else None
    gender = np.random.choice(["Male", "Female", "Other", None, "Unknown"], p=[0.47, 0.47, 0.03, 0.01, 0.02])
    # Construct email as "lastname.firstname@example.com" if name is present
    if name is not None:
        parts = name.split()
        if len(parts) >= 2:
            firstname = parts[0].replace(".", "").lower()
            lastname = parts[-1].replace(".", "").lower()
            email = f"{lastname}.{firstname}@example.com"
        else:
            email = "invalid_email"
    else:
        # 50% chance of valid email using a random name
        if random.random() < 0.5:
            rand_name = fake.name()
            parts = rand_name.split()
            if len(parts) >= 2:
                firstname = parts[0].replace(".", "").lower()
                lastname = parts[-1].replace(".", "").lower()
                email = f"{lastname}.{firstname}@example.com"
            else:
                email = "invalid_email"
        else:
            email = "invalid_email"
    demographics.append({
        "customer_id": cid,
        "customer_name": name,
        "email": email,
        "date_of_birth": dob,
        "gender": gender,
        "signup_date": fake.date_between(start_date=datetime.today() - timedelta(days=5*365), end_date='today'),
        "country": fake.country()
    })
df_demographics = pd.DataFrame(demographics)

# --- Introduce duplicate and invalid records ---
n_demo_dupes = random.randint(10, 30)
df_demographics = pd.concat([df_demographics, df_demographics.sample(n_demo_dupes, random_state=3)], ignore_index=True)
df_demographics.loc[df_demographics.sample(frac=0.01, random_state=4).index, "email"] = "invalid_email"

n_trans_dupes = random.randint(10, 30)
df_transactions = pd.concat([df_transactions, df_transactions.sample(n_trans_dupes, random_state=5)], ignore_index=True)

n_prod_dupes = random.randint(10, 30)
df_transaction_products = pd.concat([df_transaction_products, df_transaction_products.sample(n_prod_dupes, random_state=6)], ignore_index=True)

# --- Convert to Spark DataFrames ---
df_transactions_spark = spark.createDataFrame(df_transactions)
df_transaction_products_spark = spark.createDataFrame(df_transaction_products)
df_catalog_spark = spark.createDataFrame(df_catalog)
df_demographics_spark = spark.createDataFrame(df_demographics)

display(df_transactions_spark)
display(df_transaction_products_spark)
display(df_catalog_spark)
display(df_demographics_spark)

In [0]:
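# Make sure the landing schema exists before writing (assumes the dbx_course_catalog catalog exists)
spark.sql("CREATE SCHEMA IF NOT EXISTS dbx_course_catalog.landing")
df_transactions_spark.write.mode("overwrite").saveAsTable("dbx_course_catalog.landing.transactions")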
df_transaction_products_spark.write.mode("overwrite").saveAsTable("dbx_course_catalog.landing.transaction_products")
df_catalog_spark.write.mode("overwrite").saveAsTable("dbx_course_catalog.landing.catalog")
df_demographics_spark.write.mode("overwrite").saveAsTable("dbx_course_catalog.landing.demographics")

## 0: Medallion Architecture: Theory and Application in This Notebook

The **medallion architecture** is a layered approach to organizing data pipelines. It divides data into three main layers:

- **Bronze (Raw):** The initial landing zone for ingested data. This layer contains raw, unvalidated records directly from source systems, often with schema inconsistencies, duplicates, and missing values.
- **Silver (Validated):** The cleaned and enriched layer. Here, data is deduplicated, validated, and normalized. Business rules are applied to correct errors and inconsistencies, making the data reliable for downstream analytics.
- **Gold (Enriched/Presentation):** The analytics-ready layer. Data is aggregated, modeled, and transformed to support reporting, dashboards, and advanced analytics.

> The medallion architecture is a flexible set of guidelines, not a strict set of rules. You can adapt the boundaries and requirements of each layer to fit your business needs, data sources, and analytics goals. The key is to keep a clear separation between raw, cleaned, and analytics-ready data; the specific transformations and validations can vary with your use case.

**How we move from raw to silver to gold in this notebook:**

1. **Bronze Layer:**  
   - We start by generating synthetic datasets with intentional data issues (nulls, duplicates, inconsistencies) and save them to the `dbx_course_catalog.landing` schema.  
   - These raw tables represent the bronze layer.

2. **Silver Layer:**  
   - We load the raw tables and perform data cleaning: removing duplicates, handling nulls, correcting invalid values, and enriching records.  
   - Cleaned tables are saved to the `dbx_course_catalog.silver` schema, representing the silver layer.

3. **Gold Layer:**  
   - We further transform the silver tables by joining, aggregating, and deriving new features (e.g., total sales, customer segments, product rankings).  
   - These analytics-ready tables are saved to the `dbx_course_catalog.gold` schema, forming the gold layer for reporting and advanced analysis.

This stepwise approach ensures that only high-quality, business-ready data is used for insights and decision-making.

## 1: Data Cleaning & Validation

Before we dive into answering business questions for our hypothetical store, our first mission is to clean the data. Unfortunately, data is rarely perfect: every dataset arrives with its own quirks, such as missing values, duplicates, and odd inconsistencies. Whatever analysis lies ahead, we must first make sure our data is trustworthy and ready for use.

Cleaning data isn't just a technical task; it often involves business decisions, such as whether to fix or remove incomplete records. In this notebook you won't need to make those choices yourself: the exercises guide you step by step and explain what to do in each scenario.

**Datasets:**
1. Transactions
2. Transaction Products
3. Product Catalog
4. Customer Demographics

**Key Activities (for each dataset):**
- Load and inspect the dataset
- Detect and handle null values in critical columns
- Remove duplicate records
- Identify and correct inconsistent values (e.g., prices, emails, dates)
- Summarize cleaned data

**Exercises (apply to each dataset):**
1. For each dataset, answer the following:
   - Which rows in the Transactions dataset have missing `customer_id`?  
     *Delete these records: use the `filter` function to keep only rows where `customer_id` is not null.*
   - Which rows in the Transaction Products dataset have missing `quantity` or `product_id`?  
     *Delete records with missing `quantity` or `product_id`. Also, to correct negative `quantity` values, use the `when`/`otherwise` function: set `quantity = when(quantity < 0, -quantity).otherwise(quantity)`.*
   - Which rows in the Demographics dataset have missing `customer_name`, missing `gender`, or an invalid email?  
     *If `gender` is missing, assign "Unknown" (tip: use the `lit` function to set a constant value).*
        > **Extra (Advanced) Exercise:**  
        For records with a missing or invalid `email` (the generator writes the placeholder `invalid_email`) but an available `customer_name`, set `email` to `lastname.firstname@example.com`. If both are missing, delete the record. Create a new column (e.g., `is_manually_repaired`) to flag records that were manually repaired.
        
        > If `email` is available but `customer_name` is missing, attempt to extract the name from the email address. Because the emails follow the `lastname.firstname@example.com` pattern, you can split on "@" and then on "." to get the last and first names. This logic can be challenging. If you can't get it to work, delete records where `customer_name` is missing but `email` is present, and continue with the exercises.

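2. Remove duplicate records based on each table's unique key (for example, `transaction_id` in Transactions or `customer_id` in Demographics).  
   *Tip: Use the `dropDuplicates()` function, optionally passing the key columns.*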


In [0]:
df_transactions_spark = spark.table("dbx_course_catalog.landing.transactions")
df_transaction_products_spark = spark.table("dbx_course_catalog.landing.transaction_products")
df_catalog_spark = spark.table("dbx_course_catalog.landing.catalog")
df_demographics_spark = spark.table("dbx_course_catalog.landing.demographics")

In [0]:
from pyspark.sql.functions import col, when, count, lit

# 1. Delete all rows with missing critical IDs

# 2. Correct negative quantity values in df_transaction_products_spark by multiplying with -1

# 3. Clean gender, email and customer_name fields, flag manually repaired records


In [0]:
# 4. Remove duplicates from tables


### Saving and Loading tables
Before saving, create the schema if it does not exist with `spark.sql("CREATE SCHEMA IF NOT EXISTS dbx_course_catalog.silver")`. Then save your cleaned tables to the `dbx_course_catalog.silver` schema with `.write.saveAsTable("dbx_course_catalog.silver.<table_name>")` (add `.mode("overwrite")` if you want to be able to rerun the cell). After saving, load the tables for further analysis with `spark.table("dbx_course_catalog.silver.<table_name>")`.

In [0]:
# Save tables in silver schema

In [0]:
# Load tables from silver schema

## 2. Advanced Data Engineering Exercises

Below are categorized exercises that leverage derived columns and advanced data engineering techniques:

#### 1. **Data Enrichment & Feature Engineering**
- Create `total_price` in the Transactions dataset (`quantity * standard_price`)
> **Tip:** To calculate `total_price`, join the Transaction Products with the Product Catalog on `product_id`. Join the resulting table to Transactions on `transaction_id`. 
- Add `customer_age` and `is_adult` in Customer Demographics
> **Tip:** To calculate `customer_age`, use the `datediff` and `current_date` functions with `date_of_birth`, then convert the difference in days to whole years (for example, divide by 365.25 and round down).
- Flag `is_high_value_transaction` in Transactions

#### 2. **Temporal & Hierarchical Analysis**
- Extract `transaction_week` and `transaction_quarter` from transaction dates
> **Tip:** Need to extract the week or quarter from a date in PySpark? Search online for built-in PySpark functions that make calculating week and quarter straightforward.
- Calculate `days_since_last_purchase` for each customer
> **Tip:** To calculate metrics like `days_since_last_purchase`, use window functions to partition your data by `customer_id` and order by `transaction_date`. The `lag` function lets you access the previous transaction date for each customer, enabling you to compute the difference between current and prior purchases.

> Window functions in Spark allow you to perform calculations across a set of rows that are related to the current row, without collapsing them into a single output row. This is useful for tasks like calculating running totals, ranking, or accessing previous/next values in a partitioned group of data. You define a "window" using `PARTITION BY` (to group rows) and `ORDER BY` (to sort within each group), and then apply functions such as `lag`, `lead`, `row_number`, `rank`, or aggregate functions like `sum` or `avg` over that window. This enables advanced analytics like time-based calculations, cohort analysis, and more, all while keeping the original row structure. 
- Aggregate sales by product and location

At this point, you've been introduced to the key functions you'll need for these exercises. As you work through the next steps, feel free to use these functions or consult the PySpark and Databricks documentation for any additional tools or techniques you may need.
#### 3. **Ranking**
- Rank products by sales volume (`product_popularity`)
- Flag top selling products with `is_top_seller`
- Assign customer segments based on purchase frequency (e.g., platinum, gold, silver, bronze)

#### 4. **Segmentation & Cohort Analysis**
- Categorize customers by `age_group` and `tenure_days`
- Estimate `customer_lifetime_value` (total spend per customer) by joining transactions and demographics


These exercises will help you practice multi-step transformations, complex aggregations, and robust data validation, preparing your datasets for deeper analytics and machine learning.

#### 1. **Data Enrichment & Feature Engineering**

In [0]:
from pyspark.sql.functions import col, when, lit

# 1. Data Enrichment & Feature Engineering

# Join transaction products with catalog and transactions to calculate total price and flag high value transactions


When performing left joins between DataFrames (e.g., joining transaction products with the catalog, transactions, or customer demographics), some rows can end up with a null in a column that comes from the right DataFrame, such as `transaction_id` or `customer_id`. This means the join key from the left DataFrame did not find a matching record in the right DataFrame, which indicates missing or unmatched data. Such nulls can signal data integrity issues, such as orphaned records or incomplete relationships. For initial cleaning, we simply delete records where `transaction_id` or `customer_id` is null, so that only valid, fully matched transactions are retained for analysis.

In [0]:
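# Keep only fully matched rows: drop records with a null transaction_id or customer_id
df_enriched_txn = df_enriched_txn.filter(col("transaction_id").isNotNull() & col("customer_id").isNotNull())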

In [0]:
# Calculate customer age and flag adults

#### 2. **Temporal & Hierarchical Analysis**

In [0]:
from pyspark.sql.functions import weekofyear, quarter, lag as spark_lag, sum as spark_sum
from pyspark.sql.window import Window

# 2. Temporal & Hierarchical Analysis

# Extract transaction week and quarter

# Days since last purchase per customer

# Aggregate sales by product, location


#### 3. **Ranking**

In [0]:
from pyspark.sql.functions import count as spark_count, rank

# 3. Ranking

# Product popularity ranking

# Customer segments by purchase frequency


#### 4. **Segmentation & Cohort Analysis**

In [0]:
# 4. Segmentation

# Age group and tenure

# Estimate customer lifetime value

### Saving gold tables
Save your final tables in a new schema, `dbx_course_catalog.gold`.  
Make sure to create the schema if it doesn't already exist before saving the tables.

In [0]:
# save tables to gold schema