In [None]:
import jaydebeapi
import psycopg2
import pandas as pd
import os
from io import StringIO

# Netezza connection settings.
# Every value can be overridden through an environment variable so that real
# credentials do not have to be stored in the notebook; the literal fallbacks
# are the values this cell previously hard-coded.
nz_host = os.environ.get("NZ_HOST", ".....")
nz_port = os.environ.get("NZ_PORT", "......")
nz_db = os.environ.get("NZ_DB", "DWHDBPRD")
nz_user = os.environ.get("NZ_USER", "..........")
nz_password = os.environ.get("NZ_PASSWORD", "......")
# Location of the Netezza JDBC driver jar (os.path.join with a single
# argument returned the string unchanged, so it is used directly).
jdbc_driver_loc = os.environ.get("NZ_JDBC_JAR", "D:\\nzjdbc.jar")

# PostgreSQL connection (load target). Opened here; the load block below is
# responsible for closing both the cursor and the connection.
pg_conn = psycopg2.connect(
    dbname=os.environ.get("PG_DBNAME", "......"),
    user=os.environ.get("PG_USER", "....."),
    password=os.environ.get("PG_PASSWORD", "......"),
    host=os.environ.get("PG_HOST", "localhost"),
    port=os.environ.get("PG_PORT", "5432")
)
pg_cursor = pg_conn.cursor()

# Name of the PostgreSQL table (re)built by this cell.
table_pq = "actb_yoy"

# Start from a known state. In a notebook, names such as ``conn_nz`` can
# survive from a previously executed cell, so a ``"conn_nz" in locals()``
# check may end up closing a stale object instead of this cell's connection.
conn_nz = None
cursor_nz = None

try:
    # Connect to Netezza through JDBC.
    conn_nz = jaydebeapi.connect(
        "org.netezza.Driver",
        f"jdbc:netezza://{nz_host}:{nz_port}/{nz_db}",
        [nz_user, nz_password],
        jdbc_driver_loc
    )
    cursor_nz = conn_nz.cursor()

    # Per-day, per-DRCR_IND transaction statistics for the previous year.
    # NOTE(review): the MONTH and DAY filters are applied independently, so a
    # row dated 31 January is excluded when today is 15 March. If the intent
    # is "previous year up to the same calendar date", this should be a single
    # date-range predicate -- confirm with the report owner before changing.
    query = """
        WITH actb_yoy AS (
            SELECT 
                TRN_DT,
                cust_gl,
                module,
                AC_ENTRY_SR_NO,
                AC_BRANCH,
                AC_NO,
                AC_CCY,
                CATEGORY,
                TRN_CODE,
                LCY_AMOUNT,
                USER_ID,
                PRODUCT,
                DRCR_IND
            FROM BMIDWH.ACTB_ALL_HISTORY_SWTB
            WHERE EXTRACT(YEAR FROM TRN_DT) = EXTRACT(YEAR FROM CURRENT_DATE) - 1
            AND EXTRACT(MONTH FROM TRN_DT) <= EXTRACT(MONTH FROM CURRENT_DATE)
            AND EXTRACT(DAY FROM TRN_DT) <= EXTRACT(DAY FROM CURRENT_DATE)
        )
        SELECT 
            COUNT(*) AS freq_trx, 
            SUM(LCY_AMOUNT) AS value_trx,
            SUM(LCY_AMOUNT) / NULLIF(COUNT(*), 0) AS mean_trx_val,
            ROUND(PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY LCY_AMOUNT)) AS median_trx_val,
            DRCR_IND, 
            TRN_DT, 
            RTRIM(TO_CHAR(TRN_DT, 'Day')) AS day_of_week
        FROM actb_yoy
        WHERE CUST_GL = 'A'
        AND MODULE = 'RT'
        AND ac_branch != '000'
        GROUP BY TRN_DT, DRCR_IND, RTRIM(TO_CHAR(TRN_DT, 'Day'))
        ORDER BY TRN_DT DESC;
    """

    # Run the aggregate exactly once and stream the result in batches.
    # The previous LIMIT/OFFSET paging re-ran the whole aggregation for every
    # page and ordered only by TRN_DT, which is not unique per output row
    # (one row per TRN_DT *and* DRCR_IND), so page boundaries were not stable
    # and rows could be duplicated or skipped.
    cursor_nz.execute(query)
    columns = [desc[0] for desc in cursor_nz.description]

    # Drop and recreate the target table. Nothing is committed until the whole
    # load has succeeded, so a failure leaves the previous table untouched.
    print(f"Menghapus tabel {table_pq} pada PostgreSQL...")
    pg_cursor.execute(f'DROP TABLE IF EXISTS "{table_pq}";')
    pg_cursor.execute(f'''
        CREATE TABLE "{table_pq}" (
            freq_trx INTEGER,
            value_trx NUMERIC,
            mean_trx_val NUMERIC,
            median_trx_val NUMERIC,
            drcr_ind VARCHAR(10),
            trn_dt DATE,
            day_of_week VARCHAR(20)
        );
    ''')
    print(f"Tabel {table_pq} telah dibuat ulang.")

    # Target columns, in the same order as the SELECT list above.
    target_columns = [
        "freq_trx",
        "value_trx",
        "mean_trx_val",
        "median_trx_val",
        "drcr_ind",
        "trn_dt",
        "day_of_week"
    ]
    column_list = ", ".join(f'"{name}"' for name in target_columns)
    # pandas writes CSV (fields containing the delimiter or a quote are wrapped
    # in double quotes), so the data must be loaded with COPY ... FORMAT csv.
    # copy_from() uses COPY's text format, where quotes are literal characters
    # and backslashes are escapes, which corrupts or rejects such rows.
    copy_sql = (
        f'COPY "{table_pq}" ({column_list}) '
        "FROM STDIN WITH (FORMAT csv, DELIMITER '|', NULL '')"
    )

    batch_size = 100000
    offset = 0  # running upper bound used only for the progress message

    while True:
        batch = cursor_nz.fetchmany(batch_size)
        if not batch:
            break  # result set exhausted

        df = pd.DataFrame(batch, columns=columns)

        # Strip NUL characters (PostgreSQL text cannot store them) and flatten
        # embedded line breaks to spaces, in a single pass over string cells.
        # DataFrame.map replaces the deprecated DataFrame.applymap.
        df = df.map(
            lambda x: x.replace("\x00", "").replace("\n", " ").replace("\r", " ")
            if isinstance(x, str) else x
        )

        # Serialise the batch to an in-memory CSV buffer; missing values are
        # written as empty fields, which COPY reads back as NULL.
        buffer = StringIO()
        df.to_csv(buffer, index=False, header=False, sep="|", na_rep="", quotechar='"')
        buffer.seek(0)

        pg_cursor.copy_expert(copy_sql, buffer)

        offset += batch_size
        print(f"Batch {offset} inserted.")

        if len(batch) < batch_size:
            break  # short batch: nothing left to fetch

    # Publish DROP + CREATE + all batches atomically.
    pg_conn.commit()

