In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window

In [None]:
# Reuse the active Spark session if one exists, otherwise start one named "Retail".
spark = (
    SparkSession.builder
    .appName("Retail")
    .getOrCreate()
)

In [None]:
# Load the denormalized superstore table and parse both date columns using
# the day/month/year pattern "dd/MM/yyyy".
retail_df = (
    spark.read.table("denormalized_Retail.superstore")
    .withColumn("Order_Date", to_date("Order_Date", "dd/MM/yyyy"))
    .withColumn("Ship_Date", to_date("Ship_Date", "dd/MM/yyyy"))
)

In [None]:
def DIM_Customer(retail_df):
    """Build the customer dimension from the denormalized retail data.

    Args:
        retail_df: DataFrame containing at least the columns
            ``Customer_ID``, ``Customer_Name`` and ``Segment``.

    Returns:
        DataFrame with the columns ``Customer_ID``, ``Customer_Name`` and
        ``Segment``, one row per distinct combination of the three.
    """
    # The source holds one row per order line, so the same customer repeats
    # many times. De-duplicate so the dimension does not multiply fact rows
    # when it is joined on Customer_ID.
    return (
        retail_df
        .select("Customer_ID", "Customer_Name", "Segment")
        .distinct()
    )

In [None]:
def DIM_Product(retail_df):
    """Build the product dimension from the denormalized retail data.

    Args:
        retail_df: DataFrame containing at least the columns
            ``Product_ID``, ``Product_Name``, ``Category`` and
            ``Sub_Category``.

    Returns:
        DataFrame with the columns ``Product_ID``, ``Product_Name``,
        ``Category`` and ``Sub_Category``, one row per distinct combination.
    """
    # The source holds one row per order line, so each product repeats for
    # every sale. De-duplicate so joining the fact on Product_ID does not
    # fan out.
    return (
        retail_df
        .select("Product_ID", "Product_Name", "Category", "Sub_Category")
        .distinct()
    )

In [None]:
def DIM_Location(retail_df):
    """Build the location dimension from the denormalized retail data.

    ``Location_ID`` is the MD5 hex digest of the concatenation of
    ``Country``, ``City``, ``State``, ``Postal_Code`` and ``Region``
    (in that order, with no separator).

    Args:
        retail_df: DataFrame containing at least the five location columns
            listed above.

    Returns:
        DataFrame with the columns ``Location_ID``, ``Country``, ``City``,
        ``State``, ``Postal_Code`` and ``Region``, one row per distinct
        location.
    """
    location_columns = ["Country", "City", "State", "Postal_Code", "Region"]

    # This formula must stay identical to the Location_ID expression in
    # Fact_retail, otherwise fact rows stop matching this dimension.
    # NOTE(review): Spark's concat yields null when any input is null, and
    # there is no separator between fields; if the source can contain null
    # location fields, change the key here and in Fact_retail together.
    location_id = md5(concat(*[col(name) for name in location_columns]))

    # The source holds one row per order line; de-duplicate so each location
    # appears once in the dimension.
    return (
        retail_df
        .withColumn("Location_ID", location_id)
        .select("Location_ID", *location_columns)
        .distinct()
    )

In [None]:
def DIM_Date(retail_df):
    """Build the date dimension from the order/ship date pairs in ``retail_df``.

    For each of ``Order_Date`` and ``Ship_Date`` the result carries a
    ``yyyyMMdd`` key, the ``yyyy-MM-dd`` date string, the year, month and day
    strings, and the quarter number.

    Args:
        retail_df: DataFrame containing ``Order_Date`` and ``Ship_Date``.
            The notebook parses both with ``to_date`` before calling this.

    Returns:
        DataFrame with the columns ``Order_DateKey``, ``Order_Date``,
        ``Order_Year``, ``Order_Month``, ``Order_Day``, ``Order_Quarter``
        followed by the same six ``Ship_*`` columns, one row per distinct
        (order date, ship date) pair.
    """
    def _date_attributes(source, prefix):
        # Derived columns for one date column, in the output column order.
        return [
            date_format(source, "yyyyMMdd").alias(prefix + "_DateKey"),
            date_format(source, "yyyy-MM-dd").alias(prefix + "_Date"),
            date_format(source, "yyyy").alias(prefix + "_Year"),
            date_format(source, "MM").alias(prefix + "_Month"),
            date_format(source, "dd").alias(prefix + "_Day"),
            quarter(source).alias(prefix + "_Quarter"),
        ]

    dates = retail_df.select("Order_Date", "Ship_Date")

    # The source holds one row per order line, so the same date pair repeats;
    # de-duplicate so the dimension holds each pair once.
    return dates.select(
        *_date_attributes("Order_Date", "Order"),
        *_date_attributes("Ship_Date", "Ship")
    ).distinct()

