In [0]:
%sql

-- Create dimension tables for SCD-2 (slowly changing dimension, type 2):
-- every change to a tracked attribute is kept as a separate row version.

-- Customer dimension. The business attributes are copied from the `customers`
-- source table by the SCD-2 merge cell of this notebook.
CREATE TABLE IF NOT EXISTS real_time_projects.ecommerce_historical.dim_customers (
    customer_id STRING,
    customer_unique_id STRING,
    customer_zip_code_prefix INT,
    customer_city STRING,
    customer_state STRING,

    -- SCD-2 housekeeping columns, maintained by the merge cell:
    --   effective_start_date : current_date() at the time the row version is inserted
    --   effective_end_date   : 9999-12-31 on insert, current_date() once the version is closed
    --   is_active            : 'Y' on insert, 'N' once the version is closed
    effective_start_date DATE,
    effective_end_date DATE,
    is_active STRING
)
USING DELTA;


-- Product dimension (SCD-2). The business attributes are copied from the
-- `products` source table by the product merge cell of this notebook.
-- NOTE: `product_name_lenght` / `product_description_lenght` keep this exact
-- spelling because the merge cell reads the same names from the source rows.
CREATE TABLE IF NOT EXISTS real_time_projects.ecommerce_historical.dim_products (
    product_id STRING,
    product_category_name STRING,
    product_name_lenght INT,
    product_description_lenght INT,
    product_photos_qty INT,
    product_weight_g INT,
    product_length_cm INT,
    product_height_cm INT,
    product_width_cm INT,

    -- SCD-2 housekeeping columns, maintained by the merge cell:
    --   effective_start_date : current_date() at the time the row version is inserted
    --   effective_end_date   : 9999-12-31 on insert, current_date() once the version is closed
    --   is_active            : 'Y' on insert, 'N' once the version is closed
    effective_start_date DATE,
    effective_end_date DATE,
    is_active STRING
)
USING DELTA;


In [0]:
from pyspark.sql.functions import *
from delta.tables import DeltaTable

# SCD-2 load of dim_customers from the existing `customers` Delta table.
#
# For every source customer:
#   * unknown customer_id           -> insert a new active version
#   * known, tracked attrs changed  -> close the active version AND insert the new one
#   * known, nothing changed        -> no-op (so the cell is safe to re-run)

# Existing Customer Delta Table
customers_src_df = spark.table("real_time_projects.ecommerce_historical.customers")

# Attributes whose change opens a new dimension version.
customer_tracked_cols = [
    "customer_unique_id",
    "customer_zip_code_prefix",
    "customer_city",
    "customer_state",
]

# Create Customer Stage: source rows decorated with the SCD-2 housekeeping columns.
customer_stage_df = (
    customers_src_df
    .withColumn("effective_start_date", current_date())
    # Cast explicitly: the dimension column is DATE, not STRING.
    .withColumn("effective_end_date", lit("9999-12-31").cast("date"))
    .withColumn("is_active", lit("Y"))
)

# SCD-2 Merge Customer
dim_customers = DeltaTable.forName(spark, "real_time_projects.ecommerce_historical.dim_customers")

# `<=>` is null-safe equality; a plain `<>` evaluates to NULL (treated as
# "no change") whenever either side is NULL, silently missing those changes.
customer_change_condition = " OR ".join(
    f"NOT (t.{name} <=> s.{name})" for name in customer_tracked_cols
)

# Source rows whose currently active dimension version differs. A single MERGE
# can only update OR insert per source row, so these rows are staged a second
# time with a NULL merge key: that copy never matches and is therefore inserted
# as the new active version, while the keyed copy closes the old version.
changed_customers_df = (
    customer_stage_df.alias("s")
    .join(
        dim_customers.toDF().alias("t"),
        expr("t.customer_id = s.customer_id AND t.is_active = 'Y'"),
        "inner",
    )
    .where(expr(customer_change_condition))
    .select("s.*")
)

customer_key_type = customer_stage_df.schema["customer_id"].dataType
customer_updates_df = (
    customer_stage_df.withColumn("merge_key", col("customer_id"))
    .unionByName(
        changed_customers_df.withColumn("merge_key", lit(None).cast(customer_key_type))
    )
)

merge_condition = """
    t.customer_id = s.merge_key
    AND t.is_active = 'Y'
"""

