# Capstone Transformation

In [0]:
# Disabled cell: mounting the `staging` container is not needed by this notebook.
# The code is kept commented out for reference only. The trailing print is
# commented out as well, because `_mount_point` is not defined when this cell
# runs first on a fresh kernel and the print raised a NameError.

# # Storage account and container to mount
# _storage_account = "group4adls"
# _container = "staging"

# # Build the config key, the blob endpoint and the mount point
# _config_key = f"fs.azure.account.key.{_storage_account}.blob.core.windows.net"
# _blobEndpoint = f"wasbs://{_container}@{_storage_account}.blob.core.windows.net"
# _mount_point = f"/mnt/{_storage_account}/{_container}"

# dbutils.fs.mount(
#     source = _blobEndpoint,
#     mount_point = _mount_point,
#     extra_configs = {_config_key:dbutils.secrets.get(scope = "capstone_secret", key = "adls-key") }
#    )
# print(_mount_point)

#### Mount the raw-data blob container from ADLS

In [0]:
# Mount the `raw-files` container of the ADLS account (source of the raw CSV data)

# Storage account and container to mount
_storage_account = "group4adls"
_container = "raw-files"

# Build the account-key config name, the wasbs blob endpoint and the DBFS mount point
_config_key = f"fs.azure.account.key.{_storage_account}.blob.core.windows.net"
_blobEndpoint = f"wasbs://{_container}@{_storage_account}.blob.core.windows.net"
_mount_point = f"/mnt/{_storage_account}/{_container}"

# dbutils.fs.mount fails when the mount point is already in use, so mount only
# when it is absent. This keeps the cell safe to re-run (e.g. Run All).
if not any(m.mountPoint == _mount_point for m in dbutils.fs.mounts()):
    dbutils.fs.mount(
        source = _blobEndpoint,
        mount_point = _mount_point,
        # The storage account key is read from the secret scope, never hard-coded
        extra_configs = {_config_key:dbutils.secrets.get(scope = "capstone_secret", key = "adls-key") }
    )
print(_mount_point)

#### Mount the `temp-for-sql-warehouse` staging container, where we write our final transformations

In [0]:
# Mount the `temp-for-sql-warehouse` container of the ADLS account (output staging area)

# Storage account and container to mount
_storage_account = "group4adls"
_container = "temp-for-sql-warehouse"

# Build the account-key config name, the wasbs blob endpoint and the DBFS mount point
_config_key = f"fs.azure.account.key.{_storage_account}.blob.core.windows.net"
_blobEndpoint = f"wasbs://{_container}@{_storage_account}.blob.core.windows.net"
_mount_point = f"/mnt/{_storage_account}/{_container}"

# dbutils.fs.mount fails when the mount point is already in use, so mount only
# when it is absent. This keeps the cell safe to re-run (e.g. Run All).
if not any(m.mountPoint == _mount_point for m in dbutils.fs.mounts()):
    dbutils.fs.mount(
        source = _blobEndpoint,
        mount_point = _mount_point,
        # The storage account key is read from the secret scope, never hard-coded
        extra_configs = {_config_key:dbutils.secrets.get(scope = "capstone_secret", key = "adls-key") }
    )
print(_mount_point)

#### Configure access to the SQL database

