In [3]:
spark

In [4]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName('OlistData')\
.getOrCreate()

25/04/16 02:07:17 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [5]:
customer_df = spark.read.csv('/data/olist/olist_customers_dataset.csv',header=True,inferSchema=True)
geolocation_df = spark.read.csv('/data/olist/olist_geolocation_dataset.csv',header=True,inferSchema=True)
order_item_df = spark.read.csv('/data/olist/olist_order_items_dataset.csv',header=True,inferSchema=True)
payment_df = spark.read.csv('/data/olist/olist_order_payments_dataset.csv',header=True,inferSchema=True)
review_df = spark.read.csv('/data/olist/olist_order_reviews_dataset.csv',header=True,inferSchema=True)
order_df = spark.read.csv('/data/olist/olist_orders_dataset.csv',header=True,inferSchema=True)
product_df = spark.read.csv('/data/olist/olist_products_dataset.csv',header=True,inferSchema=True)
seller_df = spark.read.csv('/data/olist/olist_sellers_dataset.csv',header=True,inferSchema=True)
category_translation_df = spark.read.csv('/data/olist/product_category_name_translation.csv',header=True,inferSchema=True)

                                                                                

In [6]:
from pyspark.sql.functions import *

In [7]:
def missing_values(df,df_name):
    print(f'Missing value in {df_name}:')
    df.select([count(when(col(c).isNull(),1)).alias(c) for c in df.columns]).show()

In [8]:
missing_values(customer_df , 'customer')

Missing value in customer:




+-----------+------------------+------------------------+-------------+--------------+
|customer_id|customer_unique_id|customer_zip_code_prefix|customer_city|customer_state|
+-----------+------------------+------------------------+-------------+--------------+
|          0|                 0|                       0|            0|             0|
+-----------+------------------+------------------------+-------------+--------------+



                                                                                

In [9]:
missing_values(order_df,'order')

Missing value in order:
+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+
|order_id|customer_id|order_status|order_purchase_timestamp|order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+
|       0|          0|           0|                       0|              160|                        1783|                         2965|                            0|
+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+



In [10]:
missing_values(order_item_df,'item')

Missing value in item:
+--------+-------------+----------+---------+-------------------+-----+-------------+
|order_id|order_item_id|product_id|seller_id|shipping_limit_date|price|freight_value|
+--------+-------------+----------+---------+-------------------+-----+-------------+
|       0|            0|         0|        0|                  0|    0|            0|
+--------+-------------+----------+---------+-------------------+-----+-------------+



In [11]:
order_df_cleaned = order_df.na.drop(subset = ['order_id','customer_id','order_status'])

In [12]:
order_df_cleaned.show(1)

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|e481f51cbdc54678b...|9ef432eb625129730...|   delivered|     2017-10-02 10:56:33|2017-10-02 11:07:15|         2017-10-04 19:55:00|          2017-10-10 21:25:13|          2017-10-18 00:00:00|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
only showing top 1 row



In [13]:
order_df_cleaned = order_df.fillna({'order_delivered_customer_date' : '9999-12-31'})

In [14]:
order_df_cleaned.show(1)

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|e481f51cbdc54678b...|9ef432eb625129730...|   delivered|     2017-10-02 10:56:33|2017-10-02 11:07:15|         2017-10-04 19:55:00|          2017-10-10 21:25:13|          2017-10-18 00:00:00|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
only showing top 1 row



In [15]:
from pyspark.ml.feature import Imputer

In [16]:
missing_values(payment_df,'payment')

Missing value in payment:
+--------+------------------+------------+--------------------+-------------+
|order_id|payment_sequential|payment_type|payment_installments|payment_value|
+--------+------------------+------------+--------------------+-------------+
|       0|                 0|           0|                   0|            0|
+--------+------------------+------------+--------------------+-------------+



In [17]:
# if there any missing value in payment value then this imputer can be used
from pyspark.ml.feature import Imputer
imputer = Imputer(inputCols=['payment_value'],outputCols = ['payment_value_inputed']).setStrategy('median')
payment_df_cleaned = imputer.fit(payment_df).transform(payment_df)

                                                                                

In [19]:
payment_df_cleaned.show(1)

