In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import *



### Data Access

In [None]:
# Configure OAuth (service-principal) access to ADLS Gen2 for this Spark session.
#
# These settings are keyed by the storage account's DFS endpoint, so the account
# name must match the host used in the abfss:// paths this notebook reads and
# writes (awstoragedsk). The keys previously named "storage_account", which does
# not match any path used below, so the settings never applied to the data.
storage_account = "awstoragedsk"
adls_endpoint = f"{storage_account}.dfs.core.windows.net"

# Never keep the client secret in the notebook source: read it from a secret scope.
# NOTE(review): the scope/key names are placeholders - point them at the real secret.
client_secret = dbutils.secrets.get(scope="adls-oauth", key="client-secret")

spark.conf.set(f"fs.azure.account.auth.type.{adls_endpoint}", "OAuth")
spark.conf.set(
    f"fs.azure.account.oauth.provider.type.{adls_endpoint}",
    "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
)
spark.conf.set(
    f"fs.azure.account.oauth2.client.id.{adls_endpoint}",
    "16690690-c6b4-40bf-98cb-531fea249317",
)
spark.conf.set(f"fs.azure.account.oauth2.client.secret.{adls_endpoint}", client_secret)
# NOTE(review): "directory_id" looks like a placeholder for the tenant (directory) ID - confirm/replace.
spark.conf.set(
    f"fs.azure.account.oauth2.client.endpoint.{adls_endpoint}",
    "https://login.microsoftonline.com/directory_id/oauth2/token",
)

### Data Loading

#### Reading Data

