In [32]:
import duckdb

In [33]:
con = duckdb.connect('../localdatabase.db')
con.sql("INSTALL httpfs; LOAD httpfs;")
con.sql("INSTALL ducklake; LOAD ducklake;")

In [34]:
minioAuth = {
    'accessKey': 'minioadmin',
    'secretKey': 'miniopassword',
    'bucket': 'marketing-and-ecommerce',
    'endpoint': 'localhost:9000'
}

# --- 1. Konfigurasi MinIO (S3 API) ---
con.sql(f"SET s3_endpoint = '{minioAuth['endpoint']}';") 
con.sql(f"SET s3_access_key_id = '{minioAuth['accessKey']}';")
con.sql(f"SET s3_secret_access_key = '{minioAuth['secretKey']}';")
con.sql("SET s3_use_ssl = false;")
con.sql("SET s3_url_style = 'path';")

In [35]:
managedStoredDataPath = f"s3://{minioAuth['bucket']}/managedStoreData/" 
dbPath = '../localstagingdatabase.db'

In [36]:
con.sql(f"ATTACH 'ducklake:{dbPath}' AS datalakehouse (DATA_PATH '{managedStoredDataPath}');")
con.sql("CREATE SCHEMA IF NOT EXISTS datalakehouse.raw;") 
con.sql("CREATE SCHEMA IF NOT EXISTS datalakehouse.staging;")
con.sql("CREATE SCHEMA IF NOT EXISTS datalakehouse.model;") 

In [37]:
# List semua operasi CTAS yang akan dieksekusi (diasumsikan skema lama berada di katalog yang sama)
migration_queries = [
    # RAW (datamarketingseeds) -> datalakehouse.raw
    # Perbaikan: Mengganti skema lama menjadi datamarketingseeds.*
    "CREATE OR REPLACE TABLE datalakehouse.raw.campaigns AS SELECT * FROM datamarketingseeds.campaigns;",
    "CREATE OR REPLACE TABLE datalakehouse.raw.customers AS SELECT * FROM datamarketingseeds.customers;",
    "CREATE OR REPLACE TABLE datalakehouse.raw.events AS SELECT * FROM datamarketingseeds.events;",
    "CREATE OR REPLACE TABLE datalakehouse.raw.products AS SELECT * FROM datamarketingseeds.products;",
    "CREATE OR REPLACE TABLE datalakehouse.raw.transactions AS SELECT * FROM datamarketingseeds.transactions;",
    
    # STAGING (datamarketingclean) -> datalakehouse.staging
    # Perbaikan: Mengganti skema lama menjadi datamarketingclean.*
    "CREATE OR REPLACE TABLE datalakehouse.staging.campaigns AS SELECT * FROM datamarketingclean.campaigns;",
    "CREATE OR REPLACE TABLE datalakehouse.staging.customers AS SELECT * FROM datamarketingclean.customers;",
    "CREATE OR REPLACE TABLE datalakehouse.staging.events AS SELECT * FROM datamarketingclean.events;",
    "CREATE OR REPLACE TABLE datalakehouse.staging.products AS SELECT * FROM datamarketingclean.products;",
    "CREATE OR REPLACE TABLE datalakehouse.staging.transactions AS SELECT * FROM datamarketingclean.transactions;",
    
    # MODEL (datamodelling) -> datalakehouse.model
    # Perbaikan: Mengganti skema lama menjadi datamodelling.*
    "CREATE OR REPLACE TABLE datalakehouse.model.transactions_net_revenue AS SELECT * FROM datamodelling.transactions_net_revenue;",
    "CREATE OR REPLACE TABLE datalakehouse.model.customer_rfm AS SELECT * FROM datamodelling.customer_rfm;",
]

# Eksekusi kueri
print("\nüîÑ Mulai memigrasikan 12 tabel menggunakan CTAS (mode tunggal DB)...")
try:
    # --- Asumsi: Inisialisasi Koneksi ---
    # Jika Anda hanya menggunakan satu file database untuk semuanya:
    # con = duckdb.connect(database="localdatabase.db") 
    # con.sql("CREATE SCHEMA IF NOT EXISTS datalakehouse.raw;")
    # ... dst
    
    for query in migration_queries:
        con.sql(query)
        # Ambil nama skema.tabel untuk logging
        target_table_full = query.split(" ")[5]
        print(f"   - Migrasi berhasil: {target_table_full}")
    
    print("\nüéâ Migrasi 12 tabel selesai!")
    
    # Verifikasi jumlah total tabel
    catalog_name = "datalakehouse" # Anggap 'datalakehouse' adalah katalog Anda
    
    total_tables = con.sql(f"""
        SELECT count(*) 
        FROM information_schema.tables 
        WHERE table_catalog = '{catalog_name}' 
        AND table_schema IN ('raw', 'staging', 'model');
    """).fetchone()[0]
    
    print(f"üîé Total tabel yang termigrasi di Katalog **{catalog_name}**: **{total_tables}**")

except Exception as e:
    print(f"‚ùå Terjadi kesalahan saat migrasi: {e}")

finally:
    # con.close()
    print("Koneksi ditutup (asumsi dilakukan di luar skrip ini).")


üîÑ Mulai memigrasikan 12 tabel menggunakan CTAS (mode tunggal DB)...
   - Migrasi berhasil: AS
   - Migrasi berhasil: AS


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

   - Migrasi berhasil: AS
   - Migrasi berhasil: AS
   - Migrasi berhasil: AS
   - Migrasi berhasil: AS
   - Migrasi berhasil: AS


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

   - Migrasi berhasil: AS
   - Migrasi berhasil: AS
   - Migrasi berhasil: AS
   - Migrasi berhasil: AS
   - Migrasi berhasil: AS

üéâ Migrasi 12 tabel selesai!
üîé Total tabel yang termigrasi di Katalog **datalakehouse**: **12**
Koneksi ditutup (asumsi dilakukan di luar skrip ini).


In [38]:
con.close()