# đọc dữ liệu

In [1]:
import os
import glob

In [2]:
# original working directiory
# you must change this before running nb
owd = "D:/UTECollege/program/HK7/bdml/project/olist_recommend_sys"

In [3]:
from pyspark.sql import SparkSession

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("CSV Reader") \
    .getOrCreate()


In [4]:
def read_in_data(df_names, directory = './'):
    '''Read in data into seperate dataframes'''
    
    # change directory to data storage location
    os.chdir(directory)
    
    # list of filenames
    extension = 'csv'
    filenames = [i for i in glob.glob('*.{}'.format(extension))]
    print(filenames)
    # create global dfs within function
    for name, file in zip(df_names, filenames):
        globals()[name] = spark.read \
                            .format("csv") \
                            .option("header", "true") \
                            .option("inferSchema", "true") \
                            .option("multiline", "true") \
                            .load(directory + "/" + file)

In [5]:
df_names = ['customers', 'location', 'orders', 'order_items', 'payments', \
                'reviews', 'products', 'sellers', 'translation']

directory = owd + '/olist_data'

read_in_data(df_names, directory)

os.chdir(owd)

['olist_customers_dataset.csv', 'olist_geolocation_dataset.csv', 'olist_orders_dataset.csv', 'olist_order_items_dataset.csv', 'olist_order_payments_dataset.csv', 'olist_order_reviews_dataset.csv', 'olist_products_dataset.csv', 'olist_sellers_dataset.csv', 'product_category_name_translation.csv']


#### Reviews Dataset
This dataset includes data about the reviews made by the customers.

After a customer purchases the product from Olist Store a seller gets notified to fulfill that order. Once the customer receives the product, or the estimated delivery date is due, the customer gets a satisfaction survey by email where he can give a note for the purchase experience and write down some comments.

In [6]:
reviews.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- review_score: string (nullable = true)
 |-- review_comment_title: string (nullable = true)
 |-- review_comment_message: string (nullable = true)
 |-- review_creation_date: string (nullable = true)
 |-- review_answer_timestamp: string (nullable = true)



In [7]:
# data in this columns is in Portuguess, so we dont need this
reviews = reviews.drop("review_comment_message","review_comment_title")

In [8]:
# check any missing value
from pyspark.sql.functions import isnan, when, count, col

def check_any_missing_value(df):
    missing_counts = df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns])
    missing_counts.show()

In [9]:
check_any_missing_value(reviews)

+---------+--------+------------+--------------------+-----------------------+
|review_id|order_id|review_score|review_creation_date|review_answer_timestamp|
+---------+--------+------------+--------------------+-----------------------+
|        0|       9|          10|                  44|                     46|
+---------+--------+------------+--------------------+-----------------------+



In [10]:
from pyspark.sql.functions import to_timestamp
from pyspark.sql.types import IntegerType

# convert datatype of time columns
def string_to_timestamp(df, columns):
    
    for col in columns:
        df = df.withColumn(col, to_timestamp(df[col], 'yyyy-MM-dd HH:mm:ss'))

    return df

# convert datatype of time columns
def string_to_int(df, columns):
    
    for col in columns:
        df = df.withColumn(col, df[col].cast(IntegerType()))

    return df


In [11]:

columns_to_timestamp = ['review_creation_date', 'review_answer_timestamp']
reviews = string_to_timestamp(reviews, columns_to_timestamp)

columns_to_int = ['review_score']
reviews = string_to_int(reviews, columns_to_int)
reviews.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- review_score: integer (nullable = true)
 |-- review_creation_date: timestamp (nullable = true)
 |-- review_answer_timestamp: timestamp (nullable = true)



#### Order Dataset
This is the core dataset. From each order you might find all other information.

In [12]:
orders.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)



In [13]:
from pyspark.sql.functions import col, monotonically_increasing_id

# Drop rows containing NaN values
orders = orders.dropna()

# Drop canceled orders
orders = orders.filter(col("order_status") != "canceled")

