In [1]:
# =============================================================================
# MAIN ETL PIPELINE - ARCHITETTURA 3 LIVELLI
# Orchestrator principale che coordina tutto il processo ETL
# =============================================================================

import pandas as pd
import numpy as np
from sqlalchemy import create_engine, text
import subprocess
import sys
import logging
from datetime import datetime
logging.getLogger().setLevel(logging.ERROR)

In [2]:
# =============================================================================
# CONFIGURAZIONE
# =============================================================================
print("🔧 CONFIGURAZIONE SISTEMA")
print("="*60)

# Connessione database
engine = create_engine('postgresql://postgres:postgres@localhost:5432/ecommerce')

# Path dei file
BASE_PATH = '1_source_layer/datasets_olist/'
RDL_SCHEMA_PATH = '2_rdl_layer/create_rdl_schema.sql'
DW_SCHEMA_PATH = '3_dw_layer/create_tables.sql'

🔧 CONFIGURAZIONE SISTEMA


In [3]:
# =============================================================================
# FASE 0: SETUP DATABASE
# =============================================================================
print("\n📋 FASE 0: SETUP DATABASE")
print("-"*60)

def setup_database_schemas():
    """Crea gli schemi RDL e DW"""
    try:
        with engine.connect() as conn:
            # Crea schema RDL
            print("📦 Creazione schema RDL...")
            with open(RDL_SCHEMA_PATH, 'r') as f:
                rdl_sql = f.read()
            conn.execute(text(rdl_sql))
            conn.commit()
            print("✅ Schema RDL creato")
            
            # Crea schema DW (le tue tabelle esistenti)
            print("📦 Creazione schema DW...")
            with open(DW_SCHEMA_PATH, 'r') as f:
                dw_sql = f.read()
            conn.execute(text(dw_sql))
            conn.commit()
            print("✅ Schema DW creato")
            
        return True
    except Exception as e:
        print(f"❌ Errore setup database: {e}")
        return False

# Esegui setup
setup_database_schemas()


📋 FASE 0: SETUP DATABASE
------------------------------------------------------------
📦 Creazione schema RDL...
✅ Schema RDL creato
📦 Creazione schema DW...
✅ Schema DW creato


True

In [4]:
# =============================================================================
# FASE 1: CARICAMENTO DATI SORGENTE
# =============================================================================
print("\n📥 FASE 1: CARICAMENTO DATI SORGENTE")
print("-"*60)

# Il tuo codice esistente per caricare i CSV
files_to_load = {
    'orders': 'olist_orders_dataset.csv',
    'order_items': 'olist_order_items_dataset.csv', 
    'customers': 'olist_customers_dataset.csv',
    'products': 'olist_products_dataset.csv',
    'sellers': 'olist_sellers_dataset.csv',
    'payments': 'olist_order_payments_dataset.csv',
    'reviews': 'olist_order_reviews_dataset.csv',
    'geolocation': 'olist_geolocation_dataset.csv',
    'translation': 'product_category_name_translation.csv'
}

source_data = {}
for name, filename in files_to_load.items():
    filepath = BASE_PATH + filename
    print(f"📊 Caricando {filename}...")
    source_data[name] = pd.read_csv(filepath)
    print(f"✅ {name}: {len(source_data[name]):,} righe")


📥 FASE 1: CARICAMENTO DATI SORGENTE
------------------------------------------------------------
📊 Caricando olist_orders_dataset.csv...
✅ orders: 99,441 righe
📊 Caricando olist_order_items_dataset.csv...
✅ order_items: 112,650 righe
📊 Caricando olist_customers_dataset.csv...
✅ customers: 99,441 righe
📊 Caricando olist_products_dataset.csv...
✅ products: 32,951 righe
📊 Caricando olist_sellers_dataset.csv...
✅ sellers: 3,095 righe
📊 Caricando olist_order_payments_dataset.csv...
✅ payments: 103,886 righe
📊 Caricando olist_order_reviews_dataset.csv...
✅ reviews: 99,224 righe
📊 Caricando olist_geolocation_dataset.csv...
✅ geolocation: 1,000,163 righe
📊 Caricando product_category_name_translation.csv...
✅ translation: 71 righe


In [5]:
# VERIFICA STRUTTURA TABELLE RDL
print("🔍 VERIFICA STRUTTURA TABELLE RDL:")
print("-"*60)

with engine.connect() as conn:
    tables = ['orders', 'customers', 'products', 'order_items', 'payments', 'reviews', 'sellers', 'geolocation']
    
    for table in tables:
        print(f"\n📋 Tabella rdl.{table}:")
        try:
            # Query per ottenere le colonne della tabella
            result = conn.execute(text(f"""
                SELECT column_name, data_type 
                FROM information_schema.columns 
                WHERE table_schema = 'rdl' 
                AND table_name = '{table}'
                ORDER BY ordinal_position
            """))
            
            columns = result.fetchall()
            if columns:
                for col_name, data_type in columns:
                    print(f"   - {col_name} ({data_type})")
            else:
                print(f"   ❌ Tabella non trovata o senza colonne")
                
        except Exception as e:
            print(f"   ❌ Errore: {e}")

🔍 VERIFICA STRUTTURA TABELLE RDL:
------------------------------------------------------------

📋 Tabella rdl.orders:
   - order_id (character varying)
   - customer_id (character varying)
   - order_status (character varying)
   - order_purchase_timestamp (timestamp without time zone)
   - order_approved_at (timestamp without time zone)
   - order_delivered_carrier_date (timestamp without time zone)
   - order_delivered_customer_date (timestamp without time zone)
   - order_estimated_delivery_date (timestamp without time zone)
   - rdl_load_timestamp (timestamp without time zone)
   - rdl_source_system (character varying)
   - rdl_is_valid (boolean)
   - rdl_validation_errors (text)
   - rdl_processing_status (character varying)
   - rdl_delivery_delay_days (integer)
   - rdl_approval_delay_hours (integer)

