# Module 2 - Cleaning & Transforming Data

## Loading Data

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('OlistData').getOrCreate()

25/05/22 17:44:10 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [2]:
hdfs_path = '/data/olist/'

In [3]:
customers_df = spark.read.csv(hdfs_path + 'olist_customers_dataset.csv', inferSchema='true', header='true')
geolocation_df = spark.read.csv(hdfs_path + 'olist_geolocation_dataset.csv', inferSchema='true', header='true')
order_items_df = spark.read.csv(hdfs_path + 'olist_order_items_dataset.csv', inferSchema='true', header='true')
order_payments_df = spark.read.csv(hdfs_path + 'olist_order_payments_dataset.csv', inferSchema='true', header='true')
order_reviews_df = spark.read.csv(hdfs_path + 'olist_order_reviews_dataset.csv', inferSchema='true', header='true')
orders_df = spark.read.csv(hdfs_path + 'olist_orders_dataset.csv', inferSchema='true', header='true')
products_df = spark.read.csv(hdfs_path + 'olist_products_dataset.csv', inferSchema='true', header='true')
sellers_df = spark.read.csv(hdfs_path + 'olist_sellers_dataset.csv', inferSchema='true', header='true')
product_category_translation_df = spark.read.csv(hdfs_path + 'product_category_name_translation.csv', inferSchema='true', header='true')

                                                                                

## Identifying missing values

In [4]:
from pyspark.sql.functions import *

def missing_values(df, df_name):
    print(f'Missing values in {df_name} :')
    df.select([count(when(col(c).isNull(), 1)).alias(c)
    for c in df.columns]).show()

In [6]:
# missing values in customer
missing_values(customers_df, 'customer')

Missing values in customer :
+-----------+------------------+------------------------+-------------+--------------+
|customer_id|customer_unique_id|customer_zip_code_prefix|customer_city|customer_state|
+-----------+------------------+------------------------+-------------+--------------+
|          0|                 0|                       0|            0|             0|
+-----------+------------------+------------------------+-------------+--------------+



In [8]:
# missing values in sellers
missing_values(sellers_df, 'sellers')

Missing values in sellers :
+---------+----------------------+-----------+------------+
|seller_id|seller_zip_code_prefix|seller_city|seller_state|
+---------+----------------------+-----------+------------+
|        0|                     0|          0|           0|
+---------+----------------------+-----------+------------+



In [11]:
missing_values(order_payments_df, 'payments')

Missing values in payments :




+--------+------------------+------------+--------------------+-------------+
|order_id|payment_sequential|payment_type|payment_installments|payment_value|
+--------+------------------+------------+--------------------+-------------+
|       0|                 0|           0|                   0|            0|
+--------+------------------+------------+--------------------+-------------+



                                                                                

In [7]:
# missing values in order
missing_values(orders_df, 'orders')

Missing values in orders :
+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+
|order_id|customer_id|order_status|order_purchase_timestamp|order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+
|       0|          0|           0|                       0|              160|                        1783|                         2965|                            0|
+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+



In [9]:
# missing values in products
missing_values(products_df, 'products')

Missing values in products :
+----------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|product_id|product_category_name|product_name_lenght|product_description_lenght|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|
+----------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|         0|                  610|                610|                       610|               610|               2|                2|                2|               2|
+----------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+



- need to handle for 'order_delivered_customer_date' and 'order_delivered_carrier_date' in *orders* and the 610 missing values in *products*

## Handling missing values
To handle missing values we can either 
- drop them
- fill missing values (numerical columns) with some constant 
- impute missing values (continuous data) with mean, median, etc.

In [15]:
products_df_cleaned = products_df.na.drop(subset = ['product_name_lenght', 'product_weight_g'])
missing_values(products_df_cleaned, 'cleaned products')

Missing values in cleaned products :
+----------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|product_id|product_category_name|product_name_lenght|product_description_lenght|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|
+----------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|         0|                    0|                  0|                         0|                 0|               0|                0|                0|               0|
+----------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+



cleaned the products dataframe succesfully by dropping na values

In [18]:
# fill with a constant value for the two cols of the orders dataframe
orders_df_cleaned = orders_df.fillna({'order_delivered_customer_date':'9999-12-31','order_delivered_carrier_date':'9999-12-31','order_approved_at':'9999-12-31'})

