# Silver Layer Transformations

## Configuration and Imports

In [6]:
# Import PySpark libraries
from pyspark.sql.types import *
from pyspark.sql.functions import *
from delta.tables import *

# Session-level Spark settings for the Fabric runtime:
#   - render/parse timestamps in UTC regardless of the cluster's local zone
#   - read legacy parquet datetime values as-is (no calendar rebasing)
for conf_key, conf_value in {
    "spark.sql.session.timeZone": "UTC",
    "spark.sql.parquet.datetimeRebaseModeInRead": "CORRECTED",
}.items():
    spark.conf.set(conf_key, conf_value)


StatementMeta(, eaa4542b-cb99-455d-b659-cf1ee5a6ab87, 8, Finished, Available, Finished)

## Customers - SCD2 Dimension

In [None]:
# Define schema for customer data
customersSchema = StructType([
    StructField("customer_id", StringType()),
    StructField("customer_name", StringType()),
    StructField("email", StringType()),
    StructField("location", StringType()),
    StructField("signup_date", DateType())
])

# Load customer data from Bronze layer
df_customers = spark.read.format("csv").option("header", "true").schema(customersSchema).load("Files/bronze_customers/customers.csv")

# Data quality transformations.
# Rows without a business key are dropped first: a NULL/empty customer_id can never
# match an existing row in the merge below, so it would be re-inserted as a new
# "active" customer on every run.
df_customers = df_customers \
    .filter(col("customer_id").isNotNull() & (col("customer_id") != "")) \
    .withColumn("customer_name", when(col("customer_name").isNull() | (col("customer_name") == ""), lit("Unknown")).otherwise(col("customer_name"))) \
    .withColumn("email", lower(trim(col("email")))) \
    .withColumn("location", when(col("location").isNull() | (col("location") == ""), lit("Unknown")).otherwise(col("location")))

# Add SCD2 tracking columns and row hash for change detection.
# NOTE: `hash` here is pyspark.sql.functions.hash (the star import shadows the builtin),
# a 32-bit Murmur3 value stored in a LongType column. Collisions could mask a real
# change; switching to xxhash64 would re-version every existing row once, so the
# hash function is intentionally left unchanged.
current_ts = current_timestamp()
high_date = lit("9999-12-31 23:59:59").cast("timestamp")

df_customers = df_customers \
    .withColumn("source_file_name", input_file_name()) \
    .withColumn("valid_from", current_ts) \
    .withColumn("valid_to", high_date) \
    .withColumn("is_active", lit(True)) \
    .withColumn("created_at", current_ts) \
    .withColumn("updated_at", current_ts) \
    .withColumn("row_hash", hash(col("customer_name"), col("email"), col("location")))

# Create customers_silver table if not exists
DeltaTable.createIfNotExists(spark) \
    .tableName("customers_silver") \
    .addColumn("customer_id", StringType()) \
    .addColumn("customer_name", StringType()) \
    .addColumn("email", StringType()) \
    .addColumn("location", StringType()) \
    .addColumn("signup_date", DateType()) \
    .addColumn("source_file_name", StringType()) \
    .addColumn("valid_from", TimestampType()) \
    .addColumn("valid_to", TimestampType()) \
    .addColumn("is_active", BooleanType()) \
    .addColumn("created_at", TimestampType()) \
    .addColumn("updated_at", TimestampType()) \
    .addColumn("row_hash", LongType()) \
    .execute()

# SCD2 merge using the "staged updates" pattern, as ONE atomic MERGE:
#   * every incoming row is staged with merge_key = customer_id. It matches the
#     customer's active version (closed below if row_hash changed) or, for a
#     brand-new customer, falls through to the insert clause;
#   * incoming rows whose row_hash differs from the active version are staged a
#     second time with merge_key = NULL. A NULL key never matches, so these rows
#     are inserted as the new active version.
# This replaces a follow-up INSERT that looked for rows closed in the last
# 10 seconds: a slow merge silently lost new versions, and a rerun inside that
# window inserted duplicate active rows.
deltaTable = DeltaTable.forName(spark, "customers_silver")

df_changed_customers = df_customers.alias("source") \
    .join(
        deltaTable.toDF().filter(col("is_active")).alias("target"),
        col("source.customer_id") == col("target.customer_id")
    ) \
    .filter(col("source.row_hash") != col("target.row_hash")) \
    .select("source.*")

