In [2]:
import pandas as pd
from toolkit.language_models.model_connection import ChatModelsSetup
from langchain_core.prompts import PromptTemplate
from pydantic import BaseModel, TypeAdapter
from pydantic import BaseModel, Field
from datetime import datetime
from typing import Optional, List
from langchain_core.messages import HumanMessage, AIMessage
from toolkit.language_models.model_connection import ChatModelsSetup
from langchain_core.prompts import PromptTemplate
import os
from concurrent.futures import ProcessPoolExecutor, as_completed
from functools import partial
from toolkit.workspace_admin.data_serialization import write_model_list_to_file
import os
from toolkit.workspace_admin.data_serialization import write_model_list_to_file
from concurrent.futures import ThreadPoolExecutor
from token_cost_calc import calculate_total_prompt_cost, ModelName

In [3]:

# Input/output file locations. They are resolved against the current user's home
# directory rather than a hard-coded, machine-specific absolute path, so the
# notebook does not embed a user name and can run under another account.
# NOTE(review): assumes the files live in "<home>/downloads", matching the
# original paths — confirm if the Downloads folder has been relocated.
downloads_dir = os.path.join(os.path.expanduser("~"), "downloads")

source_transactions = os.path.join(downloads_dir, "transactions_to_be_processed.csv")
source_categories = os.path.join(downloads_dir, "categories.csv")
output_file_path = os.path.join(downloads_dir, "processed_transactions.json")


def read_transactions_csv(filepath: str) -> pd.DataFrame:
    """Load the transactions CSV and return it with a fixed column layout.

    Args:
        filepath: Path of the transactions CSV file.

    Returns:
        A DataFrame holding exactly the expected columns, in the order listed
        below. Columns absent from the file are added and filled with ``None``;
        columns that are not in the list are dropped. ``Date`` and
        ``Date Added`` are parsed as dates when present, and the identifier-like
        columns are read as strings.
    """
    expected_columns = [
        "Date",
        "Description",
        "Category",
        "Amount",
        "Labels",
        "Notes",
        "Account",
        "Account #",
        "Institution",
        "Month",
        "Week",
        "Transaction ID",
        "Account ID",
        "Check Number",
        "Full Description",
        "Date Added"
    ]
    date_columns = ["Date", "Date Added"]

    # read_csv raises ValueError when ``parse_dates`` names a column that is not
    # in the file, which would make the missing-column fallback below unreachable
    # for the date columns. Read just the header first and only ask for the date
    # columns that actually exist.
    header = pd.read_csv(filepath, nrows=0)
    present_date_columns = [col for col in date_columns if col in header.columns]

    df = pd.read_csv(
        filepath,
        parse_dates=present_date_columns or False,
        # Keep identifiers as text so leading zeros are not lost.
        dtype={
            "Account #": str,
            "Transaction ID": str,
            "Account ID": str,
            "Check Number": str
        }
    )

    # Ensure all expected columns exist
    for col in expected_columns:
        if col not in df.columns:
            df[col] = None

    return df[expected_columns]


def read_categories_csv(filepath: str) -> pd.DataFrame:
    """Load the categories CSV and return only the Category/Group/Type columns.

    Args:
        filepath: Path of the categories CSV file.

    Returns:
        A DataFrame with the columns ``Category``, ``Group`` and ``Type`` in that
        order. Any of them missing from the file is added and filled with
        ``None``; other columns are dropped.
    """
    wanted = ("Category", "Group", "Type")
    frame = pd.read_csv(filepath)

    # Add a None-filled placeholder for each wanted column the file lacks.
    absent = [name for name in wanted if name not in frame.columns]
    for name in absent:
        frame[name] = None

    return frame[list(wanted)]


# Load both CSV files into DataFrames using the reader functions defined above.
transactions_df = read_transactions_csv(source_transactions)
categories_df = read_categories_csv(source_categories)
# Print the category table as a quick visual check of what was loaded.
print(categories_df)

                  Category             Group      Type
