In [1]:
from typing import Optional, List, Dict, Tuple, Any
import pdfplumber
import pandas as pd
import re

# === CONFIGURATION ===
# Make sure you have a config.py in the same directory defining:
#   PDF_PATH: str
#   PASSWORD: Optional[str]
#   CROP_REGIONS: Dict[str, Tuple[float, float, float, float]]
#   X_BOUNDS: Dict[str, float]
#   tolerance_settings: Dict[str, float]
#   TABLE_SETTINGS: Dict[str, Any]
#   DATE_PATTERN: re.Pattern
#   TIME_PATTERN: re.Pattern
#   MONEY_PATTERN: re.Pattern
#   PAGE_ID_PATTERN: re.Pattern
#   NUMERIC_CLEAN_PATTERN: re.Pattern
#
# (See the comments in earlier messages for example stubs—adjust them to match your PDF’s actual layout.)

from config import (
    PDF_PATH,
    PASSWORD,
    CROP_REGIONS,
    X_BOUNDS,
    tolerance_settings,
    TABLE_SETTINGS, 
    DATE_PATTERN,
    TIME_PATTERN,
    MONEY_PATTERN,
    PAGE_ID_PATTERN,
    NUMERIC_CLEAN_PATTERN
)


class KBankExtractor:
    """
    Class to extract header and transaction information from KBank PDF statements.
    You now call `extract_from_pages(pages)` where `pages` is a List[pdfplumber.page.Page].
    """

    def __init__(
        self,
        pdf_path: str = PDF_PATH,
        password: Optional[str] = PASSWORD,
        table_settings: Dict[str, Any] = TABLE_SETTINGS
    ):
        self.pdf_path = pdf_path
        self.password = password
        self.table_settings = table_settings

    def _compute_date_tops(self, words: List[dict]) -> List[float]:
        """
        Identify y-coordinates ("top") of all words matching the DATE_PATTERN within X_BOUNDS.
        Returns a sorted list of those y-coordinates.
        """
        return sorted(
            w["top"] for w in words
            if DATE_PATTERN.match(w["text"]) and X_BOUNDS["date_min"] <= w["x0"] <= X_BOUNDS["date_max"]
        )

    def _compute_intervals(self, date_tops: List[float]) -> List[Tuple[float, float]]:
        """
        Given a sorted list of y-coordinates for dates, compute vertical intervals around each date.
        These intervals define “rows” in the table.
        """
        intervals: List[Tuple[float, float]] = []
        for index, top in enumerate(date_tops):
            start = top - tolerance_settings["y_margin"]
            if index + 1 < len(date_tops):
                end = date_tops[index + 1] - tolerance_settings["y_margin"]
            else:
                if index > 0:
                    delta = top - date_tops[index - 1]
                else:
                    delta = tolerance_settings["y_margin"] * 2
                end = top + delta - tolerance_settings["y_margin"]
            intervals.append((start, end))
        return intervals

    def _assign_rows(self, words: List[dict], intervals: List[Tuple[float, float]]) -> List[List[dict]]:
        """
        Assign each word to the first interval (row) whose vertical range contains the word’s top.
        Returns a list of lists, where each inner list contains the words belonging to that row.
        """
        rows_per_interval: List[List[dict]] = [[] for _ in intervals]
        for word in words:
            for interval_index, (start, end) in enumerate(intervals):
                if start <= word["top"] < end:
                    rows_per_interval[interval_index].append(word)
                    break
        return rows_per_interval

    def _extract_header(self, page) -> Dict[str, Optional[str]]:
        """
        Crop out header fields (account name, account number, period, totals, etc.) using CROP_REGIONS.
        If the "account_name" box contains two lines, the first line becomes account_name
        and the second line becomes address. Numeric totals are cleaned via NUMERIC_CLEAN_PATTERN.
        """
        header_data: Dict[str, Optional[str]] = {}

        for field, bbox in CROP_REGIONS.items():
            raw_text = page.crop(bbox).extract_text() or ""
            text_content = raw_text.strip()

            # Special handling: if account_name crop has two lines, split into name + address
            if field == "account_name":
                lines = text_content.splitlines()
                header_data["account_name"] = lines[0].strip() if lines else ""
                if len(lines) > 1:
                    header_data["address"] = lines[1].strip()
                else:
                    header_data.setdefault("address", "")
                continue

            # If a separate "address" crop exists, skip only if we've already set it above
            if field == "address":
                if "address" in header_data and header_data["address"]:
                    continue
                # otherwise fall through and extract normally

            # Clean numeric totals
            if field in {
                "total_withdrawal_transaction",
                "total_deposit_transaction",
                "total_withdrawal",
                "total_deposit",
                "ending_balance"
            }:
                match = NUMERIC_CLEAN_PATTERN.search(text_content)
                header_data[field] = match.group().replace(",", "") if match else None
            else:
                header_data[field] = text_content

        return header_data

    def _parse_page(self, page) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]:
        """
        Parse a single pdfplumber Page object:
          1) Extract header info (_extract_header)
          2) Determine page_id via PAGE_ID_PATTERN
          3) If “ENDING BALANCE” or “ยอดยกไป” not found, null out totals
          4) Find all table regions (or entire page if no table), group words into rows,
             and convert each row to a transaction dict via _parse_transaction_row.
        Returns:
            - header_information: dict of header fields + page_id
            - transaction_records: list of dicts (one per valid transaction row)
        """
        # 1) Extract header
        header_information = self._extract_header(page)

        # 2) Clean page_id
        raw_page_id_value = header_information.get("page", "")
        page_id_match = PAGE_ID_PATTERN.search(raw_page_id_value)
        page_identifier = page_id_match.group(1) if page_id_match else raw_page_id_value
        header_information["page_id"] = page_identifier

        # 3) If no “ENDING BALANCE” or “ยอดยกไป” in the text, null out totals
        full_text = page.extract_text() or ""
        if not re.search(r"(ยอดยกไป|ENDING BALANCE)", full_text, re.IGNORECASE):
            for fld in [
                "total_withdrawal_transaction",
                "total_deposit_transaction",
                "total_withdrawal",
                "total_deposit",
                "ending_balance"
            ]:
                header_information[fld] = None

        # 4) Extract transaction rows
        transaction_records: List[Dict[str, Any]] = []
        tables_found = page.find_tables(self.table_settings)
        regions_to_parse = [page.crop(tbl.bbox) for tbl in tables_found] if tables_found else [page]

        for region in regions_to_parse:
            words_in_region = region.extract_words(use_text_flow=True)
            date_positions = self._compute_date_tops(words_in_region)
            if not date_positions:
                continue

            row_intervals = self._compute_intervals(date_positions)
            rows_of_words = self._assign_rows(words_in_region, row_intervals)

            for words_in_row in rows_of_words:
                if not words_in_row:
                    continue
                combined_text = " ".join(w["text"] for w in words_in_row)
                if "ENDING BALANCE" in combined_text or "ยอดยกไป" in combined_text:
                    continue
                sorted_words = sorted(words_in_row, key=lambda w: (w["top"], w["x0"]))
                record = self._parse_transaction_row(
                    sorted_words,
                    page_identifier,
                    header_information.get("address", "")
                )
                if record:
                    transaction_records.append(record)

        return header_information, transaction_records

    def _parse_transaction_row(
        self,
        sorted_words: List[dict],
        page_identifier: str,
        account_address: str
    ) -> Optional[Dict[str, Any]]:
        """
        Given a sorted list of words for one row, assigns each word to:
          - date (DATE_PATTERN)
          - time (TIME_PATTERN)
          - withdrawal, deposit, or balance (MONEY_PATTERN + x-position logic)
          - description_addon (if x in between date_max and amount_desc_split)
          - channel (if x in between amount_desc_split and channel_details_split)
          - detail description (everything else)
        Returns a dict:
            {
                "page_id", "address", "date", "time", "description",
                "withdrawal", "deposit", "balance", "channel", "description_addon"
            }
        If no date is found, returns None.
        """
        date_value = ""
        time_value = ""
        debit_amount = None
        credit_amount = None
        balance_amount = None
        description_text_parts: List[str] = []
        channel_text_parts: List[str] = []
        detail_text_parts: List[str] = []

        for word in sorted_words:
            x_position, text_value,x1_position = word["x0"], word["text"],word['x1']
            # 1) Date
            if DATE_PATTERN.match(text_value) and X_BOUNDS["date_min"] <= x_position <= X_BOUNDS["date_max"]:
                date_value = text_value

            # 2) Time
            elif TIME_PATTERN.match(text_value):
                time_value = text_value

            # 3) Money (withdrawal / deposit / balance)
            elif MONEY_PATTERN.match(text_value):
                numeric_value = float(text_value.replace(",", ""))
                if (
                    x_position <= X_BOUNDS["withdraw_deposit_split"] + tolerance_settings["x_tolerance"]
                    and
                    x1_position <= X_BOUNDS["withdraw_deposit_split_x1"] + tolerance_settings["x_tolerance"]
                ):
                    debit_amount = numeric_value

                elif x_position <= X_BOUNDS["amount_balance_split"] + tolerance_settings["x_tolerance"]:
                    credit_amount = numeric_value
                else:
                    balance_amount = numeric_value

            # 4) description_addon if x just right of date_max but <= amount_desc_split
            elif (x_position > X_BOUNDS["date_max"] + tolerance_settings["x_tolerance"]
                  and x_position <= X_BOUNDS["amount_desc_split"]):
                description_text_parts.append(text_value)

            # 5) channel if x just right of amount_desc_split but <= channel_details_split
            elif (x_position > X_BOUNDS["amount_desc_split"] + tolerance_settings["x_tolerance"]
                  and x_position <= X_BOUNDS["channel_details_split"]):
                channel_text_parts.append(text_value)

            # 6) everything else → detail description
            else:
                detail_text_parts.append(text_value)

        if not date_value:
            return None

        return {
            "page_id": page_identifier,
            "address": account_address,
            "date": pd.to_datetime(date_value, format="%d-%m-%y", errors="coerce"),
            "time": time_value,
            "description": " ".join(detail_text_parts).strip(),
            "withdrawal": debit_amount,
            "deposit": credit_amount,
            "balance": balance_amount,
            "channel": " ".join(channel_text_parts).strip(),
            "description_addon": " ".join(description_text_parts).strip()
        }

    def extract_from_pages(
        self,
        pages: List[pdfplumber.page.Page]
    ) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """
        Parse a list of pdfplumber Page objects (opened externally).
        Returns two DataFrames:
          - transactions_dataframe
          - headers_dataframe

        It does _not_ open any PDF internally; you must pass in `pages`.
        """
        headers_list: List[Dict[str, Any]] = []
        transactions_list: List[Dict[str, Any]] = []

        for page_index, page in enumerate(pages):
            try:
                header_info, transactions = self._parse_page(page)
                headers_list.append(header_info)
                transactions_list.extend(transactions)
            except Exception as error:
                print(f"⚠️ Error on page {page_index + 1}: {error}")

        transactions_dataframe = pd.DataFrame(transactions_list).copy()
        headers_dataframe = pd.DataFrame(headers_list).copy()

        if not transactions_dataframe.empty:
            # 1) Sum total withdrawal & deposit per page
            totals_dataframe = (
                transactions_dataframe
                .groupby("page_id")[["withdrawal", "deposit"]]
                .sum()
                .rename(
                    columns={
                        "withdrawal": "total_withdrawal_each_page",
                        "deposit": "total_deposit_each_page"
                    }
                )
                .reset_index()
            )

            # 2) Count transactions (non-null withdrawal vs deposit) per page
            counts_dataframe = (
                transactions_dataframe
                .groupby("page_id")
                .agg(
                    total_withdrawal_transaction_each_page=pd.NamedAgg(
                        column="withdrawal", aggfunc=lambda x: x.notnull().sum()
                    ),
                    total_deposit_transaction_each_page=pd.NamedAgg(
                        column="deposit", aggfunc=lambda x: x.notnull().sum()
                    )
                )
                .reset_index()
            )

            # 3) Merge back into transactions & headers
            transactions_dataframe = (
                transactions_dataframe
                .merge(totals_dataframe, on="page_id")
                .merge(counts_dataframe, on="page_id")
            )
            headers_dataframe = (
                headers_dataframe
                .merge(totals_dataframe, on="page_id", how="left")
                .merge(counts_dataframe, on="page_id", how="left")
            )

        # 4) Split "period" into start_period / end_period if present
        if "period" in headers_dataframe.columns:
            period_parts = (
                headers_dataframe["period"]
                .str.replace(" ", "", regex=False)
                .str.split(r"[-–]", n=1, expand=True)
            )
            headers_dataframe["start_period"] = pd.to_datetime(
                period_parts[0], dayfirst=True, errors="coerce"
            )
            if period_parts.shape[1] > 1:
                headers_dataframe["end_period"] = pd.to_datetime(
                    period_parts[1], dayfirst=True, errors="coerce"
                )
            else:
                headers_dataframe["end_period"] = None

        # 5) Convert numeric header columns to numeric dtype
        numeric_header_columns = [
            "total_withdrawal",
            "total_deposit",
            "total_withdrawal_transaction",
            "total_deposit_transaction"
        ]
        for col in numeric_header_columns:
            if col in headers_dataframe.columns:
                headers_dataframe[col] = pd.to_numeric(
                    headers_dataframe[col], errors="coerce"
                )

        return transactions_dataframe, headers_dataframe

    def clean_and_format_data(
        self,
        transactions_dataframe: pd.DataFrame,
        headers_dataframe: pd.DataFrame
    ) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """
        Take the raw DataFrames from extract_from_pages() and:
          1) Filter out rows without description or amount
          2) Select & rename columns to a standardized schema (withdrawal→debit, deposit→credit)
          3) Replace NaNs with empty strings
          4) From headers, select & rename total_* → total_debit / total_credit, etc.
          5) Drop header rows whose page_id doesn’t start with a digit
          6) In transactions, add default “code” and “transaction_type” columns
          7) Convert numeric columns (debit, credit, balance, header totals) to float
        Returns:
            (final_transactions_df, final_headers_df)
        """
        # 1) Filter invalid transaction rows
        filtered_transactions = (
            transactions_dataframe[
                ~(transactions_dataframe["description"].isnull())
                & (
                    (~transactions_dataframe["withdrawal"].isnull())
                    | (~transactions_dataframe["deposit"].isnull())
                )
            ]
            .copy()
        )

        # 2) Select & rename columns for transactions
        selected_transactions = filtered_transactions[
            [
                "page_id", "date", "time", "description", "withdrawal",
                "deposit", "balance", "channel", "description_addon"
            ]
        ].copy()
        renamed_transactions = selected_transactions.rename(
            columns={
                "withdrawal": "debit",
                "deposit": "credit"
            }
        )
        renamed_transactions.fillna("", inplace=True)

        # 3) Prepare & clean headers
        cols_to_select = [
            "page_id", "account_name", "account_number", "period",
            "total_withdrawal", "total_deposit",
            "total_withdrawal_transaction", "total_deposit_transaction",
            "address"
        ]
        available_cols = [c for c in cols_to_select if c in headers_dataframe.columns]
        selected_headers = headers_dataframe[available_cols].copy()
        renamed_headers = selected_headers.rename(
            columns={
                "total_withdrawal": "total_debit",
                "total_deposit": "total_credit",
                "total_withdrawal_transaction": "total_debit_transaction",
                "total_deposit_transaction": "total_credit_transaction"
            }
        )
        filtered_headers = (
            renamed_headers[
                renamed_headers["page_id"].str.match(r"^\d", na=False)
            ]
            .reset_index(drop=True)
        )
        filtered_headers.fillna("", inplace=True)

        # 4) Enrich transaction DataFrame
        enriched_transactions = renamed_transactions.copy()
        enriched_transactions["code"] = None
        enriched_transactions["transaction_type"] = enriched_transactions["description_addon"]
        final_transactions = enriched_transactions[
            [
                "page_id", "date", "time", "code", "channel",
                "debit", "credit", "balance", "description", "transaction_type"
            ]
        ].copy()

        # 5) Clean numeric columns in transactions
        for column_name in ["debit", "credit", "balance"]:
            final_transactions[column_name] = (
                final_transactions[column_name]
                .replace(r"[^0-9\.]+", "", regex=True)
                .pipe(pd.to_numeric, errors="coerce")
            )

        # 6) Clean numeric columns in headers
        for column_name in [
            "total_debit", "total_credit",
            "total_debit_transaction", "total_credit_transaction"
        ]:
            if column_name in filtered_headers.columns:
                filtered_headers[column_name] = (
                    filtered_headers[column_name]
                    .replace(r"[^0-9\.]+", "", regex=True)
                    .pipe(pd.to_numeric, errors="coerce")
                )

        return final_transactions, filtered_headers


