In [8]:
import pandas as pd
from transaction_keywords import expense_categories, skip_city_country_keywords, income_categories
from api_key import API1
import json
from google import genai
from google.genai import types
from time import sleep
from collections import defaultdict
from rapidfuzz import fuzz
import re

In [9]:
DEBIT_PATH = "../data/rules/rules_debit.json"
CREDIT_PATH = "../data/rules/rules_credit.json"

# Store the rules
def save_rules(rules_dict, path):
    with open(path, "w", encoding="utf-8") as f:
        json.dump(rules_dict, f, indent=2, ensure_ascii=False)

# Read the rules
def load_rules(path):
    import os

    if not os.path.exists(path) or os.path.getsize(path) == 0:
        print("⚠️ Rules file missing or empty. Starting fresh.")
        return {}

    try:
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)
    except json.JSONDecodeError:
        print("❌ Invalid JSON format in rules file. Starting fresh.")
        return {}
    

def save_pending(credit_batch, debit_batch, path="pending_batches.json"):
    with open(path, "w", encoding="utf-8") as f:
        json.dump({
            "credit_batch": credit_batch,
            "debit_batch": debit_batch
        }, f, ensure_ascii=False, indent=2)

def load_pending(path="pending_batches.json"):
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
            return data.get("credit_batch", []), data.get("debit_batch", [])
    except FileNotFoundError:
        return [], []



In [10]:
# Step 2 - Normalize Texts
def normalize_keyword(name: str) -> str:
    # Turn to lowercase, remove special character and extra white space
    name_clean = name.lower()
    name_clean = re.sub(r"[^a-z0-9\s]", "", name_clean)  # preserve alphbat, number and some white space
    name_clean = re.sub(r"\s+", " ", name_clean)  # remove additional space
    return name_clean.strip()

# Step 3 - Fuzzy Match

def fuzzy_match_transaction(name_normalized, rules_dict, threshold=85):
    best_match = None
    best_score = 0
    best_keyword = None
    matched_entry = None

    for keyword, entries in rules_dict.items():
        for entry in entries:
            original_names = entry["Original Name"]
            original_names = original_names if isinstance(original_names, list) else [original_names]

            for original_name in original_names:
                name_clean = normalize_keyword(original_name)
                score = fuzz.partial_ratio(name_normalized, name_clean)

                if score > best_score:
                    best_score = score
                    best_match = original_name
                    best_keyword = keyword
                    matched_entry = entry

    if best_score >= threshold:
        return {
            "matched_keyword": best_keyword,
            "matched_original_name": best_match,
            "score": best_score,
            "matched_entry": matched_entry
        }
    else:
        return None

    

# Step 4 Assign a cateogry
def assign_rule_to_row(df, idx, match_result, rules_dict):
    keyword = match_result["matched_keyword"]
    matched_name = match_result["matched_original_name"]
    name_norm = normalize_keyword(df.loc[idx, "Name"])

    

    entries = rules_dict.get(keyword, [])
    selected_entry = None

    for entry in entries:
        cities = entry["City"] if isinstance(entry["City"], list) else [entry["City"]]
        for city in cities:
            if city and city.lower() in name_norm:
                selected_entry = entry
                break
        if selected_entry:
            break

    # fallback: first entry
    if not selected_entry and entries:
        selected_entry = entries[0]

    if selected_entry:
        df.loc[idx, "Category"] = selected_entry["Category"]
        df.loc[idx, "Subcategory"] = selected_entry.get("Subcategory")
        df.loc[idx, "Keyword"] = keyword
        df.loc[idx, "Match Type"] = "Fuzzy"


