# PySpark Star Schema Transformation

**Purpose:** Transform bronze sales data into a star schema for Power BI

**Source:** `pyspark_bronze_sales` (Lakehouse - 3.6M rows)

**Target:** `sales_warehouse.pyspark_transformed.*`

**Tables Created:**
- 1 Fact Table: `fact_orders`
- 10 Dimension Tables: `dim_date`, `dim_time`, `dim_customer`, `dim_driver`, `dim_restaurant`, `dim_location`, `dim_payment_method`, `dim_order_status`, `dim_device`, `dim_promo`

In [1]:
import time
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Configuration
SOURCE_TABLE = "pyspark_bronze_sales"
TARGET_WAREHOUSE = "sales_warehouse"
TARGET_SCHEMA = "pyspark_transformed"

print("="*60)
print("PYSPARK STAR SCHEMA TRANSFORMATION")
print("="*60)
print(f"Source: {SOURCE_TABLE}")
print(f"Target: {TARGET_WAREHOUSE}.{TARGET_SCHEMA}")
print("="*60)

StatementMeta(, cf7c9881-a928-444b-be7f-c3d4d889855c, 3, Finished, Available, Finished)

PYSPARK STAR SCHEMA TRANSFORMATION
Source: pyspark_bronze_sales
Target: sales_warehouse.pyspark_transformed


In [2]:
# target schema in Warehouse
print(f"Schema '{TARGET_SCHEMA}' ready in {TARGET_WAREHOUSE}")

StatementMeta(, cf7c9881-a928-444b-be7f-c3d4d889855c, 4, Finished, Available, Finished)

Schema 'pyspark_transformed' ready in sales_warehouse


In [3]:
# Start timing
start_time = time.time()

# Read source data
print("Reading source data...")
df_bronze = spark.table(SOURCE_TABLE)
source_count = df_bronze.count()
print(f"Source rows: {source_count:,}")

# Cache for performance (we'll use it multiple times)
df_bronze.cache()
print("Data cached for transformation")

StatementMeta(, cf7c9881-a928-444b-be7f-c3d4d889855c, 5, Finished, Available, Finished)

Reading source data...
Source rows: 3,600,000
Data cached for transformation


In [4]:
# ============================================
# DIMENSION: dim_date
# ============================================
print("\nCreating dim_date...")

dim_date = df_bronze.select(
    F.col("order_date").cast("date").alias("date"),
    F.col("order_year").alias("year"),
    F.col("order_month").alias("month"),
    F.col("order_day").alias("day"),
    F.col("order_day_of_week").alias("day_of_week"),
    F.col("is_weekend")
).distinct()

# Add surrogate key
dim_date = dim_date.withColumn(
    "date_key",
    F.expr("CAST(date_format(date, 'yyyyMMdd') AS INT)")
)

# Add derived columns
dim_date = dim_date.withColumn("quarter", F.quarter("date"))
dim_date = dim_date.withColumn("month_name", F.date_format("date", "MMMM"))
dim_date = dim_date.withColumn("day_name", F.date_format("date", "EEEE"))

# Reorder columns
dim_date = dim_date.select(
    "date_key", "date", "year", "quarter", "month", "month_name",
    "day", "day_of_week", "day_name", "is_weekend"
)

print(f"dim_date rows: {dim_date.count():,}")

StatementMeta(, cf7c9881-a928-444b-be7f-c3d4d889855c, 6, Finished, Available, Finished)


Creating dim_date...
dim_date rows: 1,096


In [5]:
# ============================================
# DIMENSION: dim_time
# ============================================
print("\nCreating dim_time...")

dim_time = df_bronze.select(
    F.col("order_hour").alias("hour")
).distinct()

# Add surrogate key and derived columns
dim_time = dim_time.withColumn("time_key", F.col("hour"))
dim_time = dim_time.withColumn(
    "period",
    F.when(F.col("hour") < 6, "Night")
     .when(F.col("hour") < 12, "Morning")
     .when(F.col("hour") < 17, "Afternoon")
     .when(F.col("hour") < 21, "Evening")
     .otherwise("Night")
)
dim_time = dim_time.withColumn(
    "is_business_hours",
    F.when((F.col("hour") >= 9) & (F.col("hour") < 17), True).otherwise(False)
)