if __name__ == "__main__":
    # Example usage:
    # 1) Ensure config.py is set up with correct paths & regexes.
    # 2) Open the PDF externally, then pass `pages` into extract_from_pages().

    import pdfplumber
    from config import TABLE_SETTINGS

    with pdfplumber.open("/Users/if658228/Downloads/OneDrive_1_5-20-2025/agentic_extraction/Dataset04/KBANK/108988-02016347-2566_1_KBANK.pdf", password=PASSWORD) as pdf:
        pages = pdf.pages

        # Instantiate the extractor (table_settings is mandatory, pdf_path is no longer used here)
        extractor = KBankExtractor(table_settings=TABLE_SETTINGS)

        # Parse all pages by passing `pages` into extract_from_pages:
        raw_tx_df, raw_hdr_df = extractor.extract_from_pages(pages)

        # Optionally clean & format:
        clean_tx_df, clean_hdr_df = extractor.clean_and_format_data(raw_tx_df, raw_hdr_df)

    # Print a bit of the cleaned result:
    print("=== Cleaned Transactions ===")
    print(clean_tx_df.head().to_string(index=False))
    print("\n=== Cleaned Headers ===")
    print(clean_hdr_df.head().to_string(index=False))


=== Cleaned Transactions ===
page_id       date  time code                   channel  debit  credit  balance                                                                            description  transaction_type
    1/1 2023-05-02 09:23 None                    K PLUS 8321.0     NaN 14755.64                                                       โอนไป X0047 น.ส. ศุภมาศ สำราญถ++           โอนเงิน
    1/1 2023-05-02 09:31 None โอนเข้า/หักบัญชีอัตโนมัติ  837.0     NaN 13918.64                                   เพื่อชำระ Ref X6982 บริษัท พรอมิส (ประเทศ ไทย) จำกัด หักบัญชีอัตโนมัติ
    1/1 2023-05-02 16:23 None                    K PLUS 1500.0     NaN 12418.64                                                        โอนไป X6181 บจก. เกตเวย์ เอสเ++           โอนเงิน
    1/1 2023-05-02 16:48 None                    K PLUS 3544.0     NaN  8874.64                                     เพื่อชำระ Ref X9611 CITICORP LEASING PERSONAL LOAN          ชำระเงิน
    1/1 2023-05-02 16:59 None           EDC/K 

  renamed_transactions.fillna("", inplace=True)
  filtered_headers.fillna("", inplace=True)


