# Olist Database EDA

In [1]:
import os
from configparser import ConfigParser

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [2]:
# config = ConfigParser()
# config.read('config.cfg')

# S3 buckets
# RAW_DATA = config['S3']['RAW_DATA']
# STAGING_DATA = config['S3']['STAGING_DATA']

# Redshift
# REDSHIFT_ROLE = config['REDSHIFT']['REDSHIFT_S3_IAM_ROLE_ARN']

# EMR
# EC2_KEY_NAME = config['EMR']['EC2_KEY_NAME']

In [3]:
spark = SparkSession.builder \
                    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
                    .getOrCreate()

In [4]:
spark

## Utility Functions

In [5]:
def explain_df(df):
    df.printSchema()
    print()
    df.show(2)
    print()
    df.describe().show()


def unique_count(df):
    print('Number of unique values in each column:')
    print('---------------------------------------')
    for c in df.columns:
        print(c, '-', df.select(c).distinct().count())
    print(f'\nRow Count: {df.count()}')


def null_count(df):
    df.agg(*[F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns]).show()

## EDA: Customers

In [7]:
customers_df = spark.read.csv('data/olist_customers_dataset.csv',
                              inferSchema=True, header=True)

In [8]:
explain_df(df=customers_df)

root
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: integer (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)


+--------------------+--------------------+------------------------+--------------------+--------------+
|         customer_id|  customer_unique_id|customer_zip_code_prefix|       customer_city|customer_state|
+--------------------+--------------------+------------------------+--------------------+--------------+
|06b8999e2fba1a1fb...|861eff4711a542e4b...|                   14409|              franca|            SP|
|18955e83d337fd6b2...|290c77bc529b7ac93...|                    9790|sao bernardo do c...|            SP|
+--------------------+--------------------+------------------------+--------------------+--------------+
only showing top 2 rows


+-------+--------------------+--------------------+------------------------+-------------------+

In [9]:
null_count(df=customers_df)

+-----------+------------------+------------------------+-------------+--------------+
|customer_id|customer_unique_id|customer_zip_code_prefix|customer_city|customer_state|
+-----------+------------------+------------------------+-------------+--------------+
|          0|                 0|                       0|            0|             0|
+-----------+------------------+------------------------+-------------+--------------+



***Observation:*** No nulls in customer table

In [10]:
unique_count(df=customers_df)

Number of unique values in each column:
---------------------------------------
customer_id - 99441
customer_unique_id - 96096
customer_zip_code_prefix - 14994
customer_city - 4119
customer_state - 27

Row Count: 99441


## EDA: Sellers

In [11]:
sellers_df = spark.read.csv('data/olist_sellers_dataset.csv',
                            inferSchema=True, header=True)

In [12]:
explain_df(sellers_df)

root
 |-- seller_id: string (nullable = true)
 |-- seller_zip_code_prefix: integer (nullable = true)
 |-- seller_city: string (nullable = true)
 |-- seller_state: string (nullable = true)


+--------------------+----------------------+-----------+------------+
|           seller_id|seller_zip_code_prefix|seller_city|seller_state|
+--------------------+----------------------+-----------+------------+
|3442f8959a84dea7e...|                 13023|   campinas|          SP|
|d1b65fc7debc3361e...|                 13844| mogi guacu|          SP|
+--------------------+----------------------+-----------+------------+
only showing top 2 rows


+-------+--------------------+----------------------+-----------+------------+
|summary|           seller_id|seller_zip_code_prefix|seller_city|seller_state|
+-------+--------------------+----------------------+-----------+------------+
|  count|                3095|                  3095|       3095|        3095|
|   mean|                null|    32291.05

In [13]:
null_count(sellers_df)

+---------+----------------------+-----------+------------+
|seller_id|seller_zip_code_prefix|seller_city|seller_state|
+---------+----------------------+-----------+------------+
|        0|                     0|          0|           0|
+---------+----------------------+-----------+------------+



In [14]:
unique_count(sellers_df)

Number of unique values in each column:
---------------------------------------
seller_id - 3095
seller_zip_code_prefix - 2246
seller_city - 611
seller_state - 23

Row Count: 3095


## EDA: Products

In [6]:
products_df = spark.read.csv('data/olist_products_dataset.csv',
                             inferSchema=True, header=True)

