# đọc dữ liệu

In [1]:
import os
import glob

In [2]:
from pyspark.sql import SparkSession

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("CSV Reader") \
    .getOrCreate()


In [3]:
def read_in_data(df_names, directory = './'):
    '''Read in data into seperate dataframes'''
    
    # change directory to data storage location
    os.chdir(directory)
    
    # list of filenames
    extension = 'csv'
    filenames = [i for i in glob.glob('*.{}'.format(extension))]
    print(filenames)
    # create global dfs within function
    for name, file in zip(df_names, filenames):
        globals()[name] = spark.read \
                            .format("csv") \
                            .option("header", "true") \
                            .option("inferSchema", "true") \
                            .option("multiline", "true") \
                            .load(directory + "/" + file)

In [4]:
df_names = ['customers', 'location', 'orders', 'order_items', 'payments', \
                'reviews', 'products', 'sellers', 'translation']

directory = 'D:/UTECollege/program/HK7/bdml/project/olist_recommend_sys/olist_data'

read_in_data(df_names, directory)

['olist_customers_dataset.csv', 'olist_geolocation_dataset.csv', 'olist_orders_dataset.csv', 'olist_order_items_dataset.csv', 'olist_order_payments_dataset.csv', 'olist_order_reviews_dataset.csv', 'olist_products_dataset.csv', 'olist_sellers_dataset.csv', 'product_category_name_translation.csv']


In [5]:
reviews.show()

+--------------------+--------------------+------------+--------------------+----------------------+--------------------+-----------------------+
|           review_id|            order_id|review_score|review_comment_title|review_comment_message|review_creation_date|review_answer_timestamp|
+--------------------+--------------------+------------+--------------------+----------------------+--------------------+-----------------------+
|7bc2406110b926393...|73fc7af87114b3971...|           4|                NULL|                  NULL| 2018-01-18 00:00:00|    2018-01-18 21:46:59|
|80e641a11e56f04c1...|a548910a1c6147796...|           5|                NULL|                  NULL| 2018-03-10 00:00:00|    2018-03-11 03:05:13|
|228ce5500dc1d8e02...|f9e4b658b201a9f2e...|           5|                NULL|                  NULL| 2018-02-17 00:00:00|    2018-02-18 14:36:24|
|e64fb393e7b32834b...|658677c97b385a9be...|           5|                NULL|  Recebi bem antes ...| 2017-04-21 00:00:00|   

In [6]:
reviews.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- review_score: string (nullable = true)
 |-- review_comment_title: string (nullable = true)
 |-- review_comment_message: string (nullable = true)
 |-- review_creation_date: string (nullable = true)
 |-- review_answer_timestamp: string (nullable = true)



#### Order Reviews Dataset
This dataset includes data about the reviews made by the customers.

After a customer purchases the product from Olist Store a seller gets notified to fulfill that order. Once the customer receives the product, or the estimated delivery date is due, the customer gets a satisfaction survey by email where he can give a note for the purchase experience and write down some comments.

In [7]:
def replace_na_with_text(df, column, new_text):
    '''replace missing text with preferred description'''
    
    # change missing data to appropriate label
    df = df.fillna({column: new_text})
    # return df because in function scope, it cant affect the original df
    return df


In [8]:
reviews = replace_na_with_text(df=reviews, column='review_comment_message', new_text='no comment given')
reviews = replace_na_with_text(df=reviews, column='review_comment_title', new_text='no title')
# reviews.show()

In [9]:
# check any missing value
from pyspark.sql.functions import isnan, when, count, col

reviews.select([count(when(isnan(c), c)).alias(c) for c in reviews.columns]).show()

+---------+--------+------------+--------------------+----------------------+--------------------+-----------------------+
|review_id|order_id|review_score|review_comment_title|review_comment_message|review_creation_date|review_answer_timestamp|
+---------+--------+------------+--------------------+----------------------+--------------------+-----------------------+
|        0|       0|           0|                   0|                     0|                   0|                      0|
+---------+--------+------------+--------------------+----------------------+--------------------+-----------------------+



In [10]:
from pyspark.sql.functions import to_timestamp
from pyspark.sql.types import IntegerType

def string_to_timestamp(df, columns):
    
    for col in columns:
        df = df.withColumn(col, to_timestamp(df[col], 'yyyy-MM-dd HH:mm:ss'))

    return df


In [11]:
columns = ['review_creation_date', 'review_answer_timestamp']
reviews = string_to_timestamp(reviews, columns)
reviews.show()

