<a href="https://colab.research.google.com/github/pthengtr/kcw-analytics/blob/main/notebooks/01_supabase_upload.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install psycopg2-binary

Collecting psycopg2-binary
  Downloading psycopg2_binary-2.9.11-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.whl.metadata (4.9 kB)
Downloading psycopg2_binary-2.9.11-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.whl (4.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m4.2/4.2 MB[0m [31m37.6 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: psycopg2-binary
Successfully installed psycopg2-binary-2.9.11


In [3]:
from google.colab import drive

# Expose Google Drive under /content/drive so files in MyDrive can be read by path.
drive.mount(mountpoint='/content/drive')

Mounted at /content/drive


In [4]:
import os
import pandas as pd

folder = "/content/drive/MyDrive/kcw_analytics/01_raw"

# Read identifier-like columns as text so their values are kept verbatim
# (presumably codes where leading zeros matter — confirm against the source system).
# Keys for columns a given file does not have are simply unused.
RAW_DTYPES = {
    "BCODE": "string",
    "ITEMNO": "string",
    "BILLNO": "string",
}

data = {}

# sorted(): os.listdir returns entries in arbitrary, filesystem-dependent order,
# which would make the load order and the printed log non-reproducible.
for file in sorted(os.listdir(folder)):
    if file.endswith(".csv"):
        path = os.path.join(folder, file)
        data[file] = pd.read_csv(
            path,
            dtype=RAW_DTYPES,
            encoding="utf-8-sig",  # also strips a leading UTF-8 BOM if present
            low_memory=False,      # parse each file in one pass: consistent dtype inference
        )
        print(f"Loaded: {file} -> {data[file].shape}")

Loaded: raw_inventory_hq_2024.csv -> (4983, 8)
Loaded: raw_syp_pimas_purchase_bills.csv -> (2881, 49)
Loaded: raw_syp_pidet_purchase_lines.csv -> (26919, 41)
Loaded: raw_syp_sidet_sales_lines.csv -> (35191, 38)
Loaded: raw_syp_simas_sales_bills.csv -> (11923, 49)
Loaded: raw_hq_pimas_purchase_bills.csv -> (83416, 49)
Loaded: raw_hq_pidet_purchase_lines.csv -> (248835, 41)
Loaded: raw_hq_icmas_products.csv -> (114874, 94)
Loaded: raw_hq_sidet_sales_lines.csv -> (1200038, 38)
Loaded: raw_hq_simas_sales_bills.csv -> (486118, 49)


In [5]:
import pandas as pd

def filter_last_year_from_latest(
    df: pd.DataFrame,
    date_col: str = "BILLDATE",
    *,
    years: int = 1,
    keep_invalid: bool = False,
    inplace: bool = False,
) -> pd.DataFrame:
    """
    Keep rows where `date_col` is within `years` years back from the latest date in that column.

    - Parses dates with pd.to_datetime(errors="coerce")
    - If keep_invalid=False (default): drops rows where date_col can't be parsed.
    - If keep_invalid=True: keeps invalid-date rows (NaT) in the output.

    Returns a filtered copy unless inplace=True (then returns the same df reference).
    """
    if date_col not in df.columns:
        raise KeyError(f"Column not found: {date_col}")

    out = df if inplace else df.copy()

    # Parse to datetime safely
    dt = pd.to_datetime(out[date_col], errors="coerce")

    latest = dt.max()
    if pd.isna(latest):
        # No valid dates at all
        return out if keep_invalid else out.iloc[0:0].copy()

    cutoff = latest - pd.DateOffset(years=years)

    mask_recent = dt >= cutoff
    if keep_invalid:
        mask = mask_recent | dt.isna()
    else:
        mask = mask_recent

    return out.loc[mask].copy()


# Example usage:
# df_1y = filter_last_year_from_latest(df, "BILLDATE")
# df_1y = filter_last_year_from_latest(df_pidet, "JOURDATE")

In [14]:
# Branch code interpolated into the raw file names and target table names below.
BRANCH = "hq"

In [20]:
# Restrict each transaction table of the selected BRANCH to its last year of data,
# measured back from that table's own latest BILLDATE.
# NOTE(review): line (det) and bill (mas) tables are windowed independently; if their
# latest BILLDATEs differ, a bill and its lines can land on different sides of the cutoff.
df_sidet = filter_last_year_from_latest(data[f"raw_{BRANCH}_sidet_sales_lines.csv"], "BILLDATE")
df_simas = filter_last_year_from_latest(data[f"raw_{BRANCH}_simas_sales_bills.csv"], "BILLDATE")
df_pidet = filter_last_year_from_latest(data[f"raw_{BRANCH}_pidet_purchase_lines.csv"], "BILLDATE")
df_pimas = filter_last_year_from_latest(data[f"raw_{BRANCH}_pimas_purchase_bills.csv"], "BILLDATE")

# Only an HQ product master (raw_hq_icmas_products.csv) is among the loaded files,
# so products come from HQ regardless of BRANCH and are not date-filtered.
df_icmas = data["raw_hq_icmas_products.csv"]

row_counts = {
    "sidet": len(df_sidet),
    "simas": len(df_simas),
    "pidet": len(df_pidet),
    "pimas": len(df_pimas),
    "icmas": len(df_icmas),
}
for label, n_rows in row_counts.items():
    print(f"{label}: {n_rows:,}")


193201
61913
39081
10846
114874


In [16]:

import os
from getpass import getpass

import psycopg2

# Connection settings come from environment variables. The password is prompted for
# when SUPABASE_DB_PASSWORD is unset, and must never be hardcoded: notebooks are
# committed and shared together with their source.
# NOTE(review): a password was previously committed in this cell; rotate it in Supabase.
conn = psycopg2.connect(
    host=os.environ.get("SUPABASE_DB_HOST", "aws-0-ap-southeast-1.pooler.supabase.com"),
    port=int(os.environ.get("SUPABASE_DB_PORT", "5432")),
    dbname=os.environ.get("SUPABASE_DB_NAME", "postgres"),
    # Pooler user names carry the project ref after a dot: "postgres.<project-ref>".
    user=os.environ.get("SUPABASE_DB_USER", "postgres.jdzitzsucntqbjvwiwxm"),
    password=os.environ.get("SUPABASE_DB_PASSWORD") or getpass("Supabase DB password: "),
    sslmode="require",
)
print("Connected via pooler ✅")


Connected via pooler ✅


In [17]:
import io
import csv
import pandas as pd
import psycopg2


def _qident(name: str) -> str:
    """
    Render `name` as a double-quoted Postgres identifier.

    Embedded double quotes are doubled, which is how they are escaped inside a
    quoted identifier.
    """
    escaped = name.replace('"', '""')
    return f'"{escaped}"'


def _get_table_columns(conn, schema: str, table: str) -> list[str]:
    """
    Return the column names of `schema`.`table`, ordered by ordinal position.

    The lookup queries information_schema.columns with bound parameters. An
    empty list means nothing matched; callers treat that as "table not found".
    """
    query = """
    SELECT column_name
    FROM information_schema.columns
    WHERE table_schema = %s AND table_name = %s
    ORDER BY ordinal_position
    """
    with conn.cursor() as cur:
        cur.execute(query, (schema, table))
        fetched = cur.fetchall()
    column_names = [row[0] for row in fetched]
    return column_names


def bulk_upsert_via_stage(
    conn,
    df: pd.DataFrame,
    *,
    target_table: str,
    schema: str = "public",
    stage_suffix: str = "_stage",
    pk_cols: list[str] = None,
    truncate_stage: bool = True,
) -> dict:
    """
    Bulk upsert df into target_table by:
      TRUNCATE stage
      COPY df -> stage
      INSERT target SELECT stage ON CONFLICT(pk) DO UPDATE
    Returns basic stats: rows_in_df, target_table, stage_table.
    """
    if pk_cols is None:
        pk_cols = ["ID"]

    if df is None or len(df) == 0:
        return {"rows_in_df": 0, "target_table": target_table, "stage_table": f"{target_table}{stage_suffix}"}

    stage_table = f"{target_table}{stage_suffix}"

    # Fetch canonical column order from the DB (target table)
    target_cols = _get_table_columns(conn, schema, target_table)
    if not target_cols:
        raise ValueError(f"Target table not found: {schema}.{target_table}")

    stage_cols = _get_table_columns(conn, schema, stage_table)
    if not stage_cols:
        raise ValueError(f"Stage table not found: {schema}.{stage_table}")

    if target_cols != stage_cols:
        raise ValueError(
            f"Target/stage column mismatch.\n"
            f"Target({len(target_cols)}): {target_cols}\n"
            f"Stage({len(stage_cols)}):  {stage_cols}"
        )

    missing = [c for c in target_cols if c not in df.columns]
    extra = [c for c in df.columns if c not in target_cols]
    if missing:
        raise ValueError(f"DF is missing columns required by table: {missing}")
    if extra:
        # Not fatal, but usually indicates you loaded the wrong dataframe
        print(f"Warning: DF has extra columns not in table (they will be ignored): {extra}")

    # Reorder and drop extras; keep exactly table columns
    df2 = df[target_cols].copy()

    # Convert NaN -> None so COPY writes empty fields
    df2 = df2.where(pd.notnull(df2), None)

    fq_target = f"{_qident(schema)}.{_qident(target_table)}"
    fq_stage = f"{_qident(schema)}.{_qident(stage_table)}"

    cols_sql = ", ".join(_qident(c) for c in target_cols)
    pk_sql = ", ".join(_qident(c) for c in pk_cols)

    # Build SET clause excluding PK columns
    non_pk_cols = [c for c in target_cols if c not in set(pk_cols)]
    if not non_pk_cols:
        raise ValueError("No non-PK columns to update. Check pk_cols.")

    set_sql = ", ".join(f"{_qident(c)} = EXCLUDED.{_qident(c)}" for c in non_pk_cols)

    merge_sql = f"""
    INSERT INTO {fq_target} ({cols_sql})
    SELECT {cols_sql}
    FROM {fq_stage}
    ON CONFLICT ({pk_sql}) DO UPDATE
    SET {set_sql}
    """

    # Write DF to CSV buffer (no header), then COPY
    buf = io.StringIO()
    writer = csv.writer(buf, quoting=csv.QUOTE_MINIMAL, lineterminator="\n")
    for row in df2.itertuples(index=False, name=None):
        writer.writerow(["" if v is None else v for v in row])
    buf.seek(0)

    try:
        with conn.cursor() as cur:
            if truncate_stage:
                cur.execute(f"TRUNCATE TABLE {fq_stage};")

            cur.copy_expert(
                f"COPY {fq_stage} ({cols_sql}) FROM STDIN WITH (FORMAT CSV)",
                buf
            )

            cur.execute(merge_sql)

        conn.commit()
        return {
            "rows_in_df": len(df2),
            "target_table": f"{schema}.{target_table}",
            "stage_table": f"{schema}.{stage_table}",
        }

    except Exception:
        conn.rollback()
        raise

In [26]:
# Example: upsert product master
result = bulk_upsert_via_stage(
    conn,
    df_sidet,
    target_table=f"raw_{BRANCH}_sidet_lines",
    pk_cols=["ID"],
)

print(result)

{'rows_in_df': 193201, 'target_table': 'public.raw_hq_sidet_lines', 'stage_table': 'public.raw_hq_sidet_lines_stage'}


In [25]:
# Example: upsert product master
result = bulk_upsert_via_stage(
    conn,
    df_pidet,
    target_table=f"raw_{BRANCH}_pidet_lines",
    pk_cols=["ID"],
)

print(result)

{'rows_in_df': 39081, 'target_table': 'public.raw_hq_pidet_lines', 'stage_table': 'public.raw_hq_pidet_lines_stage'}


In [27]:
# Example: upsert product master
result = bulk_upsert_via_stage(
    conn,
    df_simas,
    target_table=f"raw_{BRANCH}_simas_bills",
    pk_cols=["ID"],
)

print(result)

{'rows_in_df': 61913, 'target_table': 'public.raw_hq_simas_bills', 'stage_table': 'public.raw_hq_simas_bills_stage'}


In [21]:
# Example: upsert product master
result = bulk_upsert_via_stage(
    conn,
    df_pimas,
    target_table=f"raw_{BRANCH}_pimas_bills",
    pk_cols=["ID"],
)

print(result)

{'rows_in_df': 10846, 'target_table': 'public.raw_hq_pimas_bills', 'stage_table': 'public.raw_hq_pimas_bills_stage'}


In [18]:
# Example: upsert product master
result = bulk_upsert_via_stage(
    conn,
    df_icmas,
    target_table=f"raw_{BRANCH}_icmas_products",
    pk_cols=["ID"],
)

print(result)

{'rows_in_df': 114874, 'target_table': 'public.raw_hq_icmas_products', 'stage_table': 'public.raw_hq_icmas_products_stage'}
