In [1]:
# Intialization
import os
import sys

os.environ["SPARK_HOME"] = "/home/talentum/spark"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
# In below two lines, use /usr/bin/python2.7 if you want to use Python 2
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3.6" 
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.7-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

# NOTE: Whichever package you want mention here.
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0 pyspark-shell' 
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-avro_2.11:2.4.0 pyspark-shell'
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.3 pyspark-shell'
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.0 pyspark-shell'

In [2]:
#Entrypoint 2.x
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Spark SQL basic example").enableHiveSupport().getOrCreate()

# On yarn:
# spark = SparkSession.builder.appName("Spark SQL basic example").enableHiveSupport().master("yarn").getOrCreate()
# specify .master("yarn")

sc = spark.sparkContext

In [3]:
from pyspark.sql import functions as F

# Reading raw csv files from hdfs

In [4]:
dir_path = 'hdfs:///user/talentum/olist_dataset/'
customers_df = spark.read.csv(dir_path+'olist_customers_dataset.csv',inferSchema=True,header=True)
order_items_df = spark.read.csv(dir_path+'olist_order_items_dataset.csv',inferSchema=True,header=True)
order_payments_df = spark.read.csv(dir_path+'olist_order_payments_dataset.csv',inferSchema=True,header=True)
order_reviews = spark.read.csv(dir_path+'olist_order_reviews_dataset.csv',inferSchema=True,header=True)
orders_df = spark.read.csv(dir_path+'olist_orders_dataset.csv',inferSchema=True,header=True)
products_df = spark.read.csv(dir_path+'olist_products_dataset.csv',inferSchema=True,header=True)
sellers_df = spark.read.csv(dir_path+'olist_sellers_dataset.csv',inferSchema=True,header=True)
product_category_name_translation_df = spark.read.csv(dir_path+'product_category_name_translation.csv',inferSchema=True,header=True)

# Data Exploration and Wrangling

In [5]:
# defining method for counting nulls in each column
def count_nulls(df):
    null_counts = []
    for col in df.columns:
        null_counts.append((col, df.filter(df[col].isNull()).count()))
    return spark.createDataFrame(null_counts,["column","null_count"])

In [6]:
#checking nulls in customers_df 
count_nulls(customers_df).show()

+--------------------+----------+
|              column|null_count|
+--------------------+----------+
|         customer_id|         0|
|  customer_unique_id|         0|
|customer_zip_code...|         0|
|       customer_city|         0|
|      customer_state|         0|
+--------------------+----------+



In [7]:
customers_df.dtypes

[('customer_id', 'string'),
 ('customer_unique_id', 'string'),
 ('customer_zip_code_prefix', 'int'),
 ('customer_city', 'string'),
 ('customer_state', 'string')]

In [8]:
orders_df.dtypes

[('order_id', 'string'),
 ('customer_id', 'string'),
 ('order_status', 'string'),
 ('order_purchase_timestamp', 'timestamp'),
 ('order_approved_at', 'timestamp'),
 ('order_delivered_carrier_date', 'timestamp'),
 ('order_delivered_customer_date', 'timestamp'),
 ('order_estimated_delivery_date', 'timestamp')]

In [9]:
#checking nulls in orders_df 
count_nulls(orders_df).show()

+--------------------+----------+
|              column|null_count|
+--------------------+----------+
|            order_id|         0|
|         customer_id|         0|
|        order_status|         0|
|order_purchase_ti...|         0|
|   order_approved_at|       160|
|order_delivered_c...|      1783|
|order_delivered_c...|      2965|
|order_estimated_d...|         0|
+--------------------+----------+



In [10]:
#Filtering out records where order is not delivered
orders_df=orders_df[orders_df['order_status']=='delivered']

In [11]:
count_nulls(orders_df).show()

+--------------------+----------+
|              column|null_count|
+--------------------+----------+
|            order_id|         0|
|         customer_id|         0|
|        order_status|         0|
|order_purchase_ti...|         0|
|   order_approved_at|        14|
|order_delivered_c...|         2|
|order_delivered_c...|         8|
|order_estimated_d...|         0|
+--------------------+----------+



In [12]:
#filling null values of order_approved_at column with order_purchase_timestamp
orders_df = orders_df.withColumn('order_delivered_customer_date',
                                 F.coalesce(F.col('order_delivered_customer_date'),
                                            F.col('order_estimated_delivery_date')))

