# Python Script to Handle & Ingest .xlsx to Local Postgres DB

### Install Dependencies

In [1]:
import subprocess
import os


with open(os.devnull, 'wb') as devnull:
    subprocess.run(
        ['pip', 'install', '-r', '../requirements.txt'],
        stdout=devnull,
        stderr=devnull
    )

In [2]:
import sys

# Tambahkan folder root ke sys.path
sys.path.append('/Users/naufalnashif/Desktop/RF-OJK/Mas Adit OJK/Data Profile & Riwayat Entitiy Terbaru')

In [3]:
import pandas as pd
from sqlalchemy import create_engine, text
from config import FOLDER_PATH, APPLICATION_FILES, SCHEMA_NAME, DB_CONFIG, UPDATED_DATE
from ingest_helpers import load_excel_sheets, standardize_columns
import re

### Connect to DB

In [4]:
# DB Engine
engine = create_engine(
    f"postgresql://{DB_CONFIG['user']}:{DB_CONFIG['password']}@{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['database']}"
)

#### Make a new schema

In [5]:
print(SCHEMA_NAME)

rfojk_python_final


In [None]:
# SCHEMA_NAME = "rfojk_python_test"

In [6]:
with engine.connect() as conn:
    conn.execute(text(f'CREATE SCHEMA IF NOT EXISTS "{SCHEMA_NAME}"'))
    conn.commit()

### .Xlsx > Sheet > To tabel > Ingest to DB

#### Script Baru

##### Drop view

In [11]:
import os
import psycopg2

# Ambil konfigurasi DB dari environment
DB_CONFIG = {
    'dbname': os.getenv('POSTGRES_DB'),
    'user': os.getenv('POSTGRES_USER'),
    'password': os.getenv('POSTGRES_PASSWORD'),
    'host': os.getenv('POSTGRES_HOST'),
    'port': os.getenv('POSTGRES_PORT')
}

# Validasi semua config tidak None
for k, v in DB_CONFIG.items():
    if v is None:
        raise ValueError(f"Missing value for DB config: {k}")

# Nama schema
SCHEMA_NAME = "rfojk_python_final"

# List view yang ingin di-drop
views_to_drop = [
    "vw_profil_entity_union",
    "vw_riwayat_pendirian_union",
    "vw_riwayat_direksi_komisaris_union",
    "vw_riwayat_pemegang_saham_union",
    "vw_riwayat_produk_aktivitas_union",
    "vw_riwayat_dps_union"
]

# Generate DROP VIEW statements
drop_scripts = [
    f'DROP VIEW IF EXISTS {SCHEMA_NAME}."{view}";'
    for view in views_to_drop
]

# Eksekusi drop script
try:
    conn = psycopg2.connect(**DB_CONFIG)
    cur = conn.cursor()
    
    for script in drop_scripts:
        print(f"Executing: {script}")
        cur.execute(script)
    
    conn.commit()
    print("✅ All views dropped successfully.")

except Exception as e:
    print("❌ Error during dropping views:", e)
    if conn:
        conn.rollback()

finally:
    if cur:
        cur.close()
    if conn:
        conn.close()

Executing: DROP VIEW IF EXISTS rfojk_python_final."vw_profil_entity_union";
Executing: DROP VIEW IF EXISTS rfojk_python_final."vw_riwayat_pendirian_union";
Executing: DROP VIEW IF EXISTS rfojk_python_final."vw_riwayat_direksi_komisaris_union";
Executing: DROP VIEW IF EXISTS rfojk_python_final."vw_riwayat_pemegang_saham_union";
Executing: DROP VIEW IF EXISTS rfojk_python_final."vw_riwayat_produk_aktivitas_union";
Executing: DROP VIEW IF EXISTS rfojk_python_final."vw_riwayat_dps_union";
✅ All views dropped successfully.