0         Auto & Transport  Auto & Transport   Expense
1                  Parking  Auto & Transport   Expense
2    Public Transportation  Auto & Transport   Expense
3          Service & Parts  Auto & Transport   Expense
4            Purchase Fees  Auto & Transport   Expense
..                     ...               ...       ...
137    Credit Card Payment          Transfer  Transfer
138           Kids Clothes              Kids   Expense
139         Kids Furniture              Kids   Expense
140        Kids Activities              Kids   Expense
141        Parenting Stuff              Kids   Expense

[142 rows x 3 columns]


In [4]:
# One row of the transactions CSV. Every field is populated from the CSV column
# named by its alias. Only date, description and amount are required; all other
# fields default to None.
# (Documented with comments rather than a class docstring so the model's
# generated JSON schema stays unchanged.)
class Transaction(BaseModel):
    date: datetime = Field(alias="Date")
    description: str = Field(alias="Description")
    category: Optional[str] = Field(default=None, alias="Category")
    amount: float = Field(alias="Amount")

    # Free-form annotations
    labels: Optional[str] = Field(default=None, alias="Labels")
    notes: Optional[str] = Field(default=None, alias="Notes")

    # Account information
    account: Optional[str] = Field(default=None, alias="Account")
    account_number: Optional[str] = Field(default=None, alias="Account #")
    institution: Optional[str] = Field(default=None, alias="Institution")

    # Period columns, kept as text exactly as they appear in the CSV
    month: Optional[str] = Field(default=None, alias="Month")
    week: Optional[str] = Field(default=None, alias="Week")

    # Identifiers, kept as text
    transaction_id: Optional[str] = Field(default=None, alias="Transaction ID")
    account_id: Optional[str] = Field(default=None, alias="Account ID")
    check_number: Optional[str] = Field(default=None, alias="Check Number")

    full_description: Optional[str] = Field(default=None, alias="Full Description")
    date_added: Optional[datetime] = Field(default=None, alias="Date Added")


# One row of the categories CSV. The category name is required; group and type
# default to None. Fields are populated from the CSV column named by the alias.
# (Documented with comments rather than a class docstring so the model's
# generated JSON schema stays unchanged.)
class Category(BaseModel):
    category: str = Field(alias="Category")
    group: Optional[str] = Field(default=None, alias="Group")
    type: Optional[str] = Field(default=None, alias="Type")


def clean_amount(amount):
    """Convert a raw amount cell into a float.

    Accepts numbers as well as strings such as ``"$1,234.50"``, ``"-$25.82"``
    or the accounting-style negative ``"($25.82)"``.

    Args:
        amount: The raw cell value (number, string, or a missing value).

    Returns:
        The amount as a float, or ``None`` when the value is missing (NaN/None)
        or is a blank string.

    Raises:
        ValueError: If a non-blank string cannot be parsed as a number.
    """
    if pd.isna(amount):
        return None
    if isinstance(amount, str):
        # Remove currency symbol and commas
        cleaned = amount.replace('$', '').replace(',', '').strip()
        if not cleaned:
            # A blank cell is a missing amount, not a parse error.
            return None
        if cleaned.startswith('(') and cleaned.endswith(')'):
            # Accounting notation: parentheses mark a negative amount.
            cleaned = '-' + cleaned[1:-1].strip()
        return float(cleaned)
    return float(amount)