dim_customers.alias("t").merge(
    customer_updates_df.alias("s"),
    merge_condition
).whenMatchedUpdate(
    # Close the active version only when a tracked attribute really changed.
    condition=customer_change_condition,
    set={
        "effective_end_date": "current_date()",
        "is_active": "'N'"
    }
).whenNotMatchedInsert(
    # Brand-new customers and the NULL-keyed copies of changed customers.
    values={
        "customer_id": "s.customer_id",
        "customer_unique_id": "s.customer_unique_id",
        "customer_zip_code_prefix": "s.customer_zip_code_prefix",
        "customer_city": "s.customer_city",
        "customer_state": "s.customer_state",
        "effective_start_date": "s.effective_start_date",
        "effective_end_date": "s.effective_end_date",
        "is_active": "s.is_active"
    }
).execute()


In [0]:
# Preview: first 10 rows of the customer dimension.
df = spark.table("real_time_projects.ecommerce_historical.dim_customers").limit(10)
display(df)

In [0]:
from pyspark.sql.functions import *
from delta.tables import DeltaTable

# SCD-2 load of dim_products from the existing `products` Delta table.
#
# For every source product:
#   * unknown product_id            -> insert a new active version
#   * known, tracked attrs changed  -> close the active version AND insert the new one
#   * known, nothing changed        -> no-op (so the cell is safe to re-run)

# Existing Product Delta Table
product_src_df = spark.table("real_time_projects.ecommerce_historical.products")

# Attributes whose change opens a new dimension version.
# (`lenght` is the spelling used by the source and dimension columns.)
product_tracked_cols = [
    "product_category_name",
    "product_name_lenght",
    "product_description_lenght",
    "product_photos_qty",
    "product_weight_g",
    "product_length_cm",
    "product_height_cm",
    "product_width_cm",
]

# Create Product Stage: source rows decorated with the SCD-2 housekeeping columns.
product_stage_df = (
    product_src_df
    .withColumn("effective_start_date", current_date())
    # Cast explicitly: the dimension column is DATE, not STRING.
    .withColumn("effective_end_date", lit("9999-12-31").cast("date"))
    .withColumn("is_active", lit("Y"))
)

# SCD-2 Merge Product
dim_products = DeltaTable.forName(spark, "real_time_projects.ecommerce_historical.dim_products")

# `<=>` is null-safe equality; a plain `<>` evaluates to NULL (treated as
# "no change") whenever either side is NULL, silently missing those changes.
product_change_condition = " OR ".join(
    f"NOT (t.{name} <=> s.{name})" for name in product_tracked_cols
)

# Source rows whose currently active dimension version differs. A single MERGE
# can only update OR insert per source row, so these rows are staged a second
# time with a NULL merge key: that copy never matches and is therefore inserted
# as the new active version, while the keyed copy closes the old version.
changed_products_df = (
    product_stage_df.alias("s")
    .join(
        dim_products.toDF().alias("t"),
        expr("t.product_id = s.product_id AND t.is_active = 'Y'"),
        "inner",
    )
    .where(expr(product_change_condition))
    .select("s.*")
)

product_key_type = product_stage_df.schema["product_id"].dataType
product_updates_df = (
    product_stage_df.withColumn("merge_key", col("product_id"))
    .unionByName(
        changed_products_df.withColumn("merge_key", lit(None).cast(product_key_type))
    )
)

merge_condition = """
    t.product_id = s.merge_key
    AND t.is_active = 'Y'
"""

dim_products.alias("t").merge(
    product_updates_df.alias("s"),
    merge_condition
).whenMatchedUpdate(
    # Close the active version only when a tracked attribute really changed.
    condition=product_change_condition,
    set={
        "effective_end_date": "current_date()",
        "is_active": "'N'"
    }
).whenNotMatchedInsert(
    # Brand-new products and the NULL-keyed copies of changed products.
    values={
        "product_id": "s.product_id",
        "product_category_name": "s.product_category_name",
        "product_name_lenght": "s.product_name_lenght",
        "product_description_lenght": "s.product_description_lenght",
        "product_photos_qty": "s.product_photos_qty",
        "product_weight_g": "s.product_weight_g",
        "product_length_cm": "s.product_length_cm",
        "product_height_cm": "s.product_height_cm",
        "product_width_cm": "s.product_width_cm",
        "effective_start_date": "s.effective_start_date",
        "effective_end_date": "s.effective_end_date",
        "is_active": "s.is_active"
    }
).execute()


In [0]:
# Preview: first 10 rows of the product dimension.
df = spark.table("real_time_projects.ecommerce_historical.dim_products").limit(10)
display(df)

In [0]:
%sql

