In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Silver Layer Script

### Data Access Using an App Registration (Service Principal, OAuth 2.0)

In [0]:
# Configure Spark to reach the ADLS Gen2 account with OAuth 2.0 client credentials.
# Fill in each placeholder once here; every setting below is derived from these values.
storage_account = "<your-storage-account>"
client_id = "<your-application(client)-id>"
tenant_id = "<your-directory(tenant)-id>"

# Fetch the client secret from a Databricks secret scope instead of pasting it into
# the notebook, so it never ends up in the notebook source or in version control.
client_secret = dbutils.secrets.get(scope="<your-secret-scope>", key="<your-secret-key>")

# All five settings are scoped to this storage account's DFS endpoint.
account_host = f"{storage_account}.dfs.core.windows.net"
oauth_settings = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": client_id,
    "fs.azure.account.oauth2.client.secret": client_secret,
    "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{tenant_id}/oauth2/token",
}
for setting_name, setting_value in oauth_settings.items():
    spark.conf.set(f"{setting_name}.{account_host}", setting_value)

### Data Loading

In [0]:
# Root of the bronze container; every raw dataset lives in a folder directly beneath it.
BRONZE_ROOT = "abfss://bronze@<your-storage-account>.dfs.core.windows.net"


def read_bronze_csv(dataset):
    """Load one bronze-layer CSV dataset as a DataFrame.

    Args:
        dataset: Folder name of the dataset under the bronze container.

    Returns:
        A DataFrame read with the first row as the header and column types
        inferred by Spark.
    """
    return (
        spark.read.format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .load(f"{BRONZE_ROOT}/{dataset}")
    )


calendar = read_bronze_csv("adventure_works_calendar")
customers = read_bronze_csv("adventure_works_customers")
categories = read_bronze_csv("adventure_works_product_categories")
# Variable name spelling is kept as-is because later cells reference it.
subcatergories = read_bronze_csv("adventure_works_product_subcategories")
products = read_bronze_csv("adventure_works_products")
returns = read_bronze_csv("adventure_works_returns")
sales_2015 = read_bronze_csv("adventure_works_sales_2015")
sales_2016 = read_bronze_csv("adventure_works_sales_2016")
sales_2017 = read_bronze_csv("adventure_works_sales_2017")
territories = read_bronze_csv("adventure_works_territories")

### Transformations

_1. Calendar_

In [0]:
# Preview the raw calendar data
display(calendar)

In [0]:
# Break the Date column out into separate Day, Month and Year columns
date_col = col('Date')
updatedCalendar = (
    calendar
    .withColumn("Day", day(date_col))
    .withColumn("Month", month(date_col))
    .withColumn("Year", year(date_col))
)

display(updatedCalendar)

In [0]:
# Persist the enriched calendar to the silver layer as Parquet.
# "overwrite" keeps this cell idempotent: the notebook reloads the full bronze
# dataset on every run, so "append" would duplicate every row on each re-run.
(
    updatedCalendar.write
    .format("parquet")
    .mode("overwrite")
    .option("path", "abfss://silver@<your-storage-account>.dfs.core.windows.net/adventure_works_calendar")
    .save()
)

_2. Customers_

In [0]:
# Preview the raw customers data
display(customers)

In [0]:
# Add a 'Full Name' column joining prefix, first name and last name with single spaces.
# NOTE(review): 'lastName' differs in case from 'FirstName'; if the source column is
# actually 'LastName', this resolves only because Spark column lookup is
# case-insensitive by default — confirm against the customers schema.
name_parts = [col('Prefix'), col('FirstName'), col('lastName')]
updatedCustomers = customers.withColumn('Full Name', concat_ws(' ', *name_parts))

In [0]:
# Inspect the customers data with the new 'Full Name' column
display(updatedCustomers)

In [0]:
# Persist the enriched customers to the silver layer as Parquet.
# "overwrite" keeps this cell idempotent: the notebook reloads the full bronze
# dataset on every run, so "append" would duplicate every row on each re-run.
(
    updatedCustomers.write
    .format('parquet')
    .mode('overwrite')
    .option("path", "abfss://silver@<your-storage-account>.dfs.core.windows.net/adventure_works_customers")
    .save()
)

_3. Categories_

In [0]:
# Preview the raw product categories
display(categories)

In [0]:
# Persist the product categories (no transformation) to the silver layer as Parquet.
# "overwrite" keeps this cell idempotent: the notebook reloads the full bronze
# dataset on every run, so "append" would duplicate every row on each re-run.
(
    categories.write
    .format('parquet')
    .mode('overwrite')
    .option("path", "abfss://silver@<your-storage-account>.dfs.core.windows.net/adventure_works_categories")
    .save()
)

_4. Subcategories_

In [0]:
# Preview the raw product subcategories
display(subcatergories)

