Test Notebook: CSV and PDF Bank Statement Parsing

In [2]:
import asyncio
import json
from autogen_agentchat.agents import AssistantAgent, CodeExecutorAgent
from autogen_agentchat.messages import TextMessage
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import TextMentionTermination, MaxMessageTermination
from agents.prompts.unstructured_text_parser_message import UNSTRUCTURED_TEXT_PARSER_SYSTEM_MESSAGE
from typing import List, Dict
import pandas as pd
import json

In [3]:
def _numeric_or_zero(value):
    """Coerce one cell to a plain Python number; blank or unparseable cells become 0."""
    number = pd.to_numeric(value, errors='coerce')
    if pd.isna(number):
        # NaN is truthy, so the `x or 0` idiom would let it through unchanged.
        return 0
    # Unwrap NumPy scalars (e.g. numpy.int64) so json.dumps can serialize the result.
    return number.item() if hasattr(number, "item") else number


def parse_csv_file(file_path: str) -> str:
    """
    Reads a CSV bank statement, attempts to identify common transaction columns,
    and returns the data as a JSON string.

    Column headers are matched case-insensitively and with surrounding
    whitespace ignored:
      * date:        'date' or 'transaction date'
      * description: 'description', 'transaction' or 'details'
      * amount:      an 'amount' column if present; otherwise a 'debit' and a
                     'credit' column combined as ``credit - debit``.

    Rows are only emitted when an amount can be determined, so a file with
    neither an 'amount' column nor both 'debit' and 'credit' columns yields
    an empty list.

    Args:
        file_path (str): The local path to the CSV file.

    Returns:
        str: A JSON string representing a list of transaction dictionaries
             with the keys "date", "description" and "amount".
             On failure, a JSON object with a single "error" key is returned
             instead; no exception is propagated to the caller.
    """
    try:
        df = pd.read_csv(file_path)

        # --- Column Name Identification Logic ---
        # Normalize headers so " Date" and "DATE" both match the aliases below.
        df.columns = [str(col).strip().lower() for col in df.columns]

        date_aliases = ('date', 'transaction date')
        desc_aliases = ('description', 'transaction', 'details')

        # First matching header (in file order) wins.
        date_col = next((col for col in df.columns if col in date_aliases), None)
        desc_col = next((col for col in df.columns if col in desc_aliases), None)

        # The amount may be a single column or split into debit/credit columns.
        amount_col = 'amount' if 'amount' in df.columns else None
        debit_col = 'debit' if 'debit' in df.columns else None
        credit_col = 'credit' if 'credit' in df.columns else None

        if not date_col or not desc_col:
            return json.dumps({"error": "Could not automatically identify date or description columns."})

        # --- Data Extraction and Formatting ---
        transactions = []
        for _, row in df.iterrows():
            if amount_col:
                amount = row[amount_col]
            elif debit_col and credit_col:
                # Debits are negative, credits are positive. A statement row
                # normally fills only one of the two cells; the blank one
                # must count as 0 rather than poison the result with NaN.
                amount = _numeric_or_zero(row[credit_col]) - _numeric_or_zero(row[debit_col])
            else:
                amount = None

            if amount is not None:
                transactions.append({
                    "date": row[date_col],
                    "description": row[desc_col],
                    "amount": amount,
                })

        return json.dumps(transactions, indent=2)

    except Exception as e:
        # Deliberate best-effort: callers receive the failure as JSON text.
        return json.dumps({"error": f"Failed to parse CSV file: {str(e)}"})



In [4]:
# Enhanced extract_text_from_pdf function that returns JSON/dict
import pypdf
import json

def extract_text_from_pdf_enhanced(file_path: str) -> dict:
    """
    Enhanced version that extracts text from PDF and returns structured data.

    Args:
        file_path (str): The local path to the PDF file.

    Returns:
        dict: On success, a dictionary with the keys
            "success" (True), "file_path", "total_pages",
            "total_text_length", "full_text" (every page's text, each
            followed by a "--- End of Page ---" separator line), "pages"
            (one dict per page with "page_number" starting at 1, "text"
            and "text_length") and "metadata" ("extraction_method" and
            "file_size_bytes", the latter None if the size cannot be read).
            On any failure, a dictionary with "success" (False), "error"
            (the exception message) and "file_path". No exception is
            propagated to the caller.
    """
    import os  # local import: only needed for the file-size lookup

    try:
        reader = pypdf.PdfReader(file_path)

        # Collect per-page text first; the combined text is derived from it.
        pages_data = []
        for page_number, page in enumerate(reader.pages, start=1):
            # Guard against a falsy result so len() and concatenation stay safe.
            page_text = page.extract_text() or ""
            pages_data.append({
                "page_number": page_number,
                "text": page_text,
                "text_length": len(page_text)
            })

        full_text = "".join(
            entry["text"] + "\n--- End of Page ---\n" for entry in pages_data
        )

        # Ask the filesystem for the size. Reading the reader's stream only
        # returns the bytes after its current position, which is not the
        # size of the file once the document has been parsed.
        try:
            file_size_bytes = os.path.getsize(file_path)
        except (OSError, TypeError):
            file_size_bytes = None

        return {
            "success": True,
            "file_path": file_path,
            "total_pages": len(pages_data),
            "total_text_length": len(full_text),
            "full_text": full_text,
            "pages": pages_data,
            "metadata": {
                "extraction_method": "pypdf",
                "file_size_bytes": file_size_bytes
            }
        }

    except Exception as e:
        # Deliberate best-effort: report the failure in the result dict.
        return {
            "success": False,
            "error": str(e),
            "file_path": file_path
        }

