### 1. Setting up the Spark environment

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *


# ADLS Gen2 storage account plus the identifiers used for OAuth:
# application_id is passed as the OAuth client id and directory_id is the
# tenant segment of the login endpoint.
storage_account = "accstokad"
application_id = "082c0a69-e870-407c-960c-9d7b86a75bf9"
directory_id = "26af9d76-35fe-404a-b312-869c37aec9c7"

# Container names for the raw (bronze) and cleaned (silver) layers.
bronze_container = "bronze"
silver_container = "silver"

# Root abfss:// URIs of each container; the trailing slash lets callers
# append a relative file path directly.
bronze_path = f"abfss://{bronze_container}@{storage_account}.dfs.core.windows.net/"
silver_path = f"abfss://{silver_container}@{storage_account}.dfs.core.windows.net/"


### 2. Configuring storage account access

In [0]:
# Client secret of the app registration, read from the Databricks secret scope
# so it never appears in the notebook source.
service_credential = dbutils.secrets.get(scope="secret-scope", key="DatabricksAppRegSecret")

# OAuth client-credentials settings for the storage account's dfs endpoint.
# The dict keeps insertion order, so the settings are applied in the order listed.
account_host = f"{storage_account}.dfs.core.windows.net"
oauth_settings = {
    f"fs.azure.account.auth.type.{account_host}": "OAuth",
    f"fs.azure.account.oauth.provider.type.{account_host}":
        "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    f"fs.azure.account.oauth2.client.id.{account_host}": application_id,
    f"fs.azure.account.oauth2.client.secret.{account_host}": service_credential,
    f"fs.azure.account.oauth2.client.endpoint.{account_host}":
        f"https://login.microsoftonline.com/{directory_id}/oauth2/token",
}

for conf_key, conf_value in oauth_settings.items():
    spark.conf.set(conf_key, conf_value)

### 3. Creating DataFrames on raw (bronze layer) files

In [0]:
def create_dataframe(df_Name, file_path):
    """Read a CSV file from the bronze container into a Spark DataFrame.

    Args:
        df_Name: Label printed in the progress message; not used for the read.
        file_path: Location of the CSV relative to ``bronze_path``.

    Returns:
        The DataFrame loaded with the first row as header and column types
        inferred from the data.
    """
    print(f"Creating Dataframe for : {df_Name}")
    source_uri = f"{bronze_path}{file_path}"
    csv_reader = (
        spark.read.format('csv')
        .option('header', True)
        .option('inferSchema', True)
    )
    return csv_reader.load(source_uri)

In [0]:
def writeToSilver(df_name, file_format, file_mode, file_path):
    """Save a DataFrame under the silver container as a single partition.

    Args:
        df_name: The Spark DataFrame to write (despite the parameter name, this
            is the DataFrame object itself, and it is what gets interpolated
            into the confirmation message).
        file_format: Output format handed to the writer, e.g. "parquet".
        file_mode: Save mode handed to the writer, e.g. "overwrite".
        file_path: Destination relative to ``silver_path``.

    Returns:
        True once the save call has returned; failures surface as exceptions.
    """
    target_uri = f"{silver_path}{file_path}"
    # coalesce(1) collapses the data to one partition before writing.
    single_partition = df_name.coalesce(1)
    writer = single_partition.write.mode(file_mode).format(file_format)
    writer.save(target_uri)
    print(f"Dataframe {df_name} saved at {silver_path}{file_path}")
    return True


### Calendar data transformation and loading to the silver layer

In [0]:
# Load the raw calendar CSV from the bronze layer.
df_AdventureWorks_Calendar = create_dataframe(
    "AdventureWorks_Calendar",
    "AdventureWorks_Calendar/AdventureWorks_Calendar.csv",
)

In [0]:
# Derive Month and Year columns from the calendar's Date column.
df_AdventureWorks_Calendar = (
    df_AdventureWorks_Calendar
    .withColumn("Month", month(col("Date")))
    .withColumn("Year", year(col("Date")))
)

# Print a preview of the enriched calendar rows.
df_AdventureWorks_Calendar.show()

