# **EXTRACT, LOAD, & TRANSFORM**

In [3]:
%pip install mysql-connector-python

Collecting mysql-connector-python
  Downloading mysql_connector_python-9.4.0-cp39-cp39-win_amd64.whl.metadata (7.5 kB)
Downloading mysql_connector_python-9.4.0-cp39-cp39-win_amd64.whl (16.4 MB)
   ---------------------------------------- 0.0/16.4 MB ? eta -:--:--
    --------------------------------------- 0.3/16.4 MB ? eta -:--:--
   - -------------------------------------- 0.5/16.4 MB 1.5 MB/s eta 0:00:11
   - -------------------------------------- 0.8/16.4 MB 1.3 MB/s eta 0:00:12
   -- ------------------------------------- 1.0/16.4 MB 1.3 MB/s eta 0:00:12
   --- ------------------------------------ 1.3/16.4 MB 1.3 MB/s eta 0:00:12
   --- ------------------------------------ 1.6/16.4 MB 1.3 MB/s eta 0:00:12
   ---- ----------------------------------- 1.8/16.4 MB 1.3 MB/s eta 0:00:12
   ---- ----------------------------------- 1.8/16.4 MB 1.3 MB/s eta 0:00:12
   ----- ---------------------------------- 2.1/16.4 MB 1.2 MB/s eta 0:00:12
   ----- ---------------------------------- 2.4/16

In [1]:
import pandas as pd
from sqlalchemy import create_engine, text
import os
import time

In [2]:
# Connection settings. Each value can be overridden through an environment
# variable so credentials do not have to be edited into (or committed with)
# the notebook; the fallbacks are the original local-development values.
DB_USER = os.environ.get('OLIST_DB_USER', 'root')
DB_PASSWORD = os.environ.get('OLIST_DB_PASSWORD', '')
DB_HOST = os.environ.get('OLIST_DB_HOST', 'localhost')
DB_PORT = os.environ.get('OLIST_DB_PORT', '3306')  # stays a string: it is only interpolated into URLs
DB_NAME = os.environ.get('OLIST_DB_NAME', 'olist_elt_db')

# Directory that holds the source CSV files.
DATASET_DIR = os.environ.get('OLIST_DATASET_DIR', '../dataset/')

# CSV file name -> name of the raw table the file is loaded into.
FILES_MAPPING = {
    'olist_customers_dataset.csv': 'raw_customers',
    'olist_geolocation_dataset.csv': 'raw_geolocation',
    'olist_order_items_dataset.csv': 'raw_order_items',
    'olist_order_payments_dataset.csv': 'raw_order_payments',
    'olist_order_reviews_dataset.csv': 'raw_order_reviews',
    'olist_orders_dataset.csv': 'raw_orders',
    'olist_products_dataset.csv': 'raw_products',
    'olist_sellers_dataset.csv': 'raw_sellers',
    'product_category_name_translation.csv': 'raw_category_translation',
    'brazil.inflation.monthly (statbureau.org).csv': 'raw_brazil_inflation'
}

In [5]:
def init_db_connection():
    """Make sure the target database exists and return an engine bound to it.

    A first engine connects to the MySQL server without selecting a database
    and issues ``CREATE DATABASE IF NOT EXISTS`` for ``DB_NAME``. A second
    engine, whose URL includes ``DB_NAME``, is then created and returned.

    Returns:
        The SQLAlchemy engine produced by ``create_engine`` for ``DB_NAME``.
    """
    from urllib.parse import quote_plus

    # Percent-encode the credentials so characters such as '@', ':' or '/'
    # inside a user name or password cannot corrupt the connection URL.
    # With the default values ('root', '') the URL is unchanged.
    str_conn_root = (
        f"mysql+mysqlconnector://{quote_plus(DB_USER)}:{quote_plus(DB_PASSWORD)}"
        f"@{DB_HOST}:{DB_PORT}"
    )
    engine_root = create_engine(str_conn_root)
    try:
        with engine_root.connect() as conn:
            # Backtick-quote the identifier (doubling any embedded backtick)
            # so names containing dashes or reserved words are still valid.
            quoted_name = DB_NAME.replace('`', '``')
            conn.execute(text(f"CREATE DATABASE IF NOT EXISTS `{quoted_name}`"))
    finally:
        # The server-level engine is only needed for the statement above;
        # release its connection pool instead of leaving it open.
        engine_root.dispose()

    return create_engine(f"{str_conn_root}/{DB_NAME}")

