# Reading the Data

In [0]:
storage_account = "olistdatastoragacchet" # Storage account name where the data is stored
container = "olistdata" # Container name where the data is stored

# Fetch the storage account key from the Databricks secret scope instead of
# relying on an undefined placeholder name.
# NOTE(review): scope/key are the ones this notebook looks up in a later cell —
# confirm they hold the storage account access key.
storage_account_key = dbutils.secrets.get(scope="ecom-secret-scope", key="ecom-secret-key")

# Register the key so abfss:// paths on this storage account can be read
spark.conf.set(
    f"fs.azure.account.key.{storage_account}.dfs.core.windows.net",
    storage_account_key,
)

# Root of the bronze folder that holds the raw CSV files
base_path = f"abfss://{container}@{storage_account}.dfs.core.windows.net/bronze/"

# One path per raw Olist CSV file inside the bronze folder
orders_path = base_path + "olist_orders_dataset.csv"
payments_path = base_path + "olist_order_payments_dataset.csv"
reviews_path = base_path + "olist_order_reviews_dataset.csv"
items_path = base_path + "olist_order_items_dataset.csv"
customers_path = base_path + "olist_customers_dataset.csv"
sellers_path = base_path + "olist_sellers_dataset.csv"
geolocation_path = base_path + "olist_geolocation_dataset.csv"
products_path = base_path + "olist_products_dataset.csv"


def read_bronze_csv(path):
    """Load one CSV file into a Spark DataFrame, treating the first row as column names.

    No schema is supplied or inferred, so every column is read as a string.
    """
    return spark.read.format("csv").option("header", "true").load(path)


# Load every raw file into its own DataFrame
orders_df = read_bronze_csv(orders_path)
payments_df = read_bronze_csv(payments_path)
reviews_df = read_bronze_csv(reviews_path)
items_df = read_bronze_csv(items_path)
customers_df = read_bronze_csv(customers_path)
sellers_df = read_bronze_csv(sellers_path)
geolocation_df = read_bronze_csv(geolocation_path)
products_df = read_bronze_csv(products_path)



In [0]:
# Load the customers CSV from the bronze folder by its full abfss URI;
# the header option makes the first row supply the column names.
customers_csv_uri = "abfss://olistdata@olistdatastoragacchet.dfs.core.windows.net/bronze/olist_customers_dataset.csv"
header_csv_reader = spark.read.format("csv").option("header", "true")
df = header_csv_reader.load(customers_csv_uri)

# Render the loaded frame in the notebook
display(df)

# Reading Data from MongoDB database

In [0]:
from pymongo import MongoClient

In [0]:
#importing module
from pymongo import MongoClient

from urllib.parse import quote_plus

# Connection settings for the MongoDB instance.
# The values are blank placeholders here; fill them in from a secret store or
# environment variables rather than committing real credentials to the notebook.
hostname = ""
database = ""
port = ""
username = ""
password = ""

# Percent-escape the credentials so reserved characters such as "@", ":" or "/"
# inside a username or password cannot corrupt the URI.
uri = (
    "mongodb://" + quote_plus(username) + ":" + quote_plus(password)
    + "@" + hostname + ":" + port + "/" + database
)

# Create the client from the assembled URI
client = MongoClient(uri)

# Handle to the database named above
mydatabase = client[database]

# Bare expression so the notebook echoes the database handle
mydatabase

In [0]:
import pandas as pd # Importing pandas module

# Handle to the product_categories collection of the connected database
collection = mydatabase['product_categories']

# Materialise every document returned by find() and build a pandas frame from them
product_category_documents = list(collection.find())
mongo_data = pd.DataFrame(product_category_documents)


In [0]:
# Bare expression: the notebook renders the pandas frame built from the product_categories collection
mongo_data

# Cleaning the Data

In [0]:
# Look up the "ecom-secret-key" entry of the "ecom-secret-scope" secret scope.
# NOTE(review): the return value is not assigned to anything, so no other cell can
# use it from here — confirm whether it should be stored or this cell removed.
dbutils.secrets.get(scope="ecom-secret-scope", key="ecom-secret-key")

In [0]:
from pyspark.sql.functions import col, to_date, datediff, current_date # Importing functions

In [0]:
def clean_dataframe(df, name):
    """Announce which frame is being cleaned, then strip duplicate and fully-empty rows.

    Args:
        df: Spark DataFrame to clean.
        name: Label appended to the "Cleaning " progress message.

    Returns:
        The result of ``drop_duplicates()`` followed by ``na.drop("all")``.
    """
    progress_message = "Cleaning " + name
    print(progress_message)

    deduplicated = df.drop_duplicates()
    # "all" removes only the rows in which every column is null
    return deduplicated.na.drop("all")

# Run the orders frame through clean_dataframe and keep the result under the same name
orders_df = clean_dataframe(df=orders_df, name="Orders")

# Render the cleaned orders frame
display(orders_df)


In [0]:
# Converting Date Columns

from pyspark.sql.functions import to_date, col # import functions