In [0]:
# Persist the enriched calendar to the silver layer as Parquet, replacing any
# previous output. writeToSilver returns True once the save has completed.
# NOTE: a trailing call to `write_status(flag)` was dropped here because no
# such function is defined in this notebook and the cell failed with NameError.
flag = writeToSilver(
    df_AdventureWorks_Calendar,
    "parquet",
    "overwrite",
    "AdventureWorks_Calendar/",
)


### Customer data transformation and loading to the silver layer

In [0]:

# Load the raw customers CSV from the bronze layer.
df_AdventureWorks_Customers = create_dataframe(
    "AdventureWorks_Customers",
    "AdventureWorks_Customers/AdventureWorks_Customers.csv",
)


In [0]:
# Build FullName from Prefix, FirstName and LastName separated by single spaces.
# There are two approaches: concat(...) with explicit lit(" ") separators, or
# concat_ws(" ", ...). Only concat_ws is applied, because the concat result was
# immediately overwritten by it anyway; concat also yields NULL when any input
# is NULL, whereas concat_ws skips NULL inputs.
df_AdventureWorks_Customers = df_AdventureWorks_Customers.withColumn(
    "FullName",
    concat_ws(" ", col("Prefix"), col("FirstName"), col("LastName")),
)


In [0]:
# Render the customers DataFrame, now including FullName, in the cell output.
df_AdventureWorks_Customers.display()

In [0]:
# Persist the customers data to the silver layer as Parquet (overwrite mode).
flag = writeToSilver(
    df_AdventureWorks_Customers,
    "parquet",
    "overwrite",
    "AdventureWorks_Customers/",
)


### Product Categories transformation and loading to the silver layer

In [0]:
# Load the raw product categories CSV from the bronze layer.
df_AdventureWorks_Product_Categories = create_dataframe(
    "AdventureWorks_Product_Categories",
    "AdventureWorks_Product_Categories/AdventureWorks_Product_Categories.csv",
)

# Render the loaded categories in the cell output.
df_AdventureWorks_Product_Categories.display()

In [0]:
# Persist the product categories to the silver layer as Parquet (overwrite mode).
flag = writeToSilver(
    df_AdventureWorks_Product_Categories,
    "parquet",
    "overwrite",
    "AdventureWorks_Product_Categories/",
)

### Subcategories data transformation and loading to the silver layer

In [0]:
# Load the raw product subcategories CSV from the bronze layer.
df_AdventureWorks_Product_Subcategories = create_dataframe(
    "AdventureWorks_Product_Subcategories",
    "AdventureWorks_Product_Subcategories/AdventureWorks_Product_Subcategories.csv",
)

In [0]:
# Persist the product subcategories to the silver layer as Parquet (overwrite mode).
flag = writeToSilver(
    df_AdventureWorks_Product_Subcategories,
    "parquet",
    "overwrite",
    "AdventureWorks_Product_Subcategories/",
)

### Product data transformation and loading to the silver layer

In [0]:
# Load the raw products CSV from the bronze layer.
df_AdventureWorks_Products = create_dataframe(
    "AdventureWorks_Products",
    "AdventureWorks_Products/AdventureWorks_Products.csv",
)

In [0]:
# Render the products DataFrame as loaded, before any column changes.
df_AdventureWorks_Products.display()

In [0]:
# Keep only the leading segment of each value: the part of ProductSKU before
# the first '-' and the part of ProductName before the first ','.
df_AdventureWorks_Products = (
    df_AdventureWorks_Products
    .withColumn("ProductSKU", split(col("ProductSKU"), '-').getItem(0))
    .withColumn("ProductName", split(col("ProductName"), ',').getItem(0))
)



In [0]:
# Render the products DataFrame after ProductSKU and ProductName were truncated.
df_AdventureWorks_Products.display()

In [0]:
# Persist the products data to the silver layer as Parquet (overwrite mode).
flag = writeToSilver(
    df_AdventureWorks_Products,
    "parquet",
    "overwrite",
    "AdventureWorks_Products/",
)

### Returns data transformation and loading