In [4]:
import re
from typing import Optional, List, Dict, Any, Tuple
import pdfplumber
import pandas as pd

import config  # Assumes config.py defines all constants except PDF_PATH and PASSWORD


class KBankExtractor:
    """
    Class to extract header and transaction information from KBank PDF statements.
    All helper methods are @staticmethod. Only `run()` accepts a pdf_path and password.
    """

    def __init__(self, table_settings: Dict[str, Any] = config.TABLE_SETTINGS):
        self.table_settings = table_settings

    @staticmethod
    def _compute_date_tops(words: List[dict]) -> List[float]:
        """
        Identify y-coordinates ("top") of all words matching DATE_PATTERN within X_BOUNDS.
        Returns a sorted list of those y-coordinates.
        """
        return sorted(
            w["top"]
            for w in words
            if config.DATE_PATTERN.match(w["text"])
            and config.X_BOUNDS["date_min"] <= w["x0"] <= config.X_BOUNDS["date_max"]
        )

    @staticmethod
    def _compute_intervals(date_tops: List[float]) -> List[Tuple[float, float]]:
        """
        Given a sorted list of y-coordinates for dates, compute vertical intervals around each date.
        These intervals define “rows” in the table.
        """
        intervals: List[Tuple[float, float]] = []
        for index, top in enumerate(date_tops):
            start = top - config.tolerance_settings["y_margin"]
            if index + 1 < len(date_tops):
                end = date_tops[index + 1] - config.tolerance_settings["y_margin"]
            else:
                if index > 0:
                    delta = top - date_tops[index - 1]
                else:
                    delta = config.tolerance_settings["y_margin"] * 2
                end = top + delta - config.tolerance_settings["y_margin"]
            intervals.append((start, end))
        return intervals

    @staticmethod
    def _assign_rows(words: List[dict], intervals: List[Tuple[float, float]]) -> List[List[dict]]:
        """
        Assign each word to the first interval (row) whose vertical range contains the word’s top.
        Returns a list of lists, where each inner list contains the words belonging to that row.
        """
        rows_per_interval: List[List[dict]] = [[] for _ in intervals]
        for word in words:
            for interval_index, (start, end) in enumerate(intervals):
                if start <= word["top"] < end:
                    rows_per_interval[interval_index].append(word)
                    break
        return rows_per_interval

    @staticmethod
    def _extract_header(page) -> Dict[str, Optional[str]]:
        """
        Crop out header fields (account name, account number, period, totals, etc.) using CROP_REGIONS.
        If the "account_name" box has two lines, the first line → account_name and second → address.
        Numeric totals are cleaned via NUMERIC_CLEAN_PATTERN.
        """
        header_data: Dict[str, Optional[str]] = {}

        for field, bbox in config.CROP_REGIONS.items():
            raw_text = page.crop(bbox).extract_text() or ""
            text_content = raw_text.strip()

            # Special handling: if account_name crop has two lines, split into name + address
            if field == "account_name":
                lines = text_content.splitlines()
                header_data["account_name"] = lines[0].strip() if lines else ""
                if len(lines) > 1:
                    header_data["address"] = lines[1].strip()
                else:
                    header_data.setdefault("address", "")
                continue

            # If a separate "address" crop exists, skip only if already set above
            if field == "address":
                if "address" in header_data and header_data["address"]:
                    continue

            # Clean numeric totals
            if field in {
                "total_withdrawal_transaction",
                "total_deposit_transaction",
                "total_withdrawal",
                "total_deposit",
                "ending_balance"
            }:
                match = config.NUMERIC_CLEAN_PATTERN.search(text_content)
                header_data[field] = match.group().replace(",", "") if match else None
            else:
                header_data[field] = text_content

        return header_data

    @staticmethod
    def _parse_transaction_row(
        sorted_words: List[dict],
        page_identifier: str,
        account_address: str
    ) -> Optional[Dict[str, Any]]:
        """
        Given a sorted list of words for one row, assigns each word to:
          - date (DATE_PATTERN)
          - time (TIME_PATTERN)
          - withdrawal, deposit, or balance (MONEY_PATTERN + x-position logic)
          - description_addon (if x in between date_max and amount_desc_split)
          - channel (if x in between amount_desc_split and channel_details_split)
          - detail description (everything else)
        Returns a dict:
            {
                "page_id", "address", "date", "time", "description",
                "withdrawal", "deposit", "balance", "channel", "description_addon"
            }
        If no date is found, returns None.
        """
        date_value = ""
        time_value = ""
        debit_amount = None
        credit_amount = None
        balance_amount = None
        description_text_parts: List[str] = []
        channel_text_parts: List[str] = []
        detail_text_parts: List[str] = []

        for word in sorted_words:
            x_position, text_value, x1_position = word["x0"], word["text"], word["x1"]

            # 1) Date
            if (
                config.DATE_PATTERN.match(text_value)
                and config.X_BOUNDS["date_min"] <= x_position <= config.X_BOUNDS["date_max"]
            ):
                date_value = text_value

            # 2) Time
            elif config.TIME_PATTERN.match(text_value):
                time_value = text_value

            # 3) Money (withdrawal / deposit / balance)
            elif config.MONEY_PATTERN.match(text_value):
                numeric_value = float(text_value.replace(",", ""))
                if (
                    x_position <= config.X_BOUNDS["withdraw_deposit_split"] + config.tolerance_settings["x_tolerance"]
                    and x1_position <= config.X_BOUNDS["withdraw_deposit_split_x1"] + config.tolerance_settings["x_tolerance"]
                ):
                    debit_amount = numeric_value
                elif x_position <= config.X_BOUNDS["amount_balance_split"] + config.tolerance_settings["x_tolerance"]:
                    credit_amount = numeric_value
                else:
                    balance_amount = numeric_value

            # 4) description_addon if x just right of date_max but ≤ amount_desc_split
            elif (
                x_position > config.X_BOUNDS["date_max"] + config.tolerance_settings["x_tolerance"]
                and x_position <= config.X_BOUNDS["amount_desc_split"]
            ):
                description_text_parts.append(text_value)

            # 5) channel if x just right of amount_desc_split but ≤ channel_details_split
            elif (
                x_position > config.X_BOUNDS["amount_desc_split"] + config.tolerance_settings["x_tolerance"]
                and x_position <= config.X_BOUNDS["channel_details_split"]
            ):
                channel_text_parts.append(text_value)

            # 6) everything else → detail description
            else:
                detail_text_parts.append(text_value)

        if not date_value:
            return None

        return {
            "page_id": page_identifier,
            "address": account_address,
            "date": pd.to_datetime(date_value, format="%d-%m-%y", errors="coerce"),
            "time": time_value,
            "description": " ".join(detail_text_parts).strip(),
            "withdrawal": debit_amount,
            "deposit": credit_amount,
            "balance": balance_amount,
            "channel": " ".join(channel_text_parts).strip(),
            "description_addon": " ".join(description_text_parts).strip()
        }

    @staticmethod
    def _parse_page(page, table_settings: Dict[str, Any]) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]:
        """
        Parse a single pdfplumber Page object:
          1) Extract header info (_extract_header)
          2) Determine page_id via PAGE_ID_PATTERN
          3) If “ENDING BALANCE” or “ยอดยกไป” not found, null out totals
          4) Find all table regions (or entire page if no table), group words into rows,
             and convert each row to a transaction dict via _parse_transaction_row.
        Returns:
            - header_information: dict of header fields + page_id
            - transaction_records: list of dicts (one per valid transaction row)
        """
        # 1) Extract header
        header_information = KBankExtractor._extract_header(page)

        # 2) Clean page_id
        raw_page_id_value = header_information.get("page", "")
        page_id_match = config.PAGE_ID_PATTERN.search(raw_page_id_value)
        page_identifier = page_id_match.group(1) if page_id_match else raw_page_id_value
        header_information["page_id"] = page_identifier

        # 3) If no “ENDING BALANCE” or “ยอดยกไป” in the text, null out totals
        full_text = page.extract_text() or ""
        if not re.search(r"(ยอดยกไป|ENDING BALANCE)", full_text, re.IGNORECASE):
            for fld in [
                "total_withdrawal_transaction",
                "total_deposit_transaction",
                "total_withdrawal",
                "total_deposit",
                "ending_balance"
            ]:
                header_information[fld] = None

        # 4) Extract transaction rows
        transaction_records: List[Dict[str, Any]] = []
        tables_found = page.find_tables(table_settings)
        regions_to_parse = [page.crop(tbl.bbox) for tbl in tables_found] if tables_found else [page]

        for region in regions_to_parse:
            words_in_region = region.extract_words(use_text_flow=True)
            date_positions = KBankExtractor._compute_date_tops(words_in_region)
            if not date_positions:
                continue

            row_intervals = KBankExtractor._compute_intervals(date_positions)
            rows_of_words = KBankExtractor._assign_rows(words_in_region, row_intervals)

            for words_in_row in rows_of_words:
                if not words_in_row:
                    continue
                combined_text = " ".join(w["text"] for w in words_in_row)
                if "ENDING BALANCE" in combined_text or "ยอดยกไป" in combined_text:
                    continue
                sorted_words = sorted(words_in_row, key=lambda w: (w["top"], w["x0"]))
                record = KBankExtractor._parse_transaction_row(
                    sorted_words,
                    page_identifier,
                    header_information.get("address", "")
                )
                if record:
                    transaction_records.append(record)

        return header_information, transaction_records

    def extract_from_pages(self, pages: List[pdfplumber.page.Page]) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """
        Parse a list of pdfplumber Page objects (opened externally).
        Returns two DataFrames:
          - transactions_dataframe
          - headers_dataframe

        It does _not_ open any PDF internally; you must pass in `pages`.
        """
        headers_list: List[Dict[str, Any]] = []
        transactions_list: List[Dict[str, Any]] = []

        for page_index, page in enumerate(pages):
            try:
                header_info, transactions = KBankExtractor._parse_page(page, self.table_settings)
                headers_list.append(header_info)
                transactions_list.extend(transactions)
            except Exception as error:
                print(f"⚠️ Error on page {page_index + 1}: {error}")

        transactions_dataframe = pd.DataFrame(transactions_list).copy()
        headers_dataframe = pd.DataFrame(headers_list).copy()

        if not transactions_dataframe.empty:
            # 1) Sum total withdrawal & deposit per page
            totals_dataframe = (
                transactions_dataframe
                .groupby("page_id")[["withdrawal", "deposit"]]
                .sum()
                .rename(
                    columns={
                        "withdrawal": "total_withdrawal_each_page",
                        "deposit": "total_deposit_each_page"
                    }
                )
                .reset_index()
            )

            # 2) Count transactions (non-null withdrawal vs deposit) per page
            counts_dataframe = (
                transactions_dataframe
                .groupby("page_id")
                .agg(
                    total_withdrawal_transaction_each_page=pd.NamedAgg(
                        column="withdrawal", aggfunc=lambda x: x.notnull().sum()
                    ),
                    total_deposit_transaction_each_page=pd.NamedAgg(
                        column="deposit", aggfunc=lambda x: x.notnull().sum()
                    )
                )
                .reset_index()
            )

            # 3) Merge back into transactions & headers
            transactions_dataframe = (
                transactions_dataframe
                .merge(totals_dataframe, on="page_id")
                .merge(counts_dataframe, on="page_id")
            )
            headers_dataframe = (
                headers_dataframe
                .merge(totals_dataframe, on="page_id", how="left")
                .merge(counts_dataframe, on="page_id", how="left")
            )

        # 4) Split "period" into start_period / end_period if present
        if "period" in headers_dataframe.columns:
            period_parts = (
                headers_dataframe["period"]
                .str.replace(" ", "", regex=False)
                .str.split(r"[-–]", n=1, expand=True)
            )
            headers_dataframe["start_period"] = pd.to_datetime(
                period_parts[0], dayfirst=True, errors="coerce"
            )
            if period_parts.shape[1] > 1:
                headers_dataframe["end_period"] = pd.to_datetime(
                    period_parts[1], dayfirst=True, errors="coerce"
                )
            else:
                headers_dataframe["end_period"] = None

        # 5) Convert numeric header columns to numeric dtype
        numeric_header_columns = [
            "total_withdrawal",
            "total_deposit",
            "total_withdrawal_transaction",
            "total_deposit_transaction"
        ]
        for col in numeric_header_columns:
            if col in headers_dataframe.columns:
                headers_dataframe[col] = pd.to_numeric(
                    headers_dataframe[col], errors="coerce"
                )

        return transactions_dataframe, headers_dataframe

    @staticmethod
    def clean_and_format_data(
        transactions_dataframe: pd.DataFrame,
        headers_dataframe: pd.DataFrame
    ) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """
        Take the raw DataFrames from extract_from_pages() and:
          1) Filter out rows without description or amount
          2) Select & rename columns to a standardized schema (withdrawal→debit, deposit→credit)
          3) Replace NaNs with empty strings
          4) From headers, select & rename total_* → total_debit / total_credit, etc.
          5) Drop header rows whose page_id doesn’t start with a digit
          6) In transactions, add default “code” and “transaction_type” columns
          7) Convert numeric columns (debit, credit, balance, header totals) to float
        Returns:
            (final_transactions_df, final_headers_df)
        """
        # 1) Filter invalid transaction rows
        filtered_transactions = (
            transactions_dataframe[
                ~(transactions_dataframe["description"].isnull())
                & (
                    (~transactions_dataframe["withdrawal"].isnull())
                    | (~transactions_dataframe["deposit"].isnull())
                )
            ]
            .copy()
        )

        # 2) Select & rename columns for transactions
        selected_transactions = filtered_transactions[
            [
                "page_id", "date", "time", "description", "withdrawal",
                "deposit", "balance", "channel", "description_addon"
            ]
        ].copy()
        renamed_transactions = selected_transactions.rename(
            columns={
                "withdrawal": "debit",
                "deposit": "credit"
            }
        )
        renamed_transactions.fillna("", inplace=True)

        # 3) Prepare & clean headers
        cols_to_select = [
            "page_id", "account_name", "account_number", "period",
            "total_withdrawal", "total_deposit",
            "total_withdrawal_transaction", "total_deposit_transaction",
            "address"
        ]
        available_cols = [c for c in cols_to_select if c in headers_dataframe.columns]
        selected_headers = headers_dataframe[available_cols].copy()
        renamed_headers = selected_headers.rename(
            columns={
                "total_withdrawal": "total_debit",
                "total_deposit": "total_credit",
                "total_withdrawal_transaction": "total_debit_transaction",
                "total_deposit_transaction": "total_credit_transaction"
            }
        )
        filtered_headers = (
            renamed_headers[
                renamed_headers["page_id"].str.match(r"^\d", na=False)
            ]
            .reset_index(drop=True)
        )
        filtered_headers.fillna("", inplace=True)

        # 4) Enrich transaction DataFrame
        enriched_transactions = renamed_transactions.copy()
        enriched_transactions["code"] = None
        enriched_transactions["transaction_type"] = enriched_transactions["description_addon"]
        final_transactions = enriched_transactions[
            [
                "page_id", "date", "time", "code", "channel",
                "debit", "credit", "balance", "description", "transaction_type"
            ]
        ].copy()

        # 5) Clean numeric columns in transactions
        for column_name in ["debit", "credit", "balance"]:
            final_transactions[column_name] = (
                final_transactions[column_name]
                .replace(r"[^0-9\.]+", "", regex=True)
                .pipe(pd.to_numeric, errors="coerce")
            )

        # 6) Clean numeric columns in headers
        for column_name in [
            "total_debit", "total_credit",
            "total_debit_transaction", "total_credit_transaction"
        ]:
            if column_name in filtered_headers.columns:
                filtered_headers[column_name] = (
                    filtered_headers[column_name]
                    .replace(r"[^0-9\.]+", "", regex=True)
                    .pipe(pd.to_numeric, errors="coerce")
                )

        return final_transactions, filtered_headers

    def run(
        self,
        pdf_path: str,
        password: Optional[str] = None
    ) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """
        Open the PDF at `pdf_path` with optional `password`, parse all pages,
        and return cleaned transaction and header DataFrames.
        """
        with pdfplumber.open(pdf_path, password=password) as pdf:
            pages = pdf.pages

            # Extract raw DataFrames
            raw_tx_df, raw_hdr_df = self.extract_from_pages(pages)

            # Clean & format
            clean_tx_df, clean_hdr_df = KBankExtractor.clean_and_format_data(raw_tx_df, raw_hdr_df)

        return clean_tx_df, clean_hdr_df





