# Bronze Layer Ingestion
## Microsoft Fabric Lakehouse - Scout v7 Data Ingestion

This notebook ingests data from Azure SQL Database into the Bronze layer of our Lakehouse.

**Source**: Azure SQL Database (canonical.SalesInteractionFact, dbo.PayloadTransactions)  
**Target**: Lakehouse Bronze tables (Delta format)  
**Pattern**: Full load — every run overwrites the Bronze tables (incremental loading is not implemented in this notebook)


In [None]:
# Configuration and Setup
import pyspark.sql.functions as F
from pyspark.sql.types import *
from datetime import datetime, timedelta
import json

# --- Source connection settings: adjust per environment -------------------
AZURE_SQL_PORT = 1433
AZURE_SQL_SERVER = "sqltbwaprojectscoutserver.database.windows.net"
AZURE_SQL_DATABASE = "SQL-TBWA-ProjectScout-Reporting-Prod"

# No credentials are stored in this cell. Authentication is expected to come
# from Managed Identity or a Key Vault connection string, so the Fabric
# workspace must already have the required permissions.

# Startup banner: one print call, one line per item.
print(
    f"Starting Bronze ingestion at: {datetime.now()}",
    f"Target server: {AZURE_SQL_SERVER}",
    f"Target database: {AZURE_SQL_DATABASE}",
    sep="\n",
)

In [None]:
# Azure SQL Connection Configuration
# Use Fabric's built-in connector for Azure SQL

def get_azure_sql_options(table_name):
    """Build the Spark JDBC reader options for an Azure SQL table or subquery.

    Args:
        table_name: Value for the JDBC ``dbtable`` option. Either a table
            name (``"dbo.Stores"``) or a parenthesised subquery
            (``"(SELECT ...)"``), with or without its own alias.

    Returns:
        A dict of options suitable for
        ``spark.read.format("jdbc").options(**opts)``.
    """
    dbtable = table_name
    stripped = table_name.strip()
    # Spark drops ``dbtable`` verbatim into a FROM clause, and SQL Server
    # rejects a derived table that has no alias. Supply one when the caller
    # passed a bare "( ... )" subquery; anything else is left untouched.
    if stripped.startswith("(") and stripped.endswith(")"):
        dbtable = f"{stripped} AS src"

    return {
        "url": f"jdbc:sqlserver://{AZURE_SQL_SERVER}:{AZURE_SQL_PORT};database={AZURE_SQL_DATABASE};encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;",
        "dbtable": dbtable,
        "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
        # Authentication will be handled by Fabric's managed identity
        "authentication": "ActiveDirectoryMSI"
    }

# Test connection
# Fail fast: run a one-row probe query before any real ingestion starts, and
# re-raise on failure so later cells do not run against a broken connection.
try:
    # The probe is a derived table, so it carries an explicit alias:
    # SQL Server rejects "FROM (SELECT ...)" without one.
    test_df = (
        spark.read
        .format("jdbc")
        .options(**get_azure_sql_options("(SELECT TOP 1 1 as test) AS connection_test"))
        .load()
    )
    test_result = test_df.collect()[0][0]
except Exception as e:
    print(f"❌ Azure SQL connection failed: {str(e)}")
    print("Ensure:")
    print("1. Fabric workspace has SQL access permissions")
    print("2. Managed Identity is configured for the Lakehouse")
    print("3. Network connectivity is available")
    raise
else:
    print(f"✅ Azure SQL connection successful. Test result: {test_result}")

In [None]:
# Ingest Sales Interactions (canonical.SalesInteractionFact)
print("📥 Ingesting Sales Interactions from canonical.SalesInteractionFact...")

# Expected shape of the source table. The JDBC reader derives its own schema
# from the database, so this definition is not applied to the read; it is
# used below only to warn when an expected column is missing from the source.
sales_interactions_schema = StructType([
    StructField("interaction_id", StringType(), True),
    StructField("store_id", IntegerType(), True),
    StructField("product_id", IntegerType(), True),
    StructField("customer_id", StringType(), True),
    StructField("transaction_date", DateType(), True),
    StructField("transaction_time", StringType(), True),
    StructField("date_key", IntegerType(), True),
    StructField("time_key", IntegerType(), True),
    StructField("device_id", StringType(), True),
    StructField("age", ByteType(), True),
    StructField("gender", StringType(), True),
    StructField("emotional_state", StringType(), True),
    StructField("transcription_text", StringType(), True),
    StructField("barangay_id", IntegerType(), True),
    StructField("canonical_tx_id_norm", StringType(), True),
    StructField("canonical_tx_id", StringType(), True),
    StructField("persona_rule_id", IntegerType(), True),
    StructField("assigned_persona", StringType(), True),
    StructField("created_date", TimestampType(), True)
])

# Read from canonical.SalesInteractionFact and stamp lineage columns.
# This DataFrame is lazy: every action on it re-runs the JDBC query.
sales_interactions_df = (
    spark.read
    .format("jdbc")
    .options(**get_azure_sql_options("canonical.SalesInteractionFact"))
    .load()
    .withColumn("ingestion_timestamp", F.current_timestamp())
    .withColumn("source_system", F.lit("azure_sql"))
    .withColumn("source_table", F.lit("canonical.SalesInteractionFact"))
)

