In [61]:
import os

def load_db_config(env=None):
    """Build the keyword arguments passed to ``psycopg2.connect``.

    Every setting is read from an ``NSPCC_DB_*`` variable and falls back to
    the local-development value when that variable is absent, so real
    credentials do not have to be written into the notebook.

    Args:
        env: Mapping to read overrides from. Defaults to ``os.environ``.

    Returns:
        dict with the keys ``host``, ``port``, ``user``, ``password`` and
        ``database``. ``port`` is always an ``int``.

    Raises:
        ValueError: if ``NSPCC_DB_PORT`` is set to a non-integer value.
    """
    if env is None:
        env = os.environ
    return {
        "host": env.get("NSPCC_DB_HOST", "localhost"),
        "port": int(env.get("NSPCC_DB_PORT", 5432)),
        "user": env.get("NSPCC_DB_USER", "postgres"),
        # Development fallback only; export NSPCC_DB_PASSWORD for any shared database.
        "password": env.get("NSPCC_DB_PASSWORD", "admin"),
        "database": env.get("NSPCC_DB_NAME", "nspcc_dwh"),
    }


DB_CONFIG = load_db_config()

# Folder scanned for *.json input files: the "data" directory next to the
# current working directory. Adjust path if needed.
DATA_DIR = os.path.abspath(os.path.join(os.getcwd(), '..', 'data'))
# Database schema that holds the dimension, fact and logging tables.
SCHEMA = "nspcc"

In [62]:
# load.py

import json

import os
import json
import time
import psycopg2
# from config.db_config import DB_CONFIG