In [11]:
def add_to_keyword_rules(target, response_dict, rules_dict):
    for keyword in response_dict["Keyword"]:
        key = keyword.strip().lower()
        new_entry = {
            "Category": response_dict["Category"],
            "Subcategory": response_dict.get("Subcategory"),
            "Country": [response_dict["Country"]],
            "City": [response_dict["City"]],
            "Original Name": [target],
            "Flags":{
                "lock_location": False,
                "manual_check": False
            }
        }

        if key not in rules_dict:
            rules_dict[key] = [new_entry]
            print(f"✅ Added new keyword '{key}' with first entry.")
        else:
            merged = False
            for entry in rules_dict[key]:
                if (
                    entry["Category"] == new_entry["Category"] and
                    entry["Subcategory"] == new_entry["Subcategory"]
                ):
                    flags = entry.get("Flags", {})

                    if not flags.get("manual_check", False):
                        if target not in entry["Original Name"]:
                            entry["Original Name"].append(target)

                    if not flags.get("lock_location", False):
                        if response_dict["Country"] not in entry["Country"]:
                            entry["Country"].append(response_dict["Country"])
                        if response_dict["City"] not in entry["City"]:
                            entry["City"].append(response_dict["City"])

                    print(f"🔁 Merged into existing keyword entry: {key}")
                    merged = True
                    break

            if not merged:
                rules_dict[key].append(new_entry)
                print(f"➕ Added new variant entry under keyword: {key}")

def merge_entries(entries):
    grouped = defaultdict(lambda: {
        "Original Name": [],
        "Category": None,
        "Subcategory": None,
        "Country": [],
        "City": []
    })

    for entry in entries:
        key = (
            entry["Category"],
            entry["Subcategory"]
        )

        group = grouped[key]

        # Set static fields
        group["Category"] = entry["Category"]
        group["Subcategory"] = entry["Subcategory"]

        # Original Name
        orig_name = entry["Original Name"]
        if orig_name not in group["Original Name"]:
            group["Original Name"].append(orig_name)

        # Country
        country = entry["Country"]
        if country not in group["Country"]:
            group["Country"].append(country)

        # City
        city = entry["City"]
        if city not in group["City"]:
            group["City"].append(city)

    return list(grouped.values())




In [12]:
rules_debit = load_rules("../data/rules/rules_debit.json")
rules_credit = load_rules("../data/rules/rules_credit.json")
credit_batch, debit_batch = load_pending()

In [13]:
def gemini_api(transaction_list, current_API, expense_categories, keyword_rules, is_credit=True):
    client = genai.Client(api_key=current_API)
    credit_note = (
        "These transactions are **credits** (money received). "
        "Classify them carefully — common categories include salary, refunds, reimbursements, and transfers."
        if is_credit else
        "These transactions are **debits** (money spent). "
        "Classify them according to your schema — avoid assigning refund or income categories."
    )

    prompt = f"""
You are an expert in payment services and transaction classification.

Here is my classification schema:
{expense_categories}

If a transaction contains the word "Tikkie", label it as "Miscellaneous".

{credit_note}

Please classify the following transactions. For each transaction:

1. Determine the category and subcategory based only on my schema (do not invent new words).
2. Identify the most likely country and city this business is located in (no abbreviations).
3. Provide one most representative keyword (e.g., brand name, but no city/country or invented names).

Return your answer in strict JSON format like the following:

{{
  "Transaction Name": {{
    "Category": "...",
    "Subcategory": "...",
    "Country": "...",
    "City": "...",
    "Keyword": ["..."]
  }},
  ...
}}

Transaction list:
{json.dumps(transaction_list, indent=2)}
"""

    response = client.models.generate_content(
        model="gemini-2.0-flash",
        contents=prompt,
        config=types.GenerateContentConfig(
            max_output_tokens=2000,
            temperature=0.1
        )
    )

    cleaned_text = response.text.strip().removeprefix("```json").removesuffix("```").strip()

    try:
        data = json.loads(cleaned_text)
    except json.JSONDecodeError:
        print("JSON parsing failed. Raw output:")
        print(cleaned_text)
        return {}

    for transaction, details in data.items():
        add_to_keyword_rules(transaction, details, keyword_rules)

    if is_credit:
        save_rules(keyword_rules, CREDIT_PATH)
    else:
        save_rules(keyword_rules, DEBIT_PATH)

    return data