+--------------------+--------------------+------------+--------------------+----------------------+--------------------+-----------------------+
|           review_id|            order_id|review_score|review_comment_title|review_comment_message|review_creation_date|review_answer_timestamp|
+--------------------+--------------------+------------+--------------------+----------------------+--------------------+-----------------------+
|7bc2406110b926393...|73fc7af87114b3971...|           4|            no title|      no comment given| 2018-01-18 00:00:00|    2018-01-18 21:46:59|
|80e641a11e56f04c1...|a548910a1c6147796...|           5|            no title|      no comment given| 2018-03-10 00:00:00|    2018-03-11 03:05:13|
|228ce5500dc1d8e02...|f9e4b658b201a9f2e...|           5|            no title|      no comment given| 2018-02-17 00:00:00|    2018-02-18 14:36:24|
|e64fb393e7b32834b...|658677c97b385a9be...|           5|            no title|  Recebi bem antes ...| 2017-04-21 00:00:00|   

In [12]:
# from pyspark.sql.functions import to_timestamp
# from pyspark.sql.types import IntegerType

# df = df.withColumn('review_score', df['review_score'].cast(IntegerType()))

# # Assuming you have a DataFrame called df with a column called 'date_str' containing the string dates
# df = df.withColumn('review_creation_date', to_timestamp(df.review_creation_date, 'yyyy-MM-dd HH:mm:ss'))
# df = df.withColumn('review_answer_timestamp', to_timestamp(df.review_answer_timestamp, 'yyyy-MM-dd HH:mm:ss'))

# df.show(1)
# df.printSchema()

#### Order Dataset
This is the core dataset. From each order you might find all other information.

In [13]:
from pyspark.sql.functions import col, monotonically_increasing_id

# Drop rows containing NaN values
orders = orders.dropna()

# Drop canceled orders
orders = orders.filter(col("order_status") != "canceled")

# Reset the DataFrame index
orders = orders.withColumn("index", monotonically_increasing_id())
orders = orders.drop("index")

# Show the modified DataFrame
orders.show()

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|e481f51cbdc54678b...|9ef432eb625129730...|   delivered|     2017-10-02 10:56:33|2017-10-02 11:07:15|         2017-10-04 19:55:00|          2017-10-10 21:25:13|          2017-10-18 00:00:00|
|53cdb2fc8bc7dce0b...|b0830fb4747a6c6d2...|   delivered|     2018-07-24 20:41:37|2018-07-26 03:24:27|         2018-07-26 14:31:00|          2018-08-07 15:27:45|          2018-08-13 00:00:00|
|47770eb9100c2d0c4...|41ce2a54c0b03bf34...|  

In [14]:
orders.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)



#### Products Dataset
This dataset includes data about the products sold by Olist.


In [15]:
translation.show()

+---------------------+-----------------------------+
|product_category_name|product_category_name_english|
+---------------------+-----------------------------+
|         beleza_saude|                health_beauty|
| informatica_acess...|         computers_accesso...|
|           automotivo|                         auto|
|      cama_mesa_banho|               bed_bath_table|
|     moveis_decoracao|              furniture_decor|
|        esporte_lazer|               sports_leisure|
|           perfumaria|                    perfumery|
| utilidades_domest...|                   housewares|
|            telefonia|                    telephony|
|   relogios_presentes|                watches_gifts|
|    alimentos_bebidas|                   food_drink|
|                bebes|                         baby|
|            papelaria|                   stationery|
| tablets_impressao...|         tablets_printing_...|
|           brinquedos|                         toys|
|       telefonia_fixa|     

In [16]:
products.show()

+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|          product_id|product_category_name|product_name_lenght|product_description_lenght|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|
+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|1e9e8ef04dbcff454...|           perfumaria|                 40|                       287|                 1|             225|               16|               10|              14|
|3aa071139cb16b67c...|                artes|                 44|                       276|                 1|            1000|               30|               18|              20|
|96bd76ec8810374ed...|        esporte_lazer|                 46|                       250|    

In [17]:
from pyspark.sql.functions import col, monotonically_increasing_id
# Drop rows with null values
products = products.dropna()

# Reset the DataFrame index
products = products.withColumn("index", monotonically_increasing_id())
products = products.drop("index")

