# Multi-Agent Financial Crime Investigation System

This notebook implements a multi-agent system for investigating financial crime cases using the `gemma3:4b` model via Ollama. The system consists of specialized agents that analyze normalized case data, an aggregator to reach consensus, and an adversarial critic for quality control.

In [None]:
import json
from collections import Counter
from dataclasses import dataclass, asdict
from datetime import datetime
from typing import List, Dict, Any, Optional

import pandas as pd
import requests

## 1. Data Models and Normalizer

We define structured models to ensure consistency across all components.

In [None]:
@dataclass
class Transaction:
    """One transaction attached to a case, in normalized form.

    ``normalize_input`` builds these from the entries of the raw
    ``transactions`` list; the comments below name the raw key each field
    is read from and the fallback used when that key is absent.
    """
    transaction_id: str  # raw key 'tx_id'; 'N/A' when absent
    amount: float  # raw key 'amt', converted with float(); 0 when absent
    currency: str  # raw key 'ccy'; 'USD' when absent
    timestamp: str  # raw key 'time', passed through unparsed; 'N/A' when absent
    counterparty: str  # raw key 'to'; 'N/A' when absent
    location: str  # raw key 'loc'; 'N/A' when absent

@dataclass
class CustomerProfile:
    """Normalized profile of the customer a case is about.

    ``normalize_input`` builds this from the raw ``customer`` mapping; the
    comments below name the raw key each field is read from and the fallback
    used when that key is absent.
    """
    customer_id: str  # raw key 'id'; 'N/A' when absent
    risk_rating: str  # raw key 'risk_level'; 'Medium' when absent
    onboarding_date: str  # raw key 'onboarded', passed through unparsed; 'N/A' when absent
    occupation: str  # raw key 'job'; 'Unknown' when absent
    expected_monthly_volume: float  # raw key 'expected_vol', converted with float(); 0 when absent

@dataclass
class NormalizedCaseData:
    """A complete case in the shape every downstream component consumes.

    Produced by ``normalize_input``. Agents and the critic serialize the
    whole object with ``dataclasses.asdict`` before embedding it in a prompt.
    """
    case_id: str  # raw key 'case_id'; a 'CASE-<YYYYmmddHHMMSS>' id is generated when absent
    customer: CustomerProfile  # normalized customer profile
    transactions: List[Transaction]  # normalized transactions, in input order
    alert_type: str  # raw key 'alert_type'; 'General Suspicious Activity' when absent
    summary: str  # raw key 'description'; 'No summary provided' when absent

def normalize_input(raw_data: Dict[str, Any]) -> NormalizedCaseData:
    """Standardizes disparate input data into a consistent format.

    Args:
        raw_data: Raw case mapping. Recognised keys are 'case_id',
            'alert_type', 'description', 'customer' (a mapping with 'id',
            'risk_level', 'onboarded', 'job', 'expected_vol') and
            'transactions' (a list of mappings with 'tx_id', 'amt', 'ccy',
            'time', 'to', 'loc'). Every key is optional.

    Returns:
        A NormalizedCaseData with defaults filled in for anything missing.

    Raises:
        TypeError, ValueError: If 'expected_vol' or an 'amt' value is present
            but cannot be converted by ``float()``.
    """
    # ``.get(key, default)`` only applies the default when the key is missing.
    # Using ``or`` also covers a key that is present but null/empty (for
    # example ``"customer": null`` in a JSON payload), which would otherwise
    # fail on the ``.get`` calls / iteration below.
    customer_data = raw_data.get('customer') or {}
    customer = CustomerProfile(
        customer_id=customer_data.get('id', 'N/A'),
        risk_rating=customer_data.get('risk_level', 'Medium'),
        onboarding_date=customer_data.get('onboarded', 'N/A'),
        occupation=customer_data.get('job', 'Unknown'),
        expected_monthly_volume=float(customer_data.get('expected_vol', 0))
    )

    transactions = [
        Transaction(
            transaction_id=tx.get('tx_id', 'N/A'),
            amount=float(tx.get('amt', 0)),
            currency=tx.get('ccy', 'USD'),
            timestamp=tx.get('time', 'N/A'),
            counterparty=tx.get('to', 'N/A'),
            location=tx.get('loc', 'N/A')
        )
        for tx in (raw_data.get('transactions') or [])
    ]

    return NormalizedCaseData(
        # Cases that arrive without an id get one derived from the current time.
        case_id=raw_data.get('case_id', 'CASE-' + datetime.now().strftime('%Y%m%d%H%M%S')),
        customer=customer,
        transactions=transactions,
        alert_type=raw_data.get('alert_type', 'General Suspicious Activity'),
        summary=raw_data.get('description', 'No summary provided')
    )