In [14]:
# Read the data
df_previous = pd.read_csv("../data/processed/cleaned_transactions_previous.csv")
df_ing = pd.read_csv("../data/raw/ing_bank_statement_sample_2.csv")
df_revolut = pd.read_csv("../data/raw/revolut_bank_statement_sample_2.csv")


# Clean the data
## ING
""" Column information

    'Date',                -> Year/ Month/ Day
    'Name / Description',  -> Name
    'Account',             -> Remove
    'Counterparty',        -> Remove
    'Code',                -> Remove
    'Debit/credit',        -> Debit/Credit -> Positive/Negative
    'Amount (EUR)',        -> Keep
    'Transaction type',    -> Remove
    'Notifications'        -> Remove
"""
col_for_ING = ["Date", "Name / Description", "Debit/credit", "Amount (EUR)"]
col_rename_dict_ING = {
    "Name / Description": "Name",
    "Debit/credit": "IsCredit",
    "Amount (EUR)": "Amount"
}

# Extract only the needed columns, and rename them so that it can be merged with other dataframe
df_ing = (
    df_ing[col_for_ING]
    .rename(columns=col_rename_dict_ING)
)

## Revlout
""" Column information

    'Type',           -> Remove
    'Product',        -> Remove
    'Started Date',   -> Remove
    'Completed Date', -> Remove
    'Description',    -> Name
    'Amount',         -> Keep
    'Fee',            -> Remove
    'Currency',       -> Remove (So far only Euro)
    'State',          -> Remove
    'Balance'         -> Remove
"""


col_for_revolut = ["Description", "Amount", "Completed Date"]
col_rename_dict_Revlout = {
    "Description": "Name",
    "Completed Date": "Date"
}

df_revolut = (
    df_revolut[col_for_revolut]
    .rename(columns=col_rename_dict_Revlout)
)

## ING, Revlout (Check if the columns matched)

# Transform the data

## ING
### Date -> Year/ Month/ Day
df_ing["Date"] = pd.to_datetime(df_ing["Date"], format="%Y%m%d")
df_ing["Year"] = df_ing["Date"].dt.year
df_ing["Month"] = df_ing["Date"].dt.month
df_ing["Day"] = df_ing["Date"].dt.day
### Amount -> 100,00 => 100.00
df_ing["Amount"] = df_ing["Amount"].apply(
    lambda row: float(row.replace(",", ".")))
### Amount + isCredit -> Amount (Positive/Negative)
df_ing["Amount"] = df_ing.apply(
    lambda row: -row["Amount"] if row["IsCredit"] == "Debit" else row["Amount"],
    axis=1
)

### Create Source column -> Indiciate where the data is from
df_ing["Source"] = "ING"

## Revlout

### Date -> Year/ Month/ Day
df_revolut["Date"] = pd.to_datetime(df_revolut["Date"])
df_revolut["Year"] = df_revolut["Date"].dt.year.astype("Int32") # Int 32 allows  Na
df_revolut["Month"] = df_revolut["Date"].dt.month.astype("Int32")
df_revolut["Day"] = df_revolut["Date"].dt.day.astype("Int32")

### Create IsCredit column
df_revolut["IsCredit"] = df_revolut.apply(
    lambda row: "Debit" if row["Amount"] < 0 else "Credit",
    axis=1
    )

### Create Source column -> Indiciate where the data is from
df_revolut["Source"] = "Revolut"


# Reorder columns 
df_ing = df_ing[["Year", "Month", "Day", "Name", "Amount", "IsCredit", "Source"]]
df_revolut = df_revolut[["Year", "Month", "Day", "Name", "Amount", "IsCredit", "Source"]]


# Merge two data sources
df = pd.concat([df_previous, df_ing, df_revolut], axis=0)
df = df.sort_values(by=["Year", "Month", "Day"], ascending=[False, True, True]).reset_index(drop=True)

# Add new column
df["Normalized Name"] = df["Name"].apply(normalize_keyword)


