In [0]:
%sql
-- Target Delta table for the sales fact. IF NOT EXISTS makes this cell a
-- no-op when the table is already present, so it is safe to re-run.
CREATE TABLE IF NOT EXISTS main.ecommerce.fact_sales (
    -- Identifiers
    order_id                 STRING,
    order_item_id            STRING,
    customer_id              STRING,
    product_id               STRING,

    -- Purchase time, plus its calendar date
    order_purchase_timestamp TIMESTAMP,
    order_date               DATE,

    -- Monetary measures (the load cell derives revenue as price + freight_value)
    price                    DOUBLE,
    freight_value            DOUBLE,
    revenue                  DOUBLE,

    -- Descriptive attributes
    customer_state           STRING,
    product_category_name    STRING,

    -- Payment total, order status, and the date the row was loaded
    payment_value            DOUBLE,
    order_status             STRING,
    load_date                DATE
)
USING DELTA;

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import *

# ---------------------------------------------------------------------------
# Build main.ecommerce.fact_sales.
#
# Orders are inner-joined to their items, left-joined to the active customer
# and product dimension rows, and left-joined to the per-order payment total.
# The result fully replaces the contents of the fact table on every run.
# ---------------------------------------------------------------------------

# Source Delta tables (each read exactly once).
orders_df = spark.table("main.ecommerce.orders")
items_df = spark.table("main.ecommerce.items")
payments_df = spark.table("main.ecommerce.payments")

# Dimension tables, restricted to the rows flagged as active.
dim_customers_df = spark.table("main.ecommerce.dim_customer").filter("is_active = 'Y'")
dim_products_df = spark.table("main.ecommerce.dim_product").filter("is_active = 'Y'")

# Keep only the item columns the fact table needs, so the join with orders
# brings in no other item columns besides the shared "order_id" key.
order_items_df = items_df.select(
    "order_id",
    "order_item_id",
    "product_id",
    "price",
    "freight_value",
)

# Total payment per order. F.sum is Spark's aggregate; calling it through the
# F namespace avoids depending on a star import shadowing Python's builtin sum.
payments_per_order_df = (
    payments_df
    .groupBy("order_id")
    .agg(F.sum("payment_value").alias("payment_value"))
)

# Orders x items (inner), then dimensions and payments (left) so that rows
# without a matching dimension row or payment are kept with NULL attributes.
sales_df = (
    orders_df
    .join(order_items_df, on="order_id", how="inner")
    .join(dim_customers_df, on="customer_id", how="left")
    .join(dim_products_df, on="product_id", how="left")
    .join(payments_per_order_df, on="order_id", how="left")
)

# Project the fact-table columns, in the same order as the table definition.
# NOTE: payment_value is an order-level total joined onto item-level rows, so
# it repeats on every item row of the same order. Summing it across the rows
# of a multi-item order counts the payment more than once.
fact_sales_df = sales_df.select(
    F.col("order_id"),
    F.col("order_item_id"),
    F.col("customer_id"),
    F.col("product_id"),

    F.col("order_purchase_timestamp"),
    F.to_date("order_purchase_timestamp").alias("order_date"),

    F.col("price"),
    F.col("freight_value"),
    (F.col("price") + F.col("freight_value")).alias("revenue"),

    F.col("customer_state"),
    F.col("product_category_name"),

    F.col("payment_value"),
    F.col("order_status"),

    F.current_date().alias("load_date"),
)

fact_sales_df.printSchema()

# Full reload: overwrite replaces all existing rows of the fact table.
# NOTE(review): the printed schema is expected to match the column types
# declared for main.ecommerce.fact_sales -- confirm against the DDL.
(
    fact_sales_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("main.ecommerce.fact_sales")
)