# Show the modified DataFrame
products.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_category_name: string (nullable = true)
 |-- product_name_lenght: integer (nullable = true)
 |-- product_description_lenght: integer (nullable = true)
 |-- product_photos_qty: integer (nullable = true)
 |-- product_weight_g: integer (nullable = true)
 |-- product_length_cm: integer (nullable = true)
 |-- product_height_cm: integer (nullable = true)
 |-- product_width_cm: integer (nullable = true)



In [18]:
# Perform the join operation
joined_df = products.join(translation, on='product_category_name', how='left')

# Select the translated column and drop the original column

translated_df = joined_df.drop("product_category_name").withColumnRenamed(
    'product_category_name_english', 'product_category_name')

# Show the modified DataFrame
# translated_df.show()
translated_df.show()
products = translated_df

+--------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+---------------------+
|          product_id|product_name_lenght|product_description_lenght|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|product_category_name|
+--------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+---------------------+
|1e9e8ef04dbcff454...|                 40|                       287|                 1|             225|               16|               10|              14|            perfumery|
|3aa071139cb16b67c...|                 44|                       276|                 1|            1000|               30|               18|              20|                  art|
|96bd76ec8810374ed...|                 46|                       250|                 1|       

In [19]:
# products.describe(['product_id','product_category_name']).show()
# products.groupBy("product_category_name").count().show()
products.select('product_category_name').distinct().count()


72

#### Sellers Dataset
This dataset includes data about the sellers that fulfilled orders made at Olist. Use it to find the seller location and to identify which seller fulfilled each product.

In [20]:
sellers.show(10)
sellers.printSchema()

+--------------------+----------------------+-----------------+------------+
|           seller_id|seller_zip_code_prefix|      seller_city|seller_state|
+--------------------+----------------------+-----------------+------------+
|3442f8959a84dea7e...|                 13023|         campinas|          SP|
|d1b65fc7debc3361e...|                 13844|       mogi guacu|          SP|
|ce3ad9de960102d06...|                 20031|   rio de janeiro|          RJ|
|c0f3eea2e14555b6f...|                  4195|        sao paulo|          SP|
|51a04a8a6bdcb23de...|                 12914|braganca paulista|          SP|
|c240c4061717ac180...|                 20920|   rio de janeiro|          RJ|
|e49c26c3edfa46d22...|                 55325|           brejao|          PE|
|1b938a7ec6ac5061a...|                 16304|        penapolis|          SP|
|768a86e36ad6aae3d...|                  1529|        sao paulo|          SP|
|ccc4bbb5f32a6ab2b...|                 80310|         curitiba|          PR|

In [21]:
sellers.describe().show()

+-------+--------------------+----------------------+-----------+------------+
|summary|           seller_id|seller_zip_code_prefix|seller_city|seller_state|
+-------+--------------------+----------------------+-----------+------------+
|  count|                3095|                  3095|       3095|        3095|
|   mean|                NULL|    32291.059450726978|  4482255.0|        NULL|
| stddev|                NULL|     32713.45382950901|       NULL|        NULL|
|    min|0015a82c2db000af6...|                  1001|   04482255|          AC|
|    max|ffff564a4f9085cd2...|                 99730|      xaxim|          SP|
+-------+--------------------+----------------------+-----------+------------+



#### Customers Dataset
This dataset has information about the customer and its location. Use it to identify unique customers in the orders dataset and to find the orders delivery location.

At our system each order is assigned to a unique customerid. This means that the same customer will get different ids for different orders. The purpose of having a customerunique_id on the dataset is to allow you to identify customers that made repurchases at the store. Otherwise you would find that each order had a different customer associated with.

In [22]:
customers.show(10)
customers.printSchema()

+--------------------+--------------------+------------------------+--------------------+--------------+
|         customer_id|  customer_unique_id|customer_zip_code_prefix|       customer_city|customer_state|
+--------------------+--------------------+------------------------+--------------------+--------------+
|06b8999e2fba1a1fb...|861eff4711a542e4b...|                   14409|              franca|            SP|
|18955e83d337fd6b2...|290c77bc529b7ac93...|                    9790|sao bernardo do c...|            SP|
|4e7b3e00288586ebd...|060e732b5b29e8181...|                    1151|           sao paulo|            SP|
|b2b6027bc5c5109e5...|259dac757896d24d7...|                    8775|     mogi das cruzes|            SP|
|4f2d8ab171c80ec83...|345ecd01c38d18a90...|                   13056|            campinas|            SP|
|879864dab9bc30475...|4c93744516667ad3b...|                   89254|      jaragua do sul|            SC|
|fd826e7cf63160e53...|addec96d2e059c80c...|            