except Exception as e:
    print(f"Error: {e}")
    # Discard the uncommitted DROP/CREATE/COPY, then surface the failure so the
    # cell is visibly marked as failed instead of reporting success.
    pg_conn.rollback()
    raise

else:
    print("ELT selesai.")

finally:
    # Always release both database connections.
    if cursor_nz is not None:
        cursor_nz.close()
    if conn_nz is not None:
        conn_nz.close()
    pg_cursor.close()
    pg_conn.close()


Menghapus tabel actb_yoy pada PostgreSQL...
Tabel actb_yoy telah dibuat ulang.


  df = df.applymap(lambda x: x.replace("\x00", "") if isinstance(x, str) else x)
  df = df.applymap(lambda x: x.replace("\n", " ").replace("\r", " ") if isinstance(x, str) else x)


Batch 100000 inserted.
ELT selesai.


In [None]:
import jaydebeapi
import psycopg2
import pandas as pd
import os
from io import StringIO

# Netezza connection settings.
# Every value can be overridden through an environment variable so that real
# credentials do not have to be stored in the notebook; the literal fallbacks
# are the values this cell previously hard-coded.
nz_host = os.environ.get("NZ_HOST", ".....")
nz_port = os.environ.get("NZ_PORT", "5480")
nz_db = os.environ.get("NZ_DB", "DWHDBPRD")
nz_user = os.environ.get("NZ_USER", ".....")
nz_password = os.environ.get("NZ_PASSWORD", "......")
# Location of the Netezza JDBC driver jar (os.path.join with a single
# argument returned the string unchanged, so it is used directly).
jdbc_driver_loc = os.environ.get("NZ_JDBC_JAR", "D:\\nzjdbc.jar")

# PostgreSQL connection (load target). Opened here; the load block below is
# responsible for closing both the cursor and the connection.
pg_conn = psycopg2.connect(
    dbname=os.environ.get("PG_DBNAME", "......."),
    user=os.environ.get("PG_USER", "...."),
    password=os.environ.get("PG_PASSWORD", "Mu@......."),
    host=os.environ.get("PG_HOST", "localhost"),
    port=os.environ.get("PG_PORT", "5432")
)
pg_cursor = pg_conn.cursor()

# Name of the PostgreSQL table (re)built by this cell.
table_pq = "gen_debitur_cust"

# Start from a known state. In a notebook, names such as ``conn_nz`` can
# survive from a previously executed cell, so a ``"conn_nz" in locals()``
# check may end up closing a stale object instead of this cell's connection.
conn_nz = None
cursor_nz = None