def load_raw_data():
    """Load every CSV listed in ``FILES_MAPPING`` into its raw table.

    For each mapping entry the file is looked up under ``DATASET_DIR``.
    Missing files are reported and skipped. Existing files are read with all
    columns as text, their column names are stripped and lower-cased, and the
    frame is written to the mapped table, replacing any previous contents.
    A failure on one file is printed and does not stop the remaining files.

    Returns:
        dict: table name -> number of rows written, for the files that were
        loaded successfully. Skipped or failed files have no entry.
    """
    engine = init_db_connection()
    print("\n--- MULAI PROSES ELT: PHASE 1 (INGEST RAW DATA) ---")

    loaded_rows = {}
    try:
        for csv_file, table_name in FILES_MAPPING.items():
            file_path = os.path.join(DATASET_DIR, csv_file)

            if not os.path.exists(file_path):
                print(f"‚ö†Ô∏è  SKIP: File {csv_file} tidak ditemukan.")
                continue

            print(f"üìÇ Processing: {csv_file} -> {table_name}")

            try:
                # dtype=str keeps every value as text; typing happens later in SQL.
                df = pd.read_csv(file_path, dtype=str)
                df.columns = [c.strip().lower() for c in df.columns]
                # if_exists='replace' rebuilds the table on every run;
                # chunksize writes the rows in batches of 5000.
                df.to_sql(name=table_name, con=engine, if_exists='replace', index=False, chunksize=5000)
                loaded_rows[table_name] = len(df)
                print(f"   ‚úÖ Sukses! ({len(df)} baris)")

            except Exception as e:
                # Best-effort ingest: report the failure and continue.
                print(f"   ‚ùå GAGAL: {e}")
    finally:
        # This engine is private to the function; release its pool so
        # repeated runs of the cell do not accumulate open connections.
        engine.dispose()

    return loaded_rows

if __name__ == "__main__":
    load_raw_data()


--- MULAI PROSES ELT: PHASE 1 (INGEST RAW DATA) ---
üìÇ Processing: olist_customers_dataset.csv -> raw_customers
   ‚úÖ Sukses! (99441 baris)
üìÇ Processing: olist_geolocation_dataset.csv -> raw_geolocation
   ‚úÖ Sukses! (1000163 baris)
üìÇ Processing: olist_order_items_dataset.csv -> raw_order_items
   ‚úÖ Sukses! (112650 baris)
üìÇ Processing: olist_order_payments_dataset.csv -> raw_order_payments
   ‚úÖ Sukses! (103886 baris)
üìÇ Processing: olist_order_reviews_dataset.csv -> raw_order_reviews
   ‚úÖ Sukses! (99224 baris)
üìÇ Processing: olist_orders_dataset.csv -> raw_orders
   ‚úÖ Sukses! (99441 baris)
üìÇ Processing: olist_products_dataset.csv -> raw_products
   ‚úÖ Sukses! (32951 baris)
üìÇ Processing: olist_sellers_dataset.csv -> raw_sellers
   ‚úÖ Sukses! (3095 baris)
üìÇ Processing: product_category_name_translation.csv -> raw_category_translation
   ‚úÖ Sukses! (71 baris)
üìÇ Processing: brazil.inflation.monthly (statbureau.org).csv -> raw_brazil_inflation
   ‚úÖ

In [6]:
connection_str = f"mysql+mysqlconnector://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
engine = create_engine(connection_str)