def convert_df_to_transactions(df: pd.DataFrame) -> List[Transaction]:
    """Validate every row of the transactions DataFrame into a Transaction.

    Args:
        df: DataFrame with the transactions CSV columns. It is copied, so the
            caller's frame is not modified.

    Returns:
        One Transaction per row, in row order.

    Raises:
        ValueError: If any row fails validation; the message contains the row
            index and the row data, and the original error is chained.
    """
    df = df.copy()

    # Clean amount column first
    df['Amount'] = df['Amount'].apply(clean_amount)

    # Clean all optional columns
    columns_to_clean = [
        'Category', 'Labels', 'Notes', 'Check Number',
        'Account', 'Account #', 'Institution', 'Month',
        'Week', 'Transaction ID', 'Account ID', 'Full Description',
        # Missing dates (NaT) should reach the model as None as well.
        'Date Added'
    ]
    for col in columns_to_clean:
        # Cast to object before replacing missing values: on a float64 column
        # (e.g. a column that is entirely empty in the CSV) ``where(..., None)``
        # coerces None back to NaN, so the model would receive a float NaN
        # instead of None for an Optional field.
        as_object = df[col].astype(object)
        df[col] = as_object.where(as_object.notna(), None)

    transactions = []
    for index, row in df.iterrows():
        try:
            transaction = Transaction(**row.to_dict())
            transactions.append(transaction)
        except Exception as e:
            raise ValueError(f"Row {index} failed validation: {str(e)}\nData: {row.to_dict()}") from e

    return transactions


def convert_df_to_categories(df: pd.DataFrame) -> List[Category]:
    """Validate every row of the categories DataFrame into a Category.

    Args:
        df: DataFrame with the ``Category``, ``Group`` and ``Type`` columns. It
            is copied, so the caller's frame is not modified.

    Returns:
        One Category per row, in row order.

    Raises:
        ValueError: If any row fails validation; the message contains the row
            index and the row data, and the original error is chained.
    """
    df = df.copy()

    # Clean optional columns
    columns_to_clean = ['Group', 'Type']
    for col in columns_to_clean:
        # Cast to object before replacing missing values: on a float64 column
        # (e.g. a column that is entirely empty in the CSV) ``where(..., None)``
        # coerces None back to NaN, so the model would receive a float NaN
        # instead of None for an Optional field.
        as_object = df[col].astype(object)
        df[col] = as_object.where(as_object.notna(), None)

    categories = []
    for index, row in df.iterrows():
        try:
            category = Category(**row.to_dict())
            categories.append(category)
        except Exception as e:
            raise ValueError(f"Row {index} failed validation: {str(e)}\nData: {row.to_dict()}") from e

    return categories


# Validate both DataFrames into lists of pydantic models.
transactions = convert_df_to_transactions(transactions_df)
categories = convert_df_to_categories(categories_df)

# Split on whether a category is already set; a missing (None) or empty-string
# category counts as uncategorized.
categorized_transactions = [transaction for transaction in transactions if transaction.category]
uncategorized_transactions = [transaction for transaction in transactions if not transaction.category]

print(f"Uncategorized Transaction Total: {len(uncategorized_transactions)}")


Uncategorized Transaction Total: 86


In [5]:
# Prompt for the first (analysis) step. The model receives the category list,
# previously categorized transactions as examples, and the one transaction to
# categorize, and is told to answer "Unknown" when it is not confident.
# Placeholders filled in by PromptTemplate: {categories}, {examples}, {transaction}.
analysis_template = """
Human: You are an expert personal finance assistant. 
You will help me categorize my spending and income transactions.

Here are my categories:
{categories}

Here are examples of my past transactions and how I categorized them
{examples} 

Here is the transaction I need you to categorize:
{transaction}

Think step by step and explain your reasoning for what category the transaction should fall into. If you are not confident what category the transaction should fall into, you should assign the category as "Unknown". It is very important that you recognize when you are not confident, as it better to assign Unknown than to guess and risk being wrong too often as it would degrade the quality of my financial data.
Assistant:
"""

# Analysis Prompt Step

In [6]:
# Build the analysis prompt for one sample transaction (index 1 of the
# uncategorized list), using the full category list and the first 200
# categorized transactions as examples.
prompt_template = PromptTemplate.from_template(analysis_template)

formatted_prompt = prompt_template.format(
    categories=TypeAdapter(List[Category]).dump_python(categories),
    examples=TypeAdapter(List[Transaction]).dump_python(categorized_transactions[:200]),
    transaction=uncategorized_transactions[1].model_dump(),
)
# print(formatted_prompt)

chat_models = ChatModelsSetup()
analysis_response = chat_models.claude_35_haiku_chat.invoke(formatted_prompt)