df_staged_customers = df_changed_customers \
    .withColumn("merge_key", lit(None).cast(StringType())) \
    .unionByName(df_customers.withColumn("merge_key", col("customer_id")))

deltaTable.alias("target") \
    .merge(
        df_staged_customers.alias("source"),
        "target.customer_id = source.merge_key AND target.is_active = true"
    ) \
    .whenMatchedUpdate(
        # Close the current version; valid_to reuses the new version's valid_from
        # so consecutive validity intervals line up exactly.
        condition="target.row_hash != source.row_hash",
        set={
            "is_active": "false",
            "valid_to": "source.valid_from",
            "updated_at": "source.updated_at"
        }
    ) \
    .whenNotMatchedInsert(
        # Explicit column list so the helper merge_key column is not written.
        values={c: f"source.{c}" for c in df_customers.columns}
    ) \
    .execute()

StatementMeta(, eaa4542b-cb99-455d-b659-cf1ee5a6ab87, 9, Finished, Available, Finished)

DataFrame[]

## Products - SCD2 Dimension

In [None]:
# Define schema for product data
productsSchema = StructType([
    StructField("product_key", IntegerType()),
    StructField("product_id", StringType()),
    StructField("product_name", StringType()),
    StructField("category", StringType()),
    StructField("stock", IntegerType()),
    StructField("unit_price", DecimalType(10, 2)),
    StructField("sales_price", DecimalType(10, 2))
])

# Load product data from Bronze layer
df_products = spark.read.format("csv").option("header", "true").schema(productsSchema).load("Files/bronze_products/products.csv")

# Data quality transformations.
# Rows without a business key are dropped: a NULL/empty product_id never matches in
# the merge below, so it would be re-inserted as a new active product on every run.
df_products = df_products.drop("product_key") \
    .filter(col("product_id").isNotNull() & (col("product_id") != "")) \
    .withColumn("product_name", when(col("product_name").isNull() | (col("product_name") == ""), lit("Unknown")).otherwise(trim(col("product_name")))) \
    .withColumn("category", when(col("category").isNull() | (col("category") == ""), lit("Uncategorized")).otherwise(trim(col("category")))) \
    .withColumn("stock", when(col("stock").isNull(), lit(0)).otherwise(col("stock"))) \
    .withColumn("unit_price", when(col("unit_price").isNull(), lit(0.00).cast(DecimalType(10, 2))).otherwise(col("unit_price"))) \
    .withColumn("sales_price", when(col("sales_price").isNull(), lit(0.00).cast(DecimalType(10, 2))).otherwise(col("sales_price")))

# Add SCD2 tracking columns and row hash for change detection.
# NOTE: `hash` is pyspark.sql.functions.hash (32-bit Murmur3, shadowing the builtin
# via the star import); changing it would re-version every existing row once.
current_ts = current_timestamp()
high_date = lit("9999-12-31 23:59:59").cast("timestamp")

df_products = df_products \
    .withColumn("source_file_name", input_file_name()) \
    .withColumn("valid_from", current_ts) \
    .withColumn("valid_to", high_date) \
    .withColumn("is_active", lit(True)) \
    .withColumn("created_at", current_ts) \
    .withColumn("updated_at", current_ts) \
    .withColumn("row_hash", hash(col("product_name"), col("category"), col("stock"), col("unit_price"), col("sales_price")))

# Create products_silver table if not exists
DeltaTable.createIfNotExists(spark) \
    .tableName("products_silver") \
    .addColumn("product_id", StringType()) \
    .addColumn("product_name", StringType()) \
    .addColumn("category", StringType()) \
    .addColumn("stock", IntegerType()) \
    .addColumn("unit_price", DecimalType(10, 2)) \
    .addColumn("sales_price", DecimalType(10, 2)) \
    .addColumn("source_file_name", StringType()) \
    .addColumn("valid_from", TimestampType()) \
    .addColumn("valid_to", TimestampType()) \
    .addColumn("is_active", BooleanType()) \
    .addColumn("created_at", TimestampType()) \
    .addColumn("updated_at", TimestampType()) \
    .addColumn("row_hash", LongType()) \
    .execute()