dim_time = dim_time.select("time_key", "hour", "period", "is_business_hours")

print(f"dim_time rows: {dim_time.count():,}")

StatementMeta(, cf7c9881-a928-444b-be7f-c3d4d889855c, 7, Finished, Available, Finished)


Creating dim_time...
dim_time rows: 24


In [6]:
# ============================================
# DIMENSION: dim_customer
# ============================================
print("\nCreating dim_customer...")

dim_customer = df_bronze.select(
    F.col("customer_id")
).distinct()

# Add surrogate key
window_spec = Window.orderBy("customer_id")
dim_customer = dim_customer.withColumn("customer_key", F.row_number().over(window_spec))

dim_customer = dim_customer.select("customer_key", "customer_id")

print(f"dim_customer rows: {dim_customer.count():,}")

StatementMeta(, cf7c9881-a928-444b-be7f-c3d4d889855c, 8, Finished, Available, Finished)


Creating dim_customer...
dim_customer rows: 499,643


In [7]:
# ============================================
# DIMENSION: dim_driver
# ============================================
print("\nCreating dim_driver...")

dim_driver = df_bronze.select(
    F.col("driver_id")
).distinct()

# Add surrogate key
window_spec = Window.orderBy("driver_id")
dim_driver = dim_driver.withColumn("driver_key", F.row_number().over(window_spec))

dim_driver = dim_driver.select("driver_key", "driver_id")

print(f"dim_driver rows: {dim_driver.count():,}")

StatementMeta(, cf7c9881-a928-444b-be7f-c3d4d889855c, 9, Finished, Available, Finished)


Creating dim_driver...
dim_driver rows: 25,000


In [8]:
# ============================================
# DIMENSION: dim_restaurant
# ============================================
print("\nCreating dim_restaurant...")

dim_restaurant = df_bronze.select(
    F.col("restaurant_id"),
    F.col("restaurant_type")
).distinct()

# Add surrogate key
window_spec = Window.orderBy("restaurant_id")
dim_restaurant = dim_restaurant.withColumn("restaurant_key", F.row_number().over(window_spec))

dim_restaurant = dim_restaurant.select("restaurant_key", "restaurant_id", "restaurant_type")

print(f"dim_restaurant rows: {dim_restaurant.count():,}")

StatementMeta(, cf7c9881-a928-444b-be7f-c3d4d889855c, 10, Finished, Available, Finished)


Creating dim_restaurant...
dim_restaurant rows: 150,000


In [9]:
# ============================================
# DIMENSION: dim_location
# ============================================
print("\nCreating dim_location...")

dim_location = df_bronze.select(
    F.col("city"),
    F.col("state"),
    F.col("country")
).distinct()

# Add surrogate key
window_spec = Window.orderBy("country", "state", "city")
dim_location = dim_location.withColumn("location_key", F.row_number().over(window_spec))

dim_location = dim_location.select("location_key", "city", "state", "country")

print(f"dim_location rows: {dim_location.count():,}")

StatementMeta(, cf7c9881-a928-444b-be7f-c3d4d889855c, 11, Finished, Available, Finished)


Creating dim_location...
dim_location rows: 10


In [10]:
# ============================================
# DIMENSION: dim_payment_method
# ============================================
print("\nCreating dim_payment_method...")

dim_payment_method = df_bronze.select(
    F.col("payment_method")
).distinct()

# Add surrogate key
window_spec = Window.orderBy("payment_method")
dim_payment_method = dim_payment_method.withColumn("payment_method_key", F.row_number().over(window_spec))

dim_payment_method = dim_payment_method.select("payment_method_key", "payment_method")

print(f"dim_payment_method rows: {dim_payment_method.count():,}")