In [5]:
if __name__ == "__main__":
    # Example usage:
    # 1) Ensure config.py is set up (without PDF_PATH or PASSWORD).
    # 2) Instantiate the extractor and call run() with path & password.

    pdf_file_path = "/Users/if658228/Downloads/OneDrive_1_5-20-2025/agentic_extraction/Dataset04/KBANK/108988-02009331-2566_1_KBANK.pdf"
    # If your PDF is password-protected, set it here; otherwise, leave as None
    pdf_password = None

    extractor = KBankExtractor()
    cleaned_transactions, cleaned_headers = extractor.run(pdf_file_path, pdf_password)

    print("=== Cleaned Transactions ===")
    print(cleaned_transactions.head().to_string(index=False))
    print("\n=== Cleaned Headers ===")
    print(cleaned_headers.head().to_string(index=False))

=== Cleaned Transactions ===
page_id       date  time code                   channel   debit  credit  balance                          description     transaction_type
   1/11 2023-05-01 02:30 None โอนเข้า/หักบัญชีอัตโนมัติ     NaN 72750.0  73272.0                    จาก หจก.พรเงินทอง รับเงินเดือน/ค่าจ้าง
   1/11 2023-05-01 11:33 None                    K PLUS 69000.0     NaN   4272.0 โอนไป SCB X1774 นางสาว สินนภา เต็ม++              โอนเงิน
   1/11 2023-05-01 16:24 None                    K PLUS   720.0     NaN   3552.0 โอนไป KTB X2552 น.ส. รัชณีวรรณ ภูม++              โอนเงิน
   1/11 2023-05-01 21:31 None โอนเข้า/หักบัญชีอัตโนมัติ  2390.0     NaN   1162.0                 รหัสอ้างอิง PCB09080      หักชำระสินเชื่อ
   1/11 2023-05-02 00:59 None                    K PLUS   300.0     NaN    862.0 เพื่อชำระ Ref X5442 TrueMoney Wallet             ชำระเงิน

