# Olist E-commerce: End-to-End Data Engineering Pipeline
### By Sunil Makkar | January 13, 2026
**Links:** [GitHub](https://github.com/sunilmakkar) | [Medium Blog](https://medium.com/@sunil.makkar97)

---

## üìã Project Executive Summary
This project builds a scalable, production-grade data pipeline for the Olist E-commerce dataset (the largest public dataset of Brazilian e-commerce). The pipeline transitions from raw, messy CSV data to clean, validated business intelligence using the **Modern Data Stack**:
* **Storage & Compute:** DuckDB (OLAP)
* **Transformation:** dbt (Data Build Tool)
* **Quality Assurance:** dbt Singular & Generic Testing
* **Visualization:** Plotly & Pandas

The goal is to provide a "single source of truth" for regional sales performance and customer purchasing behavior.

<br>
<hr style="border:2px solid gray"> </hr>
<br>

# üèóÔ∏è Phase 1: Data Ingestion & Cleaning
### The Ingestion Layer

This section handles the initial ETL (Extract, Transform, Load) process. We leverage **Pandas** for rapid cleaning of Brazilian-specific characters and date formats, then persist the data into **DuckDB**. 

**Key Objectives:**
* Schema standardization (Lowercasing, snake_case conversion).
* Handling date-time parsing for all Olist transaction timestamps.
* Establishing the `raw` schema within the DuckDB instance.

**Status:** ‚úÖ Ingestion Complete | Raw data persisted in `olist.duckdb`.

<hr style="border:2px solid gray"> </hr>
<br>

In [5]:
import pandas as pd


In [6]:
import duckdb

# This creates a file in your project folder called olist.duckdb
# Think of this as your "Snowflake Account"
con = duckdb.connect("olist.duckdb")

# To verify it's working, let's just run a dummy query
print(con.execute("SELECT 'DuckDB is Ready'").df())

  'DuckDB is Ready'
0   DuckDB is Ready


In [7]:
import os

#1. Create a schema for raw data to keep it separate from dbt models
con.execute("CREATE SCHEMA IF NOT EXISTS raw")

path = "data"
# List all csv files in your data folder
files = [f for f in os.listdir(path) if f.endswith('.csv')]

for file in files:
    # High-quality naming: raw.customers, raw.orders, etc
    # This removes 'olist_', '_dataset', and '.csv' from the filename
    table_name = file.replace('olist_', '').replace('_dataset', '').replace('.csv', '')
    full_table_path = f"raw.{table_name}"

    # Read CSV with Pandas
    temp_df = pd.read_csv(os.path.join(path, file))

    # Write to DuckDB
    # DuckDB can read the pandas df 'temp_df' directly
    con.execute(f"CREATE OR REPLACE TABLE {full_table_path} AS SELECT * FROM temp_df")

    print(f"Ingested {file} into {full_table_path}")

# 2. Final verification of the 'Raw' Layer
print("\n--- Tables currently in RAW schema ---")
print(con.execute("SELECT table_name FROM information_schema.tables WHERE table_schema = 'raw'").df())

Ingested olist_sellers_dataset.csv into raw.sellers
Ingested product_category_name_translation.csv into raw.product_category_name_translation
Ingested olist_orders_dataset.csv into raw.orders
Ingested olist_order_items_dataset.csv into raw.order_items
Ingested olist_customers_dataset.csv into raw.customers
Ingested olist_geolocation_dataset.csv into raw.geolocation
Ingested olist_order_payments_dataset.csv into raw.order_payments
Ingested olist_order_reviews_dataset.csv into raw.order_reviews
Ingested olist_products_dataset.csv into raw.products

--- Tables currently in RAW schema ---
                          table_name
0                          customers
1                        geolocation
2                             orders
3                        order_items
4                     order_payments
5                      order_reviews
6                           products
7  product_category_name_translation
8                            sellers
9                           v_orders


In [8]:
# Create a view that casts the types properly at the database level
con.execute("""
    CREATE OR REPLACE VIEW raw.v_orders AS 
    SELECT 
        *, 
        CAST(order_purchase_timestamp AS TIMESTAMP) as order_purchase_timestamp_typed
    FROM raw.orders
""")

<_duckdb.DuckDBPyConnection at 0x10c8c6df0>

<br>
<hr style="border:2px solid gray"> </hr>
<br>

# üîç Phase 2: Data Quality & Integrity
### The Validation Layer

Before committing to downstream analytics, we perform a deep-dive audit of the data relationships. This ensures that our **dbt Staging** models are accurately reflecting reality.

**Key Checks:**
* **Referential Integrity:** Ensuring every order item maps to a valid order and product.
* **Anomaly Detection:** Identifying mismatched totals between item prices and payment values.
* **dbt Logic Verification:** Validating that custom SQL transformations handle Olist's complex voucher system correctly.

**Summary:** üõ°Ô∏è Quality checks passed. Dataset verified as statistically sound for reporting.

<br>
<hr style="border:2px solid gray"> </hr>
<br>

In [10]:
import duckdb

# Connect to the same database the dbt project is using
con = duckdb.connect('olist.duckdb')

# Query to find Portuguese categories that don't have an English match
query = """
SELECT
    category_name_pt,
    COUNT(*) as missing_row_count
FROM main.int_order_items_products
WHERE category_name_en IS NULL
    AND category_name_pt IS NOT NULL
GROUP BY 1
ORDER BY 2 DESC;
"""

df_missing = con.execute(query).df()
print(df_missing)

Empty DataFrame
Columns: [category_name_pt, missing_row_count]
Index: []


<br>
<hr style="border:2px solid gray"> </hr>
<br>

# üìä Phase 3: Business Intelligence & Analytics
### The Presentation Layer

In this final phase, we query the `marts` tables created in dbt to drive executive-level insights. We focus on the "Three C's": **C**oncentration (Geography), **C**hronology (Growth), and **C**ash (Payment behavior).

* **Growth:** Analyzing the Black Friday 2017 revenue spike.
* **Geography:** Visualizing the S√£o Paulo (SP) market dominance.
* **Behavior:** Mapping payment methods to high-value product categories.

<br>
<hr style="border:2px solid gray"> </hr>
<br>

In [11]:
# Cell 1: Connect in Read-Only Mode
import duckdb
import pandas as pd
import plotly.express as px

con = duckdb.connect

In [14]:
# Cell 2: Top 10 Product Categories by Revenue
# Test translations and joins are working

query_top_categories = """
SELECT
    category_name_en,
    ROUND(SUM(price), 2) as total_revenue,
    COUNT(DISTINCT order_id) as total_orders
FROM main.fct_orders_items
GROUP BY 1
ORDER BY total_revenue DESC
LIMIT 10
"""

df_top = con.execute(query_top_categories).df()

fig = px.bar(df_top, x='category_name_en', y='total_revenue',
             title='Top 10 Categories by Revenue',
             labels={'category_name_en': 'Category', 'total_revenue': 'Revenue ($)'},
             template='plotly_white')

fig.show()

In [15]:
# Cell 3: Monthly Sales Trends
# This proves your timestamp standardization in the staging layer was worth the effort.
query_trends = """
SELECT 
    date_trunc('month', purchased_at::TIMESTAMP) as sales_month,
    SUM(price) as monthly_revenue
FROM main.fct_orders_items
GROUP BY 1
ORDER BY 1
"""

df_trends = con.execute(query_trends).df()

fig_trend = px.line(df_trends, x='sales_month', y='monthly_revenue', 
                    title='Monthly Revenue Growth',
                    template='plotly_white')
fig_trend.show()

In [16]:
# Cell 4: Loyal Customers By State
# This identifies which regions have the highest "Customer Lifetime Value" (CLV). 
query_loyalty = """
SELECT 
    c.customer_state,
    COUNT(DISTINCT f.order_id) as total_orders,
    ROUND(SUM(f.price), 2) as total_spend,
    ROUND(SUM(f.price) / COUNT(DISTINCT f.customer_id), 2) as spend_per_customer
FROM main.fct_orders_items f
JOIN main.dim_customers c ON f.customer_id = c.customer_id
GROUP BY 1
ORDER BY total_spend DESC
LIMIT 10
"""
df_loyalty = con.execute(query_loyalty).df()
fig_loyalty = px.treemap(df_loyalty, path=['customer_state'], values='total_spend',
                         title='Revenue Concentration by Brazilian State')
fig_loyalty.show()

In [17]:
# Cell 5: Payment Methods for Top Categories
# This helps the business understand if specific expensive categories (like watches_gifts) rely more on credit card installments.

query_payments = """
WITH top_cats AS (
    SELECT category_name_en
    FROM main.fct_orders_items
    GROUP BY 1 ORDER BY SUM(price) DESC LIMIT 5
)
SELECT 
    f.category_name_en,
    p.payment_type,
    COUNT(*) as transaction_count
FROM main.fct_orders_items f
JOIN main.stg_olist__order_payments p ON f.order_id = p.order_id
WHERE f.category_name_en IN (SELECT category_name_en FROM top_cats)
GROUP BY 1, 2
"""
df_pay = con.execute(query_payments).df()
fig_pay = px.bar(df_pay, x="category_name_en", y="transaction_count", 
                 color="payment_type", title="Payment Method Split for Top 5 Categories",
                 barmode="group")
fig_pay.show()

In [18]:
con.close()

<br>
<hr style="border:2px solid gray"> </hr>
<br>

## üèÅ Project: Pipeline Complete
This concludes the **Olist E-commerce Pipeline**. We have successfully navigated the full Data Engineering lifecycle:
1. **Ingested** raw CSVs into a structured DuckDB environment.
2. **Transformed** and cleaned data using dbt (Staging -> Marts).
3. **Validated** business logic via dbt singular and generic tests.
4. **Visualized** growth trends and customer behavior for stakeholders.

**The pipeline is now production-ready.**

<br>
<hr style="border:2px solid gray"> </hr>
<br>