In [0]:
from pyspark.sql.functions import*
from pyspark.sql.types import*


# SILVER LAYER SCRIPT

### DATA ACCESS FROM AZURE STORAGE ACCOUNT

In [0]:
# Storage account name and access key
storage_account_name = "Your storage name"
access_key = "Enter your access_key"

# Set the storage account key in Spark config
spark.conf.set(
  f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net",
  access_key
)

# Example: List files from a container
dbutils.fs.ls(f"wasbs://bronze@{storage_account_name}.blob.core.windows.net/")


## Read Calander data

In [0]:
df_calander = spark.read.option("header", "true")\
        .option("inferSchema",True)\
        .csv("wasbs://bronze@vdatastorage.blob.core.windows.net/Calender")


## Read customer Data

In [0]:
df_customers = spark.read.option("header", "true")\
        .option("inferSchema",True)\
        .csv("wasbs://bronze@vdatastorage.blob.core.windows.net/Customers")

## Read categories data

In [0]:
df_product_categories = spark.read \
    .option("header", "true") \
    .option("inferSchema", True) \
    .csv("wasbs://bronze@vdatastorage.blob.core.windows.net/Product_Categories")

## Read Subcategoris data

In [0]:
df_product_subcategories = spark.read \
    .option("header", "true") \
    .option("inferSchema", True) \
    .csv("wasbs://bronze@vdatastorage.blob.core.windows.net/Product_Subcategories")

## Raed Product data

In [0]:
df_products = spark.read \
    .option("header", "true") \
    .option("inferSchema", True) \
    .csv("wasbs://bronze@vdatastorage.blob.core.windows.net/Products")

## Read returns data

In [0]:
df_returns = spark.read \
    .option("header", "true") \
    .option("inferSchema", True) \
    .csv("wasbs://bronze@vdatastorage.blob.core.windows.net/Returns")

## Read territories data

In [0]:
df_territories = spark.read \
    .option("header", "true") \
    .option("inferSchema", True) \
    .csv("wasbs://bronze@vdatastorage.blob.core.windows.net/Territories")

## Read sales data

In [0]:
df_sales = spark.read \
    .option("header", "true") \
    .option("inferSchema", True) \
    .csv("wasbs://bronze@vdatastorage.blob.core.windows.net/Sales*")

In [0]:
df_cal = df_calander.withColumn('Month',month(col('Date')))\
                    .withColumn('Year',year(col('Date')))
df_cal.show(5)

+----------+-----+----+
|      Date|Month|Year|
+----------+-----+----+
|2015-01-01|    1|2015|
|2015-01-02|    1|2015|
|2015-01-03|    1|2015|
|2015-01-04|    1|2015|
|2015-01-05|    1|2015|
+----------+-----+----+
only showing top 5 rows



# Transformation

### Calender Transformation

In [0]:
df_cal.write.format('parquet')\
            .mode('append')\
            .option("path","wasbs://silver@vdatastorage.blob.core.windows.net/Calander")\
            .option("header",True)\
            .save()

### Customers

In [0]:
df_customers.show(10)

+-----------+------+---------+--------+----------+-------------+------+--------------------+------------+-------------+--------------+------------+---------+
|CustomerKey|Prefix|FirstName|LastName| BirthDate|MaritalStatus|Gender|        EmailAddress|AnnualIncome|TotalChildren|EducationLevel|  Occupation|HomeOwner|
+-----------+------+---------+--------+----------+-------------+------+--------------------+------------+-------------+--------------+------------+---------+
|      11000|   MR.|      JON|    YANG|1966-04-08|            M|     M|jon24@adventure-w...|    $90,000 |            2|     Bachelors|Professional|        Y|
|      11001|   MR.|   EUGENE|   HUANG|1965-05-14|            S|     M|eugene10@adventur...|    $60,000 |            3|     Bachelors|Professional|        N|
|      11002|   MR.|    RUBEN|  TORRES|1965-08-12|            M|     M|ruben35@adventure...|    $60,000 |            3|     Bachelors|Professional|        Y|
|      11003|   MS.|  CHRISTY|     ZHU|1968-02-15|  

