In [0]:
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, DoubleType, BooleanType, DateType
from pyspark.sql import functions as F

In [0]:

# List the contents of the pizzahut mount point (path, name, size, modificationTime).
display(dbutils.fs.ls("/mnt/pizzahut"))

path,name,size,modificationTime
dbfs:/mnt/pizzahut/raw-data/,raw-data/,0,1743844722000
dbfs:/mnt/pizzahut/transformed-data/,transformed-data/,0,1743844734000


In [0]:
# Load the three raw CSV extracts. The first line of each file is treated as the
# header, and Spark infers the column types from the data (see printSchema below).
factsales = spark.read.csv("/mnt/pizzahut/raw-data/factsales.csv", header=True, inferSchema=True)
factbudget = spark.read.csv("/mnt/pizzahut/raw-data/factbudget.csv", header=True, inferSchema=True)
curr = spark.read.csv("/mnt/pizzahut/raw-data/dimcurrexchange.csv", header=True, inferSchema=True)

In [0]:

# Disabled helper: collected the file names found under raw-data/.
# The list shown as this cell's output comes from an earlier run, when the code
# was still enabled; it is not regenerated by the current (commented) cell.
# NOTE(review): the path below has no leading '/', unlike every other path in
# this notebook — confirm it resolves to /mnt/pizzahut/raw-data/ before re-enabling.
# table_name = []

# for i in dbutils.fs.ls('mnt/pizzahut/raw-data/'):
#     table_name.append(i.name.split('/')[0])

# table_name
     

['dimcurrexchange.csv', 'factbudget.csv', 'factsales.csv']

In [0]:
# Inspect the column types Spark inferred for the raw sales extract.
factsales.printSchema()

root
 |-- Store_ID: string (nullable = true)
 |-- T_Date: date (nullable = true)
 |-- Region: string (nullable = true)
 |-- Brand: string (nullable = true)
 |-- NetAmount_Local: double (nullable = true)



In [0]:
# Inspect the column types Spark inferred for the raw budget extract.
factbudget.printSchema()

root
 |-- Store_ID: string (nullable = true)
 |-- Date: date (nullable = true)
 |-- Region: string (nullable = true)
 |-- Brand: string (nullable = true)
 |-- Budget_Local: double (nullable = true)



In [0]:
# Inspect the column types Spark inferred for the raw exchange-rate extract.
curr.printSchema()

root
 |-- Region: string (nullable = true)
 |-- ExchangeRate: double (nullable = true)



In [0]:
# Register the raw DataFrames as temp views so the %sql cells below can query them.
# Each view captures the DataFrame as it exists at this point; later reassignments
# of the Python variables (rename, UniqueID, USD columns) are NOT reflected here.
factsales.createOrReplaceTempView("FactSales")
factbudget.createOrReplaceTempView("FactBudget")
curr.createOrReplaceTempView("Curr")

In [0]:
%sql
-- Regions present in the raw budget extract, one row per region.
SELECT Region
FROM factbudget
GROUP BY Region
ORDER BY Region

Region
Hong Kong
Taiwan
Vietnam


In [0]:
%sql
-- Regions present in the raw sales extract, one row per region.
SELECT Region
FROM factsales
GROUP BY Region
ORDER BY Region

Region
Hong Kong
Taiwan
Vietnam


In [0]:
%sql
-- Preview a handful of raw budget rows. LIMIT keeps the whole table from being
-- pulled into the notebook output.
SELECT * FROM factbudget
LIMIT 10

Store_ID,Date,Region,Brand,Budget_Local
S0510,2024-02-10,Hong Kong,KFC,31526.83753301364
S0511,2024-02-10,Hong Kong,KFC,60786.75537281521
S0512,2024-02-10,Hong Kong,KFC,20189.382475890256
S0513,2024-02-10,Hong Kong,KFC,92602.83242314954
S0514,2024-02-10,Hong Kong,KFC,134506.21013647746
S0515,2024-02-10,Hong Kong,KFC,60417.59549804029
S0516,2024-02-10,Hong Kong,KFC,83194.30893357098
S0517,2024-02-10,Hong Kong,KFC,66611.91269215195
S0518,2024-02-10,Hong Kong,KFC,68135.77712275305
S0519,2024-02-10,Hong Kong,KFC,105205.01588622868


In [0]:
# Keep only usable exchange rates. The rates appear to be local-currency units
# per USD (e.g. Vietnam 25320.0), and the USD conversion divides local amounts by
# ExchangeRate. A zero, negative or NaN rate would therefore give a null or
# nonsensical USD figure (or a divide-by-zero error under ANSI mode).
# Spark orders NaN above every number, so `> 0` alone would NOT exclude NaN;
# hence the explicit isnan check.
cleaned_curr = curr.filter(
    col("Region").isNotNull()
    & col("ExchangeRate").isNotNull()
    & ~F.isnan(col("ExchangeRate"))
    & (col("ExchangeRate") > 0)
)

# Each Region must map to exactly one rate. Otherwise the joins on Region
# duplicate fact rows and inflate USD totals.
assert (
    cleaned_curr.groupBy("Region").count().filter(col("count") > 1).limit(1).count() == 0
), "cleaned_curr has more than one exchange rate for some Region"