In [None]:
def DIM_Ship(retail_df):
    """Build the ship-mode dimension with a dense-rank surrogate key.

    Args:
        retail_df: DataFrame containing at least the column ``Ship_Mode``.

    Returns:
        DataFrame with the columns ``Ship_Mode`` and ``Ship_Key``, one row
        per distinct ship mode. ``Ship_Key`` is the dense rank of
        ``Ship_Mode`` in ascending order.
    """
    # De-duplicate before ranking. A window with no partitionBy makes Spark
    # gather every row it ranks into a single partition, so rank only the
    # distinct modes rather than the full dataset. dense_rank depends only on
    # the ordered distinct values, so the keys are the same ones Fact_retail
    # derives.
    ship_modes = retail_df.select("Ship_Mode").distinct()
    window_spec = Window.orderBy("Ship_Mode")
    return ship_modes.withColumn("Ship_Key", dense_rank().over(window_spec))

In [None]:
def Fact_retail(retail_df):
    """Build the orders fact table: one output row per input row.

    Args:
        retail_df: DataFrame containing at least ``Order_ID``,
            ``Customer_ID``, ``Product_ID``, ``Country``, ``City``,
            ``State``, ``Postal_Code``, ``Region``, ``Ship_Mode``,
            ``Order_Date``, ``Ship_Date`` and ``Sales``.

    Returns:
        DataFrame with the columns ``Order_ID``, ``Customer_ID``,
        ``Product_ID``, ``Location_ID``, ``Ship_Key``, ``Order_DateKey``,
        ``Ship_DateKey`` and ``Sales``.
    """
    # Rank only the distinct ship modes. A window with no partitionBy makes
    # Spark gather every row it ranks into a single partition, which does not
    # scale when applied to the whole fact-sized dataset. dense_rank depends
    # only on the ordered distinct values, so these are the same keys a
    # window over every row would assign (and the same ones DIM_Ship uses).
    ship_keys = (
        retail_df
        .select(col("Ship_Mode").alias("_Ship_Mode_Lookup"))
        .distinct()
        .withColumn(
            "Ship_Key",
            dense_rank().over(Window.orderBy("_Ship_Mode_Lookup")),
        )
    )

    # This formula must stay identical to the Location_ID expression in
    # DIM_Location, otherwise fact rows stop matching that dimension.
    with_location = retail_df.withColumn(
        "Location_ID",
        md5(concat(col("Country"), col("City"), col("State"), col("Postal_Code"), col("Region")))
    )

    # Null-safe equality so a row whose Ship_Mode is null still receives the
    # key ranked for the null group, as it would from a window over all rows.
    # The lookup has one row per ship mode, so broadcasting it avoids
    # shuffling the large side.
    keyed = with_location.join(
        broadcast(ship_keys),
        col("Ship_Mode").eqNullSafe(col("_Ship_Mode_Lookup")),
        "left",
    )

    return keyed.select(
        "Order_ID",
        "Customer_ID",
        "Product_ID",
        "Location_ID",
        "Ship_Key",
        date_format("Order_Date", "yyyyMMdd").alias("Order_DateKey"),
        date_format("Ship_Date", "yyyyMMdd").alias("Ship_DateKey"),
        "Sales",
    )

In [None]:
# Build the fact rows and append them to the warehouse fact table.
fact = Fact_retail(retail_df)
fact.write.saveAsTable("retail_dwh.orders_fact", mode="append")

In [None]:
# Build the customer dimension and append it to its warehouse table.
customers_dim = DIM_Customer(retail_df)
customers_dim.write.saveAsTable("retail_dwh.customer_dim", mode="append")

In [None]:
# Build the product dimension and append it to its warehouse table.
products_dim = DIM_Product(retail_df)
products_dim.write.saveAsTable("retail_dwh.product_dim", mode="append")

In [None]:
# Build the location dimension and append it to its warehouse table.
Location_dim = DIM_Location(retail_df)
Location_dim.write.saveAsTable("retail_dwh.location_dim", mode="append")

In [None]:
# Build the ship-mode dimension and append it to its warehouse table.
ship_dim = DIM_Ship(retail_df)
ship_dim.write.saveAsTable("retail_dwh.ship_dim", mode="append")

In [None]:
# Build the date dimension and append it to its warehouse table.
date_dim = DIM_Date(retail_df)
date_dim.write.saveAsTable("retail_dwh.date_dim", mode="append")