In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

## SILVER LAYER script

### Data access using application

In [0]:

spark.conf.set("fs.azure.account.auth.type.salekartdatawarehouse.dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type.salekartdatawarehouse.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id.salekartdatawarehouse.dfs.core.windows.net", "96af8ac4-7292-4276-8276-c875591e2f08")
spark.conf.set("fs.azure.account.oauth2.client.secret.salekartdatawarehouse.dfs.core.windows.net", "5HK8Q~Qxyyz7cfgs9qaDY-5QFKvlE~wKQ8oB3dxR")
spark.conf.set("fs.azure.account.oauth2.client.endpoint.salekartdatawarehouse.dfs.core.windows.net", "https://login.microsoftonline.com/3a0dd556-e67b-476c-a147-6be50d13c89a/oauth2/token")

### Data Loading


#### Reading data from ADLS

In [0]:
df_calendar = spark.read.format('csv')\
    .option('header', True)\
    .option('inferSchema', True)\
    .load('abfss://bronze@salekartdatawarehouse.dfs.core.windows.net/calendar')

In [0]:
df_customer = spark.read.format('csv')\
    .option('header', True)\
    .option('inferSchema', True)\
    .load('abfss://bronze@salekartdatawarehouse.dfs.core.windows.net/customers')

In [0]:
df_products = spark.read.format('csv')\
    .option('header', True)\
    .option('inferSchema', True)\
    .load('abfss://bronze@salekartdatawarehouse.dfs.core.windows.net/products')

In [0]:
df_products_cat = spark.read.format('csv')\
    .option('header', True)\
    .option('inferSchema', True)\
    .load('abfss://bronze@salekartdatawarehouse.dfs.core.windows.net/products/products_categories.csv')

In [0]:
df_products_subcat = spark.read.format('csv')\
    .option('header', True)\
    .option('inferSchema', True)\
    .load('abfss://bronze@salekartdatawarehouse.dfs.core.windows.net/products/products_subcategories.csv')

In [0]:
df_sales = spark.read.format('csv')\
     .option('header', True)\
    .option('inferSchema', True)\
    .load('abfss://bronze@salekartdatawarehouse.dfs.core.windows.net/sales/sales*.csv')

In [0]:
df_returns = spark.read.format('csv')\
     .option('header', True)\
    .option('inferSchema', True)\
    .load('abfss://bronze@salekartdatawarehouse.dfs.core.windows.net/returns')

In [0]:
df_territories = spark.read.format('csv')\
     .option('header', True)\
    .option('inferSchema', True)\
    .load('abfss://bronze@salekartdatawarehouse.dfs.core.windows.net/territories')

### Transformations

##### Calendar Transformations

In [0]:
df_calendar = df_calendar.withColumn('Month', month(col('Date')))\
                    .withColumn('Year', year(col('Date')))
df_calendar.write.format('parquet')\
            .mode('append')\
            .option('path', 'abfss://silver@salekartdatawarehouse.dfs.core.windows.net/calendar')\
            .save()


##### Customer Transformation

In [0]:
df_customer = df_customer.withColumn('fullName', concat_ws(' ', col('Prefix'), col('FirstName'), col('LastName')))
df_customer.write.format('parquet')\
            .mode('append')\
            .option('path', 'abfss://silver@salekartdatawarehouse.dfs.core.windows.net/customers')\
            .save()

##### Products Transformations

In [0]:
df_products = df_products.withColumn('ProductSKU', split(col('ProductSKU'), '-')[0])\
        .withColumn('ProductName', split(col('ProductName'), ' ')[0])
df_products.write.format('parquet')\
    .mode('append')\
    .option('path', 'abfss://silver@salekartdatawarehouse.dfs.core.windows.net/products')\
    .save()

##### Product Categories and Subcategories

In [0]:
df_products_cat.write.format('parquet')\
            .mode('append')\
            .option('path', 'abfss://silver@salekartdatawarehouse.dfs.core.windows.net/products')\
            .save()
df_products_subcat.write.format('parquet')\
            .mode('append')\
            .option('path', 'abfss://silver@salekartdatawarehouse.dfs.core.windows.net/products')\
            .save()

Returns Transformations

In [0]:
df_returns.write.format('parquet')\
    .mode('append')\
    .option('path', 'abfss://silver@salekartdatawarehouse.dfs.core.windows.net/returns')\
    .save()

Sales Transformations

In [0]:
df_territories.write.format('parquet')\
    .mode('append')\
    .option('path', 'abfss://silver@salekartdatawarehouse.dfs.core.windows.net/territories')\
    .save()

In [0]:
df_sales_15 = df_sales_15.withColumn('StockDate', to_timestamp(col('StockDate'))).withColumn('OrderNumber', regexp_replace(col('OrderNumber'), 'S', 'T'))
df_sales_15.display()

OrderDate,StockDate,OrderNumber,ProductKey,CustomerKey,TerritoryKey,OrderLineItem,OrderQuantity
2015-01-01,2001-09-21T00:00:00Z,TO45080,332,14657,1,1,1
2015-01-01,2001-12-05T00:00:00Z,TO45079,312,29255,4,1,1
2015-01-01,2001-10-29T00:00:00Z,TO45082,350,11455,9,1,1
2015-01-01,2001-11-16T00:00:00Z,TO45081,338,26782,6,1,1
2015-01-02,2001-12-15T00:00:00Z,TO45083,312,14947,10,1,1
2015-01-02,2001-10-12T00:00:00Z,TO45084,310,29143,4,1,1
2015-01-02,2001-12-18T00:00:00Z,TO45086,314,18747,9,1,1
2015-01-02,2001-10-09T00:00:00Z,TO45085,312,18746,9,1,1
2015-01-03,2001-10-03T00:00:00Z,TO45093,312,18906,9,1,1
2015-01-03,2001-09-29T00:00:00Z,TO45090,310,29170,4,1,1


Sales Transformations

In [0]:
df_sales.write.format('parquet')\
    .mode('append')\
    .option('path', 'abfss://silver@salekartdatawarehouse.dfs.core.windows.net/sales')\
    .save()


Sale Analysis

In [0]:
df_sales.groupBy('OrderDate').agg(count(col('OrderNumber')).alias('TotalOrders')).display()

OrderDate,TotalOrders
2017-01-06,151
2017-01-27,142
2017-02-26,119
2017-01-24,173
2017-06-29,172
2017-02-16,124
2017-04-09,140
2017-02-28,162
2017-03-28,149
2017-06-30,136


Databricks visualization. Run in Databricks to view.

In [0]:
df_territories.display()

SalesTerritoryKey,Region,Country,Continent
1,Northwest,United States,North America
2,Northeast,United States,North America
3,Central,United States,North America
4,Southwest,United States,North America
5,Southeast,United States,North America
6,Canada,Canada,North America
7,France,France,Europe
8,Germany,Germany,Europe
9,Australia,Australia,Pacific
10,United Kingdom,United Kingdom,Europe


Databricks visualization. Run in Databricks to view.