In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

**Silver Layer Transformation**

Data Access Using App

In [0]:
# service_credential = dbutils.secrets.get(scope="<secret-scope>",key="<service-credential-key>")

spark.conf.set("fs.azure.account.auth.type.dedatalakegen.dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type.dedatalakegen.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id.dedatalakegen.dfs.core.windows.net", "{client_id}")
spark.conf.set("fs.azure.account.oauth2.client.secret.dedatalakegen.dfs.core.windows.net", "{Client_secret}")
spark.conf.set("fs.azure.account.oauth2.client.endpoint.dedatalakegen.dfs.core.windows.net", "https://login.microsoftonline.com/{Tenant_ID}/oauth2/token")

Data Loading

Read Data from data lake

In [0]:
df_cal = spark.read.format("csv")\
        .option("header", "true")\
        .option("inferSchema", "true")\
        .load("abfss://bronze@dedatalakegen.dfs.core.windows.net/AdventureWorks_Calendar")



In [0]:
df_cus = spark.read.format("csv")\
        .option("header", "true")\
        .option("inferSchema", "true")\
        .load("abfss://bronze@dedatalakegen.dfs.core.windows.net/AdventureWorks_Customers")

In [0]:
df_prod_catog = spark.read.format("csv")\
        .option("header", "true")\
        .option("inferSchema", "true")\
        .load("abfss://bronze@dedatalakegen.dfs.core.windows.net/AdventureWorks_Product_Categories")

In [0]:
df_products = spark.read.format("csv")\
        .option("header", "true")\
        .option("inferSchema", "true")\
        .load("abfss://bronze@dedatalakegen.dfs.core.windows.net/AdventureWorks_Products")

In [0]:
df_returns = spark.read.format("csv")\
        .option("header", "true")\
        .option("inferSchema", "true")\
        .load("abfss://bronze@dedatalakegen.dfs.core.windows.net/AdventureWorks_Returns")

In [0]:
df_sale_2015 = spark.read.format("csv")\
        .option("header", "true")\
        .option("inferSchema", "true")\
        .load("abfss://bronze@dedatalakegen.dfs.core.windows.net/AdventureWorks_Sales_2015")

In [0]:
df_sale_2016 = spark.read.format("csv")\
        .option("header", "true")\
        .option("inferSchema", "true")\
        .load("abfss://bronze@dedatalakegen.dfs.core.windows.net/AdventureWorks_Sales_2016")

In [0]:
df_sale_2017 = spark.read.format("csv")\
        .option("header", "true")\
        .option("inferSchema", "true")\
        .load("abfss://bronze@dedatalakegen.dfs.core.windows.net/AdventureWorks_Sales_2017")

In [0]:
df_territories = spark.read.format("csv")\
        .option("header", "true")\
        .option("inferSchema", "true")\
        .load("abfss://bronze@dedatalakegen.dfs.core.windows.net/AdventureWorks_Territories")

In [0]:
df_prod_subcategory = spark.read.format("csv")\
        .option("header", "true")\
        .option("inferSchema", "true")\
        .load("abfss://bronze@dedatalakegen.dfs.core.windows.net/Product_Subcategories")

**Transformation of data to silver**

Calender

In [0]:
from pyspark.sql.functions import month, year, col
calender_frame = df_cal.withColumn("Month", month(col('Date')))\
                        .withColumn("Year", year(col('Date')))
calender_frame.display()

In [0]:
df_cal.write.format("parquet")\
    .mode("append")\
    .option("path", "abfss://silver@dedatalakegen.dfs.core.windows.net/AdventureWorks_Calendar")\
    .save()

Customer

In [0]:
df_cus = df_cus.withColumn("Full Name", concat_ws(" ", col('Prefix'),col("FirstName"), col("LastName")))

In [0]:
# df_cus.display()

df_cus.write.format("parquet")\
    .mode("append")\
    .option("path", "abfss://silver@dedatalakegen.dfs.core.windows.net/AdventureWorks_Customers")\
    .save()

Product Categories

In [0]:
# df_prod_catog.display()
df_prod_catog.write.format("parquet")\
    .mode("append")\
    .option("path", "abfss://silver@dedatalakegen.dfs.core.windows.net/AdventureWorks_Product_Categories")\
    .save()


Product Subcategory

In [0]:
# df_prod_subcategory.display()
df_prod_subcategory.write.format("parquet")\
    .mode("append")\
    .option("path", "abfss://silver@dedatalakegen.dfs.core.windows.net/Product_Subcategories")\
    .save()

Products

In [0]:
# df_products.display()
df_products = df_products.withColumn("ProductSKU", split(col("ProductSKU"), "-")[0])\
    .withColumn("ProductName", split(col("ProductName"), " ")[0])
df_products.display()
               

In [0]:
df_products.write.format("parquet")\
    .mode("append")\
    .option("path", "abfss://silver@dedatalakegen.dfs.core.windows.net/AdventureWorks_Products")\
    .save()

Returns

In [0]:
df_returns.write.format("parquet")\
    .mode("append")\
    .option("path", "abfss://silver@dedatalakegen.dfs.core.windows.net/AdventureWorks_Returns")\
    .save()

Territories

In [0]:
df_territories.write.format("parquet")\
    .mode("append")\
    .option("path", "abfss://silver@dedatalakegen.dfs.core.windows.net/AdventureWorks_Territories")\
    .save()

Sales


Fetch bronze sales data

In [0]:
# df_sale_2015.display()
# df_sale_2016.display()
# df_sale_2017.display()

df_sales = spark.read.format("csv")\
        .option("header", "true")\
        .option("inferSchema", "true")\
        .load("abfss://bronze@dedatalakegen.dfs.core.windows.net/AdventureWorks_Sales_*")

In [0]:
# df_sales.display()

df_sales = df_sales.withColumn("StockDate",to_timestamp(col("StockDate")))
df_sales = df_sales.withColumn("OrderNumber", regexp_replace(col("OrderNumber"),"S","T"))
df_sales = df_sales.withColumn("Multiply", col("OrderQuantity")*col("OrderLineItem"))

df_sales.display()

In [0]:
df_sales.write.format("parquet")\
    .mode("append")\
    .option("path", "abfss://silver@dedatalakegen.dfs.core.windows.net/AdventureWorks_Sales")\
    .save()

Data Analytics

In [0]:
df_sales.groupBy("OrderDate").agg(count("OrderNumber").alias("Total_orders")).display()

In [0]:
df_prod_catog.display()