In [None]:
# Load the Calendar CSV from the bronze container; the first row supplies the
# column names and Spark infers the column types.
df_cal = (
    spark.read
    .format("csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .load("abfss://bronze@awstoragedsk.dfs.core.windows.net/AdventureWorks_Calendar")
)

In [None]:
# Load the Customers CSV from the bronze container (header row, inferred types).
df_cus = (
    spark.read
    .format("csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .load("abfss://bronze@awstoragedsk.dfs.core.windows.net/AdventureWorks_Customers")
)

In [None]:
# Load the Product Categories CSV from the bronze container (header row, inferred types).
df_prod_cat = (
    spark.read
    .format("csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .load("abfss://bronze@awstoragedsk.dfs.core.windows.net/AdventureWorks_Product_Categories")
)

In [None]:
# Load the Product Subcategories CSV from the bronze container (header row, inferred types).
df_prod_sub_cat = (
    spark.read
    .format("csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .load("abfss://bronze@awstoragedsk.dfs.core.windows.net/AdventureWorks_Product_Subcategories")
)

In [None]:
# Load the Products CSV from the bronze container (header row, inferred types).
df_prod = (
    spark.read
    .format("csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .load("abfss://bronze@awstoragedsk.dfs.core.windows.net/AdventureWorks_Products")
)

In [None]:
# Load every bronze path whose name starts with "AdventureWorks_Sales" into a
# single DataFrame (the trailing * is a glob), with header row and inferred types.
df_sales = (
    spark.read
    .format("csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .load("abfss://bronze@awstoragedsk.dfs.core.windows.net/AdventureWorks_Sales*")
)

In [None]:
# Load the Territories CSV from the bronze container (header row, inferred types).
df_terr = (
    spark.read
    .format("csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .load("abfss://bronze@awstoragedsk.dfs.core.windows.net/AdventureWorks_Territories")
)

In [None]:
# Load the Returns CSV from the bronze container (header row, inferred types).
# NOTE(review): "returns" is lower-case here, unlike the other bronze folder
# names - confirm it matches the actual path.
df_ret = (
    spark.read
    .format("csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .load("abfss://bronze@awstoragedsk.dfs.core.windows.net/AdventureWorks_returns")
)

### Transformation

#### Calendar

In [None]:
# Derive calendar-month and calendar-year columns from the Date column.
date_column = col('Date')
df_cal = (
    df_cal
    .withColumn("Month", month(date_column))
    .withColumn("Year", year(date_column))
)

In [None]:
# Persist the enriched calendar to the silver container as Parquet.
# Append mode adds new files on every run rather than replacing existing data.
(
    df_cal.write
    .format('parquet')
    .mode('append')
    .save("abfss://silver@awstoragedsk.dfs.core.windows.net/AdventureWorks_Calendar")
)

#### Customer

In [None]:
# Preview the customers DataFrame.
display(df_cus)

In [None]:
# Preview a fullName built with concat(); df_cus itself is not modified here.
# concat() yields null for the whole value whenever any of its inputs is null.
full_name_preview = concat(
    col("Prefix"), lit(' '), col("FirstName"), lit(' '), col("LastName")
)
display(df_cus.withColumn("fullName", full_name_preview))

In [None]:
# Add fullName as the space-separated Prefix, FirstName and LastName.
# concat_ws() skips null parts instead of nulling the whole result.
df_cus = df_cus.withColumn(
    'fullName',
    concat_ws(' ', *[col(part) for part in ('Prefix', 'FirstName', 'LastName')]),
)

In [None]:
# Preview the customers DataFrame in its current state.
display(df_cus)

In [None]:
# Persist the customers DataFrame to the silver container as Parquet.
# This cell previously wrote df_cal (the calendar) to the Customers path,
# so the silver Customers dataset received calendar rows; write df_cus instead.
# Append mode adds new files on every run rather than replacing existing data.
df_cus.write.format('parquet')\
            .mode('append')\
            .option("path", "abfss://silver@awstoragedsk.dfs.core.windows.net/AdventureWorks_Customers")\
            .save()

#### Subcategories

In [None]:
# Preview the product subcategories DataFrame.
display(df_prod_sub_cat)

In [None]:
# Persist the product subcategories to the silver container as Parquet (append mode).
(
    df_prod_sub_cat.write
    .format('parquet')
    .mode('append')
    .save("abfss://silver@awstoragedsk.dfs.core.windows.net/AdventureWorks_Product_Subcategories")
)

#### Products

In [None]:
# Preview the products DataFrame.
display(df_prod)

In [None]:
# Overwrite ProductSKU with the part before its first '-' and ProductName with
# the part before its first space (element 0 of each split).
sku_prefix = split(col('ProductSKU'), '-').getItem(0)
name_first_word = split(col('ProductName'), ' ').getItem(0)
df_prod = (
    df_prod
    .withColumn('ProductSKU', sku_prefix)
    .withColumn('ProductName', name_first_word)
)

In [None]:
# Preview the products DataFrame in its current state.
display(df_prod)

In [None]:
# Persist the products DataFrame to the silver container as Parquet (append mode).
(
    df_prod.write
    .format('parquet')
    .mode('append')
    .save("abfss://silver@awstoragedsk.dfs.core.windows.net/AdventureWorks_Products")
)

#### Returns

In [None]:
# Preview the returns DataFrame.
display(df_ret)

In [None]:
# Persist the returns DataFrame, unchanged, to the silver container as Parquet (append mode).
(
    df_ret.write
    .format('parquet')
    .mode('append')
    .save("abfss://silver@awstoragedsk.dfs.core.windows.net/AdventureWorks_Returns")
)

#### Territories

In [None]:
# Preview the territories DataFrame.
display(df_terr)

In [None]:
# Persist the territories DataFrame, unchanged, to the silver container as Parquet (append mode).
(
    df_terr.write
    .format('parquet')
    .mode('append')
    .save("abfss://silver@awstoragedsk.dfs.core.windows.net/AdventureWorks_Territories")
)

#### Sales

In [None]:
# Preview the sales DataFrame.
display(df_sales)

In [None]:
# 1. Convert the StockDate column to a timestamp (column is replaced in place).
stock_date_ts = to_timestamp(col("StockDate"))
df_sales = df_sales.withColumn("StockDate", stock_date_ts)

In [None]:
# 2. Replace every upper-case "S" in OrderNumber with "T".
df_sales = df_sales.withColumn(
    'OrderNumber',
    regexp_replace(col('OrderNumber'), 'S', 'T'),
)

In [None]:
# 3. Multiply OrderLineItem by OrderQuantity and store the product in a new "multiply" column.
line_item_times_quantity = col('OrderLineItem') * col('OrderQuantity')
df_sales = df_sales.withColumn('multiply', line_item_times_quantity)

In [None]:
# Preview the sales DataFrame in its current state.
display(df_sales)

#### Sales Analysis

In [None]:
# Count the non-null OrderNumber values for each OrderDate and show the result.
orders_per_day = (
    df_sales
    .groupby('OrderDate')
    .agg(count("OrderNumber").alias('total_order'))
)
display(orders_per_day)

In [None]:
# Preview the product categories DataFrame.
# NOTE(review): no visible cell writes df_prod_cat to the silver container - confirm that is intended.
display(df_prod_cat)

In [None]:
# Preview the territories DataFrame again.
display(df_terr)

In [None]:
# Persist the transformed sales DataFrame to the silver container as Parquet.
# Append mode adds new files on every run rather than replacing existing data.
(
    df_sales.write
    .format('parquet')
    .mode('append')
    .save("abfss://silver@awstoragedsk.dfs.core.windows.net/AdventureWorks_Sales")
)