In [0]:
# Define your storage account name and key
storage_account_name = "retailanalytics"
storage_account_key = "****"

# Define the container name and mount point
container_name = "staging-data"
mount_point = "/mnt/staging"

# Mount the storage account
dbutils.fs.mount(
    source = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/",
    mount_point = mount_point,
    extra_configs = {f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net": storage_account_key}
)

# List the files in the mounted directory to verify
display(dbutils.fs.ls(mount_point))

path,name,size,modificationTime
dbfs:/mnt/staging/part-merged.csv,part-merged.csv,10574531,1719678194000


In [0]:
df = spark.read.csv("/mnt/staging/part-merged.csv")

In [0]:
%sql
-- create database in hive metastore catalog
CREATE DATABASE IF NOT EXISTS olistdatabase;

In [0]:
%sql
CREATE TABLE IF NOT EXISTS olistdatabase.olisttable
(
id STRING,
order_status STRING,
order_product_value DOUBLE,
order_freight_charge DOUBLE,
order_product_qty INT,
customer_city STRING,
customer_state STRING,
customer_zipcode_prefix INT,
product_name_len INT,
product_description_len INT,
product_photos_qty INT,
product_review_score INT,
order_placed_at TIMESTAMP,
order_approved_at TIMESTAMP,
order_delivered_at TIMESTAMP
)
USING CSV
OPTIONS
(
header='false',
timestampFormat='dd-MM-yyyy HH:mm'
);

In [0]:
%sql
INSERT OVERWRITE olistdatabase.olisttable
SELECT 
    _c0 AS id,
    _c1 AS order_status,
    CAST(_c2 AS DOUBLE) AS order_product_value,
    CAST(_c3 AS DOUBLE) AS order_freight_charge,
    CAST(_c4 AS DOUBLE) AS order_product_qty,
    _c5 AS customer_city,
    _c6 AS customer_state,
    CAST(_c7 AS INT) AS customer_zipcode_prefix,
    CAST(_c8 AS INT) AS product_name_len,
    CAST(_c9 AS INT) AS product_description_len,
    CAST(_c10 AS INT) AS product_photos_qty,
    CAST(_c11 AS INT) AS product_review_score,
    to_timestamp(_c12, 'dd-MM-yyyy HH:mm') AS order_placed_at,
    to_timestamp(_c13, 'dd-MM-yyyy HH:mm') AS order_approved_at,
    to_timestamp(_c14, 'dd-MM-yyyy HH:mm') AS order_delivered_at
FROM csv.`/mnt/staging/part-merged.csv`;


In [0]:
%sql
select * from olistdatabase.olisttable limit 5

id,order_status,order_product_value,order_freight_charge,order_product_qty,customer_city,customer_state,customer_zipcode_prefix,product_name_len,product_description_len,product_photos_qty,product_review_score,order_placed_at,order_approved_at,order_delivered_at
1,delivered,79.0,17.8,1,Luziania,GO,728,50,201,2,5,2017-10-02T10:56:00Z,2017-10-02T11:07:00Z,2017-10-10T21:25:00Z
2,delivered,119.9,27.16,1,Joinville,SC,892,50,511,3,5,2018-07-24T20:41:00Z,2018-07-26T03:24:00Z,2018-08-07T15:27:00Z
3,delivered,519.99,41.69,1,Serra,ES,291,48,1156,2,1,2018-08-08T08:38:00Z,2018-08-08T08:55:00Z,2018-08-17T18:06:00Z
4,delivered,29.5,17.92,1,RIO DE JANEIRO,RJ,222,21,207,2,4,2017-11-18T19:28:00Z,2017-11-18T19:45:00Z,2017-12-02T00:28:00Z
5,delivered,26.77,23.11,1,Sao Paulo,SP,40,41,451,1,5,2018-02-13T21:18:00Z,2018-02-13T22:20:00Z,2018-02-16T18:17:00Z


In [0]:
## Step15: Read the Hive table from the Hive database into a Spark DataFrame

df = spark.table("olistdatabase.olisttable")
#df.display(); 
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_product_value: double (nullable = true)
 |-- order_freight_charge: double (nullable = true)
 |-- order_product_qty: integer (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)
 |-- customer_zipcode_prefix: integer (nullable = true)
 |-- product_name_len: integer (nullable = true)
 |-- product_description_len: integer (nullable = true)
 |-- product_photos_qty: integer (nullable = true)
 |-- product_review_score: integer (nullable = true)
 |-- order_placed_at: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_at: timestamp (nullable = true)



In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from functools import reduce

In [0]:
### Step16: Calculate the order metrics

df = df.withColumn("order_sales", col("order_product_value") * col("order_product_qty"))
df = df.withColumn("order_approval_time", col("order_approved_at") - col("order_placed_at"))
df = df.withColumn("order_delivery_time", col("order_delivered_at") - col("order_placed_at"))
df = df.withColumn("order_placed_date", F.to_date(col("order_placed_at")))
df = df.withColumn("order_placed_weekdate", F.trunc(col("order_placed_at"), "week"))
#df.display();

In [0]:
## ---------------------- WEEKLY INSIGHTS ------------------------

# Compute the historical weekly insights (We will need this values in further steps.)

grp_weekdate = df.groupBy(col("order_placed_weekdate"))
grp_city_weekdate = df.groupBy(col("customer_city"), col("order_placed_weekdate"))
grp_state_weekdate = df.groupBy(col("customer_state"), col("order_placed_weekdate"))

In [0]:
## Step18:

## Total sales
grp_date_total_sales = grp_weekdate.agg(F.sum("order_sales").alias("total_sales")).orderBy(col("order_placed_weekdate").asc())

## Total sales per city
grp_city_date_total_sales = grp_city_weekdate.agg(F.sum("order_sales").alias("total_sales")).orderBy(col("customer_city").asc(),
col("order_placed_weekdate").asc())

##Total sales per state
grp_state_date_total_sales = grp_state_weekdate.agg(F.sum("order_sales").alias("total_sales")).orderBy(col("customer_state").asc(),
col("order_placed_weekdate").asc())

In [0]:
## Step19:

#Total Freight Charge
#i. Total freight charge
#ii. Total freight charge per city
#iii. Total freight charge per state
## Total freight charge
grp_date_total_freight_charge = grp_weekdate.agg(F.sum("order_freight_charge").alias("total_freight_charge")).orderBy(col("order_placed_weekdate").asc())

## Total freight charge per city.
grp_city_date_total_freight_charge = grp_city_weekdate.agg(F.sum("order_freight_charge").alias("total_freight_charge")).orderBy(col("customer_city").asc(),col("order_placed_weekdate").asc())

## Total freight charge per state.
grp_state_date_total_freight_charge = grp_state_weekdate.agg(F.sum("order_freight_charge").alias("total_freight_charge")).orderBy(col("customer_state").asc(),col("order_placed_weekdate").asc())

In [0]:
## Step20:

# Total Order Count
# i. Total order count
# ii. Total order count per city
# iii. Total order count per state

## Total order count
grp_date_total_order_count = grp_weekdate.agg(F.count("id").alias("total_order_count")).orderBy(col("order_placed_weekdate").asc())

## Total order count per city
grp_city_date_total_order_count = grp_city_weekdate.agg(F.count("id").alias("total_order_count")).orderBy(col("customer_city").asc(),
col("order_placed_weekdate").asc())

## Total order count per state
grp_state_date_total_order_count = grp_state_weekdate.agg(F.count("id").alias("total_order_count")).orderBy(col("customer_state").asc(),
col("order_placed_weekdate").asc())

In [0]:
#Step21: Average Freight charge

## Total average freight charge.
grp_date_avg_freight_charge = grp_weekdate.agg(F.avg("order_freight_charge").alias("avg_freight_charge")).orderBy(col("order_placed_weekdate").asc())

## Total order freight charge per city
grp_city_date_vg_freight_charge = grp_city_weekdate.agg(F.avg("order_freight_charge").alias("avg_freight_charge")).orderBy(col("customer_city").asc(),col("order_placed_weekdate").asc())

## Total order freight charge per state
grp_state_date_vg_freight_charge = grp_state_weekdate.agg(F.avg("order_freight_charge").alias("avg_freight_charge")).orderBy(col("customer_state").asc(),col("order_placed_weekdate").asc())


In [0]:
# Step22: Average Review score

# Total average review charge.
grp_date_avg_review_score = grp_weekdate.agg(F.avg("product_review_score").alias("avg_review_score")).orderBy(col("order_placed_weekdate")
.asc())

## Total order average review per city
grp_city_date_avg_review_score = grp_city_weekdate.agg(F.avg("product_review_score").alias("avg_review_score")).orderBy(col("customer_city").asc(),col("order_placed_weekdate").asc())

## Total order average review per state
grp_state_date_avg_review_score = grp_state_weekdate.agg(F.avg("product_review_score").alias("avg_review_score")).orderBy(col("customer_state").asc(),col("order_placed_weekdate").asc())

In [0]:
# Step23: Average Approval Time

grp_date_avg_approval_time = grp_weekdate.agg(F.avg("order_approval_time").alias("avg_approval_time")).orderBy(col("order_placed_weekdate")
.asc())	

## Average Approval Time per city
grp_city_date_avg_approval_time = grp_city_weekdate.agg(F.avg("order_approval_time").alias("avg_approval_time")).orderBy(col("customer_city").asc(),col("order_placed_weekdate").asc())

## Average Approval Time per state
grp_state_date_avg_approval_time = grp_state_weekdate.agg(F.avg("order_approval_time").alias("avg_approval_time")).orderBy(col("customer_state").asc(),col("order_placed_weekdate").asc())

In [0]:
# Step24: Average Delivery Time

grp_date_avg_delivery_time = grp_weekdate.agg(F.avg("order_delivery_time").alias("avg_delivery_time")).orderBy(col("order_placed_weekdate").asc())

## Average Approval Time per city
grp_city_date_avg_delivery_time = grp_city_weekdate.agg(F.avg("order_delivery_time").alias("avg_delivery_time")).orderBy(col("customer_city").asc(),col("order_placed_weekdate").asc())

## Average Approval Time per state
grp_state_date_avg_delivery_time = grp_state_weekdate.agg(F.avg("order_delivery_time").alias("avg_delivery_time")).orderBy(col("customer_state").asc(),col("order_placed_weekdate").asc())

In [0]:
## ---------------------- INSIGHTS TO BE STORED ------------------
# historical Weekly insights into 3 tables

#i. Insights per period
#ii. Insights per period per city
#ii. Insights per period per state

## Per period
grp_date_insights = reduce(
lambda x, y: x.join(y, on=["order_placed_weekdate"], how="left"),
[
grp_date_total_sales,
grp_date_total_freight_charge,
grp_date_total_order_count,
grp_date_avg_freight_charge,
grp_date_avg_review_score,
grp_date_avg_approval_time,
grp_date_avg_delivery_time,
]
)

## Per city
grp_city_date_insights = reduce(
lambda x, y: x.join(y, on=["customer_city", "order_placed_weekdate"], how="left"),
[
grp_city_date_total_sales,
grp_city_date_total_freight_charge,
grp_city_date_total_order_count,
grp_city_date_vg_freight_charge,
grp_city_date_avg_review_score,
grp_city_date_avg_approval_time,
grp_city_date_avg_delivery_time,
]
)

## Per State
grp_state_date_insights = reduce(
lambda x, y: x.join(y, on=["customer_state", "order_placed_weekdate"], how="left"),
[
grp_state_date_total_sales,
grp_state_date_total_freight_charge,
grp_state_date_total_order_count,
grp_state_date_vg_freight_charge,
grp_state_date_avg_review_score,
grp_state_date_avg_approval_time,
grp_state_date_avg_delivery_time,
]
)

In [0]:
# Step27: Write the insights as a CSV file into the file system [DBFS]

# write pyspark dataframe as csv file into file system (dbfs -> Databricks file system assocaited with every dbricks workspace.)
grp_date_insights.write.csv("dbfs:/FileStore/shared_uploads/odl_user_1393753@simplilearnss.onmicrosoft.com/weeklygrpdateinsights", mode="overwrite", header=True)
grp_city_date_insights.write.csv("dbfs:/FileStore/tables/weeklygrpcitydateinsights", mode="overwrite", header=True)
grp_state_date_insights.write.csv("dbfs:/FileStore/tables/weeklygrpstatedateinsights", mode="overwrite", header=True)

In [0]:
# Step 28 : ADLS Blob Object storage
# write pyspark dataframe as csv file into storage account (Blob Storage Azure/AWS protocol)
# Set the Azure storage account access key
spark.conf.set(
    "fs.azure.account.key.retailanalytics.dfs.core.windows.net",
    "<access-key>"
)

grp_date_insights.write.csv(
    "abfss://staging-data@retailanalytics.dfs.core.windows.net/insights/weeklygrpdateinsights",
    mode="overwrite", header=True
)
grp_city_date_insights.write.csv(
    "abfss://staging-data@retailanalytics.dfs.core.windows.net/insights/weeklygrpcitydateinsights",
    mode="overwrite", header=True
)
grp_state_date_insights.write.csv(
    "abfss://staging-data@retailanalytics.dfs.core.windows.net/insights/weeklygrpstatedateinsights",
    mode="overwrite", header=True
)

###### Write to CosmoDB

In [0]:
from pyspark.sql.functions import col, expr

# Assuming avg_approval_time is of type INTERVAL DAY TO SECOND, convert it to seconds
grp_date_insights = grp_date_insights.withColumn(
    "avg_approval_seconds", 
    expr("CAST(avg_approval_time AS LONG)")
)

grp_date_insights = grp_date_insights.withColumn(
    "avg_delivery_seconds", 
    expr("CAST(avg_delivery_time AS LONG)")
)


In [0]:
# Drop unwanted columns
grp_date_insights = grp_date_insights.drop("avg_approval_time", "avg_delivery_time","extracted_seconds")
display(grp_date_insights)

order_placed_weekdate,total_sales,total_freight_charge,total_order_count,avg_freight_charge,avg_review_score,avg_approval_seconds,avg_delivery_seconds
2018-05-28,149265.76000000027,23134.090000000004,1086,21.302108655616948,4.1178637200736645,33306,997092
2017-09-11,164760.99000000025,24325.099999999988,1141,21.319106047326898,4.0359333917616125,32288,1037870
2016-10-03,37213.51999999998,5206.98,238,21.878067226890757,3.983193277310925,78559,1692365
2017-07-31,145720.54000000027,21092.090000000007,961,21.94806451612904,4.112382934443288,32921,960860
2017-10-23,169682.51000000013,20973.05,957,21.915412748171367,4.103448275862069,33149,1004751
2017-08-14,141926.73000000027,21662.48000000001,1014,21.36339250493098,4.059171597633136,34998,969588
2017-12-25,122559.7300000002,19725.13,860,22.9361976744186,4.0162790697674415,37170,1227703
2018-02-26,281564.4199999997,40025.82000000004,1856,21.56563577586209,4.056573275862069,32920,1647681
2017-01-30,69188.16000000006,9610.28,435,22.092597701149423,4.13103448275862,33706,1083125
2018-04-16,262123.9699999998,36301.550000000025,1646,22.05440461725397,4.042527339003645,102838,992814


In [0]:
## Step28: Write the insights to NoSQL

# template
"""
config = {
"spark.cosmos.accountEndpoint": "<cosmos-account-endpoint>",
"spark.cosmos.accountKey": "<cosmos-account-key>",
"spark.cosmos.database": "<database-name>",
"spark.cosmos.container": "<container-name>",
}
"""
# connect to cosmos account using read-write key
config = {
"spark.cosmos.accountEndpoint": "https://deepi.documents.azure.com:443/",
"spark.cosmos.accountKey":
"<access-key>",
"spark.cosmos.database": "ToDoList",
"spark.cosmos.container": "Items",
}

In [0]:
from pyspark.sql.functions import lit
from pyspark.sql.types import StringType

# Add the "id" field with a default value if it is missing
if "id" not in grp_date_insights.columns:
    grp_date_insights = grp_date_insights.withColumn("id", lit("default_id"))

# Convert the "id" field to string if it is not already
if grp_date_insights.schema["id"].dataType != StringType():
    grp_date_insights = grp_date_insights.withColumn("id", grp_date_insights["id"].cast(StringType()))

# Save the DataFrame
grp_date_insights.write.format("cosmos.oltp").options(**config).mode("append").save()