**a) Filter out cancellations and invalid quantities/prices**

In [0]:
from pyspark.sql.functions import col

# Load the raw sales table; raw_df itself is never reassigned, so it stays
# available for re-inspection.
raw_df = spark.read.table("online_retail.sales_raw")

# Row-level validity rules, applied one after another.
not_cancelled = ~col("InvoiceNo").startswith("C")  # drop invoice numbers starting with "C"
positive_quantity = col("Quantity") > 0
positive_price = col("UnitPrice") > 0

source_df = (
    raw_df
    .filter(not_cancelled)
    .filter(positive_quantity)
    .filter(positive_price)
)

# Preview the first 20 surviving rows.
source_df.limit(20).display()

InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
549251,84596F,SMALL MARSHMALLOWS PINK BOWL,2,4/7/2011 12:16,0.42,14449,United Kingdom
549251,21975,PACK OF 60 DINOSAUR CAKE CASES,1,4/7/2011 12:16,0.55,14449,United Kingdom
549251,22417,PACK OF 60 SPACEBOY CAKE CASES,1,4/7/2011 12:16,0.55,14449,United Kingdom
549251,22326,ROUND SNACK BOXES SET OF4 WOODLAND,1,4/7/2011 12:16,2.95,14449,United Kingdom
549251,22327,ROUND SNACK BOXES SET OF 4 SKULLS,1,4/7/2011 12:16,2.95,14449,United Kingdom
549251,85093,CANDY SPOT EGG WARMER HARE,10,4/7/2011 12:16,0.39,14449,United Kingdom
549251,85094,CANDY SPOT EGG WARMER RABBIT,5,4/7/2011 12:16,0.19,14449,United Kingdom
549251,85132B,CHARLIE AND LOLA TABLE TINS,2,4/7/2011 12:16,1.95,14449,United Kingdom
549251,85132C,CHARLIE AND LOLA FIGURES TINS,2,4/7/2011 12:16,1.95,14449,United Kingdom
549251,22138,BAKING SET 9 PIECE RETROSPOT,1,4/7/2011 12:16,4.95,14449,United Kingdom


**b) Handle null CustomerIDs: replace with 0**

In [0]:
from pyspark.sql.functions import col, lit, when

# Replace missing CustomerID values with the placeholder 0, then cast the
# resulting column to int so every row carries an integer customer key.
# `col` is imported here as well so this cell does not depend on the import
# made in an earlier cell.
customer_id = col("CustomerID")
source_df = source_df.withColumn(
    "CustomerID",
    when(customer_id.isNull(), lit(0)).otherwise(customer_id).cast("int"),
)

# Preview only the first 20 rows (as the other transformation cells do)
# instead of rendering the whole DataFrame.
source_df.limit(20).display()

InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
549251,84596F,SMALL MARSHMALLOWS PINK BOWL,2,4/7/2011 12:16,0.42,14449,United Kingdom
549251,21975,PACK OF 60 DINOSAUR CAKE CASES,1,4/7/2011 12:16,0.55,14449,United Kingdom
549251,22417,PACK OF 60 SPACEBOY CAKE CASES,1,4/7/2011 12:16,0.55,14449,United Kingdom
549251,22326,ROUND SNACK BOXES SET OF4 WOODLAND,1,4/7/2011 12:16,2.95,14449,United Kingdom
549251,22327,ROUND SNACK BOXES SET OF 4 SKULLS,1,4/7/2011 12:16,2.95,14449,United Kingdom
549251,85093,CANDY SPOT EGG WARMER HARE,10,4/7/2011 12:16,0.39,14449,United Kingdom
549251,85094,CANDY SPOT EGG WARMER RABBIT,5,4/7/2011 12:16,0.19,14449,United Kingdom
549251,85132B,CHARLIE AND LOLA TABLE TINS,2,4/7/2011 12:16,1.95,14449,United Kingdom
549251,85132C,CHARLIE AND LOLA FIGURES TINS,2,4/7/2011 12:16,1.95,14449,United Kingdom
549251,22138,BAKING SET 9 PIECE RETROSPOT,1,4/7/2011 12:16,4.95,14449,United Kingdom


c) Convert InvoiceDate to Timestamp and extract time parts

In [0]:
from pyspark.sql.functions import col, to_timestamp, year, month, dayofweek, hour

# Parse the string InvoiceDate (e.g. "4/7/2011 12:16") into a timestamp.
parsed_invoice_ts = to_timestamp(col("InvoiceDate"), "M/d/yyyy H:mm")
invoice_ts = col("InvoiceTimestamp")