In [13]:
count_nulls(orders_df).show()

+--------------------+----------+
|              column|null_count|
+--------------------+----------+
|            order_id|         0|
|         customer_id|         0|
|        order_status|         0|
|order_purchase_ti...|         0|
|   order_approved_at|        14|
|order_delivered_c...|         2|
|order_delivered_c...|         0|
|order_estimated_d...|         0|
+--------------------+----------+



In [14]:
#droping order_delivered_carrier_date column as it is not relevant also its missing values couldn't be replaced 
orders_df = orders_df.drop('order_delivered_carrier_date')
orders_df = orders_df.drop('order_approved_at')

In [15]:
count_nulls(orders_df).show()

+--------------------+----------+
|              column|null_count|
+--------------------+----------+
|            order_id|         0|
|         customer_id|         0|
|        order_status|         0|
|order_purchase_ti...|         0|
|order_delivered_c...|         0|
|order_estimated_d...|         0|
+--------------------+----------+



In [16]:
count_nulls(products_df).show()

+--------------------+----------+
|              column|null_count|
+--------------------+----------+
|          product_id|         0|
|product_category_...|       610|
| product_name_lenght|       610|
|product_descripti...|       610|
|  product_photos_qty|       610|
|    product_weight_g|         2|
|   product_length_cm|         2|
|   product_height_cm|         2|
|    product_width_cm|         2|
+--------------------+----------+



In [17]:
products_df.dtypes

[('product_id', 'string'),
 ('product_category_name', 'string'),
 ('product_name_lenght', 'int'),
 ('product_description_lenght', 'int'),
 ('product_photos_qty', 'int'),
 ('product_weight_g', 'int'),
 ('product_length_cm', 'int'),
 ('product_height_cm', 'int'),
 ('product_width_cm', 'int')]

In [18]:
#Filling "unknown" as category for records where product category is not specified
products_df = products_df.fillna("unknown",subset='product_category_name')

In [19]:
count_nulls(products_df).show()

+--------------------+----------+
|              column|null_count|
+--------------------+----------+
|          product_id|         0|
|product_category_...|         0|
| product_name_lenght|       610|
|product_descripti...|       610|
|  product_photos_qty|       610|
|    product_weight_g|         2|
|   product_length_cm|         2|
|   product_height_cm|         2|
|    product_width_cm|         2|
+--------------------+----------+



In [20]:
# Dropping irrelevant columns
products_df = products_df.drop('product_name_lenght')
products_df = products_df.drop('product_description_lenght')
products_df = products_df.drop('product_photos_qty')
products_df = products_df.drop('product_weight_g')
products_df = products_df.drop('product_length_cm')
products_df = products_df.drop('product_height_cm')
products_df = products_df.drop('product_width_cm')

In [21]:
products_df.columns

['product_id', 'product_category_name']

In [22]:
count_nulls(products_df).show()

+--------------------+----------+
|              column|null_count|
+--------------------+----------+
|          product_id|         0|
|product_category_...|         0|
+--------------------+----------+



In [23]:
order_payments_df.dtypes

[('order_id', 'string'),
 ('payment_sequential', 'int'),
 ('payment_type', 'string'),
 ('payment_installments', 'int'),
 ('payment_value', 'double')]

In [24]:
count_nulls(order_payments_df).show()

+--------------------+----------+
|              column|null_count|
+--------------------+----------+
|            order_id|         0|
|  payment_sequential|         0|
|        payment_type|         0|
|payment_installments|         0|
|       payment_value|         0|
+--------------------+----------+



In [25]:
order_items_df.dtypes

[('order_id', 'string'),
 ('order_item_id', 'int'),
 ('product_id', 'string'),
 ('seller_id', 'string'),
 ('shipping_limit_date', 'timestamp'),
 ('price', 'double'),
 ('freight_value', 'double')]

In [26]:
count_nulls(order_items_df).show()

+-------------------+----------+
|             column|null_count|
+-------------------+----------+
|           order_id|         0|
|      order_item_id|         0|
|         product_id|         0|
|          seller_id|         0|
|shipping_limit_date|         0|
|              price|         0|
|      freight_value|         0|
+-------------------+----------+



