### Project: Data Lake for E-commerce Data Analytics Using Real-Life Dataset

##### Setup environment

In [1]:
!pip install pyspark pandas kaggle


Collecting kaggle
  Downloading kaggle-1.6.17.tar.gz (82 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m82.7/82.7 KB[0m [31m1.2 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
Collecting bleach
  Downloading bleach-6.1.0-py3-none-any.whl (162 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m162.8/162.8 KB[0m [31m2.3 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
Collecting python-slugify
  Downloading python_slugify-8.0.4-py2.py3-none-any.whl (10 kB)
Collecting tqdm
  Downloading tqdm-4.66.5-py3-none-any.whl (78 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m78.4/78.4 KB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
Collecting webencodings
  Downloading webencodings-0.5.1-py2.py3-none-any.whl (11 kB)
Collecting text-unidecode>=1.3
  Downloading text_unidecode-1.3-py2.py3-none-any.whl (78 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m78.2/78.2 KB[0m 

In [3]:
!kaggle datasets download -d olistbr/brazilian-ecommerce -p ./brazil_data_lake/raw --unzip


Dataset URL: https://www.kaggle.com/datasets/olistbr/brazilian-ecommerce
License(s): CC-BY-NC-SA-4.0
Downloading brazilian-ecommerce.zip to ./brazil_data_lake/raw
100%|██████████████████████████████████████| 42.6M/42.6M [00:04<00:00, 9.97MB/s]
100%|██████████████████████████████████████| 42.6M/42.6M [00:04<00:00, 9.81MB/s]


#### Ingest the data into the datalake

In [5]:
from pathlib import Path

# Define directories
base_dir = Path('./brazil_data_lake')
raw_dir = base_dir / 'raw'
processed_dir = base_dir / 'processed'
cleaned_dir = base_dir / 'cleaned'

# Create directories
for dir in [raw_dir, processed_dir, cleaned_dir]:
    dir.mkdir(parents=True, exist_ok=True)


#### Process Data (ETL)

In [6]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName('EcommerceDataLake').getOrCreate()

# Load data into Spark DataFrames
orders_df = spark.read.csv(str(raw_dir / 'olist_orders_dataset.csv'), header=True, inferSchema=True)
customers_df = spark.read.csv(str(raw_dir / 'olist_customers_dataset.csv'), header=True, inferSchema=True)
order_items_df = spark.read.csv(str(raw_dir / 'olist_order_items_dataset.csv'), header=True, inferSchema=True)
products_df = spark.read.csv(str(raw_dir / 'olist_products_dataset.csv'), header=True, inferSchema=True)

# Example Transformation: Join orders with customer data and order items
enriched_orders_df = orders_df \
    .join(customers_df, on="customer_id", how="left") \
    .join(order_items_df, on="order_id", how="left") \
    .join(products_df, on="product_id", how="left")

# Save processed data
enriched_orders_df.write.parquet(str(processed_dir / 'orders_enriched.parquet'))


your 131072x1 screen size is bogus. expect trouble
24/08/25 08:32:08 WARN Utils: Your hostname, DOOM resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
24/08/25 08:32:08 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/25 08:32:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/08/25 08:32:22 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
24/08/25 08:32:25 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
24/08/25 08:32:25 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memo

#### Clean Data

In [7]:
# Load processed data
enriched_df = spark.read.parquet(str(processed_dir / 'orders_enriched.parquet'))

# Clean data: Filter for completed orders only
cleaned_df = enriched_df.filter(enriched_df.order_status == "delivered")

# Partition cleaned data by order purchase timestamp
cleaned_df.write.partitionBy("order_purchase_timestamp").parquet(str(cleaned_dir / 'orders_cleaned.parquet'))


24/08/25 08:33:39 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
24/08/25 08:33:39 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
24/08/25 08:33:39 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
24/08/25 08:33:39 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
24/08/25 08:33:39 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
24/08/25 08:33:39 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
24/08/25 08:33:39 WARN MemoryManager: Total allocation exceeds 95.00% 

#### Analyze the data

In [8]:
# Load cleaned data
cleaned_data_df = spark.read.parquet(str(cleaned_dir / 'orders_cleaned.parquet'))

# Example Query 1: Total revenue per customer
revenue_per_customer_df = cleaned_data_df.groupBy("customer_id", "customer_unique_id").sum("price")
revenue_per_customer_df.show()

# Example Query 2: Most popular products
popular_products_df = cleaned_data_df.groupBy("product_id", "product_category_name").count().orderBy("count", ascending=False)
popular_products_df.show()

# Example Query 3: Monthly revenue
from pyspark.sql.functions import month, year

monthly_revenue_df = cleaned_data_df \
    .withColumn("month", month(cleaned_data_df.order_purchase_timestamp)) \
    .withColumn("year", year(cleaned_data_df.order_purchase_timestamp)) \
    .groupBy("year", "month").sum("price").orderBy("year", "month")
monthly_revenue_df.show()


24/08/25 08:41:59 WARN SharedInMemoryCache: Evicting cached table partition metadata from memory due to size constraints (spark.sql.hive.filesourcePartitionFileCacheSize = 262144000 bytes). This may impact query planning performance.
                                                                                

+--------------------+--------------------+------------------+
|         customer_id|  customer_unique_id|        sum(price)|
+--------------------+--------------------+------------------+
|476578ccb1cfee32b...|d42d80a0e58167c0d...|              96.0|
|4346732f4434b2e2e...|83eac38dcb4ebbc05...|              70.0|
|a22cc44a0d2b4126c...|a1044dd75b74fbc48...|              64.0|
|de1133f37c60eddaa...|18b4a8b791c74d00a...|             499.8|
|d6646ea91d8cd9fc7...|efce1ab3e96ccab8b...|             685.4|
|524735cfd75059d22...|8fef6b37a86902967...|              77.8|
|cb66e8cf0304da059...|60d2143d5622c5f67...|             190.4|
|f8fd9bba9a4358344...|a8cace4dc9e5fcc3b...|              90.0|
|b82c4220110dc5c1c...|1355c77463743fb99...|            119.76|
|9ec11b4d685baa761...|ee43cb811614a10e1...|             332.0|
|2142bf7ae61478494...|6681ad9a826614b39...|101.93999999999998|
|da2060eb547f7d8c1...|78b23d5ff07ebe2f0...|             481.5|
|c455144b0c6e78f25...|d687c16dcf223790d...|            

                                                                                

+--------------------+---------------------+-----+
|          product_id|product_category_name|count|
+--------------------+---------------------+-----+
|aca2eb7d00ea1a7b8...|     moveis_decoracao|  520|
|422879e10f4668299...|   ferramentas_jardim|  484|
|99a4788cb24856965...|      cama_mesa_banho|  477|
|389d119b48cf3043d...|   ferramentas_jardim|  390|
|368c6c730842d7801...|   ferramentas_jardim|  388|
|53759a2ecddad2bb8...|   ferramentas_jardim|  373|
|d1c427060a0f73f6b...| informatica_acess...|  332|
|53b36df67ebb7c415...|   relogios_presentes|  321|
|154e7e31ebfa09220...|         beleza_saude|  274|
|3dd2a17168ec895c7...| informatica_acess...|  272|
|2b4609f8948be1887...|         beleza_saude|  255|
|a62e25e09e05e6faf...|   relogios_presentes|  224|
|7c1bd920dbdf22470...|         beleza_saude|  220|
|bb50f2e236e5eea01...|         beleza_saude|  194|
|e0d64dcfaa3b6db5c...|   relogios_presentes|  193|
|5a848e4ab52fd5445...|                 NULL|  190|
|e53e557d5a159f5aa...| informat



+----+-----+------------------+
|year|month|        sum(price)|
+----+-----+------------------+
|2016|    9|            134.97|
|2016|   10| 40325.11000000011|
|2016|   12|              10.9|
|2017|    1|111798.35999999984|
|2017|    2|234223.39999999778|
|2017|    3| 359198.8499999999|
|2017|    4| 340669.6799999994|
|2017|    5| 489338.2500000021|
|2017|    6| 421923.3700000009|
|2017|    7| 481604.5200000014|
|2017|    8| 554699.7000000043|
|2017|    9| 607399.6700000039|
|2017|   10| 648247.6500000064|
|2017|   11| 987765.3700000094|
|2017|   12| 726033.1900000074|
|2018|    1| 924645.0000000115|
|2018|    2|  826437.130000011|
|2018|    3| 953356.2500000111|
|2018|    4| 973534.0900000116|
|2018|    5| 977544.6900000127|
+----+-----+------------------+
only showing top 20 rows



                                                                                

#### Optimize the query

In [9]:
# Caching the data for faster repeated queries
cleaned_data_df.cache()

# Optimized Query: Average order value per customer
avg_order_value_df = cleaned_data_df.groupBy("customer_unique_id").avg("price")
avg_order_value_df.show()

# Remove cache when done
cleaned_data_df.unpersist()




+--------------------+------------------+
|  customer_unique_id|        avg(price)|
+--------------------+------------------+
|02bb0a3e6081abdf2...|              45.9|
|cb8eb8bac1b1e4a09...|             109.0|
|c8ed31310fc440a3f...|            113.83|
|aa2630ebac79aa0c7...|              73.5|
|d1f6f0484c7664289...|              67.0|
|fb8c74f4449e5a62e...|            79.936|
|d78e7b3a054265931...|              8.82|
|8381bb7c1a0fac124...|            159.75|
|7a1596106b2eba910...|              46.0|
|7eeb6421712844795...|19.083333333333332|
|4546caea018ad8c69...| 1.514285714285714|
|43da534e5e6c052df...|15.963999999999999|
|173377321e73c7f04...|              25.9|
|909d4f97b573322c3...|             104.0|
|6bb6ae0812de4c10a...|              65.0|
|f31d2c22ddcad145e...|             43.28|
|ad39475d18fc97e31...|            219.04|
|ca9ff40e3d69e5ffc...|             379.0|
|6e62ba22142e89bac...|              79.0|
|55ab86d9b0939b03a...|             135.0|
+--------------------+------------

                                                                                

DataFrame[product_id: string, order_id: string, customer_id: string, order_status: string, order_approved_at: timestamp, order_delivered_carrier_date: timestamp, order_delivered_customer_date: timestamp, order_estimated_delivery_date: timestamp, customer_unique_id: string, customer_zip_code_prefix: int, customer_city: string, customer_state: string, order_item_id: int, seller_id: string, shipping_limit_date: timestamp, price: double, freight_value: double, product_category_name: string, product_name_lenght: int, product_description_lenght: int, product_photos_qty: int, product_weight_g: int, product_length_cm: int, product_height_cm: int, product_width_cm: int, order_purchase_timestamp: timestamp]