In [0]:
import os

# Define the storage account and container details
storage_account_name = "olistdatababita"
container_name = "olistdata"
# Storage-account key, read from the environment instead of being typed into the notebook.
# TODO: on Databricks, prefer a secret scope: dbutils.secrets.get(scope=..., key=...).
access_key = os.environ.get("OLIST_STORAGE_ACCOUNT_KEY", "")
if not access_key:
    # Fail before touching an existing mount, rather than unmounting it and then failing to remount.
    raise ValueError("Set OLIST_STORAGE_ACCOUNT_KEY to the storage account access key before mounting.")

# Mount path in Databricks
mount_point = f"/mnt/{storage_account_name}/{container_name}"

# Unmount only if this path is already mounted, so the cell can be re-run safely.
# Checking dbutils.fs.mounts() replaces a bare `except: pass`, which also hid genuine failures.
if any(mount.mountPoint == mount_point for mount in dbutils.fs.mounts()):
    dbutils.fs.unmount(mount_point)

# Mount the container through the Blob Storage (wasbs://) endpoint, authenticated with the account key.
dbutils.fs.mount(
    source=f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net",
    mount_point=mount_point,
    extra_configs={f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net": access_key},
)

# Verify the mount by listing its top-level entries
display(dbutils.fs.ls(mount_point))


path,name,size,modificationTime
dbfs:/mnt/olistdatababita/olistdata/bronze/,bronze/,0,0
dbfs:/mnt/olistdatababita/olistdata/gold/,gold/,0,0
dbfs:/mnt/olistdatababita/olistdata/silver/,silver/,0,0


In [0]:
# Customers from the bronze layer, read the same way as the other bronze CSVs (header only,
# no inferSchema). With inferSchema, all-digit columns such as a zip-code prefix are typed as
# integers and lose their leading zeros. It would also make this the only non-string bronze frame.
customer_df = (
    spark.read.format("csv")
    .option("header", "true")
    .load(f"/mnt/{storage_account_name}/{container_name}/bronze/olist_customers_dataset.csv")
)
display(customer_df)

In [0]:
# Locations of the bronze-layer CSV files under the storage mount.
base_path = "/mnt/olistdatababita/olistdata/bronze/"
orders_path = base_path + "olist_orders_dataset.csv"
# NOTE(review): unlike every other entry, this name has no ".csv" extension. Spark will only
# load it if that exact file or folder exists in bronze/; confirm this is intentional.
payments_path = base_path + "olist_order_payments_dataset"
reviews_path = base_path + "olist_order_reviews_dataset.csv"
items_path = base_path + "olist_order_items_dataset.csv"
sellers_path = base_path + "olist_sellers_dataset.csv"
geolocation_path = base_path + "olist_geolocation_dataset.csv"
products_path = base_path + "olist_products_dataset.csv"


def read_bronze_csv(path):
    """Load a CSV with a header row via Spark.

    No schema inference is requested, so every column comes back as a string.
    """
    reader = spark.read.format("csv").option("header", "true")
    return reader.load(path)


# NOTE(review): reviews_df and geolocation_df are not used by the joins further down.
orders_df = read_bronze_csv(orders_path)
payments_df = read_bronze_csv(payments_path)
reviews_df = read_bronze_csv(reviews_path)
items_df = read_bronze_csv(items_path)
sellers_df = read_bronze_csv(sellers_path)
geolocation_df = read_bronze_csv(geolocation_path)
products_df = read_bronze_csv(products_path)

In [0]:
# importing module
import os
from urllib.parse import quote_plus

from pymongo import MongoClient

# Connection details for the MongoDB database that holds the product categories
hostname = "m1js9.h.filess.io"
database = "olistNoSql_establish"
port = "27018"
username = "olistNoSql_establish"
# FIXME(security): this password was committed in plain text and is visible in the notebook
# history. Rotate it, supply it via OLIST_MONGO_PASSWORD / a secret scope, and drop the fallback.
password = os.environ.get("OLIST_MONGO_PASSWORD", "34647078d307789d6a9c068c87eba23e02d18a59")

# Credentials embedded in a mongodb:// URI must be percent-escaped; otherwise a password
# containing characters such as '@', ':' or '/' breaks URI parsing.
uri = f"mongodb://{quote_plus(username)}:{quote_plus(password)}@{hostname}:{port}/{database}"

# Connect with the port number and host
client = MongoClient(uri)

# Access database
mydatabase = client[database]
mydatabase

Database(MongoClient(host=['m1js9.h.filess.io:27018'], document_class=dict, tz_aware=False, connect=True), 'olistNoSql_establish')

In [0]:
import pandas as pd
collection = mydatabase["product_categories"]
# Pull every document into pandas. The ObjectId values in _id are cast to str so the frame
# can later be converted to Spark.
category_documents = list(collection.find())
mongo_df = pd.DataFrame(category_documents)
mongo_df = mongo_df.assign(_id=mongo_df["_id"].astype(str))
display(mongo_df)

In [0]:
from pyspark.sql.functions import col,to_date,datediff,current_date,when

In [0]:
def clean_dataframe(df, name):
    """Remove exact duplicate rows, then rows in which every column is null.

    Args:
        df: Spark DataFrame to clean.
        name: Label printed before cleaning starts, e.g. "Orders".

    Returns:
        The cleaned DataFrame.
    """
    message = "Cleaning " + name
    print(message)
    without_duplicates = df.dropDuplicates()
    # how="all": a row is dropped only when all of its columns are null.
    return without_duplicates.na.drop(how="all")
# Only the orders frame goes through clean_dataframe here; the other bronze frames are used as loaded.
orders_df = clean_dataframe(orders_df,"Orders")
display(orders_df)

Cleaning Orders


order_id,customer_id,order_status,order_purchase_timestamp,order_approved_at,order_delivered_carrier_date,order_delivered_customer_date,order_estimated_delivery_date
3923e3ade70348985bd2ca389905cf19,6454e6cba392b35aa21527063026fc92,delivered,2018-03-07 23:00:33,2018-03-09 03:00:35,2018-03-14 02:08:26,2018-03-19 18:46:32,2018-03-29 00:00:00
c2d07d9078b700b9198a126183867c16,20c5718e5f50e1e3800046039376e216,delivered,2018-08-14 17:48:09,2018-08-14 18:23:55,2018-08-16 09:02:00,2018-08-21 14:06:44,2018-08-30 00:00:00
9db49839ad325c2bf4303df727694be2,3be5a877ceebea2954404c6ef8e70be2,delivered,2017-09-13 17:53:28,2017-09-15 02:24:49,2017-09-20 13:35:16,2017-10-03 21:13:43,2017-10-16 00:00:00
b5e23127e5bc161906c1d23be69f7e16,6e916919988c15d61b259bc9494db0f8,delivered,2017-11-13 16:23:17,2017-11-13 16:56:00,2017-11-16 18:41:59,2017-11-24 18:03:13,2017-12-04 00:00:00
e91398c99556f8519e116e6ca4f774e1,e71b7daec09cf1f42e8764868665e5b0,delivered,2018-03-21 15:30:57,2018-03-21 15:48:35,2018-03-22 20:02:51,2018-04-06 02:02:43,2018-04-10 00:00:00
07a0bc58f3b841724c5b8c649da441d5,b70e717fe0599c3bf2f69725b26fdf8e,delivered,2018-01-17 16:44:42,2018-01-17 16:52:19,2018-01-25 15:39:01,2018-02-06 19:16:30,2018-02-19 00:00:00
3c8876b8e5a484c81a1aa6c8bc4e65d6,e4e69809ae1da631eaa47e73e5f4f641,delivered,2017-09-14 13:25:35,2017-09-15 16:44:24,2017-09-18 16:13:17,2017-10-10 19:42:11,2017-10-18 00:00:00
6e9cd3f36a1e951e29d455913402c171,aa76c84a162fed7b8e5b935ce4ea7533,delivered,2018-07-24 21:54:54,2018-07-24 22:10:19,2018-07-25 12:15:00,2018-07-26 19:58:43,2018-08-09 00:00:00
9d5dfd77cf32bd319a504f77a137491b,c590bde02b403025bd56faa049638968,delivered,2018-03-20 16:43:21,2018-03-20 16:56:06,2018-03-26 20:32:08,2018-04-13 22:58:22,2018-04-06 00:00:00
0957ed870116e596b800540427c61497,0f0603d577f299ca129f39109268c546,delivered,2017-01-29 22:14:49,2017-01-29 22:33:34,2017-01-30 08:27:47,2017-02-08 17:14:55,2017-03-20 00:00:00


In [0]:
# Convert date columns to DateType so the datediff calculations below count whole days.
# NOTE(review): order_purchase_timestamp is overwritten with a date, discarding its time-of-day.
# order_approved_at and order_delivered_carrier_date are left as the strings read from CSV.
for date_column in (
    "order_purchase_timestamp",
    "order_delivered_customer_date",
    "order_estimated_delivery_date",
):
    orders_df = orders_df.withColumn(date_column, to_date(col(date_column)))

In [0]:
# Delivery durations in days, measured from the purchase date.
# "Delay Time" = actual - estimated: positive means the order arrived later than estimated.
# It is null whenever a delivery date is missing.
# NOTE(review): "Delay Time" is the only column name containing a space. Spark SQL needs
# backticks for it, and some Parquet/Delta writers reject spaces. Renaming it (e.g. delay_time)
# would change the silver schema.
orders_df = (
    orders_df
    .withColumn("actual_delivery_time", datediff("order_delivered_customer_date", "order_purchase_timestamp"))
    .withColumn("estimated_delivery_time", datediff("order_estimated_delivery_date", "order_purchase_timestamp"))
    .withColumn("Delay Time", col("actual_delivery_time") - col("estimated_delivery_time"))
)

display(orders_df)

In [0]:
# Enrich orders with customer, payment, item, product and seller attributes (all left joins).
# Joining on the key column *name* keeps a single copy of each key. The previous
# `left.key == right.key` joins produced two customer_id / order_id / product_id / seller_id
# columns, which the name-based duplicate removal later deleted entirely.
# NOTE(review): payments and items are both joined per order_id. An order with several payments
# and several items therefore yields one row per (payment, item) pair, so sums of price or
# payment_value over final_df double-count.
orders_cutomers_df = orders_df.join(customer_df, on="customer_id", how="left")

orders_payments_df = orders_cutomers_df.join(payments_df, on="order_id", how="left")

orders_items_df = orders_payments_df.join(items_df, on="order_id", how="left")

orders_items_products_df = orders_items_df.join(products_df, on="product_id", how="left")

final_df = orders_items_products_df.join(sellers_df, on="seller_id", how="left")

In [0]:
# Convert the pandas frame of category documents into a Spark DataFrame so it can be joined.
# The stringified _id column is kept and is carried into final_df by the next join.
mongo_spark_df = spark.createDataFrame(data=mongo_df)
display(mongo_spark_df)

In [0]:
final_df = final_df.join(mongo_spark_df, on="product_category_name", how="left")  # attach category info from MongoDB

In [0]:
def remove_duplicate_column(df):
    """Return ``df`` with repeated column names collapsed to their first occurrence.

    Joins written as ``left.key == right.key`` leave two columns with the same name. Spark's
    ``DataFrame.drop("key")`` removes *every* column with that name, so dropping by name would
    lose the key entirely. Instead, every column is given a unique positional name with
    ``toDF``. Only the first occurrence of each name is then selected, under its original name.

    Args:
        df: Spark DataFrame, possibly containing duplicate column names.

    Returns:
        A DataFrame with one column per distinct name, keeping the first occurrence and the
        original column order. Returns ``df`` itself when there are no duplicates.
    """
    columns = df.columns
    seen_names = set()
    keep_positions = []
    for position, column_name in enumerate(columns):
        if column_name not in seen_names:
            seen_names.add(column_name)
            keep_positions.append(position)

    if len(keep_positions) == len(columns):
        return df

    # Unique temporary names make each duplicate addressable by position.
    positional_names = [f"__dedup_col_{position}" for position in range(len(columns))]
    renamed = df.toDF(*positional_names)
    return renamed.select(
        [renamed[positional_names[position]].alias(columns[position]) for position in keep_positions]
    )
# Collapse any duplicate column names left by the joins above before writing to silver.
final_df = remove_duplicate_column(final_df)
display(final_df)

In [0]:
# Persist the joined dataset to the silver layer as Parquet.
# NOTE(review): mode("overwrite") targets the silver *root*, so it would replace anything else
# stored under silver/. Consider writing to a dedicated sub-folder instead.
silver_path = "/mnt/olistdatababita/olistdata/silver"
final_df.write.format("parquet").mode("overwrite").save(silver_path)