# NOTE(review): the request above goes to `claude_35_haiku_chat`, but the cost is
# computed with ModelName.SONNET_3_5 — confirm which model's pricing is intended.
total_cost = calculate_total_prompt_cost(
    analysis_response.response_metadata["usage"]["prompt_tokens"],
    analysis_response.response_metadata["usage"]["completion_tokens"],
    ModelName.SONNET_3_5
)
print(f"Total Cost: ${total_cost}")

print(analysis_response.content)


Total Cost: $0.19763526000000003
Let's categorize this transaction step by step:

1. Description Analysis:
- The transaction is from Amazon
- The description is "Amazon www.amazon.caon"
- The amount is -$25.82 (a purchase)

2. Past Transaction Context:
Looking at the previous transactions, I notice several patterns for Amazon purchases:
- Amazon purchases are often categorized based on the type of item being purchased
- Common Amazon purchase categories in the past include:
  - Teaching Supplies
  - Home Stuff
  - Books
  - Office Supplies
  - Parenting Stuff
  - Clothing
  - Kids Furniture

3. Limitations:
- Without additional context about what was purchased, I cannot definitively determine the specific category
- The amount ($25.82) could potentially fit multiple categories

4. Decision Process:
- Given the lack of specific information about the purchase
- And to avoid incorrectly categorizing the transaction
- I will assign the category as "Unknown"

Recommendation: Category = "Unk

# Categorization Prompt Step

In [9]:
# Structured result of the categorization step: the category assigned to one
# transaction, identified by its id and description.
# Documented with comments rather than a class docstring on purpose: the output
# of model_json_schema() for this class is embedded in the serialization prompt,
# and a docstring would be added to that schema as a description.
class AssignedCategory(BaseModel):
    transaction_id: str
    description: str
    category: str


# Prompt for the second (serialization) step: asks the model to return the
# assigned category as bare JSON matching the supplied schema.
# Placeholders filled in by PromptTemplate: {transaction}, {json_structure}.
serialize_categories_template = """
You will format the assigned categories into a JSON object with a specific schema. Make sure your output adheres to this schema exactly. 

### Transaction Details:
{transaction}

### Output JSON schema:
{json_structure}

Please output the JSON data with no additional text, preamble, separators, or extra characters.

**Assistant:**
"""

In [10]:
# Build the follow-up prompt that asks for the sample transaction's category as
# JSON matching the AssignedCategory schema.
serialization_prompt_template = PromptTemplate.from_template(serialize_categories_template)

formatted_prompt = serialization_prompt_template.format(
    transaction=uncategorized_transactions[1],
    json_structure=AssignedCategory.model_json_schema()
)

# Replay the earlier analysis as the assistant's turn, then send the
# serialization request as the next human turn.
prompt = [
    AIMessage(content=analysis_response.content),
    HumanMessage(content=formatted_prompt),
]

chat_models = ChatModelsSetup()
category_assignment_response = chat_models.claude_35_haiku_chat.invoke(prompt)
# Parse the reply as JSON; this raises if the reply does not validate against
# the AssignedCategory model.
assigned_category = AssignedCategory.model_validate_json(category_assignment_response.content)
# print(assigned_category)

# No model name is passed here, so calculate_total_prompt_cost uses its default.
# NOTE(review): confirm that default matches the haiku model used above.
total_cost = calculate_total_prompt_cost(
    category_assignment_response.response_metadata["usage"]["prompt_tokens"],
    category_assignment_response.response_metadata["usage"]["completion_tokens"]
)
print(f"Total Cost: ${total_cost}")


Total Cost: $0.0008597440000000001


# Full Categorization Pipeline

In [42]:
# Sequential pipeline: for each uncategorized transaction, run the analysis
# prompt, then ask the model to serialize its conclusion as an AssignedCategory.
analysis_prompt_template = PromptTemplate.from_template(analysis_template)
serialization_prompt_template = PromptTemplate.from_template(serialize_categories_template)
chat_models = ChatModelsSetup()