# Column drift check (metadata only, no data scan). Names are compared
# case-insensitively.
actual_columns = {name.lower() for name in sales_interactions_df.columns}
missing_columns = [
    name for name in sales_interactions_schema.fieldNames()
    if name.lower() not in actual_columns
]
if missing_columns:
    print(f"⚠️ Expected columns missing from source: {', '.join(missing_columns)}")

# Write to Bronze layer first so the source table is read over JDBC only once.
sales_interactions_df.write \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("bronze.sales_interactions_raw")

print("✅ Sales Interactions written to bronze.sales_interactions_raw")

# Data quality checks, computed from the Bronze table that was just written
# so the numbers describe exactly what was ingested.
bronze_sales_df = spark.table("bronze.sales_interactions_raw")
total_rows = bronze_sales_df.count()
unique_transactions = bronze_sales_df.select("canonical_tx_id").distinct().count()
date_range = bronze_sales_df.agg(
    F.min("transaction_date").alias("min_date"),
    F.max("transaction_date").alias("max_date")
).collect()[0]

print(f"📊 Sales Interactions loaded: {total_rows:,} rows")
print(f"📊 Unique transactions: {unique_transactions:,}")
print(f"📊 Date range: {date_range['min_date']} to {date_range['max_date']}")

In [None]:
# Ingest Payload Transactions (dbo.PayloadTransactions)
print("📥 Ingesting Payload Transactions from dbo.PayloadTransactions...")

# Count the source rows first. This query raises if the table does not exist;
# the else-branch below only handles a table that exists but is empty.
# Subqueries passed as `dbtable` carry an alias because SQL Server rejects an
# un-aliased derived table in a FROM clause.
payload_check_df = (
    spark.read
    .format("jdbc")
    .options(**get_azure_sql_options(
        "(SELECT COUNT(*) as row_count FROM dbo.PayloadTransactions) AS payload_row_count"
    ))
    .load()
)

payload_count = payload_check_df.collect()[0][0]
print(f"📊 PayloadTransactions available: {payload_count:,} rows")

if payload_count > 0:
    # Read PayloadTransactions with sample to check schema
    payload_sample_df = (
        spark.read
        .format("jdbc")
        .options(**get_azure_sql_options(
            "(SELECT TOP 10 * FROM dbo.PayloadTransactions) AS payload_sample"
        ))
        .load()
    )

    print("📊 PayloadTransactions schema:")
    payload_sample_df.printSchema()

    # Full load of PayloadTransactions, stamped with lineage columns.
    # This DataFrame is lazy: every action on it re-runs the JDBC query.
    payload_transactions_df = (
        spark.read
        .format("jdbc")
        .options(**get_azure_sql_options("dbo.PayloadTransactions"))
        .load()
        .withColumn("ingestion_timestamp", F.current_timestamp())
        .withColumn("source_system", F.lit("azure_sql"))
        .withColumn("source_table", F.lit("dbo.PayloadTransactions"))
    )

    # Normalize canonical_tx_id to lowercase for consistency
    payload_transactions_df = payload_transactions_df.withColumn(
        "canonical_tx_id",
        F.lower(F.col("canonical_tx_id"))
    )

    # Write to Bronze layer first so the source table is read over JDBC once.
    payload_transactions_df.write \
        .mode("overwrite") \
        .option("overwriteSchema", "true") \
        .saveAsTable("bronze.payload_transactions_raw")

    print("✅ Payload Transactions written to bronze.payload_transactions_raw")

    # Data quality checks, computed from the Bronze table that was just
    # written so the numbers describe exactly what was ingested.
    bronze_payload_df = spark.table("bronze.payload_transactions_raw")
    payload_total_rows = bronze_payload_df.count()
    payload_unique_transactions = bronze_payload_df.select("canonical_tx_id").distinct().count()
    # "Valid" here only means non-null and longer than 10 characters; the
    # payload is not actually parsed as JSON.
    payload_json_valid = bronze_payload_df.filter(
        F.col("payload_json").isNotNull() &
        (F.length(F.col("payload_json")) > 10)
    ).count()
    # Guard the percentage: the source may have been emptied between the
    # COUNT(*) probe above and the full load.
    payload_json_valid_pct = (
        payload_json_valid / payload_total_rows * 100 if payload_total_rows else 0.0
    )

    print(f"📊 Payload Transactions loaded: {payload_total_rows:,} rows")
    print(f"📊 Unique transactions: {payload_unique_transactions:,}")
    print(f"📊 Valid JSON payloads: {payload_json_valid:,} ({payload_json_valid_pct:.1f}%)")