In [0]:
# Print the filtered exchange-rate table (first 20 rows, long values truncated).
cleaned_curr.show(n=20, truncate=True)

+---------+------------+
|   Region|ExchangeRate|
+---------+------------+
|  Vietnam|     25320.0|
|   Taiwan|      32.799|
|Hong Kong|    7.783196|
+---------+------------+



In [0]:
# Align the sales date column name with factbudget's "Date". The UniqueID and
# dim-date cells below refer to "Date" on both facts.
factsales = factsales.withColumnRenamed(existing="T_Date", new="Date")

In [0]:
# Confirm the rename: the sales date column should now be listed as "Date".
factsales.printSchema()

root
 |-- Store_ID: string (nullable = true)
 |-- Date: date (nullable = true)
 |-- Region: string (nullable = true)
 |-- Brand: string (nullable = true)
 |-- NetAmount_Local: double (nullable = true)



In [0]:
# Add UniqueID to factsales: Store_ID immediately followed by Date formatted as
# yyyyMMdd (e.g. Store_ID S0510 with Date 2024-02-10 -> "S051020240210").
# F.concat returns null when either part is null.
# NOTE(review): this is only unique if there is at most one sales row per store
# per day — confirm against the source grain.
factsales = factsales.withColumn(
    "UniqueID",
    F.concat(
        F.col("Store_ID"),
        F.date_format(F.col("Date"), "yyyyMMdd"),
    ),
)

In [0]:
# Add UniqueID to factbudget, built the same way as on factsales: Store_ID
# immediately followed by Date formatted as yyyyMMdd. The IDs are therefore
# comparable across the two facts. F.concat returns null when either part is null.
# NOTE(review): unique only if there is one budget row per store per day — confirm.
factbudget = factbudget.withColumn(
    "UniqueID",
    F.concat(
        F.col("Store_ID"),
        F.date_format(F.col("Date"), "yyyyMMdd"),
    ),
)

In [0]:
# Convert local-currency amounts to USD with the per-Region rate from cleaned_curr.
# Rates are local units per USD, so USD = local / rate.
# Joining on the column name "Region" keeps a single Region column.
# The temporary ExchangeRate column is dropped again after the division.
# The left join keeps every fact row. A Region without a rate would yield a
# null USD amount, so the checks at the end fail loudly instead of passing
# nulls downstream.

# factsales: add Sales_USDAmount
factsales = (
    factsales
    .join(cleaned_curr, "Region", "left")
    .withColumn("Sales_USDAmount", col("NetAmount_Local") / col("ExchangeRate"))
    .drop("ExchangeRate")
)

# factbudget: add Budget_USDAmount
factbudget = (
    factbudget
    .join(cleaned_curr, "Region", "left")
    .withColumn("Budget_USDAmount", col("Budget_Local") / col("ExchangeRate"))
    .drop("ExchangeRate")
)

# Every row that has a local amount must also have a USD amount.
assert (
    factsales.filter(col("NetAmount_Local").isNotNull() & col("Sales_USDAmount").isNull())
    .limit(1).count() == 0
), "factsales has rows whose Region has no usable exchange rate"
assert (
    factbudget.filter(col("Budget_Local").isNotNull() & col("Budget_USDAmount").isNull())
    .limit(1).count() == 0
), "factbudget has rows whose Region has no usable exchange rate"

In [0]:

# Dim Store: every distinct (Store_ID, Region, Brand) combination that occurs in
# either the sales or the budget fact.
# NOTE(review): a Store_ID seen with two different Regions/Brands would occupy
# two rows here — confirm Store_ID is meant to be a unique key of this dimension.
store_columns = ["Store_ID", "Region", "Brand"]
store_ids = (
    factsales.select(*store_columns)
    .unionByName(factbudget.select(*store_columns))
    .distinct()
)

In [0]:
# Dim Date: each Date value that occurs in the sales or budget fact, listed once.
date = (
    factsales.select("Date")
    .unionByName(factbudget.select("Date"))
    .distinct()
)

In [0]:
# Disabled: a standalone brand dimension (its write near the end is also commented out).
# Brand is already a column of the store dimension (store_ids).
# #Dim Brand
# brand = factsales.select("Brand").union(factbudget.select("Brand"))
# brand = brand.dropDuplicates(["Brand"])

In [0]:
# Persist the facts and dimensions as Delta tables under transformed-data/,
# replacing whatever a previous run wrote there. Writes run in list order.
# NOTE(review): plain overwrite does not change an existing Delta table's schema;
# if a table's columns change, the write may need overwriteSchema — confirm.
delta_outputs = [
    (factsales, "/mnt/pizzahut/transformed-data/factsales"),
    (factbudget, "/mnt/pizzahut/transformed-data/factbudget"),
    (cleaned_curr, "/mnt/pizzahut/transformed-data/dimcurrexchange"),
    (store_ids, "/mnt/pizzahut/transformed-data/dimstoreids"),
    (date, "/mnt/pizzahut/transformed-data/dimdate"),
    # (brand, "/mnt/pizzahut/transformed-data/brand"),
]
for frame, target_path in delta_outputs:
    frame.write.mode("overwrite").format("delta").save(target_path)
