In [None]:
import pandas as pd
import numpy as np
import re
import os
import psycopg2
import traceback

from datetime import datetime
from difflib import SequenceMatcher
from sqlalchemy import create_engine, text
from psycopg2 import OperationalError, errors
from json import loads
from io import StringIO

# Fungsi untuk membentuk response sukses
def build_success_response(data):
    return {
        'status': 'success',
        'message': 'OK',
        'code': 'OK',
        'trace': 'OK',
        'data': data
    }

# Fungsi untuk membentuk response error dengan struktur tetap
def build_error_response(message, exception=None, code=None):
    return {
        'status': 'error',
        'message': f"{message}" if exception else message,
        'code': code or getattr(exception, 'pgcode', None),
        'trace': traceback.format_exc() if exception else None,
        'data': None
    }

# Fungsi utama untuk koneksi dan query
def connect_bigdata(schemas, tables):
    conn = None
    cursor = None
    try:
        conn = psycopg2.connect(
            dbname=os.getenv("DB_BIGDATA", "replikasi****"),
            user=os.getenv("DB_USER", "pos****"),
            password=os.getenv("DB_PASSWORD", "2lNyRKW3*****"),
            host=os.getenv("DB_HOST", "103.****"),
            port=os.getenv("DB_PORT", "5432")
        )
        cursor = conn.cursor()
        query = f'SELECT * FROM "{schemas}"."{tables}";'
        cursor.execute(query)
        rows = cursor.fetchall()
        columns = [desc[0] for desc in cursor.description]
        df = pd.DataFrame(rows, columns=columns)
        return build_success_response(df)

    except OperationalError as e:
        if "too many clients" in str(e):
            return build_error_response("Terlalu banyak koneksi ke server database. Coba lagi nanti.", e, code="TOO_MANY_CLIENTS")
        return build_error_response("Gagal koneksi ke database.", e, code="DB_CONNECT_ERROR")

    except errors.UndefinedTable as e:
        return build_error_response(f'Tabel "{schemas}.{tables}" tidak ditemukan.', e, code='TABLE_NOT_FOUND')

    except errors.UndefinedSchema as e:
        return build_error_response(f'Skema "{schemas}" tidak ditemukan.', e, code='SCHEMA_NOT_FOUND')

    except errors.InsufficientPrivilege as e:
        return build_error_response(f'Akses ditolak ke "{schemas}.{tables}".', e, code='ACCESS_DENIED')

    except psycopg2.Error as e:
        return build_error_response("Kesalahan database umum.", e, code="DB_ERROR")

    except Exception as e:
        return build_error_response("Terjadi error tak terduga saat koneksi.", e, code="UNEXPECTED_ERROR")

    finally:
        if cursor: cursor.close()
        if conn: conn.close()

# Fungsi utama preprocessing
def preprocessing(df):
    try:
        data = df.copy()

        # Drop kolom kategori/jumlah jika seluruh nilainya null
        if any(c in data.columns for c in ["kategori", "jumlah"]):
            if all(c in data.columns and data[c].isnull().all() for c in ["kategori", "jumlah"]):
                data.drop(columns=["kategori", "jumlah"], errors="ignore", inplace=True)
            elif "kategori" in data.columns and data["kategori"].isnull().all():
                data.drop(columns=["kategori"], inplace=True)
            elif "jumlah" in data.columns and data["jumlah"].isnull().all():
                data.drop(columns=["jumlah"], inplace=True)

        data.fillna(0, inplace=True)

        # data['satuan'] = 'Satuan'
        # data['nama_provinsi'] = 'Jawa Timur'
        # data['kode_provinsi'] = 35

        if 'satuan' not in data.columns:
            return build_error_response(f"Gagal Preprocessing: Kolom 'satuan' tidak ditemukan saat validasi.", code='SATUAN_NOT_FOUND')

        if (data['satuan'].dropna().nunique() == 1 and data['satuan'].isin(['0', '-', 'N/A', 'NA', 'N\\A']).all()) or data['satuan'].isna().all():
            return build_error_response(f"Gagal Preprocessing: Kolom 'satuan' null / tidak memiliki value.", code='SATUAN_IS_NULL')

        for del_kode in ['bps_kode_desa_kelurahan', 'bps_kode_kecamatan', 'kemendagri_kode_desa_kelurahan', 'kode_kabupaten_kota', 'kemendagri_kode_kecamatan', 'kode_kecamatan', 'kode_kabupaten', 'kode_kelurahan']:
            if del_kode in data.columns:
                data.drop(columns=[del_kode], inplace=True)

        for col in data.columns:
            col_data = data[col]

            if col_data.dtype == object:
                # print(col)
                col_series = col_data.astype(str).str.strip()
                col_series = (
                    col_series.str.upper()
                              .str.replace('_', ' ', regex=False)
                              .str.replace('\\n', ' ', regex=True)
                              .replace(['N/A', 'NA', 'N\\A', '-', ''], '0')
                )
                data[col] = col_series

        return build_success_response(data)

    except Exception as e:
        return build_error_response(f"Terjadi kesalahan saat preprocessing.", exception=e, code="PREPROCESSING_ERROR")

