### Secure Connection with Azure Data Lake Gen2

In [0]:
storage_account = "olistdatastorageact"
application_id = "4b9d1ba6-91eb-4b4b-906b-1dabe62b6510"
directory_id = "2d8cd6ba-c64a-4b3d-85b0-573d3c127c42"

spark.conf.set(f"fs.azure.account.auth.type.{storage_account}.dfs.core.windows.net", "OAuth")
spark.conf.set(f"fs.azure.account.oauth.provider.type.{storage_account}.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set(f"fs.azure.account.oauth2.client.id.{storage_account}.dfs.core.windows.net", application_id)
spark.conf.set(f"fs.azure.account.oauth2.client.secret.{storage_account}.dfs.core.windows.net", "rRQ8Q~KYw6ZZSrk2.zLoFtuClObsE6.w1VNxsbS0")
spark.conf.set(f"fs.azure.account.oauth2.client.endpoint.{storage_account}.dfs.core.windows.net", f"https://login.microsoftonline.com/{directory_id}/oauth2/token")

**Test if the connexion work well**

In [0]:
df = (
    spark
    .read
    .format("csv")
    .option("header", "true")  # Utilise le header
    # .option("header", "true")  # Utilise le header
    .load("abfss://olistdata@olistdatastorageact.dfs.core.windows.net/bronze/olist_customers_dataset.csv")
)

display(df.head(3))


customer_id,customer_unique_id,customer_zip_code_prefix,customer_city,customer_state
06b8999e2fba1a1fbc88172c00ba8bc7,861eff4711a542e4b93843c6dd7febb0,14409,franca,SP
18955e83d337fd6b2def6b18a428ac77,290c77bc529b7ac935b93aa66c333dc3,9790,sao bernardo do campo,SP
4e7b3e00288586ebd08712fdd0374a03,060e732b5b29e8181a18229c7b0b2b5e,1151,sao paulo,SP


**Import All the Data from Data Lake Gen2**

In [0]:
path_dfs_dict = {
    "orders_df": "oolist_orders_dataset.csv",
    "geolocation_df": "olist_geolocation_dataset.csv",
    "customers_df": "olist_customers_dataset.csv",
    "order_items_df": "olist_order_items_dataset.csv",
    "order_payments_df": "olist_order_payments_dataset.csv",
    "products_df": "olist_products_dataset.csv",
    "sellers_df": "olist_sellers_dataset.csv"
}

dfs = {}

for index, (df_name, file_name) in enumerate(path_dfs_dict.items()):
    dfs[df_name] = (
        spark.read
        .option('header', "true")
        .csv(f"abfss://olistdata@olistdatastorageact.dfs.core.windows.net/bronze/{file_name}")
    )
    display(dfs[df_name].head(1))


order_id,customer_id,order_status,order_purchase_timestamp,order_approved_at,order_delivered_carrier_date,order_delivered_customer_date,order_estimated_delivery_date
e481f51cbdc54678b7cc49136f2d6af7,9ef432eb6251297304e76186b10a928d,delivered,2017-10-02 10:56:33,2017-10-02 11:07:15,2017-10-04 19:55:00,2017-10-10 21:25:13,2017-10-18 00:00:00


geolocation_zip_code_prefix,geolocation_lat,geolocation_lng,geolocation_city,geolocation_state
1037,-23.54562128115268,-46.63929204800168,sao paulo,SP


customer_id,customer_unique_id,customer_zip_code_prefix,customer_city,customer_state
06b8999e2fba1a1fbc88172c00ba8bc7,861eff4711a542e4b93843c6dd7febb0,14409,franca,SP


order_id,order_item_id,product_id,seller_id,shipping_limit_date,price,freight_value
00010242fe8c5a6d1ba2dd792cb16214,1,4244733e06e7ecb4970a6e2683c13e61,48436dade18ac8b2bce089ec2a041202,2017-09-19 09:45:35,58.9,13.29


order_id,payment_sequential,payment_type,payment_installments,payment_value
b81ef226f3fe1789b1e8b2acac839d17,1,credit_card,8,99.3300018


product_id,product_category_name,product_name_lenght,product_description_lenght,product_photos_qty,product_weight_g,product_length_cm,product_height_cm,product_width_cm
1e9e8ef04dbcff4541ed26657ea517e5,perfumaria,40,287,1,225,16,10,14


seller_id,seller_zip_code_prefix,seller_city,seller_state
3442f8959a84dea7ee197c632cb2df15,13023,campinas,SP


# Data Transformation

### Data Cleaning

**Import the Bibs of Cleaning data Area**

In [0]:
import pyspark.sql.functions as F

In [0]:
%python
def clean_dataFrame(df, name):
    print(f"Cleaning {name}...") 
    # Supprimer les doublons et les lignes compl√®tement vides (toutes les valeurs NULL)
    return df.dropDuplicates().na.drop('all')

# Appliquer la fonction de nettoyage sur tous les DataFrames dans 'dfs'
for name, df in dfs.items():
    dfs[name] = clean_dataFrame(df, name)

Cleaning orders_df...
Cleaning geolocation_df...
Cleaning customers_df...
Cleaning order_items_df...
Cleaning order_payments_df...
Cleaning products_df...
Cleaning sellers_df...


Convert Date Columns

In [0]:
for index, (df_name, df) in enumerate(dfs.items()):
  for col in df.columns:
    if(  "DATE" in col.upper()):
      df = df.withColumn(col+"_convrted_", F.to_date(F.col(col)))
  print("--------------------"+df_name+"------------------------------")
  display(df.head(1))  

--------------------orders_df------------------------------


order_id,customer_id,order_status,order_purchase_timestamp,order_approved_at,order_delivered_carrier_date,order_delivered_customer_date,order_estimated_delivery_date,actual_delivery_time,estimated_delivery_time,Delay Time,order_delivered_carrier_date_convrted_,order_delivered_customer_date_convrted_,order_estimated_delivery_date_convrted_
e481f51cbdc54678b7cc49136f2d6af7,9ef432eb6251297304e76186b10a928d,delivered,2017-10-02 10:56:33,2017-10-02 11:07:15,2017-10-04 19:55:00,2017-10-10 21:25:13,2017-10-18 00:00:00,8,16,-8,2017-10-04,2017-10-10,2017-10-18


--------------------geolocation_df------------------------------


geolocation_zip_code_prefix,geolocation_lat,geolocation_lng,geolocation_city,geolocation_state
1037,-23.54562128115268,-46.63929204800168,sao paulo,SP


--------------------customers_df------------------------------


customer_id,customer_unique_id,customer_zip_code_prefix,customer_city,customer_state
06b8999e2fba1a1fbc88172c00ba8bc7,861eff4711a542e4b93843c6dd7febb0,14409,franca,SP


--------------------order_items_df------------------------------


order_id,order_item_id,product_id,seller_id,shipping_limit_date,price,freight_value,shipping_limit_date_convrted_
00010242fe8c5a6d1ba2dd792cb16214,1,4244733e06e7ecb4970a6e2683c13e61,48436dade18ac8b2bce089ec2a041202,2017-09-19 09:45:35,58.9,13.29,2017-09-19


--------------------order_payments_df------------------------------


order_id,payment_sequential,payment_type,payment_installments,payment_value
b81ef226f3fe1789b1e8b2acac839d17,1,credit_card,8,99.3300018


--------------------products_df------------------------------


product_id,product_category_name,product_name_lenght,product_description_lenght,product_photos_qty,product_weight_g,product_length_cm,product_height_cm,product_width_cm
1e9e8ef04dbcff4541ed26657ea517e5,perfumaria,40,287,1,225,16,10,14


--------------------sellers_df------------------------------


seller_id,seller_zip_code_prefix,seller_city,seller_state
3442f8959a84dea7ee197c632cb2df15,13023,campinas,SP


In [0]:
# Calculate Delivery and Time Delays

dfs["orders_df"] = dfs["orders_df"].withColumn("actual_delivery_time", F.datediff("order_delivered_customer_date", "order_purchase_timestamp"))
dfs["orders_df"] = dfs["orders_df"].withColumn("estimated_delivery_time", F.datediff("order_estimated_delivery_date", "order_purchase_timestamp"))
dfs["orders_df"] =dfs["orders_df"].withColumn("Delay Time", F.col("actual_delivery_time") - F.col("estimated_delivery_time"))
display(dfs["orders_df"].head(1))

order_id,customer_id,order_status,order_purchase_timestamp,order_approved_at,order_delivered_carrier_date,order_delivered_customer_date,order_estimated_delivery_date,actual_delivery_time,estimated_delivery_time,Delay Time
e481f51cbdc54678b7cc49136f2d6af7,9ef432eb6251297304e76186b10a928d,delivered,2017-10-02 10:56:33,2017-10-02 11:07:15,2017-10-04 19:55:00,2017-10-10 21:25:13,2017-10-18 00:00:00,8,16,-8


### Get Nosql Data base(MongoDB)

In [0]:
# importing module
from pymongo import MongoClient
import pandas as pd

hostname = "b1a57.h.filess.io"
database = "NoSQLDatabase_toysociety"
port = "27018"
username = "NoSQLDatabase_toysociety"
password = "f48aeecce95f1930d755082e6ec775ec2bdc5f1e"

uri = "mongodb://" + username + ":" + password + "@" + hostname + ":" + port + "/" + database

# Connect with the portnumber and host
client = MongoClient(uri)

# Access database
mydatabase = client[database]
collection = mydatabase["product_categories"]
product_categories_df = pd.DataFrame(list(collection.find()))

### Joining

In [0]:
orders_cutomers_df = dfs["orders_df"].join(dfs["customers_df"], dfs["orders_df"].customer_id == dfs["customers_df"].customer_id,"left")

orders_payments_df = orders_cutomers_df.join(dfs["order_payments_df"], orders_cutomers_df.order_id == dfs["order_payments_df"].order_id,"left")

orders_items_df = orders_payments_df.join(dfs["order_items_df"],"order_id","left")


orders_items_products_df = orders_items_df.join(dfs["products_df"], orders_items_df.product_id == dfs["products_df"].product_id,"left")

final_df = orders_items_products_df.join(dfs["sellers_df"], orders_items_products_df.seller_id == dfs["sellers_df"].seller_id,"left")

In [0]:
display(final_df.head(1))

order_id,customer_id,order_status,order_purchase_timestamp,order_approved_at,order_delivered_carrier_date,order_delivered_customer_date,order_estimated_delivery_date,actual_delivery_time,estimated_delivery_time,Delay Time,customer_id.1,customer_unique_id,customer_zip_code_prefix,customer_city,customer_state,order_id.1,payment_sequential,payment_type,payment_installments,payment_value,order_item_id,product_id,seller_id,shipping_limit_date,price,freight_value,product_id.1,product_category_name,product_name_lenght,product_description_lenght,product_photos_qty,product_weight_g,product_length_cm,product_height_cm,product_width_cm,seller_id.1,seller_zip_code_prefix,seller_city,seller_state
e481f51cbdc54678b7cc49136f2d6af7,9ef432eb6251297304e76186b10a928d,delivered,2017-10-02 10:56:33,2017-10-02 11:07:15,2017-10-04 19:55:00,2017-10-10 21:25:13,2017-10-18 00:00:00,8,16,-8,9ef432eb6251297304e76186b10a928d,7c396fd4830fd04220f754e42b4e5bff,3149,sao paulo,SP,e481f51cbdc54678b7cc49136f2d6af7,2,voucher,1,18.5900002,1,87285b34884572647811a353c7ac498a,3504c0cb71d7fa48d967e0e4c94d59d9,2017-10-06 11:07:15,29.99,8.72,87285b34884572647811a353c7ac498a,utilidades_domesticas,40,268,4,500,19,8,13,3504c0cb71d7fa48d967e0e4c94d59d9,9350,maua,SP


In [0]:
product_categories_df.drop('_id',axis=1,inplace=True)

mongo_sparf_df = spark.createDataFrame(product_categories_df)
display(mongo_sparf_df.head(1))

product_category_name,product_category_name_english
beleza_saude,health_beauty


In [0]:
final_df = final_df.join(mongo_sparf_df,"product_category_name","left")

In [0]:
display(final_df.head(1))

product_category_name,order_id,customer_id,order_status,order_purchase_timestamp,order_approved_at,order_delivered_carrier_date,order_delivered_customer_date,order_estimated_delivery_date,actual_delivery_time,estimated_delivery_time,Delay Time,customer_id.1,customer_unique_id,customer_zip_code_prefix,customer_city,customer_state,order_id.1,payment_sequential,payment_type,payment_installments,payment_value,order_item_id,product_id,seller_id,shipping_limit_date,price,freight_value,product_id.1,product_name_lenght,product_description_lenght,product_photos_qty,product_weight_g,product_length_cm,product_height_cm,product_width_cm,seller_id.1,seller_zip_code_prefix,seller_city,seller_state,product_category_name_english
utilidades_domesticas,e481f51cbdc54678b7cc49136f2d6af7,9ef432eb6251297304e76186b10a928d,delivered,2017-10-02 10:56:33,2017-10-02 11:07:15,2017-10-04 19:55:00,2017-10-10 21:25:13,2017-10-18 00:00:00,8,16,-8,9ef432eb6251297304e76186b10a928d,7c396fd4830fd04220f754e42b4e5bff,3149,sao paulo,SP,e481f51cbdc54678b7cc49136f2d6af7,2,voucher,1,18.5900002,1,87285b34884572647811a353c7ac498a,3504c0cb71d7fa48d967e0e4c94d59d9,2017-10-06 11:07:15,29.99,8.72,87285b34884572647811a353c7ac498a,40,268,4,500,19,8,13,3504c0cb71d7fa48d967e0e4c94d59d9,9350,maua,SP,housewares


### Remove Duplicated Columns

In [0]:
def remove_duplicate_columns(df):
    columns = df.columns

    seen_columns = set()
    columns_to_drop = []

    for column in columns:
        if column in seen_columns:
            columns_to_drop.append(column)
        else:
            seen_columns.add(column)
    
    df_cleaned = df.drop(*columns_to_drop)
    return df_cleaned

final_df = remove_duplicate_columns(final_df)

### Push Transformed Data ta to Azure Data Lake Gen2

In [0]:
final_df.write.mode("overwrite").parquet("abfss://olistdata@olistdatastorageact.dfs.core.windows.net/silver")