=== Cleaned Headers ===
page_id             account_name account_number                  period  total_debit  total_credit  total_debit_

  renamed_transactions.fillna("", inplace=True)
  filtered_headers.fillna("", inplace=True)


In [2]:
# extractor.py

import re
from typing import Optional, List, Dict, Tuple, Any

import pdfplumber
import pandas as pd

import config


class KBANKStatementExtractor:


    def run(
        self,
        pdf_path: str,
        password: Optional[str] = None
    ) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """
        Open the PDF at `pdf_path` using `password`, parse all pages,
        and return two DataFrames:
          - transactions_dataframe
          - headers_dataframe

        Usage:
            extractor = KBANKStatementExtractor()
            transactions_dataframe, headers_dataframe = extractor.run("/path/to/file.pdf", "mypassword")
        """
        # 1) Open the PDF and collect all pages
        with pdfplumber.open(pdf_path, password=password) as pdf_file:
            page_list = pdf_file.pages[:]  # List[pdfplumber.page.Page]

        # 2) Extract raw transaction rows and raw header rows from each page
        raw_transactions_dataframe, raw_headers_dataframe = KBANKStatementExtractor.extractor(page_list)

        # 3) Clean and format the raw DataFrames
        final_transactions_dataframe, final_headers_dataframe = KBANKStatementExtractor.clean_and_format_data(
            raw_transactions_dataframe,
            raw_headers_dataframe
        )

        # 4) Return the cleaned DataFrames
        return final_transactions_dataframe, final_headers_dataframe

    @staticmethod
    def _compute_date_tops(word_list: List[dict]) -> List[float]:
        """
        Identify y-coordinates ("top") of all words matching config.DATE_PATTERN within config.X_BOUNDS.
        Returns a sorted list of those y-coordinates.
        """
        return sorted(
            word["top"]
            for word in word_list
            if config.DATE_PATTERN.match(word["text"])
            and config.X_BOUNDS["date_min"] <= word["x0"] <= config.X_BOUNDS["date_max"]
        )

    @staticmethod
    def _compute_intervals(date_top_list: List[float]) -> List[Tuple[float, float]]:
        """
        Given a sorted list of y-coordinates for dates, compute vertical intervals around each date.
        These intervals define “rows” in the transaction table.
        """
        interval_list: List[Tuple[float, float]] = []
        for index, top_value in enumerate(date_top_list):
            start_value = top_value - config.tolerance_settings["y_margin"]
            if index + 1 < len(date_top_list):
                end_value = date_top_list[index + 1] - config.tolerance_settings["y_margin"]
            else:
                if index > 0:
                    delta_value = top_value - date_top_list[index - 1]
                else:
                    delta_value = config.tolerance_settings["y_margin"] * 2
                end_value = top_value + delta_value - config.tolerance_settings["y_margin"]
            interval_list.append((start_value, end_value))
        return interval_list

    @staticmethod
    def _assign_rows(word_list: List[dict], interval_list: List[Tuple[float, float]]) -> List[List[dict]]:
        """
        Assign each word to the first interval (row) whose vertical range contains that word’s top.
        Returns a list of lists, where each inner list contains the words for one row.
        """
        rows_for_each_interval: List[List[dict]] = [[] for _ in interval_list]
        for word in word_list:
            for interval_index, (start_value, end_value) in enumerate(interval_list):
                if start_value <= word["top"] < end_value:
                    rows_for_each_interval[interval_index].append(word)
                    break
        return rows_for_each_interval

    @staticmethod
    def _extract_header(page) -> Dict[str, Optional[str]]:
        """
        Crop out header fields (account name, account number, statement period, totals, etc.) using config.CROP_REGIONS.
        If "account_name" crop has two lines, the first line becomes account_name and the second line becomes address.
        Numeric totals are cleaned via config.NUMERIC_CLEAN_PATTERN.
        """
        header_data: Dict[str, Optional[str]] = {}

        for field_name, bounding_box in config.CROP_REGIONS.items():
            raw_text = page.crop(bounding_box).extract_text() or ""
            text_content = raw_text.strip()

            # Special handling: if "account_name" crop has two lines → first line is name, second is address
            if field_name == "account_name":
                line_list = text_content.splitlines()
                header_data["account_name"] = line_list[0].strip() if line_list else ""
                if len(line_list) > 1:
                    header_data["address"] = line_list[1].strip()
                else:
                    header_data.setdefault("address", "")
                continue

            # If field_name == "address" but address was already set above, skip
            if field_name == "address" and header_data.get("address"):
                continue

            # Clean numeric totals for specific fields
            if field_name in {
                "total_withdrawal_transaction",
                "total_deposit_transaction",
                "total_withdrawal",
                "total_deposit",
                "ending_balance",
            }:
                match_object = config.NUMERIC_CLEAN_PATTERN.search(text_content)
                header_data[field_name] = match_object.group().replace(",", "") if match_object else None
            else:
                header_data[field_name] = text_content

        return header_data

    @staticmethod
    def _parse_transaction_row(
        sorted_word_list: List[dict],
        page_identifier: str,
        account_address: str
    ) -> Optional[Dict[str, Any]]:
        """
        Given a sorted list of words representing one row, assign each word to:
          - date (config.DATE_PATTERN)
          - time (config.TIME_PATTERN)
          - withdrawal amount / deposit amount / balance (config.MONEY_PATTERN + x-position logic)
          - description_addon (if x between date_max and amount_desc_split)
          - channel (if x between amount_desc_split and channel_details_split)
          - detail description (everything else)
        Returns a dict with keys:
            {
              "page_id", "address", "date", "time", "description",
              "withdrawal", "deposit", "balance", "channel", "description_addon"
            }
        If no date is found in that row, returns None.
        """
        date_value = ""
        time_value = ""
        withdrawal_amount = None
        deposit_amount = None
        balance_amount = None
        description_addon_parts: List[str] = []
        channel_parts: List[str] = []
        detail_description_parts: List[str] = []

        for word in sorted_word_list:
            x_position = word["x0"]
            text_value = word["text"]
            x1_position = word["x1"]

            # 1) Date
            if config.DATE_PATTERN.match(text_value) and config.X_BOUNDS["date_min"] <= x_position <= config.X_BOUNDS["date_max"]:
                date_value = text_value

            # 2) Time
            elif config.TIME_PATTERN.match(text_value):
                time_value = text_value

            # 3) Money (withdrawal / deposit / balance)
            elif config.MONEY_PATTERN.match(text_value):
                numeric_value = float(text_value.replace(",", ""))
                if (
                    x_position <= config.X_BOUNDS["withdraw_deposit_split"] + config.tolerance_settings["x_tolerance"]
                    and
                    x1_position <= config.X_BOUNDS["withdraw_deposit_split_x1"] + config.tolerance_settings["x_tolerance"]
                ):
                    withdrawal_amount = numeric_value
                elif x_position <= config.X_BOUNDS["amount_balance_split"] + config.tolerance_settings["x_tolerance"]:
                    deposit_amount = numeric_value
                else:
                    balance_amount = numeric_value

            # 4) description_addon if x just right of date_max but ≤ amount_desc_split
            elif (
                x_position > config.X_BOUNDS["date_max"] + config.tolerance_settings["x_tolerance"]
                and x_position <= config.X_BOUNDS["amount_desc_split"]
            ):
                description_addon_parts.append(text_value)

            # 5) channel if x just right of amount_desc_split but ≤ channel_details_split
            elif (
                x_position > config.X_BOUNDS["amount_desc_split"] + config.tolerance_settings["x_tolerance"]
                and x_position <= config.X_BOUNDS["channel_details_split"]
            ):
                channel_parts.append(text_value)

            # 6) Everything else → detail description
            else:
                detail_description_parts.append(text_value)

        if not date_value:
            return None

        return {
            "page_id": page_identifier,
            "address": account_address,
            "date": pd.to_datetime(date_value, format="%d-%m-%y", errors="coerce"),
            "time": time_value,
            "description": " ".join(detail_description_parts).strip(),
            "withdrawal": withdrawal_amount,
            "deposit": deposit_amount,
            "balance": balance_amount,
            "channel": " ".join(channel_parts).strip(),
            "description_addon": " ".join(description_addon_parts).strip()
        }

    @staticmethod
    def _parse_page(page) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]:
        """
        Parse a single pdfplumber Page object:
          1) Extract header info via _extract_header
          2) Determine page_id via config.PAGE_ID_PATTERN
          3) If “ENDING BALANCE” or “ยอดยกไป” not found, set header totals to None
          4) Find all table regions (or entire page if none), group words into rows,
             and convert each row to a transaction dict via _parse_transaction_row.
        Returns:
            - header_information: dict of header fields + page_id
            - transaction_record_list: list of dicts (one per valid transaction row)
        """
        # 1) Extract header fields
        header_information = KBANKStatementExtractor._extract_header(page)

        # 2) Clean page_id
        raw_page_id_value = header_information.get("page", "")
        page_id_match = config.PAGE_ID_PATTERN.search(raw_page_id_value)
        page_identifier = page_id_match.group(1) if page_id_match else raw_page_id_value
        header_information["page_id"] = page_identifier

        # 3) If no “ENDING BALANCE” or “ยอดยกไป” in the page text, null out header totals
        full_page_text = page.extract_text() or ""
        if not re.search(r"(ยอดยกไป|ENDING BALANCE)", full_page_text, re.IGNORECASE):
            for field_name in [
                "total_withdrawal_transaction",
                "total_deposit_transaction",
                "total_withdrawal",
                "total_deposit",
                "ending_balance"
            ]:
                header_information[field_name] = None

        # 4) Extract transaction rows
        transaction_record_list: List[Dict[str, Any]] = []
        table_list = page.find_tables(config.TABLE_SETTINGS)
        region_list = [page.crop(table.bbox) for table in table_list] if table_list else [page]

        for region in region_list:
            word_list = region.extract_words(use_text_flow=True)
            date_top_list = KBANKStatementExtractor._compute_date_tops(word_list)
            if not date_top_list:
                continue

            interval_list = KBANKStatementExtractor._compute_intervals(date_top_list)
            rows_of_words = KBANKStatementExtractor._assign_rows(word_list, interval_list)

            for words_in_one_row in rows_of_words:
                if not words_in_one_row:
                    continue
                combined_row_text = " ".join(word["text"] for word in words_in_one_row)
                if "ENDING BALANCE" in combined_row_text or "ยอดยกไป" in combined_row_text:
                    continue

                sorted_word_list = sorted(words_in_one_row, key=lambda word: (word["top"], word["x0"]))
                transaction_record = KBANKStatementExtractor._parse_transaction_row(
                    sorted_word_list,
                    page_identifier,
                    header_information.get("address", "")
                )
                if transaction_record:
                    transaction_record_list.append(transaction_record)

        return header_information, transaction_record_list

    @staticmethod
    def extractor(
        page_list: List[pdfplumber.page.Page]
    ) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """
        Parse a list of pdfplumber Page objects (opened externally).
        Returns two DataFrames:
          - transactions_dataframe
          - headers_dataframe

        It does NOT open any PDF internally; you must open the PDF and pass `page_list`.
        """
        header_data_list: List[Dict[str, Any]] = []
        transaction_data_list: List[Dict[str, Any]] = []

        for page_index, page in enumerate(page_list):
            try:
                header_information, transaction_record_list = KBANKStatementExtractor._parse_page(page)
                header_data_list.append(header_information)
                transaction_data_list.extend(transaction_record_list)
            except Exception as error:
                print(f"⚠️ Error on page {page_index + 1}: {error}")

        transactions_dataframe = pd.DataFrame(transaction_data_list).copy()
        headers_dataframe = pd.DataFrame(header_data_list).copy()

        if not transactions_dataframe.empty:
            # 1) Sum total withdrawal and total deposit per page
            totals_dataframe = (
                transactions_dataframe
                .groupby("page_id")[["withdrawal", "deposit"]]
                .sum()
                .rename(
                    columns={
                        "withdrawal": "total_withdrawal_each_page",
                        "deposit": "total_deposit_each_page"
                    }
                )
                .reset_index()
            )

            # 2) Count number of withdrawal transactions and deposit transactions per page
            counts_dataframe = (
                transactions_dataframe
                .groupby("page_id")
                .agg(
                    total_withdrawal_transaction_each_page=pd.NamedAgg(
                        column="withdrawal", aggfunc=lambda series: series.notnull().sum()
                    ),
                    total_deposit_transaction_each_page=pd.NamedAgg(
                        column="deposit", aggfunc=lambda series: series.notnull().sum()
                    )
                )
                .reset_index()
            )

            # 3) Merge totals and counts back into transactions_dataframe and headers_dataframe
            transactions_dataframe = (
                transactions_dataframe
                .merge(totals_dataframe, on="page_id")
                .merge(counts_dataframe, on="page_id")
            )
            headers_dataframe = (
                headers_dataframe
                .merge(totals_dataframe, on="page_id", how="left")
                .merge(counts_dataframe, on="page_id", how="left")
            )

        # 4) Split the "period" field into start_period and end_period if present
        if "period" in headers_dataframe.columns:
            period_parts_dataframe = (
                headers_dataframe["period"]
                .str.replace(" ", "", regex=False)
                .str.split(r"[-–]", n=1, expand=True)
            )
            headers_dataframe["start_period"] = pd.to_datetime(
                period_parts_dataframe[0], dayfirst=True, errors="coerce"
            )
            if period_parts_dataframe.shape[1] > 1:
                headers_dataframe["end_period"] = pd.to_datetime(
                    period_parts_dataframe[1], dayfirst=True, errors="coerce"
                )
            else:
                headers_dataframe["end_period"] = None

        # 5) Convert numeric header columns to numeric dtype
        numeric_header_column_list = [
            "total_withdrawal",
            "total_deposit",
            "total_withdrawal_transaction",
            "total_deposit_transaction"
        ]
        for column_name in numeric_header_column_list:
            if column_name in headers_dataframe.columns:
                headers_dataframe[column_name] = pd.to_numeric(headers_dataframe[column_name], errors="coerce")

        return transactions_dataframe, headers_dataframe

    @staticmethod
    def clean_and_format_data(
        transactions_dataframe: pd.DataFrame,
        headers_dataframe: pd.DataFrame
    ) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """
        Take the raw DataFrames from extractor() and:
          1) Filter out rows without description or amount
          2) Select and rename columns to a standardized schema (withdrawal → debit, deposit → credit)
          3) Replace NaNs with empty strings
          4) From headers, select and rename total_* → total_debit / total_credit, etc.
          5) Drop header rows whose page_id does not start with a digit
          6) In transactions, add default "code" and "transaction_type" columns
          7) Convert numeric columns (debit, credit, balance, header totals) to float
        Returns:
          (final_transactions_dataframe, final_headers_dataframe)
        """
        # 1) Filter out any transaction rows missing description or amount
        filtered_transactions = (
            transactions_dataframe[
                ~(transactions_dataframe["description"].isnull())
                & (
                    (~transactions_dataframe["withdrawal"].isnull())
                    | (~transactions_dataframe["deposit"].isnull())
                )
            ]
            .copy()
        )

        # 2) Select and rename columns for transactions
        selected_transactions = filtered_transactions[
            [
                "page_id", "date", "time", "description", "withdrawal",
                "deposit", "balance", "channel", "description_addon"
            ]
        ].copy()
        renamed_transactions = selected_transactions.rename(
            columns={"withdrawal": "debit", "deposit": "credit"}
        )
        renamed_transactions.fillna("", inplace=True)

        # 3) Prepare and clean headers
        header_columns_to_select = [
            "page_id", "account_name", "account_number", "period",
            "total_withdrawal", "total_deposit",
            "total_withdrawal_transaction", "total_deposit_transaction",
            "address"
        ]
        available_header_columns = [column for column in header_columns_to_select if column in headers_dataframe.columns]
        selected_headers = headers_dataframe[available_header_columns].copy()
        renamed_headers = selected_headers.rename(
            columns={
                "total_withdrawal": "total_debit",
                "total_deposit": "total_credit",
                "total_withdrawal_transaction": "total_debit_transaction",
                "total_deposit_transaction": "total_credit_transaction"
            }
        )
        filtered_headers = (
            renamed_headers[
                renamed_headers["page_id"].str.match(r"^\d", na=False)
            ]
            .reset_index(drop=True)
        )
        filtered_headers.fillna("", inplace=True)

        # 4) Enrich transaction DataFrame
        enriched_transactions = renamed_transactions.copy()
        enriched_transactions["code"] = None
        enriched_transactions["transaction_type"] = enriched_transactions["description_addon"]
        final_transactions = enriched_transactions[
            [
                "page_id", "date", "time", "code", "channel",
                "debit", "credit", "balance", "description", "transaction_type"
            ]
        ].copy()

        # 5) Clean numeric columns in transactions
        for column_name in ["debit", "credit", "balance"]:
            final_transactions[column_name] = (
                final_transactions[column_name]
                .replace(r"[^0-9\.]+", "", regex=True)
                .pipe(pd.to_numeric, errors="coerce")
            )

        # 6) Clean numeric columns in headers
        for column_name in [
            "total_debit", "total_credit",
            "total_debit_transaction", "total_credit_transaction"
        ]:
            if column_name in filtered_headers.columns:
                filtered_headers[column_name] = (
                    filtered_headers[column_name]
                    .replace(r"[^0-9\.]+", "", regex=True)
                    .pipe(pd.to_numeric, errors="coerce")
                )

        return final_transactions, filtered_headers