📋 Tabella rdl.customers:
   - customer_id (character varying)
   - customer_unique_id (character varying)
   - customer_zip_code_prefix (integer)
   - customer_city (character vary

In [6]:
# FASE 2 - VERSIONE DEFINITIVA CON DEBUG E FIX COMPLETO
print("\n🔄 FASE 2: ETL VERSO RDL (Versione definitiva)")
print("-"*60)

# Prima di tutto, analizziamo il problema geolocation
print("\n🔍 ANALISI PROBLEMA GEOLOCATION:")
geo_df = source_data['geolocation'].copy()

# Converti i tipi per assicurare consistenza
geo_df['geolocation_zip_code_prefix'] = geo_df['geolocation_zip_code_prefix'].astype(int)
geo_df['geolocation_lat'] = geo_df['geolocation_lat'].round(8)  # Precision come nel DB
geo_df['geolocation_lng'] = geo_df['geolocation_lng'].round(8)

print(f"Record totali: {len(geo_df):,}")

# Trova duplicati ESATTI
duplicates = geo_df[geo_df.duplicated(
    subset=['geolocation_zip_code_prefix', 'geolocation_lat', 'geolocation_lng'], 
    keep=False
)]
print(f"Duplicati trovati: {len(duplicates):,}")

# Mostra alcuni esempi di duplicati
if len(duplicates) > 0:
    print("\nEsempi di duplicati:")
    sample = duplicates.head(10)
    for _, row in sample.iterrows():
        print(f"  ZIP: {row['geolocation_zip_code_prefix']}, "
              f"LAT: {row['geolocation_lat']}, LNG: {row['geolocation_lng']}")

# Funzione per pulire COMPLETAMENTE il database
def clean_rdl_completely():
    """Pulisce completamente lo schema RDL"""
    print("\n🧹 PULIZIA COMPLETA RDL...")
    try:
        with engine.connect() as conn:
            # Prima elimina le viste che dipendono dalle tabelle
            conn.execute(text("DROP VIEW IF EXISTS rdl.v_data_quality_summary CASCADE"))
            conn.execute(text("DROP VIEW IF EXISTS rdl.v_geocoding_status CASCADE"))
            conn.commit()
            
            # Poi elimina tutte le tabelle
            tables = ['fact_sales', 'orders', 'customers', 'products', 'order_items', 
                     'payments', 'reviews', 'sellers', 'geolocation', 
                     'data_quality_log', 'etl_process_log', 'data_lineage']
            
            for table in tables:
                try:
                    conn.execute(text(f"DROP TABLE IF EXISTS rdl.{table} CASCADE"))
                    print(f"   ✓ Eliminata rdl.{table}")
                except Exception as e:
                    print(f"   ⚠️ rdl.{table}: {e}")
            
            conn.commit()
            print("✅ Schema RDL pulito completamente")
            
            # Ricrea solo le tabelle necessarie
            print("\n📦 Ricreazione tabelle RDL essenziali...")
            
            # Ricrea geolocation con gestione migliore
            conn.execute(text("""
                CREATE TABLE rdl.geolocation (
                    geolocation_zip_code_prefix INTEGER,
                    geolocation_lat NUMERIC(10,8),
                    geolocation_lng NUMERIC(11,8),
                    geolocation_city VARCHAR(100),
                    geolocation_state VARCHAR(2),
                    rdl_load_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    rdl_is_primary BOOLEAN DEFAULT TRUE,
                    rdl_confidence_score NUMERIC(3,2) DEFAULT 1.0,
                    rdl_region VARCHAR(50),
                    CONSTRAINT geolocation_pkey PRIMARY KEY 
                        (geolocation_zip_code_prefix, geolocation_lat, geolocation_lng)
                )
            """))
            
            # Ricrea le altre tabelle dal tuo schema originale
            # -- =====================================================
            conn.execute(text("""
                -- RECONCILED DATA LAYER (RDL) SCHEMA
                -- Intermediate layer for data cleaning and integration
                -- =====================================================
                
                -- Create separate schema for RDL
                CREATE SCHEMA IF NOT EXISTS rdl;
                
                -- =====================================================
                -- RDL TABLES - Mirror operational sources with additions
                -- =====================================================
                
                -- RDL Orders (with data quality flags)
                DROP TABLE IF EXISTS rdl.orders CASCADE;
                CREATE TABLE rdl.orders (
                    -- Original columns
                    order_id VARCHAR(50) PRIMARY KEY,
                    customer_id VARCHAR(50),
                    order_status VARCHAR(50),
                    order_purchase_timestamp TIMESTAMP,
                    order_approved_at TIMESTAMP,
                    order_delivered_carrier_date TIMESTAMP,
                    order_delivered_customer_date TIMESTAMP,
                    order_estimated_delivery_date TIMESTAMP,
                    
                    -- RDL additions for data quality
                    rdl_load_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    rdl_source_system VARCHAR(50) DEFAULT 'olist',
                    rdl_is_valid BOOLEAN DEFAULT TRUE,
                    rdl_validation_errors TEXT,
                    rdl_processing_status VARCHAR(20) DEFAULT 'new', -- new, processed, error
                    
                    -- Derived fields for reconciliation
                    rdl_delivery_delay_days INTEGER,
                    rdl_approval_delay_hours INTEGER
                );
                
                -- RDL Customers (with geocoding status)
                DROP TABLE IF EXISTS rdl.customers CASCADE;
                CREATE TABLE rdl.customers (
                    -- Original columns
                    customer_id VARCHAR(50) PRIMARY KEY,
                    customer_unique_id VARCHAR(50),
                    customer_zip_code_prefix INTEGER,
                    customer_city VARCHAR(100),
                    customer_state VARCHAR(2),
                    
                    -- RDL additions
                    rdl_load_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    rdl_source_system VARCHAR(50) DEFAULT 'olist',
                    rdl_is_valid BOOLEAN DEFAULT TRUE,
                    rdl_validation_errors TEXT,
                    
                    -- Geocoding fields
                    rdl_latitude DECIMAL(10,8),
                    rdl_longitude DECIMAL(11,8),
                    rdl_geocoding_status VARCHAR(20), -- matched, approximate, not_found
                    rdl_geocoding_confidence DECIMAL(3,2),
                    
                    -- Data quality scores
                    rdl_address_completeness_score DECIMAL(3,2),
                    rdl_data_quality_score DECIMAL(3,2)
                );
                
                -- RDL Products (with enrichment fields)
                DROP TABLE IF EXISTS rdl.products CASCADE;
                CREATE TABLE rdl.products (
                    -- Original columns
                    product_id VARCHAR(50) PRIMARY KEY,
                    product_category_name VARCHAR(100),
                    product_name_lenght INTEGER,
                    product_description_lenght INTEGER,
                    product_photos_qty INTEGER,
                    product_weight_g INTEGER,
                    product_length_cm INTEGER,
                    product_height_cm INTEGER,
                    product_width_cm INTEGER,
                    
                    -- RDL additions
                    rdl_load_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    rdl_source_system VARCHAR(50) DEFAULT 'olist',
                    rdl_is_valid BOOLEAN DEFAULT TRUE,
                    rdl_validation_errors TEXT,
                    
                    -- Enrichment fields
                    rdl_category_english VARCHAR(100),
                    rdl_category_hierarchy_1 VARCHAR(50), -- Main category
                    rdl_category_hierarchy_2 VARCHAR(50), -- Subcategory
                    rdl_weight_category VARCHAR(20),
                    rdl_size_category VARCHAR(20),
                    rdl_volume_cm3 INTEGER,
                    
                    -- Quality indicators
                    rdl_has_complete_dimensions BOOLEAN,
                    rdl_has_photos BOOLEAN,
                    rdl_description_quality VARCHAR(20) -- short, medium, long
                );
                
                -- RDL Order Items (with calculations)
                DROP TABLE IF EXISTS rdl.order_items CASCADE;
                CREATE TABLE rdl.order_items (
                    -- Composite primary key
                    order_id VARCHAR(50),
                    order_item_id INTEGER,
                    product_id VARCHAR(50),
                    seller_id VARCHAR(50),
                    shipping_limit_date TIMESTAMP,
                    price DECIMAL(10,2),
                    freight_value DECIMAL(10,2),
                    
                    -- RDL additions
                    rdl_load_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    rdl_source_system VARCHAR(50) DEFAULT 'olist',
                    rdl_is_valid BOOLEAN DEFAULT TRUE,
                    
                    -- Calculated fields
                    rdl_total_item_value DECIMAL(10,2),
                    rdl_freight_percentage DECIMAL(8,2),
                    rdl_item_profit_estimate DECIMAL(10,2),
                    
                    PRIMARY KEY (order_id, order_item_id)
                );
                
                -- RDL Payments (with aggregations)
                DROP TABLE IF EXISTS rdl.payments CASCADE;
                CREATE TABLE rdl.payments (
                    -- Original columns
                    order_id VARCHAR(50),
                    payment_sequential INTEGER,
                    payment_type VARCHAR(50),
                    payment_installments INTEGER,
                    payment_value DECIMAL(10,2),
                    
                    -- RDL additions
                    rdl_load_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    rdl_source_system VARCHAR(50) DEFAULT 'olist',
                    rdl_is_valid BOOLEAN DEFAULT TRUE,
                    
                    -- Aggregated at order level
                    rdl_total_order_value DECIMAL(10,2),
                    rdl_payment_method_count INTEGER,
                    rdl_is_multi_payment BOOLEAN,
                    
                    PRIMARY KEY (order_id, payment_sequential)
                );
                
                -- RDL Reviews (with sentiment analysis placeholder)
                DROP TABLE IF EXISTS rdl.reviews CASCADE;
                CREATE TABLE rdl.reviews (
                    review_id VARCHAR(50) PRIMARY KEY,
                    order_id VARCHAR(50),
                    review_score INTEGER,
                    review_comment_title VARCHAR(100),
                    review_comment_message TEXT,
                    review_creation_date TIMESTAMP,
                    review_answer_timestamp TIMESTAMP,
                    
                    -- RDL additions
                    rdl_load_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    rdl_source_system VARCHAR(50) DEFAULT 'olist',
                    rdl_is_valid BOOLEAN DEFAULT TRUE,
                    
                    -- Text analysis placeholders
                    rdl_has_comment BOOLEAN,
                    rdl_comment_length INTEGER,
                    rdl_response_time_hours INTEGER,
                    rdl_sentiment_score DECIMAL(3,2), -- -1 to 1
                    rdl_review_category VARCHAR(50) -- delivery, quality, price, etc.
                );
                
                -- RDL Sellers (with business metrics)
                DROP TABLE IF EXISTS rdl.sellers CASCADE;
                CREATE TABLE rdl.sellers (
                    seller_id VARCHAR(50) PRIMARY KEY,
                    seller_zip_code_prefix INTEGER,
                    seller_city VARCHAR(100),
                    seller_state VARCHAR(2),
                    
                    -- RDL additions
                    rdl_load_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    rdl_source_system VARCHAR(50) DEFAULT 'olist',
                    rdl_is_valid BOOLEAN DEFAULT TRUE,
                    
                    -- Geocoding
                    rdl_latitude DECIMAL(10,8),
                    rdl_longitude DECIMAL(11,8),
                    rdl_geocoding_status VARCHAR(20),
                    rdl_geocoding_confidence DECIMAL(3,2),  -- Aggiunta questa colonna
                    
                    -- Business metrics (to be calculated)
                    rdl_total_orders INTEGER,
                    rdl_total_revenue DECIMAL(12,2),
                    rdl_avg_rating DECIMAL(3,2),
                    rdl_active_status VARCHAR(20) -- active, inactive, new
                );
                
                -- RDL Geolocation (normalized and cleaned)
                DROP TABLE IF EXISTS rdl.geolocation CASCADE;
                CREATE TABLE rdl.geolocation (
                    geolocation_zip_code_prefix INTEGER,
                    geolocation_lat DECIMAL(10,8),
                    geolocation_lng DECIMAL(11,8),
                    geolocation_city VARCHAR(100),
                    geolocation_state VARCHAR(2),
                    
                    -- RDL additions
                    rdl_load_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    rdl_is_primary BOOLEAN, -- Primary location for this ZIP
                    rdl_confidence_score DECIMAL(3,2),
                    rdl_region VARCHAR(50),
                    
                    PRIMARY KEY (geolocation_zip_code_prefix, geolocation_lat, geolocation_lng)
                );
                
                -- =====================================================
                -- RDL METADATA TABLES
                -- =====================================================
                
                -- Data Quality Monitoring
                DROP TABLE IF EXISTS rdl.data_quality_log CASCADE;
                CREATE TABLE rdl.data_quality_log (
                    log_id SERIAL PRIMARY KEY,
                    check_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    table_name VARCHAR(50),
                    check_type VARCHAR(50), -- completeness, accuracy, consistency, timeliness
                    total_records INTEGER,
                    valid_records INTEGER,
                    invalid_records INTEGER,
                    quality_score DECIMAL(5,2),
                    error_details JSONB
                );
                
                -- ETL Process Log
                DROP TABLE IF EXISTS rdl.etl_process_log CASCADE;
                CREATE TABLE rdl.etl_process_log (
                    process_id SERIAL PRIMARY KEY,
                    process_name VARCHAR(100),
                    process_type VARCHAR(50), -- extract, transform, load
                    start_timestamp TIMESTAMP,
                    end_timestamp TIMESTAMP,
                    status VARCHAR(20), -- running, completed, failed
                    records_processed INTEGER,
                    records_rejected INTEGER,
                    error_message TEXT,
                    process_metadata JSONB
                );
                
                -- Data Lineage
                DROP TABLE IF EXISTS rdl.data_lineage CASCADE;
                CREATE TABLE rdl.data_lineage (
                    lineage_id SERIAL PRIMARY KEY,
                    source_system VARCHAR(50),
                    source_table VARCHAR(100),
                    source_column VARCHAR(100),
                    rdl_table VARCHAR(100),
                    rdl_column VARCHAR(100),
                    transformation_rule TEXT,
                    created_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                );
                
                -- =====================================================
                -- INDEXES FOR PERFORMANCE
                -- =====================================================
                CREATE INDEX idx_rdl_orders_customer ON rdl.orders(customer_id);
                CREATE INDEX idx_rdl_orders_status ON rdl.orders(order_status);
                CREATE INDEX idx_rdl_orders_timestamp ON rdl.orders(order_purchase_timestamp);
                
                CREATE INDEX idx_rdl_customers_state ON rdl.customers(customer_state);
                CREATE INDEX idx_rdl_customers_valid ON rdl.customers(rdl_is_valid);
                
                CREATE INDEX idx_rdl_products_category ON rdl.products(product_category_name);
                CREATE INDEX idx_rdl_products_valid ON rdl.products(rdl_is_valid);
                
                CREATE INDEX idx_rdl_order_items_order ON rdl.order_items(order_id);
                CREATE INDEX idx_rdl_order_items_product ON rdl.order_items(product_id);
                CREATE INDEX idx_rdl_order_items_seller ON rdl.order_items(seller_id);
                
                -- =====================================================
                -- VIEWS FOR DATA QUALITY MONITORING
                -- =====================================================
                
                -- Overall data quality dashboard
                CREATE OR REPLACE VIEW rdl.v_data_quality_summary AS
                SELECT 
                    'orders' as table_name,
                    COUNT(*) as total_records,
                    SUM(CASE WHEN rdl_is_valid THEN 1 ELSE 0 END) as valid_records,
                    AVG(CASE WHEN rdl_is_valid THEN 1 ELSE 0 END) * 100 as quality_percentage
                FROM rdl.orders
                UNION ALL
                SELECT 
                    'customers' as table_name,
                    COUNT(*) as total_records,
                    SUM(CASE WHEN rdl_is_valid THEN 1 ELSE 0 END) as valid_records,
                    AVG(CASE WHEN rdl_is_valid THEN 1 ELSE 0 END) * 100 as quality_percentage
                FROM rdl.customers
                UNION ALL
                SELECT 
                    'products' as table_name,
                    COUNT(*) as total_records,
                    SUM(CASE WHEN rdl_is_valid THEN 1 ELSE 0 END) as valid_records,
                    AVG(CASE WHEN rdl_is_valid THEN 1 ELSE 0 END) * 100 as quality_percentage
                FROM rdl.products;
                
                -- Geocoding status view
                CREATE OR REPLACE VIEW rdl.v_geocoding_status AS
                SELECT 
                    'customers' as entity_type,
                    rdl_geocoding_status,
                    COUNT(*) as count
                FROM rdl.customers
                WHERE rdl_geocoding_status IS NOT NULL
                GROUP BY rdl_geocoding_status
                UNION ALL
                SELECT 
                    'sellers' as entity_type,
                    rdl_geocoding_status,
                    COUNT(*) as count
                FROM rdl.sellers
                WHERE rdl_geocoding_status IS NOT NULL
                GROUP BY rdl_geocoding_status;
                
                -- =====================================================
                -- COMMENTS FOR DOCUMENTATION
                -- =====================================================
                COMMENT ON SCHEMA rdl IS 'Reconciled Data Layer - Intermediate storage for data cleaning, validation, and enrichment';
                COMMENT ON TABLE rdl.orders IS 'Reconciled orders data with validation flags and calculated fields';
                COMMENT ON TABLE rdl.customers IS 'Reconciled customer data with geocoding and quality scores';
                COMMENT ON TABLE rdl.products IS 'Reconciled product data with translations and categorizations';
                COMMENT ON TABLE rdl.data_quality_log IS 'Tracks data quality metrics over time';
                COMMENT ON TABLE rdl.etl_process_log IS 'Logs all ETL processes for monitoring and debugging';
                COMMENT ON TABLE rdl.data_lineage IS 'Tracks data flow from source to RDL to DW';"""))
            
            conn.commit()
            print("✅ Tabelle RDL ricreate")
            
    except Exception as e:
        print(f"❌ Errore pulizia completa: {e}")
        raise

# Funzione specifica per caricare geolocation
def load_geolocation_safely(df):
    """Carica geolocation gestendo i duplicati in modo sicuro"""
    print("\n📍 Caricamento speciale GEOLOCATION...")
    
    # Prepara i dati
    df = df.copy()
    
    # Assicura tipi corretti e precisione
    df['geolocation_zip_code_prefix'] = df['geolocation_zip_code_prefix'].astype(int)
    df['geolocation_lat'] = df['geolocation_lat'].astype(float).round(8)
    df['geolocation_lng'] = df['geolocation_lng'].astype(float).round(8)
    
    # Rimuovi duplicati
    print(f"   Record originali: {len(df):,}")
    df = df.drop_duplicates(
        subset=['geolocation_zip_code_prefix', 'geolocation_lat', 'geolocation_lng'],
        keep='first'
    )
    print(f"   Record dopo deduplicazione: {len(df):,}")
    
    # Aggiungi campi RDL
    df['rdl_load_timestamp'] = datetime.now()
    df['rdl_is_primary'] = True
    df['rdl_confidence_score'] = 1.0
    
    # Mappa regioni
    region_map = {
        'SP': 'Sudeste', 'RJ': 'Sudeste', 'MG': 'Sudeste', 'ES': 'Sudeste',
        'PR': 'Sul', 'SC': 'Sul', 'RS': 'Sul',
        'BA': 'Nordeste', 'PE': 'Nordeste', 'CE': 'Nordeste', 'PB': 'Nordeste',
        'RN': 'Nordeste', 'SE': 'Nordeste', 'AL': 'Nordeste', 'MA': 'Nordeste', 'PI': 'Nordeste',
        'GO': 'Centro-Oeste', 'MT': 'Centro-Oeste', 'MS': 'Centro-Oeste', 'DF': 'Centro-Oeste',
        'AC': 'Norte', 'AP': 'Norte', 'AM': 'Norte', 'PA': 'Norte', 'RO': 'Norte', 'RR': 'Norte', 'TO': 'Norte'
    }
    df['rdl_region'] = df['geolocation_state'].map(region_map).fillna('Unknown')
    
    print("   Caricamento in batch...")
    try:
        # Prepara le colonne nell'ordine corretto
        columns_order = [
            'geolocation_zip_code_prefix', 'geolocation_lat', 'geolocation_lng',
            'geolocation_city', 'geolocation_state', 'rdl_load_timestamp',
            'rdl_is_primary', 'rdl_confidence_score', 'rdl_region'
        ]
        df_final = df[columns_order]
        
        # Carica in batch usando to_sql
        df_final.to_sql(
            'geolocation', 
            engine, 
            schema='rdl',
            if_exists='append', 
            index=False, 
            method='multi',
            chunksize=10000  # Processa 10000 record alla volta
        )
        
        print(f"✅ GEOLOCATION caricata: {len(df_final):,} record")
        return True
        
    except Exception as e:
        print(f"❌ Errore caricamento: {e}")
        return False

def load_to_rdl_fixed(df, table_name):
    """Carica dati in RDL con gestione duplicati"""
    try:
        df = df.copy()
        
        # GESTIONE SPECIALE PER REVIEWS - duplicati su review_id
        if table_name == 'reviews':
            original_count = len(df)
            df = df.drop_duplicates(subset=['review_id'], keep='first')
            if original_count > len(df):
                print(f"   ⚠️ Rimossi {original_count - len(df)} reviews duplicati")
        
        # GESTIONE SPECIALE PER GEOLOCATION - duplicati sulla chiave composita
        if table_name == 'geolocation':
            original_count = len(df)
            df = df.drop_duplicates(
                subset=['geolocation_zip_code_prefix', 'geolocation_lat', 'geolocation_lng'], 
                keep='first'
            )
            if original_count > len(df):
                print(f"   ⚠️ Rimossi {original_count - len(df)} geolocation duplicati")
        
        # Aggiungi campi RDL comuni
        df['rdl_load_timestamp'] = datetime.now()
        df['rdl_source_system'] = 'olist'
        df['rdl_is_valid'] = True
        
        # Aggiungi campi specifici per tabella
        if table_name == 'orders':
            df['rdl_validation_errors'] = ''
            df['rdl_processing_status'] = 'new'
            # Calcola campi derivati se possibile
            if 'order_delivered_customer_date' in df.columns and 'order_estimated_delivery_date' in df.columns:
                df['order_delivered_customer_date'] = pd.to_datetime(df['order_delivered_customer_date'], errors='coerce')
                df['order_estimated_delivery_date'] = pd.to_datetime(df['order_estimated_delivery_date'], errors='coerce')
                mask_valid = df['order_delivered_customer_date'].notna() & df['order_estimated_delivery_date'].notna()
                df.loc[mask_valid, 'rdl_delivery_delay_days'] = (
                    df.loc[mask_valid, 'order_delivered_customer_date'] - 
                    df.loc[mask_valid, 'order_estimated_delivery_date']
                ).dt.days
            
        elif table_name == 'customers':
            df['rdl_validation_errors'] = ''
            df['rdl_geocoding_status'] = 'not_geocoded'
            df['rdl_geocoding_confidence'] = 0.0
            df['rdl_address_completeness_score'] = 1.0
            df['rdl_data_quality_score'] = 1.0
            
        elif table_name == 'products':
            df['rdl_validation_errors'] = ''
            df['rdl_has_complete_dimensions'] = True
            df['rdl_has_photos'] = df['product_photos_qty'] > 0
            df['rdl_description_quality'] = 'medium'
            df['rdl_weight_category'] = 'medium'
            df['rdl_size_category'] = 'medium'
            
        elif table_name == 'order_items':
            # Calcola totale
            if 'price' in df.columns and 'freight_value' in df.columns:
                df['rdl_total_item_value'] = df['price'] + df['freight_value']
                df['rdl_freight_percentage'] = (df['freight_value'] / df['price'].replace(0, 0.01) * 100).round(2)
                
        elif table_name == 'payments':
            df['rdl_is_multi_payment'] = False
            df['rdl_payment_method_count'] = 1
            
        elif table_name == 'reviews':
            df['rdl_has_comment'] = df['review_comment_message'].notna()
            df['rdl_comment_length'] = df['review_comment_message'].fillna('').str.len()
            
        elif table_name == 'sellers':
            df['rdl_geocoding_status'] = 'not_geocoded'
            df['rdl_geocoding_confidence'] = 0.0
            df['rdl_active_status'] = 'active'
            
        elif table_name == 'geolocation':
            df['rdl_is_primary'] = True
            df['rdl_confidence_score'] = 1.0
            # Mappa regioni
            region_map = {
                'SP': 'Sudeste', 'RJ': 'Sudeste', 'MG': 'Sudeste', 'ES': 'Sudeste',
                'PR': 'Sul', 'SC': 'Sul', 'RS': 'Sul',
                'BA': 'Nordeste', 'PE': 'Nordeste', 'CE': 'Nordeste', 'PB': 'Nordeste',
                'RN': 'Nordeste', 'SE': 'Nordeste', 'AL': 'Nordeste', 'MA': 'Nordeste', 'PI': 'Nordeste',
                'GO': 'Centro-Oeste', 'MT': 'Centro-Oeste', 'MS': 'Centro-Oeste', 'DF': 'Centro-Oeste',
                'AC': 'Norte', 'AP': 'Norte', 'AM': 'Norte', 'PA': 'Norte', 'RO': 'Norte', 'RR': 'Norte', 'TO': 'Norte'
            }
            df['rdl_region'] = df['geolocation_state'].map(region_map).fillna('Unknown')
        
        # Ottieni colonne esistenti nel database
        with engine.connect() as conn:
            result = conn.execute(text(f"""
                SELECT column_name 
                FROM information_schema.columns 
                WHERE table_schema = 'rdl' AND table_name = :table_name
            """), {"table_name": table_name})
            existing_columns = [row[0] for row in result]
        
        # Mantieni solo colonne che esistono
        columns_to_keep = [col for col in df.columns if col in existing_columns]
        df_final = df[columns_to_keep]
        
        # Carica nel database
        df_final.to_sql(table_name, engine, schema='rdl', if_exists='append', index=False, method='multi', chunksize=1000)
        print(f"✅ {table_name}: {len(df_final):,} record caricati")
        
        return True
        
    except Exception as e:
        print(f"❌ {table_name}: Errore - {e}")
        return False

# ESECUZIONE PRINCIPALE
# 1. Pulisci completamente (opzionale, solo se necessario)
if False:  # Cambia in False se non vuoi ripulire tutto
    clean_rdl_completely()
    
    # Dopo la pulizia, devi rieseguire create_rdl_schema.sql
    print("\n⚠️ ATTENZIONE: Devi rieseguire create_rdl_schema.sql per ricreare tutte le tabelle!")
    print("In pgAdmin, esegui il file 2_rdl_layer/create_rdl_schema.sql")
    print("Poi riprendi da qui.")
    # Uncomment per continuare automaticamente se hai il file
    # exec(open('2_rdl_layer/create_rdl_schema.sql').read())

# 2. Carica prima geolocation con il metodo sicuro
load_geolocation_safely(source_data['geolocation'])

# 3. Poi carica le altre tabelle normalmente
print("\n📥 Caricamento altre tabelle RDL...")
for table_name in ['orders', 'customers', 'products', 'order_items', 'payments', 'reviews', 'sellers']:
    if table_name in source_data:
        # Usa la tua funzione esistente per le altre tabelle
        load_to_rdl_fixed(source_data[table_name], table_name)

# Report finale
print("\n📊 REPORT FINALE RDL:")
print("-"*60)
with engine.connect() as conn:
    for table in ['orders', 'customers', 'products', 'order_items', 'payments', 'reviews', 'sellers', 'geolocation']:
        try:
            count = conn.execute(text(f"SELECT COUNT(*) FROM rdl.{table}")).scalar()
            print(f"rdl.{table}: {count:,} record")
        except Exception as e:
            print(f"rdl.{table}: ERRORE - {e}")

print("\n✅ FASE 2 completata con successo!")


🔄 FASE 2: ETL VERSO RDL (Versione definitiva)
------------------------------------------------------------

🔍 ANALISI PROBLEMA GEOLOCATION:
Record totali: 1,000,163
Duplicati trovati: 411,724

Esempi di duplicati:
  ZIP: 1037, LAT: -23.54562128, LNG: -46.63929205
  ZIP: 1046, LAT: -23.54608113, LNG: -46.6448203
  ZIP: 1046, LAT: -23.54612897, LNG: -46.64295148
  ZIP: 1047, LAT: -23.54627311, LNG: -46.64122517
  ZIP: 1013, LAT: -23.54692321, LNG: -46.6342637
  ZIP: 1029, LAT: -23.54376906, LNG: -46.63427784
  ZIP: 1011, LAT: -23.54763955, LNG: -46.63603162
  ZIP: 1013, LAT: -23.54732513, LNG: -46.63418379
  ZIP: 1012, LAT: -23.54894599, LNG: -46.63467113
  ZIP: 1037, LAT: -23.54518734, LNG: -46.63785524

📍 Caricamento speciale GEOLOCATION...
   Record originali: 1,000,163
   Record dopo deduplicazione: 720,148
   Caricamento in batch...
✅ GEOLOCATION caricata: 720,148 record

📥 Caricamento altre tabelle RDL...
✅ orders: 99,441 record caricati
✅ customers: 99,441 record caricati
✅ produ

In [7]:
# =============================================================================
# FASE 3: TRANSFORM & LOAD VERSO DW
# =============================================================================
print("\n🚀 FASE 3: TRANSFORM & LOAD VERSO DW")
print("-"*60)

# Prima pulisci le tabelle DW esistenti
print("🧹 Pulizia tabelle DW...")
with engine.connect() as conn:
    # Pulisci in ordine per rispettare le FK
    conn.execute(text("TRUNCATE TABLE fact_sales CASCADE"))
    conn.execute(text("TRUNCATE TABLE dim_customer CASCADE"))
    conn.execute(text("TRUNCATE TABLE dim_product CASCADE"))
    conn.execute(text("TRUNCATE TABLE dim_seller CASCADE"))
    conn.execute(text("TRUNCATE TABLE dim_time CASCADE"))
    conn.execute(text("TRUNCATE TABLE dim_payment CASCADE"))
    conn.execute(text("TRUNCATE TABLE dim_geography CASCADE"))
    conn.commit()
print("✅ Tabelle DW pulite")

# =============================================================================
# 1. CARICA DIM_GEOGRAPHY (necessaria per le altre dimensioni)
# =============================================================================
print("\n📌 DIM_GEOGRAPHY...")

# METODO 1: Carica direttamente nel DB senza passare da pandas
with engine.connect() as conn:
    # Prima conta quanti record
    count_query = """
        SELECT COUNT(DISTINCT geolocation_zip_code_prefix) 
        FROM rdl.geolocation 
        WHERE rdl_is_primary = true
    """
    total_zips = conn.execute(text(count_query)).scalar()
    print(f"   ZIP codes da caricare: {total_zips:,}")
    
    # Poi inserisci direttamente con SQL
    insert_query = """
        INSERT INTO dim_geography (zip_code_prefix, city, state, latitude, longitude, region)
        SELECT DISTINCT
            geolocation_zip_code_prefix::varchar,
            geolocation_city,
            geolocation_state,
            geolocation_lat,
            geolocation_lng,
            rdl_region
        FROM rdl.geolocation
        WHERE rdl_is_primary = true
        ON CONFLICT (zip_code_prefix) DO NOTHING
    """
    
    result = conn.execute(text(insert_query))
    conn.commit()
    print(f"✅ Caricati {result.rowcount:,} ZIP codes")

# Carica solo le chiavi per i join successivi (molto più leggero)
dim_geography_keys = pd.read_sql(
    "SELECT geography_key, zip_code_prefix FROM dim_geography", 
    engine
)
print(f"   Chiavi caricate: {len(dim_geography_keys):,}")

# =============================================================================
# 2. CARICA DIM_CUSTOMER
# =============================================================================
print("\n📌 DIM_CUSTOMER...")
query_customer = """
    SELECT DISTINCT
        customer_id,
        customer_city,
        customer_state,
        customer_zip_code_prefix::varchar as customer_zip_code_prefix,
        rdl_latitude as customer_latitude,
        rdl_longitude as customer_longitude
    FROM rdl.customers
    WHERE rdl_is_valid = true
"""
dim_customer = pd.read_sql(query_customer, engine)

# Aggiungi geography_key
dim_customer = dim_customer.merge(
    dim_geography_keys,
    left_on='customer_zip_code_prefix',
    right_on='zip_code_prefix',
    how='left'
)
dim_customer = dim_customer.drop('zip_code_prefix', axis=1)

# Carica in DW
dim_customer.to_sql('dim_customer', engine, if_exists='append', index=False, method='multi')
print(f"✅ Caricati {len(dim_customer):,} customers")

# =============================================================================
# 3. CARICA DIM_PRODUCT
# =============================================================================
print("\n📌 DIM_PRODUCT...")
query_product = """
    SELECT DISTINCT
        product_id,
        product_category_name,
        COALESCE(rdl_category_english, product_category_name) as product_category_name_english,
        product_name_lenght,
        product_description_lenght,
        product_photos_qty,
        product_weight_g,
        product_length_cm,
        product_height_cm,
        product_width_cm,
        rdl_weight_category as weight_category,
        rdl_size_category as size_category
    FROM rdl.products
    WHERE rdl_is_valid = true
"""
dim_product = pd.read_sql(query_product, engine)
dim_product.to_sql('dim_product', engine, if_exists='append', index=False, method='multi')
print(f"✅ Caricati {len(dim_product):,} products")

# =============================================================================
# 4. CARICA DIM_SELLER
# =============================================================================
print("\n📌 DIM_SELLER...")
query_seller = """
    SELECT DISTINCT
        seller_id,
        seller_city,
        seller_state,
        seller_zip_code_prefix::varchar as seller_zip_code_prefix,
        rdl_latitude as seller_latitude,
        rdl_longitude as seller_longitude
    FROM rdl.sellers
    WHERE rdl_is_valid = true
"""
dim_seller = pd.read_sql(query_seller, engine)

# Aggiungi geography_key
dim_seller = dim_seller.merge(
    dim_geography_keys,
    left_on='seller_zip_code_prefix',
    right_on='zip_code_prefix',
    how='left'
)
dim_seller = dim_seller.drop('zip_code_prefix', axis=1)

# Carica in DW
dim_seller.to_sql('dim_seller', engine, if_exists='append', index=False, method='multi')
print(f"✅ Caricati {len(dim_seller):,} sellers")

# =============================================================================
# 5. CARICA DIM_TIME
# =============================================================================
print("\n📌 DIM_TIME...")
query_time = """
    SELECT DISTINCT
        DATE(order_purchase_timestamp) as full_date
    FROM rdl.orders
    WHERE order_purchase_timestamp IS NOT NULL
"""
dates_df = pd.read_sql(query_time, engine)

# Crea attributi temporali
dim_time = pd.DataFrame()
dim_time['full_date'] = pd.to_datetime(dates_df['full_date'])
dim_time['day_of_week'] = dim_time['full_date'].dt.dayofweek
dim_time['day_name'] = dim_time['full_date'].dt.day_name()
dim_time['day_of_month'] = dim_time['full_date'].dt.day
dim_time['month_num'] = dim_time['full_date'].dt.month
dim_time['month_name'] = dim_time['full_date'].dt.month_name()
dim_time['quarter'] = dim_time['full_date'].dt.quarter
dim_time['year'] = dim_time['full_date'].dt.year
dim_time['is_weekend'] = dim_time['day_of_week'].isin([5, 6])

# Aggiungi stagione brasiliana
def get_season_brazil(month):
    if month in [12, 1, 2]:
        return 'Verão'
    elif month in [3, 4, 5]:
        return 'Outono'
    elif month in [6, 7, 8]:
        return 'Inverno'
    else:
        return 'Primavera'

dim_time['season_brazil'] = dim_time['month_num'].apply(get_season_brazil)

# Carica in DW
dim_time.to_sql('dim_time', engine, if_exists='append', index=False, method='multi')
print(f"✅ Caricati {len(dim_time):,} giorni")

# =============================================================================
# 6. CARICA DIM_PAYMENT
# =============================================================================
print("\n📌 DIM_PAYMENT...")
query_payment = """
    SELECT DISTINCT
        payment_type,
        payment_installments,
        CASE 
            WHEN payment_installments = 1 THEN 'Cash/Single Payment'
            WHEN payment_installments <= 6 THEN 'Short Term'
            WHEN payment_installments <= 12 THEN 'Medium Term'
            ELSE 'Long Term'
        END as installment_category
    FROM rdl.payments
    WHERE payment_type IS NOT NULL
"""
dim_payment = pd.read_sql(query_payment, engine)
dim_payment.to_sql('dim_payment', engine, if_exists='append', index=False, method='multi')
print(f"✅ Caricati {len(dim_payment):,} metodi di pagamento")

# =============================================================================
# 7. CARICA FACT_SALES
# =============================================================================
print("\n📌 FACT_SALES...")

# METODO OTTIMIZZATO: Processa in chunk
print("   🔑 Caricamento chiavi dimensioni...")
dim_customer_keys = pd.read_sql("SELECT customer_key, customer_id, customer_state FROM dim_customer", engine)
dim_product_keys = pd.read_sql("SELECT product_key, product_id FROM dim_product", engine)
dim_seller_keys = pd.read_sql("SELECT seller_key, seller_id, seller_state FROM dim_seller", engine)
dim_time_keys = pd.read_sql("SELECT time_key, full_date FROM dim_time", engine)
dim_payment_keys = pd.read_sql("SELECT payment_key, payment_type, payment_installments FROM dim_payment", engine)

# Conta totale record da processare
with engine.connect() as conn:
    total_facts = conn.execute(text("""
        SELECT COUNT(*) 
        FROM rdl.orders o
        JOIN rdl.order_items oi ON o.order_id = oi.order_id
        WHERE o.order_status = 'delivered' AND o.rdl_is_valid = true
    """)).scalar()
    print(f"   Record da processare: {total_facts:,}")

# Processa in chunk usando LIMIT/OFFSET
chunk_size = 25000
processed = 0

for offset in range(0, total_facts, chunk_size):
    # Query con LIMIT/OFFSET
    query_facts = f"""
        SELECT 
            o.order_id,
            oi.order_item_id,
            o.customer_id,
            oi.product_id,
            oi.seller_id,
            DATE(o.order_purchase_timestamp) as order_date,
            p.payment_type,
            p.payment_installments,
            oi.price,
            oi.freight_value,
            1 as quantity,
            p.payment_value,
            r.review_score,
            oi.rdl_total_item_value as total_item_value
        FROM rdl.orders o
        JOIN rdl.order_items oi ON o.order_id = oi.order_id
        JOIN rdl.payments p ON o.order_id = p.order_id
        LEFT JOIN rdl.reviews r ON o.order_id = r.order_id
        WHERE o.order_status = 'delivered'
            AND o.rdl_is_valid = true
            AND oi.rdl_is_valid = true
        LIMIT {chunk_size} OFFSET {offset}
    """
    
    # Carica chunk
    chunk_df = pd.read_sql(query_facts, engine)
    
    if len(chunk_df) == 0:
        break
    
    # Join con chiavi
    chunk_df = chunk_df.merge(dim_customer_keys, on='customer_id', how='inner')
    chunk_df = chunk_df.merge(dim_product_keys, on='product_id', how='inner')
    chunk_df = chunk_df.merge(dim_seller_keys, on='seller_id', how='inner')
    chunk_df = chunk_df.merge(dim_time_keys, left_on='order_date', right_on='full_date', how='inner')
    chunk_df = chunk_df.merge(dim_payment_keys, on=['payment_type', 'payment_installments'], how='inner')
    
    # Calcola metriche
    chunk_df['is_cross_state_sale'] = chunk_df['customer_state'] != chunk_df['seller_state']
    chunk_df['shipping_type'] = chunk_df.apply(
        lambda x: 'Local' if x['customer_state'] == x['seller_state'] else 'Interstate',
        axis=1
    )
    chunk_df['freight_percentage'] = (chunk_df['freight_value'] / chunk_df['price'].replace(0, 0.01) * 100).round(2).clip(upper=999.99)
    chunk_df['net_revenue'] = chunk_df['price'] - chunk_df['freight_value']
    chunk_df['customer_seller_distance_km'] = None
    
    # Seleziona colonne finali
    columns_final = [
        'customer_key', 'product_key', 'time_key', 'seller_key', 'payment_key',
        'order_id', 'order_item_id',
        'price', 'freight_value', 'quantity', 'payment_value', 'review_score',
        'total_item_value', 'customer_seller_distance_km', 'is_cross_state_sale',
        'shipping_type', 'freight_percentage', 'net_revenue'
    ]
    
    chunk_final = chunk_df[columns_final]
    
    # Carica chunk
    chunk_final.to_sql('fact_sales', engine, if_exists='append', index=False, method='multi')
    
    processed += len(chunk_df)
    print(f"   ✓ Processati {processed:,}/{total_facts:,} record ({processed/total_facts*100:.1f}%)")
    
    # Libera memoria
    del chunk_df
    del chunk_final

print(f"✅ Caricati {processed:,} record in fact_sales")

# =============================================================================
# VERIFICA FINALE FASE 3
# =============================================================================
print("\n📊 VERIFICA CARICAMENTO DW:")
print("-"*60)

with engine.connect() as conn:
    tables = [
        ('dim_geography', 'ZIP codes'),
        ('dim_customer', 'clienti'),
        ('dim_product', 'prodotti'),
        ('dim_seller', 'venditori'),
        ('dim_time', 'date'),
        ('dim_payment', 'metodi pagamento'),
        ('fact_sales', 'vendite')
    ]
    
    total = 0
    for table, desc in tables:
        count = conn.execute(text(f"SELECT COUNT(*) FROM {table}")).scalar()
        print(f"{table}: {count:,} {desc}")
        total += count
    
    print("-"*60)
    print(f"TOTALE RECORD NEL DW: {total:,}")

print("\n✅ FASE 3 completata con successo!")


🚀 FASE 3: TRANSFORM & LOAD VERSO DW
------------------------------------------------------------
🧹 Pulizia tabelle DW...
✅ Tabelle DW pulite

📌 DIM_GEOGRAPHY...
   ZIP codes da caricare: 19,015
✅ Caricati 19,015 ZIP codes
   Chiavi caricate: 19,015

📌 DIM_CUSTOMER...
✅ Caricati 99,441 customers

📌 DIM_PRODUCT...
✅ Caricati 32,951 products

📌 DIM_SELLER...
✅ Caricati 3,095 sellers

📌 DIM_TIME...
✅ Caricati 634 giorni

📌 DIM_PAYMENT...
✅ Caricati 28 metodi di pagamento

📌 FACT_SALES...
   🔑 Caricamento chiavi dimensioni...
   Record da processare: 110,197
   ✓ Processati 25,000/110,197 record (22.7%)
   ✓ Processati 50,000/110,197 record (45.4%)
   ✓ Processati 75,000/110,197 record (68.1%)
   ✓ Processati 100,000/110,197 record (90.7%)
   ✓ Processati 115,358/110,197 record (104.7%)
✅ Caricati 115,358 record in fact_sales

📊 VERIFICA CARICAMENTO DW:
------------------------------------------------------------
dim_geography: 19,015 ZIP codes
dim_customer: 99,441 clienti
dim_product: 32,

In [8]:
# =============================================================================
# FUNZIONE PER REGISTRARE METRICHE QUALITÀ
# =============================================================================
def log_data_quality_metrics(engine):
    """Registra le metriche di qualità dei dati in RDL"""
    print("\n📊 Registrazione metriche qualità...")
    
    with engine.connect() as conn:
        # Metriche per ogni tabella RDL (esclusa geolocation che non ha rdl_is_valid)
        tables_to_check = [
            'orders', 'customers', 'products', 'order_items', 
            'payments', 'reviews', 'sellers'  # RIMOSSA 'geolocation'
        ]
        
        for table in tables_to_check:
            try:
                # Inizia una nuova transazione per ogni tabella
                trans = conn.begin()
                
                # Conta record totali e validi
                query = f"""
                    SELECT 
                        COUNT(*) as total,
                        SUM(CASE WHEN rdl_is_valid = true THEN 1 ELSE 0 END) as valid,
                        SUM(CASE WHEN rdl_is_valid = false THEN 1 ELSE 0 END) as invalid
                    FROM rdl.{table}
                """
                result = conn.execute(text(query)).fetchone()
                
                if result and result[0] > 0:
                    quality_score = (result[1] / result[0]) * 100 if result[0] > 0 else 0
                    
                    # Inserisci in data_quality_log
                    insert_query = """
                        INSERT INTO rdl.data_quality_log 
                        (table_name, check_type, total_records, valid_records, 
                         invalid_records, quality_score, error_details)
                        VALUES (:table, 'validation', :total, :valid, :invalid, :score, :details)
                    """
                    
                    conn.execute(text(insert_query), {
                        'table': table,
                        'total': int(result[0]),
                        'valid': int(result[1]),
                        'invalid': int(result[2]),
                        'score': float(quality_score),
                        'details': '{"source": "main_etl_pipeline"}'
                    })
                
                trans.commit()
                    
            except Exception as e:
                print(f"   ⚠️ Errore metriche {table}: {e}")
                if trans:
                    trans.rollback()
        
        # Metrica speciale per geolocation (senza rdl_is_valid)
        try:
            trans = conn.begin()
            
            geo_query = """
                SELECT COUNT(*) as total,
                       COUNT(DISTINCT geolocation_zip_code_prefix) as unique_zips
                FROM rdl.geolocation
            """
            result = conn.execute(text(geo_query)).fetchone()
            
            if result and result[0] > 0:
                # Per geolocation consideriamo tutti validi
                insert_query = """
                    INSERT INTO rdl.data_quality_log 
                    (table_name, check_type, total_records, valid_records, 
                     invalid_records, quality_score, error_details)
                    VALUES ('geolocation', 'validation', :total, :valid, 0, 100.0, 
                            :details)
                """
                
                conn.execute(text(insert_query), {
                    'total': int(result[0]),
                    'valid': int(result[0]),  # Tutti considerati validi
                    'details': f'{{"unique_zips": {result[1]}, "source": "main_etl_pipeline"}}'
                })
            
            trans.commit()
            
        except Exception as e:
            print(f"   ⚠️ Errore metriche geolocation: {e}")
            if trans:
                trans.rollback()
        
        # Completezza indirizzi customers
        try:
            trans = conn.begin()
            
            completeness_query = """
                INSERT INTO rdl.data_quality_log 
                (table_name, check_type, total_records, valid_records, invalid_records, quality_score)
                SELECT 
                    'customers',
                    'completeness',
                    COUNT(*),
                    SUM(CASE WHEN customer_city IS NOT NULL 
                        AND customer_state IS NOT NULL 
                        AND customer_zip_code_prefix IS NOT NULL 
                        THEN 1 ELSE 0 END),
                    SUM(CASE WHEN customer_city IS NULL 
                        OR customer_state IS NULL 
                        OR customer_zip_code_prefix IS NULL 
                        THEN 1 ELSE 0 END),
                    AVG(CASE WHEN customer_city IS NOT NULL 
                        AND customer_state IS NOT NULL 
                        AND customer_zip_code_prefix IS NOT NULL 
                        THEN 100.0 ELSE 0.0 END)
                FROM rdl.customers
            """
            conn.execute(text(completeness_query))
            trans.commit()
            
        except Exception as e:
            print(f"   ⚠️ Errore completeness customers: {e}")
            if trans:
                trans.rollback()
        
        # Accuratezza date orders
        try:
            trans = conn.begin()
            
            accuracy_query = """
                INSERT INTO rdl.data_quality_log 
                (table_name, check_type, total_records, valid_records, invalid_records, quality_score)
                SELECT 
                    'orders',
                    'accuracy',
                    COUNT(*),
                    SUM(CASE WHEN order_delivered_customer_date >= order_purchase_timestamp 
                        OR order_delivered_customer_date IS NULL THEN 1 ELSE 0 END),
                    SUM(CASE WHEN order_delivered_customer_date < order_purchase_timestamp THEN 1 ELSE 0 END),
                    AVG(CASE WHEN order_delivered_customer_date >= order_purchase_timestamp 
                        OR order_delivered_customer_date IS NULL THEN 100.0 ELSE 0.0 END)
                FROM rdl.orders
            """
            conn.execute(text(accuracy_query))
            trans.commit()
            
        except Exception as e:
            print(f"   ⚠️ Errore accuracy orders: {e}")
            if trans:
                trans.rollback()
        
        # Consistenza order_items
        try:
            trans = conn.begin()
            
            consistency_query = """
                INSERT INTO rdl.data_quality_log 
                (table_name, check_type, total_records, valid_records, invalid_records, quality_score)
                SELECT 
                    'order_items',
                    'consistency',
                    COUNT(*),
                    SUM(CASE WHEN oi.order_id IN (SELECT order_id FROM rdl.orders) THEN 1 ELSE 0 END),
                    SUM(CASE WHEN oi.order_id NOT IN (SELECT order_id FROM rdl.orders) THEN 1 ELSE 0 END),
                    AVG(CASE WHEN oi.order_id IN (SELECT order_id FROM rdl.orders) THEN 100.0 ELSE 0.0 END)
                FROM rdl.order_items oi
            """
            conn.execute(text(consistency_query))
            trans.commit()
            
        except Exception as e:
            print(f"   ⚠️ Errore consistency order_items: {e}")
            if trans:
                trans.rollback()
        
        print("✅ Metriche qualità registrate")
        
# Pulisci la tabella data_quality_log prima di riprovare
with engine.connect() as conn:
    conn.execute(text("DELETE FROM rdl.data_quality_log"))
    conn.commit()
    print("✅ Tabella data_quality_log pulita")

# Poi richiama la funzione corretta
log_data_quality_metrics(engine)

✅ Tabella data_quality_log pulita

📊 Registrazione metriche qualità...
✅ Metriche qualità registrate


In [9]:
# =============================================================================
# FASE 4: VERIFICA FINALE E STATISTICHE
# =============================================================================
print("\n📈 FASE 4: VERIFICA FINALE")
print("-"*60)

# Confronto qualità: Source → RDL → DW
with engine.connect() as conn:
    # Conteggi originali
    source_orders = len(source_data['orders'])
    source_customers = len(source_data['customers'])
    
    # Conteggi RDL
    rdl_valid_orders = conn.execute(text(
        "SELECT COUNT(*) FROM rdl.orders WHERE rdl_is_valid = true"
    )).scalar()
    rdl_valid_customers = conn.execute(text(
        "SELECT COUNT(*) FROM rdl.customers WHERE rdl_is_valid = true"
    )).scalar()
    
    # Conteggi DW
    dw_orders = conn.execute(text("SELECT COUNT(DISTINCT order_id) FROM fact_sales")).scalar()
    dw_customers = conn.execute(text("SELECT COUNT(*) FROM dim_customer")).scalar()

print("📊 STATISTICHE FLUSSO DATI:")
print(f"Orders:    Source: {source_orders:,} → RDL Valid: {rdl_valid_orders:,} → DW: {dw_orders:,}")
print(f"Customers: Source: {source_customers:,} → RDL Valid: {rdl_valid_customers:,} → DW: {dw_customers:,}")

# Quality metrics
print("\n🏆 METRICHE QUALITÀ:")
with engine.connect() as conn:
    # Prima verifica se ci sono dati
    count_check = conn.execute(text(
        "SELECT COUNT(*) FROM rdl.data_quality_log"
    )).scalar()
    
    if count_check > 0:
        # Media generale
        quality_metrics = conn.execute(text("""
            SELECT 
                AVG(quality_score) as avg_quality,
                MIN(quality_score) as min_quality,
                MAX(quality_score) as max_quality,
                COUNT(DISTINCT table_name) as tables_checked
            FROM rdl.data_quality_log
            WHERE check_timestamp >= CURRENT_DATE - INTERVAL '1 day'
        """)).fetchone()
        
        if quality_metrics[0]:
            print(f"Qualità media RDL: {quality_metrics[0]:.1f}%")
            print(f"Qualità minima: {quality_metrics[1]:.1f}%")
            print(f"Qualità massima: {quality_metrics[2]:.1f}%")
            print(f"Tabelle verificate: {quality_metrics[3]}")
            
            # Dettaglio per tipo di check
            print("\nDettaglio per tipo di controllo:")
            detail_query = """
                SELECT 
                    check_type,
                    COUNT(*) as num_checks,
                    AVG(quality_score) as avg_score
                FROM rdl.data_quality_log
                WHERE check_timestamp >= CURRENT_DATE - INTERVAL '1 day'
                GROUP BY check_type
                ORDER BY check_type
            """
            details = conn.execute(text(detail_query)).fetchall()
            for check_type, num_checks, avg_score in details:
                print(f"  - {check_type}: {avg_score:.1f}% (su {num_checks} controlli)")
            
            # Top 3 migliori e peggiori
            print("\nTop 3 tabelle per qualità:")
            top_query = """
                SELECT table_name, AVG(quality_score) as avg_score
                FROM rdl.data_quality_log
                GROUP BY table_name
                ORDER BY avg_score DESC
                LIMIT 3
            """
            top_tables = conn.execute(text(top_query)).fetchall()
            for table, score in top_tables:
                print(f"  ✅ {table}: {score:.1f}%")
                
    else:
        print("⚠️ Nessuna metrica di qualità registrata.")
        print("Le metriche verranno raccolte nei prossimi run del processo ETL.")


📈 FASE 4: VERIFICA FINALE
------------------------------------------------------------
📊 STATISTICHE FLUSSO DATI:
Orders:    Source: 99,441 → RDL Valid: 99,441 → DW: 96,303
Customers: Source: 99,441 → RDL Valid: 99,441 → DW: 99,441

🏆 METRICHE QUALITÀ:
Qualità media RDL: 100.0%
Qualità minima: 100.0%
Qualità massima: 100.0%
Tabelle verificate: 8

Dettaglio per tipo di controllo:
  - accuracy: 100.0% (su 1 controlli)
  - completeness: 100.0% (su 1 controlli)
  - consistency: 100.0% (su 1 controlli)
  - validation: 100.0% (su 8 controlli)

Top 3 tabelle per qualità:
  ✅ geolocation: 100.0%
  ✅ sellers: 100.0%
  ✅ order_items: 100.0%


In [10]:
# =============================================================================
# FASE 5: CREAZIONE VISTE MATERIALIZZATE
# =============================================================================
print("\n🔨 FASE 5: CREAZIONE VISTE MATERIALIZZATE")
print("-"*60)

# Le tue viste esistenti più alcune nuove per RDL
with engine.connect() as conn:
    # Vista audit trail
    conn.execute(text("""
        CREATE MATERIALIZED VIEW IF NOT EXISTS mv_etl_audit AS
        SELECT 
            process_name,
            process_type,
            start_timestamp,
            end_timestamp,
            status,
            records_processed,
            records_rejected,
            EXTRACT(EPOCH FROM (end_timestamp - start_timestamp)) as duration_seconds
        FROM rdl.etl_process_log
        ORDER BY start_timestamp DESC
    """))
    
    print("✅ Vista audit trail creata")
    
    # Refresh viste esistenti
    conn.execute(text("REFRESH MATERIALIZED VIEW mv_geographic_sales"))
    conn.execute(text("REFRESH MATERIALIZED VIEW mv_category_performance"))
    print("✅ Viste DW refreshed")
    
    conn.commit()


🔨 FASE 5: CREAZIONE VISTE MATERIALIZZATE
------------------------------------------------------------
✅ Vista audit trail creata
✅ Viste DW refreshed


In [13]:
# =============================================================================
# CONCLUSIONE
# =============================================================================
print("\n" + "="*60)
print("🎉 PIPELINE ETL 3-LAYER COMPLETATA CON SUCCESSO!")
print("="*60)

print("\n📋 RIEPILOGO ARCHITETTURA:")
print("1️⃣ SOURCE LAYER: File CSV grezzi")
print("2️⃣ RDL LAYER: Dati validati, arricchiti, con quality scores")
print("3️⃣ DW LAYER: Star schema pronto per analisi OLAP")


🎉 PIPELINE ETL 3-LAYER COMPLETATA CON SUCCESSO!

📋 RIEPILOGO ARCHITETTURA:
1️⃣ SOURCE LAYER: File CSV grezzi
2️⃣ RDL LAYER: Dati validati, arricchiti, con quality scores
3️⃣ DW LAYER: Star schema pronto per analisi OLAP


In [12]:
print("🔍 VERIFICA COMPLETAMENTO ETL - VERSIONE COMPLETA")
print("="*60)

# 1. Verifica schema RDL (include geolocation)
print("\n📊 SCHEMA RDL:")
rdl_query = """
SELECT table_name, record_count
FROM (
    SELECT 'orders' as table_name, COUNT(*) as record_count FROM rdl.orders
    UNION ALL SELECT 'customers', COUNT(*) FROM rdl.customers
    UNION ALL SELECT 'products', COUNT(*) FROM rdl.products
    UNION ALL SELECT 'order_items', COUNT(*) FROM rdl.order_items
    UNION ALL SELECT 'payments', COUNT(*) FROM rdl.payments
    UNION ALL SELECT 'reviews', COUNT(*) FROM rdl.reviews
    UNION ALL SELECT 'sellers', COUNT(*) FROM rdl.sellers
    UNION ALL SELECT 'geolocation', COUNT(*) FROM rdl.geolocation  -- AGGIUNTA
) t
ORDER BY table_name
"""
rdl_counts = pd.read_sql(rdl_query, engine)
print(rdl_counts.to_string(index=False))
print(f"\nTotale record RDL: {rdl_counts['record_count'].sum():,}")

# 2. Verifica schema DW (include dim_geography)
print("\n📊 SCHEMA DATA WAREHOUSE:")
dw_query = """
SELECT table_name, record_count
FROM (
    SELECT 'dim_geography' as table_name, COUNT(*) as record_count FROM dim_geography  -- AGGIUNTA
    UNION ALL SELECT 'dim_customer', COUNT(*) FROM dim_customer
    UNION ALL SELECT 'dim_product', COUNT(*) FROM dim_product
    UNION ALL SELECT 'dim_seller', COUNT(*) FROM dim_seller
    UNION ALL SELECT 'dim_time', COUNT(*) FROM dim_time
    UNION ALL SELECT 'dim_payment', COUNT(*) FROM dim_payment
    UNION ALL SELECT 'fact_sales', COUNT(*) FROM fact_sales
) t
ORDER BY table_name
"""
dw_counts = pd.read_sql(dw_query, engine)
print(dw_counts.to_string(index=False))
print(f"\nTotale record DW: {dw_counts['record_count'].sum():,}")

# 3. Verifica specifica geography/geolocation
print("\n📍 VERIFICA GEOGRAPHY:")
geo_check = """
SELECT 
    'RDL Geolocation' as description,
    COUNT(*) as total_records,
    COUNT(DISTINCT geolocation_zip_code_prefix) as unique_values
FROM rdl.geolocation
UNION ALL
SELECT 
    'DW Geography',
    COUNT(*),
    COUNT(DISTINCT zip_code_prefix)
FROM dim_geography
"""
geo_result = pd.read_sql(geo_check, engine)
print(geo_result.to_string(index=False))

# 4. Test integrità
print("\n🔗 TEST INTEGRITÀ:")
integrity_check = """
SELECT 
    'Customers con geography' as test,
    COUNT(*) as count,
    ROUND(100.0 * COUNT(*) / (SELECT COUNT(*) FROM dim_customer), 2) as percentage
FROM dim_customer
WHERE geography_key IS NOT NULL
UNION ALL
SELECT 
    'Sellers con geography',
    COUNT(*),
    ROUND(100.0 * COUNT(*) / (SELECT COUNT(*) FROM dim_seller), 2)
FROM dim_seller
WHERE geography_key IS NOT NULL
"""
integrity = pd.read_sql(integrity_check, engine)
print(integrity.to_string(index=False))

# 5. Status finale
print("\n✅ VERIFICA COMPLETATA!")
print("\n📊 VALORI ATTESI:")
print("   - rdl.geolocation: ~720,000 record")
print("   - dim_geography: ~19,000 record (ZIP unici)")
print("   - Customers/Sellers con geography_key: ~100%")

🔍 VERIFICA COMPLETAMENTO ETL - VERSIONE COMPLETA

📊 SCHEMA RDL:
 table_name  record_count
  customers         99441
geolocation        720148
order_items        112650
     orders         99441
   payments        103886
   products         32951
    reviews         98410
    sellers          3095

Totale record RDL: 1,270,022

📊 SCHEMA DATA WAREHOUSE:
   table_name  record_count
 dim_customer         99441
dim_geography         19015
  dim_payment            28
  dim_product         32951
   dim_seller          3095
     dim_time           634
   fact_sales        115358

Totale record DW: 270,522

📍 VERIFICA GEOGRAPHY:
    description  total_records  unique_values
RDL Geolocation         720148          19015
   DW Geography          19015          19015

🔗 TEST INTEGRITÀ:
                   test  count  percentage
Customers con geography  99163       99.72
  Sellers con geography   3088       99.77

✅ VERIFICA COMPLETATA!

📊 VALORI ATTESI:
   - rdl.geolocation: ~720,000 record
   - d