def periode_update(df):
    try:
        def standardize_periode(periode):
            month_map = {
                'JANUARI': '01', 'FEBRUARI': '02', 'MARET': '03', 'APRIL': '04',
                'MEI': '05', 'JUNI': '06', 'JULI': '07', 'AGUSTUS': '08',
                'SEPTEMBER': '09', 'OKTOBER': '10', 'NOVEMBER': '11', 'DESEMBER': '12'
            }
            tri_map = {'I': 'Q1', 'II': 'Q2', 'III': 'Q3', 'IV': 'Q4'}
            cat_map = {'I': 'C1', 'II': 'C2', 'III': 'C3'}
            sem_map = {'I': 'S1', 'II': 'S2'}

            if pd.isna(periode) or str(periode).strip() in ['', '-']:
                return '0'

            periode = str(periode).upper().strip()

            # Format: "dd-mm-yyyy"
            if re.match(r"\d{2}-\d{2}-\d{4}", periode):
                try:
                    return datetime.strptime(periode, "%d-%m-%Y").strftime("%Y-%m-%d")
                except:
                    return '0'

            # Format: "12 Juli 2025"
            match = re.match(r"(\d{1,2})\s+([A-Z]+)\s+(\d{4})", periode)
            if match:
                day, month_name, year = match.groups()
                month = month_map.get(month_name.upper())
                return f"{year}-{month}-{int(day):02d}" if month else '0'

            # ISO: "2025-07-12"
            if re.match(r"\d{4}-\d{2}-\d{2}", periode):
                return periode

            year = ['TAHUNAN', 'TAHUN']  # urutkan dari yang lebih panjang agar tidak tumpang tindih

            for y in year:
                if y in periode:
                    return periode.replace(y, '').strip()

            # Format: "JULI 2025"
            for bulan, num in month_map.items():
                if bulan in periode:
                    tahun_match = re.search(r'\d{4}', periode)
                    return f"{tahun_match.group()}-{num}" if tahun_match else '0'

            # Format: "TRIWULAN II 2025"
            if 'TRIWULAN' in periode:
                parts = periode.split()
                if len(parts) >= 3:
                    tahun = re.search(r'\d{4}', periode)
                    if tahun:
                        tri = tri_map.get(parts[1], parts[1])
                        return f"{tahun.group()}-{tri}"

            # Format: "CATURWULAN III 2023"
            if 'CATURWULAN' in periode:
                parts = periode.split()
                if len(parts) >= 3:
                    tahun = re.search(r'\d{4}', periode)
                    if tahun:
                        cat = cat_map.get(parts[1], parts[1])
                        return f"{tahun.group()}-{cat}"

            # Format: "SEMESTER I 2024"
            if 'SEMESTER' in periode:
                parts = periode.split()
                if len(parts) >= 3:
                    tahun = re.search(r'\d{4}', periode)
                    if tahun:
                        sem = sem_map.get(parts[1], parts[1])
                        return f"{tahun.group()}-{sem}"

            # Format akhir fallback: "2023-Q1", "2023-S1", "2023-C2"
            if any(x in periode for x in ['S1', 'S2', 'Q1', 'Q2', 'Q3', 'Q4', 'C1', 'C2', 'C3']):
                tahun_match = re.search(r'\d{4}', periode)
                return periode if tahun_match else '0'

            # Ambil tahun saja jika ditemukan
            tahun_match = re.search(r'\d{4}', periode)

            return tahun_match.group() if tahun_match else '0'

        # Cek kolom sumber
        if 'periode_update' in df.columns:
            df['periode_update'] = df['periode_update'].astype(str)

        elif 'periode' in df.columns:
            if 'tahun' in df.columns:
                # Jika kolom 'periode' bukan format 4 digit (bukan tahun penuh)
                mask = ~df['periode'].astype(str).str.match(r'^\d{4}$')
                if mask.any():
                    df['periode_update'] = df['periode'].astype(str) + " " + df['tahun'].astype(str)
                else:
                    df['periode_update'] = df['periode'].astype(str)

            else:
                df['periode_update'] = df['periode'].astype(str)

            df.drop(columns=['periode'], inplace=True, errors='ignore')


        elif 'tahun' in df.columns:
            df['periode_update'] = df['tahun'].astype(str)
        else:
            return build_error_response("Kolom 'periode_update', 'periode', atau 'tahun' tidak ditemukan.", code="PERIODE_COLUMN_NOT_FOUND")

        # Standarisasi nilai
        df['periode_update'] = df['periode_update'].apply(standardize_periode)
        df['tahun'] = df['periode_update'].str.extract(r'(\d{4})').fillna(0).astype(int)

        # Validasi hasil
        if df['periode_update'].isin(['0']).all():
            return build_error_response("Semua nilai 'periode_update' tidak valid.", code="PERIODE_INVALID_ALL")

        return build_success_response(df)

    except Exception as e:
        return build_error_response(f"Terjadi kesalahan saat memproses periode: {str(e)}", code="PERIODE_EXCEPTION")