df[df["Name"] == "Camelot Vastgoedbeheer BV"]

# Init Columns

df["Category"] = None
df["Subcategory"] = None
df["Keyword"] = None
df["Match Type"] = None


# Init Pending List
credit_batch = []
debit_batch = []


# # Classification
for idx, row in df.iterrows():
    is_credit = row["IsCredit"] == "Credit"
    name_norm = normalize_keyword(row["Name"])
    
    if is_credit:
        match = fuzzy_match_transaction(name_norm, rules_credit)
        if match:
            assign_rule_to_row(df, idx, match, rules_credit)
        else:
            credit_batch.append(idx)
            df.loc[idx, "Match Type"] = "Pending"
    else:
        match = fuzzy_match_transaction(name_norm, rules_debit)
        if match:
            assign_rule_to_row(df, idx, match, rules_debit)
        else:
            debit_batch.append(idx)
            df.loc[idx, "Match Type"] = "Pending"

    # Trigger Gemini if enough new items
    if len(credit_batch) >= 15:
        print("Hit the quota! ", credit_batch)
        batch = df.loc[credit_batch]["Name"].tolist()
        result = gemini_api(batch, API1, income_categories, rules_credit, is_credit=True)
        for name, info in result.items():
            add_to_keyword_rules(name, info, rules_credit)
        credit_batch.clear()
        save_pending(credit_batch, debit_batch)
        print("Clear!: ", credit_batch)
        sleep(10)

    if len(debit_batch) >= 15:
        print("Hit the quota! ", debit_batch)
        batch = df.loc[debit_batch]["Name"].tolist()
        result = gemini_api(batch, API1, expense_categories, rules_debit, is_credit=False)
        for name, info in result.items():
            add_to_keyword_rules(name, info, rules_debit)
        debit_batch.clear()
        save_pending(credit_batch, debit_batch)
        print("Clear!: ", debit_batch)
        sleep(10)
        
if credit_batch:
    print("🔁 Processing remaining credit batch:", credit_batch)
    batch = df.loc[credit_batch]["Name"].tolist()
    result = gemini_api(batch, API1, income_categories, rules_credit, is_credit=True)
    for name, info in result.items():
        add_to_keyword_rules(name, info, rules_credit)
    credit_batch.clear()
    save_pending(credit_batch, debit_batch)
    print("✅ Credit batch cleared.")

if debit_batch:
    print("🔁 Processing remaining debit batch:", debit_batch)
    batch = df.loc[debit_batch]["Name"].tolist()
    result = gemini_api(batch, API1, expense_categories, rules_debit, is_credit=False)
    for name, info in result.items():
        add_to_keyword_rules(name, info, rules_debit)
    debit_batch.clear()
    save_pending(credit_batch, debit_batch)
    print("✅ Debit batch cleared.")

# Change the Type
df[["Year", "Month", "Day"]] = df[["Year", "Month", "Day"]].astype("Int64")

# Output file
df.to_csv("../data/processed/cleaned_transactions.csv")


🔁 Processing remaining debit batch: [150, 174, 420, 424, 480, 538, 587, 617, 645, 1102, 1291, 1358, 1405]
🔁 Merged into existing keyword entry: ov
🔁 Merged into existing keyword entry: ov
🔁 Merged into existing keyword entry: cili pizza
🔁 Merged into existing keyword entry: kruidvat
🔁 Merged into existing keyword entry: ov
🔁 Merged into existing keyword entry: jumbo
🔁 Merged into existing keyword entry: ov
🔁 Merged into existing keyword entry: olympos
🔁 Merged into existing keyword entry: ov
🔁 Merged into existing keyword entry: ov
🔁 Merged into existing keyword entry: cili pizza
🔁 Merged into existing keyword entry: kruidvat
🔁 Merged into existing keyword entry: ov
🔁 Merged into existing keyword entry: jumbo
🔁 Merged into existing keyword entry: ov
🔁 Merged into existing keyword entry: olympos
✅ Debit batch cleared.
