### Data access using an app (service principal): allow Databricks to read data from ADLS
- Pull the credentials and use the app (service principal) in our code.
- storage account: storageaccntproj1
- client id/app id:
- client secret: 
- tenant id/directory id: 
- 
- Note: Store the client secret in Azure Key Vault and read it in the notebook through a Key Vault-backed secret scope.

In [0]:
# Syntax template (replace the <...> placeholders with real values):

# service_credential = dbutils.secrets.get(scope="<secret-scope>",key="<service-credential-key>")

# spark.conf.set("fs.azure.account.auth.type.<storage-account>.dfs.core.windows.net", "OAuth")
# spark.conf.set("fs.azure.account.oauth.provider.type.<storage-account>.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
# spark.conf.set("fs.azure.account.oauth2.client.id.<storage-account>.dfs.core.windows.net", "<application-id>")
# spark.conf.set("fs.azure.account.oauth2.client.secret.<storage-account>.dfs.core.windows.net", service_credential)
# spark.conf.set("fs.azure.account.oauth2.client.endpoint.<storage-account>.dfs.core.windows.net", "https://login.microsoftonline.com/<directory-id>/oauth2/token")

### Data loading:
from ADLS into the Databricks notebook

In [0]:
# Read every bronze-layer AdventureWorks CSV dataset into its own DataFrame.

# Root of the bronze container; every raw dataset lives directly underneath it.
BRONZE_ROOT = 'abfss://bronze@storageaccntproj1.dfs.core.windows.net'


def _read_bronze_csv(dataset):
    """Load one CSV dataset from the bronze container.

    Args:
        dataset: Name of the dataset under the bronze container root. A
            trailing ``*`` may be used to match several entries at once.

    Returns:
        A DataFrame read with the first row treated as the header and the
        column types inferred by Spark.
    """
    return (
        spark.read.format('csv')
        .option('header', 'true')
        .option('inferSchema', 'true')
        .load(BRONZE_ROOT + '/' + dataset)
    )


df_calender = _read_bronze_csv('AdventureWorks_Calendar')
df_Customers = _read_bronze_csv('AdventureWorks_Customers')
df_Product_Categories = _read_bronze_csv('AdventureWorks_Product_Categories')
df_Products = _read_bronze_csv('AdventureWorks_Products')
df_Returns = _read_bronze_csv('AdventureWorks_Returns')

# The trailing * loads every entry whose name starts with AdventureWorks_Sales.
df_Sales = _read_bronze_csv('AdventureWorks_Sales*')

df_Territories = _read_bronze_csv('AdventureWorks_Territories')
df_Subcategories = _read_bronze_csv('Product_Subcategories')

### Transformations

#### Transformation-1: Calendar

In [0]:
#TRANSFORMATION-1:
#------------------------------------------------------------
# Add Month_name and year to the data
# Write the data in the parquet format to ADLS

from pyspark.sql.functions import*
from pyspark.sql.types import*

# withColumn returns a new DataFrame instead of modifying df_calender, so the
# result is bound to its own name before it is displayed or written.
df_calender_transformed = (
    df_calender
    .withColumn("Month_Name", date_format(col("Date"), "MMM"))  # month name derived from Date
    .withColumn("Year", year(col("Date")))                      # year derived from Date
)

df_calender_transformed.display()


### Now, push this transformed data into the silver layer in Azure

In [0]:
# Inspect the schema before persisting, so inferred column types can be checked.
df_calender_transformed.printSchema()

# Location of the calendar dataset in the silver container (used for write and read-back).
silver_path = "abfss://silver@storageaccntproj1.dfs.core.windows.net/AdventureWorks_Calendar"

# Save modes:
#   append    -> add the new rows to the data already at the target
#   overwrite -> replace the data already at the target
#   ignore    -> skip the write when data already exists at the target
#   error     -> raise when data already exists at the target
(
    df_calender_transformed.write
    .format("parquet")
    .mode("overwrite")
    .option("path", silver_path)
    .save()
)

# Read the freshly written parquet back to confirm what landed in silver.
df_silver = spark.read.format("parquet").load(silver_path)
df_silver.display()

#### Transformation-2 Customer data

In [0]:
# String transformation: build Full_Name from Prefix, FirstName and LastName.
#
# concat_ws ("concatenate with separator") takes the separator once, followed
# by the columns to join. The earlier variant spelled out every separator:
#   concat(col("Prefix"), lit(' '), col("FirstName"), lit(' '), col("LastName"))
df_customer_transformed = df_Customers.withColumn(
    "Full_Name",
    concat_ws(' ', col("Prefix"), col("FirstName"), col("LastName")),
)

df_customer_transformed.display()

# Location of the customers dataset in the silver container.
silver_path = "abfss://silver@storageaccntproj1.dfs.core.windows.net/AdventureWorks_Customers"

# Persist the transformed customers as parquet, replacing any previous run.
(
    df_customer_transformed.write
    .format("parquet")
    .mode("overwrite")
    .option("path", silver_path)
    .save()
)