def deteksi_wilayah(df):
    try:

      if df is None or df.empty:
            raise ValueError("DataFrame kosong atau tidak valid.")

      def best_match(target):
          return max(df.columns, key=lambda col: SequenceMatcher(None, target, col).ratio())

      expected = ['nama_provinsi', 'kabupaten_kota', 'nama_kecamatan', 'desa_kelurahan']
      matched = {key: best_match(key) for key in expected}
      scores = {key: SequenceMatcher(None, key, matched[key]).ratio() for key in expected}
    #   level = sum(score > 0.75 for score in scores.values())

      # deteksi eksplisit kolom
      kolom = [col.lower() for col in df.columns]
      kelurahan = any(k in kolom for k in ['bps_nama_desa_kelurahan', 'kemendagri_nama_desa_kelurahan', 'nama_kelurahan/desa', 'nama_kelurahan', 'nama_desa', 'kelurahan', 'desa'])
      kecamatan = any(k in kolom for k in ['kemendagri_nama_kecamatan', 'bps_nama_kecamatan', 'nama_kecamatan', 'kecamatan'])
      kabupaten = any(k in kolom for k in ['nama_kabupaten', 'kabupaten', 'kabupaten_kota', 'nama_kabupaten_kota'])
      provinsi  = any(k in kolom for k in ['nama_provinsi', 'provinsi', 'prov'])
    #   display(level, provinsi, kabupaten, kecamatan, kelurahan, scores, matched)

      if provinsi and not (kabupaten or kecamatan or kelurahan):
          return build_success_response(('data_provinsi', {'nama_provinsi': matched['nama_provinsi']}))

      elif kabupaten and not (kecamatan or kelurahan):
          return build_success_response((
              'data_kabupaten', {
                  'nama_provinsi': matched['nama_provinsi'],
                  'nama_kabupaten_kota': matched['kabupaten_kota']
              }
          ))

      elif kecamatan:
          return build_success_response((
              'data_kecamatan', {
                  'nama_provinsi': matched['nama_provinsi'],
                  'nama_kabupaten_kota': matched['kabupaten_kota'],
                  'bps_nama_kecamatan': matched['nama_kecamatan'],
                  'kemendagri_nama_kecamatan': matched['nama_kecamatan']
              }
          ))

      elif kelurahan:
          return build_success_response((
              'data_kelurahan', {
                  'nama_provinsi': matched['nama_provinsi'],
                  'nama_kabupaten_kota': matched['kabupaten_kota'],
                  'bps_nama_kecamatan': matched['nama_kecamatan'],
                  'kemendagri_nama_kecamatan': matched['nama_kecamatan'],
                  'bps_desa_kelurahan': matched['desa_kelurahan'],
                  'kemendagri_desa_kelurahan': matched['desa_kelurahan']
              }
          ))

      else:
          return build_error_response("Tidak dapat mendeteksi wilayah dari struktur data yang diberikan.", code = "DETECT_LOC_FAILED")

    except Exception as e:
        return build_error_response("Gagal mendeteksi wilayah.", exception=e, code = "DETECT_LOC_ERROR")

def conn_masterdata(status):
    """
    Menghubungkan ke tabel master berdasarkan status wilayah.
    Mengembalikan response success dengan dataframe, atau error jika terjadi masalah.
    """
    try:
        table_map = {
            'data_kabupaten': 'masterkabupaten',
            'data_kecamatan': 'masterkecamatan',
            'data_kelurahan': 'masterdesa'
        }

        if status == 'data_provinsi':
            return build_success_response(None)  # Tidak perlu tabel master untuk provinsi

        table_name = table_map.get(status)

        if table_name is None:
            return build_error_response(f"Tidak ditemukan tabel untuk status wilayah: {status}", code='INVALID_STATUS')

        query = f"SELECT * FROM masterdata.{table_name};"
        db_user = os.getenv("DB_USER", "postgres")
        db_password = os.getenv("DB_PASSWORD", "2lNyRKW3oc9kan8n")
        db_host = os.getenv("DB_HOST", "103.183.92.158")
        db_port = os.getenv("DB_PORT", "5432")
        db_bigdata_cleaned = os.getenv("DB_BIGDATA_CLEANED", "result_cleansing")

        engine = create_engine(f"postgresql+psycopg2://{db_user}:{db_password}@{db_host}:{db_port}/{db_bigdata_cleaned}")
        datamaster = pd.read_sql(query, engine)

        return build_success_response(datamaster)

    except Exception as e:
        return build_error_response("Gagal mengambil data master wilayah.", exception=e, code="MASTERDATA_ERROR")

# hanya untuk data yang terdapat kabupaten kota
def format_kabupaten_kota(name):
    name = name.upper().strip()  # Pastikan nama dalam huruf besar dan tanpa spasi berlebih
    if (name.startswith('KOTA ')):
        return name # Jika sudah diawali "KOTA ", biarkan

    elif name.startswith('KABUPATEN '):
        return name # Jika sudah diawali "KABUPATEN ", biarkan

    elif name in ['KEDIRI', 'BLITAR', 'MALANG', 'PROBOLINGGO', 'PASURUAN', 'MOJOKERTO', 'MADIUN', 'SURABAYA', 'BATU']:
        return f'KOTA {name}'  # Jika nama kota ada di list, tambahkan "KOTA " di depannya

    elif name.startswith('KAB. '):
        return name.replace('KAB. ', 'KABUPATEN ')  # Jika "KAB. ", ubah menjadi "KABUPATEN "

    elif name.startswith('0'):
        return name  # Jika "0", ubah menjadi "0"

    return f'KABUPATEN {name}'  # Jika tidak, tambahkan "KABUPATEN {name}"

# hanya untuk data yang terdapat kecamatan
def format_kecamatan(name):
    name = name.upper().strip()  # Pastikan nama dalam huruf besar dan tanpa spasi berlebih
    if name.startswith('KECAMATAN '):
        return name.replace('KECAMATAN ', '')  # Jika "KECAMATAN ", ubah menjadi ""

    return name

# merging df dengan masterdata
def merge_df(df, masterdata, left_on_col, right_on):
    try:
        if df is None or masterdata is None:
            return build_error_response("Dataframe utama atau masterdata tidak boleh None.", code ='DATA_NULL')

        # Konversi single string ke list
        if isinstance(left_on_col, str):
            left_on_col = [left_on_col]
        if isinstance(right_on, str):
            right_on = [right_on]

        merged = df.merge(
            masterdata,
            left_on=left_on_col,
            right_on=right_on,
            how='left',
            suffixes=('_x', '_y')
        )

        # Hanya drop kolom dari df yang tidak digunakan sebagai join ke masterdata (menghindari kehilangan info penting)
        cols_to_drop = [col for col in left_on_col if col not in right_on]
        merged.drop(columns=cols_to_drop, errors='ignore', inplace=True)

        dup_bases = {c[:-2] for c in merged.columns if c.endswith(('_x', '_y'))}
        for base in dup_bases:
            col_x, col_y = f"{base}_x", f"{base}_y"

            if col_x in merged.columns and col_y in merged.columns:
                # Jika punya keduanya: pilih nilai _y; jika NaN pakai _x
                merged[base] = merged[col_y].combine_first(merged[col_x])
                merged.drop([col_x, col_y], axis=1, inplace=True)

            elif col_x in merged.columns:          # hanya _x
                merged.rename(columns={col_x: base}, inplace=True)

            elif col_y in merged.columns:          # hanya _y
                merged.rename(columns={col_y: base}, inplace=True)

        return build_success_response(merged)

    except Exception as e:
        return build_error_response(
            message="Gagal melakukan merge ke masterdata pada dataframe.",
            exception=e,
            code="MERGE_ERROR"
        )