StatementMeta(, cf7c9881-a928-444b-be7f-c3d4d889855c, 12, Finished, Available, Finished)


Creating dim_payment_method...
dim_payment_method rows: 5


In [11]:
# ============================================
# DIMENSION: dim_order_status
# ============================================
print("\nCreating dim_order_status...")

dim_order_status = df_bronze.select(
    F.col("order_status")
).distinct()

# Add surrogate key
window_spec = Window.orderBy("order_status")
dim_order_status = dim_order_status.withColumn("order_status_key", F.row_number().over(window_spec))

dim_order_status = dim_order_status.select("order_status_key", "order_status")

print(f"dim_order_status rows: {dim_order_status.count():,}")

StatementMeta(, cf7c9881-a928-444b-be7f-c3d4d889855c, 13, Finished, Available, Finished)


Creating dim_order_status...
dim_order_status rows: 3


In [12]:
# ============================================
# DIMENSION: dim_device
# ============================================
print("\nCreating dim_device...")

dim_device = df_bronze.select(
    F.col("device_type")
).distinct()

# Add surrogate key
window_spec = Window.orderBy("device_type")
dim_device = dim_device.withColumn("device_key", F.row_number().over(window_spec))

dim_device = dim_device.select("device_key", "device_type")

print(f"dim_device rows: {dim_device.count():,}")

StatementMeta(, cf7c9881-a928-444b-be7f-c3d4d889855c, 14, Finished, Available, Finished)


Creating dim_device...
dim_device rows: 3


In [13]:
# ============================================
# DIMENSION: dim_promo
# ============================================
print("\nCreating dim_promo...")

dim_promo = df_bronze.select(
    F.col("promo_code")
).distinct()

# Handle null promo codes
dim_promo = dim_promo.withColumn(
    "promo_code",
    F.coalesce(F.col("promo_code"), F.lit("NO_PROMO"))
)

# Add surrogate key
window_spec = Window.orderBy("promo_code")
dim_promo = dim_promo.withColumn("promo_key", F.row_number().over(window_spec))

# Add promo flag
dim_promo = dim_promo.withColumn(
    "has_promo",
    F.when(F.col("promo_code") != "NO_PROMO", True).otherwise(False)
)

dim_promo = dim_promo.select("promo_key", "promo_code", "has_promo")

print(f"dim_promo rows: {dim_promo.count():,}")

StatementMeta(, cf7c9881-a928-444b-be7f-c3d4d889855c, 15, Finished, Available, Finished)


Creating dim_promo...
dim_promo rows: 5


In [14]:
# ============================================
# FIX: Deduplicate dimension tables properly
# ============================================
print("Fixing dimension tables with proper deduplication...")

# dim_restaurant - keep first restaurant_type per restaurant_id
dim_restaurant = df_bronze.select(
    F.col("restaurant_id"),
    F.col("restaurant_type")
).dropDuplicates(["restaurant_id"])

window_spec = Window.orderBy("restaurant_id")
dim_restaurant = dim_restaurant.withColumn("restaurant_key", F.row_number().over(window_spec))
dim_restaurant = dim_restaurant.select("restaurant_key", "restaurant_id", "restaurant_type")
print(f"dim_restaurant rows: {dim_restaurant.count():,}")

# dim_location - ensure unique city/state/country
dim_location = df_bronze.select(
    F.col("city"),
    F.col("state"),
    F.col("country")
).dropDuplicates(["city", "state", "country"])

window_spec = Window.orderBy("country", "state", "city")
dim_location = dim_location.withColumn("location_key", F.row_number().over(window_spec))
dim_location = dim_location.select("location_key", "city", "state", "country")
print(f"dim_location rows: {dim_location.count():,}")

# dim_promo - handle nulls BEFORE distinct
dim_promo = df_bronze.select(
    F.coalesce(F.col("promo_code"), F.lit("NO_PROMO")).alias("promo_code")
).distinct()

