# AI-Powered Transaction Compliance Monitoring System with Document Ingestion

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/rstephen/GenAI-Showcase/blob/transaction_compliance/notebooks/agents/transaction_compliance_monitoring_system_with_document_ingestion.ipynb)

## Use Case Overview

In today's global financial ecosystem, institutions face the daunting challenge of ensuring every cross-border transaction complies with an increasingly complex web of international regulations. Manual compliance checks create bottlenecks, increase operational costs, and leave organizations vulnerable to costly violations and reputational damage.

In this use case, we showcase the foundation of a compliance monitoring system that combines MongoDB Atlas vector search, Voyage AI embedding models, and large language models (LLMs) to automate regulatory checks on financial transactions. This implementation demonstrates how to build a scalable transaction compliance checker with the following components:

### Core Components:
1. **Document Ingestion Pipeline**
   * PDF, DOCX, and plain-text (TXT) document processing
   * Automated metadata extraction (jurisdiction, publication date, tags) from document headers
2. **Data Layer: MongoDB Atlas as operational and vector database**
   * Storage for transaction data and regulatory policies with vector embeddings
   * Vector search index for semantic matching between transactions and applicable regulations
   * Checkpoint storage for LangGraph state management
   * Schema validation using Pydantic models and a MongoDB `$jsonSchema` validator
3. **NLP Processing Pipeline**
   * Text embedding generation via Voyage AI
   * Recursive character-based chunking with overlap
4. **Compliance Assessment Engine**
   * LLM-based evaluation of transactions against retrieved policies (GPT-4o in this notebook; a model such as ShieldGemma 9B could be substituted)
   * Confidence scores reported by the LLM, with softmax normalization across assessments
   * Threshold-based classification (Violation, Reporting Required, Compliant)
5. **Agent Orchestration Framework**
   * LangGraph-based workflow for agent coordination and state management
   * Modular graph nodes for parsing, retrieval, and assessment
   * Workflow state persisted through MongoDB checkpointing
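The softmax and threshold bullets above can be sketched as follows. This is an illustrative assumption, not what the notebook's code does: later cells let the LLM pick the status and report its own confidence. Here a hypothetical model emits one raw score per label (in `LABELS` order), softmax turns the scores into probabilities, and made-up cut-offs (0.6 for Violation, 0.3 for Reporting Required) choose the class.

```python
import math

# Hypothetical label order and cut-offs; the notebook itself does not define them.
LABELS = ("Compliant", "Reporting Required", "Violation")
VIOLATION_THRESHOLD = 0.6  # assumed minimum P(Violation) to flag a violation
REPORTING_THRESHOLD = 0.3  # assumed minimum P(Reporting Required) to require a report


def softmax(scores):
    """Numerically stable softmax: shift by the max score before exponentiating."""
    top = max(scores)
    exps = [math.exp(s - top) for s in scores]
    total = sum(exps)
    return [e / total for e in exps]


def classify(scores):
    """Turn raw per-label scores into probabilities, then apply the thresholds."""
    probs = dict(zip(LABELS, softmax(scores)))
    if probs["Violation"] >= VIOLATION_THRESHOLD:
        status = "Violation"
    elif probs["Reporting Required"] >= REPORTING_THRESHOLD:
        status = "Reporting Required"
    else:
        status = "Compliant"
    return status, probs


status, probs = classify([0.0, 0.0, 2.0])
print(status, round(probs["Violation"], 3))  # Violation 0.787
```

Checking Violation first means a transaction that is likely both reportable and in violation is reported as the more severe outcome.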

In [None]:
%pip install --quiet datasets pandas pymongo langchain_openai langchain_mongodb langchain_community langchain_core langchain-text-splitters langgraph langgraph-checkpoint-mongodb pypdf python-docx unstructured pydantic voyageai

In [2]:
import getpass
import os


# Function to securely get and set environment variables
def set_env_securely(var_name, prompt):
    value = getpass.getpass(prompt)
    os.environ[var_name] = value
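Re-running the notebook prompts for every key again. A small variant, assuming you want to keep any value already present in the environment, skips the prompt in that case. The function name and its `reader` parameter (injectable so the behaviour can be tested without a terminal) are hypothetical additions:

```python
import getpass
import os


def set_env_if_missing(var_name, prompt, reader=getpass.getpass):
    """Prompt for a secret only when the environment variable is unset or empty.

    Returns True if a new value was read, False if the existing value was kept.
    """
    if os.environ.get(var_name):
        return False  # already set: keep the existing value, no prompt
    os.environ[var_name] = reader(prompt)
    return True
```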

## Setup Environment Variables

First, we need to set up our environment variables for connecting to MongoDB Atlas and various AI services. You'll need to provide your own API keys and connection strings.

In [None]:
import os
from datetime import datetime
from typing import Any, Dict, List, Optional, Union

import numpy as np
import pandas as pd

# Set your MongoDB Atlas connection string
set_env_securely("MONGODB_URI", "Enter your MongoDB Atlas connection string: ")

# Set your OpenAI API key for LangChain
set_env_securely("OPENAI_API_KEY", "Enter your OpenAI API key: ")

# Set your Voyage AI API key for embeddings
set_env_securely("VOYAGE_API_KEY", "Enter your Voyage AI API key: ")

# Database configuration
DB_NAME = "compliance_monitoring"
TRANSACTIONS_COLLECTION = "transactions"
REGULATIONS_COLLECTION = "regulations"
VECTOR_INDEX_NAME = "vector_index"
CHECKPOINTS_COLLECTION = "checkpoints"

## MongoDB Atlas Connection

Let's establish a connection to MongoDB Atlas and set up our collections.

In [None]:
from pymongo import MongoClient
from pymongo.server_api import ServerApi

# Create a new client and connect to the server
client = MongoClient(os.environ["MONGODB_URI"], server_api=ServerApi("1"))

# Send a ping to confirm a successful connection
try:
    client.admin.command("ping")
    print("Successfully connected to MongoDB!")
except Exception as e:
    print(f"Failed to connect to MongoDB: {e}")

# Access the database and collections
db = client[DB_NAME]
transactions_collection = db[TRANSACTIONS_COLLECTION]
regulations_collection = db[REGULATIONS_COLLECTION]
checkpoints_collection = db[CHECKPOINTS_COLLECTION]


# Create collections with validation if they don't exist
def create_collections():
    # Get list of existing collections
    existing_collections = db.list_collection_names()

    # Create transactions collection with schema validation if it doesn't exist
    if TRANSACTIONS_COLLECTION not in existing_collections:
        db.create_collection(
            TRANSACTIONS_COLLECTION,
            validator={
                "$jsonSchema": {
                    "bsonType": "object",
                    "required": [
                        "transaction_id",
                        "amount",
                        "currency",
                        "sender",
                        "receiver",
                        "transaction_date",
                    ],
                    "properties": {
                        "transaction_id": {"bsonType": "string"},
                        "amount": {"bsonType": "double", "minimum": 0},
                        "currency": {"bsonType": "string"},
                        "sender": {"bsonType": "object"},
                        "receiver": {"bsonType": "object"},
                        "compliance_status": {"bsonType": "string"},
                    },
                }
            },
            validationLevel="moderate",
        )
        print(f"Created {TRANSACTIONS_COLLECTION} collection with schema validation")

    # Create regulations collection if it doesn't exist
    if REGULATIONS_COLLECTION not in existing_collections:
        db.create_collection(REGULATIONS_COLLECTION)
        print(f"Created {REGULATIONS_COLLECTION} collection")

    # Create checkpoints collection if it doesn't exist
    if CHECKPOINTS_COLLECTION not in existing_collections:
        db.create_collection(CHECKPOINTS_COLLECTION)
        print(f"Created {CHECKPOINTS_COLLECTION} collection")


# Call function to create collections
create_collections()


from pymongo.operations import SearchIndexModel


# Create the Atlas Search index (knnVector mapping) if it doesn't exist
def create_vector_search_index():
    # Search indexes are not returned by list_indexes(); list_search_indexes()
    # (PyMongo 4.5+) lists Atlas Search and Vector Search indexes
    for index in regulations_collection.list_search_indexes():
        if index.get("name") == VECTOR_INDEX_NAME:
            print(f"Vector search index '{VECTOR_INDEX_NAME}' already exists.")
            return

    # Index definition: 1024 dimensions matches the voyage-2 embedding size
    index_model = {
        "mappings": {
            "dynamic": True,
            "fields": {
                "embedding": {
                    "dimensions": 1024,
                    "similarity": "cosine",
                    "type": "knnVector",
                }
            },
        }
    }

    try:
        # The index builds asynchronously; queries may return no results
        # until Atlas reports the index as ready
        regulations_collection.create_search_index(
            SearchIndexModel(definition=index_model, name=VECTOR_INDEX_NAME)
        )
        print(f"Successfully created vector search index '{VECTOR_INDEX_NAME}'")
    except Exception as e:
        print(f"Error creating vector search index: {e}")


# Call the function to create the vector search index
create_vector_search_index()
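The search index here uses a `knnVector` mapping, which is queried through the `$search` stage's `knnBeta` operator; MongoDB has deprecated `knnBeta` in favor of Atlas Vector Search. As a sketch of the newer equivalent, assuming the same 1024-dimension `embedding` field, the hypothetical helpers below build a `vectorSearch` index definition and a `$vectorSearch` pipeline. Passing `type="vectorSearch"` to `SearchIndexModel` requires a recent PyMongo release.

```python
def vector_search_index_definition(path="embedding", dimensions=1024, similarity="cosine"):
    """Definition body for an Atlas index of type "vectorSearch"."""
    return {
        "fields": [
            {
                "type": "vector",
                "path": path,
                "numDimensions": dimensions,
                "similarity": similarity,
            }
        ]
    }


def vector_search_pipeline(query_vector, index_name, path="embedding", limit=5, num_candidates=100):
    """Aggregation pipeline using $vectorSearch instead of $search/knnBeta."""
    # $vectorSearch requires numCandidates >= limit
    num_candidates = max(num_candidates, limit)
    return [
        {
            "$vectorSearch": {
                "index": index_name,
                "path": path,
                "queryVector": query_vector,
                "numCandidates": num_candidates,
                "limit": limit,
            }
        },
        {
            "$project": {
                "_id": 0,
                "title": 1,
                "content": 1,
                "jurisdiction": 1,
                "publication_date": 1,
                "score": {"$meta": "vectorSearchScore"},
            }
        },
    ]


# Usage against Atlas (not run here); "vector_index_v2" is a hypothetical name chosen
# so it does not clash with the existing search index:
# from pymongo.operations import SearchIndexModel
# regulations_collection.create_search_index(
#     SearchIndexModel(definition=vector_search_index_definition(),
#                      name="vector_index_v2", type="vectorSearch")
# )
# results = list(regulations_collection.aggregate(
#     vector_search_pipeline(query_embedding, "vector_index_v2")))
```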

In [3]:
# Optional: uncomment below to enable LangSmith tracing
# os.environ["LANGCHAIN_TRACING_V2"] = "true"
# os.environ["LANGCHAIN_ENDPOINT"] = "https://api.smith.langchain.com"
# os.environ["LANGCHAIN_PROJECT"] = "compliance_monitoring"
# set_env_securely("LANGCHAIN_API_KEY", "Enter your LangChain API key: ")


## Document Ingestion Pipeline

Now we'll create a document ingestion pipeline that processes PDF, DOCX, and plain-text files and extracts their content for further processing.

In [None]:
import io
import re

from docx import Document
from pydantic import BaseModel, Field
from pypdf import PdfReader


class RegulationDocument(BaseModel):
    """Schema for regulatory documents"""

    id: Optional[str] = None
    title: str
    content: str
    source: str
    document_type: str
    jurisdiction: str
    publication_date: str
    tags: List[str] = Field(default_factory=list)
    embedding: Optional[List[float]] = None
    chunks: Optional[List[Dict[str, Any]]] = None

    def to_dict(self):
        return self.model_dump(exclude_none=True)


class DocumentProcessor:
    """Processes different document formats and extracts text"""

    @staticmethod
    def extract_text_from_pdf(file_path_or_bytes):
        """Extract text from PDF files"""
        if isinstance(file_path_or_bytes, str):
            # It's a file path
            reader = PdfReader(file_path_or_bytes)
        else:
            # It's bytes
            reader = PdfReader(io.BytesIO(file_path_or_bytes))

        text = ""
        for page in reader.pages:
            text += page.extract_text() + "\n"
        return text

    @staticmethod
    def extract_text_from_docx(file_path_or_bytes):
        """Extract text from DOCX files"""
        if isinstance(file_path_or_bytes, str):
            # It's a file path
            doc = Document(file_path_or_bytes)
        else:
            # It's bytes
            doc = Document(io.BytesIO(file_path_or_bytes))

        text = ""
        for para in doc.paragraphs:
            text += para.text + "\n"
        return text

    @staticmethod
    def extract_text_from_txt(file_path_or_bytes):
        """Extract text from TXT files"""
        if isinstance(file_path_or_bytes, str):
            # It's a file path
            with open(file_path_or_bytes, encoding="utf-8") as f:
                return f.read()
        else:
            # It's bytes
            return file_path_or_bytes.decode("utf-8")

    @staticmethod
    def process_document(file_path, metadata=None):
        """Process a document and extract its text based on file extension"""
        if metadata is None:
            metadata = {}

        file_extension = file_path.split(".")[-1].lower()

        if file_extension == "pdf":
            text = DocumentProcessor.extract_text_from_pdf(file_path)
            doc_type = "pdf"
        elif file_extension == "docx":
            text = DocumentProcessor.extract_text_from_docx(file_path)
            doc_type = "docx"
        elif file_extension == "txt":
            text = DocumentProcessor.extract_text_from_txt(file_path)
            doc_type = "txt"
        else:
            raise ValueError(f"Unsupported file format: {file_extension}")

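        # Fill gaps from the document's own header (jurisdiction, date, tags);
        # metadata passed in explicitly takes precedence
        metadata = {**DocumentProcessor.extract_metadata_from_content(text), **metadata}

        # Extract title from filename if not provided
        if "title" not in metadata:
            title = os.path.basename(file_path).rsplit(".", 1)[0]
            metadata["title"] = title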

        # Set document type if not provided
        if "document_type" not in metadata:
            metadata["document_type"] = doc_type

        # Create regulation document
        regulation = RegulationDocument(
            title=metadata.get("title", ""),
            content=text,
            source=metadata.get("source", file_path),
            document_type=metadata.get("document_type", doc_type),
            jurisdiction=metadata.get("jurisdiction", "Unknown"),
            publication_date=metadata.get(
                "publication_date", datetime.now().strftime("%Y-%m-%d")
            ),
            tags=metadata.get("tags", []),
        )

        return regulation

    @staticmethod
    def extract_metadata_from_content(content):
        """Extract metadata from 'Key: value' header lines using regex patterns"""
        metadata = {}

        # (?im): case-insensitive, with ^ and $ matching at each line boundary.
        # Values use [ \t] rather than \s so a match cannot run on into the next line.

        # Extract jurisdiction
        jurisdiction_pattern = r"(?im)^[ \t]*jurisdiction[ \t]*:[ \t]*(.+?)[ \t]*$"
        jurisdiction_match = re.search(jurisdiction_pattern, content)
        if jurisdiction_match:
            metadata["jurisdiction"] = jurisdiction_match.group(1)

        # Extract date
        date_pattern = r"(?im)^[ \t]*(?:date|published)[ \t]*:[ \t]*(\d{1,2}[/-]\d{1,2}[/-]\d{2,4}|\d{4}[/-]\d{1,2}[/-]\d{1,2})"
        date_match = re.search(date_pattern, content)
        if date_match:
            metadata["publication_date"] = date_match.group(1)

        # Extract tags (comma-separated, on a single line)
        tags_pattern = r"(?im)^[ \t]*(?:keywords|tags)[ \t]*:[ \t]*(.+?)[ \t]*$"
        tags_match = re.search(tags_pattern, content)
        if tags_match:
            tags = [tag.strip() for tag in tags_match.group(1).split(",") if tag.strip()]
            metadata["tags"] = tags

        return metadata
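Why header patterns should stay on a single line: `\s` also matches newlines, so a value pattern such as `(\w+(?:\s+\w+)*)` keeps consuming words from the following line. The standalone comparison below runs both styles on the sample header format used later in this notebook (`Jurisdiction:` / `Date:` lines).

```python
import re

header = (
    "ANTI-MONEY LAUNDERING DIRECTIVE\n"
    "Jurisdiction: European Union\n"
    "Date: 2021-06-15\n"
)

# \s matches "\n", so the repeated (?:\s+\w+)* group continues onto the next line
# and only stops at the first non-word character (the ":" after "Date").
greedy = re.search(r"(?i)jurisdiction[:\s]+(\w+(?:\s+\w+)*)", header).group(1)

# (?m) makes ^/$ match per line, and [ \t] never crosses a line break.
anchored = re.search(r"(?im)^[ \t]*jurisdiction[ \t]*:[ \t]*(.+?)[ \t]*$", header).group(1)

print(repr(greedy))    # 'European Union\nDate'
print(repr(anchored))  # 'European Union'
```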

## Text Chunking and Embedding Generation

Now we'll split each document into overlapping chunks and generate embeddings for them with Voyage AI.

In [None]:
import voyageai
from langchain_text_splitters import RecursiveCharacterTextSplitter


class TextProcessor:
    """Handles text chunking and embedding generation"""

    def __init__(self, chunk_size=1000, chunk_overlap=200):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            length_function=len,
            separators=["\n\n", "\n", ".", "!", "?", ",", " ", ""],
        )
        self.voyage_client = voyageai.Client(api_key=os.environ["VOYAGE_API_KEY"])
        self.model_name = "voyage-2"

    def chunk_text(self, text):
        """Split text into chunks"""
        return self.text_splitter.split_text(text)

    def generate_embeddings(self, texts):
        """Generate embeddings for a list of texts using Voyage AI"""
        if not texts:
            return []

        embeddings = self.voyage_client.embed(texts, model=self.model_name).embeddings
        return embeddings

    def process_document(self, regulation_doc):
        """Process a regulation document: chunk text and generate embeddings"""
        # Chunk the document content
        chunks = self.chunk_text(regulation_doc.content)

        # Generate embeddings for each chunk
        chunk_embeddings = self.generate_embeddings(chunks)

        # Create chunk objects with embeddings
        processed_chunks = []
        for i, (chunk, embedding) in enumerate(zip(chunks, chunk_embeddings)):
            processed_chunks.append(
                {
                    "chunk_id": f"{regulation_doc.id or 'doc'}_{i}",
                    "content": chunk,
                    "embedding": embedding,
                }
            )

        # Generate embedding for the entire document (using title + first chunk)
        doc_text = f"{regulation_doc.title}\n{chunks[0] if chunks else ''}"
        doc_embedding = self.generate_embeddings([doc_text])[0]

        # Update the regulation document
        regulation_doc.embedding = doc_embedding
        regulation_doc.chunks = processed_chunks

        return regulation_doc

    def store_regulation(self, regulation_doc):
        """Store a processed regulation document in MongoDB"""
        # Convert to dictionary for MongoDB storage
        regulation_dict = regulation_doc.to_dict()

        # Insert into MongoDB
        result = regulations_collection.insert_one(regulation_dict)
        print(f"Stored regulation document with ID: {result.inserted_id}")

        return result.inserted_id
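`TextProcessor` delegates chunking to `RecursiveCharacterTextSplitter`, which tries its separators in order (paragraph breaks, then newlines, sentence punctuation, commas, spaces) to keep each chunk under `chunk_size` characters while consecutive chunks share up to `chunk_overlap` characters. The hypothetical helper below drops the separator logic and uses plain fixed-size character windows, purely to show how the two parameters interact; it is not the LangChain algorithm.

```python
def sliding_window_chunks(text, chunk_size=1000, chunk_overlap=200):
    """Split text into fixed-size windows. Each window starts
    chunk_size - chunk_overlap characters after the previous one, so neighbours
    share chunk_overlap characters. Unlike RecursiveCharacterTextSplitter,
    this may cut mid-word."""
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("require 0 <= chunk_overlap < chunk_size")
    step = chunk_size - chunk_overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # this window already reaches the end of the text
    return chunks


print(sliding_window_chunks("abcdefghij", chunk_size=4, chunk_overlap=2))
# ['abcd', 'cdef', 'efgh', 'ghij']
```

With the notebook's defaults (1000/200), a 2,500-character document yields windows starting at 0, 800, and 1,600, i.e. three chunks.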

## Sample Regulatory Documents

Let's create some sample regulatory documents to demonstrate the system.

In [None]:
# Sample regulatory texts
sample_regulations = [
    {
        "title": "Anti-Money Laundering Directive",
        "content": """ANTI-MONEY LAUNDERING DIRECTIVE
Jurisdiction: European Union
Date: 2021-06-15
Keywords: AML, KYC, financial crime, cross-border

Section 1: Scope and Definitions
1.1 This directive applies to all financial institutions operating within the European Union that process cross-border transactions.
1.2 'Cross-border transaction' refers to any financial transfer that originates in one country and terminates in another.
1.3 'High-risk jurisdiction' refers to countries identified by the Financial Action Task Force (FATF) as having strategic deficiencies in their AML/CFT regimes.

Section 2: Due Diligence Requirements
2.1 Enhanced due diligence must be performed for all transactions exceeding €10,000 that involve high-risk jurisdictions.
2.2 Financial institutions must verify the identity of both the sender and recipient for all cross-border transactions exceeding €3,000.
2.3 For transactions with sanctioned countries, prior approval must be obtained from the compliance department.

Section 3: Reporting Requirements
3.1 All suspicious transactions must be reported to the national Financial Intelligence Unit within 24 hours of detection.
3.2 Monthly reports must be submitted detailing all cross-border transactions exceeding €50,000.
3.3 Failure to report suspicious activities may result in fines of up to €5 million or 10% of annual turnover.
""",
        "source": "EU Financial Regulatory Authority",
        "document_type": "directive",
        "jurisdiction": "European Union",
        "publication_date": "2021-06-15",
        "tags": ["AML", "KYC", "financial crime", "cross-border"],
    },
    {
        "title": "Sanctions Compliance Framework",
        "content": """SANCTIONS COMPLIANCE FRAMEWORK
Jurisdiction: United States
Date: 2022-03-10
Keywords: sanctions, OFAC, restricted parties, compliance

Section 1: Overview
1.1 This framework outlines compliance requirements for financial institutions regarding transactions subject to sanctions administered by the Office of Foreign Assets Control (OFAC).
1.2 All US financial institutions and their foreign branches must comply with these requirements.

Section 2: Prohibited Transactions
2.1 No financial institution shall process transactions involving entities listed on the Specially Designated Nationals (SDN) list.
2.2 Transactions with entities in comprehensively sanctioned countries including Iran, North Korea, Syria, Cuba, and the Crimea region are prohibited without specific OFAC authorization.
2.3 Transactions that attempt to circumvent sanctions through third-party intermediaries are strictly prohibited and subject to severe penalties.

Section 3: Screening Requirements
3.1 All parties to a transaction must be screened against the most current OFAC sanctions lists prior to processing.
3.2 Screening must include beneficial owners with 25% or greater ownership interest.
3.3 Institutions must implement real-time screening for all international wire transfers regardless of amount.

Section 4: Penalties for Non-Compliance
4.1 Civil penalties may reach the greater of $1,000,000 per violation or twice the value of the transaction.
4.2 Criminal penalties for willful violations may include fines up to $20 million and imprisonment up to 30 years.
4.3 Financial institutions may be subject to regulatory actions including restrictions on activities or loss of licenses.
""",
        "source": "US Department of Treasury",
        "document_type": "framework",
        "jurisdiction": "United States",
        "publication_date": "2022-03-10",
        "tags": ["sanctions", "OFAC", "restricted parties", "compliance"],
    },
]

# Process and store sample regulations
text_processor = TextProcessor()

for reg_data in sample_regulations:
    # Create regulation document
    regulation = RegulationDocument(**reg_data)

    # Process document (chunk and generate embeddings)
    processed_regulation = text_processor.process_document(regulation)

    # Store in MongoDB
    regulation_id = text_processor.store_regulation(processed_regulation)
    print(f"Processed and stored regulation: {regulation.title}")

## Transaction Data Model

Let's define the data model for financial transactions that will be assessed for compliance.

In [None]:
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional, Union

from pydantic import BaseModel, Field, field_validator


class ComplianceStatus(str, Enum):
    """Enum for compliance status"""

    COMPLIANT = "Compliant"
    REPORTING_REQUIRED = "Reporting Required"
    VIOLATION = "Violation"
    PENDING = "Pending Assessment"


class TransactionParty(BaseModel):
    """Model for a party in a transaction (sender or receiver)"""

    name: str
    country: str
    account_number: str
    institution: str
    is_sanctioned: bool = False
    risk_score: Optional[float] = None


class Transaction(BaseModel):
    """Model for a financial transaction"""

    id: Optional[str] = None
    transaction_id: str
    amount: float
    currency: str
    sender: TransactionParty
    receiver: TransactionParty
    transaction_date: str
    transaction_type: str
    description: str
    compliance_status: ComplianceStatus = ComplianceStatus.PENDING
    compliance_details: Optional[Dict[str, Any]] = None

    # Pydantic v2 style (the v1 @validator decorator is deprecated)
    @field_validator("amount")
    @classmethod
    def amount_must_be_positive(cls, v):
        if v <= 0:
            raise ValueError("Amount must be positive")
        return v

    def to_dict(self):
        # mode="json" converts enums to plain strings for MongoDB and graph state
        return self.model_dump(mode="json", exclude_none=True)

    def to_prompt(self):
        """Convert transaction to a prompt-friendly format"""
        return f"""Transaction Details:
- Transaction ID: {self.transaction_id}
- Amount: {self.amount} {self.currency}
- Date: {self.transaction_date}
- Type: {self.transaction_type}
- Description: {self.description}

Sender Information:
- Name: {self.sender.name}
- Country: {self.sender.country}
- Institution: {self.sender.institution}
- Sanctioned: {self.sender.is_sanctioned}

Receiver Information:
- Name: {self.receiver.name}
- Country: {self.receiver.country}
- Institution: {self.receiver.institution}
- Sanctioned: {self.receiver.is_sanctioned}
"""

## Compliance Assessment Engine

Now we'll implement the compliance assessment engine that evaluates transactions against regulatory policies.

In [None]:
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI


class ComplianceEngine:
    """Engine for assessing transaction compliance against regulations"""

    def __init__(self):
        # Initialize LLM (using OpenAI, but could be replaced with ShieldGemma 9B)
        self.llm = ChatOpenAI(model="gpt-4o", temperature=0)
        self.text_processor = TextProcessor()

        # Define compliance assessment prompt
        self.assessment_prompt = ChatPromptTemplate.from_template(
            """You are a financial compliance expert tasked with evaluating if a transaction complies with regulatory requirements.
            
            First, analyze the transaction details:
            {transaction}
            
            Then, evaluate compliance against these relevant regulations:
            {regulations}
            
            Determine if the transaction is:
            1. Compliant - Fully complies with all regulations
            2. Reporting Required - Compliant but requires reporting to authorities
            3. Violation - Violates one or more regulations
            
            Provide your assessment in the following JSON format:
            {{
                "status": "Compliant", "Reporting Required", or "Violation",
                "confidence": <float between 0 and 1>,
                "reasoning": "<detailed explanation of your assessment>",
                "applicable_regulations": ["<list of specific regulation sections that apply>"],
                "recommended_actions": ["<list of recommended actions if any>"],
                "risk_factors": ["<list of identified risk factors if any>"]
            }}
            """
        )

        # Output parser
        self.parser = JsonOutputParser()

        # Create the chain
        self.chain = self.assessment_prompt | self.llm | self.parser

    def retrieve_relevant_regulations(self, transaction):
        """Retrieve relevant regulations for a transaction using vector search"""
        # Generate embedding for the transaction
        transaction_text = transaction.to_prompt()
        transaction_embedding = self.text_processor.generate_embeddings(
            [transaction_text]
        )[0]

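        # Perform vector search in MongoDB (Atlas Search knnBeta operator; knnBeta is
        # deprecated, and newer deployments use the $vectorSearch stage instead)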
        pipeline = [
            {
                "$search": {
                    "index": VECTOR_INDEX_NAME,
                    "knnBeta": {
                        "vector": transaction_embedding,
                        "path": "embedding",
                        "k": 5,
                    },
                }
            },
            {
                "$project": {
                    "_id": 0,
                    "title": 1,
                    "content": 1,
                    "jurisdiction": 1,
                    "publication_date": 1,
                }
            },
        ]

        results = list(regulations_collection.aggregate(pipeline))

        # Format regulations for prompt
        regulations_text = ""
        for i, reg in enumerate(results, 1):
            regulations_text += f"Regulation {i}: {reg['title']} ({reg['jurisdiction']}, {reg['publication_date']})\n"
            regulations_text += f"{reg['content']}\n\n"

        return regulations_text

    def assess_transaction(self, transaction):
        """Assess a transaction for compliance"""
        # Retrieve relevant regulations
        regulations = self.retrieve_relevant_regulations(transaction)

        # Prepare inputs
        inputs = {"transaction": transaction.to_prompt(), "regulations": regulations}

        # Run assessment
        assessment = self.chain.invoke(inputs)

        # Update transaction with assessment results
        transaction.compliance_status = ComplianceStatus(assessment["status"])
        transaction.compliance_details = assessment

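        # Store updated transaction in MongoDB. Upsert on the business key so that
        # re-runs and retries update one document; the stored _id is an ObjectId,
        # so matching it against the string `id` would never succeed.
        doc = transaction.to_dict()
        doc.pop("id", None)
        result = transactions_collection.update_one(
            {"transaction_id": transaction.transaction_id},
            {"$set": doc},
            upsert=True,
        )
        if result.upserted_id is not None:
            transaction.id = str(result.upserted_id)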

        return assessment

    def normalize_confidence(self, assessments):
        """Normalize confidence scores using softmax"""
        # Extract confidence scores
        scores = [assessment["confidence"] for assessment in assessments]

        # Apply softmax normalization
        exp_scores = np.exp(scores)
        normalized_scores = exp_scores / np.sum(exp_scores)

        # Update assessments with normalized scores
        for i, assessment in enumerate(assessments):
            assessment["normalized_confidence"] = float(normalized_scores[i])

        return assessments
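`assess_transaction` passes the model's `status` straight to `ComplianceStatus(...)`, which raises `ValueError` for any other string, and the workflow's summary message formats `confidence` as a number. A hypothetical guard like the one below could check the parsed JSON against the shape requested in the prompt before it is used; the function and constant names are illustrative, not part of LangChain.

```python
VALID_STATUSES = ("Compliant", "Reporting Required", "Violation")
REQUIRED_KEYS = {
    "status", "confidence", "reasoning",
    "applicable_regulations", "recommended_actions", "risk_factors",
}
LIST_KEYS = ("applicable_regulations", "recommended_actions", "risk_factors")


def validate_assessment(assessment):
    """Return a list of problems with a parsed assessment dict; empty means valid."""
    problems = []
    missing = REQUIRED_KEYS - set(assessment)
    if missing:
        problems.append(f"missing keys: {sorted(missing)}")
    if assessment.get("status") not in VALID_STATUSES:
        problems.append(f"unexpected status: {assessment.get('status')!r}")
    confidence = assessment.get("confidence")
    # bool is a subclass of int, so reject it explicitly; NaN fails the range check
    if (
        isinstance(confidence, bool)
        or not isinstance(confidence, (int, float))
        or not 0.0 <= confidence <= 1.0
    ):
        problems.append(f"confidence must be a number in [0, 1], got {confidence!r}")
    for key in LIST_KEYS:
        if key in assessment and not isinstance(assessment[key], list):
            problems.append(f"{key} should be a list")
    return problems


print(validate_assessment({"status": "Maybe", "confidence": 1.5, "reasoning": "..."}))
```

On failure, the caller could re-prompt the model or record the problems in the workflow's `errors` list instead of raising.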

## Agent Orchestration with LangGraph

Now we'll implement the agent orchestration framework using LangGraph to coordinate the compliance assessment workflow.

In [None]:
from typing import Any, Dict, List, Optional, TypedDict, Union

import langgraph.graph as lg
from langchain_core.messages import AIMessage, HumanMessage
from langgraph.checkpoint.mongodb import MongoDBSaver  # langgraph-checkpoint-mongodb


# Define state for the graph
class ComplianceState(TypedDict):
    """State for the compliance assessment workflow"""

    transaction: Dict[str, Any]  # Transaction data
    regulations: Optional[str]  # Retrieved regulations, formatted as prompt text
    assessment: Optional[Dict[str, Any]]  # Compliance assessment results
    messages: List[Union[HumanMessage, AIMessage]]  # Conversation history
    errors: Optional[List[str]]  # Any errors encountered


class ComplianceWorkflow:
    """Orchestrates the compliance assessment workflow using LangGraph"""

    def __init__(self):
        self.compliance_engine = ComplianceEngine()
        self.text_processor = TextProcessor()

        # Create the MongoDB checkpointer, reusing the MongoClient created earlier
        self.checkpoint_store = MongoDBSaver(
            client,
            db_name=DB_NAME,
            checkpoint_collection_name=CHECKPOINTS_COLLECTION,
        )

        # Build the graph
        self.workflow = self._build_graph()

    def _parse_transaction(self, state: ComplianceState) -> ComplianceState:
        """Parse transaction data and create Transaction object"""
        try:
            # Create Transaction object from state data
            transaction_data = state["transaction"]
            transaction = Transaction(**transaction_data)

            # Update state with parsed transaction
            state["transaction"] = transaction.to_dict()
            state["messages"].append(
                AIMessage(
                    content=f"Transaction {transaction.transaction_id} parsed successfully."
                )
            )

        except Exception as e:
            error_msg = f"Error parsing transaction: {e!s}"
            state["errors"] = state.get("errors", []) + [error_msg]
            state["messages"].append(AIMessage(content=error_msg))

        return state

    def _retrieve_regulations(self, state: ComplianceState) -> ComplianceState:
        """Retrieve relevant regulations for the transaction"""
        try:
            # Create Transaction object from state
            transaction = Transaction(**state["transaction"])

            # Retrieve relevant regulations
            regulations_text = self.compliance_engine.retrieve_relevant_regulations(
                transaction
            )

            # Update state with retrieved regulations
            state["regulations"] = regulations_text
            state["messages"].append(
                AIMessage(
                    content="Retrieved relevant regulations for compliance assessment."
                )
            )

        except Exception as e:
            error_msg = f"Error retrieving regulations: {e!s}"
            state["errors"] = state.get("errors", []) + [error_msg]
            state["messages"].append(AIMessage(content=error_msg))

        return state

    def _assess_compliance(self, state: ComplianceState) -> ComplianceState:
        """Assess transaction compliance against regulations"""
        try:
            # Create Transaction object from state
            transaction = Transaction(**state["transaction"])

            # Assess compliance
            assessment = self.compliance_engine.assess_transaction(transaction)

            # Update state with assessment results
            state["assessment"] = assessment
            state["transaction"] = (
                transaction.to_dict()
            )  # Update with compliance status

            # Add message with assessment summary
            summary = f"Compliance assessment complete. Status: {assessment['status']} (Confidence: {assessment['confidence']:.2f})\n"
            summary += f"Reasoning: {assessment['reasoning']}\n"
            if assessment.get("recommended_actions"):
                summary += f"Recommended actions: {', '.join(assessment['recommended_actions'])}\n"

            state["messages"].append(AIMessage(content=summary))

        except Exception as e:
            error_msg = f"Error assessing compliance: {e!s}"
            state["errors"] = state.get("errors", []) + [error_msg]
            state["messages"].append(AIMessage(content=error_msg))

        return state

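    def _should_retry(self, state: ComplianceState) -> str:
        """Retry only if no assessment was produced and fewer than 3 errors occurred"""
        if state.get("assessment") is not None:
            return "end"  # success, even if an earlier attempt recorded an error
        if state.get("errors") and len(state["errors"]) < 3:
            return "retry"
        return "end"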

    def _build_graph(self):
        """Build the LangGraph workflow"""
        # Define the graph
        builder = lg.StateGraph(ComplianceState)

        # Add nodes
        builder.add_node("parse_transaction", self._parse_transaction)
        builder.add_node("retrieve_regulations", self._retrieve_regulations)
        builder.add_node("assess_compliance", self._assess_compliance)

        # Define edges
        builder.add_edge("parse_transaction", "retrieve_regulations")
        builder.add_edge("retrieve_regulations", "assess_compliance")

        # Add conditional edge for error handling
        builder.add_conditional_edges(
            "assess_compliance",
            self._should_retry,
            {"retry": "parse_transaction", "end": lg.END},
        )

        # Set entry point
        builder.set_entry_point("parse_transaction")

        # Compile the graph with MongoDB checkpointing
        return builder.compile(checkpointer=self.checkpoint_store)

    def process_transaction(self, transaction_data: Dict[str, Any]) -> Dict[str, Any]:
        """Process a transaction through the compliance workflow"""
        # Initialize state
        initial_state = ComplianceState(
            transaction=transaction_data,
            regulations=None,
            assessment=None,
            messages=[
                HumanMessage(
                    content=f"Process transaction {transaction_data.get('transaction_id', 'unknown')}"
                )
            ],
            errors=None,
        )

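        # Run the workflow; a checkpointer-compiled graph needs a thread_id in the config
        config = {
            "configurable": {
                "thread_id": str(transaction_data.get("transaction_id", "unknown"))
            }
        }
        final_state = self.workflow.invoke(initial_state, config=config)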

        return final_state
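
The retry edge is easy to get subtly wrong, so here is a dependency-free sketch of the parse → retrieve → assess loop in plain Python. It is not the LangGraph runtime: `run_workflow`, `passthrough`, `make_flaky_assess`, and the step limit are hypothetical stand-ins. It assumes a retry should happen only while no assessment exists and fewer than three errors have accumulated; a cap on the error count alone would keep looping after a successful retry, because errors from earlier attempts stay in the list.

```python
from typing import Any, Callable, Dict, List

State = Dict[str, Any]
Node = Callable[[State], State]

MAX_ERRORS = 3  # same cap as the "< 3" check in _should_retry


def should_retry(state: State) -> str:
    """Loop back only if this pass produced no assessment and errors are under the cap."""
    errors = state.get("errors") or []  # errors starts as None in process_transaction
    if state.get("assessment") is None and errors and len(errors) < MAX_ERRORS:
        return "retry"
    return "end"


def run_workflow(state: State, nodes: List[Node], max_steps: int = 25) -> State:
    """Run the nodes in order, looping back to the first while should_retry() says 'retry'.

    max_steps is a hypothetical safety net, similar in spirit to LangGraph's recursion limit.
    """
    steps = 0
    while True:
        for node in nodes:
            state = node(state)
            steps += 1
            if steps > max_steps:
                raise RuntimeError("step limit exceeded; check the retry condition")
        if should_retry(state) == "end":
            return state


def passthrough(state: State) -> State:
    """Stand-in for parse_transaction / retrieve_regulations."""
    return state


def make_flaky_assess(failures: int) -> Node:
    """Hypothetical assess node that fails `failures` times, then succeeds."""
    calls = {"count": 0}

    def assess(state: State) -> State:
        calls["count"] += 1
        if calls["count"] <= failures:
            # None-safe accumulation, since errors may start as None
            state["errors"] = (state.get("errors") or []) + [f"attempt {calls['count']} failed"]
        else:
            state["assessment"] = {"status": "Compliant", "confidence": 0.9}
        return state

    return assess


# One failure followed by a success: two passes, then the loop ends.
final = run_workflow({"assessment": None, "errors": None},
                     [passthrough, passthrough, make_flaky_assess(failures=1)])
print(final["assessment"]["status"], len(final["errors"]))  # Compliant 1
```

Under an error-count-only rule, the same scenario would keep looping after the successful second pass (the list still holds one error) and would stop only at `max_steps`.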

## Demonstration: Processing Sample Transactions

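Let's demonstrate the system on three sample transactions: a €15,000 wire from Germany to the United States, a $75,000 wire from the United States to a sanctioned counterparty in Iran, and a €9,500 SEPA transfer from France to Italy.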
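
Because `_build_graph` compiles the graph with a checkpointer, each run of the compiled graph is identified by a `thread_id` passed in the `configurable` section of the run config. A minimal sketch of building that config per transaction; `make_run_config` and its `"<transaction_id>:<run_id>"` naming scheme are hypothetical, chosen so repeated runs for the same transaction do not share checkpoints while remaining easy to find by transaction ID.

```python
import uuid
from typing import Any, Dict, Optional


def make_run_config(transaction_data: Dict[str, Any], run_id: Optional[str] = None) -> Dict[str, Any]:
    """Build a LangGraph run config whose thread_id names one compliance run.

    The "<transaction_id>:<run_id>" format is a hypothetical convention.
    """
    tx_id = transaction_data.get("transaction_id", "unknown")
    run_id = run_id or uuid.uuid4().hex  # random suffix when no run ID is supplied
    return {"configurable": {"thread_id": f"{tx_id}:{run_id}"}}


print(make_run_config({"transaction_id": "TX123456789"}, run_id="demo"))
# {'configurable': {'thread_id': 'TX123456789:demo'}}
```

With the compiled graph held in `workflow.workflow`, such a config would be passed as `workflow.workflow.invoke(initial_state, config=config)`, and the same config can later be given to the compiled graph's `get_state(config)` to inspect the checkpoint saved for that run.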

In [None]:
# Sample transactions for demonstration
sample_transactions = [
    {
        "transaction_id": "TX123456789",
        "amount": 15000.00,
        "currency": "EUR",
        "sender": {
            "name": "European Trading Ltd",
            "country": "Germany",
            "account_number": "DE89370400440532013000",
            "institution": "Deutsche Bank",
            "is_sanctioned": False,
        },
        "receiver": {
            "name": "Global Imports Inc",
            "country": "United States",
            "account_number": "US12345678901234567890",
            "institution": "Bank of America",
            "is_sanctioned": False,
        },
        "transaction_date": "2023-11-15",
        "transaction_type": "International Wire Transfer",
        "description": "Payment for machinery parts",
    },
    {
        "transaction_id": "TX987654321",
        "amount": 75000.00,
        "currency": "USD",
        "sender": {
            "name": "American Exports LLC",
            "country": "United States",
            "account_number": "US98765432109876543210",
            "institution": "JP Morgan Chase",
            "is_sanctioned": False,
        },
        "receiver": {
            "name": "Tehran Trading Co",
            "country": "Iran",
            "account_number": "IR123456789012345678901234",
            "institution": "Bank Melli Iran",
            "is_sanctioned": True,
        },
        "transaction_date": "2023-12-01",
        "transaction_type": "International Wire Transfer",
        "description": "Consulting services",
    },
    {
        "transaction_id": "TX567891234",
        "amount": 9500.00,
        "currency": "EUR",
        "sender": {
            "name": "French Distributors SA",
            "country": "France",
            "account_number": "FR1420041010050500013M02606",
            "institution": "BNP Paribas",
            "is_sanctioned": False,
        },
        "receiver": {
            "name": "Italian Suppliers SRL",
            "country": "Italy",
            "account_number": "IT60X0542811101000000123456",
            "institution": "UniCredit",
            "is_sanctioned": False,
        },
        "transaction_date": "2023-12-10",
        "transaction_type": "SEPA Transfer",
        "description": "Purchase of luxury goods",
    },
]

# Initialize the compliance workflow
workflow = ComplianceWorkflow()

# Process each transaction
results = []
for tx_data in sample_transactions:
    print(f"\nProcessing transaction {tx_data['transaction_id']}...")
    result = workflow.process_transaction(tx_data)
    results.append(result)

    # Display messages from the workflow
    for message in result["messages"]:
        if isinstance(message, AIMessage):
            print(f"System: {message.content}")

    # Display final assessment
    if result.get("assessment"):
        assessment = result["assessment"]
        print(f"\nFinal Assessment for {tx_data['transaction_id']}:")
        print(f"Status: {assessment['status']}")
        print(f"Confidence: {assessment['confidence']:.2f}")
        print(f"Reasoning: {assessment['reasoning']}")
        if assessment.get("recommended_actions"):
            print(
                f"Recommended Actions: {', '.join(assessment['recommended_actions'])}"
            )
        print("-" * 80)
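
A malformed input record only surfaces inside the workflow, as an error that can trigger retries, so a cheap pre-check on the raw dictionaries can catch problems earlier. This sketch checks only the keys that the demonstration and visualization cells read directly; `find_input_problems` and the required-key lists are hypothetical, and the `Transaction` model may enforce more (or different) fields.

```python
from typing import Any, Dict, List

# Keys read directly by the demo and visualization cells (assumption: the
# Transaction model may require additional fields).
REQUIRED_KEYS = ("transaction_id", "amount", "currency", "sender", "receiver")
REQUIRED_PARTY_KEYS = ("name", "country")


def find_input_problems(tx: Dict[str, Any]) -> List[str]:
    """Return a list of problems with one transaction dict; an empty list means it passed."""
    problems = [f"missing '{key}'" for key in REQUIRED_KEYS if key not in tx]

    amount = tx.get("amount")
    # bool is a subclass of int, so exclude it explicitly
    if "amount" in tx and (isinstance(amount, bool) or not isinstance(amount, (int, float)) or amount <= 0):
        problems.append("'amount' must be a positive number")

    for role in ("sender", "receiver"):
        if role not in tx:
            continue  # already reported as missing above
        party = tx[role]
        if not isinstance(party, dict):
            problems.append(f"'{role}' must be a mapping")
            continue
        problems.extend(f"missing '{role}.{key}'" for key in REQUIRED_PARTY_KEYS if key not in party)

    return problems
```

For example, `{tx["transaction_id"]: find_input_problems(tx) for tx in sample_transactions}` maps each of the three sample transactions to an empty list.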

## Visualizing Compliance Results

Let's visualize the compliance assessment results.

In [None]:
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns

# Extract assessment data
assessment_data = []
for result in results:
    if result.get("assessment") and result.get("transaction"):
        assessment_data.append(
            {
                "transaction_id": result["transaction"]["transaction_id"],
                "amount": result["transaction"]["amount"],
                "currency": result["transaction"]["currency"],
                "sender_country": result["transaction"]["sender"]["country"],
                "receiver_country": result["transaction"]["receiver"]["country"],
                "status": result["assessment"]["status"],
                "confidence": result["assessment"]["confidence"],
            }
        )

# Create DataFrame
df = pd.DataFrame(assessment_data)

# Set up the figure
plt.figure(figsize=(12, 8))

# Create a bar chart of compliance status by transaction
plt.subplot(2, 1, 1)
status_colors = {
    "Compliant": "green",
    "Reporting Required": "orange",
    "Violation": "red",
}
sns.barplot(
    x="transaction_id", y="confidence", hue="status", data=df, palette=status_colors
)
plt.title("Compliance Assessment Results by Transaction")
plt.xlabel("Transaction ID")
plt.ylabel("Confidence Score")
plt.ylim(0, 1)

# Create a scatter plot of transaction amount vs. confidence
plt.subplot(2, 1, 2)
plt.scatter(
    x=df["amount"],
    y=df["confidence"],
    c=[status_colors.get(status, "gray") for status in df["status"]],
    s=100,
    alpha=0.7,
)
plt.title("Transaction Amount vs. Compliance Confidence")
plt.xlabel("Transaction Amount (original currency, not converted)")
plt.ylabel("Confidence Score")
plt.ylim(0, 1)

# Add transaction IDs as annotations
for i, txt in enumerate(df["transaction_id"]):
    plt.annotate(txt, (df["amount"].iloc[i], df["confidence"].iloc[i]), fontsize=9)

plt.tight_layout()
plt.show()
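
The extraction loop above silently drops any run that ended without an assessment (for example, one that exhausted its retries), so those transactions never appear in the charts. A small sketch that performs the same flattening while also counting statuses and skipped runs; `summarize_results` is a hypothetical helper, not part of the notebook.

```python
from collections import Counter
from typing import Any, Dict, List, Tuple


def summarize_results(results: List[Dict[str, Any]]) -> Tuple[List[Dict[str, Any]], Counter, int]:
    """Flatten final workflow states into chart rows, count statuses, and count skipped runs."""
    rows: List[Dict[str, Any]] = []
    skipped = 0
    for result in results:
        assessment, tx = result.get("assessment"), result.get("transaction")
        if not assessment or not tx:
            skipped += 1  # e.g. a run that ended after exhausting its retries
            continue
        rows.append({
            "transaction_id": tx["transaction_id"],
            "amount": tx["amount"],
            "currency": tx["currency"],
            "sender_country": tx["sender"]["country"],
            "receiver_country": tx["receiver"]["country"],
            "status": assessment["status"],
            "confidence": assessment["confidence"],
        })
    return rows, Counter(row["status"] for row in rows), skipped
```

Calling `rows, status_counts, skipped = summarize_results(results)` gives rows that can feed `pd.DataFrame(rows)` as before, plus a status tally and a count of runs missing from the plot.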

## Conclusion

In this notebook, we've built the foundation of an AI-powered transaction compliance monitoring system that combines MongoDB Atlas Vector Search, Voyage AI embeddings, and an LLM-based assessment engine (ShieldGemma 9B) to automate regulatory checks on financial transactions.

The system includes:
1. A document ingestion pipeline for processing regulatory documents
2. A MongoDB Atlas data layer for storing transactions, regulations, and vector embeddings
3. An NLP processing pipeline for text chunking and embedding generation
4. A compliance assessment engine for evaluating transactions against regulations
5. A LangGraph-based agent orchestration framework for workflow management

This implementation provides a foundation that can be extended with additional features such as:
- Real-time transaction monitoring
- Integration with existing financial systems
- Advanced risk scoring algorithms
- Customizable compliance rules and thresholds
- Audit trail and reporting capabilities
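
Customizable thresholds could start from the scoring scheme described in the overview: a softmax-normalized violation probability mapped to Violation, Reporting Required, or Compliant. A minimal sketch, assuming the engine scores a transaction from two logits, one for a "violation" answer and one for "no violation" (as ShieldGemma's Yes/No scoring does); the `ComplianceThresholds` class and its default cut-offs of 0.7 and 0.4 are illustrative, not the notebook's actual values.

```python
import math
from dataclasses import dataclass
from typing import List, Sequence, Tuple


@dataclass(frozen=True)
class ComplianceThresholds:
    # Illustrative defaults only; tune them to your own risk appetite
    violation: float = 0.7
    reporting_required: float = 0.4

    def __post_init__(self) -> None:
        if not 0.0 <= self.reporting_required <= self.violation <= 1.0:
            raise ValueError("expected 0 <= reporting_required <= violation <= 1")


def softmax(logits: Sequence[float]) -> List[float]:
    """Numerically stable softmax: subtract the max logit before exponentiating."""
    m = max(logits)
    exps = [math.exp(x - m) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]


def classify(yes_logit: float, no_logit: float,
             thresholds: ComplianceThresholds = ComplianceThresholds()) -> Tuple[str, float]:
    """Map a (violation, no-violation) logit pair to a label and a violation probability."""
    p_violation = softmax([yes_logit, no_logit])[0]
    if p_violation >= thresholds.violation:
        return "Violation", p_violation
    if p_violation >= thresholds.reporting_required:
        return "Reporting Required", p_violation
    return "Compliant", p_violation
```

Equal logits give a probability of 0.5, which the default thresholds label "Reporting Required"; raising `reporting_required` to 0.6 would relabel the same score "Compliant" without touching the model.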

By automating compliance checks, financial institutions can reduce operational costs, cut down on manual errors, and apply regulatory requirements more consistently across transactions.