In [19]:
missing_values(orders_df_cleaned, 'cleaned orders')

Missing values in cleaned orders :
+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+
|order_id|customer_id|order_status|order_purchase_timestamp|order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+
|       0|          0|           0|                       0|                0|                           0|                            0|                            0|
+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+



replaced the missing dates with an extremely large number

In [23]:
# missing values in sellers
missing_values(order_reviews_df, 'reviews')

Missing values in reviews :


                                                                                

+---------+--------+------------+--------------------+----------------------+--------------------+-----------------------+
|review_id|order_id|review_score|review_comment_title|review_comment_message|review_creation_date|review_answer_timestamp|
+---------+--------+------------+--------------------+----------------------+--------------------+-----------------------+
|        1|    2236|        2380|               92157|                 63079|                8764|                   8785|
+---------+--------+------------+--------------------+----------------------+--------------------+-----------------------+



In [46]:
reviews_df_cleaned = order_reviews_df.na.drop(subset = ['order_id','review_id'])
missing_values(reviews_df_cleaned, 'cleaned reviews')

Missing values in cleaned reviews :
+---------+--------+------------+--------------------+----------------------+--------------------+-----------------------+
|review_id|order_id|review_score|review_comment_title|review_comment_message|review_creation_date|review_answer_timestamp|
+---------+--------+------------+--------------------+----------------------+--------------------+-----------------------+
|        0|       0|         144|               89922|                 60843|                6528|                   6548|
+---------+--------+------------+--------------------+----------------------+--------------------+-----------------------+



In [47]:
from pyspark.ml.feature import Imputer
from pyspark.sql.functions import col 

reviews_df_cleaned = reviews_df_cleaned.withColumn("review_score", col("review_score").cast("int"))

imputer = Imputer(inputCols=['review_score'],outputCols=['review_score_imputed']).setStrategy('median')
reviews_df_cleaned = imputer.fit(reviews_df_cleaned).transform(reviews_df_cleaned)
missing_values(reviews_df_cleaned, 'cleaned reviews')

Missing values in cleaned reviews :
+---------+--------+------------+--------------------+----------------------+--------------------+-----------------------+--------------------+
|review_id|order_id|review_score|review_comment_title|review_comment_message|review_creation_date|review_answer_timestamp|review_score_imputed|
+---------+--------+------------+--------------------+----------------------+--------------------+-----------------------+--------------------+
|        0|       0|        2698|               89922|                 60843|                6528|                   6548|                   0|
+---------+--------+------------+--------------------+----------------------+--------------------+-----------------------+--------------------+



missing values increased after casting to int
likely due to corrupt data

In [48]:
order_reviews_df.groupBy('review_score').count().orderBy('count', ascending = False).show(10)

+-------------------+-----+
|       review_score|count|
+-------------------+-----+
|                  5|57328|
|                  4|19142|
|                  1|11424|
|                  3| 8179|
|                  2| 3151|
|               NULL| 2380|
|2018-05-19 00:00:00|    5|
|2017-12-29 00:00:00|    4|
|2018-02-01 00:00:00|    4|
|2018-01-04 00:00:00|    4|
+-------------------+-----+
only showing top 10 rows



corrupt data detected

In [49]:
reviews_df_cleaned = reviews_df_cleaned.na.drop(subset = ['review_score'])
missing_values(reviews_df_cleaned, 'cleaned reviews')

Missing values in cleaned reviews :
+---------+--------+------------+--------------------+----------------------+--------------------+-----------------------+--------------------+
|review_id|order_id|review_score|review_comment_title|review_comment_message|review_creation_date|review_answer_timestamp|review_score_imputed|
+---------+--------+------------+--------------------+----------------------+--------------------+-----------------------+--------------------+
|        0|       0|           0|               87659|                 58250|                3853|                   3855|                   0|
+---------+--------+------------+--------------------+----------------------+--------------------+-----------------------+--------------------+



                                                                                

In [50]:
reviews_df_cleaned = reviews_df_cleaned.fillna({'review_comment_title':'empty','review_comment_message':'empty'})
missing_values(reviews_df_cleaned, 'cleaned reviews')

