In [1]:
# Imports & Configuration
import os
import zipfile
import pandas as pd
from kaggle.api.kaggle_api_extended import KaggleApi
from sqlalchemy import text
from shutil import rmtree

# --- IMPORT SHARED UTILS ---
%load_ext autoreload
%autoreload 2
from db_utils import get_engine

# Config
DATASET_NAME = 'olistbr/brazilian-ecommerce'
DATA_DIR = '../data'

In [2]:
# ETL Functions

def download_data():
    """Authenticates with Kaggle and downloads the Olist dataset from scratch."""
    
    print(f"‚¨áÔ∏è Downloading dataset '{DATASET_NAME}' from Kaggle...")
    
    if os.path.exists(DATA_DIR):
        rmtree(DATA_DIR)
    os.makedirs(DATA_DIR)

    api = KaggleApi()
    api.authenticate()
    
    api.dataset_download_files(DATASET_NAME, path=DATA_DIR, unzip=True)
    print("‚úÖ Download and extraction complete.")

def process_and_load(engine):
    """Reads CSVs, cleans table names, and loads them into PostgreSQL."""
    print("üîÑ Starting data ingestion to PostgreSQL...")
    
    # Get list of CSVs
    csv_files = [f for f in os.listdir(DATA_DIR) if f.endswith('.csv')]
    
    for filename in csv_files:
        # Construct table name: olist_orders_dataset.csv -> orders
        table_name = filename.replace('olist_', '').replace('_dataset.csv', '').replace('.csv', '')
        file_path = os.path.join(DATA_DIR, filename)
        print(f"   ‚û°Ô∏è Processing '{table_name}'...")
        
        try:
            df = pd.read_csv(file_path)
            # Load to DB (replace if exists)
            df.to_sql(table_name, engine, if_exists='replace', index=False)
            print(f"      OK: Loaded {len(df)} rows.")
        except Exception as e:
            print(f"      ‚ùå Error loading {filename}: {e}")

def check_database(engine):
    """Verifies that tables were created successfully."""
    print("\nüîé Verifying database content...")
    
    with engine.connect() as conn:
        # –í–ò–ü–†–ê–í–õ–ï–ù–ù–Ø –¢–£–¢: –æ–±–≥–æ—Ä—Ç–∞—î–º–æ —Ä—è–¥–æ–∫ —É text()
        query = text("SELECT tablename FROM pg_catalog.pg_tables WHERE schemaname='public';")
        result = conn.execute(query)
        tables = [row[0] for row in result]
        
    print(f"‚úÖ Tables in database ({len(tables)}): {', '.join(tables)}")

In [3]:
# Execution (Entry Point)

def main():
    engine = None
    try:
        # 1. Prepare Data
        download_data()
        # 2. Connect
        engine = get_engine()
        # 3. Ingest
        process_and_load(engine)
        # 4. Verify
        check_database(engine)
        print("\nüèÅ ETL Pipeline Finished Successfully.")
    except Exception as e:
        print(f"\n‚ùå Critical Pipeline Error: {e}")
    finally:
        if engine:
            engine.dispose()
            print("üîå Database connection closed.")

# --- ENTRY POINT ---
if __name__ == "__main__":
    main()

‚¨áÔ∏è Downloading dataset 'olistbr/brazilian-ecommerce' from Kaggle...
Dataset URL: https://www.kaggle.com/datasets/olistbr/brazilian-ecommerce
‚úÖ Download and extraction complete.
üîÑ Starting data ingestion to PostgreSQL...
   ‚û°Ô∏è Processing 'customers'...
      OK: Loaded 99441 rows.
   ‚û°Ô∏è Processing 'geolocation'...
      OK: Loaded 1000163 rows.
   ‚û°Ô∏è Processing 'product_category_name_translation'...
      OK: Loaded 71 rows.
   ‚û°Ô∏è Processing 'sellers'...
      OK: Loaded 3095 rows.
   ‚û°Ô∏è Processing 'order_payments'...
      OK: Loaded 103886 rows.
   ‚û°Ô∏è Processing 'order_reviews'...
      OK: Loaded 99224 rows.
   ‚û°Ô∏è Processing 'order_items'...
      OK: Loaded 112650 rows.
   ‚û°Ô∏è Processing 'products'...
      OK: Loaded 32951 rows.
   ‚û°Ô∏è Processing 'orders'...
      OK: Loaded 99441 rows.

üîé Verifying database content...
‚úÖ Tables in database (9): customers, geolocation, product_category_name_translation, sellers, order_payments, order_r