In [1]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
# Build (or reuse) a local-mode Spark session. getOrCreate() keeps this cell
# safe to re-run: constructing SparkContext('local') a second time in the same
# kernel raises "Cannot run multiple SparkContexts at once".
spark = SparkSession.builder.master("local").getOrCreate()
# Keep the `sc` handle available for any code that uses the context directly.
sc = spark.sparkContext

## Create bronze layer

In [2]:
from pyspark.sql.types import *

    
 # Create the schema for the table
# Explicit schema for the sales-order CSV files, so column types are declared
# up front rather than inferred by the reader.
orderSchema = StructType([
    StructField("SalesOrderNumber", StringType()),
    StructField("SalesOrderLineNumber", IntegerType()),
    StructField("OrderDate", DateType()),
    StructField("CustomerName", StringType()),
    StructField("Email", StringType()),
    StructField("Item", StringType()),
    StructField("Quantity", IntegerType()),
    StructField("UnitPrice", FloatType()),
    StructField("Tax", FloatType()),
])

# Load every CSV file in the data folder (first line of each file is a header).
# The path uses "/" because "\*" is not a valid Python string escape (newer
# interpreters warn about it) and a backslash is only a separator on Windows.
df = (
    spark.read.format("csv")
    .option("header", "true")
    .schema(orderSchema)
    .load("data/*.csv")
)

# Display the first 10 rows of the dataframe to preview the data
display(df.head(10))