In [12]:
for file_name in APPLICATION_FILES:
    app_name = re.sub(r'(_?\d{8})?\.xlsx$', '', file_name).lower()
    full_path = os.path.join(FOLDER_PATH, file_name)
    print(f"📄 Processing {file_name}")

    sheets = load_excel_sheets(full_path)

    sheet_mapping = {
        'profil_entity': sheets.get(list(sheets)[0]),
        'riwayat_pendirian': sheets.get(list(sheets)[1]),
        'riwayat_direksi': sheets.get(list(sheets)[2]),
        'riwayat_komisaris': sheets.get(list(sheets)[3]),
        'riwayat_pemegang_saham': sheets.get(list(sheets)[4]),
        'riwayat_produk_aktivitas': sheets.get(list(sheets)[5]),
        'riwayat_dps': sheets.get(list(sheets)[6])
    }

    for sheet_key, df in sheet_mapping.items():
        if df is not None:
            df = standardize_columns(df)
            df.columns = df.columns.str.lower()  # 🔽 Pastikan semua kolom lowercase
            df.insert(0, 'application', app_name)

            # 🔍 Filter khusus untuk 'riwayat_pendirian'
            if sheet_key == 'riwayat_pendirian':
                app_upper = app_name.upper()

                df = df[
                    (
                        (app_upper == 'DAPOK') &
                        (df['sector'].str.strip() == 'Perbankan') &
                        (df['licensetype'].str.strip().isin(['Izin Usaha']))
                    )
                    |
                    # (
                    #     (app_upper == 'APKAP') &
                    #     (df['sector'].str.strip() == 'Pasar Modal') &
                    #     (df['licensetype'].str.strip().isin(['Izin STTD', 'Izin Usaha']))
                    # )
                    # |
                    (
                        (app_upper == 'SPRINT') &
                        (df['licensetype'].str.strip() == 'Izin Utama - Pendaftaran Izin') &
                        (df['sector'].str.strip().isin([
                            'Lembaga Pembiayaan Perusahaan Modal Ventura LKM dan LJK Lainnya',
                            'Perasuransian Penjaminan dan Dana Pensiun',
                            'Inovasi Teknologi Sektor Keuangan serta Aset Keuangan Digital dan Aset Kripto'
                        ]))
                    )
                ]

            table_name = f"{sheet_key}_{app_name}".lower()

            df.to_sql(
                table_name,
                engine,
                schema=SCHEMA_NAME,
                if_exists='replace',
                index=False
            )
            print(f"✅ Uploaded to table: {SCHEMA_NAME}.{table_name}")

📄 Processing APKAP.xlsx
✅ Uploaded to table: rfojk_python_final.profil_entity_apkap
✅ Uploaded to table: rfojk_python_final.riwayat_pendirian_apkap
✅ Uploaded to table: rfojk_python_final.riwayat_direksi_apkap
✅ Uploaded to table: rfojk_python_final.riwayat_komisaris_apkap
✅ Uploaded to table: rfojk_python_final.riwayat_pemegang_saham_apkap
✅ Uploaded to table: rfojk_python_final.riwayat_produk_aktivitas_apkap
✅ Uploaded to table: rfojk_python_final.riwayat_dps_apkap
📄 Processing SIPM_20250612.xlsx
✅ Uploaded to table: rfojk_python_final.profil_entity_sipm
✅ Uploaded to table: rfojk_python_final.riwayat_pendirian_sipm
✅ Uploaded to table: rfojk_python_final.riwayat_direksi_sipm
✅ Uploaded to table: rfojk_python_final.riwayat_komisaris_sipm
✅ Uploaded to table: rfojk_python_final.riwayat_pemegang_saham_sipm
✅ Uploaded to table: rfojk_python_final.riwayat_produk_aktivitas_sipm
✅ Uploaded to table: rfojk_python_final.riwayat_dps_sipm
📄 Processing SPRINT_20250612.xlsx
✅ Uploaded to table: 

#### Script lama

In [None]:
for file_name in APPLICATION_FILES:
    app_name = re.sub(r'(_?\d{8})?\.xlsx$', '', file_name).lower()
    full_path = os.path.join(FOLDER_PATH, file_name)
    print(f"📄 Processing {file_name}")

    sheets = load_excel_sheets(full_path)

    sheet_mapping = {
        'profil_entity': sheets.get(list(sheets)[0]),
        'riwayat_pendirian': sheets.get(list(sheets)[1]),
        'riwayat_direksi': sheets.get(list(sheets)[2]),
        'riwayat_komisaris': sheets.get(list(sheets)[3]),
        'riwayat_pemegang_saham': sheets.get(list(sheets)[4]),
        'riwayat_produk_aktivitas': sheets.get(list(sheets)[5]),
        'riwayat_dps': sheets.get(list(sheets)[6])
    }

    for sheet_key, df in sheet_mapping.items():
        if df is not None:
            df = standardize_columns(df)
            df.insert(0, 'application', app_name)
            table_name = f"{sheet_key}_{app_name}".lower()

            df.to_sql(
                table_name,
                engine,
                schema=SCHEMA_NAME,
                if_exists='replace',
                index=False
            )
            print(f"✅ Uploaded to table: {SCHEMA_NAME}.{table_name}")