# SCD2 merge using the "staged updates" pattern, as ONE atomic MERGE:
#   * every incoming row is staged with merge_key = product_id. It matches the
#     product's active version (closed below if row_hash changed) or, for a new
#     product, falls through to the insert clause;
#   * incoming rows whose row_hash differs from the active version are staged a
#     second time with merge_key = NULL, which never matches, so they are
#     inserted as the new active version.
# This replaces a follow-up INSERT keyed on "closed in the last 10 seconds", which
# lost new versions on slow merges and duplicated them on quick reruns.
deltaTable = DeltaTable.forName(spark, "products_silver")

df_changed_products = df_products.alias("source") \
    .join(
        deltaTable.toDF().filter(col("is_active")).alias("target"),
        col("source.product_id") == col("target.product_id")
    ) \
    .filter(col("source.row_hash") != col("target.row_hash")) \
    .select("source.*")

df_staged_products = df_changed_products \
    .withColumn("merge_key", lit(None).cast(StringType())) \
    .unionByName(df_products.withColumn("merge_key", col("product_id")))

deltaTable.alias("target") \
    .merge(
        df_staged_products.alias("source"),
        "target.product_id = source.merge_key AND target.is_active = true"
    ) \
    .whenMatchedUpdate(
        # Close the current version; valid_to reuses the new version's valid_from
        # so consecutive validity intervals line up exactly.
        condition="target.row_hash != source.row_hash",
        set={
            "is_active": "false",
            "valid_to": "source.valid_from",
            "updated_at": "source.updated_at"
        }
    ) \
    .whenNotMatchedInsert(
        # Explicit column list so the helper merge_key column is not written.
        values={c: f"source.{c}" for c in df_products.columns}
    ) \
    .execute()

## Orders Header - Transaction Data

In [9]:
# Define schema for order header data
ordersHeaderSchema = StructType([
    StructField("order_id", StringType()),
    StructField("customer_id", StringType()),
    StructField("order_date", DateType()),
    StructField("payment_method", StringType()),
    StructField("order_total", DecimalType(12, 2))
])

# Load order header data from Bronze layer
df_orders_header = spark.read.format("csv").option("header", "true").schema(ordersHeaderSchema).load("Files/bronze_pos_orders_header/orders_header.csv")

# Data quality transformations and derived attributes.
# Rows without an order_id are dropped: a NULL key never matches in the merge, so
# such rows would be re-inserted on every run.
df_orders_header = df_orders_header \
    .filter(col("order_id").isNotNull() & (col("order_id") != "")) \
    .withColumn("payment_method", when(col("payment_method").isNull() | (col("payment_method") == ""), lit("Unknown")).otherwise(upper(trim(col("payment_method"))))) \
    .withColumn("order_total", when(col("order_total").isNull() | (col("order_total") <= 0), lit(0.00).cast(DecimalType(12, 2))).otherwise(col("order_total")))

# Add audit columns
current_ts = current_timestamp()
df_orders_header = df_orders_header \
    .withColumn("source_file_name", input_file_name()) \
    .withColumn("created_at", current_ts) \
    .withColumn("updated_at", current_ts)

# Create orders_header_silver table if not exists
DeltaTable.createIfNotExists(spark) \
    .tableName("orders_header_silver") \
    .addColumn("order_id", StringType()) \
    .addColumn("customer_id", StringType()) \
    .addColumn("order_date", DateType()) \
    .addColumn("payment_method", StringType()) \
    .addColumn("order_total", DecimalType(12, 2)) \
    .addColumn("source_file_name", StringType()) \
    .addColumn("created_at", TimestampType()) \
    .addColumn("updated_at", TimestampType()) \
    .execute()

# Idempotent merge: update an existing order only when a tracked attribute actually
# changed, insert new ones. Change detection covers every column the update writes
# and uses the null-safe <=> operator, because `a != b` is NULL (treated as false)
# when either side is NULL and would hide e.g. a customer_id being filled in.
deltaTable = DeltaTable.forName(spark, "orders_header_silver")

order_header_value_columns = ["customer_id", "order_date", "payment_method", "order_total"]

deltaTable.alias('target') \
    .merge(
        df_orders_header.alias('source'),
        'target.order_id = source.order_id'
    ) \
    .whenMatchedUpdate(
        condition=" OR ".join(f"NOT (target.{c} <=> source.{c})" for c in order_header_value_columns),
        set={
            **{c: f"source.{c}" for c in order_header_value_columns},
            "updated_at": "current_timestamp()"
        }
    ) \
    .whenNotMatchedInsertAll() \
    .execute()

