#USE CASE :
### We have OrderHistory received from an OLTP system, create a star schema to do advanced order and shipment analysis

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, split, trim, col, asc

In [0]:
df_order_detail = spark.read.format('csv').option("header", True).option("inferSchema", True).load('/FileStore/tables/synthetic_orders_2500.csv')

### Extracting Retailers

In [0]:
df_retailers = df_order_detail.select("Website").distinct()

In [0]:
retailerWindowSpec = Window.orderBy("Website")
df_retailers = df_retailers.withColumn("RetailerId", row_number().over(retailerWindowSpec))
df_retailers = df_retailers.withColumnRenamed("Website", "Retailer")

In [0]:
df_retailers =  df_retailers.select("RetailerId", "Retailer")

In [0]:
df_currency = df_order_detail.select("Currency").distinct()

In [0]:
currencyWindowSpec = Window.orderBy("Currency")
df_currency = df_currency.withColumn("CurrencyId", row_number().over(currencyWindowSpec))


In [0]:
df_currency = df_currency.select("CurrencyId", "Currency")

In [0]:
df_product = df_order_detail.select( "Product Condition","Product Name" ).distinct()

In [0]:
df_product = df_product.withColumn("Used Condition", trim(split("Product Condition", "-").getItem(1)))\
    .withColumn("Product Condition Changed", trim(split("Product Condition", "-").getItem(0)))

In [0]:
productWindowSpec = Window.orderBy("Product Name")
df_product = df_product.withColumn("ProductId", row_number().over(productWindowSpec) )

In [0]:
df_product = df_product.select("ProductId", "Product Name", "Product Condition", "Used Condition", "Product Condition Changed")

In [0]:
df_product = df_product.na.fill("N/A", ["Used Condition"])

In [0]:
df_paymentInstrumentType = df_order_detail.select("Payment Instrument Type").distinct()

In [0]:
windowPaymentType = Window.orderBy("Payment Instrument Type")
df_paymentInstrumentType = df_paymentInstrumentType.withColumn("PaymentTypeId", row_number().over(windowPaymentType))
df_paymentInstrumentType = df_paymentInstrumentType\
    .withColumnRenamed("Payment Instrument Type", "Payment Type")

In [0]:
df_paymentInstrumentType = df_paymentInstrumentType.select("PaymentTypeId", "Payment Type")

In [0]:
df_status = df_order_detail.select("Order Status", "Shipment Status").distinct().orderBy("Order Status", "Shipment Status")

In [0]:
statusWindow = Window.orderBy("Order Status", "Shipment Status")
df_status = df_status.withColumn("StatusId", row_number().over(statusWindow))\
    .select("StatusId", "Order Status", "Shipment Status")

In [0]:
df_customer = df_order_detail.select("Billing Address", "Shipping Address").distinct()

In [0]:
windowCustomer = Window.orderBy("Billing Address","Shipping Address" )
df_customer = df_customer.withColumn("CustomerId", row_number().over(windowCustomer))\
    .select("CustomerId", "Billing Address", "Shipping Address")

In [0]:
df_gift = df_order_detail.select("Gift Message", "Gift Recipient Contact Details", "Gift Sender Name").distinct()

In [0]:
giftWindow = Window.orderBy(col("Gift Message").asc(), "Gift Recipient Contact Details", "Gift Sender Name")

In [0]:
df_gift = df_gift.withColumn("GiftId", row_number().over(giftWindow)).select("GiftId","Gift Message", "Gift Recipient Contact Details", "Gift Sender Name")

In [0]:
df_gift =df_gift.na.fill("N/A")

In [0]:
df_od = df_order_detail.alias("od")
df_p = df_product.alias("p")
df_pay = df_paymentInstrumentType.alias("pay")
df_cust = df_customer.alias("cus")
df_status = df_status.alias("stat")
df_gift = df_gift.alias("gift")
fact_orders = (
    df_od.join(
        df_retailers, how="left", on=df_order_detail.Website == df_retailers.Retailer
    )
    .join(df_currency, how="left", on=df_order_detail.Currency == df_currency.Currency)
    .join(
        df_p,
        how="left",
        on=(col("od.`Product Name`") == col("p.`Product Name`"))
        & (col("od.`Product Condition`") == col("p.`Product Condition`"))
    )
    .join(
        df_pay,
        how="left",
        on=col("od.`Payment Instrument Type`") == col("pay.`Payment Type`")
    )
    .join(
        df_cust,
        how="left",
        on=(col("od.`Billing Address`") == col("cus.`Billing Address`"))
        & (col("od.`Shipping Address`") == col("cus.`Shipping Address`"))
    )
    .join(
        df_status,
        how="left",
        on=(col("od.`Order Status`") == col("stat.`Order Status`"))
        & (col("od.`Shipment Status`") == col("stat.`Shipment Status`"))
    )
    .join(
        df_gift,
        how="left",
        on=  (col("od.`Gift Message`") == col("gift.`Gift Message`"))
                & (col("od.`Gift Recipient Contact Details`") == col("gift.`Gift Recipient Contact Details`"))
                & (col("od.`Gift Sender Name`") == col("gift.`Gift Sender Name`"))
    )
    .select( col("od.ASIN"),
        df_retailers.RetailerId,
        df_currency.CurrencyId,
        df_product.ProductId,
        df_pay.PaymentTypeId,
        df_cust.CustomerId,
        df_status.StatusId,
        df_gift.GiftId,
        col("od.`Order ID`"),
        col("od.`Order Date`"),
        col("od.`Purchase Order Number`"),
        col("od.`Unit Price`"),
        col("od.`Unit Price Tax`"),
        col("od.`Shipping Charge`"),
        col("od.`Total Discounts`"),
        col("od.`Total Owed`"),
        col("od.`Shipment Item Subtotal Tax`"),
        col("od.`Quantity`"),
        col("od.`Ship Date`"),
        col("od.`Item Serial Number`")
    )
)

