In [5]:
import pyspark
from pyspark.sql import SparkSession

In [6]:
# Setting up spark
import findspark
findspark.init()
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import *
from pyspark.sql.functions import *
conf = SparkConf().setMaster("local").setAppName("Missed_Deadlines")
spark = SparkSession.builder.getOrCreate()
print(spark)

# Set sqlContext from the Spark context
from pyspark.sql import SQLContext
sqlContext = SQLContext(spark)

# Edit Spark SQL context for ease of use with Pandas
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

# Build a spark session
spark = SparkSession.builder.getOrCreate()

<pyspark.sql.session.SparkSession object at 0x7fa234d356c0>


24/02/27 14:58:13 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.


In [7]:
df_customers = spark.read.csv('csv/olist_customers_dataset.csv',header=True)
# df_geolocation = spark.read.csv('olist_geolocation_dataset.csv',header=True)
df_order_items = spark.read.csv('csv/olist_order_items_dataset.csv',header=True)
df_order_payments = spark.read.csv('csv/olist_order_payments_dataset.csv',header=True)
df_order_reviews = spark.read.csv('csv/olist_order_reviews_dataset.csv',header=True)
df_orders = spark.read.csv('csv/olist_orders_dataset.csv',header=True)
df_products = spark.read.csv('csv/olist_products_dataset.csv',header=True)
df_sellers = spark.read.csv('csv/olist_sellers_dataset.csv',header=True)
df_product_translation = spark.read.csv('csv/product_category_name_translation.csv',header=True)

In [24]:
df_customers.describe()

DataFrame[summary: string, customer_id: string, customer_unique_id: string, customer_zip_code_prefix: string, customer_city: string, customer_state: string]

In [9]:
df_orders.columns

['order_id',
 'customer_id',
 'order_status',
 'order_purchase_timestamp',
 'order_approved_at',
 'order_delivered_carrier_date',
 'order_delivered_customer_date',
 'order_estimated_delivery_date']

In [25]:
df_order_items.describe

<bound method DataFrame.describe of DataFrame[order_id: string, order_item_id: string, product_id: string, seller_id: string, shipping_limit_date: string, price: string, freight_value: string]>

In [11]:
df_order_payments.columns

['order_id',
 'payment_sequential',
 'payment_type',
 'payment_installments',
 'payment_value']

In [12]:
df_order_reviews.columns

['review_id',
 'order_id',
 'review_score',
 'review_comment_title',
 'review_comment_message',
 'review_creation_date',
 'review_answer_timestamp']

In [13]:
df_products.columns
df_products.count()

32951

In [14]:
df_sellers.columns

['seller_id', 'seller_zip_code_prefix', 'seller_city', 'seller_state']

In [15]:
df_product_translation.columns
df_product_translation= df_product_translation.withColumnRenamed('product_category_name','product_name')

In [16]:

from pyspark.sql.functions import col,isnan, when, count

In [17]:
df_products =df_products.dropna()

In [18]:
df_product_translation.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_product_translation.columns]).show()

+------------+-----------------------------+
|product_name|product_category_name_english|
+------------+-----------------------------+
|           0|                            0|
+------------+-----------------------------+



In [19]:
# Translate name of product on df_products
df_products_translated = df_products.join(df_product_translation, df_products.product_category_name == df_product_translation.product_name).drop('product_category_name','product_name')
df_products_translated = df_products_translated.withColumnRenamed('product_category_name_english','product_category_name').select(df_products.columns)
df_products_translated.show()

+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|          product_id|product_category_name|product_name_lenght|product_description_lenght|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|
+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|1e9e8ef04dbcff454...|            perfumery|                 40|                       287|                 1|             225|               16|               10|              14|
|3aa071139cb16b67c...|                  art|                 44|                       276|                 1|            1000|               30|               18|              20|
|96bd76ec8810374ed...|       sports_leisure|                 46|                       250|    

In [20]:
# Create SQL Table Views from dfs for SQL querying
df_order_items.createOrReplaceTempView('items')
df_orders.createOrReplaceTempView('orders')
df_products.createOrReplaceTempView('products')
df_product_translation.createOrReplaceTempView('products_trans')
df_products_translated.createOrReplaceTempView('products_translated')

In [29]:
late_carrier_deliveries = spark.sql("""
SELECT i.order_id, i.seller_id, DATE(i.shipping_limit_date), FLOAT(i.price), FLOAT(i.freight_value),
       p.product_id, p.product_category_name,
       o.customer_id, o.order_status, DATE(o.order_purchase_timestamp), DATE(o.order_delivered_carrier_date),
       DATE(o.order_delivered_customer_date), DATE(o.order_estimated_delivery_date)
FROM items AS i
JOIN orders AS o
ON i.order_id = o.order_id
JOIN products_translated AS p
ON i.product_id = p.product_id
WHERE i.shipping_limit_date < o.order_delivered_carrier_date
""")

In [30]:
late_carrier_deliveries.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_date: date (nullable = true)
 |-- price: float (nullable = true)
 |-- freight_value: float (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_category_name: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: date (nullable = true)
 |-- order_delivered_carrier_date: date (nullable = true)
 |-- order_delivered_customer_date: date (nullable = true)
 |-- order_estimated_delivery_date: date (nullable = true)



In [26]:
late_carrier_deliveries.show()

                                                                                

+--------------------+--------------------+-------------------+------+-------------+--------------------+---------------------+--------------------+------------+------------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|           seller_id|shipping_limit_date| price|freight_value|          product_id|product_category_name|         customer_id|order_status|order_purchase_timestamp|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+--------------------+-------------------+------+-------------+--------------------+---------------------+--------------------+------------+------------------------+----------------------------+-----------------------------+-----------------------------+
|00018f77f2f0320c5...|dd7ddc04e1b6c2c61...|2017-05-03 11:05:13|239.90|        19.93|e5f2d52b802189ee6...|             pet_shop|f6dd3ec061db4e398...|   delivered|     2017-0

In [23]:
late_carrier_deliveries.count()

                                                                                

10203