else:
    print("⚠️ No PayloadTransactions data found. Creating empty table for schema...")

    # Create empty schema for downstream processing
    # NOTE(review): this column list is hand-written rather than read from
    # the source table — confirm it matches what downstream notebooks expect.
    empty_payload_df = spark.createDataFrame([], StructType([
        StructField("canonical_tx_id", StringType(), True),
        StructField("storeId", IntegerType(), True),
        StructField("amount", DecimalType(18,2), True),
        StructField("payload_json", StringType(), True),
        StructField("ingestion_timestamp", TimestampType(), True),
        StructField("source_system", StringType(), True),
        StructField("source_table", StringType(), True)
    ]))

    empty_payload_df.write \
        .mode("overwrite") \
        .option("overwriteSchema", "true") \
        .saveAsTable("bronze.payload_transactions_raw")

    print("✅ Empty PayloadTransactions schema created")

In [None]:
# Ingest Reference Data (Stores, Brands, Categories)
print("📥 Ingesting reference data...")


def _ingest_reference_table(label, source_table, bronze_table):
    """Copy one reference table from Azure SQL into the Bronze layer.

    Best effort: a failure is reported with a warning and does not stop the
    notebook.

    Args:
        label: Human-readable table name used in the progress messages.
        source_table: Source table name passed to the JDBC reader.
        bronze_table: Target Bronze table name; it is overwritten.

    Returns:
        ``(dataframe, row_count)`` on success, ``(None, None)`` on failure.
    """
    try:
        reference_df = (
            spark.read
            .format("jdbc")
            .options(**get_azure_sql_options(source_table))
            .load()
            .withColumn("ingestion_timestamp", F.current_timestamp())
        )
        # Write first, then count from the Bronze table, so the source is
        # read over JDBC only once.
        reference_df.write \
            .mode("overwrite") \
            .option("overwriteSchema", "true") \
            .saveAsTable(bronze_table)
        row_count = spark.table(bronze_table).count()
    except Exception as e:
        # Deliberately broad: any failure is downgraded to a warning.
        print(f"⚠️ Could not load {label} table: {str(e)}")
        return None, None

    print(f"📊 {label} loaded: {row_count:,} rows")
    print(f"✅ {label} written to {bronze_table}")
    return reference_df, row_count


# Stores data
stores_df, stores_count = _ingest_reference_table(
    "Stores", "dbo.Stores", "bronze.stores_raw"
)

# Brands data
brands_df, brands_count = _ingest_reference_table(
    "Brands", "dbo.Brands", "bronze.brands_raw"
)

# Categories data
categories_df, categories_count = _ingest_reference_table(
    "Categories", "dbo.Categories", "bronze.categories_raw"
)

In [None]:
# Data Quality Summary and Validation
print("📊 Bronze Layer Data Quality Summary")
print("=" * 50)

# Check all Bronze tables
bronze_tables = [
    "bronze.sales_interactions_raw",
    "bronze.payload_transactions_raw",
    "bronze.stores_raw",
    "bronze.brands_raw",
    "bronze.categories_raw"
]

bronze_summary = []
# Tables that could not be read back; drives the final run status.
failed_tables = []

for table in bronze_tables:
    try:
        df = spark.table(table)
        row_count = df.count()
        col_count = len(df.columns)
    except Exception as e:
        # Record the failure and continue so the summary covers every table.
        failed_tables.append(table)
        bronze_summary.append({
            "table": table,
            "rows": 0,
            "columns": 0,
            "status": f"❌ Error: {str(e)}"
        })

        print(f"{table}: ❌ Error - {str(e)}")
    else:
        bronze_summary.append({
            "table": table,
            "rows": row_count,
            "columns": col_count,
            "status": "✅ Success"
        })

        print(f"{table}: {row_count:,} rows, {col_count} columns")

# Create summary DataFrame
summary_df = spark.createDataFrame(bronze_summary)
summary_df.show(truncate=False)

# Save ingestion metadata
# Take the timestamp once so ingestion_id and ingestion_timestamp agree.
ingestion_finished_at = datetime.now()
ingestion_metadata = {
    "ingestion_id": ingestion_finished_at.strftime("%Y%m%d_%H%M%S"),
    "ingestion_timestamp": ingestion_finished_at.isoformat(),
    "source_system": "azure_sql",
    "source_server": AZURE_SQL_SERVER,
    "source_database": AZURE_SQL_DATABASE,
    "tables_processed": bronze_summary,
    "total_rows_ingested": sum(item["rows"] for item in bronze_summary),
    # Only report a clean completion when every Bronze table was readable.
    "status": "completed" if not failed_tables else "completed_with_errors"
}

# Append one metadata row per run to the audit table.
# NOTE(review): the schema is inferred from the dict, and "tables_processed"
# holds dicts with mixed str/int values — confirm the stored column type is
# what downstream consumers expect.
metadata_df = spark.createDataFrame([ingestion_metadata])
metadata_df.write \
    .mode("append") \
    .saveAsTable("bronze.ingestion_metadata")

print("\n" + "=" * 50)
if failed_tables:
    print(
        f"⚠️ Bronze ingestion completed with errors in {len(failed_tables)} "
        f"table(s): {', '.join(failed_tables)}"
    )
else:
    print("🎉 Bronze ingestion completed successfully!")
print(f"📊 Total rows ingested: {ingestion_metadata['total_rows_ingested']:,}")
print(f"⏰ Completed at: {datetime.now()}")
print("\nNext step: Run 02_silver_transformation.ipynb")
print("=" * 50)