### Silver Layer Script

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

### Data Access Using Application

In [0]:
spark.conf.set("fs.azure.account.auth.type.firststoragedatalake.dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type.firststoragedatalake.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id.firststoragedatalake.dfs.core.windows.net", "f6bc5ec6-c75f-4cab-9c3e-5ee6f34fb22e")
spark.conf.set("fs.azure.account.oauth2.client.secret.firststoragedatalake.dfs.core.windows.net", "xvS8Q~7dIYY0FpwxDp2vSH3-k61DOKihnXwERdr9")
spark.conf.set("fs.azure.account.oauth2.client.endpoint.firststoragedatalake.dfs.core.windows.net", "https://login.microsoftonline.com/108e5c55-88ad-4857-ada5-a80dd98b8e55/oauth2/token")

## Data Loading

### Reading Data

In [0]:
df_cal = spark.read.format('csv')\
                .option("header",True)\
                .option("inferschema",True)\
                .load('abfss://bronze@firststoragedatalake.dfs.core.windows.net/AdventureWorks_Calendar')

In [0]:
df_cus = spark.read.format('csv')\
                .option("header",True)\
                .option("inferschema",True)\
                .load('abfss://bronze@firststoragedatalake.dfs.core.windows.net/AdventureWorks_Customers')

In [0]:
df_procat = spark.read.format('csv')\
            .option("header",True)\
            .option("inferSchema",True)\
            .load('abfss://bronze@firststoragedatalake.dfs.core.windows.net/AdventureWorks_Product_Categories')

In [0]:
df_pro = spark.read.format('csv')\
            .option("header",True)\
            .option("inferSchema",True)\
            .load('abfss://bronze@firststoragedatalake.dfs.core.windows.net/AdventureWorks_Products')

In [0]:
df_ret = spark.read.format('csv')\
            .option("header",True)\
            .option("inferSchema",True)\
            .load('abfss://bronze@firststoragedatalake.dfs.core.windows.net/AdventureWorks_Returns')

In [0]:
df_sales = spark.read.format('csv')\
            .option("header",True)\
            .option("inferSchema",True)\
            .load('abfss://bronze@firststoragedatalake.dfs.core.windows.net/AdventureWorks_Sales*')

In [0]:
df_ter = spark.read.format('csv')\
            .option("header",True)\
            .option("inferSchema",True)\
            .load('abfss://bronze@firststoragedatalake.dfs.core.windows.net/AdventureWorks_Territories')

In [0]:
df_subcat = spark.read.format('csv')\
            .option("header",True)\
            .option("inferSchema",True)\
            .load('abfss://bronze@firststoragedatalake.dfs.core.windows.net/Product_Subcategories')

### Transformations

In [0]:
## Calendar Data

In [0]:
df_cal.limit(25).display()

Date
2015-01-01
2015-01-02
2015-01-03
2015-01-04
2015-01-05
2015-01-06
2015-01-07
2015-01-08
2015-01-09
2015-01-10


In [0]:
df_cal = df_cal.withColumn('Month', month(col('Date')))\
           .withColumn('Year', year(col('Date')))\
           .withColumn('Day', dayofmonth(col('Date')))

df_cal.limit(25).display()

Date,Month,Year,Day
2015-01-01,1,2015,1
2015-01-02,1,2015,2
2015-01-03,1,2015,3
2015-01-04,1,2015,4
2015-01-05,1,2015,5
2015-01-06,1,2015,6
2015-01-07,1,2015,7
2015-01-08,1,2015,8
2015-01-09,1,2015,9
2015-01-10,1,2015,10


In [0]:
## where to save data
df_cal.write.format('parquet')\
            .mode('append')\
            .option("path","abfss://silver@firststoragedatalake.dfs.core.windows.net/AdventureWorks_Calendar")\
            .save()

## Customer

In [0]:
df_cus.limit(25).display()