try:
    # Connect to Netezza through JDBC.
    conn_nz = jaydebeapi.connect(
        "org.netezza.Driver",
        f"jdbc:netezza://{nz_host}:{nz_port}/{nz_db}",
        [nz_user, nz_password],
        jdbc_driver_loc
    )
    cursor_nz = conn_nz.cursor()

    # Consumer-debtor snapshot for one reporting period.
    # The query is executed once and streamed with fetchmany(). The previous
    # LIMIT/OFFSET paging ordered by PERIODE, which is constant under this
    # WHERE clause, so the row order (and therefore each page's content) was
    # arbitrary and rows could be duplicated or skipped across pages.
    query = """
        SELECT
            PERIODE,
            CEIILINGAMOUNT,
            LOANBALANCE,
            OPENDATE,
            MATURITYDATE,
            OVERDUEAMOUNT,
            OVERDUEINSTALLMENT,
            OVERDUEINTEREST,
            PENALTY,
            MONTHLYINSTALLMENT,
            TENOR,
            DATEOFBIRTH,
            INSTALLMENTDATE,
            DOWNPAYMENT,
            PENGHASILAN,
            MARITAL_STATUS,
            EDUCATION,
            CUSTOMERID,
            ACCOUNTNO,
            COLLECTIBILITY,
            FINANCINGTYPECODE,
            BRANCHCODE,
            OCCUPATION,
            GENDER,
            SEGMENTASI,
            COLLATERAL_CATEGORY
        FROM BMIRPT.GEN_DEBITUR_CONSUMER
        WHERE PERIODE = '202501';
    """
    cursor_nz.execute(query)

    # Lower-cased source column names double as the target column names.
    columns = [desc[0].lower() for desc in cursor_nz.description]

    # Drop and recreate the target table. Nothing is committed until the whole
    # load has succeeded, so a failure leaves the previous table untouched.
    print(f"Menghapus tabel {table_pq} pada PostgreSQL...")
    pg_cursor.execute(f'DROP TABLE IF EXISTS "{table_pq}";')
    pg_cursor.execute(f'''
        CREATE TABLE "{table_pq}" (
            periode VARCHAR(10),
            ceiilingamount FLOAT,
            loanbalance FLOAT,
            opendate DATE,
            maturitydate DATE,
            overdueamount FLOAT,
            overdueinstallment FLOAT,
            overdueinterest FLOAT,
            penalty FLOAT,
            monthlyinstallment FLOAT,
            tenor FLOAT,
            dateofbirth DATE,
            installmentdate DATE,
            downpayment FLOAT,
            penghasilan VARCHAR(255),
            marital_status VARCHAR(50),
            education VARCHAR(50),
            customerid VARCHAR(50),
            accountno VARCHAR(50),
            collectibility VARCHAR(50),
            financingtypecode VARCHAR(50),
            branchcode VARCHAR(50),
            occupation VARCHAR(100),
            gender VARCHAR(10),
            segmentasi VARCHAR(100),
            collateral_category VARCHAR(100)
        );
    ''')
    print(f"Tabel {table_pq} telah dibuat ulang.")

    column_list = ", ".join(f'"{name}"' for name in columns)
    # pandas writes CSV (fields containing the delimiter or a quote are wrapped
    # in double quotes), so the data must be loaded with COPY ... FORMAT csv.
    # copy_from() uses COPY's text format, where quotes are literal characters
    # and backslashes are escapes, which corrupts or rejects such rows.
    copy_sql = (
        f'COPY "{table_pq}" ({column_list}) '
        "FROM STDIN WITH (FORMAT csv, DELIMITER '|', NULL '')"
    )

    # Columns coerced to numeric / date before serialisation.
    numeric_cols = ["ceiilingamount", "loanbalance", "overdueamount", "overdueinstallment", "overdueinterest", "penalty", "monthlyinstallment", "downpayment", "tenor"]
    date_cols = ["opendate", "maturitydate", "dateofbirth", "installmentdate"]

    batch_size = 100000
    offset = 0  # running upper bound used only for the progress message

    while True:
        batch = cursor_nz.fetchmany(batch_size)
        if not batch:
            break  # result set exhausted

        df = pd.DataFrame(batch, columns=columns)

        # Strip NUL characters (PostgreSQL text cannot store them) and flatten
        # embedded line breaks to spaces, in a single pass over string cells.
        df = df.map(
            lambda x: x.replace("\x00", "").replace("\n", " ").replace("\r", " ")
            if isinstance(x, str) else x
        )

        # Values that cannot be parsed become NaN / NaT and are loaded as NULL.
        for col in numeric_cols:
            df[col] = pd.to_numeric(df[col], errors='coerce')

        # NOTE(review): errors='coerce' also nulls dates outside the range
        # pandas timestamps can represent (e.g. a '9999-12-31' sentinel) --
        # confirm the source never uses such values.
        for col in date_cols:
            df[col] = pd.to_datetime(df[col], errors='coerce').dt.date

        # Serialise the batch to an in-memory CSV buffer; missing values are
        # written as empty fields, which COPY reads back as NULL.
        buffer = StringIO()
        df.to_csv(buffer, index=False, header=False, sep="|", na_rep="", quotechar='"')
        buffer.seek(0)

        pg_cursor.copy_expert(copy_sql, buffer)

        offset += batch_size
        print(f"Batch {offset} inserted.")

        if len(batch) < batch_size:
            break  # short batch: nothing left to fetch

    # Publish DROP + CREATE + all batches atomically.
    pg_conn.commit()