In [7]:
explain_df(products_df)

root
 |-- product_id: string (nullable = true)
 |-- product_category_name: string (nullable = true)
 |-- product_name_lenght: integer (nullable = true)
 |-- product_description_lenght: integer (nullable = true)
 |-- product_photos_qty: integer (nullable = true)
 |-- product_weight_g: integer (nullable = true)
 |-- product_length_cm: integer (nullable = true)
 |-- product_height_cm: integer (nullable = true)
 |-- product_width_cm: integer (nullable = true)


+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|          product_id|product_category_name|product_name_lenght|product_description_lenght|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|
+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+-------------

In [8]:
null_count(products_df)

+----------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|product_id|product_category_name|product_name_lenght|product_description_lenght|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|
+----------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|         0|                  610|                610|                       610|               610|               2|                2|                2|               2|
+----------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+



In [9]:
products_df.createOrReplaceTempView('product')

In [19]:
spark.sql("""SELECT COUNT(product_id) FROM product
             WHERE product_category_name IS NULL
               AND product_name_lenght IS NULL
               AND product_description_lenght IS NULL
               AND product_photos_qty IS NULL
          """).show()

+-----------------+
|count(product_id)|
+-----------------+
|              610|
+-----------------+



In [20]:
spark.sql("""SELECT COUNT(product_id) FROM product
             WHERE product_category_name IS NULL
               AND product_name_lenght IS NULL
               AND product_description_lenght IS NULL
               AND product_photos_qty IS NULL
               AND product_weight_g IS NULL
               AND product_length_cm IS NULL
               AND product_height_cm IS NULL
               AND product_width_cm IS NULL
          """).show()

+-----------------+
|count(product_id)|
+-----------------+
|                1|
+-----------------+



***Observation:*** There are around 610 products without any name and description. Also, there is one product with no details at all. Lets see if there are any orders with these products. If there are no orders with these products we can ignore these products while moving to the data warehouse.

Lets save the product_ids of these products for future analysis.

In [21]:
products_partial_details = spark.sql("""SELECT product_id FROM product
                                        WHERE product_category_name IS NULL
                                           AND product_name_lenght IS NULL
                                           AND product_description_lenght IS NULL
                                           AND product_photos_qty IS NULL
                                      """).collect()

products_partial_details = [r.product_id for r in products_partial_details]

In [22]:
products_no_details = spark.sql("""SELECT product_id FROM product
                                   WHERE product_category_name IS NULL
                                     AND product_name_lenght IS NULL
                                     AND product_description_lenght IS NULL
                                     AND product_photos_qty IS NULL
                                     AND product_weight_g IS NULL
                                     AND product_length_cm IS NULL
                                     AND product_height_cm IS NULL
                                     AND product_width_cm IS NULL
                                """).collect()

products_no_details = [r.product_id for r in products_no_details]

In [23]:
unique_count(products_df)

Number of unique values in each column:
---------------------------------------
product_id - 32951
product_category_name - 74
product_name_lenght - 67
product_description_lenght - 2961
product_photos_qty - 20
product_weight_g - 2205
product_length_cm - 100
product_height_cm - 103
product_width_cm - 96

Row Count: 32951


## EDA: Orders

In [19]:
orders_df = spark.read.csv('data/olist_orders_dataset.csv',
                           inferSchema=True, header=True)

In [25]:
explain_df(orders_df)

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)


+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+----------------------

In [115]:
null_count(df=orders_df)

+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+
|order_id|customer_id|order_status|order_purchase_timestamp|order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+
|       0|          0|           0|                       0|              160|                        1783|                         2965|                            0|
+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+



In [27]:
unique_count(orders_df)

Number of unique values in each column:
---------------------------------------
order_id - 99441
customer_id - 99441
order_status - 8
order_purchase_timestamp - 98875
order_approved_at - 90734
order_delivered_carrier_date - 81019
order_delivered_customer_date - 95665
order_estimated_delivery_date - 459

Row Count: 99441


In [20]:
orders_df.createOrReplaceTempView('order')

In [29]:
spark.sql(f"""SELECT MIN(order_purchase_timestamp),
                     MAX(order_purchase_timestamp)
             FROM order
           """).show()