📄 Processing APKAP.xlsx
✅ Uploaded to table: rfojk_python_test.profil_entity_apkap
✅ Uploaded to table: rfojk_python_test.riwayat_pendirian_apkap
✅ Uploaded to table: rfojk_python_test.riwayat_direksi_apkap
✅ Uploaded to table: rfojk_python_test.riwayat_komisaris_apkap
✅ Uploaded to table: rfojk_python_test.riwayat_pemegang_saham_apkap
✅ Uploaded to table: rfojk_python_test.riwayat_produk_aktivitas_apkap
✅ Uploaded to table: rfojk_python_test.riwayat_dps_apkap
📄 Processing SIPM_20250612.xlsx
✅ Uploaded to table: rfojk_python_test.profil_entity_sipm
✅ Uploaded to table: rfojk_python_test.riwayat_pendirian_sipm
✅ Uploaded to table: rfojk_python_test.riwayat_direksi_sipm
✅ Uploaded to table: rfojk_python_test.riwayat_komisaris_sipm
✅ Uploaded to table: rfojk_python_test.riwayat_pemegang_saham_sipm
✅ Uploaded to table: rfojk_python_test.riwayat_produk_aktivitas_sipm
✅ Uploaded to table: rfojk_python_test.riwayat_dps_sipm
📄 Processing SPRINT_20250612.xlsx
✅ Uploaded to table: rfojk_python_t

### Make View & Datamarts

#### Script baru

In [13]:
applications = ['APKAP', 'SIPM', 'SPRINT', 'DAPOK']

view_definitions = {
    "vw_profil_entity_union": {
        "base_name": "profil_entity",
        "columns": [
            "application", "institutionprofileid", "institutionname",
            "idorigin", "npwp", "companyemail", "headofficeaddress", "webaddress"
        ]
    },
    "vw_riwayat_pendirian_union": {
        "base_name": "riwayat_pendirian",
        "columns": [
            "application", "institutionprofileid", "institutionname", "idorigin", "sector",
            "subsector", "subsubsector", "legalentity", "licensetype",
            "licensenumber", "licensedate", "statusljk"
        ]
    },
    "vw_riwayat_direksi_komisaris_union": {
        "base_name": ["riwayat_direksi", "riwayat_komisaris"],
        "columns": [
            "application", "institutionprofileid", "commissionername",
            "commissionernationality", "commissionernik", "commissionerpassport",
            "position", "officiateeffectivedate", "officiateenddate", "officiateinactivedate"
        ]
    },
    "vw_riwayat_pemegang_saham_union": {
        "base_name": "riwayat_pemegang_saham",
        "columns": [
            "application", "institutionprofileid", "individualownername",
            "individualownernik", "individualownernpwp", "ownershipvalue",
            "ownershippercentage"
        ]
    },
    "vw_riwayat_produk_aktivitas_union": {
        "base_name": "riwayat_produk_aktivitas",
        "columns": [
            "application", "institutionprofileid", "productname", "productdescription",
            "producttype", "letternumber", "letterdate", "produteffectivedate", "productstatus"
        ]
    },
    "vw_riwayat_dps_union": {
        "base_name": "riwayat_dps",
        "columns": [
            "application", "institutionprofileid", "position",
            "shariasupervisoryboardname", "shariasupervisoryboardnationality",
            "shariasupervisoryboardnik", "shariasupervisoryboardpassport",
            "officiateeffectivedate", "officiateenddate", "officiateinactivedate"
        ]
    }
}


def generate_column_list(columns, application, sourcetable=None):
    quoted_columns = []
    for col in columns:
        if col == 'application':
            quoted_columns.append(f"'{application}' AS application")
        else:
            quoted_columns.append(f'"{col.lower()}"::text')

    if sourcetable:  # tambahkan sourcetable jika diperlukan
        quoted_columns.append(f"'{sourcetable}' AS sourcetable")

    quoted_columns.append(f"'{UPDATED_DATE}'::date AS updateddate")
    return ', '.join(quoted_columns)