# Run the enhanced extractor against the sample statement
pdf_data = extract_text_from_pdf_enhanced('temp/test_statement.pdf')

# Report the outcome and keep the extracted text in `pdf_text` for later cells
if not pdf_data["success"]:
    print(f"Error extracting text: {pdf_data['error']}")
    pdf_text = ""
else:
    print(f"Successfully extracted text from {pdf_data['total_pages']} pages")
    print(f"Total text length: {pdf_data['total_text_length']} characters")
    print(f"File path: {pdf_data['file_path']}")
    print("\nFirst 200 characters of extracted text:", pdf_data['full_text'][:200], sep="\n")

    # Full text (with page separators) is what the parsing step consumes
    pdf_text = pdf_data['full_text']

In [6]:
# Now parse the unstructured text using the AI agents
import asyncio
from tools.parse_unstructured_text import parse_unstructured_text
from models.openai_model_client import OpenAIAPIClient
from config.docker_util import DockerCodeExecutor

# Initialize components
model_client = OpenAIAPIClient()
code_executor = DockerCodeExecutor()

# Parse the PDF text
async def parse_pdf():
    result = await parse_unstructured_text(pdf_text, model_client, code_executor)
    return result

# Run the parsing
parsed_result = await parse_pdf()

TypeError: 'module' object is not callable

In [None]:
# Method 1: Print the parsing result exactly as returned by parse_pdf(), with no decoding
print(parsed_result)

In [None]:
# Method 2: Decode the result as JSON and pretty-print it
try:
    parsed_data = json.loads(parsed_result)
except json.JSONDecodeError as e:
    # Not valid JSON: show the decoder error, then fall back to the raw value
    print(f"Error parsing JSON: {e}")
    print("Raw result:")
    print(parsed_result)
else:
    print("Parsed JSON data:")
    print(json.dumps(parsed_data, indent=2))

    # A list is treated as a collection of transactions: report the count
    # and show the first entry as a sample
    if isinstance(parsed_data, list):
        print(f"\nFound {len(parsed_data)} transactions")
        if parsed_data:
            print("Sample transaction:")
            print(json.dumps(parsed_data[0], indent=2))

In [None]:
# Inspect the extraction result dictionary
# (default=str stringifies any value json cannot encode natively)
print("Full PDF data structure:", json.dumps(pdf_data, indent=2, default=str), sep="\n")

# Headline fields; .get() returns None for keys absent from a failed extraction
print("\n" + "="*50, "Accessing specific data:", sep="\n")
print(f"Success: {pdf_data.get('success')}")
print(f"Total pages: {pdf_data.get('total_pages')}")
print(f"Text length: {pdf_data.get('total_text_length')}")

# Per-page summary: character count plus the first 100 characters of text
if pdf_data.get('pages'):
    print("\nPage breakdown:")
    for page in pdf_data['pages']:
        print(f"Page {page['page_number']}: {page['text_length']} characters")
        print(f"  Preview: {page['text'][:100]}...", end="\n\n")

In [None]:
# Serialize the whole extraction result to a JSON string
# (default=str stringifies any value json cannot encode natively)
pdf_data_json = json.dumps(pdf_data, indent=2, default=str)
print("PDF data as JSON string:", pdf_data_json, sep="\n")

# You can also save it to a file
# with open('pdf_extraction_result.json', 'w') as f:
#     json.dump(pdf_data, f, indent=2, default=str)

# Reduced payload: just the text plus page count and length, with
# defaults filling in for keys missing from a failed extraction
text_only_data = dict(
    text=pdf_data.get('full_text', ''),
    metadata=dict(
        pages=pdf_data.get('total_pages', 0),
        length=pdf_data.get('total_text_length', 0),
    ),
)
text_json = json.dumps(text_only_data, indent=2)
print("\nText-only JSON:", text_json, sep="\n")