Install PySpark

In [1]:
!pip -q install pyspark
print("PySpark installed ✅")


PySpark installed ✅


**Start Spark + Load CSV**

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("OnlineRetailPipeline").getOrCreate()

df_raw = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("/content/online_retail.csv")
)

print("Raw rows:", df_raw.count())
df_raw.printSchema()
df_raw.show(5, truncate=False)


Raw rows: 541909
root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)

+---------+---------+-----------------------------------+--------+--------------+---------+----------+--------------+
|InvoiceNo|StockCode|Description                        |Quantity|InvoiceDate   |UnitPrice|CustomerID|Country       |
+---------+---------+-----------------------------------+--------+--------------+---------+----------+--------------+
|536365   |85123A   |WHITE HANGING HEART T-LIGHT HOLDER |6       |12/1/2010 8:26|2.55     |17850     |United Kingdom|
|536365   |71053    |WHITE METAL LANTERN                |6       |12/1/2010 8:26|3.39     |17850     |United Kingdom|
|536365   |84406B   |CREAM CUPID HEARTS COAT HANGER    

**Clean Data**

In [3]:
from pyspark.sql.functions import col, trim, to_timestamp

df_clean = (
    df_raw
    .withColumn("InvoiceNo", trim(col("InvoiceNo")))
    .withColumn("StockCode", trim(col("StockCode")))
    .withColumn("Description", trim(col("Description")))
    .withColumn("Country", trim(col("Country")))
    .withColumn("Quantity", col("Quantity").cast("int"))
    .withColumn("UnitPrice", col("UnitPrice").cast("double"))
    .withColumn("CustomerID", col("CustomerID").cast("int"))
    # Format like: 12/1/2010 8:26
    .withColumn("InvoiceTS", to_timestamp(col("InvoiceDate"), "M/d/yyyy H:mm"))
    # Cleaning rules
    .filter(col("CustomerID").isNotNull())
    .filter(col("Quantity") > 0)
    .filter(col("UnitPrice") > 0)
    .dropDuplicates(["InvoiceNo", "StockCode", "InvoiceDate"])
    .withColumn("total_amount", col("Quantity") * col("UnitPrice"))
)

print("Clean rows:", df_clean.count())
df_clean.select("InvoiceDate", "InvoiceTS").show(5, truncate=False)


Clean rows: 387846
+---------------+-------------------+
|InvoiceDate    |InvoiceTS          |
+---------------+-------------------+
|12/1/2010 8:34 |2010-12-01 08:34:00|
|12/1/2010 8:45 |2010-12-01 08:45:00|
|12/1/2010 10:39|2010-12-01 10:39:00|
|12/1/2010 11:41|2010-12-01 11:41:00|
|12/1/2010 12:08|2010-12-01 12:08:00|
+---------------+-------------------+
only showing top 5 rows


**Check Missing/Null Values**

In [4]:
from pyspark.sql.functions import sum

null_counts = df_clean.select([
    sum(col(c).isNull().cast("int")).alias(c)
    for c in df_clean.columns
])

null_counts.show(truncate=False)


+---------+---------+-----------+--------+-----------+---------+----------+-------+---------+------------+
|InvoiceNo|StockCode|Description|Quantity|InvoiceDate|UnitPrice|CustomerID|Country|InvoiceTS|total_amount|
+---------+---------+-----------+--------+-----------+---------+----------+-------+---------+------------+
|0        |0        |0          |0       |0          |0        |0         |0      |0        |0           |
+---------+---------+-----------+--------+-----------+---------+----------+-------+---------+------------+



**Save Cleaned Dataset**

In [5]:
df_clean.write.mode("overwrite").parquet("/content/out_clean_retail")
print("Saved cleaned dataset ✅")


Saved cleaned dataset ✅


**Create FACT + DIMENSIONAL Tables (Star Schema)**

In [6]:
from pyspark.sql.functions import date_format, first, year, month, dayofmonth

# FACT
fact_orders = (
    df_clean
    .withColumn("date_key", date_format(col("InvoiceTS"), "yyyyMMdd").cast("int"))
    .select(
        col("InvoiceNo").alias("invoice_no"),
        col("StockCode").alias("product_id"),
        col("CustomerID").alias("customer_id"),
        col("date_key"),
        col("Quantity").alias("quantity"),
        col("UnitPrice").alias("unit_price"),
        col("total_amount")
    )
)