### Display the records in place of writing

In [0]:
display(df_customer)
display(df_retailers)
display(df_currency)
display(df_product)
display(df_paymentInstrumentType)
display(df_status)
display(df_gift)

CustomerId,Billing Address,Shipping Address
1,"1 Elm St, City 30, USA","983 Main St, City 37, USA"
2,"1 Elm St, City 69, USA","952 Main St, City 85, USA"
3,"1 Elm St, City 87, USA","376 Main St, City 38, USA"
4,"10 Elm St, City 22, USA","784 Main St, City 41, USA"
5,"10 Elm St, City 42, USA","341 Main St, City 27, USA"
6,"101 Elm St, City 25, USA","889 Main St, City 63, USA"
7,"101 Elm St, City 4, USA","547 Main St, City 59, USA"
8,"102 Elm St, City 12, USA","593 Main St, City 23, USA"
9,"102 Elm St, City 4, USA","936 Main St, City 40, USA"
10,"102 Elm St, City 72, USA","208 Main St, City 45, USA"


RetailerId,Retailer
1,Amazon
2,Best Buy
3,eBay


CurrencyId,Currency
1,EUR
2,GBP
3,USD


ProductId,Product Name,Product Condition,Used Condition,Product Condition Changed
1,Product-100,New,,New
2,Product-100,Used - Acceptable,Acceptable,Used
3,Product-100,Used - Good,Good,Used
4,Product-101,Used - Acceptable,Acceptable,Used
5,Product-101,Used - Good,Good,Used
6,Product-102,Used - Good,Good,Used
7,Product-104,New,,New
8,Product-104,Used - Acceptable,Acceptable,Used
9,Product-105,New,,New
10,Product-106,Used - Acceptable,Acceptable,Used


PaymentTypeId,Payment Type
1,Credit Card
2,Debit Card
3,PayPal


StatusId,Order Status,Shipment Status
1,Pending,Delivered
2,Pending,In Transit
3,Pending,Pending
4,Processing,Delivered
5,Processing,In Transit
6,Processing,Pending
7,Shipped,Delivered
8,Shipped,In Transit
9,Shipped,Pending


GiftId,Gift Message,Gift Recipient Contact Details,Gift Sender Name
1,,,
2,,,Alice
3,,,Bob
4,,,Charlie
5,,,David
6,,Emma Smith,
7,,Emma Smith,Alice
8,,Emma Smith,Bob
9,,Emma Smith,Charlie
10,,Emma Smith,David


In [0]:
display(fact_orders)

ASIN,RetailerId,CurrencyId,ProductId,PaymentTypeId,CustomerId,StatusId,GiftId,Order ID,Order Date,Purchase Order Number,Unit Price,Unit Price Tax,Shipping Charge,Total Discounts,Total Owed,Shipment Item Subtotal Tax,Quantity,Ship Date,Item Serial Number
B07905754,1,2,387,1,147,6,49.0,ORD196440,2025-02-07,PO91148,531.69,80.71,21.31,277.86,1950.92,154.56,4,2025-08-27,SN578924
B05695908,3,2,202,2,2388,3,,ORD413719,2025-12-21,PO41737,814.15,49.54,12.94,1591.05,14754.43,873.1,20,2025-06-10,SN406882
B07135704,3,3,549,1,331,2,55.0,ORD841583,2025-06-25,PO32391,794.03,145.19,34.7,119.16,854.76,70.13,1,2025-06-24,SN201103
B09784558,3,2,612,1,913,1,59.0,ORD175179,2025-07-10,PO30713,647.5,50.94,20.97,318.62,3638.29,773.93,6,2025-06-08,SN747603
B09899840,1,1,1588,1,1281,7,53.0,ORD281006,2025-08-18,PO88746,743.78,108.23,18.12,1804.65,10965.96,776.28,17,2025-11-04,SN662880
B02326346,3,1,1201,3,2154,6,,ORD974613,2025-03-20,PO88247,213.3,21.95,44.68,45.22,4074.11,355.4,19,2025-10-09,SN712696
B03085278,3,2,758,3,314,6,,ORD647014,2025-08-05,PO28107,322.2,33.22,23.06,247.16,6253.12,637.23,20,2025-07-27,SN936639
B05038898,3,1,948,3,1777,4,,ORD603833,2025-11-25,PO93828,514.89,52.4,36.96,86.66,5666.49,463.31,11,2025-11-05,SN468479
B03595115,2,2,921,2,2038,7,69.0,ORD303315,2025-11-19,PO13854,454.09,45.42,26.87,443.12,2353.71,460.9,6,2025-08-17,SN166546
B05871242,2,3,184,3,2389,7,97.0,ORD236693,2025-02-14,PO51379,336.77,49.65,5.83,68.47,3354.71,200.98,10,2025-03-08,SN549165