CustomerKey,Prefix,FirstName,LastName,BirthDate,MaritalStatus,Gender,EmailAddress,AnnualIncome,TotalChildren,EducationLevel,Occupation,HomeOwner
11000,MR.,JON,YANG,1966-04-08,M,M,jon24@adventure-works.com,"$90,000",2,Bachelors,Professional,Y
11001,MR.,EUGENE,HUANG,1965-05-14,S,M,eugene10@adventure-works.com,"$60,000",3,Bachelors,Professional,N
11002,MR.,RUBEN,TORRES,1965-08-12,M,M,ruben35@adventure-works.com,"$60,000",3,Bachelors,Professional,Y
11003,MS.,CHRISTY,ZHU,1968-02-15,S,F,christy12@adventure-works.com,"$70,000",0,Bachelors,Professional,N
11004,MRS.,ELIZABETH,JOHNSON,1968-08-08,S,F,elizabeth5@adventure-works.com,"$80,000",5,Bachelors,Professional,Y
11005,MR.,JULIO,RUIZ,1965-08-05,S,M,julio1@adventure-works.com,"$70,000",0,Bachelors,Professional,Y
11007,MR.,MARCO,MEHTA,1964-05-09,M,M,marco14@adventure-works.com,"$60,000",3,Bachelors,Professional,Y
11008,MRS.,ROBIN,VERHOFF,1964-07-07,S,F,rob4@adventure-works.com,"$60,000",4,Bachelors,Professional,Y
11009,MR.,SHANNON,CARLSON,1964-04-01,S,M,shannon38@adventure-works.com,"$70,000",0,Bachelors,Professional,N
11010,MS.,JACQUELYN,SUAREZ,1964-02-06,S,F,jacquelyn20@adventure-works.com,"$70,000",0,Bachelors,Professional,N


In [0]:
## create new column to concat full name
## lit() function in concat() used when you want to add a column that has the same value for every row

df_cus = df_cus.withColumn('FullName',concat_ws(' ',col('Prefix'),col('FirstName'),col('LastName')))
df_cus.limit(25).display()

CustomerKey,Prefix,FirstName,LastName,BirthDate,MaritalStatus,Gender,EmailAddress,AnnualIncome,TotalChildren,EducationLevel,Occupation,HomeOwner,FullName
11000,MR.,JON,YANG,1966-04-08,M,M,jon24@adventure-works.com,"$90,000",2,Bachelors,Professional,Y,MR. JON YANG
11001,MR.,EUGENE,HUANG,1965-05-14,S,M,eugene10@adventure-works.com,"$60,000",3,Bachelors,Professional,N,MR. EUGENE HUANG
11002,MR.,RUBEN,TORRES,1965-08-12,M,M,ruben35@adventure-works.com,"$60,000",3,Bachelors,Professional,Y,MR. RUBEN TORRES
11003,MS.,CHRISTY,ZHU,1968-02-15,S,F,christy12@adventure-works.com,"$70,000",0,Bachelors,Professional,N,MS. CHRISTY ZHU
11004,MRS.,ELIZABETH,JOHNSON,1968-08-08,S,F,elizabeth5@adventure-works.com,"$80,000",5,Bachelors,Professional,Y,MRS. ELIZABETH JOHNSON
11005,MR.,JULIO,RUIZ,1965-08-05,S,M,julio1@adventure-works.com,"$70,000",0,Bachelors,Professional,Y,MR. JULIO RUIZ
11007,MR.,MARCO,MEHTA,1964-05-09,M,M,marco14@adventure-works.com,"$60,000",3,Bachelors,Professional,Y,MR. MARCO MEHTA
11008,MRS.,ROBIN,VERHOFF,1964-07-07,S,F,rob4@adventure-works.com,"$60,000",4,Bachelors,Professional,Y,MRS. ROBIN VERHOFF
11009,MR.,SHANNON,CARLSON,1964-04-01,S,M,shannon38@adventure-works.com,"$70,000",0,Bachelors,Professional,N,MR. SHANNON CARLSON
11010,MS.,JACQUELYN,SUAREZ,1964-02-06,S,F,jacquelyn20@adventure-works.com,"$70,000",0,Bachelors,Professional,N,MS. JACQUELYN SUAREZ


In [0]:
df_cus.write.format('parquet')\
            .mode('append')\
            .option("path","abfss://silver@firststoragedatalake.dfs.core.windows.net/AdventureWorks_Customers")\
            .save()

### Sub Categories

In [0]:
df_subcat.limit(25).display()