# DIM CUSTOMER
dim_customer = (
    df_clean
    .groupBy("CustomerID")
    .agg(first("Country").alias("country"))
    .select(col("CustomerID").alias("customer_id"), col("country"))
)

# DIM PRODUCT
dim_product = (
    df_clean
    .groupBy("StockCode")
    .agg(first("Description").alias("product_name"))
    .select(col("StockCode").alias("product_id"), col("product_name"))
)

# DIM DATE (unique by date_key)
dim_date = (
    df_clean
    .withColumn("date_key", date_format(col("InvoiceTS"), "yyyyMMdd").cast("int"))
    .withColumn("year", year(col("InvoiceTS")))
    .withColumn("month", month(col("InvoiceTS")))
    .withColumn("day", dayofmonth(col("InvoiceTS")))
    .select("date_key", "year", "month", "day")
    .dropDuplicates(["date_key"])
)

print("fact_orders:", fact_orders.count())
print("dim_customer:", dim_customer.count())
print("dim_product:", dim_product.count())
print("dim_date:", dim_date.count())


fact_orders: 387846
dim_customer: 4338
dim_product: 3665
dim_date: 305


**Save FACT + DIM tables (Parquet)**

In [7]:
fact_orders.write.mode("overwrite").parquet("/content/out_fact_orders")
dim_customer.write.mode("overwrite").parquet("/content/out_dim_customer")
dim_product.write.mode("overwrite").parquet("/content/out_dim_product")
dim_date.write.mode("overwrite").parquet("/content/out_dim_date")

print("Saved fact + dimension tables ✅")


Saved fact + dimension tables ✅


**Install PostgreSQL + DB Libraries**

In [8]:
!apt-get -y update > /dev/null
!apt-get -y install postgresql postgresql-contrib > /dev/null
!service postgresql start

!pip -q install psycopg2-binary sqlalchemy pandas
print("PostgreSQL installed + started ✅")