## 2. Ollama Client

A simple wrapper to communicate with the local `gemma3:4b` model.

In [None]:
class OllamaClient:
    """Minimal client for a non-streaming Ollama text-generation endpoint.

    Args:
        model: Model name sent in every request payload.
        base_url: Full URL the prompt payload is POSTed to.
        timeout: Value handed to ``requests.post(..., timeout=...)``, in
            seconds. ``None`` disables the timeout, which was the behaviour
            before this parameter existed and can block forever if the
            server stops responding.
    """

    def __init__(self, model="gemma3:4b", base_url="http://localhost:11434/api/generate",
                 timeout: Optional[float] = 300.0):
        self.model = model
        self.base_url = base_url
        self.timeout = timeout

    def query(self, prompt: str, system_prompt: Optional[str] = None) -> str:
        """Send one prompt and return the model's text reply.

        Args:
            prompt: User prompt text.
            system_prompt: Optional system prompt; a generic financial crime
                investigator persona is used when omitted or empty.

        Returns:
            The 'response' field of the JSON reply, 'No response content' if
            that field is missing, or a string starting with
            'Error querying Ollama:' if the request failed. Failures are
            reported in-band; this method does not raise.
        """
        payload = {
            "model": self.model,
            "prompt": prompt,
            "system": system_prompt or "You are a financial crime investigator.",
            "stream": False
        }
        try:
            # Without a timeout a stalled server would hang the caller indefinitely.
            response = requests.post(self.base_url, json=payload, timeout=self.timeout)
            response.raise_for_status()
            return response.json().get('response', 'No response content')
        except Exception as e:
            # Deliberately broad: this is the I/O boundary, and callers rely on
            # always getting a string back rather than an exception.
            return f"Error querying Ollama: {str(e)}"

# Shared client built with the default model and endpoint; it is passed to each
# panel agent and to the critic created inside run_investigation.
client = OllamaClient()

## 3. Peer Panel Agents

Each agent has a distinct persona and focus area. They use Chain-of-Thought reasoning to arrive at a verdict.