StatementMeta(, eaa4542b-cb99-455d-b659-cf1ee5a6ab87, 11, Finished, Available, Finished)

## Order Lines - Transaction Data

In [10]:
# Define schema for order line data
orderLinesSchema = StructType([
    StructField("order_line_id", StringType()),
    StructField("order_id", StringType()),
    StructField("line_number", IntegerType()),
    StructField("product_id", StringType()),
    StructField("quantity", IntegerType()),
    StructField("unit_price", DecimalType(10, 2)),
    StructField("discount_amount", DecimalType(10, 2)),
    StructField("tax_amount", DecimalType(10, 2)),
    StructField("line_total", DecimalType(12, 2))
])

# Load order line data from Bronze layer
df_order_lines = spark.read.format("csv").option("header", "true").schema(orderLinesSchema).load("Files/bronze_pos_order_lines/order_lines.csv")

# Data quality transformations.
# Rows without an order_line_id are dropped: a NULL key never matches in the merge,
# so such rows would be re-inserted on every run.
df_order_lines = df_order_lines \
    .filter(col("order_line_id").isNotNull() & (col("order_line_id") != "")) \
    .withColumn("quantity", when(col("quantity").isNull() | (col("quantity") <= 0), lit(1)).otherwise(col("quantity"))) \
    .withColumn("unit_price", when(col("unit_price").isNull(), lit(0.00).cast(DecimalType(10, 2))).otherwise(col("unit_price"))) \
    .withColumn("discount_amount", when(col("discount_amount").isNull(), lit(0.00).cast(DecimalType(10, 2))).otherwise(col("discount_amount"))) \
    .withColumn("tax_amount", when(col("tax_amount").isNull(), lit(0.00).cast(DecimalType(10, 2))).otherwise(col("tax_amount"))) \
    .withColumn("line_total", when(col("line_total").isNull(), lit(0.00).cast(DecimalType(12, 2))).otherwise(col("line_total")))

# Add audit columns
current_ts = current_timestamp()
df_order_lines = df_order_lines \
    .withColumn("source_file_name", input_file_name()) \
    .withColumn("created_at", current_ts) \
    .withColumn("updated_at", current_ts)

# Create order_lines_silver table if not exists
DeltaTable.createIfNotExists(spark) \
    .tableName("order_lines_silver") \
    .addColumn("order_line_id", StringType()) \
    .addColumn("order_id", StringType()) \
    .addColumn("line_number", IntegerType()) \
    .addColumn("product_id", StringType()) \
    .addColumn("quantity", IntegerType()) \
    .addColumn("unit_price", DecimalType(10, 2)) \
    .addColumn("discount_amount", DecimalType(10, 2)) \
    .addColumn("tax_amount", DecimalType(10, 2)) \
    .addColumn("line_total", DecimalType(12, 2)) \
    .addColumn("source_file_name", StringType()) \
    .addColumn("created_at", TimestampType()) \
    .addColumn("updated_at", TimestampType()) \
    .execute()

# Idempotent merge: update an existing order line only when a value actually
# changed, insert new ones. whenMatchedUpdateAll() would overwrite created_at with
# this run's timestamp (losing the original load time) and rewrite every matched
# row on every rerun even when nothing changed.
deltaTable = DeltaTable.forName(spark, "order_lines_silver")

order_line_value_columns = [
    "order_id", "line_number", "product_id", "quantity",
    "unit_price", "discount_amount", "tax_amount", "line_total"
]

deltaTable.alias('target') \
    .merge(
        df_order_lines.alias('source'),
        'target.order_line_id = source.order_line_id'
    ) \
    .whenMatchedUpdate(
        # Null-safe <=>: `a != b` is NULL when either side is NULL, hiding the change.
        condition=" OR ".join(f"NOT (target.{c} <=> source.{c})" for c in order_line_value_columns),
        set={
            **{c: f"source.{c}" for c in order_line_value_columns},
            "source_file_name": "source.source_file_name",
            "updated_at": "source.updated_at"
        }
    ) \
    .whenNotMatchedInsertAll() \
    .execute()

StatementMeta(, eaa4542b-cb99-455d-b659-cf1ee5a6ab87, 12, Finished, Available, Finished)

