# Lab 2: Building a Data Pipeline

**Introduction to Data Science & Engineering - Day 2**

| Duration | Difficulty | Libraries | Exercises |
|---|---|---|---|
| 120 min | Intermediate | pandas, sqlite3 | 6 |

In this lab, you'll practice:

- Extracting data from multiple sources
- Implementing data quality assertions
- Building ETL transformations
- Modeling data into a star schema
- Loading into SQLite and Parquet
- Running analytical SQL queries

## Setup

In [None]:
import numpy as np
import pandas as pd
import sqlite3
import json
import os
from datetime import datetime, timedelta

pd.set_option('display.max_columns', None)
print("Libraries loaded successfully!")

## Part 1: Extract — Loading Data from Multiple Sources

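In real pipelines, data comes from many sources. We'll simulate four: orders (standing in for a CSV file), customers (a JSON API response), a product catalog (a Python dictionary, as an API client might return), and a small table of store reference data.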
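
As a rough illustration of what extraction from these formats can look like outside the simulation, the sketch below parses a CSV string, a JSON string, and a list of dictionaries into DataFrames. The payloads and variable names (`csv_text`, `json_text`, `records`) are made up for this example and are not part of the lab data.

```python
import io
import json

import pandas as pd

# Hypothetical CSV text standing in for a file on disk
csv_text = "order_id,quantity\n1,2\n2,5\n"
csv_df = pd.read_csv(io.StringIO(csv_text))

# Hypothetical JSON payload standing in for an API response body
json_text = (
    '{"customers": ['
    '{"customer_id": 1, "segment": "Basic"}, '
    '{"customer_id": 2, "segment": "Premium"}]}'
)
json_df = pd.DataFrame(json.loads(json_text)["customers"])

# Hypothetical list of dicts, as an API client library might return
records = [{"product_id": 1, "category": "Books"}]
dict_df = pd.DataFrame(records)

print(csv_df.shape, json_df.shape, dict_df.shape)
```

Whatever the source format, each path ends in a DataFrame, so the later validation and transformation steps do not need to know where the data came from.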

### Exercise 1.1: Create and Extract Source Data

Run the four cells below to generate the source data you will work with throughout this lab.

In [None]:
# Source 1: Orders data (simulating CSV)
np.random.seed(42)
n_orders = 1500

start_date = datetime(2024, 1, 1)
order_dates = [start_date + timedelta(days=np.random.randint(0, 365)) for _ in range(n_orders)]

orders_data = {
    'order_id': range(1001, 1001 + n_orders),
    'customer_id': np.random.randint(1, 201, n_orders),
    'product_id': np.random.randint(1, 51, n_orders),
    'order_date': order_dates,
    'quantity': np.random.randint(1, 8, n_orders),
    'unit_price': np.round(np.random.uniform(10, 300, n_orders), 2),
    'discount_pct': np.random.choice([0, 5, 10, 15, 20], n_orders, p=[0.5, 0.2, 0.15, 0.1, 0.05]),
    'store_id': np.random.randint(1, 6, n_orders)
}

orders_df = pd.DataFrame(orders_data)

# Inject some quality issues
orders_df.loc[np.random.choice(orders_df.index, 30, replace=False), 'unit_price'] = np.nan
orders_df.loc[np.random.choice(orders_df.index, 10, replace=False), 'quantity'] = -1
orders_df.loc[np.random.choice(orders_df.index, 5, replace=False), 'customer_id'] = 999  # Invalid customer

print(f"Orders extracted: {len(orders_df)} rows")
orders_df.head()

In [None]:
# Source 2: Customer data (simulating JSON API response)
customers_json = {
    "customers": [
        {
            "customer_id": i,
            "name": f"Customer_{i:03d}",
            "email": f"customer{i}@email.com",
            "segment": np.random.choice(['Premium', 'Standard', 'Basic']),
            "join_date": (datetime(2020, 1, 1) + timedelta(days=np.random.randint(0, 1460))).strftime('%Y-%m-%d'),
            "city": np.random.choice(['Dublin', 'Cork', 'Galway', 'Limerick', 'Waterford']),
            "country": "Ireland"
        }
        for i in range(1, 201)
    ]
}

customers_df = pd.DataFrame(customers_json['customers'])
customers_df['join_date'] = pd.to_datetime(customers_df['join_date'])

print(f"Customers extracted: {len(customers_df)} rows")
customers_df.head()

In [None]:
# Source 3: Product catalog (simulating dict/API)
categories = ['Electronics', 'Clothing', 'Home & Garden', 'Books', 'Sports']
products = []
for i in range(1, 51):
    cat = categories[(i - 1) % len(categories)]
    products.append({
        'product_id': i,
        'product_name': f'{cat}_Item_{i:03d}',
        'category': cat,
        'brand': np.random.choice(['BrandA', 'BrandB', 'BrandC', 'BrandD']),
        'cost_price': round(np.random.uniform(5, 150), 2),
        'weight_kg': round(np.random.uniform(0.1, 20), 1)
    })

products_df = pd.DataFrame(products)
print(f"Products extracted: {len(products_df)} rows")
products_df.head()

In [None]:
# Source 4: Store reference data
stores_data = {
    'store_id': [1, 2, 3, 4, 5],
    'store_name': ['Dublin Central', 'Cork Main', 'Galway West', 'Online', 'Limerick Hub'],
    'store_type': ['Physical', 'Physical', 'Physical', 'Online', 'Physical'],
    'region': ['East', 'South', 'West', 'National', 'South']
}
stores_df = pd.DataFrame(stores_data)
print(f"Stores extracted: {len(stores_df)} rows")
stores_df

## Part 2: Validate — Data Quality Checks

Before transforming, validate source data quality.

### Exercise 2.1: Implement Quality Assertions

**Your Task:** Implement a reusable `run_quality_checks` function that takes a DataFrame, a list of check tuples `(name, function, is_critical)`, and a source name. It should print a formatted report and return `True` only if all critical checks pass. Then define checks for the orders data and run them.
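
To make the `(name, function, is_critical)` tuple format concrete, here is a minimal sketch on a toy DataFrame. The helper `evaluate_checks` and the toy data are hypothetical; the sketch only shows one way the pattern could be structured and is not the expected solution for `run_quality_checks`.

```python
import pandas as pd

# Toy data with two deliberate problems: a duplicated id and a missing amount
toy_df = pd.DataFrame({"id": [1, 2, 2], "amount": [10.0, None, 5.0]})

# Each check is (name, function returning True/False, is_critical)
toy_checks = [
    ("Not empty", lambda df: len(df) > 0, True),
    ("Unique ids", lambda df: df["id"].is_unique, True),
    ("No null amounts", lambda df: df["amount"].notna().all(), False),
]


def evaluate_checks(df, checks):
    """Hypothetical helper: run each check and collect (name, passed, is_critical)."""
    results = []
    for name, check_fn, is_critical in checks:
        passed = bool(check_fn(df))  # normalise NumPy booleans to plain bool
        results.append((name, passed, is_critical))
    return results


results = evaluate_checks(toy_df, toy_checks)

# Only failures of critical checks should block the pipeline
critical_ok = all(passed for _, passed, is_critical in results if is_critical)

for name, passed, is_critical in results:
    label = "PASS" if passed else "FAIL"
    level = "critical" if is_critical else "warning"
    print(f"{label}  {name} ({level})")
print("All critical checks passed:", critical_ok)
```

Separating critical checks from warnings lets a pipeline stop on structural problems (duplicate keys) while still continuing past issues that the cleaning step can repair (missing values).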

In [None]:
def run_quality_checks(df, checks, source_name):
    """Run a list of quality checks and report results.
    
    Args:
        df: DataFrame to check
        checks: list of tuples (check_name, check_fn, is_critical)
        source_name: name for the report header
    
    Returns: True if all critical checks pass
    """
    # TODO: Print report header with source_name
    # TODO: Loop through checks, call each check_fn(df)
    # TODO: Print PASS/FAIL with checkmark/cross for each
    # TODO: Track whether any critical check failed
    # TODO: Return whether all critical checks passed
    pass

# TODO: Define order_checks as a list of tuples:
#   ("Not empty", lambda df: len(df) > 0, True),
#   ("No null order_ids", ..., True),
#   ("Unique order_ids", ..., True),
#   ("No null prices", ..., False),
#   ("Positive quantities", ..., False),
#   ("Valid customer_ids (1-200)", ..., False),
#   ("Valid dates", ..., True),
order_checks = None  # Your code here

# orders_valid = run_quality_checks(orders_df, order_checks, "Orders")

**Your Task:** Define quality checks for the customers DataFrame and run them. Check for: not empty, unique customer_ids, no null emails, and valid segments (Premium/Standard/Basic).

In [None]:
# TODO: Define customer_checks and run quality checks on customers_df
#   Checks: Not empty, Unique customer_ids, No null emails, Valid segments
customer_checks = None  # Your code here

# customers_valid = run_quality_checks(customers_df, customer_checks, "Customers")

## Part 3: Transform — Clean and Enrich Data

### Exercise 3.1: Clean Orders Data

**Your Task:** Write a `clean_orders` function that handles missing prices (fill with median), fixes negative quantities (set to 1), and removes rows with invalid customer IDs (not in 1-200).
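
The three pandas operations involved can be tried on a small toy frame first. This is a sketch under assumed data: the four-row DataFrame and the valid ID range of 1 to 3 are invented for the example.

```python
import numpy as np
import pandas as pd

toy = pd.DataFrame({
    "customer_id": [1, 2, 999, 3],
    "quantity": [2, -1, 4, 1],
    "unit_price": [10.0, np.nan, 30.0, 20.0],
})

cleaned = toy.copy()  # work on a copy so the source frame stays untouched

# median() skips NaN by default, so the fill value comes from the known prices
cleaned["unit_price"] = cleaned["unit_price"].fillna(cleaned["unit_price"].median())

# Replace non-positive quantities with 1
cleaned.loc[cleaned["quantity"] <= 0, "quantity"] = 1

# between() is inclusive on both ends; 1..3 is the assumed valid range for this toy
cleaned = cleaned[cleaned["customer_id"].between(1, 3)].reset_index(drop=True)

print(cleaned)
```

Note that the median here is computed before the invalid row is dropped; whether to impute before or after filtering is a design choice worth making deliberately.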

In [None]:
def clean_orders(orders_df):
    """Clean the orders data.
    
    Steps:
    1. Fill missing prices with median
    2. Fix negative quantities (set to 1)
    3. Remove rows with invalid customer_ids (not in 1-200)
    
    Returns: cleaned DataFrame
    """
    # TODO: Copy the dataframe
    # TODO: Fill null unit_price with median
    # TODO: Set quantity <= 0 to 1
    # TODO: Filter to customer_id between 1 and 200
    pass

orders_clean = clean_orders(orders_df)

### Exercise 3.2: Enrich and Derive Fields

**Your Task:** Write an `enrich_orders` function that adds revenue calculations (gross_amount, discount_amount, net_amount) and date dimension fields (year, month, quarter, day_of_week, is_weekend).
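
The revenue arithmetic and the `.dt` accessor can be checked by hand on two toy rows. The data below is invented for illustration; 2024-01-06 is a Saturday and 2024-04-15 is a Monday.

```python
import pandas as pd

toy = pd.DataFrame({
    "order_date": ["2024-01-06", "2024-04-15"],  # a Saturday and a Monday
    "quantity": [2, 3],
    "unit_price": [50.0, 10.0],
    "discount_pct": [10, 0],
})

# Parse strings into datetimes so the .dt accessor is available
toy["order_date"] = pd.to_datetime(toy["order_date"])

# Revenue fields: 2 * 50 = 100 gross, 10% discount = 10, net = 90
toy["gross_amount"] = toy["quantity"] * toy["unit_price"]
toy["discount_amount"] = toy["gross_amount"] * toy["discount_pct"] / 100
toy["net_amount"] = toy["gross_amount"] - toy["discount_amount"]

# Date parts: dayofweek counts Monday as 0 and Sunday as 6
toy["order_quarter"] = toy["order_date"].dt.quarter
toy["order_day_of_week"] = toy["order_date"].dt.dayofweek
toy["is_weekend"] = (toy["order_day_of_week"] >= 5).astype(int)

print(toy)
```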

In [None]:
def enrich_orders(orders_clean):
    """Add derived fields to orders data.
    
    Fields to create:
    - gross_amount = quantity * unit_price
    - discount_amount = gross_amount * discount_pct / 100
    - net_amount = gross_amount - discount_amount
    - order_year, order_month, order_quarter, order_day_of_week
    - is_weekend (1 if Saturday/Sunday)
    
    Returns: enriched DataFrame
    """
    # TODO: Ensure order_date is datetime
    # TODO: Calculate revenue fields
    # TODO: Extract date dimensions
    # TODO: Add is_weekend flag
    pass

orders_clean = enrich_orders(orders_clean)

## Part 4: Model — Star Schema Design

Transform our flat data into a star schema with a fact table and dimension tables.

### Exercise 4.1: Create Dimension Tables

**Your Task:** Create four dimension tables: `dim_customers`, `dim_products`, `dim_time`, and `dim_stores`. Each should rename primary keys to `*_key` format and add any enrichment fields specified.
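
The building blocks for these dimensions (key renaming, binning with `pd.cut`, tenure from a reference date, and a generated calendar) are sketched below on toy inputs. All data, the column name `cost_tier`, and its labels are assumptions made for this example rather than the lab's required output.

```python
import pandas as pd

# 1. Rename a primary key to the *_key convention
toy_products = pd.DataFrame({"product_id": [1, 2, 3], "cost_price": [12.0, 45.0, 120.0]})
dim_toy = toy_products.rename(columns={"product_id": "product_key"})

# 2. pd.cut bins are right-inclusive by default: (0, 30], (30, 80], (80, 200]
dim_toy["cost_tier"] = pd.cut(
    dim_toy["cost_price"], bins=[0, 30, 80, 200], labels=["Low", "Mid", "High"]
)

# 3. Tenure in days relative to a fixed reference date
reference_date = pd.Timestamp("2024-12-31")
join_dates = pd.to_datetime(pd.Series(["2024-12-01", "2023-12-31"]))
tenure_days = (reference_date - join_dates).dt.days

# 4. A one-week calendar with one row per day (2024-01-01 is a Monday)
dates = pd.date_range("2024-01-01", "2024-01-07", freq="D")
toy_time = pd.DataFrame({
    "date_key": dates.strftime("%Y-%m-%d"),
    "month_name": dates.month_name(),
    "day_name": dates.day_name(),
    "is_weekend": (dates.dayofweek >= 5).astype(int),
})

print(dim_toy)
print(tenure_days.tolist())
print(toy_time)
```

Using an ISO date string as `date_key` is one option; integer keys such as `20240101` are also common in warehouse designs.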

In [None]:
def create_dim_customers(customers_df):
    """Create customer dimension table.
    
    - Rename customer_id to customer_key
    - Add tenure_days and tenure_years (relative to 2024-12-31)
    
    Returns: dim_customers DataFrame
    """
    # TODO: Copy and rename key column
    # TODO: Calculate tenure
    pass

In [None]:
def create_dim_products(products_df):
    """Create product dimension table.
    
    - Rename product_id to product_key
    - Add margin_tier using pd.cut (Low/Mid/High Cost)
    
    Returns: dim_products DataFrame
    """
    # TODO: Copy and rename key column
    # TODO: Add margin tier based on cost_price bins [0, 30, 80, 200]
    pass

In [None]:
def create_dim_time():
    """Create time dimension table for 2024.
    
    Columns: date_key, year, quarter, month, month_name, week,
             day_of_week, day_name, is_weekend
    
    Returns: dim_time DataFrame
    """
    # TODO: Generate date range for 2024
    # TODO: Extract all date components
    pass

In [None]:
# dim_stores - provided (no changes needed)
dim_stores = stores_df.copy()
dim_stores = dim_stores.rename(columns={'store_id': 'store_key'})
print(f"dim_stores: {dim_stores.shape}")
dim_stores

### Exercise 4.2: Create Fact Table

**Your Task:** Build the `fact_orders` table by selecting the relevant columns and renaming foreign keys to match the dimension key names. Then validate referential integrity.
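
Referential integrity here means that every foreign key in the fact table has a matching row in its dimension. One way to look for "orphan" keys is an `isin` test, sketched below on toy tables whose names and values are invented for the example.

```python
import pandas as pd

toy_fact = pd.DataFrame({"order_id": [1, 2, 3], "customer_key": [10, 11, 99]})
toy_dim = pd.DataFrame({"customer_key": [10, 11, 12]})

# True for fact rows whose key has no match in the dimension
orphan_mask = ~toy_fact["customer_key"].isin(toy_dim["customer_key"])
orphans = toy_fact.loc[orphan_mask, "customer_key"].unique().tolist()

if orphans:
    print(f"FAIL: {len(orphans)} orphan customer_key value(s): {orphans}")
else:
    print("PASS: all customer_key values found in the dimension")
```

A dimension row with no matching facts (key 12 above) is not an integrity problem; only the fact-to-dimension direction is checked.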

In [None]:
def create_fact_orders(orders_clean):
    """Create fact_orders table from cleaned orders.
    
    - Select relevant columns
    - Rename foreign keys: customer_id->customer_key, etc.
    
    Returns: fact_orders DataFrame
    """
    # TODO: Select columns (order_id, customer_id, product_id, order_date, store_id,
    #        quantity, unit_price, discount_pct, gross_amount, discount_amount, net_amount)
    # TODO: Rename to dimension keys
    pass

In [None]:
def validate_star_schema(fact_orders, dim_customers, dim_products, dim_stores):
    """Validate referential integrity of the star schema.
    
    Check that all foreign keys in fact_orders exist in dimension tables.
    """
    # TODO: Check for orphan keys in each dimension
    # TODO: Print validation results
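    pass

# Build the star schema tables with the functions defined above
dim_customers = create_dim_customers(customers_df)
dim_products = create_dim_products(products_df)
dim_time = create_dim_time()
fact_orders = create_fact_orders(orders_clean)
validate_star_schema(fact_orders, dim_customers, dim_products, dim_stores)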

## Part 5: Load — Persist to SQLite and Parquet

### Exercise 5.1: Load into SQLite

**Your Task:** Write a function that loads all five star schema tables into a SQLite database and verifies the row counts.
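
The write-then-verify pattern can be tried against an in-memory database before touching a file. This sketch assumes a toy table named `toy_orders`; the connection name `toy_conn` is chosen to avoid clashing with the lab's `conn`.

```python
import sqlite3

import pandas as pd

toy = pd.DataFrame({
    "order_id": [1, 2],
    "order_date": pd.to_datetime(["2024-01-01", "2024-01-02"]),
})

# SQLite has no native datetime type, so store dates as ISO-8601 strings
toy_out = toy.copy()
toy_out["order_date"] = toy_out["order_date"].dt.strftime("%Y-%m-%d")

toy_conn = sqlite3.connect(":memory:")  # in-memory database, nothing written to disk
toy_out.to_sql("toy_orders", toy_conn, index=False, if_exists="replace")

# Verify the load by comparing the row count with the source frame
row_count = toy_conn.execute("SELECT COUNT(*) FROM toy_orders").fetchone()[0]
stored_dates = [
    row[0]
    for row in toy_conn.execute("SELECT order_date FROM toy_orders ORDER BY order_id")
]
toy_conn.close()

print(f"toy_orders: {row_count} rows (expected {len(toy)})")
```

ISO-formatted date strings sort correctly as text, which keeps date comparisons and joins in SQL straightforward.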

In [None]:
def load_to_sqlite(fact_orders, dim_customers, dim_products, dim_time, dim_stores, db_path='ecommerce_warehouse.db'):
    """Load star schema tables into a SQLite database.
    
    Returns: sqlite3 connection
    """
    # TODO: Remove existing db file if present
    # TODO: Connect to SQLite
    # TODO: Write each table using to_sql (convert datetimes to strings first)
    # TODO: Verify row counts
    pass

conn = load_to_sqlite(fact_orders, dim_customers, dim_products, dim_time, dim_stores)

### Exercise 5.2: Load into Parquet

**Your Task:** Write a function that saves all five star schema tables as Parquet files and prints the file sizes.
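
A small round trip shows the calls involved. This sketch assumes a Parquet engine (`pyarrow` or `fastparquet`) is installed, since pandas relies on one of them; if neither is available it reports that and skips the round trip. The file name and toy data are invented for the example.

```python
import os
import tempfile

import pandas as pd

toy = pd.DataFrame({"store_key": [1, 2], "net_amount": [10.5, 20.0]})

round_trip = None
size_bytes = None

# A temporary directory keeps the example from leaving files behind
with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "toy.parquet")
    try:
        toy.to_parquet(path, index=False)
    except ImportError:
        # Raised by pandas when no Parquet engine can be imported
        print("No Parquet engine found; install pyarrow or fastparquet.")
    else:
        size_bytes = os.path.getsize(path)
        round_trip = pd.read_parquet(path)
        print(f"toy.parquet: {size_bytes} bytes, {len(round_trip)} rows")
```

Unlike the SQLite load, Parquet stores datetime columns natively, so no string conversion is needed before writing.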

In [None]:
def load_to_parquet(fact_orders, dim_customers, dim_products, dim_time, dim_stores, parquet_dir='warehouse_parquet'):
    """Save star schema tables as Parquet files.
    
    Print file sizes after saving.
    """
    # TODO: Create directory
    # TODO: Save each table as parquet
    # TODO: Print file sizes
    pass

load_to_parquet(fact_orders, dim_customers, dim_products, dim_time, dim_stores)

## Part 6: Analyze — SQL Analytics

Run analytical queries against our star schema.

### Exercise 6.1: Revenue Analytics

**Your Task:** Write three SQL queries against the star schema: monthly revenue trend, top 10 customers by revenue, and category performance by store.
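
All three queries share the same shape: join the fact table to one or more dimensions, group by a dimension attribute, and aggregate a measure. The sketch below shows that shape on two toy tables (`toy_fact`, `toy_dim_store`) in an in-memory database; the table names and data are invented and the query is not one of the three requested.

```python
import sqlite3

import pandas as pd

toy_conn = sqlite3.connect(":memory:")

pd.DataFrame({
    "order_id": [1, 2, 3],
    "store_key": [1, 1, 2],
    "net_amount": [10.0, 30.0, 5.0],
}).to_sql("toy_fact", toy_conn, index=False)

pd.DataFrame({
    "store_key": [1, 2],
    "store_name": ["North", "South"],
}).to_sql("toy_dim_store", toy_conn, index=False)

# Join fact to dimension, group by a dimension attribute, aggregate a measure
toy_query = """
SELECT s.store_name,
       COUNT(*)          AS orders,
       SUM(f.net_amount) AS revenue,
       AVG(f.net_amount) AS avg_order_value
FROM toy_fact AS f
JOIN toy_dim_store AS s ON f.store_key = s.store_key
GROUP BY s.store_name
ORDER BY revenue DESC
"""
toy_result = pd.read_sql(toy_query, toy_conn)
toy_conn.close()

print(toy_result)
```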

In [None]:
# TODO: Write SQL query to get monthly revenue trend
# Join fact_orders with dim_time, group by month
# Select: month_name, month, total_orders, total_revenue, avg_order_value
query = """
-- Your SQL here
"""
# monthly_revenue = pd.read_sql(query, conn)

In [None]:
# TODO: Write SQL query to get top 10 customers by revenue
# Join fact_orders with dim_customers
# Select: name, segment, city, orders, total_spent, avg_order
query = """
-- Your SQL here
"""
# top_customers = pd.read_sql(query, conn)

In [None]:
# TODO: Write SQL query for category performance by store
# Join fact_orders with dim_stores and dim_products
# Select: store_name, category, orders, revenue, avg_discount
query = """
-- Your SQL here
"""
# store_category = pd.read_sql(query, conn)

### Cleanup

In [None]:
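# Close connection and clean up
conn.close()

# Clean up temporary files
import shutil

# These paths match the default arguments used in Part 5;
# update them if you passed different values to the load functions.
db_path = 'ecommerce_warehouse.db'
parquet_dir = 'warehouse_parquet'

if os.path.exists(db_path):
    os.remove(db_path)
if os.path.exists(parquet_dir):
    shutil.rmtree(parquet_dir)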

print("Cleanup complete!")

## Summary

In this lab, you learned how to:

1. **Extract** data from multiple source formats (CSV, JSON, dict)
2. **Validate** data quality with assertion-based checks
3. **Transform** data — cleaning, enrichment, derived fields
4. **Model** data into a star schema (fact + dimension tables)
5. **Load** into both SQLite (for SQL queries) and Parquet (for analytics)
6. **Analyze** with SQL joins across the star schema

---

*Introduction to Data Science & Engineering | AI Elevate*