+--------------------+------------------+------------+--------------------+-------------+---------------------+
|            order_id|payment_sequential|payment_type|payment_installments|payment_value|payment_value_inputed|
+--------------------+------------------+------------+--------------------+-------------+---------------------+
|b81ef226f3fe1789b...|                 1| credit_card|                   8|        99.33|                99.33|
+--------------------+------------------+------------+--------------------+-------------+---------------------+
only showing top 1 row



In [20]:
# standardizing the format

In [21]:
def print_schema(df,df_name):
    print(f'schema of {df_name}:')
    df.printSchema()

In [22]:
print_schema(order_df,'order')

schema of order:
root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)



In [24]:
order_df_cleaned = order_df_cleaned.withColumn('order_purchase_timestamp',to_date(col('order_purchase_timestamp')))

In [25]:
order_df_cleaned.show(1)

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|e481f51cbdc54678b...|9ef432eb625129730...|   delivered|              2017-10-02|2017-10-02 11:07:15|         2017-10-04 19:55:00|          2017-10-10 21:25:13|          2017-10-18 00:00:00|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
only showing top 1 row



In [26]:
payment_df_cleaned = payment_df_cleaned.withColumn('payment_type',when(col('payment_type')=='boleto','Bank Trnasfer')
                                                   .when(col('payment_type') == 'credit_card','Credit Card')
                                                   .when(col('payment_type') == 'debit_card' , 'Debit Card')
                                                   .otherwise('other'))

In [27]:
payment_df_cleaned.show()

+--------------------+------------------+-------------+--------------------+-------------+---------------------+
|            order_id|payment_sequential| payment_type|payment_installments|payment_value|payment_value_inputed|
+--------------------+------------------+-------------+--------------------+-------------+---------------------+
|b81ef226f3fe1789b...|                 1|  Credit Card|                   8|        99.33|                99.33|
|a9810da82917af2d9...|                 1|  Credit Card|                   1|        24.39|                24.39|
|25e8ea4e93396b6fa...|                 1|  Credit Card|                   1|        65.71|                65.71|
|ba78997921bbcdc13...|                 1|  Credit Card|                   8|       107.78|               107.78|
|42fdf880ba16b47b5...|                 1|  Credit Card|                   2|       128.45|               128.45|
|298fcdf1f73eb413e...|                 1|  Credit Card|                   2|        96.12|      

In [29]:
customer_df.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: integer (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)



In [31]:
customer_df_cleaned = customer_df.withColumn('customer_zip_code_prefix',col('customer_zip_code_prefix').cast('string'))

In [33]:
customer_df_cleaned.printSchema(1)

root
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: string (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)



In [37]:
order_df_cleaned.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: date (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)



In [38]:
order_item_df.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)



REMOVE DUPLICATE RECORDS

In [36]:
customer_df_cleaned = customer_df_cleaned.dropDuplicates(['customer_id'])

In [40]:
order_with_detail = order_df_cleaned.join(order_item_df,'order_id','left')\
.join(payment_df_cleaned,'order_id','left')\
.join(customer_df_cleaned,'customer_id','left')

In [41]:
order_with_detail.show(1)

                                                                                

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+------------------+------------+--------------------+-------------+---------------------+--------------------+------------------------+-------------+--------------+
|         customer_id|            order_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|order_item_id|          product_id|           seller_id|shipping_limit_date|price|freight_value|payment_sequential|payment_type|payment_installments|payment_value|payment_value_inputed|  customer_unique_id|customer_zip_code_prefix|customer_city|customer_state|
+--------------------+--------------------+------------+------------------------+-----------------

In [45]:
order_with_detail.printSchema

<bound method DataFrame.printSchema of DataFrame[customer_id: string, order_id: string, order_status: string, order_purchase_timestamp: date, order_approved_at: timestamp, order_delivered_carrier_date: timestamp, order_delivered_customer_date: timestamp, order_estimated_delivery_date: timestamp, order_item_id: int, product_id: string, seller_id: string, shipping_limit_date: timestamp, price: double, freight_value: double, payment_sequential: int, payment_type: string, payment_installments: int, payment_value: double, payment_value_inputed: double, customer_unique_id: string, customer_zip_code_prefix: string, customer_city: string, customer_state: string]>

In [46]:
order_with_total_value = order_with_detail.groupBy('order_id').agg(sum('payment_value').alias('total_order_value'))

In [51]:
order_with_total_value.show(5)