In [1]:
from kbank_extractor import KBANKStatementExtractor
extractor = KBANKStatementExtractor()
transactions_dataframe, headers_dataframe = extractor.run("/Users/if658228/Downloads/OneDrive_1_5-20-2025/agentic_extraction/Dataset04/KBANK/108988-02009331-2566_1_KBANK.pdf", None)

print(transactions_dataframe.head())
print(headers_dataframe.head())


  page_id       date   time  code                    channel    debit  \
1    1/11 2023-05-01  02:30  None  โอนเข้า/หักบัญชีอัตโนมัติ      NaN   
2    1/11 2023-05-01  11:33  None                     K PLUS  69000.0   
3    1/11 2023-05-01  16:24  None                     K PLUS    720.0   
4    1/11 2023-05-01  21:31  None  โอนเข้า/หักบัญชีอัตโนมัติ   2390.0   
5    1/11 2023-05-02  00:59  None                     K PLUS    300.0   

    credit  balance                           description  \
1  72750.0  73272.0                     จาก หจก.พรเงินทอง   
2      NaN   4272.0  โอนไป SCB X1774 นางสาว สินนภา เต็ม++   
3      NaN   3552.0  โอนไป KTB X2552 น.ส. รัชณีวรรณ ภูม++   
4      NaN   1162.0                  รหัสอ้างอิง PCB09080   
5      NaN    862.0  เพื่อชำระ Ref X5442 TrueMoney Wallet   

       transaction_type  