# Ordered warehouse build steps. Every statement keeps the literal prefix
# "CREATE TABLE IF NOT EXISTS <table> " because run_transformations() reads
# the target table name out of that prefix before dropping and rebuilding it.
TRANSFORM_QUERIES = [
    {
        # Customer dimension: a straight projection of the raw customer table.
        "name": "1. Membuat Dimensi Customers",
        "sql": """
            CREATE TABLE IF NOT EXISTS dim_customers AS
            SELECT customer_id, customer_unique_id, customer_zip_code_prefix, customer_city, customer_state
            FROM raw_customers;
        """
    },
    {
        # Product dimension: category falls back from the English translation
        # to the original category name and finally to 'Unknown'.
        "name": "2. Membuat Dimensi Products",
        "sql": """
            CREATE TABLE IF NOT EXISTS dim_products AS
            SELECT
                p.product_id,
                COALESCE(t.product_category_name_english, p.product_category_name, 'Unknown') as category_name,
                p.product_weight_g, p.product_length_cm, p.product_height_cm, p.product_width_cm
            FROM raw_products p
            LEFT JOIN raw_category_translation t ON p.product_category_name = t.product_category_name;
        """
    },
    {
        # Sales fact: one row per order item (orders joined to order items).
        # order_item_id is carried along so each row can be identified by
        # (order_id, order_item_id); order_id alone repeats for multi-item
        # orders. Empty-string timestamps are turned into NULL before the
        # DATETIME cast.
        "name": "3. Membuat Fact Sales",
        "sql": """
            CREATE TABLE IF NOT EXISTS fact_sales AS
            SELECT
                o.order_id,
                CAST(oi.order_item_id AS UNSIGNED) as order_item_id,
                o.customer_id, oi.product_id, oi.seller_id, o.order_status,
                CAST(NULLIF(o.order_purchase_timestamp, '') AS DATETIME) as purchase_date,
                CAST(NULLIF(o.order_approved_at, '') AS DATETIME) as approved_date,
                CAST(NULLIF(o.order_delivered_carrier_date, '') AS DATETIME) as carrier_date,
                CAST(NULLIF(o.order_delivered_customer_date, '') AS DATETIME) as delivered_date,
                CAST(NULLIF(o.order_estimated_delivery_date, '') AS DATETIME) as estimated_date,
                CAST(oi.price AS DECIMAL(10,2)) as price,
                CAST(oi.freight_value AS DECIMAL(10,2)) as freight_value
            FROM raw_orders o
            JOIN raw_order_items oi ON o.order_id = oi.order_id;
        """
    }
]

def run_transformations():
    """Rebuild every warehouse table described in ``TRANSFORM_QUERIES``.

    For each task the target table name is read from the task's
    ``CREATE TABLE IF NOT EXISTS <name>`` clause, that table is dropped, and
    the task's statement is executed to recreate it. A failing task is
    printed and the loop continues with the next task.
    """
    import re

    # Tolerates any whitespace (spaces, tabs, newlines) and any letter case
    # around the keywords; the captured name ends at whitespace or '('.
    create_clause = re.compile(
        r"CREATE\s+TABLE\s+IF\s+NOT\s+EXISTS\s+([^\s(]+)", re.IGNORECASE
    )

    print("\n--- MULAI PROSES ELT: PHASE 2 (TRANSFORM WAREHOUSE - FIX DATE) ---")
    with engine.connect() as conn:
        for task in TRANSFORM_QUERIES:
            print(f"üîÑ Running: {task['name']}...")
            try:
                found = create_clause.search(task['sql'])
                if found is None:
                    # Refuse to run a statement whose target cannot be
                    # determined, rather than failing with an opaque IndexError.
                    raise ValueError("no 'CREATE TABLE IF NOT EXISTS <name>' clause found in task SQL")
                table_name = found.group(1)
                # Drop first so the CREATE ... IF NOT EXISTS always rebuilds.
                conn.execute(text(f"DROP TABLE IF EXISTS {table_name}"))
                conn.execute(text(task['sql']))
                print("   ‚úÖ Selesai!")
            except Exception as e:
                print(f"   ‚ùå GAGAL: {e}")

