# Load Data to SQL Pool
This notebook loads transformed data from the Silver layer into the Dedicated SQL Pool.

## Process
1. Read from Silver layer
2. Apply final transformations
3. Stage dimension data in staging tables (SCD Type 2 is then applied by stored procedures in the SQL Pool)
4. Load into fact tables

In [None]:
# Import required libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Standard-library imports are kept after the wildcard imports so that nothing
# pulled in by `import *` can shadow them.
import os
from datetime import datetime

In [None]:
# Create (or reuse) the Spark session used by every later cell
session_builder = SparkSession.builder.appName("LoadToSQLPool")
spark = session_builder.getOrCreate()

load_started_at = datetime.now()
print(f"Load process started at: {load_started_at}")

In [None]:
# Configuration
# The "your_..." values are placeholders and must be replaced for a real workspace.
storage_account_name = "your_storage_account_name"
container_name = "synapsefs"
silver_data_path = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/silver/"

# SQL Pool connection details
sql_pool_name = "EnterpriseDW"
sql_server_name = "your_synapse_workspace.sql.azuresynapse.net"

# Credentials are taken from the environment when the variables are set, so a
# real secret never has to be typed into (or committed with) the notebook.
# The previous literals are kept as fallbacks, so the cell behaves exactly as
# before when the variables are absent.
# NOTE(review): in production, fetch the password from Azure Key Vault instead
# (in Synapse presumably via mssparkutils.credentials.getSecret - confirm).
sql_username = os.environ.get("SYNAPSE_SQL_USERNAME", "sqladmin")
sql_password = os.environ.get("SYNAPSE_SQL_PASSWORD", "your_password")

# JDBC connection string (TLS enforced, server certificate validated)
jdbc_url = f"jdbc:sqlserver://{sql_server_name}:1433;database={sql_pool_name};encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.sql.azuresynapse.net;loginTimeout=30;"