except Exception as e:
    print(f"Error: {e}")
    # Discard the uncommitted DROP/CREATE/COPY, then surface the failure so the
    # cell is visibly marked as failed instead of reporting success.
    pg_conn.rollback()
    raise

else:
    print("ELT selesai.")

finally:
    # Always release both database connections.
    if cursor_nz is not None:
        cursor_nz.close()
    if conn_nz is not None:
        conn_nz.close()
    pg_cursor.close()
    pg_conn.close()

Menghapus tabel gen_debitur_cust pada PostgreSQL...
Tabel gen_debitur_cust telah dibuat ulang.
Batch 100000 inserted.
ELT selesai.


## Delete and Create

In [None]:
import jaydebeapi
import psycopg2
import pandas as pd
import os
from io import StringIO

# Netezza connection settings.
# Every value can be overridden through an environment variable so that real
# credentials do not have to be stored in the notebook; the literal fallbacks
# are the values this cell previously hard-coded.
nz_host = os.environ.get("NZ_HOST", "......")
nz_port = os.environ.get("NZ_PORT", "5480")
nz_db = os.environ.get("NZ_DB", ".....")
nz_user = os.environ.get("NZ_USER", "......")
nz_password = os.environ.get("NZ_PASSWORD", ".....")
# Location of the Netezza JDBC driver jar (os.path.join with a single
# argument returned the string unchanged, so it is used directly).
jdbc_driver_loc = os.environ.get("NZ_JDBC_JAR", "D:\\nzjdbc.jar")

# PostgreSQL connection (load target). Opened here; the load block below is
# responsible for closing both the cursor and the connection.
pg_conn = psycopg2.connect(
    dbname=os.environ.get("PG_DBNAME", "......"),
    user=os.environ.get("PG_USER", "......"),
    password=os.environ.get("PG_PASSWORD", "......"),
    host=os.environ.get("PG_HOST", "......"),
    port=os.environ.get("PG_PORT", "5432")
)
pg_cursor = pg_conn.cursor()

# Name of the PostgreSQL table (re)built by this cell.
table_pq = "neraca_cab_monitoring"

# Start from a known state. In a notebook, names such as ``conn_nz`` can
# survive from a previously executed cell, so a ``"conn_nz" in locals()``
# check may end up closing a stale object instead of this cell's connection.
conn_nz = None
cursor_nz = None