if __name__ == "__main__":
    run_transformations()


--- MULAI PROSES ELT: PHASE 2 (TRANSFORM WAREHOUSE - FIX DATE) ---
üîÑ Running: 1. Membuat Dimensi Customers...
   ‚úÖ Selesai!
üîÑ Running: 2. Membuat Dimensi Products...
   ‚úÖ Selesai!
üîÑ Running: 3. Membuat Fact Sales (Fix: Menggunakan CAST AS DATETIME)...
   ‚úÖ Selesai!


In [None]:
connection_str = "mysql+mysqlconnector://{}:{}@{}:{}/{}".format(
    DB_USER, DB_PASSWORD, DB_HOST, DB_PORT, DB_NAME
)
engine = create_engine(connection_str)

# Cleaning steps applied to fact_sales, in order. Each entry carries a label
# ("name"), a human-readable description ("desc") and the DELETE statement
# ("sql") that removes the offending rows.
CLEANING_QUERIES = [
    # Remove sales rows whose product_id has no match in dim_products.
    dict(
        name="1. Fix Referential Integrity",
        desc="Menghapus transaksi penjualan yang product_id-nya tidak ada di dimensi produk.",
        sql="""
            DELETE f 
            FROM fact_sales f
            LEFT JOIN dim_products p ON f.product_id = p.product_id
            WHERE p.product_id IS NULL;
        """,
    ),
    # Remove sales rows whose purchase_date is NULL.
    dict(
        name="2. Fix Invalid Dates",
        desc="Menghapus transaksi yang tanggal pembeliannya gagal dikonversi (NULL).",
        sql="""
            DELETE FROM fact_sales 
            WHERE purchase_date IS NULL;
        """,
    ),
    # Remove sales rows with a negative price or freight value.
    dict(
        name="3. Fix Negative Values",
        desc="Menghapus transaksi dengan harga atau ongkir < 0.",
        sql="""
            DELETE FROM fact_sales 
            WHERE price < 0 OR freight_value < 0;
        """,
    ),
]

def run_data_cleaning():
    """Execute every statement in ``CLEANING_QUERIES`` and report rows removed.

    Each task runs in its own transaction on a shared connection: the
    transaction is committed when the statement succeeds and rolled back when
    it raises. A failing task is printed and the remaining tasks still run.
    """
    print("\nPROSES CLEANING DATA\n")

    with engine.connect() as conn:
        for task in CLEANING_QUERIES:
            print(f"üßπ Running: {task['name']}")
            print(f"   Desc   : {task['desc']}")

            try:
                # conn.begin() used as a context manager commits on normal
                # exit and rolls back if the block raises, so a failed task
                # never leaves an open transaction behind for the next one.
                with conn.begin():
                    result = conn.execute(text(task['sql']))
                    deleted_rows = result.rowcount

                # Reported only after the commit has gone through.
                if deleted_rows > 0:
                    print(f"   ‚úÖ FIXED: {deleted_rows} baris data kotor telah dihapus.")
                else:
                    print("   ‚úÖ OK: Tidak ada data kotor yang ditemukan.")

            except Exception as e:
                print(f"   ‚ùå GAGAL: {e}")

            print("-" * 50)

    print("\n‚ú® PROSES CLEANING SELESAI. Silakan jalankan validasi (Step 4) lagi untuk verifikasi.")

if __name__ == "__main__":
    run_data_cleaning()


PROSES CLEANING DATA

üßπ Running: 1. Fix Referential Integrity
   Desc   : Menghapus transaksi penjualan yang product_id-nya tidak ada di dimensi produk.


# **VALIDATION**

In [3]:
connection_str = f"mysql+mysqlconnector://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
engine = create_engine(connection_str)

