In [None]:
print("------------------- extraction and transformation -------------------")
import pandas as pd  
from sqlalchemy import create_engine  

# ------------------- Koneksi Database -------------------
hostname = "localhost"
port = 5432
username = "postgres"
password = "dataEngginer"
source_db = "adventureworks"  
staging_db = "staggingDB"      

# Koneksi ke database source dan staging (sudah diperbaiki)
source_engine = create_engine(f"postgresql+psycopg2://{username}:{password}@{hostname}:{port}/{source_db}")
staging_engine = create_engine(f"postgresql+psycopg2://{username}:{password}@{hostname}:{port}/{staging_db}")

# ------------------- RAW DATA ke raw_schema -------------------
print("------------- inserting raw tables to raw_schema in staging -------------")

df_raw_customer = pd.read_sql("SELECT * FROM sales.customer", source_engine)
df_raw_customer.to_sql('customer', staging_engine, schema='raw_schema', if_exists='replace', index=False)

df_raw_person = pd.read_sql("SELECT * FROM person.person", source_engine)
df_raw_person.to_sql('person', staging_engine, schema='raw_schema', if_exists='replace', index=False)

df_raw_product = pd.read_sql("SELECT * FROM production.product", source_engine)
df_raw_product.to_sql('product', staging_engine, schema='raw_schema', if_exists='replace', index=False)

df_raw_pc = pd.read_sql("SELECT * FROM production.productcategory", source_engine)
df_raw_pc.to_sql('productcategory', staging_engine, schema='raw_schema', if_exists='replace', index=False)

df_raw_psc = pd.read_sql("SELECT * FROM production.productsubcategory", source_engine)
df_raw_psc.to_sql('productsubcategory', staging_engine, schema='raw_schema', if_exists='replace', index=False)

df_raw_soh = pd.read_sql("SELECT * FROM sales.salesorderheader", source_engine)
df_raw_soh.to_sql('salesorderheader', staging_engine, schema='raw_schema', if_exists='replace', index=False)

df_raw_sod = pd.read_sql("SELECT * FROM sales.salesorderdetail", source_engine)
df_raw_sod.to_sql('salesorderdetail', staging_engine, schema='raw_schema', if_exists='replace', index=False)

df_raw_st = pd.read_sql("SELECT * FROM sales.salesterritory", source_engine)
df_raw_st.to_sql('salesterritory', staging_engine, schema='raw_schema', if_exists='replace', index=False)

df_raw_sp = pd.read_sql("SELECT * FROM person.stateprovince", source_engine)
df_raw_sp.to_sql('stateprovince', staging_engine, schema='raw_schema', if_exists='replace', index=False)

df_raw_cr = pd.read_sql("SELECT * FROM person.countryregion", source_engine)
df_raw_cr.to_sql('countryregion', staging_engine, schema='raw_schema', if_exists='replace', index=False)

print("Raw tables successfully inserted into raw_schema.")

# ------------------- EXTRACT + TRANSFORM -------------------

# DIM_CUSTOMER
query_customer = """
SELECT c.customerid,
       pp.firstname || ' ' || COALESCE(pp.middlename || ' ', '') || pp.lastname AS customername
FROM sales.customer c
JOIN person.person pp ON c.personid = pp.businessentityid;
"""
df_customer = pd.read_sql(query_customer, source_engine)
df_customer['customerKey'] = range(1, len(df_customer)+1)

# DIM_PRODUCT
query_product = """
SELECT p.productid, pc.name AS productsubcategory, p.name AS productname
FROM production.product p
JOIN production.productsubcategory psc ON p.productsubcategoryid = psc.productsubcategoryid
JOIN production.productcategory pc ON psc.productcategoryid = pc.productcategoryid;
"""
df_product = pd.read_sql(query_product, source_engine)
df_product['productKey'] = range(1, len(df_product)+1)

# DIM_TERRITORY
query_territory = """
SELECT st.territoryid, sp.name AS provincename, cr.name AS countryregion
FROM sales.salesterritory st
JOIN person.stateprovince sp ON st.territoryID = sp.territoryID
JOIN person.countryregion cr ON st.countryregioncode = cr.countryregioncode;
"""
df_territory = pd.read_sql(query_territory, source_engine)
df_territory['territoryKey'] = range(1, len(df_territory)+1)