def load_json_files():
    """Read every ``*.json`` file in ``DATA_DIR`` and log the outcome of each.

    Each file is expected to contain a JSON array of records. Files are
    handled independently: one that cannot be read or parsed is reported and
    logged with status ``'failed'``, and the remaining files are still
    processed. One row per file is inserted into
    ``{SCHEMA}.file_error_logging``; all rows are committed together once
    every file has been handled.

    Returns:
        list: the records of all successfully parsed files, concatenated in
        file-name order.

    Raises:
        psycopg2.Error: if connecting, inserting a log row or committing
            fails. The connection is closed before the error propagates.
        OSError: if ``DATA_DIR`` cannot be listed.
    """
    all_data = []
    conn = psycopg2.connect(**DB_CONFIG)
    cursor = conn.cursor()

    try:
        # os.listdir() makes no ordering promise; sorting keeps both the returned
        # records and the log rows in a reproducible order.
        for file in sorted(os.listdir(DATA_DIR)):
            if not file.endswith(".json"):
                continue

            file_path = os.path.join(DATA_DIR, file)
            print(f"Processing file: {file_path}")
            start_time = time.perf_counter()
            record_count = 0
            status = 'completed'
            error_message = None

            try:
                # JSON is UTF-8 by specification; without an explicit encoding the
                # platform default (e.g. cp1252 on Windows) would be used.
                with open(file_path, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                # Extending with a dict would add its keys (plain strings), which
                # the transform step cannot handle, so reject non-array documents.
                if not isinstance(data, list):
                    raise ValueError(
                        f"expected a JSON array of records, got {type(data).__name__}"
                    )
                all_data.extend(data)
                record_count = len(data)
            except Exception as e:
                # Deliberately broad: a bad file must not stop the other files.
                status = 'failed'
                error_message = str(e)
                print(f"❌ JSON decode error in file: {file} -> {e}")

            # Whole seconds only, as in the processing_time_seconds column name.
            processing_time = int(time.perf_counter() - start_time)

            # Insert into file_error_logging
            cursor.execute(f"""
                INSERT INTO {SCHEMA}.file_error_logging (file_name, record_count, processing_time_seconds, status, error_message)
                VALUES (%s, %s, %s, %s, %s)
            """, (file, record_count, processing_time, status, error_message))

        conn.commit()
    finally:
        # Runs on success and on failure so the connection is never leaked;
        # closing without a commit discards any uncommitted log rows.
        cursor.close()
        conn.close()
    return all_data



In [63]:
# transform.py

from datetime import datetime

def flatten_data(raw_data):
    """Split nested customer records into fact rows and dimension records.

    Args:
        raw_data: iterable of dicts. The keys read from each one are
            ``customer_id``, ``email``, ``customer_profile`` (a dict with
            ``region``, ``shirt_size``, ``donates_to_charity``,
            ``bikes_to_work``) and ``donations`` (a list of dicts with
            ``payment_id``, ``amount``, ``status``, ``payment_method``,
            ``payment_date``). Any of them may be missing or ``None``.

    Returns:
        tuple: ``(fact_rows, dim_customers, dim_payments, dim_regions)``,
        each a list of dicts. The dimension lists are not de-duplicated:
        there is one customer and one region record per input entry, and
        one payment record and one fact row per donation.
    """
    fact_rows, dim_customers, dim_regions, dim_payments = [], [], [], []

    for entry in raw_data:
        cust_id = entry.get("customer_id")
        # `or` rather than a .get() default: the default only covers a missing
        # key, while an explicit JSON null arrives here as None.
        profile = entry.get("customer_profile") or {}
        region = profile.get("region") or "Unknown"

        dim_customers.append({
            "customer_id": cust_id,
            "email": entry.get("email"),
            "shirt_size": profile.get("shirt_size"),
            "donates_to_charity": profile.get("donates_to_charity"),
            "bikes_to_work": profile.get("bikes_to_work")
        })

        # Case-insensitive substring match, so "Greater London" counts as London.
        is_london = "london" in region.strip().lower()

        dim_regions.append({
            "region": region,
            "is_london": is_london
        })

        donations = entry.get("donations") or []
        for d in donations:
            payment_method = d.get("payment_method")
            dim_payments.append({"payment_method": payment_method})

            fact_rows.append({
                "payment_id": d.get("payment_id"),
                "customer_id": cust_id,
                "amount": d.get("amount"),
                "status": d.get("status"),
                "payment_method": payment_method,
                "payment_date": d.get("payment_date"),
                "region": region
            })

    return fact_rows, dim_customers, dim_payments, dim_regions



In [75]:
#load.py

import psycopg2
from datetime import datetime

# SCD2 Type Handling for Dimension tables
def insert_scd2(cursor, table, keys, record):
    """Apply one record to a dimension table as a Type-2 slowly changing row.

    Looks up the active row (``is_active='Y'``) that matches ``record`` on
    ``keys`` and then does one of three things:

    * no active row: insert ``record`` as the first active version;
    * active row whose non-key columns equal ``record``: do nothing, so
      re-running the load does not create a new version of unchanged data;
    * active row that differs: close it (set ``effective_end_date`` and
      ``is_active='N'``) and insert ``record`` as the new active version,
      using the same timestamp for both statements.

    Args:
        cursor: open DB-API cursor; nothing is committed here.
        table: dimension table name, resolved inside ``SCHEMA``.
        keys: column names forming the natural key; each must be in ``record``.
        record: dict mapping column name to value for the row to store.

    Note:
        ``table`` and the column names are interpolated into the SQL text, so
        they must come from trusted code. Only values are bound as parameters.
    """
    schema_table = f"{SCHEMA}.{table}"
    key_values = [record[k] for k in keys]
    active_match = " AND ".join([f"{k}=%s" for k in keys] + ["is_active='Y'"])

    # Fetch the stored non-key columns so a change can be detected; a table
    # made up of key columns only has nothing to compare, hence "SELECT 1".
    tracked = [col for col in record if col not in keys]
    select_list = ", ".join(tracked) or "1"
    cursor.execute(
        f"SELECT {select_list} FROM {schema_table} WHERE {active_match}",
        key_values,
    )
    current = cursor.fetchone()

    # One timestamp for both statements so the closed version ends exactly
    # when its replacement starts.
    now = datetime.today()

    if current is not None:
        # If the driver returns a value type that does not compare equal to the
        # incoming one, this falls through and a new version is written.
        if not tracked or list(current) == [record[col] for col in tracked]:
            return
        cursor.execute(f"""
            UPDATE {schema_table} SET effective_end_date=%s, is_active='N'
            WHERE {active_match}
        """, [now] + key_values)

    columns = ', '.join(record.keys())
    values = ', '.join(['%s'] * len(record))
    cursor.execute(f"""
        INSERT INTO {schema_table} ({columns}, effective_start_date, effective_end_date, is_active)
        VALUES ({values}, %s, NULL, 'Y')
    """, list(record.values()) + [now])

def _lookup_key(cursor, cache, query, value):
    """Return the first row ``query`` yields for ``value`` (or None), memoised in ``cache``."""
    if value not in cache:
        cursor.execute(query, (value,))
        cache[value] = cursor.fetchone()
    return cache[value]


def load_to_db(fact_rows, dim_customers, dim_payments, dim_regions):
    """Load the dimension records, then the donation facts, in one transaction.

    Dimension records are de-duplicated on their natural key (the last
    occurrence wins) and passed to ``insert_scd2``. Each fact row is then
    resolved to the keys of the active customer, region and payment-method
    rows plus the matching ``dim_date`` row, and inserted into
    ``fact_donations``. A fact row with any unresolved key is reported and
    skipped, and a ``payment_id`` that already exists is left untouched.

    Args:
        fact_rows: list of dicts with ``payment_id``, ``customer_id``,
            ``amount``, ``status``, ``payment_method``, ``payment_date``
            and ``region``.
        dim_customers: list of dicts containing at least ``customer_id``.
        dim_payments: list of dicts containing ``payment_method``.
        dim_regions: list of dicts containing at least ``region``.

    Raises:
        Exception: any error raised while loading is re-raised after the
            transaction has been rolled back and the connection closed.
    """
    conn = psycopg2.connect(**DB_CONFIG)
    cursor = conn.cursor()

    try:
        # Keep one record per natural key; later duplicates overwrite earlier
        # ones, so a key repeated in the input yields a single dimension row.
        unique_regions = {rec['region']: rec for rec in dim_regions}
        unique_customers = {rec['customer_id']: rec for rec in dim_customers}
        unique_payments = {rec['payment_method']: rec for rec in dim_payments}

        # Load dimension tables with unique records
        for rec in unique_regions.values():
            insert_scd2(cursor, "dim_region", ["region"], rec)

        for rec in unique_customers.values():
            insert_scd2(cursor, "dim_customer", ["customer_id"], rec)

        for rec in unique_payments.values():
            insert_scd2(cursor, "dim_payment_method", ["payment_method"], rec)

        # The active flag is compared with 'Y', the same literal insert_scd2
        # uses when it reads and writes these tables.
        customer_sql = f"SELECT customer_key FROM {SCHEMA}.dim_customer WHERE customer_id = %s AND is_active = 'Y'"
        region_sql = f"SELECT region_key FROM {SCHEMA}.dim_region WHERE region = %s AND is_active = 'Y'"
        payment_sql = f"SELECT payment_method_key FROM {SCHEMA}.dim_payment_method WHERE payment_method = %s AND is_active = 'Y'"
        date_sql = f"SELECT date_id FROM {SCHEMA}.dim_date WHERE full_date = %s"

        # The dimensions no longer change at this point, so every distinct
        # natural key is looked up once instead of once per fact row.
        customer_keys, region_keys, payment_keys, date_keys = {}, {}, {}, {}

        for row in fact_rows:
            # 1. Get surrogate keys from dimension tables
            customer_key = _lookup_key(cursor, customer_keys, customer_sql, row["customer_id"])
            region_key = _lookup_key(cursor, region_keys, region_sql, row["region"])
            payment_method_key = _lookup_key(cursor, payment_keys, payment_sql, row["payment_method"])
            date_key = _lookup_key(cursor, date_keys, date_sql, row["payment_date"])

            if not (customer_key and region_key and payment_method_key and date_key):
                print(f"Skipping row {row['payment_id']} due to missing dimension keys.")
                continue

            # 2. Insert into fact_donations
            cursor.execute(f"""
                INSERT INTO {SCHEMA}.fact_donations (
                    payment_id, customer_key, date_key, payment_method_key, region_key,
                    amount, status
                )
                VALUES (%s, %s, %s, %s, %s, %s, %s)
                ON CONFLICT (payment_id) DO NOTHING
            """, (
                row["payment_id"],
                customer_key[0],
                date_key[0],
                payment_method_key[0],
                region_key[0],
                row["amount"],
                row["status"]
            ))

        conn.commit()
    except Exception:
        # Leave the warehouse as it was rather than half-loaded.
        conn.rollback()
        raise
    finally:
        cursor.close()
        conn.close()


In [76]:
# main.py

import logging

# Send INFO-and-above records from the root logger to etl.log in the working directory.
logging.basicConfig(filename='etl.log', level=logging.INFO)

def main():
    """Run the ETL job end to end: read the JSON files, flatten them, load the warehouse.

    Progress is written to the log. If any stage raises, the traceback is
    logged before the exception is re-raised, so a failed run is visible in
    the log file as well as in the notebook output.
    """
    logging.info("Starting ETL job")
    try:
        raw_data = load_json_files()
        logging.info("Loaded %d customer records", len(raw_data))
        fact_rows, dim_customers, dim_payments, dim_regions = flatten_data(raw_data)
        load_to_db(fact_rows, dim_customers, dim_payments, dim_regions)
    except Exception:
        logging.exception("ETL job failed")
        raise
    logging.info("ETL job complete")

# True in a notebook kernel and when run as a script; keeps a plain import of
# this code from starting the job.
if __name__ == "__main__":
    main()


Processing file: C:\Sobz\Learning\Workshops\Interview_Tasks\Solutions\nspcc_etl\data\api_response_0.json
Processing file: C:\Sobz\Learning\Workshops\Interview_Tasks\Solutions\nspcc_etl\data\api_response_1.json
Processing file: C:\Sobz\Learning\Workshops\Interview_Tasks\Solutions\nspcc_etl\data\api_response_10.json
Processing file: C:\Sobz\Learning\Workshops\Interview_Tasks\Solutions\nspcc_etl\data\api_response_11.json
❌ JSON decode error in file: api_response_11.json -> Extra data: line 1 column 5 (char 4)
Processing file: C:\Sobz\Learning\Workshops\Interview_Tasks\Solutions\nspcc_etl\data\api_response_12.json
Processing file: C:\Sobz\Learning\Workshops\Interview_Tasks\Solutions\nspcc_etl\data\api_response_13.json
Processing file: C:\Sobz\Learning\Workshops\Interview_Tasks\Solutions\nspcc_etl\data\api_response_14.json
Processing file: C:\Sobz\Learning\Workshops\Interview_Tasks\Solutions\nspcc_etl\data\api_response_15.json
Processing file: C:\Sobz\Learning\Workshops\Interview_Tasks\Solu