# Data Modeling and ETL Tutorial with DuckDB

## Introduction

In this tutorial, we build an ETL (Extract, Transform, Load) pipeline for a data warehouse using DuckDB. The focus is on dimensional modeling and a realistic ETL process.

## 1. Setting Up DuckDB

Install the required libraries:

pip install duckdb pandas numpy

Import the libraries:

In [2]:
import duckdb
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import os
import urllib.request
import zipfile
import io

Connect to DuckDB

In [3]:
conn = duckdb.connect('retail_dw.db')
print("Connected to DuckDB!")

Connected to DuckDB!


Table Fact_Transactions {
  Transaction_ID INT [primary key]
  CLIENTNUM INT [ref: > Dim_Customer.CLIENTNUM]
  Card_ID INT [ref: > Dim_Card.Card_ID]
  Demographic_ID INT [ref: > Dim_Demographics.Demographic_ID]
  Time_ID INT [ref: > Dim_Time.Time_ID]
  Credit_Limit FLOAT
  Total_Revolving_Bal INT
  Avg_Open_To_Buy FLOAT
  Total_Amt_Chng_Q4_Q1 FLOAT
  Total_Trans_Amt INT
  Total_Trans_Ct INT
  Total_Ct_Chng_Q4_Q1 FLOAT
  Avg_Utilization_Ratio FLOAT
}

Table Dim_Customer {
  CLIENTNUM INT [primary key]
  Attrition_Flag VARCHAR
  Customer_Age INT
  Months_on_book INT
}

Table Dim_Card {
  Card_ID INT [primary key]
  Card_Category VARCHAR
}

Table Dim_Demographics {
  Demographic_ID INT [primary key]
  Gender VARCHAR
  Dependent_count INT
  Education_Level VARCHAR
  Marital_Status VARCHAR
  Income_Category VARCHAR
}

Table Dim_Time {
  Time_ID INT [primary key]
  Months_Inactive_12_mon INT
  Contacts_Count_12_mon INT
}

## 2. Data Warehouse Modeling Fundamentals

### OLTP vs OLAP

| Aspect | OLTP | OLAP |
|--------|------|------|
| Purpose | Transaction processing | Data analysis |
| Design | Normalized | Denormalized |
| Queries | Simple, focused on specific records | Complex, involving aggregations and joins |
| Performance | Optimized for transactions (writes) | Optimized for queries (reads) |
| Data | Current data | Historical data |
| Size | Smaller | Much larger |
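
The difference in query shape can be sketched with a toy table. This is only an illustration: it uses Python's built-in `sqlite3` so that it runs without extra installs, and the `orders` table and its values are hypothetical. The same two SQL shapes apply in DuckDB.

```python
import sqlite3

# Hypothetical toy table; names and values are illustrative only.
db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE orders (order_id INTEGER PRIMARY KEY, city TEXT, amount REAL)")
db.executemany(
    "INSERT INTO orders VALUES (?, ?, ?)",
    [(1, "Jakarta", 100.0), (2, "Bandung", 50.0), (3, "Jakarta", 25.0)],
)

# OLTP-style access: fetch one specific record by its key.
one_order = db.execute(
    "SELECT order_id, city, amount FROM orders WHERE order_id = ?", (2,)
).fetchone()

# OLAP-style access: scan many rows and aggregate them.
sales_by_city = db.execute(
    "SELECT city, SUM(amount) FROM orders GROUP BY city ORDER BY city"
).fetchall()

print(one_order)      # (2, 'Bandung', 50.0)
print(sales_by_city)  # [('Bandung', 50.0), ('Jakarta', 125.0)]
```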

### Dimensional Modeling Schemas

**1. Star Schema**
- A central fact table holding the business measures
- Dimension tables linked directly to the fact table

**2. Snowflake Schema**
- An extension of the star schema
- Dimensions are normalized further

**3. Fact Constellation**
- Several fact tables share dimension tables
- Also known as a Galaxy Schema
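
As a rough sketch of how a star schema is queried, the example below joins a fact table to one dimension and aggregates a measure. The `_demo` tables are hypothetical stand-ins, and `sqlite3` is used only so the snippet runs on its own. The query itself is plain SQL that should carry over to DuckDB.

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.executescript("""
CREATE TABLE dim_store_demo (store_key INTEGER PRIMARY KEY, city TEXT);
CREATE TABLE fact_sales_demo (
    sales_key INTEGER PRIMARY KEY,
    store_key INTEGER REFERENCES dim_store_demo(store_key),
    sales_amount REAL
);
INSERT INTO dim_store_demo VALUES (1, 'Jakarta'), (2, 'Medan');
INSERT INTO fact_sales_demo VALUES (1, 1, 10.0), (2, 1, 15.0), (3, 2, 7.5);
""")

# One join from the fact table to the dimension, then aggregate the measure.
star_rows = db.execute("""
    SELECT s.city, SUM(f.sales_amount) AS total_sales
    FROM fact_sales_demo f
    JOIN dim_store_demo s ON f.store_key = s.store_key
    GROUP BY s.city
    ORDER BY s.city
""").fetchall()

print(star_rows)  # [('Jakarta', 25.0), ('Medan', 7.5)]
```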

## 3. Preparing the Source Data

In real-world settings, data comes from many sources. We simulate this with several datasets.

Download a sample dataset (more realistic than random data):

In [4]:
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


In [6]:
from google.colab import files
files.upload()


Saving kaggle.json to kaggle.json


{'kaggle.json': b'{"username":"<redacted>","key":"<redacted>"}'}

In [7]:
!mkdir -p ~/.kaggle
!mv kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json
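
Uploading the token through `files.upload()` echoes its contents in the cell output, so the key ends up in any shared copy of the notebook. One possible alternative, sketched below under assumptions, is to write `kaggle.json` from values held outside the notebook. The helper `write_kaggle_config` and the environment variable names in the commented call are hypothetical choices for this sketch.

```python
import json
import os
import stat
from pathlib import Path


def write_kaggle_config(username: str, key: str, config_dir: Path) -> Path:
    """Write kaggle.json with owner-only permissions and return its path."""
    config_dir.mkdir(parents=True, exist_ok=True)
    config_path = config_dir / "kaggle.json"
    config_path.write_text(json.dumps({"username": username, "key": key}))
    # Owner read/write only, the same effect as `chmod 600`.
    os.chmod(config_path, stat.S_IRUSR | stat.S_IWUSR)
    return config_path


# Example call (assumes the two environment variables were set beforehand):
# write_kaggle_config(os.environ["KAGGLE_USERNAME"], os.environ["KAGGLE_KEY"],
#                     Path.home() / ".kaggle")
```

If the upload widget is still preferred, assigning its result to a variable (for example `_ = files.upload()`) keeps the returned dictionary out of the cell output.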

**Download the Dataset**

In [8]:
!kaggle datasets download -d sakshigoyal7/credit-card-customers

Dataset URL: https://www.kaggle.com/datasets/sakshigoyal7/credit-card-customers
License(s): CC0-1.0
Downloading credit-card-customers.zip to /content
  0% 0.00/379k [00:00<?, ?B/s]
100% 379k/379k [00:00<00:00, 23.4MB/s]


In [9]:
!unzip credit-card-customers.zip


Archive:  credit-card-customers.zip
  inflating: BankChurners.csv        


In [10]:
import pandas as pd

df = pd.read_csv("BankChurners.csv")  # Replace with the appropriate file name


In [11]:
df.head()

Unnamed: 0,CLIENTNUM,Attrition_Flag,Customer_Age,Gender,Dependent_count,Education_Level,Marital_Status,Income_Category,Card_Category,Months_on_book,...,Credit_Limit,Total_Revolving_Bal,Avg_Open_To_Buy,Total_Amt_Chng_Q4_Q1,Total_Trans_Amt,Total_Trans_Ct,Total_Ct_Chng_Q4_Q1,Avg_Utilization_Ratio,Naive_Bayes_Classifier_Attrition_Flag_Card_Category_Contacts_Count_12_mon_Dependent_count_Education_Level_Months_Inactive_12_mon_1,Naive_Bayes_Classifier_Attrition_Flag_Card_Category_Contacts_Count_12_mon_Dependent_count_Education_Level_Months_Inactive_12_mon_2
0,768805383,Existing Customer,45,M,3,High School,Married,$60K - $80K,Blue,39,...,12691.0,777,11914.0,1.335,1144,42,1.625,0.061,9.3e-05,0.99991
1,818770008,Existing Customer,49,F,5,Graduate,Single,Less than $40K,Blue,44,...,8256.0,864,7392.0,1.541,1291,33,3.714,0.105,5.7e-05,0.99994
2,713982108,Existing Customer,51,M,3,Graduate,Married,$80K - $120K,Blue,36,...,3418.0,0,3418.0,2.594,1887,20,2.333,0.0,2.1e-05,0.99998
3,769911858,Existing Customer,40,F,4,High School,Unknown,Less than $40K,Blue,34,...,3313.0,2517,796.0,1.405,1171,20,2.333,0.76,0.000134,0.99987
4,709106358,Existing Customer,40,M,3,Uneducated,Married,$60K - $80K,Blue,21,...,4716.0,0,4716.0,2.175,816,28,2.5,0.0,2.2e-05,0.99998


In [None]:
def download_sample_data():
    """Generate and save the sample datasets"""

    # Create the data folder if it does not exist yet
    os.makedirs('data', exist_ok=True)

    # Sales transaction data (simulating CSV exports)
    # Build realistic-looking example data

    # 1. Product data
    products = pd.DataFrame({
        'product_id': range(1, 51),
        'name': [f'Product-{i}' for i in range(1, 51)],
        'category': np.random.choice(['Electronics', 'Clothing', 'Home', 'Books', 'Food'], 50),
        'subcategory': np.random.choice(['Smartphones', 'Laptops', 'T-shirts', 'Pants', 'Kitchen',
                                       'Bedroom', 'Fiction', 'Non-fiction', 'Snacks', 'Beverages'], 50),
        'base_cost': np.random.uniform(5, 200, 50).round(2),
        'base_price': np.random.uniform(10, 300, 50).round(2)
    })

    # 2. Store data
    stores = pd.DataFrame({
        'store_id': range(1, 21),
        'name': [f'Store-{i}' for i in range(1, 21)],
        'city': np.random.choice(['Jakarta', 'Bandung', 'Surabaya', 'Yogyakarta', 'Medan'], 20),
        'region': np.random.choice(['Jawa', 'Sumatera', 'Kalimantan', 'Sulawesi'], 20),
        'type': np.random.choice(['Mall', 'Street', 'Standalone'], 20)
    })

    # 3. Customer data
    customers = pd.DataFrame({
        'customer_id': range(1, 201),
        'first_name': [f'First-{i}' for i in range(1, 201)],
        'last_name': [f'Last-{i}' for i in range(1, 201)],
        'email': [f'customer{i}@example.com' for i in range(1, 201)],
        'city': np.random.choice(['Jakarta', 'Bandung', 'Surabaya', 'Yogyakarta', 'Medan',
                                 'Makassar', 'Palembang', 'Semarang'], 200),
        'membership': np.random.choice(['Silver', 'Gold', 'Platinum', 'Regular'], 200)
    })

    # 4. Generate transactions
    # Simulate data from a point-of-sale system
    num_transactions = 5000
    transactions = []

    start_date = datetime(2022, 1, 1)
    end_date = datetime(2023, 12, 31)
    dates = [start_date + timedelta(days=np.random.randint(0, (end_date-start_date).days))
             for _ in range(num_transactions)]

    for i in range(num_transactions):
        tx_date = dates[i]
        store_id = np.random.choice(stores['store_id'])
        customer_id = np.random.choice(customers['customer_id'])

        # Each transaction can contain several line items
        items_count = np.random.randint(1, 5)

        for j in range(items_count):
            product_id = np.random.choice(products['product_id'])
            product_info = products[products['product_id'] == product_id].iloc[0]

            # Prices can vary over time
            price_variance = np.random.uniform(0.9, 1.1)
            price = round(product_info['base_price'] * price_variance, 2)

            cost_variance = np.random.uniform(0.95, 1.05)
            cost = round(product_info['base_cost'] * cost_variance, 2)

            quantity = np.random.randint(1, 6)

            # A discount is applied occasionally
            discount_pct = 0
            if np.random.random() < 0.3:  # About 30% of line items get a discount
                discount_pct = np.random.choice([5, 10, 15, 20, 25, 50]) / 100

            discount_amount = round(price * quantity * discount_pct, 2)
            total = round(price * quantity - discount_amount, 2)
            profit = round(total - (cost * quantity), 2)

            transactions.append({
                'transaction_id': f'TX-{i+1}',
                'date': tx_date.strftime('%Y-%m-%d'),
                'store_id': store_id,
                'customer_id': customer_id,
                'product_id': product_id,
                'quantity': quantity,
                'unit_price': price,
                'unit_cost': cost,
                'discount_pct': discount_pct,
                'discount_amount': discount_amount,
                'total_amount': total,
                'profit': profit
            })

    # Save everything to CSV (simulating data from multiple sources)
    products.to_csv('data/products.csv', index=False)
    stores.to_csv('data/stores.csv', index=False)
    customers.to_csv('data/customers.csv', index=False)

    tx_df = pd.DataFrame(transactions)
    tx_df.to_csv('data/transactions.csv', index=False)

    # Deliberately create "dirty" data (as found in real systems)
    # Copy the customer data, then introduce some errors
    customers_dirty = customers.copy()
    # Make the capitalization of some city names inconsistent
    for i in range(20):
        idx = np.random.randint(0, len(customers_dirty))
        if np.random.random() < 0.5:
            customers_dirty.loc[idx, 'city'] = customers_dirty.loc[idx, 'city'].upper()
        else:
            customers_dirty.loc[idx, 'city'] = customers_dirty.loc[idx, 'city'].lower()

    # Add duplicate customers under different IDs
    duplicates = customers.sample(10).copy()
    duplicates['customer_id'] = range(201, 211)
    # ignore_index=True keeps the row labels unique, so the .loc assignments below
    # change exactly one existing row instead of appending rows or hitting two rows
    customers_dirty = pd.concat([customers_dirty, duplicates], ignore_index=True)

    # Add missing values
    for i in range(15):
        idx = np.random.randint(0, len(customers_dirty))
        col = np.random.choice(['first_name', 'last_name', 'email', 'city'])
        customers_dirty.loc[idx, col] = np.nan

    customers_dirty.to_csv('data/customers_dirty.csv', index=False)

    print("Sample data created:")
    print(f"- {len(products)} products")
    print(f"- {len(stores)} stores")
    print(f"- {len(customers)} customers")
    print(f"- {len(tx_df)} transaction line items")
    print(f"- {len(customers_dirty)} customers (with dirty data)")

Generate the sample data

In [None]:
download_sample_data()
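
The per-line-item arithmetic inside the generator (discount, total, and profit) can be isolated as a small function. This is a sketch that mirrors the formulas used above. The function name `line_item_totals` is hypothetical and not part of the tutorial code.

```python
def line_item_totals(unit_price, unit_cost, quantity, discount_pct):
    """Return (discount_amount, total_amount, profit) for one line item."""
    gross = unit_price * quantity
    discount_amount = round(gross * discount_pct, 2)
    total_amount = round(gross - discount_amount, 2)
    # Profit is what remains after subtracting the cost of the units sold.
    profit = round(total_amount - unit_cost * quantity, 2)
    return discount_amount, total_amount, profit


# 3 units at 20.00 with a 10% discount and a unit cost of 12.00
totals = line_item_totals(unit_price=20.0, unit_cost=12.0, quantity=3, discount_pct=0.10)
print(totals)  # (6.0, 54.0, 18.0)
```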

## 4. Designing the Data Warehouse Model

We apply a Star Schema to our retail data warehouse:

In [None]:
def create_data_warehouse_schema():
    """Create the data warehouse schema (dimension and fact tables)"""

    # Drop dependent tables first to avoid dependency errors
    try:
        conn.execute("DROP TABLE IF EXISTS fact_sales;")
        conn.execute("DROP TABLE IF EXISTS fact_inventory;")
    except duckdb.Error:
        pass

    # 1. Build the date dimension first
    conn.execute("""
    -- Date dimension
    CREATE OR REPLACE TABLE dim_date AS
    WITH date_range AS (
      SELECT unnest(generate_series('2022-01-01'::DATE, '2023-12-31'::DATE, INTERVAL '1 day')) as date
    )
    SELECT
      (EXTRACT(YEAR FROM date) * 10000 + EXTRACT(MONTH FROM date) * 100 + EXTRACT(DAY FROM date))::INTEGER AS date_key,
      date,
      EXTRACT(DAY FROM date) AS day,
      EXTRACT(MONTH FROM date) AS month,
      strftime(date, '%B') AS month_name,
      EXTRACT(QUARTER FROM date) AS quarter,
      EXTRACT(YEAR FROM date) AS year,
      EXTRACT(DOW FROM date) AS day_of_week,
      strftime(date, '%A') AS day_name,
      CASE
        WHEN EXTRACT(MONTH FROM date) BETWEEN 3 AND 5 THEN 'Spring'
        WHEN EXTRACT(MONTH FROM date) BETWEEN 6 AND 8 THEN 'Summer'
        WHEN EXTRACT(MONTH FROM date) BETWEEN 9 AND 11 THEN 'Fall'
        ELSE 'Winter'
      END AS season
    FROM date_range;
    """)

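    # Add a primary key to dim_date so the fact tables can reference it.
    # Note: this needs a DuckDB release that supports ALTER TABLE ... ADD PRIMARY KEY.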
    conn.execute("""
    ALTER TABLE dim_date ADD PRIMARY KEY (date_key);
    """)

    # 2. Create the remaining dimension tables (empty)
    conn.execute("""
    -- Product dimension
    CREATE OR REPLACE TABLE dim_product (
      product_key INTEGER PRIMARY KEY,
      product_id INTEGER NOT NULL,
      product_name VARCHAR,
      category VARCHAR,
      subcategory VARCHAR,
      unit_cost DECIMAL(10,2),
      unit_price DECIMAL(10,2),
      effective_date DATE,
      expiration_date DATE,
      current_flag BOOLEAN
    );

    -- Store dimension
    CREATE OR REPLACE TABLE dim_store (
      store_key INTEGER PRIMARY KEY,
      store_id INTEGER NOT NULL,
      store_name VARCHAR,
      city VARCHAR,
      region VARCHAR,
      store_type VARCHAR,
      effective_date DATE,
      expiration_date DATE,
      current_flag BOOLEAN
    );

    -- Customer dimension
    CREATE OR REPLACE TABLE dim_customer (
      customer_key INTEGER PRIMARY KEY,
      customer_id INTEGER NOT NULL,
      first_name VARCHAR,
      last_name VARCHAR,
      email VARCHAR,
      city VARCHAR,
      membership VARCHAR,
      effective_date DATE,
      expiration_date DATE,
      current_flag BOOLEAN
    );

    -- Sales fact table
    CREATE OR REPLACE TABLE fact_sales (
      sales_key INTEGER PRIMARY KEY,
      transaction_id VARCHAR,
      date_key INTEGER,
      product_key INTEGER,
      store_key INTEGER,
      customer_key INTEGER,
      quantity INTEGER,
      unit_price DECIMAL(10,2),
      unit_cost DECIMAL(10,2),
      discount_pct DECIMAL(5,2),
      discount_amount DECIMAL(10,2),
      sales_amount DECIMAL(10,2),
      profit_amount DECIMAL(10,2),

      FOREIGN KEY (date_key) REFERENCES dim_date(date_key),
      FOREIGN KEY (product_key) REFERENCES dim_product(product_key),
      FOREIGN KEY (store_key) REFERENCES dim_store(store_key),
      FOREIGN KEY (customer_key) REFERENCES dim_customer(customer_key)
    );
    """)

    print("Data warehouse schema created!")

Create the data warehouse schema

In [None]:
create_data_warehouse_schema()
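
The `date_key` in `dim_date` is an integer in `YYYYMMDD` form, computed as year * 10000 + month * 100 + day. A minimal sketch of the same encoding in Python, with a decoder for checking it, is shown here. The helper names `to_date_key` and `from_date_key` are hypothetical.

```python
from datetime import date


def to_date_key(d: date) -> int:
    """Encode a date as a YYYYMMDD integer, matching dim_date.date_key."""
    return d.year * 10000 + d.month * 100 + d.day


def from_date_key(key: int) -> date:
    """Decode a YYYYMMDD integer back into a date."""
    return date(key // 10000, (key // 100) % 100, key % 100)


print(to_date_key(date(2023, 12, 31)))  # 20231231
print(from_date_key(20220105))          # 2022-01-05
```

Because the key is derived from the date itself, fact rows can compute it without looking up the dimension, and the integer keys sort in date order.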

## 5. The ETL Process (Extract, Transform, Load)

We now implement the full ETL process:

### 5.1 Extract - Reading Data from the Sources

In [None]:
def extract_source_data():
    """Extract data from the various sources"""

    # Read the data from the CSV files
    products_df = pd.read_csv('data/products.csv')
    stores_df = pd.read_csv('data/stores.csv')
    customers_df = pd.read_csv('data/customers_dirty.csv')  # The dirty data is used on purpose
    transactions_df = pd.read_csv('data/transactions.csv')

    print("Data extracted:")
    print(f"- Products: {len(products_df)} rows")
    print(f"- Stores: {len(stores_df)} rows")
    print(f"- Customers: {len(customers_df)} rows")
    print(f"- Transactions: {len(transactions_df)} rows")

    return products_df, stores_df, customers_df, transactions_df

Extract the data from the sources

In [None]:
products_df, stores_df, customers_df, transactions_df = extract_source_data()

### 5.2 Transform - Cleaning and Reshaping the Data

We implement two transformation approaches:

1. Using pandas (Python)
2. Using SQL in DuckDB

In [None]:
def transform_with_pandas():
    """Transform the data using pandas (Python)"""

    # Re-read the data so this tutorial step is self-contained
    products_df = pd.read_csv('data/products.csv')
    stores_df = pd.read_csv('data/stores.csv')
    customers_df = pd.read_csv('data/customers_dirty.csv')
    transactions_df = pd.read_csv('data/transactions.csv')

    # 1. Transform the product dimension
    # Add the columns of a simple Slowly Changing Dimension (SCD) Type 2 layout
    dim_product = products_df.copy()
    dim_product['product_key'] = dim_product['product_id']  # Here the key reuses the source ID
    dim_product['effective_date'] = '2022-01-01'  # Effective date
    dim_product['expiration_date'] = None  # No expiration: this is the initial load
    dim_product['current_flag'] = True  # Every record is current

    dim_product = dim_product.rename(columns={
        'name': 'product_name',
        'base_cost': 'unit_cost',
        'base_price': 'unit_price'
    })

    # 2. Transform the store dimension
    dim_store = stores_df.copy()
    dim_store['store_key'] = dim_store['store_id']  # Here the key reuses the source ID
    dim_store['effective_date'] = '2022-01-01'  # Effective date
    dim_store['expiration_date'] = None  # No expiration: this is the initial load
    dim_store['current_flag'] = True  # Every record is current

    dim_store = dim_store.rename(columns={'name': 'store_name', 'type': 'store_type'})

    # 3. Transform the customer dimension (with data cleansing)
    dim_customer = customers_df.copy()

    # Handle missing values
    dim_customer['first_name'] = dim_customer['first_name'].fillna('Unknown')
    dim_customer['last_name'] = dim_customer['last_name'].fillna('Unknown')
    dim_customer['email'] = dim_customer['email'].fillna('unknown@example.com')

    # Standardize city names (consistent capitalization); missing cities become
    # 'Unknown', matching the SQL version of this transformation
    dim_customer['city'] = dim_customer['city'].str.title().fillna('Unknown')

    # Handle duplicates by email (which should be unique).
    # Note: rows whose email was missing now share the placeholder address,
    # so only the first of them is kept
    dim_customer = dim_customer.drop_duplicates(subset=['email'], keep='first')

    # Add the SCD Type 2 columns
    dim_customer['customer_key'] = range(1, len(dim_customer) + 1)  # New surrogate key
    dim_customer['effective_date'] = '2022-01-01'  # Effective date
    dim_customer['expiration_date'] = None  # No expiration: this is the initial load
    dim_customer['current_flag'] = True  # Every record is current

    # 4. Transform the sales fact
    # Convert strings to dates
    transactions_df['date'] = pd.to_datetime(transactions_df['date'])

    # Build date_key in YYYYMMDD format
    transactions_df['date_key'] = transactions_df['date'].dt.strftime('%Y%m%d').astype(int)

    # The natural IDs (product_id, store_id, customer_id) stay in place here;
    # they are mapped to surrogate keys when the fact table is loaded
    fact_sales = transactions_df.copy()

    # Add a surrogate key for each fact row
    fact_sales['sales_key'] = range(1, len(fact_sales) + 1)

    # Rename columns for clarity
    fact_sales = fact_sales.rename(columns={
        'total_amount': 'sales_amount'
    })

    # Optionally drop columns that are no longer needed
    # fact_sales = fact_sales.drop(['date'], axis=1)

    return dim_product, dim_store, dim_customer, fact_sales

Transform with pandas

In [None]:
dim_product_pd, dim_store_pd, dim_customer_pd, fact_sales_pd = transform_with_pandas()

In [None]:
print("pandas transformation finished:")
print(f"- Product dimension: {len(dim_product_pd)} rows")
print(f"- Store dimension: {len(dim_store_pd)} rows")
print(f"- Customer dimension: {len(dim_customer_pd)} rows")
print(f"- Sales fact: {len(fact_sales_pd)} rows")
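
The dimensions above carry SCD Type 2 columns (`effective_date`, `expiration_date`, `current_flag`), but the initial load never changes a row. The sketch below shows one way a later attribute change could be recorded, using plain Python dictionaries as a stand-in for the dimension table. The helper `apply_scd2_change` and the sample row are hypothetical. Note that it assigns a fresh surrogate key to the new version, which is why a surrogate key that simply copies the source ID would not be enough once history is tracked.

```python
from datetime import date


def apply_scd2_change(rows, product_id, new_attrs, change_date):
    """Expire the current version of product_id and append a new version."""
    current = next(r for r in rows if r["product_id"] == product_id and r["current_flag"])
    # Close the old version instead of overwriting it, so history is preserved.
    current["expiration_date"] = change_date
    current["current_flag"] = False
    new_row = {**current, **new_attrs}
    new_row.update(
        product_key=max(r["product_key"] for r in rows) + 1,  # new surrogate key
        effective_date=change_date,
        expiration_date=None,
        current_flag=True,
    )
    rows.append(new_row)
    return new_row


dim_product_demo = [
    {"product_key": 1, "product_id": 101, "unit_price": 10.0,
     "effective_date": date(2022, 1, 1), "expiration_date": None, "current_flag": True},
]
new_version = apply_scd2_change(dim_product_demo, 101, {"unit_price": 12.5}, date(2023, 6, 1))

for row in dim_product_demo:
    print(row["product_key"], row["unit_price"], row["effective_date"],
          row["expiration_date"], row["current_flag"])
```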

In [None]:
def transform_with_sql():
    """Transform the data using SQL in DuckDB"""

    # 1. Create staging tables for the source data
    conn.execute("CREATE OR REPLACE TABLE staging_products AS SELECT * FROM read_csv_auto('data/products.csv')")
    conn.execute("CREATE OR REPLACE TABLE staging_stores AS SELECT * FROM read_csv_auto('data/stores.csv')")
    conn.execute("CREATE OR REPLACE TABLE staging_customers AS SELECT * FROM read_csv_auto('data/customers_dirty.csv')")
    conn.execute("CREATE OR REPLACE TABLE staging_transactions AS SELECT * FROM read_csv_auto('data/transactions.csv')")

    # 2. Transform the product dimension with SQL
    conn.execute("""
    CREATE OR REPLACE TABLE staging_dim_product AS
    SELECT
        product_id AS product_key,
        product_id,
        name AS product_name,
        category,
        subcategory,
        base_cost AS unit_cost,
        base_price AS unit_price,
        '2022-01-01'::DATE AS effective_date,
        NULL::DATE AS expiration_date,
        TRUE AS current_flag
    FROM staging_products
    """)

    # 3. Transform the store dimension with SQL
    conn.execute("""
    CREATE OR REPLACE TABLE staging_dim_store AS
    SELECT
        store_id AS store_key,
        store_id,
        name AS store_name,
        city,
        region,
        type AS store_type,
        '2022-01-01'::DATE AS effective_date,
        NULL::DATE AS expiration_date,
        TRUE AS current_flag
    FROM staging_stores
    """)

    # 4. Transform the customer dimension with SQL (including data cleansing)
    conn.execute("""
    CREATE OR REPLACE TABLE staging_dim_customer AS
    WITH clean_customers AS (
        SELECT
            customer_id,
            COALESCE(first_name, 'Unknown') AS first_name,
            COALESCE(last_name, 'Unknown') AS last_name,
            COALESCE(email, 'unknown@example.com') AS email,
            CASE
                WHEN city IS NULL THEN 'Unknown'
                ELSE UPPER(SUBSTRING(LOWER(city), 1, 1)) || SUBSTRING(LOWER(city), 2) -- Standardize capitalization
            END AS city,
            membership,
            -- Keep only the first row for each duplicated email
            ROW_NUMBER() OVER (PARTITION BY email ORDER BY customer_id) AS rn
        FROM staging_customers
    )
    SELECT
        ROW_NUMBER() OVER (ORDER BY customer_id) AS customer_key,
        customer_id,
        first_name,
        last_name,
        email,
        city,
        membership,
        '2022-01-01'::DATE AS effective_date,
        NULL::DATE AS expiration_date,
        TRUE AS current_flag
    FROM clean_customers
    WHERE rn = 1 -- Remove duplicates
    """)

    # 5. Transform the sales fact with SQL
    conn.execute("""
    CREATE OR REPLACE TABLE staging_fact_sales AS
    SELECT
        ROW_NUMBER() OVER (ORDER BY transaction_id, product_id) AS sales_key,
        transaction_id,
        STRFTIME(date, '%Y%m%d')::INTEGER AS date_key,
        product_id AS product_key, -- Natural ID for now; mapped with a JOIN at load time
        store_id AS store_key, -- Natural ID for now; mapped with a JOIN at load time
        customer_id AS customer_key, -- Natural ID for now; mapped with a JOIN at load time
        quantity,
        unit_price,
        unit_cost,
        discount_pct,
        discount_amount,
        total_amount AS sales_amount,
        profit AS profit_amount
    FROM staging_transactions
    """)

    # Get the row counts
    product_count = conn.execute("SELECT COUNT(*) FROM staging_dim_product").fetchone()[0]
    store_count = conn.execute("SELECT COUNT(*) FROM staging_dim_store").fetchone()[0]
    customer_count = conn.execute("SELECT COUNT(*) FROM staging_dim_customer").fetchone()[0]
    sales_count = conn.execute("SELECT COUNT(*) FROM staging_fact_sales").fetchone()[0]

    print("SQL transformation finished:")
    print(f"- Product dimension: {product_count} rows")
    print(f"- Store dimension: {store_count} rows")
    print(f"- Customer dimension: {customer_count} rows")
    print(f"- Sales fact: {sales_count} rows")

Transform with SQL

In [None]:
transform_with_sql()
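
One detail of the email-based deduplication is worth illustrating. In both transformations, all customers with a missing email fall into a single group, so only one of them survives, even though nothing indicates they are the same person. A possible alternative, sketched here in plain Python with a hypothetical helper `dedupe_by_email`, never merges rows that have no email and compares the remaining emails case-insensitively.

```python
def dedupe_by_email(customers):
    """Keep the lowest customer_id per email; rows without an email are never merged."""
    seen = set()
    kept = []
    for row in sorted(customers, key=lambda r: r["customer_id"]):
        email = row["email"]
        if email is None:
            kept.append(row)  # missing email: no evidence of duplication
            continue
        normalized = email.strip().lower()
        if normalized not in seen:
            seen.add(normalized)
            kept.append(row)
    return kept


customers_demo = [
    {"customer_id": 1, "email": "a@example.com"},
    {"customer_id": 2, "email": None},
    {"customer_id": 3, "email": None},
    {"customer_id": 201, "email": "A@example.com"},  # duplicate of customer 1
]
kept_ids = [r["customer_id"] for r in dedupe_by_email(customers_demo)]
print(kept_ids)  # [1, 2, 3]
```

This matters for the load step: a customer dropped here has no row in the dimension, so sales that reference that customer are excluded by the inner joins.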

### 5.3 Load - Loading Data into the Data Warehouse

We implement two loading options:

1. Load from pandas DataFrames
2. Load from the SQL staging tables

In [None]:
def load_from_pandas(dim_product, dim_store, dim_customer, fact_sales):
    """Load data from pandas DataFrames into the data warehouse"""

    # Register the DataFrames under names that differ from the warehouse tables.
    # A bare "SELECT * FROM dim_product" would read the (empty) table, not the DataFrame.
    conn.register("dim_product_df", dim_product)
    conn.register("dim_store_df", dim_store)
    conn.register("dim_customer_df", dim_customer)
    conn.register("fact_sales_df", fact_sales)

    # Clear the fact table first because its foreign keys reference the dimensions
    conn.execute("DELETE FROM fact_sales")

    # 1. Load the product dimension
    # Columns are listed explicitly because the DataFrame order differs from the table.
    # expiration_date is all None, so it is cast via VARCHAR to get a typed NULL.
    conn.execute("DELETE FROM dim_product")  # Clear the target table
    conn.execute("""
    INSERT INTO dim_product
    SELECT product_key, product_id, product_name, category, subcategory,
           unit_cost, unit_price,
           CAST(effective_date AS DATE),
           CAST(CAST(expiration_date AS VARCHAR) AS DATE),
           current_flag
    FROM dim_product_df
    """)

    # 2. Load the store dimension
    conn.execute("DELETE FROM dim_store")  # Clear the target table
    conn.execute("""
    INSERT INTO dim_store
    SELECT store_key, store_id, store_name, city, region, store_type,
           CAST(effective_date AS DATE),
           CAST(CAST(expiration_date AS VARCHAR) AS DATE),
           current_flag
    FROM dim_store_df
    """)

    # 3. Load the customer dimension
    conn.execute("DELETE FROM dim_customer")  # Clear the target table
    conn.execute("""
    INSERT INTO dim_customer
    SELECT customer_key, customer_id, first_name, last_name, email, city, membership,
           CAST(effective_date AS DATE),
           CAST(CAST(expiration_date AS VARCHAR) AS DATE),
           current_flag
    FROM dim_customer_df
    """)

    # 4. Load the sales fact, mapping the natural IDs to surrogate keys
    conn.execute("""
    INSERT INTO fact_sales
    SELECT
        f.sales_key,
        f.transaction_id,
        f.date_key,
        p.product_key,
        s.store_key,
        c.customer_key,
        f.quantity,
        f.unit_price,
        f.unit_cost,
        f.discount_pct,
        f.discount_amount,
        f.sales_amount,
        f.profit AS profit_amount
    FROM fact_sales_df f
    JOIN dim_product p ON f.product_id = p.product_id
    JOIN dim_store s ON f.store_id = s.store_id
    JOIN dim_customer c ON f.customer_id = c.customer_id
    """)

    # Count the rows that were loaded
    product_count = conn.execute("SELECT COUNT(*) FROM dim_product").fetchone()[0]
    store_count = conn.execute("SELECT COUNT(*) FROM dim_store").fetchone()[0]
    customer_count = conn.execute("SELECT COUNT(*) FROM dim_customer").fetchone()[0]
    sales_count = conn.execute("SELECT COUNT(*) FROM fact_sales").fetchone()[0]

    print("Data from pandas loaded into the data warehouse:")
    print(f"- Product dimension: {product_count} rows")
    print(f"- Store dimension: {store_count} rows")
    print(f"- Customer dimension: {customer_count} rows")
    print(f"- Sales fact: {sales_count} rows")

In [None]:
def load_from_staging():
    """Load data from the SQL staging tables into the data warehouse"""

    # Clear the fact table first: its foreign keys reference the dimensions,
    # so dimension rows cannot be deleted while fact rows still point at them
    conn.execute("DELETE FROM fact_sales")

    # 1. Load the product dimension
    conn.execute("DELETE FROM dim_product")  # Clear the target table
    conn.execute("INSERT INTO dim_product SELECT * FROM staging_dim_product")

    # 2. Load the store dimension
    conn.execute("DELETE FROM dim_store")  # Clear the target table
    conn.execute("INSERT INTO dim_store SELECT * FROM staging_dim_store")

    # 3. Load the customer dimension
    conn.execute("DELETE FROM dim_customer")  # Clear the target table
    conn.execute("INSERT INTO dim_customer SELECT * FROM staging_dim_customer")

    # 4. Load the sales fact with the correct key mapping
    conn.execute("""
    INSERT INTO fact_sales
    SELECT
        f.sales_key,
        f.transaction_id,
        f.date_key,
        p.product_key,
        s.store_key,
        c.customer_key,
        f.quantity,
        f.unit_price,
        f.unit_cost,
        f.discount_pct,
        f.discount_amount,
        f.sales_amount,
        f.profit_amount
    FROM staging_fact_sales f
    JOIN dim_product p ON f.product_key = p.product_id
    JOIN dim_store s ON f.store_key = s.store_id
    JOIN dim_customer c ON f.customer_key = c.customer_id
    """)

    # Count the rows that were loaded
    product_count = conn.execute("SELECT COUNT(*) FROM dim_product").fetchone()[0]
    store_count = conn.execute("SELECT COUNT(*) FROM dim_store").fetchone()[0]
    customer_count = conn.execute("SELECT COUNT(*) FROM dim_customer").fetchone()[0]
    sales_count = conn.execute("SELECT COUNT(*) FROM fact_sales").fetchone()[0]

    print("Data from staging loaded into the data warehouse:")
    print(f"- Product dimension: {product_count} rows")
    print(f"- Store dimension: {store_count} rows")
    print(f"- Customer dimension: {customer_count} rows")
    print(f"- Sales fact: {sales_count} rows")

Choose one of the loading methods. To load from the pandas DataFrames instead of the staging tables, run:

load_from_pandas(dim_product_pd, dim_store_pd, dim_customer_pd, fact_sales_pd)

In [None]:
load_from_staging()
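
Because the fact load uses inner joins, any staged row whose natural ID has no match in a dimension is dropped without an error. A quick check after loading is to look for such orphans with a `LEFT JOIN ... IS NULL` query. The sketch below shows the pattern on hypothetical `_demo` tables, using `sqlite3` only so that it runs on its own. The same query shape should work against `staging_fact_sales` and `dim_customer` in DuckDB.

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.executescript("""
CREATE TABLE dim_customer_demo (customer_key INTEGER PRIMARY KEY, customer_id INTEGER);
CREATE TABLE staging_fact_demo (sales_key INTEGER PRIMARY KEY, customer_id INTEGER);
INSERT INTO dim_customer_demo VALUES (1, 10), (2, 11);
INSERT INTO staging_fact_demo VALUES (1, 10), (2, 11), (3, 12);
""")

# Staged fact rows whose customer_id has no matching dimension row.
orphans = db.execute("""
    SELECT f.sales_key, f.customer_id
    FROM staging_fact_demo f
    LEFT JOIN dim_customer_demo c ON f.customer_id = c.customer_id
    WHERE c.customer_key IS NULL
    ORDER BY f.sales_key
""").fetchall()

print(orphans)  # [(3, 12)]
```

Comparing the row counts of the staging table and the loaded fact table gives the same signal in aggregate.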

## 6. Implementing a Snowflake Schema

Let's build a Snowflake Schema version of our model by normalizing the product dimension.

In [None]:
def create_snowflake_schema():
    """Implement a Snowflake Schema for the product dimension"""

    # Drop existing tables in the correct order to handle dependencies
    conn.execute("DROP TABLE IF EXISTS dim_product_snowflake")
    conn.execute("DROP TABLE IF EXISTS dim_subcategory")
    conn.execute("DROP TABLE IF EXISTS dim_category")

    # 1. Create the category table
    conn.execute("""
    CREATE TABLE dim_category AS
    SELECT
        ROW_NUMBER() OVER (ORDER BY category) AS category_key,
        category AS category_name
    FROM (SELECT DISTINCT category FROM dim_product)
    """)

    # 2. Create the subcategory table
    conn.execute("""
    CREATE TABLE dim_subcategory AS
    WITH subcategory_data AS (
        SELECT DISTINCT
            subcategory,
            category
        FROM dim_product
    )
    SELECT
        ROW_NUMBER() OVER (ORDER BY s.subcategory) AS subcategory_key,
        s.subcategory AS subcategory_name,
        c.category_key
    FROM subcategory_data s
    JOIN dim_category c ON s.category = c.category_name
    """)

    # 3. Create the snowflake version of the product table
    conn.execute("""
    CREATE TABLE dim_product_snowflake AS
    SELECT
        p.product_key,
        p.product_id,
        p.product_name,
        c.category_key,
        s.subcategory_key,
        p.unit_cost,
        p.unit_price,
        p.effective_date,
        p.expiration_date,
        p.current_flag
    FROM dim_product p
    JOIN dim_category c ON p.category = c.category_name
    JOIN dim_subcategory s ON p.subcategory = s.subcategory_name AND s.category_key = c.category_key
    """)

    # Count the rows
    category_count = conn.execute("SELECT COUNT(*) FROM dim_category").fetchone()[0]
    subcategory_count = conn.execute("SELECT COUNT(*) FROM dim_subcategory").fetchone()[0]
    product_count = conn.execute("SELECT COUNT(*) FROM dim_product_snowflake").fetchone()[0]

    print("Snowflake schema created:")
    print(f"- Category dimension: {category_count} rows")
    print(f"- Subcategory dimension: {subcategory_count} rows")
    print(f"- Product dimension (snowflake): {product_count} rows")

Create the snowflake schema

In [None]:
create_snowflake_schema()
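
The cost of normalizing a dimension is that reading its attributes takes more joins. The sketch below resolves a product's category through the subcategory table. The `_demo` tables and rows are hypothetical, and `sqlite3` is again used only to keep the example self-contained.

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.executescript("""
CREATE TABLE dim_category_demo (category_key INTEGER PRIMARY KEY, category_name TEXT);
CREATE TABLE dim_subcategory_demo (
    subcategory_key INTEGER PRIMARY KEY, subcategory_name TEXT, category_key INTEGER
);
CREATE TABLE dim_product_demo (
    product_key INTEGER PRIMARY KEY, product_name TEXT, subcategory_key INTEGER
);
INSERT INTO dim_category_demo VALUES (1, 'Books'), (2, 'Food');
INSERT INTO dim_subcategory_demo VALUES (1, 'Fiction', 1), (2, 'Snacks', 2);
INSERT INTO dim_product_demo VALUES (1, 'Product-1', 1), (2, 'Product-2', 2);
""")

# Two joins are needed to reach the category name from a product row.
snowflake_rows = db.execute("""
    SELECT p.product_name, s.subcategory_name, c.category_name
    FROM dim_product_demo p
    JOIN dim_subcategory_demo s ON p.subcategory_key = s.subcategory_key
    JOIN dim_category_demo c ON s.category_key = c.category_key
    ORDER BY p.product_key
""").fetchall()

print(snowflake_rows)
```

In the star version the same attributes sit directly on `dim_product`, so no extra joins are required.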

## 7. Implementing a Fact Constellation (Galaxy Schema)

We now add a second fact table for product inventory.
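
The inventory fact stores a running stock level per store and product: the opening quantity plus cumulative receipts minus cumulative sales. A minimal sketch of that running balance in plain Python is shown here. The function name `running_on_hand` and the sample numbers are hypothetical.

```python
from itertools import accumulate


def running_on_hand(opening_qty, received, sold):
    """Return the end-of-day quantity on hand for each day."""
    net_change = [r - s for r, s in zip(received, sold)]
    # accumulate() starts from the opening quantity; drop that first element
    # so the result has one value per day.
    return list(accumulate(net_change, initial=opening_qty))[1:]


on_hand = running_on_hand(50, received=[10, 0, 5], sold=[3, 8, 2])
print(on_hand)  # [57, 49, 52]
```

The opening quantity enters the balance exactly once, which is the property the windowed sums in the SQL need to preserve.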

In [None]:
def create_fact_constellation():
    """Implement a Fact Constellation by adding an inventory fact table"""

    # 1. Create the inventory fact table
    conn.execute("""
    CREATE OR REPLACE TABLE fact_inventory (
        inventory_key INTEGER PRIMARY KEY,
        date_key INTEGER,
        product_key INTEGER,
        store_key INTEGER,
        quantity_on_hand INTEGER,
        quantity_received INTEGER,
        quantity_sold INTEGER,
        stock_value DECIMAL(10,2),

        FOREIGN KEY (date_key) REFERENCES dim_date(date_key),
        FOREIGN KEY (product_key) REFERENCES dim_product(product_key),
        FOREIGN KEY (store_key) REFERENCES dim_store(store_key)
    )
    """)

    # 2. Generate simulated inventory data (random quantities)
    conn.execute("""
    -- Inventory data for the most recent month
    WITH latest_month AS (
        SELECT
            MAX(date) as max_date,
            EXTRACT(YEAR FROM MAX(date)) as year,
            EXTRACT(MONTH FROM MAX(date)) as month
        FROM dim_date
    ),

    month_dates AS (
        SELECT
            date_key,
            date
        FROM
            dim_date,
            latest_month
        WHERE
            EXTRACT(YEAR FROM date) = latest_month.year AND
            EXTRACT(MONTH FROM date) = latest_month.month
        ORDER BY
            date
    ),

    store_products AS (
        SELECT DISTINCT
            s.store_key,
            p.product_key
        FROM
            dim_store s,
            dim_product p
        ORDER BY
            s.store_key, p.product_key
        LIMIT 300 -- Limit the combinations for this sample
    )

    INSERT INTO fact_inventory
    WITH inventory_data AS (
        SELECT
            md.date_key,
            sp.store_key,
            sp.product_key,
            -- Random values for the example data
            CAST(RANDOM() * 100 AS INTEGER) AS base_qty_on_hand,
            CAST(RANDOM() * 20 AS INTEGER) AS base_qty_received,
            CAST(RANDOM() * 15 AS INTEGER) AS base_qty_sold
        FROM
            month_dates md
        CROSS JOIN
            store_products sp
    ),

    -- Tambahkan running total untuk membuat data inventaris yang masuk akal
    running_inventory AS (
        SELECT
            id.date_key,
            id.store_key,
            id.product_key,
            id.base_qty_received AS quantity_received,
            id.base_qty_sold AS quantity_sold,
            CASE
                WHEN ROW_NUMBER() OVER (PARTITION BY id.store_key, id.product_key ORDER BY id.date_key) = 1
                THEN id.base_qty_on_hand
                ELSE NULL -- Akan diisi nanti
            END AS initial_qty
        FROM
            inventory_data id
    ),

    final_inventory AS (
        SELECT
            date_key,
            store_key,
            product_key,
            quantity_received,
            quantity_sold,
            -- Opening stock carried forward, plus cumulative receipts, minus cumulative sales.
            -- initial_qty is non-NULL only on the first day, so it must be counted exactly once.
            SUM(COALESCE(initial_qty, 0)) OVER (
                PARTITION BY store_key, product_key
                ORDER BY date_key
                ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
            )
            + SUM(quantity_received) OVER (
                PARTITION BY store_key, product_key
                ORDER BY date_key
                ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
            )
            - SUM(quantity_sold) OVER (
                PARTITION BY store_key, product_key
                ORDER BY date_key
                ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
            )
            AS quantity_on_hand
        FROM
            running_inventory
    )

    SELECT
        ROW_NUMBER() OVER (ORDER BY fi.date_key, fi.store_key, fi.product_key) AS inventory_key,
        fi.date_key,
        fi.product_key,  -- Specify table alias
        fi.store_key,
        fi.quantity_on_hand,
        fi.quantity_received,
        fi.quantity_sold,
        fi.quantity_on_hand * dp.unit_cost AS stock_value
    FROM
        final_inventory fi
    JOIN
        dim_product dp ON fi.product_key = dp.product_key
    """)

    # Menghitung jumlah baris
    inventory_count = conn.execute("SELECT COUNT(*) FROM fact_inventory").fetchone()[0]

    print(f"Fact Constellation berhasil dibuat dengan tabel fakta inventaris ({inventory_count} baris)")
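
The running balance that `final_inventory` aims for (opening stock, plus cumulative receipts, minus cumulative sales) can be checked on a toy example. The sketch below is only an illustration. It uses Python's built-in `sqlite3` module as a stand-in engine so that it runs without the warehouse tables, and the `movements` table and its values are hypothetical. The window clause itself is standard SQL and is expected to behave the same way in DuckDB.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("""
CREATE TABLE movements (
    date_key INTEGER,
    product_key INTEGER,
    opening_qty INTEGER,   -- non-NULL only on the first day
    received INTEGER,
    sold INTEGER
)
""")
con.executemany(
    "INSERT INTO movements VALUES (?, ?, ?, ?, ?)",
    [
        (20240101, 1, 50, 10, 5),
        (20240102, 1, None, 0, 8),
        (20240103, 1, None, 20, 12),
    ],
)

# One cumulative sum over the net daily movement gives the closing stock per day
rows = con.execute("""
SELECT
    date_key,
    SUM(COALESCE(opening_qty, 0) + received - sold) OVER (
        PARTITION BY product_key
        ORDER BY date_key
        ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) AS quantity_on_hand
FROM movements
ORDER BY date_key
""").fetchall()

for date_key, quantity_on_hand in rows:
    print(date_key, quantity_on_hand)
```

With these values the closing stock is 55, 47 and 55 for the three days. Because the receipts and sales in the tutorial are random, a real run can still produce negative balances; the sketch does not guard against that.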

Build the fact constellation:

In [None]:
create_fact_constellation()
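
Once two fact tables share conformed dimensions, they can be compared in a single "drill-across" query. The sketch below shows one common way to do this, under assumptions: it uses the built-in `sqlite3` module as a stand-in engine, and the two tables are reduced to hypothetical three-column versions with made-up rows. Each fact table is aggregated to the shared grain (date and product) first, and only then joined, which avoids double counting when one side has several rows per key.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.executescript("""
CREATE TABLE fact_sales (date_key INTEGER, product_key INTEGER, quantity INTEGER);
CREATE TABLE fact_inventory (date_key INTEGER, product_key INTEGER, quantity_on_hand INTEGER);
INSERT INTO fact_sales VALUES (20240101, 1, 2), (20240101, 1, 3), (20240101, 2, 1);
INSERT INTO fact_inventory VALUES (20240101, 1, 40), (20240101, 2, 0);
""")

drill_across = con.execute("""
WITH sales AS (
    SELECT date_key, product_key, SUM(quantity) AS units_sold
    FROM fact_sales
    GROUP BY date_key, product_key
),
stock AS (
    SELECT date_key, product_key, SUM(quantity_on_hand) AS units_on_hand
    FROM fact_inventory
    GROUP BY date_key, product_key
)
SELECT s.date_key, s.product_key, s.units_sold, k.units_on_hand
FROM sales s
JOIN stock k
  ON k.date_key = s.date_key
 AND k.product_key = s.product_key
ORDER BY s.product_key
""").fetchall()

print(drill_across)
```

Joining the raw fact tables directly would repeat each inventory row once per matching sales row, so the pre-aggregation step is the important part of the pattern.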

## 8. Implementing SCD (Slowly Changing Dimension) Type 2

This section shows how to handle attribute changes that must be tracked historically:

In [None]:
def implement_scd_type2():
    """Implementasi SCD Type 2 untuk dimensi pelanggan"""

    # 1. Simulasi perubahan data (dengan pandas)
    # Ambil beberapa pelanggan dan ubah data mereka
    customers_df = pd.read_csv('data/customers.csv')

    # Pilih 10 pelanggan secara acak untuk diubah datanya
    updated_customers = customers_df.sample(10).copy()

    # Ubah beberapa data
    for idx, row in updated_customers.iterrows():
        if np.random.random() < 0.5:
            # Perubahan kota
            updated_customers.loc[idx, 'city'] = np.random.choice(['Jakarta Selatan', 'Bandung Barat',
                                                              'Surabaya Timur', 'Medan Utara'])
        else:
            # Perubahan membership
            current_membership = updated_customers.loc[idx, 'membership']
            new_membership = np.random.choice(['Silver', 'Gold', 'Platinum', 'Regular'])
            # Pastikan membership baru berbeda
            while new_membership == current_membership:
                new_membership = np.random.choice(['Silver', 'Gold', 'Platinum', 'Regular'])
            updated_customers.loc[idx, 'membership'] = new_membership

    # Simpan pelanggan yang diperbarui untuk diekstrak
    updated_customers.to_csv('data/updated_customers.csv', index=False)

    # 2. Ekstrak dan bersihkan data yang diperbarui
    conn.execute("CREATE OR REPLACE TABLE staging_updated_customers AS SELECT * FROM read_csv_auto('data/updated_customers.csv')")

    conn.execute("""
    CREATE OR REPLACE TABLE staging_updated_customers_clean AS
    SELECT
        customer_id,
        COALESCE(first_name, 'Unknown') AS first_name,
        COALESCE(last_name, 'Unknown') AS last_name,
        COALESCE(email, 'unknown@example.com') AS email,
        CASE
            WHEN city IS NULL THEN 'Unknown'
            ELSE LOWER(city) -- Standarisasi menjadi lowercase karena INITCAP tidak ada di DuckDB
        END AS city,
        membership,
        CURRENT_DATE AS effective_date
    FROM staging_updated_customers
    """)

    # 3. Identifikasi perubahan
    conn.execute("""
    CREATE OR REPLACE TABLE customer_changes AS
    SELECT
        s.customer_id,
        s.first_name,
        s.last_name,
        s.email,
        s.city,
        s.membership,
        s.effective_date,
        CASE
            WHEN d.customer_id IS NULL THEN 'NEW'
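            -- Compare case-insensitively (staging cities are lowercased) and treat NULL as a value
            WHEN (LOWER(s.city) IS DISTINCT FROM LOWER(d.city) OR s.membership IS DISTINCT FROM d.membership) THEN 'CHANGED'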
            ELSE 'UNCHANGED'
        END AS change_type
    FROM
        staging_updated_customers_clean s
    LEFT JOIN
        dim_customer d
    ON
        s.customer_id = d.customer_id
        AND d.current_flag = TRUE
    """)

    # 4. Implementasi SCD Type 2 - Mengakhiri catatan lama
    conn.execute("""
    UPDATE dim_customer
    SET
        current_flag = FALSE,
        expiration_date = CURRENT_DATE - INTERVAL '1 day'
    WHERE
        customer_id IN (SELECT customer_id FROM customer_changes WHERE change_type = 'CHANGED')
        AND current_flag = TRUE
    """)

    # 5. Implementasi SCD Type 2 - Menambahkan catatan baru
    conn.execute("""
    INSERT INTO dim_customer
    SELECT
        (SELECT MAX(customer_key) FROM dim_customer) + ROW_NUMBER() OVER (ORDER BY customer_id) AS customer_key,
        customer_id,
        first_name,
        last_name,
        email,
        city,
        membership,
        effective_date,
        NULL AS expiration_date,
        TRUE AS current_flag
    FROM
        customer_changes
    WHERE
        change_type = 'CHANGED'
    """)

    # Menampilkan perubahan
    changes_count = conn.execute("SELECT change_type, COUNT(*) FROM customer_changes GROUP BY change_type").fetchdf()
    print("Perubahan dimensi pelanggan:")
    print(changes_count)

    # Tampilkan contoh historis
    if conn.execute("SELECT COUNT(*) FROM customer_changes WHERE change_type = 'CHANGED'").fetchone()[0] > 0:
        changed_id = conn.execute("SELECT customer_id FROM customer_changes WHERE change_type = 'CHANGED' LIMIT 1").fetchone()[0]

        # Bind the id as a query parameter instead of formatting it into the SQL text
        history = conn.execute("""
            SELECT
                customer_key,
                customer_id,
                city,
                membership,
                effective_date,
                expiration_date,
                current_flag
            FROM
                dim_customer
            WHERE
                customer_id = ?
            ORDER BY
                effective_date
        """, [changed_id]).fetchdf()

        print(f"\nContoh riwayat untuk pelanggan {changed_id}:")
        print(history)

Run the SCD Type 2 implementation:

In [None]:
implement_scd_type2()
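
The expire-and-insert logic of SCD Type 2 can also be followed without a database. The sketch below is a minimal pure-Python illustration, not the tutorial's implementation: `apply_scd2` is a hypothetical helper, the dimension is a list of dicts, and only `city` and `membership` are treated as tracked attributes. Unlike the function above, it also inserts customers that are not yet in the dimension.

```python
from datetime import date, timedelta

TRACKED = ("city", "membership")  # attributes whose changes create a new version


def apply_scd2(dim_rows, incoming, today):
    """Expire changed current rows and append new versions (hypothetical helper)."""
    current = {r["customer_id"]: r for r in dim_rows if r["current_flag"]}
    next_key = max((r["customer_key"] for r in dim_rows), default=0) + 1
    for new in incoming:
        old = current.get(new["customer_id"])
        if old is not None and all(old[c] == new[c] for c in TRACKED):
            continue  # unchanged: keep the current version as it is
        if old is not None:
            # Close the old version the day before the new one becomes effective
            old["current_flag"] = False
            old["expiration_date"] = today - timedelta(days=1)
        dim_rows.append({
            "customer_key": next_key,
            "customer_id": new["customer_id"],
            "city": new["city"],
            "membership": new["membership"],
            "effective_date": today,
            "expiration_date": None,
            "current_flag": True,
        })
        next_key += 1
    return dim_rows


dim = [{
    "customer_key": 1, "customer_id": 101, "city": "bandung",
    "membership": "Silver", "effective_date": date(2022, 1, 1),
    "expiration_date": None, "current_flag": True,
}]
dim = apply_scd2(
    dim,
    [{"customer_id": 101, "city": "bandung", "membership": "Gold"}],
    today=date(2024, 1, 15),
)

for row in dim:
    print(row["customer_key"], row["membership"], row["effective_date"],
          row["expiration_date"], row["current_flag"])
```

After the call, customer 101 has two rows: the Silver version, closed on 2024-01-14, and a new current Gold version with `customer_key` 2.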

## 9. Implementing Incremental ETL

This section shows how to run ETL incrementally, so that only new data is processed:

In [None]:
def generate_incremental_data():
    """Menghasilkan data baru untuk diproses inkremental"""

    # 1. Baca data transaksi yang ada
    transactions_df = pd.read_csv('data/transactions.csv')

    # 2. Buat transaksi baru dengan tanggal yang lebih baru
    new_transactions = []

    start_date = datetime(2024, 1, 1)
    end_date = datetime(2024, 1, 10)

    for i in range(500):  # Buat 500 transaksi baru
        tx_date = start_date + timedelta(days=np.random.randint(0, (end_date-start_date).days))

        # Pilih store, customer, dan product secara acak dari data yang ada
        store_id = np.random.choice(transactions_df['store_id'].unique())
        customer_id = np.random.choice(transactions_df['customer_id'].unique())
        product_id = np.random.choice(transactions_df['product_id'].unique())

        # Dapatkan data produk
        product_info = transactions_df[transactions_df['product_id'] == product_id].iloc[0]

        # Simulasi variasi harga
        price_variance = np.random.uniform(0.9, 1.1)
        price = round(product_info['unit_price'] * price_variance, 2)

        cost_variance = np.random.uniform(0.95, 1.05)
        cost = round(product_info['unit_cost'] * cost_variance, 2)

        quantity = np.random.randint(1, 6)

        # Diskon kadang diberikan
        discount_pct = 0
        if np.random.random() < 0.3:  # 30% transaksi mendapat diskon
            discount_pct = np.random.choice([5, 10, 15, 20, 25, 50]) / 100

        discount_amount = round(price * quantity * discount_pct, 2)
        total = round(price * quantity - discount_amount, 2)
        profit = round(total - (cost * quantity), 2)

        new_transactions.append({
            'transaction_id': f'TX-NEW-{i+1}',
            'date': tx_date.strftime('%Y-%m-%d'),
            'store_id': store_id,
            'customer_id': customer_id,
            'product_id': product_id,
            'quantity': quantity,
            'unit_price': price,
            'unit_cost': cost,
            'discount_pct': discount_pct,
            'discount_amount': discount_amount,
            'total_amount': total,
            'profit': profit
        })

    # Simpan data baru
    new_tx_df = pd.DataFrame(new_transactions)
    new_tx_df.to_csv('data/new_transactions.csv', index=False)

    print(f"Generated {len(new_transactions)} new transactions for incremental processing")
    return new_tx_df
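
The amounts computed inside the loop follow three simple formulas: the discount is price times quantity times the discount rate, the total is the gross amount minus the discount, and the profit is the total minus the cost of the goods. The worked example below repeats them for fixed inputs. `price_transaction` is a hypothetical helper introduced only for this illustration, and the input values are made up.

```python
def price_transaction(unit_price, unit_cost, quantity, discount_pct):
    """Return (discount_amount, total_amount, profit) for one transaction line."""
    discount_amount = round(unit_price * quantity * discount_pct, 2)
    total_amount = round(unit_price * quantity - discount_amount, 2)
    profit = round(total_amount - unit_cost * quantity, 2)
    return discount_amount, total_amount, profit


# 3 units at 100.00 with a 10% discount and a unit cost of 60.00
regular = price_transaction(unit_price=100.0, unit_cost=60.0, quantity=3, discount_pct=0.10)
print(regular)

# 1 unit with the 50% discount tier: the sale is made below cost
deep_discount = price_transaction(unit_price=100.0, unit_cost=60.0, quantity=1, discount_pct=0.50)
print(deep_discount)
```

The first call yields a discount of 30.0, a total of 270.0 and a profit of 90.0. The second yields a profit of -10.0, which shows that the generated data can contain loss-making transactions whenever the discount exceeds the margin.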

In [None]:
def incremental_etl():
    """Proses ETL inkremental hanya untuk data baru"""

    # 1. Ekstrak data baru
    # Generate data baru dahulu
    generate_incremental_data()

    # Dapatkan tanggal terakhir yang sudah diproses
    last_date = conn.execute("""
        SELECT MAX(d.date)
        FROM fact_sales f
        JOIN dim_date d ON f.date_key = d.date_key
    """).fetchone()[0]

    print(f"Tanggal terakhir terproses: {last_date}")

    # Ekstrak data baru dan filter transaksi yang sudah ada
    conn.execute("""
    CREATE OR REPLACE TABLE staging_new_transactions AS
    SELECT * FROM read_csv_auto('data/new_transactions.csv') t
    WHERE date::DATE > ?
    AND NOT EXISTS (
        SELECT 1 FROM fact_sales fs
        WHERE fs.transaction_id = t.transaction_id
    )
    """, [last_date])

    # Hitung jumlah transaksi baru
    new_count = conn.execute("SELECT COUNT(*) FROM staging_new_transactions").fetchone()[0]
    print(f"Ditemukan {new_count} transaksi baru yang unik untuk diproses")

    # Exit early if no new transactions to process
    if new_count == 0:
        print("Tidak ada transaksi baru untuk diproses")
        return

    # 1.5 Identifikasi dan tambahkan tanggal baru ke dimensi tanggal
    conn.execute("""
    CREATE OR REPLACE TABLE staging_new_dates AS
    SELECT DISTINCT
        strftime(date::DATE, '%Y%m%d')::INTEGER AS date_key,
        date::DATE AS date,
        EXTRACT(YEAR FROM date::DATE) AS year,
        EXTRACT(MONTH FROM date::DATE) AS month,
        EXTRACT(DAY FROM date::DATE) AS day,
        EXTRACT(DOW FROM date::DATE) AS day_of_week
    FROM staging_new_transactions
    WHERE strftime(date::DATE, '%Y%m%d')::INTEGER NOT IN (SELECT date_key FROM dim_date)
    """)

    # Hitung jumlah tanggal baru
    new_dates_count = conn.execute("SELECT COUNT(*) FROM staging_new_dates").fetchone()[0]
    print(f"Menambahkan {new_dates_count} tanggal baru ke dalam dimensi tanggal")

    # Tambahkan tanggal baru ke dimensi tanggal jika ada
    if new_dates_count > 0:
        conn.execute("""
        INSERT INTO dim_date (date_key, date, year, month, day, day_of_week)
        SELECT date_key, date, year, month, day, day_of_week
        FROM staging_new_dates
        """)

    # 2. Dapatkan sales_key terakhir dari fact_sales untuk incremental key generation
    max_sales_key = conn.execute("SELECT COALESCE(MAX(sales_key), 0) FROM fact_sales").fetchone()[0]
    print(f"Sales key terakhir: {max_sales_key}")

    # 2. Build the fact staging table and assign surrogate keys in one set-based pass.
    #    Each new row gets the current maximum sales_key plus its row number, so the
    #    new keys cannot collide with existing ones and no row-by-row UPDATE is needed.
    conn.execute("""
    CREATE OR REPLACE TABLE staging_fact_sales_incremental AS
    SELECT
        (SELECT COALESCE(MAX(sales_key), 0) FROM fact_sales)
            + ROW_NUMBER() OVER (ORDER BY t.transaction_id) AS sales_key,
        t.transaction_id,
        strftime(t.date::DATE, '%Y%m%d')::INTEGER AS date_key,
        t.product_id AS product_key,
        t.store_id AS store_key,
        t.customer_id AS customer_key,
        t.quantity,
        t.unit_price,
        t.unit_cost,
        t.discount_pct,
        t.discount_amount,
        t.total_amount AS sales_amount,
        t.profit AS profit_amount
    FROM staging_new_transactions t
    WHERE t.transaction_id NOT IN (
        SELECT transaction_id FROM fact_sales
    )
    """)

    # Final check - pastikan tidak ada kunci duplikat dalam staging
    dup_count = conn.execute("""
    SELECT COUNT(*) FROM (
        SELECT sales_key FROM staging_fact_sales_incremental
        GROUP BY sales_key
        HAVING COUNT(*) > 1
    )
    """).fetchone()[0]

    if dup_count > 0:
        print(f"PERINGATAN: Ditemukan {dup_count} kunci duplikat dalam staging. Transaksi tidak akan dimasukkan.")
        return

    # Verifikasi bahwa kunci sales_key di staging tidak ada di fact_sales
    overlap_keys = conn.execute("""
    SELECT COUNT(*) FROM staging_fact_sales_incremental s
    WHERE s.sales_key IN (SELECT sales_key FROM fact_sales)
    """).fetchone()[0]

    if overlap_keys > 0:
        print(f"PERINGATAN: Ditemukan {overlap_keys} sales_key yang tumpang tindih. Transaksi tidak akan dimasukkan.")
        return

    # Verifikasi bahwa transaction_id di staging tidak ada di fact_sales
    overlap_transactions = conn.execute("""
    SELECT COUNT(*) FROM staging_fact_sales_incremental s
    WHERE s.transaction_id IN (SELECT transaction_id FROM fact_sales)
    """).fetchone()[0]

    if overlap_transactions > 0:
        print(f"PERINGATAN: Ditemukan {overlap_transactions} transaction_id yang tumpang tindih. Transaksi tidak akan dimasukkan.")
        return

    # 3. Load data baru - gunakan INSERT tanpa JOIN kompleks untuk mengurangi risiko
    conn.execute("""
    INSERT INTO fact_sales (
        sales_key, transaction_id, date_key, product_key, store_key,
        customer_key, quantity, unit_price, unit_cost, discount_pct,
        discount_amount, sales_amount, profit_amount
    )
    SELECT
        f.sales_key, f.transaction_id, f.date_key,
        f.product_key, f.store_key, f.customer_key,
        f.quantity, f.unit_price, f.unit_cost,
        f.discount_pct, f.discount_amount, f.sales_amount, f.profit_amount
    FROM staging_fact_sales_incremental f
    """)

    # Dapatkan jumlah total rekaman setelah proses inkremental
    inserted_count = conn.execute("SELECT COUNT(*) FROM staging_fact_sales_incremental").fetchone()[0]
    total_count = conn.execute("SELECT COUNT(*) FROM fact_sales").fetchone()[0]
    print(f"Berhasil menambahkan {inserted_count} transaksi baru")
    print(f"Total transaksi dalam data warehouse setelah proses inkremental: {total_count}")

Run the incremental ETL:

In [None]:
incremental_etl()
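
Note that `incremental_etl()` copies `customer_id` straight into `customer_key`. After the SCD Type 2 step in section 8, a customer can have several dimension rows, so the natural id and the surrogate key are no longer interchangeable. A common alternative, sketched below under assumptions, is to look up the surrogate key of the current version when loading facts. The snippet uses the built-in `sqlite3` module as a stand-in engine, with hypothetical, reduced versions of the tables.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.executescript("""
CREATE TABLE dim_customer (
    customer_key INTEGER PRIMARY KEY,
    customer_id INTEGER,
    membership TEXT,
    current_flag INTEGER
);
-- Customer 101 has two versions after an SCD Type 2 change
INSERT INTO dim_customer VALUES
    (1, 101, 'Silver', 0),
    (2, 102, 'Gold', 1),
    (3, 101, 'Gold', 1);

CREATE TABLE staging_new_transactions (transaction_id TEXT, customer_id INTEGER);
INSERT INTO staging_new_transactions VALUES ('TX-NEW-1', 101), ('TX-NEW-2', 102);
""")

# Resolve each natural id to the surrogate key of the customer's current version
resolved = con.execute("""
SELECT t.transaction_id, c.customer_key
FROM staging_new_transactions t
JOIN dim_customer c
  ON c.customer_id = t.customer_id
 AND c.current_flag = 1
ORDER BY t.transaction_id
""").fetchall()

print(resolved)
```

Here `TX-NEW-1` is attached to key 3 (the current Gold version of customer 101) rather than to key 1 or to the raw id 101. A stricter variant would match on the transaction date falling between `effective_date` and `expiration_date` instead of on `current_flag`.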

## 10. An ETLPipeline Class to Automate the Whole Process

In [None]:
class ETLPipeline:
    """Class untuk mengotomatisasi proses ETL"""

    def __init__(self, db_path='retail_dw.db'):
        """Inisialisasi pipeline"""
        self.conn = duckdb.connect(db_path)
        self.initialized = False
        self.log_table_setup()

    def log_table_setup(self):
        """Membuat tabel log untuk melacak proses ETL"""
        self.conn.execute("""
        CREATE TABLE IF NOT EXISTS etl_log (
            log_id INTEGER PRIMARY KEY,
            process_name VARCHAR,
            start_time TIMESTAMP,
            end_time TIMESTAMP,
            records_processed INTEGER,
            status VARCHAR,
            message VARCHAR
        )
        """)

    def log_process_start(self, process_name):
        """Mencatat mulainya proses ETL"""
        log_id = self.conn.execute("SELECT COALESCE(MAX(log_id), 0) + 1 FROM etl_log").fetchone()[0]
        self.conn.execute("""
        INSERT INTO etl_log (log_id, process_name, start_time, status)
        VALUES (?, ?, CURRENT_TIMESTAMP, 'RUNNING')
        """, [log_id, process_name])
        return log_id

    def log_process_end(self, log_id, records=0, status='SUCCESS', message=None):
        """Mencatat selesainya proses ETL"""
        self.conn.execute("""
        UPDATE etl_log
        SET end_time = CURRENT_TIMESTAMP,
            records_processed = ?,
            status = ?,
            message = ?
        WHERE log_id = ?
        """, [records, status, message, log_id])

    def initialize_warehouse(self):
        """Inisialisasi skema data warehouse jika belum ada"""
        if self.initialized:
            return

        log_id = self.log_process_start("initialize_warehouse")

        try:
            # Dimensi tanggal
            self.conn.execute("""
            CREATE TABLE IF NOT EXISTS dim_date (
              date_key INTEGER PRIMARY KEY,
              date DATE NOT NULL,
              day INTEGER,
              day_of_week INTEGER,
              day_name VARCHAR,
              month INTEGER,
              month_name VARCHAR,
              quarter INTEGER,
              year INTEGER,
              season VARCHAR
            )
            """)

            # Dimensi produk
            self.conn.execute("""
            CREATE TABLE IF NOT EXISTS dim_product (
              product_key INTEGER PRIMARY KEY,
              product_id INTEGER NOT NULL,
              product_name VARCHAR,
              category VARCHAR,
              subcategory VARCHAR,
              unit_cost DECIMAL(10,2),
              unit_price DECIMAL(10,2),
              effective_date DATE,
              expiration_date DATE,
              current_flag BOOLEAN
            )
            """)

            # Dimensi toko
            self.conn.execute("""
            CREATE TABLE IF NOT EXISTS dim_store (
              store_key INTEGER PRIMARY KEY,
              store_id INTEGER NOT NULL,
              store_name VARCHAR,
              city VARCHAR,
              region VARCHAR,
              store_type VARCHAR,
              effective_date DATE,
              expiration_date DATE,
              current_flag BOOLEAN
            )
            """)

            # Dimensi pelanggan
            self.conn.execute("""
            CREATE TABLE IF NOT EXISTS dim_customer (
              customer_key INTEGER PRIMARY KEY,
              customer_id INTEGER NOT NULL,
              first_name VARCHAR,
              last_name VARCHAR,
              email VARCHAR,
              city VARCHAR,
              membership VARCHAR,
              effective_date DATE,
              expiration_date DATE,
              current_flag BOOLEAN
            )
            """)

            # Tabel fakta penjualan
            self.conn.execute("""
            CREATE TABLE IF NOT EXISTS fact_sales (
              sales_key INTEGER PRIMARY KEY,
              transaction_id VARCHAR,
              date_key INTEGER,
              product_key INTEGER,
              store_key INTEGER,
              customer_key INTEGER,
              quantity INTEGER,
              unit_price DECIMAL(10,2),
              unit_cost DECIMAL(10,2),
              discount_pct DECIMAL(5,2),
              discount_amount DECIMAL(10,2),
              sales_amount DECIMAL(10,2),
              profit_amount DECIMAL(10,2),

              FOREIGN KEY (date_key) REFERENCES dim_date(date_key),
              FOREIGN KEY (product_key) REFERENCES dim_product(product_key),
              FOREIGN KEY (store_key) REFERENCES dim_store(store_key),
              FOREIGN KEY (customer_key) REFERENCES dim_customer(customer_key)
            )
            """)

            # Berhasil inisialisasi
            self.initialized = True
            self.log_process_end(log_id, status='SUCCESS', message='Data warehouse schema initialized')

        except Exception as e:
            self.log_process_end(log_id, status='ERROR', message=str(e))
            raise

    def extract_all_sources(self):
        """Ekstrak data dari semua sumber"""
        log_id = self.log_process_start("extract_all_sources")

        try:
            # Bersihkan tabel staging
            self.conn.execute("DROP TABLE IF EXISTS staging_products")
            self.conn.execute("DROP TABLE IF EXISTS staging_stores")
            self.conn.execute("DROP TABLE IF EXISTS staging_customers")
            self.conn.execute("DROP TABLE IF EXISTS staging_transactions")

            # Ekstrak data dari file CSV
            self.conn.execute("CREATE TABLE staging_products AS SELECT * FROM read_csv_auto('data/products.csv')")
            self.conn.execute("CREATE TABLE staging_stores AS SELECT * FROM read_csv_auto('data/stores.csv')")
            self.conn.execute("CREATE TABLE staging_customers AS SELECT * FROM read_csv_auto('data/customers_dirty.csv')")
            self.conn.execute("CREATE TABLE staging_transactions AS SELECT * FROM read_csv_auto('data/transactions.csv')")

            # Hitung total jumlah catatan
            total_records = 0
            total_records += self.conn.execute("SELECT COUNT(*) FROM staging_products").fetchone()[0]
            total_records += self.conn.execute("SELECT COUNT(*) FROM staging_stores").fetchone()[0]
            total_records += self.conn.execute("SELECT COUNT(*) FROM staging_customers").fetchone()[0]
            total_records += self.conn.execute("SELECT COUNT(*) FROM staging_transactions").fetchone()[0]

            self.log_process_end(log_id, records=total_records, status='SUCCESS')
            return True

        except Exception as e:
            self.log_process_end(log_id, status='ERROR', message=str(e))
            raise

    def transform_data(self):
        """Transformasi data dari tabel staging"""
        log_id = self.log_process_start("transform_data")

        try:
            # 1. Populate the date dimension. dim_date already exists (with its primary
            #    key) and is referenced by fact_sales, so rows are inserted into it
            #    rather than re-creating the table; INSERT OR IGNORE skips existing dates
            self.conn.execute("""
            INSERT OR IGNORE INTO dim_date (
              date_key, date, day, day_of_week, day_name,
              month, month_name, quarter, year, season
            )
            WITH date_range AS (
              SELECT t.d::DATE AS date
              FROM generate_series('2022-01-01'::DATE, '2024-12-31'::DATE, INTERVAL '1 day') AS t(d)
            )
            SELECT
              strftime(date, '%Y%m%d')::INTEGER AS date_key,
              date,
              EXTRACT(DAY FROM date) AS day,
              EXTRACT(DOW FROM date) AS day_of_week,
              dayname(date) AS day_name,
              EXTRACT(MONTH FROM date) AS month,
              monthname(date) AS month_name,
              EXTRACT(QUARTER FROM date) AS quarter,
              EXTRACT(YEAR FROM date) AS year,
              CASE
                WHEN EXTRACT(MONTH FROM date) BETWEEN 3 AND 5 THEN 'Spring'
                WHEN EXTRACT(MONTH FROM date) BETWEEN 6 AND 8 THEN 'Summer'
                WHEN EXTRACT(MONTH FROM date) BETWEEN 9 AND 11 THEN 'Fall'
                ELSE 'Winter'
              END AS season
            FROM date_range
            """)

            # 2. Transformasi dimensi produk
            self.conn.execute("""
            CREATE OR REPLACE TABLE staging_dim_product AS
            SELECT
                product_id AS product_key,
                product_id,
                name AS product_name,
                category,
                subcategory,
                base_cost AS unit_cost,
                base_price AS unit_price,
                '2022-01-01'::DATE AS effective_date,
                NULL::DATE AS expiration_date,
                TRUE AS current_flag
            FROM staging_products
            """)

            # 3. Transformasi dimensi toko
            self.conn.execute("""
            CREATE OR REPLACE TABLE staging_dim_store AS
            SELECT
                store_id AS store_key,
                store_id,
                name AS store_name,
                city,
                region,
                type AS store_type,
                '2022-01-01'::DATE AS effective_date,
                NULL::DATE AS expiration_date,
                TRUE AS current_flag
            FROM staging_stores
            """)

            # 4. Transformasi dimensi pelanggan dengan pembersihan data
            self.conn.execute("""
            CREATE OR REPLACE TABLE staging_dim_customer AS
            WITH clean_customers AS (
                SELECT
                    customer_id,
                    COALESCE(first_name, 'Unknown') AS first_name,
                    COALESCE(last_name, 'Unknown') AS last_name,
                    COALESCE(email, 'unknown@example.com') AS email,
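                    CASE
                        WHEN city IS NULL THEN 'Unknown'
                        ELSE LOWER(city) -- Lowercase, as in section 8 (which notes that DuckDB has no INITCAP)
                    END AS city,
                    membership,
                    -- Keep only the first row per duplicate email; rows without an email
                    -- are partitioned by customer_id so they are not merged together
                    ROW_NUMBER() OVER (
                        PARTITION BY COALESCE(staging_customers.email, CAST(customer_id AS VARCHAR))
                        ORDER BY customer_id
                    ) AS rn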
                FROM staging_customers
            )
            SELECT
                ROW_NUMBER() OVER (ORDER BY customer_id) AS customer_key,
                customer_id,
                first_name,
                last_name,
                email,
                city,
                membership,
                '2022-01-01'::DATE AS effective_date,
                NULL::DATE AS expiration_date,
                TRUE AS current_flag
            FROM clean_customers
            WHERE rn = 1 -- Eliminasi duplikat
            """)

            # 5. Transformasi fakta penjualan
            self.conn.execute("""
            CREATE OR REPLACE TABLE staging_fact_sales AS
            SELECT
                ROW_NUMBER() OVER (ORDER BY transaction_id, product_id) AS sales_key,
                transaction_id,
                strftime(date::DATE, '%Y%m%d')::INTEGER AS date_key, -- same key format as in incremental_etl()
                product_id AS product_key, -- Akan diganti nanti dengan JOIN
                store_id AS store_key, -- Akan diganti nanti dengan JOIN
                customer_id AS customer_key, -- Akan diganti nanti dengan JOIN
                quantity,
                unit_price,
                unit_cost,
                discount_pct,
                discount_amount,
                total_amount AS sales_amount,
                profit AS profit_amount
            FROM staging_transactions
            """)

            # Hitung jumlah catatan
            total_records = 0
            total_records += self.conn.execute("SELECT COUNT(*) FROM dim_date").fetchone()[0]
            total_records += self.conn.execute("SELECT COUNT(*) FROM staging_dim_product").fetchone()[0]
            total_records += self.conn.execute("SELECT COUNT(*) FROM staging_dim_store").fetchone()[0]
            total_records += self.conn.execute("SELECT COUNT(*) FROM staging_dim_customer").fetchone()[0]
            total_records += self.conn.execute("SELECT COUNT(*) FROM staging_fact_sales").fetchone()[0]

            self.log_process_end(log_id, records=total_records, status='SUCCESS')
            return True

        except Exception as e:
            self.log_process_end(log_id, status='ERROR', message=str(e))
            raise

    def load_data(self):
        """Load data ke data warehouse"""
        log_id = self.log_process_start("load_data")

        try:
            # Clear the fact table first: its foreign keys would otherwise block
            # deleting the dimension rows it references
            self.conn.execute("DELETE FROM fact_sales")

            # 1. Load the product dimension
            self.conn.execute("DELETE FROM dim_product")  # Clear the target table
            self.conn.execute("INSERT INTO dim_product SELECT * FROM staging_dim_product")

            # 2. Load the store dimension
            self.conn.execute("DELETE FROM dim_store")  # Clear the target table
            self.conn.execute("INSERT INTO dim_store SELECT * FROM staging_dim_store")

            # 3. Load the customer dimension
            self.conn.execute("DELETE FROM dim_customer")  # Clear the target table
            self.conn.execute("INSERT INTO dim_customer SELECT * FROM staging_dim_customer")

            # 4. Load the sales fact, mapping natural ids to the dimensions' surrogate keys
            self.conn.execute("""
            INSERT INTO fact_sales
            SELECT
                f.sales_key,
                f.transaction_id,
                f.date_key,
                p.product_key,
                s.store_key,
                c.customer_key,
                f.quantity,
                f.unit_price,
                f.unit_cost,
                f.discount_pct,
                f.discount_amount,
                f.sales_amount,
                f.profit_amount
            FROM staging_fact_sales f
            JOIN dim_product p ON f.product_key = p.product_id
            JOIN dim_store s ON f.store_key = s.store_id
            JOIN dim_customer c ON f.customer_key = c.customer_id
            """)

            # Menghitung jumlah baris yang dimuat
            product_count = self.conn.execute("SELECT COUNT(*) FROM dim_product").fetchone()[0]
            store_count = self.conn.execute("SELECT COUNT(*) FROM dim_store").fetchone()[0]
            customer_count = self.conn.execute("SELECT COUNT(*) FROM dim_customer").fetchone()[0]
            sales_count = self.conn.execute("SELECT COUNT(*) FROM fact_sales").fetchone()[0]

            total_records = product_count + store_count + customer_count + sales_count

            self.log_process_end(log_id, records=total_records, status='SUCCESS',
                                message=f'Loaded {product_count} products, {store_count} stores, '
                                        f'{customer_count} customers, {sales_count} sales')
            return True

        except Exception as e:
            self.log_process_end(log_id, status='ERROR', message=str(e))
            raise

    def run_full_pipeline(self):
        """Jalankan seluruh pipeline ETL"""
        print("Menjalankan ETL Pipeline lengkap...")

        start_time = datetime.now()

        try:
            # 1. Initialize the warehouse
            self.initialize_warehouse()
            print("✓ Data warehouse initialized")

            # 2. Extract the data
            self.extract_all_sources()
            print("✓ Data extracted from all sources")

            # 3. Transform the data
            self.transform_data()
            print("✓ Data transformed")

            # 4. Load the data
            self.load_data()
            print("✓ Data loaded into the data warehouse")

            end_time = datetime.now()
            duration = (end_time - start_time).total_seconds()

            print(f"\nETL pipeline finished in {duration:.2f} seconds")

            # Display statistics
            stats = self.conn.execute("""
                SELECT
                    (SELECT COUNT(*) FROM dim_date) as date_count,
                    (SELECT COUNT(*) FROM dim_product) as product_count,
                    (SELECT COUNT(*) FROM dim_store) as store_count,
                    (SELECT COUNT(*) FROM dim_customer) as customer_count,
                    (SELECT COUNT(*) FROM fact_sales) as sales_count
            """).fetchone()

            print("\nData Warehouse Statistics:")
            print(f"✓ Date dimension: {stats[0]} rows")
            print(f"✓ Product dimension: {stats[1]} rows")
            print(f"✓ Store dimension: {stats[2]} rows")
            print(f"✓ Customer dimension: {stats[3]} rows")
            print(f"✓ Sales fact: {stats[4]} rows")

            return True

        except Exception as e:
            print(f"ERROR: ETL pipeline failed: {e}")
            return False

    def get_etl_log(self, limit=10):
        """Return the most recent ETL log entries"""
        return self.conn.execute(f"""
            SELECT
                log_id,
                process_name,
                start_time,
                end_time,
                EXTRACT(EPOCH FROM (end_time - start_time)) as duration_seconds,
                records_processed,
                status,
                message
            FROM etl_log
            ORDER BY log_id DESC
            LIMIT {int(limit)}
        """).fetchdf()

    def __del__(self):
        """Close the connection when the object is deleted"""
        try:
            if hasattr(self, 'conn'):
                self.conn.close()
                print("DuckDB connection closed.")
        except Exception:
            # Never let cleanup errors propagate out of __del__
            pass

Run the automated pipeline

In [None]:
pipeline = ETLPipeline()
pipeline.run_full_pipeline()

View the ETL log

In [None]:
pipeline.get_etl_log()
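
The `__del__` method in the class closes the connection only when the object is garbage-collected, and Python does not guarantee when that happens. Below is a minimal sketch of a more deterministic alternative using `contextlib.closing`. `FakeConnection` and `run_with_cleanup` are hypothetical names introduced here so the sketch runs without a database file; they are not part of the tutorial's `ETLPipeline`.

```python
from contextlib import closing


class FakeConnection:
    """Hypothetical stand-in for a database connection (assumption for this sketch)."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


def run_with_cleanup(connection, work):
    """Run work(connection) and close the connection even if work raises."""
    with closing(connection) as conn:
        return work(conn)


conn = FakeConnection()
result = run_with_cleanup(conn, lambda c: "pipeline finished")
print(result, conn.closed)
```

Since `closing` only needs an object with a `close()` method, the same pattern should carry over to a real DuckDB connection, though that is not tested here.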

## 11. Conclusion

In this tutorial, we have covered:

1. **Data warehouse fundamentals** - OLTP vs OLAP, dimensional design
2. **Schema implementation** - Star Schema, Snowflake Schema, and Fact Constellation
3. **The ETL process** - Extract, Transform, Load using several approaches
   - Transformation with pandas (Python)
   - Transformation with SQL in DuckDB
4. **SCD Type 2** - Handling historical changes in dimensions
5. **Incremental ETL** - Processing only new data
6. **Pipeline automation** - Building a class to manage the whole process
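
As a reminder of the idea behind SCD Type 2, here is a minimal in-memory sketch that does not depend on DuckDB. The column names (`customer_id`, `city`, `valid_from`, `valid_to`, `is_current`) and the helper `apply_scd2` are hypothetical and chosen only for illustration; they are not necessarily the names used earlier in this tutorial.

```python
from datetime import date


def apply_scd2(dimension, incoming, today):
    """Apply one SCD Type 2 change to a list of dimension rows (dicts).

    If the tracked attribute changed, the current row is closed and a new
    current row is appended; unchanged rows are left alone.
    """
    current = next(
        (row for row in dimension
         if row["customer_id"] == incoming["customer_id"] and row["is_current"]),
        None,
    )
    if current is not None and current["city"] == incoming["city"]:
        return dimension  # no change, nothing to version

    if current is not None:
        # Expire the old version instead of overwriting it
        current["valid_to"] = today
        current["is_current"] = False

    dimension.append({
        "customer_id": incoming["customer_id"],
        "city": incoming["city"],
        "valid_from": today,
        "valid_to": None,
        "is_current": True,
    })
    return dimension


dim_customer = [{
    "customer_id": 1, "city": "Jakarta",
    "valid_from": date(2023, 1, 1), "valid_to": None, "is_current": True,
}]
apply_scd2(dim_customer, {"customer_id": 1, "city": "Bandung"}, date(2024, 6, 1))
for row in dim_customer:
    print(row)
```

The old row is kept with an end date rather than overwritten, which is what lets a fact row be joined to the version of the customer that was valid at the time.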

Some _best practices_ we applied:

1. **Data cleaning** - Handling missing, duplicate, and inconsistent data
2. **Historical tracking** - Using SCD Type 2 to track changes
3. **Separation of concerns** - Keeping extraction, transformation, and loading separate
4. **Logging** - Recording every ETL process
5. **Error handling** - Recording each failure in `etl_log` with status `ERROR` before re-raising or reporting it
6. **Documentation** - Explaining each step of the process
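
The logging and error-handling practices can be combined in one small helper. The sketch below is only an illustration under stated assumptions: `etl_step`, the `RUNNING` status, and the in-memory `log` list are hypothetical, standing in for the `etl_log` table and the `log_process_end` calls used in the pipeline class.

```python
from contextlib import contextmanager
from datetime import datetime

log = []  # hypothetical in-memory stand-in for the etl_log table


@contextmanager
def etl_step(process_name):
    """Record start, end, and status of one ETL step; re-raise failures."""
    entry = {"process_name": process_name, "start_time": datetime.now(),
             "end_time": None, "status": "RUNNING", "message": ""}
    log.append(entry)
    try:
        yield entry
        entry["status"] = "SUCCESS"
    except Exception as exc:
        entry["status"] = "ERROR"
        entry["message"] = str(exc)
        raise  # let the caller decide how to handle the failure
    finally:
        entry["end_time"] = datetime.now()


with etl_step("transform"):
    cleaned = [value for value in [10, None, 30] if value is not None]

try:
    with etl_step("load"):
        raise ValueError("target table is missing")
except ValueError:
    pass

for entry in log:
    print(entry["process_name"], entry["status"], entry["message"])
```

Wrapping each step this way keeps the log entry and the exception handling in one place, so a failed step is always recorded before the error reaches the caller.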