def run_data_quality_checks():
    """Run the data-quality rules against the warehouse and print a report.

    Every rule is a SQL query returning a single count. A rule passes when
    that count is less than or equal to its ``threshold``. A query that raises
    is reported as an error and counts as a failure.

    Returns:
        bool: ``True`` when every rule passed, ``False`` otherwise, so a
        caller can stop the pipeline (for example with ``assert``).
    """
    print("\n=== MULAI ELT DATA QUALITY ASSURANCE (6 RULES) ===\n")

    validations = [
        {
            # NOTE(review): fact_sales is produced by joining raw_orders to
            # raw_order_items, so order_id repeats once per order item. This
            # rule therefore counts multi-item orders, not true duplicates.
            # TODO confirm the intended key for this check.
            "rule": "1. Uniqueness Check",
            "desc": "Memastikan Primary Key (order_id) di Fact Sales unik",
            "sql": """
                SELECT COUNT(order_id) - COUNT(DISTINCT order_id) as duplicate_count 
                FROM fact_sales;
            """,
            "threshold": 0
        },
        {
            "rule": "2. Null Check",
            "desc": "Memastikan tidak ada Revenue/Price yang NULL di Fact Sales",
            "sql": """
                SELECT COUNT(*) as null_count 
                FROM fact_sales 
                WHERE price IS NULL OR freight_value IS NULL;
            """,
            "threshold": 0
        },
        {
            "rule": "3. Range Check",
            "desc": "Memastikan tidak ada harga atau ongkir bernilai negatif",
            "sql": """
                SELECT COUNT(*) as negative_values 
                FROM fact_sales 
                WHERE price < 0 OR freight_value < 0;
            """,
            "threshold": 0
        },
        {
            # NOTE(review): the only rule that tolerates violations (up to
            # 100 rows); the reason for that number is not visible here.
            "rule": "4. Datatype Consistency",
            "desc": "Memastikan konversi tanggal berhasil (Tidak ada tanggal '0000-00-00' atau NULL akibat salah format)",
            "sql": """
                SELECT COUNT(*) as invalid_dates 
                FROM fact_sales 
                WHERE purchase_date IS NULL;
            """,
            "threshold": 100
        },
        {
            "rule": "5. Referential Integrity",
            "desc": "Memastikan semua Product ID di tabel Sales ada di tabel Dimensi Produk",
            "sql": """
                SELECT COUNT(*) as orphan_records
                FROM fact_sales f
                LEFT JOIN dim_products p ON f.product_id = p.product_id
                WHERE p.product_id IS NULL;
            """,
            "threshold": 0
        },
        {
            # NOTE(review): dim_brazil_inflation is created by the inflation
            # analysis step further down the notebook; on a top-to-bottom run
            # this rule errors until that step has been executed.
            "rule": "6. Distribusi Data (Distribution Check)",
            "desc": "Memastikan Rate Inflasi berada dalam rentang wajar (-10% sampai 100%)",
            "sql": """
                SELECT COUNT(*) as outlier_count
                FROM dim_brazil_inflation
                WHERE inflation_rate < -10 OR inflation_rate > 100;
            """,
            "threshold": 0
        }
    ]

    all_passed = True

    with engine.connect() as conn:
        for v in validations:
            print(f"üîé Checking: {v['rule']}")
            print(f"   Context : {v['desc']}")

            try:
                # Each query returns exactly one row with one count column.
                result = conn.execute(text(v['sql'])).fetchone()[0]

                if result <= v['threshold']:
                    print(f"   ‚úÖ PASS (Result: {result} rows)")
                else:
                    print(f"   ‚ùå FAIL (Result: {result} rows found, Threshold allowed: {v['threshold']})")
                    all_passed = False

            except Exception as e:
                # A rule that cannot be evaluated is treated as failed.
                print(f"   ‚ö†Ô∏è ERROR execution: {e}")
                all_passed = False

            print("-" * 50)

        if all_passed:
            print("\nüéâ SELURUH DATA QUALITY CHECK BERHASIL! Data Warehouse Siap Digunakan.")
        else:
            print("\n‚ö†Ô∏è ADA VALIDASI YANG GAGAL. Periksa laporan di atas.")

    return all_passed