1  รับเงินเดือน/ค่าจ้าง  
2               โอนเงิน  
3               โอนเงิน  
4       หักชำระสินเชื่อ  
5              ชำระเงิน  
  page_id              account_name ac

  renamed_transactions.fillna("", inplace=True)
  filtered_headers.fillna("", inplace=True)


In [5]:
transactions_dataframe

Unnamed: 0,page_id,date,time,code,channel,debit,credit,balance,description,transaction_type
1,1/11,2023-05-01,02:30,,โอนเข้า/หักบัญชีอัตโนมัติ,,72750.0,73272.00,จาก หจก.พรเงินทอง,รับเงินเดือน/ค่าจ้าง
2,1/11,2023-05-01,11:33,,K PLUS,69000.0,,4272.00,โอนไป SCB X1774 นางสาว สินนภา เต็ม++,โอนเงิน
3,1/11,2023-05-01,16:24,,K PLUS,720.0,,3552.00,โอนไป KTB X2552 น.ส. รัชณีวรรณ ภูม++,โอนเงิน
4,1/11,2023-05-01,21:31,,โอนเข้า/หักบัญชีอัตโนมัติ,2390.0,,1162.00,รหัสอ้างอิง PCB09080,หักชำระสินเชื่อ
5,1/11,2023-05-02,00:59,,K PLUS,300.0,,862.00,เพื่อชำระ Ref X5442 TrueMoney Wallet,ชำระเงิน
...,...,...,...,...,...,...,...,...,...,...
366,11/11,2023-07-31,13:45,,K PLUS,3100.0,,0.75,โอนไป X0395 น.ส. สินนภา เต็มธา++,โอนเงิน
367,11/11,2023-07-31,16:29,,Internet/Mobile SCB,,1000.0,1000.75,จาก SCB X1774 นางสาว สินนภา เต็ม++,รับโอนเงิน
368,11/11,2023-07-31,16:36,,K PLUS,1000.0,,0.75,โอนไป X0395 น.ส. สินนภา เต็มธา++,โอนเงิน
369,11/11,2023-07-31,22:37,,Internet/Mobile SCB,,1500.0,1500.75,จาก SCB X1774 นางสาว สินนภา เต็ม++,รับโอนเงิน


In [4]:
headers_dataframe

Unnamed: 0,page_id,account_name,account_number,period,total_debit,total_credit,total_debit_transaction,total_credit_transaction,address
0,1/11,น.ส. สินนภา เต็มธารทิพย์,XXX-X-XX272-9,01/05/2023 - 31/07/2023,1480941.31,1480420.06,242.0,118.0,29/58 ม.5 ต.ศาลายา อ.พุทธมณฑล จ.นครปฐม 73170
1,2/11,น.ส. สินนภา เต็มธารทิพย์,XXX-X-XX272-9,01/05/2023 - 31/07/2023,,,,,29/58 ม.5 ต.ศาลายา อ.พุทธมณฑล จ.นครปฐม 73170
2,3/11,น.ส. สินนภา เต็มธารทิพย์,XXX-X-XX272-9,01/05/2023 - 31/07/2023,,,,,29/58 ม.5 ต.ศาลายา อ.พุทธมณฑล จ.นครปฐม 73170
3,4/11,น.ส. สินนภา เต็มธารทิพย์,XXX-X-XX272-9,01/05/2023 - 31/07/2023,,,,,29/58 ม.5 ต.ศาลายา อ.พุทธมณฑล จ.นครปฐม 73170
4,5/11,น.ส. สินนภา เต็มธารทิพย์,XXX-X-XX272-9,01/05/2023 - 31/07/2023,,,,,29/58 ม.5 ต.ศาลายา อ.พุทธมณฑล จ.นครปฐม 73170
5,6/11,น.ส. สินนภา เต็มธารทิพย์,XXX-X-XX272-9,01/05/2023 - 31/07/2023,,,,,29/58 ม.5 ต.ศาลายา อ.พุทธมณฑล จ.นครปฐม 73170
6,7/11,น.ส. สินนภา เต็มธารทิพย์,XXX-X-XX272-9,01/05/2023 - 31/07/2023,,,,,29/58 ม.5 ต.ศาลายา อ.พุทธมณฑล จ.นครปฐม 73170
7,8/11,น.ส. สินนภา เต็มธารทิพย์,XXX-X-XX272-9,01/05/2023 - 31/07/2023,,,,,29/58 ม.5 ต.ศาลายา อ.พุทธมณฑล จ.นครปฐม 73170
8,9/11,น.ส. สินนภา เต็มธารทิพย์,XXX-X-XX272-9,01/05/2023 - 31/07/2023,,,,,29/58 ม.5 ต.ศาลายา อ.พุทธมณฑล จ.นครปฐม 73170
9,10/11,น.ส. สินนภา เต็มธารทิพย์,XXX-X-XX272-9,01/05/2023 - 31/07/2023,,,,,29/58 ม.5 ต.ศาลายา อ.พุทธมณฑล จ.นครปฐม 73170


In [2]:
#!/usr/bin/env python3
import os, traceback
import numpy as np
import pandas as pd
from typing import Tuple, List, Dict, Any, Optional
import pdfplumber
from kbank_extractor import KBANKStatementExtractor


INPUT_FOLDER = "/Users/if658228/Downloads/OneDrive_1_5-20-2025/agentic_extraction/Dataset04/KBANK"
PASSWORD: Optional[str] = None