+--------------------+-----------------+
|            order_id|total_order_value|
+--------------------+-----------------+
|118045506e1c1dda0...|           1802.0|
|f44cb69655f8e4d13...|           164.32|
|edcc6b79e8394346b...|           162.63|
|9f98d6530155e3b38...|           316.76|
|949280c70c6d62ec9...|            49.42|
+--------------------+-----------------+
only showing top 5 rows



advance transformation

In [52]:
order_item_df.show()

+--------------------+-------------+--------------------+--------------------+-------------------+------+-------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_date| price|freight_value|
+--------------------+-------------+--------------------+--------------------+-------------------+------+-------------+
|00010242fe8c5a6d1...|            1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19 09:45:35|  58.9|        13.29|
|00018f77f2f0320c5...|            1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|2017-05-03 11:05:13| 239.9|        19.93|
|000229ec398224ef6...|            1|c777355d18b72b67a...|5b51032eddd242adc...|2018-01-18 14:48:30| 199.0|        17.87|
|00024acbcdf0a6daa...|            1|7634da152a4610f15...|9d7a1d34a50524090...|2018-08-15 10:10:18| 12.99|        12.79|
|00042b26cf59d7ce6...|            1|ac6c3623068f30de0...|df560393f3a51e745...|2017-02-13 13:57:51| 199.9|        18.14|
|00048cc3ae777c65d...|            1|ef92

In [53]:
quantiles = order_item_df.approxQuantile('price',[0.01,0.99],0.0)

In [54]:
low_cutoff,high_cutoff = quantiles[0],quantiles[1]

In [56]:
low_cutoff,high_cutoff

(9.99, 890.0)

In [55]:
order_item_df.select('price').summary().show()



+-------+------------------+
|summary|             price|
+-------+------------------+
|  count|            112650|
|   mean|120.65373901471354|
| stddev|183.63392805026012|
|    min|              0.85|
|    25%|              39.9|
|    50%|             74.99|
|    75%|             134.9|
|    max|            6735.0|
+-------+------------------+



                                                                                

In [58]:
order_item_df_cleaned = order_item_df.filter((col('price')>= low_cutoff) & (col('price') <= high_cutoff)) 

In [60]:
order_item_df_cleaned.show(1)

+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_date|price|freight_value|
+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+
|00010242fe8c5a6d1...|            1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19 09:45:35| 58.9|        13.29|
+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+
only showing top 1 row



In [62]:
order_item_df_cleaned = order_item_df.filter((col('price') >= low_cutoff) & (col('price') <= high_cutoff))

In [64]:
payment_df_cleaned.select('payment_installments').summary().show()



+-------+--------------------+
|summary|payment_installments|
+-------+--------------------+
|  count|              103886|
|   mean|   2.853348863176944|
| stddev|  2.6870506738564925|
|    min|                   0|
|    25%|                   1|
|    50%|                   1|
|    75%|                   4|
|    max|                  24|
+-------+--------------------+



                                                                                

In [65]:
product_df.show()

+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|          product_id|product_category_name|product_name_lenght|product_description_lenght|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|
+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|1e9e8ef04dbcff454...|           perfumaria|                 40|                       287|                 1|             225|               16|               10|              14|
|3aa071139cb16b67c...|                artes|                 44|                       276|                 1|            1000|               30|               18|              20|
|96bd76ec8810374ed...|        esporte_lazer|                 46|                       250|    

In [66]:
product_df_cleaned = product_df.withColumn(
    'product_size_category',
    when(col('product_weight_g') < 500,'small')
    .when(col('product_weight_g').between(500,2000),'medium')
    .otherwise('large'))

In [67]:
product_df_cleaned.show(5)

+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+---------------------+
|          product_id|product_category_name|product_name_lenght|product_description_lenght|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|product_size_category|
+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+---------------------+
|1e9e8ef04dbcff454...|           perfumaria|                 40|                       287|                 1|             225|               16|               10|              14|                small|
|3aa071139cb16b67c...|                artes|                 44|                       276|                 1|            1000|               30|               18|              20|        

In [72]:
!hadoop fs -mkdir /data/olist_proc

mkdir: `/data/olist_proc': File exists


In [74]:
order_with_detail.write.mode('overwrite').parquet('/data/olist_proc/clean_data.parquent')

                                                                                

In [75]:
!hadoop fs -ls /data/olist_proc/

Found 1 items
drwxr-xr-x   - root hadoop          0 2025-04-16 07:54 /data/olist_proc/clean_data.parquent