def check_num(data, col):
    df = data.copy()
    df.fillna("0", inplace=True)

    try:
        # --- Pre-clean: hilangkan semua karakter non-numerik (kecuali . , -)
        cleaned = (
            df[col]
            .astype(str)
            .str.strip()
            .str.replace(r'[^0-9\.,-]', '', regex=True)
            .str.replace(r',-$', '', regex=True)
            .str.replace(',', '.', regex=True)
            .str.replace(r'\.+', '.', regex=True)  # ubah titik berulang menjadi satu titik
            .str.strip('.')
        )

        # --- Deteksi nilai invalid (kosong atau hanya simbol)
        invalid_mask = cleaned.str.fullmatch(r'[.,-]*', na=False)
        if invalid_mask.any():
            bad_samples = cleaned[invalid_mask].unique()[:3]
            return (
                "error",
                f"Pada kolom '{col}', terdapat {invalid_mask.sum()} nilai tidak valid "
                f"(kosong atau hanya simbol). Contoh: {bad_samples.tolist()}"
            )

        # --- Split berdasarkan titik
        split_parts = cleaned.apply(lambda s: [p for p in s.split('.') if p and p.strip() != ''])
        num_parts = split_parts.apply(len)
        last_len = np.where(
            num_parts >= 2,
            split_parts.apply(lambda x: len(x[-1]) if isinstance(x, list) and x else 0),
            0
        )

        # --- Validasi format
        if (num_parts > 2).any():
            bad_mask = num_parts > 2
            bad_samples = list(cleaned[bad_mask].unique())

            # cek apakah semua contoh berawalan 0. atau -0.
            all_small = all(
                isinstance(s, str) and re.match(r'^-?0\.', s.strip())
                for s in bad_samples
            )

            if all_small:
                # semua contoh seperti 0.xxx ‚Üí aman
                return "pass", "OK"

            return (
                "error",
                f"Pada kolom '{col}', data memiliki lebih dari 2 simbol parsing (titik/koma). "
                f"Contoh: {bad_samples[:-3]}"
                f"\nData yang bukan desimal tidak perlu ada parsial (titik/koma)"
            )


        if ((num_parts <= 2).all()) and (last_len > 2).any():
            bad_mask = (num_parts >= 2) & (last_len > 2)
            bad_samples = list(cleaned[bad_mask].unique())

            # cek apakah semua contoh berawalan 0. atau -0.
            all_small = all(
                isinstance(s, str) and re.match(r'^-?0\.', s.strip())
                for s in bad_samples
            )

            if all_small:
                # semua contoh seperti 0.xxx ‚Üí aman
                return "pass", "OK"

            return (
                "error",
                f"Pada kolom '{col}', data terindikasi desimal namun memiliki lebih dari 2 digit "
                f"di belakang koma. Contoh: {bad_samples.tolist()}"
                f"\nData desimal harus 2 angka di belakang koma, jika bukan desimal tidak perlu ada parsial (titik/koma)"
            )

        if (cleaned.str.contains(r'\.\.', regex=True)).any():
            bad_samples = cleaned[cleaned.str.contains(r'\.\.', regex=True)].unique()[:3]
            return (
                "error",
                f"Pada kolom '{col}', data memiliki dua titik berurutan (format salah). "
                f"Contoh: {bad_samples.tolist()}"
            )

        if cleaned.str.endswith('.').any():
            bad_samples = cleaned[cleaned.str.endswith('.')].unique()[:3]
            return (
                "error",
                f"Pada kolom '{col}', beberapa data berakhir dengan titik (kemungkinan salah format). "
                f"Contoh: {bad_samples.tolist()}"
            )

        # --- Jika semua lolos
        return "pass", "OK"

    except Exception as e:
        return "error", f"[ERROR] check_num gagal pada kolom '{col}': {e}"


def clean_num(value):
    if pd.isna(value):
        return np.nan

    s = str(value).strip()

    # --- tangani nilai kosong atau simbol aneh
    if s in ['', '-', 'nan', 'NaN', 'None', 'null']:
        return np.nan

    neg = False
    # tanda negatif dalam tanda kurung (misal "(123)")
    if s.startswith('(') and s.endswith(')'):
        neg = True
        s = s[1:-1]

    # hapus spasi, plus, dan simbol non-numerik
    s = s.replace(' ', '').replace('+', '')
    s = re.sub(r'[^0-9,.-]', '', s)

    # jika kosong setelah dibersihkan
    if s in ['', '.', ',', '-']:
        return np.nan

    # normalisasi desimal
    s = s.replace(',', '.')

    # ganti titik berurutan menjadi satu titik
    s = re.sub(r'\.+', '.', s)

    # hapus titik di awal/akhir
    s = s.strip('.')

    # jika lebih dari satu titik ‚Üí ambil titik terakhir sebagai desimal
    if s.count('.') > 1:
        parts = s.split('.')
        s = ''.join(parts[:-1]) + '.' + parts[-1]

    # jika string berakhir dengan titik ‚Üí buang
    if s.endswith('.'):
        s = s[:-1]

    # ubah ke float
    try:
        val = float(s)
        # bulatkan ke 2 desimal
        val = round(val, 2)
        # ubah ke int jika tidak punya desimal
        if val.is_integer():
            val = int(val)
    except Exception:
        return np.nan

    return -val if neg else val