In [0]:
# Connection settings for the SQL database
dwHostname = "group4sqlserver.database.windows.net" # this is the server name
dwPort = 1433 # default sql port
# Credentials come from the Databricks secret scope
dwUsername = dbutils.secrets.get(scope = "capstone_secret", key = "Capstone-Username")
dwPassword = dbutils.secrets.get(scope = "capstone_secret", key = "Capstone-Password")
dwDatabase = "group-4-SQLdb"
# Properties passed to spark.read.jdbc alongside the URL
connectionProperties = {
  "user" : dwUsername,
  "password" : dwPassword,
  "driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

# Extra driver properties, appended to the URL as plain `name=value;` pairs.
# hostNameInCertificate must match the domain of dwHostname (database.windows.net),
# otherwise certificate validation fails once encrypt=true is honoured.
dwJdbcExtraOptions = "encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"
# The options are appended directly. Prefixing them with "dwJdbcExtraOptions="
# made the driver read `encrypt=true` as the value of an unknown property, so
# encryption was never actually requested.
sqlDwUrl = "jdbc:sqlserver://{0}:{1};database={2};user={3};password={4};{5}".format(dwHostname,dwPort,dwDatabase,dwUsername,dwPassword,dwJdbcExtraOptions)

## Read csv data files from Azure Data Lake Storage

In [0]:
# Read the raw CSV files from the mounted `raw-files` container.
# The path is set explicitly because `_mount_point` was last assigned by the
# temp-for-sql-warehouse mount cell.
_mount_point = '/mnt/group4adls/raw-files'
customers =  spark.read.option("header", True).format("csv").load(f"{_mount_point}/Data/customers.csv")
# header=True added for consistency with the other files; without it the header
# row is loaded as a data row and the columns are named _c0, _c1, ...
orders =  spark.read.option("header", True).format("csv").load(f"{_mount_point}/Data/orders.csv")
returns =  spark.read.option("header", True).format("csv").load(f"{_mount_point}/Data/returns.csv")
transactions =  spark.read.option("header", True).format("csv").load(f"{_mount_point}/Data/transactions.csv")
vendors =  spark.read.option("header", True).format("csv").load(f"{_mount_point}/Data/vendors.csv")

In [0]:
# Render the customers DataFrame loaded from customers.csv for a visual check
display(customers)

## Read tables from SQL DB

In [0]:
# Load the orders, returns and transactions tables from the SQL database over
# JDBC, one read per table, in the same order as the target names.
orders_df, returns_df, transactions_df = (
    spark.read.jdbc(url=sqlDwUrl, table=table_name, properties=connectionProperties)
    for table_name in ('orders', 'returns', 'transactions')
)

## Number of orders made per customer

In [0]:
# Count the order rows for each customer and label the count column `no_of_orders`
num_of_orders = (
    orders_df
    .groupBy("customer_id")
    .count()
    .withColumnRenamed('count', 'no_of_orders')
)

display(num_of_orders)

customer_id,no_of_orders
VW-21775,8
MY-17380,16
EM-13960,20
KH-16630,14
BD-11500,23
JF-15490,14
PH-18790,16
JF-15415,12
JH-15985,14
OT-18730,10


#### SQL Demo

In [0]:
# Expose orders_df to SQL cells as the temporary view `orders_table`
orders_df.createOrReplaceTempView("orders_table")

In [0]:
%sql

-- Per-customer order count from the orders_table temp view.
-- COUNT(order_id) counts only rows whose order_id is not NULL.
SELECT customer_id, COUNT(order_id) AS no_of_orders
FROM orders_table
GROUP BY customer_id;

customer_id,no_of_orders
VW-21775,8
MY-17380,16
EM-13960,20
KH-16630,14
BD-11500,23
JF-15490,14
PH-18790,16
JF-15415,12
JH-15985,14
OT-18730,10


## Number of returns per customer

In [0]:
# Join returns to orders on order_id.
# Joining by column name (rather than an equality expression) keeps a single
# order_id column in the result instead of two ambiguous ones.
orders_returns_df = returns_df.join(orders_df, on="order_id", how="inner")

# Count the joined return rows for each customer
num_of_returns = (
    orders_returns_df
    .groupBy("customer_id")
    .count()
    .withColumnRenamed('count', 'no_of_returns')
)

display(num_of_returns)

customer_id,no_of_returns
EM-13960,1
BD-11500,1
JF-15490,2
PH-18790,1
JH-15985,1
MG-18145,1
SO-20335,1
JK-15370,2
CB-12025,1
DS-13030,2


## Calculating total amount spent grouped by customer

In [0]:
# Join transactions to orders on order_id.
# Joining by column name (rather than an equality expression) keeps a single
# order_id column in the result instead of two ambiguous ones.
orders_transactions_df = transactions_df.join(orders_df, on="order_id", how="inner")

# Sum the `sales` column for each customer and label it `amnt_spent`
amnt_spent = (
    orders_transactions_df
    .groupBy("customer_id")
    .sum("sales")
    .withColumnRenamed('sum(sales)', 'amnt_spent')
)

display(amnt_spent)

## Finding difference between earliest and latest orders

In [0]:
import pyspark.sql.functions as F

# Latest and earliest purchase date per customer, computed in one aggregation
# instead of two separate aggregations joined back together on customer_id.
# The generated column names are 'max(order_purchase_date)' and
# 'min(order_purchase_date)'.
order_date_max_min = orders_df.groupBy('customer_id').agg(
    F.max('order_purchase_date'),
    F.min('order_purchase_date'),
)

# Difference between the latest and the earliest order, in seconds
# (unix_timestamp returns seconds since the epoch)
timeDiff = (
    F.unix_timestamp(F.col('max(order_purchase_date)'), "yyyy-MM-dd HH:mm:ss")
    - F.unix_timestamp(F.col('min(order_purchase_date)'), "yyyy-MM-dd HH:mm:ss")
)
order_date_max_min = order_date_max_min.withColumn("diff", timeDiff)

## Finding average purchase frequency

In [0]:
# NOTE: `round` here is the Spark column function and shadows Python's builtin
# round for the rest of the notebook; later cells rely on `round` and `col`.
from pyspark.sql.functions import round, col, when

# Average purchase frequency = days between first and last order, divided by
# the number of gaps between orders (no_of_orders - 1), rounded to whole days.
# Customers with a single order have no gap: `when` without `otherwise` yields
# NULL for them explicitly, instead of relying on division by zero (which raises
# an error when Spark runs in ANSI mode).
final_transformation = (
    order_date_max_min
    .join(num_of_orders, "customer_id")
    .withColumn(
        'avg_purchase_freq',
        round(
            when(
                col('no_of_orders') > 1,
                (col('diff') / 86400) / (col('no_of_orders') - 1),  # 86400 seconds per day
            )
        ),
    )
    # Keep only the columns of the result, dropping the intermediate date columns
    .select('customer_id', 'no_of_orders', 'avg_purchase_freq')
)
display(final_transformation)

customer_id,no_of_orders,avg_purchase_freq
VW-21775,8,71.0
MY-17380,16,34.0
EM-13960,20,28.0
KH-16630,14,41.0
BD-11500,23,23.0
JF-15490,14,34.0
PH-18790,16,38.0
JF-15415,12,49.0
JH-15985,14,39.0
OT-18730,10,38.0


## Joining final customer fact table and final cleaning

In [0]:
# Build the customer fact table in one chain:
#   1. outer-join the return counts onto the purchase-frequency table,
#   2. inner-join the amount spent,
#   3. round amnt_spent to 2 decimals in place,
#   4. replace nulls in numeric columns with 0.
customer_fact = (
    final_transformation
    .join(num_of_returns, "customer_id", "outer")
    .join(amnt_spent, "customer_id")
    .withColumn('amnt_spent', round(col('amnt_spent'), 2))
    .na.fill(value=0)
)

display(customer_fact)

## Write the customer_fact and customer_dim tables to the staging area (`temp-for-sql-warehouse`) in ADLS

In [0]:
# Load the `customers` table from the SQL database over JDBC
customers_df = spark.read.jdbc(url=sqlDwUrl, table='customers', properties=connectionProperties)

In [0]:
# Write both tables as CSV with a header row to the mounted
# temp-for-sql-warehouse container, replacing any previous output.
# .csv() already selects the CSV data source, so the redundant legacy
# format("com.databricks.spark.csv") call has been dropped.
customers_df.write.mode("overwrite").option("header","true").csv("/mnt/group4adls/temp-for-sql-warehouse/customers_dim")
customer_fact.write.mode("overwrite").option("header","true").csv("/mnt/group4adls/temp-for-sql-warehouse/customers_fact")

In [0]:
# Write a demo copy of the fact table as CSV with a header row, replacing any
# previous output. .csv() already selects the CSV data source, so the redundant
# legacy format("com.databricks.spark.csv") call has been dropped.
customer_fact.write.mode("overwrite").option("header","true").csv("/mnt/group4adls/temp-for-sql-warehouse/customers_fact_demo")