for view_name, info in view_definitions.items():
    base_names = info['base_name']
    base_names = [base_names] if isinstance(base_names, str) else base_names
    columns = info['columns']

    union_queries = []

    for app in applications:
        for base in base_names:
            table = f'{SCHEMA_NAME}."{base.lower()}_{app.lower()}"'

            # Untuk view direksi_komisaris tambahkan flag sourcetable
            if view_name == "vw_riwayat_direksi_komisaris_union":
                select_clause = generate_column_list(columns, app, sourcetable=base)
            else:
                select_clause = generate_column_list(columns, app)

            union_queries.append(f"SELECT {select_clause} FROM {table}")

    union_sql = "\nUNION\n".join(union_queries)
    full_view_name = f'{SCHEMA_NAME}."{view_name}"'
    create_view_sql = f"""
    CREATE OR REPLACE VIEW {full_view_name} AS
    {union_sql};
    """

    dm_table_name = view_name.replace("vw_", "dm_")
    full_dm_table = f'{SCHEMA_NAME}."{dm_table_name}"'
    create_dm_table_sql = f"""
    DROP TABLE IF EXISTS {full_dm_table};

    CREATE TABLE {full_dm_table} AS
    SELECT * FROM {full_view_name};
    """

    try:
        with engine.begin() as conn:
            print(f"➡️ Creating view: {full_view_name}")
            conn.execute(text(create_view_sql))

            count = conn.execute(
                text(f"SELECT COUNT(*) FROM {full_view_name}")
            ).scalar()

            print(f"🧾 View has {count} rows")

            if count > 0:
                conn.execute(text(create_dm_table_sql))
                print(f"📄 Created table: {full_dm_table}")
            else:
                print(f"⚠️ Skipped table creation, view is empty")

    except Exception as e:
        print(f"❌ Error: {e}")

➡️ Creating view: rfojk_python_final."vw_profil_entity_union"
🧾 View has 13811 rows
📄 Created table: rfojk_python_final."dm_profil_entity_union"
➡️ Creating view: rfojk_python_final."vw_riwayat_pendirian_union"
🧾 View has 3402 rows
📄 Created table: rfojk_python_final."dm_riwayat_pendirian_union"
➡️ Creating view: rfojk_python_final."vw_riwayat_direksi_komisaris_union"
🧾 View has 16596 rows
📄 Created table: rfojk_python_final."dm_riwayat_direksi_komisaris_union"
➡️ Creating view: rfojk_python_final."vw_riwayat_pemegang_saham_union"
🧾 View has 16036 rows
📄 Created table: rfojk_python_final."dm_riwayat_pemegang_saham_union"
➡️ Creating view: rfojk_python_final."vw_riwayat_produk_aktivitas_union"
🧾 View has 19413 rows
📄 Created table: rfojk_python_final."dm_riwayat_produk_aktivitas_union"
➡️ Creating view: rfojk_python_final."vw_riwayat_dps_union"
🧾 View has 496 rows
📄 Created table: rfojk_python_final."dm_riwayat_dps_union"


##### Buat view agg relations

In [14]:
import psycopg2

# Ambil konfigurasi DB dari environment
DB_CONFIG = {
    'dbname': os.getenv('POSTGRES_DB'),
    'user': os.getenv('POSTGRES_USER'),
    'password': os.getenv('POSTGRES_PASSWORD'),
    'host': os.getenv('POSTGRES_HOST'),
    'port': os.getenv('POSTGRES_PORT')
}

# Pastikan tidak ada value yang None (optional)
for k, v in DB_CONFIG.items():
    if v is None:
        raise ValueError(f"Missing value for DB config: {k}")

# Koneksi ke database
conn = psycopg2.connect(**DB_CONFIG)
cur = conn.cursor()


drop_scripts = [
    f"DROP TABLE IF EXISTS {SCHEMA_NAME}.agg_profil_entity_union_relations;",
    f"DROP TABLE IF EXISTS {SCHEMA_NAME}.agg_riwayat_pendirian_union_relations;",
    f"DROP TABLE IF EXISTS {SCHEMA_NAME}.agg_riwayat_direksi_komisaris_union_relations;",
    f"DROP TABLE IF EXISTS {SCHEMA_NAME}.agg_riwayat_pemegang_saham_union_relations;",
    f"DROP TABLE IF EXISTS {SCHEMA_NAME}.agg_riwayat_produk_aktivitas_union_relations;",
    f"DROP TABLE IF EXISTS {SCHEMA_NAME}.agg_riwayat_dps_union_relations;"
]

