In [0]:
# Bring the Delta Lake table API and the Spark SQL column helpers into scope
from delta.tables import DeltaTable
from pyspark.sql.functions import (
    col,
    monotonically_increasing_id,
    max as spark_max,  # aliased so Python's built-in max() is not shadowed
)

print("✓ Libraries imported successfully")

In [0]:
# Load-mode switch for this run:
#   '0' -> initial load (first time)
#   '1' -> incremental load (subsequent runs)
incremental_flag = '0'

# Translate the flag into a human-readable label before echoing it
if incremental_flag == '0':
    load_type_label = 'Initial Load'
else:
    load_type_label = 'Incremental Load'

print(f"Load Type: {load_type_label}")

In [0]:
# Load the sales rows (business keys, measures and the Year/Month/Day parts)
# straight from the parquet files in the silver container.
# The query text is held in a variable so it can be inspected on its own.
source_sales_query = """
    SELECT 
        Branch_ID,
        Dealer_ID,
        Model_ID,
        Revenue,
        Units_Sold,
        Year,
        Month,
        Day
    FROM parquet.`abfss://silver@swap01storageaccount.dfs.core.windows.net/carsalesdata`
"""
df_source = spark.sql(source_sales_query)

# count() runs a Spark job over the source; the figure is only used for the log line
source_record_count = df_source.count()
print(f"Source data loaded: {source_record_count} records")
df_source.display()

In [0]:
# Derive a single 'Date' column from the separate Year / Month / Day columns.
# The date dimension join further down matches on this column.
from pyspark.sql.functions import concat_ws, lpad, to_date

# Month and Day are left-padded with '0' to two characters; Year is used as-is
year_part = col('Year')
month_part = lpad(col('Month'), 2, '0')
day_part = lpad(col('Day'), 2, '0')

# Parts are joined with '-' and handed to to_date without an explicit format
date_text = concat_ws('-', year_part, month_part, day_part)
df_source = df_source.withColumn('Date', to_date(date_text))

print("✓ Date column created")
df_source.select('Year', 'Month', 'Day', 'Date').display()

In [0]:
# Look up the branch surrogate key (dim_branch_key) for every sale by matching
# the Branch_ID business key against the gold branch dimension.
df_branch = spark.table('bmw_cars_catalog.gold.branch')

# Only the join key and the surrogate key are needed from the dimension
branch_key_lookup = df_branch.select('Branch_ID', 'dim_branch_key')

# how='inner': sales whose Branch_ID has no row in the dimension are not carried forward
df_with_branch = df_source.join(branch_key_lookup, on='Branch_ID', how='inner')

branch_joined_count = df_with_branch.count()
print(f"✓ Joined with Branch dimension: {branch_joined_count} records")
df_with_branch.select('Branch_ID', 'dim_branch_key', 'Revenue', 'Units_Sold').display()

In [0]:
# Look up the dealer surrogate key (dim_dealer_key) for every sale by matching
# the Dealer_ID business key against the gold dealer dimension.
df_dealer = spark.table('bmw_cars_catalog.gold.dealer')

# Only the join key and the surrogate key are needed from the dimension
dealer_key_lookup = df_dealer.select('Dealer_ID', 'dim_dealer_key')

# how='inner': sales whose Dealer_ID has no row in the dimension are not carried forward
df_with_dealer = df_with_branch.join(dealer_key_lookup, on='Dealer_ID', how='inner')

dealer_joined_count = df_with_dealer.count()
print(f"✓ Joined with Dealer dimension: {dealer_joined_count} records")
df_with_dealer.select('Dealer_ID', 'dim_dealer_key', 'Branch_ID', 'dim_branch_key').display()

In [0]:
# Look up the model surrogate key (dim_model_key) for every sale by matching
# the Model_ID business key against the gold model dimension.
df_model = spark.table('bmw_cars_catalog.gold.model')

# Only the join key and the surrogate key are needed from the dimension
model_key_lookup = df_model.select('Model_ID', 'dim_model_key')

# how='inner': sales whose Model_ID has no row in the dimension are not carried forward
df_with_model = df_with_dealer.join(model_key_lookup, on='Model_ID', how='inner')

model_joined_count = df_with_model.count()
print(f"✓ Joined with Model dimension: {model_joined_count} records")
df_with_model.select('Model_ID', 'dim_model_key', 'Revenue', 'Units_Sold').display()

In [0]:
# Look up the date surrogate key (dim_date_key) for every sale by matching the
# derived 'Date' column against the gold date dimension.
df_date = spark.table('bmw_cars_catalog.gold.dim_date')

# Only the join key and the surrogate key are needed from the dimension
date_key_lookup = df_date.select('Date', 'dim_date_key')

# how='inner': sales whose Date has no row in the dimension are not carried forward
# NOTE(review): assumes dim_date.Date is comparable with the to_date() result — confirm its type
df_with_date = df_with_model.join(date_key_lookup, on='Date', how='inner')