## Reviews - Immutable Events

In [None]:
# Define schema for review data
reviewsSchema = StructType([
    StructField("customer_id", StringType()),
    StructField("product_id", StringType()),
    StructField("rating", IntegerType()),
    StructField("review_text", StringType()),
    StructField("timestamp", LongType())
])

# Load review data from Bronze layer
df_reviews = spark.read.schema(reviewsSchema).json("Files/bronze_reviews/reviews.json")

# Data quality filters
df_reviews = df_reviews \
    .filter(col("customer_id").isNotNull() & (col("customer_id") != "")) \
    .filter(col("product_id").isNotNull() & (col("product_id") != "")) \
    .filter(col("rating").isNotNull() & (col("rating") >= 1) & (col("rating") <= 5)) \
    .filter(col("timestamp").isNotNull())

# Transformations.
# NOTE(review): the source value is divided by 1000 (presumably epoch milliseconds) and
# from_unixtime formats it as 'yyyy-MM-dd HH:mm:ss', so sub-second precision is lost.
# Two reviews of the same product by the same customer within one second therefore
# share a merge key. Changing the precision would require re-keying existing rows.
df_reviews = df_reviews \
    .withColumn("timestamp", from_unixtime(col("timestamp") / 1000).cast("timestamp")) \
    .withColumn("review_date", col("timestamp").cast("date"))

# Add audit columns
current_ts = current_timestamp()
df_reviews = df_reviews \
    .withColumn("source_file_name", input_file_name()) \
    .withColumn("created_at", current_ts) \
    .withColumn("updated_at", current_ts)

# Create reviews_silver table if not exists
DeltaTable.createIfNotExists(spark) \
    .tableName("reviews_silver") \
    .addColumn("customer_id", StringType()) \
    .addColumn("product_id", StringType()) \
    .addColumn("timestamp", TimestampType()) \
    .addColumn("review_date", DateType()) \
    .addColumn("rating", IntegerType()) \
    .addColumn("review_text", StringType()) \
    .addColumn("source_file_name", StringType()) \
    .addColumn("created_at", TimestampType()) \
    .addColumn("updated_at", TimestampType()) \
    .execute()

# Idempotent merge: prevent duplicate reviews.
# MERGE only de-duplicates against rows already in the target; duplicates inside this
# batch would all be inserted, so they are removed first on the same key.
review_key_columns = ["customer_id", "product_id", "timestamp"]

deltaTable = DeltaTable.forName(spark, "reviews_silver")

deltaTable.alias('target') \
    .merge(
        df_reviews.dropDuplicates(review_key_columns).alias('source'),
        'target.customer_id = source.customer_id AND target.product_id = source.product_id AND target.timestamp = source.timestamp'
    ) \
    .whenNotMatchedInsertAll() \
    .execute()

StatementMeta(, eaa4542b-cb99-455d-b659-cf1ee5a6ab87, 13, Finished, Available, Finished)

## Social Media - Immutable Events

In [None]:
# Define schema for social media data
socialMediaSchema = StructType([
    StructField("content", StringType()),
    StructField("is_bot_like", BooleanType()),
    StructField("platform", StringType()),
    StructField("sentiment", StringType()),
    StructField("timestamp", LongType())
])

# Load social media data from Bronze layer
df_social_media = spark.read.schema(socialMediaSchema).json("Files/bronze_social_media/social_media.json")

# Data quality filters
df_social_media = df_social_media \
    .filter(col("timestamp").isNotNull()) \
    .filter(col("platform").isNotNull() & (col("platform") != "")) \
    .filter(col("content").isNotNull() & (col("content") != "")) \
    .filter(col("sentiment").isNotNull() & col("sentiment").isin(["positive", "negative", "neutral"]))

# Transformations (from_unixtime truncates the timestamp to whole seconds)
df_social_media = df_social_media \
    .withColumn("timestamp", from_unixtime(col("timestamp") / 1000).cast("timestamp")) \
    .withColumn("post_date", col("timestamp").cast("date"))

# Add audit columns
current_ts = current_timestamp()
df_social_media = df_social_media \
    .withColumn("source_file_name", input_file_name()) \
    .withColumn("created_at", current_ts) \
    .withColumn("updated_at", current_ts)