# These prompt inputs are identical for every transaction, so they are built once
# here rather than being re-serialized on each loop iteration.
category_records = TypeAdapter(List[Category]).dump_python(categories)
example_records = TypeAdapter(List[Transaction]).dump_python(categorized_transactions[:500])
assigned_category_schema = AssignedCategory.model_json_schema()

# NOTE: from here on `transactions` holds AssignedCategory results, replacing the
# list of Transaction objects created earlier in the notebook.
transactions = []
for transaction in uncategorized_transactions:
    # Step 1: ask the model to reason about the category in free text.
    formatted_analysis_prompt = analysis_prompt_template.format(
        categories=category_records,
        examples=example_records,
        transaction=transaction.model_dump(),
    )

    analysis_response = chat_models.claude_35_v2_sonnet_chat.invoke(formatted_analysis_prompt)

    # Step 2: replay that reasoning as the assistant turn and request JSON output.
    formatted_serialization_prompt = serialization_prompt_template.format(
        transaction=transaction,
        json_structure=assigned_category_schema
    )

    serialization_prompt_with_analysis_response = [
        AIMessage(content=analysis_response.content),
        HumanMessage(content=formatted_serialization_prompt),
    ]

    transactions.append(
        AssignedCategory.model_validate_json(
            chat_models.claude_35_v2_sonnet_chat.invoke(serialization_prompt_with_analysis_response).content
        )
    )
    print(f"categorized {len(transactions)} transactions")

print(transactions)
# NOTE(review): this passes the working directory, whereas the parallel cell passes
# `output_file_path` — confirm which argument write_model_list_to_file expects.
write_model_list_to_file(os.getcwd(), transactions)

categorized 1 transactions
categorized 2 transactions
categorized 3 transactions
categorized 4 transactions
categorized 5 transactions


KeyboardInterrupt: 

# Parallel Categorization

In [17]:
def process_single_transaction(transaction, categories, categorized_transactions):
    """Categorize one transaction with a two-step model conversation.

    Step one sends the analysis prompt (categories, up to 500 categorized
    examples and the transaction) and collects the model's free-text reasoning.
    Step two replays that reasoning as the assistant turn and asks for the
    result as JSON matching the AssignedCategory schema.

    Args:
        transaction: The Transaction to categorize.
        categories: The list of Category models offered to the model.
        categorized_transactions: Already categorized transactions; the first
            500 are used as examples.

    Returns:
        The AssignedCategory parsed from the model's JSON reply.
    """
    models = ChatModelsSetup()

    analysis_builder = PromptTemplate.from_template(analysis_template)
    serialization_builder = PromptTemplate.from_template(serialize_categories_template)

    # Step 1: free-text reasoning about which category fits.
    category_records = TypeAdapter(List[Category]).dump_python(categories)
    example_records = TypeAdapter(List[Transaction]).dump_python(categorized_transactions[:500])
    analysis_prompt = analysis_builder.format(
        categories=category_records,
        examples=example_records,
        transaction=transaction.model_dump(),
    )
    reasoning = models.claude_35_v2_sonnet_chat.invoke(analysis_prompt)

    # Step 2: have the model restate its conclusion as schema-conforming JSON.
    serialization_prompt = serialization_builder.format(
        transaction=transaction,
        json_structure=AssignedCategory.model_json_schema()
    )
    conversation = [
        AIMessage(content=reasoning.content),
        HumanMessage(content=serialization_prompt),
    ]
    structured_reply = models.claude_35_v2_sonnet_chat.invoke(conversation)

    return AssignedCategory.model_validate_json(structured_reply.content)