window_spec = Window.orderBy("promo_code")
dim_promo = dim_promo.withColumn("promo_key", F.row_number().over(window_spec))
dim_promo = dim_promo.withColumn(
    "has_promo",
    F.when(F.col("promo_code") != "NO_PROMO", True).otherwise(False)
)
dim_promo = dim_promo.select("promo_key", "promo_code", "has_promo")
print(f"dim_promo rows: {dim_promo.count():,}")

print("\nDimension tables fixed. Now re-run the fact table cell.")


StatementMeta(, cf7c9881-a928-444b-be7f-c3d4d889855c, 16, Finished, Available, Finished)

Fixing dimension tables with proper deduplication...
dim_restaurant rows: 10,000
dim_location rows: 10
dim_promo rows: 5

Dimension tables fixed. Now re-run the fact table cell.


In [15]:
# ============================================
# FACT TABLE: fact_orders
# ============================================
print("\nCreating fact_orders...")

# Start with bronze data
fact_orders = df_bronze

# Add date_key
fact_orders = fact_orders.withColumn(
    "date_key",
    F.expr("CAST(date_format(order_date, 'yyyyMMdd') AS INT)")
)

# Add time_key
fact_orders = fact_orders.withColumn("time_key", F.col("order_hour"))

# Join with dim_customer (broadcast small dims for performance)
fact_orders = fact_orders.join(
    F.broadcast(dim_customer),
    fact_orders["customer_id"] == dim_customer["customer_id"],
    "left"
).drop(dim_customer["customer_id"])

# Join with dim_driver
fact_orders = fact_orders.join(
    F.broadcast(dim_driver),
    fact_orders["driver_id"] == dim_driver["driver_id"],
    "left"
).drop(dim_driver["driver_id"])

# Join with dim_restaurant
fact_orders = fact_orders.join(
    F.broadcast(dim_restaurant),
    fact_orders["restaurant_id"] == dim_restaurant["restaurant_id"],
    "left"
).drop(dim_restaurant["restaurant_id"]).drop(dim_restaurant["restaurant_type"])

# Join with dim_location
fact_orders = fact_orders.join(
    F.broadcast(dim_location),
    (fact_orders["city"] == dim_location["city"]) & 
    (fact_orders["state"] == dim_location["state"]) & 
    (fact_orders["country"] == dim_location["country"]),
    "left"
).drop(dim_location["city"]).drop(dim_location["state"]).drop(dim_location["country"])

# Join with dim_payment_method
fact_orders = fact_orders.join(
    F.broadcast(dim_payment_method),
    fact_orders["payment_method"] == dim_payment_method["payment_method"],
    "left"
).drop(dim_payment_method["payment_method"])

# Join with dim_order_status
fact_orders = fact_orders.join(
    F.broadcast(dim_order_status),
    fact_orders["order_status"] == dim_order_status["order_status"],
    "left"
).drop(dim_order_status["order_status"])

# Join with dim_device
fact_orders = fact_orders.join(
    F.broadcast(dim_device),
    fact_orders["device_type"] == dim_device["device_type"],
    "left"
).drop(dim_device["device_type"])

# Join with dim_promo
fact_orders = fact_orders.withColumn(
    "promo_code_clean",
    F.coalesce(F.col("promo_code"), F.lit("NO_PROMO"))
)
fact_orders = fact_orders.join(
    F.broadcast(dim_promo),
    fact_orders["promo_code_clean"] == dim_promo["promo_code"],
    "left"
).drop(dim_promo["promo_code"]).drop("promo_code_clean")

print("Dimension keys joined")


StatementMeta(, cf7c9881-a928-444b-be7f-c3d4d889855c, 17, Finished, Available, Finished)


Creating fact_orders...
Dimension keys joined