ProductSubcategoryKey,SubcategoryName,ProductCategoryKey
1,Mountain Bikes,1
2,Road Bikes,1
3,Touring Bikes,1
4,Handlebars,2
5,Bottom Brackets,2
6,Brakes,2
7,Chains,2
8,Cranksets,2
9,Derailleurs,2
10,Forks,2


In [0]:
df_subcat.write.format('parquet')\
            .mode('append')\
            .option("path","abfss://silver@firststoragedatalake.dfs.core.windows.net/product_Subcategories")\
            .save()

### Products

In [0]:
df_pro.limit(25).display()

ProductKey,ProductSubcategoryKey,ProductSKU,ProductName,ModelName,ProductDescription,ProductColor,ProductSize,ProductStyle,ProductCost,ProductPrice
214,31,HL-U509-R,"Sport-100 Helmet, Red",Sport-100,"Universal fit, well-vented, lightweight , snap-on visor.",Red,0,0,13.0863,34.99
215,31,HL-U509,"Sport-100 Helmet, Black",Sport-100,"Universal fit, well-vented, lightweight , snap-on visor.",Black,0,0,12.0278,33.6442
218,23,SO-B909-M,"Mountain Bike Socks, M",Mountain Bike Socks,Combination of natural and synthetic fibers stays dry and provides just the right cushioning.,White,M,U,3.3963,9.5
219,23,SO-B909-L,"Mountain Bike Socks, L",Mountain Bike Socks,Combination of natural and synthetic fibers stays dry and provides just the right cushioning.,White,L,U,3.3963,9.5
220,31,HL-U509-B,"Sport-100 Helmet, Blue",Sport-100,"Universal fit, well-vented, lightweight , snap-on visor.",Blue,0,0,12.0278,33.6442
223,19,CA-1098,AWC Logo Cap,Cycling Cap,Traditional style with a flip-up brim; one-size fits all.,Multi,0,U,5.7052,8.6442
226,21,LJ-0192-S,"Long-Sleeve Logo Jersey, S",Long-Sleeve Logo Jersey,Unisex long-sleeve AWC logo microfiber cycling jersey,Multi,S,U,31.7244,48.0673
229,21,LJ-0192-M,"Long-Sleeve Logo Jersey, M",Long-Sleeve Logo Jersey,Unisex long-sleeve AWC logo microfiber cycling jersey,Multi,M,U,31.7244,48.0673
232,21,LJ-0192-L,"Long-Sleeve Logo Jersey, L",Long-Sleeve Logo Jersey,Unisex long-sleeve AWC logo microfiber cycling jersey,Multi,L,U,31.7244,48.0673
235,21,LJ-0192-X,"Long-Sleeve Logo Jersey, XL",Long-Sleeve Logo Jersey,Unisex long-sleeve AWC logo microfiber cycling jersey,Multi,XL,U,31.7244,48.0673


In [0]:
# split amd index
# We can use .getItem(0) alos inplace of [0]

df_pro = df_pro.withColumn('ProductSKU',split(col('ProductSKU'),'-')[0])\
               .withColumn('ProductName',split(col('ProductName'),' ')[0])

In [0]:
df_pro.limit(25).display()

ProductKey,ProductSubcategoryKey,ProductSKU,ProductName,ModelName,ProductDescription,ProductColor,ProductSize,ProductStyle,ProductCost,ProductPrice
214,31,HL,Sport-100,Sport-100,"Universal fit, well-vented, lightweight , snap-on visor.",Red,0,0,13.0863,34.99
215,31,HL,Sport-100,Sport-100,"Universal fit, well-vented, lightweight , snap-on visor.",Black,0,0,12.0278,33.6442
218,23,SO,Mountain,Mountain Bike Socks,Combination of natural and synthetic fibers stays dry and provides just the right cushioning.,White,M,U,3.3963,9.5
219,23,SO,Mountain,Mountain Bike Socks,Combination of natural and synthetic fibers stays dry and provides just the right cushioning.,White,L,U,3.3963,9.5
220,31,HL,Sport-100,Sport-100,"Universal fit, well-vented, lightweight , snap-on visor.",Blue,0,0,12.0278,33.6442
223,19,CA,AWC,Cycling Cap,Traditional style with a flip-up brim; one-size fits all.,Multi,0,U,5.7052,8.6442
226,21,LJ,Long-Sleeve,Long-Sleeve Logo Jersey,Unisex long-sleeve AWC logo microfiber cycling jersey,Multi,S,U,31.7244,48.0673
229,21,LJ,Long-Sleeve,Long-Sleeve Logo Jersey,Unisex long-sleeve AWC logo microfiber cycling jersey,Multi,M,U,31.7244,48.0673
232,21,LJ,Long-Sleeve,Long-Sleeve Logo Jersey,Unisex long-sleeve AWC logo microfiber cycling jersey,Multi,L,U,31.7244,48.0673
235,21,LJ,Long-Sleeve,Long-Sleeve Logo Jersey,Unisex long-sleeve AWC logo microfiber cycling jersey,Multi,XL,U,31.7244,48.0673


