<h1 style="text-align: center">FINAL PROJECT - OMNI ORACLE</h1>
<hr>
<h2 style="text-align: center">Team members</h2>
<hr>
<h3 style="text-align: center">Francis Tan Hong Xin</h3>
<h3 style="text-align: center">Gan Lai Soon</h3>
<h3 style="text-align: center">Khadijah Anhardeen</h3>
<h3 style="text-align: center">Muhammad Nur Syafaat Bin Mohamed Saat</h3>

<hr>
<h2 style="text-align: center">Table of contents</h2>
<hr>

<ul>
  <li>
    <a href="#section_1">Section 1: Preparation for data transformation</a>
    <ul>
      <li><a href="#section_1a">Section 1-A: Importing libraries and functions</a></li>
      <li><a href="#section_1b">Section 1-B: Initialize logging procedure</a></li>
      <li><a href="#section_1c">Section 1-C: Configuring Data Lake Storage connection</a></li>
      <li><a href="#section_1d">Section 1-D: Checking if data files exist in storage directory</a></li>
      <li><a href="#section_1e">Section 1-E: Define file names and input/output path to Data Lake Storage</a></li>
    </ul>
  </li>
  <li>
    <a href="#section_2">Section 2: Data Transformation</a>
    <ul>
      <li><a href="#section_2a">Section 2-A: The <code>geolocations</code> data</a></li>
      <li><a href="#section_2b">Section 2-B: The <code>customers</code> data</a></li>
      <li><a href="#section_2c">Section 2-C: The <code>sellers</code> data</a></li>
      <li><a href="#section_2d">Section 2-D: Appending new <code>geolocation</code> data</a></li>
      <li><a href="#section_2e">Section 2-E: The <code>orders</code> data</a></li>
      <li><a href="#section_2f">Section 2-F: The <code>product_category_name_translation</code> data</a></li>
      <li><a href="#section_2g">Section 2-G: The <code>products</code> data</a></li>
      <li><a href="#section_2h">Section 2-H: Appending new <code>product_category_name_translation</code> data</a></li>
      <li><a href="#section_2i">Section 2-I: The <code>order_reviews</code> data</a></li>
      <li><a href="#section_2j">Section 2-J: The <code>order_items</code> data</a></li>
      <li><a href="#section_2k">Section 2-K: The <code>order_payments</code> data</a></li>
      <li><a href="#section_2l">Section 2-L: Write to the logging process</a></li>
    </ul>
  </li>
  <li>
    <a href="#section_3">Section 3: DataFrame Export</a>
    <ul>
      <li><a href="#section_3a">Section 3-A: Export tables to Delta Tables and store them in the Data Lake Storage</a></li>
      <li><a href="#section_3b">Section 3-B: Write to the logging process</a></li>
    </ul>
  </li>
</ul>

<section id="section_1">
    <hr>
    <h2 style="text-align: center">Section 1: Preparation for data transformation</h2>
    <hr>
</section>

<section id="section_1a">
    <h3 style="text-align: center">Section 1-A: Importing libraries and functions</h3>
</section>

In [None]:
# for pyspark, connecting to azure storage account
from pyspark.sql import SparkSession, Window, functions as F
from pyspark.sql.functions import *
from pyspark.sql.types import *

# for pandas DataFrames, Series
import pandas as pd

# for math, data types
import numpy as np

# for dates, times, etc.
import datetime as dt

# for recording logs
import logging

# for foreign characters (Portuguese) and converting them
from unidecode import unidecode

# for checking the spelling of English words
from spellchecker import SpellChecker

# for translating foreign texts to English
from deep_translator import GoogleTranslator

<section id="section_1b">
    <h3 style="text-align: center">Section 1-B: Initialize logging procedure</h3>
</section>

In [None]:
# Route INFO-and-above records to a log file, one "time:level:message" entry per line.
# NOTE(review): basicConfig() does nothing when the root logger already has handlers;
# confirm the log file is actually created in this environment.
log_settings = dict(
    filename='data_transformation.log',
    level=logging.INFO,
    format='%(asctime)s:%(levelname)s:%(message)s',
)
logging.basicConfig(**log_settings)

# Mark the start of the run in the log
logging.info("ETL process started.")

<section id="section_1c">
    <h3 style="text-align: center">Section 1-C: Configuring Data Lake Storage connection</h3>
</section>

In [None]:
# Register the access key for the 'omnioraclestorage' account in the Spark session
# configuration. The key itself is fetched from a secret scope instead of being
# written in the notebook.
spark.conf.set(
    "fs.azure.account.key.omnioraclestorage.dfs.core.windows.net",
    dbutils.secrets.get(scope="key-vault-secret", key="storagekey")
)

<section id="section_1d">
    <h3 style="text-align: center">Section 1-D: Checking if data files exist in storage directory</h3>
</section>

In [None]:
# List the entries of the raw 'olist' directory to confirm the input folders are present
dbutils.fs.ls("abfss://raw@omnioraclestorage.dfs.core.windows.net/olist/")