+-----------------------------+-----------------------------+
|min(order_purchase_timestamp)|max(order_purchase_timestamp)|
+-----------------------------+-----------------------------+
|          2016-09-04 21:15:19|          2018-10-17 17:30:18|
+-----------------------------+-----------------------------+



In [114]:
spark.sql(f"""SELECT DISTINCT order_status AS order_status_types
             FROM order
           """).show()

+------------------+
|order_status_types|
+------------------+
|           shipped|
|          canceled|
|          approved|
|          invoiced|
|           created|
|         delivered|
|       unavailable|
|        processing|
+------------------+



***Observation:*** Dataset has almost 2 years of sales data.

# EDA: Order Items

In [26]:
order_items_df = spark.read.csv('data/olist_order_items_dataset.csv',
                                inferSchema=True, header=True)

In [31]:
explain_df(order_items_df)

root
 |-- order_id: string (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)


+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_date|price|freight_value|
+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+
|00010242fe8c5a6d1...|            1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19 09:45:35| 58.9|        13.29|
|00018f77f2f0320c5...|            1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|2017-05-03 11:05:13|239.9|        19.93|
+--------------------+-------------+--------------------+--------------------+---------------

In [32]:
null_count(order_items_df)

+--------+-------------+----------+---------+-------------------+-----+-------------+
|order_id|order_item_id|product_id|seller_id|shipping_limit_date|price|freight_value|
+--------+-------------+----------+---------+-------------------+-----+-------------+
|       0|            0|         0|        0|                  0|    0|            0|
+--------+-------------+----------+---------+-------------------+-----+-------------+



In [33]:
unique_count(order_items_df)

Number of unique values in each column:
---------------------------------------
order_id - 98666
order_item_id - 21
product_id - 32951
seller_id - 3095
shipping_limit_date - 93318
price - 5968
freight_value - 6999

Row Count: 112650


In [27]:
order_items_df.createOrReplaceTempView('order_item')

In [35]:
spark.sql("""SELECT COUNT(DISTINCT order_id, order_item_id, product_id, seller_id)
             FROM order_item
          """).show()

+--------------------------------------------------------------+
|count(DISTINCT order_id, order_item_id, product_id, seller_id)|
+--------------------------------------------------------------+
|                                                        112650|
+--------------------------------------------------------------+



***Observation:*** It looks like `order_id, order_item_id, product_id, seller_id` together form a PK for the order_items table in the operational data source. In fact, on grouping the rows based on `order_id, product_id, seller_id` and counting the number of rows gives the quantity/ #units of `product_id` are ordered from the seller with `seller_id` in the order with order_id `order_id`.

In [36]:
spark.sql("""SELECT SUM(qty)
             FROM (SELECT order_id, product_id, seller_id, COUNT(order_item_id) AS qty
                   FROM order_item
                   GROUP BY order_id, product_id, seller_id
                   HAVING qty) AS tempTable
          """).show()

+--------+
|sum(qty)|
+--------+
|  112650|
+--------+



### Check if there are any orders with products having missing details

In [37]:
spark.sql("""SELECT product_id, COUNT(order_id) numberOfOrders
             FROM order_item
             WHERE product_id IN (SELECT product_id FROM product
                                  WHERE product_category_name IS NULL
                                    AND product_name_lenght IS NULL
                                    AND product_description_lenght IS NULL
                                    AND product_photos_qty IS NULL)
             GROUP BY product_id
             HAVING COUNT(order_id) = 0
          """).show()

+----------+--------------+
|product_id|numberOfOrders|
+----------+--------------+
+----------+--------------+



***Observation:*** Products with missing name and description are present in atleast one order. So the products with incomplete details cannot be ignored while moving to the warehouse.

In [38]:
spark.sql(f"""SELECT product_id, COUNT(order_id) numberOfOrders
             FROM order_item
             WHERE product_id = '{products_no_details[0]}'
             GROUP BY product_id
           """).show()

+--------------------+--------------+
|          product_id|numberOfOrders|
+--------------------+--------------+
|5eb564652db742ff8...|            17|
+--------------------+--------------+



***Observation:*** Interestingly there are 17 orders containing the product with no details!!

### How to get number of units of a product per order?

In [201]:
spark.sql("""SELECT order_id, product_id, seller_id, COUNT(DISTINCT shipping_limit_date)
             FROM order_item
             GROUP BY order_id, product_id, seller_id
             HAVING COUNT(DISTINCT shipping_limit_date) > 1
          """).show()

