# Welcome to the Data Engineering Lab
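Run the cells below in order: the first downloads the lab datasets and loads them into PostgreSQL and DuckDB, and the second checks that both databases can be queried.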

In [None]:
import os
import pandas as pd
import requests
import duckdb
from sqlalchemy import create_engine, text
import opendatasets as od 

# --- CONFIGURATION ---
# Root directory for every downloaded file and for the DuckDB database file.
DATA_DIR = "/home/jovyan/data"

# Postgres connection settings used by get_postgres_engine().
# These are local lab defaults; do not reuse them outside this sandbox.
PG_CONFIG = dict(
    user="admin",
    password="password",
    host="postgres",
    port="5432",
    database="postgres",
)

# Source locations of the two lab datasets.
OLIST_URL = "https://www.kaggle.com/datasets/olistbr/brazilian-ecommerce"
NYC_TAXI_URL = (
    "https://d37ci6vzurychx.cloudfront.net/trip-data/"
    "yellow_tripdata_2023-01.parquet"
)

def get_postgres_engine():
    """Create a SQLAlchemy engine for the Postgres database described by PG_CONFIG.

    The URL is assembled with ``URL.create`` from separate components rather
    than by f-string interpolation. Credentials containing URL-reserved
    characters (``@``, ``:``, ``/``, ``%``) therefore cannot corrupt the
    connection string.

    Returns:
        A SQLAlchemy ``Engine`` for the configured database.
    """
    # Local import keeps the module's top-level imports unchanged.
    from sqlalchemy.engine import URL

    url = URL.create(
        drivername="postgresql",
        username=PG_CONFIG["user"],
        password=PG_CONFIG["password"],
        host=PG_CONFIG["host"],
        # PG_CONFIG stores the port as a string; URL expects an int.
        port=int(PG_CONFIG["port"]),
        database=PG_CONFIG["database"],
    )
    return create_engine(url)

# --- 1. DEFINE SCHEMA ---
# SQL script executed in a single call by setup_postgres_schema().
# The DROP ... CASCADE statements give a clean slate on every run: CASCADE
# also removes the foreign-key constraints between these tables.
# The CREATEs run parents first (customers, sellers, products, orders)
# because orders and order_items declare REFERENCES to them. The CSV loader
# follows the same parent-first order so the foreign-key checks pass.
DDL_STATEMENTS = """
-- Clean Slate
DROP TABLE IF EXISTS order_items CASCADE;
DROP TABLE IF EXISTS orders CASCADE;
DROP TABLE IF EXISTS products CASCADE;
DROP TABLE IF EXISTS customers CASCADE;
DROP TABLE IF EXISTS sellers CASCADE;

-- Create Tables
CREATE TABLE customers (
    customer_id VARCHAR(32) PRIMARY KEY,
    customer_unique_id VARCHAR(32) NOT NULL,
    customer_zip_code_prefix VARCHAR(10),
    customer_city VARCHAR(100),
    customer_state VARCHAR(5)
);

CREATE TABLE sellers (
    seller_id VARCHAR(32) PRIMARY KEY,
    seller_zip_code_prefix VARCHAR(10),
    seller_city VARCHAR(100),
    seller_state VARCHAR(5)
);

CREATE TABLE products (
    product_id VARCHAR(32) PRIMARY KEY,
    product_category_name VARCHAR(100),
    product_weight_g INT,
    product_length_cm INT,
    product_height_cm INT,
    product_width_cm INT
);

CREATE TABLE orders (
    order_id VARCHAR(32) PRIMARY KEY,
    customer_id VARCHAR(32) REFERENCES customers(customer_id),
    order_status VARCHAR(20),
    order_purchase_timestamp TIMESTAMP,
    order_approved_at TIMESTAMP,
    order_delivered_carrier_date TIMESTAMP,
    order_delivered_customer_date TIMESTAMP,
    order_estimated_delivery_date TIMESTAMP
);

CREATE TABLE order_items (
    order_id VARCHAR(32) REFERENCES orders(order_id),
    order_item_id INT, 
    product_id VARCHAR(32) REFERENCES products(product_id),
    seller_id VARCHAR(32) REFERENCES sellers(seller_id),
    shipping_limit_date TIMESTAMP,
    price DECIMAL(10,2),
    freight_value DECIMAL(10,2),
    PRIMARY KEY (order_id, order_item_id)
);

"""

def setup_postgres_schema(engine):
    """Drop and recreate the lab tables by running DDL_STATEMENTS.

    Args:
        engine: SQLAlchemy engine for the target Postgres database.
    """
    print("--- üî® Building Postgres Schema ---")
    ddl = text(DDL_STATEMENTS)
    # engine.begin() yields a connection inside a transaction. It commits on a
    # clean exit and rolls back if execute() raises, which replaces the
    # explicit connect()/commit() pair.
    with engine.begin() as connection:
        connection.execute(ddl)
    print("‚úÖ Schema created successfully.")

def _read_olist_csv(file_path):
    """Read one Olist CSV, keeping zip-code prefix columns as text.

    If pandas infers zip-code prefixes as integers, a value written with a
    leading zero (e.g. ``01151``) is silently truncated to ``1151``. Every
    column whose name contains ``zip_code_prefix`` is therefore read with
    ``dtype=str``.
    """
    # Read only the header first so the dtype mapping names real columns.
    header = pd.read_csv(file_path, nrows=0).columns
    zip_dtypes = {col: str for col in header if "zip_code_prefix" in col}
    return pd.read_csv(file_path, dtype=zip_dtypes)


def _normalize_location_columns(df, table_name):
    """Prefix generic ``zip_code_prefix``/``city``/``state`` columns.

    Only applies to the ``customers`` and ``sellers`` tables, whose SQL
    columns are named ``customer_*`` / ``seller_*``. Columns that are already
    prefixed are left untouched, and other tables are returned unchanged.
    """
    if table_name not in ("customers", "sellers"):
        return df
    prefix = table_name[:-1]  # "customers" -> "customer", "sellers" -> "seller"
    generic = ("zip_code_prefix", "city", "state")
    return df.rename(columns={col: f"{prefix}_{col}" for col in generic if col in df.columns})


def _parse_timestamp_columns(df):
    """Convert timestamp columns of *df* in place; unparseable values become NaT.

    A column is treated as a timestamp when its name ends in ``_date``,
    ``_timestamp`` or ``_at``. This matches every TIMESTAMP column in the
    schema, including ``order_approved_at``, which the previous
    substring check missed.
    """
    for col in df.columns:
        if col.endswith(("_date", "_timestamp", "_at")):
            df[col] = pd.to_datetime(df[col], errors="coerce")


def setup_olist_data(engine):
    """Download the Olist e-commerce CSVs if needed and append them to Postgres.

    Tables are loaded parents first (customers, sellers, products, orders,
    order_items) so the foreign keys declared in the schema are satisfied.
    Each CSV is trimmed to exactly the columns of its target table. Missing
    columns are filled with NULL and extra columns are dropped before the
    data is appended with ``DataFrame.to_sql``. A missing CSV is reported and
    skipped.

    Args:
        engine: SQLAlchemy engine for the target Postgres database.

    Raises:
        Exception: whatever ``to_sql`` raised for a table is re-raised, since
            child tables cannot load once a parent table has failed.
    """
    print("\n--- üõí Processing Olist Data ---")

    target_folder = os.path.join(DATA_DIR, "brazilian-ecommerce")
    if not os.path.exists(target_folder):
        print("Downloading Olist dataset...")
        od.download(OLIST_URL, data_dir=DATA_DIR)

    # Columns allowed for each table. Anything else in the CSV is dropped.
    table_schemas = {
        "customers": ["customer_id", "customer_unique_id", "customer_zip_code_prefix", "customer_city", "customer_state"],
        "sellers": ["seller_id", "seller_zip_code_prefix", "seller_city", "seller_state"],
        "products": ["product_id", "product_category_name", "product_weight_g", "product_length_cm", "product_height_cm", "product_width_cm"],
        "orders": ["order_id", "customer_id", "order_status", "order_purchase_timestamp", "order_approved_at", "order_delivered_carrier_date", "order_delivered_customer_date", "order_estimated_delivery_date"],
        "order_items": ["order_id", "order_item_id", "product_id", "seller_id", "shipping_limit_date", "price", "freight_value"],
    }

    # Parent tables before child tables so REFERENCES checks pass.
    load_sequence = [
        ("olist_customers_dataset.csv", "customers"),
        ("olist_sellers_dataset.csv", "sellers"),
        ("olist_products_dataset.csv", "products"),
        ("olist_orders_dataset.csv", "orders"),
        ("olist_order_items_dataset.csv", "order_items"),
    ]

    for filename, table_name in load_sequence:
        file_path = os.path.join(target_folder, filename)
        if not os.path.exists(file_path):
            print(f"‚ö†Ô∏è Missing {filename}, skipping.")
            continue

        print(f"Loading {table_name}...")
        df = _read_olist_csv(file_path)
        df = _normalize_location_columns(df, table_name)

        valid_cols = table_schemas[table_name]
        missing_cols = [c for c in valid_cols if c not in df.columns]
        if missing_cols:
            print(f"‚ö†Ô∏è Warning: Missing columns {missing_cols} in {filename}. Filling with NULL.")
            for c in missing_cols:
                df[c] = None

        # Keep only the table's columns. .copy() makes the in-place timestamp
        # conversion below operate on an independent frame, not a slice.
        df = df[valid_cols].copy()
        _parse_timestamp_columns(df)

        try:
            df.to_sql(table_name, engine, index=False, if_exists='append', chunksize=10000, method='multi')
            print(f"‚úÖ {table_name}: {len(df)} rows loaded.")
        except Exception as e:
            print(f"‚ùå Error loading {table_name}: {e}")
            # A failed parent table makes every child load fail on its FK, so stop here.
            raise

def setup_duckdb():
    """Download the NYC yellow-taxi Parquet file once, then load it into DuckDB.

    The file is fetched into DATA_DIR only when it is not already there. Its
    contents are then copied into the ``taxi_trips`` table of
    ``DATA_DIR/analytics.duckdb``, replacing any existing table of that name.

    Raises:
        requests.HTTPError: if the download responds with an error status.
    """
    print("\n--- üöñ Setting up DuckDB (NYC Taxi) ---")
    parquet_path = os.path.join(DATA_DIR, "nyc_taxi_2023_01.parquet")
    if not os.path.exists(parquet_path):
        print("Downloading NYC Taxi Parquet...")
        # Stream into a temporary file and move it into place only after a
        # complete, successful download. Otherwise an HTTP error page or an
        # interrupted transfer would be cached, and the exists() check above
        # would skip re-downloading it forever.
        tmp_path = parquet_path + ".part"
        with requests.get(NYC_TAXI_URL, stream=True, timeout=60) as r:
            r.raise_for_status()
            with open(tmp_path, "wb") as f:
                for chunk in r.iter_content(chunk_size=1 << 20):
                    f.write(chunk)
        os.replace(tmp_path, parquet_path)

    db_path = os.path.join(DATA_DIR, "analytics.duckdb")
    # Double any single quote so the path stays a valid SQL string literal.
    sql_path = parquet_path.replace("'", "''")
    # The context manager closes the connection even if a statement fails.
    with duckdb.connect(db_path) as con:
        con.execute(f"CREATE OR REPLACE TABLE taxi_trips AS SELECT * FROM read_parquet('{sql_path}');")
        count = con.execute("SELECT count(*) FROM taxi_trips").fetchone()[0]
    print(f"‚úÖ DuckDB ready with {count:,} rows.")

if __name__ == "__main__":
    import traceback

    # exist_ok makes this a no-op when the directory is already there.
    os.makedirs(DATA_DIR, exist_ok=True)

    pg_engine = None
    try:
        pg_engine = get_postgres_engine()
        setup_postgres_schema(pg_engine)
        setup_olist_data(pg_engine)
        setup_duckdb()
        print("\nüéâ LAB SETUP COMPLETE üéâ")
    except Exception as e:
        # Best-effort: report instead of erroring the cell, but keep the
        # traceback so the failing step is visible.
        print(f"\n‚ùå Setup Failed: {e}")
        traceback.print_exc()
    finally:
        # Release pooled Postgres connections held by this engine.
        if pg_engine is not None:
            pg_engine.dispose()

In [None]:
import pandas as pd
import duckdb
from sqlalchemy import create_engine

# --- 1. TEST POSTGRES (Relational Check) ---
print("--- üêò Testing PostgreSQL Connection & Joins ---")

# Engine for the Postgres container; the credentials mirror the setup cell's PG_CONFIG.
pg_engine = create_engine("postgresql://admin:password@postgres:5432/postgres")

# Top five customer states by order count. The JOIN returns rows only if
# both `orders` and `customers` were loaded and their customer_id values match.
sql_query = """
    SELECT 
        c.customer_state,
        COUNT(o.order_id) as total_orders
    FROM orders o
    JOIN customers c ON o.customer_id = c.customer_id
    GROUP BY 1
    ORDER BY 2 DESC
    LIMIT 5;
"""

try:
    df_pg = pd.read_sql(sql_query, con=pg_engine)
    print("‚úÖ Postgres Query Successful! Top 5 States by Order Volume:")
    # display() is the notebook's rich-output helper.
    display(df_pg)
except Exception as e:
    print(f"‚ùå Postgres Test Failed: {e}")

# --- 2. TEST DUCKDB (Analytics Check) ---
import os

print("\n--- ü¶Ü Testing DuckDB File & Aggregation ---")

# Connect to the persistent file we created
db_path = "/home/jovyan/data/analytics.duckdb"

try:
    # duckdb.connect() would silently create an empty database at this path,
    # so fail with a clear message if the setup cell has not produced it.
    if not os.path.exists(db_path):
        raise FileNotFoundError(f"{db_path} not found - run the setup cell first")

    # The `with` block closes the connection even when the query raises;
    # previously close() ran only on the success path.
    with duckdb.connect(db_path) as con:
        # Run a fast aggregation on the Taxi data.
        # .df() returns the result as a pandas DataFrame.
        df_duck = con.execute("""
        SELECT 
            count(*) as total_rides,
            avg(total_amount) as avg_fare,
            max(trip_distance) as max_distance
        FROM taxi_trips
    """).df()

    print("‚úÖ DuckDB Query Successful! Taxi Data Summary:")
    display(df_duck)
except Exception as e:
    print(f"‚ùå DuckDB Test Failed: {e}")