[Row(SalesOrderNumber='SO49172', SalesOrderLineNumber=1, OrderDate=datetime.date(2021, 1, 1), CustomerName='Brian Howard', Email='brian23@adventure-works.com', Item='Road-250 Red, 44', Quantity=1, UnitPrice=2443.35009765625, Tax=195.46800231933594),
 Row(SalesOrderNumber='SO49173', SalesOrderLineNumber=1, OrderDate=datetime.date(2021, 1, 1), CustomerName='Linda Alvarez', Email='linda19@adventure-works.com', Item='Mountain-200 Silver, 38', Quantity=1, UnitPrice=2071.419677734375, Tax=165.71359252929688),
 Row(SalesOrderNumber='SO49174', SalesOrderLineNumber=1, OrderDate=datetime.date(2021, 1, 1), CustomerName='Gina Hernandez', Email='gina4@adventure-works.com', Item='Mountain-200 Silver, 42', Quantity=1, UnitPrice=2071.419677734375, Tax=165.71359252929688),
 Row(SalesOrderNumber='SO49178', SalesOrderLineNumber=1, OrderDate=datetime.date(2021, 1, 1), CustomerName='Beth Ruiz', Email='beth4@adventure-works.com', Item='Road-550-W Yellow, 44', Quantity=1, UnitPrice=1000.4375, Tax=80.03500366

 ## Create Silver layer

In [3]:
from pyspark.sql.functions import when, lit, col, current_timestamp, input_file_name
    
 # Add columns IsFlagged, CreatedTS and ModifiedTS
# Audit columns: the source file of each row, a flag for orders dated before
# 2019-08-01, and creation / modification timestamps.
is_before_cutoff = when(col("OrderDate") < '2019-08-01', True).otherwise(False)
df = (
    df
    .withColumn("FileName", input_file_name())
    .withColumn("IsFlagged", is_before_cutoff)
    .withColumn("CreatedTS", current_timestamp())
    .withColumn("ModifiedTS", current_timestamp())
)

# Replace a null or empty CustomerName with the placeholder "Unknown"
name_is_missing = col("CustomerName").isNull() | (col("CustomerName") == "")
df = df.withColumn(
    "CustomerName",
    when(name_is_missing, lit("Unknown")).otherwise(col("CustomerName")),
)

In [4]:
# Preview the first 10 rows; the bare last expression is rendered as the cell output
silver_preview = df.head(10)
silver_preview

[Row(SalesOrderNumber='SO49172', SalesOrderLineNumber=1, OrderDate=datetime.date(2021, 1, 1), CustomerName='Brian Howard', Email='brian23@adventure-works.com', Item='Road-250 Red, 44', Quantity=1, UnitPrice=2443.35009765625, Tax=195.46800231933594, FileName='file:///d:/IT%20-%20DATA/DE/spark/data/2021.csv', IsFlagged=False, CreatedTS=datetime.datetime(2024, 5, 7, 23, 41, 27, 572000), ModifiedTS=datetime.datetime(2024, 5, 7, 23, 41, 27, 572000)),
 Row(SalesOrderNumber='SO49173', SalesOrderLineNumber=1, OrderDate=datetime.date(2021, 1, 1), CustomerName='Linda Alvarez', Email='linda19@adventure-works.com', Item='Mountain-200 Silver, 38', Quantity=1, UnitPrice=2071.419677734375, Tax=165.71359252929688, FileName='file:///d:/IT%20-%20DATA/DE/spark/data/2021.csv', IsFlagged=False, CreatedTS=datetime.datetime(2024, 5, 7, 23, 41, 27, 572000), ModifiedTS=datetime.datetime(2024, 5, 7, 23, 41, 27, 572000)),
 Row(SalesOrderNumber='SO49174', SalesOrderLineNumber=1, OrderDate=datetime.date(2021, 1, 1

In [5]:
# Save the dataframe as CSV under "bronze layer", overwriting earlier output.
# NOTE(review): at this point df already carries the FileName / IsFlagged /
# CreatedTS / ModifiedTS columns and the "Unknown" CustomerName substitution
# from the previous cell - confirm this is the data intended for bronze.
(
    df.write
    .format("csv")
    .mode("overwrite")
    .save("bronze layer")
)

In [6]:
# Spot-check: print the rows whose CustomerName is exactly "Wyatt Griffin"
wyatt_rows = df.where(df["CustomerName"] == "Wyatt Griffin")
wyatt_rows.show()

+----------------+--------------------+----------+-------------+--------------------+----------------+--------+---------+--------+--------------------+---------+--------------------+--------------------+
|SalesOrderNumber|SalesOrderLineNumber| OrderDate| CustomerName|               Email|            Item|Quantity|UnitPrice|     Tax|            FileName|IsFlagged|           CreatedTS|          ModifiedTS|
+----------------+--------------------+----------+-------------+--------------------+----------------+--------+---------+--------+--------------------+---------+--------------------+--------------------+
|         SO45260|                   1|2019-12-31|Wyatt Griffin|wyatt49@adventure...|Road-150 Red, 62|       1|  3578.27|286.2616|file:///d:/IT%20-...|    false|2024-05-07 23:41:...|2024-05-07 23:41:...|
+----------------+--------------------+----------+-------------+--------------------+----------------+--------+---------+--------+--------------------+---------+--------------------+--

In [7]:
 # Write the transformed dataframe to the silver layer as CSV files
    
from pyspark.sql.types import *

# Save df as CSV under "silver layer", overwriting any earlier output
(
    df.write
    .format("csv")
    .mode("overwrite")
    .save("silver layer")
)




 ## Create gold layer
 

## Create DimDate

In [8]:
from pyspark.sql.functions import *
# Schema object with Date / Day / Month / Year fields.
# NOTE(review): dimdateschema is not referenced again in this cell - confirm
# whether it is still needed.
dimdateschema = StructType([
    StructField("Date", DateType()),
    StructField("Day", IntegerType()),
    StructField("Month", IntegerType()),
    StructField("Year", IntegerType()),
])

# One row per distinct OrderDate with its day / month / year parts, sorted by date
distinct_order_dates = df.dropDuplicates(["OrderDate"])
dfdimDate_gold = distinct_order_dates.select(
    col("OrderDate"),
    dayofmonth("OrderDate").alias("Day"),
    month("OrderDate").alias("Month"),
    year("OrderDate").alias("Year"),
).orderBy("OrderDate")

# Display the first 10 rows of the date dimension to preview the data
display(dfdimDate_gold.head(10))

[Row(OrderDate=datetime.date(2019, 7, 1), Day=1, Month=7, Year=2019),
 Row(OrderDate=datetime.date(2019, 7, 2), Day=2, Month=7, Year=2019),
 Row(OrderDate=datetime.date(2019, 7, 3), Day=3, Month=7, Year=2019),
 Row(OrderDate=datetime.date(2019, 7, 4), Day=4, Month=7, Year=2019),
 Row(OrderDate=datetime.date(2019, 7, 5), Day=5, Month=7, Year=2019),
 Row(OrderDate=datetime.date(2019, 7, 6), Day=6, Month=7, Year=2019),
 Row(OrderDate=datetime.date(2019, 7, 7), Day=7, Month=7, Year=2019),
 Row(OrderDate=datetime.date(2019, 7, 8), Day=8, Month=7, Year=2019),
 Row(OrderDate=datetime.date(2019, 7, 9), Day=9, Month=7, Year=2019),
 Row(OrderDate=datetime.date(2019, 7, 10), Day=10, Month=7, Year=2019)]

In [11]:

# Save the date dimension as CSV, overwriting any earlier output. The path uses
# "/" because "\d" is not a valid Python string escape (newer interpreters warn
# about it) and a backslash is only a path separator on Windows.
dfdimDate_gold.write.format("csv").mode("overwrite").save("gold layer/dimdate")

## Create DimCustomer

In [19]:
# Distinct (CustomerName, Email) pairs; the name is split on spaces and its
# first and second pieces become the First and Last columns.
name_parts = split(col("CustomerName"), " ")
dfdimCustomer_silver = (
    df.dropDuplicates(["CustomerName", "Email"])
    .select(col("CustomerName"), col("Email"))
    .withColumn("First", name_parts.getItem(0))
    .withColumn("Last", name_parts.getItem(1))
)

# Attach a surrogate key. monotonically_increasing_id() yields unique,
# increasing values that are not guaranteed to be consecutive; the +1 makes
# the first key 1 instead of 0.
dfdimCustomer_gold = dfdimCustomer_silver.withColumn(
    "CustomerID", monotonically_increasing_id() + 1
)

# Display the first 10 rows of the customer dimension to preview the data
display(dfdimCustomer_gold.head(10))

[Row(CustomerName='Bridget Andersen', Email='bridget15@adventure-works.com', First='Bridget', Last='Andersen', CustomerID=1),
 Row(CustomerName='Mya Butler', Email='mya14@adventure-works.com', First='Mya', Last='Butler', CustomerID=2),
 Row(CustomerName='Deanna Hernandez', Email='deanna29@adventure-works.com', First='Deanna', Last='Hernandez', CustomerID=3),
 Row(CustomerName='Ricky Navarro', Email='ricky10@adventure-works.com', First='Ricky', Last='Navarro', CustomerID=4),
 Row(CustomerName='Omar Ye', Email='omar9@adventure-works.com', First='Omar', Last='Ye', CustomerID=5),
 Row(CustomerName='Kellie Gutierrez', Email='kellie9@adventure-works.com', First='Kellie', Last='Gutierrez', CustomerID=6),
 Row(CustomerName='Raymond Rana', Email='raymond13@adventure-works.com', First='Raymond', Last='Rana', CustomerID=7),
 Row(CustomerName='Derrick Moreno', Email='derrick6@adventure-works.com', First='Derrick', Last='Moreno', CustomerID=8),
 Row(CustomerName='Megan Walker', Email='megan25@adven

In [24]:
# Save the customer dimension as CSV, overwriting any earlier output. The path
# uses "/" because "\d" is not a valid Python string escape (newer interpreters
# warn about it) and a backslash is only a path separator on Windows.
dfdimCustomer_gold.write.format("csv").mode("overwrite").save("gold layer/dimCustomer")

## Create DimProduct


In [21]:
# Distinct Item values, split on ", " into a name (first piece) and an
# info value (second piece).
item_parts = split(col("Item"), ", ")
item_info = item_parts.getItem(1)
dfdimProduct_silver = (
    df.dropDuplicates(["Item"])
    .select(col("Item"))
    .withColumn("ItemName", item_parts.getItem(0))
    # A missing or empty second piece is stored as an empty string
    .withColumn(
        "ItemInfo",
        when(item_info.isNull() | (item_info == ""), lit("")).otherwise(item_info),
    )
)

# Attach a surrogate key starting at 1 (values are unique but may have gaps)
dfdimProduct_gold = dfdimProduct_silver.withColumn(
    "ItemID", monotonically_increasing_id() + 1
)

display(dfdimProduct_gold.head(10))

[Row(Item='Mountain-200 Black, 42', ItemName='Mountain-200 Black', ItemInfo='42', ItemID=1),
 Row(Item='Touring-1000 Yellow, 46', ItemName='Touring-1000 Yellow', ItemInfo='46', ItemID=2),
 Row(Item='Touring-1000 Blue, 54', ItemName='Touring-1000 Blue', ItemInfo='54', ItemID=3),
 Row(Item='Short-Sleeve Classic Jersey, S', ItemName='Short-Sleeve Classic Jersey', ItemInfo='S', ItemID=4),
 Row(Item="Women's Mountain Shorts, S", ItemName="Women's Mountain Shorts", ItemInfo='S', ItemID=5),
 Row(Item='Long-Sleeve Logo Jersey, L', ItemName='Long-Sleeve Logo Jersey', ItemInfo='L', ItemID=6),
 Row(Item='Mountain-400-W Silver, 42', ItemName='Mountain-400-W Silver', ItemInfo='42', ItemID=7),
 Row(Item='Racing Socks, M', ItemName='Racing Socks', ItemInfo='M', ItemID=8),
 Row(Item='Mountain-100 Silver, 42', ItemName='Mountain-100 Silver', ItemInfo='42', ItemID=9),
 Row(Item='Mountain-200 Silver, 42', ItemName='Mountain-200 Silver', ItemInfo='42', ItemID=10)]

In [23]:
# Save the product dimension as CSV, overwriting any earlier output. The path
# uses "/" because "\d" is not a valid Python string escape (newer interpreters
# warn about it) and a backslash is only a path separator on Windows.
dfdimProduct_gold.write.format("csv").mode("overwrite").save("gold layer/dimProduct")

## Create FactSales


In [25]:
# Derive ItemName / ItemInfo on the order rows with the same split rule the
# product dimension uses, so the two can be joined on those columns.
item_parts = split(col("Item"), ", ")
item_info = item_parts.getItem(1)
df = (
    df.withColumn("ItemName", item_parts.getItem(0))
    # A missing or empty second piece is stored as an empty string
    .withColumn(
        "ItemInfo",
        when(item_info.isNull() | (item_info == ""), lit("")).otherwise(item_info),
    )
)

# Create Sales_gold dataframe.
# Join conditions use the alias-qualified names (df1 / df2 / df3). The
# dimension frames are derived from df itself, so referring to the shared
# columns through the original dataframe objects can be ambiguous to Spark;
# qualifying by alias states explicitly which side each column comes from.
customer_match = (
    (col("df1.CustomerName") == col("df2.CustomerName"))
    & (col("df1.Email") == col("df2.Email"))
)
product_match = (
    (col("df1.ItemName") == col("df3.ItemName"))
    & (col("df1.ItemInfo") == col("df3.ItemInfo"))
)

# Left joins keep every order row even when no dimension row matches
dffactSales_gold = (
    df.alias("df1")
    .join(dfdimCustomer_gold.alias("df2"), customer_match, "left")
    .join(dfdimProduct_gold.alias("df3"), product_match, "left")
    .select(
        col("df2.CustomerID"),
        col("df3.ItemID"),
        col("df1.OrderDate"),
        col("df1.Quantity"),
        col("df1.UnitPrice"),
        col("df1.Tax"),
    )
    .orderBy(col("df1.OrderDate"), col("df2.CustomerID"), col("df3.ItemID"))
)

# Display the first 10 rows of the dataframe to preview the data
display(dffactSales_gold.head(10))

[Row(CustomerID=1737, ItemID=16, OrderDate=datetime.date(2019, 7, 1), Quantity=1, UnitPrice=3399.989990234375, Tax=271.99920654296875),
 Row(CustomerID=2683, ItemID=65, OrderDate=datetime.date(2019, 7, 1), Quantity=1, UnitPrice=3578.27001953125, Tax=286.2615966796875),
 Row(CustomerID=5063, ItemID=55, OrderDate=datetime.date(2019, 7, 1), Quantity=1, UnitPrice=3578.27001953125, Tax=286.2615966796875),
 Row(CustomerID=6095, ItemID=25, OrderDate=datetime.date(2019, 7, 1), Quantity=1, UnitPrice=3399.989990234375, Tax=271.99920654296875),
 Row(CustomerID=6545, ItemID=76, OrderDate=datetime.date(2019, 7, 1), Quantity=1, UnitPrice=699.0982055664062, Tax=55.92789840698242),
 Row(CustomerID=7319, ItemID=55, OrderDate=datetime.date(2019, 7, 1), Quantity=1, UnitPrice=3578.27001953125, Tax=286.2615966796875),
 Row(CustomerID=9243, ItemID=16, OrderDate=datetime.date(2019, 7, 1), Quantity=1, UnitPrice=3399.989990234375, Tax=271.99920654296875),
 Row(CustomerID=11741, ItemID=96, OrderDate=datetime.da

In [27]:
# Save the sales fact table as CSV, overwriting any earlier output
(
    dffactSales_gold.write
    .format("csv")
    .mode("overwrite")
    .save("gold layer/factSales")
)