# Create social_media_silver table if not exists
DeltaTable.createIfNotExists(spark) \
    .tableName("social_media_silver") \
    .addColumn("timestamp", TimestampType()) \
    .addColumn("post_date", DateType()) \
    .addColumn("platform", StringType()) \
    .addColumn("sentiment", StringType()) \
    .addColumn("content", StringType()) \
    .addColumn("is_bot_like", BooleanType()) \
    .addColumn("source_file_name", StringType()) \
    .addColumn("created_at", TimestampType()) \
    .addColumn("updated_at", TimestampType()) \
    .execute()

# Idempotent merge: prevent duplicate posts.
# MERGE only de-duplicates against rows already in the target; duplicates inside this
# batch would all be inserted, so they are removed first on the same key.
social_media_key_columns = ["timestamp", "platform", "content"]

deltaTable = DeltaTable.forName(spark, "social_media_silver")

deltaTable.alias('target') \
    .merge(
        df_social_media.dropDuplicates(social_media_key_columns).alias('source'),
        'target.timestamp = source.timestamp AND target.platform = source.platform AND target.content = source.content'
    ) \
    .whenNotMatchedInsertAll() \
    .execute()

StatementMeta(, eaa4542b-cb99-455d-b659-cf1ee5a6ab87, 14, Finished, Available, Finished)

## Web Logs - Immutable Events

In [None]:
# Define schema for web log data
webLogsSchema = StructType([
    StructField("timestamp", LongType()),
    StructField("user_id", StringType()),
    StructField("page", StringType()),
    StructField("action", StringType()),
])

# Load web log data from Bronze layer
df_web_logs = spark.read.schema(webLogsSchema).json("Files/bronze_web_logs/web_logs.json")

# Normalize page/action before filtering so whitespace-only values are rejected
# instead of being stored as empty strings after trimming.
df_web_logs = df_web_logs \
    .withColumn("page", trim(col("page"))) \
    .withColumn("action", lower(trim(col("action"))))

# Data quality filters
df_web_logs = df_web_logs \
    .filter(col("timestamp").isNotNull()) \
    .filter(col("user_id").isNotNull() & (col("user_id") != "")) \
    .filter(col("page").isNotNull() & (col("page") != "")) \
    .filter(col("action").isNotNull() & (col("action") != ""))

# Transformations (from_unixtime truncates the timestamp to whole seconds)
df_web_logs = df_web_logs \
    .withColumn("timestamp", from_unixtime(col("timestamp") / 1000).cast("timestamp")) \
    .withColumn("event_date", col("timestamp").cast("date")) \
    .withColumnRenamed("user_id", "customer_id")

# Add audit columns
current_ts = current_timestamp()
df_web_logs = df_web_logs \
    .withColumn("source_file_name", input_file_name()) \
    .withColumn("created_at", current_ts) \
    .withColumn("updated_at", current_ts)

# Create web_logs_silver table if not exists
DeltaTable.createIfNotExists(spark) \
    .tableName("web_logs_silver") \
    .addColumn("timestamp", TimestampType()) \
    .addColumn("event_date", DateType()) \
    .addColumn("customer_id", StringType()) \
    .addColumn("page", StringType()) \
    .addColumn("action", StringType()) \
    .addColumn("source_file_name", StringType()) \
    .addColumn("created_at", TimestampType()) \
    .addColumn("updated_at", TimestampType()) \
    .execute()

# Idempotent merge: prevent duplicate events.
# Because timestamps are second-precision, (customer_id, timestamp) alone would treat
# distinct events by one user in the same second (e.g. a view then a click) as
# duplicates and drop them; page and action are part of the event identity.
# Duplicates inside this batch are removed first, since MERGE only checks the target.
web_log_key_columns = ["customer_id", "timestamp", "page", "action"]

deltaTable = DeltaTable.forName(spark, "web_logs_silver")

deltaTable.alias('target') \
    .merge(
        df_web_logs.dropDuplicates(web_log_key_columns).alias('source'),
        'target.customer_id = source.customer_id AND target.timestamp = source.timestamp '
        'AND target.page = source.page AND target.action = source.action'
    ) \
    .whenNotMatchedInsertAll() \
    .execute()

StatementMeta(, , -1, Finished, , Finished)

LivyHttpRequestFailure: Something went wrong while processing your request. Please try again later. HTTP status code: 500. Trace ID: 2c86e573-aae4-4068-b2b0-4ca23741ac63.