# Show the modified DataFrame
orders.show()

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|e481f51cbdc54678b...|9ef432eb625129730...|   delivered|     2017-10-02 10:56:33|2017-10-02 11:07:15|         2017-10-04 19:55:00|          2017-10-10 21:25:13|          2017-10-18 00:00:00|
|53cdb2fc8bc7dce0b...|b0830fb4747a6c6d2...|   delivered|     2018-07-24 20:41:37|2018-07-26 03:24:27|         2018-07-26 14:31:00|          2018-08-07 15:27:45|          2018-08-13 00:00:00|
|47770eb9100c2d0c4...|41ce2a54c0b03bf34...|  

#### Products Dataset
This dataset includes data about the products sold by Olist.


In [14]:
translation.show()

+---------------------+-----------------------------+
|product_category_name|product_category_name_english|
+---------------------+-----------------------------+
|         beleza_saude|                health_beauty|
| informatica_acess...|         computers_accesso...|
|           automotivo|                         auto|
|      cama_mesa_banho|               bed_bath_table|
|     moveis_decoracao|              furniture_decor|
|        esporte_lazer|               sports_leisure|
|           perfumaria|                    perfumery|
| utilidades_domest...|                   housewares|
|            telefonia|                    telephony|
|   relogios_presentes|                watches_gifts|
|    alimentos_bebidas|                   food_drink|
|                bebes|                         baby|
|            papelaria|                   stationery|
| tablets_impressao...|         tablets_printing_...|
|           brinquedos|                         toys|
|       telefonia_fixa|     

In [15]:
products.show()

+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|          product_id|product_category_name|product_name_lenght|product_description_lenght|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|
+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|1e9e8ef04dbcff454...|           perfumaria|                 40|                       287|                 1|             225|               16|               10|              14|
|3aa071139cb16b67c...|                artes|                 44|                       276|                 1|            1000|               30|               18|              20|
|96bd76ec8810374ed...|        esporte_lazer|                 46|                       250|    

In [16]:
from pyspark.sql.functions import col, monotonically_increasing_id
# Drop rows with null values
products = products.dropna()

# Show the modified DataFrame
products.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_category_name: string (nullable = true)
 |-- product_name_lenght: integer (nullable = true)
 |-- product_description_lenght: integer (nullable = true)
 |-- product_photos_qty: integer (nullable = true)
 |-- product_weight_g: integer (nullable = true)
 |-- product_length_cm: integer (nullable = true)
 |-- product_height_cm: integer (nullable = true)
 |-- product_width_cm: integer (nullable = true)



In [17]:
# Perform the join operation
joined_df = products.join(translation, on='product_category_name', how='left')

# Select the translated column and drop the original column
translated_df = joined_df.drop("product_category_name").withColumnRenamed(
    'product_category_name_english', 'product_category_name')

# Show the modified DataFrame
translated_df.show()
products = translated_df

+--------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+---------------------+
|          product_id|product_name_lenght|product_description_lenght|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|product_category_name|
+--------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+---------------------+
|1e9e8ef04dbcff454...|                 40|                       287|                 1|             225|               16|               10|              14|            perfumery|
|3aa071139cb16b67c...|                 44|                       276|                 1|            1000|               30|               18|              20|                  art|
|96bd76ec8810374ed...|                 46|                       250|                 1|       

In [18]:
# products.describe(['product_id','product_category_name']).show()
# products.groupBy("product_category_name").count().show()
products.select('product_category_name').distinct().count()

72

In [19]:
products.describe().show()

+-------+--------------------+-------------------+--------------------------+------------------+-----------------+------------------+------------------+------------------+---------------------+
|summary|          product_id|product_name_lenght|product_description_lenght|product_photos_qty| product_weight_g| product_length_cm| product_height_cm|  product_width_cm|product_category_name|
+-------+--------------------+-------------------+--------------------------+------------------+-----------------+------------------+------------------+------------------+---------------------+
|  count|               32340|              32340|                     32340|             32340|            32340|             32340|             32340|             32340|                32327|
|   mean|                NULL|  48.47659245516388|         771.4923933209648|2.1889610389610388|2276.956586270872|30.854545454545455|16.958812615955473|23.208596165739024|                 NULL|
| stddev|                NULL|

#### Sellers Dataset
This dataset includes data about the sellers that fulfilled orders made at Olist. Use it to find the seller location and to identify which seller fulfilled each product.

In [20]:
# sellers.show(10)
sellers.printSchema()