if __name__ == "__main__":
    run_data_quality_checks()


=== MULAI ELT DATA QUALITY ASSURANCE (6 RULES) ===

üîé Checking: 1. Uniqueness Check
   Context : Memastikan Primary Key (order_id) di Fact Sales unik
   ‚ùå FAIL (Result: 13984 rows found, Threshold allowed: 0)
--------------------------------------------------
üîé Checking: 2. Null Check
   Context : Memastikan tidak ada Revenue/Price yang NULL di Fact Sales
   ‚úÖ PASS (Result: 0 rows)
--------------------------------------------------
üîé Checking: 3. Range Check
   Context : Memastikan tidak ada harga atau ongkir bernilai negatif
   ‚úÖ PASS (Result: 0 rows)
--------------------------------------------------
üîé Checking: 4. Datatype Consistency
   Context : Memastikan konversi tanggal berhasil (Tidak ada tanggal '0000-00-00' atau NULL akibat salah format)
   ‚úÖ PASS (Result: 0 rows)
--------------------------------------------------
üîé Checking: 5. Referential Integrity
   Context : Memastikan semua Product ID di tabel Sales ada di tabel Dimensi Produk
   ‚úÖ PASS (Resul

# **INFLATION ANALYSIS**

In [5]:
connection_str = "mysql+mysqlconnector://{}:{}@{}:{}/{}".format(
    DB_USER, DB_PASSWORD, DB_HOST, DB_PORT, DB_NAME
)
engine = create_engine(connection_str)

# Month columns of raw_brazil_inflation in calendar order; position + 1 is
# the month number written to dim_brazil_inflation.
_MONTH_COLUMNS = (
    "january", "february", "march", "april", "may", "june",
    "july", "august", "september", "october", "november", "december",
)
_BRANCH_INDENT = " " * 16

# One SELECT per month column. Only the first branch names its output
# columns; the remaining branches inherit those names through UNION ALL.
_unpivot_branches = [
    f"{_BRANCH_INDENT}SELECT CAST(year AS UNSIGNED) as year, 1 as month, "
    f"CAST({_MONTH_COLUMNS[0]} AS DECIMAL(10,2)) as inflation_rate FROM raw_brazil_inflation"
]
_unpivot_branches += [
    f"{_BRANCH_INDENT}SELECT CAST(year AS UNSIGNED), {month_number}, "
    f"CAST({month_column} AS DECIMAL(10,2)) FROM raw_brazil_inflation"
    for month_number, month_column in enumerate(_MONTH_COLUMNS[1:], start=2)
]

# Wide-to-long reshaping of the inflation table: rows with a NULL rate are
# dropped after the branches are stacked.
_UNPIVOT_SQL = (
    "\n"
    "            CREATE TABLE IF NOT EXISTS dim_brazil_inflation AS\n"
    "            SELECT * FROM (\n"
    + f"\n{_BRANCH_INDENT}UNION ALL\n".join(_unpivot_branches)
    + "\n"
    "            ) AS unpivoted_data\n"
    "            WHERE inflation_rate IS NOT NULL\n"
    "            ORDER BY year DESC, month DESC;\n"
    "        "
)

# Ordered build steps for the inflation analysis. Each statement starts with
# "CREATE TABLE IF NOT EXISTS <table> ", which run_inflation_analysis() uses
# to find the table to drop before rebuilding it.
INFLATION_QUERIES = [
    dict(
        name="1. Cleaning & Unpivot Data Inflasi",
        desc="Unpivot Data Inflasi (Subquery Method)",
        sql=_UNPIVOT_SQL,
    ),
    # Monthly sales aggregates taken from fact_sales, grouped by year and month.
    dict(
        name="2. Agregasi Penjualan Per Bulan",
        desc="Menghitung total pembelian customer per bulan.",
        sql="""
            CREATE TABLE IF NOT EXISTS fact_monthly_purchasing_power AS
            SELECT 
                CAST(YEAR(purchase_date) AS UNSIGNED) as sales_year,
                CAST(MONTH(purchase_date) AS UNSIGNED) as sales_month,
                COUNT(DISTINCT order_id) as total_transactions,
                SUM(price) as total_spending_revenue,
                AVG(price) as avg_spending_per_item
            FROM fact_sales
            WHERE purchase_date IS NOT NULL
            GROUP BY 1, 2;
        """,
    ),
    # Final mart: monthly sales joined to the inflation rate of the same
    # year and month; period_date is the first day of that month.
    dict(
        name="3. Final Table: Korelasi Inflasi vs Daya Beli",
        desc="Menggabungkan Data Inflasi dan Data Penjualan (SAFE DATE CONSTRUCT).",
        sql="""
            CREATE TABLE IF NOT EXISTS mart_inflation_analysis AS
            SELECT 
                s.sales_year,
                s.sales_month,
                
                CAST(
                    CONCAT(
                        CAST(s.sales_year AS CHAR), '-', 
                        LPAD(CAST(s.sales_month AS CHAR), 2, '0'), 
                        '-01'
                    ) 
                AS DATE) as period_date,
                
                s.total_transactions,
                s.total_spending_revenue,
                s.avg_spending_per_item,
                i.inflation_rate
            FROM fact_monthly_purchasing_power s
            JOIN dim_brazil_inflation i 
                ON s.sales_year = i.year 
                AND s.sales_month = i.month
            ORDER BY s.sales_year DESC, s.sales_month DESC;
        """,
    ),
]

def run_inflation_analysis():
    """Rebuild the inflation-analysis tables described in ``INFLATION_QUERIES``.

    For each task the target table name is read from the task's
    ``CREATE TABLE IF NOT EXISTS <name>`` clause, that table is dropped, and
    the task's statement is executed to recreate it. The elapsed time of each
    task is printed. A failing task is printed and the loop continues.
    """
    import re

    # Tolerates any whitespace (spaces, tabs, newlines) and any letter case
    # around the keywords; the captured name ends at whitespace or '('.
    create_clause = re.compile(
        r"CREATE\s+TABLE\s+IF\s+NOT\s+EXISTS\s+([^\s(]+)", re.IGNORECASE
    )

    print("\n--- MULAI PROSES ELT: PHASE 3 (INFLATION ANALYSIS - FINAL FIX) ---")

    with engine.connect() as conn:
        for task in INFLATION_QUERIES:
            print(f"üîÑ Running: {task['name']}...")
            # perf_counter is monotonic, so the duration cannot be skewed by
            # system clock adjustments while the statement runs.
            start_time = time.perf_counter()
            try:
                found = create_clause.search(task['sql'])
                if found is None:
                    # Refuse to run a statement whose target cannot be
                    # determined, rather than failing with an opaque IndexError.
                    raise ValueError("no 'CREATE TABLE IF NOT EXISTS <name>' clause found in task SQL")
                table_name = found.group(1)
                # Drop first so the CREATE ... IF NOT EXISTS always rebuilds.
                conn.execute(text(f"DROP TABLE IF EXISTS {table_name}"))
                conn.execute(text(task['sql']))
                duration = time.perf_counter() - start_time
                print(f"   ‚úÖ Selesai! ({duration:.2f} detik)")
            except Exception as e:
                print(f"   ‚ùå GAGAL: {e}")

if __name__ == "__main__":
    run_inflation_analysis()


--- MULAI PROSES ELT: PHASE 3 (INFLATION ANALYSIS - FINAL FIX) ---
üîÑ Running: 1. Cleaning & Unpivot Data Inflasi...
   ‚úÖ Selesai! (0.07 detik)
üîÑ Running: 2. Agregasi Penjualan Per Bulan...
   ‚úÖ Selesai! (2.36 detik)
üîÑ Running: 3. Final Table: Korelasi Inflasi vs Daya Beli...
   ‚úÖ Selesai! (0.04 detik)