date_joined_count = df_with_date.count()
print(f"✓ Joined with Date dimension: {date_joined_count} records")
df_with_date.select('Date', 'dim_date_key', 'Revenue', 'Units_Sold').display()

In [0]:
# Reduce the joined frame to the fact-table shape:
#   - one surrogate key per dimension (branch, dealer, model, date)
#   - the two measures (Revenue, Units_Sold)
# Business keys and the Year/Month/Day/Date columns are left behind here.
fact_column_names = [
    'dim_branch_key',
    'dim_dealer_key',
    'dim_model_key',
    'dim_date_key',
    'Revenue',
    'Units_Sold',
]
df_fact = df_with_date.select(*fact_column_names)

fact_record_count = df_fact.count()
print(f"✓ Fact table structure created: {fact_record_count} records")
print("\nFact Table Schema:")
df_fact.printSchema()
df_fact.display()

In [0]:
# Add a surrogate key (fact_sales_key) that uniquely identifies each fact row.
#
# Numbering is driven by whether the target table already exists, because the
# load step appends to the table whenever it exists. Relying on incremental_flag
# alone would restart keys at 1 on top of existing rows when the flag is '0',
# and would fail on the max() query when the flag is '1' but no table exists.
#
# monotonically_increasing_id() yields unique, increasing (not consecutive)
# ids that begin at 0, so the offset must be "highest existing key + 1";
# using the highest existing key itself would re-issue that key to the first
# new row.

fact_sales_table = 'bmw_cars_catalog.gold.fact_sales'

if spark.catalog.tableExists(fact_sales_table):
    if incremental_flag == '0':
        print("⚠ incremental_flag is '0' but the fact table already exists - continuing after its highest key")
    # Highest key already stored; max() returns NULL (None) for an empty table
    max_fact_key = spark.sql('SELECT max(fact_sales_key) FROM bmw_cars_catalog.gold.fact_sales').collect()[0][0]
    if max_fact_key is None:
        max_fact_key = 0
else:
    if incremental_flag != '0':
        print("⚠ incremental_flag requests an incremental load but the fact table does not exist - starting from 1")
    # Nothing stored yet
    max_fact_key = 0

# First key handed out by this run
next_fact_key = max_fact_key + 1

df_fact = df_fact.withColumn('fact_sales_key', next_fact_key + monotonically_increasing_id())

print(f"✓ Fact surrogate key added (starting from {next_fact_key})")
df_fact.select('fact_sales_key', 'dim_branch_key', 'dim_dealer_key', 'dim_model_key', 'dim_date_key', 'Revenue', 'Units_Sold').display()

In [0]:
# Persist df_fact as the Delta table bmw_cars_catalog.gold.fact_sales.
#   - table already registered -> rows are appended
#   - table not registered     -> table is created (overwrite mode)
# NOTE(review): nothing in this cell de-duplicates against rows already in the
# table; every row of df_fact is written — confirm the source only holds new rows.

fact_table_name = 'bmw_cars_catalog.gold.fact_sales'
fact_table_path = 'abfss://silver@swap01storageaccount.dfs.core.windows.net/fact_sales'

# Choose the write mode and the matching status message up front
if spark.catalog.tableExists(fact_table_name):
    write_mode = 'append'
    status_message = "✓ Incremental load completed - New records appended"
else:
    write_mode = 'overwrite'
    status_message = "✓ Initial load completed - Fact table created"

(
    df_fact.write.format("delta")
    .mode(write_mode)
    .option('path', fact_table_path)
    .saveAsTable(fact_table_name)
)
print(status_message)

total_fact_records = spark.table(fact_table_name).count()
print(f"\nTotal records in fact table: {total_fact_records}")

In [0]:
# Sanity-check the loaded fact table: resolve every surrogate key back to its
# dimension row and show the first 100 facts (ordered by fact_sales_key) with
# readable branch / dealer / model / date attributes.
# All joins are INNER, so a fact row with a key missing from any dimension
# would not appear in this sample.

verification_query = """
    SELECT 
        f.fact_sales_key,
        b.BranchName,
        d.DealerName,
        m.Model_Category,
        dt.Date,
        dt.DayName,
        dt.MonthName,
        dt.Year,
        f.Revenue,
        f.Units_Sold
    FROM bmw_cars_catalog.gold.fact_sales f
    INNER JOIN bmw_cars_catalog.gold.branch b ON f.dim_branch_key = b.dim_branch_key
    INNER JOIN bmw_cars_catalog.gold.dealer d ON f.dim_dealer_key = d.dim_dealer_key
    INNER JOIN bmw_cars_catalog.gold.model m ON f.dim_model_key = m.dim_model_key
    INNER JOIN bmw_cars_catalog.gold.dim_date dt ON f.dim_date_key = dt.dim_date_key
    ORDER BY f.fact_sales_key
    LIMIT 100
"""
fact_verification = spark.sql(verification_query)

print("✓ Fact table verification - Sample records with dimension details:")
fact_verification.display()