### With use of concat

In [0]:
df_cus = df_customers.withColumn("FullName",concat(col("Prefix"),lit(" "),col("FirstName"),lit(" "),col("LastName"))).show(10)

+-----------+------+---------+--------+----------+-------------+------+--------------------+------------+-------------+--------------+------------+---------+--------------------+
|CustomerKey|Prefix|FirstName|LastName| BirthDate|MaritalStatus|Gender|        EmailAddress|AnnualIncome|TotalChildren|EducationLevel|  Occupation|HomeOwner|            FullName|
+-----------+------+---------+--------+----------+-------------+------+--------------------+------------+-------------+--------------+------------+---------+--------------------+
|      11000|   MR.|      JON|    YANG|1966-04-08|            M|     M|jon24@adventure-w...|    $90,000 |            2|     Bachelors|Professional|        Y|        MR. JON YANG|
|      11001|   MR.|   EUGENE|   HUANG|1965-05-14|            S|     M|eugene10@adventur...|    $60,000 |            3|     Bachelors|Professional|        N|    MR. EUGENE HUANG|
|      11002|   MR.|    RUBEN|  TORRES|1965-08-12|            M|     M|ruben35@adventure...|    $60,000 |

### With use of concat_ws

In [0]:
df_cust = df_customers.withColumn("FullName",concat_ws(" ",col("Prefix"),col("FirstName"),col("LastName")))

In [0]:
df_cust.show(10)

+-----------+------+---------+--------+----------+-------------+------+--------------------+------------+-------------+--------------+------------+---------+--------------------+
|CustomerKey|Prefix|FirstName|LastName| BirthDate|MaritalStatus|Gender|        EmailAddress|AnnualIncome|TotalChildren|EducationLevel|  Occupation|HomeOwner|            FullName|
+-----------+------+---------+--------+----------+-------------+------+--------------------+------------+-------------+--------------+------------+---------+--------------------+
|      11000|   MR.|      JON|    YANG|1966-04-08|            M|     M|jon24@adventure-w...|    $90,000 |            2|     Bachelors|Professional|        Y|        MR. JON YANG|
|      11001|   MR.|   EUGENE|   HUANG|1965-05-14|            S|     M|eugene10@adventur...|    $60,000 |            3|     Bachelors|Professional|        N|    MR. EUGENE HUANG|
|      11002|   MR.|    RUBEN|  TORRES|1965-08-12|            M|     M|ruben35@adventure...|    $60,000 |

In [0]:
df_cust.write.format('parquet')\
            .mode('append')\
            .option("path","wasbs://silver@vdatastorage.blob.core.windows.net/Customers")\
            .option("header",True)\
            .save()

### Products

In [0]:
df_products.show(10)

+----------+---------------------+----------+--------------------+--------------------+--------------------+------------+-----------+------------+-----------+------------+
|ProductKey|ProductSubcategoryKey|ProductSKU|         ProductName|           ModelName|  ProductDescription|ProductColor|ProductSize|ProductStyle|ProductCost|ProductPrice|
+----------+---------------------+----------+--------------------+--------------------+--------------------+------------+-----------+------------+-----------+------------+
|       214|                   31| HL-U509-R|Sport-100 Helmet,...|           Sport-100|Universal fit, we...|         Red|          0|           0|    13.0863|       34.99|
|       215|                   31|   HL-U509|Sport-100 Helmet,...|           Sport-100|Universal fit, we...|       Black|          0|           0|    12.0278|     33.6442|
|       218|                   23| SO-B909-M|Mountain Bike Soc...| Mountain Bike Socks|Combination of na...|       White|          M|       

In [0]:
df_pro = df_products.withColumn("ProductSKU",split(col("ProductSKU"),'-')[0])\
                    .withColumn("ProductName",split(col("ProductName"),' ')[0])

In [0]:
df_pro.show(10)