-- Create Fact Sales Table.
-- Populated by the fact-load cell of this notebook from orders joined to
-- order_items, so there is one row per order item.
CREATE TABLE IF NOT EXISTS real_time_projects.ecommerce_historical.fact_sales (
    order_id STRING,
    order_item_id STRING,
    customer_id STRING,
    product_id STRING,

    -- order_date is derived by the loader as to_date(order_purchase_timestamp)
    order_purchase_timestamp TIMESTAMP,
    order_date DATE,

    -- revenue is derived by the loader as price + freight_value
    price DOUBLE,
    freight_value DOUBLE,
    revenue DOUBLE,

    -- Attributes looked up from the active (is_active = 'Y') dimension rows
    customer_state STRING,
    product_category_name STRING,

    -- payment_value is the SUM of payments per order_id, so the same order
    -- total is repeated on every item row of that order
    payment_value DOUBLE,
    order_status STRING,
    -- load_date is current_date() at load time
    load_date DATE
)
USING DELTA;

In [0]:
from pyspark.sql.functions import *

# Build fact_sales (one row per order item) and load it idempotently.
#
# Cell-local imports so this cell does not depend on an earlier cell having
# imported DeltaTable; `F.` avoids relying on the wildcard import's `sum`,
# which shadows the Python builtin.
from delta.tables import DeltaTable
from pyspark.sql import functions as F

# Read source Delta tables
orders_df = spark.table("real_time_projects.ecommerce_historical.orders")
order_items_df = (
    spark.table("real_time_projects.ecommerce_historical.order_items")
    # Keep only the item columns the fact needs so joined column names stay unique.
    .select("order_id", "order_item_id", "product_id", "price", "freight_value")
)
payments_df = spark.table("real_time_projects.ecommerce_historical.payments")

# Read dimension Delta tables: active versions only, pruned to the looked-up
# attribute so the two dimensions do not both contribute effective_*/is_active.
dim_customers_df = (
    spark.table("real_time_projects.ecommerce_historical.dim_customers")
    .filter("is_active = 'Y'")
    .select("customer_id", "customer_state")
)
dim_products_df = (
    spark.table("real_time_projects.ecommerce_historical.dim_products")
    .filter("is_active = 'Y'")
    .select("product_id", "product_category_name")
)

# Payments are collapsed to one total per order before joining; the total is
# therefore repeated on every item row of the order.
order_payments_df = (
    payments_df
    .groupBy("order_id")
    .agg(F.sum("payment_value").alias("payment_value"))
)

# Join orders -> items -> dimensions -> payments
sales_df = (
    orders_df
    .join(order_items_df, "order_id", "inner")
    .join(dim_customers_df, "customer_id", "left")
    .join(dim_products_df, "product_id", "left")
    .join(order_payments_df, "order_id", "left")
)

# Derive final columns (same names and order as the fact_sales DDL)
fact_sales_df = sales_df.select(
    F.col("order_id"),
    # fact_sales declares order_item_id as STRING; cast so the merge key
    # comparison below is string-to-string whatever the source type is.
    F.col("order_item_id").cast("string").alias("order_item_id"),
    F.col("customer_id"),
    F.col("product_id"),

    F.col("order_purchase_timestamp"),
    F.to_date("order_purchase_timestamp").alias("order_date"),

    F.col("price"),
    F.col("freight_value"),
    (F.col("price") + F.col("freight_value")).alias("revenue"),

    F.col("customer_state"),
    F.col("product_category_name"),

    F.col("payment_value"),
    F.col("order_status"),

    F.current_date().alias("load_date"),
)

fact_sales_df.printSchema()

# Write to Delta Table.
# A blind append duplicates every row each time this cell is re-run. An
# insert-only MERGE on the item grain adds only the (order_id, order_item_id)
# pairs that are not in the table yet; rows already loaded are left untouched.
fact_sales = DeltaTable.forName(spark, "real_time_projects.ecommerce_historical.fact_sales")

(
    fact_sales.alias("t")
    .merge(
        fact_sales_df.alias("s"),
        "t.order_id = s.order_id AND t.order_item_id = s.order_item_id",
    )
    .whenNotMatchedInsertAll()
    .execute()
)


In [0]:
# Preview: first 10 rows of the sales fact.
df = spark.table("real_time_projects.ecommerce_historical.fact_sales").limit(10)
display(df)

In [0]:
%sql
-- Rows per order in the fact table, largest first.
SELECT
    order_id,
    COUNT(*)
FROM
    real_time_projects.ecommerce_historical.fact_sales
GROUP BY
    order_id
ORDER BY
    COUNT(*) DESC;