def transpose_data(df, data_type):
    """
    Parameters
    - df: pandas DataFrame input
    - data_type: 'agregat' or 'transaksi'
    - clean_numeric_func: function(value) -> float or None ; jika None, pakai pd.to_numeric fallback
    - numeric_threshold: proporsi minimal yang harus convertible untuk dianggap numeric (0..1)
    """
    try:
        data = df.copy()

        # pastikan ada kolom id (jika tidak ada, buat sebagai string index ‚Äî tapi beri tanda)
        if 'id' not in data.columns:
            data['id'] = data.index.astype(str)

        # daftar kolom identitas wilayah (hanya ambil yang ada di df)
        wilayah_cols_map = {
            'provinsi': ['kode_provinsi', 'nama_provinsi'],
            'kabupaten_kota': ['kode_kabupaten_kota', 'nama_kabupaten_kota'],
            'kecamatan': ['bps_kode_kecamatan', 'bps_nama_kecamatan', 'kemendagri_kode_kecamatan', 'kemendagri_nama_kecamatan'],
            'desa': ['bps_kode_desa_kelurahan', 'bps_desa_kelurahan', 'kemendagri_kode_desa_kelurahan', 'kemendagri_desa_kelurahan']
        }

        identitas_wilayah = [c for group in wilayah_cols_map.values() for c in group if c in data.columns]

        # kolom exclude dasar (tambahkan identitas wilayah yang ditemukan)
        exclude_cols = ['id_index', 'id', 'id_kategori', 'kategori', 'jumlah', 'periode_update', 'satuan', 'periode', 'tahun'] + identitas_wilayah
        other_cols = [col for col in data.columns if col not in exclude_cols]

        # cek apakah kolom kategori/jumlah ada dan apakah "kosong"
        def is_col_empty_like(col):
            if col not in data.columns:
                return True
            s = data[col].astype(str).str.strip().replace({'nan': None})
            # treat as empty if all null or all token empty or '0'
            return s.isin(['None', 'nan', '', '0', 'None']).all()

        kategori_exists = 'kategori' in data.columns and not is_col_empty_like('kategori')
        jumlah_exists = 'jumlah' in data.columns and not is_col_empty_like('jumlah')

        if 'kategori' in data.columns or 'jumlah' in data.columns:
            if not kategori_exists and not jumlah_exists:
                # drop both if totally empty
                data.drop(columns=['kategori', 'jumlah'], errors='ignore', inplace=True)

            elif kategori_exists and not jumlah_exists:
                if 'kategorikal' not in other_cols:
                    # rename kategori -> kategorikal (kategori is categorical)
                    data = data.rename(columns={'kategori': 'kategorikal'})
                    other_cols.append('kategorikal')

                elif 'kategorikal' in other_cols:
                    # rename kategori -> kategori_clean (kategori is kategori_clean)
                    data = data.rename(columns={'kategori': 'kategori_clean'})
                    other_cols.append('kategori_clean')

            elif jumlah_exists and not kategori_exists:
                # rename jumlah -> total (jumlah numeric)
                if 'total' not in other_cols:
                    data = data.rename(columns={'jumlah': 'total'})
                    other_cols.append('total')

                elif 'total' in other_cols:
                    data = data.rename(columns={'jumlah': 'jumlah_clean'})
                    other_cols.append('jumlah_clean')

            elif jumlah_exists and kategori_exists:
                if 'kategorikal' not in other_cols:
                    data = data.rename(columns={'kategori': 'kategorikal'})
                    other_cols.append('kategorikal')

                elif 'kategorikal' in other_cols:
                    data = data.rename(columns={'kategori': 'kategori_clean'})
                    other_cols.append('kategori_clean')

                if 'total' not in other_cols:
                    data = data.rename(columns={'jumlah': 'total'})
                    other_cols.append('total')

                elif 'total' in other_cols:
                    data = data.rename(columns={'jumlah': 'jumlah_clean'})
                    other_cols.append('jumlah_clean')

            else:
                pass

        # normalisasi token missing sederhana pada kolom kandidat
        tokens_na = ['N/A', 'NA', 'N\\A', '-', '']
        data[other_cols] = data[other_cols].replace(tokens_na, np.nan)

        # --- Deteksi kolom numerik kandidat ---
        def convert_ratio(series):
            # rasio nilai yang bisa dikonversi ke numeric (before cleaning)
            try:
                return pd.to_numeric(series, errors='coerce').notna().mean()
            except Exception:
                return 0.0

        # kolom yang sudah numeric dtype
        already_numeric = [col for col in other_cols if pd.api.types.is_numeric_dtype(data[col])]

        # kolom yang sebagian besar convertible ke numeric (sebelum cleaning)
        candidate_num_text = [col for col in other_cols if convert_ratio(data[col]) >= 0.5]

        data_df = data.copy()
        clean_success = []
        err_msg = {}
        attempted = []

        for col in candidate_num_text:
            attempted.append(col)
            try:
                stat, msg = check_num(data_df, col)

                if stat == "pass":
                    data[col] = data_df[col].apply(clean_num)
                    clean_success.append(col)
                else:
                    err_msg[col] = msg

            except Exception as e:
                return build_error_response(f"[ERROR] Gagal memproses kolom '{col}': {e}", code="ERROR_CLEAN_NUMTEXT")

        # final numeric columns (unique)
        num_cols_final = list(dict.fromkeys(already_numeric + clean_success))

        # kolom yang gagal dibersihkan
        error_cols = [col for col in attempted if col not in clean_success]

        if error_cols:
            first_col, first_err = next(iter(err_msg.items()), (None, None))
            return build_error_response(
                f"Data pada kolom numerik {error_cols[:3]} dst. belum bisa dikonversi karena berpotensi ambiguitas data.\n{first_err}",
                code="ERROR_CONVERT_VALUE"
            )

        data[num_cols_final] = data[num_cols_final].apply(pd.to_numeric, errors='coerce')

        for col in num_cols_final:
            non_null = data[col].dropna()
            if not non_null.empty and ((non_null.round(6) % 1).abs() < 1e-6).all():
                data[col] = data[col].round().astype('Int64')
            else:
                data[col] = data[col].astype(float)

        # cat columns = other_cols minus numeric final
        cat_cols = [col for col in other_cols if col not in num_cols_final]

        # --- Branch untuk agregat ---
        if data_type == 'agregat':
            try:
                # Jika belum didefinisikan df_melt -> lakukan melt (kebanyakan kasus)
                if not ('kategori' in data.columns and 'jumlah' in data.columns):

                    id_vars = [col for col in data.columns if col not in num_cols_final + ['kategori', 'jumlah']]
                    if not id_vars:
                        return build_error_response("Tidak ada id_vars untuk melt.", code="MELT_NO_IDVARS")

                    df_melt = data.melt(
                        id_vars=id_vars,
                        value_vars=num_cols_final,
                        var_name='kategori',
                        value_name='jumlah'
                    )
                else:
                    # jika kategori & jumlah ada (tidak di-melt), gunakan copy
                    df_melt = data.copy()

                if df_melt.empty:
                    return build_error_response(
                        f"Hasil transpose kosong. Pastikan memiliki kolom numerik yang tepat. Kolom numeric: {num_cols_final}",
                        code="MELT_AGGREGATE_FAILED"
                    )

                # normalize kategori string
                if 'kategori' in df_melt.columns:
                    df_melt['kategori'] = df_melt['kategori'].astype(str).str.replace('_', ' ', regex=False)

                # uppercase & strip object columns
                for col in df_melt.select_dtypes(include='object').columns:
                    df_melt[col] = df_melt[col].fillna('0').astype(str).str.strip().str.upper()

                # fill remaining NaN untuk konsistensi (tidak wajib)
                df_melt.fillna(0, inplace=True)

                # order by id then kategori (dense rank)
                if 'kategori' in df_melt.columns:
                    df_melt['_order'] = df_melt['kategori'].rank(method='dense').astype(int)
                else:
                    df_melt['_order'] = 0

                # reset index, buat id_index incremental (integer)
                df_melt = df_melt.reset_index(drop=True).sort_values(['id', '_order'], ignore_index=True).drop(columns=['_order'])
                df_melt['id_index'] = (df_melt.index + 1).astype(int)

                # build ordered cols, only include cols that exist
                agregat_cols = ['periode_update'] + [c for c in cat_cols if c in df_melt.columns] + ['kategori', 'jumlah', 'satuan', 'tahun']
                ordered_cols = ['id_index', 'id'] + [c for c in identitas_wilayah if c in df_melt.columns] + [c for c in agregat_cols if c in df_melt.columns]

                return build_success_response(df_melt[ordered_cols])

            except Exception as e:
                return build_error_response("Gagal melakukan transposisi data agregat.", exception=e, code="AGGREGATE_TRANSPOSE_ERROR")

        # --- Branch untuk transaksi ---
        elif data_type == 'transaksi':
            try:
                # standardize object cols
                for col in data.select_dtypes(include='object').columns:
                    data[col] = data[col].fillna('0').astype(str).str.strip().str.upper().str.replace('_', ' ', regex=False)

                # if kategori/jumlah actually contain data, ensure they are included
                if 'kategorikal' in data.columns:
                    if not data['kategorikal'].astype(str).str.strip().isin(['0', '-', 'N/A', 'NA', 'N\\A', '']).all():
                        data = data.rename(columns={'kategorikal': 'kategori'})
                        other_cols.remove('kategorikal')

                        if 'kategori' not in other_cols:
                            other_cols.append('kategori')

                if 'total' in data.columns:
                    if not data['total'].astype(str).str.strip().isin(['0', '-', 'N/A', 'NA', 'N\\A', '']).all():
                        data = data.rename(columns={'total': 'jumlah'})
                        other_cols.remove('total')

                        if 'jumlah' not in other_cols:
                            other_cols.append('jumlah')

                # transaksi cols recompute (only existing)
                transaksi_cols = [c for c in other_cols if c in data.columns] + [c for c in ['periode_update', 'satuan', 'tahun'] if c in data.columns]

                # create safe id_index
                data = data.reset_index(drop=True)
                data['id_index'] = (data.index + 1).astype(int)

                ordered_cols = ['id_index', 'id'] + [c for c in identitas_wilayah if c in data.columns] + transaksi_cols

                # ensure columns exist
                ordered_cols = [c for c in ordered_cols if c in data.columns]

                data.fillna(0, inplace=True)
                return build_success_response(data[ordered_cols])

            except Exception as e:
                return build_error_response("Gagal melakukan transposisi data transaksi.", exception=e, code="TRANSACTION_TRANSPOSE_ERROR")

        else:
            return build_error_response("Jenis data_type tidak valid. Harus 'agregat' atau 'transaksi'.", code="INVALID_DATATYPE")

    except Exception as e:
        return build_error_response("Terjadi kesalahan umum dalam fungsi transpose_data.", exception=e, code="TRANSPOSE_DATA_ERROR")