+--------+----------+---------+-----------------------------------+
|order_id|product_id|seller_id|count(DISTINCT shipping_limit_date)|
+--------+----------+---------+-----------------------------------+
+--------+----------+---------+-----------------------------------+



In [202]:
spark.sql("""SELECT order_id, product_id, seller_id, COUNT(DISTINCT price)
             FROM order_item
             GROUP BY order_id, product_id, seller_id
             HAVING COUNT(DISTINCT price) > 1
          """).show()

+--------+----------+---------+---------------------+
|order_id|product_id|seller_id|count(DISTINCT price)|
+--------+----------+---------+---------------------+
+--------+----------+---------+---------------------+



In [200]:
spark.sql("""SELECT order_id, product_id, seller_id, COUNT(DISTINCT freight_value)
             FROM order_item
             GROUP BY order_id, product_id, seller_id
             HAVING COUNT(DISTINCT freight_value) > 1
          """).show()

+--------+----------+---------+-----------------------------+
|order_id|product_id|seller_id|count(DISTINCT freight_value)|
+--------+----------+---------+-----------------------------+
+--------+----------+---------+-----------------------------+



***Observation:*** The `order_items` table contains 1 row for each unit of a product in an `order_id`.

In [215]:
spark.sql("""SELECT order_id, product_id, seller_id,
                    shipping_limit_date, price, freight_value,
                    COUNT(order_item_id) AS qty
             FROM order_item
             GROUP BY order_id, product_id, seller_id,
                      shipping_limit_date, price, freight_value
          """).show(1)

+--------------------+--------------------+--------------------+-------------------+-----+-------------+---+
|            order_id|          product_id|           seller_id|shipping_limit_date|price|freight_value|qty|
+--------------------+--------------------+--------------------+-------------------+-----+-------------+---+
|00c9d9e61ed13f5bf...|2ffb8b836bb62d53a...|391fc6631aebcf300...|2017-03-02 13:25:10|44.55|        11.74|  1|
+--------------------+--------------------+--------------------+-------------------+-----+-------------+---+
only showing top 1 row



## EDA: Payments

In [28]:
payments_df = spark.read.csv('data/olist_order_payments_dataset.csv',
                             inferSchema=True, header=True)

In [40]:
explain_df(payments_df)