root
 |-- seller_id: string (nullable = true)
 |-- seller_zip_code_prefix: integer (nullable = true)
 |-- seller_city: string (nullable = true)
 |-- seller_state: string (nullable = true)



#### Customers Dataset
This dataset has information about the customer and its location. Use it to identify unique customers in the orders dataset and to find the orders delivery location.

At our system each order is assigned to a unique customerid. This means that the same customer will get different ids for different orders. The purpose of having a customerunique_id on the dataset is to allow you to identify customers that made repurchases at the store. Otherwise you would find that each order had a different customer associated with.

In [21]:
# customers.show(10)
customers.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: integer (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)



#### Order Items Dataset
This dataset includes data about the items purchased within each order.

In [22]:
# order_items.show(10)
order_items.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)



In [23]:
order_items.describe().show()

+-------+--------------------+------------------+--------------------+--------------------+------------------+------------------+
|summary|            order_id|     order_item_id|          product_id|           seller_id|             price|     freight_value|
+-------+--------------------+------------------+--------------------+--------------------+------------------+------------------+
|  count|              112650|            112650|              112650|              112650|            112650|            112650|
|   mean|                NULL|1.1978339991122948|                NULL|                NULL|120.65373901477311| 19.99031992898562|
| stddev|                NULL|0.7051240313951734|                NULL|                NULL| 183.6339280502597|15.806405412296998|
|    min|00010242fe8c5a6d1...|                 1|00066f42aeeb9f300...|0015a82c2db000af6...|              0.85|               0.0|
|    max|fffe41c64501cc87c...|                21|fffe9eeff12fcbd74...|ffff564a4f9085cd2...

#### Geolocation Dataset
This dataset has information Brazilian zip codes and its lat/lng coordinates. Use it to plot maps and find distances between sellers and customers.

In [24]:
# location.show(10)
location.printSchema()

root
 |-- geolocation_zip_code_prefix: integer (nullable = true)
 |-- geolocation_lat: double (nullable = true)
 |-- geolocation_lng: double (nullable = true)
 |-- geolocation_city: string (nullable = true)
 |-- geolocation_state: string (nullable = true)



In [25]:
# Drop duplicates based on the 'geolocation_zip_code_prefix' column
location = location.dropDuplicates(subset=['geolocation_zip_code_prefix'])

# Show the modified DataFrame
# location.show()

In [26]:
location.describe().show()

+-------+---------------------------+-------------------+------------------+----------------+-----------------+
|summary|geolocation_zip_code_prefix|    geolocation_lat|   geolocation_lng|geolocation_city|geolocation_state|
+-------+---------------------------+-------------------+------------------+----------------+-----------------+
|  count|                      19015|              19015|             19015|           19015|            19015|
|   mean|          42711.59190113068|-19.062087369373607|-46.05800843240691|            NULL|             NULL|
| stddev|         30905.051745446453| 7.3194019370710715| 5.380750749400437|            NULL|             NULL|
|    min|                       1001|  -36.6053744107061|-72.92729629685648| abadia de goias|               AC|
|    max|                      99990|  42.18400274298598|121.10539381057764|            óleo|               TO|
+-------+---------------------------+-------------------+------------------+----------------+-----------

#### Payments Dataset
This dataset includes data about the orders payment options.