# FACT_SALES
query_sales = """
SELECT soh.orderdate, soh.customerid, soh.territoryid, sod.productid,
       sod.orderqty, soh.totaldue
FROM sales.salesorderdetail sod
JOIN sales.salesorderheader soh ON sod.salesorderid = soh.salesorderid;
"""
df_sales = pd.read_sql(query_sales, source_engine)

# DIM_TIME
df_time = df_sales[['orderdate']].drop_duplicates()
df_time['year'] = pd.to_datetime(df_time['orderdate']).dt.year
df_time['month'] = pd.to_datetime(df_time['orderdate']).dt.month
df_time['day'] = pd.to_datetime(df_time['orderdate']).dt.day
df_time['timeKey'] = range(1, len(df_time)+1)

# Join surrogate keys ke fact_sales
df_sales = df_sales.merge(df_customer[['customerid', 'customerKey']], on='customerid', how='left')
df_sales = df_sales.merge(df_product[['productid', 'productKey']], on='productid', how='left')
df_sales = df_sales.merge(df_territory[['territoryid', 'territoryKey']], on='territoryid', how='left')
df_sales = df_sales.merge(df_time[['orderdate', 'timeKey']], on='orderdate', how='left')

# Agregasi fact_sales
df_fact = df_sales.groupby(['customerKey', 'productKey', 'territoryKey', 'timeKey']).agg(
    totalQuantity=('orderqty', 'sum'),
    averageAmount=('totaldue', 'mean'),
    totalRevenue=('totaldue', 'sum')
).reset_index()

df_fact['salesID'] = range(1, len(df_fact)+1)

# ------------------- LOAD KE star_schema -------------------

df_customer.to_sql('dim_customer', staging_engine, schema='star_schema', if_exists='replace', index=False)
df_product.to_sql('dim_product', staging_engine, schema='star_schema', if_exists='replace', index=False)
df_territory.to_sql('dim_territory', staging_engine, schema='star_schema', if_exists='replace', index=False)
df_time[['timeKey', 'year', 'month', 'day']].to_sql('dim_time', staging_engine, schema='star_schema', if_exists='replace', index=False)
df_fact.to_sql('fact_sales', staging_engine, schema='star_schema', if_exists='replace', index=False)

# ------------------- DONE -------------------
print("ETL selesai. Data berhasil disimpan ke staging database:")
print("- raw_schema: semua tabel sumber (sesuai LDM)")
print("- star_schema: tabel dim_customer, dim_product, dim_territory, dim_time, fact_sales")


In [None]:
print("------------------- Load data ke Data Warehouse -------------------")
import pandas as pd  
from sqlalchemy import create_engine, inspect
from sqlalchemy.exc import SQLAlchemyError

# Koneksi database
hostname = "localhost"  
port = 5432  
username = "postgres"  
password = "dataEngginer"  
dw_db = "adventureworksDw"  
staging_db = "staggingDB"  

# Membuat koneksi ke data warehouse menggunakan SQLAlchemy
dw_engine = create_engine(f'postgresql://{username}:{password}@{hostname}:{port}/{dw_db}')
# Membuat koneksi ke staging database menggunakan SQLAlchemy
staging_engine = create_engine(f'postgresql://{username}:{password}@{hostname}:{port}/{staging_db}')

# List tabel star schema yang akan di-load
tables = ['dim_customer', 'dim_product', 'dim_territory', 'dim_time', 'fact_sales']

def load_table(table_name):
    try:
        print(f"Loading table {table_name}...")
        # Ambil data dari staging
        df = pd.read_sql(f'SELECT * FROM public.{table_name}', staging_engine)

        # Cek apakah tabel sudah ada di data warehouse
        inspector = inspect(dw_engine)
        if table_name in inspector.get_table_names():
            # Bisa pilih strategi: replace, append, atau skip
            # Contoh: replace supaya data update total
            df.to_sql(table_name, dw_engine, if_exists='replace', index=False)
            print(f"Table {table_name} berhasil di-replace di Data Warehouse.")
        else:
            df.to_sql(table_name, dw_engine, if_exists='fail', index=False)
            print(f"Table {table_name} berhasil di-load di Data Warehouse.")
    except SQLAlchemyError as e:
        print(f"Gagal load table {table_name}: {e}")

# Looping load semua tabel
for table in tables:
    load_table(table)

print("Load ke Data Warehouse selesai.")


In [None]:
print("------------------- incremental ETL on the phase of Load to Data Warehouse -------------------")

import pandas as pd
from sqlalchemy import create_engine