def simpan(data, schemas, tables):
    try:
      # Konversi data ke DataFrame
      if isinstance(data, pd.DataFrame):
          df = data
      elif isinstance(data, str):
          try:
              df = pd.read_json(StringIO(data))
          except Exception as e:
              return build_error_response(f"JSON string tidak valid: {e}", exception=e, code="JSON_SAVE_ERROR")
      elif isinstance(data, (dict, list)):
          try:
              df = pd.DataFrame(data)
          except Exception as e:
              return build_error_response(f"Dictionary tidak bisa dikonversi ke DataFrame: {e}", exception=e, code="DICT_DF_SAVE_FAILED")
      else:
          return build_error_response(f"Input 'data' harus berupa DataFrame, JSON string, atau dictionary.", exception=e, code="DICT_DF_REQ")

      if df is None or df.empty:
          return build_error_response(f"Data Kosong / Terjadi Kesalahan saat Cleansing Sehingga DataFrame Kosong", code="DATA_NULL_SAVE")

      # Deteksi level wilayah dan kolom index
      kolom = set(col.lower() for col in df.columns)

      opsi_wilayah = [
          (['nama_provinsi', 'nama_kabupaten_kota', 'bps_nama_kecamatan', 'bps_nama_desa_kelurahan'], 'data_kelurahan', 'bps_nama_desa_kelurahan'),
          (['nama_provinsi', 'nama_kabupaten_kota', 'bps_nama_kecamatan'], 'data_kecamatan', 'bps_nama_kecamatan'),
          (['nama_provinsi', 'nama_kabupaten_kota'], 'data_kabupaten', 'nama_kabupaten_kota'),
          # (['nama_provinsi'], 'data_provinsi', 'nama_provinsi')
      ]

      for keys, wilayah, idx in opsi_wilayah:
          if all(k in kolom for k in keys):
              level_wilayah = wilayah
              index_col = idx
              break
      else:
          level_wilayah = None
          index_col = None

      primary_key = "id_index"
      tanggal = datetime.now().date()

      # Engine SQLAlchemy untuk df.to_sql
      db_user = os.getenv("DB_USER", "postgres")
      db_password = os.getenv("DB_PASSWORD", "2lNyRKW3oc9kan8n")
      db_host = os.getenv("DB_HOST", "103.183.92.158")
      db_port = os.getenv("DB_PORT", "5432")
      db_bigdata_cleaned = os.getenv("DB_BIGDATA_CLEANED", "result_cleansing")

      engine = create_engine(f'postgresql://{db_user}:{db_password}@{db_host}:{db_port}/{db_bigdata_cleaned}')

      # Buat schema dan simpan metadata menggunakan psycopg2
      with psycopg2.connect(
          dbname=db_bigdata_cleaned,
          user=db_user,
          password=db_password,
          host=db_host,
          port=db_port
      ) as conn:
          with conn.cursor() as cur:
              # Buat schema target
              cur.execute(f"CREATE SCHEMA IF NOT EXISTS {schemas};")
              conn.commit()

              # Buat schema dan tabel masterdata jika belum ada
              cur.execute("""
                  CREATE SCHEMA IF NOT EXISTS masterdata;
                  CREATE TABLE IF NOT EXISTS masterdata.master_jenis_data (
                      id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
                      nama_schema TEXT,
                      nama_table TEXT,
                      jenis_data TEXT,
                      modified_date DATE
                  );
              """)

              # Cek apakah metadata sudah ada
              cur.execute("""
                  SELECT id FROM masterdata.master_jenis_data
                  WHERE nama_schema = %s AND nama_table = %s;
              """, (schemas, tables))
              result = cur.fetchone()

              # Deteksi jenis data
              cols = df.columns.str.lower()
              data_type = 'agregat' if 'kategori' in cols and 'jumlah' in cols else 'transaksi'

              if result:
                  cur.execute("""
                      UPDATE masterdata.master_jenis_data
                      SET jenis_data = %s, modified_date = %s
                      WHERE id = %s;
                  """, (data_type, tanggal, result[0]))
                  # print(f"üîÅ Metadata diperbarui: id={result[0]}")
              else:
                  cur.execute("""
                      INSERT INTO masterdata.master_jenis_data
                      (nama_schema, nama_table, jenis_data, modified_date)
                      VALUES (%s, %s, %s, %s);
                  """, (schemas, tables, data_type, tanggal))
                  # print(f"‚úÖ Metadata baru disimpan untuk {schemas}.{tables}")

              conn.commit()

      # Simpan data utama ke tabel
      df.to_sql(
          tables,
          engine,
          schema=schemas,
          if_exists='replace',
          index=False
      )
      # print(f"‚úÖ Data berhasil disimpan ke {schemas}.{tables}.")

      # Tambahkan PRIMARY KEY
      parts = tables.split('_')
      for i in range(3, len(parts) + 1):
          pk_candidate = '_'.join(parts[:i])
          pk_constraint = f"{pk_candidate}_pkey"
          with engine.connect() as con:
              existing = con.execute(text("""
                  SELECT 1 FROM information_schema.table_constraints
                  WHERE constraint_schema = :schema AND constraint_name = :name;
              """), {"schema": schemas, "name": pk_constraint}).fetchone()

              if not existing:
                  try:
                      con.execute(text(f"""
                          ALTER TABLE {schemas}.{tables}
                          ADD CONSTRAINT {pk_constraint} PRIMARY KEY ({primary_key});
                      """))
                      con.commit()
                      break
                  except Exception:
                      break

      # Tambahkan index
      if index_col:
          with engine.connect() as con:
              con.execute(text(f"""
                  CREATE INDEX IF NOT EXISTS idx_{tables}_{index_col}
                  ON {schemas}.{tables} ({index_col});
              """))
              con.commit()

      return build_success_response(f"Data berhasil disimpan ke {schemas}.{tables}")

    except Exception as e:
      return build_error_response("Gagal menyimpan data.", exception=e, code="SAVE_ERROR")