In [0]:
df_pro.write.format('parquet')\
            .mode('append')\
            .option("path","abfss://silver@firststoragedatalake.dfs.core.windows.net/AdventureWorks_Products")\
            .save()

### Rerurns

In [0]:
df_ret.limit(25).display()

ReturnDate,TerritoryKey,ProductKey,ReturnQuantity
2015-01-18,9,312,1
2015-01-18,10,310,1
2015-01-21,8,346,1
2015-01-22,4,311,1
2015-02-02,6,312,1
2015-02-15,1,312,1
2015-02-19,9,311,1
2015-02-24,8,314,1
2015-03-08,8,350,1
2015-03-13,9,350,1


In [0]:
df_ret.write.format('parquet')\
            .mode('append')\
            .option("path","abfss://silver@firststoragedatalake.dfs.core.windows.net/AdventureWorks_Returns")\
            .save()

### Territories

In [0]:
df_ter.display()

SalesTerritoryKey,Region,Country,Continent
1,Northwest,United States,North America
2,Northeast,United States,North America
3,Central,United States,North America
4,Southwest,United States,North America
5,Southeast,United States,North America
6,Canada,Canada,North America
7,France,France,Europe
8,Germany,Germany,Europe
9,Australia,Australia,Pacific
10,United Kingdom,United Kingdom,Europe


In [0]:
df_ret.write.format('parquet')\
            .mode('append')\
            .option("path","abfss://silver@firststoragedatalake.dfs.core.windows.net/AdventureWorks_Territories")\
            .save()

### Sales

In [0]:
df_sales.limit(25).display()

OrderDate,StockDate,OrderNumber,ProductKey,CustomerKey,TerritoryKey,OrderLineItem,OrderQuantity
2017-01-01,2003-12-13,SO61285,529,23791,1,2,2
2017-01-01,2003-09-24,SO61285,214,23791,1,3,1
2017-01-01,2003-09-04,SO61285,540,23791,1,1,1
2017-01-01,2003-09-28,SO61301,529,16747,1,2,2
2017-01-01,2003-10-21,SO61301,377,16747,1,1,1
2017-01-01,2003-10-23,SO61301,540,16747,1,3,1
2017-01-01,2003-09-04,SO61269,215,11792,4,1,1
2017-01-01,2003-10-21,SO61269,229,11792,4,2,1
2017-01-01,2003-10-24,SO61286,528,11530,6,2,2
2017-01-01,2003-09-27,SO61286,536,11530,6,1,2


In [0]:
df_sales = df_sales.withColumn('StockDate', to_timestamp('StockDate'))
df_sales.limit(25).display()

OrderDate,StockDate,OrderNumber,ProductKey,CustomerKey,TerritoryKey,OrderLineItem,OrderQuantity
2017-01-01,2003-12-13T00:00:00Z,SO61285,529,23791,1,2,2
2017-01-01,2003-09-24T00:00:00Z,SO61285,214,23791,1,3,1
2017-01-01,2003-09-04T00:00:00Z,SO61285,540,23791,1,1,1
2017-01-01,2003-09-28T00:00:00Z,SO61301,529,16747,1,2,2
2017-01-01,2003-10-21T00:00:00Z,SO61301,377,16747,1,1,1
2017-01-01,2003-10-23T00:00:00Z,SO61301,540,16747,1,3,1
2017-01-01,2003-09-04T00:00:00Z,SO61269,215,11792,4,1,1
2017-01-01,2003-10-21T00:00:00Z,SO61269,229,11792,4,2,1
2017-01-01,2003-10-24T00:00:00Z,SO61286,528,11530,6,2,2
2017-01-01,2003-09-27T00:00:00Z,SO61286,536,11530,6,1,2