[FileInfo(path='abfss://raw@omnioraclestorage.dfs.core.windows.net/olist/customers/', name='customers/', size=0, modificationTime=1727619381000),
 FileInfo(path='abfss://raw@omnioraclestorage.dfs.core.windows.net/olist/geolocations/', name='geolocations/', size=0, modificationTime=1727619394000),
 FileInfo(path='abfss://raw@omnioraclestorage.dfs.core.windows.net/olist/order_items/', name='order_items/', size=0, modificationTime=1727619380000),
 FileInfo(path='abfss://raw@omnioraclestorage.dfs.core.windows.net/olist/order_payments/', name='order_payments/', size=0, modificationTime=1727619383000),
 FileInfo(path='abfss://raw@omnioraclestorage.dfs.core.windows.net/olist/order_reviews/', name='order_reviews/', size=0, modificationTime=1727619387000),
 FileInfo(path='abfss://raw@omnioraclestorage.dfs.core.windows.net/olist/orders/', name='orders/', size=0, modificationTime=1727619385000),
 FileInfo(path='abfss://raw@omnioraclestorage.dfs.core.windows.net/olist/product_category_name_transla

<section id="section_1e">
    <h3 style="text-align: center">Section 1-E: Define file names and input/output path to Data Lake Storage</h3>
</section>

In [None]:
# Dataset names. Each name is used as both the folder name and the file stem when a
# path is built, e.g. f"{read_path}{customers}/{customers}.parquet".

# -- location and party data
geolocations, customers, sellers = "geolocations", "customers", "sellers"

# -- order data
orders, order_items = "orders", "order_items"
order_payments, order_reviews = "order_payments", "order_reviews"

# -- product data
products = "products"
product_category_name_translations = "product_category_name_translations"

# Base locations in the Data Lake: raw input container and transformed output container
read_path = "abfss://raw@omnioraclestorage.dfs.core.windows.net/olist/"
output_path = 'abfss://transformed@omnioraclestorage.dfs.core.windows.net/olist/'

<section id="section_2">
    <hr>
    <h2 style="text-align: center">Section 2: Data Transformation</h2>
    <hr>
</section>

<section id="section_2a">
    <h3 style="text-align: center">Section 2-A: The <code>geolocations</code> data</h3>
    <h4>Read in the parquet file and store inside a DataFrame called <code>geolocations_df</code></h4>
</section>

In [None]:
# Read the raw geolocation parquet file into a DataFrame.
# The former `.options(nullValues='', sep=",")` call was removed: those are CSV-style
# settings that the parquet reader does not use.
geolocations_df = spark.read.format('parquet')\
                            .load(f"{read_path}{geolocations}/{geolocations}.parquet")

logging.info("Geolocation data loaded successfully.")
print("Geolocation data loaded successfully.")

# Flag the columns that contain NULL values. Every column is checked in a single
# aggregation (one Spark job) instead of one filter + count job per column.
null_counts = geolocations_df.select(
    [F.count(F.when(geolocations_df[c].isNull(), F.lit(1))).alias(c) for c in geolocations_df.columns]
).first()
geolocations_null_columns = {c: null_counts[c] > 0 for c in geolocations_df.columns}

if any(geolocations_null_columns.values()):
    na_columns = ', '.join(c for c, has_null in geolocations_null_columns.items() if has_null)
    logging.warning(f"Detected columns with NA values: {na_columns}")

Geolocation data loaded successfully.


In [None]:
from pyspark.sql.functions import *

# Number of populated (non-null) values per column: count() on a column skips NULLs
non_null_counts = [F.count(F.col(name)).alias(name) for name in geolocations_df.columns]
display(geolocations_df.select(non_null_counts))

geolocation_zip_code_prefix,geolocation_lat,geolocation_lng,geolocation_city,geolocation_state
1000163,1000163,1000163,1000163,1000163


In [None]:
# Show the column names, data types and nullability of the raw geolocation data
geolocations_df.printSchema()

root
 |-- geolocation_zip_code_prefix: integer (nullable = true)
 |-- geolocation_lat: decimal(38,18) (nullable = true)
 |-- geolocation_lng: decimal(38,18) (nullable = true)
 |-- geolocation_city: string (nullable = true)
 |-- geolocation_state: string (nullable = true)



In [None]:
# Preview the ten rows with the lowest zip code prefixes
# (city and state in descending order within each prefix)
preview_order = [
    F.col("geolocation_zip_code_prefix").asc(),
    F.col("geolocation_city").desc(),
    F.col("geolocation_state").desc(),
]
display(geolocations_df.orderBy(*preview_order).limit(10))

geolocation_zip_code_prefix,geolocation_lat,geolocation_lng,geolocation_city,geolocation_state
1001,-23.550263371631395,-46.63419639384839,são paulo,SP
1001,-23.549779299469115,-46.6339571183853,são paulo,SP
1001,-23.549951273933896,-46.63402708563671,são paulo,SP
1001,-23.550497706907517,-46.63433817805407,sao paulo,SP
1001,-23.54929199999999,-46.633559478233785,sao paulo,SP
1001,-23.551426655288804,-46.63407394670785,sao paulo,SP
1001,-23.550497706907517,-46.63433817805407,sao paulo,SP
1001,-23.550641822090153,-46.63440979032252,sao paulo,SP
1001,-23.551336655288804,-46.634026997778314,sao paulo,SP
1001,-23.550497706907517,-46.63433817805407,sao paulo,SP


In [None]:
# Remember the row count before any transformation so it can be compared afterwards
before_transform_row_count = geolocations_df.count()
# Last expression of the cell: shows the count as the cell output
before_transform_row_count

1000163

#### Problems and Possible Improvements:

As seen in the outputs from the cells above, there are some things that need to be fixed and some improvements that can be made:

1. The columns can be renamed into simpler names (`zip_code_prefix`, `latitude`, `longitude`, `city`, and `state`).

2. The strings under the `city` column are spelled inconsistently, with some using foreign (Portuguese) characters like 'ã', 'ç', 'é', etc.

3. There are duplicate `zip_code_prefix` values which makes it a problem as it is supposed to be the primary key of this table.

4. Because we want to merge the duplicate `zip_code_prefix` values into one, another problem arises: there are multiple different `city` and `state` values under one unique `zip_code_prefix`.

#### Rename the column names and change the data types of the columns.

In [None]:
# Mapping of the raw column names to their simplified names
geolocations_col_names = dict(
    geolocation_zip_code_prefix='zip_code_prefix',
    geolocation_lat='latitude',
    geolocation_lng='longitude',
    geolocation_city='city',
    geolocation_state='state',
)

# Rename everything in one step; a column missing from the mapping keeps its name
renamed_columns = [geolocations_col_names.get(name, name) for name in geolocations_df.columns]
geolocations_df = geolocations_df.toDF(*renamed_columns)

geolocations_df.printSchema()

root
 |-- zip_code_prefix: integer (nullable = true)
 |-- latitude: decimal(38,18) (nullable = true)
 |-- longitude: decimal(38,18) (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)



#### Change the data in `city` to lowercase and the data in `state` to uppercase.

This is done just for good measure in case they are not stored consistently.

In [None]:
# Force a consistent letter case: city names in lowercase, state codes in uppercase
geolocations_df = (
    geolocations_df
    .withColumn('city', F.lower(F.col('city')))
    .withColumn('state', F.upper(F.col('state')))
)

display(geolocations_df.limit(5))

zip_code_prefix,latitude,longitude,city,state
1037,-23.54562128115268,-46.63929204800168,sao paulo,SP
1046,-23.54608112703553,-46.64482029837157,sao paulo,SP
1046,-23.546128966414688,-46.64295148361138,sao paulo,SP
1041,-23.5443921648681,-46.63949930627844,sao paulo,SP
1035,-23.541577961711493,-46.64160722329613,sao paulo,SP


#### Normalize the data in `city` to use normal characters, replacing any foreign characters like 'ã', 'ç', 'é', etc. 

This is so that it makes it easier to group the data according to the `zip_code_prefix`.

We will be using the **unidecode** library to detect the foreign characters and convert them to their normal character counterparts. We store the converted `city` data into another column called `city_normalized`.

In [None]:
# Define a UDF to apply the unidecode function to each value in the 'city' column
unidecode_udf = udf(lambda x: unidecode(x) if x is not None else None, StringType())

# Apply the UDF to create a new column 'city_normalized'
geolocations_df = geolocations_df.withColumn('city_normalized', unidecode_udf('city'))

# Sort the DataFrame by 'zip_code_prefix', 'city', and 'state' columns in the specified order
geolocations_df = geolocations_df.orderBy(['zip_code_prefix', 'city', 'state'], ascending=[True, False, False])

# Display the first 10 rows of the DataFrame
display(geolocations_df.limit(10))

zip_code_prefix,latitude,longitude,city,state,city_normalized
1001,-23.550263371631395,-46.63419639384839,são paulo,SP,sao paulo
1001,-23.549779299469115,-46.6339571183853,são paulo,SP,sao paulo
1001,-23.549951273933896,-46.63402708563671,são paulo,SP,sao paulo
1001,-23.550497706907517,-46.63433817805407,sao paulo,SP,sao paulo
1001,-23.54929199999999,-46.633559478233785,sao paulo,SP,sao paulo
1001,-23.551426655288804,-46.63407394670785,sao paulo,SP,sao paulo
1001,-23.550497706907517,-46.63433817805407,sao paulo,SP,sao paulo
1001,-23.550641822090153,-46.63440979032252,sao paulo,SP,sao paulo
1001,-23.551336655288804,-46.634026997778314,sao paulo,SP,sao paulo
1001,-23.550497706907517,-46.63433817805407,sao paulo,SP,sao paulo


#### Group the data by the `zip_code_prefix` and perform aggregations on the other columns.

To group the data by the `zip_code_prefix` in order to get a single unique `zip_code_prefix` in `geolocations_df`, we will perform aggregated functions on the other columns. 

For the `latitude` and `longitude` columns, we will take their average values. 

As for the `city_normalized` and `state`, we will compute the number of occurrences of each value within its `zip_code_prefix` using the Window function (similar to the one in PostgreSQL) and take the value with the highest occurrence.

We store the grouped data into a temporary DataFrame before performing the grouping process where the end result will be stored back to the original `geolocations_df`.

In [None]:
# Create a window partitioned by 'zip_code_prefix' to calculate the most frequent city and state
window_spec = Window.partitionBy("zip_code_prefix")

# Calculate the most frequent 'city' and 'state' using mode-like functionality
df_grouped = geolocations_df.withColumn(
                  "city_freq", F.count("city_normalized").over(window_spec)
               ).withColumn(
                  "state_freq", F.count("state").over(window_spec)
               )

# Display the first 10 rows of the DataFrame
display(df_grouped.limit(10))

zip_code_prefix,latitude,longitude,city,state,city_normalized,city_freq,state_freq
1005,-23.549980033585307,-46.63476783166945,são paulo,SP,sao paulo,25,25
1005,-23.549780031197237,-46.635358988655526,são paulo,SP,sao paulo,25,25
1005,-23.549819091869107,-46.63560588995324,sao paulo,SP,sao paulo,25,25
1005,-23.549819091869107,-46.63560588995324,sao paulo,SP,sao paulo,25,25
1005,-23.549471495989287,-46.63740387774824,sao paulo,SP,sao paulo,25,25
1005,-23.54875839078997,-46.6384109454671,sao paulo,SP,sao paulo,25,25
1005,-23.549819091869107,-46.63560588995324,sao paulo,SP,sao paulo,25,25
1005,-23.54875839078997,-46.6384109454671,sao paulo,SP,sao paulo,25,25
1005,-23.549471495989287,-46.63740387774824,sao paulo,SP,sao paulo,25,25
1005,-23.549770026371952,-46.63583891548848,sao paulo,SP,sao paulo,25,25


In [None]:
# Now, we will group by 'zip_code_prefix' and aggregate city, state, latitude, and longitude
geolocations_df = df_grouped.groupBy("zip_code_prefix").agg(
                  F.first(F.col("city_normalized")).alias("city"),
                  F.first(F.col("state")).alias("state"),
                  F.avg(F.col("latitude")).alias("latitude"),
                  F.avg(F.col("longitude")).alias("longitude")
               )

# Display the first 10 rows of the DataFrame
display(geolocations_df.limit(10))

zip_code_prefix,city,state,latitude,longitude
1001,sao paulo,SP,-23.550189776551765,-46.6340235559042
1002,sao paulo,SP,-23.54814573176355,-46.63497921074497
1003,sao paulo,SP,-23.54899372481316,-46.63573130997588
1004,sao paulo,SP,-23.549798842277006,-46.634756943789704
1005,sao paulo,SP,-23.549456199830743,-46.636732948036894
1006,sao paulo,SP,-23.55010181145287,-46.636136735174176
1007,sao paulo,SP,-23.550046202351663,-46.637251459637646
1008,sao paulo,SP,-23.54600174938346,-46.635885921357776
1009,sao paulo,SP,-23.546835208320864,-46.6364906953122
1010,sao paulo,SP,-23.546389438369268,-46.635226081509046


#### Difference before and after cleaning:

As you can see below, the `geolocations_df` DataFrame lost **981,148** records due to duplicate `zip_code_prefix` values from **1,000,163** records to **19,015** records.

In [None]:
# Row count captured before the transformation
before_transform_row_count

1000163

In [None]:
# Row count after grouping by 'zip_code_prefix'
geolocations_df.count()

19015

<section id="section_2b">
    <h3 style="text-align: center">Section 2-B: The <code>customers</code> data</h3>
    <h4>Read in the parquet file and store inside a DataFrame called <code>customers_df</code></h4>
</section>

In [None]:
# Read the raw customer parquet file into a DataFrame.
# The former `.options(nullValues='', sep=",")` call was removed: those are CSV-style
# settings that the parquet reader does not use.
customers_df = spark.read.format('parquet')\
                         .load(f"{read_path}{customers}/{customers}.parquet")

logging.info("Customer data loaded successfully.")
print("Customer data loaded successfully.")

# Flag the columns that contain NULL values. Every column is checked in a single
# aggregation (one Spark job) instead of one filter + count job per column.
null_counts = customers_df.select(
    [F.count(F.when(customers_df[c].isNull(), F.lit(1))).alias(c) for c in customers_df.columns]
).first()
customers_null_columns = {c: null_counts[c] > 0 for c in customers_df.columns}

if any(customers_null_columns.values()):
    na_columns = ', '.join(c for c, has_null in customers_null_columns.items() if has_null)
    logging.warning(f"Detected columns with NA values: {na_columns}")

Customer data loaded successfully.


In [None]:
from pyspark.sql.functions import *

# Number of populated (non-null) values per column: count() on a column skips NULLs
non_null_counts = [F.count(F.col(name)).alias(name) for name in customers_df.columns]
display(customers_df.select(non_null_counts))

customer_id,customer_unique_id,customer_zip_code_prefix,customer_city,customer_state
99441,99441,99441,99441,99441


In [None]:
# Show the column names, data types and nullability of the raw customer data
customers_df.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: integer (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)



In [None]:
# Preview ten rows of the raw customer data
display(customers_df.limit(10))

customer_id,customer_unique_id,customer_zip_code_prefix,customer_city,customer_state
06b8999e2fba1a1fbc88172c00ba8bc7,861eff4711a542e4b93843c6dd7febb0,14409,franca,SP
18955e83d337fd6b2def6b18a428ac77,290c77bc529b7ac935b93aa66c333dc3,9790,sao bernardo do campo,SP
4e7b3e00288586ebd08712fdd0374a03,060e732b5b29e8181a18229c7b0b2b5e,1151,sao paulo,SP
b2b6027bc5c5109e529d4dc6358b12c3,259dac757896d24d7702b9acbbff3f3c,8775,mogi das cruzes,SP
4f2d8ab171c80ec8364f7c12e35b23ad,345ecd01c38d18a9036ed96c73b8d066,13056,campinas,SP
879864dab9bc3047522c92c82e1212b8,4c93744516667ad3b8f1fb645a3116a4,89254,jaragua do sul,SC
fd826e7cf63160e536e0908c76c3f441,addec96d2e059c80c30fe6871d30d177,4534,sao paulo,SP
5e274e7a0c3809e14aba7ad5aae0d407,57b2a98a409812fe9618067b6b8ebe4f,35182,timoteo,MG
5adf08e34b2e993982a47070956c5c65,1175e95fb47ddff9de6b2b06188f7e0d,81560,curitiba,PR
4b7139f34592b3a31687243a302fa75b,9afe194fb833f79e300e37e580171f22,30575,belo horizonte,MG


In [None]:
# Remember the customer row count before any transformation (overwrites the value
# stored earlier for the geolocation data)
before_transform_row_count = customers_df.count()
# Last expression of the cell: shows the count as the cell output
before_transform_row_count

99441

#### Problems and Possible Improvements:

As seen in the outputs from the cells above, there are some things that need to be fixed and some improvements that can be made:

1. The `customer_city` and `customer_state` columns can be removed from the DataFrame since we can just reference them from `geolocations_df` using `customer_zip_code_prefix`.

2. We can rename the `customer_zip_code_prefix` column to `zip_code_prefix` to standardise the naming.

3. We need to check if the data under the `zip_code_prefix` column do exist inside the `geolocations_df` DataFrame.

#### Find rows where the `zip_code_prefix` does not exist inside the `geolocations_df` DataFrame.

From the cell below, we found that there are **278** rows that have a `zip_code_prefix` which does not exist inside `geolocations_df`.

What do we do with these so-called invalid `zip_code_prefix` values? 

We decided to add them into the `geolocations_df` DataFrame so that we don't lose any data from the `customers_df` DataFrame. This will be done later after we collected more invalid `zip_code_prefix` values from the seller data.

In [None]:
# Join customers_df with geolocations_df to find non-matching zip_code_prefix
temp_missing_geo = customers_df.join(
    geolocations_df,
    customers_df['customer_zip_code_prefix'] == geolocations_df['zip_code_prefix'],
    'left_anti'
)

# Rename columns to match geolocations_df
temp_missing_geo = temp_missing_geo[['customer_zip_code_prefix', 'customer_city', 'customer_state']]
temp_missing_geo = temp_missing_geo.withColumnRenamed('customer_zip_code_prefix', 'zip_code_prefix')\
                                   .withColumnRenamed('customer_city', 'city')\
                                   .withColumnRenamed('customer_state', 'state')
# store the rows inside a temporary dataframe
inconsistent_rows = temp_missing_geo

print(f"Number of inconsistent rows: {inconsistent_rows.count()}")
display(inconsistent_rows)

Number of inconsistent rows: 278


zip_code_prefix,city,state
72300,brasilia,DF
11547,cubatao,SP
64605,picos,PI
72465,brasilia,DF
7729,caieiras,SP
72904,santo antonio do descoberto,GO
35408,glaura,MG
78554,sinop,MT
73369,brasilia,DF
8980,nossa senhora do remedio,SP


#### Remove redundant columns, and rename the `customer_zip_code_prefix` column to `zip_code_prefix`.

Since we can reference the `customer_city` and `customer_state` values from `geolocations_df` DataFrame using the `zip_code_prefix`, we can remove those columns from the DataFrame.

In [None]:
# Keep the two identifiers plus the zip code prefix (renamed on the way);
# the city and state columns are dropped
customers_df = customers_df.select(
    'customer_id',
    'customer_unique_id',
    F.col('customer_zip_code_prefix').alias('zip_code_prefix'),
)

customers_df.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- zip_code_prefix: integer (nullable = true)



#### As you can see below, no data in `customers_df` is lost during the transformation process.

In [None]:
# Compare the row count stored before the transformation with the current row count
print("Before transforming: ", before_transform_row_count)
print("After transforming: ", customers_df.count())

Before transforming:  99441
After transforming:  99441


<section id="section_2c">
    <h3 style="text-align: center">Section 2-C: The <code>sellers</code> data</h3>
    <h4>Read in the parquet file and store inside a DataFrame called <code>sellers_df</code></h4>
</section>

In [None]:
# Read the raw seller parquet file into a DataFrame.
# The former `.options(nullValues='', sep=",")` call was removed: those are CSV-style
# settings that the parquet reader does not use.
sellers_df = spark.read.format('parquet')\
                       .load(f"{read_path}{sellers}/{sellers}.parquet")

logging.info("Seller data loaded successfully.")
print("Seller data loaded successfully.")

# Flag the columns that contain NULL values. Every column is checked in a single
# aggregation (one Spark job) instead of one filter + count job per column.
null_counts = sellers_df.select(
    [F.count(F.when(sellers_df[c].isNull(), F.lit(1))).alias(c) for c in sellers_df.columns]
).first()
sellers_null_columns = {c: null_counts[c] > 0 for c in sellers_df.columns}

if any(sellers_null_columns.values()):
    na_columns = ', '.join(c for c, has_null in sellers_null_columns.items() if has_null)
    logging.warning(f"Detected columns with NA values: {na_columns}")

Seller data loaded successfully.


In [None]:
from pyspark.sql.functions import *

# Number of populated (non-null) values per column: count() on a column skips NULLs
non_null_counts = [F.count(F.col(name)).alias(name) for name in sellers_df.columns]
display(sellers_df.select(non_null_counts))

seller_id,seller_zip_code_prefix,seller_city,seller_state
3095,3095,3095,3095


In [None]:
# Show the column names, data types and nullability of the raw seller data
sellers_df.printSchema()

root
 |-- seller_id: string (nullable = true)
 |-- seller_zip_code_prefix: integer (nullable = true)
 |-- seller_city: string (nullable = true)
 |-- seller_state: string (nullable = true)



In [None]:
# Preview ten rows of the raw seller data
display(sellers_df.limit(10))

seller_id,seller_zip_code_prefix,seller_city,seller_state
3442f8959a84dea7ee197c632cb2df15,13023,campinas,SP
d1b65fc7debc3361ea86b5f14c68d2e2,13844,mogi guacu,SP
ce3ad9de960102d0677a81f5d0bb7b2d,20031,rio de janeiro,RJ
c0f3eea2e14555b6faeea3dd58c1b1c3,4195,sao paulo,SP
51a04a8a6bdcb23deccc82b0b80742cf,12914,braganca paulista,SP
c240c4061717ac1806ae6ee72be3533b,20920,rio de janeiro,RJ
e49c26c3edfa46d227d5121a6b6e4d37,55325,brejao,PE
1b938a7ec6ac5061a66a3766e0e75f90,16304,penapolis,SP
768a86e36ad6aae3d03ee3c6433d61df,1529,sao paulo,SP
ccc4bbb5f32a6ab2b7066a4130f114e3,80310,curitiba,PR


In [None]:
# Remember the seller row count before any transformation (overwrites the value
# stored earlier for the customer data)
before_transform_row_count = sellers_df.count()
# Last expression of the cell: shows the count as the cell output
before_transform_row_count

3095

#### Problems and Possible Improvements:

As seen in the outputs from the cells above, there are some things that need to be fixed and some improvements that can be made:

1. Just like the `customers_df`, the `seller_city` and `seller_state` columns can be removed from the DataFrame since we can just reference them from `geolocations_df` using `seller_zip_code_prefix`.

2. We can rename the `seller_zip_code_prefix` column to `zip_code_prefix` to simplify the name.

3. We need to check if the data under the `zip_code_prefix` column do exist inside the `geolocations_df` DataFrame.

#### Find rows where the `zip_code_prefix` does not exist inside the `geolocations_df` DataFrame.

From the cell below, we found that there are **7** rows that have a `zip_code_prefix` which does not exist inside `geolocations_df`. Added to the **278** rows from `customers_df`, this gives a total of **285** rows.

What do we do with these so-called invalid `zip_code_prefix` values? 

Just like what we did in the `customers_df`, we will add them into the `geolocations_df` DataFrame so that we don't lose any data from the `sellers_df` DataFrame. We will do this later.

In [None]:
# Join sellers_df with geolocations_df to find non-matching zip_code_prefix
temp_missing_geo = sellers_df.join(
    geolocations_df,
    sellers_df['seller_zip_code_prefix'] == geolocations_df['zip_code_prefix'],
    'left_anti'
)

# Rename the columns to match the geolocations_df
temp_missing_geo = temp_missing_geo[['seller_zip_code_prefix', 'seller_city', 'seller_state']]
temp_missing_geo = temp_missing_geo.withColumnRenamed('seller_zip_code_prefix', 'zip_code_prefix')\
                                   .withColumnRenamed('seller_city', 'city')\
                                   .withColumnRenamed('seller_state', 'state')
# append the rows to the ones we found from customers_df
inconsistent_rows = inconsistent_rows.union(temp_missing_geo)

print(f"Number of inconsistent rows: {inconsistent_rows.count()}")
display(inconsistent_rows)

Number of inconsistent rows: 285


zip_code_prefix,city,state
72300,brasilia,DF
11547,cubatao,SP
64605,picos,PI
72465,brasilia,DF
7729,caieiras,SP
72904,santo antonio do descoberto,GO
35408,glaura,MG
78554,sinop,MT
73369,brasilia,DF
8980,nossa senhora do remedio,SP


#### Remove redundant columns, and rename the `seller_zip_code_prefix` column to `zip_code_prefix`.

Since we can reference the `seller_city` and `seller_state` values from `geolocations_df` DataFrame using the `zip_code_prefix`, we can remove those columns from the DataFrame.

In [None]:
# Keep the seller identifier plus the zip code prefix (renamed on the way);
# the city and state columns are dropped
sellers_df = sellers_df.select(
    'seller_id',
    F.col('seller_zip_code_prefix').alias('zip_code_prefix'),
)

sellers_df.printSchema()

root
 |-- seller_id: string (nullable = true)
 |-- zip_code_prefix: integer (nullable = true)



#### As you can see below, no data in `sellers_df` is lost during the transformation process.

In [None]:
# Compare the row count stored before the transformation with the current row count
print("Before transforming: ", before_transform_row_count)
print("After transforming: ", sellers_df.count())

Before transforming:  3095
After transforming:  3095


<section id="section_2d">
    <h3 style="text-align: center">Section 2-D: Appending new <code>geolocation</code> data</h3>
    <h4>Prepare new geolocation data for insertion</h4>
</section>

From the geolocation data we got from the `customers_df` and `sellers_df`, we need to prepare them before inserting into the `geolocations_df`. 

The preparations include:
- Grouping them by the `zip_code_prefix` to only have distinct `zip_code_prefix` values inside the DataFrame
- This also means the `city` and `state` need to be aggregated, similar to what we did for `geolocations_df`
- We need to add the `latitude` and `longitude` columns with the `null` values temporarily (the actual values will be calculated later)

Only after that, we can insert them into the `geolocations_df` using the `union` function.

In [None]:
# Apply the UDF to create a new column 'city_normalized'
inconsistent_rows = inconsistent_rows.withColumn('city_normalized', unidecode_udf('city'))

# Sort the DataFrame by 'zip_code_prefix', 'city', and 'state' columns in the specified order
# (the aggregation below does not depend on this order)
inconsistent_rows = inconsistent_rows.orderBy(['zip_code_prefix', 'city', 'state'], ascending=[True, False, False])

# Frequency of each city/state value *within* its zip code. The windows include the
# value in the partition key; partitioning by 'zip_code_prefix' alone would only give
# the number of rows per zip code.
city_window = Window.partitionBy("zip_code_prefix", "city_normalized")
state_window = Window.partitionBy("zip_code_prefix", "state")

df_grouped = inconsistent_rows.withColumn(
               "city_freq", F.count("city_normalized").over(city_window)
            ).withColumn(
               "state_freq", F.count("state").over(state_window)
            )

# One row per 'zip_code_prefix' holding its most frequent city and state.
# max(struct(freq, value)) picks the value with the highest frequency (ties go to the
# greatest value); first() would return the value of an arbitrary row instead.
new_geo_df = df_grouped.groupBy("zip_code_prefix").agg(
                  F.max(F.struct("city_freq", "city_normalized")).getField("city_normalized").alias("city"),
                  F.max(F.struct("state_freq", "state")).getField("state").alias("state")
               )

# Add empty coordinates, typed like the coordinate columns of geolocations_df so that
# both sides of the union share one schema (the values are filled in afterwards)
latitude_type = geolocations_df.schema["latitude"].dataType
longitude_type = geolocations_df.schema["longitude"].dataType
new_geo_df = new_geo_df.withColumn("latitude", F.lit(None).cast(latitude_type))\
                       .withColumn("longitude", F.lit(None).cast(longitude_type))

# Append the new rows to geolocations_df. Columns are matched by name rather than by
# position, so the result does not depend on the column order of either side.
temp_df = geolocations_df.unionByName(new_geo_df)
temp_df = temp_df.sort(F.asc("zip_code_prefix"))
display(temp_df)

zip_code_prefix,city,state,latitude,longitude
1001,sao paulo,SP,-23.550189776551765,-46.6340235559042
1002,sao paulo,SP,-23.54814573176355,-46.63497921074497
1003,sao paulo,SP,-23.54899372481316,-46.63573130997588
1004,sao paulo,SP,-23.549798842277006,-46.634756943789704
1005,sao paulo,SP,-23.549456199830743,-46.636732948036894
1006,sao paulo,SP,-23.55010181145287,-46.636136735174176
1007,sao paulo,SP,-23.550046202351663,-46.637251459637646
1008,sao paulo,SP,-23.54600174938346,-46.635885921357776
1009,sao paulo,SP,-23.546835208320864,-46.6364906953122
1010,sao paulo,SP,-23.546389438369268,-46.635226081509046


#### Calculating the `latitude` and `longitude` values.

To compute the `latitude` and `longitude` values for the new `zip_code_prefix` values, we will use two Window functions:
- Average (taking the average of the values from the row before and after the current row)
- Lag (taking the value from the row before the previous row)

The reason why we do two Window functions is that doing the average window function was not enough. There were still some rows with `null` values so we have to do the lag window function after that.

In [None]:
# define the window function to include the previous and next rows
window_spec = Window.orderBy("zip_code_prefix").rowsBetween(-1, 1)

# Fill missing latitude and longitude values with the average of previous and next rows
temp_df = temp_df.withColumn(
            "latitude",
            F.coalesce("latitude", F.avg("latitude").over(window_spec))
        ).withColumn(
            "longitude",
            F.coalesce("longitude", F.avg("longitude").over(window_spec))
        )

# define the window function
window_spec = Window.orderBy("zip_code_prefix")

# Fill missing latitude and longitude values with the previous previous row
temp_df = temp_df.withColumn(
                "latitude",
                F.coalesce("latitude", F.lag("latitude", 2).over(window_spec))
            ).withColumn(
                "longitude",
                F.coalesce("longitude", F.lag("longitude", 2).over(window_spec))
            )

# sort the DataFrame by 'zip_code_prefix' and assign back to geolocations_df
geolocations_df = temp_df.sort(asc("zip_code_prefix"))
display(geolocations_df.limit(50))

zip_code_prefix,city,state,latitude,longitude
1001,sao paulo,SP,-23.550189776551765,-46.6340235559042
1002,sao paulo,SP,-23.54814573176355,-46.63497921074497
1003,sao paulo,SP,-23.54899372481316,-46.63573130997588
1004,sao paulo,SP,-23.549798842277006,-46.634756943789704
1005,sao paulo,SP,-23.549456199830743,-46.636732948036894
1006,sao paulo,SP,-23.55010181145287,-46.636136735174176
1007,sao paulo,SP,-23.550046202351663,-46.637251459637646
1008,sao paulo,SP,-23.54600174938346,-46.635885921357776
1009,sao paulo,SP,-23.546835208320864,-46.6364906953122
1010,sao paulo,SP,-23.546389438369268,-46.635226081509046


#### Verifying that there are no more `null` values in the `geolocations_df`

In [None]:
from pyspark.sql.functions import *

# One aggregate per column: when() yields a value only for non-NULL cells and
# count() counts just those, giving the number of non-NULL values per column.
non_null_counts = [
    count(when(col(column_name).isNotNull(), column_name)).alias(column_name)
    for column_name in geolocations_df.columns
]
display(geolocations_df.select(non_null_counts))

zip_code_prefix,city,state,latitude,longitude
19177,19177,19177,19177,19177


<section id="section_2e">
    <h3 style="text-align: center">Section 2-E: The <code>orders</code> data</h3>
    <h4>Read in the parquet file and store inside a DataFrame called <code>orders_df</code></h4>
</section>

In [None]:
# Load the orders dataset from its Parquet file. Parquet files carry their own
# schema, so the CSV-style reader options (sep, nullValues) are not needed:
# the Parquet source does not use them.
orders_df = spark.read.format('parquet')\
                      .load(f"{read_path}{orders}/{orders}.parquet")

logging.info("Order data loaded successfully.")
print("Order data loaded successfully.")

# Count the NULLs of every column in one aggregation (a single Spark job)
# instead of running a separate filter().count() job for each column.
orders_null_counts = orders_df.select(
    [F.count(F.when(orders_df[c].isNull(), 1)).alias(c) for c in orders_df.columns]
).first().asDict()

# Column name -> True when the column contains at least one NULL.
orders_null_columns = {name: nulls > 0 for name, nulls in orders_null_counts.items()}
if any(orders_null_columns.values()):
    na_columns = ', '.join(name for name, has_nulls in orders_null_columns.items() if has_nulls)
    logging.warning(f"Detected columns with NA values: {na_columns}")

Order data loaded successfully.


In [None]:
from pyspark.sql.functions import *

# One aggregate per column: when() yields a value only for non-NULL cells and
# count() counts just those, giving the number of non-NULL values per column.
non_null_counts = [
    count(when(col(column_name).isNotNull(), column_name)).alias(column_name)
    for column_name in orders_df.columns
]
display(orders_df.select(non_null_counts))

order_id,customer_id,order_status,order_purchase_timestamp,order_approved_at,order_delivered_carrier_date,order_delivered_customer_date,order_estimated_delivery_date
99441,99441,99441,99441,99281,97658,96476,99441


In [None]:
# Show the column names, data types and nullability of orders_df.
orders_df.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)



In [None]:
# Preview the first 10 rows of orders_df.
display(orders_df.limit(10))

order_id,customer_id,order_status,order_purchase_timestamp,order_approved_at,order_delivered_carrier_date,order_delivered_customer_date,order_estimated_delivery_date
e481f51cbdc54678b7cc49136f2d6af7,9ef432eb6251297304e76186b10a928d,delivered,2017-10-02T10:56:33Z,2017-10-02T11:07:15Z,2017-10-04T19:55:00Z,2017-10-10T21:25:13Z,2017-10-18T00:00:00Z
53cdb2fc8bc7dce0b6741e2150273451,b0830fb4747a6c6d20dea0b8c802d7ef,delivered,2018-07-24T20:41:37Z,2018-07-26T03:24:27Z,2018-07-26T14:31:00Z,2018-08-07T15:27:45Z,2018-08-13T00:00:00Z
47770eb9100c2d0c44946d9cf07ec65d,41ce2a54c0b03bf3443c3d931a367089,delivered,2018-08-08T08:38:49Z,2018-08-08T08:55:23Z,2018-08-08T13:50:00Z,2018-08-17T18:06:29Z,2018-09-04T00:00:00Z
949d5b44dbf5de918fe9c16f97b45f8a,f88197465ea7920adcdbec7375364d82,delivered,2017-11-18T19:28:06Z,2017-11-18T19:45:59Z,2017-11-22T13:39:59Z,2017-12-02T00:28:42Z,2017-12-15T00:00:00Z
ad21c59c0840e6cb83a9ceb5573f8159,8ab97904e6daea8866dbdbc4fb7aad2c,delivered,2018-02-13T21:18:39Z,2018-02-13T22:20:29Z,2018-02-14T19:46:34Z,2018-02-16T18:17:02Z,2018-02-26T00:00:00Z
a4591c265e18cb1dcee52889e2d8acc3,503740e9ca751ccdda7ba28e9ab8f608,delivered,2017-07-09T21:57:05Z,2017-07-09T22:10:13Z,2017-07-11T14:58:04Z,2017-07-26T10:57:55Z,2017-08-01T00:00:00Z
136cce7faa42fdb2cefd53fdc79a6098,ed0271e0b7da060a393796590e7b737a,invoiced,2017-04-11T12:22:08Z,2017-04-13T13:25:17Z,,,2017-05-09T00:00:00Z
6514b8ad8028c9f2cc2374ded245783f,9bdf08b4b3b52b5526ff42d37d47f222,delivered,2017-05-16T13:10:30Z,2017-05-16T13:22:11Z,2017-05-22T10:07:46Z,2017-05-26T12:55:51Z,2017-06-07T00:00:00Z
76c6e866289321a7c93b82b54852dc33,f54a9f0e6b351c431402b8461ea51999,delivered,2017-01-23T18:29:09Z,2017-01-25T02:50:47Z,2017-01-26T14:16:31Z,2017-02-02T14:08:10Z,2017-03-06T00:00:00Z
e69bfb5eb88e0ed6a785585b27e16dbf,31ad1d1b63eb9962463f764d4e6e0c9d,delivered,2017-07-29T11:55:02Z,2017-07-29T12:05:32Z,2017-08-10T19:45:24Z,2017-08-16T17:14:30Z,2017-08-23T00:00:00Z


In [None]:
# Remember the row count before any transformation so it can be compared with
# the count after cleaning.
before_transform_row_count = orders_df.count()
# Bare expression: the notebook shows its value as the cell output.
before_transform_row_count

99441

#### Problems and Possible Improvements:

As seen in the outputs from the cells above, there are some things that need to be fixed and some improvements that can be made:

1. We need to change the data type of the `order_estimated_delivery_date` column. Judging from the data, we need to convert it into the Date data type.

2. We need to check if the data under the `customer_id` column do exist inside the `customers_df` DataFrame.

#### Change the data type of the `order_estimated_delivery_date` column to `DateType()`.

In [None]:
# Store order_estimated_delivery_date as a date instead of a timestamp.
estimated_delivery_as_date = orders_df["order_estimated_delivery_date"].cast(DateType())
orders_df = orders_df.withColumn("order_estimated_delivery_date", estimated_delivery_as_date)

# Confirm the new column type.
orders_df.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: date (nullable = true)



#### Find rows where the `customer_id` does not exist inside the `customers_df` DataFrame.

Fortunately, from the cell below, we found that there are no invalid `customer_id`.

In [None]:
# A left anti join keeps only the orders_df rows whose customer_id has no
# match in customers_df.
customer_id_matches = orders_df['customer_id'] == customers_df['customer_id']
inconsistent_rows = orders_df.join(customers_df, on=customer_id_matches, how='left_anti')

print(f"Number of inconsistent rows: {inconsistent_rows.count()}")
display(inconsistent_rows)

Number of inconsistent rows: 0


order_id,customer_id,order_status,order_purchase_timestamp,order_approved_at,order_delivered_carrier_date,order_delivered_customer_date,order_estimated_delivery_date


#### As you can see below, no data in `orders_df` is lost during the transformation process.

In [None]:
# Compare the row count captured before the transformations with the current one.
after_transform_row_count = orders_df.count()
print("Before transforming: ", before_transform_row_count)
print("After transforming: ", after_transform_row_count)

Before transforming:  99441
After transforming:  99441


<section id="section_2f">
    <h3 style="text-align: center">Section 2-F: The <code>product_category_name_translation</code> data</h3>
    <h4>Read in the parquet file and store inside a DataFrame called <code>product_categories_df</code></h4>
</section>

In [None]:
# Load the category-name translation dataset from its Parquet file. Parquet
# files carry their own schema, so the CSV-style reader options (sep,
# nullValues) are not needed: the Parquet source does not use them.
product_categories_df = spark.read.format('parquet')\
                                  .load(f"{read_path}{product_category_name_translations}/{product_category_name_translations}.parquet")

logging.info("Product category data loaded successfully.")
print("Product category data loaded successfully.")

# Count the NULLs of every column in one aggregation (a single Spark job)
# instead of running a separate filter().count() job for each column.
product_categories_null_counts = product_categories_df.select(
    [F.count(F.when(product_categories_df[c].isNull(), 1)).alias(c) for c in product_categories_df.columns]
).first().asDict()

# Column name -> True when the column contains at least one NULL.
product_categories_null_columns = {name: nulls > 0 for name, nulls in product_categories_null_counts.items()}
if any(product_categories_null_columns.values()):
    na_columns = ', '.join(name for name, has_nulls in product_categories_null_columns.items() if has_nulls)
    logging.warning(f"Detected columns with NA values: {na_columns}")

Product category data loaded successfully.


In [None]:
from pyspark.sql.functions import *

# One aggregate per column: when() yields a value only for non-NULL cells and
# count() counts just those, giving the number of non-NULL values per column.
non_null_counts = [
    count(when(col(column_name).isNotNull(), column_name)).alias(column_name)
    for column_name in product_categories_df.columns
]
display(product_categories_df.select(non_null_counts))

product_category_name,product_category_name_english
71,71


In [None]:
# Show the column names, data types and nullability of product_categories_df.
product_categories_df.printSchema()

root
 |-- product_category_name: string (nullable = true)
 |-- product_category_name_english: string (nullable = true)



In [None]:
# Take the first 30 categories in ascending order of their English name, then
# show 10 of those in descending order.
temp_prod_cat = product_categories_df.orderBy(asc('product_category_name_english')).limit(30)
display(temp_prod_cat.orderBy(desc('product_category_name_english')).limit(10))

product_category_name,product_category_name_english
fashion_roupa_infanto_juvenil,fashion_childrens_clothes
fashion_bolsas_e_acessorios,fashion_bags_accessories
fashion_roupa_feminina,fashio_female_clothing
eletronicos,electronics
dvds_blu_ray,dvds_blu_ray
bebidas,drinks
fraldas_higiene,diapers_and_hygiene
construcao_ferramentas_ferramentas,costruction_tools_tools
construcao_ferramentas_jardim,costruction_tools_garden
cool_stuff,cool_stuff


#### Problems and Possible Improvements:

As seen in the outputs from the cells above, there are some things that need to be fixed and some improvements that can be made:

1. We need to remove the `_` underscore from the string values in the dataset.

2. As you can see in the output of the DataFrame above, some of the English words are misspelled so we need to rectify the spelling.

#### Remove the `_` underscore from the string values.

In [None]:
# Replace every underscore with a space in both category-name columns.
for name_column in ('product_category_name', 'product_category_name_english'):
    product_categories_df = product_categories_df.withColumn(
        name_column,
        translate(product_categories_df[name_column], '_', ' '),
    )

display(product_categories_df.limit(5))

product_category_name,product_category_name_english
beleza saude,health beauty
informatica acessorios,computers accessories
automotivo,auto
cama mesa banho,bed bath table
moveis decoracao,furniture decor


#### Define the SpellChecker method

We use the SpellChecker library to check the spelling of the English words found under the `product_category_name_english` column.

We also found that some of the terms under the column are correct terms such as `blu`, `cd`, `christmas`, and `dvd`. However, they are deemed incorrect by SpellChecker and are changed to different words. So, we made a list with all these extra words to prevent them from being corrected by SpellChecker.

In [None]:
# Spell checker used for the English category names.
spell = SpellChecker()

# Terms that occur in the category names and that we consider correct; they are
# added to the checker's word list so that it does not "correct" them.
acceptable_words = ['dvd', 'dvds', 'agro', 'christmas', 'cd', 'cds', 'blu']
spell.word_frequency.load_words(acceptable_words)

def correct_spelling(text):
    """Spell-correct each whitespace-separated word of ``text``.

    Args:
        text: The string to correct; may be ``None`` (a NULL column value).

    Returns:
        The corrected words joined by single spaces, or ``None`` when ``text``
        is ``None``.
    """
    # NULL column values reach a UDF as None; pass them through instead of
    # failing on None.split().
    if text is None:
        return None
    # spell.correction() may return None when it has no suggestion for a word;
    # keep the original word in that case so ' '.join() never receives None.
    return ' '.join(spell.correction(word) or word for word in text.split())

# Wrap correct_spelling as a Spark UDF with a string return type so it can be
# applied to DataFrame columns.
correct_spelling_udf = udf(correct_spelling, StringType())

#### Apply the SpellChecker method on the `product_category_name_english` column

As you can see in the output below, the spelling of the English words has been corrected.

In [None]:
# Run the spell checker over every English category name.
corrected_english_name = correct_spelling_udf(product_categories_df['product_category_name_english'])
product_categories_df = product_categories_df.withColumn(
    'product_category_name_english',
    corrected_english_name,
)

# Take the first 30 categories in ascending order of their English name, then
# show 10 of those in descending order to inspect the corrected spellings.
temp_prod_cat = product_categories_df.orderBy(asc('product_category_name_english')).limit(30)
display(temp_prod_cat.orderBy(desc('product_category_name_english')).limit(10))

product_category_name,product_category_name_english
fashion roupa feminina,fashion female clothing
fashion roupa infanto juvenil,fashion children clothes
fashion bolsas e acessorios,fashion bags accessories
eletronicos,electronics
dvds blu ray,dvds blu ray
bebidas,drinks
fraldas higiene,diapers and hygiene
cool stuff,cool stuff
construcao ferramentas ferramentas,construction tools tools
construcao ferramentas seguranca,construction tools safety


#### As there are no membership-constraint problems with other DataFrames and no missing values, there is no difference in the number of records before and after cleaning the data for `product_categories_df`.

<section id="section_2g">
    <h3 style="text-align: center">Section 2-G: The <code>products</code> data</h3>
    <h4>Read in the parquet file and store inside a DataFrame called <code>products_df</code></h4>
</section>

In [None]:
# Load the products dataset from its Parquet file. Parquet files carry their
# own schema, so the CSV-style reader options (sep, nullValues) are not
# needed: the Parquet source does not use them.
products_df = spark.read.format('parquet')\
                        .load(f"{read_path}{products}/{products}.parquet")

logging.info("Product data loaded successfully.")
print("Product data loaded successfully.")

# Count the NULLs of every column in one aggregation (a single Spark job)
# instead of running a separate filter().count() job for each column.
products_null_counts = products_df.select(
    [F.count(F.when(products_df[c].isNull(), 1)).alias(c) for c in products_df.columns]
).first().asDict()

# Column name -> True when the column contains at least one NULL.
products_null_columns = {name: nulls > 0 for name, nulls in products_null_counts.items()}
if any(products_null_columns.values()):
    na_columns = ', '.join(name for name, has_nulls in products_null_columns.items() if has_nulls)
    logging.warning(f"Detected columns with NA values: {na_columns}")

Product data loaded successfully.


In [None]:
from pyspark.sql.functions import *

# One aggregate per column: when() yields a value only for non-NULL cells and
# count() counts just those, giving the number of non-NULL values per column.
non_null_counts = [
    count(when(col(column_name).isNotNull(), column_name)).alias(column_name)
    for column_name in products_df.columns
]
display(products_df.select(non_null_counts))

product_id,product_category_name,product_name_lenght,product_description_lenght,product_photos_qty,product_weight_g,product_length_cm,product_height_cm,product_width_cm
32951,32341,32341,32341,32341,32949,32949,32949,32949


In [None]:
# Show the column names, data types and nullability of products_df.
products_df.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_category_name: string (nullable = true)
 |-- product_name_lenght: integer (nullable = true)
 |-- product_description_lenght: integer (nullable = true)
 |-- product_photos_qty: integer (nullable = true)
 |-- product_weight_g: integer (nullable = true)
 |-- product_length_cm: integer (nullable = true)
 |-- product_height_cm: integer (nullable = true)
 |-- product_width_cm: integer (nullable = true)



In [None]:
# Preview the first 5 rows of products_df.
display(products_df.limit(5))

product_id,product_category_name,product_name_lenght,product_description_lenght,product_photos_qty,product_weight_g,product_length_cm,product_height_cm,product_width_cm
1e9e8ef04dbcff4541ed26657ea517e5,perfumaria,40,287,1,225,16,10,14
3aa071139cb16b67ca9e5dea641aaa2f,artes,44,276,1,1000,30,18,20
96bd76ec8810374ed1b65e291975717f,esporte_lazer,46,250,1,154,18,9,15
cef67bcfe19066a932b7673e239eb23d,bebes,27,261,1,371,26,4,26
9dc1a7de274444849c219cff195d0b71,utilidades_domesticas,37,402,4,625,20,17,13


In [None]:
# Remember the row count before any transformation so it can be compared with
# the count after cleaning.
before_transform_row_count = products_df.count()
# Bare expression: the notebook shows its value as the cell output.
before_transform_row_count

32951

#### Problems and Possible Improvements:

As seen in the outputs from the cells above, there are some things that need to be fixed and some improvements that can be made:

1. There are some rows that have `null` values under the `product_category_name` so we need to remove them.

2. There are null values under the integer columns so we need to set them as `0`.

3. The columns `product_name_lenght` and `product_description_lenght` have wrong spellings for the word `length`.

4. Since we remove the `_` underscore from the `product_category_name` in the `product_categories_df`, we need to do the same to the same column in this DataFrame.

5. We need to check if the values under `product_category_name` exist inside the `product_categories_df`.

#### Remove rows with null values under the `product_category_name` and fill in the null values under quantitative columns as `0`.

We decided to remove rows with null values under the `product_category_name` since it makes the data incomplete.

As for the quantitative columns with null values, we need to fill them with a number (zero / 0) before changing their data type to integer. If we change the data type before filling the null values, they will raise an error.

In [None]:
# Products without a category name are dropped.
products_df = products_df.na.drop(subset=['product_category_name'])

# Every remaining NULL in the numeric columns listed here is replaced with 0.
products_na_values = dict.fromkeys(
    [
        "product_name_lenght",
        "product_description_lenght",
        "product_photos_qty",
        "product_weight_g",
        "product_length_cm",
        "product_height_cm",
        "product_width_cm",
    ],
    0,
)
products_df = products_df.fillna(products_na_values)

from pyspark.sql.functions import *
# Number of non-NULL values per column, to confirm that no NULLs remain.
non_null_counts = [
    count(when(col(column_name).isNotNull(), column_name)).alias(column_name)
    for column_name in products_df.columns
]
display(products_df.select(non_null_counts))

product_id,product_category_name,product_name_lenght,product_description_lenght,product_photos_qty,product_weight_g,product_length_cm,product_height_cm,product_width_cm
32341,32341,32341,32341,32341,32341,32341,32341,32341


#### Correct the spelling of the columns `product_name_lenght` and `product_description_lenght` for the word `length`

In [None]:
# Misspelled column name ("lenght") -> corrected column name ("length").
products_col_names = {
    "product_name_lenght": "product_name_length",
    "product_description_lenght": "product_description_length",
}

for misspelled_name, corrected_name in products_col_names.items():
    products_df = products_df.withColumnRenamed(misspelled_name, corrected_name)

# Confirm the renamed columns.
products_df.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_category_name: string (nullable = true)
 |-- product_name_length: integer (nullable = false)
 |-- product_description_length: integer (nullable = false)
 |-- product_photos_qty: integer (nullable = false)
 |-- product_weight_g: integer (nullable = false)
 |-- product_length_cm: integer (nullable = false)
 |-- product_height_cm: integer (nullable = false)
 |-- product_width_cm: integer (nullable = false)



#### Remove the `_` underscore from the values under `product_category_name`

In [None]:
# Replace underscores with spaces in the category name, the same formatting
# that was applied to the category names in product_categories_df.
category_with_spaces = translate(products_df['product_category_name'], '_', ' ')
products_df = products_df.withColumn('product_category_name', category_with_spaces)

#### Find rows where the `product_category_name` does not exist inside the `product_categories_df` DataFrame.

From the cell below, we found that there are **13** rows whose `product_category_name` does not exist inside the `product_categories_df`.

What do we do with these so-called invalid `product_category_name` values? 

We will add them into the `product_categories_df` DataFrame so that we don't lose any data from the `products_df` DataFrame. We will do this later.

In [None]:
# Join products_df with product_categories_df to find non-matching product_category_name
inconsistent_rows = products_df.join(
    product_categories_df,
    products_df['product_category_name'] == product_categories_df['product_category_name'],
    'left_anti'
)

print(f"Number of inconsistent rows: {inconsistent_rows.count()}")
display(inconsistent_rows)

Number of inconsistent rows: 13


product_id,product_category_name,product_name_length,product_description_length,product_photos_qty,product_weight_g,product_length_cm,product_height_cm,product_width_cm
0105b5323d24fc655f73052694dbbb3a,pc gamer,59,621,4,2839,19,16,18
6fd83eb3e0799b775e4f946bd66657c0,portateis cozinha e preparadores de alimentos,52,280,1,1200,25,33,25
5d923ead886c44b86845f69e50520c3e,portateis cozinha e preparadores de alimentos,58,284,1,1200,25,33,25
6727051471a0fc4a0e7737b57bff2549,pc gamer,60,1532,3,650,16,22,20
bed164d9d628cf0593003389c535c6e0,portateis cozinha e preparadores de alimentos,54,382,2,850,30,21,22
1220978a08a6b29a202bc015b18250e9,portateis cozinha e preparadores de alimentos,46,280,1,1200,25,33,25
ae62bb0f95af63d64eae5f93dddea8d3,portateis cozinha e preparadores de alimentos,59,927,1,10600,40,20,38
1954739d84629e7323a4295812a3e0ec,portateis cozinha e preparadores de alimentos,58,792,4,750,30,30,30
dbe520fb381ad695a7e1f2807d20c765,pc gamer,60,840,6,800,18,22,22
c7a3f1a7f9eef146cc499368b578b884,portateis cozinha e preparadores de alimentos,52,1372,5,7350,40,30,23


#### Difference before and after cleaning:

As you can see below, the `products_df` DataFrame lost **610** records due to missing `product_category_name` values from **32,951** records to **32,341** records.

In [None]:
# Row count of products_df captured before the cleaning steps.
before_transform_row_count

32951

In [None]:
# Row count of products_df after the cleaning steps.
products_df.count()

32341

<section id="section_2h">
    <h3 style="text-align: center">Section 2-H: Appending new <code>product_category_name_translation</code> data</h3>
</section>

#### Preparing the `product_category_name` values before inserting into the `product_categories_df`.

Before adding the new `product_category_name` values to the `product_categories_df`, we need to do a few things:
- Get the distinct `product_category_name` values so we use the `distinct()` function
- Define the Translator to translate the text to English
- Translate the `product_category_name` values into English and store it in a new column called `product_category_name_english`

In [None]:
# Keep one row per unmatched category name.
inconsistent_rows = inconsistent_rows.select('product_category_name').distinct()

# Translator configured for Portuguese ('pt') -> English ('en').
translator = GoogleTranslator(source='pt', target='en')

def translate_text(text):
    """Translate ``text`` with the module-level ``translator``.

    The whole string is passed to the translator in a single call.

    Returns:
        The translated text with surrounding whitespace stripped, or ``None``
        when the input or the translator's result is not a non-empty string.
    """
    # Only a non-empty str is accepted (this also rejects None).
    if not (isinstance(text, str) and text):
        return None

    translated = translator.translate(text)

    # Apply the same validation to whatever the translator returned.
    if not (isinstance(translated, str) and translated):
        return None
    return translated.strip()

# Wrap translate_text as a Spark UDF with a string return type.
translation_udf = udf(translate_text, StringType())

# Add the English translation next to each unmatched category name.
english_name = translation_udf(inconsistent_rows['product_category_name'])
inconsistent_rows = inconsistent_rows.withColumn('product_category_name_english', english_name)

display(inconsistent_rows)

product_category_name,product_category_name_english
portateis cozinha e preparadores de alimentos,portable kitchen and food preparers
pc gamer,pc gamer


#### Add the new `product_category_name` values into the `product_categories_df`

In [None]:
# Append the newly translated categories. unionByName() matches columns by
# name, whereas union() matches them by position and would silently mix up the
# two name columns if their order ever differed between the DataFrames.
product_categories_df = product_categories_df.unionByName(inconsistent_rows)
display(product_categories_df)

product_category_name,product_category_name_english
beleza saude,health beauty
informatica acessorios,computers accessories
automotivo,auto
cama mesa banho,bed bath table
moveis decoracao,furniture decor
esporte lazer,sports leisure
perfumaria,perfumery
utilidades domesticas,housewares
telefonia,telephony
relogios presentes,watches gifts


<section id="section_2i">
    <h3 style="text-align: center">Section 2-I: The <code>order_reviews</code> data</h3>
    <h4>Read in the parquet file and store inside a DataFrame called <code>order_reviews_df</code></h4>
</section>

In [None]:
# Load the order reviews dataset from its Parquet file. Parquet files carry
# their own schema, so the CSV-style reader options (sep, nullValues, quote,
# escape, multiLine) are not needed: the Parquet source does not use them.
order_reviews_df = spark.read.format('parquet')\
                             .load(f"{read_path}{order_reviews}/{order_reviews}.parquet")

logging.info("Order review data loaded successfully.")
print("Order review data loaded successfully.")

# Count the NULLs of every column in one aggregation (a single Spark job)
# instead of running a separate filter().count() job for each column.
order_reviews_null_counts = order_reviews_df.select(
    [F.count(F.when(order_reviews_df[c].isNull(), 1)).alias(c) for c in order_reviews_df.columns]
).first().asDict()

# Column name -> True when the column contains at least one NULL.
order_reviews_null_columns = {name: nulls > 0 for name, nulls in order_reviews_null_counts.items()}
if any(order_reviews_null_columns.values()):
    na_columns = ', '.join(name for name, has_nulls in order_reviews_null_columns.items() if has_nulls)
    logging.warning(f"Detected columns with NA values: {na_columns}")

Order review data loaded successfully.


In [None]:
from pyspark.sql.functions import *

# One aggregate per column: when() yields a value only for non-NULL cells and
# count() counts just those, giving the number of non-NULL values per column.
non_null_counts = [
    count(when(col(column_name).isNotNull(), column_name)).alias(column_name)
    for column_name in order_reviews_df.columns
]
display(order_reviews_df.select(non_null_counts))

review_id,order_id,review_score,review_comment_title,review_comment_message,review_creation_date,review_answer_timestamp
99224,99224,99224,11568,40977,99224,99224


In [None]:
# Show the column names, data types and nullability of order_reviews_df.
order_reviews_df.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- review_score: integer (nullable = true)
 |-- review_comment_title: string (nullable = true)
 |-- review_comment_message: string (nullable = true)
 |-- review_creation_date: timestamp (nullable = true)
 |-- review_answer_timestamp: timestamp (nullable = true)



In [None]:
# Preview the first 10 rows of order_reviews_df.
display(order_reviews_df.limit(10))

review_id,order_id,review_score,review_comment_title,review_comment_message,review_creation_date,review_answer_timestamp
7bc2406110b926393aa56f80a40eba40,73fc7af87114b39712e6da79b0a377eb,4,,,2018-01-18T00:00:00Z,2018-01-18T21:46:59Z
80e641a11e56f04c1ad469d5645fdfde,a548910a1c6147796b98fdf73dbeba33,5,,,2018-03-10T00:00:00Z,2018-03-11T03:05:13Z
228ce5500dc1d8e020d8d1322874b6f0,f9e4b658b201a9f2ecdecbb34bed034b,5,,,2018-02-17T00:00:00Z,2018-02-18T14:36:24Z
e64fb393e7b32834bb789ff8bb30750e,658677c97b385a9be170737859d3511b,5,,Recebi bem antes do prazo estipulado.,2017-04-21T00:00:00Z,2017-04-21T22:02:06Z
f7c4243c7fe1938f181bec41a392bdeb,8e6bfb81e283fa7e4f11123a3fb894f1,5,,Parabéns lojas lannister adorei comprar pela Internet seguro e prático Parabéns a todos feliz Páscoa,2018-03-01T00:00:00Z,2018-03-02T10:26:53Z
15197aa66ff4d0650b5434f1b46cda19,b18dcdf73be66366873cd26c5724d1dc,1,,,2018-04-13T00:00:00Z,2018-04-16T00:39:37Z
07f9bee5d1b850860defd761afa7ff16,e48aa0d2dcec3a2e87348811bcfdf22b,5,,,2017-07-16T00:00:00Z,2017-07-18T19:30:34Z
7c6400515c67679fbee952a7525281ef,c31a859e34e3adac22f376954e19b39d,5,,,2018-08-14T00:00:00Z,2018-08-14T21:36:06Z
a3f6f7f6f433de0aefbb97da197c554c,9c214ac970e84273583ab523dfafd09b,5,,,2017-05-17T00:00:00Z,2017-05-18T12:05:37Z
8670d52e15e00043ae7de4c01cc2fe06,b9bf720beb4ab3728760088589c62129,4,recomendo,aparelho eficiente. no site a marca do aparelho esta impresso como 3desinfector e ao chegar esta com outro nome...atualizar com a marca correta uma vez que é o mesmo aparelho,2018-05-22T00:00:00Z,2018-05-23T16:45:47Z


In [None]:
# Remember the row count before any transformation so it can be compared with
# the count after cleaning.
before_transform_row_count = order_reviews_df.count()
# Bare expression: the notebook shows its value as the cell output.
before_transform_row_count

99224

#### Problems and Possible Improvements:

As seen in the outputs from the cells above, there are some things that need to be fixed and some improvements that can be made:

1. We need to change the data type of the `review_creation_date` column to the Date data type.

2. We need to check if the values under the `order_id` column exist inside the `orders_df`. If they don't, we need to remove those records.

3. We can remove the `review_comment_title` and `review_comment_message` columns, which are in Portuguese and would be difficult to translate into English. We also think that they would not be beneficial to our analysis of the data.

#### Change the data type of the `review_creation_date` column to `DateType()`

In [None]:
# Store review_creation_date as a date instead of a timestamp.
creation_date_as_date = order_reviews_df['review_creation_date'].cast(DateType())
order_reviews_df = order_reviews_df.withColumn('review_creation_date', creation_date_as_date)

# Confirm the new column type.
order_reviews_df.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- review_score: integer (nullable = true)
 |-- review_comment_title: string (nullable = true)
 |-- review_comment_message: string (nullable = true)
 |-- review_creation_date: date (nullable = true)
 |-- review_answer_timestamp: timestamp (nullable = true)



#### Find rows where the `order_id` does not exist inside the `orders_df`

Fortunately, there are no problems with invalid `order_id`.

In [None]:
# Join order_reviews_df with orders_df to find non-matching order_id
inconsistent_rows = order_reviews_df.join(
    orders_df,
    order_reviews_df['order_id'] == orders_df['order_id'],
    'left_anti'
)

print(f"Number of inconsistent rows: {inconsistent_rows.count()}")
display(inconsistent_rows)

Number of inconsistent rows: 0


review_id,order_id,review_score,review_comment_title,review_comment_message,review_creation_date,review_answer_timestamp


#### Remove the `review_comment_title` and `review_comment_message` columns.

In [None]:
# Drop the free-text review title and message columns.
comment_columns = ('review_comment_title', 'review_comment_message')
order_reviews_df = order_reviews_df.drop(*comment_columns)

# Confirm the remaining columns.
order_reviews_df.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- review_score: integer (nullable = true)
 |-- review_creation_date: date (nullable = true)
 |-- review_answer_timestamp: timestamp (nullable = true)



#### As you can see below, no data in `order_reviews_df` is lost during the transformation process.

In [None]:
# Compare the row count captured before the transformations with the current one.
after_transform_row_count = order_reviews_df.count()
print("Before transforming: ", before_transform_row_count)
print("After transforming: ", after_transform_row_count)

Before transforming:  99224
After transforming:  99224


<section id="section_2j">
    <h3 style="text-align: center">Section 2-J: The <code>order_items</code> data</h3>
    <h4>Read in the parquet file and store inside a DataFrame called <code>order_items_df</code></h4>
</section>

In [None]:
# Load the order items dataset from its Parquet file. Parquet files carry
# their own schema, so the CSV-style reader options (sep, nullValues) are not
# needed: the Parquet source does not use them.
order_items_df = spark.read.format('parquet')\
                             .load(f"{read_path}{order_items}/{order_items}.parquet")

logging.info("Order item data loaded successfully.")
print("Order item data loaded successfully.")

# Count the NULLs of every column in one aggregation (a single Spark job)
# instead of running a separate filter().count() job for each column.
order_items_null_counts = order_items_df.select(
    [F.count(F.when(order_items_df[c].isNull(), 1)).alias(c) for c in order_items_df.columns]
).first().asDict()

# Column name -> True when the column contains at least one NULL.
order_items_null_columns = {name: nulls > 0 for name, nulls in order_items_null_counts.items()}
if any(order_items_null_columns.values()):
    na_columns = ', '.join(name for name, has_nulls in order_items_null_columns.items() if has_nulls)
    logging.warning(f"Detected columns with NA values: {na_columns}")

Order item data loaded successfully.


In [None]:
# NOTE(review): wildcard import — it rebinds names such as `col`, `count` and
# `when` to the Spark SQL functions, and may also shadow Python builtins of
# the same name; confirm this is intended.
from pyspark.sql.functions import *

# prints the number of non-null values in each column of the dataframe
display(order_items_df.select([count(when(col(c).isNotNull(), c)).alias(c) for c in order_items_df.columns]))

order_id,order_item_id,product_id,seller_id,shipping_limit_date,price,freight_value
112650,112650,112650,112650,112650,112650,112650


In [None]:
# Inspect the column names and data types as loaded from the Parquet file.
order_items_df.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: decimal(38,18) (nullable = true)
 |-- freight_value: decimal(38,18) (nullable = true)



In [None]:
# Preview the first five rows of the order items data.
display(order_items_df.limit(5))

order_id,order_item_id,product_id,seller_id,shipping_limit_date,price,freight_value
00010242fe8c5a6d1ba2dd792cb16214,1,4244733e06e7ecb4970a6e2683c13e61,48436dade18ac8b2bce089ec2a041202,2017-09-19T09:45:35Z,58.9,13.29
00018f77f2f0320c557190d7a144bdd3,1,e5f2d52b802189ee658865ca93d83a8f,dd7ddc04e1b6c2c614352b383efe2d36,2017-05-03T11:05:13Z,239.9,19.93
000229ec398224ef6ca0657da4fc703e,1,c777355d18b72b67abbeef9df44fd0fd,5b51032eddd242adc84c38acab88f23d,2018-01-18T14:48:30Z,199.0,17.87
00024acbcdf0a6daa1e931b038114c75,1,7634da152a4610f1595efa32f14722fc,9d7a1d34a5052409006425275ba1c2b4,2018-08-15T10:10:18Z,12.99,12.79
00042b26cf59d7ce69dfabb4e55b4fd9,1,ac6c3623068f30de03045865e4e10089,df560393f3a51e74553ab94004ba5c87,2017-02-13T13:57:51Z,199.9,18.14


In [None]:
# Remember the row count before any cleaning so it can be compared with the
# count after the transformations; the bare expression echoes the value.
before_transform_row_count = order_items_df.count()
before_transform_row_count

112650

#### Problems and Possible Improvements:

As seen in the outputs from the cells above, there are some things that need to be fixed and some improvements that can be made:

1. We need to change the data types of the `price` and `freight_value` columns to `DecimalType` with 2 decimal points precision.

2. We need to remove rows where the `order_id` does not exist in the `orders_df`.

3. We need to remove rows where the `product_id` does not exist in the `products_df`.

4. We need to remove rows where the `seller_id` does not exist in the `sellers_df`.

#### Change the data types of the `price` and `freight_value` columns to `DecimalType` with 2 decimal points precision.

In [None]:
# Target data types for the monetary columns. The precision is spelled out
# explicitly; 10 is also the DecimalType default, so the result is still
# decimal(10,2).
order_items_data_types = {
    "price": DecimalType(precision=10, scale=2),
    "freight_value": DecimalType(precision=10, scale=2)
}

# The loop variable is deliberately NOT called `col`: a module-level
# `for col, ...` rebinds the global name `col` to a plain string and thereby
# shadows `pyspark.sql.functions.col` for every later cell that calls
# `col(...)` until the functions are imported again.
for column_name, data_type in order_items_data_types.items():
    order_items_df = order_items_df.withColumn(column_name, order_items_df[column_name].cast(data_type))

# Verify the new column types.
order_items_df.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: decimal(10,2) (nullable = true)
 |-- freight_value: decimal(10,2) (nullable = true)



#### Find rows where the `order_id` does not exist in the `orders_df`

Fortunately, there are **0** rows with invalid `order_id`.

In [None]:
# Anti-join order_items_df against orders_df: the result keeps only the order
# items whose order_id has NO match in orders_df.
inconsistent_rows = order_items_df.join(
    orders_df,
    on=order_items_df['order_id'] == orders_df['order_id'],
    how='left_anti',
)

# Report how many such rows exist and show them.
print(f"Number of inconsistent rows: {inconsistent_rows.count()}")
display(inconsistent_rows)

Number of inconsistent rows: 0


order_id,order_item_id,product_id,seller_id,shipping_limit_date,price,freight_value


#### Find and remove rows where the `product_id` does not exist in the `products_df`

There are **1,603** rows with invalid `product_id`, so we need to remove them.

In [None]:
# Anti-join against products_df: collects the order items whose product_id
# has NO match in products_df. This must be defined before order_items_df is
# reassigned below, so that it is based on the unfiltered data.
inconsistent_rows = order_items_df.join(
    products_df,
    on=order_items_df['product_id'] == products_df['product_id'],
    how='left_anti',
)

# Semi-join against products_df: keeps only the order items whose product_id
# does exist in products_df (columns of order_items_df only).
order_items_df = order_items_df.join(
    products_df,
    on=order_items_df['product_id'] == products_df['product_id'],
    how='left_semi',
)

# Report how many rows were inconsistent and show them.
print(f"Number of inconsistent rows: {inconsistent_rows.count()}")
display(inconsistent_rows)

Number of inconsistent rows: 1603


order_id,order_item_id,product_id,seller_id,shipping_limit_date,price,freight_value
0046e1d57f4c07c8c92ab26be8c3dfc0,1,ff6caf9340512b8bf6d2a2a6df032cfa,38e6dada03429a47197d5d584d793b41,2017-10-02T15:49:17Z,7.79,7.78
00482f2670787292280e0a8153d82467,1,a9c404971d1a5b1cbc2e4070e02731fd,702835e4b785b67a084280efca355756,2017-02-17T16:18:07Z,7.6,10.96
004f5d8f238e8908e6864b874eda3391,1,5a848e4ab52fd5445cdc07aab1c40e48,c826c40d7b19f62a09e2d7c5e7295ee2,2018-03-06T09:29:25Z,122.99,15.61
0057199db02d1a5ef41bacbf41f8f63b,1,41eee23c25f7a574dfaf8d5c151dbb12,e5a3438891c0bfdb9394643f95273d8e,2018-01-25T09:07:51Z,20.3,16.79
006cb7cafc99b29548d4f412c7f9f493,1,e10758160da97891c2fdcbc35f0f031d,323ce52b5b81df2cd804b017b7f09aa7,2018-02-22T13:35:28Z,56.0,14.14
00e6786fee8f476835c151a4d3fa9bf1,1,76d1a1a9d21ab677a61c3ae34b1b352f,c826c40d7b19f62a09e2d7c5e7295ee2,2018-03-15T01:31:04Z,159.77,16.0
00ed64bc080d87b4ab7f0c433bc5e98f,1,fbb1cfc2810efabf3235eccf4530f4ae,0c8380b62e38e8a1e6adbeba7eb9688c,2017-06-06T18:22:47Z,44.9,16.11
010a35dc7fc3808a03d243d2fd8c6e35,1,5a848e4ab52fd5445cdc07aab1c40e48,c826c40d7b19f62a09e2d7c5e7295ee2,2017-12-21T19:10:15Z,122.99,9.06
011080783e02d05b26c8c158b3653a42,1,9f69acd4da62618a3f6365b732d00ccd,897060da8b9a21f655304d50fd935913,2018-06-22T15:38:55Z,33.9,13.82
011108c8a3d6eee6807f48a2e639439f,1,ea11e700a343582ad56e4c70e966cb36,01cf7e3d21494c41fb86034f2e714fa1,2017-05-02T17:42:48Z,593.36,98.13


#### Find rows where the `seller_id` does not exist in the `sellers_df`

Fortunately, there are **0** rows with invalid `seller_id`.

In [None]:
# Anti-join order_items_df against sellers_df: the result keeps only the order
# items whose seller_id has NO match in sellers_df.
inconsistent_rows = order_items_df.join(
    sellers_df,
    on=order_items_df['seller_id'] == sellers_df['seller_id'],
    how='left_anti',
)

# Report how many such rows exist and show them.
print(f"Number of inconsistent rows: {inconsistent_rows.count()}")
display(inconsistent_rows)

Number of inconsistent rows: 0


order_id,order_item_id,product_id,seller_id,shipping_limit_date,price,freight_value


#### Difference before and after cleaning:

As you can see below, the `order_items_df` DataFrame lost **1,603** records because their `product_id` values do not exist in `products_df`, going from **112,650** records to **111,047** records.

In [None]:
# Row count of order_items_df captured before cleaning.
before_transform_row_count

112650

In [None]:
# Row count of order_items_df after cleaning.
order_items_df.count()

111047

<section id="section_2k">
    <h3 style="text-align: center">Section 2-K: The <code>order_payments</code> data</h3>
    <h4>Read in the parquet file and store inside a DataFrame called <code>order_payments_df</code></h4>
</section>

In [None]:
# Load the order payments Parquet file into `order_payments_df`.
# Parquet is self-describing, so the CSV-style reader options that used to be
# passed here (`nullValues`, `sep`) are dropped.
order_payments_df = spark.read.format('parquet')\
                              .load(f"{read_path}{order_payments}/{order_payments}.parquet")

logging.info("Order payment data loaded successfully.")
print("Order payment data loaded successfully.")

# get columns with NULL values
# All per-column NULL counts are computed in one aggregation (a single Spark
# job) instead of running a separate filter + count for every column.
from pyspark.sql import functions as F

order_payments_null_counts = order_payments_df.select(
    [F.count(F.when(order_payments_df[c].isNull(), c)).alias(c) for c in order_payments_df.columns]
).first().asDict()

# Same shape as before: column name -> True when the column contains a NULL.
order_payments_null_columns = {c: n > 0 for c, n in order_payments_null_counts.items()}
if True in order_payments_null_columns.values():
    na_columns = [c for c, has_null in order_payments_null_columns.items() if has_null]
    na_columns = ', '.join(na_columns)
    logging.warning(f"Detected columns with NA values: {na_columns}")

Order payment data loaded successfully.


In [None]:
# NOTE(review): this wildcard re-import rebinds `col`, `count` and `when` to
# the Spark SQL functions; that matters if an earlier cell reused one of these
# names (e.g. `col` as a loop variable). It may also shadow Python builtins of
# the same name — confirm this is intended.
from pyspark.sql.functions import *

# prints the number of non-null values in each column of the dataframe
display(order_payments_df.select([count(when(col(c).isNotNull(), c)).alias(c) for c in order_payments_df.columns]))

order_id,payment_sequential,payment_type,payment_installments,payment_value
103886,103886,103886,103886,103886


In [None]:
# Inspect the column names and data types as loaded from the Parquet file.
order_payments_df.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- payment_sequential: integer (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- payment_installments: integer (nullable = true)
 |-- payment_value: decimal(38,18) (nullable = true)



In [None]:
# Preview the first five rows of the order payments data.
display(order_payments_df.limit(5))

order_id,payment_sequential,payment_type,payment_installments,payment_value
b81ef226f3fe1789b1e8b2acac839d17,1,credit_card,8,99.33
a9810da82917af2d9aefd1278f1dcfa0,1,credit_card,1,24.39
25e8ea4e93396b6fa0d3dd708e76c1bd,1,credit_card,1,65.71
ba78997921bbcdc1373bb41e913ab953,1,credit_card,8,107.78
42fdf880ba16b47b59251dd489d4441a,1,credit_card,2,128.45


In [None]:
# Remember the row count before any cleaning so it can be compared with the
# count after the transformations; the bare expression echoes the value.
before_transform_row_count = order_payments_df.count()
before_transform_row_count

103886

#### Problems and Possible Improvements:

As seen in the outputs from the cells above, there are some things that need to be fixed and some improvements that can be made:

1. We need to change the data type of the `payment_value` column to the `DecimalType` with 2 decimal point precision.

2. We need to replace the underscores in the values of the `payment_type` column with spaces, e.g. "credit_card" becomes "credit card".

3. We need to remove rows where the `order_id` does not exist in the `orders_df`.

#### Change the data type of the `payment_value` column to the `DecimalType` with 2 decimal point precision.

In [None]:
# Cast `payment_value` to a decimal with two digits after the decimal point.
# Precision 10 is the DecimalType default, written out here for clarity.
order_payments_df = order_payments_df.withColumn(
    'payment_value',
    order_payments_df['payment_value'].cast(DecimalType(precision=10, scale=2)),
)

# Verify the new column type.
order_payments_df.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- payment_sequential: integer (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- payment_installments: integer (nullable = true)
 |-- payment_value: decimal(10,2) (nullable = true)



#### Find rows where the `order_id` does not exist in the `orders_df`

Fortunately, there are **0** rows with invalid `order_id`.

In [None]:
# Anti-join order_payments_df against orders_df: the result keeps only the
# payments whose order_id has NO match in orders_df.
inconsistent_rows = order_payments_df.join(
    orders_df,
    on=order_payments_df['order_id'] == orders_df['order_id'],
    how='left_anti',
)

# Report how many such rows exist and show them.
print(f"Number of inconsistent rows: {inconsistent_rows.count()}")
display(inconsistent_rows)

Number of inconsistent rows: 0


order_id,payment_sequential,payment_type,payment_installments,payment_value


#### Remove the underscore from the values under the `payment_type` column like "credit_card" to "credit card".

In [None]:
# Replace every underscore in `payment_type` with a space
# (e.g. "credit_card" -> "credit card").
order_payments_df = order_payments_df.withColumn(
    'payment_type',
    translate(order_payments_df['payment_type'], '_', ' '),
)

# Show a sample of rows to check the rewritten values.
display(order_payments_df.limit(20))

order_id,payment_sequential,payment_type,payment_installments,payment_value
b81ef226f3fe1789b1e8b2acac839d17,1,credit card,8,99.33
a9810da82917af2d9aefd1278f1dcfa0,1,credit card,1,24.39
25e8ea4e93396b6fa0d3dd708e76c1bd,1,credit card,1,65.71
ba78997921bbcdc1373bb41e913ab953,1,credit card,8,107.78
42fdf880ba16b47b59251dd489d4441a,1,credit card,2,128.45
298fcdf1f73eb413e4d26d01b25bc1cd,1,credit card,2,96.12
771ee386b001f06208a7419e4fc1bbd7,1,credit card,1,81.16
3d7239c394a212faae122962df514ac7,1,credit card,3,51.84
1f78449c87a54faf9e96e88ba1491fa9,1,credit card,6,341.09
0573b5e23cbd798006520e1d5b4c6714,1,boleto,1,51.95


#### As you can see below, no data in `order_payments_df` is lost during the transformation process.

In [None]:
# Compare the row count captured before the transformations with the current
# row count of `order_payments_df`.
print("Before transforming: ", before_transform_row_count)
print("After transforming: ", order_payments_df.count())

Before transforming:  103886
After transforming:  103886


<section id="section_2l">
    <h3 style="text-align: center">Section 2-L: Write to the logging process</h3>
</section>

In [None]:
# Record in the log that all transformation cells have finished.
logging.info("Transformation process completed.")

<section id="section_3">
    <h2 style="text-align: center">Section 3: DataFrame Export</h2>
    <hr>
</section>

<section id="section_3a">
    <h3 style="text-align: center">Section 3-A: Export tables to Delta Tables and store them in the Data Lake Storage</h3>
    <h4><code>geolocations_df</code> --> geolocations</h4>
</section>

In [None]:
# Destination folder for the geolocations Delta table; the bare `path`
# expression echoes the value as the cell output.
path = f"{output_path}{geolocations}/"
path

'abfss://transformed@omnioraclestorage.dfs.core.windows.net/olist/geolocations/'

In [None]:
# Write geolocations_df in Delta format to `path`, replacing existing data
# and schema, and register it as a table under the name held in `geolocations`.
(
    geolocations_df.write
    .format('delta')
    .mode('overwrite')
    .option("overwriteSchema", "true")
    .option("path", path)
    .saveAsTable(geolocations)
)
# List the contents of the destination folder (shown as the cell output).
dbutils.fs.ls(path)

[FileInfo(path='abfss://transformed@omnioraclestorage.dfs.core.windows.net/olist/geolocations/_delta_log/', name='_delta_log/', size=0, modificationTime=1727621703000),
 FileInfo(path='abfss://transformed@omnioraclestorage.dfs.core.windows.net/olist/geolocations/part-00000-1c09b0c6-45cf-469c-befb-2e9418335f92-c000.snappy.parquet', name='part-00000-1c09b0c6-45cf-469c-befb-2e9418335f92-c000.snappy.parquet', size=607595, modificationTime=1727755247000),
 FileInfo(path='abfss://transformed@omnioraclestorage.dfs.core.windows.net/olist/geolocations/part-00000-1e62ecf0-a544-4ffb-a609-e8544827a071-c000.snappy.parquet', name='part-00000-1e62ecf0-a544-4ffb-a609-e8544827a071-c000.snappy.parquet', size=607595, modificationTime=1727765712000),
 FileInfo(path='abfss://transformed@omnioraclestorage.dfs.core.windows.net/olist/geolocations/part-00000-23cac2ee-0d77-470c-a141-9a408740f913-c000.snappy.parquet', name='part-00000-23cac2ee-0d77-470c-a141-9a408740f913-c000.snappy.parquet', size=607595, modifi

#### `customers_df` --> customers

In [None]:
# Destination folder for the customers Delta table; the bare `path`
# expression echoes the value as the cell output.
path = f"{output_path}{customers}/"
path

'abfss://transformed@omnioraclestorage.dfs.core.windows.net/olist/customers/'

In [None]:
# Write customers_df in Delta format to `path`, replacing existing data and
# schema, and register it as a table under the name held in `customers`.
(
    customers_df.write
    .format('delta')
    .mode('overwrite')
    .option("overwriteSchema", "true")
    .option("path", path)
    .saveAsTable(customers)
)
# List the contents of the destination folder (shown as the cell output).
dbutils.fs.ls(path)

[FileInfo(path='abfss://transformed@omnioraclestorage.dfs.core.windows.net/olist/customers/_delta_log/', name='_delta_log/', size=0, modificationTime=1727625666000),
 FileInfo(path='abfss://transformed@omnioraclestorage.dfs.core.windows.net/olist/customers/part-00000-08433c2a-8860-4dfe-983c-48116c781fce-c000.snappy.parquet', name='part-00000-08433c2a-8860-4dfe-983c-48116c781fce-c000.snappy.parquet', size=6717649, modificationTime=1727752194000),
 FileInfo(path='abfss://transformed@omnioraclestorage.dfs.core.windows.net/olist/customers/part-00000-9b0700b0-d838-49ef-9e8b-d3fb3dcaa639-c000.snappy.parquet', name='part-00000-9b0700b0-d838-49ef-9e8b-d3fb3dcaa639-c000.snappy.parquet', size=6698386, modificationTime=1727625668000),
 FileInfo(path='abfss://transformed@omnioraclestorage.dfs.core.windows.net/olist/customers/part-00000-d69289a6-1e46-4688-a8b0-28a9dc6433b2-c000.snappy.parquet', name='part-00000-d69289a6-1e46-4688-a8b0-28a9dc6433b2-c000.snappy.parquet', size=6717649, modificationTim

#### `sellers_df` --> sellers

In [None]:
# Destination folder for the sellers Delta table; the bare `path`
# expression echoes the value as the cell output.
path = f"{output_path}{sellers}/"
path

'abfss://transformed@omnioraclestorage.dfs.core.windows.net/olist/sellers/'

In [None]:
# Write sellers_df in Delta format to `path`, replacing existing data and
# schema, and register it as a table under the name held in `sellers`.
(
    sellers_df.write
    .format('delta')
    .mode('overwrite')
    .option("overwriteSchema", "true")
    .option("path", path)
    .saveAsTable(sellers)
)
# List the contents of the destination folder (shown as the cell output).
dbutils.fs.ls(path)

[FileInfo(path='abfss://transformed@omnioraclestorage.dfs.core.windows.net/olist/sellers/_delta_log/', name='_delta_log/', size=0, modificationTime=1727625900000),
 FileInfo(path='abfss://transformed@omnioraclestorage.dfs.core.windows.net/olist/sellers/part-00000-16381dd4-86b5-47a7-b93b-39598dd55428-c000.snappy.parquet', name='part-00000-16381dd4-86b5-47a7-b93b-39598dd55428-c000.snappy.parquet', size=113380, modificationTime=1727625901000),
 FileInfo(path='abfss://transformed@omnioraclestorage.dfs.core.windows.net/olist/sellers/part-00000-7e4cac2d-0bfc-426c-9380-94d22d053519-c000.snappy.parquet', name='part-00000-7e4cac2d-0bfc-426c-9380-94d22d053519-c000.snappy.parquet', size=114253, modificationTime=1727765726000),
 FileInfo(path='abfss://transformed@omnioraclestorage.dfs.core.windows.net/olist/sellers/part-00000-863e9c40-4afe-467e-aa54-1e75c3e84d1b-c000.snappy.parquet', name='part-00000-863e9c40-4afe-467e-aa54-1e75c3e84d1b-c000.snappy.parquet', size=114253, modificationTime=172775978

#### `orders_df` --> orders

In [None]:
# Destination folder for the orders Delta table; the bare `path`
# expression echoes the value as the cell output.
path = f"{output_path}{orders}/"
path

'abfss://transformed@omnioraclestorage.dfs.core.windows.net/olist/orders/'

In [None]:
# Write orders_df in Delta format to `path`, replacing existing data and
# schema, and register it as a table under the name held in `orders`.
(
    orders_df.write
    .format('delta')
    .mode('overwrite')
    .option("overwriteSchema", "true")
    .option("path", path)
    .saveAsTable(orders)
)
# List the contents of the destination folder (shown as the cell output).
dbutils.fs.ls(path)

[FileInfo(path='abfss://transformed@omnioraclestorage.dfs.core.windows.net/olist/orders/_delta_log/', name='_delta_log/', size=0, modificationTime=1727626016000),
 FileInfo(path='abfss://transformed@omnioraclestorage.dfs.core.windows.net/olist/orders/part-00000-2e89fc9f-727b-4875-ab7b-bfb7e2818906-c000.snappy.parquet', name='part-00000-2e89fc9f-727b-4875-ab7b-bfb7e2818906-c000.snappy.parquet', size=10638531, modificationTime=1727752205000),
 FileInfo(path='abfss://transformed@omnioraclestorage.dfs.core.windows.net/olist/orders/part-00000-3953b616-a1af-43a4-8465-e8da23380636-c000.snappy.parquet', name='part-00000-3953b616-a1af-43a4-8465-e8da23380636-c000.snappy.parquet', size=10638531, modificationTime=1727765731000),
 FileInfo(path='abfss://transformed@omnioraclestorage.dfs.core.windows.net/olist/orders/part-00000-7b432f09-0b7e-4c71-86ee-e6d91c9f1c7d-c000.snappy.parquet', name='part-00000-7b432f09-0b7e-4c71-86ee-e6d91c9f1c7d-c000.snappy.parquet', size=10638531, modificationTime=1727759

#### `product_categories_df` --> product_category_name_translations

In [None]:
# Destination folder for the product category translation Delta table; the
# bare `path` expression echoes the value as the cell output.
path = f"{output_path}{product_category_name_translations}/"
path

'abfss://transformed@omnioraclestorage.dfs.core.windows.net/olist/product_category_name_translations/'

In [None]:
# Write product_categories_df in Delta format to `path`, replacing existing
# data and schema, and register it as a table under the name held in
# `product_category_name_translations`.
(
    product_categories_df.write
    .format('delta')
    .mode('overwrite')
    .option("overwriteSchema", "true")
    .option("path", path)
    .saveAsTable(product_category_name_translations)
)
# List the contents of the destination folder (shown as the cell output).
dbutils.fs.ls(path)

[FileInfo(path='abfss://transformed@omnioraclestorage.dfs.core.windows.net/olist/product_category_name_translations/_delta_log/', name='_delta_log/', size=0, modificationTime=1727626083000),
 FileInfo(path='abfss://transformed@omnioraclestorage.dfs.core.windows.net/olist/product_category_name_translations/part-00000-4a56dc55-7e86-4c08-9f0c-1deda40a9990-c000.snappy.parquet', name='part-00000-4a56dc55-7e86-4c08-9f0c-1deda40a9990-c000.snappy.parquet', size=3070, modificationTime=1727759798000),
 FileInfo(path='abfss://transformed@omnioraclestorage.dfs.core.windows.net/olist/product_category_name_translations/part-00000-60b6f812-b501-4c4e-9a2c-0b7561656384-c000.snappy.parquet', name='part-00000-60b6f812-b501-4c4e-9a2c-0b7561656384-c000.snappy.parquet', size=3070, modificationTime=1727765737000),
 FileInfo(path='abfss://transformed@omnioraclestorage.dfs.core.windows.net/olist/product_category_name_translations/part-00000-6366d76d-079e-4e54-a5f3-7900a3f30c89-c000.snappy.parquet', name='part-

#### `products_df` --> products

In [None]:
# Destination folder for the products Delta table; the bare `path`
# expression echoes the value as the cell output.
path = f"{output_path}{products}/"
path

'abfss://transformed@omnioraclestorage.dfs.core.windows.net/olist/products/'

In [None]:
# Write products_df in Delta format to `path`, replacing existing data and
# schema, and register it as a table under the name held in `products`.
(
    products_df.write
    .format('delta')
    .mode('overwrite')
    .option("overwriteSchema", "true")
    .option("path", path)
    .saveAsTable(products)
)
# List the contents of the destination folder (shown as the cell output).
dbutils.fs.ls(path)

[FileInfo(path='abfss://transformed@omnioraclestorage.dfs.core.windows.net/olist/products/_delta_log/', name='_delta_log/', size=0, modificationTime=1727626165000),
 FileInfo(path='abfss://transformed@omnioraclestorage.dfs.core.windows.net/olist/products/part-00000-1c92392c-8446-4f9c-935e-109f670253a9-c000.snappy.parquet', name='part-00000-1c92392c-8446-4f9c-935e-109f670253a9-c000.snappy.parquet', size=1333943, modificationTime=1727626166000),
 FileInfo(path='abfss://transformed@omnioraclestorage.dfs.core.windows.net/olist/products/part-00000-7cce87c9-df5a-447e-b87a-74e60c3c6bc9-c000.snappy.parquet', name='part-00000-7cce87c9-df5a-447e-b87a-74e60c3c6bc9-c000.snappy.parquet', size=1334235, modificationTime=1727755314000),
 FileInfo(path='abfss://transformed@omnioraclestorage.dfs.core.windows.net/olist/products/part-00000-8223ffc8-a166-4e3d-857e-d8f7ac8f4cd2-c000.snappy.parquet', name='part-00000-8223ffc8-a166-4e3d-857e-d8f7ac8f4cd2-c000.snappy.parquet', size=1334235, modificationTime=17

#### `order_reviews_df` --> order_reviews

In [None]:
# Destination folder for the order reviews Delta table; the bare `path`
# expression echoes the value as the cell output.
path = f"{output_path}{order_reviews}/"
path

'abfss://transformed@omnioraclestorage.dfs.core.windows.net/olist/order_reviews/'

In [None]:
# Write order_reviews_df in Delta format to `path`, replacing existing data
# and schema, and register it as a table under the name held in `order_reviews`.
(
    order_reviews_df.write
    .format('delta')
    .mode('overwrite')
    .option("overwriteSchema", "true")
    .option("path", path)
    .saveAsTable(order_reviews)
)
# List the contents of the destination folder (shown as the cell output).
dbutils.fs.ls(path)

[FileInfo(path='abfss://transformed@omnioraclestorage.dfs.core.windows.net/olist/order_reviews/_delta_log/', name='_delta_log/', size=0, modificationTime=1727756329000),
 FileInfo(path='abfss://transformed@omnioraclestorage.dfs.core.windows.net/olist/order_reviews/part-00000-9623f216-6b15-4d77-8850-f8241c2e2608-c000.snappy.parquet', name='part-00000-9623f216-6b15-4d77-8850-f8241c2e2608-c000.snappy.parquet', size=7702143, modificationTime=1727765745000),
 FileInfo(path='abfss://transformed@omnioraclestorage.dfs.core.windows.net/olist/order_reviews/part-00000-fba9557d-2052-4925-b3d5-3fcbfdb9b874-c000.snappy.parquet', name='part-00000-fba9557d-2052-4925-b3d5-3fcbfdb9b874-c000.snappy.parquet', size=0, modificationTime=1727760003000)]

#### `order_items_df` --> order_items

In [None]:
# Destination folder for the order items Delta table; the bare `path`
# expression echoes the value as the cell output.
path = f"{output_path}{order_items}/"
path

'abfss://transformed@omnioraclestorage.dfs.core.windows.net/olist/order_items/'

In [None]:
# Write order_items_df in Delta format to `path`, replacing existing data and
# schema, and register it as a table under the name held in `order_items`.
(
    order_items_df.write
    .format('delta')
    .mode('overwrite')
    .option("overwriteSchema", "true")
    .option("path", path)
    .saveAsTable(order_items)
)
# List the contents of the destination folder (shown as the cell output).
dbutils.fs.ls(path)

[FileInfo(path='abfss://transformed@omnioraclestorage.dfs.core.windows.net/olist/order_items/_delta_log/', name='_delta_log/', size=0, modificationTime=1727626362000),
 FileInfo(path='abfss://transformed@omnioraclestorage.dfs.core.windows.net/olist/order_items/part-00000-1e280443-ebc7-4cb2-a2f4-1eac3eecc818-c000.snappy.parquet', name='part-00000-1e280443-ebc7-4cb2-a2f4-1eac3eecc818-c000.snappy.parquet', size=6603936, modificationTime=1727626365000),
 FileInfo(path='abfss://transformed@omnioraclestorage.dfs.core.windows.net/olist/order_items/part-00000-e0c71b1c-0b0d-48a6-abe8-9241b2a6d2a9-c000.snappy.parquet', name='part-00000-e0c71b1c-0b0d-48a6-abe8-9241b2a6d2a9-c000.snappy.parquet', size=6642194, modificationTime=1727765753000)]

#### `order_payments_df` --> order_payments

In [None]:
# Destination folder for the order payments Delta table; the bare `path`
# expression echoes the value as the cell output.
path = f"{output_path}{order_payments}/"
path

'abfss://transformed@omnioraclestorage.dfs.core.windows.net/olist/order_payments/'

In [None]:
# Write order_payments_df in Delta format to `path`, replacing existing data
# and schema, and register it as a table under the name held in `order_payments`.
(
    order_payments_df.write
    .format('delta')
    .mode('overwrite')
    .option("overwriteSchema", "true")
    .option("path", path)
    .saveAsTable(order_payments)
)
# List the contents of the destination folder (shown as the cell output).
dbutils.fs.ls(path)

[FileInfo(path='abfss://transformed@omnioraclestorage.dfs.core.windows.net/olist/order_payments/_delta_log/', name='_delta_log/', size=0, modificationTime=1727626447000),
 FileInfo(path='abfss://transformed@omnioraclestorage.dfs.core.windows.net/olist/order_payments/part-00000-21bb3426-878a-4e24-904c-168684260132-c000.snappy.parquet', name='part-00000-21bb3426-878a-4e24-904c-168684260132-c000.snappy.parquet', size=3804150, modificationTime=1727626449000),
 FileInfo(path='abfss://transformed@omnioraclestorage.dfs.core.windows.net/olist/order_payments/part-00000-e47e085d-15b9-4f7c-bd56-492d11951543-c000.snappy.parquet', name='part-00000-e47e085d-15b9-4f7c-bd56-492d11951543-c000.snappy.parquet', size=3814788, modificationTime=1727765758000)]

<section id="section_3b">
    <h3 style="text-align: center">Section 3-B: Write to the logging process</h3>
</section>

In [None]:
# Record in the log that every DataFrame has been written out.
logging.info("Exported DataFrames to Delta Tables.")