In [0]:
# Persist the product subcategories (no transformation) to the silver layer as Parquet.
# "overwrite" keeps this cell idempotent: the notebook reloads the full bronze
# dataset on every run, so "append" would duplicate every row on each re-run.
(
    subcatergories.write
    .format('parquet')
    .mode('overwrite')
    .option("path", "abfss://silver@<your-storage-account>.dfs.core.windows.net/adventure_works_subcategories")
    .save()
)

_5. Products_

In [0]:
# Preview the raw products data
display(products)

In [0]:
# Keep only the leading token of each field: the part of ProductSKU before the
# first '-' and the first space-separated word of ProductName. Both columns are
# replaced under their original names.
sku_prefix = split(col('ProductSKU'), '-').getItem(0)
name_first_word = split(col('ProductName'), ' ').getItem(0)
updatedProducts = (
    products
    .withColumn('ProductSKU', sku_prefix)
    .withColumn('ProductName', name_first_word)
)

In [0]:
# Inspect the products after shortening ProductSKU and ProductName
display(updatedProducts)

In [0]:
# Persist the transformed products to the silver layer as Parquet.
# "overwrite" keeps this cell idempotent: the notebook reloads the full bronze
# dataset on every run, so "append" would duplicate every row on each re-run.
(
    updatedProducts.write
    .format('parquet')
    .mode('overwrite')
    .option("path", "abfss://silver@<your-storage-account>.dfs.core.windows.net/adventure_works_products")
    .save()
)

_6. Returns_

In [0]:
# Preview the raw returns data
display(returns)

In [0]:
# Persist the returns (no transformation) to the silver layer as Parquet.
# "overwrite" keeps this cell idempotent: the notebook reloads the full bronze
# dataset on every run, so "append" would duplicate every row on each re-run.
(
    returns.write
    .format('parquet')
    .mode('overwrite')
    .option("path", "abfss://silver@<your-storage-account>.dfs.core.windows.net/adventure_works_returns")
    .save()
)

_7. Territories_

In [0]:
# Preview the raw territories data
display(territories)

In [0]:
# Persist the territories (no transformation) to the silver layer as Parquet.
# "overwrite" keeps this cell idempotent: the notebook reloads the full bronze
# dataset on every run, so "append" would duplicate every row on each re-run.
(
    territories.write
    .format('parquet')
    .mode('overwrite')
    .option("path", "abfss://silver@<your-storage-account>.dfs.core.windows.net/adventure_works_territories")
    .save()
)

_8. Sales_

In [0]:
# Preview each yearly sales extract in turn
for yearly_sales in (sales_2015, sales_2016, sales_2017):
    display(yearly_sales)

In [0]:
# Stack the three yearly extracts into a single sales DataFrame.
# unionByName pairs columns by name. Plain union() pairs them by position, so a
# yearly file with its columns in a different order would be silently misaligned;
# with unionByName a missing or renamed column raises an error instead.
sales = sales_2015.unionByName(sales_2016).unionByName(sales_2017)

In [0]:
# Preview the combined sales data
display(sales)

In [0]:
# Number of rows in the combined sales data; shown as the cell's output
sales_row_count = sales.count()
sales_row_count

In [0]:
# Convert StockDate to a timestamp column
stock_date_ts = to_timestamp(col('StockDate'))
updatedSales = sales.withColumn('StockDate', stock_date_ts)

In [0]:
# Replace every 'S' in OrderNumber with 'T'
relabelled_order_number = regexp_replace('OrderNumber', 'S', 'T')
updatedSales = updatedSales.withColumn('OrderNumber', relabelled_order_number)

In [0]:
# Add a 'multiply' column holding OrderLineItem * OrderQuantity for each row
line_item = col('OrderLineItem')
quantity = col('OrderQuantity')
updatedSales = updatedSales.withColumn('multiply', line_item * quantity)

In [0]:
# Inspect the sales data after all transformations
display(updatedSales)

In [0]:
# Persist the transformed sales to the silver layer as Parquet.
# "overwrite" keeps this cell idempotent: the notebook reloads the full bronze
# dataset on every run, so "append" would duplicate every row on each re-run.
(
    updatedSales.write
    .format('parquet')
    .mode('overwrite')
    .option("path", "abfss://silver@<your-storage-account>.dfs.core.windows.net/adventure_works_sales")
    .save()
)

### Sales Analysis

In [0]:
# Count OrderNumber values per OrderDate and list the busiest dates first.
# NOTE(review): count() tallies rows, not distinct order numbers. If one order can
# span several rows (the data has an OrderLineItem column), "Total Orders" is
# really a line count — confirm whether countDistinct is intended.
orders_per_date = (
    updatedSales
    .groupBy("OrderDate")
    .agg(count("OrderNumber").alias("Total Orders"))
    .orderBy("Total Orders", ascending=False)
)
display(orders_per_date)

In [0]:
# Show the product categories again for reference
display(categories)