source_df = (
    source_df
    .withColumn("InvoiceTimestamp", parsed_invoice_ts)
    # Calendar/time parts derived from the parsed timestamp.
    .withColumn("Year", year(invoice_ts))
    .withColumn("Month", month(invoice_ts))
    .withColumn("DayOfWeek", dayofweek(invoice_ts))  # Spark numbering: 1 = Sunday ... 7 = Saturday
    .withColumn("Hour", hour(invoice_ts))
    .drop("InvoiceDate")  # the original string column is no longer needed
)


In [0]:
# Preview only the first 20 rows (as the other transformation cells do)
# instead of rendering the whole DataFrame.
source_df.limit(20).display()

InvoiceNo,StockCode,Description,Quantity,UnitPrice,CustomerID,Country,InvoiceTimestamp,Year,Month,DayOfWeek,Hour
549251,84596F,SMALL MARSHMALLOWS PINK BOWL,2,0.42,14449,United Kingdom,2011-04-07T12:16:00Z,2011,4,5,12
549251,21975,PACK OF 60 DINOSAUR CAKE CASES,1,0.55,14449,United Kingdom,2011-04-07T12:16:00Z,2011,4,5,12
549251,22417,PACK OF 60 SPACEBOY CAKE CASES,1,0.55,14449,United Kingdom,2011-04-07T12:16:00Z,2011,4,5,12
549251,22326,ROUND SNACK BOXES SET OF4 WOODLAND,1,2.95,14449,United Kingdom,2011-04-07T12:16:00Z,2011,4,5,12
549251,22327,ROUND SNACK BOXES SET OF 4 SKULLS,1,2.95,14449,United Kingdom,2011-04-07T12:16:00Z,2011,4,5,12
549251,85093,CANDY SPOT EGG WARMER HARE,10,0.39,14449,United Kingdom,2011-04-07T12:16:00Z,2011,4,5,12
549251,85094,CANDY SPOT EGG WARMER RABBIT,5,0.19,14449,United Kingdom,2011-04-07T12:16:00Z,2011,4,5,12
549251,85132B,CHARLIE AND LOLA TABLE TINS,2,1.95,14449,United Kingdom,2011-04-07T12:16:00Z,2011,4,5,12
549251,85132C,CHARLIE AND LOLA FIGURES TINS,2,1.95,14449,United Kingdom,2011-04-07T12:16:00Z,2011,4,5,12
549251,22138,BAKING SET 9 PIECE RETROSPOT,1,4.95,14449,United Kingdom,2011-04-07T12:16:00Z,2011,4,5,12


 d) Add the TotalAmount/Revenue column


In [0]:
from pyspark.sql.functions import col, round

# NOTE: importing `round` from pyspark.sql.functions shadows Python's built-in
# round() in the notebook namespace from this cell onward.

# Line revenue = Quantity x UnitPrice, rounded to 2 decimal places.
line_total = round(col("Quantity") * col("UnitPrice"), 2)
source_df = source_df.withColumn("TotalAmount", line_total)

# Preview the first 20 rows with the new column.
source_df.limit(20).display()

InvoiceNo,StockCode,Description,Quantity,UnitPrice,CustomerID,Country,InvoiceTimestamp,Year,Month,DayOfWeek,Hour,TotalAmount
549251,84596F,SMALL MARSHMALLOWS PINK BOWL,2,0.42,14449,United Kingdom,2011-04-07T12:16:00Z,2011,4,5,12,0.84
549251,21975,PACK OF 60 DINOSAUR CAKE CASES,1,0.55,14449,United Kingdom,2011-04-07T12:16:00Z,2011,4,5,12,0.55
549251,22417,PACK OF 60 SPACEBOY CAKE CASES,1,0.55,14449,United Kingdom,2011-04-07T12:16:00Z,2011,4,5,12,0.55
549251,22326,ROUND SNACK BOXES SET OF4 WOODLAND,1,2.95,14449,United Kingdom,2011-04-07T12:16:00Z,2011,4,5,12,2.95
549251,22327,ROUND SNACK BOXES SET OF 4 SKULLS,1,2.95,14449,United Kingdom,2011-04-07T12:16:00Z,2011,4,5,12,2.95
549251,85093,CANDY SPOT EGG WARMER HARE,10,0.39,14449,United Kingdom,2011-04-07T12:16:00Z,2011,4,5,12,3.9
549251,85094,CANDY SPOT EGG WARMER RABBIT,5,0.19,14449,United Kingdom,2011-04-07T12:16:00Z,2011,4,5,12,0.95
549251,85132B,CHARLIE AND LOLA TABLE TINS,2,1.95,14449,United Kingdom,2011-04-07T12:16:00Z,2011,4,5,12,3.9
549251,85132C,CHARLIE AND LOLA FIGURES TINS,2,1.95,14449,United Kingdom,2011-04-07T12:16:00Z,2011,4,5,12,3.9
549251,22138,BAKING SET 9 PIECE RETROSPOT,1,4.95,14449,United Kingdom,2011-04-07T12:16:00Z,2011,4,5,12,4.95


 f) Cast InvoiceNo to integer