# Replace each of the three order date columns with its to_date() conversion,
# so later date arithmetic works on date values rather than strings.
for date_column in (
    "order_purchase_timestamp",
    "order_delivered_customer_date",
    "order_estimated_delivery_date",
):
    orders_df = orders_df.withColumn(date_column, to_date(col(date_column)))



In [0]:
# Calculating Delivery and Time delays
from pyspark.sql.functions import datediff, col

# Day counts measured from the purchase date to the delivered / estimated dates
actual_days = datediff("order_delivered_customer_date", "order_purchase_timestamp")
estimated_days = datediff("order_estimated_delivery_date", "order_purchase_timestamp")

# Add both durations, then their difference (actual minus estimated) as the delay.
# NOTE(review): "delay Time" contains a space, unlike the other column names —
# confirm downstream readers expect exactly this name.
orders_df = (
    orders_df
    .withColumn("actual_delivery_time", actual_days)
    .withColumn("estimated_delivery_time", estimated_days)
    .withColumn("delay Time", col("actual_delivery_time") - col("estimated_delivery_time"))
)

# Render the orders frame with the three new columns
display(orders_df)
                                               

# Joining

In [0]:
# Left-join each lookup table onto the orders frame, one table at a time.
#
# Every join is keyed by column *name* rather than by an equality expression:
# a name-based join keeps a single copy of the key column, whereas an
# expression such as `a.key == b.key` keeps both copies and leaves the result
# with repeated column names.
#
# NOTE(review): payments and items are both joined on order_id, so an order with
# several payments and several items yields one row per combination — confirm
# that this fan-out is intended.

# Orders + customer attributes
orders_customers_df = orders_df.join(customers_df, "customer_id", "left")

# + payment rows for each order
orders_payments_df = orders_customers_df.join(payments_df, "order_id", "left")

# + item rows for each order
orders_items_df = orders_payments_df.join(items_df, "order_id", "left")

# + product attributes for each item
orders_items_products_df = orders_items_df.join(products_df, "product_id", "left")

# + seller attributes for each item
final_df = orders_items_products_df.join(sellers_df, "seller_id", "left")

In [0]:
# Render the frame produced by joining orders with customers, payments, items, products and sellers
display(final_df)

In [0]:
# Remove the "_id" field that MongoDB stores on every document.
# The frame is rebound instead of mutated in place, and errors="ignore" tolerates
# a missing "_id", so re-running this cell no longer raises a KeyError.
mongo_data = mongo_data.drop(columns="_id", errors="ignore")

# Convert the pandas frame into a Spark DataFrame so it can be joined with the other Spark frames
mongo_spark_df = spark.createDataFrame(mongo_data)

# Render the Spark version of the MongoDB data
display(mongo_spark_df)

In [0]:
# Attach the MongoDB-sourced columns by matching on product_category_name;
# the left join keeps rows of final_df that have no match.
final_df = final_df.join(other=mongo_spark_df, on="product_category_name", how="left")

In [0]:
# Render final_df again, now including the columns joined in from the MongoDB-sourced frame
display(final_df)

In [0]:
def remove_duplicate_columns(df):
    """Return ``df`` with only the first occurrence of each column name kept.

    Dropping a column by name removes every column that carries that name, so
    dropping the repeated names directly would also discard the first
    occurrence. Instead, the repeated occurrences are renamed positionally to
    unique placeholder names and only those placeholders are dropped.

    Args:
        df: Spark DataFrame whose column names may repeat.

    Returns:
        ``df`` itself when no column name repeats; otherwise a new DataFrame
        with the later repeats removed and the remaining column order preserved.
    """
    columns = df.columns

    # Lower-cased names already in use, so a placeholder can never collide with
    # a real column even when column resolution is case-insensitive.
    taken = {column.lower() for column in columns}

    seen_columns = set()      # names whose first occurrence has been kept
    renamed_columns = []      # full positional name list handed to toDF
    columns_to_drop = []      # placeholder names standing in for the repeats

    for position, column in enumerate(columns):
        if column in seen_columns:
            placeholder = f"__duplicate_{position}_{column}"
            while placeholder.lower() in taken:
                placeholder += "_"
            taken.add(placeholder.lower())
            renamed_columns.append(placeholder)
            columns_to_drop.append(placeholder)
        else:
            seen_columns.add(column)
            renamed_columns.append(column)

    if not columns_to_drop:
        return df

    # toDF renames by position, which is the only way to tell same-named columns apart
    return df.toDF(*renamed_columns).drop(*columns_to_drop)

# Pass final_df through remove_duplicate_columns and rebind the name to the returned frame
final_df = remove_duplicate_columns(final_df)

In [0]:
# Persist the cleaned, joined data as Parquet in the silver folder of the
# Azure Data Lake Storage Gen2 container.
# "overwrite" mode replaces whatever already exists at the destination.
silver_output_uri = "abfss://olistdata@olistdatastoragacchet.dfs.core.windows.net/silver"
silver_writer = final_df.write.mode("overwrite")
silver_writer.parquet(silver_output_uri)