W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)
 * Starting PostgreSQL 14 database server
   ...done.
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m4.2/4.2 MB[0m [31m42.6 MB/s[0m eta [36m0:00:00[0m
[?25hPostgreSQL installed + started ✅


**Create DB + Warehouse Schema + Tables**

In [9]:
from sqlalchemy import create_engine, text

# Create user/db (safe re-run)
!sudo -u postgres psql -tc "SELECT 1 FROM pg_roles WHERE rolname='de_user';" | grep -q 1 || sudo -u postgres psql -c "CREATE USER de_user WITH PASSWORD 'de_pass';"
!sudo -u postgres psql -tc "SELECT 1 FROM pg_database WHERE datname='retail_dw';" | grep -q 1 || sudo -u postgres psql -c "CREATE DATABASE retail_dw OWNER de_user;"

engine = create_engine("postgresql+psycopg2://de_user:de_pass@localhost:5432/retail_dw")

with engine.begin() as conn:
    conn.execute(text("DROP SCHEMA IF EXISTS dw CASCADE;"))
    conn.execute(text("CREATE SCHEMA dw AUTHORIZATION de_user;"))

ddl = text("""
CREATE TABLE dw.dim_customer (
  customer_id INT PRIMARY KEY,
  country TEXT
);
CREATE TABLE dw.dim_product (
  product_id TEXT PRIMARY KEY,
  product_name TEXT
);
CREATE TABLE dw.dim_date (
  date_key INT PRIMARY KEY,
  year INT,
  month INT,
  day INT
);
CREATE TABLE dw.fact_orders (
  invoice_no TEXT,
  product_id TEXT REFERENCES dw.dim_product(product_id),
  customer_id INT REFERENCES dw.dim_customer(customer_id),
  date_key INT REFERENCES dw.dim_date(date_key),
  quantity INT,
  unit_price DOUBLE PRECISION,
  total_amount DOUBLE PRECISION
);
""")

with engine.begin() as conn:
    conn.execute(ddl)

print("Warehouse created ✅")


CREATE ROLE
CREATE DATABASE
Warehouse created ✅


**Load Tables into PostgreSQL (Dims → Fact)**

In [10]:
# Load dimensions
dim_customer.toPandas().to_sql("dim_customer", engine, schema="dw",
                               if_exists="append", index=False, method="multi")

dim_product.toPandas().to_sql("dim_product", engine, schema="dw",
                              if_exists="append", index=False, method="multi")

dim_date.toPandas().to_sql("dim_date", engine, schema="dw",
                           if_exists="append", index=False, method="multi")

print("Dimensions loaded ✅")

# Load fact in chunks
fact_pd = fact_orders.toPandas()

chunk_size = 50000
for i in range(0, len(fact_pd), chunk_size):
    fact_pd.iloc[i:i+chunk_size].to_sql("fact_orders", engine, schema="dw",
                                        if_exists="append", index=False, method="multi")
    print(f"Loaded {min(i+chunk_size, len(fact_pd))}/{len(fact_pd)} fact rows")

print("Fact loaded ✅")


Dimensions loaded ✅
Loaded 50000/387846 fact rows
Loaded 100000/387846 fact rows
Loaded 150000/387846 fact rows
Loaded 200000/387846 fact rows
Loaded 250000/387846 fact rows
Loaded 300000/387846 fact rows
Loaded 350000/387846 fact rows
Loaded 387846/387846 fact rows
Fact loaded ✅


**Validate Counts + Run 1 Query**

In [11]:
import pandas as pd

print(pd.read_sql("SELECT COUNT(*) AS dim_customer_cnt FROM dw.dim_customer;", engine))
print(pd.read_sql("SELECT COUNT(*) AS dim_product_cnt  FROM dw.dim_product;", engine))
print(pd.read_sql("SELECT COUNT(*) AS dim_date_cnt     FROM dw.dim_date;", engine))
print(pd.read_sql("SELECT COUNT(*) AS fact_orders_cnt  FROM dw.fact_orders;", engine))

pd.read_sql("""
SELECT c.country, ROUND(SUM(f.total_amount)::numeric, 2) AS revenue
FROM dw.fact_orders f
JOIN dw.dim_customer c ON f.customer_id = c.customer_id
GROUP BY c.country
ORDER BY revenue DESC
LIMIT 10;
""", engine)



   dim_customer_cnt
0              4338
   dim_product_cnt
0             3665
   dim_date_cnt
0           305
   fact_orders_cnt
0           387846


Unnamed: 0,country,revenue
0,United Kingdom,7250910.12
1,Netherlands,285446.34
2,EIRE,265245.96
3,Germany,227618.61
4,France,208820.1
5,Australia,139810.75
6,Spain,60247.94
7,Switzerland,56419.29
8,Belgium,41551.26
9,Sweden,38367.83


**Add Data Quality Checks in PostgreSQL**

In [12]:
import pandas as pd

# 1) Orphan checks (FK integrity)
pd.read_sql("""
SELECT COUNT(*) AS orphan_products
FROM dw.fact_orders f
LEFT JOIN dw.dim_product p ON f.product_id = p.product_id
WHERE p.product_id IS NULL;
""", engine)


Unnamed: 0,orphan_products
0,0


In [13]:
pd.read_sql("""
SELECT COUNT(*) AS orphan_customers
FROM dw.fact_orders f
LEFT JOIN dw.dim_customer c ON f.customer_id = c.customer_id
WHERE c.customer_id IS NULL;
""", engine)


Unnamed: 0,orphan_customers
0,0


In [14]:
pd.read_sql("""
SELECT COUNT(*) AS orphan_dates
FROM dw.fact_orders f
LEFT JOIN dw.dim_date d ON f.date_key = d.date_key
WHERE d.date_key IS NULL;
""", engine)


Unnamed: 0,orphan_dates
0,0


**Add Indexes (performance improvement)**

In [15]:
from sqlalchemy import text

with engine.begin() as conn:
    conn.execute(text("CREATE INDEX IF NOT EXISTS idx_fact_customer ON dw.fact_orders(customer_id);"))
    conn.execute(text("CREATE INDEX IF NOT EXISTS idx_fact_product  ON dw.fact_orders(product_id);"))
    conn.execute(text("CREATE INDEX IF NOT EXISTS idx_fact_date     ON dw.fact_orders(date_key);"))

print("Indexes created ✅")


Indexes created ✅


**Install dbt for Postgres**

In [17]:
!pip -q install dbt-postgres
print("dbt-postgres installed ✅")


[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m114.4/114.4 kB[0m [31m4.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m172.5/172.5 kB[0m [31m8.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m87.7/87.7 kB[0m [31m7.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m95.1/95.1 kB[0m [31m7.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.0/1.0 MB[0m [31m27.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m144.9/144.9 kB[0m [31m9.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m442.7/442.7 kB[0m [31m28.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m162.8/162.8 kB[0m [31m11.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

**Create a dbt project**

In [18]:
!dbt init retail_dbt --profiles-dir /content


[0m15:25:56  Running with dbt=1.11.2
[0m15:25:57  
Your new dbt project "retail_dbt" was created!

For more information on how to configure the profiles.yml file,
please consult the dbt documentation here:

  https://docs.getdbt.com/docs/configure-your-profile

One more thing:

Need help? Don't hesitate to reach out to us via GitHub issues or on Slack:

  https://community.getdbt.com/

Happy modeling!

[0m15:25:57  Setting up your profile.
Which database would you like to use?
[1] postgres

(Don't see the one you want? https://docs.getdbt.com/docs/available-adapters)

Enter a number: 1
host (hostname for the instance): localhost
port [5432]: 5432
user (dev username): de_user
pass (dev password): 
dbname (default database that dbt will build objects in): retail_dw
schema (default schema that dbt will build objects in): analytics
threads (1 or more) [1]: 4
[0m15:29:50  Profile retail_dbt written to /content/profiles.yml using target's profile_template.yml and your supplied values. Ru

In [20]:
!mkdir -p /content/retail_dbt/models/staging
!mkdir -p /content/retail_dbt/models/marts


**Create sources.yml**

In [21]:
sources_yml = """
version: 2

sources:
  - name: dw
    schema: dw
    tables:
      - name: fact_orders
      - name: dim_customer
      - name: dim_product
      - name: dim_date
"""

with open("/content/retail_dbt/models/staging/sources.yml", "w") as f:
    f.write(sources_yml)

print("sources.yml created ✅")


sources.yml created ✅


**Create staging models**

In [24]:
#stg_fact_orders.sql
stg_fact_orders = """
select
  invoice_no,
  product_id,
  customer_id,
  date_key,
  quantity,
  unit_price,
  total_amount
from {{ source('dw','fact_orders') }}
"""

with open("/content/retail_dbt/models/staging/stg_fact_orders.sql", "w") as f:
    f.write(stg_fact_orders)


In [25]:
#stg_dim_customer.sql
stg_dim_customer = """
select
  customer_id,
  country
from {{ source('dw','dim_customer') }}
"""

with open("/content/retail_dbt/models/staging/stg_dim_customer.sql", "w") as f:
    f.write(stg_dim_customer)


In [26]:
#stg_dim_product.sql
stg_dim_product = """
select
  product_id,
  product_name
from {{ source('dw','dim_product') }}
"""

with open("/content/retail_dbt/models/staging/stg_dim_product.sql", "w") as f:
    f.write(stg_dim_product)


In [28]:
#stg_dim_date.sql
stg_dim_date = """
select
  date_key,
  year,
  month,
  day
from {{ source('dw','dim_date') }}
"""

with open("/content/retail_dbt/models/staging/stg_dim_date.sql", "w") as f:
    f.write(stg_dim_date)

print("Staging models created ✅")


Staging models created ✅


**Create mart model**

In [30]:
#fct_revenue_by_country.sql
mart_sql = """
select
  c.country,
  round(sum(f.total_amount)::numeric, 2) as revenue
from {{ ref('stg_fact_orders') }} f
join {{ ref('stg_dim_customer') }} c
  on f.customer_id = c.customer_id
group by c.country
order by revenue desc
"""

with open("/content/retail_dbt/models/marts/fct_revenue_by_country.sql", "w") as f:
    f.write(mart_sql)

print("Mart model created ✅")


Mart model created ✅


**Verify your mart in Postgres**

In [34]:
import pandas as pd

pd.read_sql("SELECT * FROM analytics.fct_revenue_by_country LIMIT 10;", engine)


Unnamed: 0,country,revenue
0,United Kingdom,7250910.12
1,Netherlands,285446.34
2,EIRE,265245.96
3,Germany,227618.61
4,France,208820.1
5,Australia,139810.75
6,Spain,60247.94
7,Switzerland,56419.29
8,Belgium,41551.26
9,Sweden,38367.83