In [0]:
# Load the raw returns CSV from the bronze layer.
df_AdventureWorks_Returns = create_dataframe(
    "AdventureWorks_Returns",
    "AdventureWorks_Returns/AdventureWorks_Returns.csv",
)

In [0]:
# Persist the returns data to the silver layer as Parquet (overwrite mode).
flag = writeToSilver(
    df_AdventureWorks_Returns,
    "parquet",
    "overwrite",
    "AdventureWorks_Returns/",
)

### Territories transformation and loading

In [0]:
# Load the raw territories CSV from the bronze layer.
df_AdventureWorks_Territories = create_dataframe(
    "AdventureWorks_Territories",
    "AdventureWorks_Territories/AdventureWorks_Territories.csv",
)

In [0]:
# Persist the territories data to the silver layer as Parquet (overwrite mode).
flag = writeToSilver(
    df_AdventureWorks_Territories,
    "parquet",
    "overwrite",
    "AdventureWorks_Territories/",
)

### Sales

In [0]:
# Load the raw 2015 sales CSV from the bronze layer.
df_AdventureWorks_Sales_2015 = create_dataframe(
    "AdventureWorks_Sales_2015",
    "AdventureWorks_Sales_2015/AdventureWorks_Sales_2015.csv",
)

In [0]:
# Load the raw 2016 sales CSV from the bronze layer.
df_AdventureWorks_Sales_2016 = create_dataframe(
    "AdventureWorks_Sales_2016",
    "AdventureWorks_Sales_2016/AdventureWorks_Sales_2016.csv",
)

In [0]:
# Load the raw 2017 sales CSV from the bronze layer.
df_AdventureWorks_Sales_2017 = create_dataframe(
    "AdventureWorks_Sales_2017",
    "AdventureWorks_Sales_2017/AdventureWorks_Sales_2017.csv",
)

In [0]:
# Stack the three yearly sales frames into one, matching columns by name
# rather than by position.
df__AdventureWorks_Sales = (
    df_AdventureWorks_Sales_2015
    .unionByName(df_AdventureWorks_Sales_2016)
    .unionByName(df_AdventureWorks_Sales_2017)
)

In [0]:
# Check for data leakage or drop: the unioned frame must contain exactly as
# many rows as the three yearly frames combined. Prints True when it does.
rows_2017 = df_AdventureWorks_Sales_2017.count()
rows_2016 = df_AdventureWorks_Sales_2016.count()
rows_2015 = df_AdventureWorks_Sales_2015.count()

rows_combined = df__AdventureWorks_Sales.count()

yearly_total = rows_2017 + rows_2016 + rows_2015
print(yearly_total == rows_combined)

In [0]:
# Render the combined 2015-2017 sales DataFrame in the cell output.
df__AdventureWorks_Sales.display()

In [0]:
# Convert the StockDate column to a timestamp, replacing it in place.
df__AdventureWorks_Sales = df__AdventureWorks_Sales.withColumn(
    "StockDate",
    to_timestamp(col("StockDate")),
)



In [0]:
# Replace the leading 'S' of each OrderNumber with 'T'.
# The pattern is anchored with '^' so that only an 'S' in first position is
# rewritten; an unanchored 'S' would replace every 'S' anywhere in the value.
df__AdventureWorks_Sales = df__AdventureWorks_Sales.withColumn(
    "OrderNumber",
    regexp_replace(col("OrderNumber"), '^S', 'T'),
)

In [0]:
# Render the sales DataFrame after the StockDate and OrderNumber changes.
df__AdventureWorks_Sales.display()

In [0]:
# Persist the combined sales data to the silver layer as Parquet (overwrite mode).
flag = writeToSilver(
    df__AdventureWorks_Sales,
    "parquet",
    "overwrite",
    "AdventureWorks_Sales/",
)

### Sales Analysis

In [0]:
# Total orders per date: count OrderNumber values for each OrderDate and list
# the busiest dates first.
orders_per_date = (
    df__AdventureWorks_Sales
    .groupBy("OrderDate")
    .agg(count(col("OrderNumber")).alias("OrderCount"))
    .orderBy(col("OrderCount").desc())
)
orders_per_date.display()

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.