# --- Konfigurasi koneksi database ---
hostname = "localhost"
port = 5432
username = "postgres"
password = "dataEngginer"
dw_db = "adventureworksdw"
staging_db = "staggingDB"

# Koneksi ke staging dan data warehouse
staging_engine = create_engine(f'postgresql://{username}:{password}@{hostname}:{port}/{staging_db}')
dw_engine = create_engine(f'postgresql://{username}:{password}@{hostname}:{port}/{dw_db}')

# Fungsi bantu untuk load tabel dari staging ke data warehouse
def load_table(table_name):
    try:
        print(f"Loading table {table_name} ke Data Warehouse...")
        
        # Ambil data dari schema stage
        df = pd.read_sql_table(table_name, con=staging_engine, schema='stage')

        if df.empty:
            print(f"Tidak ada data baru untuk dimuat di {table_name}.")
            return

        # Replace data di schema adventureworksdw
        df.to_sql(table_name, con=dw_engine, schema='public', if_exists='replace', index=False)
        print(f"Tabel {table_name} berhasil di-replace di Data Warehouse.\n")

    except Exception as e:
        print(f"Gagal memuat tabel {table_name} ke DW. Error: {e}")

# =====================
# Proses Load Dimensi
# =====================
load_table('dim_customer')
load_table('dim_product')
load_table('dim_territory')
load_table('dim_time')

# =====================
# Proses Load Fakta
# =====================
load_table('fact_sales')

print("✅ Load ke Data Warehouse selesai.\n")


OMG ----------------------------------------------

In [3]:
print("------------------- Extraction and Incremental Transformation -------------------")

import pandas as pd
from sqlalchemy import create_engine, text, inspect
from sqlalchemy.exc import SQLAlchemyError

# ------------------- Koneksi Database -------------------
hostname = "localhost"
port = 5432
username = "postgres"
password = "dataEngginer"
source_db = "adventureworks"
staging_db = "staggingDB"
dw_db = "adventureworksDw"

# Engine koneksi
source_engine = create_engine(f"postgresql+psycopg2://{username}:{password}@{hostname}:{port}/{source_db}")
staging_engine = create_engine(f"postgresql+psycopg2://{username}:{password}@{hostname}:{port}/{staging_db}")
dw_engine = create_engine(f"postgresql://{username}:{password}@{hostname}:{port}/{dw_db}")

# ------------------- Fungsi kontrol last processed date -------------------
def get_last_processed_date():
    try:
        query = "SELECT last_orderdate FROM etl_control WHERE id=1"
        with staging_engine.connect() as conn:
            result = conn.execute(text(query)).fetchone()
            if result and result[0]:
                return result[0]
            else:
                return None
    except Exception as e:
        # Jika tabel etl_control belum ada, buat dan insert row default
        with staging_engine.connect() as conn:
            conn.execute(text("""
                CREATE TABLE IF NOT EXISTS etl_control (
                    id INT PRIMARY KEY,
                    last_orderdate DATE
                )
            """))
            conn.execute(text("""
                INSERT INTO etl_control (id, last_orderdate) VALUES (1, NULL)
                ON CONFLICT (id) DO NOTHING
            """))
            conn.commit()
        return None

def update_last_processed_date(new_date):
    with staging_engine.connect() as conn:
        conn.execute(
            text("UPDATE etl_control SET last_orderdate = :new_date WHERE id=1"),
            {"new_date": new_date}
        )
        conn.commit()

# ------------------- Ambil last processed date -------------------
last_date = get_last_processed_date()
print(f"Last processed orderdate: {last_date}")

# ------------------- Extract incremental data -------------------
if last_date is None:
    # Ambil semua data
    query_sales = """
    SELECT soh.orderdate, soh.customerid, soh.territoryid, sod.productid,
           sod.orderqty, soh.totaldue
    FROM sales.salesorderdetail sod
    JOIN sales.salesorderheader soh ON sod.salesorderid = soh.salesorderid;
    """
else:
    # Ambil data baru yang orderdate > last_date
    query_sales = f"""
    SELECT soh.orderdate, soh.customerid, soh.territoryid, sod.productid,
           sod.orderqty, soh.totaldue
    FROM sales.salesorderdetail sod
    JOIN sales.salesorderheader soh ON sod.salesorderid = soh.salesorderid
    WHERE soh.orderdate > '{last_date}';
    """

df_sales = pd.read_sql(query_sales, source_engine)
if df_sales.empty:
    print("Tidak ada data baru untuk diproses.")