# Read the written data back to verify it.
df_silver = spark.read.format("parquet").load(silver_path)
df_silver.display()

In [0]:
# Product categories need no transformation: copy them to silver as parquet.

# Location of the product-categories dataset in the silver container.
silver_path = "abfss://silver@storageaccntproj1.dfs.core.windows.net/AdventureWorks_Product_Categories"

(
    df_Product_Categories.write
    .format("parquet")
    .mode("overwrite")
    .option("path", silver_path)
    .save()
)

# Read the written data back to verify it.
df_silver = spark.read.format("parquet").load(silver_path)
df_silver.display()

#### products

In [0]:
# Load the products CSV from the bronze container (header row, inferred types).
df_Products = (
    spark.read
    .format('csv')
    .option('header', 'true')
    .option('inferSchema', 'true')
    .load('abfss://bronze@storageaccntproj1.dfs.core.windows.net/AdventureWorks_Products')
)

df_Products.display()

In [0]:
# Products transformation (both columns are overwritten in place):
#   ProductSKU  -> the part before the first hyphen
#   ProductName -> the part before the first space
df_Products_transformed = (
    df_Products
    .withColumn("ProductSKU", split(col("ProductSKU"), "-")[0])
    .withColumn("ProductName", split(col("ProductName"), " ")[0])
)

df_Products_transformed.display()

# Location of the products dataset in the silver container.
silver_path = "abfss://silver@storageaccntproj1.dfs.core.windows.net/AdventureWorks_Products"

# Persist the transformed products as parquet, replacing any previous run.
(
    df_Products_transformed.write
    .format("parquet")
    .mode("overwrite")
    .option("path", silver_path)
    .save()
)

# Read the written data back to verify it.
df_silver = spark.read.format("parquet").load(silver_path)
df_silver.display()


In [0]:
# read the subcategories, returns and territories from bronze and write them to silver layer

from pyspark.sql.functions import*
from pyspark.sql.types import *

def _copy_bronze_csv_to_silver(dataset):
    """Copy one CSV dataset from bronze to silver as parquet and read it back.

    DataFrameWriter.save() returns None, so its result is deliberately not
    bound to a name; binding it would replace a previously loaded DataFrame
    with None.

    Args:
        dataset: Name of the dataset; the same name is used under both the
            bronze and the silver container.

    Returns:
        The DataFrame read back from the parquet data written to silver.
    """
    bronze_location = "abfss://bronze@storageaccntproj1.dfs.core.windows.net/" + dataset
    silver_location = "abfss://silver@storageaccntproj1.dfs.core.windows.net/" + dataset

    bronze_df = (
        spark.read.format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .load(bronze_location)
    )

    (
        bronze_df.write.format("parquet")
        .mode("overwrite")
        .option("path", silver_location)
        .save()
    )

    # Parquet files carry their own schema, so the CSV-only header/inferSchema
    # options are not passed when reading back.
    return spark.read.format("parquet").load(silver_location)


# subcategories
df_subcat = _copy_bronze_csv_to_silver("Product_Subcategories")
df_subcat.display()

# returns
df_return = _copy_bronze_csv_to_silver("AdventureWorks_Returns")
df_return.display()

# territories
df_terr = _copy_bronze_csv_to_silver("AdventureWorks_Territories")
df_terr.display()


#### Cumulative sales of 2015-2017

In [0]:
# Preview the combined sales data before transforming it.
display(df_Sales)

In [0]:
# Sales clean-up:
#   1. StockDate   -> converted to a timestamp
#   2. OrderNumber -> "S" replaced with "T"
#   3. Multiply    -> new column: OrderLineItem * OrderQuantity
df_Sales = (
    df_Sales
    .withColumn("StockDate", to_timestamp(col("StockDate")))
    .withColumn("OrderNumber", regexp_replace(col("OrderNumber"), "S", "T"))
    .withColumn("Multiply", col("OrderLineItem") * col("OrderQuantity"))
)

df_Sales.display()

# Persist the transformed sales to the silver layer as parquet, replacing any previous run.
(
    df_Sales.write
    .format("parquet")
    .mode("overwrite")
    .option("path", "abfss://silver@storageaccntproj1.dfs.core.windows.net/AdventureWorks_Sales")
    .save()
)


#### Sales Analysis

In [0]:
# Aggregation 1: how many orders per day.
#
# The alias must be applied to the count column inside agg(). Calling .alias()
# on the aggregated DataFrame only names the DataFrame and leaves the column
# called "count(OrderNumber)".
# NOTE(review): count() counts rows; if an order spans several OrderLineItem
# rows it is counted once per row -- confirm whether countDistinct is intended.
df_Sales.groupBy("orderDate") \
        .agg(count("OrderNumber").alias("Total_orders_per_day")) \
        .display()

Databricks visualization. Run in Databricks to view.

In [0]:
# Show the product categories; the pie chart is built from this cell's output.
display(df_Product_Categories)

Databricks visualization. Run in Databricks to view.