# Eksekusi drop script satu per satu
for drop_sql in drop_scripts:
    cur.execute(drop_sql)
    print(f"Executed: {drop_sql}")

conn.commit()

# Daftar query SQL untuk membuat table
sql_scripts = [

    # PROFIL ENTITY
    f"""
    create table if not exists {SCHEMA_NAME}.agg_profil_entity_union_relations as
    with 
    pendirian as (
        select institutionprofileid, count(*) as jml_pendirian
        from {SCHEMA_NAME}.dm_riwayat_pendirian_union
        group by institutionprofileid
    ),
    direksi as (
        select institutionprofileid, count(*) as jml_direksi_komisaris
        from {SCHEMA_NAME}.dm_riwayat_direksi_komisaris_union
        group by institutionprofileid
    ),
    pemegang as (
        select institutionprofileid, count(*) as jml_pemegang_saham
        from {SCHEMA_NAME}.dm_riwayat_pemegang_saham_union
        group by institutionprofileid
    ),
    produk as (
        select institutionprofileid, count(*) as jml_produk_aktivitas
        from {SCHEMA_NAME}.dm_riwayat_produk_aktivitas_union
        group by institutionprofileid
    ),
    dps as (
        select institutionprofileid, count(*) as jml_dps
        from {SCHEMA_NAME}.dm_riwayat_dps_union
        group by institutionprofileid
    )
    select 
        a.*,
        coalesce(b.jml_pendirian, 0) as jml_match_pendirian,
        coalesce(c.jml_direksi_komisaris, 0) as jml_match_direksi_komisaris,
        coalesce(d.jml_pemegang_saham, 0) as jml_match_pemegang_saham,
        coalesce(e.jml_produk_aktivitas, 0) as jml_match_produk_aktivitas,
        coalesce(f.jml_dps, 0) as jml_match_dps
    from {SCHEMA_NAME}.dm_profil_entity_union a
    left join pendirian b using (institutionprofileid)
    left join direksi c using (institutionprofileid)
    left join pemegang d using (institutionprofileid)
    left join produk e using (institutionprofileid)
    left join dps f using (institutionprofileid);
    """,

    # Riwayat Pendirian
    f"""
    create table if not exists {SCHEMA_NAME}.agg_riwayat_pendirian_union_relations as
    with 
    profil as (
        select institutionprofileid, count(*) as jml_profil
        from {SCHEMA_NAME}.dm_profil_entity_union
        group by institutionprofileid
    ),
    direksi as (
        select institutionprofileid, count(*) as jml_direksi_komisaris
        from {SCHEMA_NAME}.dm_riwayat_direksi_komisaris_union
        group by institutionprofileid
    ),
    pemegang as (
        select institutionprofileid, count(*) as jml_pemegang_saham
        from {SCHEMA_NAME}.dm_riwayat_pemegang_saham_union
        group by institutionprofileid
    ),
    produk as (
        select institutionprofileid, count(*) as jml_produk_aktivitas
        from {SCHEMA_NAME}.dm_riwayat_produk_aktivitas_union
        group by institutionprofileid
    ),
    dps as (
        select institutionprofileid, count(*) as jml_dps
        from {SCHEMA_NAME}.dm_riwayat_dps_union
        group by institutionprofileid
    )
    select 
        a.*,
        coalesce(b.jml_profil, 0) as jml_match_profil,
        coalesce(c.jml_direksi_komisaris, 0) as jml_match_direksi_komisaris,
        coalesce(d.jml_pemegang_saham, 0) as jml_match_pemegang_saham,
        coalesce(e.jml_produk_aktivitas, 0) as jml_match_produk_aktivitas,
        coalesce(f.jml_dps, 0) as jml_match_dps
    from {SCHEMA_NAME}.dm_riwayat_pendirian_union a
    left join profil b using (institutionprofileid)
    left join direksi c using (institutionprofileid)
    left join pemegang d using (institutionprofileid)
    left join produk e using (institutionprofileid)
    left join dps f using (institutionprofileid);
    """,

# Riwayat Direksi & Komisaris
f"""
create table if not exists {SCHEMA_NAME}.agg_riwayat_direksi_komisaris_union_relations as
with 
profil as (
    select institutionprofileid, count(*) as jml_profil
    from {SCHEMA_NAME}.dm_profil_entity_union
    group by institutionprofileid
),
pendirian as (
    select institutionprofileid, count(*) as jml_pendirian
    from {SCHEMA_NAME}.dm_riwayat_pendirian_union
    group by institutionprofileid
),
pemegang as (
    select institutionprofileid, count(*) as jml_pemegang_saham
    from {SCHEMA_NAME}.dm_riwayat_pemegang_saham_union
    group by institutionprofileid
),
produk as (
    select institutionprofileid, count(*) as jml_produk_aktivitas
    from {SCHEMA_NAME}.dm_riwayat_produk_aktivitas_union
    group by institutionprofileid
),
dps as (
    select institutionprofileid, count(*) as jml_dps
    from {SCHEMA_NAME}.dm_riwayat_dps_union
    group by institutionprofileid
)
select 
	a. *,
    coalesce(b.jml_profil, 0) as jml_match_profil,
    coalesce(c.jml_pendirian, 0) as jml_match_pendirian,
    coalesce(d.jml_pemegang_saham, 0) as jml_match_pemegang_saham,
    coalesce(e.jml_produk_aktivitas, 0) as jml_match_produk_aktivitas,
    coalesce(f.jml_dps, 0) as jml_match_dps
from {SCHEMA_NAME}.dm_riwayat_direksi_komisaris_union a
left join profil b using (institutionprofileid)
left join pendirian c using (institutionprofileid)
left join pemegang d using (institutionprofileid)
left join produk e using (institutionprofileid)
left join dps f using (institutionprofileid);
""",
#Riwayat Pemegang Saham
f"""
create table if not exists {SCHEMA_NAME}.agg_riwayat_pemegang_saham_union_relations as
with 
profil as (
    select institutionprofileid, count(*) as jml_profil
    from {SCHEMA_NAME}.dm_profil_entity_union
    group by institutionprofileid
),
pendirian as (
    select institutionprofileid, count(*) as jml_pendirian
    from {SCHEMA_NAME}.dm_riwayat_pendirian_union
    group by institutionprofileid
),
dirkom as (
    select institutionprofileid, count(*) as jml_direksi_komisaris
    from {SCHEMA_NAME}.dm_riwayat_direksi_komisaris_union
    group by institutionprofileid
),
produk as (
    select institutionprofileid, count(*) as jml_produk_aktivitas
    from {SCHEMA_NAME}.dm_riwayat_produk_aktivitas_union
    group by institutionprofileid
),
dps as (
    select institutionprofileid, count(*) as jml_dps
    from {SCHEMA_NAME}.dm_riwayat_dps_union
    group by institutionprofileid
)
select 
	a. *,
    coalesce(b.jml_profil, 0) as jml_match_profil,
    coalesce(c.jml_pendirian, 0) as jml_match_pendirian,
    coalesce(d.jml_direksi_komisaris, 0) as jml_match_direksi_komisaris,
    coalesce(e.jml_produk_aktivitas, 0) as jml_match_produk_aktivitas,
    coalesce(f.jml_dps, 0) as jml_match_dps
from {SCHEMA_NAME}.dm_riwayat_pemegang_saham_union a
left join profil b using (institutionprofileid)
left join pendirian c using (institutionprofileid)
left join dirkom d using (institutionprofileid)
left join produk e using (institutionprofileid)
left join dps f using (institutionprofileid);
""",

# Riwayat Produk & Aktivitas
f"""
create table if not exists {SCHEMA_NAME}.agg_riwayat_produk_aktivitas_union_relations as
with 
profil as (
    select institutionprofileid, count(*) as jml_profil
    from {SCHEMA_NAME}.dm_profil_entity_union
    group by institutionprofileid
),
pendirian as (
    select institutionprofileid, count(*) as jml_pendirian
    from {SCHEMA_NAME}.dm_riwayat_pendirian_union
    group by institutionprofileid
),
dirkom as (
    select institutionprofileid, count(*) as jml_direksi_komisaris
    from {SCHEMA_NAME}.dm_riwayat_direksi_komisaris_union
    group by institutionprofileid
),
saham as (
    select institutionprofileid, count(*) as jml_pemegang_saham
    from {SCHEMA_NAME}.dm_riwayat_pemegang_saham_union
    group by institutionprofileid
),
dps as (
    select institutionprofileid, count(*) as jml_dps
    from {SCHEMA_NAME}.dm_riwayat_dps_union
    group by institutionprofileid
)
select 
	a. *,
    coalesce(b.jml_profil, 0) as jml_match_profil,
    coalesce(c.jml_pendirian, 0) as jml_match_pendirian,
    coalesce(d.jml_direksi_komisaris, 0) as jml_match_direksi_komisaris,
    coalesce(e.jml_pemegang_saham, 0) as jml_match_pemegang_saham,
    coalesce(f.jml_dps, 0) as jml_match_dps
from {SCHEMA_NAME}.dm_riwayat_produk_aktivitas_union a
left join profil b using (institutionprofileid)
left join pendirian c using (institutionprofileid)
left join dirkom d using (institutionprofileid)
left join saham e using (institutionprofileid)
left join dps f using (institutionprofileid);
""",

#Riwayat DPS
f"""
create table if not exists {SCHEMA_NAME}.agg_riwayat_dps_union_relations as
with 
profil as (
    select institutionprofileid, count(*) as jml_profil
    from {SCHEMA_NAME}.dm_profil_entity_union
    group by institutionprofileid
),
pendirian as (
    select institutionprofileid, count(*) as jml_pendirian
    from {SCHEMA_NAME}.dm_riwayat_pendirian_union
    group by institutionprofileid
),
dirkom as (
    select institutionprofileid, count(*) as jml_direksi_komisaris
    from {SCHEMA_NAME}.dm_riwayat_direksi_komisaris_union
    group by institutionprofileid
),
saham as (
    select institutionprofileid, count(*) as jml_pemegang_saham
    from {SCHEMA_NAME}.dm_riwayat_pemegang_saham_union
    group by institutionprofileid
),
produk as (
    select institutionprofileid, count(*) as jml_produk
    from {SCHEMA_NAME}.dm_riwayat_produk_aktivitas_union
    group by institutionprofileid
)
select 
	a. *,
    coalesce(b.jml_profil, 0) as jml_match_profil,
    coalesce(c.jml_pendirian, 0) as jml_match_pendirian,
    coalesce(d.jml_direksi_komisaris, 0) as jml_match_direksi_komisaris,
    coalesce(e.jml_pemegang_saham, 0) as jml_match_pemegang_saham,
    coalesce(f.jml_produk, 0) as jml_match_produk
from {SCHEMA_NAME}.dm_riwayat_dps_union a
left join profil b using (institutionprofileid)
left join pendirian c using (institutionprofileid)
left join dirkom d using (institutionprofileid)
left join saham e using (institutionprofileid)
left join produk f using (institutionprofileid);
"""
]

