In [8]:
# run_elt.py
import pandas as pd
import os
import sys
import logging
from sqlalchemy import text
from utils import get_engine
from etl.setup_elt import setup_elt_database

# --- IMPORT MODULE BARU ---
from etl.validator import validate_waste_data, validate_sipsn_data

# Import Warehouse Logic
from warehouse.dim_time import load_dim_time
from warehouse.dim_location import load_dim_location
from warehouse.dim_fleet import load_dim_fleet
from warehouse.fact_waste import load_fact_waste

# Configure logging once for the script, then create this module's logger.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

def _stage_csv(engine, csv_path, table, validator, label):
    """Validate one CSV file and replace the matching staging table with it.

    Args:
        engine: SQLAlchemy engine used to open the load transaction.
        csv_path: Path of the CSV file to ingest.
        table: Name of the target table inside the ``staging`` schema.
        validator: Callable that receives the raw DataFrame and returns a
            truthy value when the data may be loaded.
        label: Dataset name inserted into the validation-failure log message.

    Returns:
        True when the file was validated and loaded; False when the file is
        missing or fails validation (the reason is logged).

    Raises:
        Exception: Read or database errors propagate to the caller.
    """
    if not os.path.exists(csv_path):
        logger.error(f"File {csv_path} tidak ditemukan!")
        return False

    # Read every column as text; typing is left to the downstream SQL layer.
    frame = pd.read_csv(csv_path, dtype=str)

    # Data-quality gate: nothing is written unless validation passes.
    if not validator(frame):
        logger.error(f"⛔ Pipeline dihentikan karena Validasi {label} Data Gagal!")
        return False

    # TRUNCATE and INSERT share one transaction so that a failed insert rolls
    # the truncate back instead of leaving the staging table empty.
    # NOTE(review): this relies on the database treating TRUNCATE as
    # transactional (PostgreSQL does) -- confirm for the target database.
    # `table` is an internal constant supplied by run_elt_pipeline, never
    # user input, so interpolating it into the statement is safe.
    with engine.begin() as conn:
        conn.execute(text(f"TRUNCATE TABLE staging.{table};"))
        frame.to_sql(table, conn, schema="staging", if_exists="append", index=False)

    print(f"   -> {os.path.basename(csv_path)} loaded: {len(frame)} baris")
    return True


def run_elt_pipeline():
    """Run the ELT pipeline: setup, validated staging load, warehouse load.

    The data directory is read from the ``WASTE_DATA_DIR`` environment
    variable and defaults to ``./data``. The run is strict: a missing file,
    a failed validation, or an exception in either the staging or the
    warehouse stage aborts the pipeline after logging the cause.

    Returns:
        True when every stage completed, False when the run was aborted.

    Raises:
        Exception: Errors from ``get_engine`` or ``setup_elt_database`` are
            not caught here and propagate to the caller.
    """
    engine = get_engine()
    data_dir = os.environ.get("WASTE_DATA_DIR", "./data")

    print("\n🚀 MEMULAI PIPELINE ELT DENGAN VALIDASI")
    print("=" * 40)

    # 1. Infrastructure setup
    setup_elt_database()

    # 2. Extract, validate and load the raw files, in this fixed order.
    print("\n[STEP 1] Extract, Validate & Load Raw Data...")
    sources = (
        ("waste.csv", "raw_waste", validate_waste_data, "Waste"),
        ("sipsn.csv", "raw_sipsn", validate_sipsn_data, "SIPSN"),
    )
    try:
        for filename, table, validator, label in sources:
            csv_path = os.path.join(data_dir, filename)
            if not _stage_csv(engine, csv_path, table, validator, label):
                return False  # strict mode: stop the whole pipeline
    except Exception as exc:
        # Pipeline boundary: log with traceback and abort instead of crashing.
        logger.exception("❌ Gagal pada tahap Extract/Load: %s", exc)
        return False

    # 3. Transform and warehouse loading; dimensions run before the fact table.
    print("\n[STEP 2] Warehouse Loading (Via SQL Views)...")
    warehouse_steps = (
        (load_dim_time, "Dim Time"),
        (load_dim_location, "Dim Location"),
        (load_dim_fleet, "Dim Fleet"),
        (load_fact_waste, "Fact Waste"),
    )
    try:
        for loader, name in warehouse_steps:
            loader()
            print(f"   ✅ {name} Loaded")
    except Exception as exc:
        logger.exception("❌ Gagal Warehouse Load: %s", exc)
        return False

    print("\n🎉 Pipeline Selesai! Data bersih dan tervalidasi siap digunakan.")
    return True

# Run the pipeline only when this file is executed directly, not on import.
if __name__ == "__main__":
    run_elt_pipeline()


🚀 MEMULAI PIPELINE ELT DENGAN VALIDASI
🛠️  Menyiapkan Struktur Database (Schema, Tables, Views)...
✅ Setup Database ELT Selesai.

[STEP 1] Extract, Validate & Load Raw Data...
   🛡️  Menjalankan Validasi Waste Data...
   ✅ Validasi Waste Data Lulus.
   -> waste.csv loaded: 4004 baris
   🛡️  Menjalankan Validasi SIPSN Data...
   ✅ Validasi SIPSN Data Lulus.
   -> sipsn.csv loaded: 44 baris

[STEP 2] Warehouse Loading (Via SQL Views)...
   ✅ Dim Time Loaded
   ✅ Dim Location Loaded
   ✅ Dim Fleet Loaded
   ✅ Fact Waste Loaded

🎉 Pipeline Selesai! Data bersih dan tervalidasi siap digunakan.
