# Example Transformation Pipeline

**Purpose**: Demonstrate best practices for data transformations using EnvironmentConfig

**Author**: {TEAM NAME OR INDIVIDUAL}

**Last Updated**: {LAST UPDATE DATE}

## Overview
This notebook shows how to:
- Set up environment configuration
- Read from source tables
- Transform data
- Write to output tables
- Handle errors gracefully

In [0]:
# Import required libraries
import sys
import os
from pyspark.sql import functions as F
from pyspark.sql.utils import AnalysisException

In [0]:
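# Add the projects directory (two levels up) to the Python path.
# Notebooks do not define __file__, so fall back to the current working directory.
current_dir = os.path.dirname(os.path.abspath(__file__)) if "__file__" in globals() else os.getcwd()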
projects_path = os.path.abspath(os.path.join(current_dir, "..", ".."))

if projects_path not in sys.path:
    sys.path.insert(0, projects_path)

print(f"Added to Python path: {projects_path}")

In [0]:
# Configure Spark: adaptive query execution plus tolerant file reads
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
# The next two settings trade completeness for resilience: affected files are skipped silently
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")  # Skip corrupt files instead of failing
spark.conf.set("spark.sql.files.ignoreMissingFiles", "true")  # Skip files that disappear before being read

In [0]:
# Set Spark config so EnvironmentConfig can read these values
spark.conf.set("bundle.catalog", "sandbox")
spark.conf.set("bundle.schema", "analytics_engineering")
spark.conf.set("bundle.core_catalog", "core_views")

print("✓ Spark configuration set for sandbox environment")

In [0]:
# Initialize EnvironmentConfig
from src.environment_config import EnvironmentConfig

config = EnvironmentConfig()
print(f"Environment Config: {config}")

# Import utility functions from this project
from example_project.utilities import (
    validate_dataframe,
    check_data_quality,
    add_audit_columns,
    get_date_range,
    categorize_sport_udf
)

print("✓ Utility functions imported successfully")

## Step 1: Read Source Data

Use `config.get_core_table_path()` to get source tables from the core catalog.
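
The real `EnvironmentConfig` class lives in `src/environment_config.py` and is not shown in this notebook. As a rough mental model only, the sketch below assumes it holds the three `bundle.*` values set earlier and joins them into three-part table names. The class name `EnvironmentConfigSketch` and its constructor arguments are hypothetical; the real implementation may differ.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class EnvironmentConfigSketch:
    """Hypothetical stand-in for EnvironmentConfig (assumed behavior)."""

    catalog: str       # assumed to come from the "bundle.catalog" Spark conf
    schema: str        # assumed to come from "bundle.schema"
    core_catalog: str  # assumed to come from "bundle.core_catalog"

    def get_table_path(self, table: str) -> str:
        # Output tables: <catalog>.<schema>.<table>
        return f"{self.catalog}.{self.schema}.{table}"

    def get_core_table_path(self, schema: str, table: str) -> str:
        # Source tables: <core_catalog>.<schema>.<table>
        return f"{self.core_catalog}.{schema}.{table}"


sketch = EnvironmentConfigSketch("sandbox", "analytics_engineering", "core_views")
print(sketch.get_core_table_path("sportsbook", "bet_legs"))
print(sketch.get_table_path("example_betting_summary"))
```

Under these assumptions, the source table used in the next cell would resolve to `core_views.sportsbook.bet_legs`.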

In [0]:
# Define source tables using EnvironmentConfig
source_table = config.get_core_table_path("sportsbook", "bet_legs")
print(f"Reading from: {source_table}")

# Filter to the last 30 days and aggregate in SQL so only summary rows are returned.
# The timestamp is cast to a date so rows group per day, not per exact timestamp.
query = f"""
SELECT
    CAST(bet_placed_local_ts AS DATE) AS bet_date,
    leg_sport_name_reporting AS sport_name,
    leg_competition_name_reporting AS competition_name,
    leg_market_name_reporting AS market_name,
    COUNT(DISTINCT bet_id) AS bet_count,
    SUM(bet_portion) AS total_stake
FROM {source_table}
WHERE bet_placed_local_ts >= current_date() - 30
GROUP BY
    CAST(bet_placed_local_ts AS DATE),
    leg_sport_name_reporting,
    leg_competition_name_reporting,
    leg_market_name_reporting
"""

df_source = spark.sql(query)

# Validate the source data using utility function
is_valid = validate_dataframe(
    df_source,
    required_columns=['bet_date', 'sport_name', 'market_name', 'bet_count'],
    min_rows=1
)

if not is_valid:
    raise ValueError("Source data validation failed!")

print(f"✓ Source data validated: {df_source.count():,} rows")
display(df_source.limit(10))
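
`validate_dataframe` comes from the project utilities and its body is not shown here. A minimal sketch of what such a check might do, assuming it only needs the column list and a minimum row count, is given below. The name `validate_dataframe_sketch` is hypothetical, and the function relies only on the `columns`, `limit()`, and `count()` members that Spark DataFrames provide.

```python
from typing import Any, Sequence


def validate_dataframe_sketch(df: Any, required_columns: Sequence[str], min_rows: int = 1) -> bool:
    """Hypothetical version of validate_dataframe (assumed behavior)."""
    # The schema check uses metadata only, so no Spark job is triggered.
    missing = [c for c in required_columns if c not in df.columns]
    if missing:
        print(f"Missing columns: {missing}")
        return False

    # Counting at most min_rows rows can let Spark stop early
    # instead of counting the whole result.
    if df.limit(min_rows).count() < min_rows:
        print(f"Fewer than {min_rows} rows")
        return False
    return True
```

Returning a boolean, as the cell above expects, leaves the decision to raise with the caller.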

## Step 2: Transform Data

Apply business logic and transformations.

In [0]:
# Apply transformations; Spark evaluates the chained withColumn calls lazily as one plan
df_transformed = df_source \
    .withColumn("sport_name_clean", F.lower(F.trim(F.col("sport_name")))) \
    .withColumn("competition_name_clean", F.lower(F.trim(F.col("competition_name")))) \
    .withColumn("market_name_clean", F.lower(F.trim(F.col("market_name")))) \
    .withColumn("avg_stake", F.col("total_stake") / F.col("bet_count")) \
    .withColumn("sport_category", categorize_sport_udf(F.col("sport_name_clean"))) \
    .withColumn("processed_timestamp", F.current_timestamp())

# Use optimized utility function for data quality checks (single-pass aggregation)
print("=== DATA QUALITY CHECKS ===")
quality_metrics = check_data_quality(
    df_transformed,
    columns_to_check=['sport_name_clean', 'bet_count', 'total_stake', 'avg_stake']
)

# Additional validation
if quality_metrics['total_nulls'] > 0:
    print(f"⚠️  WARNING: Found {quality_metrics['total_nulls']} null values")
else:
    print("✓ No null values in key columns")

print(f"✓ Transformation complete: {quality_metrics['total_rows']:,} rows")
display(df_transformed.limit(10))
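
`categorize_sport_udf` is imported from the project utilities, and its rules are not shown here. One pattern that keeps this kind of business logic easy to unit test is to write it as a plain Python function and wrap it as a UDF separately. The function name, categories, and sport lists below are placeholders for illustration, not the project's real mapping.

```python
from typing import Optional

# Placeholder mapping for illustration only; not the project's real rules.
_SPORT_CATEGORIES = {
    "soccer": "team",
    "basketball": "team",
    "tennis": "individual",
    "golf": "individual",
}


def categorize_sport(sport_name: Optional[str]) -> str:
    """Map a sport name to a coarse category; unknown or null input maps to 'other'."""
    if sport_name is None:
        return "other"
    return _SPORT_CATEGORIES.get(sport_name.strip().lower(), "other")


# In a Spark session this could be wrapped with pyspark.sql.functions.udf, e.g.
#   categorize_sport_udf = F.udf(categorize_sport, StringType())
print(categorize_sport("  Tennis "))
print(categorize_sport(None))
```

Handling `None` explicitly matters because Spark passes null column values to Python UDFs as `None`.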

## Step 3: Check for Existing Data

Handle the case where the output table may not exist yet.

In [0]:
# Define output table
output_table = config.get_table_path("example_betting_summary")
print(f"Output table: {output_table}")

# Check if table exists and get summary stats
print("=== CHECKING EXISTING DATA ===")
try:
    existing_df = spark.table(output_table)
    
    # Use single aggregation for count and date range (optimized)
    stats = existing_df.agg(
        F.count("*").alias("row_count"),
        F.min("bet_date").alias("min_date"),
        F.max("bet_date").alias("max_date")
    ).collect()[0]
    
    existing_count = stats['row_count']
    print(f"Existing rows: {existing_count:,}")
    print(f"Date range: {stats['min_date']} to {stats['max_date']}")
    
except AnalysisException as ex:
    if "TABLE_OR_VIEW_NOT_FOUND" in str(ex):
        print("✓ Table does not exist yet. This is the first run.")
        existing_count = 0
    else:
        raise
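
The string check above matches the error class that recent Spark versions emit. Older Spark releases word the same failure as `Table or view not found`, so a slightly more tolerant check may help when a notebook runs across runtimes. The helper name `is_missing_table_error` is hypothetical.

```python
def is_missing_table_error(message: str) -> bool:
    """Return True if an AnalysisException message indicates a missing table or view."""
    markers = (
        "TABLE_OR_VIEW_NOT_FOUND",  # error class used by newer Spark versions
        "table or view not found",  # wording used by older Spark versions
    )
    lowered = message.lower()
    return any(marker.lower() in lowered for marker in markers)


print(is_missing_table_error("[TABLE_OR_VIEW_NOT_FOUND] The table or view `x` cannot be found."))
print(is_missing_table_error("Permission denied on table x"))
```

In the `except` branch, `if is_missing_table_error(str(ex)):` could replace the single-string test. On recent PySpark versions, `spark.catalog.tableExists(output_table)` may avoid message parsing altogether.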

## Step 4: Write Output Data

Use staging table pattern for safe updates.

In [0]:
# Define staging table
staging_table = config.get_table_path("example_betting_summary_staging")
print(f"Staging table: {staging_table}")

# Add audit columns using utility function
df_with_audit = add_audit_columns(df_transformed, user="databricks_job")

# Write to staging (unpartitioned); overwrite replaces any leftover staging data
print("=== WRITING TO STAGING ===")
df_with_audit.write \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable(staging_table)

staging_count = spark.table(staging_table).count()
print(f"✓ Written {staging_count:,} rows to staging table")
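
A staging table only makes the update safer if something is checked before it is promoted. The guard below is one possible check and is not part of the original notebook: the function name `should_promote` and the 50% `max_drop_ratio` default are assumptions to adapt.

```python
def should_promote(staging_count: int, existing_count: int, max_drop_ratio: float = 0.5) -> bool:
    """Decide whether staging looks sane enough to replace the final table."""
    if staging_count <= 0:
        return False  # never replace the final table with an empty one
    if existing_count <= 0:
        return True   # first run: nothing to compare against
    # Reject a refresh that loses more than max_drop_ratio of the previous rows.
    return staging_count >= existing_count * (1 - max_drop_ratio)


print(should_promote(staging_count=950, existing_count=1000))
print(should_promote(staging_count=0, existing_count=1000))
```

A call such as `if not should_promote(staging_count, existing_count): raise ValueError("Staging check failed")` could sit just before the final table is replaced.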

In [0]:
# Create or replace final table
print("=== UPDATING FINAL TABLE ===")

# For this example, always do full refresh (simpler and often faster for small tables)
# For large tables with incremental updates, use MERGE instead
print("Performing full refresh...")
spark.sql(f"""
    CREATE OR REPLACE TABLE {output_table}
    AS SELECT * FROM {staging_table}
""")

# Clean up staging table
spark.sql(f"DROP TABLE IF EXISTS {staging_table}")
print("✓ Final table updated and staging cleaned up")

## Step 5: Verify Results

Check the final output and display summary statistics.

In [0]:
# Get final stats - single aggregation for efficiency
print("=== FINAL RESULTS ===")
final_df = spark.table(output_table)

# Single aggregation for all summary stats
summary_stats = final_df.agg(
    F.count("*").alias("total_rows"),
    F.sum("bet_count").alias("total_bets"),
    F.sum("total_stake").alias("total_stake"),
    F.min("bet_date").alias("min_date"),
    F.max("bet_date").alias("max_date")
).collect()[0]

print(f"Total rows: {summary_stats['total_rows']:,}")
print(f"Total bets: {summary_stats['total_bets']:,}")
print(f"Total stake: ${summary_stats['total_stake']:,.2f}")
print(f"Date range: {summary_stats['min_date']} to {summary_stats['max_date']}")
print(f"Net row change vs. previous table: {summary_stats['total_rows'] - existing_count:+,}")

# Show summary by sport (limited to top 10)
print("\n=== TOP 10 SPORTS BY BET COUNT ===")
summary_by_sport = final_df.groupBy("sport_name_clean", "sport_category").agg(
    F.sum("bet_count").alias("total_bets"),
    F.sum("total_stake").alias("total_stake"),
    F.avg("avg_stake").alias("avg_stake_per_bet")
).orderBy(F.desc("total_bets")).limit(10)

display(summary_by_sport)

In [0]:
# Sample of final data
print("\n=== SAMPLE OUTPUT ===")
display(final_df.orderBy(F.desc("bet_date")).limit(20))

## Summary

This transformation demonstrates **optimized best practices**:

### **Performance Optimizations**
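- ✅ **Aggregation at source** - GROUP BY in the source query returns only summary rows; the 30-day WHERE filter is what limits the data scanned
- ✅ **Chained transformations** - All `.withColumn()` calls build one lazy plan that Spark evaluates per action
- ✅ **Optimized quality checks** - `check_data_quality()` uses single aggregation
- ✅ **Efficient stats gathering** - Combined count/min/max in one query
- ✅ **Corrupt file handling** - Spark configs skip corrupt or missing files instead of failing the job (a resilience setting, not a speed-up; skipped files are dropped silently)
- ✅ **Adaptive query execution** - Enabled so Spark can coalesce partitions and handle skewed joins at runtime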

### **Utility Functions Used**
1. **`validate_dataframe()`** - Fast schema and row count validation
2. **`check_data_quality()`** - Single-pass null checks (optimized)
3. **`add_audit_columns()`** - Adds timestamps and user tracking
4. **`categorize_sport_udf()`** - Custom UDF for business logic

### **Best Practices**
- ✅ Uses `EnvironmentConfig` for all table paths
- ✅ Filters data at source (last 30 days)
- ✅ Validates data before processing
- ✅ Handles missing tables gracefully
- ✅ Uses staging table pattern for safe updates
- ✅ Provides clear logging and verification
- ✅ Limits display results for performance

### **When to Use Full Refresh vs Incremental**

**Full Refresh** (used here):
- Small to medium tables (< 100M rows)
- Daily or less frequent updates
- Simpler logic, easier to debug
- Often faster than MERGE for small datasets

**Incremental MERGE**:
- Large tables (> 100M rows)
- Frequent updates (hourly)
- Need to preserve history
- Append-only or upsert patterns
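
For the incremental case, a Delta Lake `MERGE` could replace the `CREATE OR REPLACE TABLE` step. The helper below is a minimal sketch that only builds the SQL text. The function name `build_merge_sql`, the example table names, and the choice of key columns (one row per date, sport, competition, and market) are assumptions, not part of the original notebook.

```python
from typing import Sequence


def build_merge_sql(target: str, source: str, key_columns: Sequence[str]) -> str:
    """Build a Delta Lake MERGE statement that upserts source rows into target."""
    if not key_columns:
        raise ValueError("At least one key column is required")
    # Null-safe equality (<=>) so rows with NULL keys still match each other.
    on_clause = " AND ".join(f"t.{c} <=> s.{c}" for c in key_columns)
    return (
        f"MERGE INTO {target} AS t\n"
        f"USING {source} AS s\n"
        f"ON {on_clause}\n"
        "WHEN MATCHED THEN UPDATE SET *\n"
        "WHEN NOT MATCHED THEN INSERT *"
    )


# Assumed grain of the summary table: date, sport, competition, market.
print(build_merge_sql(
    "catalog.schema.example_betting_summary",
    "catalog.schema.example_betting_summary_staging",
    ["bet_date", "sport_name", "competition_name", "market_name"],
))
```

The resulting string could be passed to `spark.sql(...)` with `output_table` and `staging_table` as the target and source. `UPDATE SET *` and `INSERT *` require both tables to share the same columns.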

### **Next Steps**
1. Customize transformation logic for your use case
2. Add project-specific utilities to `utilities/example_utils.py`
3. Create job YAML to schedule this notebook
4. Test in dev, then deploy to qa and production

---

**Note**: If you get import errors after updating utilities, restart the Python kernel:
- In Databricks: **Run** → **Clear state and run all**