In [16]:
# Select final fact table columns
fact_orders_final = fact_orders.select(
    # Primary Key
    F.col("order_id"),
    
    # Foreign Keys (Dimension Keys)
    F.col("date_key"),
    F.col("time_key"),
    F.col("customer_key"),
    F.col("driver_key"),
    F.col("restaurant_key"),
    F.col("location_key"),
    F.col("payment_method_key"),
    F.col("order_status_key"),
    F.col("device_key"),
    F.col("promo_key"),
    
    # Measures - Amounts (Additive)
    F.col("subtotal").cast("decimal(10,2)"),
    F.col("delivery_fee").cast("decimal(10,2)"),
    F.col("service_fee").cast("decimal(10,2)"),
    F.col("tax_amount").cast("decimal(10,2)"),
    F.col("tip_amount").cast("decimal(10,2)"),
    F.col("discount_amount").cast("decimal(10,2)"),
    F.col("total_amount").cast("decimal(10,2)"),
    
    # Measures - Counts & Times (Additive)
    F.col("item_count").cast("int"),
    F.col("prep_time_minutes").cast("int"),
    F.col("delivery_time_minutes").cast("int"),
    F.col("total_time_minutes").cast("int"),
    F.col("delivery_distance_miles").cast("decimal(10,2)"),
    
    # Measures - Ratings (Semi-Additive)
    F.col("customer_rating").cast("decimal(3,1)"),
    F.col("driver_rating").cast("decimal(3,1)"),
    
    # Flags
    F.col("is_first_order").cast("boolean"),
    F.col("is_reorder").cast("boolean")
)

fact_count = fact_orders_final.count()
print(f"fact_orders rows: {fact_count:,}")

StatementMeta(, cf7c9881-a928-444b-be7f-c3d4d889855c, 18, Finished, Available, Finished)

fact_orders rows: 3,600,000


In [17]:
# ============================================
# WRITE TO WAREHOUSE
# ============================================
print("\n" + "="*60)
print("WRITING TO WAREHOUSE")
print("="*60)

write_start = time.time()

# Helper function to write to warehouse
def write_to_warehouse(df, table_name):
    full_table_name = f"{TARGET_WAREHOUSE}.{TARGET_SCHEMA}.{table_name}"
    print(f"Writing {table_name}...", end=" ")
    
    # Create temp view and use SQL to write to warehouse
    df.createOrReplaceTempView(f"temp_{table_name}")
    
    # Drop if exists and create new table
    spark.sql(f"DROP TABLE IF EXISTS {full_table_name}")
    spark.sql(f"CREATE TABLE {full_table_name} AS SELECT * FROM temp_{table_name}")
    
    row_count = spark.table(full_table_name).count()
    print(f"{row_count:,} rows")
    return row_count

StatementMeta(, cf7c9881-a928-444b-be7f-c3d4d889855c, 19, Finished, Available, Finished)


WRITING TO WAREHOUSE


In [18]:
# ============================================
# STEP 1: IMPORT REQUIRED LIBRARIES
# ============================================
# These imports are REQUIRED before using synapsesql
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# The key import for warehouse connector
# (This should already be available in Runtime 1.3+)

print("Imports complete - Runtime 1.3+ required")


StatementMeta(, cf7c9881-a928-444b-be7f-c3d4d889855c, 20, Finished, Available, Finished)

Imports complete - Runtime 1.3+ required


In [19]:
import com.microsoft.spark.fabric
from com.microsoft.spark.fabric.Constants import Constants

StatementMeta(, cf7c9881-a928-444b-be7f-c3d4d889855c, 21, Finished, Available, Finished)

In [20]:
# ============================================
# CELL 1: RESTART SESSION & ENABLE CONNECTOR
# ============================================
# Run this FIRST after starting a fresh session

# Check if synapsesql is available
try:
    # Test with empty dataframe
    test_df = spark.createDataFrame([(1,)], ["id"])
    # Check if method exists
    hasattr(test_df.write, 'synapsesql')
    print("synapsesql method available:", hasattr(test_df.write, 'synapsesql'))
except Exception as e:
    print(f"Error: {e}")

# Alternative: Try format-based approach
print("\nTrying format-based write...")


StatementMeta(, cf7c9881-a928-444b-be7f-c3d4d889855c, 22, Finished, Available, Finished)