In [None]:
class BaseAgent:
    """A panel member that asks the LLM for a verdict on a case from one persona.

    Args:
        name: Agent title; embedded in the prompt and the system prompt.
        persona: Description of the agent's focus area; embedded in both prompts.
        client: Object exposing ``query(prompt, system_prompt=...)`` that
            returns the model's reply as text.
    """

    # The project types in the annotations are quoted (forward references) so
    # this cell can be defined regardless of which other cells have run.
    def __init__(self, name: str, persona: str, client: "OllamaClient"):
        self.name = name
        self.persona = persona
        self.client = client

    def reason(self, case: "NormalizedCaseData") -> Dict[str, Any]:
        """Ask the model to assess ``case`` and return its parsed verdict.

        Args:
            case: Dataclass instance describing the case; it is serialized
                with ``dataclasses.asdict`` and embedded in the prompt as JSON.

        Returns:
            The JSON object found in the model's reply (expected keys:
            'rationale', 'risk_score', 'recommendation'). If no JSON object
            can be parsed, a fallback verdict with risk_score 50 and
            recommendation 'SUSPICIOUS' whose rationale carries the raw reply.
        """
        prompt = f"""
        Case Data:
        {json.dumps(asdict(case), indent=2)}

        As a {self.name} specializing in {self.persona}, analyze the following case and provide:
        1. Rationale: Your step-by-step reasoning.
        2. Risk Score: A score from 0 to 100.
        3. Recommendation: 'CLEARED' or 'SUSPICIOUS'.

        Return ONLY a JSON object with keys: 'rationale', 'risk_score', 'recommendation'.
        """
        response = self.client.query(prompt, system_prompt=f"You are a {self.name} agent. Persona: {self.persona}")
        try:
            # The model may wrap the JSON in extra text, so keep only the span
            # from the first '{' to the last '}'.
            json_start = response.find('{')
            json_end = response.rfind('}') + 1
            return json.loads(response[json_start:json_end])
        except (ValueError, TypeError, AttributeError):
            # ValueError covers json.JSONDecodeError; TypeError/AttributeError
            # cover a reply that is not a string. A bare ``except`` would also
            # have swallowed KeyboardInterrupt and SystemExit.
            return {"rationale": f"Failed to parse response: {response}", "risk_score": 50, "recommendation": "SUSPICIOUS"}

# The peer panel: one BaseAgent per (title, focus area) pair, all sharing the
# same client. Order here is the order in which agents are consulted.
agents = [
    BaseAgent(title, focus, client)
    for title, focus in (
        ("Conservative Compliance", "Strict AML/KYC guidelines and regulatory rulebooks."),
        ("Risk Appetite", "Bank's financial risk exposure and threshold for high-value transactions."),
        ("Customer Context", "Historical behavior, KYC history, and customer profile consistency."),
        ("Legal/Regulatory", "Jurisdictional laws, SAR filing triggers, and legal precedence."),
    )
]

## 4. Aggregator and Adversarial Critic

The aggregator computes the final decision, and the critic reviews the panel's work.