root
 |-- order_id: string (nullable = true)
 |-- payment_sequential: integer (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- payment_installments: integer (nullable = true)
 |-- payment_value: double (nullable = true)


+--------------------+------------------+------------+--------------------+-------------+
|            order_id|payment_sequential|payment_type|payment_installments|payment_value|
+--------------------+------------------+------------+--------------------+-------------+
|b81ef226f3fe1789b...|                 1| credit_card|                   8|        99.33|
|a9810da82917af2d9...|                 1| credit_card|                   1|        24.39|
+--------------------+------------------+------------+--------------------+-------------+
only showing top 2 rows


+-------+--------------------+------------------+------------+--------------------+------------------+
|summary|            order_id|payment_sequential|payment_type|payment_installments|     pay

In [41]:
null_count(payments_df)

+--------+------------------+------------+--------------------+-------------+
|order_id|payment_sequential|payment_type|payment_installments|payment_value|
+--------+------------------+------------+--------------------+-------------+
|       0|                 0|           0|                   0|            0|
+--------+------------------+------------+--------------------+-------------+



In [42]:
unique_count(payments_df)

Number of unique values in each column:
---------------------------------------
order_id - 99440
payment_sequential - 29
payment_type - 5
payment_installments - 24
payment_value - 29077

Row Count: 103886


In [29]:
payments_df.createOrReplaceTempView('payment')

In [44]:
spark.sql("""SELECT *
             FROM payment
             WHERE order_id = '7a5472f7c8cecc2e1cf43d12271e4eca'
          """).show(truncate=False)

+--------------------------------+------------------+------------+--------------------+-------------+
|order_id                        |payment_sequential|payment_type|payment_installments|payment_value|
+--------------------------------+------------------+------------+--------------------+-------------+
|7a5472f7c8cecc2e1cf43d12271e4eca|1                 |credit_card |8                   |228.75       |
|7a5472f7c8cecc2e1cf43d12271e4eca|2                 |voucher     |1                   |62.87        |
+--------------------------------+------------------+------------+--------------------+-------------+



***Observation:*** Since the scope of the data warehouse/ mart is sales analysis, we can just take the `payment_value` and aggregate over the `order_id` and store it in the fact table as a measure indicating the total order value.

## EDA: Reviews

In [30]:
reviews_df = spark.read.csv('data/olist_order_reviews_dataset.csv',
                            inferSchema=True, header=True)

In [46]:
explain_df(reviews_df)

root
 |-- review_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- review_score: string (nullable = true)
 |-- review_comment_title: string (nullable = true)
 |-- review_comment_message: string (nullable = true)
 |-- review_creation_date: string (nullable = true)
 |-- review_answer_timestamp: string (nullable = true)


+--------------------+--------------------+------------+--------------------+----------------------+--------------------+-----------------------+
|           review_id|            order_id|review_score|review_comment_title|review_comment_message|review_creation_date|review_answer_timestamp|
+--------------------+--------------------+------------+--------------------+----------------------+--------------------+-----------------------+
|7bc2406110b926393...|73fc7af87114b3971...|           4|                null|                  null| 2018-01-18 00:00:00|    2018-01-18 21:46:59|
|80e641a11e56f04c1...|a548910a1c6147796...|           5|               

In [47]:
null_count(reviews_df)

+---------+--------+------------+--------------------+----------------------+--------------------+-----------------------+
|review_id|order_id|review_score|review_comment_title|review_comment_message|review_creation_date|review_answer_timestamp|
+---------+--------+------------+--------------------+----------------------+--------------------+-----------------------+
|        1|    2330|        2497|               93013|                 63321|                9164|                   9187|
+---------+--------+------------+--------------------+----------------------+--------------------+-----------------------+



***Observation:*** We could drop the reviews with NULL `order_id` and NULL `review_id` while populating the datawarehouse dimensions.

In [48]:
unique_count(reviews_df)

Number of unique values in each column:
---------------------------------------
review_id - 103957
order_id - 100563
review_score - 2568
review_comment_title - 5045
review_comment_message - 36664
review_creation_date - 726
review_answer_timestamp - 95073

Row Count: 105189


### Are there any orders with more than 1 review?

In [31]:
reviews_df.createOrReplaceTempView('review')

In [50]:
spark.sql("""SELECT COUNT(order_id)
             FROM (SELECT order_id
                   FROM review
                   GROUP BY order_id
                   HAVING COUNT(review_id) > 1) AS tempTable
          """).show()

+---------------+
|count(order_id)|
+---------------+
|            967|
+---------------+



***Observation:*** There are 967 orders with more than 1 review. Let's record only average `review_score` in the fact table.

## EDA: Geolocation

In [51]:
geo_df = spark.read.csv('data/olist_geolocation_dataset.csv',
                        inferSchema=True, header=True)

In [52]:
explain_df(geo_df)

root
 |-- geolocation_zip_code_prefix: integer (nullable = true)
 |-- geolocation_lat: double (nullable = true)
 |-- geolocation_lng: double (nullable = true)
 |-- geolocation_city: string (nullable = true)
 |-- geolocation_state: string (nullable = true)


+---------------------------+-------------------+------------------+----------------+-----------------+
|geolocation_zip_code_prefix|    geolocation_lat|   geolocation_lng|geolocation_city|geolocation_state|
+---------------------------+-------------------+------------------+----------------+-----------------+
|                       1037| -23.54562128115268|-46.63929204800168|       sao paulo|               SP|
|                       1046|-23.546081127035535|-46.64482029837157|       sao paulo|               SP|
+---------------------------+-------------------+------------------+----------------+-----------------+
only showing top 2 rows


+-------+---------------------------+-------------------+-------------------+---------------

In [53]:
null_count(geo_df)

+---------------------------+---------------+---------------+----------------+-----------------+
|geolocation_zip_code_prefix|geolocation_lat|geolocation_lng|geolocation_city|geolocation_state|
+---------------------------+---------------+---------------+----------------+-----------------+
|                          0|              0|              0|               0|                0|
+---------------------------+---------------+---------------+----------------+-----------------+



In [54]:
unique_count(geo_df)

Number of unique values in each column:
---------------------------------------
geolocation_zip_code_prefix - 19015
geolocation_lat - 717372
geolocation_lng - 717615
geolocation_city - 8011
geolocation_state - 27

Row Count: 1000163


In [55]:
geo_df.createOrReplaceTempView('geolocation')

In [61]:
spark.sql("""SELECT geolocation_zip_code_prefix, COUNT(*) AS distinctCoordinates
             FROM geolocation
             GROUP BY geolocation_zip_code_prefix
          """).show()

+---------------------------+-------------------+
|geolocation_zip_code_prefix|distinctCoordinates|
+---------------------------+-------------------+
|                       1238|                164|
|                       2122|                 33|
|                       2142|                  5|
|                       2366|                 33|
|                       2866|                 41|
|                       3175|                 32|
|                       3918|                 50|
|                       4101|                 72|
|                       4935|                 15|
|                       5518|                 27|
|                       6397|                 95|
|                       6654|                187|
|                       6620|                 66|
|                       7240|                155|
|                       8592|                 18|
|                       9852|                107|
|                      12940|                253|


In [60]:
spark.sql("""SELECT geolocation_zip_code_prefix, geolocation_city, geolocation_state,
             COUNT(DISTINCT geolocation_lat, geolocation_lng) AS distinctCoordinates
             FROM geolocation
             GROUP BY geolocation_zip_code_prefix, geolocation_city, geolocation_state
          """).show()

+---------------------------+--------------------+-----------------+-------------------+
|geolocation_zip_code_prefix|    geolocation_city|geolocation_state|distinctCoordinates|
+---------------------------+--------------------+-----------------+-------------------+
|                       4044|           sao paulo|               SP|                 39|
|                       5503|           são paulo|               SP|                  4|
|                       8225|           sao paulo|               SP|                 83|
|                       9230|         santo andre|               SP|                163|
|                      11050|              santos|               SP|                138|
|                      17320|   mineiros do tiete|               SP|                 42|
|                      21735|      rio de janeiro|               RJ|                 65|
|                      23098|      rio de janeiro|               RJ|                 29|
|                    

***Observation:*** Since each (zip_code, city, state) combination has more than 1 distinct coordinate, it is NOT clear what those coordinates indicate. Do they indicate coordinates from which a customer has placed an order? If so which order is placed from which coordinate? There is no connection to neither the orders nor the orders_items table. **For the purpose of building sales data warehouse we can ignore the geolocation table completely**.

# Sales Data Warehouse Schema
***Data Warehouse/ Mart Scope:*** For Sales analysis targeted towards the sales deparment.  

***Granularity/ Level of Detail of the Data Warehouse:*** Daily  

***Kinds of questions to be answered by the warehouse:***  
- No. of orders received in each month/ year
- Avg total of orders received in each day/ month/ year
- Weekly/ Monthly/ yearly Avg spendings of each customer
- Weekly/ Monthly/ yearly Avg sales of each product type
- Weekly/ Monthly/ yearly Avg sales made by each seller
- Weekly/ Monthly/ yearly city/ state wise sales 

***Possible Project Extension:*** Additionally we could also have a mart for Accounting (fact-payments) and Marketing (fact-reviews) departments  

***Type of dimensional model:*** STAR SCHEMA

## DIMENSIONS:

```plsql
customer(customer_id, customer_unique_id, customer_zip_code_prefix, customer_city, customer_state)  
  seller(seller_id, seller_zip_code_prefix, seller_city, seller_state)
 product(product_id,
         product_category,            --> Translated using "product_category_name_translation.csv"
         product_photos_qty,
         product_weight_g,
         product_length_cm,
         product_height_cm,
         product_width_cm)
calender(calender_id, date, day,
         dayofweek, month, year)
```

## FACT Table:

```plsql
order_item(
           order_item_id,                          --> Artificial PK
           order_id,                               --> orders
           customer_id,                            --> orders
           product_id,                             --> order_items
           seller_id,                              --> order_items
           purchase_date,                          --> orders (from "order_purchase_timestamp")
           purchase_year,                          --> orders (from "order_purchase_timestamp")
           purchase_month,                         --> orders (from "order_purchase_timestamp")
           order_purchase_timestamp,               --> orders (from "order_purchase_timestamp")
           order_approved_timestamp,               --> orders (from "order_approved_at")
           order_delivered_carrier_timestamp,      --> orders (EXTRACT DATE from "order_delivered_carrier_date")
           order_delivered_customer_timestamp,     --> orders (EXTRACT DATE from "order_delivered_customer_date")
           order_estimated_delivery_timestamp,     --> orders (EXTRACT DATE from "order_estimated_delivery_date")
           order_status,                           --> orders
           shipping_limit_date,                    --> order_items
           freight_value,                          --> order_items
           unit_price,                             --> order_items (rename "price" -> "unit_price")
           qty,                                    --> order_items (derived measure)
           total_product_price,                    --> (qty * product_unit_price)
           total_order_price,                      --> order_payments (derived measure)
           avg_review_score,                       --> order_reviews (derived measure)
          )
```

# ETL Pipeline

## Customer Dimension  

No transformations needed for customers table. Just saving in parquet format.

In [71]:
customers_df.write.parquet('staging_data/customer', mode='overwrite')

## Sellers Dimension  
No transformations needed for sellers table. Just saving in parquet format.

In [72]:
sellers_df.write.parquet('staging_data/seller', mode='overwrite')

## Product Dimension  
1. Join the `products` table with the `product_category_names` table in `product_category_name_translation.csv` to get the product names in english.  
2. We can choose to drop the column `product_name_lenght` since we have translated the category name to english and the length will no more be relevant for sales analysis.
3. Then save the `product` dimension in parquet format.

In [10]:
category_translation = spark.read.csv('data/product_category_name_translation.csv',
                                      inferSchema=True, header=True)

In [11]:
category_translation.show(2)

+---------------------+-----------------------------+
|product_category_name|product_category_name_english|
+---------------------+-----------------------------+
|         beleza_saude|                health_beauty|
| informatica_acess...|         computers_accesso...|
+---------------------+-----------------------------+
only showing top 2 rows



In [14]:
category_translation.createOrReplaceTempView('translation')

In [17]:
product_dim = spark.sql("""SELECT p.*, t.product_category_name_english AS product_category
                           FROM product p
                           LEFT JOIN translation t
                                   ON p.product_category_name = t.product_category_name
                        """) \
                   .drop('product_category_name', 'product_name_lenght') \
                   .withColumnRenamed('product_description_lenght',
                                      'product_description_length') ## correct the column name "lenght" to "length"

In [131]:
product_dim.show(1)

+--------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+----------------+
|          product_id|product_description_length|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|product_category|
+--------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+----------------+
|1e9e8ef04dbcff454...|                       287|                 1|             225|               16|               10|              14|       perfumery|
+--------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+----------------+
only showing top 1 row



In [132]:
product_dim.write.parquet('staging_data/product', mode='overwrite')

## Calender Dimension

Using the `order_purchase_timestamp` from the 

In [21]:
calender_dim = spark.sql("""SELECT DISTINCT CAST(order_purchase_timestamp AS DATE) as date
                            FROM order

                            UNION

                            SELECT DISTINCT CAST(order_approved_at AS DATE) as date
                            FROM order

                            UNION

                            SELECT DISTINCT CAST(order_delivered_carrier_date AS DATE) as date
                            FROM order

                            UNION

                            SELECT DISTINCT CAST(order_delivered_customer_date AS DATE) as date
                            FROM order

                            UNION

                            SELECT DISTINCT CAST(order_estimated_delivery_date AS DATE) as date
                            FROM order
                        """)

In [22]:
calender_dim.createOrReplaceTempView('calender')

In [23]:
calender_dim = spark.sql("""SELECT date,
                                   year(date) AS year,
                                   month(date) AS month,
                                   day(date) AS day,
                                   dayofweek(date) AS dayOfWeek
                            FROM calender
                            WHERE date IS NOT NULL
                         """)

In [25]:
calender_dim.printSchema()

root
 |-- date: date (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dayOfWeek: integer (nullable = true)



In [170]:
calender_dim.show(2)

+----------+----+-----+---+---------+
|      date|year|month|day|dayOfWeek|
+----------+----+-----+---+---------+
|2017-09-11|2017|    9| 11|        2|
|2018-08-10|2018|    8| 10|        6|
+----------+----+-----+---+---------+
only showing top 2 rows



In [171]:
calender_dim.write.parquet('staging_data/calender', mode='overwrite')

## Fact Table: `order_item`

### Getting required fields from `orders` and `order_items`

In [32]:
order_item_fact = spark.sql("""SELECT monotonically_increasing_id() AS order_item_id,
                                      o.order_id,                                           ----> Retaining Business Key for analysis
                                      o.customer_id, i.product_id, i.seller_id,
                                      CAST(o.order_purchase_timestamp AS DATE)              AS purchase_date,
                                      CAST(year(o.order_purchase_timestamp) AS SMALLINT)    AS purchase_year,
                                      CAST(month(o.order_purchase_timestamp) AS SMALLINT)   AS purchase_month,
                                      o.order_purchase_timestamp,
                                      o.order_approved_at                                   AS order_approved_timestamp,
                                      o.order_delivered_carrier_date                        AS order_delivered_carrier_timestamp,
                                      o.order_delivered_customer_date                       AS order_delivered_customer_timestamp,
                                      o.order_estimated_delivery_date                       AS order_estimated_delivery_timestamp,
                                      o.order_status,
                                      i.shipping_limit_date,
                                      
                                      ----------------------------- MEASURES -----------------------------
                                      
                                      CAST(i.freight_value AS DECIMAL(5, 2))                AS freight_value,
                                      CAST(i.price AS DECIMAL(10, 2))                       AS unit_price,
                                      CAST(i.qty AS SMALLINT)                               AS qty,
                                      CAST(ROUND(i.qty * i.price, 2) AS DECIMAL(10, 2))     AS total_product_price,
                                      
                                      (SELECT CAST(SUM(p.payment_value) AS DECIMAL(10, 2))
                                       FROM payment AS p
                                       WHERE p.order_id = o.order_id)                       AS total_order_price,
                                       
                                      (SELECT CAST(ROUND(AVG(r.review_score), 1) AS DECIMAL(2, 1))
                                       FROM review AS r
                                       WHERE r.order_id = o.order_id)                       AS avg_review_score
                               FROM order AS o
                               INNER JOIN (SELECT order_id, product_id, seller_id,
                                                  shipping_limit_date, price, freight_value,
                                                  COUNT(order_item_id) AS qty
                                           FROM order_item
                                           GROUP BY order_id, product_id, seller_id,
                                                    shipping_limit_date, price, freight_value) AS i
                                       ON o.order_id = i.order_id
                          """)

In [33]:
order_item_fact.printSchema()

root
 |-- order_item_id: long (nullable = false)
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- purchase_date: date (nullable = true)
 |-- purchase_year: integer (nullable = true)
 |-- purchase_month: integer (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_timestamp: timestamp (nullable = true)
 |-- order_delivered_carrier_timestamp: timestamp (nullable = true)
 |-- order_delivered_customer_timestamp: timestamp (nullable = true)
 |-- order_estimated_delivery_timestamp: timestamp (nullable = true)
 |-- order_status: string (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- freight_value: double (nullable = true)
 |-- unit_price: double (nullable = true)
 |-- qty: long (nullable = false)
 |-- total_product_price: double (nullable = true)
 |-- total_order_price: double (nullable = true)
 |-- a

In [244]:
order_item_fact.show(1)

+-------------+--------------------+--------------------+--------------------+--------------------+-----------+-------------+--------------+------------------------+------------------------+---------------------------------+----------------------------------+----------------------------------+------------+-------------------+-------------+----------+---+-------------------+-----------------+----------------+
|order_item_id|            order_id|         customer_id|          product_id|           seller_id|calender_id|purchase_year|purchase_month|order_purchase_timestamp|order_approved_timestamp|order_delivered_carrier_timestamp|order_delivered_customer_timestamp|order_estimated_delivery_timestamp|order_status|shipping_limit_date|freight_value|unit_price|qty|total_product_price|total_order_price|avg_review_score|
+-------------+--------------------+--------------------+--------------------+--------------------+-----------+-------------+--------------+------------------------+-----------

In [245]:
# order_item_fact.write.parquet('staging_data/order_item',
#                               mode='overwrite',
#                               partitionBy=['purchase_year', 'purchase_month'])

In [246]:
spark.read.parquet('staging_data/order_item').count()

102425