# Eksekusi semua query
for script in sql_scripts:
    cur.execute(script)
    conn.commit()

# Tutup koneksi
cur.close()
conn.close()

Executed: DROP TABLE IF EXISTS rfojk_python_final.agg_profil_entity_union_relations;
Executed: DROP TABLE IF EXISTS rfojk_python_final.agg_riwayat_pendirian_union_relations;
Executed: DROP TABLE IF EXISTS rfojk_python_final.agg_riwayat_direksi_komisaris_union_relations;
Executed: DROP TABLE IF EXISTS rfojk_python_final.agg_riwayat_pemegang_saham_union_relations;
Executed: DROP TABLE IF EXISTS rfojk_python_final.agg_riwayat_produk_aktivitas_union_relations;
Executed: DROP TABLE IF EXISTS rfojk_python_final.agg_riwayat_dps_union_relations;


#### Syntax lama

In [9]:
applications = ['APKAP', 'SIPM', 'SPRINT', 'DAPOK']
schema = SCHEMA_NAME

view_definitions = {
    "vw_profil_entity_union": {
        "base_name": "profil_entity",
        "columns": [
            "application", "institutionprofileid", "institutionname",
            "idorigin", "npwp", "companyemail", "headofficeaddress", "webaddress"
        ]
    },
    "vw_riwayat_pendirian_union": {
        "base_name": "riwayat_pendirian",
        "columns": [
            "application", "institutionprofileid", "institutionname", "idorigin", "sector",
            "subsector", "subsubsector", "legalentity", "licensetype",
            "licensenumber", "licensedate", "statusljk"
        ]
    },
    "vw_riwayat_direksi_komisaris_union": {
        "base_name": ["riwayat_direksi", "riwayat_komisaris"],
        "columns": [
            "application", "institutionprofileid", "commissionername",
            "commissionernationality", "commissionernik", "commissionerpassport",
            "position", "officiateeffectivedate", "officiateenddate", "officiateinactivedate"
        ]
    },
    "vw_riwayat_pemegang_saham_union": {
        "base_name": "riwayat_pemegang_saham",
        "columns": [
            "application", "institutionprofileid", "individualownername",
            "individualownernik", "individualownernpwp", "ownershipvalue",
            "ownershippercentage"
        ]
    },
    "vw_riwayat_produk_aktivitas_union": {
        "base_name": "riwayat_produk_aktivitas",
        "columns": [
            "application", "institutionprofileid", "productname", "productdescription",
            "producttype", "letternumber", "letterdate", "produteffectivedate", "productstatus"
        ]
    },
    "vw_riwayat_dps_union": {
        "base_name": "riwayat_dps",
        "columns": [
            "application", "institutionprofileid", "position",
            "shariasupervisoryboardname", "shariasupervisoryboardnationality",
            "shariasupervisoryboardnik", "shariasupervisoryboardpassport",
            "officiateeffectivedate", "officiateenddate", "officiateinactivedate"
        ]
    }
}


