In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

# SILVER LAYER SCRIPT

## DATA ACCESS USING APP

In [None]:
# Azure Data Lake OAuth configuration (SECURE)
#
# Configures the Spark session to authenticate against the storage account
# with a service principal (OAuth client-credentials flow).

storage_account = "destoragedatalake1"

# Tenant id, client id and client secret are fetched from the "azure-kv"
# secret scope so that none of them is written into the notebook.
aad_tenant, sp_client, aad_secret = (
    dbutils.secrets.get(scope="azure-kv", key=secret_name)
    for secret_name in ("tenant-id", "client-id", "client-secret")
)

# (configuration key, value) pairs, applied in the order listed.
oauth_settings = [
    (
        f"fs.azure.account.auth.type.{storage_account}.dfs.core.windows.net",
        "OAuth",
    ),
    (
        f"fs.azure.account.oauth.provider.type.{storage_account}.dfs.core.windows.net",
        "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    ),
    (
        f"fs.azure.account.oauth2.client.id.{storage_account}.dfs.core.windows.net",
        sp_client,
    ),
    (
        f"fs.azure.account.oauth2.client.secret.{storage_account}.dfs.core.windows.net",
        aad_secret,
    ),
    (
        f"fs.azure.account.oauth2.client.endpoint.{storage_account}.dfs.core.windows.net",
        f"https://login.microsoftonline.com/{aad_tenant}/oauth2/token",
    ),
]

for conf_key, conf_value in oauth_settings:
    spark.conf.set(conf_key, conf_value)

### DATA LOADING

### Reading Data

In [None]:
# Calendar: read the bronze CSV data; the first row is used as the header and
# column types are inferred from the data.
df_cal = (
    spark.read
    .format('csv')
    .options(header=True, inferSchema=True)
    .load("abfss://bronze@destoragedatalake1.dfs.core.windows.net/AdventureWorks_Calendar")
)

In [None]:
# Customers: read the bronze CSV data; the first row is used as the header and
# column types are inferred from the data.
df_cus = (
    spark.read
    .format('csv')
    .options(header=True, inferSchema=True)
    .load("abfss://bronze@destoragedatalake1.dfs.core.windows.net/AdventureWorks_Customers")
)

In [None]:
# Product categories: read the bronze CSV data; the first row is used as the
# header and column types are inferred from the data.
df_procat = (
    spark.read
    .format('csv')
    .options(header=True, inferSchema=True)
    .load("abfss://bronze@destoragedatalake1.dfs.core.windows.net/AdventureWorks_Product_Categories")
)

In [None]:
# Products: read the bronze CSV data; the first row is used as the header and
# column types are inferred from the data.
df_pro = (
    spark.read
    .format('csv')
    .options(header=True, inferSchema=True)
    .load("abfss://bronze@destoragedatalake1.dfs.core.windows.net/AdventureWorks_Products")
)

In [None]:
# Returns: read the bronze CSV data; the first row is used as the header and
# column types are inferred from the data.
df_ret = (
    spark.read
    .format('csv')
    .options(header=True, inferSchema=True)
    .load("abfss://bronze@destoragedatalake1.dfs.core.windows.net/AdventureWorks_Returns")
)

In [None]:
# Sales: the trailing '*' is a path glob, so every bronze path whose name
# starts with "AdventureWorks_Sales" is loaded into this single DataFrame.
# The first row is used as the header and column types are inferred.
df_sales = (
    spark.read
    .format('csv')
    .options(header=True, inferSchema=True)
    .load("abfss://bronze@destoragedatalake1.dfs.core.windows.net/AdventureWorks_Sales*")
)

In [None]:
# Territories: read the bronze CSV data; the first row is used as the header
# and column types are inferred from the data.
df_ter = (
    spark.read
    .format('csv')
    .options(header=True, inferSchema=True)
    .load("abfss://bronze@destoragedatalake1.dfs.core.windows.net/AdventureWorks_Territories")
)

In [None]:
# Product subcategories: read the bronze CSV data; the first row is used as
# the header and column types are inferred from the data.
df_subcat = (
    spark.read
    .format('csv')
    .options(header=True, inferSchema=True)
    .load("abfss://bronze@destoragedatalake1.dfs.core.windows.net/Product_Subcategories")
)

### TRANSFORMATIONS

#### Calendar

In [None]:
# Preview the calendar data as loaded from bronze.
display(df_cal)

In [None]:
# Derive Month and Year columns from the Date column, then preview the result.
df_cal = (
    df_cal
    .withColumn('Month', month('Date'))
    .withColumn('Year', year('Date'))
)
display(df_cal)

In [None]:
# Save modes accepted by the writer: 'append', 'overwrite', 'error', 'ignore'.
# 'append' adds to whatever already exists at the target path, so re-running
# this cell writes the same rows again.
(
    df_cal.write
    .format('parquet')
    .mode('append')
    .save("abfss://silver@destoragedatalake1.dfs.core.windows.net/AdventureWorks_Calendar")
)

#### Customers

In [None]:
# Preview the customer data as loaded from bronze.
display(df_cus)

In [None]:
# Build fullName from Prefix, FirstName and LastName joined by a single space.
# concat_ws is used here instead of chaining concat() with lit(" ") separators.
df_cus = df_cus.withColumn(
    'fullName',
    concat_ws(' ', 'Prefix', 'FirstName', 'LastName'),
)
display(df_cus)

In [None]:
# Persist the customers DataFrame (including fullName) to the silver layer as
# parquet. 'append' adds to any data already at the path.
(
    df_cus.write
    .format('parquet')
    .mode('append')
    .save("abfss://silver@destoragedatalake1.dfs.core.windows.net/AdventureWorks_Customers")
)

### Sub Categories

In [None]:
# Preview the product subcategories as loaded from bronze.
display(df_subcat)

In [None]:
# Save modes accepted by the writer: 'append', 'overwrite', 'error', 'ignore'.
# The subcategories are written to silver unchanged; 'append' adds to any data
# already at the path.
(
    df_subcat.write
    .format('parquet')
    .mode('append')
    .save("abfss://silver@destoragedatalake1.dfs.core.windows.net/AdventureWorks_SubCategories")
)

### Products

In [None]:
# Preview the product data as loaded from bronze.
display(df_pro)

In [None]:
# Trim two product columns:
#   - ProductSKU  keeps only the part before the first '-'
#   - ProductName keeps only the first space-delimited word
# The result is assigned back to df_pro so that the silver write of df_pro
# persists the transformed columns; previously the transformed DataFrame was
# only displayed and then discarded, so the untransformed data was written.
df_pro = df_pro.withColumn('ProductSKU',split(col('ProductSKU'),'-')[0])\
    .withColumn('ProductName',split(col('ProductName'),' ')[0])
df_pro.display()

In [None]:
# Persist df_pro to the silver layer as parquet. 'append' adds to any data
# already at the path.
(
    df_pro.write
    .format('parquet')
    .mode('append')
    .save("abfss://silver@destoragedatalake1.dfs.core.windows.net/AdventureWorks_Products")
)

### Returns

In [None]:
# Preview the returns data as loaded from bronze.
display(df_ret)

In [None]:
# Returns are written to silver unchanged, as parquet. 'append' adds to any
# data already at the path.
(
    df_ret.write
    .format('parquet')
    .mode('append')
    .save("abfss://silver@destoragedatalake1.dfs.core.windows.net/AdventureWorks_Returns")
)

### Territories

In [None]:
# Preview the territories data as loaded from bronze.
display(df_ter)

In [None]:
# Territories are written to silver unchanged, as parquet. 'append' adds to
# any data already at the path.
(
    df_ter.write
    .format('parquet')
    .mode('append')
    .save("abfss://silver@destoragedatalake1.dfs.core.windows.net/AdventureWorks_Territories")
)

### Sales

In [None]:
# Preview the sales data as loaded from bronze.
display(df_sales)

In [None]:
# Convert StockDate to a timestamp column (default parsing, no explicit format).
df_sales = df_sales.withColumn(
    'StockDate',
    to_timestamp(col('StockDate')),
)

In [None]:
# Replace every 'S' in OrderNumber with 'T' (the pattern matches all
# occurrences, not just a leading one).
df_sales = df_sales.withColumn(
    'OrderNumber',
    regexp_replace('OrderNumber', 'S', 'T'),
)

In [None]:
# Add a 'multiply' column holding OrderLineItem * OrderQuantity.
df_sales = df_sales.withColumn(
    'multiply',
    df_sales['OrderLineItem'] * df_sales['OrderQuantity'],
)

In [None]:
# Preview the sales data after the transformations above.
display(df_sales)

In [None]:
# Persist the transformed sales DataFrame to the silver layer as parquet.
# This cell previously wrote df_ter to the Territories path a second time
# (copy-paste slip), which appended duplicate territory rows and left the
# sales data unwritten. 'append' adds to any data already at the path.
df_sales.write.format('parquet')\
    .mode('append')\
    .option("path","abfss://silver@destoragedatalake1.dfs.core.windows.net/AdventureWorks_Sales")\
    .save()

## Sales Analysis


In [None]:
# Count of non-null OrderNumber values per OrderDate, shown as Total_Order.
display(
    df_sales
    .groupBy('OrderDate')
    .agg(count(col('OrderNumber')).alias('Total_Order'))
)

In [None]:
# Preview the product categories as loaded from bronze.
# NOTE(review): no cell visible in this notebook writes df_procat to the
# silver container — confirm whether that is intentional.
display(df_procat)

In [None]:
# Preview the territories data (unchanged since it was loaded from bronze).
display(df_ter)