try:
    # Connect to Netezza through JDBC.
    conn_nz = jaydebeapi.connect(
        "org.netezza.Driver",
        f"jdbc:netezza://{nz_host}:{nz_port}/{nz_db}",
        [nz_user, nz_password],
        jdbc_driver_loc
    )
    cursor_nz = conn_nz.cursor()

    # Source query: one row per BI_CODE entry and MAIN_BRANCH (CROSS JOIN),
    # with balances taken from FINAL and defaulted to 0 when there is no match.
    query = """ 
    WITH BI_CODE AS (
        SELECT  PAR_VALUE_2, PAR_VALUE_3, MAX(PAR_VAL_MAP) PAR_VAL_MAP , MAX(PAR_VAL_MAP_2) PAR_VAL_MAP_2, MAX(PAR_SEQ) PAR_SEQ
        FROM STGDWH.MS_BUSINESS_PARAM_DTL
        WHERE PAR_CD IN ('AN003') AND LENGTH(PAR_VALUE_2) = 6
        GROUP BY PAR_VALUE_2, PAR_VALUE_3
    ),
    BRANCH AS (
        SELECT DISTINCT MAIN_BRANCH FROM BMIRPT.DATA_N_REGULATORY WHERE MAIN_BRANCH IS NOT null
    ),
    MASTER_DATA_N AS (
        SELECT TIME_SID, CBANK_LINE_DR, PAR_DESC, AREA, MAIN_BRANCH, BRANCH, BRANCH_CODE, T1.GL_CODE, T1.GL_DESC, CURRENCY_CODE, CATEGORY,
        BALANCE_ORI*SIGN BALANCE_ORI, BALANCE_LCY*SIGN BALANCE_LCY, DTD_LCY*SIGN DTD_LCY, MTD_LCY*SIGN MTD_LCY, YTD_LCY*SIGN YTD_LCY, YOY_LCY*SIGN YOY_LCY
        FROM BMIRPT.DATA_N_REGULATORY T1
        LEFT JOIN (
            SELECT GL_CODE, GL_DESC, CASE WHEN PL_SPLIT_REQD = 'Y' THEN '-1' ELSE '1' END SIGN
            FROM REFDWH.GLTM_GLMASTER
        ) T2 ON T1.GL_CODE = T2.GL_CODE
    ),
    master AS (
        SELECT a.time_sid, A.MAIN_BRANCH, b.par_value_2, b.par_value_3,
        ROUND(COALESCE(SUM(a.BALANCE_LCY/1000),0),0) AS BALANCE_LCY,
        ROUND(COALESCE(SUM(a.DTD_LCY)/1000,0),0) AS DTD_LCY,
        ROUND(COALESCE(SUM(a.MTD_LCY)/1000,0),0) AS MTD_LCY,
        ROUND(COALESCE(SUM(a.YTD_LCY)/1000,0),0) AS YTD_LCY,
        ROUND(COALESCE(SUM(a.YOY_LCY)/1000,0),0) AS YOY_LCY  
        FROM (
            SELECT DISTINCT a.CBANK_LINE_DR, b.PAR_value_2, b.par_value_3
            FROM REFDWH.GLTM_GLMASTER a
            JOIN STGDWH.MS_BUSINESS_PARAM_DTL b ON REPLACE(a.CBANK_LINE_DR,'.','') = REPLACE(b.PAR_VALUE,'.','')
                AND PAR_CD IN ('AN003')
            WHERE leaf = 'Y' AND CBANK_LINE_DR IS NOT NULL
        ) b
        LEFT JOIN MASTER_DATA_N a ON a.CBANK_LINE_DR = b.CBANK_LINE_DR
        WHERE 1=1
        GROUP BY a.TIME_SID, A.MAIN_BRANCH, b.par_value_2, b.par_value_3
    ),
    get_time_sid AS (
        SELECT MAX(time_sid) AS report_date FROM master WHERE time_sid IS NOT NULL
    ),
    FINAL AS (
        SELECT
        COALESCE(T1.time_sid, (SELECT report_date FROM get_time_sid)) AS time_sid,
        MAIN_BRANCH,
        CASE WHEN T2.PAR_VALUE_2 IS NOT NULL AND T1.BALANCE_LCY < 0 THEN T2.PAR_VALUE_2 ELSE T1.PAR_VALUE_2 END PAR_VALUE_2,
        CASE WHEN T2.PAR_VALUE_2 IS NOT NULL AND T1.BALANCE_LCY < 0 THEN T2.PAR_VALUE_3 ELSE T1.PAR_VALUE_3 END PAR_VALUE_3,
        T1.PAR_VALUE_3,
        CASE WHEN T2.PAR_VALUE_2 IS NOT NULL AND T1.BALANCE_LCY < 0 THEN T1.BALANCE_LCY*-1 ELSE T1.BALANCE_LCY END BALANCE_LCY,
        CASE WHEN T2.PAR_VALUE_2 IS NOT NULL AND T1.BALANCE_LCY < 0 THEN 0 ELSE T1.DTD_LCY END DTD_LCY,
        CASE WHEN T2.PAR_VALUE_2 IS NOT NULL AND T1.BALANCE_LCY < 0 THEN 0 ELSE T1.MTD_LCY END MTD_LCY,
        CASE WHEN T2.PAR_VALUE_2 IS NOT NULL AND T1.BALANCE_LCY < 0 THEN 0 ELSE T1.YTD_LCY END YTD_LCY,
        CASE WHEN T2.PAR_VALUE_2 IS NOT NULL AND T1.BALANCE_LCY < 0 THEN 0 ELSE T1.YOY_LCY END YOY_LCY
        FROM master T1
        LEFT JOIN BI_CODE T2 ON T1.PAR_VALUE_2 = T2.PAR_VAL_MAP_2
    ),
    neraca_cab AS (
        SELECT 
            (SELECT report_date FROM get_time_sid) AS time_sid,
            T2.MAIN_BRANCH,
            T1.PAR_VALUE_2,
            T1.PAR_VALUE_3,
            NVL(T3.BALANCE_LCY, 0) BALANCE_LCY,
            NVL(T3.DTD_LCY, 0) DTD_LCY,
            NVL(T3.MTD_LCY, 0) MTD_LCY,
            NVL(T3.YTD_LCY, 0) YTD_LCY,
            NVL(T3.YOY_LCY, 0) YOY_LCY,
            CASE TRIM(SUBSTR(T1.PAR_VALUE_2, 1, 4))
                WHEN '0411' THEN 'PENDAPATAN OPERASI UTAMA'
                WHEN '0511' THEN 'BEBAN OPERASI UTAMA'
                WHEN '0612' THEN 'PENDAPATAN OPERASI LAINNYA'
                WHEN '0712' THEN 'BEBAN OPERASI LAINNYA'
                WHEN '0820' THEN 'PENDAPATAN NON OPERASIONAL'
                WHEN '0920' THEN 'BEBAN NON OPERASIONAL'
                ELSE 'UNMAP'
            END AS KATEGORI
        FROM BI_CODE T1
        CROSS JOIN BRANCH T2
        LEFT JOIN FINAL T3 ON T1.PAR_VALUE_2 = T3.PAR_VALUE_2 AND T2.MAIN_BRANCH = T3.MAIN_BRANCH
    )
    SELECT 
        TIME_SID,
        MAIN_BRANCH,
        PAR_VALUE_3 AS KATEGORI_TRANSAKSI,
        KATEGORI AS KATEGORI_PENCATATAN,
        BALANCE_LCY,
        DTD_LCY,
        MTD_LCY,
        YTD_LCY,
        YOY_LCY
    FROM neraca_cab
    """

    # Extract first: if the Netezza query fails, the existing PostgreSQL table
    # has not been touched yet.
    cursor_nz.execute(query)
    rows = cursor_nz.fetchall()
    # Lower-cased source column names double as the target column names.
    columns = [desc[0].lower() for desc in cursor_nz.description]

    df = pd.DataFrame(rows, columns=columns)

    # Drop and recreate the target table. DROP, CREATE and COPY run in one
    # transaction, so a failed load rolls back to the previous table contents.
    print(f"Menghapus tabel {table_pq} pada PostgreSQL...")
    pg_cursor.execute(f'DROP TABLE IF EXISTS "{table_pq}";')
    pg_cursor.execute(f'''
        CREATE TABLE "{table_pq}" (
            time_sid INTEGER,
            main_branch VARCHAR(50),
            kategori_transaksi VARCHAR(100),
            kategori_pencatatan VARCHAR(100),
            balance_lcy FLOAT,
            dtd_lcy FLOAT,
            mtd_lcy FLOAT,
            ytd_lcy FLOAT,
            yoy_lcy FLOAT
        );
    ''')
    print(f"Tabel {table_pq} telah dibuat ulang.")

    # Serialise to an in-memory CSV buffer; missing values are written as empty
    # fields, which COPY reads back as NULL.
    buffer = StringIO()
    df.to_csv(buffer, index=False, header=False, sep="|", na_rep="", quotechar='"')
    buffer.seek(0)

    # pandas writes CSV (fields containing the delimiter or a quote are wrapped
    # in double quotes), so the data must be loaded with COPY ... FORMAT csv.
    # copy_from() uses COPY's text format, where quotes are literal characters
    # and backslashes are escapes, which corrupts or rejects such rows.
    column_list = ", ".join(f'"{name}"' for name in df.columns.tolist())
    copy_sql = (
        f'COPY "{table_pq}" ({column_list}) '
        "FROM STDIN WITH (FORMAT csv, DELIMITER '|', NULL '')"
    )
    pg_cursor.copy_expert(copy_sql, buffer)
    pg_conn.commit()
    print(f"Data berhasil dimasukkan ke tabel {table_pq}.")