In [23]:
customers.describe().show()

+-------+--------------------+--------------------+------------------------+-------------------+--------------+
|summary|         customer_id|  customer_unique_id|customer_zip_code_prefix|      customer_city|customer_state|
+-------+--------------------+--------------------+------------------------+-------------------+--------------+
|  count|               99441|               99441|                   99441|              99441|         99441|
|   mean|                NULL|                NULL|       35137.47458291851|               NULL|          NULL|
| stddev|                NULL|                NULL|       29797.93899620612|               NULL|          NULL|
|    min|00012a2ce6f8dcda2...|0000366f3b9a7992b...|                    1003|abadia dos dourados|            AC|
|    max|ffffe8b65bbe3087b...|ffffd2657e2aad290...|                   99990|             zortea|            TO|
+-------+--------------------+--------------------+------------------------+-------------------+--------

#### Order Items Dataset
This dataset includes data about the items purchased within each order.

In [24]:
order_items.show(10)
order_items.printSchema()

+--------------------+-------------+--------------------+--------------------+-------------------+------+-------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_date| price|freight_value|
+--------------------+-------------+--------------------+--------------------+-------------------+------+-------------+
|00010242fe8c5a6d1...|            1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19 09:45:35|  58.9|        13.29|
|00018f77f2f0320c5...|            1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|2017-05-03 11:05:13| 239.9|        19.93|
|000229ec398224ef6...|            1|c777355d18b72b67a...|5b51032eddd242adc...|2018-01-18 14:48:30| 199.0|        17.87|
|00024acbcdf0a6daa...|            1|7634da152a4610f15...|9d7a1d34a50524090...|2018-08-15 10:10:18| 12.99|        12.79|
|00042b26cf59d7ce6...|            1|ac6c3623068f30de0...|df560393f3a51e745...|2017-02-13 13:57:51| 199.9|        18.14|
|00048cc3ae777c65d...|            1|ef92

In [25]:
order_items.describe().show()

+-------+--------------------+------------------+--------------------+--------------------+------------------+------------------+
|summary|            order_id|     order_item_id|          product_id|           seller_id|             price|     freight_value|
+-------+--------------------+------------------+--------------------+--------------------+------------------+------------------+
|  count|              112650|            112650|              112650|              112650|            112650|            112650|
|   mean|                NULL|1.1978339991122948|                NULL|                NULL|120.65373901477311| 19.99031992898562|
| stddev|                NULL|0.7051240313951734|                NULL|                NULL| 183.6339280502597|15.806405412296998|
|    min|00010242fe8c5a6d1...|                 1|00066f42aeeb9f300...|0015a82c2db000af6...|              0.85|               0.0|
|    max|fffe41c64501cc87c...|                21|fffe9eeff12fcbd74...|ffff564a4f9085cd2...

#### Geolocation Dataset
This dataset has information Brazilian zip codes and its lat/lng coordinates. Use it to plot maps and find distances between sellers and customers.

In [26]:
location.show(10)
location.printSchema()

+---------------------------+-------------------+------------------+----------------+-----------------+
|geolocation_zip_code_prefix|    geolocation_lat|   geolocation_lng|geolocation_city|geolocation_state|
+---------------------------+-------------------+------------------+----------------+-----------------+
|                       1037| -23.54562128115268|-46.63929204800168|       sao paulo|               SP|
|                       1046|-23.546081127035535|-46.64482029837157|       sao paulo|               SP|
|                       1046| -23.54612896641469|-46.64295148361138|       sao paulo|               SP|
|                       1041|  -23.5443921648681|-46.63949930627844|       sao paulo|               SP|
|                       1035|-23.541577961711493|-46.64160722329613|       sao paulo|               SP|
|                       1012|-23.547762303364266|-46.63536053788448|       são paulo|               SP|
|                       1047|-23.546273112412678|-46.64122516971

In [27]:
location.describe().show()