def generate_column_list(columns, application):
    quoted_columns = []
    for col in columns:
        if col == 'application':
            quoted_columns.append(f"'{application}' AS application")
        else:
            quoted_columns.append(f'"{col.lower()}"::text')
    return ', '.join(quoted_columns)


for view_name, info in view_definitions.items():
    base_names = info['base_name']
    base_names = [base_names] if isinstance(base_names, str) else base_names
    columns = info['columns']

    union_queries = []

    for app in applications:
        for base in base_names:
            table = f'{schema}."{base.lower()}_{app.lower()}"'
            select_clause = generate_column_list(columns, app)
            union_queries.append(f"SELECT {select_clause} FROM {table}")

    union_sql = "\nUNION\n".join(union_queries)
    full_view_name = f'{schema}."{view_name}"'
    create_view_sql = f"""
    CREATE OR REPLACE VIEW {full_view_name} AS
    {union_sql};
    """

    dm_table_name = view_name.replace("vw_", "dm_")
    full_dm_table = f'{schema}."{dm_table_name}"'
    create_dm_table_sql = f"""
    DROP TABLE IF EXISTS {full_dm_table};

    CREATE TABLE {full_dm_table} AS
    SELECT * FROM {full_view_name};
    """

    try:
        with engine.begin() as conn:
            print(f"➡️ Creating view: {full_view_name}")
            conn.execute(text(create_view_sql))

            count = conn.execute(
                text(f"SELECT COUNT(*) FROM {full_view_name}")
            ).scalar()

            print(f"🧾 View has {count} rows")

            if count > 0:
                conn.execute(text(create_dm_table_sql))
                print(f"📄 Created table: {full_dm_table}")
            else:
                print(f"⚠️ Skipped table creation, view is empty")

    except Exception as e:
        print(f"❌ Error: {e}")


➡️ Creating view: rfojk_python_test."vw_profil_entity_union"
❌ Error: (psycopg2.errors.UndefinedTable) relation "rfojk_python_test.profil_entity_sipm" does not exist
LINE 5: ...headofficeaddress"::text, "webaddress"::text FROM rfojk_pyth...
                                                             ^

[SQL: 
    CREATE OR REPLACE VIEW rfojk_python_test."vw_profil_entity_union" AS
    SELECT 'APKAP' AS application, "institutionprofileid"::text, "institutionname"::text, "idorigin"::text, "npwp"::text, "companyemail"::text, "headofficeaddress"::text, "webaddress"::text FROM rfojk_python_test."profil_entity_apkap"
UNION
SELECT 'SIPM' AS application, "institutionprofileid"::text, "institutionname"::text, "idorigin"::text, "npwp"::text, "companyemail"::text, "headofficeaddress"::text, "webaddress"::text FROM rfojk_python_test."profil_entity_sipm"
UNION
SELECT 'SPRINT' AS application, "institutionprofileid"::text, "institutionname"::text, "idorigin"::text, "npwp"::text, "companyemail"::text