+----------+---------------------+----------+-----------+--------------------+--------------------+------------+-----------+------------+-----------+------------+
|ProductKey|ProductSubcategoryKey|ProductSKU|ProductName|           ModelName|  ProductDescription|ProductColor|ProductSize|ProductStyle|ProductCost|ProductPrice|
+----------+---------------------+----------+-----------+--------------------+--------------------+------------+-----------+------------+-----------+------------+
|       214|                   31|        HL|  Sport-100|           Sport-100|Universal fit, we...|         Red|          0|           0|    13.0863|       34.99|
|       215|                   31|        HL|  Sport-100|           Sport-100|Universal fit, we...|       Black|          0|           0|    12.0278|     33.6442|
|       218|                   23|        SO|   Mountain| Mountain Bike Socks|Combination of na...|       White|          M|           U|     3.3963|         9.5|
|       219|          

In [0]:
df_pro.write.format('parquet')\
            .mode('append')\
            .option("path","wasbs://silver@vdatastorage.blob.core.windows.net/Product")\
            .option("header",True)\
            .save()

### Return


In [0]:
df_returns.show(10)

+----------+------------+----------+--------------+
|ReturnDate|TerritoryKey|ProductKey|ReturnQuantity|
+----------+------------+----------+--------------+
|2015-01-18|           9|       312|             1|
|2015-01-18|          10|       310|             1|
|2015-01-21|           8|       346|             1|
|2015-01-22|           4|       311|             1|
|2015-02-02|           6|       312|             1|
|2015-02-15|           1|       312|             1|
|2015-02-19|           9|       311|             1|
|2015-02-24|           8|       314|             1|
|2015-03-08|           8|       350|             1|
|2015-03-13|           9|       350|             1|
+----------+------------+----------+--------------+
only showing top 10 rows



In [0]:
df_returns.write.format('parquet')\
            .mode('append')\
            .option("path","wasbs://silver@vdatastorage.blob.core.windows.net/Returns")\
            .option('header',True)\
            .save()

### Territories


In [0]:
df_territories.display()

SalesTerritoryKey,Region,Country,Continent
1,Northwest,United States,North America
2,Northeast,United States,North America
3,Central,United States,North America
4,Southwest,United States,North America
5,Southeast,United States,North America
6,Canada,Canada,North America
7,France,France,Europe
8,Germany,Germany,Europe
9,Australia,Australia,Pacific
10,United Kingdom,United Kingdom,Europe


In [0]:
df_territories.write.format('parquet')\
            .mode('append')\
            .option("path","wasbs://silver@vdatastorage.blob.core.windows.net/Territories")\
            .option('header',True)\
            .save()

### Sales

In [0]:
df_sales.show(10)

+----------+-------------------+-----------+----------+-----------+------------+-------------+-------------+--------+
| OrderDate|          StockDate|OrderNumber|ProductKey|CustomerKey|TerritoryKey|OrderLineItem|OrderQuantity|Multiply|
+----------+-------------------+-----------+----------+-----------+------------+-------------+-------------+--------+
|2017-01-01|2003-12-13 00:00:00|    TO61285|       529|      23791|           1|            2|            2|       4|
|2017-01-01|2003-09-24 00:00:00|    TO61285|       214|      23791|           1|            3|            1|       3|
|2017-01-01|2003-09-04 00:00:00|    TO61285|       540|      23791|           1|            1|            1|       1|
|2017-01-01|2003-09-28 00:00:00|    TO61301|       529|      16747|           1|            2|            2|       4|
|2017-01-01|2003-10-21 00:00:00|    TO61301|       377|      16747|           1|            1|            1|       1|
|2017-01-01|2003-10-23 00:00:00|    TO61301|       540| 

#### Adding timestamp to StockDtae

In [0]:
df_sales = df_sales.withColumn('StockDate',to_timestamp('StockDate'))

### Replacing S With T In orederNumber

In [0]:
df_sales = df_sales.withColumn('OrderNumber',regexp_replace(col('OrderNumber'),'S','T'))