In [0]:
from pyspark.sql.functions import col, when, lit
from pyspark.sql.types import IntegerType

# Add an integer version of InvoiceNo. Only values made up entirely of digits
# are cast; anything else is stored as null in InvoiceNoInt.
invoice_no = col("InvoiceNo")
digits_only = invoice_no.rlike("^[0-9]+$")

source_df = source_df.withColumn(
    "InvoiceNoInt",
    when(digits_only, invoice_no.cast(IntegerType())).otherwise(lit(None)),
)

# Show the first 20 rows of the result.
source_df.limit(20).display()

InvoiceNo,StockCode,Description,Quantity,UnitPrice,CustomerID,Country,InvoiceTimestamp,Year,Month,DayOfWeek,Hour,TotalAmount,InvoiceNoInt
549251,84596F,SMALL MARSHMALLOWS PINK BOWL,2,0.42,14449,United Kingdom,2011-04-07T12:16:00Z,2011,4,5,12,0.84,549251
549251,21975,PACK OF 60 DINOSAUR CAKE CASES,1,0.55,14449,United Kingdom,2011-04-07T12:16:00Z,2011,4,5,12,0.55,549251
549251,22417,PACK OF 60 SPACEBOY CAKE CASES,1,0.55,14449,United Kingdom,2011-04-07T12:16:00Z,2011,4,5,12,0.55,549251
549251,22326,ROUND SNACK BOXES SET OF4 WOODLAND,1,2.95,14449,United Kingdom,2011-04-07T12:16:00Z,2011,4,5,12,2.95,549251
549251,22327,ROUND SNACK BOXES SET OF 4 SKULLS,1,2.95,14449,United Kingdom,2011-04-07T12:16:00Z,2011,4,5,12,2.95,549251
549251,85093,CANDY SPOT EGG WARMER HARE,10,0.39,14449,United Kingdom,2011-04-07T12:16:00Z,2011,4,5,12,3.9,549251
549251,85094,CANDY SPOT EGG WARMER RABBIT,5,0.19,14449,United Kingdom,2011-04-07T12:16:00Z,2011,4,5,12,0.95,549251
549251,85132B,CHARLIE AND LOLA TABLE TINS,2,1.95,14449,United Kingdom,2011-04-07T12:16:00Z,2011,4,5,12,3.9,549251
549251,85132C,CHARLIE AND LOLA FIGURES TINS,2,1.95,14449,United Kingdom,2011-04-07T12:16:00Z,2011,4,5,12,3.9,549251
549251,22138,BAKING SET 9 PIECE RETROSPOT,1,4.95,14449,United Kingdom,2011-04-07T12:16:00Z,2011,4,5,12,4.95,549251


# Create the Silver Delta Table and CSV File

In [0]:
# Configure access to the ADLS Gen2 storage account used for the CSV export.
# The account key is read from a Databricks secret scope at run time instead of
# being hardcoded in the notebook, where it would leak through version control
# and notebook exports.
# NOTE(review): the scope and key names below are placeholders — create the
# secret (or adjust the names to an existing one) before running. The key that
# was previously embedded in this cell should be rotated in Azure.
spark.conf.set(
    "fs.azure.account.key.project02etlstorage.dfs.core.windows.net",
    dbutils.secrets.get(scope="project02-etl", key="project02etlstorage-account-key"),
)

In [0]:
# Persist the cleaned data as the Delta table online_retail.sales_cleaned,
# overwriting whatever the table held before.
(
    source_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("online_retail.sales_cleaned")
)

In [0]:
# Export the cleaned data as CSV to the "source" container of the storage
# account, overwriting any previous export at this path.
# The path is a plain string: the former f-string prefix had no placeholders.
# NOTE(review): Spark writes CSV without a header row by default; pass
# header=True if downstream readers expect column names — confirm with the
# consumers before changing the file layout.
source_df.write.mode("overwrite").csv(
    "abfss://source@project02etlstorage.dfs.core.windows.net/sales_cleaned"
)