def main(schemas, tables, data_type):
    try:
        # 1. Koneksi Big Data
        result = connect_bigdata(schemas, tables)
        if result['status'] == 'error':
            return result

        elif result['status'] == 'success':
            # 2. Preprocessing
            result = preprocessing(result['data'])

            if result['status'] == 'error':
                return result

            elif result['status'] == 'success':
              # 3. Periode Update
              result = periode_update(result['data'])
              df = result['data']

              if result['status'] == 'error':
                return result

              elif result['status'] == 'success':
                # 4. Deteksi Wilayah
                result = deteksi_wilayah(result['data'])
                # display(result)
                status, col_status = result['data']
                # display(status, col_status)

                # 5. Connect Masterdata
                masterdata = conn_masterdata(status)
                if masterdata['data'] is not None:
                   masterdata = masterdata['data']

                if result['status'] == 'error':
                  return result

                elif result['status'] == 'success':
                  # 6.1 Data Kabupaten
                  if status == 'data_kabupaten':
                    df[col_status['nama_kabupaten_kota']] = df[col_status['nama_kabupaten_kota']].apply(format_kabupaten_kota)
                    result = merge_df(
                        df,
                        masterdata[['kode_kabupaten_kota', 'nama_kabupaten_kota']],
                        col_status['nama_kabupaten_kota'],
                        'nama_kabupaten_kota'
                    )

                  # 6.2 Data Kecamatan
                  elif status == 'data_kecamatan':
                    # 6.2.1 Data Kecamatan Memiliki Kolom Kabupaten
                    if col_status['nama_kabupaten_kota'] in ['nama_kabupaten', 'kabupaten', 'kabupaten_kota', 'nama_kabupaten_kota']:
                      df[col_status['nama_kabupaten_kota']] = df[col_status['nama_kabupaten_kota']].apply(format_kabupaten_kota)
                      df[col_status['bps_nama_kecamatan']] = df[col_status['bps_nama_kecamatan']].apply(format_kecamatan)
                      result = merge_df(
                          df,
                          masterdata[[
                              'kode_kabupaten_kota',
                              'nama_kabupaten_kota',
                              'bps_kode_kecamatan',
                              'bps_nama_kecamatan',
                              'kemendagri_kode_kecamatan',
                              'kemendagri_nama_kecamatan'
                          ]],
                          [col_status['nama_kabupaten_kota'], col_status['bps_nama_kecamatan']],
                          ['nama_kabupaten_kota', 'bps_nama_kecamatan']
                      )

                    else:
                      try:
                        df[col_status['bps_nama_kecamatan']] = df[col_status['bps_nama_kecamatan']].apply(format_kecamatan)

                        # 6.2.2 Data Kecamatan Tidak Memiliki Kolom Kabupaten pada Schema Kabupaten
                        if 'kabupaten' in schemas.lower():
                            split = schemas.upper().split('_')
                            if len(split) == 2:
                              df['nama_kabupaten_kota'] = f'{split[0]} {split[1]}'
                              result = merge_df(
                              df,
                              masterdata[[
                                  'kode_kabupaten_kota',
                                  'nama_kabupaten_kota',
                                  'bps_kode_kecamatan',
                                  'bps_nama_kecamatan',
                                  'kemendagri_kode_kecamatan',
                                  'kemendagri_nama_kecamatan'
                              ]],
                              ['nama_kabupaten_kota', col_status['bps_nama_kecamatan']],
                              ['nama_kabupaten_kota', 'bps_nama_kecamatan'])

                            else:
                              raise ValueError("Melakukan Merge Kecamatan tanpa Kabupaten berisiko duplikasi dengan kecamatan di kabupaten lain")

                        else:
                          # 6.2.3 Data Kecamatan Tidak Memiliki Kolom Kabupaten
                          result = merge_df(
                              df,
                              masterdata[[
                                  'kode_kabupaten_kota',
                                  'nama_kabupaten_kota',
                                  'bps_kode_kecamatan',
                                  'bps_nama_kecamatan',
                                  'kemendagri_kode_kecamatan',
                                  'kemendagri_nama_kecamatan'
                              ]],
                              col_status['bps_nama_kecamatan'],
                              'bps_nama_kecamatan')

                      except Exception as e:
                        return build_error_response('Gagal Merge Kecamatan.', code = 'MERGE_KECAMATAN')

                  # 6.3 Data Kelurahan
                  elif status == 'data_kelurahan':
                      return build_error_response('Proses Data Kelurahan Under Development', code = 'MERGE_KELURAHAN')

                  elif status == 'data_provinsi':
                    result = build_success_response(df)

                  else:
                    return build_error_response('Merge Data dengan Status Wilayah', code = 'MERGE_WILAYAH_FAILED')

                  if result['status'] == 'error':
                    return result

                  elif result['status'] == 'success':
                    if data_type.lower() == 'agregat':
                      # 7.1 Data Agregat
                      result = transpose_data(result['data'], 'agregat')

                    elif data_type.lower() == 'transaksi':
                      # 7.2 Data Transaksi
                      result = transpose_data(result['data'], 'transaksi')

                    if result['status'] == 'error':
                      return result

                    elif result['status'] == 'success':
                      # 8. Data Result
                      data = result['data']
                      json_query = data.to_json(orient='records')
                      return build_success_response(json_query)
    except Exception as e:
        return build_error_response("Terjadi Kesalahan Umum dalam Cleansing.", exception=e, code="MAIN_ERROR")