synapsesql method available: True

Trying format-based write...


In [21]:
# ============================================
# STEP 2: WRITE TO WAREHOUSE
# ============================================
print("\n" + "="*60)
print("WRITING TO WAREHOUSE")
print("="*60)

write_start = time.time()

# Schema must already exist (you created pyspark_transformed manually)
TARGET_WAREHOUSE = "sales_warehouse"
TARGET_SCHEMA = "pyspark_transformed"

def write_to_warehouse(df, table_name):
    full_table_name = f"{TARGET_WAREHOUSE}.{TARGET_SCHEMA}.{table_name}"
    print(f"Writing {table_name}...", end=" ")
    
    # Correct syntax from video: df.write.mode().synapsesql()
    df.write.mode("overwrite").synapsesql(full_table_name)
    
    print("done")

# Write all dimension tables
print("\nWriting dimension tables...")
write_to_warehouse(dim_date, "dim_date")
write_to_warehouse(dim_time, "dim_time")
write_to_warehouse(dim_customer, "dim_customer")
write_to_warehouse(dim_driver, "dim_driver")
write_to_warehouse(dim_restaurant, "dim_restaurant")
write_to_warehouse(dim_location, "dim_location")
write_to_warehouse(dim_payment_method, "dim_payment_method")
write_to_warehouse(dim_order_status, "dim_order_status")
write_to_warehouse(dim_device, "dim_device")
write_to_warehouse(dim_promo, "dim_promo")

# Write fact table
print("\nWriting fact table...")
write_to_warehouse(fact_orders_final, "fact_orders")

write_time = time.time() - write_start
total_time = time.time() - start_time

print(f"\nWrite completed in {write_time:.2f} seconds")
print(f"TOTAL TIME: {total_time:.2f} seconds")


StatementMeta(, cf7c9881-a928-444b-be7f-c3d4d889855c, 23, Finished, Available, Finished)


WRITING TO WAREHOUSE

Writing dimension tables...
Writing dim_date... done
Writing dim_time... done
Writing dim_customer... done
Writing dim_driver... done
Writing dim_restaurant... done
Writing dim_location... done
Writing dim_payment_method... done
Writing dim_order_status... done
Writing dim_device... done
Writing dim_promo... done

Writing fact table...
Writing fact_orders... done

Write completed in 70.98 seconds
TOTAL TIME: 138.57 seconds


In [22]:
# Uncache data
df_bronze.unpersist()
print("\nCache cleared. Star schema transformation complete!")

StatementMeta(, cf7c9881-a928-444b-be7f-c3d4d889855c, 24, Finished, Available, Finished)


Cache cleared. Star schema transformation complete!


In [23]:
# ============================================
# VERIFY TABLES IN WAREHOUSE (using synapsesql read)
# ============================================
print("\nVerifying tables in Warehouse...")

tables = ["dim_date", "dim_time", "dim_customer", "dim_driver", "dim_restaurant",
          "dim_location", "dim_payment_method", "dim_order_status", "dim_device", 
          "dim_promo", "fact_orders"]

for table in tables:
    full_name = f"{TARGET_WAREHOUSE}.{TARGET_SCHEMA}.{table}"
    try:
        count = spark.read.synapsesql(full_name).count()
        print(f"  {table}: {count:,} rows")
    except Exception as e:
        print(f"  {table}: ERROR - {e}")

print("\nVerification complete!")


StatementMeta(, cf7c9881-a928-444b-be7f-c3d4d889855c, 25, Finished, Available, Finished)


Verifying tables in Warehouse...
  dim_date: 1,096 rows
  dim_time: 24 rows
  dim_customer: 499,643 rows
  dim_driver: 25,000 rows
  dim_restaurant: 10,000 rows
  dim_location: 10 rows
  dim_payment_method: 5 rows
  dim_order_status: 3 rows
  dim_device: 3 rows
  dim_promo: 5 rows
  fact_orders: 3,600,000 rows

Verification complete!