In [0]:
df_sales = df_sales.withColumn('Multiply',col("OrderLineItem")*col("OrderQuantity"))

In [0]:
df_sales.show(10)

+----------+-------------------+-----------+----------+-----------+------------+-------------+-------------+--------+
| OrderDate|          StockDate|OrderNumber|ProductKey|CustomerKey|TerritoryKey|OrderLineItem|OrderQuantity|Multiply|
+----------+-------------------+-----------+----------+-----------+------------+-------------+-------------+--------+
|2017-01-01|2003-12-13 00:00:00|    TO61285|       529|      23791|           1|            2|            2|       4|
|2017-01-01|2003-09-24 00:00:00|    TO61285|       214|      23791|           1|            3|            1|       3|
|2017-01-01|2003-09-04 00:00:00|    TO61285|       540|      23791|           1|            1|            1|       1|
|2017-01-01|2003-09-28 00:00:00|    TO61301|       529|      16747|           1|            2|            2|       4|
|2017-01-01|2003-10-21 00:00:00|    TO61301|       377|      16747|           1|            1|            1|       1|
|2017-01-01|2003-10-23 00:00:00|    TO61301|       540| 

### Sales analysis

In [0]:
df_sales.groupBy('OrderDate').agg(count('OrderNumber').alias('Total Orders')).show(10)

+----------+------------+
| OrderDate|Total Orders|
+----------+------------+
|2017-01-06|         151|
|2017-01-27|         142|
|2017-02-26|         119|
|2017-01-24|         173|
|2017-06-29|         172|
|2017-02-16|         124|
|2017-04-09|         140|
|2017-02-28|         162|
|2017-03-28|         149|
|2017-06-30|         136|
+----------+------------+
only showing top 10 rows



In [0]:
df_product_categories.show(10)

ProductCategoryKey,CategoryName
1,Bikes
2,Components
3,Clothing
4,Accessories


Databricks visualization. Run in Databricks to view.

In [0]:
df_territories.display(10)

SalesTerritoryKey,Region,Country,Continent
1,Northwest,United States,North America
2,Northeast,United States,North America
3,Central,United States,North America
4,Southwest,United States,North America
5,Southeast,United States,North America
6,Canada,Canada,North America
7,France,France,Europe
8,Germany,Germany,Europe
9,Australia,Australia,Pacific
10,United Kingdom,United Kingdom,Europe


Databricks visualization. Run in Databricks to view.

In [0]:
df_sales.display(10)

OrderDate,StockDate,OrderNumber,ProductKey,CustomerKey,TerritoryKey,OrderLineItem,OrderQuantity,Multiply
2017-01-01,2003-12-13T00:00:00Z,TO61285,529,23791,1,2,2,4
2017-01-01,2003-09-24T00:00:00Z,TO61285,214,23791,1,3,1,3
2017-01-01,2003-09-04T00:00:00Z,TO61285,540,23791,1,1,1,1
2017-01-01,2003-09-28T00:00:00Z,TO61301,529,16747,1,2,2,4
2017-01-01,2003-10-21T00:00:00Z,TO61301,377,16747,1,1,1,1
2017-01-01,2003-10-23T00:00:00Z,TO61301,540,16747,1,3,1,3
2017-01-01,2003-09-04T00:00:00Z,TO61269,215,11792,4,1,1,1
2017-01-01,2003-10-21T00:00:00Z,TO61269,229,11792,4,2,1,2
2017-01-01,2003-10-24T00:00:00Z,TO61286,528,11530,6,2,2,4
2017-01-01,2003-09-27T00:00:00Z,TO61286,536,11530,6,1,2,2


In [0]:
df_sales.write.format('parquet')\
            .mode('append')\
            .option("path","wasbs://silver@vdatastorage.blob.core.windows.net/Sales")\
            .option('header',True)\
            .save()

In [0]:
df_product_categories.write.format('parquet')\
            .mode('append')\
            .option("path","wasbs://silver@vdatastorage.blob.core.windows.net/ProductCategories")\
            .option('header',True)\
            .save()