else:
    print(f"Data baru ditemukan: {len(df_sales)} baris.")

    # ------------------- Transform -------------------

    # Dim Customer
    query_customer = """
    SELECT c.customerid,
           pp.firstname || ' ' || COALESCE(pp.middlename || ' ', '') || pp.lastname AS customername
    FROM sales.customer c
    JOIN person.person pp ON c.personid = pp.businessentityid;
    """
    df_customer = pd.read_sql(query_customer, source_engine)
    df_customer['customerKey'] = range(1, len(df_customer)+1)

    # Dim Product
    query_product = """
    SELECT p.productid, pc.name AS productsubcategory, p.name AS productname
    FROM production.product p
    JOIN production.productsubcategory psc ON p.productsubcategoryid = psc.productsubcategoryid
    JOIN production.productcategory pc ON psc.productcategoryid = pc.productcategoryid;
    """
    df_product = pd.read_sql(query_product, source_engine)
    df_product['productKey'] = range(1, len(df_product)+1)

    # Dim Territory
    query_territory = """
    SELECT st.territoryid, sp.name AS provincename, cr.name AS countryregion
    FROM sales.salesterritory st
    JOIN person.stateprovince sp ON st.territoryID = sp.territoryID
    JOIN person.countryregion cr ON st.countryregioncode = cr.countryregioncode;
    """
    df_territory = pd.read_sql(query_territory, source_engine)
    df_territory['territoryKey'] = range(1, len(df_territory)+1)

    # Dim Time
    df_time = df_sales[['orderdate']].drop_duplicates()
    df_time['year'] = pd.to_datetime(df_time['orderdate']).dt.year
    df_time['month'] = pd.to_datetime(df_time['orderdate']).dt.month
    df_time['day'] = pd.to_datetime(df_time['orderdate']).dt.day
    df_time['timeKey'] = range(1, len(df_time)+1)

    # Join surrogate keys ke fact_sales
    df_sales = df_sales.merge(df_customer[['customerid', 'customerKey']], on='customerid', how='left')
    df_sales = df_sales.merge(df_product[['productid', 'productKey']], on='productid', how='left')
    df_sales = df_sales.merge(df_territory[['territoryid', 'territoryKey']], on='territoryid', how='left')
    df_sales = df_sales.merge(df_time[['orderdate', 'timeKey']], on='orderdate', how='left')

    # Agregasi fact_sales
    df_fact = df_sales.groupby(['customerKey', 'productKey', 'territoryKey', 'timeKey']).agg(
        totalQuantity=('orderqty', 'sum'),
        averageAmount=('totaldue', 'mean'),
        totalRevenue=('totaldue', 'sum')
    ).reset_index()
    df_fact['salesID'] = range(1, len(df_fact)+1)

    # ------------------- Load ke staging -------------------
    df_customer.to_sql('dim_customer', staging_engine, schema='star_schema', if_exists='replace', index=False)
    df_product.to_sql('dim_product', staging_engine, schema='star_schema', if_exists='replace', index=False)
    df_territory.to_sql('dim_territory', staging_engine, schema='star_schema', if_exists='replace', index=False)
    df_time[['timeKey', 'year', 'month', 'day']].to_sql('dim_time', staging_engine, schema='star_schema', if_exists='replace', index=False)
    df_fact.to_sql('fact_sales', staging_engine, schema='star_schema', if_exists='replace', index=False)

    print("Data berhasil ditransformasi dan dimuat ke staging.")

    # ------------------- Load ke Data Warehouse -------------------
    tables = ['dim_customer', 'dim_product', 'dim_territory', 'dim_time', 'fact_sales']

    def load_table(table_name):
        try:
            print(f"Loading table {table_name} ke Data Warehouse...")
            df = pd.read_sql(f'SELECT * FROM star_schema.{table_name}', staging_engine)

            inspector = inspect(dw_engine)
            if table_name in inspector.get_table_names():
                df.to_sql(table_name, dw_engine, if_exists='replace', index=False)
                print(f"Tabel {table_name} berhasil di-replace di Data Warehouse.")
            else:
                df.to_sql(table_name, dw_engine, if_exists='fail', index=False)
                print(f"Tabel {table_name} berhasil di-load di Data Warehouse.")
        except SQLAlchemyError as e:
            print(f"Gagal load tabel {table_name}: {e}")

    for table in tables:
        load_table(table)

    print("Load ke Data Warehouse selesai.")

    # ------------------- Update last processed date -------------------
    max_date = df_sales['orderdate'].max()
    update_last_processed_date(max_date)
    print(f"Last processed orderdate di-update ke: {max_date}")