except Exception as e:
    print(f"Error: {e}")
    # Discard the uncommitted DROP/CREATE/COPY, then surface the failure so the
    # cell is visibly marked as failed instead of reporting success.
    pg_conn.rollback()
    raise

else:
    print("ELT selesai.")

finally:
    # Always release both database connections.
    if cursor_nz is not None:
        cursor_nz.close()
    if conn_nz is not None:
        conn_nz.close()
    pg_cursor.close()
    pg_conn.close()


Menghapus tabel neraca_cab_monitoring pada PostgreSQL...
Tabel neraca_cab_monitoring telah dibuat ulang.
Data berhasil dimasukkan ke tabel neraca_cab_monitoring.
ELT selesai.


## Append

In [None]:
import jaydebeapi
import psycopg2
import pandas as pd
import os
from io import StringIO

# Netezza connection settings.
# Every value can be overridden through an environment variable so that real
# credentials do not have to be stored in the notebook; the literal fallbacks
# are the values this cell previously hard-coded.
nz_host = os.environ.get("NZ_HOST", "......")
nz_port = os.environ.get("NZ_PORT", ".....")
nz_db = os.environ.get("NZ_DB", "DWHDBPRD")
nz_user = os.environ.get("NZ_USER", "......")
nz_password = os.environ.get("NZ_PASSWORD", ".......")
# Location of the Netezza JDBC driver jar (os.path.join with a single
# argument returned the string unchanged, so it is used directly).
jdbc_driver_loc = os.environ.get("NZ_JDBC_JAR", "D:\\nzjdbc.jar")

# PostgreSQL connection (load target). Opened here; the load block below is
# responsible for closing both the cursor and the connection.
pg_conn = psycopg2.connect(
    dbname=os.environ.get("PG_DBNAME", "....."),
    user=os.environ.get("PG_USER", "....."),
    password=os.environ.get("PG_PASSWORD", "....."),
    host=os.environ.get("PG_HOST", "1...."),
    port=os.environ.get("PG_PORT", "5432")
)
pg_cursor = pg_conn.cursor()

# Name of the PostgreSQL table this cell appends to.
table_pq = "neraca_cab_monitoring"

# Start from a known state. In a notebook, names such as ``conn_nz`` can
# survive from a previously executed cell, so a ``"conn_nz" in locals()``
# check may end up closing a stale object instead of this cell's connection.
conn_nz = None
cursor_nz = None