def parallel_process_transactions(uncategorized_transactions, categories, categorized_transactions, max_workers=5):
    """Categorize transactions concurrently on a thread pool.

    Each transaction is handed to ``process_single_transaction``. A failure for
    one transaction is reported on stdout, naming the transaction, and does not
    stop the others.

    Args:
        uncategorized_transactions: Transactions to categorize.
        categories: Category list passed through to the worker.
        categorized_transactions: Example transactions passed through to the worker.
        max_workers: Number of worker threads (defaults to 5).

    Returns:
        The successful results, in completion order (not input order). Failed
        transactions are omitted.
    """
    worker = partial(
        process_single_transaction,
        categories=categories,
        categorized_transactions=categorized_transactions
    )

    processed_transactions = []
    total = len(uncategorized_transactions)

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_transaction = {
            executor.submit(worker, transaction): transaction
            for transaction in uncategorized_transactions
        }

        for i, future in enumerate(as_completed(future_to_transaction), 1):
            source = future_to_transaction[future]
            try:
                # as_completed only yields finished futures, so result() returns
                # (or raises) immediately and needs no timeout.
                result = future.result()
            except Exception as e:
                # Name the failing transaction so it can be retried or inspected.
                label = (
                    getattr(source, "transaction_id", None)
                    or getattr(source, "description", None)
                    or source
                )
                print(f"Error processing transaction {label!r} ({i}/{total}): {str(e)}")
            else:
                print(f"Processed {i}/{total} transactions")
                processed_transactions.append(result)

    return processed_transactions


# Test single transaction
# test_result = process_single_transaction(
#     uncategorized_transactions[0], 
#     categories, 
#     categorized_transactions
# )
# print("Test successful:", test_result)

# Run the parallel processing
# NOTE: this rebinds `transactions` to the list of AssignedCategory results
# returned by parallel_process_transactions.
transactions = parallel_process_transactions(
    uncategorized_transactions,
    categories,
    categorized_transactions
)

print(transactions)

# Persist the results to the JSON output path configured near the top of the notebook.
write_model_list_to_file(output_file_path, transactions)


Processed 1/86 transactions
Processed 2/86 transactions
Processed 3/86 transactions
Processed 4/86 transactions
Processed 5/86 transactions
Processed 6/86 transactions
Processed 7/86 transactions
Processed 8/86 transactions
Processed 9/86 transactions
Processed 10/86 transactions
Processed 11/86 transactions
Processed 12/86 transactions
Processed 13/86 transactions
Processed 14/86 transactions
Processed 15/86 transactions
Processed 16/86 transactions
Processed 17/86 transactions
Processed 18/86 transactions
Processed 19/86 transactions
Processed 20/86 transactions
Processed 21/86 transactions
Processed 22/86 transactions
Processed 23/86 transactions
Processed 24/86 transactions
Processed 25/86 transactions
Processed 26/86 transactions
Processed 27/86 transactions
Processed 28/86 transactions
Processed 29/86 transactions
Processed 30/86 transactions
Processed 31/86 transactions
Processed 32/86 transactions
Processed 33/86 transactions
Processed 34/86 transactions
Processed 35/86 transac

In [19]:
def write_transactions_to_csv(transactions: "List[AssignedCategory]", output_path: str):
    """Write assigned categories to a CSV file.

    Args:
        transactions: Results to export; each item must expose
            ``transaction_id``, ``description`` and ``category`` attributes.
        output_path: Destination CSV path. An existing file is overwritten.

    The file always starts with the header row
    ``transaction_id,description,category``, even when ``transactions`` is
    empty, so it can be read back with a stable schema.
    """
    columns = ['transaction_id', 'description', 'category']

    # Convert list of AssignedCategory objects to DataFrame. Passing ``columns``
    # explicitly keeps the header when there are no rows.
    df = pd.DataFrame(
        [{name: getattr(t, name) for name in columns} for t in transactions],
        columns=columns,
    )

    # Write to CSV
    df.to_csv(output_path, index=False)


# After parallel processing or regular processing
# The CSV goes to the current user's downloads folder; the path is derived from
# the home directory instead of a hard-coded, user-specific absolute path.
# NOTE(review): assumes "<home>/downloads" is the intended folder — confirm.
csv_output_path = os.path.join(os.path.expanduser("~"), "downloads", "processed_transactions.csv")
write_transactions_to_csv(transactions, csv_output_path)