Missing values in cleaned reviews :
+---------+--------+------------+--------------------+----------------------+--------------------+-----------------------+--------------------+
|review_id|order_id|review_score|review_comment_title|review_comment_message|review_creation_date|review_answer_timestamp|review_score_imputed|
+---------+--------+------------+--------------------+----------------------+--------------------+-----------------------+--------------------+
|        0|       0|           0|                   0|                     0|                3853|                   3855|                   0|
+---------+--------+------------+--------------------+----------------------+--------------------+-----------------------+--------------------+



## Verifying the schemas

In [51]:
reviews_df_cleaned.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- review_score: integer (nullable = true)
 |-- review_comment_title: string (nullable = false)
 |-- review_comment_message: string (nullable = false)
 |-- review_creation_date: string (nullable = true)
 |-- review_answer_timestamp: string (nullable = true)
 |-- review_score_imputed: integer (nullable = true)



In [52]:
orders_df_cleaned.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)



In [53]:
products_df_cleaned.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_category_name: string (nullable = true)
 |-- product_name_lenght: integer (nullable = true)
 |-- product_description_lenght: integer (nullable = true)
 |-- product_photos_qty: integer (nullable = true)
 |-- product_weight_g: integer (nullable = true)
 |-- product_length_cm: integer (nullable = true)
 |-- product_height_cm: integer (nullable = true)
 |-- product_width_cm: integer (nullable = true)



In [59]:
customers_df.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: string (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)



In [58]:
customers_df = customers_df.withColumn('customer_zip_code_prefix',col('customer_zip_code_prefix').cast('string'))

In [60]:
geolocation_df.printSchema()

root
 |-- geolocation_zip_code_prefix: integer (nullable = true)
 |-- geolocation_lat: double (nullable = true)
 |-- geolocation_lng: double (nullable = true)
 |-- geolocation_city: string (nullable = true)
 |-- geolocation_state: string (nullable = true)



In [61]:
order_items_df.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)