In [None]:
class Aggregator:
    """Combines individual agent verdicts into a single panel decision."""

    def aggregate(self, responses: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Average the risk scores and take a majority vote on the recommendation.

        Args:
            responses: One mapping per agent. 'risk_score' (default 0) must be
                a number or a numeric string; 'recommendation' (default
                'SUSPICIOUS') is compared case-insensitively with surrounding
                whitespace ignored.

        Returns:
            A dict with 'final_risk_score' (mean score), 'final_recommendation'
            (the winning recommendation, upper-cased) and 'consensus_level'
            (fraction of agents that voted for the winner).

        Raises:
            ValueError: If ``responses`` is empty, or a 'risk_score' is a
                string that is not numeric.
            TypeError: If a 'risk_score' is of a type ``float()`` rejects.
        """
        if not responses:
            # Fail with a clear message instead of a ZeroDivisionError below.
            raise ValueError("Cannot aggregate an empty list of agent responses")

        # Model output is loosely typed: accept "85" as well as 85.
        scores = [float(r.get('risk_score', 0)) for r in responses]
        # Normalize so 'Suspicious' and 'SUSPICIOUS ' count as the same vote.
        recommendations = [
            str(r.get('recommendation', 'SUSPICIOUS')).strip().upper()
            for r in responses
        ]

        final_score = sum(scores) / len(scores)

        # Majority vote with a deterministic tie-break. Picking the max over a
        # set made tied votes depend on set iteration order, which can differ
        # between runs. A tie involving 'SUSPICIOUS' now resolves to
        # 'SUSPICIOUS' (the same conservative default used for a missing
        # recommendation); any other tie goes to the earliest-seen value.
        votes = Counter(recommendations)
        top_count = max(votes.values())
        leaders = [rec for rec, count in votes.items() if count == top_count]
        final_rec = 'SUSPICIOUS' if 'SUSPICIOUS' in leaders else leaders[0]

        return {
            "final_risk_score": final_score,
            "final_recommendation": final_rec,
            "consensus_level": top_count / len(recommendations)
        }

class AdversarialCritic:
    """LLM-backed reviewer that challenges the panel's verdicts."""

    def __init__(self, client: OllamaClient):
        # Client used to send the critique prompt to the model.
        self.client = client

    def evaluate(self, case: NormalizedCaseData, agent_responses: List[Dict[str, Any]], aggregated: Dict[str, Any]) -> str:
        """Ask the model to critique the panel's output for one case.

        Args:
            case: The normalized case the panel examined.
            agent_responses: The per-agent verdict dicts.
            aggregated: The aggregated panel decision.

        Returns:
            Whatever text the client returns for the critique prompt.
        """
        # Serialize each input up front; all three are embedded as compact JSON.
        case_json = json.dumps(asdict(case))
        responses_json = json.dumps(agent_responses)
        decision_json = json.dumps(aggregated)
        critic_role = "You are an Adversarial Critic for financial crime investigations."

        prompt = f"""
        Case Data: {case_json}
        Agent Responses: {responses_json}
        Aggregated Decision: {decision_json}

        As an Adversarial Critic, evaluate the panel's disagreement and the quality of their rationales.
        If you find flaws or contradictions, highlight them. If the consensus is weak, recommend further manual review.
        """
        return self.client.query(prompt, system_prompt=critic_role)

## 5. Orchestration and Execution

Let's simulate a case and run the investigation pipeline.

In [None]:
def run_investigation(raw_case: Dict[str, Any]):
    """Run the full pipeline for one raw case and return an audit record.

    Steps: normalize the input, consult every agent in ``agents``, aggregate
    their verdicts, have the adversarial critic review the result, and print
    a summary along the way.

    Args:
        raw_case: Raw case mapping as accepted by ``normalize_input``.

    Returns:
        A dict with keys 'timestamp' (ISO-format time the record was built),
        'case' (the normalized case as a dict), 'agent_outputs',
        'aggregation' and 'critique'.
    """
    # The banner shows the id as supplied, before normalization fills in defaults.
    announced_id = raw_case.get('case_id', 'New Case')
    print(f"--- Starting Investigation for Case: {announced_id} ---")
    case = normalize_input(raw_case)

    panel_outputs = []
    for member in agents:
        print(f"Consulting {member.name}...")
        verdict = member.reason(case)
        # Tag each verdict with its author; keys in the verdict take precedence.
        panel_outputs.append({"agent": member.name, **verdict})

    decision = Aggregator().aggregate(panel_outputs)
    review = AdversarialCritic(client).evaluate(case, panel_outputs, decision)

    consensus_pct = decision['consensus_level'] * 100
    print("\n--- Investigation Summary ---")
    print(f"Final Recommendation: {decision['final_recommendation']}")
    print(f"Final Risk Score: {decision['final_risk_score']}")
    print(f"Consensus Level: {consensus_pct:.1f}%")

    print("\n--- Critic's Evaluation ---")
    print(review)

    record = {
        "timestamp": datetime.now().isoformat(),
        "case": asdict(case),
        "agent_outputs": panel_outputs,
        "aggregation": decision,
        "critique": review
    }
    return record

# Sample Case: High-value transaction to high-risk jurisdiction
# A single 45,000 USD transfer against an expected monthly volume of 5,000.
sample_case = {
    "case_id": "CASE-999",
    "alert_type": "Rapid Movement of Funds",
    "description": "Customer transferred $45,000 to a newly opened account in a high-risk jurisdiction.",
    "customer": {
        "id": "C123",
        "risk_level": "Low",
        "job": "Engineer",
        "expected_vol": 5000,
    },
    "transactions": [
        {
            "tx_id": "TX1",
            "amt": 45000,
            "ccy": "USD",
            "time": "2025-12-29 10:00",
            "to": "Account X - Offshore",
            "loc": "Cayman Islands",
        },
    ],
}

# Run the pipeline once and keep the returned record in an in-memory audit log.
audit_log = []
result = run_investigation(sample_case)
audit_log.append(result)