In [27]:
# payments.show(10)
payments.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- payment_sequential: integer (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- payment_installments: integer (nullable = true)
 |-- payment_value: double (nullable = true)



In [28]:
payments.describe().show()

+-------+--------------------+------------------+------------+--------------------+------------------+
|summary|            order_id|payment_sequential|payment_type|payment_installments|     payment_value|
+-------+--------------------+------------------+------------+--------------------+------------------+
|  count|              103886|            103886|      103886|              103886|            103886|
|   mean|                NULL|1.0926785129853878|        NULL|   2.853348863176944|154.10038041698365|
| stddev|                NULL|0.7065837791949948|        NULL|   2.687050673856486|217.49406386472384|
|    min|00010242fe8c5a6d1...|                 1|      boleto|                   0|               0.0|
|    max|fffe41c64501cc87c...|                29|     voucher|                  24|          13664.08|
+-------+--------------------+------------------+------------+--------------------+------------------+



# combine dataset

## write data to csv

In [29]:
def write_df_as_csv_file(df, path,csvsavename, mode="overwrite", header=True, inferschema = True ):
    import shutil

    df = df.coalesce(1)  # join partitions to produce 1 csv file

    header = "true" if header else "false"
    inferschema = "true" if inferschema else "false"

    dfw = df.write.format("csv")\
                    .option("header", header)\
                    .mode(mode)\
                    .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")\
                    .option("inferSchema", inferschema)\
                    .option("multipleLine", "false")
    dfw.save(path + csvsavename)


    csv_filenames = [filename for filename in os.listdir(path + csvsavename) if filename.endswith(".csv")]

    os.chdir(owd)
    source_path = path + csvsavename + "/" + csv_filenames[0]
    destination_path = path +"/"+ csvsavename

    shutil.copyfile(source_path, destination_path)

    shutil.rmtree(path + csvsavename)


## master df for spark model

In [30]:
master_df = orders.join(order_items, on='order_id') \
        .join(reviews, on='order_id') \
        .join(customers, on='customer_id') \
        .join(products, on='product_id') 


In [31]:
master_df.show()

+--------------------+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+-------------+--------------------+-------------------+-----+-------------+--------------------+------------+--------------------+-----------------------+--------------------+------------------------+--------------------+--------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+---------------------+
|          product_id|         customer_id|            order_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|order_item_id|           seller_id|shipping_limit_date|price|freight_value|           review_id|review_score|review_creation_date|review_answer_timestamp|  customer_unique_id|customer

In [32]:
from pyspark.sql.functions import col

# Assuming your DataFrame is named 'df'
timestamp_columns = [col_name for col_name, col_type in master_df.dtypes if col_type == 'timestamp']

# Drop the timestamp columns from the DataFrame
master_df = master_df.drop(*timestamp_columns)

master_df.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)
 |-- review_id: string (nullable = true)
 |-- review_score: integer (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: integer (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)
 |-- product_name_lenght: integer (nullable = true)
 |-- product_description_lenght: integer (nullable = true)
 |-- product_photos_qty: integer (nullable = true)
 |-- product_weight_g: integer (nullable = true)
 |-- product_length_cm: integer (nullable = true)
 |-- product_height_cm: integer (nullable = true)
 |-- product_width_cm: integer (nullable = true)
 |-- product_

## customers df

In [33]:
customer_df = customers.join(orders, on='customer_id') \
        .join(reviews, on='order_id') \
        .join(order_items, on='order_id') \
        .join(products, on='product_id')

In [34]:
# drop order delivery details, this will go into the seller_info df
# keep 'order_id', 'customer_id', 'order_purchase_timestamp', 'order_delivered_customer_date', 'order_estimated_delivery_date'
columns_to_exclude_order = ['order_status', 'order_approved_at', 'order_delivered_carrier_date']

customer_df = customer_df.drop(*columns_to_exclude_order)


In [35]:
customer_df.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: integer (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)
 |-- review_id: string (nullable = true)
 |-- review_score: integer (nullable = true)
 |-- review_creation_date: timestamp (nullable = true)
 |-- review_answer_timestamp: timestamp (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)
 |-- product_name_lenght: integer (nullable = true)
 |-- produc

## sellers df

In [36]:
seller_df = order_items.join(sellers, on='seller_id') \
        .join(products, on='product_id') \
        .join(reviews, on='order_id') \
        

In [37]:
# Get common orders between the two DataFrames
common_orders = seller_df.select("order_id").intersect(customer_df.select("order_id"))

# Filter the DataFrames for common orders
seller_df = seller_df.join(common_orders, on="order_id", how="inner")
customer_df = customer_df.join(common_orders, on="order_id", how="inner")

In [38]:
customer_df = customer_df.dropna()
seller_df = seller_df.dropna()

# check_any_missing_value(customer_df)
# check_any_missing_value(seller_df)

In [39]:
savepath = owd + "/clean_data"

write_df_as_csv_file(master_df, savepath, "spark_master.csv")
# write_df_as_csv_file(customer_df, savepath, "customers.csv")
# write_df_as_csv_file(seller_df, savepath, "sellers.csv")