try:
    # Connect to Netezza through JDBC.
    conn_nz = jaydebeapi.connect(
        "org.netezza.Driver",
        f"jdbc:netezza://{nz_host}:{nz_port}/{nz_db}",
        [nz_user, nz_password],
        jdbc_driver_loc
    )
    cursor_nz = conn_nz.cursor()

    # Source query: one row per BI_CODE entry and MAIN_BRANCH (CROSS JOIN),
    # with balances taken from FINAL and defaulted to 0 when there is no match.
    print("Menjalankan query ke Netezza...")
    query = """ 
    WITH BI_CODE AS (
        SELECT  PAR_VALUE_2, PAR_VALUE_3, MAX(PAR_VAL_MAP) PAR_VAL_MAP , MAX(PAR_VAL_MAP_2) PAR_VAL_MAP_2, MAX(PAR_SEQ) PAR_SEQ
        FROM STGDWH.MS_BUSINESS_PARAM_DTL
        WHERE PAR_CD IN ('AN003') AND LENGTH(PAR_VALUE_2) = 6
        GROUP BY PAR_VALUE_2, PAR_VALUE_3
    ),
    BRANCH AS (
        SELECT DISTINCT MAIN_BRANCH FROM BMIRPT.DATA_N_REGULATORY WHERE MAIN_BRANCH IS NOT null
    ),
    MASTER_DATA_N AS (
        SELECT TIME_SID, CBANK_LINE_DR, PAR_DESC, AREA, MAIN_BRANCH, BRANCH, BRANCH_CODE, T1.GL_CODE, T1.GL_DESC, CURRENCY_CODE, CATEGORY,
        BALANCE_ORI*SIGN BALANCE_ORI, BALANCE_LCY*SIGN BALANCE_LCY, DTD_LCY*SIGN DTD_LCY, MTD_LCY*SIGN MTD_LCY, YTD_LCY*SIGN YTD_LCY, YOY_LCY*SIGN YOY_LCY
        FROM BMIRPT.DATA_N_REGULATORY T1
        LEFT JOIN (
            SELECT GL_CODE, GL_DESC, CASE WHEN PL_SPLIT_REQD = 'Y' THEN '-1' ELSE '1' END SIGN
            FROM REFDWH.GLTM_GLMASTER
        ) T2 ON T1.GL_CODE = T2.GL_CODE
    ),
    master AS (
        SELECT a.time_sid, A.MAIN_BRANCH, b.par_value_2, b.par_value_3,
        ROUND(COALESCE(SUM(a.BALANCE_LCY/1000),0),0) AS BALANCE_LCY,
        ROUND(COALESCE(SUM(a.DTD_LCY)/1000,0),0) AS DTD_LCY,
        ROUND(COALESCE(SUM(a.MTD_LCY)/1000,0),0) AS MTD_LCY,
        ROUND(COALESCE(SUM(a.YTD_LCY)/1000,0),0) AS YTD_LCY,
        ROUND(COALESCE(SUM(a.YOY_LCY)/1000,0),0) AS YOY_LCY  
        FROM (
            SELECT DISTINCT a.CBANK_LINE_DR, b.PAR_value_2, b.par_value_3
            FROM REFDWH.GLTM_GLMASTER a
            JOIN STGDWH.MS_BUSINESS_PARAM_DTL b ON REPLACE(a.CBANK_LINE_DR,'.','') = REPLACE(b.PAR_VALUE,'.','')
                AND PAR_CD IN ('AN003')
            WHERE leaf = 'Y' AND CBANK_LINE_DR IS NOT NULL
        ) b
        LEFT JOIN MASTER_DATA_N a ON a.CBANK_LINE_DR = b.CBANK_LINE_DR
        WHERE 1=1
        GROUP BY a.TIME_SID, A.MAIN_BRANCH, b.par_value_2, b.par_value_3
    ),
    get_time_sid AS (
        SELECT MAX(time_sid) AS report_date FROM master WHERE time_sid IS NOT NULL
    ),
    FINAL AS (
        SELECT
        COALESCE(T1.time_sid, (SELECT report_date FROM get_time_sid)) AS time_sid,
        MAIN_BRANCH,
        CASE WHEN T2.PAR_VALUE_2 IS NOT NULL AND T1.BALANCE_LCY < 0 THEN T2.PAR_VALUE_2 ELSE T1.PAR_VALUE_2 END PAR_VALUE_2,
        CASE WHEN T2.PAR_VALUE_2 IS NOT NULL AND T1.BALANCE_LCY < 0 THEN T2.PAR_VALUE_3 ELSE T1.PAR_VALUE_3 END PAR_VALUE_3,
        T1.PAR_VALUE_3,
        CASE WHEN T2.PAR_VALUE_2 IS NOT NULL AND T1.BALANCE_LCY < 0 THEN T1.BALANCE_LCY*-1 ELSE T1.BALANCE_LCY END BALANCE_LCY,
        CASE WHEN T2.PAR_VALUE_2 IS NOT NULL AND T1.BALANCE_LCY < 0 THEN 0 ELSE T1.DTD_LCY END DTD_LCY,
        CASE WHEN T2.PAR_VALUE_2 IS NOT NULL AND T1.BALANCE_LCY < 0 THEN 0 ELSE T1.MTD_LCY END MTD_LCY,
        CASE WHEN T2.PAR_VALUE_2 IS NOT NULL AND T1.BALANCE_LCY < 0 THEN 0 ELSE T1.YTD_LCY END YTD_LCY,
        CASE WHEN T2.PAR_VALUE_2 IS NOT NULL AND T1.BALANCE_LCY < 0 THEN 0 ELSE T1.YOY_LCY END YOY_LCY
        FROM master T1
        LEFT JOIN BI_CODE T2 ON T1.PAR_VALUE_2 = T2.PAR_VAL_MAP_2
    ),
    neraca_cab AS (
        SELECT 
            (SELECT report_date FROM get_time_sid) AS time_sid,
            T2.MAIN_BRANCH,
            T1.PAR_VALUE_2,
            T1.PAR_VALUE_3,
            NVL(T3.BALANCE_LCY, 0) BALANCE_LCY,
            NVL(T3.DTD_LCY, 0) DTD_LCY,
            NVL(T3.MTD_LCY, 0) MTD_LCY,
            NVL(T3.YTD_LCY, 0) YTD_LCY,
            NVL(T3.YOY_LCY, 0) YOY_LCY,
            CASE TRIM(SUBSTR(T1.PAR_VALUE_2, 1, 4))
                WHEN '0411' THEN 'PENDAPATAN OPERASI UTAMA'
                WHEN '0511' THEN 'BEBAN OPERASI UTAMA'
                WHEN '0612' THEN 'PENDAPATAN OPERASI LAINNYA'
                WHEN '0712' THEN 'BEBAN OPERASI LAINNYA'
                WHEN '0820' THEN 'PENDAPATAN NON OPERASIONAL'
                WHEN '0920' THEN 'BEBAN NON OPERASIONAL'
                ELSE 'UNMAP'
            END AS KATEGORI
        FROM BI_CODE T1
        CROSS JOIN BRANCH T2
        LEFT JOIN FINAL T3 ON T1.PAR_VALUE_2 = T3.PAR_VALUE_2 AND T2.MAIN_BRANCH = T3.MAIN_BRANCH
    )
    SELECT 
        TIME_SID,
        MAIN_BRANCH,
        PAR_VALUE_3 AS KATEGORI_TRANSAKSI,
        KATEGORI AS KATEGORI_PENCATATAN,
        BALANCE_LCY,
        DTD_LCY,
        MTD_LCY,
        YTD_LCY,
        YOY_LCY
    FROM neraca_cab
    """
    cursor_nz.execute(query)
    rows = cursor_nz.fetchall()
    # Lower-cased source column names double as the target column names.
    columns = [desc[0].lower() for desc in cursor_nz.description]

    df = pd.DataFrame(rows, columns=columns)

    if df.empty:
        print("Tidak ada data yang diambil dari Netezza.")
    else:
        # Serialise to an in-memory CSV buffer; missing values are written as
        # empty fields, which COPY reads back as NULL.
        buffer = StringIO()
        df.to_csv(buffer, index=False, header=False, sep="|", na_rep="", quotechar='"')
        buffer.seek(0)

        # pandas writes CSV (fields containing the delimiter or a quote are
        # wrapped in double quotes), so the data must be loaded with
        # COPY ... FORMAT csv. copy_from() uses COPY's text format, where
        # quotes are literal characters and backslashes are escapes, which
        # corrupts or rejects such rows.
        column_list = ", ".join(f'"{name}"' for name in df.columns.tolist())
        copy_sql = (
            f'COPY "{table_pq}" ({column_list}) '
            "FROM STDIN WITH (FORMAT csv, DELIMITER '|', NULL '')"
        )

        # Append to the existing table (no DROP/DELETE is issued here).
        # NOTE(review): running this cell twice for the same TIME_SID appends
        # the same snapshot twice -- confirm whether existing rows for that
        # TIME_SID should be deleted first.
        print(f"Menambahkan data ke tabel {table_pq}...")
        pg_cursor.copy_expert(copy_sql, buffer)
        pg_conn.commit()
        print(f"{len(df)} baris data berhasil dimasukkan ke tabel {table_pq}.")

except Exception as e:
    print(f"Error: {e}")
    # Discard any uncommitted COPY, then surface the failure so the cell is
    # visibly marked as failed instead of reporting success.
    pg_conn.rollback()
    raise

else:
    print("ELT selesai.")

finally:
    # Always release both database connections.
    if cursor_nz is not None:
        cursor_nz.close()
    if conn_nz is not None:
        conn_nz.close()
    pg_cursor.close()
    pg_conn.close()


Menjalankan query ke Netezza...
Menambahkan data ke tabel neraca_cab_monitoring...
3240 baris data berhasil dimasukkan ke tabel neraca_cab_monitoring.
ELT selesai.