In [62]:
order_payments_df.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- payment_sequential: integer (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- payment_installments: integer (nullable = true)
 |-- payment_value: double (nullable = true)



In [66]:
sellers_df.printSchema()

root
 |-- seller_id: string (nullable = true)
 |-- seller_zip_code_prefix: string (nullable = true)
 |-- seller_city: string (nullable = true)
 |-- seller_state: string (nullable = true)



In [65]:
sellers_df = sellers_df.withColumn('seller_zip_code_prefix',col('seller_zip_code_prefix').cast('string'))

## Refining datasets

In [71]:
payments_df = order_payments_df.withColumn('payment_type',when(col('payment_type')=='boleto','Bank Transfer')
                                                     .when(col('payment_type')=='credit_card','Credit Card')
                                                     .when(col('payment_type')=='debit_card','Debit Card')
                                                    .otherwise('other'))

In [75]:
payments_df.show(10)

+--------------------+------------------+-------------+--------------------+-------------+
|            order_id|payment_sequential| payment_type|payment_installments|payment_value|
+--------------------+------------------+-------------+--------------------+-------------+
|b81ef226f3fe1789b...|                 1|  Credit Card|                   8|        99.33|
|a9810da82917af2d9...|                 1|  Credit Card|                   1|        24.39|
|25e8ea4e93396b6fa...|                 1|  Credit Card|                   1|        65.71|
|ba78997921bbcdc13...|                 1|  Credit Card|                   8|       107.78|
|42fdf880ba16b47b5...|                 1|  Credit Card|                   2|       128.45|
|298fcdf1f73eb413e...|                 1|  Credit Card|                   2|        96.12|
|771ee386b001f0620...|                 1|  Credit Card|                   1|        81.16|
|3d7239c394a212faa...|                 1|  Credit Card|                   3|        51.84|

## Removing duplicate records

In [74]:
customers_df = customers_df.dropDuplicates(['customer_id'])
sellers_df = sellers_df.dropDuplicates(['seller_id'])
products_df_cleaned = products_df_cleaned.dropDuplicates(['product_id'])
orders_df_cleaned = orders_df_cleaned.dropDuplicates(['order_id'])
reviews_df_cleaned = reviews_df_cleaned.dropDuplicates(['review_id'])

## Optional EDA

In [77]:
order_with_details = orders_df_cleaned.join(order_items_df,'order_id','left')\
.join(payments_df,'order_id','left')\
.join(customers_df,'customer_id','left')

In [78]:
order_with_details.show(5)

                                                                                

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+-------------+--------------------+--------------------+-------------------+------+-------------+------------------+------------+--------------------+-------------+--------------------+------------------------+-------------+--------------+
|         customer_id|            order_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|order_item_id|          product_id|           seller_id|shipping_limit_date| price|freight_value|payment_sequential|payment_type|payment_installments|payment_value|  customer_unique_id|customer_zip_code_prefix|customer_city|customer_state|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+----------

In [80]:
# order with highest value
order_with_total_value = order_with_details.groupBy('order_id')\
.agg(sum('payment_value').alias('order_total_value'))
order_with_total_value.show(5)

                                                                                

+--------------------+-----------------+
|            order_id|order_total_value|
+--------------------+-----------------+
|118045506e1c1dda0...|           1802.0|
|f44cb69655f8e4d13...|           164.32|
|edcc6b79e8394346b...|           162.63|
|9f98d6530155e3b38...|           316.76|
|949280c70c6d62ec9...|            49.42|
+--------------------+-----------------+
only showing top 5 rows



In [85]:
# 0.01% and 99.9%
quantiles = order_items_df.approxQuantile('price',[0.001,0.999],0.0)
low_cutoff,high_cutoff = quantiles[0],quantiles[1]

In [88]:
print(f"0.01% and 99.9% of the orders fall between the range ({low_cutoff},{high_cutoff})")

0.01% and 99.9% of the orders fall between the range (4.99,2110.0)


In [89]:
# assume the rest to be outliers
order_item_df_wo_outliers = order_items_df.filter((col('price') >=low_cutoff) & (col('price') <=high_cutoff))

In [95]:
# classify into small medium large based on weight

products_df_cleaned = products_df_cleaned.withColumn(
    'product_weight_category',
    when(col('product_weight_g') <500,'Light')
    .when(col('product_weight_g').between(500,2000),'Medium')
    .otherwise('Heavy')
)

In [98]:
products_df_cleaned.show(5)

+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+-----------------------+
|          product_id|product_category_name|product_name_lenght|product_description_lenght|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|product_weight_category|
+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+-----------------------+
|00066f42aeeb9f300...|           perfumaria|                 53|                       596|                 6|             300|               20|               16|              16|                  Light|
|00088930e925c41fd...|           automotivo|                 56|                       752|                 4|            1225|               55|               10|              26|

In [100]:
sales_per_seller = order_item_df_wo_outliers.groupBy('seller_id').agg(sum("price").alias('total_sales')).orderBy('total_sales', ascending  = False)
sales_per_seller.show(5)

+--------------------+------------------+
|           seller_id|       total_sales|
+--------------------+------------------+
|4869f7a5dfa277a7d...|229472.62999999913|
|53243585a1d6dc264...| 222776.0499999998|
|4a3ca9315b744ce9f...| 200472.9199999981|
|7c67e1448b00f6e96...|187923.89000000118|
|fa1c13f2614d7b5c4...| 187043.1299999997|
+--------------------+------------------+
only showing top 5 rows



## Saving the cleaned files

In [114]:
!hadoop fs -mkdir /data/olist/olist_cleaned

In [115]:
payments_df.write.mode('overwrite').parquet('/data/olist/olist_cleaned/payments_df_processed')
geolocation_df.write.mode('overwrite').parquet('/data/olist/olist_cleaned/geolocation_df_processed')
order_items_df.write.mode('overwrite').parquet('/data/olist/olist_cleaned/order_items_df_processed')
products_df_cleaned.write.mode('overwrite').parquet('/data/olist/olist_cleaned/products_df_processed')
reviews_df_cleaned.write.mode('overwrite').parquet('/data/olist/olist_cleaned/reviews_df_processed')
orders_df_cleaned.write.mode('overwrite').parquet('/data/olist/olist_cleaned/orders_df_processed')
customers_df.write.mode('overwrite').parquet('/data/olist/olist_cleaned/customers_df_processed')
sellers_df.write.mode('overwrite').parquet('/data/olist/olist_cleaned/sellers_df_processed')

                                                                                

In [119]:
orders_df_cleaned.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)