# Properties passed to every Spark JDBC read/write in this notebook
connection_properties = {
    "user": sql_username,
    "password": sql_password,
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

# Only non-secret settings are echoed to the cell output
print(f"Silver data path: {silver_data_path}")
print(f"SQL Pool: {sql_pool_name}")

## Read Silver Data

In [None]:
# Load the three Silver-layer Delta tables used by this notebook
def _read_silver_table(folder_name):
    """Read the Delta table stored in the given folder under the Silver path."""
    return spark.read.format("delta").load(f"{silver_data_path}{folder_name}/")


customer_silver = _read_silver_table("customers")
product_silver = _read_silver_table("products")
sales_silver = _read_silver_table("sales")

# Report row counts (each count() runs a Spark job)
print("Silver data loaded:")
for table_label, silver_frame in (
    ("Customers", customer_silver),
    ("Products", product_silver),
    ("Sales", sales_silver),
):
    print(f"{table_label}: {silver_frame.count()}")

## Prepare Dimension Data

In [None]:
# Shape the customer Silver data into the staging layout for SCD Type 2
customer_attribute_names = [
    "CustomerID",
    "CustomerName",
    "CustomerType",
    "Email",
    "Phone",
    "City",
    "State",
    "Country",
    "Region",
]

# Columns added on top of the source attributes: a null string placeholder for
# the segment, plus the SCD bookkeeping columns (open-ended current row).
customer_scd_columns = [
    lit(None).cast("string").alias("CustomerSegment"),
    current_date().alias("EffectiveDate"),
    lit(None).cast("date").alias("EndDate"),
    lit(True).alias("IsCurrent"),
]

dim_customer = customer_silver.select(
    *[col(attribute) for attribute in customer_attribute_names],
    *customer_scd_columns,
)

customer_record_count = dim_customer.count()
print(f"Prepared {customer_record_count} customer dimension records")

In [None]:
# Shape the product Silver data into the staging layout for SCD Type 2
product_attribute_names = [
    "ProductID",
    "ProductName",
    "ProductDescription",
    "Category",
    "SubCategory",
    "Brand",
    "SKU",
    "UnitPrice",
    "StandardCost",
    "ListPrice",
]

# SCD bookkeeping columns: effective today, no end date, flagged as current
product_scd_columns = [
    current_date().alias("EffectiveDate"),
    lit(None).cast("date").alias("EndDate"),
    lit(True).alias("IsCurrent"),
]

dim_product = product_silver.select(
    *[col(attribute) for attribute in product_attribute_names],
    *product_scd_columns,
)

product_record_count = dim_product.count()
print(f"Prepared {product_record_count} product dimension records")

## Prepare Fact Data

In [None]:
# Prepare sales fact data: pick the fact columns from the Silver sales table,
# derive an integer DateKey and rename the Silver measures to fact column names.
fact_sales = sales_silver.select(
    col("OrderID"),
    col("OrderLineNumber"),
    # DateKey is the order date rendered as yyyyMMdd and cast to int.
    # The pattern contains no separators, so no hyphen stripping is needed.
    date_format(col("OrderDate"), "yyyyMMdd").cast("int").alias("DateKey"),
    col("CustomerKey"),
    col("ProductKey"),
    col("StoreKey"),
    col("EmployeeKey"),
    col("PromotionKey"),
    col("Quantity"),
    col("UnitPrice"),
    col("UnitCost"),
    col("DiscountAmount"),
    col("TaxAmount"),
    col("net_revenue").alias("SalesAmount"),
    col("total_cost").alias("CostAmount"),
    col("gross_profit").alias("GrossProfitAmount"),
    # NOTE(review): NetProfitAmount is populated from gross_profit, i.e. it is
    # identical to GrossProfitAmount - confirm whether a separate net-profit
    # measure should be used here.
    col("gross_profit").alias("NetProfitAmount"),
    col("OrderDate"),
    col("ShipDate"),
    col("PaymentMethod"),
    col("ShippingMethod")
)

print(f"Prepared {fact_sales.count()} sales fact records")

## Load to SQL Pool Using Staging Tables

In [None]:
# Function to write dataframe to SQL Pool
def write_to_sql_pool(df, table_name, write_mode="append"):
    """
    Persist a Spark DataFrame into a SQL Pool table over JDBC.

    Uses the notebook-level ``jdbc_url`` and ``connection_properties``.

    Args:
        df: Spark DataFrame to write
        table_name: Target table name (schema.table)
        write_mode: Spark save mode, append or overwrite

    Returns:
        True when the write completed; False when any exception was raised
        (the error is printed, not re-raised).
    """
    try:
        # count() runs its own Spark job; it is used only for the progress message
        record_count = df.count()
        print(f"Writing {record_count} records to {table_name}...")

        # Write to SQL Pool using JDBC
        jdbc_writer = df.write.mode(write_mode)
        jdbc_writer.jdbc(
            url=jdbc_url,
            table=table_name,
            properties=connection_properties,
        )

        print(f"Successfully wrote to {table_name}")
        return True
    except Exception as exc:
        print(f"Error writing to {table_name}: {exc!s}")
        return False

In [None]:
# Alternative: Use COPY INTO for better performance
def load_using_copy_into(df, table_name, staging_path):
    """
    Stage a DataFrame as Parquet and build the matching COPY INTO statement.

    The DataFrame is written to ``staging_path`` (overwriting whatever is
    there), then a COPY INTO statement targeting ``table_name`` is built.
    This function does NOT execute the statement: it is returned so the
    caller can run it against the SQL pool.

    Args:
        df: Spark DataFrame
        table_name: Target table name
        staging_path: Staging path in ADLS

    Returns:
        str: The COPY INTO statement text.
    """
    # Write to staging location as Parquet
    df.write.mode("overwrite").parquet(staging_path)

    # Build the COPY INTO command.
    # table_name and staging_path are interpolated verbatim, so they must come
    # from trusted configuration, never from user input.
    copy_sql = f"""
    COPY INTO {table_name}
    FROM '{staging_path}'
    WITH (
        FILE_TYPE = 'PARQUET',
        CREDENTIAL = (IDENTITY = 'Managed Identity')
    )
    """

    # NOTE(review): the statement is T-SQL for the SQL pool; presumably it has
    # to be run over a SQL connection to the pool rather than through
    # spark.sql - confirm the intended execution path.
    print(f"Prepared COPY INTO statement for {table_name} (not executed by this function)")
    return copy_sql

## Load Dimension Tables

In [None]:
# Load customer dimension (staging table for SCD Type 2 processing)
# write_to_sql_pool signals failure by returning False instead of raising, so
# the result is checked before reporting success; a failed write stops the run.
if not write_to_sql_pool(dim_customer, "staging.CustomerStaging", "overwrite"):
    raise RuntimeError("Failed to stage customer dimension into staging.CustomerStaging")
print("Customer dimension staged")

In [None]:
# Load product dimension (staging table for SCD Type 2 processing)
# write_to_sql_pool signals failure by returning False instead of raising, so
# the result is checked before reporting success; a failed write stops the run.
if not write_to_sql_pool(dim_product, "staging.ProductStaging", "overwrite"):
    raise RuntimeError("Failed to stage product dimension into staging.ProductStaging")
print("Product dimension staged")

## Load Fact Tables

In [None]:
# Load sales fact data
# The write uses append mode, so re-running this cell adds the same rows again.
# write_to_sql_pool signals failure by returning False instead of raising, so
# the result is checked before reporting success; a failed write stops the run.
if not write_to_sql_pool(fact_sales, "fact.FactSales", "append"):
    raise RuntimeError("Failed to load sales fact data into fact.FactSales")
print("Sales fact data loaded")

## Execute SQL Procedures for SCD Type 2

In [None]:
# The SCD Type 2 procedures are not run from this notebook; this cell only
# prints a reminder of which ones to execute (typically via a pipeline activity)
scd_procedure_names = ("dim.UpdateCustomerSCD", "dim.UpdateProductSCD")

print("SCD Type 2 updates should be executed via SQL stored procedures:")
for procedure_name in scd_procedure_names:
    print(f"- {procedure_name}")
print("\nExecute these procedures in the Synapse SQL Pool")

## Verify Load

In [None]:
# Post-load sanity check: read fact.FactSales back over JDBC, report its row
# count and show a few rows. Any failure is printed rather than raised.
try:
    sales_verify = spark.read.jdbc(
        url=jdbc_url,
        table="fact.FactSales",
        properties=connection_properties,
    )

    verified_row_count = sales_verify.count()
    print(f"Verified {verified_row_count} records in fact.FactSales")
    print("\nSample records:")
    sales_verify.show(5)
except Exception as exc:
    print(f"Error verifying load: {exc!s}")

## Summary

In [None]:
# Print the end-of-run summary: completion time, row counts of the prepared
# DataFrames (each count() runs a Spark job) and the follow-up steps
summary_rule = "=" * 70

print(summary_rule)
print("Data Load Summary")
print(summary_rule)
print(f"Load completed at: {datetime.now()}")

print("\nRecords Loaded:")
for summary_label, summary_frame in (
    ("Customers (staged)", dim_customer),
    ("Products (staged)", dim_product),
    ("Sales facts", fact_sales),
):
    print(f"  {summary_label}: {summary_frame.count()}")

print("\nNext Steps:")
next_steps = (
    "Execute SCD Type 2 stored procedures",
    "Update statistics",
    "Refresh aggregated views",
    "Validate data quality",
)
for step_number, step_text in enumerate(next_steps, start=1):
    print(f"  {step_number}. {step_text}")
print(summary_rule)