------------------- Extraction and Incremental Transformation -------------------


ProgrammingError: (psycopg2.errors.InvalidSchemaName) no schema has been selected to create in
LINE 2:                 CREATE TABLE IF NOT EXISTS etl_control (
                                                   ^

[SQL: 
                CREATE TABLE IF NOT EXISTS etl_control (
                    id INT PRIMARY KEY,
                    last_orderdate DATE
                )
            ]
(Background on this error at: https://sqlalche.me/e/20/f405)

In [None]:
import pandas as pd
from sqlalchemy import create_engine
from datetime import datetime

# ------------------- Koneksi Database -------------------
hostname = "localhost"
port = 5432
username = "postgres"
password = "dataEngginer"
source_db = "adventureworks"
staging_db = "staggingDB"

source_engine = create_engine(f"postgresql+psycopg2://{username}:{password}@{hostname}:{port}/{source_db}")
staging_engine = create_engine(f"postgresql+psycopg2://{username}:{password}@{hostname}:{port}/{staging_db}")

# ------------------- PARAMETER BATAS WAKTU -------------------
# Ambil waktu terakhir dari staging (misal dari fact_sales, kolom orderdate)
try:
    last_time_query = "SELECT MAX(orderdate) FROM star_schema.fact_sales;"
    last_order_date = pd.read_sql(last_time_query, staging_engine).iloc[0, 0]
    if pd.isna(last_order_date):
        last_order_date = datetime(1900, 1, 1)  # jika kosong
except:
    last_order_date = datetime(1900, 1, 1)

print(f"Last order date from staging: {last_order_date}")

# ------------------- EXTRACT & TRANSFORM INCREMENTAL -------------------
query_sales = f"""
SELECT soh.orderdate, soh.customerid, soh.territoryid, sod.productid,
       sod.orderqty, soh.totaldue
FROM sales.salesorderdetail sod
JOIN sales.salesorderheader soh ON sod.salesorderid = soh.salesorderid
WHERE soh.orderdate > '{last_order_date}';
"""
df_sales = pd.read_sql(query_sales, source_engine)

if df_sales.empty:
    print("Tidak ada data baru untuk diproses.")
    exit()

# Dimensi
df_customer = pd.read_sql("SELECT customerid, personid FROM sales.customer", source_engine)
df_person = pd.read_sql("SELECT businessentityid, firstname, middlename, lastname FROM person.person", source_engine)
df_customer = df_customer.merge(df_person, left_on='personid', right_on='businessentityid', how='left')
df_customer['customername'] = df_customer['firstname'] + ' ' + df_customer['middlename'].fillna('') + ' ' + df_customer['lastname']
df_customer = df_customer[['customerid', 'customername']].drop_duplicates()

df_product = pd.read_sql("""
SELECT p.productid, pc.name AS productsubcategory, p.name AS productname
FROM production.product p
JOIN production.productsubcategory psc ON p.productsubcategoryid = psc.productsubcategoryid
JOIN production.productcategory pc ON psc.productcategoryid = pc.productcategoryid
""", source_engine)

df_territory = pd.read_sql("""
SELECT st.territoryid, sp.name AS provincename, cr.name AS countryregion
FROM sales.salesterritory st
JOIN person.stateprovince sp ON st.territoryID = sp.territoryID
JOIN person.countryregion cr ON st.countryregioncode = cr.countryregioncode;
""", source_engine)

# Time dimension
df_time = df_sales[['orderdate']].drop_duplicates()
df_time['year'] = pd.to_datetime(df_time['orderdate']).dt.year
df_time['month'] = pd.to_datetime(df_time['orderdate']).dt.month
df_time['day'] = pd.to_datetime(df_time['orderdate']).dt.day

# Simpan ke file sementara / staging
df_customer.to_csv("staging_dim_customer.csv", index=False)
df_product.to_csv("staging_dim_product.csv", index=False)
df_territory.to_csv("staging_dim_territory.csv", index=False)
df_time.to_csv("staging_dim_time.csv", index=False)
df_sales.to_csv("staging_fact_sales.csv", index=False)

print("Incremental extract & transform selesai. File CSV siap untuk load.")


In [None]:
print(f"Data baru dim_product sejak {last_etl_time}: {len(df_product)} rows")
print(df_product.head())
