In [1]:
import pandas as pd
import json
from datetime import datetime
import re

In [2]:
# Input locations, relative to the notebook's working directory.
file_namet = "../data/statements cleaned/transactions_cleaned.csv"
file_namer = "../data/rules.csv"

# Both files are decoded as latin2 (Python's alias for ISO-8859-2).
dft = pd.read_csv(file_namet, encoding="latin2")  # transactions to classify
dfr = pd.read_csv(file_namer, encoding="latin2")  # classification rules



In [7]:
import json

def _as_list(value):
    """Return `value` as a list, wrapping scalars (including bare strings)."""
    if isinstance(value, (str, bytes)) or not hasattr(value, "__iter__"):
        return [value]
    return list(value)


def apply_conditions(mask, dft, conditions):
    """
    Narrows an existing boolean mask with a rule's additional AND conditions.

    Parameters
    ----------
    mask : pd.Series
        Boolean mask over the rows of ``dft``. It is never modified in place;
        a new Series is returned whenever a condition is applied.
    dft : pd.DataFrame
        Transactions. Only the columns referenced by the given conditions are
        read ("amount", "date", "description", and the IN columns below).
    conditions : dict or None
        Optional keys:
          - "min_amount", "max_amount": inclusive bounds on "amount"
          - "amount_range": (lo, hi) pair, inclusive on both ends
          - "amount_sign": "negative" or "positive"; any other value is ignored
          - "effective_from", "effective_to": inclusive bounds on "date"
          - "transaction_type", "country", "city", "currency", "merchant":
            allowed value(s) for the column of the same name
          - "not_contains": none of the values may occur in "description"
          - "must_contain_any": at least one value must occur in "description"
            (an empty list therefore matches nothing)
          - "must_contain_all": every value must occur in "description"
        List-valued keys also accept a single scalar, which is treated as a
        one-element list.

    Returns
    -------
    pd.Series
        ``mask`` itself when ``conditions`` is empty/None, otherwise a new mask
        that is True only where ``mask`` and every condition hold.
    """

    if not conditions:
        return mask

    # NOTE: every step rebinds `mask = mask & ...` instead of using `mask &= ...`
    # because the in-place operator would overwrite the caller's Series.

    # --- amount ---
    if "min_amount" in conditions:
        mask = mask & (dft["amount"] >= conditions["min_amount"])

    if "max_amount" in conditions:
        mask = mask & (dft["amount"] <= conditions["max_amount"])

    if "amount_range" in conditions:
        lo, hi = conditions["amount_range"]
        mask = mask & dft["amount"].between(lo, hi)

    if "amount_sign" in conditions:
        if conditions["amount_sign"] == "negative":
            mask = mask & (dft["amount"] < 0)
        elif conditions["amount_sign"] == "positive":
            mask = mask & (dft["amount"] > 0)

    # --- date ---
    # assumes dft["date"] and the bounds are directly comparable
    # (e.g. ISO-formatted date strings) — TODO confirm
    if "effective_from" in conditions:
        mask = mask & (dft["date"] >= conditions["effective_from"])

    if "effective_to" in conditions:
        mask = mask & (dft["date"] <= conditions["effective_to"])

    # --- simple IN conditions ---
    for col in ["transaction_type", "country", "city", "currency", "merchant"]:
        if col in conditions:
            # isin() rejects scalars, so a bare value is wrapped first
            mask = mask & dft[col].isin(_as_list(conditions[col]))

    # Text values below are passed to str.contains with its default
    # regex=True, i.e. they are interpreted as regular expressions.

    # --- text exclusions ---
    if "not_contains" in conditions:
        for val in _as_list(conditions["not_contains"]):
            mask = mask & ~dft["description"].str.contains(val, case=False, na=False)

    # --- OR logic inside condition ---
    if "must_contain_any" in conditions:
        any_mask = pd.Series(False, index=dft.index)
        for val in _as_list(conditions["must_contain_any"]):
            any_mask = any_mask | dft["description"].str.contains(val, case=False, na=False)
        mask = mask & any_mask

    # --- AND logic inside condition ---
    if "must_contain_all" in conditions:
        for val in _as_list(conditions["must_contain_all"]):
            mask = mask & dft["description"].str.contains(val, case=False, na=False)

    return mask