In [0]:
df_sales = df_sales.withColumn('OrderNumber', regexp_replace('OrderNumber', 'S', 'T'))
df_sales.limit(25).display()

OrderDate,StockDate,OrderNumber,ProductKey,CustomerKey,TerritoryKey,OrderLineItem,OrderQuantity
2017-01-01,2003-12-13T00:00:00Z,TO61285,529,23791,1,2,2
2017-01-01,2003-09-24T00:00:00Z,TO61285,214,23791,1,3,1
2017-01-01,2003-09-04T00:00:00Z,TO61285,540,23791,1,1,1
2017-01-01,2003-09-28T00:00:00Z,TO61301,529,16747,1,2,2
2017-01-01,2003-10-21T00:00:00Z,TO61301,377,16747,1,1,1
2017-01-01,2003-10-23T00:00:00Z,TO61301,540,16747,1,3,1
2017-01-01,2003-09-04T00:00:00Z,TO61269,215,11792,4,1,1
2017-01-01,2003-10-21T00:00:00Z,TO61269,229,11792,4,2,1
2017-01-01,2003-10-24T00:00:00Z,TO61286,528,11530,6,2,2
2017-01-01,2003-09-27T00:00:00Z,TO61286,536,11530,6,1,2


In [0]:
# multiplication of two column
df_sales = df_sales.withColumn('multiply',col('OrderLineItem') * col('OrderQuantity'))
df_sales.limit(25).display()

OrderDate,StockDate,OrderNumber,ProductKey,CustomerKey,TerritoryKey,OrderLineItem,OrderQuantity,multiply
2017-01-01,2003-12-13T00:00:00Z,TO61285,529,23791,1,2,2,4
2017-01-01,2003-09-24T00:00:00Z,TO61285,214,23791,1,3,1,3
2017-01-01,2003-09-04T00:00:00Z,TO61285,540,23791,1,1,1,1
2017-01-01,2003-09-28T00:00:00Z,TO61301,529,16747,1,2,2,4
2017-01-01,2003-10-21T00:00:00Z,TO61301,377,16747,1,1,1,1
2017-01-01,2003-10-23T00:00:00Z,TO61301,540,16747,1,3,1,3
2017-01-01,2003-09-04T00:00:00Z,TO61269,215,11792,4,1,1,1
2017-01-01,2003-10-21T00:00:00Z,TO61269,229,11792,4,2,1,2
2017-01-01,2003-10-24T00:00:00Z,TO61286,528,11530,6,2,2,4
2017-01-01,2003-09-27T00:00:00Z,TO61286,536,11530,6,1,2,2


### Sales Analysis

In [0]:
# agg => aggregation
df_sales.groupby('OrderDate').agg(count('OrderNumber').alias('Total_Order')).display()

OrderDate,Total_Order
2017-01-06,151
2017-01-27,142
2017-02-26,119
2017-01-24,173
2017-06-29,172
2017-02-16,124
2017-04-09,140
2017-02-28,162
2017-03-28,149
2017-06-30,136


Databricks visualization. Run in Databricks to view.

In [0]:
df_procat.display()

ProductCategoryKey,CategoryName
1,Bikes
2,Components
3,Clothing
4,Accessories


Databricks visualization. Run in Databricks to view.

In [0]:
df_ter.display()

SalesTerritoryKey,Region,Country,Continent
1,Northwest,United States,North America
2,Northeast,United States,North America
3,Central,United States,North America
4,Southwest,United States,North America
5,Southeast,United States,North America
6,Canada,Canada,North America
7,France,France,Europe
8,Germany,Germany,Europe
9,Australia,Australia,Pacific
10,United Kingdom,United Kingdom,Europe


Databricks visualization. Run in Databricks to view.

In [0]:
df_sales.write.format('parquet')\
            .mode('append')\
            .option("path","abfss://silver@firststoragedatalake.dfs.core.windows.net/AdventureWorks_Sales")\
            .save()