def process_folder(input_folder: str) -> Tuple[pd.DataFrame, pd.DataFrame, List[dict]]:
    tx_list, hdr_list, failures = [], [], []
    for fn in os.listdir(input_folder):
        if not fn.lower().endswith(".pdf"):
            continue
        path = os.path.join(input_folder, fn)
        try:
            with pdfplumber.open(path, password=PASSWORD) as pdf:
                pages = pdf.pages
                df_tx, df_hdr = KBANKStatementExtractor.run(path, None)
                df_hdr["source_file"] = fn
                df_tx ["source_file"] = fn
                tx_list.append(df_tx)
                hdr_list.append(df_hdr)
        except Exception as e:
            failures.append({
                "file": fn,
                "error": str(e),
                "traceback": traceback.format_exc()
            })
    all_tx  = pd.concat(tx_list, ignore_index=True) if tx_list else pd.DataFrame()
    all_hdr = pd.concat(hdr_list, ignore_index=True) if hdr_list else pd.DataFrame()
    return all_tx, all_hdr, failures

def validate_bbl(df_tx_all: pd.DataFrame, df_hdr_all: pd.DataFrame) -> pd.DataFrame:
    # STEP 1: ensure numeric
    tx_clean = df_tx_all.copy()
    tx_clean[['debit','credit']] = (
        tx_clean[['debit','credit']]
        .replace('', np.nan)
        .astype(float)
    )

    # STEP 2: sums per file
    sums = (tx_clean
            .groupby('source_file')[['debit','credit']]
            .sum(min_count=1)
            .rename(columns={
                'debit':'sum_debit',
                'credit'   :'sum_credit'
            }))

    # STEP 3: counts per file
    counts = (tx_clean
              .groupby('source_file')[['debit','credit']]
              .count()
              .rename(columns={
                  'debit':'count_debit_tx',
                  'credit'   :'count_credit_tx'
              }))

    # STEP 4: pick header summary columns (drop pages without both)
    # note: we sum the two *_transaction fields into total_txns
    hdr = (df_hdr_all
           .dropna(subset=['total_debit_transaction','total_credit_transaction'])
           .groupby('source_file')
           .agg({
               'total_debit_transaction':'sum',
               'total_debit'            :'sum',
               'total_credit_transaction'   :'sum',
               'total_credit'               :'sum'
           })
          )
    hdr = hdr.rename(columns={
        'total_debit_transaction':'total_debit_txns',
        'total_debit'            :'total_debit',
        'total_credit_transaction'   :'total_credit_txns',
        'total_credit'               :'total_credit'
    })


    # STEP 5: merge & compare
    cmp = (hdr
           .join(sums,   how='left')
           .join(counts, how='left')
           .reset_index()
           .rename(columns={'source_file':'file'}))

    summary = cmp.assign(
        debit_amount_match = lambda d: np.isclose(d['total_debit'], d['sum_debit'], atol=1e-2),
        credit_amount_match    = lambda d: np.isclose(d['total_credit'],    d['sum_credit'],    atol=1e-2),
        transaction_count_debit_match = lambda d: d['total_debit_txns'] == (d['count_debit_tx']),
        transaction_count_credit_match = lambda d: d['total_credit_txns'] == (d['count_credit_tx'])
        
    )[
        ['file',
         'total_debit','sum_debit','debit_amount_match',
         'total_credit',   'sum_credit',   'credit_amount_match',
         'total_credit_txns','total_debit_txns',      'count_debit_tx','count_credit_tx','transaction_count_debit_match','transaction_count_credit_match']
    ]

    return summary

if __name__ == "__main__":
    df_tx_all, df_hdr_all, failures = process_folder(INPUT_FOLDER)
    print(f"Processed transactions: {df_tx_all.shape}, headers: {df_hdr_all.shape}")
    if failures:
        print(f"\n⚠️ {len(failures)} failures; inspect `failures` list.")
    if df_tx_all.empty or df_hdr_all.empty:
        print("No data to validate; exiting.")
        exit(1)
    print(df_tx_all,df_hdr_all)
    summary = validate_bbl(df_tx_all, df_hdr_all)
    print("\n--- Validation Summary per File ---")
    print(summary.to_string(index=False))

    bad = summary.loc[~(summary.debit_amount_match
                        & summary.credit_amount_match
                        & summary.transaction_count_debit_match
                        & summary.transaction_count_credit_match)]
    if not bad.empty:
        bad = bad.assign(
            diff_debit = bad['sum_debit'] - bad['total_debit'],
            diff_credit    = bad['sum_credit']    - bad['total_credit']
        )
        print("\n❌ Files with mismatches:")
        print(bad.to_string(index=False))
    else:
        print("\n✅ All files validated successfully!")


  renamed_transactions.fillna("", inplace=True)
  filtered_headers.fillna("", inplace=True)
  renamed_transactions.fillna("", inplace=True)
  filtered_headers.fillna("", inplace=True)
  renamed_transactions.fillna("", inplace=True)
  filtered_headers.fillna("", inplace=True)
  renamed_transactions.fillna("", inplace=True)
  filtered_headers.fillna("", inplace=True)
  renamed_transactions.fillna("", inplace=True)
  filtered_headers.fillna("", inplace=True)
  renamed_transactions.fillna("", inplace=True)
  filtered_headers.fillna("", inplace=True)
  renamed_transactions.fillna("", inplace=True)
  filtered_headers.fillna("", inplace=True)
  renamed_transactions.fillna("", inplace=True)
  filtered_headers.fillna("", inplace=True)
  renamed_transactions.fillna("", inplace=True)
  filtered_headers.fillna("", inplace=True)
  renamed_transactions.fillna("", inplace=True)
  filtered_headers.fillna("", inplace=True)
  renamed_transactions.fillna("", inplace=True)
  filtered_headers.fillna("", in

Processed transactions: (185805, 11), headers: (6210, 10)
       page_id       date   time  code              channel  debit  credit  \
0          1/9 2023-05-01  13:34  None               K PLUS  100.0     NaN   
1          1/9 2023-05-01  13:42  None               K PLUS  100.0     NaN   
2          1/9 2023-05-01  14:57  None               K PLUS  200.0     NaN   
3          1/9 2023-05-01  16:03  None               K PLUS   50.0     NaN   
4          1/9 2023-05-01  16:46  None               K PLUS  145.0     NaN   
...        ...        ...    ...   ...                  ...    ...     ...   
185800   33/34 2023-08-20  11:42  None  Internet/Mobile BBL    NaN   600.0   
185801   34/34 2023-08-20  11:43  None  Internet/Mobile BBL    NaN   500.0   
185802   34/34 2023-08-20  11:44  None                  ATM  800.0     NaN   
185803   34/34 2023-08-20  14:51  None               K PLUS  420.0     NaN   
185804   34/34 2023-08-20  15:35  None               K PLUS  129.0     NaN   

     

In [6]:
summary

Unnamed: 0,file,total_debit,sum_debit,debit_amount_match,total_credit,sum_credit,credit_amount_match,total_credit_txns,total_debit_txns,count_debit_tx,count_credit_tx,transaction_count_debit_match,transaction_count_credit_match
0,108988-02009053-2566_1_KBANK.pdf,101617.03,101617.03,True,97295.49,97295.49,True,36.0,143.0,143,36,True,True
1,108988-02009057-2566_1_KBANK.pdf,190945.57,190945.57,True,191945.57,191945.57,True,97.0,403.0,403,97,True,True
2,108988-02009152-2566_1_KBANK.pdf,1765377.80,1765377.80,True,1765433.40,1765433.40,True,195.0,464.0,464,195,True,True
3,108988-02009331-2566_1_KBANK.pdf,1480941.31,1480941.31,True,1480420.06,1480420.06,True,118.0,242.0,242,118,True,True
4,108988-02009407-2566_1_KBANK.pdf,256410.23,256410.23,True,274344.57,274344.57,True,102.0,163.0,163,102,True,True
...,...,...,...,...,...,...,...,...,...,...,...,...,...
354,108988-02040446-2566_1_KBANK.pdf,362950.96,362950.96,True,366790.87,366790.87,True,99.0,440.0,440,99,True,True
355,108988-02061950-2566_1_KBANK.pdf,199329.84,199329.84,True,211531.10,211531.10,True,62.0,201.0,201,62,True,True
356,108988-02068090-2566_1_KBANK.pdf,76249.87,76249.87,True,73941.12,73941.12,True,8.0,103.0,103,8,True,True
357,108988-02069681-2566_1_KBANK.pdf,151062.00,151062.00,True,159912.00,159912.00,True,23.0,63.0,63,23,True,True


In [7]:
bad

Unnamed: 0,file,total_debit,sum_debit,debit_amount_match,total_credit,sum_credit,credit_amount_match,total_credit_txns,total_debit_txns,count_debit_tx,count_credit_tx,transaction_count_debit_match,transaction_count_credit_match,diff_debit,diff_credit
358,หน้าไม่ครบ.pdf,3271149.2,3053359.44,False,3284561.62,3053187.8,False,3623.0,1658.0,1509,3295,False,False,-217789.76,-231373.82


In [8]:
failures

[]