In [27]:
order_reviews.dtypes

[('review_id', 'string'),
 ('order_id', 'string'),
 ('review_score', 'string'),
 ('review_comment_title', 'string'),
 ('review_comment_message', 'string'),
 ('review_creation_date', 'string'),
 ('review_answer_timestamp', 'string')]

In [28]:
count_nulls(order_reviews).show()

+--------------------+----------+
|              column|null_count|
+--------------------+----------+
|           review_id|         1|
|            order_id|      2236|
|        review_score|      2380|
|review_comment_title|     92157|
|review_comment_me...|     63079|
|review_creation_date|      8764|
|review_answer_tim...|      8785|
+--------------------+----------+



In [29]:
# Dropping rown containing nulls in columns ('order_id','review_id','review_score','review_creation_date','review_answer_timestamp')
order_reviews = order_reviews.dropna(how='any',subset=['order_id','review_id','review_score','review_creation_date',
                                                       'review_answer_timestamp'])

In [30]:
count_nulls(order_reviews).show()

+--------------------+----------+
|              column|null_count|
+--------------------+----------+
|           review_id|         0|
|            order_id|         0|
|        review_score|         0|
|review_comment_title|     84730|
|review_comment_me...|     58247|
|review_creation_date|         0|
|review_answer_tim...|         0|
+--------------------+----------+



In [31]:
#Dropping irrelevant columns
order_reviews = order_reviews.drop('review_comment_title')
order_reviews = order_reviews.drop('review_comment_message')

In [32]:
count_nulls(order_reviews).show()

+--------------------+----------+
|              column|null_count|
+--------------------+----------+
|           review_id|         0|
|            order_id|         0|
|        review_score|         0|
|review_creation_date|         0|
|review_answer_tim...|         0|
+--------------------+----------+



In [33]:
sellers_df.dtypes

[('seller_id', 'string'),
 ('seller_zip_code_prefix', 'int'),
 ('seller_city', 'string'),
 ('seller_state', 'string')]

In [34]:
#Dropping irrelevant columns
sellers_df = sellers_df.drop('seller_zip_code_prefix')

In [35]:
count_nulls(sellers_df).show()

+------------+----------+
|      column|null_count|
+------------+----------+
|   seller_id|         0|
| seller_city|         0|
|seller_state|         0|
+------------+----------+



In [36]:
product_category_name_translation_df.dtypes

[('product_category_name', 'string'),
 ('product_category_name_english', 'string')]

In [37]:
count_nulls(product_category_name_translation_df).show()

+--------------------+----------+
|              column|null_count|
+--------------------+----------+
|product_category_...|         0|
|product_category_...|         0|
+--------------------+----------+



In [38]:
#Joining all datasets into a single dataset (post-processing)
joined_df = orders_df.join(order_items_df,on='order_id',how='inner')\
.join(order_payments_df,on='order_id',how='inner')\
.join(customers_df,on='customer_id',how='inner')\
.join(order_reviews,on='order_id',how='inner')\
.join(products_df,on='product_id',how='inner')\
.join(sellers_df,on='seller_id',how='inner')\
.join(product_category_name_translation_df,on='product_category_name',how='inner')


In [39]:
joined_df.columns

['product_category_name',
 'seller_id',
 'product_id',
 'order_id',
 'customer_id',
 'order_status',
 'order_purchase_timestamp',
 'order_delivered_customer_date',
 'order_estimated_delivery_date',
 'order_item_id',
 'shipping_limit_date',
 'price',
 'freight_value',
 'payment_sequential',
 'payment_type',
 'payment_installments',
 'payment_value',
 'customer_unique_id',
 'customer_zip_code_prefix',
 'customer_city',
 'customer_state',
 'review_id',
 'review_score',
 'review_creation_date',
 'review_answer_timestamp',
 'seller_city',
 'seller_state',
 'product_category_name_english']

In [40]:
# Saving the processed and joined data on HDFS as a CSV file
joined_df.write.csv("hdfs:///user/talentum/processed_combined_data.csv")

In [41]:
# Saving processed data to hive table using PartitionBy by 'customer_state'
joined_df.write.format("orc").mode("overwrite").partitionBy('customer_state')\
.option("path", "hdfs:///user/hive/warehouse/")\
.saveAsTable("ecomm.olist_data_table")