In [8]:
def apply_rules(dft: pd.DataFrame, dfr: pd.DataFrame) -> pd.DataFrame:
    """
    Applies classification rules from dfr to transactions dft.
    Assigns merchant, category, subcategory and applied rule_id.

    Rules are processed from highest to lowest "priority"; rules with equal
    priority keep their order from ``dfr``. A row is classified by the first
    rule that matches it: rows whose "rule_id" is already set are never
    reassigned.

    Parameters
    ----------
    dft : pd.DataFrame
        Transactions. Modified in place: the four output columns are created
        when missing (and converted to object dtype when present) and filled
        for matching rows.
    dfr : pd.DataFrame
        Rules with columns "id", "priority", "source_column", "match_type"
        ("contains" or "regex"), "pattern", "merchant", "category",
        "subcategory" and, optionally, "conditions" (a JSON object string
        understood by ``apply_conditions``).

    Returns
    -------
    pd.DataFrame
        The same ``dft`` object, with classifications filled in.

    Raises
    ------
    ValueError
        If a rule whose source column exists has an unsupported match_type.

    Rules are skipped silently when their source column is absent from dft,
    their pattern is missing, their regex is invalid, or their conditions
    cannot be parsed as JSON.
    """

    # --- ensure output columns exist and can hold strings / ids ---
    # A column read from CSV with only missing values is float64; assigning a
    # string into it is an incompatible-dtype setitem, so use object dtype.
    for col in ["merchant", "category", "subcategory", "rule_id"]:
        if col not in dft.columns:
            dft[col] = None
        elif dft[col].dtype != object:
            dft[col] = dft[col].astype(object)

    # --- sort rules by priority (highest first) ---
    # Stable sort keeps the original order among equal priorities, so the
    # winning rule is deterministic.
    dfr = dfr.sort_values("priority", ascending=False, kind="stable")

    has_conditions = "conditions" in dfr.columns

    # --- iterate rules ---
    for _, rule in dfr.iterrows():
        src_col = rule["source_column"]

        # skip if source column does not exist
        if src_col not in dft.columns:
            continue

        match_type = rule["match_type"]
        if match_type not in ("contains", "regex"):
            raise ValueError(f"Unsupported match_type: {rule['match_type']}")

        # a missing pattern (NaN/None from an empty CSV cell) cannot match
        pattern = rule["pattern"]
        if not isinstance(pattern, str):
            continue

        # --- build base match mask (pattern) ---
        if match_type == "contains":
            # literal, case-insensitive substring match
            mask = dft[src_col].str.contains(
                re.escape(pattern),
                case=False,
                na=False
            )
        else:
            try:
                mask = dft[src_col].str.contains(
                    pattern,
                    case=False,
                    na=False,
                    regex=True
                )
            except re.error:
                # invalid regex → ignore rule
                continue

        # --- apply conditions if present ---
        conditions = None
        if has_conditions and pd.notna(rule.get("conditions")):
            try:
                conditions = json.loads(rule["conditions"])
            except (json.JSONDecodeError, TypeError):
                # malformed or non-string JSON → skip rule
                continue

        mask = apply_conditions(mask, dft, conditions)

        # --- apply only where not classified yet ---
        assign_mask = mask & dft["rule_id"].isna()

        if not assign_mask.any():
            continue

        dft.loc[assign_mask, "merchant"] = rule["merchant"]
        dft.loc[assign_mask, "category"] = rule["category"]
        dft.loc[assign_mask, "subcategory"] = rule["subcategory"]
        dft.loc[assign_mask, "rule_id"] = rule["id"]

    return dft


In [9]:
# Inspect the transactions frame; the notebook renders a truncated preview.
dft

Unnamed: 0,transaction_id,date,transaction_type,amount,currency,description,merchant,country,city,category,subcategory,rule_id
0,01006109774169505330633307706340,2025-11-26,card_payment,-31.00,PLN,OLLI S C,,POLSKA,WARSZAWA,,,
1,01005912774043215330148157931508,2025-11-26,card_payment,-72.16,PLN,APTEKA PRZY DOMANIEWSKIEJ,,POLSKA,WARSZAWA,,,
2,74988855330496415991685,2025-11-26,card_payment,-50.22,PLN,JMP S.A. BIEDRONKA 7224,,POLSKA,WARSZAWA,,,
3,06124000301111111111115990 d339,2025-11-27,transfer_to_account,6281.91,PLN,WYNAGRODZENIE,,,,,,
4,00049884974230785327180203293377,2025-11-23,card_payment,-4.30,PLN,ZABKA Z5795 K.1,,POLSKA,SIEDLCE,,,
...,...,...,...,...,...,...,...,...,...,...,...,...
5243,74378830318043513007084,2020-11-13,card_payment,-11.80,PLN,CYGARO EWA KAFAR,,POLSKA,NOWA WOLA,,,
5244,05147000022375130580000001 8a94,2020-11-13,transfer_blik,50.00,PLN,LA PANDILLA,,,,,,
5245,74378830310043185547183,2020-11-05,card_payment,-8.60,PLN,CYGARO EWA KAFAR,,POLSKA,NOWA WOLA,,,
5246,46103000190109850170141471 08d6,2020-11-05,transfer_to_account,400.00,PLN,KASQA,,,,,,
