## Module 1 - Data Ingestion & Exploration

#### Step 1: Setting Up the Spark Environment

In a real-world company setup, we wouldn’t use Google Colab directly. Instead, we would:

1. **Deploy a Spark Cluster** (e.g., AWS EMR, GCP Dataproc, Azure HDInsight, or an on-prem Hadoop cluster).

2. **Store Data in HDFS** instead of local storage.
    > Download the dataset from Kaggle ([Data Source](https://www.kaggle.com/datasets/olistbr/brazilian-ecommerce "click the link to go to kaggle")):  
    > ```bash
    > curl -L --create-dirs -o /data/brazilian-ecommerce.zip https://www.kaggle.com/api/v1/datasets/download/olistbr/brazilian-ecommerce
    > ```

    > Unzip the archive, then copy the extracted CSV files into HDFS with `hadoop fs -put`:  
    > ```bash
    > unzip /data/brazilian-ecommerce.zip -d /data/brazilian_ecommerce
    > ```

3. **Use PySpark** to interact with data.
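If you would rather script the download and extraction from Python than shell out to `curl` and `unzip`, a minimal standard-library sketch could look like this. It assumes the Kaggle endpoint serves the archive over plain HTTPS without credentials, as the `curl` command implies; if Kaggle asks for authentication, the official Kaggle API client is the safer route. `download` and `extract_csvs` are hypothetical helper names, not part of any library.

```python
import pathlib
import shutil
import urllib.request
import zipfile

# Kaggle download endpoint used in the curl command above (assumed to need no auth)
KAGGLE_URL = "https://www.kaggle.com/api/v1/datasets/download/olistbr/brazilian-ecommerce"


def download(url: str, dest: pathlib.Path) -> pathlib.Path:
    """Stream `url` to `dest`, creating parent directories (like curl -L --create-dirs)."""
    dest.parent.mkdir(parents=True, exist_ok=True)
    # urllib follows HTTP redirects by default, similar to curl -L
    with urllib.request.urlopen(url) as resp, open(dest, "wb") as out:
        shutil.copyfileobj(resp, out)
    return dest


def extract_csvs(zip_path: pathlib.Path, out_dir: pathlib.Path) -> list[str]:
    """Extract only the .csv members of the archive and return their names, sorted."""
    out_dir.mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(zip_path) as zf:
        members = [m for m in zf.namelist() if m.endswith(".csv")]
        zf.extractall(out_dir, members=members)
    return sorted(members)
```

For example, `extract_csvs(download(KAGGLE_URL, pathlib.Path("/data/brazilian-ecommerce.zip")), pathlib.Path("/data/brazilian_ecommerce"))` would mirror the `curl` and `unzip` cells below.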

In [8]:
# Download the Olist dataset archive from Kaggle into /data
!curl -L -o /data/brazilian-ecommerce.zip https://www.kaggle.com/api/v1/datasets/download/olistbr/brazilian-ecommerce

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
100 42.6M  100 42.6M    0     0  68.9M      0 --:--:-- --:--:-- --:--:-- 68.9M


In [10]:
!unzip /data/brazilian-ecommerce.zip -d /data/brazilian_ecommerce

Archive:  /data/brazilian-ecommerce.zip
  inflating: /data/brazilian_ecommerce/olist_customers_dataset.csv  
  inflating: /data/brazilian_ecommerce/olist_geolocation_dataset.csv  
  inflating: /data/brazilian_ecommerce/olist_order_items_dataset.csv  
  inflating: /data/brazilian_ecommerce/olist_order_payments_dataset.csv  
  inflating: /data/brazilian_ecommerce/olist_order_reviews_dataset.csv  
  inflating: /data/brazilian_ecommerce/olist_orders_dataset.csv  
  inflating: /data/brazilian_ecommerce/olist_products_dataset.csv  
  inflating: /data/brazilian_ecommerce/olist_sellers_dataset.csv  
  inflating: /data/brazilian_ecommerce/product_category_name_translation.csv  


In [13]:
! ls -lth /data/brazilian_ecommerce

total 121M
-rw-r--r-- 1 root root 171K Oct  1  2021 olist_sellers_dataset.csv
-rw-r--r-- 1 root root 2.6K Oct  1  2021 product_category_name_translation.csv
-rw-r--r-- 1 root root 2.3M Oct  1  2021 olist_products_dataset.csv
-rw-r--r-- 1 root root  14M Oct  1  2021 olist_order_reviews_dataset.csv
-rw-r--r-- 1 root root  17M Oct  1  2021 olist_orders_dataset.csv
-rw-r--r-- 1 root root 5.6M Oct  1  2021 olist_order_payments_dataset.csv
-rw-r--r-- 1 root root  15M Oct  1  2021 olist_order_items_dataset.csv
-rw-r--r-- 1 root root  59M Oct  1  2021 olist_geolocation_dataset.csv
-rw-r--r-- 1 root root 8.7M Oct  1  2021 olist_customers_dataset.csv


In [16]:
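# Create the HDFS target directory (no-op if it already exists), then upload the CSVs
!hadoop fs -mkdir -p /tmp/olist
!hadoop fs -put /data/brazilian_ecommerce/*.csv /tmp/olist/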

In [21]:
!hadoop fs -ls -h /tmp/olist

Found 9 items
-rw-r--r--   2 root hadoop      8.6 M 2025-10-08 11:25 /tmp/olist/olist_customers_dataset.csv
-rw-r--r--   2 root hadoop     58.4 M 2025-10-08 11:25 /tmp/olist/olist_geolocation_dataset.csv
-rw-r--r--   2 root hadoop     14.7 M 2025-10-08 11:25 /tmp/olist/olist_order_items_dataset.csv
-rw-r--r--   2 root hadoop      5.5 M 2025-10-08 11:25 /tmp/olist/olist_order_payments_dataset.csv
-rw-r--r--   2 root hadoop     13.8 M 2025-10-08 11:25 /tmp/olist/olist_order_reviews_dataset.csv
-rw-r--r--   2 root hadoop     16.8 M 2025-10-08 11:25 /tmp/olist/olist_orders_dataset.csv
-rw-r--r--   2 root hadoop      2.3 M 2025-10-08 11:25 /tmp/olist/olist_products_dataset.csv
-rw-r--r--   2 root hadoop    170.6 K 2025-10-08 11:25 /tmp/olist/olist_sellers_dataset.csv
-rw-r--r--   2 root hadoop      2.6 K 2025-10-08 11:25 /tmp/olist/product_category_name_translation.csv
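To confirm that every local CSV made it into HDFS, one option is to compare the local file names with the `hadoop fs -ls` listing. The sketch below parses that listing with a regular expression; the pattern assumes the column layout shown above (permissions, replication, owner, group, size, date, time, path) and is an illustration rather than a robust parser. `hdfs_files` and `missing_uploads` are hypothetical helpers.

```python
import re

# Matches a regular-file line from `hadoop fs -ls -h`, e.g.
# -rw-r--r--   2 root hadoop      8.6 M 2025-10-08 11:25 /tmp/olist/olist_customers_dataset.csv
LS_LINE = re.compile(
    r"^-\S+\s+\d+\s+\S+\s+\S+\s+(?P<size>[\d.]+(?: [KMGT])?)\s+\S+\s+\S+\s+(?P<path>\S+)$"
)


def hdfs_files(ls_output: str) -> set[str]:
    """Return the base names of regular files listed in `hadoop fs -ls` output."""
    names = set()
    for line in ls_output.splitlines():
        match = LS_LINE.match(line.strip())
        # Skips the "Found N items" header and directory lines (which start with 'd')
        if match:
            names.add(match.group("path").rsplit("/", 1)[-1])
    return names


def missing_uploads(local_names, ls_output: str) -> set[str]:
    """Local file names that do not appear in the HDFS listing."""
    return set(local_names) - hdfs_files(ls_output)
```

In the notebook, the listing could be captured with `subprocess.run(["hadoop", "fs", "-ls", "-h", "/tmp/olist"], capture_output=True, text=True).stdout` and compared against `os.listdir("/data/brazilian_ecommerce")`; an empty set means nothing is missing.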


#### Exploration of Datasets

In [1]:
from pyspark.sql import SparkSession
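# getOrCreate() reuses the active SparkSession if one already exists, otherwise builds a new one
spark = (
    SparkSession.builder
    .appName("First_stage")
    .getOrCreate()
)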
spark

25/10/09 08:46:41 WARN SparkContext: Another SparkContext is being constructed (or threw an exception in its constructor). This may indicate an error, since only one SparkContext should be running in this JVM (see SPARK-2243). The other SparkContext was created at:

In [2]:
hdfs_path = "/tmp/olist/"
customers = spark.read.csv(hdfs_path + "olist_customers_dataset.csv", header=True, inferSchema=True)

                                                                                

In [3]:
customers.show(5)

+--------------------+--------------------+------------------------+--------------------+--------------+
|         customer_id|  customer_unique_id|customer_zip_code_prefix|       customer_city|customer_state|
+--------------------+--------------------+------------------------+--------------------+--------------+
|06b8999e2fba1a1fb...|861eff4711a542e4b...|                   14409|              franca|            SP|
|18955e83d337fd6b2...|290c77bc529b7ac93...|                    9790|sao bernardo do c...|            SP|
|4e7b3e00288586ebd...|060e732b5b29e8181...|                    1151|           sao paulo|            SP|
|b2b6027bc5c5109e5...|259dac757896d24d7...|                    8775|     mogi das cruzes|            SP|
|4f2d8ab171c80ec83...|345ecd01c38d18a90...|                   13056|            campinas|            SP|
+--------------------+--------------------+------------------------+--------------------+--------------+
only showing top 5 rows



In [4]:
customers.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: integer (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)
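Note that `inferSchema` read `customer_zip_code_prefix` as an integer, so prefixes that begin with 0 appear to have lost their leading zero in the output above (for example `9790` and `1151` rather than `09790` and `01151`). If the prefix is treated as a code rather than a number, it is safer to keep it as a string: either declare the column as `StringType` in an explicit schema, or pad it back with `lpad(col("customer_zip_code_prefix").cast("string"), 5, "0")` from `pyspark.sql.functions`. The plain-Python sketch below shows the same padding rule; it assumes the prefixes are 5 digits wide, and `restore_zip_prefix` is a hypothetical helper.

```python
def restore_zip_prefix(value: int, width: int = 5) -> str:
    """Left-pad an integer zip prefix back to a fixed-width string (e.g. 1151 -> '01151')."""
    if value < 0 or len(str(value)) > width:
        # A negative or over-long value suggests bad data rather than a dropped zero
        raise ValueError(f"unexpected zip prefix: {value!r}")
    return str(value).zfill(width)
```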



In [5]:
orders = spark.read.csv(hdfs_path + "olist_orders_dataset.csv", header=True, inferSchema=True)
order_items = spark.read.csv(hdfs_path + "olist_order_items_dataset.csv", header=True, inferSchema=True)
payments = spark.read.csv(hdfs_path + "olist_order_payments_dataset.csv", header=True, inferSchema=True)
geo_location = spark.read.csv(hdfs_path + "olist_geolocation_dataset.csv", header=True, inferSchema=True)
reviews = spark.read.csv(hdfs_path + "olist_order_reviews_dataset.csv", header=True, inferSchema=True)
products = spark.read.csv(hdfs_path + "olist_products_dataset.csv", header=True, inferSchema=True)
sellers = spark.read.csv(hdfs_path + "olist_sellers_dataset.csv", header=True, inferSchema=True)
product_category = spark.read.csv(hdfs_path + "product_category_name_translation.csv", header=True, inferSchema=True)
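The reads above differ only in the file name, so one option is to drive them from a mapping. A minimal sketch, assuming the table names mirror the variable names used in this notebook; `OLIST_FILES` and `dataset_paths` are hypothetical names:

```python
# Table name -> CSV file name, as listed under /tmp/olist earlier
OLIST_FILES = {
    "customers": "olist_customers_dataset.csv",
    "orders": "olist_orders_dataset.csv",
    "order_items": "olist_order_items_dataset.csv",
    "payments": "olist_order_payments_dataset.csv",
    "geo_location": "olist_geolocation_dataset.csv",
    "reviews": "olist_order_reviews_dataset.csv",
    "products": "olist_products_dataset.csv",
    "sellers": "olist_sellers_dataset.csv",
    "product_category": "product_category_name_translation.csv",
}


def dataset_paths(base: str) -> dict[str, str]:
    """Join each file name onto `base`, with or without a trailing slash on `base`."""
    return {name: base.rstrip("/") + "/" + fname for name, fname in OLIST_FILES.items()}
```

With a SparkSession available, `dfs = {name: spark.read.csv(path, header=True, inferSchema=True) for name, path in dataset_paths(hdfs_path).items()}` would then give `dfs["payments"]`, `dfs["reviews"]`, and so on. Keep in mind that `inferSchema=True` costs an extra pass over each file; for repeated runs, an explicit schema avoids it.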

                                                                                

In [6]:
payments.show(5)
payments.printSchema()

+--------------------+------------------+------------+--------------------+-------------+
|            order_id|payment_sequential|payment_type|payment_installments|payment_value|
+--------------------+------------------+------------+--------------------+-------------+
|b81ef226f3fe1789b...|                 1| credit_card|                   8|        99.33|
|a9810da82917af2d9...|                 1| credit_card|                   1|        24.39|
|25e8ea4e93396b6fa...|                 1| credit_card|                   1|        65.71|
|ba78997921bbcdc13...|                 1| credit_card|                   8|       107.78|
|42fdf880ba16b47b5...|                 1| credit_card|                   2|       128.45|
+--------------------+------------------+------------+--------------------+-------------+
only showing top 5 rows

root
 |-- order_id: string (nullable = true)
 |-- payment_sequential: integer (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- payment_installments

#### Data Completeness - verifying row counts against Kaggle

In [8]:
customers.count()

99441

In [10]:
print(orders.count())
print(payments.count())
print(order_items.count())
print(reviews.count())

99441
103886
112650
104162
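The `reviews` count deserves a second look. `review_comment_message` is free text and may contain line breaks inside quoted fields. By default `spark.read.csv` treats each physical line as a record, so such a review is split into several rows and the count can come out higher than the number of reviews Kaggle reports. Reading the file with `multiLine=True` (and `escape='"'` if quotes inside fields are doubled) lets Spark keep quoted newlines inside the field, for example `spark.read.csv(hdfs_path + "olist_order_reviews_dataset.csv", header=True, inferSchema=True, multiLine=True, escape='"')`. The plain-Python sketch below shows the effect on a tiny made-up sample; `naive_record_count` and `csv_record_count` are hypothetical helpers.

```python
import csv
import io

# Tiny made-up sample in the shape of the reviews file: the second comment spans two lines
SAMPLE = (
    "review_id,order_id,review_score,review_comment_message\n"
    'r1,o1,5,"Great product"\n'
    'r2,o2,1,"Arrived late.\nBox was damaged"\n'
)


def naive_record_count(text: str) -> int:
    """Count records like a line-based reader: one record per physical line, minus the header."""
    return len(text.splitlines()) - 1


def csv_rows(text: str) -> list[list[str]]:
    """Parse with a quote-aware reader, which keeps quoted newlines inside the field."""
    return list(csv.reader(io.StringIO(text, newline="")))


def csv_record_count(text: str) -> int:
    """Number of data records after quote-aware parsing (header excluded)."""
    return len(csv_rows(text)) - 1
```

On `SAMPLE`, the line-based count is 3 while the quote-aware count is 2, which is the same kind of gap a multi-line comment column can open up in Spark.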


#### Checking for null values in DataFrames

In [19]:
# null values count in each column
from pyspark.sql.functions import col, when, count
customers.select([count(when(customers[i].isNull(),1)).alias(i) for i in customers.columns]).show()

+-----------+------------------+------------------------+-------------+--------------+
|customer_id|customer_unique_id|customer_zip_code_prefix|customer_city|customer_state|
+-----------+------------------+------------------------+-------------+--------------+
|          0|                 0|                       0|            0|             0|
+-----------+------------------+------------------------+-------------+--------------+
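The `count(when(...))` pattern works because `when` without `otherwise` returns null for rows where the condition is false, and `count` ignores nulls, so each column's result is exactly the number of null cells. By default Spark's CSV reader also reads empty fields as null (the `nullValue` option defaults to the empty string), so these are effectively counts of empty fields in the files. The same idea over a CSV string in plain Python, as an illustrative sketch (`null_counts` is a hypothetical helper):

```python
import csv
import io


def null_counts(text: str) -> dict[str, int]:
    """Per-column count of empty fields, mirroring count(when(col(c).isNull(), 1)) per column."""
    reader = csv.DictReader(io.StringIO(text, newline=""))
    counts = {name: 0 for name in reader.fieldnames or []}
    for row in reader:
        for name in counts:
            # "" is an empty field; None means the row was too short (DictReader's restval)
            if row[name] in ("", None):
                counts[name] += 1
    return counts
```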



In [20]:
orders.select([count(when(col(i).isNull(),1)).alias(i) for i in orders.columns]).show()

+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+
|order_id|customer_id|order_status|order_purchase_timestamp|order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+
|       0|          0|           0|                       0|              160|                        1783|                         2965|                            0|
+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+
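To put the `orders` nulls in proportion, divide by the row count (99441, from `orders.count()` above). The null timestamps plausibly belong to orders that never reached that stage, such as canceled or undelivered ones; one way to check that assumption is `orders.filter(col("order_delivered_customer_date").isNull()).groupBy("order_status").count().show()`. A small arithmetic sketch using the numbers from the output above (`null_share` is a hypothetical helper):

```python
# Null counts for `orders`, copied from the output above; 99441 is orders.count()
ORDERS_TOTAL = 99441
ORDERS_NULLS = {
    "order_approved_at": 160,
    "order_delivered_carrier_date": 1783,
    "order_delivered_customer_date": 2965,
}


def null_share(nulls: dict[str, int], total: int) -> dict[str, float]:
    """Percentage of null cells per column, rounded to two decimals."""
    return {column: round(100 * n / total, 2) for column, n in nulls.items()}


print(null_share(ORDERS_NULLS, ORDERS_TOTAL))
# -> {'order_approved_at': 0.16, 'order_delivered_carrier_date': 1.79, 'order_delivered_customer_date': 2.98}
```

So even the largest gap, the missing customer delivery date, affects under 3% of orders.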



In [21]:
payments.select([count(when(col(i).isNull(),1)).alias(i) for i in payments.columns]).show()

+--------+------------------+------------+--------------------+-------------+
|order_id|payment_sequential|payment_type|payment_installments|payment_value|
+--------+------------------+------------+--------------------+-------------+
|       0|                 0|           0|                   0|            0|
+--------+------------------+------------+--------------------+-------------+



#### Checking for duplicate values

In [24]:
customers.groupBy("customer_id").count().filter("count>1").show()

+-----------+-----+
|customer_id|count|
+-----------+-----+
+-----------+-----+



In [25]:
orders.groupBy("order_id").count().filter("count>1").show()

+--------+-----+
|order_id|count|
+--------+-----+
+--------+-----+
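Both checks return no rows, so `customer_id` and `order_id` are unique in their tables. An equivalent check in Spark is to compare `df.count()` with `df.select("order_id").distinct().count()`. The plain-Python analogue of `groupBy(...).count().filter("count>1")`, as a sketch (`duplicate_keys` is a hypothetical helper):

```python
from collections import Counter
from collections.abc import Hashable, Iterable


def duplicate_keys(keys: Iterable[Hashable]) -> dict[Hashable, int]:
    """Keys that occur more than once, with their counts (an empty dict means all keys are unique)."""
    return {key: n for key, n in Counter(keys).items() if n > 1}
```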

