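Simple ETL that transforms the OLTP CSV exports in `sample_data/` into OLAP CSV files in `olap_data/`: the dimensions DimBuyer, DimSeller, DimProduct, DimTime and DimFlashSale, plus the fact table FactOrders.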

In [31]:
import csv
from datetime import datetime
from decimal import ROUND_HALF_UP, Decimal
from pathlib import Path

# Paths are relative to the kernel's current working directory (Path().resolve()),
# which is typically the folder the notebook was opened from.
BASE_DIR = Path().resolve()
OLTP_DIR = BASE_DIR / "sample_data"   # input: OLTP table exports
OLAP_DIR = BASE_DIR / "olap_data"     # output: dimension / fact CSVs
OLAP_DIR.mkdir(exist_ok=True)

# One input CSV per OLTP table
USERS_CSV, FLASH_SALES_CSV, PRODUCTS_CSV, ORDERS_CSV, ORDER_ITEMS_CSV = (
    OLTP_DIR / f"{table_name}.csv"
    for table_name in ("Users", "FlashSales", "Products", "Orders", "OrderItems")
)

# Start every run from an empty output folder: remove CSVs left by a previous run
for stale_csv in list(OLAP_DIR.glob("*.csv")):
    stale_csv.unlink()

print("Input:", OLTP_DIR)
print("Output:", OLAP_DIR)

Input: C:\Users\dell\OneDrive\Documents\GitHub\stadvdb-mco2\database\sample_data
Output: C:\Users\dell\OneDrive\Documents\GitHub\stadvdb-mco2\database\olap_data


In [None]:
import math

def read_csv(path):
    """Read a headed CSV file and return its rows as a list of dicts.

    Each dict maps a header name to the raw string value of one row; no type
    conversion happens here. An empty file yields [].

    The file is decoded with the 'utf-8-sig' codec. It reads plain UTF-8
    exactly like 'utf-8' but also strips a leading byte-order mark. CSVs saved
    by Excel and some database export tools start with a BOM. With plain
    'utf-8' that BOM would be prefixed (as U+FEFF) to the first column name,
    and lookups such as row['user_id'] would raise KeyError.
    """
    with open(path, newline='', encoding='utf-8-sig') as f:
        return list(csv.DictReader(f))


def write_csv(path, rows, headers):
    """Write ``rows`` (dicts) to ``path`` as UTF-8 CSV, header line first.

    Columns follow the order of ``headers``. Keys missing from a row are
    written as empty fields, and None values also become empty fields. A key
    that is not in ``headers`` raises ValueError (csv.DictWriter's default
    extrasaction='raise'). An existing file at ``path`` is overwritten.
    """
    with open(path, 'w', newline='', encoding='utf-8') as out_file:
        writer = csv.DictWriter(out_file, fieldnames=headers)
        writer.writeheader()
        for record in rows:
            writer.writerow(record)


def to_unix(ts_str):
    """Convert a 'YYYY-MM-DD HH:MM:SS' string to integer Unix seconds.

    The string carries no timezone. datetime.timestamp() therefore interprets
    it in the local timezone of the machine running the notebook, so the
    resulting ids depend on that machine's timezone setting.
    NOTE(review): confirm whether the source timestamps are meant to be UTC.

    Raises ValueError if ``ts_str`` does not match the format exactly.
    """
    return int(datetime.strptime(ts_str, '%Y-%m-%d %H:%M:%S').timestamp())

# Load OLTP CSVs (every value is still a string at this point)
users = read_csv(USERS_CSV)
flash_sales = read_csv(FLASH_SALES_CSV)
products = read_csv(PRODUCTS_CSV)
orders = read_csv(ORDERS_CSV)
order_items = read_csv(ORDER_ITEMS_CSV)

# Lookup tables for the joins below, keyed by integer primary key.
# If a key appears twice, the row read last wins.
users_by_id = {int(row['user_id']): row for row in users}
products_by_id = {int(row['product_id']): row for row in products}

# Group OrderItems rows by order_id. Each group is then sorted by
# order_items_id, so items[0] is the order's first item. The fact step keeps
# only that item because the Fact PK is order_id alone.
order_items_by_order = {}
for item in order_items:
    order_items_by_order.setdefault(int(item['order_id']), []).append(item)
for group in order_items_by_order.values():
    group.sort(key=lambda row: int(row['order_items_id']))


In [None]:
# Write OLAP dimension CSVs

def _users_with_role(user_rows, role, id_column):
    """Project Users rows whose user_role equals ``role`` onto (id_column, user_name).

    The role match is exact and case-sensitive. Rows without a user_role
    column never match. ``id_column`` receives int(user_id).
    """
    return [
        {id_column: int(row["user_id"]), "user_name": row["user_name"]}
        for row in user_rows
        if row.get("user_role") == role
    ]


dim_buyer = _users_with_role(users, "BUYER", "buyer_id")
write_csv(OLAP_DIR / "DimBuyer.csv", dim_buyer, ["buyer_id", "user_name"])

dim_seller = _users_with_role(users, "SELLER", "seller_id")
write_csv(OLAP_DIR / "DimSeller.csv", dim_seller, ["seller_id", "user_name"])

# DimProduct: one row per Products row. Only product_id is converted to int;
# original_price is passed through as the raw CSV string.
dim_product = []
for product_row in products:
    dim_product.append({
        "product_id": int(product_row["product_id"]),
        "product_name": product_row["product_name"],
        "category": product_row["category"],
        "original_price": product_row["original_price"],
    })
write_csv(OLAP_DIR / "DimProduct.csv", dim_product, ["product_id", "product_name", "category", "original_price"])

# DimTime: one row per distinct timestamp in Orders.created_at and
# FlashSales.start_time / end_time. The key is time_id = to_unix(timestamp),
# so it joins with FactOrders.time_id and DimFlashSale.start_time_id / end_time_id.
#
# Hour/day/month/year are read from the parsed source string rather than
# recovered with datetime.fromtimestamp(time_id). That round trip passes
# through the local timezone twice. For a wall-clock time inside a DST gap it
# comes back as a different hour than the one stored in the OLTP data.
source_timestamps = [o["created_at"] for o in orders]
for fs in flash_sales:
    source_timestamps.append(fs["start_time"])
    source_timestamps.append(fs["end_time"])

time_rows = {}
for ts_str in source_timestamps:
    time_id = to_unix(ts_str)
    if time_id in time_rows:
        continue  # many source rows share a timestamp; the first one seen defines the row
    dt = datetime.strptime(ts_str, '%Y-%m-%d %H:%M:%S')
    time_rows[time_id] = {
        "time_id": time_id,
        "t_hour": dt.hour,
        "t_day": dt.day,
        "t_month": dt.month,
        "t_year": dt.year,
    }

time_ids = set(time_rows)
dim_time = [time_rows[tid] for tid in sorted(time_ids)]
write_csv(OLAP_DIR / "DimTime.csv", dim_time, ["time_id", "t_hour", "t_day", "t_month", "t_year"])

# DimFlashSale: start/end are stored as DimTime keys (to_unix values), not as raw strings
dim_flash_sale = [
    {
        "flash_sale_id": int(sale["flash_sale_id"]),
        "name": sale["name"],
        "start_time_id": to_unix(sale["start_time"]),
        "end_time_id": to_unix(sale["end_time"]),
    }
    for sale in flash_sales
]
write_csv(OLAP_DIR / "DimFlashSale.csv", dim_flash_sale, ["flash_sale_id", "name", "start_time_id", "end_time_id"])

print("Dimensions written to:", OLAP_DIR)

Dimensions written to: C:\Users\dell\OneDrive\Documents\GitHub\stadvdb-mco2\database\olap_data


In [34]:
# Write FactOrders.csv (one row per order using first order item)
#
# order_id is the fact table's key, so each order contributes exactly one row.
# That row is built from items[0] of the order's OrderItems group (groups are
# sorted by order_items_id when order_items_by_order is built). Other items of
# multi-item orders are not represented. Orders with no items, or whose first
# item points at a product missing from Products, are skipped and counted so
# the loss is visible.
#
# Money is computed with Decimal. The price string is used exactly and
# results are rounded half-up to cents. Binary floats would round
# differently, e.g. float("2.675") formats as "2.67".

CENTS = Decimal("0.01")


def _to_cents(amount):
    """Round a Decimal amount to two decimal places using ROUND_HALF_UP."""
    return amount.quantize(CENTS, rounding=ROUND_HALF_UP)


fact_orders = []
skipped_without_items = 0
skipped_unknown_product = 0
for o in orders:
    oid = int(o["order_id"])
    buyer_id = int(o["buyer_id"])
    ts = to_unix(o["created_at"])  # time_id, joins to DimTime
    items = order_items_by_order.get(oid, [])
    if not items:
        skipped_without_items += 1
        continue
    first = items[0]
    product_id = int(first["product_id"])
    qty = int(first["quantity_sold"]) if first["quantity_sold"] != "" else 0
    prod = products_by_id.get(product_id)
    if prod is None:
        skipped_unknown_product += 1
        continue
    # Empty seller/flash-sale ids become None, which DictWriter writes as an empty field
    seller_id = int(prod["seller_id"]) if prod["seller_id"] != "" else None
    flash_sale_id = int(prod["flash_sale_id"]) if prod["flash_sale_id"] != "" else None
    # NOTE(review): uses Products.price as read at ETL time; confirm whether the
    # OLTP data stores a per-order price that should be used instead.
    price_per_item = Decimal(prod["price"]) if prod["price"] != "" else Decimal(0)
    total_sale = _to_cents(qty * price_per_item)

    fact_orders.append({
        "order_id": oid,
        "product_id": product_id,
        "time_id": ts,
        "buyer_id": buyer_id,
        "seller_id": seller_id,
        "flash_sale_id": flash_sale_id,
        "quantity_sold": qty,
        "price_per_item": f"{_to_cents(price_per_item):.2f}",
        "total_sale": f"{total_sale:.2f}",
    })

write_csv(
    OLAP_DIR / "FactOrders.csv",
    fact_orders,
    [
        "order_id",
        "product_id",
        "time_id",
        "buyer_id",
        "seller_id",
        "flash_sale_id",
        "quantity_sold",
        "price_per_item",
        "total_sale",
    ],
)

print("FactOrders written.")
print(
    f"Skipped orders: {skipped_without_items} without items, "
    f"{skipped_unknown_product} with unknown product."
)

FactOrders written.