+-------+---------------------------+-------------------+-------------------+----------------+-----------------+
|summary|geolocation_zip_code_prefix|    geolocation_lat|    geolocation_lng|geolocation_city|geolocation_state|
+-------+---------------------------+-------------------+-------------------+----------------+-----------------+
|  count|                    1000163|            1000163|            1000163|         1000163|          1000163|
|   mean|          36574.16646586607|-21.176152910383102|-46.390541320935995|            NULL|             NULL|
| stddev|          30549.33571031949|  5.715866308823084|  4.269748306619793|            NULL|             NULL|
|    min|                       1001|  -36.6053744107061|-101.46676644931476|        * cidade|               AC|
|    max|                      99990|  45.06593318269697| 121.10539381057764|            óleo|               TO|
+-------+---------------------------+-------------------+-------------------+----------------+--

In [28]:
# Drop duplicates based on the 'geolocation_zip_code_prefix' column
location = location.dropDuplicates(subset=['geolocation_zip_code_prefix'])

# Show the modified DataFrame
location.show()

+---------------------------+-------------------+-------------------+----------------+-----------------+
|geolocation_zip_code_prefix|    geolocation_lat|    geolocation_lng|geolocation_city|geolocation_state|
+---------------------------+-------------------+-------------------+----------------+-----------------+
|                       1001| -23.54929199999999|-46.633559478233785|       sao paulo|               SP|
|                       1002| -23.54831797807146| -46.63542110199666|       sao paulo|               SP|
|                       1003| -23.54903244546711| -46.63531311226845|       sao paulo|               SP|
|                       1004|-23.550115903139222| -46.63512161420169|       sao paulo|               SP|
|                       1005|-23.549819091869107| -46.63560588995324|       sao paulo|               SP|
|                       1006| -23.55052430835593| -46.63669363835193|       sao paulo|               SP|
|                       1007|-23.550392524842728|-46.63

#### Payments Dataset
This dataset includes data about the orders payment options.

In [29]:
payments.show(10)
payments.printSchema()

+--------------------+------------------+------------+--------------------+-------------+
|            order_id|payment_sequential|payment_type|payment_installments|payment_value|
+--------------------+------------------+------------+--------------------+-------------+
|b81ef226f3fe1789b...|                 1| credit_card|                   8|        99.33|
|a9810da82917af2d9...|                 1| credit_card|                   1|        24.39|
|25e8ea4e93396b6fa...|                 1| credit_card|                   1|        65.71|
|ba78997921bbcdc13...|                 1| credit_card|                   8|       107.78|
|42fdf880ba16b47b5...|                 1| credit_card|                   2|       128.45|
|298fcdf1f73eb413e...|                 1| credit_card|                   2|        96.12|
|771ee386b001f0620...|                 1| credit_card|                   1|        81.16|
|3d7239c394a212faa...|                 1| credit_card|                   3|        51.84|
|1f78449c8

In [30]:
payments.describe().show()

+-------+--------------------+------------------+------------+--------------------+------------------+
|summary|            order_id|payment_sequential|payment_type|payment_installments|     payment_value|
+-------+--------------------+------------------+------------+--------------------+------------------+
|  count|              103886|            103886|      103886|              103886|            103886|
|   mean|                NULL|1.0926785129853878|        NULL|   2.853348863176944|154.10038041698365|
| stddev|                NULL|0.7065837791949948|        NULL|   2.687050673856486|217.49406386472384|
|    min|00010242fe8c5a6d1...|                 1|      boleto|                   0|               0.0|
|    max|fffe41c64501cc87c...|                29|     voucher|                  24|          13664.08|
+-------+--------------------+------------------+------------+--------------------+------------------+



In [43]:
master_df = orders.join(order_items, on='order_id') \
        .join(reviews, on='order_id') \
        .join(customers, on='customer_id') \
        .join(products, on='product_id') \



In [47]:
master_df.repartition(1).write.csv("D:/UTECollege/program/HK7/bdml/project/olist_recommend_sys/clean_data/master.csv")

Py4JJavaError: An error occurred while calling o270.csv.
: java.lang.UnsatisfiedLinkError: 'boolean org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(java.lang.String, int)'
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
	at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1249)
	at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1454)
	at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:334)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:404)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:377)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:192)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$writeAndCommit$3(FileFormatWriter.scala:275)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:552)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:275)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:304)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:190)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:374)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:402)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:374)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:240)
	at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:850)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1583)


In [44]:
master_df.show()

+--------------------+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+-------------+--------------------+-------------------+-----+-------------+--------------------+------------+--------------------+----------------------+--------------------+-----------------------+--------------------+------------------------+--------------------+--------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+---------------------+
|          product_id|         customer_id|            order_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|order_item_id|           seller_id|shipping_limit_date|price|freight_value|           review_id|review_score|review_comment_title|review_co