# 🏦 Quarterly Earnings Reports Analysis for Bank of England Project

This notebook presents a comprehensive pipeline for analyzing quarterly financial disclosures from **Global Systemically Important Banks (G-SIBs)**. It was developed as part of a data science employer project in collaboration with the **Bank of England** and the **University of Cambridge**.

We combine **financial data extraction**, **language model summarization**, and **credit risk metric analysis** to derive structured insights from messy, unstructured documents such as:

- 📄 Quarterly financial reports (PDFs)
- 🗣️ Earnings call transcripts (SeekingAlpha & official bank websites)

---

### 🎯 Objective

To extract meaningful insights and risk indicators from G-SIB disclosures using a combination of:
- **AI-driven summarization** (Claude, GPT-4o, Mistral)
- **Topic modeling & sentiment analysis** (BERTopic, FinBERT)
- **Metric extraction** (Google Document AI)
- **Semantic + lexical search fusion** (FAISS + BM25)
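The fusion step is not spelled out at this point in the notebook. As an illustration only, the sketch below shows one common way to combine a semantic score (e.g. FAISS cosine similarity) with a lexical score (e.g. BM25): min-max normalise each retriever's scores, then take a weighted sum. The function names and the `alpha=0.5` weight are hypothetical. Reciprocal rank fusion is a frequently used alternative.

```python
from typing import Dict


def min_max_normalize(scores: Dict[int, float]) -> Dict[int, float]:
    """Rescale scores to [0, 1]; if all scores are equal, map them to 1.0."""
    if not scores:
        return {}
    lo, hi = min(scores.values()), max(scores.values())
    if hi == lo:
        return {doc_id: 1.0 for doc_id in scores}
    return {doc_id: (s - lo) / (hi - lo) for doc_id, s in scores.items()}


def fuse_scores(
    semantic: Dict[int, float],
    lexical: Dict[int, float],
    alpha: float = 0.5,  # hypothetical weight on the semantic component
) -> Dict[int, float]:
    """Weighted sum of normalised semantic and lexical scores.

    A document returned by only one retriever gets 0.0 for the other component.
    """
    sem = min_max_normalize(semantic)
    lex = min_max_normalize(lexical)
    doc_ids = set(sem) | set(lex)
    return {
        d: alpha * sem.get(d, 0.0) + (1 - alpha) * lex.get(d, 0.0)
        for d in doc_ids
    }


# Usage: keys are Q&A pair ids, values are raw retriever scores.
semantic = {1: 0.82, 2: 0.75, 3: 0.40}  # e.g. cosine similarities
lexical = {2: 12.1, 3: 3.5, 4: 7.0}     # e.g. BM25 scores
ranked = sorted(fuse_scores(semantic, lexical).items(), key=lambda kv: kv[1], reverse=True)
print([doc_id for doc_id, _ in ranked])  # → [2, 1, 4, 3]
```

Normalising first matters because BM25 scores are unbounded while cosine similarities are not, so a raw sum would let the lexical retriever dominate.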

All outputs are mapped to a unified **credit risk framework**, including metrics like:
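- **CET1 Ratio** (Common Equity Tier 1 capital ratio)
- **NPL Ratio** (non-performing loan ratio)
- **Coverage** (loan-loss coverage ratio)
- **PCL** (provision for credit losses)
- **Texas Ratio**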
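For reference, the standard definitions of these metrics are sketched below. They are textbook formulas and may differ in detail from how the pipeline extracts or reports each figure. Banks usually disclose CET1 directly, and PCL is a provision amount rather than a ratio, so it is not computed here. The `CreditInputs` field names are hypothetical, and the Texas ratio here uses non-performing assets in its numerator.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass
class CreditInputs:
    """Hypothetical balance-sheet inputs, all in the same currency unit."""
    cet1_capital: float
    risk_weighted_assets: float
    non_performing_loans: float
    gross_loans: float
    loan_loss_allowance: float
    non_performing_assets: float
    tangible_common_equity: float


def credit_ratios(x: CreditInputs) -> Dict[str, float]:
    """Textbook credit-risk ratios, expressed as percentages."""
    return {
        # CET1 ratio: Common Equity Tier 1 capital / risk-weighted assets
        "cet1_ratio": 100 * x.cet1_capital / x.risk_weighted_assets,
        # NPL ratio: non-performing loans / gross loans
        "npl_ratio": 100 * x.non_performing_loans / x.gross_loans,
        # Coverage: loan-loss allowance / non-performing loans
        "coverage": 100 * x.loan_loss_allowance / x.non_performing_loans,
        # Texas ratio: non-performing assets / (tangible common equity + loan-loss reserves)
        "texas_ratio": 100 * x.non_performing_assets
        / (x.tangible_common_equity + x.loan_loss_allowance),
    }


# Illustrative numbers only, not taken from any bank's disclosures.
ratios = credit_ratios(
    CreditInputs(
        cet1_capital=150, risk_weighted_assets=1_000,
        non_performing_loans=20, gross_loans=1_000,
        loan_loss_allowance=25, non_performing_assets=30,
        tangible_common_equity=95,
    )
)
print(ratios)  # → {'cet1_ratio': 15.0, 'npl_ratio': 2.0, 'coverage': 125.0, 'texas_ratio': 25.0}
```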

---

### 🏗️ Notebook Flow

1. **Setup & Dependencies**
2. **Configuration & API Keys (via `.env`)**
3. **Data Ingestion (Reports + Transcripts)**
4. **Document Processing Pipelines**
5. **Metrics & Risk Analysis**
6. **LLM-based Insights Generation**
7. **Visualization & Benchmarking**

---

> **Disclaimer:** This project uses paid APIs (e.g., Claude, GPT-4o, Document AI). Processing multiple transcripts may incur costs. Credentials should be stored securely via `.env` files and never pushed to public repositories.

---

📎 Learn more in the [README.md](./README.md)


## **IMPORTANT: Using this notebook incurs costs.**

This implementation uses the Claude API and OpenAI GPT-4o, both of which are paid services.

While we employ batch processing and prompt caching to reduce costs by approximately 75%, expect to incur a cost of roughly $1 per Q&A transcript processed.
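How batching and caching translate into a per-transcript figure depends on token counts and current prices. The rough estimator below is a sketch. Its 0.5 batch multiplier and 0.1 cache-read multiplier reflect Anthropic's published Message Batches and prompt-caching discounts at the time of writing, and it assumes the two discounts stack. It ignores the cache-write surcharge, and every value should be checked against current pricing. The function name, token counts and prices are placeholders, not measurements from this project.

```python
def estimate_cost_usd(
    input_tokens: int,
    cached_input_tokens: int,
    output_tokens: int,
    input_price_per_mtok: float,
    output_price_per_mtok: float,
    batch_multiplier: float = 0.5,       # assumed batch discount (50% off)
    cache_read_multiplier: float = 0.1,  # assumed cache-read price vs. base input
) -> float:
    """Rough cost of one request in USD.

    cached_input_tokens is the part of input_tokens served from the prompt cache.
    Assumes the batch discount applies to input and output and stacks with the
    cache-read discount; the cache-write surcharge is ignored.
    """
    if not 0 <= cached_input_tokens <= input_tokens:
        raise ValueError("cached_input_tokens must be between 0 and input_tokens")
    uncached = input_tokens - cached_input_tokens
    effective_input = uncached + cached_input_tokens * cache_read_multiplier
    input_cost = effective_input * input_price_per_mtok / 1_000_000
    output_cost = output_tokens * output_price_per_mtok / 1_000_000
    return (input_cost + output_cost) * batch_multiplier


# Placeholder inputs (not measured): 40k input tokens, 30k of them from the cache,
# 4k output tokens, and placeholder per-million-token prices.
per_transcript = estimate_cost_usd(40_000, 30_000, 4_000, 3.0, 15.0)
print(f"Estimated cost per transcript: ${per_transcript:.3f}")
```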

**Cost Justification:** This is our most capable approach to parsing unstructured earnings call transcripts. In our assessment, the gains in parsing accuracy and the analyst time saved, relative to the other methods we tried, outweigh the API cost.

**General Applicability:** The parser is not limited to the SeekingAlpha transcript format; it is designed to parse transcripts in any bank's own format.

> ⚠️ **Note for Users & Reviewers:**  
> This notebook comes with a fully processed SQLite database (`gsib_analysis_FINAL.db`) located in `/data`.  
> You can skip all transcript and document parsing steps and go straight to **visualization, scoring, and benchmarking**.

✅ Simply run the config + setup cells at the top of this notebook, then skip to:
- 📊 Visualization & Reporting
- 🔍 Metric Exploration
- 💬 GPT-powered Answer Generation (optional)

This allows you to view insights instantly without incurring compute/API costs.
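Before skipping ahead, you can confirm that the bundled database is present and populated. The sketch below uses only the standard library to list each table with its row count. `summarize_database` is a hypothetical helper, and the path in the usage line is the default `DB_PATH` from the configuration cell.

```python
import sqlite3
from pathlib import Path
from typing import Dict


def summarize_database(db_path: str) -> Dict[str, int]:
    """Return {table_name: row_count} for every user table in a SQLite file."""
    # sqlite3.connect would silently create an empty file, so check first
    if not Path(db_path).exists():
        raise FileNotFoundError(f"Database not found: {db_path}")
    conn = sqlite3.connect(db_path)
    try:
        tables = [
            row[0]
            for row in conn.execute(
                "SELECT name FROM sqlite_master "
                "WHERE type = 'table' AND name NOT LIKE 'sqlite_%' ORDER BY name"
            )
        ]
        # Names come from sqlite_master itself, so quoting them here is safe
        return {
            t: conn.execute(f'SELECT COUNT(*) FROM "{t}"').fetchone()[0]
            for t in tables
        }
    finally:
        conn.close()


# Usage:
# for table, count in summarize_database("./data/gsib_analysis_FINAL.db").items():
#     print(f"{table:30s} {count:>8,}")
```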


## 🚀 Install Packages and Import dependencies

In [1]:
%pip install -q pymupdf pypdf PyPDF2 pydrive2 pandas numpy openai anthropic python-dotenv \
                google-cloud-documentai google-auth google-auth-oauthlib google-auth-httplib2 \
                google-api-python-client oauth2client sentence-transformers transformers torch \
                plotly nltk spacy scikit-learn ipywidgets bertopic hdbscan umap-learn faiss-cpu \
                langchain langchain-community rank-bm25 huggingface_hub beautifulsoup4 \
                nest-asyncio tqdm requests

Note: you may need to restart the kernel to use updated packages.


In [1]:
# ===========================
# 📌 Standard Library Imports
# ===========================
import asyncio
import json
import logging
import os
import pickle
import random
import re
import sqlite3
import time
import warnings
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from datetime import datetime  # binds the datetime class, not the module
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple

# ===========================
# 📌 Environment & Notebook Runtime
# ===========================
import nest_asyncio  # allows nested asyncio event loops inside Jupyter
from dotenv import load_dotenv

# ===========================
# 🤖 AI & LLM API Integrations
# ===========================
import anthropic
from anthropic.types.message_create_params import MessageCreateParamsNonStreaming
from anthropic.types.messages.batch_create_params import Request
from openai import OpenAI

# ===========================
# 📌 Google Cloud & Authentication
# ===========================
from google.api_core.client_options import ClientOptions
from google.cloud import documentai
from oauth2client.client import GoogleCredentials
from oauth2client.service_account import ServiceAccountCredentials

# =========================================
# 📌 PDF, Web & Document Processing
# =========================================
import fitz  # PyMuPDF
import pypdf
import requests
from bs4 import BeautifulSoup

# ================================
# 📌 Numerical & Data Processing
# ================================
import numpy as np
import pandas as pd

# ===========================
# 📌 Machine Learning & NLP
# ===========================
import faiss
import nltk
import spacy
import torch
from bertopic import BERTopic
from hdbscan import HDBSCAN
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from rank_bm25 import BM25Okapi
from sentence_transformers import CrossEncoder, SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
from torch.nn import functional as F
from transformers import (
    AutoModel,
    AutoModelForSequenceClassification,
    AutoTokenizer,
)
from umap import UMAP

# ===========================
# 📌 Visualization & Display
# ===========================
import plotly.graph_objects as go
from IPython.display import Markdown, display
from plotly.subplots import make_subplots

# =====================
# 📌 Utility Libraries
# =====================
from tqdm import tqdm

# =============================
# 📌 Configure Warnings & Logging
# =============================
warnings.filterwarnings("ignore", category=DeprecationWarning)
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)

# =============================
# 🔽 Download Required NLTK Data
# =============================
nltk.download("punkt")
nltk.download("punkt_tab")  # word_tokenize needs this resource in recent NLTK releases
nltk.download("stopwords")
nltk.download("wordnet")

# =============================
# ⚙️ Configure GPU (if available)
# =============================
device = torch.device("mps" if torch.backends.mps.is_available() else "cuda" if torch.cuda.is_available() else "cpu")
logging.info(f"Using device: {device}")


# =============================
# ⚙️ Set Random Seeds for Reproducibility
# =============================
def set_seeds(seed: int = 42):
    """Set random seeds for reproducibility across all libraries"""
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)


set_seeds()

# =============================
# ⏩ Enable tqdm Progress Bars
# =============================
tqdm.pandas()

[nltk_data] Downloading package punkt to /Users/prash/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to /Users/prash/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package wordnet to /Users/prash/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
2025-03-28 17:05:46,621 - root - INFO - Using device: mps


## 🎯 Configuration & Global Settings

In [2]:
# Load environment variables from .env file (if present)
load_dotenv()

# ================================
# 📌 Selected Entities & Parameters
# ================================

SELECTED_BANKS = ["JPMorgan Chase", "UBS"]  # Banks to include in analysis

RISK_TYPES_AI_INSIGHTS = [
    "credit_risk",
    # "liquidity_risk",
    # "market_risk",
    # "group_risk",
]  # Risk types to generate insights for

# ================================
# 📂 Local Storage Paths
# ================================

DB_PATH = os.getenv("DB_PATH", "./data/gsib_analysis_FINAL.db")

CALL_TRANSCRIPTS_DOWNLOAD_FOLDER = os.getenv(
    "CALL_TRANSCRIPTS_DOWNLOAD_FOLDER", "./data/earnings_transcripts"
)

QUARTERLY_REPORTS_DOWNLOAD_FOLDER = os.getenv(
    "QUARTERLY_REPORTS_DOWNLOAD_FOLDER", "./data/quarterly_reports"
)

# ================================
# 🔐 API Keys & Credentials
# ================================

# Google Cloud Document AI
GOOGLE_AUTH_JSON_FILE = os.getenv("GOOGLE_APPLICATION_CREDENTIALS", "./secrets/google-auth.json")
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = GOOGLE_AUTH_JSON_FILE

# Search APIs
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
GOOGLE_CX = os.getenv("GOOGLE_CX")

SERPER_API_KEY = os.getenv("SERPER_API_KEY")
BRIGHTDATA_API_KEY = os.getenv("BRIGHTDATA_API_KEY")

# OpenAI / Anthropic
os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY", "")
os.environ["ANTHROPIC_API_KEY"] = os.getenv("ANTHROPIC_API_KEY", "")

# ================================
# 🤖 Google Document AI Config
# ================================

DOCUMENT_AI_PROJECT_ID = os.getenv("DOCUMENT_AI_PROJECT_ID", "974627024770")
DOCUMENT_AI_LOCATION = os.getenv("DOCUMENT_AI_LOCATION", "eu")
DOCUMENT_AI_PROCESSOR_ID = os.getenv("DOCUMENT_AI_PROCESSOR_ID", "9d55d1c14e76de83")
DOCUMENT_AI_OUTPUT_DIR = os.getenv("DOCUMENT_AI_OUTPUT_DIR", "output")
# Optional processor version (if needed)
DOCUMENT_AI_PROCESSOR_VERSION = os.getenv("DOCUMENT_AI_PROCESSOR_VERSION", None)

# ================================
# ⚡ Global Caches (for performance)
# ================================

_MODEL_CACHE = {}
_TOKENIZER_CACHE = {}
_PIPELINE_CACHE = {}
os.environ["TOKENIZERS_PARALLELISM"] = "false"  # Disable parallelism for tokenizers

# ================================
# 📘 Utility Functions
# ================================

def md_print(text):
    """Display formatted markdown in notebook output"""
    from IPython.display import Markdown, display
    display(Markdown(text))
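Missing credentials otherwise surface later as authentication errors. A minimal check such as the one below reports which of the keys read above are unset and whether the Google credentials file exists. `missing_env_vars` is a hypothetical helper, and these keys are needed only for the paid processing steps, not for the database-only cells.

```python
import os
from pathlib import Path
from typing import Iterable, List


def missing_env_vars(names: Iterable[str]) -> List[str]:
    """Return the names whose environment variable is unset or empty."""
    return [name for name in names if not os.environ.get(name)]


missing = missing_env_vars(("OPENAI_API_KEY", "ANTHROPIC_API_KEY"))
if missing:
    print("Unset (needed only for LLM processing):", ", ".join(missing))

# GOOGLE_APPLICATION_CREDENTIALS always has a default path, so check the file itself
creds_path = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS", "")
if not Path(creds_path).is_file():
    print(f"Document AI credentials file not found: {creds_path!r}")
```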


## 📂 Database Manager
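Two SQLite details matter for the manager below. `journal_mode=WAL` is stored in the database file and persists across connections, whereas `foreign_keys`, which the schema's `ON DELETE CASCADE` clauses depend on, is a per-connection setting that is off by default in standard builds. The throwaway sketch below demonstrates both on a temporary file with a hypothetical two-table schema; `demo_pragmas` is not part of the notebook.

```python
import os
import sqlite3
import tempfile
from typing import Dict


def demo_pragmas(db_path: str) -> Dict[str, object]:
    """Show which SQLite settings survive reconnecting and how cascades behave."""
    conn = sqlite3.connect(db_path)
    conn.execute("PRAGMA journal_mode=WAL;")  # persisted in the database file
    conn.execute("PRAGMA foreign_keys=ON;")   # applies to this connection only
    conn.executescript(
        """
        CREATE TABLE parent (id INTEGER PRIMARY KEY);
        CREATE TABLE child (
            id INTEGER PRIMARY KEY,
            parent_id INTEGER NOT NULL REFERENCES parent (id) ON DELETE CASCADE
        );
        INSERT INTO parent (id) VALUES (1), (2);
        INSERT INTO child (parent_id) VALUES (1), (2);
        """
    )
    conn.close()

    conn = sqlite3.connect(db_path)  # fresh connection: foreign_keys is back to its default
    result: Dict[str, object] = {
        "journal_mode": conn.execute("PRAGMA journal_mode;").fetchone()[0],
        "foreign_keys": conn.execute("PRAGMA foreign_keys;").fetchone()[0],
    }
    conn.execute("DELETE FROM parent WHERE id = 1")  # no cascade if FKs are off
    conn.commit()
    result["orphans_without_fk"] = conn.execute(
        "SELECT COUNT(*) FROM child WHERE parent_id = 1"
    ).fetchone()[0]

    conn.execute("PRAGMA foreign_keys=ON;")  # must be issued outside a transaction
    conn.execute("DELETE FROM parent WHERE id = 2")  # cascades to the child row
    conn.commit()
    result["children_left_for_2"] = conn.execute(
        "SELECT COUNT(*) FROM child WHERE parent_id = 2"
    ).fetchone()[0]
    conn.close()
    return result


with tempfile.TemporaryDirectory() as tmp:
    print(demo_pragmas(os.path.join(tmp, "demo.db")))
```

With foreign keys left off, deleting a parent leaves its child rows orphaned, which is why any method that relies on `ON DELETE CASCADE` needs `PRAGMA foreign_keys = ON` on its own connection.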

In [15]:
class GSIBDatabase:
    """Database manager for G-SIB quarterly announcements analysis system"""

    def __init__(
        self,
        db_path: str = "gsib_analysis.db",
        log_level=logging.INFO,
        enable_wal=True,
    ):
        """
        Initialize database connection and logger

        Args:
            db_path: Path to SQLite database file
            log_level: Logging level
            enable_wal: Whether to enable WAL mode (default: True)
        """
        self.db_path = db_path

        # Set up logging
        self.logger = logging.getLogger("GSIBDatabase")
        self.logger.setLevel(log_level)

        if not self.logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter(
                "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
            )
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)

        # Initialize database if it doesn't exist
        if not os.path.exists(db_path):
            self.init_database()

        # Enable WAL mode if requested
        if enable_wal:
            self._enable_wal_mode()

    def _enable_wal_mode(self):
        """Enable Write-Ahead Logging mode for better concurrency and performance"""
        conn = self.get_db_connection()
        try:
            conn.execute("PRAGMA journal_mode=WAL;")
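            # synchronous is a per-connection setting (unlike journal_mode=WAL,
            # which persists in the file), so this only affects this connection.
            conn.execute("PRAGMA synchronous=NORMAL;")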
            conn.commit()
            self.logger.info("WAL mode enabled for database")
        except Exception as e:
            self.logger.error(f"Error enabling WAL mode: {str(e)}")
        finally:
            conn.close()

    def checkpoint_wal(self, mode: str = "PASSIVE"):
        """
        Checkpoint the WAL file to manage its size

        Args:
            mode: Checkpoint mode ('PASSIVE', 'FULL', 'RESTART', or 'TRUNCATE')
                 PASSIVE: Only checkpoint if no readers are active
                 FULL: Checkpoint even if it delays reads/writes
                 RESTART: Checkpoint and restart WAL file
                 TRUNCATE: Checkpoint and truncate WAL file
        """
        valid_modes = ["PASSIVE", "FULL", "RESTART", "TRUNCATE"]
        if mode not in valid_modes:
            mode = "PASSIVE"
            self.logger.warning(
                f"Invalid checkpoint mode, using PASSIVE. Valid modes: {valid_modes}",
            )

        conn = self.get_db_connection()
        try:
            conn.execute(f"PRAGMA wal_checkpoint({mode});")
            self.logger.info(f"WAL checkpoint completed with mode: {mode}")
        except Exception as e:
            self.logger.error(f"Error during WAL checkpoint: {str(e)}")
        finally:
            conn.close()

    def cursor_to_dict(self, cursor, rows):
        """
        Convert database cursor rows to a list of dictionaries.

        Args:
            cursor: Database cursor object
            rows: Fetched rows from cursor

        Returns:
            List of dictionaries where keys are column names
        """
        # Get column names from cursor description
        columns = [column[0] for column in cursor.description]
        # Convert rows to dictionaries
        return [dict(zip(columns, row)) for row in rows]

    def begin_transaction(self):
        """
        Begin a new transaction and return a connection object

        Returns:
            SQLite connection with active transaction
        """
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        return conn

    def commit_transaction(self, conn):
        """
        Commit an active transaction

        Args:
            conn: SQLite connection with active transaction
        """
        try:
            conn.commit()
        except Exception as e:
            self.logger.error(f"Error committing transaction: {str(e)}")
            raise
        finally:
            conn.close()

    def rollback_transaction(self, conn):
        """
        Roll back an active transaction

        Args:
            conn: SQLite connection with active transaction
        """
        try:
            conn.rollback()
        except Exception as e:
            self.logger.error(f"Error rolling back transaction: {str(e)}")
        finally:
            conn.close()

    def execute_in_transaction(self, func):
        """
        Execute a function within a transaction context

        Args:
            func: Function to execute. Must accept a connection object as its first argument.
                 The function should not commit or close the connection.

        Returns:
            The result of the function call
        """
        conn = self.begin_transaction()
        try:
            result = func(conn)
            conn.commit()
            return result
        except Exception as e:
            conn.rollback()
            self.logger.error(f"Transaction failed and was rolled back: {str(e)}")
            raise
        finally:
            conn.close()

    def get_db_connection(self):
        """Get a connection to the SQLite database"""
        conn = sqlite3.connect(self.db_path, isolation_level="EXCLUSIVE")
        conn.row_factory = sqlite3.Row
        return conn

    def init_database(self):
        """Initialize SQLite database with required tables based on provided schema"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        # Create banks table
        cursor.execute("""
        CREATE TABLE IF NOT EXISTS banks (
            id INTEGER PRIMARY KEY,
            bank_name TEXT UNIQUE NOT NULL
        )
        """)

        # Create reports table
        cursor.execute("""
        CREATE TABLE IF NOT EXISTS reports (
            id INTEGER PRIMARY KEY,
            bank_id INTEGER NOT NULL,
            report_name TEXT NOT NULL,
            year INTEGER NOT NULL,
            quarter TEXT NOT NULL,
            report_date TEXT,
            processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (bank_id) REFERENCES banks (id) ON DELETE CASCADE,
            UNIQUE (bank_id, year, quarter)
        )
        """)

        # Create metrics table
        cursor.execute("""
        CREATE TABLE IF NOT EXISTS metrics (
            id INTEGER PRIMARY KEY,
            report_id INTEGER NOT NULL,
            metric_name TEXT NOT NULL,
            metric_value REAL,
            formatted_value TEXT,
            in_typical_range BOOLEAN,
            FOREIGN KEY (report_id) REFERENCES reports (id) ON DELETE CASCADE,
            UNIQUE (report_id, metric_name)
        )
        """)

        # Analyst conversations table (combines all Q&A for one analyst in a report)
        cursor.execute("""
        CREATE TABLE IF NOT EXISTS analyst_conversations (
            id INTEGER PRIMARY KEY,
            report_id INTEGER NOT NULL,
            analyst_name TEXT NOT NULL,
            analyst_company TEXT,
            topic_id INTEGER,
            topic_probability REAL,
            FOREIGN KEY (report_id) REFERENCES reports (id) ON DELETE CASCADE
        )
        """)

        # Topics table
        cursor.execute("""
        CREATE TABLE IF NOT EXISTS topics (
            id INTEGER PRIMARY KEY,
            name TEXT NOT NULL,
            keywords TEXT NOT NULL,  -- JSON array of keywords
            category TEXT NOT NULL   -- e.g., 'CREDIT_RISK', 'MARKET_RISK', etc.
        );
        """)

        # Individual QA pairs
        cursor.execute("""
        CREATE TABLE IF NOT EXISTS qa_pairs (
            id INTEGER PRIMARY KEY,
            conversation_id INTEGER NOT NULL,
            question TEXT NOT NULL,          -- Original raw question
            answer TEXT NOT NULL,            -- Original raw answer
            answer_speaker TEXT NOT NULL,
            answer_role TEXT NOT NULL,
            FOREIGN KEY (conversation_id) REFERENCES analyst_conversations (id) ON DELETE CASCADE
        );
        """)

        # Simplified preprocessed text for NLP and topic modeling
        cursor.execute("""
        CREATE TABLE IF NOT EXISTS preprocessed_text (
            id INTEGER PRIMARY KEY,
            qa_pair_id INTEGER NOT NULL,
            text_type TEXT NOT NULL,         -- 'question', 'answer', or 'combined'
            preprocessing_level TEXT NOT NULL, -- e.g. 'llm_cleaned', 'tokenized'
            processed_text TEXT NOT NULL,
            processing_metadata TEXT,        -- JSON with additional processing info
            FOREIGN KEY (qa_pair_id) REFERENCES qa_pairs (id) ON DELETE CASCADE,
            UNIQUE (qa_pair_id, text_type, preprocessing_level)
        );
        """)

        # Credit risk mention keywords
        cursor.execute("""
        CREATE TABLE IF NOT EXISTS credit_risk_keywords (
            id INTEGER PRIMARY KEY,
            keyword TEXT UNIQUE NOT NULL,
            metric_category TEXT NOT NULL,  -- 'Credit Risk'
            relevance_score REAL            -- Indicates how strongly the keyword relates to the category
        );
        """)

        # Credit risk mentions in conversations with sentiment
        cursor.execute("""
        CREATE TABLE IF NOT EXISTS credit_risk_mentions (
            id INTEGER PRIMARY KEY,
            conversation_id INTEGER NOT NULL,
            keyword_id INTEGER NOT NULL,
            mention_count INTEGER NOT NULL,
            mention_context TEXT,  -- Sample of text containing the mention
            positive_score REAL NOT NULL,
            negative_score REAL NOT NULL,
            neutral_score REAL NOT NULL,
            compound_score REAL NOT NULL, -- -1 to 1 scale
            FOREIGN KEY (conversation_id) REFERENCES analyst_conversations (id) ON DELETE CASCADE,
            FOREIGN KEY (keyword_id) REFERENCES credit_risk_keywords (id) ON DELETE CASCADE,
            UNIQUE (conversation_id, keyword_id)
        );
        """)

        # Sentiment analysis at conversation level
        cursor.execute("""
        CREATE TABLE IF NOT EXISTS conversation_sentiment (
            id INTEGER PRIMARY KEY,
            conversation_id INTEGER NOT NULL,
            positive_score REAL NOT NULL,
            negative_score REAL NOT NULL,
            neutral_score REAL NOT NULL,
            compound_score REAL NOT NULL, -- -1 to 1 scale
            FOREIGN KEY (conversation_id) REFERENCES analyst_conversations (id) ON DELETE CASCADE,
            UNIQUE (conversation_id)
        );
        """)

        # Sentiment analysis per speaker per topic
        cursor.execute("""
        CREATE TABLE IF NOT EXISTS speaker_topic_sentiment (
            id INTEGER PRIMARY KEY,
            conversation_id INTEGER NOT NULL,
            speaker_name TEXT NOT NULL,
            speaker_role TEXT NOT NULL,
            positive_score REAL NOT NULL,
            negative_score REAL NOT NULL,
            neutral_score REAL NOT NULL,
            compound_score REAL NOT NULL, -- -1 to 1 scale
            FOREIGN KEY (conversation_id) REFERENCES analyst_conversations (id) ON DELETE CASCADE,
            UNIQUE (conversation_id, speaker_name, speaker_role)
        );
        """)

        # Vector embeddings table (for FAISS integration)
        cursor.execute("""
        CREATE TABLE IF NOT EXISTS vector_embeddings (
            id INTEGER PRIMARY KEY,
            preprocessed_text_id INTEGER NOT NULL,
            embedding_model TEXT NOT NULL,
            preprocessing_level TEXT NOT NULL,
            vector_data BLOB,
            chunk_text TEXT,
            metadata TEXT,
            FOREIGN KEY (preprocessed_text_id) REFERENCES preprocessed_text (id) ON DELETE CASCADE,
            UNIQUE (preprocessed_text_id, embedding_model, preprocessing_level)
        );
        """)

        # Create faiss_indices table if not exists
        cursor.execute("""
        CREATE TABLE IF NOT EXISTS faiss_indices (
            id INTEGER PRIMARY KEY,
            name TEXT NOT NULL,
            embedding_dim INTEGER NOT NULL,
            model_name TEXT NOT NULL,
            config TEXT,  -- JSON with index parameters
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
        """)

        # Create search_queries table if not exists
        cursor.execute("""
        CREATE TABLE IF NOT EXISTS search_queries (
            id INTEGER PRIMARY KEY,
            query_text TEXT NOT NULL,
            embedding BLOB,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)
        """)

        # Create search_results table if not exists
        cursor.execute("""
        CREATE TABLE IF NOT EXISTS search_results (
            id INTEGER PRIMARY KEY,
            query_id INTEGER NOT NULL,
            qa_pair_id INTEGER NOT NULL,
            semantic_score REAL,
            lexical_score REAL,
            reranker_score REAL,
            final_score REAL,
            FOREIGN KEY (query_id) REFERENCES search_queries (id),
            FOREIGN KEY (qa_pair_id) REFERENCES qa_pairs (id)
        );
        """)

        conn.commit()
        conn.close()
        self.logger.info(f"Database initialized at {self.db_path}")

    def get_llm_cleaned_entries_for_processing(
        self,
        limit: Optional[int] = None,
    ) -> List[Dict]:
        """
        Get entries with llm_cleaned level but missing tokenized level

        Args:
            limit: Optional limit on number of entries to retrieve

        Returns:
            List of dictionaries with qa_pair_id, text_type, and processed_text
        """
        conn = self.get_db_connection()
        cursor = conn.cursor()

        try:
            # Find QA pairs with llm_cleaned but missing other preprocessing levels
            query = """
            SELECT DISTINCT pt.qa_pair_id, pt.text_type, pt.processed_text
            FROM preprocessed_text pt
            WHERE pt.preprocessing_level = 'llm_cleaned'
            AND NOT EXISTS (
                SELECT 1 FROM preprocessed_text pt2
                WHERE pt2.qa_pair_id = pt.qa_pair_id
                AND pt2.text_type = pt.text_type
                AND pt2.preprocessing_level = 'tokenized'
            )
            """

            # Bind LIMIT as a parameter instead of formatting it into the SQL
            params = ()
            if limit:
                query += " LIMIT ?"
                params = (limit,)

            cursor.execute(query, params)
            rows = cursor.fetchall()
            return self.cursor_to_dict(cursor, rows)

        except Exception as e:
            self.logger.error(f"Error retrieving llm_cleaned entries: {str(e)}")
            return []

        finally:
            conn.close()

    # Bank Methods
    def add_bank(self, bank_name: str) -> int:
        """
        Add a new bank to the database

        Args:
            bank_name: Name of the bank

        Returns:
            ID of the newly created bank
        """
        conn = self.get_db_connection()
        cursor = conn.cursor()

        try:
            cursor.execute(
                "INSERT OR IGNORE INTO banks (bank_name) VALUES (?)",
                (bank_name,),
            )

            conn.commit()
            if cursor.rowcount == 0:
                # Bank already exists, get its ID
                cursor.execute("SELECT id FROM banks WHERE bank_name = ?", (bank_name,))
                return cursor.fetchone()[0]
            return cursor.lastrowid

        except Exception as e:
            conn.rollback()
            self.logger.error(f"Error adding bank: {str(e)}")
            raise

        finally:
            conn.close()

    def get_all_banks(self) -> List[Dict]:
        """
        Get all banks from the database

        Returns:
            List of dictionaries with bank information
        """
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        try:
            cursor.execute("SELECT id, bank_name FROM banks ORDER BY bank_name")
            rows = cursor.fetchall()
            return self.cursor_to_dict(cursor, rows)

        except Exception as e:
            self.logger.error(f"Error retrieving banks: {str(e)}")
            return []

        finally:
            conn.close()

    # Report Methods
    def add_report(
        self,
        bank_id: int,
        report_name: str,
        year: int,
        quarter: str,
        report_date: Optional[str] = None,
    ) -> Optional[int]:
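        """
        Add a report, or update the existing report for the same bank, year and quarter

        Args:
            bank_id: ID of the bank
            report_name: Name of the report
            year: Year of the report
            quarter: Quarter of the report (Q1, Q2, Q3, Q4)
            report_date: Date of the report in ISO format (YYYY-MM-DD)

        Returns:
            ID of the new or existing report
        """
        conn = self.get_db_connection()
        cursor = conn.cursor()

        try:
            # Upsert on the UNIQUE (bank_id, year, quarter) key. Unlike INSERT OR
            # REPLACE, this updates the existing row in place, so its id (and the
            # metrics and conversations that reference it) is preserved.
            cursor.execute(
                """
                INSERT INTO reports
                (bank_id, report_name, year, quarter, report_date)
                VALUES (?, ?, ?, ?, ?)
                ON CONFLICT (bank_id, year, quarter) DO UPDATE SET
                    report_name = excluded.report_name,
                    report_date = excluded.report_date,
                    processed_at = CURRENT_TIMESTAMP
                """,
                (bank_id, report_name, year, quarter, report_date),
            )

            # lastrowid is not reliable when the UPDATE branch runs, so look it up
            cursor.execute(
                "SELECT id FROM reports WHERE bank_id = ? AND year = ? AND quarter = ?",
                (bank_id, year, quarter),
            )
            report_id = cursor.fetchone()[0]
            conn.commit()
            return report_id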

        except Exception as e:
            conn.rollback()
            self.logger.error(f"Error adding report: {str(e)}")
            raise

        finally:
            conn.close()

    def delete_report_data(self, report_id: int):
        """
        Delete all data associated with a report

        Args:
            report_id: ID of the report
        """
        conn = self.get_db_connection()
        cursor = conn.cursor()

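        try:
            # SQLite enforces foreign keys (and therefore ON DELETE CASCADE) only
            # when they are enabled on the connection. Without this, only the report
            # row would be deleted and its metrics and conversations orphaned.
            # Referencing rows in tables declared without CASCADE (e.g.
            # search_results) make the delete fail with a constraint error instead.
            cursor.execute("PRAGMA foreign_keys = ON;")
            cursor.execute(
                "DELETE FROM reports WHERE id = ?",
                (report_id,),
            )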

            conn.commit()
            self.logger.info(f"Deleted all data for report ID {report_id}")

        except Exception as e:
            conn.rollback()
            self.logger.error(f"Error deleting report data: {str(e)}")
            raise

        finally:
            conn.close()

    def get_reports_by_bank(self, bank_id: int) -> List[Dict]:
        """
        Get all reports for a specific bank

        Args:
            bank_id: ID of the bank

        Returns:
            List of dictionaries with report information
        """
        conn = self.get_db_connection()
        cursor = conn.cursor()

        try:
            cursor.execute(
                """
                SELECT id, report_name, year, quarter, report_date
                FROM reports
                WHERE bank_id = ?
                ORDER BY year DESC, quarter DESC
                """,
                (bank_id,),
            )

            rows = cursor.fetchall()
            return self.cursor_to_dict(cursor, rows)

        except Exception as e:
            self.logger.error(f"Error retrieving reports: {str(e)}")
            return []

        finally:
            conn.close()

    def get_report_id(self, bank_name: str, year: int, quarter: str) -> Optional[int]:
        """
        Get report ID for a specific bank, year, and quarter

        Args:
            bank_name: Name of the bank
            year: Report year
            quarter: Report quarter

        Returns:
            Report ID if found, None otherwise
        """
        conn = self.get_db_connection()
        cursor = conn.cursor()
        try:
            cursor.execute(
                """
                SELECT r.id
                FROM reports r, banks b
                WHERE r.bank_id = b.id AND b.bank_name = ?  AND year = ? AND quarter = ?
                LIMIT 1
                """,
                (bank_name, year, quarter),
            )
            row = cursor.fetchone()
            if row:
                return self.cursor_to_dict(cursor, [row])[0]["id"]
            return None
        except Exception as e:
            self.logger.error(f"Error retrieving report ID: {str(e)}")
            return None
        finally:
            conn.close()

    def get_report_by_id(self, id: int) -> Optional[Dict]:
        """
        Get a report by its ID.

        Args:
            id: Report ID

        Returns:
            Dict with id, year, quarter and bank_name if found, None otherwise
        """
        conn = self.get_db_connection()
        cursor = conn.cursor()
        try:
            cursor.execute(
                """
                SELECT r.id, r.year, r.quarter, b.bank_name
                FROM reports r, banks b
                WHERE r.bank_id = b.id AND r.id = ?
                LIMIT 1
                """,
                (id,),
            )
            row = cursor.fetchone()
            if row:
                return self.cursor_to_dict(cursor, [row])[0]
            return None
        except Exception as e:
            self.logger.error(f"Error retrieving report: {str(e)}")
            return None
        finally:
            conn.close()

    def get_bank_by_report(self, report_id: int) -> Optional[str]:
        """
        Get the name of the bank that a report belongs to

        Args:
            report_id: Report ID

        Returns:
            Bank name if found, None otherwise
        """
        conn = self.get_db_connection()
        cursor = conn.cursor()
        try:
            cursor.execute(
                """
                SELECT b.bank_name
                FROM reports r, banks b
                WHERE r.bank_id = b.id AND r.id = ?
                LIMIT 1
                """,
                (report_id,),
            )
            row = cursor.fetchone()
            if row:
                return self.cursor_to_dict(cursor, [row])[0]["bank_name"]
            return None
        except Exception as e:
            self.logger.error(f"Error retrieving bank name: {str(e)}")
            return None
        finally:
            conn.close()

    # Metrics Methods
    def add_metric(
        self,
        report_id: int,
        metric_name: str,
        metric_value: float,
        formatted_value: Optional[str] = None,
        in_typical_range: Optional[bool] = None,
    ) -> int:
        """
        Add a financial metric to a report

        Args:
            report_id: ID of the report
            metric_name: Name of the metric
            metric_value: Numeric value of the metric
            formatted_value: Formatted string representation of the value
            in_typical_range: Whether the value is within typical range

        Returns:
            ID of the newly created metric
        """
        conn = self.get_db_connection()
        cursor = conn.cursor()

        try:
            cursor.execute(
                """
                INSERT OR REPLACE INTO metrics
                (report_id, metric_name, metric_value, formatted_value, in_typical_range)
                VALUES (?, ?, ?, ?, ?)
                """,
                (
                    report_id,
                    metric_name,
                    metric_value,
                    formatted_value,
                    in_typical_range,
                ),
            )

            metric_id = cursor.lastrowid
            conn.commit()
            return metric_id

        except Exception as e:
            conn.rollback()
            self.logger.error(f"Error adding metric: {str(e)}")
            raise

        finally:
            conn.close()

    def get_metrics_by_report(self, report_id: int) -> List[Dict]:
        """
        Get all metrics for a specific report

        Args:
            report_id: ID of the report

        Returns:
            List of dictionaries with metric information
        """
        if not report_id:
            return []
        conn = self.get_db_connection()
        cursor = conn.cursor()
        try:
            cursor.execute(
                """
                SELECT id, metric_name, metric_value, formatted_value, in_typical_range
                FROM metrics
                WHERE report_id = ?
                ORDER BY metric_name
                """,
                (report_id,),
            )

            rows = cursor.fetchall()
            return self.cursor_to_dict(cursor, rows)

        except Exception as e:
            self.logger.error(
                f"Error retrieving metrics for report {report_id}: {str(e)}",
            )
            return []

        finally:
            conn.close()

    # Method to get metric value from list of dicts
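    def get_metric_value(self, metrics_list: List[Dict], metric_name: str) -> float:
        """Return the numeric value of a metric from a list of metric dictionaries (0 if absent)"""
        for metric in metrics_list:
            if metric.get("metric_name") == metric_name:
                # Rows from get_metrics_by_report store the number under "metric_value";
                # fall back to a plain "value" key for dictionaries built elsewhere
                return metric.get("metric_value", metric.get("value", 0))
        return 0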

    # Method to get metric formatted_value from list of dicts
    def get_metric_formatted_value(
        self, metrics_list: List[Dict], metric_name: str
    ) -> Optional[str]:
        """Return the formatted string of a metric from a list of metric dictionaries (None if absent)"""
        for metric in metrics_list:
            if metric.get("metric_name") == metric_name:
                return metric.get("formatted_value")
        return None
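
    # Usage sketch (illustrative; `db` stands for an instance of this class, and the
    # report ID and metric name are hypothetical placeholders):
    #
    #     metrics = db.get_metrics_by_report(42)
    #     cet1 = db.get_metric_value(metrics, "CET1 Ratio")
    #     cet1_display = db.get_metric_formatted_value(metrics, "CET1 Ratio")
    #
    # Both helpers scan the list linearly. Fetch the metrics once per report and
    # reuse the list for every lookup instead of re-querying the database.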

    def generate_enhanced_metrics_summary(self) -> pd.DataFrame:
        """
        Generate an enhanced metrics summary with bank name, year, and quarter columns

        Returns:
            DataFrame with concise metrics and metadata
        """
        conn = self.get_db_connection()

        try:
            # Query to join reports, banks, and metrics
            query = """
            SELECT
                b.bank_name,
                r.year,
                r.quarter,
                r.report_name,
                m.metric_name,
                m.formatted_value
            FROM
                reports r
                JOIN banks b ON r.bank_id = b.id
                JOIN metrics m ON r.id = m.report_id
            ORDER BY
                b.bank_name,
                r.year DESC,
                r.quarter DESC,
                r.report_name
            """

            # Execute query and get results
            df = pd.read_sql_query(query, conn)

            # Pivot metrics to columns
            summary_df = df.pivot_table(
                index=["bank_name", "year", "quarter", "report_name"],
                columns="metric_name",
                values="formatted_value",
                aggfunc="first",
            ).reset_index()

            return summary_df

        except Exception as e:
            self.logger.error(f"Error generating enhanced summary: {str(e)}")
            return pd.DataFrame()

        finally:
            conn.close()
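
    # Shape sketch for generate_enhanced_metrics_summary (all values hypothetical).
    # The SQL returns one long-format row per (report, metric):
    #
    #     bank_name  year  quarter  report_name  metric_name  formatted_value
    #     Bank A     2024  Q1       report_a     CET1 Ratio   13.5%
    #     Bank A     2024  Q1       report_a     NPL Ratio    0.9%
    #
    # pivot_table(..., aggfunc="first") then turns each distinct metric_name into a
    # column, which leaves one wide row per report:
    #
    #     bank_name  year  quarter  report_name  CET1 Ratio  NPL Ratio
    #     Bank A     2024  Q1       report_a     13.5%       0.9%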

    # Conversation Methods
    def add_analyst_conversation(
        self,
        report_id: int,
        analyst_name: str,
        analyst_company: Optional[str] = None,
        topic_id: Optional[int] = None,
        topic_probability: Optional[float] = None,
    ) -> int:
        """
        Add a new analyst conversation to the database

        Args:
            report_id: ID of the report
            analyst_name: Name of the analyst
            analyst_company: Company of the analyst
            topic_id: ID of the main topic
            topic_probability: Probability of the topic

        Returns:
            ID of the newly created conversation
        """
        conn = self.get_db_connection()
        cursor = conn.cursor()

        try:
            cursor.execute(
                """
                INSERT INTO analyst_conversations
                (report_id, analyst_name, analyst_company, topic_id, topic_probability)
                VALUES (?, ?, ?, ?, ?)
                """,
                (report_id, analyst_name, analyst_company, topic_id, topic_probability),
            )

            conversation_id = cursor.lastrowid
            conn.commit()
            return conversation_id

        except Exception as e:
            conn.rollback()
            self.logger.error(f"Error adding conversation: {str(e)}")
            raise

        finally:
            conn.close()

    def add_qa_pair(
        self,
        conversation_id: int,
        question: str,
        answer: str,
        answer_speaker: str,
        answer_role: str,
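    ) -> int:
        """
        Add a question-and-answer pair to an analyst conversation

        Args:
            conversation_id: ID of the parent analyst conversation
            question: Analyst question text
            answer: Answer text
            answer_speaker: Name of the person answering
            answer_role: Role of the person answering (e.g., "executive")

        Returns:
            ID of the newly created QA pair
        """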
        conn = self.get_db_connection()
        cursor = conn.cursor()

        try:
            cursor.execute(
                """
                INSERT INTO qa_pairs
                (conversation_id, question, answer, answer_speaker, answer_role)
                VALUES (?, ?, ?, ?, ?)
                """,
                (conversation_id, question, answer, answer_speaker, answer_role),
            )

            qa_pair_id = cursor.lastrowid
            conn.commit()
            return qa_pair_id

        except Exception as e:
            conn.rollback()
            self.logger.error(f"Error adding QA pair: {str(e)}")
            raise

        finally:
            conn.close()

    def add_preprocessed_text(
        self,
        qa_pair_id: int,
        text_type: str,
        preprocessing_level: str,
        processed_text: str,
        processing_metadata: Optional[Dict] = None,
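    ) -> int:
        """
        Store a preprocessed version of a QA pair's text

        Args:
            qa_pair_id: ID of the QA pair
            text_type: Which text was processed (e.g., "question", "answer", "combined")
            preprocessing_level: Preprocessing level (e.g., "lemmatized")
            processed_text: The processed text
            processing_metadata: Optional metadata, stored as JSON

        Returns:
            ID of the stored row
        """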
        conn = self.get_db_connection()
        cursor = conn.cursor()

        try:
            metadata_json = None
            if processing_metadata:
                metadata_json = json.dumps(processing_metadata)

            cursor.execute(
                """
                INSERT OR REPLACE INTO preprocessed_text
                (qa_pair_id, text_type, preprocessing_level, processed_text, processing_metadata)
                VALUES (?, ?, ?, ?, ?)
                """,
                (
                    qa_pair_id,
                    text_type,
                    preprocessing_level,
                    processed_text,
                    metadata_json,
                ),
            )

            preprocessed_id = cursor.lastrowid
            conn.commit()
            return preprocessed_id

        except Exception as e:
            conn.rollback()
            self.logger.error(f"Error adding preprocessed text: {str(e)}")
            raise

        finally:
            conn.close()
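
    # Ingestion flow sketch (illustrative; `db`, `report_id` and all strings are
    # placeholders): one analyst conversation holds several Q&A pairs. Each pair
    # can then receive preprocessed variants keyed by text_type and
    # preprocessing_level, which get_conversation_text and get_speaker_text read back.
    #
    #     conv_id = db.add_analyst_conversation(report_id, "Analyst Name", "Broker Ltd")
    #     qa_id = db.add_qa_pair(
    #         conv_id, "How is credit quality trending?", "Broadly stable.",
    #         "Executive Name", "executive",
    #     )
    #     db.add_preprocessed_text(qa_id, "combined", "lemmatized", "credit quality trend stable")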

    def get_conversations_to_analyze(
        self,
        limit: Optional[int] = None,
        new_only: bool = True,
    ) -> List[Dict]:
        """
        Get conversations for topic analysis

        Args:
            limit: Optional limit on number of conversations
            new_only: Only return conversations without existing topics

        Returns:
            List of conversation dictionaries with metadata
        """
        conn = self.get_db_connection()
        cursor = conn.cursor()

        try:
            if new_only:
                # Get conversations without topics
                cursor.execute(
                    """
                    SELECT ac.id, ac.report_id, ac.analyst_name, ac.analyst_company
                    FROM analyst_conversations ac
                    WHERE ac.topic_id IS NULL
                    ORDER BY ac.id
                    """,
                )
            else:
                # Get all conversations
                cursor.execute(
                    """
                    SELECT ac.id, ac.report_id, ac.analyst_name, ac.analyst_company
                    FROM analyst_conversations ac
                    ORDER BY ac.id
                    """,
                )

            rows = cursor.fetchall()
            conversations = self.cursor_to_dict(cursor, rows)

            if limit and len(conversations) > limit:
                conversations = conversations[:limit]

            return conversations

        except Exception as e:
            self.logger.error(f"Error getting conversations to analyze: {str(e)}")
            return []

        finally:
            conn.close()

    def get_conversation_metadata(self, conversation_id: int) -> Optional[Dict]:
        """
        Get metadata for a conversation

        Args:
            conversation_id: ID of the conversation

        Returns:
            Dictionary with conversation metadata
        """
        conn = self.get_db_connection()
        cursor = conn.cursor()

        try:
            cursor.execute(
                """
                SELECT ac.analyst_name, ac.analyst_company,
                       r.year, r.quarter, b.bank_name
                FROM analyst_conversations ac
                JOIN reports r ON ac.report_id = r.id
                JOIN banks b ON r.bank_id = b.id
                WHERE ac.id = ?
                """,
                (conversation_id,),
            )

            row = cursor.fetchone()
            if row:
                return self.cursor_to_dict(cursor, [row])[0]
            return None

        except Exception as e:
            self.logger.error(f"Error getting conversation metadata: {str(e)}")
            return None

        finally:
            conn.close()

    def update_conversation_topic(
        self,
        conversation_id: int,
        topic_id: int,
        topic_probability: float,
    ) -> bool:
        """
        Update the topic for a conversation

        Args:
            conversation_id: ID of the conversation
            topic_id: ID of the topic
            topic_probability: Probability of the topic

        Returns:
            Success flag
        """
        conn = self.get_db_connection()
        cursor = conn.cursor()

        try:
            cursor.execute(
                """
                UPDATE analyst_conversations
                SET topic_id = ?, topic_probability = ?
                WHERE id = ?
                """,
                (topic_id, topic_probability, conversation_id),
            )

            conn.commit()
            return True

        except Exception as e:
            conn.rollback()
            self.logger.error(f"Error updating conversation topic: {str(e)}")
            return False

        finally:
            conn.close()

    def get_qa_pairs(self, conversation_id: int) -> List[Dict]:
        """
        Get all QA pairs for a conversation

        Args:
            conversation_id: ID of the conversation

        Returns:
            List of QA pair dictionaries
        """
        conn = self.get_db_connection()
        cursor = conn.cursor()

        try:
            cursor.execute(
                """
                SELECT id, question, answer, answer_speaker, answer_role
                FROM qa_pairs
                WHERE conversation_id = ?
                ORDER BY id
                """,
                (conversation_id,),
            )

            rows = cursor.fetchall()
            return self.cursor_to_dict(cursor, rows)

        except Exception as e:
            self.logger.error(f"Error getting QA pairs: {str(e)}")
            return []

        finally:
            conn.close()

    def get_unprocessed_qa_pairs(self, limit: Optional[int] = None) -> List[Dict]:
        """
        Get QA pairs that haven't been processed yet

        Finds QA pairs that don't have any corresponding entries in the
        preprocessed_text table.

        Args:
            limit: Optional limit on number of QA pairs to retrieve

        Returns:
            List of dictionaries with QA pair information
        """
        conn = self.get_db_connection()
        cursor = conn.cursor()

        try:
            # Find QA pairs without any preprocessing
            query = """
            SELECT qp.id, qp.question, qp.answer
            FROM qa_pairs qp
            WHERE NOT EXISTS (
                SELECT 1 FROM preprocessed_text pt
                WHERE pt.qa_pair_id = qp.id
            )
            """

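            params: tuple = ()
            if limit:
                # Bind LIMIT as a parameter instead of formatting it into the SQL
                query += " LIMIT ?"
                params = (int(limit),)

            cursor.execute(query, params)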
            rows = cursor.fetchall()
            return self.cursor_to_dict(cursor, rows)

        except Exception as e:
            self.logger.error(f"Error retrieving unprocessed QA pairs: {str(e)}")
            return []

        finally:
            conn.close()

    def add_credit_risk_mention(
        self,
        conversation_id: int,
        keyword_id: str,
        mention_count: int,
        mention_contexts: List[str],
        positive_score: float = 0.33,
        negative_score: float = 0.33,
        neutral_score: float = 0.34,
        compound_score: float = 0.0,
    ) -> bool:
        """
        Add a credit risk mention

        Args:
            conversation_id: ID of the conversation
            keyword_id: ID or name of the keyword
            mention_count: Number of mentions
            mention_contexts: List of context excerpts
            positive_score: Positive sentiment score
            negative_score: Negative sentiment score
            neutral_score: Neutral sentiment score
            compound_score: Compound sentiment score

        Returns:
            Success flag
        """
        conn = self.get_db_connection()
        cursor = conn.cursor()

        try:
            cursor.execute(
                """
                INSERT INTO credit_risk_mentions
                (conversation_id, keyword_id, mention_count, mention_context,
                 positive_score, negative_score, neutral_score, compound_score)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                ON CONFLICT (conversation_id, keyword_id)
                DO UPDATE SET
                    mention_count = excluded.mention_count,
                    mention_context = excluded.mention_context,
                    positive_score = excluded.positive_score,
                    negative_score = excluded.negative_score,
                    neutral_score = excluded.neutral_score,
                    compound_score = excluded.compound_score
                """,
                (
                    conversation_id,
                    keyword_id,
                    mention_count,
                    json.dumps(mention_contexts),
                    positive_score,
                    negative_score,
                    neutral_score,
                    compound_score,
                ),
            )
            conn.commit()
            return True

        except Exception as e:
            conn.rollback()
            self.logger.error(f"Error adding credit risk mention: {str(e)}")
            return False

        finally:
            conn.close()
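
    # Note on the upsert above (schema assumed, because the table definition is not
    # shown here): SQLite's INSERT ... ON CONFLICT (...) DO UPDATE syntax, available
    # from SQLite 3.24.0, requires the conflict target to match a PRIMARY KEY or
    # UNIQUE constraint/index. A matching index could look like this (the index
    # name is hypothetical):
    #
    #     CREATE UNIQUE INDEX IF NOT EXISTS idx_credit_risk_mentions_conv_kw
    #         ON credit_risk_mentions (conversation_id, keyword_id);
    #
    # add_speaker_sentiment relies on the same rule for
    # (conversation_id, speaker_name, speaker_role).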

    def add_credit_risk_keyword(
        self,
        keyword: str,
        metric_category: str,
        relevance_score: float = 1.0,
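    ) -> int:
        """
        Add a credit risk keyword if it does not already exist

        Args:
            keyword: Keyword text
            metric_category: Credit risk metric category the keyword maps to
            relevance_score: Relevance weight of the keyword

        Returns:
            ID of the new keyword, or of the existing one if it was already stored
        """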
        conn = self.get_db_connection()
        cursor = conn.cursor()
        try:
            cursor.execute(
                "INSERT OR IGNORE INTO credit_risk_keywords (keyword, metric_category, relevance_score) VALUES (?, ?, ?)",
                (keyword, metric_category, relevance_score),
            )
            conn.commit()
            return cursor.lastrowid or self.get_credit_risk_keyword_id(keyword)
        except Exception as e:
            conn.rollback()
            self.logger.error(f"Error adding credit risk keyword: {str(e)}")
            raise
        finally:
            conn.close()

    def get_credit_risk_keyword_id(self, keyword: str) -> Optional[int]:
        """Return the ID of a credit risk keyword, or None if it is not stored"""
        conn = self.get_db_connection()
        cursor = conn.cursor()
        try:
            cursor.execute(
                "SELECT id FROM credit_risk_keywords WHERE keyword = ?",
                (keyword,),
            )
            row = cursor.fetchone()
            if row:
                return self.cursor_to_dict(cursor, [row])[0]["id"]
            return None
        finally:
            conn.close()

    def get_all_conversations(self) -> List[Dict]:
        """Get all analyst conversations (ID, report ID, analyst name and company) from the database"""
        conn = self.get_db_connection()
        cursor = conn.cursor()

        try:
            cursor.execute(
                """
                SELECT id, report_id, analyst_name, analyst_company
                FROM analyst_conversations
                ORDER BY id
                """,
            )
            rows = cursor.fetchall()
            return self.cursor_to_dict(cursor, rows)
        except Exception as e:
            self.logger.error(f"Error retrieving conversations: {str(e)}")
            return []
        finally:
            conn.close()

    def get_conversation_text(
        self,
        conversation_id: int,
        level: str = "lemmatized",
    ) -> str:
        """
        Get combined preprocessed text for all QA pairs in a conversation

        Args:
            conversation_id: ID of the conversation
            level: Preprocessing level to retrieve

        Returns:
            Combined preprocessed text for the conversation
        """
        conn = self.get_db_connection()
        cursor = conn.cursor()

        try:
            cursor.execute(
                """
                SELECT pt.processed_text
                FROM preprocessed_text pt
                JOIN qa_pairs qp ON pt.qa_pair_id = qp.id
                WHERE qp.conversation_id = ?
                AND pt.text_type = 'combined'
                AND pt.preprocessing_level = ?
                ORDER BY qp.id
                """,
                (conversation_id, level),
            )
            results = cursor.fetchall()

            if results:
                # Combine all processed texts
                return " ".join([result[0] for result in results])
            self.logger.warning(
                f"No preprocessed text found for conversation {conversation_id}",
            )
            return ""

        except Exception as e:
            self.logger.error(f"Error retrieving conversation text: {str(e)}")
            return ""

        finally:
            conn.close()

    def get_speaker_text(
        self,
        conversation_id: int,
        speaker_name: str,
        speaker_role: str,
        level: str = "lemmatized",
    ) -> str:
        """
        Get all text from a specific speaker in a conversation

        Args:
            conversation_id: ID of the conversation
            speaker_name: Name of the speaker
            speaker_role: Role of the speaker (e.g., "analyst", "executive")
            level: Preprocessing level to retrieve

        Returns:
            Combined preprocessed text for the speaker
        """
        conn = self.get_db_connection()
        cursor = conn.cursor()

        try:
            cursor.execute(
                """
                SELECT pt.processed_text
                FROM preprocessed_text pt
                JOIN qa_pairs qp ON pt.qa_pair_id = qp.id
                WHERE qp.conversation_id = ?
                AND qp.answer_speaker = ?
                AND qp.answer_role = ?
                AND pt.text_type = 'answer'
                AND pt.preprocessing_level = ?
                ORDER BY qp.id
                """,
                (conversation_id, speaker_name, speaker_role, level),
            )
            results = cursor.fetchall()

            if results:
                return " ".join([result[0] for result in results])
            self.logger.warning(
                f"No text found for speaker {speaker_name} in conversation {conversation_id}",
            )
            return ""

        except Exception as e:
            self.logger.error(f"Error retrieving speaker text: {str(e)}")
            return ""

        finally:
            conn.close()

    def get_analyst_questions(
        self,
        conversation_id: int,
        analyst_name: str,
        level: str = "lemmatized",
    ) -> str:
        """
        Get all questions from a specific analyst in a conversation

        Args:
            conversation_id: ID of the conversation
            analyst_name: Name of the analyst
            level: Preprocessing level to retrieve

        Returns:
            Combined preprocessed questions from the analyst
        """
        conn = self.get_db_connection()
        cursor = conn.cursor()

        try:
            # First get all qa_pairs from this analyst
            cursor.execute(
                """
                SELECT qp.id
                FROM qa_pairs qp
                JOIN analyst_conversations ac ON qp.conversation_id = ac.id
                WHERE qp.conversation_id = ?
                AND ac.analyst_name = ?
                """,
                (conversation_id, analyst_name),
            )
            qa_pair_ids = [row[0] for row in cursor.fetchall()]

            if not qa_pair_ids:
                self.logger.warning(
                    f"No questions found for analyst {analyst_name} in conversation {conversation_id}",
                )
                return ""

            # Then get the preprocessed text for these qa_pairs
            question_texts = []
            for qa_pair_id in qa_pair_ids:
                cursor.execute(
                    """
                    SELECT processed_text
                    FROM preprocessed_text
                    WHERE qa_pair_id = ?
                    AND text_type = 'question'
                    AND preprocessing_level = ?
                    """,
                    (qa_pair_id, level),
                )
                result = cursor.fetchone()
                if result:
                    question_texts.append(result[0])

            return " ".join(question_texts)

        except Exception as e:
            self.logger.error(f"Error retrieving analyst questions: {str(e)}")
            return ""

        finally:
            conn.close()

    def get_all_conversation_speakers(
        self,
        conversation_id: int,
    ) -> List[Dict[str, str]]:
        """
        Get all unique speakers in a conversation

        Args:
            conversation_id: ID of the conversation

        Returns:
            List of dictionaries with speaker names and roles
        """
        conn = self.get_db_connection()
        cursor = conn.cursor()

        try:
            cursor.execute(
                """
                SELECT DISTINCT answer_speaker, answer_role
                FROM qa_pairs
                WHERE conversation_id = ?
                """,
                (conversation_id,),
            )

            rows = cursor.fetchall()
            results = self.cursor_to_dict(cursor, rows)

            # Convert to the expected format
            speakers = [
                {"name": row["answer_speaker"], "role": row["answer_role"]}
                for row in results
            ]

            return speakers

        except Exception as e:
            self.logger.error(f"Error retrieving conversation speakers: {str(e)}")
            return []

        finally:
            conn.close()

    def get_bank_conversations(
        self,
        bank_name: str,
        year: int,
        quarter: str,
    ) -> List[int]:
        """Get conversation IDs for a bank's quarterly report"""
        conn = self.get_db_connection()
        cursor = conn.cursor()

        try:
            cursor.execute(
                """
                SELECT ac.id
                FROM analyst_conversations ac
                JOIN reports r ON ac.report_id = r.id
                JOIN banks b ON r.bank_id = b.id
                WHERE b.bank_name = ? AND r.year = ? AND r.quarter = ?
                """,
                (bank_name, year, quarter),
            )
            return [row[0] for row in cursor.fetchall()]

        except Exception as e:
            self.logger.error(f"Error getting bank conversations: {str(e)}")
            return []

        finally:
            conn.close()
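
    # Usage sketch (illustrative; the bank name, year and quarter string format are
    # placeholders): collect the conversations behind one bank's quarterly call,
    # then fetch all of their credit risk mentions in a single query.
    #
    #     conv_ids = db.get_bank_conversations("Bank A", 2024, "Q1")
    #     mentions = db.get_credit_risk_mentions(conv_ids)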

    # Sentiment Analysis Methods
    def add_conversation_sentiment(
        self,
        conversation_id: int,
        positive_score: float,
        negative_score: float,
        neutral_score: float,
        compound_score: float,
    ) -> int:
        """
        Add sentiment analysis for a conversation

        Args:
            conversation_id: ID of the conversation
            positive_score: Positive sentiment score
            negative_score: Negative sentiment score
            neutral_score: Neutral sentiment score
            compound_score: Compound sentiment score

        Returns:
            ID of the newly created sentiment entry
        """
        conn = self.get_db_connection()
        cursor = conn.cursor()

        try:
            cursor.execute(
                """
                INSERT OR REPLACE INTO conversation_sentiment
                (conversation_id, positive_score, negative_score, neutral_score, compound_score)
                VALUES (?, ?, ?, ?, ?)
                """,
                (
                    conversation_id,
                    positive_score,
                    negative_score,
                    neutral_score,
                    compound_score,
                ),
            )

            sentiment_id = cursor.lastrowid
            conn.commit()
            return sentiment_id

        except Exception as e:
            conn.rollback()
            self.logger.error(f"Error adding conversation sentiment: {str(e)}")
            raise

        finally:
            conn.close()

    def get_conversation_sentiment(self, conversation_id: int) -> Optional[Dict]:
        """
        Get sentiment analysis for a specific conversation

        Args:
            conversation_id: ID of the conversation

        Returns:
            Dictionary with sentiment information or None if not found
        """
        conn = self.get_db_connection()
        cursor = conn.cursor()

        try:
            cursor.execute(
                """
                SELECT positive_score, negative_score, neutral_score, compound_score
                FROM conversation_sentiment
                WHERE conversation_id = ?
                """,
                (conversation_id,),
            )

            result = cursor.fetchone()
            if result:
                return self.cursor_to_dict(cursor, [result])[0]
            return None

        except Exception as e:
            self.logger.error(f"Error retrieving conversation sentiment: {str(e)}")
            return None

        finally:
            conn.close()

    def add_speaker_sentiment(
        self,
        conversation_id: int,
        speaker_name: str,
        speaker_role: str,
        positive_score: float,
        negative_score: float,
        neutral_score: float,
        compound_score: float,
    ) -> int:
        """
        Add sentiment analysis for a speaker

        Args:
            conversation_id: ID of the conversation
            speaker_name: Name of the speaker
            speaker_role: Role of the speaker
            positive_score: Positive sentiment score
            negative_score: Negative sentiment score
            neutral_score: Neutral sentiment score
            compound_score: Compound sentiment score

        Returns:
            ID of the newly created sentiment entry
        """
        conn = self.get_db_connection()
        cursor = conn.cursor()

        try:
            cursor.execute(
                """
                INSERT INTO speaker_topic_sentiment
                (conversation_id, speaker_name, speaker_role, positive_score, negative_score, neutral_score, compound_score)
                VALUES (?, ?, ?, ?, ?, ?, ?)
                ON CONFLICT (conversation_id, speaker_name, speaker_role)
                DO UPDATE SET
                    positive_score = excluded.positive_score,
                    negative_score = excluded.negative_score,
                    neutral_score = excluded.neutral_score,
                    compound_score = excluded.compound_score
                """,
                (
                    conversation_id,
                    speaker_name,
                    speaker_role,
                    positive_score,
                    negative_score,
                    neutral_score,
                    compound_score,
                ),
            )

            sentiment_id = cursor.lastrowid
            conn.commit()
            return sentiment_id

        except Exception as e:
            conn.rollback()
            self.logger.error(f"Error adding speaker sentiment: {str(e)}")
            raise

        finally:
            conn.close()

    def get_credit_risk_mentions(self, conversation_ids: List[int]) -> List[Dict]:
        """
        Get credit risk mentions for multiple conversations

        Args:
            conversation_ids: List of conversation IDs

        Returns:
            List of dictionaries with mention information
        """
        if not conversation_ids:
            return []
        conn = self.get_db_connection()
        cursor = conn.cursor()

        try:
            # Build one "?" placeholder per conversation ID, e.g. "?,?,?" for three IDs
            placeholders = ",".join("?" * len(conversation_ids))

            query = f"""
                SELECT id, keyword_id, mention_count, mention_context, positive_score, negative_score, neutral_score, compound_score
                FROM credit_risk_mentions
                WHERE conversation_id IN ({placeholders})
            """

            cursor.execute(query, conversation_ids)

            rows = cursor.fetchall()
            return self.cursor_to_dict(cursor, rows)

        except Exception as e:
            self.logger.error(f"Error retrieving credit risk mentions: {str(e)}")
            return []

        finally:
            conn.close()
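
    # Placeholder expansion in get_credit_risk_mentions, shown for three IDs:
    #
    #     ",".join("?" * 3)  # -> "?,?,?"
    #     # ... WHERE conversation_id IN (?,?,?)  with parameters [12, 15, 18]
    #
    # The IDs are still bound as parameters rather than formatted into the SQL
    # string, so the query stays safe from injection.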

    def update_credit_risk_mention_sentiment(
        self,
        mention_id: int,
        positive_score: float,
        negative_score: float,
        neutral_score: float,
        compound_score: float,
    ) -> bool:
        """
        Update sentiment scores for a credit risk mention

        Args:
            mention_id: ID of the mention
            positive_score: Positive sentiment score
            negative_score: Negative sentiment score
            neutral_score: Neutral sentiment score
            compound_score: Compound sentiment score

        Returns:
            Success flag
        """
        conn = self.get_db_connection()
        cursor = conn.cursor()

        try:
            cursor.execute(
                """
                UPDATE credit_risk_mentions
                SET positive_score = ?, negative_score = ?, neutral_score = ?, compound_score = ?
                WHERE id = ?
                """,
                (
                    positive_score,
                    negative_score,
                    neutral_score,
                    compound_score,
                    mention_id,
                ),
            )

            conn.commit()
            return True

        except Exception as e:
            conn.rollback()
            self.logger.error(f"Error updating credit risk mention sentiment: {str(e)}")
            return False

        finally:
            conn.close()

    def get_conversations_for_sentiment_analysis(
        self,
        limit: Optional[int] = None,
    ) -> List[Dict]:
        """
        Get conversations that need sentiment analysis

        Args:
            limit: Optional limit on number of conversations

        Returns:
            List of conversation dictionaries with metadata
        """
        conn = self.get_db_connection()
        cursor = conn.cursor()

        try:
            # Anti-join: keep only conversations with no row in conversation_sentiment yet
            query = """
            SELECT DISTINCT ac.id as conversation_id,
                ac.analyst_name, ac.analyst_company,
                r.year, r.quarter, b.bank_name
            FROM analyst_conversations ac
            JOIN reports r ON ac.report_id = r.id
            JOIN banks b ON r.bank_id = b.id
            LEFT JOIN conversation_sentiment cs ON ac.id = cs.conversation_id
            WHERE cs.conversation_id IS NULL
            """
            params: tuple = ()

            if limit is not None:
                # Bind LIMIT as a parameter instead of formatting it into the SQL string
                query += " LIMIT ?"
                params = (limit,)

            cursor.execute(query, params)
            rows = cursor.fetchall()
            self.logger.info(f"Query returned {len(rows)} rows")
            results = self.cursor_to_dict(cursor, rows)
            self.logger.info(f"Converted to {len(results)} dictionaries")
            return results

        except Exception as e:
            self.logger.error(
                f"Error getting conversations for sentiment analysis: {str(e)}",
            )
            return []

        finally:
            conn.close()

    def get_qa_answer(self, qa_pair_id: int) -> Optional[str]:
        """
        Get answer text for a QA pair

        Args:
            qa_pair_id: ID of the QA pair

        Returns:
            Answer text or None if not found
        """
        conn = self.get_db_connection()
        cursor = conn.cursor()

        try:
            cursor.execute("SELECT answer FROM qa_pairs WHERE id = ?", (qa_pair_id,))
            row = cursor.fetchone()
            if row:
                return self.cursor_to_dict(cursor, [row])[0]["answer"]
            return None

        except Exception as e:
            self.logger.error(f"Error getting QA answer: {str(e)}")
            return None

        finally:
            conn.close()

    def get_vector_embeddings(
        self,
        embedding_model: str,
        preprocessing_level: str,
    ) -> List[Dict]:
        """
        Get all vector embeddings for a specific model and preprocessing level

        Args:
            embedding_model: Name of the embedding model
            preprocessing_level: Preprocessing level

        Returns:
            List of dictionaries with embedding information
        """
        conn = self.get_db_connection()
        cursor = conn.cursor()

        try:
            cursor.execute(
                """
                SELECT
                    ve.id, ve.conversation_id, ve.vector_id, ve.vector_data,
                    ve.chunk_text, ve.metadata
                FROM vector_embeddings ve
                WHERE ve.embedding_model = ? AND ve.preprocessing_level = ?
                """,
                (embedding_model, preprocessing_level),
            )

            rows = cursor.fetchall()
            results = self.cursor_to_dict(cursor, rows)

            # Parse JSON metadata
            for data in results:
                if data["metadata"]:
                    data["metadata"] = json.loads(data["metadata"])

            return results

        except Exception as e:
            self.logger.error(f"Error retrieving vector embeddings: {str(e)}")
            return []

        finally:
            conn.close()

    # Topic Methods
    def add_or_update_topic(
        self,
        topic_id: int,
        name: str,
        keywords: List[str],
        category: str,
    ) -> bool:
        """
        Add or update a topic

        Args:
            topic_id: ID of the topic
            name: Name of the topic
            keywords: List of keywords for the topic
            category: Category of the topic

        Returns:
            Success flag
        """
        conn = self.get_db_connection()
        cursor = conn.cursor()

        try:
            # Check if topic exists
            cursor.execute("SELECT id FROM topics WHERE id = ?", (topic_id,))
            if cursor.fetchone():
                # Update existing topic
                cursor.execute(
                    """
                    UPDATE topics
                    SET name = ?, keywords = ?, category = ?
                    WHERE id = ?
                    """,
                    (name, json.dumps(keywords), category, topic_id),
                )
            else:
                # Create new topic
                cursor.execute(
                    """
                    INSERT INTO topics (id, name, keywords, category)
                    VALUES (?, ?, ?, ?)
                    """,
                    (topic_id, name, json.dumps(keywords), category),
                )

            conn.commit()
            return True

        except Exception as e:
            conn.rollback()
            self.logger.error(f"Error adding or updating topic: {str(e)}")
            return False

        finally:
            conn.close()

    def get_topics_by_category(self, category: str) -> List[Dict]:
        """
        Get all topics for a specific category

        Args:
            category: Category of topics to retrieve

        Returns:
            List of dictionaries with topic information
        """
        conn = self.get_db_connection()
        cursor = conn.cursor()

        try:
            cursor.execute(
                """
                SELECT id, name, keywords, category
                FROM topics
                WHERE category = ?
                ORDER BY name
                """,
                (category,),
            )

            rows = cursor.fetchall()
            result = self.cursor_to_dict(cursor, rows)

            # Parse JSON keywords
            for topic in result:
                topic["keywords"] = json.loads(topic["keywords"])

            return result

        except Exception as e:
            self.logger.error(f"Error retrieving topics: {str(e)}")
            return []

        finally:
            conn.close()

    def upload_to_gdrive(self, file_path: str, folder_id=None):
        """
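        Upload a SQLite database file to Google Drive under a date-versioned name

        Args:
            file_path: Path to the local file
            folder_id: Optional Google Drive folder ID to upload to

        Returns:
            file_id: ID of the uploaded file, or None if the upload failed
        """
        try:
            # Relies on an authenticated PyDrive `drive` object created elsewhere in the notebook.
            # Append today's date (YYYYMMDD) so each upload gets a versioned name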
            base_path = os.path.splitext(os.path.basename(file_path))[0]
            timestamp = datetime.now().strftime("%Y%m%d")
            versioned_filename = f"{base_path}_processed_{timestamp}.db"

            # File metadata
            file_metadata = {
                "title": versioned_filename,
                "mimeType": "application/x-sqlite3",  # for SQLite DB files
            }

            # If folder_id is specified, add it to metadata
            if folder_id:
                file_metadata["parents"] = [{"id": folder_id}]

            # Create file instance
            file = drive.CreateFile(file_metadata)

            # Set content (using the original file path)
            file.SetContentFile(file_path)

            # Upload
            file.Upload()

            print(
                f"Successfully uploaded {file_path} to Google Drive as {versioned_filename}",
            )
            print(f"File ID: {file['id']}")

            return file["id"]

        except Exception as e:
            print(f"Error uploading file: {str(e)}")
            return None


# Initialize database
db = GSIBDatabase(DB_PATH)

2025-03-28 17:06:22,968 - GSIBDatabase - INFO - WAL mode enabled for database

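Two query patterns in `GSIBDatabase` are worth seeing in isolation. `get_credit_risk_mentions` builds one `?` placeholder per ID so that a variable-length list can be bound inside `IN (...)` without string formatting, and selecting conversations that still need sentiment scores is an anti-join: a `LEFT JOIN` followed by `WHERE <right-hand key> IS NULL`. The sketch below runs both against an in-memory SQLite database. The helpers `fetch_mentions` and `conversations_without_sentiment` and the trimmed-down tables are hypothetical illustrations, not the project's actual schema.

```python
import sqlite3
from typing import List, Optional


def fetch_mentions(conn: sqlite3.Connection, conversation_ids: List[int]) -> List[tuple]:
    """Return (id, conversation_id) rows for any of the given conversations."""
    # An empty "IN ()" list is legal in SQLite but not in most other engines
    if not conversation_ids:
        return []
    # One "?" per ID, e.g. "?,?,?" for three IDs; values are bound, never formatted in
    placeholders = ",".join("?" * len(conversation_ids))
    query = (
        "SELECT id, conversation_id FROM credit_risk_mentions "
        f"WHERE conversation_id IN ({placeholders}) ORDER BY id"
    )
    return conn.execute(query, conversation_ids).fetchall()


def conversations_without_sentiment(
    conn: sqlite3.Connection, limit: Optional[int] = None
) -> List[int]:
    """Return IDs of conversations that have no conversation_sentiment row yet."""
    # Anti-join: unmatched LEFT JOIN rows have NULL in every right-hand column
    query = (
        "SELECT ac.id FROM analyst_conversations ac "
        "LEFT JOIN conversation_sentiment cs ON ac.id = cs.conversation_id "
        "WHERE cs.conversation_id IS NULL ORDER BY ac.id"
    )
    params: tuple = ()
    if limit is not None:
        # LIMIT can be bound like any other value
        query += " LIMIT ?"
        params = (limit,)
    return [row[0] for row in conn.execute(query, params).fetchall()]


conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE credit_risk_mentions (id INTEGER PRIMARY KEY, conversation_id INTEGER);
    CREATE TABLE analyst_conversations (id INTEGER PRIMARY KEY);
    CREATE TABLE conversation_sentiment (conversation_id INTEGER);
    INSERT INTO credit_risk_mentions VALUES (1, 10), (2, 11), (3, 12);
    INSERT INTO analyst_conversations VALUES (10), (11), (12);
    INSERT INTO conversation_sentiment VALUES (11);
    """
)
print(fetch_mentions(conn, [10, 12]))         # [(1, 10), (3, 12)]
print(conversations_without_sentiment(conn))  # [10, 12]
```

Binding `LIMIT` as a parameter keeps every caller-supplied value out of the SQL text, which is the same reason the `IN` list uses placeholders.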

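`add_or_update_topic` first checks whether the row exists and then issues either an `UPDATE` or an `INSERT`. SQLite 3.24 and later can express the same thing as a single atomic UPSERT statement. A minimal sketch, assuming the `topics` table uses `id` as its primary key (the schema is not shown in this section); `upsert_topic` is a hypothetical helper, not part of the project code:

```python
import json
import sqlite3
from typing import List


def upsert_topic(
    conn: sqlite3.Connection, topic_id: int, name: str, keywords: List[str], category: str
) -> None:
    """Insert a topic, or overwrite name/keywords/category if the id already exists."""
    # ON CONFLICT ... DO UPDATE needs SQLite 3.24+;
    # "excluded" refers to the row values that failed to insert
    conn.execute(
        """
        INSERT INTO topics (id, name, keywords, category)
        VALUES (?, ?, ?, ?)
        ON CONFLICT(id) DO UPDATE SET
            name = excluded.name,
            keywords = excluded.keywords,
            category = excluded.category
        """,
        (topic_id, name, json.dumps(keywords), category),
    )
    conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE topics (id INTEGER PRIMARY KEY, name TEXT, keywords TEXT, category TEXT)"
)
upsert_topic(conn, 1, "Loan losses", ["provision", "charge-off"], "credit")
upsert_topic(conn, 1, "Loan loss provisions", ["provision", "PCL"], "credit")

row = conn.execute("SELECT name, keywords FROM topics WHERE id = 1").fetchone()
print(row[0], json.loads(row[1]))  # Loan loss provisions ['provision', 'PCL']
```

One statement covers both the create and update paths, so there is no separate read that another writer could race against.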
## 🎙️ Earnings Call Transcript Processor
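The processor below feeds Claude overlapping character windows produced by `create_chunks`. Each window runs up to `chunk_size + overlap` characters, and the next one starts `chunk_size` characters after the previous start, so neighbouring chunks share `overlap` characters of context. A standalone copy of that logic, run on a 26-letter string with deliberately small parameters, makes the boundaries easy to check by eye:

```python
from typing import List


def create_chunks(text: str, chunk_size: int = 3000, overlap: int = 250) -> List[str]:
    """Standalone copy of ClaudeTranscriptProcessor.create_chunks."""
    chunks = []
    start = 0
    text_length = len(text)

    while start < text_length:
        # Base window of chunk_size characters
        end = min(start + chunk_size, text_length)
        # Extend by `overlap` characters unless the text is already exhausted
        if end < text_length:
            end = min(end + overlap, text_length)

        chunk = text[start:end].strip()
        if chunk:
            chunks.append(chunk)

        # Step back by `overlap`, i.e. the next chunk starts chunk_size after this one
        start = end - overlap if end < text_length else text_length

    return chunks


text = "abcdefghijklmnopqrstuvwxyz"  # 26 characters
for chunk in create_chunks(text, chunk_size=10, overlap=3):
    print(chunk)
# abcdefghijklm
# klmnopqrstuvw
# uvwxyz
```

Because the windows are counted in characters rather than tokens or speaker turns, a long answer can still be split across chunks; the Q&A prompt handles this by telling the model to skip partial exchanges.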

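Every method in `ClaudeTranscriptProcessor` hands the model's reply straight to `json.loads`, which is why the prompts repeat "no backticks, no prose" so insistently. A tolerant parser makes the pipeline less dependent on that instruction being followed. The sketch below is an assumption, not part of the original code: the hypothetical `parse_model_json` strips a Markdown code fence if present, tries a strict parse, and otherwise decodes starting at the first `{` with `json.JSONDecoder.raw_decode`, ignoring any trailing text.

```python
import json
import re
from typing import Any, Optional

# A leading ``` or ```json fence, or a trailing ``` fence
_FENCE_RE = re.compile(r"^```(?:json)?\s*|\s*```$")


def parse_model_json(text: str) -> Optional[Any]:
    """Best-effort parse of a model reply that should contain one JSON object."""
    cleaned = _FENCE_RE.sub("", text.strip())
    try:
        return json.loads(cleaned)
    except json.JSONDecodeError:
        pass

    # Fallback: decode from the first "{" and ignore anything after the object
    start = cleaned.find("{")
    if start == -1:
        return None
    try:
        obj, _end = json.JSONDecoder().raw_decode(cleaned[start:])
        return obj
    except json.JSONDecodeError:
        return None


print(parse_model_json('```json\n{"qa_pairs": []}\n```'))         # {'qa_pairs': []}
print(parse_model_json('Here is the data:\n{"year": "2024"}\nDone'))  # {'year': '2024'}
```

Inside the processor, a call such as `json.loads(content)` could be replaced by `parse_model_json(content)`, with an `is None` check standing in for the `except json.JSONDecodeError` branch.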
In [None]:
class ClaudeTranscriptProcessor:
    """
    Transcript processor using Claude API for enhanced text analysis.
    Integrates with GSIBDatabase for persistent storage.
    Supports both direct message and batch processing.
    """

    def __init__(self, db, api_key=None, model="claude-3-sonnet-20240229"):
        """Initialize the Claude Transcript Processor"""
        self.logger = logging.getLogger("ClaudeTranscriptProcessor")
        self.logger.setLevel(logging.INFO)
        self.db = db

        # Initialize Claude client
        self.api_key = api_key or os.environ.get("ANTHROPIC_API_KEY")
        if not self.api_key:
            raise ValueError(
                "Claude API key must be provided or set as ANTHROPIC_API_KEY environment variable"
            )
        self.claude = self._get_anthropic_client()

        # Set model
        self.model = model

        # Constants
        self.unidentified_analyst_name = "Unidentified Analyst"
        self.unknown_firm = "Unknown firm"
        self.host_operator_text = "Operator"

    def _get_anthropic_client(self) -> anthropic.Anthropic:
        """Get Anthropic client with API key"""
        return anthropic.Anthropic(api_key=self.api_key)

    def _read_pdf(self, file_path: str) -> List[str]:
        """Read and extract text from PDF file."""
        self.logger.info(f"Reading PDF from {file_path}")

        # Footer lines to drop: "Page X of Y" markers and SeekingAlpha article URLs
        footer_pattern = re.compile(
            r"(?:Page \d+ of \d+$|https://seekingalpha\.com/article/\d+[-\w]*)"
        )

        try:
            text_lines = []

            # Open the PDF; the context manager closes it once all pages are read
            with fitz.open(file_path) as doc:
                for page in doc:
                    # Extract plain text. flags=4 (fitz.TEXT_PRESERVE_IMAGES) replaces
                    # PyMuPDF's default text flags; it does not preserve layout.
                    text = page.get_text("text", flags=4)

                    # Split into lines, strip whitespace, and drop empty lines
                    page_lines = [t.strip() for t in text.split("\n") if t.strip()]

                    # Skip footer lines
                    page_lines = [
                        line for line in page_lines if not footer_pattern.search(line)
                    ]

                    text_lines.extend(page_lines)

            self.logger.info(
                f"Extracted {len(text_lines)} clean lines from {file_path}"
            )
            return text_lines

        except Exception as e:
            self.logger.error(f"Error processing {file_path}: {str(e)}")
            raise

    def create_chunks(
        self, text: str, chunk_size: int = 3000, overlap: int = 250
    ) -> List[str]:
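        """Split text into overlapping character-based chunks.

        Each chunk is up to chunk_size + overlap characters long and consecutive
        chunks share `overlap` characters. Boundaries are not aligned to words.
        """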
        chunks = []
        start = 0
        text_length = len(text)

        while start < text_length:
            # Calculate end position for current chunk
            end = min(start + chunk_size, text_length)

            # Include overlap unless at the end
            if end < text_length:
                end = min(end + overlap, text_length)

            # Extract chunk
            chunk = text[start:end].strip()
            if chunk:  # Only add non-empty chunks
                chunks.append(chunk)

            # Move start position, accounting for overlap
            start = end - overlap if end < text_length else text_length

        return chunks

    def _get_metadata_extraction_prompt(self) -> List[Dict]:
        """System prompt for metadata extraction with strict JSON formatting"""
        return [
            {
                "type": "text",
                "text": """You are a JSON extraction tool. Your only job is to extract metadata as JSON.

    Extract all participant information from this transcript chunk in these categories:
    1. Bank name
    2. Quarter (Q1, Q2, Q3, Q4) and Year
    3. Bank executive participants with their roles
    4. Analyst participants with their firms

    CRUCIAL INSTRUCTIONS:
    - RESPOND WITH NOTHING BUT VALID JSON. No explanations, no descriptions, no "Based on the transcript" phrases.
    - If information is missing, use empty strings or empty arrays.
    - Never include comments or backticks in your response.

    FORMAT:
    {
        "bank_name": "string",
        "quarter": "string",
        "year": "string",
        "executives": [
            {"name": "string", "role": "string"}
        ],
        "analysts": [
            {"name": "string", "firm": "string"}
        ]
    }""",
            },
        ]

    def _get_metadata_processing_prompt(self) -> List[Dict]:
        """System prompt for consolidating metadata with strict JSON formatting"""
        return [
            {
                "type": "text",
                "text": """You are a JSON processing tool. Your only purpose is to consolidate JSON metadata.

    Process this list of transcript participants using these rules:

    1. Name Consolidation:
    - For each person, find and use their most complete name from any mention
    - Example: If you see "Pam" and later "Pam Kaur", use "Pam Kaur"
    - Match variations like "Ben" and "Benjamin"

    2. Role and Firm:
    - Use the first role found for a person
    - Keep the firm if it appears with role
    - For analysts, keep their research firm

    3. Categorization:
    - Executives: Anyone with a role at the main bank
    - Analysts: People from external firms

    4. Bank and Quarter:
    - Determine the most likely bank name, quarter, and year from the data
    - For bank names, standardize to these formats when applicable:
        - "JPMorgan Chase", "Bank of America", "Citigroup", "Goldman Sachs",
        - "Morgan Stanley", "HSBC", "Barclays", "UBS", "Santander"

    CRUCIAL INSTRUCTIONS:
    - RESPOND WITH NOTHING BUT VALID JSON. No explanations, no descriptions, no "Here is the consolidated data" phrases.
    - Never include comments, backticks, or markdown in your response.
    - The first character of your response must be "{" and the last must be "}".

    FORMAT:
    {
        "bank_name": "Main bank name",
        "quarter": "Q1/Q2/Q3/Q4",
        "year": "YYYY",
        "executives": [
            {"name": "Most complete name found", "role": "Role found"}
        ],
        "analysts": [
            {"name": "Most complete name found", "firm": "Research firm"}
        ]
    }""",
            },
        ]

    def _get_qa_analysis_prompt(self, metadata: str) -> List[Dict]:
        """System prompt for Q&A pair extraction with improved JSON formatting constraints"""
        return [
            {
                "type": "text",
                "text": f"""You are a JSON extraction API that outputs only valid JSON. You must analyze financial transcript chunks and extract structured Q&A pairs.

    TRANSCRIPT METADATA:
    {metadata}

    USE EXACTLY THESE STANDARDIZED BANK NAMES:
    - "JPMorgan Chase" (not JPMorgan Chase & Co, JP Morgan, or similar variants)
    - "Bank of America" (not Bank of America Corporation or BofA)
    - "Citigroup" (not Citibank or Citi)
    - "Goldman Sachs" (not Goldman Sachs Group)
    - "Morgan Stanley" (not Morgan Stanley & Co)
    - "HSBC" (not HSBC Holdings or HSBC Bank)
    - "Barclays" (not Barclays PLC)
    - "UBS" (not UBS Group or UBS AG)
    - "Santander" (not Banco Santander)""",
            },
            {
                "type": "text",
                "text": """EXTRACTION INSTRUCTIONS:
    1. If the chunk contains financial transcript Q&A pairs, extract them as structured JSON
    2. If the chunk contains ONLY prepared remarks, introduction, or no Q&A content, return valid JSON with empty qa_pairs array
    3. NEVER include the transcript title, date, or other metadata as text in your output
    4. NEVER refer to the "transcript chunk" in your response
    5. DO NOT describe what you're doing - ONLY OUTPUT VALID JSON

    EXTRACTION RULES:
    - Extract only COMPLETE Q&A exchanges between analysts and executives
    - Identify the analyst name and their company
    - Preserve the exact verbatim question text
    - Include each executive's name, role, and verbatim answer
    - Skip operator introductions and transitions
    - Only extract text actually present in the transcript

    HANDLING PARTIAL SEGMENTS:
    - If a question is cut off mid-sentence, DO NOT include it
    - If an answer is cut off mid-sentence, DO NOT include it
    - If you find incomplete exchanges, ignore them completely
    - NEVER return raw transcript text in the output
    - If chunk begins mid-answer or mid-question, skip that partial exchange
    - DO NOT try to complete partial text with your own words
    - If no complete Q&A pairs can be found, return JSON with empty qa_pairs array

    CRITICAL FORMAT RULES:
    - First character must be "{" and last character must be "}"
    - No text before or after the JSON object
    - No explanation, even if you find no Q&A pairs
    - No backticks, no markdown formatting
    - If the transcript chunk contains no Q&A, still output valid JSON with empty qa_pairs array""",
            },
            {
                "type": "text",
                "text": """OUTPUT FORMAT (exactly as shown):
    {
        "metadata": {
            "year": "YYYY",
            "quarter": "QX",
            "company": "STANDARDIZED BANK NAME FROM THE LIST ABOVE"
        },
        "qa_pairs": [
            {
                "qa_pair": {
                    "analyst_name": "Full analyst name",
                    "analyst_company": "Analyst firm",
                    "questions": ["Full verbatim question text including all parts"],
                    "answers": [
                        {
                            "speaker_name": "Executive name",
                            "speaker_role": "Executive role",
                            "answer_text": "Full verbatim answer text"
                        }
                    ]
                }
            }
        ]
    }

    If no complete Q&A pairs can be extracted, return:
    {
        "metadata": {
            "year": "YYYY",
            "quarter": "QX",
            "company": "STANDARDIZED BANK NAME FROM THE LIST ABOVE"
        },
        "qa_pairs": []
    }""",
            },
        ]

    async def extract_metadata_direct(
        self, transcript: str, chunk_size: int = 3000, overlap: int = 200
    ) -> Dict:
        """
        Extract metadata using direct Claude API calls

        Args:
            transcript: Full transcript text
            chunk_size: Size of each chunk to process
            overlap: Overlap between chunks

        Returns:
            Dictionary of consolidated metadata
        """
        # Create overlapping chunks for processing
        chunks = self.create_chunks(transcript, chunk_size=chunk_size, overlap=overlap)
        self.logger.info(
            f"Processing {len(chunks)} chunks for metadata extraction (direct method)"
        )

        # Process each chunk directly
        all_chunk_results = []
        for i, chunk in enumerate(chunks):
            try:
                # Create direct message request
                response = await asyncio.to_thread(
                    self.claude.messages.create,
                    model=self.model,
                    max_tokens=2048,
                    system=self._get_metadata_extraction_prompt(),
                    messages=[
                        {
                            "role": "user",
                            "content": f"Extract participant info from this chunk:\n\n{chunk}",
                        },
                    ],
                )

                chunk_result = response.content[0].text.strip()
                try:
                    # Validate that the reply is proper JSON (raises otherwise)
                    json.loads(chunk_result)
                    all_chunk_results.append(chunk_result)
                    self.logger.info(
                        f"Successfully processed metadata chunk {i + 1}/{len(chunks)}"
                    )
                except json.JSONDecodeError:
                    self.logger.warning(
                        f"Received non-JSON response from metadata chunk {i + 1}/{len(chunks)}"
                    )

            except Exception as e:
                self.logger.error(
                    f"Error processing metadata chunk {i + 1}/{len(chunks)}: {e}"
                )

        # Combine all chunk results into a single JSON
        if all_chunk_results:
            chunk_results_json = (
                '{"metadata_chunks": [' + ",".join(all_chunk_results) + "]}"
            )

            # Process and consolidate the metadata
            return await self._process_metadata_direct(chunk_results_json)
        return {}

    async def _process_metadata_direct(self, raw_metadata: str) -> Dict:
        """
        Process the collected raw metadata to get final consolidated metadata (direct method)

        Args:
            raw_metadata: JSON string of collected metadata chunks

        Returns:
            Dictionary of consolidated metadata
        """
        if not raw_metadata.strip():
            self.logger.warning("No metadata to process")
            return {}

        try:
            # Process metadata with Claude directly
            response = await asyncio.to_thread(
                self.claude.messages.create,
                model=self.model,
                max_tokens=2048,
                system=self._get_metadata_processing_prompt(),
                messages=[
                    {
                        "role": "user",
                        "content": f"Process this JSON of transcript participants:\n\n{raw_metadata}",
                    },
                ],
            )

            result = response.content[0].text
            try:
                # Parse the response as JSON
                metadata = json.loads(result)
                self.logger.info(
                    f"Processed metadata for {metadata.get('bank_name', 'Unknown Bank')}"
                )
                return metadata
            except json.JSONDecodeError:
                self.logger.error(
                    "Failed to parse metadata JSON from Claude's response"
                )
                return {}

        except Exception as e:
            self.logger.error(f"Error processing metadata: {e}", exc_info=True)
            return {}

    async def process_qa_direct(
        self,
        transcript: str,
        metadata: Dict,
        chunk_size: int = 4000,
        overlap: int = 300,
    ) -> Dict:
        """
        Process transcript to extract structured Q&A pairs using direct Claude API calls

        Args:
            transcript: Full transcript text
            metadata: Processed metadata dictionary
            chunk_size: Size of each chunk to process
            overlap: Overlap between chunks

        Returns:
            Dict with metadata and Q&A pairs
        """
        # Create JSON string from metadata for Claude's context
        metadata_json = json.dumps(metadata, indent=2)

        # Create overlapping chunks for processing
        chunks = self.create_chunks(transcript, chunk_size=chunk_size, overlap=overlap)
        self.logger.info(
            f"Processing {len(chunks)} chunks for Q&A extraction (direct method)"
        )

        # Process each chunk directly
        chunk_results = []
        for i, chunk in enumerate(chunks):
            try:
                # Create direct message request
                response = await asyncio.to_thread(
                    self.claude.messages.create,
                    model=self.model,
                    max_tokens=4096,  # Larger tokens for detailed Q&A extraction
                    system=self._get_qa_analysis_prompt(metadata_json),
                    messages=[
                        {
                            "role": "user",
                            "content": f"Extract all Q&A pairs from this transcript chunk. DO NOT RETURN THE RAW TEXT. ONLY RETURN JSON:\n\n{chunk}",
                        },
                    ],
                )

                # Parse the JSON response
                content = response.content[0].text
                try:
                    parsed_result = json.loads(content)
                    chunk_results.append(parsed_result)
                    self.logger.info(
                        f"Successfully processed Q&A chunk {i + 1}/{len(chunks)}"
                    )
                except json.JSONDecodeError as e:
                    self.logger.error(
                        f"Failed to parse JSON for Q&A chunk {i + 1}: {e}"
                    )
                    self.logger.debug(f"First 200 chars of response: {content[:200]}")

            except Exception as e:
                self.logger.error(f"Error processing Q&A chunk {i + 1}/{len(chunks)}: {e}")

        # Merge results from all chunks
        return self._merge_qa_results(chunk_results)

    async def extract_transcript_metadata(
        self,
        transcript: str,
        chunk_size: int = 3000,
        overlap: int = 200,
        use_batch: bool = False,
    ) -> Dict:
        """
        Extract metadata using either batch or direct processing with Claude API

        Args:
            transcript: Full transcript text
            chunk_size: Size of each chunk to process
            overlap: Overlap between chunks
            use_batch: Whether to use batch processing (default: False)

        Returns:
            Dictionary of consolidated metadata
        """
        if use_batch:
            # Create overlapping chunks for processing
            chunks = self.create_chunks(
                transcript, chunk_size=chunk_size, overlap=overlap
            )
            self.logger.info(f"Processing {len(chunks)} chunks for metadata extraction")

            # Create batch requests for Claude API
            requests = []
            for i, chunk in enumerate(chunks):
                request = Request(
                    custom_id=f"chunk_{i}",
                    params=MessageCreateParamsNonStreaming(
                        model=self.model,
                        max_tokens=2048,
                        system=self._get_metadata_extraction_prompt(),
                        messages=[
                            {
                                "role": "user",
                                "content": f"Extract participant info from this chunk:\n\n{chunk}",
                            },
                        ],
                    ),
                )
                requests.append(request)

            # Process batch requests
            try:
                message_batch = self.claude.messages.batches.create(requests=requests)
                batch_id = message_batch.id
                self.logger.info(
                    f"Created batch {batch_id} with {len(chunks)} requests"
                )

                # Monitor batch processing until complete
                while True:
                    batch_status = self.claude.messages.batches.retrieve(batch_id)
                    counts = batch_status.request_counts
                    self.logger.info(
                        f"Batch status: Processing={counts.processing}, "
                        f"Succeeded={counts.succeeded}, Errored={counts.errored}",
                    )

                    if batch_status.processing_status == "ended":
                        break
                    await asyncio.sleep(5)  # Non-blocking wait between status checks

                # Collect results from all chunks
                all_chunk_results = []
                for result in self.claude.messages.batches.results(batch_id):
                    if result.result.type == "succeeded":
                        # Parse response JSON
                        chunk_result = result.result.message.content[0].text.strip()
                        try:
                            # Validate that the reply is proper JSON (raises otherwise)
                            json.loads(chunk_result)
                            all_chunk_results.append(chunk_result)
                        except json.JSONDecodeError:
                            self.logger.warning(
                                f"Received non-JSON response from chunk {result.custom_id}"
                            )
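                    else:
                        # Non-success results are "errored", "canceled" or "expired";
                        # only "errored" results carry an `error` attribute
                        self.logger.error(
                            f"Chunk {result.custom_id} did not succeed "
                            f"({result.result.type}): {getattr(result.result, 'error', None)}"
                        )

                # Mirror the direct path: skip consolidation if no chunk returned valid JSON
                if not all_chunk_results:
                    self.logger.warning("No valid metadata returned by the batch")
                    return {}
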
                # Combine all chunk results into a single JSON
                chunk_results_json = (
                    '{"metadata_chunks": [' + ",".join(all_chunk_results) + "]}"
                )

                # Process and consolidate the metadata
                return self._process_metadata_raw(chunk_results_json)

            except Exception as e:
                self.logger.error(f"Error in batch processing: {e}", exc_info=True)
                return {}
        else:
            # Use direct method
            return await self.extract_metadata_direct(transcript, chunk_size, overlap)

    def _process_metadata_raw(self, raw_metadata: str) -> Dict:
        """
        Process the collected raw metadata to get final consolidated metadata

        Args:
            raw_metadata: JSON string of collected metadata chunks

        Returns:
            Dictionary of consolidated metadata
        """
        if not raw_metadata.strip():
            self.logger.warning("No metadata to process")
            return {}

        try:
            # Process metadata with Claude to consolidate and standardize
            response = self.claude.messages.create(
                model=self.model,
                max_tokens=4096,
                system=self._get_metadata_processing_prompt(),
                messages=[
                    {
                        "role": "user",
                        "content": f"Process this JSON of transcript participants:\n\n{raw_metadata}",
                    },
                ],
            )

            result = response.content[0].text
            try:
                # Parse the response as JSON
                metadata = json.loads(result)
                self.logger.info(
                    f"Processed metadata for {metadata.get('bank_name', 'Unknown Bank')}"
                )
                return metadata
            except json.JSONDecodeError:
                self.logger.error(
                    "Failed to parse metadata JSON from Claude's response"
                )
                return {}

        except Exception as e:
            self.logger.error(f"Error processing metadata: {e}", exc_info=True)
            return {}

    async def process_transcript_qa(
        self,
        transcript: str,
        metadata: Dict,
        chunk_size: int = 4000,
        overlap: int = 300,
        use_batch: bool = False,
    ) -> Dict:
        """
        Process transcript to extract structured Q&A pairs using Claude API

        Args:
            transcript: Full transcript text
            metadata: Processed metadata dictionary
            chunk_size: Size of each chunk to process
            overlap: Overlap between chunks
            use_batch: Whether to use batch processing (default: False)

        Returns:
            Dict with metadata and Q&A pairs
        """
        if use_batch:
            # Create JSON string from metadata for Claude's context
            metadata_json = json.dumps(metadata, indent=2)

            # Create overlapping chunks for processing
            chunks = self.create_chunks(
                transcript, chunk_size=chunk_size, overlap=overlap
            )
            self.logger.info(f"Processing {len(chunks)} chunks for Q&A extraction")

            # Create batch requests for Claude API
            requests = []
            for i, chunk in enumerate(chunks):
                request = Request(
                    custom_id=f"chunk_{i}",
                    params=MessageCreateParamsNonStreaming(
                        model=self.model,
                        max_tokens=4096,  # Higher token limit for detailed Q&A extraction
                        system=self._get_qa_analysis_prompt(metadata_json),
                        messages=[
                            {
                                "role": "user",
                                "content": f"Extract all Q&A pairs from this transcript chunk. DO NOT RETURN THE RAW TEXT. ONLY RETURN JSON:\n\n{chunk}",
                            },
                        ],
                    ),
                )
                requests.append(request)

            # Process batch requests
            try:
                message_batch = self.claude.messages.batches.create(requests=requests)
                batch_id = message_batch.id
                self.logger.info(
                    f"Created batch {batch_id} with {len(chunks)} requests for Q&A extraction"
                )

                # Monitor batch processing until complete
                while True:
                    batch_status = self.claude.messages.batches.retrieve(batch_id)
                    counts = batch_status.request_counts
                    self.logger.info(
                        f"Q&A batch status: Processing={counts.processing}, "
                        f"Succeeded={counts.succeeded}, Errored={counts.errored}",
                    )

                    if batch_status.processing_status == "ended":
                        break
                    await asyncio.sleep(5)  # Poll every 5 seconds without blocking the event loop

                # Collect and process results
                chunk_results = []
                for result in self.claude.messages.batches.results(batch_id):
                    if result.result.type == "succeeded":
                        try:
                            # Parse the JSON response
                            content = result.result.message.content[0].text
                            parsed_result = json.loads(content)
                            chunk_results.append(parsed_result)
                            self.logger.info(
                                f"Successfully processed Q&A chunk {result.custom_id}"
                            )
                        except json.JSONDecodeError as e:
                            self.logger.error(
                                f"Failed to parse JSON for Q&A chunk {result.custom_id}: {e}"
                            )
                    else:
                        self.logger.error(
                            f"Error in Q&A chunk {result.custom_id}: {result.result.error}"
                        )

                # Merge results from all chunks
                return self._merge_qa_results(chunk_results)

            except Exception as e:
                self.logger.error(f"Error in Q&A batch processing: {e}", exc_info=True)
                return {"metadata": {}, "qa_pairs": []}
        else:
            # Use direct method
            return await self.process_qa_direct(
                transcript, metadata, chunk_size, overlap
            )

    def _merge_qa_results(self, results: List[Dict]) -> Dict:
        """
        Merge Q&A results from multiple chunks with deduplication

        Args:
            results: List of chunk results

        Returns:
            Merged results dictionary
        """
        if not results:
            return {"metadata": {}, "qa_pairs": []}

        self.logger.info(f"Merging {len(results)} Q&A batch results")

        # Collect metadata from every chunk that returned any
        all_metadata = [r.get("metadata", {}) for r in results if r.get("metadata")]
        final_metadata = {"company": "Unknown", "year": "Unknown", "quarter": "Unknown"}

        # Merge metadata, taking the first non-empty value for each key
        for key in ["company", "year", "quarter"]:
            values = [meta.get(key) for meta in all_metadata if meta.get(key)]
            if values:
                final_metadata[key] = values[0]

        # Combine Q&A pairs with deduplication
        all_qa_pairs = []
        seen_pairs = {}  # Maps dedup key -> index in all_qa_pairs

        for result in results:
            qa_pairs = result.get("qa_pairs", [])
            if not isinstance(qa_pairs, list):
                continue

            for qa_pair in qa_pairs:
                try:
                    qa_data = qa_pair.get("qa_pair", {})
                    if not qa_data:
                        continue

                    # Get key fields
                    analyst_name = qa_data.get("analyst_name", "Unknown Analyst")
                    questions = qa_data.get("questions", [])
                    answers = qa_data.get("answers", [])

                    # Skip if no meaningful content
                    if not questions or not answers:
                        continue

                    # Deduplication key: analyst name plus the first 100 characters
                    # of the question(s) and of the first answer
                    question_text = " ".join(questions)[:100]
                    answer_preview = answers[0].get("answer_text", "")[:100]
                    qa_hash = hash(f"{analyst_name}_{question_text}_{answer_preview}")

                    if qa_hash not in seen_pairs:
                        # Store original position for sequencing
                        qa_data["original_position"] = len(all_qa_pairs)
                        all_qa_pairs.append({"qa_pair": qa_data})
                        seen_pairs[qa_hash] = len(all_qa_pairs) - 1
                    else:
                        # Compare lengths and take longer version if significant difference
                        existing_idx = seen_pairs[qa_hash]
                        existing_qa = all_qa_pairs[existing_idx]["qa_pair"]

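                        # If the new version is significantly longer, replace the existing one
                        new_length = len(str(qa_data))
                        existing_length = len(str(existing_qa))
                        if new_length > existing_length * 1.2:  # 20% longer threshold
                            # Keep the original position so the chronological sort still works
                            qa_data["original_position"] = existing_qa.get(
                                "original_position", existing_idx
                            )
                            all_qa_pairs[existing_idx]["qa_pair"] = qa_data
                            self.logger.info("Replaced QA pair with longer version")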

                except Exception as e:
                    self.logger.error(f"Error processing QA pair: {e}", exc_info=True)
                    continue

        # Sort by original position to maintain chronological order
        all_qa_pairs.sort(key=lambda x: x["qa_pair"].get("original_position", 0))

        # Remove temporary position field
        for qa_pair in all_qa_pairs:
            qa_pair["qa_pair"].pop("original_position", None)

        self.logger.info(f"Merged {len(all_qa_pairs)} unique QA pairs")
        return {"metadata": final_metadata, "qa_pairs": all_qa_pairs}

    async def process_pdf_transcript(
        self, pdf_path: str, use_batch: bool = False, selected_banks=None
    ) -> Dict:
        """
        Process a single PDF transcript file

        Args:
            pdf_path: Path to the PDF file
            use_batch: Whether to use batch processing (default: False)
            selected_banks: List of banks to process (if None, process all);
                currently not used by this method

        Returns:
            Dictionary with metadata and Q&A pairs
        """
        try:
            # Extract text from PDF
            text_lines = self._read_pdf(pdf_path)
            if not text_lines:
                self.logger.warning(f"No text extracted from {pdf_path}")
                return {"metadata": {}, "qa_pairs": []}

            # Join lines for processing
            transcript_text = "\n".join(text_lines)

            # Extract metadata
            metadata = await self.extract_transcript_metadata(
                transcript_text, use_batch=use_batch
            )
            if not metadata:
                self.logger.warning(f"Failed to extract metadata from {pdf_path}")
                return {"metadata": {}, "qa_pairs": []}

            # Process Q&A pairs
            qa_result = await self.process_transcript_qa(
                transcript_text, metadata, use_batch=use_batch
            )

            return qa_result

        except Exception as e:
            self.logger.error(
                f"Error processing PDF transcript {pdf_path}: {e}", exc_info=True
            )
            return {"metadata": {}, "qa_pairs": []}

    def store_to_database(self, structured_data: Dict) -> int:
        """
        Store structured transcript data to database, always replacing existing data

        Args:
            structured_data: Dictionary with metadata and Q&A pairs

        Returns:
            ID of the report in the database
        """
        self.logger.info("Storing transcript data to database")

        def _save_transcript_to_db(conn):
            # Get metadata
            metadata = structured_data.get("metadata", {})
            company = metadata.get("company", "Unknown")
            year = metadata.get("year", "Unknown")
            quarter = metadata.get("quarter", "Unknown")

            # Add bank and get ID
            bank_id = self.db.add_bank(company)

            # Check if report already exists and always delete it
            report_id = self.db.get_report_id(bank_id, year, quarter)
            if report_id:
                self.logger.info(f"Deleting existing report ID: {report_id}")
                self.db.delete_report_data(report_id)

            # Create a new report
            report_id = self.db.add_report(
                bank_id=bank_id,
                report_name=f"{company} {year} {quarter} Earnings Call",
                year=year,
                quarter=quarter,
            )

            # Process analyst conversation groups
            analyst_conversations = {}  # Track conversations by analyst

            for qa_item in structured_data.get("qa_pairs", []):
                qa_pair = qa_item.get("qa_pair", {})
                if not qa_pair:
                    continue

                analyst_name = qa_pair.get("analyst_name", "Unknown Analyst")
                analyst_company = qa_pair.get("analyst_company", "Unknown Firm")

                # Create unique key for this analyst
                analyst_key = f"{analyst_name}_{analyst_company}"

                # Create conversation if not seen this analyst yet
                if analyst_key not in analyst_conversations:
                    conversation_id = self.db.add_analyst_conversation(
                        report_id=report_id,
                        analyst_name=analyst_name,
                        analyst_company=analyst_company,
                    )
                    analyst_conversations[analyst_key] = conversation_id
                else:
                    conversation_id = analyst_conversations[analyst_key]

                # Process questions and answers (the optional "topic" field is not persisted)
                questions = qa_pair.get("questions", [])
                answers = qa_pair.get("answers", [])

                # Store one row per (question, answer) combination in this exchange
                for question in questions:
                    for answer in answers:
                        self.db.add_qa_pair(
                            conversation_id=conversation_id,
                            question=question,
                            answer=answer.get("answer_text", ""),
                            answer_speaker=answer.get("speaker_name", "Unknown"),
                            answer_role=answer.get("speaker_role", "Unknown"),
                        )

            return report_id

        try:
            # Use the database's transaction wrapper
            return self.db.execute_in_transaction(_save_transcript_to_db)
        except Exception as e:
            self.logger.error(f"Error storing transcript data: {e}", exc_info=True)
            raise

    async def process_transcript_batch(
        self,
        folder_path: str,
        batch_size: int = 4,
        use_batch: bool = False,
        selected_banks=None,
    ):
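        """
        Process every PDF transcript in a folder and store the results

        Files are processed one at a time; the Claude API calls for each file
        are awaited asynchronously.

        Args:
            folder_path: Path to folder containing PDF transcripts
            batch_size: Intended number of concurrent files (currently unused)
            use_batch: Whether to use batch processing for Claude API (default: False)
            selected_banks: List of banks to process (if None, process all)

        Returns:
            List of per-file result dicts with "file", "success" and either
            "report_id" or "error"
        """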
        # Get list of PDF files
        pdf_files = list(Path(folder_path).glob("*.pdf"))
        self.logger.info(f"Found {len(pdf_files)} PDF files to process")

        # Track processing results
        results = []

        # Process files sequentially (still using async for Claude API calls)
        for pdf_file in pdf_files:
            try:
                # Process the transcript
                self.logger.info(f"Processing {pdf_file}")
                result = await self.process_pdf_transcript(
                    str(pdf_file), use_batch=use_batch, selected_banks=selected_banks
                )

                # Store to database if we have valid data
                if result and result.get("metadata") and result.get("qa_pairs"):
                    report_id = self.store_to_database(result)
                    self.logger.info(
                        f"Successfully processed and stored {pdf_file} (Report ID: {report_id})"
                    )
                    results.append(
                        {"file": pdf_file.name, "success": True, "report_id": report_id}
                    )
                else:
                    self.logger.warning(f"No valid data extracted from {pdf_file}")
                    results.append(
                        {
                            "file": pdf_file.name,
                            "success": False,
                            "error": "No valid data extracted",
                        }
                    )
            except Exception as e:
                self.logger.error(f"Error processing {pdf_file}: {e}", exc_info=True)
                results.append(
                    {"file": pdf_file.name, "success": False, "error": str(e)}
                )

        # Return processing results
        return results
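`_merge_qa_results` has to deduplicate the Q&A pairs that overlapping chunks produce more than once. The same idea in isolation, as a minimal sketch assuming the `{"qa_pairs": [{"qa_pair": {...}}]}` chunk shape used above: each pair is keyed on the analyst name plus the first 100 characters of the question(s) and of the first answer, and a repeat replaces the stored pair only if it is more than 20% longer. `merge_qa_chunks` is a hypothetical name, and a tuple key is used instead of `hash()` purely for readability.

```python
from typing import Dict, List, Tuple


def merge_qa_chunks(
    results: List[Dict], prefix_len: int = 100, growth: float = 1.2
) -> List[Dict]:
    """Hypothetical sketch: merge Q&A pairs from overlapping chunks, dropping repeats."""
    merged: List[Dict] = []  # unique pairs in first-seen order
    index: Dict[Tuple[str, str, str], int] = {}  # dedup key -> position in `merged`

    for result in results:
        for item in result.get("qa_pairs", []):
            qa = item.get("qa_pair", {})
            questions = qa.get("questions", [])
            answers = qa.get("answers", [])
            if not questions or not answers:
                continue  # skip fragments with no question or no answer

            key = (
                qa.get("analyst_name", "Unknown Analyst"),
                " ".join(questions)[:prefix_len],
                answers[0].get("answer_text", "")[:prefix_len],
            )
            if key not in index:
                index[key] = len(merged)
                merged.append({"qa_pair": qa})
            else:
                # A chunk boundary may have cut one copy short; keep the copy
                # that is substantially longer, in the same list slot.
                existing = merged[index[key]]["qa_pair"]
                if len(str(qa)) > len(str(existing)) * growth:
                    merged[index[key]] = {"qa_pair": qa}
    return merged
```

Because a longer duplicate is written into the slot of the copy it replaces, first-seen order is preserved without a separate sort step.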

In [None]:
# Apply nest_asyncio to allow nested event loops in Colab
nest_asyncio.apply()


# Function to process transcripts in Colab
async def process_transcripts_colab(db, folder_path, use_batch=False):
    """Process transcripts in Colab environment"""

    # Initialize processor
    processor = ClaudeTranscriptProcessor(db, model=CLAUDE_API_MODEL)

    # Process transcripts (direct method by default)
    results = await processor.process_transcript_batch(
        folder_path,
        use_batch=use_batch,
        selected_banks=SELECTED_BANKS,
    )
    return results


# Colab/Jupyter already runs an event loop, so await the coroutine directly rather than calling asyncio.run()
results = await process_transcripts_colab(db, CALL_TRANSCRIPTS_DOWNLOAD_FOLDER)
print(f"Processed {len(results)} files")

INFO:ClaudeTranscriptProcessor:Found 16 PDF files to process
INFO:ClaudeTranscriptProcessor:Processing earnings_transcripts/JPMorgan Chase & Co. (JPM) Q2 2023 Earnings Call Transcript _ Seeking Alpha.pdf
INFO:ClaudeTranscriptProcessor:Reading PDF from earnings_transcripts/JPMorgan Chase & Co. (JPM) Q2 2023 Earnings Call Transcript _ Seeking Alpha.pdf
INFO:ClaudeTranscriptProcessor:Extracted 897 clean lines from earnings_transcripts/JPMorgan Chase & Co. (JPM) Q2 2023 Earnings Call Transcript _ Seeking Alpha.pdf
INFO:ClaudeTranscriptProcessor:Processing 20 chunks for metadata extraction (direct method)
INFO:ClaudeTranscriptProcessor:Successfully processed metadata chunk 1/20
INFO:ClaudeTranscriptProcessor:Successfully processed metadata chunk 2/20
INFO:ClaudeTranscriptProcessor:Successfully processed metadata chunk 3/20
INFO:ClaudeTranscriptProcessor:Successfully processed metadata chunk 4/20
INFO:ClaudeTranscriptProcessor:Successfully processed metadata chunk 5/20
INFO:ClaudeTranscriptP

Processed 16 files
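The log above shows each transcript being split into chunks before it is sent to Claude ("Processing 20 chunks for metadata extraction"). `create_chunks` itself is not shown in this section, so the following is a hypothetical character-based sketch of overlapping chunking, using the same `chunk_size=4000` and `overlap=300` defaults as `process_transcript_qa`; the notebook's actual implementation may split on lines or tokens instead. The overlap means text near a boundary appears in two adjacent chunks, which gives the model a second look at an exchange that would otherwise be cut in half, and the duplicates this creates are what the merge step removes.

```python
from typing import List


def create_overlapping_chunks(
    text: str, chunk_size: int = 4000, overlap: int = 300
) -> List[str]:
    """Hypothetical sketch: split text into windows of at most chunk_size
    characters, each starting `overlap` characters before the previous one ended."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be in [0, chunk_size)")

    step = chunk_size - overlap  # how far each window advances
    chunks: List[str] = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # this window already reaches the end of the text
    return chunks
```

For example, `create_overlapping_chunks("abcdefghij", chunk_size=4, overlap=1)` yields `["abcd", "defg", "ghij"]`, where each chunk repeats the last character of the one before it.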


## 📈 Credit-Risk Metrics Extractor
### Features
* **Smart PDF Processing**: Identifies relevant pages containing financial metrics tables
* **Document AI Integration**: Uses Google Cloud's Document AI for entity extraction
* **Bank-Specific Adaptability**: Automatically detects bank report formats and applies appropriate entity mappings
* **Standardized Metrics Calculation**: Calculates key credit risk indicators, including:
    * Non-Performing Loan (NPL) Ratio
    * Coverage Ratio
    * Provision for Credit Losses (PCL)
    * Common Equity Tier 1 (CET1) Ratio
    * Texas Ratio
* **Batch Processing**: Handles multiple financial reports in a single run
* **Output Generation**: Creates structured JSON and CSV outputs for further analysis
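The bank-specific mapping idea is easiest to see on a toy input. Below is a minimal sketch, assuming Document AI has already returned a flat dict of entity name to numeric value; how `BankingMetricsCalculator` actually consumes its `entity_mappings` is not shown in this section. The mapping is a trimmed copy of the one defined in `_define_metrics`, and `resolve_standardized` is a hypothetical name.

```python
from typing import Dict, Optional

# Trimmed copy of the mappings in BankingMetricsCalculator._define_metrics:
# standardized entity name -> entity name emitted for that bank's report format.
ENTITY_MAPPINGS: Dict[str, Dict[str, str]] = {
    "UBS": {
        "nonperforming_loans": "total_nonperforming_assets",
        "total_loans": "total_loans",
        "tangible_equity": "tangible_equity_attributable_shareholders",
    },
    "JPMorgan Chase": {
        "nonperforming_loans": "total_nonaccrual_loans",
        "total_loans": "total_loans",
        "tangible_equity": "tangible_equity",
    },
}


def resolve_standardized(
    bank: str, raw: Dict[str, float]
) -> Dict[str, Optional[float]]:
    """Hypothetical helper: look up each standardized entity in the raw
    extraction via the bank's mapping; missing entities become None."""
    mapping = ENTITY_MAPPINGS[bank]
    return {std: raw.get(source) for std, source in mapping.items()}
```

The same standardized name (`nonperforming_loans`) is read from `total_nonaccrual_loans` for JPMorgan Chase but from `total_nonperforming_assets` for UBS, so the downstream metric formulas never need to know which bank's format they are looking at.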

### Metrics Explained

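1. **NPL Ratio**: Non-performing loans as a percentage of total loans
2. **Coverage Ratio**: Loan-loss allowance as a percentage of non-performing loans, i.e. how much of the non-performing book is already covered by allowances
3. **PCL**: Provision for Credit Losses recognized in the current period (expressed in millions)
4. **CET1 Ratio**: Common Equity Tier 1 capital as a percentage of risk-weighted assets, a measure of a bank's core capital strength
5. **Texas Ratio**: Non-performing assets as a percentage of tangible equity plus loan-loss allowance, measuring a bank's credit problems relative to its capital resources; values above 100% are generally treated as a warning sign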
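These definitions correspond to the `calculation` strings in `BankingMetricsCalculator._define_metrics`. A small worked example on made-up round numbers (not taken from any bank's report) makes the arithmetic concrete. The helper names `pct` and `credit_metrics` are illustrative, and unlike the lambdas in the notebook, `pct` also returns `None` when the numerator is missing rather than raising.

```python
from typing import Dict, Optional


def pct(numerator: Optional[float], denominator: Optional[float]) -> Optional[float]:
    """Return numerator / denominator * 100, or None if either input is
    missing or the denominator is not positive."""
    if numerator is None or denominator is None or denominator <= 0:
        return None
    return numerator / denominator * 100


def credit_metrics(
    npl: Optional[float],
    total_loans: Optional[float],
    allowance: Optional[float],
    pcl: Optional[float],
    cet1: Optional[float],
    rwa: Optional[float],
    npa: Optional[float],
    tangible_equity: Optional[float],
) -> Dict[str, Optional[float]]:
    """Illustrative sketch of the five metrics from standardized entity values."""
    return {
        "NPL_Ratio": pct(npl, total_loans),
        "Coverage_Ratio": pct(allowance, npl),
        "PCL": pcl,  # passed through unchanged
        "CET1_Ratio": pct(cet1, rwa),
        "Texas_Ratio": (
            pct(npa, tangible_equity + allowance)
            if tangible_equity is not None and allowance is not None
            else None
        ),
    }


metrics = credit_metrics(
    npl=10.0, total_loans=1000.0, allowance=20.0, pcl=3.0,
    cet1=150.0, rwa=1000.0, npa=12.0, tangible_equity=100.0,
)
```

With these inputs the NPL, coverage, CET1 and Texas ratios work out to 1%, 200%, 15% and 10% respectively (for example, Texas = 12 / (100 + 20) × 100), and PCL is passed through as 3.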

In [None]:
class BankingMetricsCalculator:
    """Extract banking metrics from financial reports using Document AI and LLMs."""

    def __init__(
        self,
        project_id: str,
        location: str,
        processor_id: str,
        output_dir: str,
        db: GSIBDatabase,
        processor_version_id: Optional[str] = None,
        batch_size: int = 10,
    ):
        """Initialize the calculator with API credentials and parameters.

        Args:
            project_id: Google Cloud project ID
            location: Google Cloud region (e.g., 'eu')
            processor_id: Document AI processor ID
            output_dir: Directory to save extraction results
            db: Initialized GSIBDatabase instance
            processor_version_id: Optional Document AI processor version ID
            batch_size: Maximum batch size for API calls
        """
        # Set up logging
        self.logger = logging.getLogger(__name__)
        self.logger.info("Initializing BankingMetricsCalculator")

        # Store parameters
        self.project_id = project_id
        self.location = location
        self.processor_id = processor_id
        self.batch_size = batch_size
        self.db = db  # Store the database instance

        # Setup output directory
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(exist_ok=True, parents=True)

        # Initialize clients
        self.docai_client = self._setup_docai_client(processor_version_id)

        self.target_metrics = [
            "NPL_Ratio",  # Non-Performing Loan Ratio
            "Coverage_Ratio",  # Coverage Ratio
            "PCL",  # Provision for Credit Losses
            "CET1_Ratio",  # Common Equity Tier 1 Ratio
            "Texas_Ratio",  # Texas Ratio
        ]
        self._define_metrics()

        self.logger.info("BankingMetricsCalculator initialized successfully")

    def _setup_docai_client(
        self,
        processor_version_id: Optional[str],
    ) -> documentai.DocumentProcessorServiceClient:
        """Set up and return a Document AI client.

        Args:
            processor_version_id: Optional processor version ID

        Returns:
            Configured Document AI client

        Raises:
            RuntimeError: If client initialization fails
        """
        try:
            endpoint = f"{self.location}-documentai.googleapis.com"
            self.logger.debug(f"Setting up Document AI client with endpoint {endpoint}")

            opts = ClientOptions(api_endpoint=endpoint)
            client = documentai.DocumentProcessorServiceClient(client_options=opts)

            # Set up processor path
            if processor_version_id:
                self.processor_name = client.processor_version_path(
                    self.project_id,
                    self.location,
                    self.processor_id,
                    processor_version_id,
                )
                self.logger.debug(f"Using processor version: {processor_version_id}")
            else:
                self.processor_name = client.processor_path(
                    self.project_id,
                    self.location,
                    self.processor_id,
                )

            self.logger.debug(f"Document AI processor path: {self.processor_name}")
            return client

        except Exception as e:
            self.logger.error(
                f"Failed to initialize Document AI client: {e}",
                exc_info=True,
            )
            raise RuntimeError(f"Document AI client initialization failed: {e}")

    def _define_metrics(self) -> None:
        """Define target metrics and their calculation methods with bank-specific entity mappings."""
        # Define entity mappings for different banks
        self.entity_mappings = {
            # UBS entity mappings
            "UBS": {
                "nonperforming_loans": "total_nonperforming_assets",
                "nonperforming_assets": "total_nonperforming_assets",
                "total_loans": "total_loans",
                "loan_loss_allowance": "total_allowance_for_loan_losses",
                "cet1_capital": "cet1_capital",
                "risk_weighted_assets": "risk_weighted_assets",
                "provision_for_credit_losses": "provision_for_credit_losses",
                "tangible_equity": "tangible_equity_attributable_shareholders",
            },
            # JPMorgan entity mappings
            "JPMorgan Chase": {
                "nonperforming_loans": "total_nonaccrual_loans",  # Primary source
                "nonperforming_assets": "total_nonperforming_assets",  # Primary source
                "total_loans": "total_loans",
                "loan_loss_allowance": "total_allowance_for_loan_losses",
                "cet1_capital": "cet1_capital",
                "risk_weighted_assets": "risk_weighted_assets",
                "provision_for_credit_losses": "provision_for_credit_losses",
                "tangible_equity": "tangible_equity",
            },
        }

        # Define metrics properties with calculation logic
        self.metrics_info = {
            "NPL_Ratio": {
                "description": "Non-Performing Loan Ratio measures the percentage of non-performing loans in the total loan portfolio.",
                "calculation": "Non-performing loans / Total loans * 100",
                "standardized_entities": ["nonperforming_loans", "total_loans"],
                "compute": lambda vals: (vals[0] / vals[1]) * 100
                if vals[1] and vals[1] > 0
                else None,
                "typical_range": [0.1, 5.0],
                "format": "%.2f%%",
            },
            "Coverage_Ratio": {
                "description": "Coverage Ratio shows how much of the non-performing loans are covered by allowances.",
                "calculation": "Loan loss allowance / Non-performing loans * 100",
                "standardized_entities": ["loan_loss_allowance", "nonperforming_loans"],
                "compute": lambda vals: (vals[0] / vals[1]) * 100
                if vals[1] and vals[1] > 0
                else None,
                "typical_range": [20.0, 500.0],
                "format": "%.2f%%",
            },
            "PCL": {
                "description": "Provision for Credit Losses for the current period.",
                "calculation": "Direct value from Provision for Credit Losses line item",
                "standardized_entities": ["provision_for_credit_losses"],
                "compute": lambda vals: vals[0],
                "typical_range": [0, 4000],
                "format": "%.0f million",
            },
            "CET1_Ratio": {
                "description": "Common Equity Tier 1 Capital Ratio, a measure of bank's core capital.",
                "calculation": "Common Equity Tier 1 Capital / Risk Weighted Assets * 100",
                "standardized_entities": ["cet1_capital", "risk_weighted_assets"],
                "compute": lambda vals: (vals[0] / vals[1]) * 100
                if vals[1] and vals[1] > 0
                else None,
                "typical_range": [4.0, 20.0],
                "format": "%.1f%%",
            },
            "Texas_Ratio": {
                "description": "Texas Ratio measures a bank's credit problems relative to its capital resources.",
                "calculation": "Non-performing assets / (Tangible equity + Loan loss allowance) * 100",
                "standardized_entities": [
                    "nonperforming_assets",
                    "tangible_equity",
                    "loan_loss_allowance",
                ],
                "compute": lambda vals: (vals[0] / (vals[1] + vals[2])) * 100
                if (
                    vals[1] is not None
                    and vals[2] is not None
                    and vals[1] + vals[2] > 0
                )
                else None,
                "typical_range": [0, 100.0],  # Concerning when over 100%
                "format": "%.2f%%",
            },
        }

    def _clean_numeric_value(self, value):
        """
        Convert string values with currency symbols, commas, etc. to float.

        Args:
            value: The value to clean and convert

        Returns:
            float: The cleaned numeric value, or None if conversion fails
        """
        if value is None:
            return None

        # If value is already a number, return it
        if isinstance(value, (int, float)):
            return float(value)

        try:
            # If value is a string, clean and convert
            if isinstance(value, str):
                # Remove currency symbols, commas, parentheses (negative values), and whitespace
                cleaned = (
                    value.replace("$", "")
                    .replace("€", "")
                    .replace("£", "")
                    .replace("CHF", "")
                )
                cleaned = cleaned.replace(",", "").replace(" ", "")

                # Handle parentheses notation for negative numbers: (123) -> -123
                if cleaned.startswith("(") and cleaned.endswith(")"):
                    cleaned = "-" + cleaned[1:-1]

                # Handle percentage values
                if cleaned.endswith("%"):
                    cleaned = cleaned.rstrip("%")
                    return float(cleaned) / 100.0

                return float(cleaned)
            self.logger.warning(
                f"Unexpected value type: {type(value)} for value: {value}",
            )
            return None
        except (ValueError, TypeError) as e:
            self.logger.warning(
                f"Failed to convert value '{value}' to numeric: {str(e)}",
            )
            return None

    def _detect_bank_and_period(
        self,
        entities: Dict[str, Any],
    ) -> Tuple[str, int, str, Optional[str]]:
        """
        Detect bank name, year, and quarter from entities

        Returns:
            Tuple of (bank_name, year, quarter, report_date)
        """
        # Determine bank name
        bank_name = entities.get("bank_name")
        if not bank_name:
            # If bank_name not explicitly present, infer from available fields
            if "tangible_equity_attributable_shareholders" in entities:
                bank_name = "UBS"
            else:
                bank_name = "JPMorgan Chase"

        # Normalize bank name
        if "jpmorgan" in bank_name.lower() or "chase" in bank_name.lower():
            bank_name = "JPMorgan Chase"
        elif "ubs" in bank_name.lower():
            bank_name = "UBS"

        # Extract report period
        report_period = entities.get("report_period", "")
        report_date = None

        # Pattern 1: quarter-year format (e.g., "4Q23" or "1Q2023")
        qy_pattern = re.compile(r"(\d)Q(\d{2,4})")
        qy_match = qy_pattern.search(report_period)

        # Pattern 2: day-first dotted date (e.g., "31.12.24" or "31.12.2024")
        date_pattern = re.compile(r"(\d{1,2})\.(\d{1,2})\.(\d{2,4})")
        date_match = date_pattern.search(report_period)

        if qy_match:
            quarter = f"Q{qy_match.group(1)}"
            year_str = qy_match.group(2)
            # Convert 2-digit year to 4-digit
            year = int(year_str)
            if len(year_str) == 2:
                year = 2000 + year if year < 50 else 1900 + year
        elif date_match:
            # Extract date components
            day = int(date_match.group(1))
            month = int(date_match.group(2))
            year_str = date_match.group(3)

            # Convert 2-digit year to 4-digit
            year = int(year_str)
            if len(year_str) == 2:
                year = 2000 + year if year < 50 else 1900 + year

            # Determine quarter from month
            quarter_map = {
                1: "Q1",
                2: "Q1",
                3: "Q1",
                4: "Q2",
                5: "Q2",
                6: "Q2",
                7: "Q3",
                8: "Q3",
                9: "Q3",
                10: "Q4",
                11: "Q4",
                12: "Q4",
            }
            quarter = quarter_map[month]

            # Format ISO date
            report_date = f"{year}-{month:02d}-{day:02d}"
        else:
            # Default fallback
            current_date = datetime.now()
            year = current_date.year
            quarter = f"Q{(current_date.month - 1) // 3 + 1}"
            self.logger.warning(
                f"Couldn't determine period from '{report_period}', using current: {year} {quarter}",
            )

        return bank_name, year, quarter, report_date

    def _normalize_jpmorgan_entities(self, entities):
        """Handle JPMorgan-specific entity calculations."""
        # Clean all entity values first, but skip report_period, bank_name
        cleaned_entities = {}
        for k, v in entities.items():
            if k in ["report_period", "bank_name"]:
                cleaned_entities[k] = v  # Keep report_period, bank_name as is
            else:
                cleaned_value = self._clean_numeric_value(v)
                if cleaned_value is not None:
                    cleaned_entities[k] = cleaned_value

        # If we have tangible book value per share and share count, calculate total tangible equity
        if (
            "tangible_book_value_per_share" in cleaned_entities
            and "common_shares_at_period_end" in cleaned_entities
        ):
            cleaned_entities["tangible_equity"] = (
                cleaned_entities["tangible_book_value_per_share"]
                * cleaned_entities["common_shares_at_period_end"]
            )
            self.logger.info(
                f"Calculated tangible_equity: {cleaned_entities['tangible_equity']}",
            )

        # Use total_nonperforming_assets if total_nonaccrual_loans is not available
        if (
            "total_nonaccrual_loans" not in cleaned_entities
            and "total_nonperforming_assets" in cleaned_entities
        ):
            cleaned_entities["total_nonaccrual_loans"] = cleaned_entities[
                "total_nonperforming_assets"
            ]

        # Use provision_for_loan_losses if provision_for_credit_losses is not available
        if (
            "provision_for_credit_losses" not in cleaned_entities
            and "provision_for_loan_losses" in cleaned_entities
        ):
            cleaned_entities["provision_for_credit_losses"] = cleaned_entities[
                "provision_for_loan_losses"
            ]

        return cleaned_entities

    def _save_to_database(
        self,
        file_name: str,
        metrics: Dict[str, Dict[str, Any]],
        entities: Dict[str, Any],
    ) -> Optional[int]:
        """
        Save metrics and entities to database using the GSIBDatabase wrapper.

        Args:
            file_name: Original file name
            metrics: Dictionary of calculated metrics
            entities: Dictionary of extracted entities (used to detect bank and period)

        Returns:
            report_id: The database ID of the created or existing report, or None on failure
        """
        conn = None  # defined before try so the rollback below never raises NameError
        try:
            conn = self.db.begin_transaction()
            # Detect bank and period
            bank_name, year, quarter, report_date = self._detect_bank_and_period(
                entities,
            )
            # Add bank and get ID
            bank_id = self.db.add_bank(bank_name)

            # Query for existing report with matching bank, year, and quarter
            report_id = self.db.get_report_id(
                bank_name=bank_name,
                year=year,
                quarter=quarter,
            )

            # If no existing report found, create a new one
            if report_id is None:
                report_id = self.db.add_report(
                    bank_id=bank_id,
                    report_name=file_name,
                    year=year,
                    quarter=quarter,
                    report_date=report_date,
                )
                self.logger.info(
                    f"Created new report entry: {bank_name} {year} {quarter}",
                )
            else:
                self.logger.info(
                    f"Using existing report entry: {bank_name} {year} {quarter}",
                )

            # Save metrics
            for metric_name, metric_data in metrics.items():
                try:
                    self.db.add_metric(
                        report_id=report_id,
                        metric_name=metric_name,
                        metric_value=metric_data.get("value"),
                        formatted_value=metric_data.get("formatted_value"),
                        in_typical_range=metric_data.get("in_typical_range", 0),
                    )
                except Exception as e:
                    self.logger.warning(f"Error saving metric {metric_name}: {e}")

            # Commit first, then report success
            self.db.commit_transaction(conn)
            self.logger.info(f"Saved to database: {bank_name} {year} {quarter}")
            return report_id

        except Exception as e:
            self.logger.error(f"Error saving to database: {e}", exc_info=True)
            if conn is not None:
                self.db.rollback_transaction(conn)
            return None

    def _save_enhanced_summary(self) -> Optional[Path]:
        """
        Generate and save the enhanced metrics summary to CSV.

        Returns:
            Path to the saved CSV file, or None if there was no data or saving failed
        """
        try:
            summary_df = self.db.generate_enhanced_metrics_summary()

            if not summary_df.empty:
                # Save to CSV
                summary_path = Path(self.output_dir) / "enhanced_metrics_summary.csv"
                summary_df.to_csv(summary_path, index=False)
                self.logger.info(f"Enhanced summary saved to {summary_path}")
                return summary_path
            self.logger.warning("No data available for enhanced summary")
            return None

        except Exception as e:
            self.logger.error(f"Error saving enhanced summary: {e}", exc_info=True)
            return None

    def _calculate_metrics(self, entities: dict) -> dict:
        """Calculate banking metrics from extracted entities.

        Args:
            entities: Dictionary of extracted entities and values

        Returns:
            Dictionary of calculated metrics with values and metadata
        """
        self.logger.info("Calculating metrics from extracted entities")
        self.logger.debug(f"Available entities: {entities}")

        # Detect which bank's report we're working with
        bank_name, _, _, _ = self._detect_bank_and_period(entities)
        self.logger.info(f"Detected bank: {bank_name}")

        # Handle JPMorgan specific calculations if needed
        if bank_name == "JPMorgan Chase":
            entities = self._normalize_jpmorgan_entities(entities)
        else:
            # For UBS or other banks, clean numeric values but preserve report_period, bank_name
            cleaned_entities = {}
            for k, v in entities.items():
                if k in ["report_period", "bank_name"]:
                    cleaned_entities[k] = v  # Keep report_period, bank_name as is
                else:
                    cleaned_value = self._clean_numeric_value(v)
                    if cleaned_value is not None:
                        cleaned_entities[k] = cleaned_value
            entities = cleaned_entities

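        # Get entity mapping for detected bank; without one no metric can be mapped
        entity_mapping = self.entity_mappings.get(bank_name)
        if entity_mapping is None:
            self.logger.error(f"No entity mapping defined for bank: {bank_name}")
            return {}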

        results = {}
        for metric_name in self.target_metrics:
            metric_info = self.metrics_info[metric_name]
            self.logger.debug(f"Calculating {metric_name}")

            # Map standardized entity names to bank-specific entity names
            required_entities = metric_info["standardized_entities"]
            entity_values = []
            missing_entities = []

            # Map and collect required values for this metric
            for std_entity in required_entities:
                bank_entity = entity_mapping.get(std_entity)

                if bank_entity and bank_entity in entities:
                    entity_values.append(entities[bank_entity])
                else:
                    missing_entities.append(std_entity)
                    entity_values.append(None)

            # Calculate metric if all required entities are available
            if not missing_entities:
                try:
                    # Apply computation function with entity values
                    computed_value = metric_info["compute"](entity_values)

                    if computed_value is not None:
                        # Format value for display
                        formatted_value = metric_info["format"] % computed_value

                        # Validate against typical range
                        min_val, max_val = metric_info["typical_range"]
                        in_range = min_val <= computed_value <= max_val

                        # Store result with metadata
                        results[metric_name] = {
                            "value": computed_value,
                            "formatted_value": formatted_value,
                            "in_typical_range": in_range,
                            "description": metric_info["description"],
                            "calculation": metric_info["calculation"],
                            "components": dict(zip(required_entities, entity_values)),
                        }

                        self.logger.info(f"Calculated {metric_name}: {formatted_value}")

                        if not in_range:
                            self.logger.warning(
                                f"{metric_name} value {computed_value} outside typical range [{min_val}, {max_val}]",
                            )
                    else:
                        self.logger.warning(
                            f"Calculation for {metric_name} returned None",
                        )
                except Exception as e:
                    self.logger.error(
                        f"Error calculating {metric_name}: {e}",
                        exc_info=True,
                    )
            else:
                self.logger.warning(
                    f"Cannot calculate {metric_name}. Missing standardized entities: {missing_entities}",
                )

        self.logger.info(
            f"Calculated {len(results)} out of {len(self.target_metrics)} metrics",
        )
        return results

    def _find_relevant_pages(self, file_path: str) -> Tuple[List[int], str]:
        """Find pages containing target metrics tables in PDF.

        Args:
            file_path: Path to PDF file

        Returns:
            Tuple containing list of relevant page numbers and extracted text
        """
        self.logger.info(f"Finding relevant pages in {file_path}")

        # Target sections and their related metrics keywords
        target_sections = [
            # JP Morgan Chase format
            {
                "CAPITAL AND OTHER SELECTED BALANCE SHEET ITEMS": [
                    "CET1 capital",
                    "Risk-weighted assets",
                ],
            },
            {
                "CONSOLIDATED FINANCIAL HIGHLIGHTS": [
                    "Provision for credit losses",
                    "Common shares at period-end",
                    "Tangible book value per share",
                ],
            },
            {
                "CREDIT-RELATED INFORMATION, CONTINUED": [
                    "Total nonaccrual loans",
                    "Total nonperforming assets",
                    "Provision for loan losses",
                    "Ending balance",
                    "Net charge-offs",
                ],
            },
            {"CONSOLIDATED FINANCIAL HIGHLIGHTS, CONTINUED": ["Total Loans"]},
            # UBS format
            {
                "Our key figures": [
                    "Common equity tier 1 capital",
                    "Risk-weighted assets",
                    "Credit loss expense",
                ],
            },
            {
                "Group performance": [
                    "Credit loss (expense)",
                    "Credit loss expense",
                ],
            },
            {
                "Banking and traded products exposure": [
                    "Total credit-impaired exposure, gross",
                    "Total allowances and provisions for expected credit losses",
                    "Gross exposure",
                ],
            },
            {
                "Equity, CET1 capital and returns": [
                    "Tangible equity attributable to shareholders",
                ],
            },
            {
                "Return on equity and CET1 capital": [
                    "Tangible equity attributable to shareholders",
                ],
            },
            {
                "UBS shares": [
                    "Tangible equity attributable to shareholders",
                ],
            },
        ]

        extracted_text = ""
        relevant_pages = []
        total_pages = 0  # defined up front so the fallback in the except block is safe

        try:
            # Open the PDF
            reader = pypdf.PdfReader(file_path)
            total_pages = len(reader.pages)
            self.logger.debug(f"PDF has {total_pages} pages")

            # Scan through pages to find relevant content
            for i, page in enumerate(reader.pages):
                page_num = i + 1  # 1-indexed page numbers
                text = page.extract_text() or ""
                extracted_text += text + "\n\n"
                text_lower = text.lower()

                # Check each section and its metrics
                for section_dict in target_sections:
                    section_header, metrics = next(iter(section_dict.items()))

                    # Only mark a page as relevant if it contains BOTH
                    # (case-insensitive substring match):
                    # 1. The section header
                    # 2. At least one of the metrics for that section
                    if section_header.lower() in text_lower and any(
                        metric.lower() in text_lower for metric in metrics
                    ):
                        self.logger.debug(
                            f"Found section '{section_header}' with relevant metrics on page {page_num}",
                        )
                        relevant_pages.append(page_num)
                        break  # page already matched; skip the remaining sections

            # Sort and deduplicate pages
            relevant_pages = sorted(set(relevant_pages))

            if not relevant_pages:
                self.logger.warning("No relevant pages found in the document")
                # If no relevant pages found, include first few pages as fallback
                relevant_pages = list(range(1, min(6, total_pages + 1)))

            self.logger.info(
                f"Found {len(relevant_pages)} relevant pages: {relevant_pages}",
            )
            return relevant_pages, extracted_text

        except Exception as e:
            self.logger.error(f"Error finding relevant pages: {e}", exc_info=True)
            # Return first 5 pages as fallback
            fallback_pages = list(range(1, min(6, total_pages + 1)))
            self.logger.warning(f"Using fallback pages: {fallback_pages}")
            return fallback_pages, extracted_text

    def _extract_subset_pdf(
        self,
        input_path: str,
        output_path: str,
        pages: List[int],
    ) -> bool:
        """Extract specific pages from PDF to create a smaller document.

        Args:
            input_path: Path to input PDF
            output_path: Path where the extracted PDF will be saved
            pages: List of page numbers to extract (1-indexed)

        Returns:
            Boolean indicating success
        """
        self.logger.info(f"Extracting pages {pages} from {input_path} to {output_path}")

        try:
            reader = pypdf.PdfReader(input_path)
            writer = pypdf.PdfWriter()

            # Add selected pages to the new document
            added_pages = 0
            for page_num in pages:
                # Convert to 0-indexed
                idx = page_num - 1
                if 0 <= idx < len(reader.pages):
                    writer.add_page(reader.pages[idx])
                    added_pages += 1
                else:
                    self.logger.warning(f"Page {page_num} out of range, skipping")

            # Save the new PDF
            with open(output_path, "wb") as output_file:
                writer.write(output_file)

            # Report the pages actually written, not the pages requested
            self.logger.info(f"Successfully created subset PDF with {added_pages} pages")
            return True

        except Exception as e:
            self.logger.error(f"Error creating subset PDF: {e}", exc_info=True)
            return False

    def _process_document_with_docai(self, file_path: str) -> Dict[str, Any]:
        """Process document with Document AI to extract entities.

        Args:
            file_path: Path to PDF file

        Returns:
            Dictionary of extracted entities and their values
        """
        self.logger.info(f"Processing document with Document AI: {file_path}")
        start_time = time.time()

        extracted_entities = {}

        try:
            # Read file content
            with open(file_path, "rb") as f:
                file_content = f.read()

            # Create Document AI request
            request = documentai.ProcessRequest(
                name=self.processor_name,
                raw_document=documentai.RawDocument(
                    content=file_content,
                    mime_type="application/pdf",
                ),
            )

            # Process the document
            self.logger.debug("Sending request to Document AI processor")
            result = self.docai_client.process_document(request=request)
            document = result.document

            # Extract entities
            if hasattr(document, "entities") and document.entities:
                self.logger.debug(f"Found {len(document.entities)} entities")

                for entity in document.entities:
                    entity_type = entity.type_

                    # Extract the normalized value if available, otherwise use text
                    if hasattr(entity, "normalized_value") and entity.normalized_value:
                        try:
                            value = float(entity.normalized_value.text)
                        except ValueError:
                            value = entity.normalized_value.text
                    else:
                        # Extract text and convert to float if possible
                        text = self._get_text(entity.text_anchor, document)
                        try:
                            # Clean and convert to float
                            text = text.replace(",", "").strip()
                            value = float(text)
                        except ValueError:
                            value = text

                    self.logger.debug(f"Extracted entity: {entity_type} = {value}")
                    extracted_entities[entity_type] = value
            else:
                self.logger.warning("No entities found in the document")

            elapsed_time = time.time() - start_time
            self.logger.info(
                f"Document AI processing completed in {elapsed_time:.2f} seconds",
            )

            return extracted_entities

        except Exception as e:
            self.logger.error(
                f"Error processing document with Document AI: {e}",
                exc_info=True,
            )
            return {}

    def _get_text(self, text_anchor, document) -> str:
        """Extract text from text anchor.

        Args:
            text_anchor: Document AI text anchor
            document: Document AI document

        Returns:
            Extracted text
        """
        try:
            # Concatenate every text segment the anchor points to
            return "".join(
                document.text[segment.start_index : segment.end_index]
                for segment in text_anchor.text_segments
            )
        except Exception as e:
            self.logger.error(f"Error extracting text from anchor: {e}")
            return ""

    def _extract_metrics_from_pdf(
        self,
        file_path: str,
    ) -> Tuple[Dict[str, Dict[str, Any]], Dict[str, Any]]:
        """Main method to extract banking metrics from a PDF file.

        Args:
            file_path: Path to PDF file

        Returns:
            Tuple of (calculated metrics with values and metadata, raw extracted
            entities). Both dictionaries are empty if extraction fails.
        """
        self.logger.info(f"Starting metrics extraction from {file_path}")
        start_time = time.time()

        try:
            # Step 1: Find relevant pages in the document (full text is not needed here)
            relevant_pages, _ = self._find_relevant_pages(file_path)

            # Step 2: Create a smaller PDF with only relevant pages
            input_path = Path(file_path)
            subset_pdf_path = f"{self.output_dir}/{input_path.stem}_subset.pdf"
            if not self._extract_subset_pdf(file_path, subset_pdf_path, relevant_pages):
                raise RuntimeError("Failed to create subset PDF")

            # Step 3: Process the subset PDF with Document AI
            extracted_entities = self._process_document_with_docai(subset_pdf_path)

            # Step 4: Calculate metrics from extracted entities
            metrics_results = self._calculate_metrics(extracted_entities)

            elapsed_time = time.time() - start_time
            self.logger.info(
                f"Metrics extraction completed in {elapsed_time:.2f} seconds. "
                f"Found {len(metrics_results)}/{len(self.target_metrics)} metrics.",
            )

            return metrics_results, extracted_entities

        except Exception as e:
            self.logger.error(
                f"Error extracting metrics from {file_path}: {e}",
                exc_info=True,
            )
            # Return a pair so callers can always unpack (metrics, entities)
            return {}, {}

    def _create_metrics_summary(
        self,
        metrics: Dict[str, Dict[str, Any]],
    ) -> pd.DataFrame:
        """Create a DataFrame summary of metrics results.

        Args:
            metrics: Dictionary of calculated metrics with their metadata

        Returns:
            DataFrame with metrics summary information
        """
        summary_data = []
        for name, metric_data in metrics.items():
            row = {
                "metric": name,
                "value": metric_data.get("value"),
                "formatted_value": metric_data.get("formatted_value", "N/A"),
                "in_typical_range": metric_data.get("in_typical_range", False),
                "description": metric_data.get("description", ""),
            }
            summary_data.append(row)

        return pd.DataFrame(summary_data)

    def _save_metrics_results(
        self,
        file_path: str,
        metrics: Dict[str, Dict[str, Any]],
        entities: Dict[str, Any] = None,
        output_dir: Path = None,
    ) -> Dict[str, Path]:
        """Save extraction results to files.

        Args:
            file_path: Original file path
            metrics: Dictionary of calculated metrics
            entities: Dictionary of extracted entities (optional)
            output_dir: Custom output directory (optional)

        Returns:
            Dictionary mapping file types to their saved paths
        """
        self.logger.debug(f"Saving results for {file_path}")
        saved_paths = {}

        try:
            file_name = Path(file_path).stem

            # Use custom output directory if provided, otherwise a per-file subdirectory of the default
            if output_dir is None:
                output_dir = Path(self.output_dir) / file_name
            else:
                output_dir = Path(output_dir)

            output_dir.mkdir(exist_ok=True, parents=True)

            # Save metrics to JSON with full metadata
            metrics_path = output_dir / f"{file_name}_metrics.json"
            with open(metrics_path, "w") as f:
                json.dump(metrics, f, indent=2)
            saved_paths["metrics_json"] = metrics_path

            # Create and save metrics summary to CSV
            if metrics:
                summary_df = self._create_metrics_summary(metrics)
                summary_path = output_dir / f"{file_name}_metrics_summary.csv"
                summary_df.to_csv(summary_path, index=False)
                saved_paths["metrics_summary_csv"] = summary_path

            # Save extracted entities to JSON if provided
            if entities:
                entities_path = output_dir / f"{file_name}_entities.json"
                with open(entities_path, "w") as f:
                    json.dump(entities, f, indent=2)
                saved_paths["entities_json"] = entities_path

            self.logger.info(f"Results saved to {output_dir}")

        except Exception as e:
            self.logger.error(f"Error saving results: {e}", exc_info=True)

        return saved_paths

    def batch_process(self, input_path: str) -> Dict[str, Dict[str, Dict[str, Any]]]:
        """Process all PDF files under a directory in batch.

        Args:
            input_path: Directory containing PDF files (searched recursively)

        Returns:
            Dictionary mapping file names (without extension) to extracted metrics
        """
        start_time = time.time()

        results = {}
        file_paths = []

        # Collect every PDF under input_path (case-insensitive extension match)
        for root, _, files in os.walk(input_path):
            file_paths.extend(
                os.path.join(root, f) for f in sorted(files) if f.lower().endswith(".pdf")
            )

        if not file_paths:
            self.logger.warning(f"No PDF files found in {input_path}")

        self.logger.info(f"Starting batch processing for {len(file_paths)} files")
        for file_path in file_paths:
            try:
                self.logger.info(f"Processing file: {file_path}")
                file_start_time = time.time()

                # Extract metrics from file
                metrics, entities = self._extract_metrics_from_pdf(file_path)

                # Store results
                file_name = Path(file_path).stem
                results[file_name] = metrics

                # Save to database
                self._save_to_database(file_name, metrics, entities)

                file_elapsed_time = time.time() - file_start_time
                self.logger.info(
                    f"Completed processing {file_path} in {file_elapsed_time:.2f} seconds",
                )

            except Exception as e:
                self.logger.error(
                    f"Error processing file {file_path}: {e}",
                    exc_info=True,
                )
                file_name = Path(file_path).stem
                results[file_name] = {}

        # Create enhanced summary from database
        self._save_enhanced_summary()

        elapsed_time = time.time() - start_time
        self.logger.info(f"Completed batch processing in {elapsed_time:.2f} seconds")

        return results

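The page filter in `_find_relevant_pages` keeps a page only when a section header and at least one of that section's keywords both appear in the page text (case-insensitively), and falls back to the first five pages when nothing matches. Below is a minimal, pypdf-free sketch of the same heuristic that works on a list of already-extracted page strings, which makes it easy to test the keyword lists in isolation. `find_relevant_pages`, `_page_matches` and `fallback_count` are hypothetical names for illustration, not part of the notebook.

```python
from typing import Dict, List, Sequence


def _page_matches(text_lower: str, target_sections: List[Dict[str, List[str]]]) -> bool:
    """True if any section header and one of its keywords both occur in the page text."""
    return any(
        header.lower() in text_lower
        and any(keyword.lower() in text_lower for keyword in keywords)
        for section in target_sections
        for header, keywords in section.items()
    )


def find_relevant_pages(
    page_texts: Sequence[str],
    target_sections: List[Dict[str, List[str]]],
    fallback_count: int = 5,
) -> List[int]:
    """Return 1-indexed page numbers that pass the header-plus-keyword filter."""
    relevant = [
        page_num
        for page_num, text in enumerate(page_texts, start=1)
        if _page_matches((text or "").lower(), target_sections)
    ]
    if not relevant:
        # Same fallback idea as the class: the first few pages of the document
        return list(range(1, min(fallback_count, len(page_texts)) + 1))
    return relevant


sections = [
    {"Our key figures": ["Common equity tier 1 capital", "Risk-weighted assets"]},
    {"Group performance": ["Credit loss expense"]},
]
pages = [
    "Cover page",
    "Our key figures ... Risk-weighted assets 500",
    "Group performance narrative only",
    "GROUP PERFORMANCE: credit loss expense 121",
]
print(find_relevant_pages(pages, sections))  # [2, 4]
```

Page 3 contains a header but none of its keywords, so it is skipped; requiring both signals is what keeps narrative pages out of the subset PDF sent to Document AI.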
In [None]:
# Initialize extractor with batch processing capabilities
metrics_calculator = BankingMetricsCalculator(
    db=db,
    project_id=DOCUMENT_AI_PROJECT_ID,
    location=DOCUMENT_AI_LOCATION,
    processor_id=DOCUMENT_AI_PROCESSOR_ID,
    output_dir=DOCUMENT_AI_OUTPUT_DIR,
    processor_version_id=DOCUMENT_AI_PROCESSOR_VERSION  # Optional
)

metrics_calculator.batch_process(QUARTERLY_REPORTS_DOWNLOAD_FOLDER)

{'full-report-ubs-group-ag-consolidated-3q24': {'NPL_Ratio': {'value': 0.622780060897806,
   'formatted_value': '0.62%',
   'in_typical_range': True,
   'description': 'Non-Performing Loan Ratio measures the percentage of non-performing loans in the total loan portfolio.',
   'calculation': 'Non-performing loans / Total loans * 100',
   'components': {'nonperforming_loans': 6633.0, 'total_loans': 1065063.0}},
  'Coverage_Ratio': {'value': 36.650082918739635,
   'formatted_value': '36.65%',
   'in_typical_range': True,
   'description': 'Coverage Ratio shows how much of the non-performing loans are covered by allowances.',
   'calculation': 'Loan loss allowance / Non-performing loans * 100',
   'components': {'loan_loss_allowance': 2431.0,
    'nonperforming_loans': 6633.0}},
  'PCL': {'value': 121.0,
   'formatted_value': '121 million',
   'in_typical_range': True,
   'description': 'Provision for Credit Losses for the current period.',
   'calculation': 'Direct value from Provision fo

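As a quick sanity check, the NPL and coverage figures printed above can be reproduced by hand from their `components` and the formulas in their `calculation` fields. The inputs are copied from the UBS 3Q24 output, in the units of the source report; nothing else is assumed.

```python
# Components copied from the UBS 3Q24 output above (units as reported)
nonperforming_loans = 6633.0
total_loans = 1065063.0
loan_loss_allowance = 2431.0

# Formulas from the "calculation" fields in the output
npl_ratio = nonperforming_loans / total_loans * 100
coverage_ratio = loan_loss_allowance / nonperforming_loans * 100

print(f"NPL ratio: {npl_ratio:.2f}%")            # NPL ratio: 0.62%
print(f"Coverage ratio: {coverage_ratio:.2f}%")  # Coverage ratio: 36.65%
```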
## 💬📊 Pipeline Components

### 📦 Dataclasses
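Several dataclasses in the next cell (`QAPair`, `AnalystConversation`, `ReportAnalysis`, `ComparativeAnalysis`) declare list and dict fields with `field(default_factory=...)`. This follows standard `dataclasses` behavior: each instance needs its own container, and a bare mutable default such as `[]` is rejected with a `ValueError` when the class is defined. A minimal demonstration; `ConversationSketch` and `BrokenSketch` are hypothetical stand-ins, not classes from the notebook.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class ConversationSketch:  # hypothetical, trimmed stand-in for AnalystConversation
    analyst_name: str = ""
    qa_pairs: List[str] = field(default_factory=list)  # fresh list per instance


a = ConversationSketch("Analyst A")
b = ConversationSketch("Analyst B")
a.qa_pairs.append("Q1")
print(a.qa_pairs, b.qa_pairs)  # ['Q1'] []

# A bare mutable default is rejected when the class is defined
rejected = False
try:

    @dataclass
    class BrokenSketch:
        qa_pairs: list = []

except ValueError:
    rejected = True
print(rejected)  # True
```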

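`SentimentScore` derives its `compound` score as `positive - negative` when no compound value is given, and its `sentiment_label` property then applies symmetric ±0.05 thresholds. A standalone sketch of that labeling rule, useful for checking edge cases without building full objects; `label_sentiment` and its `threshold` parameter are hypothetical helpers, not part of the notebook.

```python
from typing import Optional


def label_sentiment(
    positive: float,
    negative: float,
    compound: Optional[float] = None,
    threshold: float = 0.05,
) -> str:
    """Map sentiment probabilities to Positive / Negative / Neutral."""
    # Fall back to positive minus negative, mirroring SentimentScore
    if compound is None:
        compound = positive - negative
    if compound >= threshold:
        return "Positive"
    if compound <= -threshold:
        return "Negative"
    return "Neutral"


print(label_sentiment(0.70, 0.10))  # Positive (compound about 0.60)
print(label_sentiment(0.30, 0.32))  # Neutral  (compound about -0.02)
print(label_sentiment(0.10, 0.50))  # Negative (compound about -0.40)
```

Because the band is symmetric, a gap of less than 0.05 between the positive and negative probabilities is labeled Neutral even when `neutral` is not the largest score, so `sentiment_label` and `dominant_sentiment` can disagree.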
In [4]:
@dataclass
class CreditMetric:
    """Credit risk metric definition with associated keywords and indicators"""

    name: str
    description: str
    keywords: Set[str]
    indicators: Set[str]
    typical_range: tuple  # (min, max)
    format_string: str = "{:.2f}%"  # Default format
    is_percentage: bool = True

    def format_value(self, value: float) -> str:
        """Format a metric value according to its defined format"""
        if self.is_percentage and "%" not in self.format_string:
            return f"{self.format_string.format(value)}%"
        return self.format_string.format(value)

    def is_in_range(self, value: float) -> bool:
        """Check if value is within typical range"""
        min_val, max_val = self.typical_range
        return min_val <= value <= max_val


@dataclass
class SentimentScore:
    """Sentiment analysis scores with enhanced properties"""

    positive: float
    negative: float
    neutral: float
    compound: Optional[float] = None

    def __post_init__(self):
        """Calculate compound score if not provided (an explicit 0.0 is kept as-is)"""
        if self.compound is None:
            self.compound = self.positive - self.negative

    @property
    def dominant_sentiment(self) -> str:
        """Determine the dominant sentiment"""
        scores = {
            "positive": self.positive,
            "negative": self.negative,
            "neutral": self.neutral,
        }
        return max(scores.items(), key=lambda x: x[1])[0]

    @property
    def sentiment_label(self) -> str:
        """Get a human-readable sentiment label"""
        if self.compound >= 0.05:
            return "Positive"
        if self.compound <= -0.05:
            return "Negative"
        return "Neutral"

    @property
    def sentiment_color(self) -> str:
        """Get a color code for visualization"""
        if self.compound >= 0.05:
            return "#1f77b4"  # Blue
        if self.compound <= -0.05:
            return "#d62728"  # Red
        return "#7f7f7f"  # Gray


@dataclass
class Topic:
    """Topic model result with keywords and metadata"""

    id: int
    name: str
    keywords: List[str]
    category: str
    probability: float = 0.0

    @property
    def is_credit_risk_related(self) -> bool:
        """Check if topic is related to credit risk"""
        return self.category == "CREDIT_RISK"

    @property
    def primary_keyword(self) -> str:
        """Get the most representative keyword"""
        return self.keywords[0] if self.keywords else ""


@dataclass
class CreditRiskMention:
    """Mention of a credit risk metric with context and sentiment"""

    metric_type: str  # 'CET1', 'NPL_RATIO', etc.
    mention_text: str
    mention_count: int = 1
    sentiment: Optional[SentimentScore] = None
    keyword_used: Optional[str] = None


@dataclass
class QAPair:
    """Question-answer pair with metadata"""

    question: str
    answer: str
    answer_speaker: str
    answer_role: str
    preprocessed: Dict[str, Dict[str, str]] = field(default_factory=dict)

    def add_preprocessed(self, text_type: str, level: str, text: str):
        """Add preprocessed text"""
        if text_type not in self.preprocessed:
            self.preprocessed[text_type] = {}
        self.preprocessed[text_type][level] = text


@dataclass
class AnalystConversation:
    """Complete analyst conversation with Q&A pairs and analysis results"""

    id: Optional[int] = None
    analyst_name: str = ""
    analyst_company: str = ""
    qa_pairs: List[QAPair] = field(default_factory=list)
    topic: Optional[Topic] = None
    conversation_sentiment: Optional[SentimentScore] = None
    speaker_sentiments: Dict[str, SentimentScore] = field(default_factory=dict)
    credit_risk_mentions: List[CreditRiskMention] = field(default_factory=list)
    vector_id: Optional[int] = None
    metadata: Dict[str, Any] = field(default_factory=dict)

    @property
    def combined_text(self) -> str:
        """Get combined text of all Q&A pairs"""
        return " ".join([f"{qa.question} {qa.answer}" for qa in self.qa_pairs])

    @property
    def has_credit_risk_mention(self) -> bool:
        """Check if conversation mentions credit risk"""
        return len(self.credit_risk_mentions) > 0

    @property
    def speakers(self) -> List[str]:
        """Get unique list of speakers, in order of first appearance"""
        return list(dict.fromkeys(qa.answer_speaker for qa in self.qa_pairs))

    @property
    def speaker_roles(self) -> Dict[str, str]:
        """Get mapping of speakers to roles"""
        return {qa.answer_speaker: qa.answer_role for qa in self.qa_pairs}


@dataclass
class ReportAnalysis:
    """Analysis results for a quarterly earnings report"""

    bank_name: str
    year: int
    quarter: str
    credit_risk_score: Dict
    report_date: Optional[datetime] = None
    conversations: List[AnalystConversation] = field(default_factory=list)
    topics: Dict[int, Topic] = field(default_factory=dict)
    credit_metrics: Dict[str, float] = field(default_factory=dict)

    @property
    def period_label(self) -> str:
        """Get formatted period label"""
        return f"{self.year} {self.quarter}"

    @property
    def overall_sentiment(self) -> SentimentScore:
        """Average sentiment across all conversations that have a sentiment score"""
        scored = [
            c.conversation_sentiment
            for c in self.conversations
            if c.conversation_sentiment
        ]

        # Fall back to a near-uniform distribution when nothing has been scored
        if not scored:
            return SentimentScore(0.33, 0.33, 0.34, 0.0)

        count = len(scored)
        return SentimentScore(
            positive=sum(s.positive for s in scored) / count,
            negative=sum(s.negative for s in scored) / count,
            neutral=sum(s.neutral for s in scored) / count,
        )

    def topic_mention_counts(self) -> Dict[str, int]:
        """Count mentions by topic"""
        topic_counts = {}
        for conv in self.conversations:
            if conv.topic and conv.topic.name:
                topic_counts[conv.topic.name] = topic_counts.get(conv.topic.name, 0) + 1
        return topic_counts

    def credit_metric_mentions(self) -> Dict[str, List[CreditRiskMention]]:
        """Group credit risk mentions by metric type"""
        result = {}
        for conv in self.conversations:
            for mention in conv.credit_risk_mentions:
                if mention.metric_type not in result:
                    result[mention.metric_type] = []
                result[mention.metric_type].append(mention)
        return result
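
`overall_sentiment` averages each class probability over only those conversations that actually carry a score. The following is a minimal, self-contained sketch of that averaging; `Sentiment` and `average_sentiment` are hypothetical stand-ins, since the full `SentimentScore` definition lies outside this section.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Sentiment:
    """Hypothetical stand-in for SentimentScore: class probabilities only"""

    positive: float
    negative: float
    neutral: float


def average_sentiment(scores: List[Optional[Sentiment]]) -> Sentiment:
    """Mean class probabilities over the conversations that have a score"""
    scored = [s for s in scores if s is not None]
    if not scored:
        # Near-uniform fallback, matching the 0.33 / 0.33 / 0.34 default above
        return Sentiment(0.33, 0.33, 0.34)
    n = len(scored)
    return Sentiment(
        positive=sum(s.positive for s in scored) / n,
        negative=sum(s.negative for s in scored) / n,
        neutral=sum(s.neutral for s in scored) / n,
    )


# The unscored conversation (None) is excluded from the denominator
avg = average_sentiment([Sentiment(0.6, 0.1, 0.3), None, Sentiment(0.2, 0.5, 0.3)])
print(avg)
```

Averaging (rather than summing) keeps the three probabilities summing to one whenever each input does.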


@dataclass
class ComparativeAnalysis:
    """Cross-bank comparison analysis"""

    period: str  # e.g., "2024 Q1"
    banks: List[str]
    topic_sentiments: Dict[str, Dict[str, float]] = field(default_factory=dict)
    metric_values: Dict[str, Dict[str, float]] = field(default_factory=dict)

    def add_topic_sentiment(self, bank: str, topic: str, sentiment: float):
        """Add sentiment data for a bank-topic pair"""
        if topic not in self.topic_sentiments:
            self.topic_sentiments[topic] = {}
        self.topic_sentiments[topic][bank] = sentiment

    def add_metric_value(self, bank: str, metric: str, value: float):
        """Add metric value for a bank-metric pair"""
        if metric not in self.metric_values:
            self.metric_values[metric] = {}
        self.metric_values[metric][bank] = value

    def sentiment_gap(self, bank1: str, bank2: str, topic: str) -> float:
        """Calculate sentiment gap between two banks for a topic"""
        if topic not in self.topic_sentiments:
            return 0.0

        bank1_sentiment = self.topic_sentiments[topic].get(bank1, 0.0)
        bank2_sentiment = self.topic_sentiments[topic].get(bank2, 0.0)

        return bank1_sentiment - bank2_sentiment
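
`sentiment_gap` substitutes `0.0` when a bank has no score for a topic, so "no data" and "perfectly neutral" produce the same gap. Below is a sketch of an alternative that returns `None` in that case, using the same `{topic: {bank: sentiment}}` layout as `topic_sentiments`; the helper names, bank names and topic label are hypothetical.

```python
from typing import Dict, Optional

# topic -> bank -> sentiment, the same layout as ComparativeAnalysis.topic_sentiments
TopicSentiments = Dict[str, Dict[str, float]]


def add_topic_sentiment(store: TopicSentiments, bank: str, topic: str, value: float) -> None:
    """Equivalent to ComparativeAnalysis.add_topic_sentiment, via dict.setdefault"""
    store.setdefault(topic, {})[bank] = value


def sentiment_gap_or_none(
    store: TopicSentiments, bank1: str, bank2: str, topic: str
) -> Optional[float]:
    """Return bank1 minus bank2 for a topic, or None if either bank has no score"""
    by_bank = store.get(topic, {})
    if bank1 not in by_bank or bank2 not in by_bank:
        return None
    return by_bank[bank1] - by_bank[bank2]


store: TopicSentiments = {}
add_topic_sentiment(store, "Bank A", "credit quality", 0.25)
add_topic_sentiment(store, "Bank B", "credit quality", -0.10)

print(sentiment_gap_or_none(store, "Bank A", "Bank B", "credit quality"))
print(sentiment_gap_or_none(store, "Bank A", "Bank C", "credit quality"))  # None
```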


# Define our key credit metrics
CREDIT_METRICS = {
    "CET1_RATIO": CreditMetric(
        name="CET1 Ratio",
        description="Common Equity Tier 1 Capital Ratio measures a bank's core equity capital against its risk-weighted assets.",
        keywords={
            "cet1",
            "common equity tier 1",
            "capital ratio",
            "capital adequacy",
            "tier 1",
            "capital buffer",
        },
        indicators={
            "regulatory",
            "requirements",
            "target",
            "minimum",
            "comfortable",
            "strong",
        },
        typical_range=(10.0, 18.0),
        format_string="{:.1f}",
    ),
    "NPL_RATIO": CreditMetric(
        name="NPL Ratio",
        description="Non-Performing Loan Ratio measures the percentage of loans that are in default or close to default.",
        keywords={
            "npl",
            "non-performing loan",
            "bad loan",
            "impaired",
            "delinquent",
            "problem loan",
        },
        indicators={
            "increase",
            "decrease",
            "deteriorate",
            "improve",
            "stable",
            "worsening",
        },
        typical_range=(0.5, 5.0),
        format_string="{:.2f}",
    ),
    "COVERAGE_RATIO": CreditMetric(
        name="Coverage Ratio",
        description="Loan Loss Coverage Ratio shows how much of the non-performing loans are covered by loan loss reserves.",
        keywords={
            "coverage",
            "provision coverage",
            "loan loss reserve",
            "allowance",
            "reserve",
        },
        indicators={"adequate", "sufficient", "conservative", "aggressive", "prudent"},
        typical_range=(60.0, 150.0),
        format_string="{:.1f}",
    ),
    "TEXAS_RATIO": CreditMetric(
        name="Texas Ratio",
        description="Texas Ratio compares non-performing assets plus loans 90+ days past due with tangible common equity plus loan loss reserves. Higher values indicate greater risk.",
        keywords={
            "texas ratio",
            "problem assets",
            "capital basis",
            "troubled",
            "risk measure",
        },
        indicators={"concerning", "worrying", "manageable", "acceptable", "elevated"},
        typical_range=(0.0, 100.0),
        format_string="{:.1f}",
    ),
    "PCL": CreditMetric(
        name="Provision for Credit Losses",
        description="PCL represents the amount set aside to cover expected loan losses.",
        keywords={
            "pcl",
            "provision",
            "loan loss",
            "credit loss",
            "impairment charge",
            "charge-off",
        },
        indicators={"build", "release", "increase", "decrease", "reserve", "provision"},
        typical_range=(0.0, 5000.0),
        format_string="${:.0f}M",
        is_percentage=False,
    ),
}
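
The metric definitions above describe ratios that the pipeline extracts from disclosures rather than computes. As a worked sanity check, the sketch below applies the standard textbook formulas (CET1 capital / RWA; NPLs / gross loans; loan loss reserves / NPLs; and, for the Texas ratio, non-performing assets plus loans 90+ days past due over tangible common equity plus loan loss reserves) to made-up figures, then tests the results against the `typical_range` and `format_string` values from `CREDIT_METRICS`.

```python
from typing import Dict, Tuple


def pct(numerator: float, denominator: float) -> float:
    """Ratio of two amounts expressed as a percentage"""
    return 100.0 * numerator / denominator


# Typical ranges and format strings copied from CREDIT_METRICS above
TYPICAL: Dict[str, Tuple[Tuple[float, float], str]] = {
    "CET1_RATIO": ((10.0, 18.0), "{:.1f}"),
    "NPL_RATIO": ((0.5, 5.0), "{:.2f}"),
    "COVERAGE_RATIO": ((60.0, 150.0), "{:.1f}"),
    "TEXAS_RATIO": ((0.0, 100.0), "{:.1f}"),
}

# Hypothetical balance-sheet figures in $M (illustrative only, not from any bank)
cet1_capital, rwa = 30_000.0, 240_000.0
gross_loans, npl = 200_000.0, 4_000.0
loan_loss_reserves = 3_000.0
past_due_90 = 1_000.0  # loans 90+ days past due
tangible_common_equity = 22_000.0

values = {
    "CET1_RATIO": pct(cet1_capital, rwa),  # 12.5
    "NPL_RATIO": pct(npl, gross_loans),  # 2.0
    "COVERAGE_RATIO": pct(loan_loss_reserves, npl),  # 75.0
    # NPLs stand in for non-performing assets here (no foreclosed property assumed)
    "TEXAS_RATIO": pct(npl + past_due_90, tangible_common_equity + loan_loss_reserves),  # 20.0
}

checks = {k: TYPICAL[k][0][0] <= v <= TYPICAL[k][0][1] for k, v in values.items()}
formatted = {k: TYPICAL[k][1].format(v) for k, v in values.items()}
print(formatted)  # {'CET1_RATIO': '12.5', 'NPL_RATIO': '2.00', 'COVERAGE_RATIO': '75.0', 'TEXAS_RATIO': '20.0'}
```

A value falling outside `typical_range` is not necessarily wrong, but it is a useful flag for a mis-extracted figure (for example, a ratio read as a fraction rather than a percentage).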


@dataclass
class RiskCategory:
    name: str
    metrics: Set[str]
    keywords: Set[str]
    indicators: Set[str]


RISK_CATEGORIES = {
    # Credit Risk: Deterioration in credit quality
    "CREDIT_RISK": RiskCategory(
        name="Credit Risk",
        metrics={
            "NPL",
            "LLP",
            "ECL",
            "PD",
            "LGD",
            "EAD",
            "CCR",
            "CVA",
            "CET1",
            "Tier 1",
            "RWA",
            "IFRS 9",
            "Stage 2",
            "Stage 3",
            "Coverage Ratio",
            "Cost of Risk",
            "Risk Density",
        },
        keywords={
            "default",
            "provision",
            "exposure",
            "collateral",
            "lending",
            "credit quality",
            "restructuring",
            "forbearance",
            "writeoff",
            "delinquency",
            "impairment",
            "recovery",
            "bankruptcy",
            "loan book",
            "portfolio quality",
            "underwriting",
            "concentration",
        },
        indicators={
            "deteriorating",
            "increasing",
            "elevated",
            "concerning",
            "stress",
            "weakness",
            "adverse",
            "negative",
            "challenged",
            "pressure",
            "downgrade",
            "migration",
            "strain",
            "heightened",
        },
    ),
    # Market Risk: Market volatility and uncertainty
    "MARKET_RISK": RiskCategory(
        name="Market Risk",
        metrics={
            "VaR",
            "SVaR",
            "IRC",
            "CRM",
            "DV01",
            "PV01",
            "Beta",
            "Delta",
            "Gamma",
            "Vega",
            "Theta",
            "RNIV",
            "NII",
            "EVE",
            "P&L",
            "MTM",
        },
        keywords={
            "trading",
            "volatility",
            "market value",
            "hedging",
            "position",
            "liquidity",
            "spread",
            "interest rate",
            "fx",
            "commodity",
            "equity",
            "derivative",
            "securities",
            "bond",
            "yield",
            "trading book",
        },
        indicators={
            "volatile",
            "unstable",
            "uncertain",
            "fluctuating",
            "adverse",
            "turbulent",
            "stressed",
            "illiquid",
            "disruption",
            "decline",
            "sharp",
            "rapid",
            "extreme",
            "severe",
        },
    ),
    # Liquidity Risk: Funding and liquidity pressures
    "LIQUIDITY_RISK": RiskCategory(
        name="Liquidity Risk",
        metrics={
            "LCR",
            "NSFR",
            "LDR",
            "HQLA",
            "CFP",
            "MLR",
            "Survival Period",
            "Funding Gap",
            "Buffer",
            "Liquidity Coverage",
            "Asset Encumbrance",
        },
        keywords={
            "funding",
            "deposit",
            "withdrawal",
            "cash flow",
            "maturity",
            "refinancing",
            "encumbrance",
            "contingency",
            "intraday",
            "wholesale",
            "retail",
            "interbank",
            "funding mix",
        },
        indicators={
            "stress",
            "shortage",
            "constraint",
            "tightening",
            "outflow",
            "drain",
            "pressure",
            "strain",
            "mismatch",
            "gap",
            "dry up",
        },
    ),
    # Group Risk: Structural and interconnection concerns
    "GROUP_RISK": RiskCategory(
        name="Group Risk",
        metrics={
            "RoE",
            "RoA",
            "CIR",
            "Capital Ratio",
            "Leverage Ratio",
            "MREL",
            "TLAC",
            "Combined Buffer",
            "Group Solvency",
            "Consolidated Capital",
        },
        keywords={
            "solvency",
            "capital adequacy",
            "risk appetite",
            "risk strategy",
            "group structure",
            "consolidation",
            "interconnectedness",
            "intragroup",
            "cross-border",
            "governance",
            "group exposure",
            "contagion",
            "concentration",
            "diversification",
        },
        indicators={
            "systemic",
            "material",
            "significant",
            "strategic",
            "group-wide",
            "consolidated",
            "aggregate",
            "cumulative",
            "interconnected",
            "concentrated",
            "correlated",
        },
    ),
}
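
`RISK_CATEGORIES` is a lexicon: each category pairs metric names with keywords and directional indicators. One simple way to use it, shown here as an illustration rather than the pipeline's actual scoring, is to count whole-word matches per category. Shared indicators such as "stress" count towards every category that lists them, and inflected forms ("defaults") do not match "default" unless the text is lemmatized first.

```python
import re
from typing import Dict, Set


def count_terms(text: str, terms: Set[str]) -> int:
    """Count whole-word (or whole-phrase) matches of each term in the text"""
    lowered = text.lower()
    return sum(
        len(re.findall(rf"\b{re.escape(term.lower())}\b", lowered)) for term in terms
    )


def score_categories(text: str, categories: Dict[str, Dict[str, Set[str]]]) -> Dict[str, int]:
    """Hypothetical score: keyword hits plus indicator hits for each category"""
    return {
        category_id: count_terms(text, spec["keywords"]) + count_terms(text, spec["indicators"])
        for category_id, spec in categories.items()
    }


# A small excerpt of RISK_CATEGORIES, reduced to plain sets
categories = {
    "CREDIT_RISK": {
        "keywords": {"default", "provision", "delinquency", "loan book"},
        "indicators": {"elevated", "deteriorating", "stress"},
    },
    "LIQUIDITY_RISK": {
        "keywords": {"funding", "deposit", "withdrawal"},
        "indicators": {"stress", "shortage", "outflow"},
    },
}

text = "Delinquency remains elevated in the card loan book, and we added to the provision."
scores = score_categories(text, categories)
print(scores)  # {'CREDIT_RISK': 4, 'LIQUIDITY_RISK': 0}
```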

### 📝 Text Preprocessing

class TextPreprocessor:
    def __init__(self, db: GSIBDatabase):
        """
        Initialize text preprocessor with database connection

        Args:
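            db: GSIBDatabase instance used to read raw QA pairs and store preprocessed text
        """
        # Set up logging
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(logging.INFO)
        # Log the class name; in a notebook __name__ is just "__main__"
        self.logger.info(f"Initializing {self.__class__.__name__}")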

        self.db = db

        # Download required NLTK data
        self._download_nltk_data()

        # Load spaCy model
        try:
            self.nlp = spacy.load("en_core_web_sm")
            self.logger.info("Loaded spaCy model: en_core_web_sm")
        except OSError:
            # Model not installed: download it into the interpreter running this kernel
            self.logger.info("Downloading spaCy model: en_core_web_sm")
            import subprocess
            import sys

            subprocess.run(
                [sys.executable, "-m", "spacy", "download", "en_core_web_sm"],
                check=True,
                capture_output=True,
            )
            self.nlp = spacy.load("en_core_web_sm")

        # Set up stopwords and financial terms
        self._setup_stopwords_and_terms()

    def _download_nltk_data(self):
        """Download required NLTK data"""
        try:
            nltk.download("punkt", quiet=True)
            nltk.download("punkt_tab", quiet=True)
            nltk.download("stopwords", quiet=True)
            self.logger.info("NLTK data downloaded successfully")
        except Exception as e:
            self.logger.error(f"Error downloading NLTK data: {str(e)}")
            raise

    def _setup_stopwords_and_terms(self):
        """Set up stopwords and financial terms"""
        # Base stopwords
        self.base_stopwords = set(stopwords.words("english"))

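        # Earnings-call filler words. Hyphenated entries such as "forward-looking"
        # never match, because clean_text() replaces hyphens with spaces first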
        self.earnings_call_stopwords = {
            "guidance",
            "forward-looking",
            "please",
            "thank",
            "you",
            "statements",
            "morning",
            "good",
            "call",
            "question",
            "questions",
            "answer",
            "answers",
            "afternoon",
            "basically",
            "okay",
            "thanks",
            "sorry",
            "hi",
            "hello",
            "think",
            "hear",
            "heard",
            "like",
            "liked",
        }

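        # Financial terms to preserve (never treated as stopwords). Entries that
        # contain spaces or hyphens ("cash flow", "charge-off") cannot match a
        # single token once clean_text() has split on punctuation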
        self.financial_terms = {
            "equity",
            "capital",
            "increase",
            "liabilities",
            "interest",
            "earnings",
            "decrease",
            "return",
            "tier",
            "stock",
            "assets",
            "loss",
            "eps",
            "trading",
            "mortgage",
            "investment",
            "regulatory",
            "compliance",
            "volatility",
            "expenses",
            "growth",
            "market",
            "debt",
            "decline",
            "rate",
            "dividend",
            "revenue",
            "margin",
            "risk",
            "deposit",
            "profit",
            "liability",
            "asset",
            "liquidity",
            "credit",
            "ratio",
            "cash flow",
            "loan",
            "ai",
            # Add credit risk specific terms
            "npl",
            "cet1",
            "provision",
            "coverage",
            "texas",
            "nonperforming",
            "impaired",
            "delinquent",
            "reserve",
            "allowance",
            "charge-off",
        }

        # Combine all stopwords, excluding financial terms
        self.all_stopwords = (
            self.base_stopwords.union(self.earnings_call_stopwords)
            - self.financial_terms
        )

        # Common financial abbreviations mapping
        self.abbreviations = {
            "CEO": "chief executive officer",
            "CFO": "chief financial officer",
            "CRO": "chief risk officer",
            "ROE": "return on equity",
            "ROI": "return on investment",
            "YOY": "year over year",
            "QOQ": "quarter over quarter",
            "FY": "fiscal year",
            "IBC": "investment banking",
            "M&A": "mergers and acquisitions",
            "DCM": "debt capital markets",
            "NPL": "non performing loan",
            "CET1": "common equity tier one",
            "PCL": "provision for credit losses",
            "LLP": "loan loss provision",
        }

    def expand_abbreviations(self, text: str) -> str:
        """Expand common financial abbreviations"""
        if not isinstance(text, str):
            return ""

        for abbr, full in self.abbreviations.items():
            # re.escape guards against regex metacharacters in abbreviations
            text = re.sub(rf"\b{re.escape(abbr)}\b", full, text, flags=re.IGNORECASE)
        return text

    def clean_text(self, text: str, remove_numbers: bool = True) -> str:
        """Basic text cleaning"""
        if not isinstance(text, str):
            return ""

        # Convert to lowercase
        text = text.lower()

        # Expand abbreviations
        text = self.expand_abbreviations(text)

        # Remove transcript artifacts such as "(ph)" and "{inaudible}" before
        # punctuation is stripped; otherwise the brackets are already gone
        text = re.sub(r"\(ph\)|\{.*?\}", " ", text)

        # Replace remaining punctuation and special characters with spaces
        text = re.sub(r"[^\w\s]", " ", text)

        # Remove numbers if specified
        if remove_numbers:
            text = re.sub(r"\d+", "", text)

        # Collapse repeated whitespace
        text = " ".join(text.split())

        return text

    def remove_stopwords(
        self,
        text: str,
        custom_stopwords: Optional[Set[str]] = None,
    ) -> str:
        """Remove stopwords with optional custom additions"""
        if not text:
            return ""

        # Combine stopwords
        stopwords_to_use = self.all_stopwords
        if custom_stopwords:
            stopwords_to_use = stopwords_to_use.union(set(custom_stopwords))

        # Tokenize and remove stopwords
        words = word_tokenize(text)
        filtered_words = [
            word for word in words if word.lower() not in stopwords_to_use
        ]

        return " ".join(filtered_words)

    def tokenize(self, text: str) -> List[str]:
        """Tokenize text into words"""
        if not text:
            return []

        return word_tokenize(text)

    def lemmatize_with_spacy(self, text: str) -> str:
        """Lemmatize text using spaCy"""
        if not text:
            return ""

        try:
            doc = self.nlp(text)

            # Keep financial terms, tokens inside ORG/MONEY/PERCENT/DATE/CARDINAL
            # entities, and any other token longer than two characters unless it
            # belongs to a PERSON entity (drops speaker names and short fragments)
            processed_tokens = []
            for token in doc:
                if (
                    token.text.lower() in self.financial_terms
                    or token.ent_type_
                    in {"ORG", "MONEY", "PERCENT", "DATE", "CARDINAL"}
                    or (len(token.text) > 2 and token.ent_type_ != "PERSON")
                ):
                    processed_tokens.append(token.lemma_)

            return " ".join(processed_tokens)
        except Exception as e:
            self.logger.error(f"Error in lemmatization: {str(e)}")
            return text

    def preprocess_text(
        self,
        text: str,
        custom_stopwords: Optional[Set[str]] = None,
        remove_numbers: bool = True,
    ) -> Dict[str, str]:
        """
        Complete preprocessing pipeline for earnings call text

        Args:
            text: Raw text to preprocess
            custom_stopwords: Optional additional stopwords
            remove_numbers: Whether to remove numeric values

        Returns:
            Dictionary with different preprocessing levels
        """
        if not isinstance(text, str) or not text.strip():
            return {"raw": "", "cleaned": "", "tokenized": "", "lemmatized": ""}

        try:
            # Store raw text
            result = {"raw": text}

            # Clean text
            cleaned_text = self.clean_text(text, remove_numbers=remove_numbers)
            result["cleaned"] = cleaned_text

            # Remove stopwords
            no_stopwords = self.remove_stopwords(
                cleaned_text,
                custom_stopwords=custom_stopwords,
            )

            # Tokenize
            tokens = self.tokenize(no_stopwords)
            result["tokenized"] = " ".join(tokens)

            # Lemmatize
            lemmatized = self.lemmatize_with_spacy(no_stopwords)
            result["lemmatized"] = lemmatized

            return result

        except Exception as e:
            self.logger.error(f"Error preprocessing text: {str(e)}")
            return {"raw": text, "cleaned": text, "tokenized": text, "lemmatized": text}

    def process_qa_pairs_from_db(self, limit: Optional[int] = None):
        """
        Pre-process raw QA pairs directly from the qa_pairs table

        This method retrieves raw QA pairs and applies preprocessing steps
        (tokenized, lemmatized) on both question and answer texts.

        Args:
            limit: Optional limit on number of QA pairs to process
        """
        # Get QA pairs needing processing
        raw_qa_pairs = self.db.get_unprocessed_qa_pairs(limit)
        if not raw_qa_pairs:
            self.logger.info("No QA pairs found for preprocessing")
            return

        self.logger.info(f"Found {len(raw_qa_pairs)} QA pairs for preprocessing")

        def process_entries(conn):
            # Process each QA pair
            for qa_pair in tqdm(raw_qa_pairs, desc="Processing QA pairs"):
                qa_pair_id = qa_pair["id"]
                question_text = qa_pair["question"]
                answer_text = qa_pair["answer"]

                # Skip if texts are empty
                if not question_text or not answer_text:
                    self.logger.warning(
                        f"Skipping QA pair with empty text: {qa_pair_id}"
                    )
                    continue

                # Preprocess question text with the full pipeline
                question_results = self.preprocess_text(question_text)

                # Preprocess answer text with the full pipeline
                answer_results = self.preprocess_text(answer_text)

                # Create and preprocess combined text
                combined_text = f"{question_text} {answer_text}"
                combined_results = self.preprocess_text(combined_text)

                # Store all preprocessed texts
                self._store_preprocessed_text(
                    qa_pair_id,
                    question_results,
                    answer_results,
                    combined_results,
                )

            return True

        try:
            # Execute the function within a transaction
            self.db.execute_in_transaction(process_entries)
            self.logger.info("Successfully completed preprocessing")
            # Checkpoint after successful processing
            self.db.checkpoint_wal()
        except Exception as e:
            self.logger.error(f"Error processing QA pairs: {str(e)}")
            raise

    def _store_preprocessed_text(
        self,
        qa_pair_id,
        question_results,
        answer_results,
        combined_results,
    ):
        """
        Store preprocessed text in the database

        Args:
            qa_pair_id: ID of the QA pair
            question_results: Dict with question preprocessing results
            answer_results: Dict with answer preprocessing results
            combined_results: Dict with combined preprocessing results
        """

        def store_text(conn):
            # Only the tokenized and lemmatized versions are persisted
            preprocessing_levels = ["tokenized", "lemmatized"]

            # (text type, preprocessing results, metadata source label)
            sources = [
                ("question", question_results, "raw_question"),
                ("answer", answer_results, "raw_answer"),
                ("combined", combined_results, "combined_qa"),
            ]

            # Store each text type's results with metadata
            for text_type, results, source in sources:
                for level in preprocessing_levels:
                    if level in results:
                        text = results[level]
                        metadata = {
                            "token_count": len(text.split()),
                            "source": source,
                        }
                        self.db.add_preprocessed_text(
                            qa_pair_id,
                            text_type,
                            level,
                            text,
                            metadata,
                        )

            return True

        try:
            # Execute the function within a transaction
            self.db.execute_in_transaction(store_text)
        except Exception as e:
            self.logger.error(f"Error storing preprocessed text: {str(e)}")
            raise
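
The ordering inside `clean_text` matters: bracketed transcript artifacts such as "(ph)" and "{inaudible}" have to be removed before punctuation is replaced with spaces, or their brackets are gone by the time the pattern runs. The regex-only sketch below (no NLTK or spaCy; `ABBREVIATIONS` is a trimmed, hypothetical copy of `self.abbreviations`) reproduces the cleaning steps in that order.

```python
import re
from typing import Dict

# Trimmed copy of TextPreprocessor.abbreviations, for illustration only
ABBREVIATIONS: Dict[str, str] = {
    "CET1": "common equity tier one",
    "NPL": "non performing loan",
    "M&A": "mergers and acquisitions",
}


def clean(text: str, remove_numbers: bool = True) -> str:
    """Lowercase, expand abbreviations, drop artifacts, strip punctuation and digits"""
    text = text.lower()
    for abbr, full in ABBREVIATIONS.items():
        text = re.sub(rf"\b{re.escape(abbr)}\b", full, text, flags=re.IGNORECASE)
    # Artifacts first: after the next step "(ph)" would already be " ph "
    text = re.sub(r"\(ph\)|\{.*?\}", " ", text)
    text = re.sub(r"[^\w\s]", " ", text)
    if remove_numbers:
        text = re.sub(r"\d+", "", text)
    return " ".join(text.split())


result = clean("Our CET1 ratio (ph) rose; NPL ratio fell {inaudible} to 1.2%")
print(result)  # our common equity tier one ratio rose non performing loan ratio fell to
```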

### 🔤 Text Embedder

class TextEmbedder:
    def __init__(self, model_name: str):
        """
        Initialize text embedder with a Hugging Face transformer model

        Args:
            model_name: Hugging Face model identifier, e.g. "ProsusAI/finbert"
        """
        self.model_name = model_name

        # Model-specific configurations. "dimension" is the embedding size, while
        # "max_length" is the tokenizer's sequence limit (512 tokens for these
        # BERT-style models); the two must not be confused.
        model_configs = {
            "ProsusAI/finbert": {
                "dimension": 768,
                "max_length": 512,
                "normalize": False,
                "pooling": "cls",
                "batch_size": 32,
            },
            "yiyanghkust/finbert-tone": {
                "dimension": 768,
                "max_length": 512,
                "normalize": False,
                "pooling": "cls",
                "batch_size": 32,
            },
            "BAAI/bge-large-en-v1.5": {
                "dimension": 1024,
                "max_length": 512,
                "normalize": True,
                "pooling": "mean",
                "batch_size": 32,
            },
        }

        self.config = model_configs.get(
            model_name,
            {
                "dimension": 1024,
                "max_length": 512,
                "normalize": True,
                "pooling": "mean",
                "batch_size": 32,
            },
        )

        # Initialize model and tokenizer
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModel.from_pretrained(model_name)
        self.model.to(device)

    def _encode(self, texts: List[str]) -> torch.Tensor:
        """Tokenize a list of texts, run the model, and pool to one vector per text"""
        # Tokenize input; truncate to the model's sequence limit, not the embedding size
        inputs = self.tokenizer(
            texts,
            max_length=self.config["max_length"],
            padding=True,
            truncation=True,
            return_tensors="pt",
        ).to(device)

        # Generate token-level hidden states
        with torch.no_grad():
            outputs = self.model(**inputs)

        # Apply pooling strategy
        if self.config["pooling"] == "cls":
            embeddings = outputs.last_hidden_state[:, 0]
        else:  # mean pooling over non-padding tokens
            attention_mask = inputs["attention_mask"]
            token_embeddings = outputs.last_hidden_state
            input_mask_expanded = (
                attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
            )
            embeddings = torch.sum(
                token_embeddings * input_mask_expanded,
                1,
            ) / torch.clamp(input_mask_expanded.sum(1), min=1e-9)

        # Normalize if required
        if self.config["normalize"]:
            embeddings = F.normalize(embeddings, p=2, dim=1)

        return embeddings

    def get_embedding(self, text: str) -> List[float]:
        """
        Generate embedding for a single text

        Args:
            text: Input text to embed

        Returns:
            Embedding as a list of floats
        """
        return self._encode([text])[0].cpu().numpy().tolist()

    def __call__(self, text: str) -> List[float]:
        """
        Allow direct calling of the object

        Args:
            text: Input text to embed

        Returns:
            Embedding as a list of floats
        """
        return self.get_embedding(text)

    def embed_batch(self, texts: List[str]) -> List[List[float]]:
        """Generate embeddings for a list of texts, batch_size texts per forward pass"""
        all_embeddings = []
        batch_size = self.config["batch_size"]
        for i in range(0, len(texts), batch_size):
            batch = texts[i : i + batch_size]
            all_embeddings.extend(self._encode(batch).cpu().numpy().tolist())
        return all_embeddings
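
For the `"mean"` pooling strategy, each text's vector is the average of its token vectors with the attention mask excluding padding, optionally followed by L2 normalization. Texts should be grouped by a batch size (32 in the default configuration), which is independent of the embedding dimension. The pure-Python sketch below mirrors those tensor operations on toy numbers; the helper names are hypothetical.

```python
import math
from typing import Iterator, List, Sequence


def masked_mean_pool(
    token_embeddings: Sequence[Sequence[float]], attention_mask: Sequence[int]
) -> List[float]:
    """Average the token vectors whose mask value is 1; padding positions are ignored"""
    total = [0.0] * len(token_embeddings[0])
    count = 0
    for vector, keep in zip(token_embeddings, attention_mask):
        if keep:
            count += 1
            for j, value in enumerate(vector):
                total[j] += value
    # Guard against an all-padding input, the role torch.clamp(min=1e-9) plays above
    count = max(count, 1)
    return [v / count for v in total]


def l2_normalize(vector: Sequence[float]) -> List[float]:
    """Scale a non-zero vector to unit length, as F.normalize(p=2) does per row"""
    norm = math.sqrt(sum(v * v for v in vector))
    return [v / norm for v in vector] if norm > 0 else list(vector)


def batches(items: Sequence[str], batch_size: int) -> Iterator[Sequence[str]]:
    """Yield consecutive chunks of at most batch_size items"""
    for start in range(0, len(items), batch_size):
        yield items[start : start + batch_size]


# Two real tokens plus one padding position (mask value 0)
pooled = masked_mean_pool([[1.0, 2.0], [3.0, 4.0], [100.0, 100.0]], [1, 1, 0])
unit = l2_normalize([3.0, 4.0])
sizes = [len(batch) for batch in batches([f"doc {i}" for i in range(70)], 32)]
print(pooled, unit, sizes)  # [2.0, 3.0] [0.6, 0.8] [32, 32, 6]
```

Because the mask removes padding from the average, padding a batch to its longest text does not change any individual text's mean-pooled vector.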

### 💹 Topic Modeling
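
`_generate_risk_embeddings` in the class below builds one embedding per `RISK_CATEGORIES` entry so that topics can later be matched to a risk category. The comparison step itself falls outside this excerpt; assuming cosine similarity with an arg-max choice and an optional minimum-similarity cut-off (both assumptions, as is the `closest_category` helper), the matching can be sketched with toy 3-dimensional vectors:

```python
import math
from typing import Dict, Optional, Sequence, Tuple


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two vectors (0.0 if either is all zeros)"""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def closest_category(
    topic_embedding: Sequence[float],
    risk_embeddings: Dict[str, Sequence[float]],
    min_similarity: float = 0.0,
) -> Tuple[Optional[str], float]:
    """Hypothetical helper: most similar risk category, or None below the cut-off"""
    best_id, best_score = None, -math.inf
    for category_id, embedding in risk_embeddings.items():
        score = cosine_similarity(topic_embedding, embedding)
        if score > best_score:
            best_id, best_score = category_id, score
    if best_score < min_similarity:
        return None, best_score
    return best_id, best_score


# Toy 3-dimensional "category embeddings"; real ones come from the TextEmbedder
risk_embeddings = {
    "CREDIT_RISK": [1.0, 0.0, 0.0],
    "MARKET_RISK": [0.0, 1.0, 0.0],
    "LIQUIDITY_RISK": [0.0, 0.0, 1.0],
}
category, score = closest_category([0.9, 0.1, 0.2], risk_embeddings)
print(category, round(score, 3))  # CREDIT_RISK 0.97
```

When the embedder L2-normalizes its output (as configured for `BAAI/bge-large-en-v1.5`), cosine similarity reduces to a plain dot product.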

class TopicAnalyzer:
    """BERTopic based topic clustering for G-SIB quarterly announcement analysis"""

    def __init__(
        self,
        db: GSIBDatabase,
        embedder: TextEmbedder,
        preprocessor=None,
        sentiment_analyzer=None,
    ):
        """
        Initialize topic analyzer with BERTopic model

        Args:
            db: GSIBDatabase instance
            embedder: TextEmbedder instance
            preprocessor: TextPreprocessor instance
            sentiment_analyzer: SentimentAnalyser instance
        """
        # Set up logging
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(logging.INFO)
        # Log the class name; in a notebook __name__ is just "__main__"
        self.logger.info(f"Initializing {self.__class__.__name__}")

        # Store database and components
        self.db = db
        self.embedder = embedder
        self.preprocessor = preprocessor
        self.sentiment_analyzer = sentiment_analyzer

        # Configure HDBSCAN parameters for more lenient clustering
        hdbscan_params = {
            "min_cluster_size": 2,  # Minimum size of clusters
            "min_samples": 1,  # More lenient cluster assignment
            "metric": "euclidean",
            "cluster_selection_method": "eom",
            "prediction_data": True,  # Enable prediction for new documents
        }

        # Configure UMAP parameters for better separation
        umap_params = {
            "n_neighbors": 3,  # Smaller local neighborhood
            "n_components": 5,  # Reduced dimensions
            "min_dist": 0.0,  # Allow tighter clusters
            "metric": "cosine",  # Better for text embeddings
        }

        # Initialize BERTopic model
        self.model = BERTopic(
            embedding_model=embedder.model,  # Use the prepared embedding model
            language="english",
            calculate_probabilities=True,
            verbose=False,
            # UMAP configuration
            umap_model=UMAP(**umap_params),
            # HDBSCAN configuration
            hdbscan_model=HDBSCAN(**hdbscan_params),
            # Additional parameters
            nr_topics="auto",  # Automatically merge similar topics after clustering
            top_n_words=20,  # More words per topic
        )

        # Generate risk category embeddings for similarity comparison
        self._generate_risk_embeddings()

    def _generate_risk_embeddings(self):
        """Generate embeddings for risk categories for similarity comparison"""
        self.risk_embeddings = {}

        for category_id, category in RISK_CATEGORIES.items():
            # Create a rich context description for the category
            context = (
                f"{category.name}: "
                f"Key metrics: {', '.join(sorted(category.metrics)[:10])}. "
                f"Keywords: {', '.join(sorted(category.keywords)[:10])}. "
                f"Indicators: {', '.join(sorted(category.indicators)[:10])}"
            )

            # Generate embedding
            embedding = self.embedder.embed_batch([context])[0]

            # Store embedding
            self.risk_embeddings[category_id] = embedding

        self.logger.info(
            f"Generated embeddings for {len(self.risk_embeddings)} risk categories",
        )

    def analyze_topics(self, limit: Optional[int] = None, new_only: bool = True):
        """
        Analyze topics in conversations

        Args:
            limit: Optional limit on number of conversations to analyze
            new_only: Only analyze conversations without existing topic analysis

        Returns:
            Dictionary mapping conversation IDs to topic information
        """
        try:
            # Get conversations to analyze using the database method
            conversations = self.db.get_conversations_to_analyze(
                limit=limit,
                new_only=new_only,
            )

            self.logger.info(
                f"Found {len(conversations)} conversations for topic clustering",
            )

            # Get texts for each conversation
            texts = []
            conv_ids = []
            conv_metadata = []

            for conversation in tqdm(
                conversations,
                desc="Clustering conversation topics",
            ):
                conversation_id = conversation["id"]

                # Get lemmatized text from the database
                text = self.db.get_conversation_text(
                    conversation_id,
                    level="lemmatized",
                )

                if not text:
                    self.logger.warning(
                        f"No text found for conversation {conversation_id}",
                    )
                    continue

                texts.append(text)
                conv_ids.append(conversation_id)

                # Get metadata for this conversation
                metadata = self.db.get_conversation_metadata(conversation_id)
                if metadata:
                    conv_metadata.append(
                        {
                            "bank": metadata["bank_name"],
                            "year": metadata["year"],
                            "quarter": metadata["quarter"],
                            "analyst_name": metadata["analyst_name"],
                            "analyst_company": metadata["analyst_company"],
                        },
                    )
                else:
                    conv_metadata.append({})

            if not texts:
                self.logger.warning("No texts found for topic analysis")
                return {}

            self.logger.info(f"Running topic modeling on {len(texts)} conversations")

            # Run BERTopic
            topics, probs = self.model.fit_transform(texts)

            # Process results
            topic_info = self.model.get_topic_info()

            # Store results using data classes
            results = {}
            for i, (conv_id, topic_id, prob) in enumerate(zip(conv_ids, topics, probs)):
                # Skip outlier topic
                if topic_id == -1:
                    continue

                # Get top words for this topic
                topic_words = self.model.get_topic(topic_id)
                if not topic_words:
                    continue

                # Create a topic embedding for risk category classification
                topic_text = " ".join([word for word, _ in topic_words[:10]])
                topic_embedding = self.embedder.embed_batch([topic_text])[0]

                # Determine topic category
                topic_category = self._determine_topic_category(
                    topic_embedding,
                    topic_words,
                )

                # Generate topic name
                topic_name = self._generate_topic_name(topic_words, topic_category)

                # Create Topic dataclass instance
                topic_obj = Topic(
                    id=int(topic_id),
                    name=topic_name,
                    keywords=[word for word, _ in topic_words[:10]],
                    category=topic_category,
                    probability=float(prob)
                    if isinstance(prob, (int, float))
                    else float(max(prob)),
                )

                # Store in database
                self.db.update_conversation_topic(
                    conversation_id=conv_id,
                    topic_id=topic_obj.id,
                    topic_probability=topic_obj.probability,
                )

                # Add or update topic in topics table
                self.db.add_or_update_topic(
                    topic_id=topic_obj.id,
                    name=topic_obj.name,
                    keywords=topic_obj.keywords,
                    category=topic_obj.category,
                )

                # Add sentiment analysis if available
                if self.sentiment_analyzer and topic_category.startswith(
                    "CREDIT_RISK:",
                ):
                    # Score FinBERT on the topic's top-5 keywords (a coarse proxy, not the conversation text)
                    topic_sample = " ".join([word for word, _ in topic_words[:5]])
                    sentiment = self.sentiment_analyzer.get_sentiment_for_text(
                        topic_sample,
                    )

                    # Add to results
                    results[conv_id] = {
                        "topic": topic_obj,
                        "sentiment": sentiment,
                        "metadata": conv_metadata[i] if i < len(conv_metadata) else {},
                    }
                else:
                    results[conv_id] = {
                        "topic": topic_obj,
                        "metadata": conv_metadata[i] if i < len(conv_metadata) else {},
                    }

            self.logger.info(
                f"Completed topic analysis for {len(results)} conversations",
            )
            return results

        except Exception as e:
            self.logger.error(f"Error analyzing topics: {str(e)}")
            return {}

    def _determine_topic_category(self, topic_embedding, topic_words):
        """Determine topic category using embedding similarity with risk categories"""
        # Calculate similarity with risk categories
        similarities = {
            risk: cosine_similarity([topic_embedding], [embedding])[0][0]
            for risk, embedding in self.risk_embeddings.items()
        }

        # Find best match
        best_match = max(similarities, key=similarities.get)
        confidence = similarities[best_match]

        # If confidence is high enough, use this category
        if confidence > 0.5:
            # For credit risk, check if we can identify subcategory
            if best_match == "CREDIT_RISK":
                # Extract metrics from CREDIT_METRICS
                subcategory_scores = {}
                words = [word.lower() for word, _ in topic_words]

                for metric_key, metric in CREDIT_METRICS.items():
                    metric_score = 0
                    # Check keywords
                    for keyword in metric.keywords:
                        for word in words:
                            if keyword.lower() in word or word in keyword.lower():
                                metric_score += 1

                    # Check indicators
                    for indicator in metric.indicators:
                        for word in words:
                            if indicator.lower() in word or word in indicator.lower():
                                metric_score += 0.5

                    if metric_score > 0:
                        subcategory_scores[metric.name] = metric_score

                # If subcategory found, add it to result
                if subcategory_scores:
                    subcategory = max(subcategory_scores, key=subcategory_scores.get)
                    return f"CREDIT_RISK:{subcategory}"

            return best_match

        # Fall back to keyword matching for low confidence cases
        return self._determine_topic_category_by_keywords(topic_words)

    def _determine_topic_category_by_keywords(self, topic_words):
        """Determine topic category using keyword matching"""
        # Extract words from topic
        words = [word.lower() for word, _ in topic_words]

        # Calculate risk category scores
        category_scores = {}

        # Check against risk categories
        for category_id, category in RISK_CATEGORIES.items():
            # Check for metrics
            metric_matches = sum(
                1
                for word in words
                for metric in category.metrics
                if metric.lower() in word or word in metric.lower()
            )

            # Check for keywords
            keyword_matches = sum(
                1
                for word in words
                for keyword in category.keywords
                if keyword.lower() in word or word in keyword.lower()
            )

            # Check for indicators
            indicator_matches = sum(
                1
                for word in words
                for indicator in category.indicators
                if indicator.lower() in word or word in indicator.lower()
            )

            # Calculate weighted score (metrics are most important)
            score = (
                (metric_matches * 1.5)
                + (keyword_matches * 1.0)
                + (indicator_matches * 0.5)
            )

            if score > 0:
                category_scores[category_id] = score

        # Find category with highest score
        if category_scores:
            max_category = max(category_scores, key=category_scores.get)
            if category_scores[max_category] > 1.0:
                return max_category

        return "OTHER"  # Default category if no clear match

    def _generate_topic_name(
        self,
        topic_words: List[Tuple[str, float]],
        category: str,
    ) -> str:
        """Generate a descriptive name for a topic based on top words and category"""
        # Get top 3 words
        top_words = [word for word, _ in topic_words[:3]]

        # Check if it's a credit risk topic
        if category.startswith("CREDIT_RISK:"):
            subcategory = category.split(":", 1)[1]
            return f"Credit Risk: {subcategory} ({', '.join(top_words)})"

        # For other categories; split once so a subcategory containing ":" cannot raise
        if ":" in category:
            main_category, subcategory = category.split(":", 1)
            return f"{main_category}: {subcategory} ({', '.join(top_words)})"

        return f"{category}: {', '.join(top_words)}"
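
The two-stage rule in `_determine_topic_category` and `_determine_topic_category_by_keywords` can be illustrated without BERTopic or an embedding model. The minimal sketch below assumes toy 3-dimensional vectors and a made-up two-category term table (`TOY_VECS`, `TOY_TERMS`) in place of `self.risk_embeddings` and `RISK_CATEGORIES`. It keeps the same rules as the class (best cosine match accepted above 0.5, otherwise weighted substring matches of 1.5 / 1.0 / 0.5 that must total more than 1.0) but omits the `CREDIT_RISK` subcategory step.

```python
import numpy as np

# Toy stand-ins (assumptions): real vectors come from self.embedder, real terms from RISK_CATEGORIES
TOY_VECS = {"CREDIT_RISK": [1.0, 0.0, 0.0], "MARKET_RISK": [0.0, 1.0, 0.0]}
TOY_TERMS = {  # (metrics, keywords, indicators)
    "CREDIT_RISK": (["npl"], ["default", "provision"], ["charge-off"]),
    "MARKET_RISK": (["var"], ["trading"], ["volatility"]),
}


def cosine(a, b):
    """Cosine similarity between two 1-D vectors."""
    a, b = np.asarray(a, dtype=float), np.asarray(b, dtype=float)
    return float(a @ b / (np.linalg.norm(a) * np.linalg.norm(b)))


def keyword_score(words, metrics, keywords, indicators):
    """Weighted two-way substring matches: metrics 1.5, keywords 1.0, indicators 0.5."""
    def hits(terms):
        return sum(1 for w in words for t in terms if t.lower() in w or w in t.lower())
    return 1.5 * hits(metrics) + 1.0 * hits(keywords) + 0.5 * hits(indicators)


def categorize(topic_vec, topic_words, category_vecs, category_terms, threshold=0.5):
    # Stage 1: accept the most similar category if it clears the threshold
    sims = {cid: cosine(topic_vec, vec) for cid, vec in category_vecs.items()}
    best = max(sims, key=sims.get)
    if sims[best] > threshold:
        return best
    # Stage 2: keyword fallback, requiring a weighted score above 1.0
    words = [w.lower() for w in topic_words]
    scores = {cid: keyword_score(words, *terms) for cid, terms in category_terms.items()}
    best_kw = max(scores, key=scores.get)
    return best_kw if scores[best_kw] > 1.0 else "OTHER"


# prints CREDIT_RISK (no embedding match, so the keyword fallback decides)
print(categorize([0.0, 0.0, 1.0], ["provision", "default", "loans"], TOY_VECS, TOY_TERMS))
```

One caveat the sketch makes visible: the two-way substring test lets `"loan"` match `"loans"`, which is the intended fuzziness, but it also lets very short tokens match inside many longer terms, so the 1.0 threshold does part of the filtering work.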

### 😊😐☹️ Sentiment Analysis

class SentimentAnalyser:
    """FinBERT based sentiment analysis for G-SIB earnings call QA pairs"""

    def __init__(
        self,
        db: GSIBDatabase,
        model_name="yiyanghkust/finbert-tone",
        preprocessor=None,
    ):
        """
        Initialize the sentiment analyzer with FinBERT or similar financial NLP model

        Args:
            db: GSIBDatabase instance
            model_name: HuggingFace model name (default: yiyanghkust/finbert-tone)
            preprocessor: TextPreprocessor instance
        """
        # Set up logging
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(logging.INFO)
        self.logger.info(f"Initializing {type(self).__name__} with model {model_name}")

        # Store database and components
        self.db = db
        self.preprocessor = preprocessor

        # Initialize model and tokenizer
        try:
            self.tokenizer = AutoTokenizer.from_pretrained(model_name)
            self.model = AutoModelForSequenceClassification.from_pretrained(model_name)
            self.model.to(device)

            # Cache for sentiment analysis to avoid repeated computations
            self.sentiment_cache = {}

        except Exception as e:
            self.logger.error(f"Error loading model: {str(e)}")
            raise

    def analyze_qa_pairs(self, limit: Optional[int] = None) -> Dict[int, Dict]:
        """
        Analyze sentiment for all QA pairs in the database

        Args:
            limit: Optional limit on number of QA pairs to analyze

        Returns:
            Dictionary mapping conversation IDs to sentiment analysis results
        """
        try:
            # Get conversations that need sentiment analysis
            conversations = self.db.get_conversations_for_sentiment_analysis(
                limit=limit,
            )
            if not conversations:
                self.logger.info("No conversations found for sentiment analysis")
                return {}

            self.logger.info(
                f"Found {len(conversations)} conversations for sentiment analysis",
            )

            # Analyze sentiment for each conversation
            results = {}
            for conversation in tqdm(conversations, desc="Analyzing sentiment"):
                conversation_id = conversation["conversation_id"]

                # Get metadata
                metadata = {
                    "bank": conversation.get("bank_name", ""),
                    "year": conversation.get("year", ""),
                    "quarter": conversation.get("quarter", ""),
                    "analyst_name": conversation.get("analyst_name", ""),
                    "analyst_company": conversation.get("analyst_company", ""),
                }

                # Get QA pairs for this conversation
                qa_pairs = self.db.get_qa_pairs(conversation_id)

                if not qa_pairs:
                    self.logger.warning(
                        f"No QA pairs found for conversation {conversation_id}",
                    )
                    continue

                # Get full conversation text for overall sentiment from the database
                conversation_text = self.db.get_conversation_text(
                    conversation_id,
                    level="lemmatized",
                )
                if not conversation_text:
                    # Fallback to combining all QA pairs
                    conversation_text = " ".join(
                        [f"{qa['question']} {qa['answer']}" for qa in qa_pairs],
                    )

                # Get overall conversation sentiment
                conversation_sentiment = self.get_sentiment_for_text(conversation_text)

                # Analyze individual QA pairs
                qa_sentiments = []
                for qa_pair in qa_pairs:
                    # Score question and answer separately
                    question_sentiment = self.get_sentiment_for_text(qa_pair["question"])
                    answer_sentiment = self.get_sentiment_for_text(qa_pair["answer"])

                    # Combined sentiment is the element-wise mean of question and answer
                    combined_sentiment = SentimentScore(
                        positive=(question_sentiment.positive + answer_sentiment.positive) / 2,
                        negative=(question_sentiment.negative + answer_sentiment.negative) / 2,
                        neutral=(question_sentiment.neutral + answer_sentiment.neutral) / 2,
                        compound=(question_sentiment.compound + answer_sentiment.compound) / 2,
                    )

                    # Store QA pair sentiment
                    qa_sentiments.append(
                        {
                            "qa_pair_id": qa_pair["id"],
                            "question_sentiment": question_sentiment,
                            "answer_sentiment": answer_sentiment,
                            "combined_sentiment": combined_sentiment,
                            "speaker": qa_pair["answer_speaker"],
                            "role": qa_pair["answer_role"],
                        },
                    )

                # Calculate speaker-level sentiment
                speaker_sentiments = self._calculate_speaker_sentiments(qa_sentiments)

                # Store results
                results[conversation_id] = {
                    "qa_pairs": qa_sentiments,
                    "conversation": conversation_sentiment,
                    "speakers": speaker_sentiments,
                    "metadata": metadata,
                }

                # Store results in database
                self._store_sentiment_results(
                    conversation_id,
                    qa_sentiments,
                    conversation_sentiment,
                    speaker_sentiments,
                )

            return results

        except Exception as e:
            self.logger.error(f"Error analyzing sentiment: {str(e)}")
            return {}

    def _get_sentiment(self, text: str) -> Dict[str, float]:
        """
        Get sentiment scores for a single text using FinBERT

        Args:
            text: Input text to analyze

        Returns:
            Dictionary with positive, negative, neutral, and compound sentiment scores
        """
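        # Cheap character pre-trim; the tokenizer below enforces FinBERT's
        # 512-token limit via truncation=True
        text = text[:5000]

        # Key the cache on the pre-trimmed text itself (strings are hashable), so
        # texts that merely share their first 1000 characters no longer collide
        cache_key = text
        if cache_key in self.sentiment_cache:
            return self.sentiment_cache[cache_key]

        try:
            # Tokenize and get sentiment
            inputs = self.tokenizer(
                text,
                padding=True,
                truncation=True,
                max_length=512,
                return_tensors="pt",
            ).to(device)

            with torch.no_grad():
                outputs = self.model(**inputs)
                scores = torch.softmax(outputs.logits, dim=1)

            # Read probabilities by label name: yiyanghkust/finbert-tone orders its
            # outputs 0=Neutral, 1=Positive, 2=Negative, so fixed indices would
            # report neutral probability as positive
            id2label = self.model.config.id2label
            probs = {id2label[i].lower(): float(p) for i, p in enumerate(scores[0])}

            result = {
                "positive": probs["positive"],
                "negative": probs["negative"],
                "neutral": probs["neutral"],
                "compound": probs["positive"] - probs["negative"],
            }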

            # Cache result
            self.sentiment_cache[cache_key] = result

            return result

        except Exception as e:
            self.logger.error(f"Error getting sentiment: {str(e)}")
            return {
                "positive": 0.33,
                "negative": 0.33,
                "neutral": 0.34,
                "compound": 0.0,
            }

    def _calculate_speaker_sentiments(
        self,
        qa_sentiments: List[Dict],
    ) -> Dict[str, Dict]:
        """
        Calculate sentiment scores for each speaker

        Args:
            qa_sentiments: List of QA pair sentiment results

        Returns:
            Dictionary mapping speaker names to sentiment scores
        """
        speaker_sentiments = {}

        # Group sentiments by speaker
        for qa in qa_sentiments:
            speaker = qa["speaker"]
            role = qa["role"]
            key = f"{speaker}|{role}"

            if key not in speaker_sentiments:
                speaker_sentiments[key] = {
                    "answer_texts": [],
                    "name": speaker,
                    "role": role,
                }

            # Get the answer text from the QA pair
            qa_pair_id = qa["qa_pair_id"]
            answer = self.db.get_qa_answer(qa_pair_id)

            if answer:
                speaker_sentiments[key]["answer_texts"].append(answer)

        # Calculate sentiment for each speaker using full text
        for key, data in speaker_sentiments.items():
            if data["answer_texts"]:
                # Combine all answers from this speaker
                full_text = " ".join(data["answer_texts"])

                # Get sentiment for combined text
                sentiment_scores = self._get_sentiment(full_text)
                sentiment = SentimentScore(
                    positive=sentiment_scores["positive"],
                    negative=sentiment_scores["negative"],
                    neutral=sentiment_scores["neutral"],
                    compound=sentiment_scores["compound"],
                )

                speaker_sentiments[key]["sentiment"] = sentiment
            else:
                # Neutral default when the speaker has no stored answers
                speaker_sentiments[key]["sentiment"] = SentimentScore(
                    positive=0.33,
                    negative=0.33,
                    neutral=0.34,
                    compound=0.0,
                )

            # Drop the raw texts once scored so only summary fields are stored
            del speaker_sentiments[key]["answer_texts"]

        return speaker_sentiments

    def _store_sentiment_results(
        self,
        conversation_id: int,
        qa_sentiments: List[Dict],
        conversation_sentiment: SentimentScore,
        speaker_sentiments: Dict[str, Dict],
    ) -> None:
        """
        Store sentiment analysis results in the database

        Args:
            conversation_id: ID of the conversation
            qa_sentiments: List of QA pair sentiment results
            conversation_sentiment: Conversation-level sentiment scores
            speaker_sentiments: Speaker-level sentiment scores
        """
        try:
            # Store conversation-level sentiment
            self.db.add_conversation_sentiment(
                conversation_id=conversation_id,
                positive_score=conversation_sentiment.positive,
                negative_score=conversation_sentiment.negative,
                neutral_score=conversation_sentiment.neutral,
                compound_score=conversation_sentiment.compound,
            )

            # Store speaker-level sentiment
            for key, speaker_data in speaker_sentiments.items():
                self.db.add_speaker_sentiment(
                    conversation_id=conversation_id,
                    speaker_name=speaker_data["name"],
                    speaker_role=speaker_data["role"],
                    positive_score=speaker_data["sentiment"].positive,
                    negative_score=speaker_data["sentiment"].negative,
                    neutral_score=speaker_data["sentiment"].neutral,
                    compound_score=speaker_data["sentiment"].compound,
                )

            # Update credit risk mentions with sentiment (if present)
            mentions = self.db.get_credit_risk_mentions([conversation_id])

            for mention in mentions:
                # Get sentiment for the mention context
                if mention["mention_context"]:
                    contexts = json.loads(mention["mention_context"])
                    if contexts:
                        # Get sentiment for first context
                        context_sentiment = self._get_sentiment(contexts[0])
                        self.db.update_credit_risk_mention_sentiment(
                            mention_id=mention["id"],
                            positive_score=context_sentiment["positive"],
                            negative_score=context_sentiment["negative"],
                            neutral_score=context_sentiment["neutral"],
                            compound_score=context_sentiment["compound"],
                        )

            self.logger.info(
                f"Stored sentiment results for conversation {conversation_id}",
            )

        except Exception as e:
            self.logger.error(f"Error storing sentiment results: {str(e)}")
            raise

    def get_sentiment_for_text(self, text: str) -> SentimentScore:
        """
        Get sentiment score for any text

        Args:
            text: Text to analyze

        Returns:
            SentimentScore object with sentiment analysis
        """
        scores = self._get_sentiment(text)
        return SentimentScore(
            positive=scores["positive"],
            negative=scores["negative"],
            neutral=scores["neutral"],
            compound=scores["compound"],
        )
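
`_get_sentiment` applies a softmax to FinBERT's logits and defines `compound` as positive minus negative. Which logit index means "positive" depends on the checkpoint: the `yiyanghkust/finbert-tone` model card documents LABEL_0 as neutral, LABEL_1 as positive and LABEL_2 as negative, so reading scores by label name from `model.config.id2label` is safer than fixed indices. The pure-Python sketch below illustrates that lookup and the question/answer averaging behind `combined_sentiment`; `ID2LABEL`, the logits and the `SketchScore` dataclass are illustrative stand-ins (assumptions), not the notebook's objects.

```python
import math
from dataclasses import dataclass


@dataclass
class SketchScore:
    """Stand-in with the same fields as the notebook's SentimentScore (assumption)."""
    positive: float
    negative: float
    neutral: float
    compound: float


# Illustrative label map; in the notebook this would come from self.model.config.id2label
ID2LABEL = {0: "Neutral", 1: "Positive", 2: "Negative"}


def softmax_list(logits):
    """Numerically stable softmax over a list of floats."""
    m = max(logits)
    exps = [math.exp(x - m) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]


def scores_from_logits(logits, id2label=ID2LABEL):
    """Map probabilities to labels by name, then compound = positive - negative."""
    by_label = {id2label[i].lower(): p for i, p in enumerate(softmax_list(logits))}
    pos, neg = by_label["positive"], by_label["negative"]
    return SketchScore(pos, neg, by_label["neutral"], pos - neg)


def average_scores(a, b):
    """Element-wise mean of two scores, as used for a QA pair's combined sentiment."""
    return SketchScore(
        (a.positive + b.positive) / 2,
        (a.negative + b.negative) / 2,
        (a.neutral + b.neutral) / 2,
        (a.compound + b.compound) / 2,
    )


question = scores_from_logits([0.2, 1.5, -0.3])  # hypothetical logits
answer = scores_from_logits([2.0, 0.1, 0.4])
print(average_scores(question, answer))
```

Only the label map changes between checkpoints; the compound and averaging logic stay the same.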

### ⚖️ Credit-Risk Analysis
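
Mention detection in `analyze_credit_risk_mentions` is plain regular-expression matching: lower-case the QA text, look for each keyword between `\b` word boundaries, and keep 50 characters of context on either side of the first two hits per keyword. The standalone sketch below assumes a simple list of keyword strings in place of a `CREDIT_METRICS` entry; the helper name `find_mentions` is hypothetical.

```python
import re


def find_mentions(text, keywords, window=50, max_contexts_per_keyword=2):
    """Count whole-word keyword matches and collect short context snippets."""
    lowered = text.lower()
    count, contexts, used = 0, [], set()
    for keyword in keywords:
        # Lower-case the keyword too, so "NPL" still matches "npl" in the text
        pattern = r"\b" + re.escape(keyword.lower()) + r"\b"
        matches = list(re.finditer(pattern, lowered))
        if not matches:
            continue
        count += len(matches)
        used.add(keyword)
        for m in matches[:max_contexts_per_keyword]:
            start = max(0, m.start() - window)
            end = min(len(lowered), m.end() + window)
            contexts.append(lowered[start:end])
    # Sorted so the "representative" keyword is deterministic
    return count, contexts, sorted(used)


sample = "Our NPL ratio improved; NPL formation slowed. Nonperforming loans fell."
print(find_mentions(sample, ["NPL", "nonperforming loans"])[0])  # prints 3
```

The word boundaries are what stop a short keyword such as "pl" from matching inside "complete".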

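The scoring helpers in this cell are piecewise lookups: `_score_npl_ratio` (whose thresholds suggest a ratio in percent) maps an NPL ratio to 10, 25, 50, 75 or 90 to 100, and `_get_risk_level` maps the final score to seven bands with cut points at 15, 30, 45, 60, 75 and 85. The sketch below expresses both with `bisect` and reproduces the combination in `calculate_credit_risk_score`: weighted metric sum plus 1.2 times sentiment risk, scaled by a volatility multiplier, then clamped to 0 to 100. `EXAMPLE_WEIGHTS` is a hypothetical placeholder for the output of `_get_adjusted_weights`.

```python
from bisect import bisect_left, bisect_right

# Thresholds copied from _score_npl_ratio; each upper bound is inclusive
NPL_CUTS = [1.0, 2.0, 3.5, 5.0]
NPL_SCORES = [10, 25, 50, 75]
# Thresholds copied from _get_risk_level; a score equal to a cut moves up a band
LEVEL_CUTS = [15, 30, 45, 60, 75, 85]
LEVELS = ["Very Low", "Low", "Moderately Low", "Moderate",
          "Moderately High", "High", "Very High"]

# Hypothetical weights; the notebook derives its weights from bank size
EXAMPLE_WEIGHTS = {"NPL_RATIO": 0.3, "COVERAGE_RATIO": 0.2, "PCL": 0.2,
                   "CET1_RATIO": 0.2, "TEXAS_RATIO": 0.1}


def score_npl_ratio(value):
    """Piecewise NPL score: bisect_left keeps each upper bound inclusive."""
    i = bisect_left(NPL_CUTS, value)
    if i < len(NPL_SCORES):
        return NPL_SCORES[i]
    return 90 + min(10, (value - 5.0) * 2)  # capped at 100


def risk_level(score):
    """Band a 0-100 score; bisect_right matches the strict `score < cut` tests."""
    return LEVELS[bisect_right(LEVEL_CUTS, score)]


def combine(metric_scores, weights, sentiment_risk, volatility_multiplier=1.0):
    """Weighted metric sum plus 1.2x sentiment, scaled, then clamped to [0, 100]."""
    raw = sum(weights[k] * metric_scores[k] for k in weights)
    raw = (raw + 1.2 * sentiment_risk) * volatility_multiplier
    score = min(100.0, max(0.0, raw))
    return score, risk_level(score)


print(combine({k: 50 for k in EXAMPLE_WEIGHTS}, EXAMPLE_WEIGHTS, sentiment_risk=5.0))
```

Keeping each set of cut points in a single list makes it harder for thresholds and labels to drift apart when a band is tuned.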
class CreditRiskAnalyser:
    """Credit risk analysis for G-SIB earnings call QA pairs"""

    def __init__(self, db: GSIBDatabase):
        # Set up logging
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(logging.INFO)
        self.logger.info(f"Initializing {type(self).__name__}")

        # Store database and components
        self.db = db
        self._populate_credit_risk_keywords()

    def _populate_credit_risk_keywords(self):
        """Populate database with credit risk keywords"""
        # Use the injected database (self.db), not a notebook-level global `db`
        conn = self.db.begin_transaction()
        try:
            self.logger.info("Populating credit risk keywords")
            for metric_key, metric in CREDIT_METRICS.items():
                for keyword in metric.keywords:
                    self.db.add_credit_risk_keyword(keyword, metric_key, 1.0)
                for indicator in metric.indicators:
                    self.db.add_credit_risk_keyword(indicator, metric_key, 0.7)
            self.db.commit_transaction(conn)
            self.logger.info("Credit risk keywords populated")
        except Exception as e:
            self.logger.error(f"Error populating credit risk keywords: {str(e)}")
            self.db.rollback_transaction(conn)
            raise

    def _get_risk_level(self, score: float) -> str:
        """Map a 0-100 risk score to one of seven risk-level bands"""
        if score < 15:
            return "Very Low"
        if score < 30:
            return "Low"
        if score < 45:
            return "Moderately Low"
        if score < 60:
            return "Moderate"
        if score < 75:
            return "Moderately High"
        if score < 85:
            return "High"
        return "Very High"

    def run_credit_risk_analysis(self, limit=None):
        """
        Run credit risk analysis

        Args:
            limit: Optional limit on number of conversations to analyze
        """
        self.logger.info(
            f"Starting credit risk analysis{' (limited to ' + str(limit) + ' conversations)' if limit else ''}",
        )

        # Get all conversations
        conversations = self.db.get_all_conversations()

        if limit:
            conversations = conversations[:limit]

        # Analyze credit risk mentions
        results = {}
        for conversation in tqdm(conversations, desc="Analyzing credit risk mentions"):
            conversation_id = conversation["id"]

            # Run credit risk analysis
            mentions = self.analyze_credit_risk_mentions(conversation_id)
            self.db.checkpoint_wal()

            if mentions:
                results[conversation_id] = mentions
                self.logger.debug(
                    f"Found {len(mentions)} credit risk mentions in conversation {conversation_id}",
                )

        return results

    def analyze_credit_risk_mentions(self, conversation_id: int) -> Dict[str, Any]:
        """
        Analyze credit risk metric mentions in a conversation

        Args:
            conversation_id: ID of the conversation

        Returns:
            Dictionary with credit risk mention analysis
        """
        try:
            # Get QA pairs for this conversation
            qa_pairs = self.db.get_qa_pairs(conversation_id)

            if not qa_pairs:
                return {}

            # Prepare text data
            qa_texts = [
                {"id": qa["id"], "text": f"{qa['question']} {qa['answer']}"}
                for qa in qa_pairs
            ]

            # Find credit risk metric mentions
            mentions = {}
            for metric_key, metric in CREDIT_METRICS.items():
                # Search for metric keywords in each QA pair
                mention_count = 0
                mention_contexts = []
                used_keywords = set()

                for qa in qa_texts:
                    text = qa["text"].lower()

                    # Check each keyword
                    for keyword in metric.keywords:
                        # Whole-word match; lower-case the keyword because the text is
                        # lower-cased, otherwise keywords such as "NPL" never match
                        pattern = r"\b" + re.escape(keyword.lower()) + r"\b"
                        matches = list(re.finditer(pattern, text))

                        if matches:
                            mention_count += len(matches)
                            used_keywords.add(keyword)

                            # Keep 50 characters of context either side of the
                            # first two matches per keyword
                            for match in matches[:2]:
                                context_start = max(0, match.start() - 50)
                                context_end = min(len(text), match.end() + 50)
                                mention_contexts.append(text[context_start:context_end])

                if mention_count > 0:
                    # Pick a deterministic representative keyword (set order is arbitrary)
                    primary_keyword = sorted(used_keywords)[0]

                    # Create CreditRiskMention instance
                    mention = CreditRiskMention(
                        metric_type=metric_key,
                        mention_text=mention_contexts[0] if mention_contexts else "",
                        mention_count=mention_count,
                        keyword_used=primary_keyword,
                    )

                    mentions[metric_key] = {
                        "mention": mention,
                        "contexts": mention_contexts[:3],  # Store up to 3 contexts
                    }

                    # Store in the database, keyed by the representative keyword
                    keyword_id = primary_keyword
                    self.db.add_credit_risk_mention(
                        conversation_id=conversation_id,
                        keyword_id=keyword_id,
                        mention_count=mention_count,
                        mention_contexts=mention_contexts[:3],
                        positive_score=0.33,  # Placeholder sentiment scores
                        negative_score=0.33,
                        neutral_score=0.34,
                        compound_score=0.0,
                    )

            return mentions

        except Exception as e:
            self.logger.error(f"Error analyzing credit risk mentions: {str(e)}")
            return {}

    def calculate_credit_risk_score(
        self,
        bank_name: str,
        year: int,
        quarter: str,
    ) -> Dict:
        """Combine reported credit metrics, transcript sentiment and bank size into a 0-100 credit risk score for one bank-quarter"""
        # Get metric values from database
        report_id = self.db.get_report_id(bank_name, year, quarter)
        if not report_id:
            return {}
        metrics = self.db.get_metrics_by_report(report_id)

        # Get mentions of credit risk from conversations
        conversations = self.db.get_bank_conversations(bank_name, year, quarter)
        credit_mentions = self.db.get_credit_risk_mentions(conversations)

        # Get bank asset size for scaling
        bank_assets = self._get_bank_assets(bank_name)

        # Calculate base scores WITHOUT size factor
        metric_scores = {
            "NPL_RATIO": self._score_npl_ratio(
                self.db.get_metric_value(metrics, "NPL_RATIO"),
            ),
            "COVERAGE_RATIO": self._score_coverage_ratio(
                self.db.get_metric_value(metrics, "COVERAGE_RATIO"),
            ),
            "PCL": self._score_pcl(self.db.get_metric_value(metrics, "PCL"), bank_name),
            "CET1_RATIO": self._score_cet1_ratio(
                self.db.get_metric_value(metrics, "CET1_RATIO"),
            ),
            "TEXAS_RATIO": self._score_texas_ratio(
                self.db.get_metric_value(metrics, "TEXAS_RATIO"),
            ),
        }

        # Add sentiment risk from mentions
        sentiment_risk = self._calculate_risk_from_mentions(credit_mentions)

        # Adjust weights based on bank size
        weights = self._get_adjusted_weights(bank_assets)

        # Calculate final weighted score
        risk_score = (
            weights["NPL_RATIO"] * metric_scores["NPL_RATIO"]
            + weights["COVERAGE_RATIO"] * metric_scores["COVERAGE_RATIO"]
            + weights["PCL"] * metric_scores["PCL"]
            + weights["CET1_RATIO"] * metric_scores["CET1_RATIO"]
            + weights["TEXAS_RATIO"] * metric_scores["TEXAS_RATIO"]
            # Add sentiment adjustment
            + (sentiment_risk * 1.2)  # Increased impact of sentiment analysis
        )

        # Apply volatility multiplier if available
        volatility_multiplier = self._calculate_volatility_multiplier(
            bank_name,
            year,
            quarter,
        )
        risk_score *= volatility_multiplier

        # Clamp once so the reported score and its risk level are consistent
        risk_score = min(100, max(0, risk_score))

        return {
            "bank": bank_name,
            "period": f"{year} {quarter}",
            "asset_size": f"${bank_assets / 1e12:.2f}T",
            "risk_score": risk_score,
            "risk_level": self._get_risk_level(risk_score),
            "metric_scores": metric_scores,
            "sentiment_adjustment": sentiment_risk,
            "weights": weights,
            "high_risk_mentions": self._extract_high_risk_mentions(credit_mentions),
        }

    def _score_npl_ratio(self, value: float) -> float:
        """Higher NPL = Higher Risk"""
        if value <= 1.0:  # Excellent
            return 10
        if value <= 2.0:  # Good
            return 25
        if value <= 3.5:  # Average
            return 50
        if value <= 5.0:  # Concerning
            return 75
        # High Risk
        return 90 + min(10, (value - 5.0) * 2)  # Capped at 100

    def _score_coverage_ratio(self, value: float) -> float:
        """Lower Coverage = Higher Risk (inverse relationship)"""
        if value >= 120:  # Excellent
            return 10
        if value >= 100:  # Good
            return 25
        if value >= 80:  # Average
            return 50
        if value >= 60:  # Concerning
            return 75
        # High Risk
        return 90 + min(10, (60 - value) / 6)  # Capped at 100

    def _score_cet1_ratio(self, value: float) -> float:
        """Lower CET1 = Higher Risk (inverse relationship)"""
        if value >= 15.0:  # Excellent
            return 10
        if value >= 13.0:  # Good
            return 25
        if value >= 11.0:  # Average
            return 50
        if value >= 9.0:  # Concerning
            return 75
        # High Risk
        return 90 + min(10, (9.0 - value) * 5)  # Capped at 100

    def _score_texas_ratio(self, value: float) -> float:
        """Higher Texas Ratio = Higher Risk"""
        if value <= 20:  # Excellent
            return 10
        if value <= 40:  # Good
            return 25
        if value <= 60:  # Average
            return 50
        if value <= 80:  # Concerning
            return 75
        # High Risk
        return 90 + min(10, (value - 80) / 2)  # Capped at 100

    def _get_bank_assets(self, bank_name: str) -> float:
        """Get total assets for a bank."""
        bank_assets = {
            "JPMorgan Chase": 3.875e12,  # $3.875 trillion as of 2023
            "HSBC": 2.919e12,  # $2.919 trillion as of 2023
            "Bank of America": 3.180e12,  # $3.180 trillion as of 2023
            "UBS": 1.717e12,  # $1.717 trillion as of 2023
            "Morgan Stanley": 1.193e12,  # $1.193 trillion as of 2023
            "Credit Suisse": 0.727e12,  # $727 billion before its acquisition by UBS in 2023
            "Goldman Sachs": 1.642e12,  # $1.642 trillion as of 2023
        }

        return bank_assets.get(bank_name, 1.0e12)  # Default to $1 trillion

    def _calculate_size_factor(self, bank_assets: float) -> float:
        """Calculate a multiplier based on bank size to increase score differentiation"""
        if bank_assets >= 3.0e12:  # $3T+ (mega banks)
            return 1.15  # 15% higher score for mega banks
        if bank_assets >= 2.0e12:  # $2-3T
            return 1.10  # 10% higher score for very large banks
        if bank_assets >= 1.0e12:  # $1-2T
            return 1.0  # Baseline
        # <$1T
        return 0.90  # 10% lower score for smaller banks

    def _score_pcl(self, value: float, bank_name: str) -> float:
        """Higher PCL (relative to bank size) = Higher Risk"""
        # Get bank assets to normalize PCL
        bank_assets = self._get_bank_assets(bank_name)
        if not bank_assets:
            return 50  # Default to average if asset data unavailable

        # Calculate PCL as percentage of assets
        pcl_percentage = (value / bank_assets) * 100

        if pcl_percentage <= 0.05:  # Excellent
            return 10
        if pcl_percentage <= 0.10:  # Good
            return 25
        if pcl_percentage <= 0.20:  # Average
            return 50
        if pcl_percentage <= 0.40:  # Concerning
            return 75
        # High Risk
        return 90 + min(10, (pcl_percentage - 0.40) * 25)  # Capped at 100

    def _get_adjusted_weights(self, bank_assets: float) -> Dict[str, float]:
        """Return adjusted metric weights based on bank size"""
        # Base weights add up to 1.0
        if bank_assets >= 3.0e12:  # Mega banks (JPM, BofA)
            return {
                "NPL_RATIO": 0.30,  # Higher weight on asset quality
                "COVERAGE_RATIO": 0.15,
                "PCL": 0.20,  # Higher weight on provisions
                "CET1_RATIO": 0.20,  # Capital still important
                "TEXAS_RATIO": 0.15,
            }
        if bank_assets >= 1.5e12:  # Large banks (HSBC, UBS, Goldman Sachs)
            return {
                "NPL_RATIO": 0.25,
                "COVERAGE_RATIO": 0.15,
                "PCL": 0.15,
                "CET1_RATIO": 0.30,  # Higher capital requirements for investment banks
                "TEXAS_RATIO": 0.15,
            }
        # Banks under $1.5T (Morgan Stanley, Credit Suisse, and the $1T default)
        return {
            "NPL_RATIO": 0.20,
            "COVERAGE_RATIO": 0.20,  # More focused on provisions
            "PCL": 0.20,
            "CET1_RATIO": 0.15,  # Less focus on capital
            "TEXAS_RATIO": 0.25,  # Higher focus on overall health
        }

    def _calculate_volatility_multiplier(
        self,
        bank_name: str,
        year: int,
        quarter: str,
    ) -> float:
        """Calculate multiplier based on metric volatility from previous quarters"""
        try:
            # Get current and previous quarter metrics
            current_id = self.db.get_report_id(bank_name, year, quarter)
            current_metrics = self.db.get_metrics_by_report(current_id)

            # Determine previous quarter
            prev_year, prev_quarter = self._get_previous_quarter(year, quarter)
            prev_id = self.db.get_report_id(bank_name, prev_year, prev_quarter)
            prev_metrics = self.db.get_metrics_by_report(prev_id)

            if not prev_metrics:
                return 1.0  # No previous data

            # Calculate percentage change in key metrics
            key_metrics = ["NPL_RATIO", "CET1_RATIO", "TEXAS_RATIO"]
            changes = []

            for metric in key_metrics:
                current_value = self.db.get_metric_value(current_metrics, metric)
                prev_value = self.db.get_metric_value(prev_metrics, metric)

                # Skip missing values; a zero baseline would divide by zero
                if current_value is not None and prev_value is not None and prev_value > 0:
                    pct_change = abs((current_value - prev_value) / prev_value)
                    changes.append(pct_change)

            if not changes:
                return 1.0

            # Average volatility
            avg_change = sum(changes) / len(changes)

            # Higher volatility = higher multiplier
            if avg_change < 0.05:  # <5% change
                return 1.0
            if avg_change < 0.10:  # 5-10% change
                return 1.05
            if avg_change < 0.20:  # 10-20% change
                return 1.10
            # >20% change
            return 1.15

        except Exception as e:
            self.logger.error(f"Error calculating volatility: {str(e)}")
            return 1.0

    def _get_previous_quarter(self, year: int, quarter: str) -> Tuple[int, str]:
        """Get previous quarter and year"""
        q_map = {"Q1": 4, "Q2": 1, "Q3": 2, "Q4": 3}
        prev_q_num = q_map[quarter]
        prev_year = year - 1 if quarter == "Q1" else year
        prev_quarter = f"Q{prev_q_num}"
        return prev_year, prev_quarter

    def _calculate_risk_from_mentions(self, mentions: List[Dict]) -> float:
        """Calculate risk adjustment based on sentiment with higher impact"""
        if not mentions:
            return 0

        # Aggregate sentiment scores with greater weight for negative mentions
        negative_scores = []
        for mention in mentions:
            neg_score = mention.get("negative_score", 0)
            compound_score = mention.get("compound_score", 0)

            # Higher weight for more concerning sentiment
            if compound_score < -0.5:  # Very negative
                negative_scores.append(neg_score * 2.0)
            elif compound_score < -0.3:  # Moderately negative
                negative_scores.append(neg_score * 1.5)
            else:
                negative_scores.append(neg_score)

        # Mean weighted negative sentiment: each neg_score is 0-1, but the
        # 1.5x/2.0x weights can lift individual values to as much as 2
        avg_negative = (
            sum(negative_scores) / len(negative_scores) if negative_scores else 0
        )

        # Scale by 15: 0-15 for unweighted scores, up to 30 with the extra weights
        return avg_negative * 15

    def _extract_high_risk_mentions(self, mentions: List[Dict]) -> List[Dict]:
        """Extract the highest risk mentions as evidence"""
        if not mentions:
            return []

        # Sort mentions by negative sentiment
        sorted_mentions = sorted(
            mentions,
            key=lambda m: m.get("negative_score", 0),
            reverse=True,
        )

        # Return top 3 most negative mentions
        high_risk_mentions = []
        for mention in sorted_mentions[:3]:
            contexts = json.loads(mention.get("mention_context") or "[]")
            if contexts:
                high_risk_mentions.append(
                    {
                        "metric": mention.get("keyword_id", ""),
                        "context": contexts[0],
                        "negative_score": mention.get("negative_score", 0),
                        "compound_score": mention.get("compound_score", 0),
                    },
                )

        return high_risk_mentions
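To make the arithmetic in `calculate_credit_risk_score` easier to audit, the sketch below re-states it outside the class: the banded `_score_*` thresholds, the weights `_get_adjusted_weights` returns for banks with at least $3T in assets, the 1.2x sentiment term, the volatility multiplier and the final clamp to 0-100. It is a minimal, standalone approximation, not the notebook's code: `banded_score`, `composite_score`, `BANDS` and the sample metric values are hypothetical, and the PCL score is passed in pre-computed because `_score_pcl` also needs total assets.

```python
from typing import Dict, Sequence

BAND_SCORES = (10, 25, 50, 75)  # excellent, good, average, concerning


def banded_score(value: float, cutoffs: Sequence[float], higher_is_worse: bool, tail_slope: float) -> float:
    """Map a metric onto the 10/25/50/75 bands, then 90-100 past the last cutoff."""
    for cutoff, score in zip(cutoffs, BAND_SCORES):
        if (value <= cutoff) if higher_is_worse else (value >= cutoff):
            return score
    # Past the last cutoff: 90 plus a penalty capped at 10, so never above 100
    return 90 + min(10, abs(value - cutoffs[-1]) * tail_slope)


# (cutoffs, higher_is_worse, tail_slope) taken from the _score_* methods above
BANDS = {
    "NPL_RATIO": ((1.0, 2.0, 3.5, 5.0), True, 2.0),
    "COVERAGE_RATIO": ((120, 100, 80, 60), False, 1 / 6),
    "CET1_RATIO": ((15.0, 13.0, 11.0, 9.0), False, 5.0),
    "TEXAS_RATIO": ((20, 40, 60, 80), True, 0.5),
}

# Weights returned by _get_adjusted_weights for banks with >= $3T in assets
MEGA_BANK_WEIGHTS = {
    "NPL_RATIO": 0.30,
    "COVERAGE_RATIO": 0.15,
    "PCL": 0.20,
    "CET1_RATIO": 0.20,
    "TEXAS_RATIO": 0.15,
}


def composite_score(
    metric_scores: Dict[str, float],
    weights: Dict[str, float],
    sentiment_risk: float,
    volatility_multiplier: float = 1.0,
) -> float:
    """Weighted sum plus 1.2x sentiment, scaled by volatility, clamped to [0, 100]."""
    raw = sum(weights[name] * metric_scores[name] for name in weights)
    raw = (raw + sentiment_risk * 1.2) * volatility_multiplier
    return min(100.0, max(0.0, raw))


# Hypothetical inputs, not real bank data
metrics = {"NPL_RATIO": 0.8, "COVERAGE_RATIO": 110, "CET1_RATIO": 15.2, "TEXAS_RATIO": 30}
scores = {name: banded_score(metrics[name], *BANDS[name]) for name in BANDS}
scores["PCL"] = 25  # assumed pre-computed; _score_pcl also needs total assets
print(scores)  # {'NPL_RATIO': 10, 'COVERAGE_RATIO': 25, 'CET1_RATIO': 10, 'TEXAS_RATIO': 25, 'PCL': 25}
print(round(composite_score(scores, MEGA_BANK_WEIGHTS, sentiment_risk=4.95), 2))  # 23.44
```

Because every tail is capped at 100 and the weights sum to 1.0, the weighted metric part alone stays within 0-100; only the sentiment term and the volatility multiplier can push the raw total past 100, which is why the clamp is needed.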

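The two adjustments layered on top of the metric scores can also be checked in isolation. This minimal sketch mirrors `_calculate_risk_from_mentions`, `_get_previous_quarter` and the banding in `_calculate_volatility_multiplier`, with plain dicts standing in for the database calls; the function names and sample values are hypothetical. One consequence worth noting: if `get_credit_risk_mentions` returns the placeholder scores stored by the mention-analysis step (negative 0.33, compound 0.0), every bank with mentions receives the same 4.95 x 1.2 ≈ 5.94 point uplift, so sentiment does not yet differentiate banks.

```python
from typing import Dict, List, Optional, Tuple


def sentiment_risk(mentions: List[Dict]) -> float:
    """Weighted mean negative score, scaled by 15 (mirrors _calculate_risk_from_mentions)."""
    if not mentions:
        return 0.0
    weighted = []
    for mention in mentions:
        neg = mention.get("negative_score", 0)
        compound = mention.get("compound_score", 0)
        if compound < -0.5:  # very negative: double weight
            weighted.append(neg * 2.0)
        elif compound < -0.3:  # moderately negative: 1.5x weight
            weighted.append(neg * 1.5)
        else:
            weighted.append(neg)
    return sum(weighted) / len(weighted) * 15


def previous_quarter(year: int, quarter: str) -> Tuple[int, str]:
    """Q1 rolls back to Q4 of the prior year; other quarters stay in the same year."""
    prev = {"Q1": "Q4", "Q2": "Q1", "Q3": "Q2", "Q4": "Q3"}[quarter]
    return (year - 1 if quarter == "Q1" else year), prev


def volatility_multiplier(
    current: Dict[str, Optional[float]], previous: Dict[str, Optional[float]]
) -> float:
    """1.00-1.15 multiplier from the mean absolute % change of NPL, CET1 and Texas ratios."""
    changes = []
    for metric in ("NPL_RATIO", "CET1_RATIO", "TEXAS_RATIO"):
        cur, prev = current.get(metric), previous.get(metric)
        if cur is not None and prev is not None and prev > 0:
            changes.append(abs((cur - prev) / prev))
    if not changes:
        return 1.0
    avg_change = sum(changes) / len(changes)
    if avg_change < 0.05:
        return 1.0
    if avg_change < 0.10:
        return 1.05
    if avg_change < 0.20:
        return 1.10
    return 1.15


placeholder = [{"negative_score": 0.33, "compound_score": 0.0}] * 3
print(round(sentiment_risk(placeholder), 2))  # 4.95
print(previous_quarter(2024, "Q1"))  # (2023, 'Q4')
print(volatility_multiplier(
    {"NPL_RATIO": 1.5, "CET1_RATIO": 12.0, "TEXAS_RATIO": 30.0},
    {"NPL_RATIO": 1.0, "CET1_RATIO": 12.0, "TEXAS_RATIO": 30.0},
))  # 1.1
```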
### ⚡ FAISS Vector Store (RAG)
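A note on the distances this index produces: `faiss.IndexHNSWFlat(d, M)` uses FAISS's default L2 metric, which reports *squared* Euclidean distances. Because `generate_embeddings` L2-normalizes every vector, a squared distance d and the cosine similarity of two unit vectors satisfy d = 2 - 2·cos, so cos = 1 - d/2. The dependency-free sketch below checks that identity on two toy unit vectors; `unit`, `squared_l2`, `cosine` and `similarity_from_squared_l2` are hypothetical helpers, not part of the notebook.

```python
import math
from typing import List


def unit(v: List[float]) -> List[float]:
    """Scale a vector to length 1 (what faiss.normalize_L2 does per row)."""
    norm = math.sqrt(sum(x * x for x in v))
    return [x / norm for x in v]


def squared_l2(a: List[float], b: List[float]) -> float:
    """Squared Euclidean distance, the quantity an L2 FAISS index returns."""
    return sum((x - y) ** 2 for x, y in zip(a, b))


def cosine(a: List[float], b: List[float]) -> float:
    """For unit vectors, cosine similarity is just the dot product."""
    return sum(x * y for x, y in zip(a, b))


def similarity_from_squared_l2(d: float) -> float:
    """Convert a squared L2 distance between unit vectors into cosine similarity."""
    return 1.0 - d / 2.0


a = unit([3.0, 4.0])  # (0.6, 0.8)
b = unit([4.0, 3.0])  # (0.8, 0.6)
d = squared_l2(a, b)
print(round(d, 6), round(cosine(a, b), 6), round(similarity_from_squared_l2(d), 6))  # 0.08 0.96 0.96
```

Using `1 - d` directly would instead give 0.92 for this pair and -1 for orthogonal vectors, so scores would not lie on the usual cosine scale.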

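On the lexical side, `_search_risk_metric` delegates ranking to `BM25Okapi` from the `rank_bm25` package. To make that scoring visible, here is a dependency-free sketch of Okapi BM25 with k1 = 1.5 and b = 0.75 (assumed to match `rank_bm25`'s defaults) and the IDF log((N - n + 0.5) / (n + 0.5)). It is an approximation, not a drop-in replacement: `rank_bm25` additionally replaces negative IDF values with a small floor, which this sketch omits, and whitespace splitting stands in for the spaCy tokenizer. `bm25_rank` and the sample documents are hypothetical.

```python
import math
from collections import Counter
from typing import List, Tuple


def bm25_rank(
    docs: List[str], query: str, k1: float = 1.5, b: float = 0.75, top_n: int = 5
) -> List[Tuple[str, float]]:
    """Rank docs against a query with Okapi BM25 (no IDF floor)."""
    if not docs:
        return []
    corpus = [d.lower().split() for d in docs]
    n_docs = len(corpus)
    avgdl = sum(len(toks) for toks in corpus) / n_docs
    # Document frequency: how many documents contain each term
    df = Counter(term for toks in corpus for term in set(toks))
    query_terms = query.lower().split()

    scores = []
    for toks in corpus:
        tf = Counter(toks)
        score = 0.0
        for q in query_terms:
            if q not in tf:
                continue  # an absent term contributes nothing
            idf = math.log((n_docs - df[q] + 0.5) / (df[q] + 0.5))
            # Term-frequency saturation (k1) and document-length normalization (b)
            denom = tf[q] + k1 * (1 - b + b * len(toks) / avgdl)
            score += idf * tf[q] * (k1 + 1) / denom
        scores.append(score)

    ranked = sorted(range(n_docs), key=lambda i: scores[i], reverse=True)[:top_n]
    return [(docs[i], scores[i]) for i in ranked]


docs = [
    "loan loss provisions increased on commercial real estate",
    "trading revenue was strong this quarter",
    "capital ratios remain above regulatory minimums",
]
for doc, score in bm25_rank(docs, "provisions real estate", top_n=2):
    print(f"{score:.3f}  {doc}")
```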
class FAISSManager:
    """Manager class for FAISS vector indices with SQLite integration"""

    def __init__(
        self,
        db_manager: GSIBDatabase,
        model_name: str = "FinLang/finance-embeddings-investopedia",
        reranker_name: str = "cross-encoder/ms-marco-MiniLM-L-6-v2",
        index_name: str = "main",
        device: str = "cpu",
        lazy_load: bool = True,
    ):
        """
        Initialize FAISS manager with database connection and embedding models

        Args:
            db_manager: Instance of GSIBDatabase
            model_name: HuggingFace model name for embeddings
            reranker_name: Model name for cross-encoder reranker
            index_name: Name for the FAISS index
            device: Device to use for models ('cpu' or 'cuda')
            lazy_load: Only load the index when needed (default: True)
        """
        # Set up logging
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(logging.INFO)

        # Store config
        self.db_manager = db_manager
        self.model_name = model_name
        self.reranker_name = reranker_name
        self.index_name = index_name
        self.device = device
        self.lazy_load = lazy_load

        # Load required models
        self._load_models()

        # Initialize index as None, will be loaded on first use if lazy_load is True
        self.index = None
        if not lazy_load:
            self.index = self._load_or_create_index()

    def _load_models(self) -> None:
        """Load embedding model, reranker, and spaCy model"""
        try:
            # Load embedding model
            self.model = SentenceTransformer(self.model_name, device=self.device)
            self.embedding_dim = self.model.get_sentence_embedding_dimension()
            self.logger.info(
                f"Loaded embedding model: {self.model_name} ({self.embedding_dim} dimensions)",
            )

            # Load reranker model
            self.reranker = CrossEncoder(self.reranker_name, device=self.device)
            self.logger.info(f"Loaded reranker model: {self.reranker_name}")

            # Load spaCy for tokenization
            self.nlp = spacy.load("en_core_web_sm")
            self.logger.info("Loaded spaCy model: en_core_web_sm")
        except Exception as e:
            self.logger.error(f"Error loading models: {str(e)}")
            raise

    def _ensure_index_loaded(self):
        """Ensure the FAISS index is loaded before use"""
        if self.index is None:
            self.index = self._load_or_create_index()

    def _load_or_create_index(self) -> faiss.IndexIDMap2:
        """
        Load existing FAISS index from database or create a new one

        Returns:
            FAISS index with vectors loaded from database if they exist
        """
        conn = self.db_manager.get_db_connection()
        cursor = conn.cursor()

        try:
            # Check if index exists
            cursor.execute(
                "SELECT id, embedding_dim, model_name, config FROM faiss_indices WHERE name = ?",
                (self.index_name,),
            )
            index_info = cursor.fetchone()

            if index_info:
                # Index exists in database, now load the actual vectors
                self.logger.info(f"Loading existing FAISS index: {self.index_name}")

                # Create the index structure
                M = 32
                base_index = faiss.IndexHNSWFlat(self.embedding_dim, M)
                base_index.hnsw.efConstruction = 200
                base_index.hnsw.efSearch = 50
                index = faiss.IndexIDMap2(base_index)

                # Load vectors from database
                cursor.execute(
                    """
                    SELECT id, vector_data
                    FROM vector_embeddings
                    WHERE embedding_model = ? AND preprocessing_level = 'lemmatized'
                    """,
                    (self.model_name,),
                )

                vectors = []
                ids = []
                for row in cursor.fetchall():
                    vector_data = pickle.loads(row["vector_data"])
                    vectors.append(vector_data)
                    ids.append(row["id"])  # vector_embeddings.id doubles as the FAISS id

                if vectors:
                    # Add vectors to index in batch
                    vectors_array = np.array(vectors, dtype=np.float32)
                    ids_array = np.array(ids, dtype=np.int64)
                    index.add_with_ids(vectors_array, ids_array)
                    self.logger.info(f"Loaded {len(vectors)} vectors into FAISS index")

                return index
            # Create new index and register in database
            self.logger.info(f"Creating new FAISS index: {self.index_name}")
            index = self._create_hnsw_index()

            # Register index in database
            cursor.execute(
                """
                    INSERT INTO faiss_indices (name, embedding_dim, model_name, config)
                    VALUES (?, ?, ?, ?)
                    """,
                (
                    self.index_name,
                    self.embedding_dim,
                    self.model_name,
                    json.dumps(
                        {
                            "type": "hnsw",
                            "M": 32,
                            "efConstruction": 200,
                            "efSearch": 50,
                        }
                    ),
                ),
            )
            conn.commit()
            return index

        except Exception as e:
            self.logger.error(f"Error loading/creating FAISS index: {str(e)}")
            # Create a new index as fallback
            return self._create_hnsw_index()

        finally:
            conn.close()

    def _create_hnsw_index(self) -> faiss.IndexIDMap2:
        """Create a new HNSW index with optimized parameters"""
        # Initialize HNSW index
        M = 32  # Number of neighbors per node
        base_index = faiss.IndexHNSWFlat(self.embedding_dim, M)

        # Set HNSW parameters
        base_index.hnsw.efConstruction = 200  # Construction-time accuracy
        base_index.hnsw.efSearch = 50  # Search-time accuracy

        # Wrap with IDMap for mapping to database IDs
        index = faiss.IndexIDMap2(base_index)
        self.logger.info(f"Created new HNSW index with dim={self.embedding_dim}, M={M}")
        return index

    def generate_embeddings(self, texts: List[str]) -> np.ndarray:
        """
        Generate embeddings for a list of texts

        Args:
            texts: List of texts to embed

        Returns:
            Numpy array of embeddings
        """
        if not texts:
            return np.array([], dtype=np.float32)

        # Generate embeddings
        embeddings = self.model.encode(texts, convert_to_numpy=True)

        # Convert to float32 and normalize
        embeddings = np.array(embeddings, dtype=np.float32)
        faiss.normalize_L2(embeddings)
        return embeddings

    def _get_combined_qa_to_index(self, limit: Optional[int] = None) -> List[Dict]:
        """
        Get QA pairs that need to be indexed

        Retrieves preprocessed text entries of type 'combined' that haven't been indexed yet.

        Args:
            limit: Optional limit on number of QA pairs to retrieve

        Returns:
            List of preprocessed text entries with metadata
        """
        conn = self.db_manager.get_db_connection()
        cursor = conn.cursor()

        try:
            # Find combined preprocessed texts not yet indexed
            query = """
            SELECT
                pt.id as preprocessed_text_id,
                pt.processed_text as combined,
                pt.preprocessing_level,
                qp.id as qa_pair_id,
                ac.id as conversation_id,
                ac.analyst_name,
                ac.analyst_company,
                r.year,
                r.quarter,
                b.bank_name
            FROM preprocessed_text pt
            JOIN qa_pairs qp ON pt.qa_pair_id = qp.id
            JOIN analyst_conversations ac ON qp.conversation_id = ac.id
            JOIN reports r ON ac.report_id = r.id
            JOIN banks b ON r.bank_id = b.id
            WHERE NOT EXISTS (
                SELECT 1 FROM vector_embeddings ve
                WHERE pt.id = ve.preprocessed_text_id
                AND ve.embedding_model = ?
            )
            AND pt.text_type = 'combined'
            AND pt.preprocessing_level = 'lemmatized'
            """

            params = [self.model_name]
            if limit:
                query += " LIMIT ?"
                params.append(int(limit))

            cursor.execute(query, params)
            entries = cursor.fetchall()
            return self.db_manager.cursor_to_dict(cursor, entries)
        except Exception as e:
            self.logger.error(f"Error getting QA pairs to index: {str(e)}")
            return []
        finally:
            conn.close()

    def _process_qa_batch(
        self, batch: List[Dict]
    ) -> Tuple[List[str], List[int], List[Dict]]:
        """
        Process a batch of combined texts for indexing.

        Args:
            batch: List of preprocessed text entries with metadata

        Returns:
            Tuple of (text_list, preprocessed_text_ids, metadata_list)
        """
        combined_texts = []
        preprocessed_text_ids = []
        metadata_list = []

        for entry in batch:
            combined_texts.append(entry["combined"])
            preprocessed_text_ids.append(entry["preprocessed_text_id"])

            # Create metadata
            metadata = {
                "preprocessed_text_id": entry["preprocessed_text_id"],
                "qa_pair_id": entry["qa_pair_id"],
                "conversation_id": entry["conversation_id"],
                "analyst_name": entry["analyst_name"],
                "analyst_company": entry["analyst_company"],
                "bank": entry["bank_name"],
                "year": entry["year"],
                "quarter": entry["quarter"],
                "preprocessing_level": entry["preprocessing_level"],
            }
            metadata_list.append(metadata)

        return combined_texts, preprocessed_text_ids, metadata_list

    def _add_vectors_to_index(self, embeddings: np.ndarray, db_ids: np.ndarray) -> bool:
        """
        Add vectors to FAISS index

        Args:
            embeddings: Numpy array of embeddings
            db_ids: Numpy array of database IDs

        Returns:
            Success flag
        """
        try:
            self.index.add_with_ids(embeddings, db_ids)
            self.logger.info(f"Added {len(db_ids)} vectors to FAISS index")
            return True
        except Exception as e:
            self.logger.error(f"Error adding vectors to FAISS index: {str(e)}")
            return False

    def _store_embeddings_in_db(
        self, preprocessed_text_ids, embeddings, metadata_list, qa_texts
    ):
        """
        Store embeddings in the database

        Args:
            preprocessed_text_ids: List of preprocessed text IDs
            embeddings: List of embeddings
            metadata_list: List of metadata dictionaries
            qa_texts: List of text chunks
        """
        conn = self.db_manager.get_db_connection()
        cursor = conn.cursor()

        try:
            for preprocessed_text_id, embedding, metadata, text in zip(
                preprocessed_text_ids,
                embeddings,
                metadata_list,
                qa_texts,
            ):
                cursor.execute(
                    """
                    INSERT INTO vector_embeddings
                    (preprocessed_text_id, embedding_model, preprocessing_level, vector_data, chunk_text, metadata)
                    VALUES (?, ?, ?, ?, ?, ?)
                    ON CONFLICT(preprocessed_text_id, embedding_model, preprocessing_level) DO UPDATE SET
                        vector_data = excluded.vector_data,
                        chunk_text = excluded.chunk_text,
                        metadata = excluded.metadata
                    """,
                    (
                        preprocessed_text_id,
                        self.model_name,
                        metadata["preprocessing_level"],
                        pickle.dumps(embedding),
                        text,
                        json.dumps(metadata),
                    ),
                )
            conn.commit()
        except Exception as e:
            conn.rollback()
            self.logger.error(f"Error storing embeddings in database: {str(e)}")
        finally:
            conn.close()

    def _update_faiss_index_info(self) -> None:
        """Update FAISS index information in database"""
        conn = self.db_manager.get_db_connection()
        cursor = conn.cursor()

        try:
            cursor.execute(
                """
                UPDATE faiss_indices
                SET embedding_dim = ?, config = ?
                WHERE name = ?
                """,
                (
                    self.embedding_dim,
                    json.dumps(
                        {
                            "type": "hnsw",
                            "M": 32,
                            "efConstruction": 200,
                            "efSearch": 50,
                            "total_vectors": self.index.ntotal,
                        },
                    ),
                    self.index_name,
                ),
            )
            conn.commit()
        except Exception as e:
            conn.rollback()
            self.logger.error(f"Error updating FAISS index info: {str(e)}")
        finally:
            conn.close()

    def index_combined_qa(self, limit: Optional[int] = None) -> int:
        """
        Index all combined Q&A texts from the database that are not yet indexed

        Args:
            limit: Optional limit on number of QA pairs to index

        Returns:
            Number of QA pairs indexed
        """
        self._ensure_index_loaded()

        # Get QA pairs to index
        qa_pairs = self._get_combined_qa_to_index(limit)

        if not qa_pairs:
            self.logger.info("No new QA pairs to index")
            return 0

        self.logger.info(f"Found {len(qa_pairs)} combined QA pairs to index")

        # Process in batches to avoid memory issues
        batch_size = 50
        total_indexed = 0

        for i in range(0, len(qa_pairs), batch_size):
            batch = qa_pairs[i : i + batch_size]
            self.logger.info(
                f"Processing batch {i // batch_size + 1}/{(len(qa_pairs) - 1) // batch_size + 1}",
            )

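            # Process QA pairs (qa_ids are preprocessed_text ids)
            qa_texts, qa_ids, metadata_list = self._process_qa_batch(batch)

            # Generate embeddings
            embeddings = self.generate_embeddings(qa_texts)

            # Store in the database first so each embedding gets a vector_embeddings.id
            self._store_embeddings_in_db(qa_ids, embeddings, metadata_list, qa_texts)

            # The FAISS index is keyed by vector_embeddings.id (as in
            # _load_or_create_index and _fetch_search_results), not by
            # preprocessed_text_id, so look those ids up
            conn = self.db_manager.get_db_connection()
            try:
                cursor = conn.cursor()
                placeholders = ",".join(["?"] * len(qa_ids))
                cursor.execute(
                    f"""
                    SELECT preprocessed_text_id, id FROM vector_embeddings
                    WHERE preprocessed_text_id IN ({placeholders})
                    AND embedding_model = ? AND preprocessing_level = 'lemmatized'
                    """,
                    list(qa_ids) + [self.model_name],
                )
                id_map = {row[0]: row[1] for row in cursor.fetchall()}
            finally:
                conn.close()

            keep = [j for j, pt_id in enumerate(qa_ids) if pt_id in id_map]
            if not keep:
                continue

            # Add to FAISS index
            db_ids = np.array([id_map[qa_ids[j]] for j in keep], dtype=np.int64)
            if not self._add_vectors_to_index(embeddings[keep], db_ids):
                continue

            total_indexed += len(keep)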

        # Update FAISS index info
        self._update_faiss_index_info()
        self.logger.info(f"Indexed {total_indexed} QA pairs")
        return total_indexed

    def _fetch_search_results(self, vector_ids: List[int]) -> List[Dict]:
        """
        Fetch search results from database

        Args:
            vector_ids: List of vector IDs from FAISS search

        Returns:
            List of search results with metadata
        """
        conn = self.db_manager.get_db_connection()
        cursor = conn.cursor()

        try:
            placeholders = ",".join(["?"] * len(vector_ids))
            sql = f"""
            SELECT
                ve.id as vector_embedding_id,
                ve.preprocessed_text_id,
                ve.chunk_text,
                ve.metadata,
                pt.qa_pair_id,
                qp.question,
                qp.answer,
                qp.answer_speaker,
                qp.answer_role
            FROM vector_embeddings ve
            JOIN preprocessed_text pt ON ve.preprocessed_text_id = pt.id
            JOIN qa_pairs qp ON pt.qa_pair_id = qp.id
            WHERE ve.id IN ({placeholders})
            """

            cursor.execute(sql, vector_ids)
            results = cursor.fetchall()
            return self.db_manager.cursor_to_dict(cursor, results)
        except Exception as e:
            self.logger.error(f"Error fetching search results: {str(e)}")
            return []
        finally:
            conn.close()

    def _prepare_faiss_results(
        self,
        results: List[Dict],
        vector_ids: List[int],
        distances: np.ndarray,
    ) -> List[Dict]:
        """
        Prepare FAISS search results

        Args:
            results: Raw search results from database
            vector_ids: List of vector IDs from FAISS search
            distances: Distances from FAISS search

        Returns:
            List of formatted search results
        """
        faiss_results = []

        for row in results:
            # Extract vector_id position in the original results
            try:
                position = vector_ids.index(row["vector_embedding_id"])
                distance = float(distances[0][position])
                similarity = 1.0 - distance / 2.0  # Squared L2 on unit vectors: d = 2 - 2cos
            except ValueError:
                similarity = 0.0

            # Parse metadata
            try:
                metadata = json.loads(row["metadata"]) if row["metadata"] else {}
            except json.JSONDecodeError:
                metadata = {}

            # Prepare result
            result = {
                "vector_embedding_id": row["vector_embedding_id"],
                "preprocessed_text_id": row["preprocessed_text_id"],
                "qa_pair_id": row["qa_pair_id"],
                "question": row["question"],
                "answer": row["answer"],
                "answer_speaker": row["answer_speaker"],
                "answer_role": row["answer_role"],
                "text": row["chunk_text"] or f"{row['question']} {row['answer']}",
                "vector_score": similarity,
                "metadata": metadata,
            }
            faiss_results.append(result)

        return faiss_results

    def _search_risk_metric(self, doc_id_to_text, query_keywords, top_n=5):
        """
        Apply BM25 search using risk-specific keywords

        Args:
            doc_id_to_text: Dictionary mapping document IDs to text
            query_keywords: List of keywords for the risk type
            top_n: Number of top results to return

        Returns:
            List of (document, score) tuples
        """
        if not doc_id_to_text:
            return []  # BM25Okapi needs at least one document

        # Tokenize documents with spaCy
        tokenized_corpus = [
            [token.text.lower() for token in self.nlp(text)]
            for text in doc_id_to_text.values()
        ]

        # Build BM25 index
        bm25 = BM25Okapi(tokenized_corpus)

        # Create search query from keywords
        search_query = " ".join(query_keywords)
        tokenized_query = [token.text.lower() for token in self.nlp(search_query)]

        # Get BM25 scores
        scores = bm25.get_scores(tokenized_query)

        # Rank documents based on scores
        documents = list(doc_id_to_text.values())
        ranked_indices = sorted(
            range(len(scores)), key=lambda i: scores[i], reverse=True
        )[:top_n]

        # Return ranked documents with scores
        return [(documents[idx], scores[idx]) for idx in ranked_indices]

    def _create_filtered_index(
        self, preprocessed_text_ids: List[int]
    ) -> faiss.IndexIDMap2:
        """
        Create a temporary FAISS index containing only vectors that match filters

        Args:
            preprocessed_text_ids: List of preprocessed text IDs to include

        Returns:
            Filtered FAISS index
        """
        conn = self.db_manager.get_db_connection()
        cursor = conn.cursor()

        try:
            # Create a new HNSW index
            base_index = faiss.IndexHNSWFlat(self.embedding_dim, 32)
            base_index.hnsw.efConstruction = 200
            base_index.hnsw.efSearch = 50
            filtered_index = faiss.IndexIDMap2(base_index)

            # Get embeddings for the vector IDs
            placeholders = ",".join(["?"] * len(preprocessed_text_ids))
            sql = f"""
            SELECT id, preprocessed_text_id, vector_data
            FROM vector_embeddings
            WHERE preprocessed_text_id IN ({placeholders})
            AND embedding_model = ?
            AND preprocessing_level = 'lemmatized'
            """

            cursor.execute(sql, preprocessed_text_ids + [self.model_name])
            results = cursor.fetchall()

            if not results:
                return filtered_index

            # Stack all vectors into one contiguous float32 matrix, which is the
            # input format faiss.normalize_L2 and add_with_ids expect
            vectors = np.vstack(
                [
                    np.asarray(pickle.loads(row["vector_data"]), dtype="float32").reshape(1, -1)
                    for row in results
                ]
            )
            # In-place L2 normalization so L2 distance ranks like cosine similarity
            faiss.normalize_L2(vectors)
            ids = np.array([row["id"] for row in results], dtype=np.int64)
            filtered_index.add_with_ids(vectors, ids)

            return filtered_index

        except Exception as e:
            self.logger.error(f"Error creating filtered index: {str(e)}")
            # Return empty index as fallback
            base_index = faiss.IndexHNSWFlat(self.embedding_dim, 32)
            return faiss.IndexIDMap2(base_index)
        finally:
            conn.close()

    def _apply_reranking(self, query: str, results: List[Dict], k: int) -> List[Dict]:
        """
        Apply cross-encoder reranking to search results

        Args:
            query: Search query
            results: List of search results
            k: Number of results to return

        Returns:
            Reranked results
        """
        # Create query-document pairs for reranking
        query_doc_pairs = [(query, result["text"]) for result in results]

        try:
            # Get reranking scores
            rerank_scores = self.reranker.predict(query_doc_pairs)

            # Add scores to results
            for i, score in enumerate(rerank_scores):
                results[i]["rerank_score"] = float(score)

            # Sort by reranker score
            reranked_results = sorted(
                results,
                key=lambda x: x.get("rerank_score", 0),
                reverse=True,
            )

            return reranked_results[:k]

        except Exception as e:
            self.logger.warning(f"Reranking failed: {str(e)}")
            # Fall back to combined (or vector) scores
            return sorted(
                results,
                key=lambda x: x.get("combined_score", x.get("vector_score", 0)),
                reverse=True,
            )[:k]

    def _enhance_with_bm25(self, faiss_results, bm25_results, doc_id_to_text):
        """
        Enhance FAISS results with BM25 scores

        Args:
            faiss_results: Results from FAISS search
            bm25_results: Results from BM25 search with keyword focus
            doc_id_to_text: Dictionary mapping IDs to texts (not used for scoring; results are matched by text)

        Returns:
            Enhanced results with combined scores
        """
        # Normalize BM25 scores by the best score; fall back to 1.0 when every
        # score is zero (no keyword matched) to avoid division by zero
        max_bm25 = max((score for _, score in bm25_results), default=0.0)
        if max_bm25 <= 0:
            max_bm25 = 1.0

        # Map each text to its normalized BM25 score
        bm25_scores = {text: score / max_bm25 for text, score in bm25_results}

        # Enhance results with BM25 scores
        for result in faiss_results:
            text = result["text"]
            # Add BM25 score if available
            result["bm25_score"] = bm25_scores.get(text, 0.0)

            # Calculate combined score (70% vector, 30% BM25)
            vector_score = result.get("vector_score", 0.0)
            bm25_score = result.get("bm25_score", 0.0)
            result["combined_score"] = (0.7 * vector_score) + (0.3 * bm25_score)

        return faiss_results

    def _filtered_hybrid_search(
        self,
        query: str,
        k: int = 10,
        filter_conditions: Optional[Dict[str, Any]] = None,
        rerank: bool = True,
    ) -> List[Dict]:
        """
        Semantic search (with optional reranking) restricted to vectors whose metadata
        matches the filters; filtering is applied in the database before the FAISS search

        Args:
            query: Search query
            k: Number of results to return
            filter_conditions: Conditions to filter results
            rerank: Whether to apply cross-encoder reranking

        Returns:
            List of filtered search results
        """
        conn = self.db_manager.get_db_connection()
        cursor = conn.cursor()

        try:
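            # First, get the preprocessed_text_ids that match the filter conditions
            if not filter_conditions:
                self.logger.warning("_filtered_hybrid_search called without filters")
                return []

            sql_conditions = []
            sql_params = []

            for key, value in filter_conditions.items():
                # Keys are interpolated into the JSON path, so only allow
                # simple identifiers (e.g. bank, year, quarter)
                if not key.isidentifier():
                    raise ValueError(f"Invalid filter key: {key!r}")
                sql_conditions.append(f"json_extract(ve.metadata, '$.{key}') = ?")
                sql_params.append(value)

            where_clause = " AND ".join(sql_conditions)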

            # Query to get vector_ids that match filter conditions
            sql = f"""
            SELECT ve.id as vector_embedding_id, ve.preprocessed_text_id
            FROM vector_embeddings ve
            WHERE ve.embedding_model = ?
            AND ve.preprocessing_level = 'lemmatized'
            AND {where_clause}
            """

            cursor.execute(sql, [self.model_name] + sql_params)
            filtered_results = cursor.fetchall()

            vector_embedding_ids = [
                row["vector_embedding_id"] for row in filtered_results
            ]
            preprocessed_text_ids = [
                row["preprocessed_text_id"] for row in filtered_results
            ]

            if not vector_embedding_ids:
                self.logger.info(
                    f"No vectors found for filter conditions: {filter_conditions}"
                )
                return []

            # Create a filtered FAISS index for the search
            filtered_index = self._create_filtered_index(preprocessed_text_ids)
            if filtered_index.ntotal == 0:
                self.logger.info("Filtered index is empty")
                return []

            # Perform semantic search on the filtered index
            query_embedding = self.generate_embeddings([query])[0]
            query_embedding = query_embedding / np.linalg.norm(query_embedding)
            query_embedding = np.array([query_embedding]).astype("float32")

            n_search = min(k * 2, filtered_index.ntotal)
            distances, indices = filtered_index.search(query_embedding, k=n_search)
            vector_ids = indices[0].tolist()

            # Get results from database
            results = self._fetch_search_results(vector_ids)
            faiss_results = self._prepare_faiss_results(results, vector_ids, distances)

            # Apply reranking if enabled
            if rerank and faiss_results:
                return self._apply_reranking(query, faiss_results, k)

            # Return sorted results
            return sorted(
                faiss_results, key=lambda x: x.get("vector_score", 0), reverse=True
            )[:k]

        except Exception as e:
            self.logger.error(f"Error in filtered search: {str(e)}")
            return []
        finally:
            conn.close()

    def hybrid_search(
        self,
        query: str,
        k: int = 10,
        filter_conditions: Optional[Dict[str, Any]] = None,
        rerank: bool = True,
        query_keywords: Optional[List[str]] = None,
    ) -> List[Dict]:
        """
        Perform hybrid search using semantic, BM25 ranking and risk-specific filtering

        Args:
            query: Search query
            k: Number of results to return
            filter_conditions: Conditions to filter results (bank, year, quarter, etc.)
            rerank: Whether to apply cross-encoder reranking
            query_keywords: List of keywords for risk-specific filtering

        Returns:
            List of search results with metadata
        """
        self._ensure_index_loaded()

        try:
            # Apply early filtering if possible
            if filter_conditions:
                initial_results = self._filtered_hybrid_search(
                    query, k=k * 2, filter_conditions=filter_conditions, rerank=False
                )

                if not initial_results:
                    self.logger.info(
                        f"No results found for filter conditions: {filter_conditions}"
                    )
                    return []

                # Apply keyword-based filtering if keywords provided
                if query_keywords and initial_results:
                    # Create document dictionary for BM25
                    doc_id_to_text = {
                        i: result["text"] for i, result in enumerate(initial_results)
                    }

                    # Apply BM25 search with specific keywords
                    bm25_results = self._search_risk_metric(
                        doc_id_to_text, query_keywords, top_n=len(initial_results)
                    )

                    # Enhance initial results with BM25 scores
                    enhanced_results = self._enhance_with_bm25(
                        initial_results, bm25_results, doc_id_to_text
                    )

                    # Apply reranking if enabled
                    if rerank:
                        final_results = self._apply_reranking(
                            query, enhanced_results, k
                        )
                    else:
                        # Sort by combined score
                        final_results = sorted(
                            enhanced_results,
                            key=lambda x: x.get(
                                "combined_score", x.get("vector_score", 0)
                            ),
                            reverse=True,
                        )[:k]

                    return final_results

                # No keywords provided, just apply reranking
                if rerank:
                    return self._apply_reranking(query, initial_results, k)
                return sorted(
                    initial_results,
                    key=lambda x: x.get("vector_score", 0),
                    reverse=True,
                )[:k]

            # No filter conditions, do regular search
            # Check if index is empty
            if self.index.ntotal == 0:
                self.logger.warning("FAISS index is empty")
                return []

            # Semantic search with FAISS
            query_embedding = self.generate_embeddings([query])[0]
            query_embedding = query_embedding / np.linalg.norm(query_embedding)
            query_embedding = np.array([query_embedding]).astype("float32")

            n_search = min(k * 3, self.index.ntotal)
            distances, indices = self.index.search(query_embedding, k=n_search)
            vector_ids = indices[0].tolist()

            # No results found
            if not vector_ids or vector_ids[0] == -1:
                self.logger.info("No semantic search results found")
                return []

            # Get metadata for retrieved vectors
            results = self._fetch_search_results(vector_ids[:n_search])
            if not results:
                self.logger.warning("No results found in database")
                return []

            # Convert to list of dicts with scores
            faiss_results = self._prepare_faiss_results(results, vector_ids, distances)

            # Apply keyword-based scoring if keywords provided
            if query_keywords and faiss_results:
                doc_id_to_text = {
                    i: result["text"] for i, result in enumerate(faiss_results)
                }
                bm25_results = self._search_risk_metric(
                    doc_id_to_text, query_keywords, top_n=len(faiss_results)
                )
                combined_results = self._enhance_with_bm25(
                    faiss_results, bm25_results, doc_id_to_text
                )
            else:
                combined_results = faiss_results

            # Apply reranking if enabled
            if rerank and combined_results:
                return self._apply_reranking(query, combined_results, k)

            # Return sorted results
            return sorted(
                combined_results,
                key=lambda x: x.get("combined_score", x.get("vector_score", 0)),
                reverse=True,
            )[:k]

        except Exception as e:
            # logger.exception records the message together with the traceback
            self.logger.exception(f"Error in hybrid_search: {str(e)}")
            return []

    def search_similar_conversations(
        self,
        conversation_id: int,
        k: int = 5,
        exclude_same_quarter: bool = True,
    ) -> List[Dict]:
        """
        Find indexed passages similar to a given analyst conversation

        Args:
            conversation_id: ID of the conversation to compare
            k: Number of similar conversations to return
            exclude_same_quarter: Exclude results from the same bank, year and quarter as the source conversation

        Returns:
            List of similar conversations with metadata
        """
        conn = self.db_manager.get_db_connection()
        cursor = conn.cursor()

        try:
            # Get QA pairs for the conversation
            cursor.execute(
                """
                SELECT qp.id, qp.question, qp.answer,
                       ac.analyst_name, r.year, r.quarter, b.bank_name
                FROM qa_pairs qp
                JOIN analyst_conversations ac ON qp.conversation_id = ac.id
                JOIN reports r ON ac.report_id = r.id
                JOIN banks b ON r.bank_id = b.id
                WHERE qp.conversation_id = ?
                """,
                (conversation_id,),
            )
            qa_pairs = cursor.fetchall()
            qa_dict = self.db_manager.cursor_to_dict(cursor, qa_pairs)

            if not qa_dict:
                self.logger.warning(
                    f"No QA pairs found for conversation {conversation_id}",
                )
                return []

            # Combine all QA texts
            qa_texts = [f"{qa['question']} {qa['answer']}" for qa in qa_dict]
            combined_text = " ".join(qa_texts)

            # Search with the combined text, over-fetching (2 * k) so that
            # enough results remain after the same-quarter filter below
            results = self.hybrid_search(combined_text, k=k * 2)

            # Filter results
            if exclude_same_quarter and qa_dict:
                # Get quarter and year for original conversation
                quarter = qa_dict[0]["quarter"]
                year = qa_dict[0]["year"]
                bank = qa_dict[0]["bank_name"]

                # Filter out same quarter/year/bank
                filtered_results = []
                for result in results:
                    metadata = result.get("metadata", {})
                    if (
                        metadata.get("quarter") != quarter
                        or metadata.get("year") != year
                        or metadata.get("bank") != bank
                    ):
                        filtered_results.append(result)

                results = filtered_results

            # Return top k results
            return results[:k]

        except Exception as e:
            self.logger.error(f"Error finding similar conversations: {str(e)}")
            return []

        finally:
            conn.close()

    def topic_search(self, topic_keywords: List[str], k: int = 5) -> List[Dict]:
        """
        Search for content related to specific risk topics using keywords

        Args:
            topic_keywords: List of topic keywords to search for
            k: Number of results to return

        Returns:
            List of search results with metadata
        """
        # Combine keywords into a query
        query = " ".join(topic_keywords)

        # Use the hybrid search with the combined keywords
        return self.hybrid_search(query, k=k)

    def save_index(self, path: str) -> bool:
        """
        Save the FAISS index to disk

        Args:
            path: Path to save the index

        Returns:
            True if successful, False otherwise
        """
        try:
            faiss.write_index(self.index, path)
            self.logger.info(f"Saved FAISS index to {path}")
            return True
        except Exception as e:
            self.logger.error(f"Error saving FAISS index: {str(e)}")
            return False

    def load_index(self, path: str) -> bool:
        """
        Load the FAISS index from disk

        Args:
            path: Path to load the index from

        Returns:
            True if successful, False otherwise
        """
        try:
            self.index = faiss.read_index(path)
            self.logger.info(
                f"Loaded FAISS index from {path} with {self.index.ntotal} vectors",
            )
            return True
        except Exception as e:
            self.logger.error(f"Error loading FAISS index: {str(e)}")
            return False

    def get_index_stats(self) -> Dict:
        """
        Get statistics about the FAISS index

        Returns:
            Dictionary with index statistics
        """
        conn = self.db_manager.get_db_connection()
        cursor = conn.cursor()

        try:
            # Get index info from database
            cursor.execute(
                "SELECT * FROM faiss_indices WHERE name = ?",
                (self.index_name,),
            )
            index_info = cursor.fetchone()

            if not index_info:
                return {
                    "name": self.index_name,
                    "vectors": self.index.ntotal,
                    "dimension": self.embedding_dim,
                    "model": self.model_name,
                }

            # Get vector counts by bank, year, quarter
            cursor.execute(
                """
                SELECT
                    json_extract(ve.metadata, '$.bank') as bank,
                    json_extract(ve.metadata, '$.year') as year,
                    json_extract(ve.metadata, '$.quarter') as quarter,
                    COUNT(*) as count
                FROM vector_embeddings ve
                WHERE ve.embedding_model = ?
                GROUP BY bank, year, quarter
                ORDER BY bank, year, quarter
                """,
                (self.model_name,),
            )
            distribution = self.db_manager.cursor_to_dict(cursor, cursor.fetchall())

            # Convert from SQLite row to dict
            info = dict(index_info)

            # Add runtime info
            info.update(
                {"vectors_in_memory": self.index.ntotal, "distribution": distribution},
            )

            return info

        except Exception as e:
            self.logger.error(f"Error getting index stats: {str(e)}")
            return {"name": self.index_name, "error": str(e)}

        finally:
            conn.close()
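The 70/30 fusion used by `_enhance_with_bm25` can be exercised on its own. The sketch below is a hypothetical standalone helper, `fuse_scores`, not part of `FAISSManager`. It assumes `vector_score` is already on a roughly 0-1 scale (the actual scale depends on `_prepare_faiss_results`, which is not shown in this section) and guards against an all-zero BM25 run, which occurs whenever none of the query keywords appear in the candidate texts.

```python
from typing import Dict, List, Tuple


def fuse_scores(
    results: List[Dict],
    bm25_results: List[Tuple[str, float]],
    vector_weight: float = 0.7,
) -> List[Dict]:
    """Attach normalized BM25 and weighted combined scores, best first."""
    # Normalize by the best BM25 score; use 1.0 when every score is zero
    # (no keyword matched) so the division is always safe
    max_bm25 = max((score for _, score in bm25_results), default=0.0)
    if max_bm25 <= 0:
        max_bm25 = 1.0
    bm25_by_text = {text: score / max_bm25 for text, score in bm25_results}

    for result in results:
        bm25 = bm25_by_text.get(result["text"], 0.0)
        result["bm25_score"] = bm25
        # Assumption: vector_score is already roughly in [0, 1]
        result["combined_score"] = (
            vector_weight * result.get("vector_score", 0.0)
            + (1.0 - vector_weight) * bm25
        )
    return sorted(results, key=lambda r: r["combined_score"], reverse=True)


# Toy candidates standing in for FAISS results and BM25 scores
candidates = [
    {"text": "CET1 ratio rose to 13.2%", "vector_score": 0.80},
    {"text": "Provisions for credit losses increased", "vector_score": 0.90},
]
bm25_hits = [
    ("CET1 ratio rose to 13.2%", 4.0),
    ("Provisions for credit losses increased", 1.0),
]
ranked = fuse_scores(candidates, bm25_hits)
```

In this toy example the keyword match lifts the CET1 passage above a passage with a higher vector score, which is the intended effect of the BM25 term. The 0.7 weight mirrors the value hard-coded in the notebook; it is a tuning choice rather than a derived constant.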

### 🏦💡🧠 Financial Expert

#### 🌐🔍 Web Search & API Integrations
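`generate_fact_check_queries` asks GPT-4o for a bare JSON list, but chat models often wrap JSON in a Markdown fence or return extra items. A minimal sketch of a more defensive parser for that step is shown below; `parse_query_list` is an illustrative name, not a function from this notebook. It accepts fenced or unfenced output, rejects anything that is not a JSON list, and keeps at most the two queries the prompt requests.

```python
import json
import re
from typing import List

# Optional ```json (or bare ```) fence around the whole payload
_FENCE_RE = re.compile(r"^```(?:json)?\s*(.*?)\s*```$", re.DOTALL | re.IGNORECASE)


def parse_query_list(raw_content: str, expected: int = 2) -> List[str]:
    """Parse a JSON list of search queries from raw chat-completion text."""
    text = raw_content.strip()
    match = _FENCE_RE.match(text)
    if match:
        text = match.group(1)  # Keep only what is inside the fence
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        return []
    if not isinstance(parsed, list):
        return []  # e.g. the model returned an object instead of a list
    # Keep non-empty strings only and cap at the number of queries requested
    queries = [q.strip() for q in parsed if isinstance(q, str) and q.strip()]
    return queries[:expected]


raw = '```json\n["HSBC Q1 2024 liquidity position", "HSBC Q1 2024 deposit outflows"]\n```'
queries = parse_query_list(raw)
```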

In [11]:
class BankNewsSearcher:
    """Class to search for and analyze bank news articles"""

    def __init__(
        self,
        openai_api_key: str,
        brightdata_api_key: str,
        google_api_key: Optional[str] = None,
        google_cx: Optional[str] = None,
        serper_api_key: Optional[str] = None,
    ):
        """
        Initialize with necessary API keys

        Args:
            openai_api_key: API key for OpenAI
            brightdata_api_key: API key for BrightData Web Unlocker
            google_api_key: API key for Google Custom Search (optional)
            google_cx: Custom Search Engine ID (optional)
            serper_api_key: API key for Serper.dev (optional)
        """
        self.logger = logging.getLogger(__name__)
        self.OPENAI_API_KEY = openai_api_key
        self.BRIGHTDATA_API_KEY = brightdata_api_key
        self.GOOGLE_API_KEY = google_api_key
        self.GOOGLE_CX = google_cx
        self.SERPER_API_KEY = serper_api_key

    def generate_fact_check_queries(
        self, sentences: List[str], quarter: str, year: str, bank: str, risk_type: str
    ) -> List[str]:
        """
        Generate two fact-checking search queries phrased in the terms financial journalists use.

        Args:
            sentences: List of sentences to generate queries for
            quarter: Quarter (e.g., "Q1", "Q2")
            year: Year (e.g., "2023")
            bank: Bank name
            risk_type: Type of risk

        Returns:
            List of generated queries
        """
        prompt = f"""
You are a financial analyst and fact-checking expert. Convert the following financial insights into **two** short, keyword-rich search queries that match **how journalists report financial news**.
### **Example Format**
**Input:**
"In Q3 2024, Goldman Sachs' market risk was significantly impacted by increased equity market volatility. The bank adjusted its trading strategy by increasing hedging in derivatives and diversifying across asset classes. Rising interest rates and geopolitical instability led to higher VaR, prompting risk management interventions."
**Output:**
["Goldman Sachs Q3 2024 stock market volatility impact",
"Goldman Sachs Q3 2024 hedging strategy shifts"]
### **Context:**
Bank: {bank} | Quarter: {quarter} | Year: {year} | Risk type: {risk_type}
### **Financial Insights:**
{json.dumps(sentences)}
### **Instructions:**
- **Generate exactly two optimized search queries.**
- **Use journalistic terms** that align with financial news reporting.
- **Prioritize words used in financial headlines** (e.g., "cash reserves," "liquidity crunch," "capital risk").
- Keep **each query under 7 words**.
- **DO NOT** phrase them as questions.
- **Avoid overly technical finance jargon** (e.g., instead of "VaR increase," use "risk exposure rises").
- Format: `"Bank + Quarter + Year + Key Financial Terms"`.
- **Prioritize terms journalists use** to describe financial risk, capital management, and liquidity.
**Example Output Format:**
```json
["Query 1", "Query 2"]
```
"""
        headers = {"Authorization": f"Bearer {self.OPENAI_API_KEY}"}
        data = {
            "model": "gpt-4o",
            "messages": [{"role": "user", "content": prompt}],
            "temperature": 0.1,  # Lower temperature for precision
        }
        try:
            response = requests.post(
                "https://api.openai.com/v1/chat/completions",
                headers=headers,
                json=data,
                timeout=60,
            )
        except requests.RequestException as e:
            self.logger.error(f"GPT-4o request failed: {e}")
            return []

        if response.status_code != 200:
            self.logger.error(
                f"GPT-4o API Error: {response.status_code} - {response.text}"
            )
            return []

        raw_content = ""  # Defined up front so the error log below can use it
        try:
            raw_content = response.json()["choices"][0]["message"]["content"].strip()
            # Strip a Markdown code fence (```json or ```) if present
            if raw_content.startswith("```"):
                raw_content = raw_content.strip("`").strip()
                if raw_content.lower().startswith("json"):
                    raw_content = raw_content[4:].strip()
            return json.loads(raw_content)  # Convert JSON to Python list
        except (ValueError, KeyError, IndexError, TypeError, AttributeError) as e:
            self.logger.error(f"JSON Parsing Error: {e}\nResponse: {raw_content}")
            return []

    def build_search_queries(
        self, queries: List[str], bank: str, quarter: str, year: str
    ) -> List[str]:
        """
        Build search queries with source restrictions

        Args:
            queries: List of base queries
            bank: Bank name
            quarter: Quarter
            year: Year

        Returns:
            List of search queries with source restrictions
        """
        sources = [
            # "site:bloomberg.com",
            "site:reuters.com",
            # "site:wsj.com",
            "site:cnbc.com",
        ]
        search_queries = []
        for source in sources:
            for query in queries:
                search_queries.append(f"{query} {source}")
        return search_queries

    def extract_article_summary(self, soup: BeautifulSoup) -> Optional[str]:
        """
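        Extract a short summary from an article page: the first available meta
        description (description, og:description, twitter:description), otherwise
        the text of the first three <p> elements.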

        Args:
            soup: BeautifulSoup object of article page

        Returns:
            Summary string or None
        """
        # Try different metadata sources for the summary
        for tag in ["description", "og:description", "twitter:description"]:
            meta_tag = soup.find("meta", {"name": tag}) or soup.find(
                "meta", {"property": tag}
            )
            if meta_tag and "content" in meta_tag.attrs:
                return meta_tag["content"].strip()

        # Try extracting text from article body
        paragraphs = soup.find_all("p")
        if paragraphs:
            return " ".join(
                [p.get_text().strip() for p in paragraphs[:3]]
            )  # Take first 3 paragraphs

        return None

    def check_article_summary(self, url: str) -> str:
        """
        Checks if an article is accessible and retrieves a summary using BrightData.

        Args:
            url: URL of article to check

        Returns:
            Summary of article or error message
        """
        # Fetch with BrightData Web Unlocker
        brightdata_url = "https://api.brightdata.com/request"
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.BRIGHTDATA_API_KEY}",
        }
        payload = {
            "zone": "web_unlocker1",  # BrightData Web Unlocker
            "url": url,
            "format": "raw",
        }

        try:
            response = requests.post(
                brightdata_url, headers=headers, json=payload, timeout=15
            )
            if response.status_code == 200:
                soup = BeautifulSoup(response.text, "html.parser")
                # extract_article_summary already tries the meta descriptions
                # before falling back to the first paragraphs
                summary = self.extract_article_summary(soup)
                return summary or "No summary available"
            self.logger.warning(f"BrightData request failed: {response.status_code}")
            return "Failed to access article"
        except Exception as e:
            self.logger.error(f"Error checking article summary: {str(e)}")
            return "Error accessing article"

    def search_google_cloud(self, query: str) -> List[Dict]:
        """
        Searches Google for financial news using Google Cloud Custom Search API.

        Args:
            query: Search query

        Returns:
            List of article summaries
        """
        if not self.GOOGLE_API_KEY or not self.GOOGLE_CX:
            self.logger.warning("Google API key or CX not provided")
            return []

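        article_summaries = []
        url = "https://www.googleapis.com/customsearch/v1"
        # Pass parameters via `params` so requests percent-encodes the query
        # (spaces, "site:" and characters such as "&" in "S&P")
        params = {"q": query, "key": self.GOOGLE_API_KEY, "cx": self.GOOGLE_CX}

        try:
            response = requests.get(url, params=params, timeout=15)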
            if response.status_code == 200:
                results = response.json().get("items", [])
                for result in results[:5]:  # Limit to top 5 results
                    summary = self.check_article_summary(result["link"])
                    article = {
                        "title": result["title"],
                        "summary": summary,
                    }
                    article_summaries.append(article)
            else:
                self.logger.warning(
                    f"Google Cloud search failed: {response.status_code}"
                )
        except Exception as e:
            self.logger.error(f"Error in Google Cloud search: {str(e)}")

        return article_summaries

    def search_google_serper(self, query: str) -> List[Dict]:
        """
        Searches Google for financial news using Serper.dev API.

        Args:
            query: Search query

        Returns:
            List of article summaries
        """
        if not self.SERPER_API_KEY:
            self.logger.warning("Serper API key not provided")
            return []

        article_summaries = []
        url = "https://google.serper.dev/news"
        headers = {"X-API-KEY": self.SERPER_API_KEY}
        payload = {"q": query, "num": 3}

        try:
            response = requests.post(url, json=payload, headers=headers, timeout=15)
            if response.status_code == 200:
                results = response.json().get("news", [])
                for result in results:
                    summary = self.check_article_summary(result["link"])
                    article = {
                        "title": result["title"],
                        "summary": summary,
                    }
                    article_summaries.append(article)
            else:
                self.logger.warning(f"Serper search failed: {response.status_code}")
        except Exception as e:
            self.logger.error(f"Error in Serper search: {str(e)}")

        return article_summaries

    def search_all_sources(self, query: str) -> List[Dict]:
        """
        Search all available sources (Google Cloud and Serper)

        Args:
            query: Search query

        Returns:
            Combined list of article summaries
        """
        all_results = []

        # Use Google Cloud if API key and CX provided
        if self.GOOGLE_API_KEY and self.GOOGLE_CX:
            google_results = self.search_google_cloud(query)
            all_results.extend(google_results)

        # Use Serper if API key provided
        if self.SERPER_API_KEY:
            serper_results = self.search_google_serper(query)
            all_results.extend(serper_results)

        return all_results

    def search_with_concurrent_requests(self, queries: List[str]) -> List[Dict]:
        """
        Search multiple queries concurrently for faster results

        Args:
            queries: List of search queries

        Returns:
            Combined list of article summaries
        """
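        # ThreadPoolExecutor rejects max_workers=0, so handle an empty list first
        if not queries:
            return []

        all_results = []

        # Use ThreadPoolExecutor for concurrent requests (capped at 10 workers)
        with ThreadPoolExecutor(max_workers=min(10, len(queries))) as executor: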
            future_to_query = {
                executor.submit(self.search_all_sources, query): query
                for query in queries
            }
            for future in as_completed(future_to_query):
                query = future_to_query[future]
                try:
                    results = future.result()
                    all_results.extend(results)
                    self.logger.info(
                        f"Completed search for query: {query} - Found {len(results)} results"
                    )
                except Exception as e:
                    self.logger.error(f"Error searching query {query}: {str(e)}")

        return all_results

    def run_bank_news_search(
        self, bank: str, quarter: str, year: str, sentences: List[str], risk_type: str
    ) -> List[Dict]:
        """
        Run complete bank news search process

        Args:
            bank: Bank name
            quarter: Quarter
            year: Year
            sentences: List of sentences to generate queries from
            risk_type: Type of risk

        Returns:
            List of article summaries
        """
        # Step 1: Generate fact-check queries
        self.logger.info(
            f"Generating queries for {bank} {quarter} {year} ({risk_type})"
        )
        queries = self.generate_fact_check_queries(
            sentences, quarter, year, bank, risk_type
        )

        if not queries:
            self.logger.warning("Failed to generate queries")
            return []

        # Step 2: Build search queries with source restrictions
        search_queries = self.build_search_queries(queries, bank, quarter, year)
        self.logger.info(f"Generated {len(search_queries)} search queries")

        # Step 3: Search with concurrent requests
        self.logger.info("Searching for articles...")
        results = self.search_with_concurrent_requests(search_queries)

        self.logger.info(f"Found {len(results)} total results")
        return results
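`search_with_concurrent_requests` uses a fan-out/fan-in pattern: every query goes to a thread pool, and results are merged in completion order. One query can fail without discarding the others. The standalone sketch below reproduces that pattern outside the class. `search_concurrently`, `search_fn` and `fake_search` are illustrative names. `search_fn` stands in for `search_all_sources`, and `print` replaces the class logger.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, Dict, List


def search_concurrently(
    queries: List[str],
    search_fn: Callable[[str], List[Dict]],
    max_workers: int = 10,
) -> List[Dict]:
    """Fan queries out to a thread pool and merge results as they finish."""
    # ThreadPoolExecutor rejects max_workers=0, so handle the empty case first
    if not queries:
        return []
    all_results: List[Dict] = []
    with ThreadPoolExecutor(max_workers=min(max_workers, len(queries))) as executor:
        # Map each future back to its query so failures can be reported
        future_to_query = {executor.submit(search_fn, q): q for q in queries}
        for future in as_completed(future_to_query):
            query = future_to_query[future]
            try:
                all_results.extend(future.result())
            except Exception as exc:
                # One failing query should not discard the results of the others
                print(f"query {query!r} failed: {exc}")
    return all_results


if __name__ == "__main__":
    def fake_search(query: str) -> List[Dict]:
        # Hypothetical stand-in for search_all_sources: one fake hit per query
        if query == "bad":
            raise RuntimeError("simulated API error")
        return [{"title": f"hit for {query}", "summary": "..."}]

    results = search_concurrently(["a", "b", "bad"], fake_search)
    # Completion order varies between runs, so sort before printing
    print(sorted(r["title"] for r in results))
    # → ['hit for a', 'hit for b']
```

Threads fit this workload because each search spends most of its time waiting on network I/O. The cap of 10 workers mirrors the limit in the original method.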

#### 🤖🛠️ Implementation
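`FinancialExpert._get_last_3_quarters`, defined in the cell below, builds a three-quarter window by stepping back one quarter at a time and wrapping Q1 to Q4 of the previous year. Both the historical-context lookup and the quarter-by-quarter comparison rely on that window. The sketch below computes the same window with arithmetic instead of a loop. Each (quarter, year) pair maps to a single integer index, and `divmod` splits it back. The helper name `quarter_window` and the `QUARTERS` tuple are illustrative, not part of the project. Unlike the original, this sketch casts `year` to `int`.

```python
from typing import List, Tuple

QUARTERS = ("Q1", "Q2", "Q3", "Q4")


def quarter_window(quarter: str, year: int, n: int = 3) -> List[Tuple[str, int]]:
    """Return the given quarter followed by the n - 1 quarters before it."""
    if quarter not in QUARTERS:
        raise ValueError(f"unknown quarter: {quarter!r}")
    # Put all quarters on one axis: index = 4 * year + (quarter number - 1)
    start = 4 * int(year) + QUARTERS.index(quarter)
    window = []
    for offset in range(n):
        # divmod turns the linear index back into (year, 0-based quarter)
        y, q = divmod(start - offset, 4)
        window.append((QUARTERS[q], y))
    return window


print(quarter_window("Q2", 2024))
# → [('Q2', 2024), ('Q1', 2024), ('Q4', 2023)]
```

Because the year rollover falls out of `divmod`, no special case for Q1 is needed. For the same inputs, the result matches what `_get_last_3_quarters` returns.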

In [12]:
class FinancialExpert:
    def __init__(
        self,
        llm,
        db: GSIBDatabase,
        faiss_manager: FAISSManager,
    ):
        self.db = db
        self.faiss_manager = faiss_manager
        self.llm = llm
        self.client = OpenAI()
        self.searcher = BankNewsSearcher(
            openai_api_key=os.getenv('OPENAI_API_KEY'),
            brightdata_api_key=os.getenv('BRIGHTDATA_API_KEY'),
            google_api_key=os.getenv('GOOGLE_API_KEY'),
            google_cx=os.getenv('GOOGLE_CX'),
            serper_api_key=os.getenv('SERPER_API_KEY'),
        )
        self.keywords = {
            "liquidity_risk": [
                "liquidity score",
                "liquidity position",
                "cash reserves",
                "funding costs",
                "liquidity coverage ratio (LCR)",
                "deposit inflows",
                "deposit outflows",
                "cash flow",
                "liquidity risk",
                "short-term funding",
                "working capital",
                "cash holdings",
                "high-quality liquid assets (HQLA)",
                "wholesale funding",
                "liquidity buffers",
                "repo market",
                "liquidity crunch",
                "central bank liquidity support",
                "stress testing",
                "liquidity management",
                "maturity mismatch",
                "funding liquidity",
                "interbank lending market",
                "liquidity shocks",
                "contingent liquidity risk",
                "monetary policy impact on liquidity",
            ],
            "credit_risk": [
                "credit risk score",
                "non-performing loans (NPL)",
                "Stage 3 loans",
                "loan defaults",
                "credit exposure",
                "corporate lending",
                "consumer lending",
                "creditworthiness",
                "loan loss provisions",
                "credit spreads",
                "default probability",
                "credit derivatives",
                "sovereign risk",
                "counterparty credit risk",
                "credit risk stress testing",
                "credit rating downgrades",
                "exposure at default (EAD)",
                "probability of default (PD)",
                "loss given default (LGD)",
                "credit market deterioration",
            ],
            "market_risk": [
                "market risk score",
                "volatility",
                "equity markets",
                "bond markets",
                "interest rate risk",
                "currency risk",
                "foreign exchange risk",
                "commodity prices",
                "value at risk (VaR)",
                "derivatives pricing",
                "yield curve shifts",
                "credit spread risk",
                "equity price fluctuations",
                "market liquidity risk",
                "macro-economic risk",
                "systemic market risk",
                "hedge fund exposure",
                "interest rate swaps",
                "global market trends",
                "inflation risk",
                "cross-asset correlations",
                "geopolitical market uncertainty",
                "financial contagion",
                "option volatility skew",
            ],
            "group_risk": [
                "group risk score",
                "capital allocation",
                "global risk exposure",
                "group-level risks",
                "interconnected risks",
                "economic downturn",
                "derivatives exposure",
                "systemic risk",
                "parent-subsidiary exposure",
                "conglomerate risk",
                "risk concentration",
                "interbank dependencies",
                "shadow banking risks",
                "cross-border regulatory risks",
                "capital adequacy ratios",
                "leverage ratios",
                "financial stability risks",
                "macroprudential policies",
                "contagion effects",
                "operational interdependencies",
                "group solvency stress tests",
            ],
        }

        # Question templates per risk type; each template takes (bank, quarter, year)
        self.risk_questions_map = {
            "liquidity_risk": [
                lambda bank, q, y: f"What liquidity challenges did {bank} face in {q} {y}, and how were its liquidity score and deposit stability impacted by external market conditions?",
            ],
            "credit_risk": [
                lambda bank, q, y: f"What were the key drivers of changes in {bank}'s credit risk during {q} {y}, and what factors influence its risk profile?",
            ],
            "market_risk": [
                lambda bank, q, y: f"How did {bank} manage market risk in {q} {y}, considering equity volatility, interest rate changes, and exposure to commodity and foreign exchange fluctuations?",
            ],
            "group_risk": [
                lambda bank, q, y: f"What were the key drivers of {bank}'s group risk in {q} {y}, including global market volatility, capital allocation, and other systemic exposures?",
            ],
        }

        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(logging.INFO)
        self.logger.info("Initializing FinancialExpert")

    def _rate_limited_api_call(self, method, **kwargs):
        """
        Execute API calls with exponential backoff for rate limiting.

        Args:
            method: The API method to call
            **kwargs: Arguments to pass to the method

        Returns:
            API response or None on failure
        """
        max_retries = 5
        base_delay = 1

        for attempt in range(max_retries):
            try:
                return method(**kwargs)
            except Exception as e:
                # Check if this is a rate limit error
                if (
                    "rate_limit" in str(e).lower()
                    or "too many requests" in str(e).lower()
                ):
                    delay = base_delay * (2**attempt) + random.uniform(0, 1)
                    self.logger.warning(
                        f"Rate limit hit. Retrying in {delay:.2f} seconds...",
                    )
                    time.sleep(delay)
                    continue
                # For other exceptions, log and return None
                self.logger.error(f"API call failed: {e}")
                return None

        self.logger.error(f"API call failed after {max_retries} retries")
        return None

    def _get_last_3_quarters(self, quarter=None, year=None):
        """
        Return the given quarter followed by the two quarters that precede it.

        Args:
            quarter (str): Current quarter (e.g., "Q2"); required
            year (int): Current year; required

        Returns:
            list: (quarter, year) tuples, most recent first,
                e.g. [("Q2", 2024), ("Q1", 2024), ("Q4", 2023)]

        Raises:
            ValueError: If quarter or year is missing.
        """
        if not quarter or not year:
            raise ValueError("Both quarter and year must be provided.")

        # Mapping of quarters to their respective numbers
        quarters_mapping = {"Q1": 1, "Q2": 2, "Q3": 3, "Q4": 4}
        reverse_quarters_mapping = {v: k for k, v in quarters_mapping.items()}

        # Initialize the list with the given quarter and year
        quarters_relevant = [(quarter, year)]

        while len(quarters_relevant) < 3:
            current_quarter, current_year = quarters_relevant[-1]
            current_quarter_num = quarters_mapping[current_quarter]

            # Move to the previous quarter
            if current_quarter_num == 1:
                next_quarter_num = 4
                next_year = current_year - 1
            else:
                next_quarter_num = current_quarter_num - 1
                next_year = current_year

            next_quarter = reverse_quarters_mapping[next_quarter_num]
            quarters_relevant.append((next_quarter, next_year))

        return quarters_relevant

    def _get_historical_context(self, bank, risk_type, current_quarter, current_year):
        """
        Get historical context from previous quarters.

        Args:
            bank (str): Bank name
            risk_type (str): Risk type
            current_quarter (str): Current quarter
            current_year (int): Current year

        Returns:
            tuple: (insights_dict, alignment_report_dict)
                insights_dict: Dictionary containing previous quarter and one before previous quarter insights
                alignment_report_dict: Dictionary containing previous quarter and one before previous quarter alignment reports
        """
        # Get the last 3 quarters including the current one
        last_3_quarters = self._get_last_3_quarters(current_quarter, current_year)
        # Skip the current quarter (first in the list)
        historical_quarters = last_3_quarters[1:]

        insights_dict = {
            "previous": "No insights available",
            "one_before_previous": "No insights available",
        }

        alignment_report_dict = {
            "previous": "No alignment data available",
            "one_before_previous": "No alignment data available",
        }

        for idx, (quarter, year) in enumerate(historical_quarters):
            report_id = self.db.get_report_id(bank, year, quarter)
            if not report_id:
                continue

            metrics = self.db.get_metrics_by_report(report_id)
            if not metrics:  # Also covers a None return from the database
                continue

            # Get insights
            insight_key = f"AI_INSIGHT_{risk_type.upper()}"
            insight = self.db.get_metric_formatted_value(metrics, insight_key)
            if insight:
                if idx == 0:  # Last quarter
                    insights_dict["previous"] = insight
                elif idx == 1:  # Last but one quarter
                    insights_dict["one_before_previous"] = insight

            # Get alignment reports
            alignment_key = f"AI_ALIGNMENT_{risk_type.upper()}"
            alignment = self.db.get_metric_formatted_value(metrics, alignment_key)
            if alignment:
                if idx == 0:  # Last quarter
                    alignment_report_dict["previous"] = alignment
                elif idx == 1:  # Last but one quarter
                    alignment_report_dict["one_before_previous"] = alignment

        return insights_dict, alignment_report_dict

    def _generate_answers_with_gen_ai(self, bank, quarter, year, risk_type, questions):
        """
        Generate risk assessment answers using RAG with FAISS and GenAI

        Args:
            bank: Bank name
            quarter: Quarter (e.g., 'Q1')
            year: Year (e.g., 2023)
            risk_type: Risk type (e.g., 'liquidity_risk')
            questions: List of questions to answer

        Returns:
            List of generated reports
        """

        all_reports = []

        for question in questions:
            # Retrieve relevant context using FAISS with keywords
            contexts = self.faiss_manager.hybrid_search(
                query=question,
                k=5,
                filter_conditions={"bank": bank, "quarter": quarter, "year": year},
                query_keywords=self.keywords.get(risk_type),
            )

            for context in contexts:
                prompt = f"""
                    You are a senior risk analyst at the Bank of England's Prudential Regulation Authority (PRA), assessing {bank}'s {quarter} {year} earnings call.

                    **Analyst Question:**
                    "{context["question"]}" - {context["metadata"]["analyst_name"]}, {context["metadata"]["analyst_company"]}

                    **Executive Response:**
                    "{context["answer"]}" - {context["answer_speaker"]}, {context["answer_role"]}

                    **Your Task:**
                    Analyze the above exchange for {risk_type.replace('_', ' ')} implications and write a focused assessment that answers:
                    {question}

                    **Instructions:**
                    - Write a **formal financial report** summarizing all relevant insights.
                    - Structure the response with **an introduction, analysis, and a conclusion**.
                    - Avoid bullet points, numbered lists, or itemized formats.
                    - Ensure the report is **concise (100-150 words)** but informative.
                    - If the context does not provide direct insights, infer logical conclusions.
                    - If no relevant information is found, state: "No insights available based on the given data."
                    """

                # Analyze using OpenAI
                response = self._rate_limited_api_call(
                    self.client.chat.completions.create,
                    model=OPENAI_MODEL,
                    messages=[
                        {
                            "role": "system",
                            "content": "You are a financial analyst answering strictly based on the provided context.",
                        },
                        {"role": "user", "content": prompt},
                    ],
                    max_tokens=500,
                    temperature=0.2,
                )
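                if response is None:
                    # _rate_limited_api_call returns None once retries are exhausted; skip this context
                    continue
                answer = response.choices[0].message.content.strip()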

                # Analyze using Local LLM (Mistral-Small-24B-Instruct)
                # answer = self.llm.generate(prompt=prompt, max_tokens=256, temperature=0.2)

                if (
                    "no insights available" not in answer.lower()
                    and "not enough information" not in answer.lower()
                ):
                    if answer not in all_reports:  # Deduplication
                        all_reports.append(answer)

        return all_reports

    def _validate_article_relevance(self, summaries, document):
        """
        Evaluates if news article summaries align with the document.
        Returns an alignment summary and score.
        """
        if not summaries:
            return "No relevant news articles found to validate alignment."

        messages = [
            {
                "role": "system",
                "content": "You are an AI assistant evaluating whether financial news summaries create a broader financial context that aligns "
                "with the document's claims. The summaries do not need to directly confirm the document's points but should provide "
                "a financial picture that makes the document's statements seem plausible or implausible in a broad way.",
            },
            {
                "role": "user",
                "content": f"The document states:\n\n{document}\n\nHere are some article summaries:\n",
            },
        ]

        for article in summaries:
            messages.append(
                {
                    "role": "user",
                    "content": f"Title: {article['title']}\nSummary: {article['summary']}",
                },
            )

        messages.append(
            {
                "role": "user",
                "content": "Based on these summaries, does the overall financial picture align with what is stated in the document? "
                "If yes, explain briefly why. If no, highlight any contradictions or gaps in information.",
            },
        )

        try:
            response = self._rate_limited_api_call(
                self.client.chat.completions.create,
                model=OPENAI_MODEL,
                messages=messages,
            )

            full_assessment = response.choices[0].message.content.strip()

            # 🔍 Strip potential leaked instructions from full_assessment
            if "You are an AI assistant" in full_assessment:
                full_assessment = full_assessment.split("\n", 1)[-1]  # Remove AI-generated setup lines

            if "Your task is" in full_assessment:
                full_assessment = full_assessment.split("Your task is", 1)[-1].strip()

            # Get concise summary and score
            summary_request = [
                {
                    "role": "system",
                    "content": "You are an AI assistant summarizing a financial alignment assessment. "
                    "Your task is to provide a short summary (2-3 sentences) and an alignment score from 0 to 100, "
                    "where 0 means completely misaligned, and 100 means fully aligned.",
                },
                {
                    "role": "user",
                    "content": f"Here is the full assessment:\n\n{full_assessment}\n\nSummarize it concisely and provide an alignment score.",
                },
            ]

            response_2 = self._rate_limited_api_call(
                self.client.chat.completions.create,
                model=OPENAI_MODEL,
                messages=summary_request,
            )

            return response_2.choices[0].message.content.strip()
        except Exception as e:
            self.logger.error(f"Error validating article relevance: {e}")
            return "Error validating alignment."

    def _analyze_risk(self, report, insights_dict, alignment_report_dict, risk_type):
        """
        Build a formatted risk assessment report from the current and historical
        insights and alignment reports. Returns the report text, or None on failure.
        """
        try:
            quarter = report.get("quarter")
            quarter_number = int(
                "".join(filter(str.isdigit, quarter))
            )  # Extracts quarter number
            year = report.get("year")
            bank = report.get("bank_name")
            current_insights = insights_dict.get("current", "No insights available")
            previous_insights = insights_dict.get("previous", "No insights available")
            current_alignment = alignment_report_dict.get(
                "current", "No alignment data available."
            )
            previous_alignment = alignment_report_dict.get(
                "previous", "No alignment data available."
            )
            one_before_previous_insights = insights_dict.get(
                "one_before_previous", "No insights available"
            )
            one_before_previous_alignment = alignment_report_dict.get(
                "one_before_previous", "No alignment data available."
            )

            # Alternative prompt drafted with Claude Sonnet (kept for reference; not used)
            # messages = [
            #     {
            #         "role": "system",
            #         "content": (
            #             f"You are a financial risk assessment expert at the Bank of England's Prudential Regulation Authority (PRA), "
            #             f"specializing in {risk_type} risk analysis for Global Systemically Important Banks (G-SIBs). "
            #             f"Your task is to produce a professional, concise risk assessment that follows this exact format:\n\n"
            #             f"1. A structured summary section titled '{bank} {quarter} {year} Insights Summary'\n"
            #             f"2. A 'Key Focus Areas' section with 2-3 paragraphs summarizing the bank's primary {risk_type} risk challenges and mitigation strategies\n"
            #             f"3. An 'Evolution from Previous Quarters' section showing clear bullet points of the progression across quarters\n"
            #             f"4. A 'Key {risk_type.title()} Risk Challenges' section with bullet points for each significant risk factor\n"
            #             f"5. A 'Performance Scores' section with objective Alignment and Risk Exposure scores based on the data\n\n"
            #             f"Your assessment must be evidence-based, professional, and formatted exactly like the reference format provided."
            #         )
            #     },
            #     {
            #         "role": "user",
            #         "content": (
            #             f"### Bank: {bank} ({risk_type}) - {quarter} {year}\n\n"
            #             f"🔹 **Current quarter Insights:**\n{current_insights}\n\n"
            #             f"🔹 **Previous quarter Insights:**\n{previous_insights}\n\n"
            #             f"🔹 **Last but one quarter Insights:**\n{one_before_previous_insights}\n\n"
            #             f"🔹 **Previous two quarters Alignment Reports:**\n"
            #             f"Last quarter: {previous_alignment}\n"
            #             f"Quarter before last: {one_before_previous_alignment}\n\n"
            #             f"Create a professional risk assessment report that looks exactly like this format:\n\n"
            #             f"```\n"
            #             f"{bank} {quarter} {year} Insights Summary\n\n"
            #             f"Key Focus Areas\n"
            #             f"[2-3 paragraphs about current quarter key challenges and mitigation strategies]\n\n"
            #             f"Evolution from Previous Quarters\n"
            #             f"• [Quarter-2]: [Brief summary]\n"
            #             f"• [Quarter-1]: [Brief summary]\n"
            #             f"• [Current Quarter]: [Brief summary]\n\n"
            #             f"Key {risk_type.title()} Risk Challenges\n"
            #             f"• [Challenge 1]\n"
            #             f"• [Challenge 2]\n"
            #             f"• [Challenge 3]\n"
            #             f"• [Challenge 4]\n"
            #             f"• [Challenge 5]\n"
            #             f"• [Challenge 6]\n\n"
            #             f"Performance Scores\n"
            #             f"• Alignment Score: [High/Medium/Low]\n"
            #             f"• Risk Exposure Score: [High/Medium/Low]\n"
            #             f"```\n\n"
            #             f"Use the insights data to explicitly identify trends across quarters. The Key Focus Areas should "
            #             f"address both problems and the bank's mitigation strategies. The alignment score measures how well "
            #             f"external information confirms the bank's statements, while the risk score measures their exposure to {risk_type} risk."
            #         )
            #     }
            # ]

            # Active prompt, drafted with GPT-4o
            messages = [
                {
                    "role": "system",
                    "content": (
                        f"You are a financial risk assessment expert at the Bank of England's Prudential Regulation Authority (PRA), "
                        f"specializing in {risk_type} risk analysis for Global Systemically Important Banks (G-SIBs). "
                        f"Your task is to produce a professional, concise risk assessment that follows this exact format:\n\n"
                        f"1. A structured summary section titled '{bank} {quarter} {year} Insights Summary'\n"
                        f"2. A 'Key Focus Areas' section with 2-3 paragraphs summarizing the bank's primary {risk_type} risk challenges "
                        f"and explicit mitigation strategies implemented.\n"
                        f"3. An 'Evolution from Previous Quarters' section, showing clear bullet points on risk progression, "
                        f"including whether risks have increased, decreased, or remained stable.\n"
                        f"4. A 'Key {risk_type.title()} Risk Challenges' section listing the most significant risk factors.\n"
                        f"5. A 'Performance Scores' section that provides an Alignment Score and a Risk Exposure Score.\n\n"
                        f"The Alignment Score measures how well the current quarter’s financial results align with past alignment reports, "
                        f"forward-looking statements, and broader financial conditions. It does not indicate accuracy, but rather whether "
                        f"the overall financial picture supports or contradicts prior expectations. "
                        f"Where past alignment reports are available, use them alongside the current alignment report to score alignment."
                    ),
                },
                {
                    "role": "user",
                    "content": (
                        f"### Bank: {bank} ({risk_type}) - {quarter} {year}\n\n"
                        f"🔹 **Current quarter Insights:**\n{current_insights}\n\n"
                        f"🔹 **Previous quarter Insights:**\n{previous_insights}\n\n"
                        f"🔹 **Last but one quarter Insights:**\n{one_before_previous_insights}\n\n"
                        f"🔹 **Alignment Reports (current and previous two quarters):**\n"
                        f"   - Current quarter: {current_alignment}\n"
                        f"   - Last quarter: {previous_alignment}\n"
                        f"   - Quarter before last: {one_before_previous_alignment}\n\n"
                        f"### **Professional Risk Assessment Report Format**:\n"
                        f"```\n"
                        f"{bank} {quarter} {year} Insights Summary\n\n"
                        f"Key Focus Areas\n"
                        f"[2-3 paragraphs summarizing the bank's key {risk_type} risk challenges and mitigation strategies]\n\n"
                        f"Evolution from Previous Quarters\n"
                        f"• Q{(quarter_number - 3) % 4 + 1} {year if quarter_number > 2 else int(year) - 1}: [Brief summary]\n"
                        f"• Q{(quarter_number - 2) % 4 + 1} {year if quarter_number > 1 else int(year) - 1}: [Brief summary]\n"
                        f"• Q{quarter_number} {year}: [Brief summary]\n"
                        f"• (Explicitly state if risk exposure has increased, decreased, or remained stable.)\n\n"
                        f"Key {risk_type.title()} Risk Challenges\n"
                        f"• [Challenge 1]\n"
                        f"• [Challenge 2]\n"
                        f"• [Challenge 3]\n"
                        f"• [Challenge 4]\n"
                        f"• [Challenge 5]\n\n"
                        f"Performance Scores\n"
                        f"• Alignment Score: [Very Low / Low / Medium / High / Very High] - (Measures how closely the current quarter’s "
                        f"financial picture aligns with past alignment reports and forward-looking statements.)\n"
                        f"• Risk Exposure Score: [Very Low / Low / Medium / High / Very High]\n"
                        f"```\n\n"
                        f"### **Important Instructions:**\n"
                        f"- Compare risk trends across the last three quarters to highlight evolving patterns.\n"
                        f"- Clearly state whether risks have intensified, remained stable, or decreased.\n"
                        f"- Explicitly mention any key risk mitigation strategies the bank has adopted.\n"
                        f"- Do not include an explanation for the **Alignment Score**.\n"
                    ),
                },
            ]

            response = self._rate_limited_api_call(
                self.client.chat.completions.create,
                model=OPENAI_MODEL,
                messages=messages,
                temperature=0.1,  # Lower temperature for more consistent formatting
                max_tokens=500,
            )

            return response.choices[0].message.content.strip()

        except Exception as e:
            self.logger.error(f"Error analyzing risk level: {e}")
            return None

    async def _generate_ai_insight_and_summary(self, report, risk_type, refresh):
        """Generate and store the AI insight, news-alignment report and risk summary for one report and risk type."""
        report_id = report.get("id")
        insight_key = f"AI_INSIGHT_{risk_type.upper()}"
        metrics = self.db.get_metrics_by_report(report_id)
        if metrics and insight_key in metrics:
            if not refresh:
                self.logger.warning(
                    f"AI insight for {risk_type} already exists for the report {report_id}",
                )
                return None
        self.logger.info(
            f"Updating AI insight for {risk_type} for the report {report_id}",
        )
        bank_name = report.get("bank_name")
        quarter = report.get("quarter")
        year = report.get("year")

        # get questions based on risk type
        question_templates = self.risk_questions_map.get(risk_type, [])
        questions = [
            template(bank_name, quarter, year) for template in question_templates
        ]

        # generate answers based on questions
        answers = self._generate_answers_with_gen_ai(
            bank_name, quarter, year, risk_type, questions
        )

        # Combine answers into a single context
        combined_answers = "\n\n".join(answers)

        prompt = f"""
        You are a **senior financial risk analyst** at **{bank_name}**, responsible for evaluating {risk_type} for **{quarter} {year}**.

        ### **Bank:** {bank_name}
        ### **Risk Type:** {risk_type}
        ### **Reporting Period:** {quarter} {year}

        Here are the key financial insights that form the foundation of the report:

        ### **Key Financial Insights from Current Quarter:**
        {combined_answers}

        ### **Your Task:**
        - Write a **single structured financial report** summarizing the key {risk_type} insights for {bank_name} in **≤200 words**.
        - Synthesize information into a **logical narrative** instead of listing separate answers.
        - **No bullet points**—write in paragraph format with an introduction, analysis, and conclusion.
        - Compare with historical data where relevant and provide **forward-looking implications**.
        - Ensure **financial clarity**, using quantitative figures where possible.
        - If the reports contain **some** information but lack full detail, still generate a report based on the available data, even if limited.
        - Only return **"Not enough information"** if there is **absolutely no relevant data** to work with.
        """
        try:
            response = self._rate_limited_api_call(
                self.client.chat.completions.create,
                model=OPENAI_MODEL,
                messages=[
                    {
                        "role": "system",
                        "content": "You are a financial risk analyst producing expert-level structured financial reports. Use only the provided context and avoid speculation, but always generate a report if any relevant information is available.",
                    },
                    {"role": "user", "content": prompt},
                ],
                temperature=0.3,
                max_tokens=500,
            )

            insight = response.choices[0].message.content.strip()

            # Store the insight in database
            self.db.add_metric(
                report_id=report_id,
                metric_name=f"AI_INSIGHT_{risk_type.upper()}",
                metric_value=0.0,
                formatted_value=insight,
            )

            results = self.searcher.run_bank_news_search(
                bank=bank_name,
                quarter=quarter,
                year=year,
                sentences=[insight],
                risk_type=risk_type,
            )

            # Validate article relevance
            alignment_report = self._validate_article_relevance(results, insight)

            insights_dict, alignment_reports = self._get_historical_context(
                bank_name,
                risk_type,
                quarter,
                year,
            )
            insights_dict["current"] = insight

            # Analyze risk level
            risk_analysis = self._analyze_risk(
                report,
                insights_dict,
                alignment_reports,
                risk_type,
            )

            # Store the summary and alignment reports in the database
            self.db.add_metric(
                report_id=report_id,
                metric_name=f"AI_SUMMARY_{risk_type.upper()}",
                metric_value=0.0,
                formatted_value=risk_analysis,
            )

            self.db.add_metric(
                report_id=report_id,
                metric_name=f"AI_ALIGNMENT_{risk_type.upper()}",
                metric_value=0.0,
                formatted_value=alignment_report,
            )
            self.logger.info(
                f"Generated and stored AI insight and summary for {bank_name}, {quarter} {year}, {risk_type}",
            )
        except Exception as e:
            self.logger.error(f"Error generating risk insight and summary: {e}")
            return {"error": str(e)}

    async def run_financial_expert(self, report_id=None, refresh=False, limit=None):
        """
        Generate AI insights for reports and process bank report summaries.
        Args:
            report_id: Optional specific report ID to process
            refresh: Whether to refresh existing insights
            limit: Optional limit on reports to process
        Returns:
            dict: Results summary
        """
        # Single report case
        if report_id:
            try:
                conn = self.db.begin_transaction()  # Use transaction management
                try:
                    report = self.db.get_report_by_id(report_id)
                    if not report:
                        self.logger.error(f"Report with ID {report_id} not found")
                        return {
                            "error": f"Report with ID {report_id} not found",
                            "success": False,
                        }

                    for risk_type in RISK_TYPES_AI_INSIGHTS:
                        await self._generate_ai_insight_and_summary(
                            report,
                            risk_type,
                            refresh,
                        )
                        conn.commit()

                    self.db.commit_transaction(conn)
                    return {"success": True}
                except Exception as e:
                    self.db.rollback_transaction(conn)
                    self.logger.error(
                        f"Error processing report {report_id}: {e}",
                    )
                    return {"error": str(e), "success": False}
            except Exception as e:
                self.logger.error(f"Error with database connection: {e}")
                return {"error": str(e), "success": False}

        # Multiple reports case
        summary_count = 0
        reports = None
        try:
            with self.db.get_db_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("""
                SELECT r.id, r.year, r.quarter, b.bank_name
                FROM reports r
                JOIN banks b ON r.bank_id = b.id
                ORDER BY r.year ASC, r.quarter ASC
                """)
                reports = self.db.cursor_to_dict(cursor, cursor.fetchall())
        except Exception as e:
            self.logger.error(f"Error fetching reports: {e}")
            return {"error": str(e), "success": False}

        if not reports:
            self.logger.warning("No reports found in the database")
            return {"error": "No reports found in the database"}

        reports_to_process = reports[:limit] if limit else reports

        # Process each report with its own db connection
        for report in tqdm(reports_to_process, desc="Processing reports"):
            try:
                conn = self.db.begin_transaction()
                try:
                    for risk_type in RISK_TYPES_AI_INSIGHTS:
                        await self._generate_ai_insight_and_summary(
                            report,
                            risk_type,
                            refresh,
                        )
                        conn.commit()  # Explicit commit after each risk type

                        # Optional: Checkpoint WAL after each commit to manage file size
                        self.db.checkpoint_wal("PASSIVE")

                    self.db.commit_transaction(conn)
                    summary_count += 1
                except Exception as e:
                    self.db.rollback_transaction(conn)
                    self.logger.error(
                        f"Error processing report {report.get('id')}: {e}",
                    )
                    continue
            except Exception as e:
                self.logger.error(f"Error with transaction: {e}")
                continue

        return {
            "success": True,
            "generated_summaries": summary_count,
            "total_reports": len(reports),
        }
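
The batch loop above wraps each report in its own transaction, so a failed LLM call or database write for one report is rolled back without discarding work already committed for earlier reports. A minimal sketch of that isolation pattern, assuming a plain standard-library `sqlite3.Connection` rather than the project's `GSIBDatabase` wrapper (whose `begin_transaction`/`rollback_transaction` helpers are not shown in this section). `process_reports`, `write_insight` and the toy `metrics` table are hypothetical names used only for illustration.

```python
import sqlite3


def process_reports(conn, report_ids, handler):
    """Run `handler(conn, report_id)` for each report in its own transaction.

    Returns (succeeded, failed) lists of report ids. A failure rolls back only
    that report's writes; earlier commits are kept and the loop continues.
    """
    succeeded, failed = [], []
    for report_id in report_ids:
        try:
            # sqlite3 connection as context manager: commit on success,
            # rollback on exception (the connection stays open)
            with conn:
                handler(conn, report_id)
            succeeded.append(report_id)
        except Exception:
            failed.append(report_id)
    return succeeded, failed


# Toy example: report 2 fails after a partial write
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE metrics (report_id INTEGER, metric_name TEXT)")


def write_insight(conn, report_id):
    conn.execute(
        "INSERT INTO metrics VALUES (?, ?)", (report_id, "AI_INSIGHT_CREDIT")
    )
    if report_id == 2:
        raise RuntimeError("simulated LLM failure")


ok, bad = process_reports(conn, [1, 2, 3], write_insight)
rows = conn.execute("SELECT report_id FROM metrics ORDER BY report_id").fetchall()
print(ok, bad, rows)  # [1, 3] [2] [(1,), (3,)]
```

Using the connection as a context manager commits on success and rolls back on an exception without closing the connection, which is the behaviour the explicit commit and rollback calls in `run_financial_expert` aim for.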

## 🔄 Pipeline Orchestrator

In [13]:
class GSIBAnalysisPipeline:
    """Pipeline for G-SIB quarterly announcement analysis"""

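    def __init__(self, db: GSIBDatabase, llm):
        """
        Initialize the G-SIB analysis pipeline with required components.

        Building the pipeline also indexes QA pairs in FAISS via
        run_vector_indexing().

        Args:
            db: GSIBDatabase instance shared by all pipeline components
            llm: LLM client forwarded to FinancialExpert
        """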
        # Set up logging
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(logging.INFO)
        self.logger.info("Initializing GSIBAnalysisPipeline")

        self.db = db
        self.llm = llm

        # Core components
        self.preprocessor = TextPreprocessor(db=db)
        self.topic_embedder = TextEmbedder(model_name="ProsusAI/finbert")
        self.sentiment_analyzer = SentimentAnalyser(
            model_name="yiyanghkust/finbert-tone",
            db=db,
            preprocessor=self.preprocessor,
        )
        self.topic_analyzer = TopicAnalyzer(
            embedder=self.topic_embedder,
            db=db,
            preprocessor=self.preprocessor,
            sentiment_analyzer=self.sentiment_analyzer,
        )
        self.creditrisk_analyser = CreditRiskAnalyser(db=db)
        self.faiss_manager = FAISSManager(db_manager=db)
        self.financial_expert = FinancialExpert(
            llm=self.llm, db=self.db, faiss_manager=self.faiss_manager
        )
        self.run_vector_indexing()

    def run_preprocessing(self, limit=None):
        """
        Run text preprocessing

        Args:
            limit: Optional limit on number of documents to process
        """
        self.logger.info(
            f"Starting text preprocessing pipeline{' (limited to ' + str(limit) + ' docs)' if limit else ''}",
        )

        # Process all QA pairs to generate preprocessed versions
        self.preprocessor.process_qa_pairs_from_db(limit)
        self.db.checkpoint_wal()
        self.logger.info("Completed text preprocessing pipeline")

    def run_topic_analysis(self, limit=None):
        """
        Run topic analysis

        Args:
            limit: Optional limit on number of conversations to analyze
        """
        self.logger.info(
            f"Starting topic analysis{' (limited to ' + str(limit) + ' conversations)' if limit else ''}",
        )

        # Run topic clustering
        results = self.topic_analyzer.analyze_topics(limit=limit)
        self.db.checkpoint_wal()

        # Log summary
        if results:
            topics_found = len(set(r["topic"].id for r in results.values()))
            self.logger.info(
                f"Topic analysis complete: Found {topics_found} topics across {len(results)} conversations",
            )
        else:
            self.logger.info("No new topics found")

    def run_sentiment_analysis(self, limit=None):
        """
        Run sentiment analysis

        Args:
            limit: Optional limit on number of conversations to analyze
        """
        self.logger.info(
            f"Starting sentiment analysis{' (limited to ' + str(limit) + ' conversations)' if limit else ''}",
        )

        # Run sentiment analysis
        results = self.sentiment_analyzer.analyze_qa_pairs(limit=limit)
        self.db.checkpoint_wal()

        # Log summary
        if results:
            self.logger.info(
                f"Sentiment analysis complete for {len(results)} conversations",
            )

            # Calculate average sentiment
            avg_positive = np.mean(
                [r["conversation"].positive for r in results.values()],
            )
            avg_negative = np.mean(
                [r["conversation"].negative for r in results.values()],
            )
            avg_compound = np.mean(
                [r["conversation"].compound for r in results.values()],
            )

            self.logger.info(
                f"Average sentiment: Positive={avg_positive:.2f}, Negative={avg_negative:.2f}, Compound={avg_compound:.2f}",
            )
        else:
            self.logger.info("No new sentiment analysis results")

    def run_vector_indexing(self, limit=None):
        """
        Index conversations in FAISS

        Args:
            limit: Optional limit on number of documents to index
        """
        self.logger.info(
            f"Starting vector indexing{' (limited to ' + str(limit) + ' docs)' if limit else ''}",
        )

        # Index QA pairs in FAISS
        count = self.faiss_manager.index_combined_qa(limit=limit)
        self.db.checkpoint_wal()

        self.logger.info(f"Indexed {count} QA pairs in FAISS")

    async def run_full_pipeline(self, limit=None):
        """
        Run the full analysis pipeline

        This is a coroutine because the final FinancialExpert stage is async;
        await it, e.g. `await pipeline.run_full_pipeline()` in a notebook cell.

        Args:
            limit: Optional limit on number of documents to process at each stage
        """
        self.logger.info(
            f"Starting full G-SIB analysis pipeline{' (limited to ' + str(limit) + ' docs per stage)' if limit else ''}",
        )

        # Run each stage
        self.run_preprocessing(limit=limit)
        self.run_topic_analysis(limit=limit)
        self.creditrisk_analyser.run_credit_risk_analysis(limit=limit)
        self.run_sentiment_analysis(limit=limit)
        self.run_vector_indexing(limit=limit)
        # run_financial_expert is async; calling it without await would only
        # create a coroutine object and silently skip this stage
        await self.financial_expert.run_financial_expert(limit=limit)

        self.logger.info("G-SIB analysis pipeline complete")

    def search_conversations(self, query, k=5, filter_conditions=None):
        """
        Search for conversations related to a query

        Args:
            query: Search query
            k: Number of results to return
            filter_conditions: Optional filter conditions

        Returns:
            List of search results
        """
        self.logger.info(f"Searching for: '{query}'")

        # Run hybrid search
        results = self.faiss_manager.hybrid_search(
            query,
            k=k,
            filter_conditions=filter_conditions,
        )

        return results

    def get_bank_analysis(self, bank_name, year, quarter):
        """
        Generate a comprehensive analysis for a specific bank and quarter

        Args:
            bank_name: Name of the bank
            year: Year
            quarter: Quarter (e.g., "Q1", "Q2")

        Returns:
            ReportAnalysis object with all analysis data
        """
        conn = self.db.get_db_connection()
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()

        try:
            # Get report ID
            cursor.execute(
                """
                SELECT r.id
                FROM reports r
                JOIN banks b ON r.bank_id = b.id
                WHERE b.bank_name = ? AND r.year = ? AND r.quarter = ?
                """,
                (bank_name, year, quarter),
            )
            report = cursor.fetchone()

            if not report:
                self.logger.warning(f"No report found for {bank_name} {year} {quarter}")
                return None

            report_id = report["id"]

            # Get all conversations for this report
            cursor.execute(
                """
                SELECT ac.id, ac.analyst_name, ac.analyst_company, ac.topic_id, ac.topic_probability
                FROM analyst_conversations ac
                WHERE ac.report_id = ?
                """,
                (report_id,),
            )
            conversation_rows = cursor.fetchall()

            if not conversation_rows:
                self.logger.warning(
                    f"No conversations found for {bank_name} {year} {quarter}",
                )
                return None

            # Create ReportAnalysis
            report_analysis = ReportAnalysis(
                bank_name=bank_name,
                year=year,
                quarter=quarter,
                conversations=[],
                topics={},
                credit_risk_score={},
                credit_metrics={},
            )

            # Calculate credit risk score
            risk_score = self.creditrisk_analyser.calculate_credit_risk_score(
                bank_name,
                year,
                quarter,
            )
            report_analysis.credit_risk_score = risk_score

            # Process each conversation
            for conversation_row in conversation_rows:
                conversation_id = conversation_row["id"]

                # Get topic
                topic = None
                topic_id = conversation_row["topic_id"]
                if topic_id is not None:  # avoid skipping a topic stored with id 0
                    cursor.execute(
                        """
                        SELECT id, name, keywords, category
                        FROM topics
                        WHERE id = ?
                        """,
                        (topic_id,),
                    )
                    topic_row = cursor.fetchone()
                    if topic_row:
                        topic = Topic(
                            id=topic_row["id"],
                            name=topic_row["name"],
                            keywords=json.loads(topic_row["keywords"]),
                            category=topic_row["category"],
                            probability=conversation_row["topic_probability"] or 0.0,
                        )
                        report_analysis.topics[topic_id] = topic

                # Get conversation sentiment
                cursor.execute(
                    """
                    SELECT positive_score, negative_score, neutral_score, compound_score
                    FROM conversation_sentiment
                    WHERE conversation_id = ?
                    """,
                    (conversation_id,),
                )
                sentiment_row = cursor.fetchone()
                sentiment = None
                if sentiment_row:
                    sentiment = SentimentScore(
                        positive=sentiment_row["positive_score"],
                        negative=sentiment_row["negative_score"],
                        neutral=sentiment_row["neutral_score"],
                        compound=sentiment_row["compound_score"],
                    )

                # Get speaker sentiments
                cursor.execute(
                    """
                    SELECT speaker_name, speaker_role, positive_score, negative_score, neutral_score, compound_score
                    FROM speaker_topic_sentiment
                    WHERE conversation_id = ?
                    """,
                    (conversation_id,),
                )
                speaker_rows = cursor.fetchall()
                speaker_sentiments = {}
                for speaker_row in speaker_rows:
                    speaker_key = (
                        f"{speaker_row['speaker_name']}|{speaker_row['speaker_role']}"
                    )
                    speaker_sentiments[speaker_key] = SentimentScore(
                        positive=speaker_row["positive_score"],
                        negative=speaker_row["negative_score"],
                        neutral=speaker_row["neutral_score"],
                        compound=speaker_row["compound_score"],
                    )

                # Get credit risk mentions
                cursor.execute(
                    """
                    SELECT crm.keyword_id, crm.mention_count, crm.mention_context,
                           crm.positive_score, crm.negative_score, crm.neutral_score, crm.compound_score,
                           crk.metric_category
                    FROM credit_risk_mentions crm
                    JOIN credit_risk_keywords crk ON crm.keyword_id = crk.keyword
                    WHERE crm.conversation_id = ?
                    """,
                    (conversation_id,),
                )
                mention_rows = cursor.fetchall()
                credit_risk_mentions = []
                for mention_row in mention_rows:
                    mention_sentiment = SentimentScore(
                        positive=mention_row["positive_score"],
                        negative=mention_row["negative_score"],
                        neutral=mention_row["neutral_score"],
                        compound=mention_row["compound_score"],
                    )

                    mention_text = ""
                    if mention_row["mention_context"]:
                        contexts = json.loads(mention_row["mention_context"])
                        if contexts:
                            mention_text = contexts[0]

                    mention = CreditRiskMention(
                        metric_type=mention_row["metric_category"],
                        mention_text=mention_text,
                        mention_count=mention_row["mention_count"],
                        sentiment=mention_sentiment,
                        keyword_used=mention_row["keyword_id"],
                    )
                    credit_risk_mentions.append(mention)

                # Get QA pairs
                cursor.execute(
                    """
                    SELECT id, question, answer, answer_speaker, answer_role
                    FROM qa_pairs
                    WHERE conversation_id = ?
                    """,
                    (conversation_id,),
                )
                qa_rows = cursor.fetchall()
                qa_pairs = []
                for qa_row in qa_rows:
                    qa_pair = QAPair(
                        question=qa_row["question"],
                        answer=qa_row["answer"],
                        answer_speaker=qa_row["answer_speaker"],
                        answer_role=qa_row["answer_role"],
                    )
                    qa_pairs.append(qa_pair)

                # Create conversation object
                conversation = AnalystConversation(
                    id=conversation_id,
                    analyst_name=conversation_row["analyst_name"],
                    analyst_company=conversation_row["analyst_company"],
                    qa_pairs=qa_pairs,
                    topic=topic,
                    conversation_sentiment=sentiment,
                    speaker_sentiments=speaker_sentiments,
                    credit_risk_mentions=credit_risk_mentions,
                )

                report_analysis.conversations.append(conversation)

            # Get credit metrics if available
            cursor.execute(
                """
                SELECT metric_name, metric_value
                FROM metrics
                WHERE report_id = ?
                """,
                (report_id,),
            )
            metric_rows = cursor.fetchall()
            for metric_row in metric_rows:
                report_analysis.credit_metrics[metric_row["metric_name"]] = metric_row[
                    "metric_value"
                ]

            return report_analysis

        except Exception as e:
            self.logger.error(f"Error getting bank analysis: {str(e)}")
            return None

        finally:
            conn.close()

    def compare_banks(self, banks, year, quarter):
        """
        Compare multiple banks for the same quarter

        Args:
            banks: List of bank names
            year: Year
            quarter: Quarter

        Returns:
            Dictionary with comparative analysis
        """
        # Get analysis for each bank
        bank_analyses = {}
        for bank in banks:
            analysis = self.get_bank_analysis(bank, year, quarter)
            if analysis:
                bank_analyses[bank] = analysis

        if not bank_analyses:
            return None

        # Create comparison results
        comparison = {
            "period": f"{year} {quarter}",
            "banks": list(bank_analyses.keys()),
            "sentiment": {},
            "topics": {},
            "metrics": {},
        }

        # Compare sentiment
        for bank, analysis in bank_analyses.items():
            overall_sentiment = analysis.overall_sentiment
            comparison["sentiment"][bank] = {
                "positive": overall_sentiment.positive,
                "negative": overall_sentiment.negative,
                "compound": overall_sentiment.compound,
            }

        # Compare topics
        all_topics = {}
        for bank, analysis in bank_analyses.items():
            topic_counts = analysis.topic_mention_counts()
            comparison["topics"][bank] = topic_counts

            # Collect all topics
            for topic in topic_counts:
                if topic not in all_topics:
                    all_topics[topic] = {}
                all_topics[topic][bank] = topic_counts[topic]

        # Compare metrics
        all_metrics = {}
        for bank, analysis in bank_analyses.items():
            for metric, value in analysis.credit_metrics.items():
                if metric not in all_metrics:
                    all_metrics[metric] = {}
                all_metrics[metric][bank] = value

        comparison["all_topics"] = all_topics
        comparison["all_metrics"] = all_metrics

        return comparison

    def get_results_dataframe(self):
        """
        Get a DataFrame with analysis results

        Returns:
            Pandas DataFrame with consolidated analysis results
        """
        conn = self.db.get_db_connection()

        # Query for combined results
        query = """
        SELECT
            b.bank_name,
            r.year,
            r.quarter,
            ac.analyst_name,
            ac.analyst_company,
            t.name as topic_name,
            t.category as topic_category,
            ac.topic_probability,
            cs.positive_score,
            cs.negative_score,
            cs.neutral_score,
            cs.compound_score
        FROM analyst_conversations ac
        JOIN reports r ON ac.report_id = r.id
        JOIN banks b ON r.bank_id = b.id
        LEFT JOIN topics t ON ac.topic_id = t.id
        LEFT JOIN conversation_sentiment cs ON ac.id = cs.conversation_id
        ORDER BY b.bank_name, r.year, r.quarter
        """

        df = pd.read_sql_query(query, conn)
        conn.close()

        return df
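
`get_results_dataframe` returns one row per analyst conversation, with NULL sentiment columns where no score has been stored (the `LEFT JOIN` on `conversation_sentiment`). A hedged sketch of rolling that frame up to one row per bank and quarter with pandas named aggregation; `quarterly_sentiment` is a hypothetical helper and the toy frame is made-up input, not project output.

```python
import pandas as pd


def quarterly_sentiment(df: pd.DataFrame) -> pd.DataFrame:
    """Collapse conversation-level rows to one row per bank and quarter."""
    return df.groupby(["bank_name", "year", "quarter"], as_index=False).agg(
        mean_compound=("compound_score", "mean"),  # NaN scores are skipped
        conversations=("compound_score", "size"),  # counts every row
    )


# Made-up input using the same column names as get_results_dataframe()
toy = pd.DataFrame(
    {
        "bank_name": ["UBS", "UBS", "UBS", "JPMorgan Chase"],
        "year": [2024, 2024, 2024, 2024],
        "quarter": ["Q4", "Q4", "Q3", "Q4"],
        "compound_score": [0.2, 0.4, None, 0.1],
    }
)
summary = quarterly_sentiment(toy)
print(summary)
```

`size` counts every conversation, including unscored ones, while `mean` skips missing scores, so a quarter with no scored conversations shows a count but a NaN mean.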

### ▶️ Execution

In [17]:
gsib_analysis_pipeline = GSIBAnalysisPipeline(db=db, llm=None)

2025-03-28 17:07:22,742 - __main__ - INFO - Initializing GSIBAnalysisPipeline
2025-03-28 17:07:22,743 - __main__ - INFO - Initializing __main__
2025-03-28 17:07:22,743 - __main__ - INFO - NLTK data downloaded successfully
2025-03-28 17:07:26,219 - __main__ - INFO - Loaded spaCy model: en_core_web_sm

In [None]:
gsib_analysis_pipeline.run_preprocessing()
gsib_analysis_pipeline.run_topic_analysis()
gsib_analysis_pipeline.creditrisk_analyser.run_credit_risk_analysis()
gsib_analysis_pipeline.run_sentiment_analysis()
result = await gsib_analysis_pipeline.financial_expert.run_financial_expert()
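
The last line uses top-level `await`, which works only because Jupyter executes cells inside an already running event loop. In a plain Python script that line is a syntax error, and the coroutine has to be driven with `asyncio.run` instead (which, conversely, raises `RuntimeError` if called from inside Jupyter's running loop). A minimal sketch with a hypothetical stand-in for `run_financial_expert`:

```python
import asyncio


async def run_financial_expert_stub(limit=None):
    """Hypothetical stand-in for FinancialExpert.run_financial_expert."""
    await asyncio.sleep(0)  # yield to the event loop, as a real API call would
    return {"success": True, "generated_summaries": limit or 0}


def main():
    # The synchronous stages (preprocessing, topics, credit risk, sentiment)
    # would run here as ordinary calls before the async stage.
    return asyncio.run(run_financial_expert_stub(limit=3))


if __name__ == "__main__":
    print(main())
```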

## 📊 Visualisations

### Credit-risk plots

In [None]:
def create_plotly_dashboard(df, bank, current_quarter):
    """Create separate interactive Plotly gauge and line charts for one bank"""

    # Sort the data by period
    bank_data = df[df["bank"] == bank].sort_values("period")

    # Get the latest available data point for the bank
    if not bank_data.empty:
        current_data = bank_data.iloc[-1]
        current_score = current_data["risk_score"]
        current_level = current_data["risk_level"]

        # Get the previous quarter's score if available
        if len(bank_data) >= 2:
            previous_score = bank_data.iloc[-2]["risk_score"]
        else:
            # If no previous quarter data is available, use current score (no change)
            previous_score = current_score
    else:
        current_score = 0
        current_level = "Unknown"
        previous_score = current_score

    # Risk thresholds with labels and colors
    risk_thresholds = [
        (0, "Very Low", "#2ecc71"),  # Bright green
        (15, "Very Low", "#2ecc71"),  # Bright green
        (30, "Low", "#27ae60"),  # Dark green
        (45, "Moderately Low", "#f1c40f"),  # Yellow
        (60, "Moderate", "#e67e22"),  # Orange
        (75, "Moderately High", "#d35400"),  # Dark orange
        (85, "High", "#c0392b"),  # Red
        (100, "Very High", "#7b241c"),  # Dark red
    ]

    # Find risk level colors for gauge steps
    steps = []
    for i in range(len(risk_thresholds) - 1):
        steps.append(
            dict(
                range=[risk_thresholds[i][0], risk_thresholds[i + 1][0]],
                color=risk_thresholds[i][2],
                thickness=0.75,
                line=dict(width=0.5, color="white"),
            ),
        )

    # Create gauge chart
    gauge_fig = go.Figure()

    gauge_fig.add_trace(
        go.Indicator(
            mode="gauge+number+delta",
            value=current_score,
            title={"text": f"Risk Score: {current_level}"},
            delta={
                "reference": previous_score,
                "increasing": {"color": "red"},
                "decreasing": {"color": "green"},
            },
            gauge={
                "axis": {"range": [0, 100], "tickwidth": 1, "tickcolor": "darkblue"},
                "bar": {"color": "purple"},  # Purple
                "bgcolor": "white",
                "borderwidth": 2,
                "bordercolor": "gray",
                "steps": steps,
                "threshold": {
                    "line": {"color": "black", "width": 4},
                    "thickness": 0.75,
                    "value": current_score,
                },
            },
        ),
    )

    gauge_fig.update_layout(
        height=400,
        width=500,
        title_text=f"{bank} Current Risk",
        template="plotly_white",
        margin=dict(t=100, b=50),
    )

    # Create line chart for risk score trend
    line_fig = go.Figure()

    if not bank_data.empty:
        # Add the main trend line
        line_fig.add_trace(
            go.Scatter(
                x=bank_data["period"],
                y=bank_data["risk_score"],
                name=bank,
                mode="lines+markers",
                line=dict(width=3, color="#1f77b4"),
                marker=dict(size=10, color="#1f77b4"),
            ),
        )

        # Add horizontal lines for risk thresholds on the trend chart - hide from legend
        for threshold, label, color in risk_thresholds[1:-1]:  # Skip first and last
            line_fig.add_trace(
                go.Scatter(
                    x=[bank_data["period"].min(), bank_data["period"].max()],
                    y=[threshold, threshold],
                    mode="lines",
                    line=dict(color=color, width=1, dash="dash"),
                    showlegend=False,  # Hide from legend
                    hoverinfo="text",
                    text=f"{label} Threshold",
                ),
            )

            # Add annotation for threshold line
            line_fig.add_annotation(
                x=bank_data["period"].min(),
                y=threshold,
                text=label,
                showarrow=False,
                font=dict(size=10, color=color),
                xshift=-40,
                align="right",
            )

        # Add colored background regions for risk levels on trend chart
        shapes = []
        for i in range(1, len(risk_thresholds) - 1):
            shapes.append(
                {
                    "type": "rect",
                    "xref": "x",
                    "yref": "y",
                    "x0": bank_data["period"].min(),
                    "y0": risk_thresholds[i][0],
                    "x1": bank_data["period"].max(),
                    "y1": risk_thresholds[i + 1][0],
                    "fillcolor": risk_thresholds[i][2],
                    "opacity": 0.1,
                    "layer": "below",
                    "line_width": 0,
                },
            )

        line_fig.update_layout(shapes=shapes)

    # Update line chart layout
    line_fig.update_layout(
        height=400,
        width=600,
        title_text="Bank Risk Score Trend (2023-2024)",  # Generic title shared across banks
        template="plotly_white",
        margin=dict(t=100, b=50),
        xaxis=dict(
            title="Quarter",
            showgrid=True,
            gridwidth=1,
            gridcolor="LightGray",
        ),
        yaxis=dict(
            title="Risk Score (0-100)",
            range=[0, 100],
            showgrid=True,
            gridwidth=1,
            gridcolor="LightGray",
        ),
    )

    return gauge_fig, line_fig


def run_risk_analysis(gsib_pipeline, bank, current_quarter=("2024", "Q4")):
    """Run risk analysis for a single bank with separate gauge and trend charts"""
    # Define quarters
    years = [2023, 2023, 2023, 2023, 2024, 2024, 2024, 2024]
    quarters = ["Q1", "Q2", "Q3", "Q4", "Q1", "Q2", "Q3", "Q4"]

    # Store results
    results = []

    # Calculate risk scores for all quarters
    for year, quarter in zip(years, quarters):
        try:
            # Extract quarter number (Q1->1, Q2->2, etc.)
            quarter_num = int(quarter[1])
            current_quarter_num = int(current_quarter[1][1])

            # Skip if this quarter is in the future
            if year > int(current_quarter[0]) or (
                year == int(current_quarter[0]) and quarter_num > current_quarter_num
            ):
                continue

            score = gsib_pipeline.creditrisk_analyser.calculate_credit_risk_score(
                bank,
                year,
                quarter,
            )

            results.append(
                {
                    "bank": bank,
                    "period": f"{year} {quarter}",
                    "year": year,
                    "quarter": quarter,
                    "risk_score": score["risk_score"],
                    "risk_level": score["risk_level"],
                    "asset_size": score.get("asset_size", "Unknown"),
                },
            )

        except Exception as e:
            print(f"Error calculating {bank} {year} {quarter}: {str(e)}")

    # Convert to DataFrame
    df = pd.DataFrame(results)

    # Create separate visualizations with Plotly - gauge and trend chart
    gauge_fig, line_fig = create_plotly_dashboard(df, bank, current_quarter)

    return df, gauge_fig, line_fig


# Run the analysis and display both charts separately
df, gauge_fig, line_fig = run_risk_analysis(
    gsib_analysis_pipeline, "JPMorgan Chase", current_quarter=("2024", "Q4")
)
gauge_fig.show()
line_fig.show()
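The hard-coded `years` and `quarters` lists in `run_risk_analysis`, together with the future-quarter check, amount to enumerating quarters up to a cut-off. A minimal sketch of that enumeration using lexicographic tuple comparison, assuming the same `("2024", "Q4")` string convention for `current_quarter`; `quarters_up_to` is a hypothetical helper, not part of the pipeline.

```python
from itertools import product


def quarters_up_to(first_year, last_year, current_quarter):
    """Yield (year, "Qn") pairs from Q1 of first_year up to current_quarter.

    Hypothetical helper. current_quarter follows the notebook's
    ("2024", "Q4") convention of string year and "Qn" label.
    """
    cutoff = (int(current_quarter[0]), int(current_quarter[1][1:]))
    for year, q in product(range(first_year, last_year + 1), range(1, 5)):
        # Tuples compare lexicographically: year first, then quarter number
        if (year, q) <= cutoff:
            yield year, f"Q{q}"


print(list(quarters_up_to(2023, 2024, ("2024", "Q2"))))
```

Extending the analysis window then only means changing `last_year`, rather than editing two parallel lists in step.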

In [22]:
df, gauge_fig, line_fig = run_risk_analysis(
    gsib_analysis_pipeline, "UBS", current_quarter=("2024", "Q4")
)
gauge_fig.show()
line_fig.show()
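The trend charts show the direction of each bank's risk score; the quarter-on-quarter change can also be read off numerically. A minimal sketch in plain Python, assuming rows shaped like the dictionaries `run_risk_analysis` appends (`period` and `risk_score` keys) in chronological order; `quarter_over_quarter` is a hypothetical helper and the scores below are made-up illustrative values, not pipeline output.

```python
def quarter_over_quarter(records):
    """Return (period, change) pairs for consecutive records.

    records: list of dicts with "period" and "risk_score" keys, ordered
    chronologically (the shape of the rows built in run_risk_analysis).
    """
    # Pair each record with its predecessor and subtract the scores
    return [
        (curr["period"], curr["risk_score"] - prev["risk_score"])
        for prev, curr in zip(records, records[1:])
    ]


# Illustrative values only
rows = [
    {"period": "2024 Q1", "risk_score": 40.0},
    {"period": "2024 Q2", "risk_score": 43.5},
    {"period": "2024 Q3", "risk_score": 41.0},
]
print(quarter_over_quarter(rows))  # [('2024 Q2', 3.5), ('2024 Q3', -2.5)]
```

On the returned DataFrame, pandas' `df["risk_score"].diff()` yields the same differences. A quarter skipped because of an exception leaves a gap in the rows, so a reported change may then span two quarters.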

### Sentiment
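`get_sentiment_data` in the next cell issues one `AVG` query per report. The same per-quarter averages can come from a single grouped query. The sketch runs it against a tiny in-memory SQLite database; the schema is an assumption reduced to the columns the notebook's queries reference, and the rows are illustrative.

```python
import sqlite3

# Stand-in schema: only the columns the notebook's sentiment queries touch
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE reports (id INTEGER PRIMARY KEY, bank_id INTEGER, year INTEGER, quarter TEXT);
    CREATE TABLE analyst_conversations (id INTEGER PRIMARY KEY, report_id INTEGER);
    CREATE TABLE conversation_sentiment (conversation_id INTEGER, compound_score REAL);
    INSERT INTO reports VALUES (1, 1, 2024, 'Q2'), (2, 1, 2024, 'Q1');
    INSERT INTO analyst_conversations VALUES (10, 1), (11, 1), (12, 2);
    INSERT INTO conversation_sentiment VALUES (10, 0.5), (11, 0.25), (12, 0.25);
    """
)

# One grouped query: average compound score per quarter, ordered by the
# number embedded in the 'Qn' label ('Q1' -> 1, ..., 'Q4' -> 4)
rows = conn.execute(
    """
    SELECT r.quarter, AVG(cs.compound_score)
    FROM reports r
    JOIN analyst_conversations ac ON ac.report_id = r.id
    JOIN conversation_sentiment cs ON cs.conversation_id = ac.id
    WHERE r.bank_id = ? AND r.year = ?
    GROUP BY r.id, r.quarter
    ORDER BY CAST(SUBSTR(r.quarter, 2) AS INTEGER)
    """,
    (1, 2024),
).fetchall()
conn.close()

print(rows)  # [('Q1', 0.25), ('Q2', 0.375)]
```

One behavioural difference: the inner joins drop a quarter that has no scored conversations, whereas the per-report loop records `None` for it.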

In [23]:
def get_sentiment_data(db_path, bank_name, year):
    """Extract sentiment data from the database for visualizations"""

    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    # Get the bank_id for the given bank name
    cursor.execute("SELECT id FROM banks WHERE bank_name = ?", (bank_name,))
    bank_id = cursor.fetchone()

    if not bank_id:
        conn.close()
        raise ValueError(f"Bank '{bank_name}' not found in the database")

    bank_id = bank_id[0]

    # Get report IDs for all quarters of the specified year
    cursor.execute(
        """
        SELECT id, quarter
        FROM reports
        WHERE bank_id = ? AND year = ?
        ORDER BY
            CASE quarter
                WHEN 'Q1' THEN 1
                WHEN 'Q2' THEN 2
                WHEN 'Q3' THEN 3
                WHEN 'Q4' THEN 4
                ELSE 5
            END
    """,
        (bank_id, year),
    )

    reports = cursor.fetchall()

    if not reports:
        conn.close()
        raise ValueError(f"No reports found for {bank_name} in {year}")

    # Initialize data structures
    quarterly_sentiments = {}
    all_role_sentiments = {}

    # Process each report (quarter)
    for report_id, quarter in reports:
        # Get overall sentiment for this quarter's report
        cursor.execute(
            """
            SELECT AVG(cs.compound_score)
            FROM conversation_sentiment cs
            JOIN analyst_conversations ac ON cs.conversation_id = ac.id
            WHERE ac.report_id = ?
        """,
            (report_id,),
        )

        overall_sentiment = cursor.fetchone()[0]
        quarterly_sentiments[quarter] = overall_sentiment

        # Get sentiments by role for this quarter
        cursor.execute(
            """
            SELECT speaker_role, AVG(compound_score)
            FROM speaker_topic_sentiment
            JOIN analyst_conversations ON speaker_topic_sentiment.conversation_id = analyst_conversations.id
            WHERE analyst_conversations.report_id = ?
            GROUP BY speaker_role
        """,
            (report_id,),
        )

        role_sentiments = {role: score for role, score in cursor.fetchall()}

        # Store role sentiments by quarter
        for role, sentiment in role_sentiments.items():
            if role not in all_role_sentiments:
                all_role_sentiments[role] = {}
            all_role_sentiments[role][quarter] = sentiment

    # For the latest quarter, determine sentiment gap between roles
    latest_quarter = max(
        quarterly_sentiments.keys(),
        key=lambda q: {"Q1": 1, "Q2": 2, "Q3": 3, "Q4": 4}.get(q, 0),
    )

    # Get role sentiments for the latest quarter
    latest_report_id = [
        report_id for report_id, quarter in reports if quarter == latest_quarter
    ][0]

    cursor.execute(
        """
        SELECT speaker_role, AVG(compound_score)
        FROM speaker_topic_sentiment
        JOIN analyst_conversations ON speaker_topic_sentiment.conversation_id = analyst_conversations.id
        WHERE analyst_conversations.report_id = ?
        GROUP BY speaker_role
    """,
        (latest_report_id,),
    )

    latest_role_sentiments = {role: score for role, score in cursor.fetchall()}

    # Calculate sentiment gap between roles
    sentiment_gap = None
    if len(latest_role_sentiments) >= 2:
        roles = list(latest_role_sentiments.keys())
        max_gap = 0
        gap_roles = (roles[0], roles[1])

        for i in range(len(roles)):
            for j in range(i + 1, len(roles)):
                gap = abs(
                    latest_role_sentiments[roles[i]] - latest_role_sentiments[roles[j]]
                )
                if gap > max_gap:
                    max_gap = gap
                    gap_roles = (roles[i], roles[j])

        sentiment_gap = {
            "role1": gap_roles[0],
            "sentiment1": latest_role_sentiments[gap_roles[0]],
            "role2": gap_roles[1],
            "sentiment2": latest_role_sentiments[gap_roles[1]],
            "gap": max_gap,
        }

    conn.close()

    return {
        # AVG() yields None when the latest quarter has no scored conversations
        "overall_sentiment": quarterly_sentiments.get(latest_quarter) or 0,
        "role_sentiments": latest_role_sentiments,
        "quarterly_sentiments": quarterly_sentiments,
        "sentiment_gap": sentiment_gap,
        "latest_quarter": latest_quarter,
        "all_role_sentiments": all_role_sentiments,
    }


def create_sentiment_gauge(sentiment_score):
    """Create a gauge chart for the overall sentiment"""

    # Define thresholds for sentiment interpretation
    sentiment_thresholds = [
        (-1.0, "Very Negative", "#7b241c"),  # Dark red
        (-0.6, "Negative", "#c0392b"),  # Red
        (-0.2, "Slightly Negative", "#e74c3c"),  # Light red
        (0.0, "Neutral", "#f39c12"),  # Yellow
        (0.2, "Slightly Positive", "#2ecc71"),  # Light green
        (0.6, "Positive", "#27ae60"),  # Green
        (1.0, "Very Positive", "#1e8449"),  # Dark green
    ]

    # Create steps for gauge
    steps = []
    for i in range(len(sentiment_thresholds) - 1):
        steps.append(
            dict(
                range=[sentiment_thresholds[i][0], sentiment_thresholds[i + 1][0]],
                color=sentiment_thresholds[i][2],
                thickness=0.75,
                line=dict(width=0.5, color="white"),
            ),
        )

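    # Determine sentiment label based on score. Each threshold tuple holds the
    # lower bound, label and colour of a band, so the label must come from
    # thresholds[i] to match the step colours; the final tuple only supplies
    # the upper bound of the scale, which belongs to the top band.
    sentiment_label = "Unknown"
    last_band = len(sentiment_thresholds) - 2
    for i in range(len(sentiment_thresholds) - 1):
        lower = sentiment_thresholds[i][0]
        upper = sentiment_thresholds[i + 1][0]
        if lower <= sentiment_score < upper or (
            i == last_band and sentiment_score == upper
        ):
            sentiment_label = sentiment_thresholds[i][1]
            break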

    fig = go.Figure()

    fig.add_trace(
        go.Indicator(
            mode="gauge+number",
            value=sentiment_score,
            title={"text": f"Overall Sentiment: {sentiment_label}"},
            gauge={
                "axis": {"range": [-1, 1], "tickwidth": 1, "tickcolor": "darkblue"},
                "bar": {"color": "purple"},  # Purple
                "bgcolor": "white",
                "borderwidth": 2,
                "bordercolor": "gray",
                "steps": steps,
                "threshold": {
                    "line": {"color": "black", "width": 4},
                    "thickness": 0.75,
                    "value": sentiment_score,
                },
            },
        ),
    )

    fig.update_layout(
        height=300,
        width=400,
        margin=dict(t=30, b=30, l=30, r=30),
        template="plotly_white",
    )

    return fig


def create_sentiment_by_role_chart(role_sentiments):
    """Create a bar chart for sentiment by role"""

    roles = list(role_sentiments.keys())
    sentiments = list(role_sentiments.values())

    # Define colors based on sentiment values
    colors = []
    for sentiment in sentiments:
        if sentiment < -0.6:
            colors.append("#7b241c")  # Very negative
        elif sentiment < -0.2:
            colors.append("#c0392b")  # Negative
        elif sentiment < 0:
            colors.append("#e74c3c")  # Slightly negative
        elif sentiment < 0.2:
            colors.append("#f39c12")  # Neutral
        elif sentiment < 0.6:
            colors.append("#2ecc71")  # Slightly positive
        else:
            colors.append("#27ae60")  # Positive

    fig = go.Figure()

    fig.add_trace(
        go.Bar(
            x=roles,
            y=sentiments,
            marker_color=colors,
            text=[f"{s:.2f}" for s in sentiments],
            textposition="auto",
        ),
    )

    fig.update_layout(
        height=300,
        width=500,
        title="Sentiment by Role",
        template="plotly_white",
        margin=dict(t=40, b=30, l=40, r=30),
        yaxis=dict(
            title="Sentiment Score",
            range=[-1, 1],
        ),
        xaxis=dict(
            title="Role",
        ),
    )

    return fig


def create_quarterly_sentiment_chart(quarterly_sentiments):
    """Create a line chart for quarterly sentiment trend"""

    # Sort quarters in chronological order
    quarter_order = {"Q1": 1, "Q2": 2, "Q3": 3, "Q4": 4}
    quarters = sorted(
        quarterly_sentiments.keys(), key=lambda q: quarter_order.get(q, 5)
    )
    sentiments = [quarterly_sentiments[q] for q in quarters]

    fig = go.Figure()

    fig.add_trace(
        go.Scatter(
            x=quarters,
            y=sentiments,
            mode="lines+markers",
            line=dict(width=3, color="#1f77b4"),
            marker=dict(size=10, color="#1f77b4"),
        ),
    )

    # Add a horizontal line at sentiment=0
    fig.add_shape(
        type="line",
        x0=quarters[0],
        y0=0,
        x1=quarters[-1],
        y1=0,
        line=dict(
            color="gray",
            width=1,
            dash="dash",
        ),
    )

    fig.update_layout(
        height=300,
        width=400,
        title="Quarterly Sentiment Trend",
        template="plotly_white",
        margin=dict(t=40, b=30, l=40, r=30),
        yaxis=dict(
            title="Sentiment Score",
            range=[-1, 1],
        ),
        xaxis=dict(
            title="Quarter",
        ),
    )

    return fig


def create_sentiment_gap_card(gap_data):
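    """Create a card-style visualization for sentiment gap using an indicator."""

    # get_sentiment_data returns sentiment_gap=None when fewer than two speaker
    # roles have data; show a placeholder card instead of failing on gap_data["gap"]
    if gap_data is None:
        fig = go.Figure()
        fig.add_annotation(
            text="Sentiment gap unavailable: fewer than two speaker roles",
            x=0.5,
            y=0.5,
            xref="paper",
            yref="paper",
            showarrow=False,
            font=dict(size=14),
        )
        fig.update_layout(
            height=350,
            template="plotly_white",
            xaxis_visible=False,
            yaxis_visible=False,
        )
        return fig

    # Create a figure with a single indicator cell
    fig = make_subplots(rows=1, cols=1, specs=[[{"type": "indicator"}]])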

    # Determine color based on gap size
    gap_color = (
        "red"
        if gap_data["gap"] > 0.5
        else ("orange" if gap_data["gap"] > 0.3 else "black")
    )

    # Use an indicator for a cleaner, more separated display
    fig.add_trace(
        go.Indicator(
            mode="number+delta",
            value=gap_data["gap"],
            number={"font": {"size": 50, "color": gap_color}},
            title={
                "text": f"<b>Sentiment Gap</b><br>{gap_data['role1']} vs {gap_data['role2']}",
                "font": {"size": 16},
            },
            delta={
                "reference": 0.3,  # Threshold for concern
                "increasing": {"color": "red"},
                "decreasing": {"color": "green"},
            },
        )
    )

    # Add details for individual sentiment scores
    fig.add_annotation(
        x=0.5,
        y=0.15,
        text=f"{gap_data['role1']}: {gap_data['sentiment1']:.2f} | {gap_data['role2']}: {gap_data['sentiment2']:.2f}",
        showarrow=False,
        font=dict(size=14),
        xref="paper",
        yref="paper",
    )

    # Update layout for better spacing and separation
    fig.update_layout(
        height=350,  # Taller than default
        margin=dict(t=100, b=100, l=60, r=60),  # Increased margins
        paper_bgcolor="white",
        plot_bgcolor="white",
        # Add a border around the card
        shapes=[
            dict(
                type="rect",
                xref="paper",
                yref="paper",
                x0=0,
                y0=0,
                x1=1,
                y1=1,
                line=dict(color="rgba(0,0,0,0.2)", width=1),
            ),
        ],
    )

    return fig
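The nested loop in `get_sentiment_data` that finds the widest gap between speaker roles is a maximum over all role pairs. A more compact equivalent, sketched with `itertools.combinations` and assuming the same role-to-score mapping; `largest_sentiment_gap` is a hypothetical helper and the scores (including the `Analyst` role) are illustrative.

```python
from itertools import combinations


def largest_sentiment_gap(role_sentiments):
    """Return the pair of roles whose average sentiments differ the most.

    role_sentiments maps role name -> average compound score. Returns None
    when fewer than two roles are present.
    """
    if len(role_sentiments) < 2:
        return None
    # max() over all unordered pairs, keyed on the absolute score difference
    (role1, s1), (role2, s2) = max(
        combinations(role_sentiments.items(), 2),
        key=lambda pair: abs(pair[0][1] - pair[1][1]),
    )
    return {
        "role1": role1,
        "sentiment1": s1,
        "role2": role2,
        "sentiment2": s2,
        "gap": abs(s1 - s2),
    }


gap = largest_sentiment_gap({"CFO": -0.25, "Chairman and CEO": 0.125, "Analyst": 0.5})
print(gap["role1"], gap["role2"], gap["gap"])  # CFO Analyst 0.75
```

Like the loop, `max` keeps the first pair it encounters when two gaps tie, and the returned dictionary carries the keys `create_sentiment_gap_card` reads.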

In [25]:
bank_name = "JPMorgan Chase"
year = 2024

sentiment_data = get_sentiment_data(DB_PATH, bank_name, year)

print(f"Successfully retrieved sentiment data for {bank_name} in {year}")
print(f"Latest quarter: {sentiment_data['latest_quarter']}")
print(f"Overall sentiment: {sentiment_data['overall_sentiment']:.2f}")
print(f"Roles with sentiment data: {list(sentiment_data['role_sentiments'].keys())}")

# Create figures using real data
gauge_fig = create_sentiment_gauge(sentiment_data["overall_sentiment"])
bar_fig = create_sentiment_by_role_chart(sentiment_data["role_sentiments"])
line_fig = create_quarterly_sentiment_chart(sentiment_data["quarterly_sentiments"])
gap_fig = create_sentiment_gap_card(sentiment_data["sentiment_gap"])

# Display the figures (gap_fig may be None if the gap card helper finds fewer than two roles)
gauge_fig.show()
bar_fig.show()
line_fig.show()
if gap_fig is not None:
    gap_fig.show()

Successfully retrieved sentiment data for JPMorgan Chase in 2024
Latest quarter: Q4
Overall sentiment: -0.19
Roles with sentiment data: ['CFO', 'Chairman and CEO']
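The gauge, the role bar chart and the topic sentiment chart all bucket compound scores into the same six bands. A minimal sketch of a single lookup table using `bisect`, so labels and boundaries live in one place; the edges and labels are taken from the notebook's colour rules, while `sentiment_band`, `BAND_EDGES` and `BAND_LABELS` are hypothetical names. The overall score printed above falls in the `[-0.2, 0.0)` band.

```python
from bisect import bisect_right

# Inner band edges and the label of each band, matching the colour rules in
# create_sentiment_gauge and create_sentiment_by_role_chart
BAND_EDGES = [-0.6, -0.2, 0.0, 0.2, 0.6]
BAND_LABELS = [
    "Very Negative",      # [-1.0, -0.6)
    "Negative",           # [-0.6, -0.2)
    "Slightly Negative",  # [-0.2, 0.0)
    "Neutral",            # [0.0, 0.2)
    "Slightly Positive",  # [0.2, 0.6)
    "Positive",           # [0.6, 1.0]
]


def sentiment_band(score):
    """Map a compound score in [-1, 1] to its band label."""
    if not -1.0 <= score <= 1.0:
        raise ValueError(f"score {score} outside [-1, 1]")
    # bisect_right counts the edges <= score, which is the band index
    return BAND_LABELS[bisect_right(BAND_EDGES, score)]


print(sentiment_band(-0.19))  # Slightly Negative
```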


### Topic clustering + Sentiment
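When a topic query joins `analyst_conversations` to `speaker_topic_sentiment`, each scored speaker produces its own row, so `COUNT(*)` counts speaker rows rather than conversations; `COUNT(DISTINCT ac.id)` counts conversations. The sketch reproduces the effect on a tiny in-memory SQLite database; the schema is an assumption reduced to the columns the topic queries reference, and the rows are illustrative.

```python
import sqlite3

# Stand-in schema: only the columns the topic queries touch
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE topics (id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE analyst_conversations (id INTEGER PRIMARY KEY, report_id INTEGER, topic_id INTEGER);
    CREATE TABLE speaker_topic_sentiment (conversation_id INTEGER, speaker_role TEXT, compound_score REAL);
    INSERT INTO topics VALUES (1, 'Credit quality');
    -- one conversation on the topic, with two speakers scored
    INSERT INTO analyst_conversations VALUES (100, 1, 1);
    INSERT INTO speaker_topic_sentiment VALUES (100, 'CFO', 0.5), (100, 'Analyst', -0.25);
    """
)

row = conn.execute(
    """
    SELECT COUNT(*), COUNT(DISTINCT ac.id), AVG(sts.compound_score)
    FROM analyst_conversations ac
    JOIN topics t ON ac.topic_id = t.id
    LEFT JOIN speaker_topic_sentiment sts ON sts.conversation_id = ac.id
    WHERE ac.report_id = ?
    GROUP BY t.name
    """,
    (1,),
).fetchone()
conn.close()

# COUNT(*) sees two joined rows, but only one conversation mentions the topic
print(row)  # (2, 1, 0.125)
```

The `AVG` is unaffected by the choice of count: it still weights every scored speaker row equally.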

In [26]:
def get_topic_data(db_path, bank_name, year, quarter):
    """Extract topic data from the database for visualizations"""

    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    # Get the bank_id for the given bank name
    cursor.execute("SELECT id FROM banks WHERE bank_name = ?", (bank_name,))
    bank_id = cursor.fetchone()

    if not bank_id:
        conn.close()
        raise ValueError(f"Bank '{bank_name}' not found in the database")

    bank_id = bank_id[0]

    # Get report ID for the specified quarter
    cursor.execute(
        """
        SELECT id
        FROM reports
        WHERE bank_id = ? AND year = ? AND quarter = ?
    """,
        (bank_id, year, quarter),
    )

    report_id = cursor.fetchone()

    if not report_id:
        conn.close()
        raise ValueError(f"No report found for {bank_name} in {year} {quarter}")

    report_id = report_id[0]

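    # Get topic mentions and sentiment data. The LEFT JOIN yields one row per
    # scored speaker, so mentions are counted as distinct conversations.
    cursor.execute(
        """
        SELECT t.name as topic_name,
               COUNT(DISTINCT ac.id) as mention_count,
               AVG(sts.compound_score) as avg_sentiment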
        FROM analyst_conversations ac
        JOIN topics t ON ac.topic_id = t.id
        LEFT JOIN speaker_topic_sentiment sts ON sts.conversation_id = ac.id
        WHERE ac.report_id = ?
        GROUP BY t.name
        ORDER BY mention_count DESC
    """,
        (report_id,),
    )

    topic_data = [
        {
            "topic_name": row[0],
            "mention_count": row[1],
            "avg_sentiment": row[2] if row[2] is not None else 0,
        }
        for row in cursor.fetchall()
    ]

    conn.close()

    return {
        "topic_data": topic_data,
        "bank_name": bank_name,
        "year": year,
        "quarter": quarter,
    }


def get_topic_trends(db_path, bank_name, year, topic_names=None):
    """Extract topic sentiment trends data from the database"""

    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    # Get the bank_id for the given bank name
    cursor.execute("SELECT id FROM banks WHERE bank_name = ?", (bank_name,))
    bank_id = cursor.fetchone()

    if not bank_id:
        conn.close()
        raise ValueError(f"Bank '{bank_name}' not found in the database")

    bank_id = bank_id[0]

    # Get report IDs for all quarters of the specified year
    cursor.execute(
        """
        SELECT id, quarter
        FROM reports
        WHERE bank_id = ? AND year = ?
        ORDER BY
            CASE quarter
                WHEN 'Q1' THEN 1
                WHEN 'Q2' THEN 2
                WHEN 'Q3' THEN 3
                WHEN 'Q4' THEN 4
                ELSE 5
            END
    """,
        (bank_id, year),
    )

    reports = cursor.fetchall()

    if not reports:
        conn.close()
        raise ValueError(f"No reports found for {bank_name} in {year}")

    # Initialize data structure
    topic_trends = {}

    # Process each report (quarter)
    for report_id, quarter in reports:
        query_params = [report_id]
        topic_filter = ""

        if topic_names:
            topic_filter = "AND t.name IN ({})".format(
                ",".join("?" for _ in topic_names)
            )
            query_params.extend(topic_names)

        # Get topic sentiment for this quarter
        cursor.execute(
            f"""
            SELECT t.name as topic_name,
                   AVG(sts.compound_score) as avg_sentiment
            FROM analyst_conversations ac
            JOIN topics t ON ac.topic_id = t.id
            LEFT JOIN speaker_topic_sentiment sts ON sts.conversation_id = ac.id
            WHERE ac.report_id = ? {topic_filter}
            GROUP BY t.name
        """,
            query_params,
        )

        quarter_data = cursor.fetchall()

        for topic_name, avg_sentiment in quarter_data:
            if topic_name not in topic_trends:
                topic_trends[topic_name] = {}

            topic_trends[topic_name][quarter] = (
                avg_sentiment if avg_sentiment is not None else 0
            )

    conn.close()

    return {
        "topic_trends": topic_trends,
        "bank_name": bank_name,
        "year": year,
        "quarters": [q for _, q in reports],
    }


def get_bank_comparison_data(db_path, bank_names, year, quarter):
    """Extract data for comparing multiple banks"""

    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    # Initialize data structures
    risk_metrics = []
    sentiment_data = []

    for bank_name in bank_names:
        # Get the bank_id for the given bank name
        cursor.execute("SELECT id FROM banks WHERE bank_name = ?", (bank_name,))
        bank_id = cursor.fetchone()

        if not bank_id:
            print(f"Bank '{bank_name}' not found in the database, skipping")
            continue

        bank_id = bank_id[0]

        # Get report ID for the specified quarter
        cursor.execute(
            """
            SELECT id
            FROM reports
            WHERE bank_id = ? AND year = ? AND quarter = ?
        """,
            (bank_id, year, quarter),
        )

        report_id = cursor.fetchone()

        if not report_id:
            print(f"No report found for {bank_name} in {year} {quarter}, skipping")
            continue

        report_id = report_id[0]

        # Get risk metrics
        cursor.execute(
            """
            SELECT metric_name, metric_value
            FROM metrics
            WHERE report_id = ?
            AND metric_name IN ('NPL Ratio', 'Coverage Ratio', 'Provisions', 'CET1 Ratio')
        """,
            (report_id,),
        )

        metrics = {metric: value for metric, value in cursor.fetchall()}
        metrics["bank_name"] = bank_name
        risk_metrics.append(metrics)

        # Get sentiment data
        cursor.execute(
            """
            SELECT AVG(cs.compound_score)
            FROM conversation_sentiment cs
            JOIN analyst_conversations ac ON cs.conversation_id = ac.id
            WHERE ac.report_id = ?
        """,
            (report_id,),
        )

        overall_sentiment = cursor.fetchone()[0]

        sentiment_data.append(
            {
                "bank_name": bank_name,
                "overall_sentiment": overall_sentiment
                if overall_sentiment is not None
                else 0,
            }
        )

    conn.close()

    return {
        "risk_metrics": risk_metrics,
        "sentiment_data": sentiment_data,
        "year": year,
        "quarter": quarter,
    }


def get_risk_metrics_trend(db_path, bank_name, year):
    """Extract risk metrics trends for a bank"""

    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    # Get the bank_id for the given bank name
    cursor.execute("SELECT id FROM banks WHERE bank_name = ?", (bank_name,))
    bank_id = cursor.fetchone()

    if not bank_id:
        conn.close()
        raise ValueError(f"Bank '{bank_name}' not found in the database")

    bank_id = bank_id[0]

    # Get report IDs for all quarters of the specified year
    cursor.execute(
        """
        SELECT id, quarter
        FROM reports
        WHERE bank_id = ? AND year = ?
        ORDER BY
            CASE quarter
                WHEN 'Q1' THEN 1
                WHEN 'Q2' THEN 2
                WHEN 'Q3' THEN 3
                WHEN 'Q4' THEN 4
                ELSE 5
            END
    """,
        (bank_id, year),
    )

    reports = cursor.fetchall()

    if not reports:
        conn.close()
        raise ValueError(f"No reports found for {bank_name} in {year}")

    # Get all metrics for these reports
    report_ids = [r[0] for r in reports]
    quarters = [r[1] for r in reports]

    placeholders = ",".join("?" for _ in report_ids)

    cursor.execute(
        f"""
        SELECT r.quarter, m.metric_name, m.metric_value, m.in_typical_range
        FROM metrics m
        JOIN reports r ON m.report_id = r.id
        WHERE m.report_id IN ({placeholders})
    """,
        report_ids,
    )

    metrics_data = cursor.fetchall()

    # Transform to structured format
    metrics_by_quarter = {}
    for quarter, metric_name, metric_value, in_typical_range in metrics_data:
        if quarter not in metrics_by_quarter:
            metrics_by_quarter[quarter] = {}

        metrics_by_quarter[quarter][metric_name] = {
            "value": metric_value,
            "in_typical_range": in_typical_range,
        }

    conn.close()

    return {
        "metrics_by_quarter": metrics_by_quarter,
        "bank_name": bank_name,
        "year": year,
        "quarters": quarters,
    }


def create_topic_mentions_plot(db_path, bank_name, year, quarter, n_topics=4):
    """Create a Plotly bar chart for top topics by mention count"""
    try:
        topic_data = get_topic_data(db_path, bank_name, year, quarter)
    except ValueError as e:
        print(str(e))
        return None, []

    # Sort by mention count and select top N
    sorted_topics = sorted(
        topic_data["topic_data"], key=lambda x: x["mention_count"], reverse=True
    )

    top_n = sorted_topics[:n_topics]

    if not top_n:
        print(f"No topic data found for {bank_name} in {year} {quarter}")
        return None, []

    # Create Plotly figure
    fig = go.Figure()

    fig.add_trace(
        go.Bar(
            x=[t["topic_name"] for t in top_n],
            y=[t["mention_count"] for t in top_n],
            marker_color="#4682B4",  # Steel blue color
            text=[t["mention_count"] for t in top_n],
            textposition="auto",
        ),
    )

    fig.update_layout(
        height=400,
        width=600,
        title_text=f"{bank_name} - {year} {quarter}: Top {len(top_n)} Topics by Mention Count",
        template="plotly_white",
        margin=dict(t=100, b=80, l=40, r=40),
        xaxis=dict(
            title="Topic",
            tickangle=45,
        ),
        yaxis=dict(
            title="Mention Count",
            gridcolor="LightGray",
        ),
    )

    return fig, [t["topic_name"] for t in top_n]


def create_topic_sentiment_plot(
    db_path, bank_name, year, quarter, n_topics=None, topic_names=None
):
    """Create a Plotly bar chart for topic sentiment comparison"""
    try:
        topic_data = get_topic_data(db_path, bank_name, year, quarter)
    except ValueError as e:
        print(str(e))
        return None

    # Filter by provided topic names or use n_topics
    if topic_names:
        selected_topics = [
            t for t in topic_data["topic_data"] if t["topic_name"] in topic_names
        ]
    else:
        # Sort by absolute sentiment to show most polarized topics
        selected_topics = sorted(
            topic_data["topic_data"],
            key=lambda x: abs(x["avg_sentiment"]),
            reverse=True,
        )
        if n_topics:
            selected_topics = selected_topics[:n_topics]

    if not selected_topics:
        print(f"No matching topics found for {bank_name} in {year} {quarter}")
        return None

    # Create Plotly figure
    fig = go.Figure()

    # Create color mapping based on sentiment
    colors = []
    for topic in selected_topics:
        sentiment = topic["avg_sentiment"]
        if sentiment < -0.6:
            colors.append("#7b241c")  # Very negative
        elif sentiment < -0.2:
            colors.append("#c0392b")  # Negative
        elif sentiment < 0:
            colors.append("#e74c3c")  # Slightly negative
        elif sentiment < 0.2:
            colors.append("#f39c12")  # Neutral
        elif sentiment < 0.6:
            colors.append("#2ecc71")  # Slightly positive
        else:
            colors.append("#27ae60")  # Positive

    fig.add_trace(
        go.Bar(
            x=[t["topic_name"] for t in selected_topics],
            y=[t["avg_sentiment"] for t in selected_topics],
            marker_color=colors,
            text=[f"{t['avg_sentiment']:.2f}" for t in selected_topics],
            textposition="auto",
            hovertext=[f"Mentions: {t['mention_count']}" for t in selected_topics],
        ),
    )

    # Add a horizontal line at y=0
    fig.add_shape(
        type="line",
        x0=-0.5,
        y0=0,
        x1=len(selected_topics) - 0.5,
        y1=0,
        line=dict(
            color="black",
            width=1,
            dash="dash",
        ),
    )

    fig.update_layout(
        height=400,
        width=600,
        title_text=f"{bank_name} - {year} {quarter}: Topic Sentiment Analysis",
        template="plotly_white",
        margin=dict(t=100, b=80, l=40, r=40),
        xaxis=dict(
            title="Topic",
            tickangle=45,
        ),
        yaxis=dict(
            title="Average Sentiment Score",
            range=[-1, 1],
            gridcolor="LightGray",
        ),
    )

    return fig


def create_topic_sentiment_trends_plot(db_path, bank_name, year, topic_names=None):
    """Create a Plotly line chart for topic sentiment trends over time"""
    try:
        trend_data = get_topic_trends(db_path, bank_name, year, topic_names)
    except ValueError as e:
        print(str(e))
        return None

    if not trend_data["topic_trends"]:
        print(f"No topic sentiment trend data found for {bank_name}")
        return None

    # Create Plotly figure
    fig = go.Figure()

    # Sort quarters chronologically
    quarter_order = {"Q1": 1, "Q2": 2, "Q3": 3, "Q4": 4}
    quarters = sorted(trend_data["quarters"], key=lambda q: quarter_order.get(q, 5))

    # Define a consistent color palette
    colors = [
        "#1f77b4",  # blue
        "#ff7f0e",  # orange
        "#2ca02c",  # green
        "#d62728",  # red
        "#9467bd",  # purple
        "#8c564b",  # brown
        "#e377c2",  # pink
        "#7f7f7f",  # gray
        "#bcbd22",  # olive
        "#17becf",  # teal
    ]

    # Plot each topic
    i = 0
    for topic_name, sentiment_by_quarter in trend_data["topic_trends"].items():
        # Only include topics with data for at least 2 quarters
        quarters_with_data = [q for q in quarters if q in sentiment_by_quarter]
        if len(quarters_with_data) < 2:
            continue

        # Create data points
        x_values = quarters_with_data
        y_values = [sentiment_by_quarter.get(q, 0) for q in quarters_with_data]

        fig.add_trace(
            go.Scatter(
                x=x_values,
                y=y_values,
                mode="lines+markers",
                name=topic_name,
                line=dict(width=2, color=colors[i % len(colors)]),
                marker=dict(size=8, color=colors[i % len(colors)]),
            ),
        )
        i += 1

    # Add a horizontal line at y=0
    if quarters:
        fig.add_shape(
            type="line",
            x0=quarters[0],
            y0=0,
            x1=quarters[-1],
            y1=0,
            line=dict(
                color="gray",
                width=1,
                dash="dash",
            ),
        )

    fig.update_layout(
        height=400,
        width=600,
        title_text=f"{bank_name}: Topic Sentiment Trends ({year})",
        template="plotly_white",
        margin=dict(t=100, b=80, l=40, r=40),
        xaxis=dict(
            title="Quarter",
            tickangle=45,
        ),
        yaxis=dict(
            title="Average Sentiment Score",
            range=[-1, 1],
            gridcolor="LightGray",
        ),
        legend_title="Topics",
    )

    return fig


def create_bank_comparison_plot(
    db_path, bank_names, year, quarter, metric_type="sentiment"
):
    """Create Plotly comparison charts between banks"""
    try:
        comparison_data = get_bank_comparison_data(db_path, bank_names, year, quarter)
    except Exception as e:
        print(f"Error retrieving comparison data: {str(e)}")
        return None

    if metric_type == "sentiment":
        # Create sentiment comparison plot
        sentiment_data = comparison_data["sentiment_data"]
        if not sentiment_data:
            print("No sentiment data available for comparison")
            return None

        fig = go.Figure()

        # Determine colors based on sentiment
        colors = []
        for bank in sentiment_data:
            sentiment = bank["overall_sentiment"]
            if sentiment < -0.2:
                colors.append("#c0392b")  # Negative - red
            elif sentiment < 0.2:
                colors.append("#f1c40f")  # Neutral - yellow
            else:
                colors.append("#2ecc71")  # Positive - green

        fig.add_trace(
            go.Bar(
                x=[b["bank_name"] for b in sentiment_data],
                y=[b["overall_sentiment"] for b in sentiment_data],
                marker_color=colors,
                text=[f"{s['overall_sentiment']:.2f}" for s in sentiment_data],
                textposition="auto",
            ),
        )

        # Add a horizontal line at y=0
        fig.add_shape(
            type="line",
            x0=-0.5,
            y0=0,
            x1=len(sentiment_data) - 0.5,
            y1=0,
            line=dict(
                color="gray",
                width=1,
                dash="dash",
            ),
        )

        fig.update_layout(
            height=400,
            width=600,
            title_text=f"Bank Sentiment Comparison - {year} {quarter}",
            template="plotly_white",
            margin=dict(t=100, b=50, l=40, r=40),
            xaxis=dict(
                title="Bank",
            ),
            yaxis=dict(
                title="Overall Sentiment",
                range=[-1, 1],
                gridcolor="LightGray",
            ),
        )

    else:
        # Create risk metric comparison plot
        risk_metrics = comparison_data["risk_metrics"]
        if not risk_metrics:
            print("No risk metric data available for comparison")
            return None

        # Pick the first metric, in priority order, reported by at least half of the banks
        available_metrics = ["NPL Ratio", "Coverage Ratio", "Provisions", "CET1 Ratio"]
        selected_metric = None

        for metric in available_metrics:
            count = sum(1 for bank in risk_metrics if metric in bank)
            if count >= len(risk_metrics) / 2:  # Available in at least half of banks
                selected_metric = metric
                break

        if not selected_metric:
            print("No consistent risk metric available across banks")
            return None

        # Filter banks with this metric
        filtered_data = [
            {"bank_name": bank["bank_name"], "value": bank[selected_metric]}
            for bank in risk_metrics
            if selected_metric in bank
        ]

        if not filtered_data:
            return None

        fig = go.Figure()

        # Color each bank relative to the cross-bank median:
        # NPL Ratio is lower-is-better; the other metrics are treated as higher-is-better
        median_value = np.median([b["value"] for b in filtered_data])
        if selected_metric in ["NPL Ratio"]:
            colors = [
                "#2ecc71" if b["value"] < median_value else "#c0392b"
                for b in filtered_data
            ]
        else:
            colors = [
                "#2ecc71" if b["value"] > median_value else "#c0392b"
                for b in filtered_data
            ]

        fig.add_trace(
            go.Bar(
                x=[b["bank_name"] for b in filtered_data],
                y=[b["value"] for b in filtered_data],
                marker_color=colors,
                text=[f"{b['value']:.2f}" for b in filtered_data],
                textposition="auto",
            ),
        )

        fig.update_layout(
            height=400,
            width=600,
            title_text=f"Bank Comparison: {selected_metric} - {year} {quarter}",
            template="plotly_white",
            margin=dict(t=100, b=50, l=40, r=40),
            xaxis=dict(
                title="Bank",
            ),
            yaxis=dict(
                title=selected_metric,
                gridcolor="LightGray",
            ),
        )

    return fig
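The metric-selection and coloring rules in the comparison branch above can be isolated as plain functions, which makes them easy to check without building a figure. This is a minimal sketch that mirrors the notebook's logic (same preference list, same "at least half of the banks" threshold, same median split); the bank names and values in the usage lines are made up. Note that a value exactly equal to the median is colored red, as in the plot code.

```python
from statistics import median

# Same preference order as the comparison plot above.
PREFERRED_METRICS = ["NPL Ratio", "Coverage Ratio", "Provisions", "CET1 Ratio"]
# As in the plot code, only NPL Ratio is treated as lower-is-better.
LOWER_IS_BETTER = {"NPL Ratio"}
GOOD, BAD = "#2ecc71", "#c0392b"


def select_metric(risk_metrics, preferred=PREFERRED_METRICS):
    """Return the first preferred metric reported by at least half of the banks."""
    if not risk_metrics:
        return None
    for metric in preferred:
        count = sum(1 for bank in risk_metrics if metric in bank)
        if count >= len(risk_metrics) / 2:
            return metric
    return None


def median_colors(values, metric):
    """Green if a value beats the cross-bank median, red otherwise (ties are red)."""
    mid = median(values)
    if metric in LOWER_IS_BETTER:
        return [GOOD if v < mid else BAD for v in values]
    return [GOOD if v > mid else BAD for v in values]


# Usage with made-up numbers for three hypothetical banks.
banks = [
    {"bank_name": "Bank A", "NPL Ratio": 1.2},
    {"bank_name": "Bank B", "NPL Ratio": 0.8, "CET1 Ratio": 13.1},
    {"bank_name": "Bank C", "CET1 Ratio": 12.4},
]
metric = select_metric(banks)  # "NPL Ratio": reported by 2 of 3 banks
values = [b[metric] for b in banks if metric in b]
colors = median_colors(values, metric)  # [BAD, GOOD]: 0.8 is below the 1.0 median
```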


def create_risk_metrics_plot(db_path, bank_name, year, metric_name=None):
    """Create a Plotly line chart of one risk metric's quarterly trend for a bank and year"""
    try:
        metrics_data = get_risk_metrics_trend(db_path, bank_name, year)
    except ValueError as e:
        print(str(e))
        return None

    if not metrics_data["metrics_by_quarter"]:
        print(f"No risk metrics data found for {bank_name} in {year}")
        return None

    # Identify available metrics
    all_metrics = set()
    for quarter_data in metrics_data["metrics_by_quarter"].values():
        all_metrics.update(quarter_data.keys())

    # If metric_name is provided, check if it's available
    if metric_name and metric_name not in all_metrics:
        print(f"Metric '{metric_name}' not found for {bank_name}")
        metric_name = None

    # If no specific metric requested, take the first available one
    if not metric_name:
        # Prefer some common metrics if available
        preferred_metrics = ["NPL Ratio", "Coverage Ratio", "CET1 Ratio", "ROE", "ROA"]
        for metric in preferred_metrics:
            if metric in all_metrics:
                metric_name = metric
                break

        # If none of the preferred metrics are available, fall back to the
        # alphabetically first one so the choice does not depend on set order
        if not metric_name and all_metrics:
            metric_name = sorted(all_metrics)[0]

    if not metric_name:
        print("No metrics found for plotting")
        return None

    # Create Plotly figure
    fig = go.Figure()

    # Sort quarters chronologically
    quarter_order = {"Q1": 1, "Q2": 2, "Q3": 3, "Q4": 4}
    quarters = sorted(metrics_data["quarters"], key=lambda q: quarter_order.get(q, 5))

    # Get values for the selected metric across quarters
    values = []
    in_range_status = []

    for quarter in quarters:
        quarter_data = metrics_data["metrics_by_quarter"].get(quarter, {})
        metric_data = quarter_data.get(metric_name)

        if metric_data:
            values.append(metric_data["value"])
            in_range_status.append(metric_data.get("in_typical_range", True))
        else:
            values.append(None)
            in_range_status.append(True)

    # Remove None values while preserving quarter alignment
    plot_quarters = []
    plot_values = []
    plot_colors = []

    for i, val in enumerate(values):
        if val is not None:
            plot_quarters.append(quarters[i])
            plot_values.append(val)
            # Use red for out-of-range values
            plot_colors.append("#c0392b" if in_range_status[i] is False else "#1f77b4")

    if not plot_values:
        print(f"No data available for {metric_name}")
        return None

    # Add a line trace for the metric; "text" in the mode displays the value labels
    fig.add_trace(
        go.Scatter(
            x=plot_quarters,
            y=plot_values,
            mode="lines+markers+text",
            line=dict(width=3, color="#1f77b4"),
            marker=dict(
                size=10,
                color=plot_colors,
            ),
            text=[f"{v:.2f}" for v in plot_values],
            textposition="top center",
        ),
    )

    fig.update_layout(
        height=400,
        width=600,
        title_text=f"{bank_name}: {metric_name} Trend ({year})",
        template="plotly_white",
        margin=dict(t=100, b=80, l=40, r=40),
        xaxis=dict(
            title="Quarter",
            tickangle=45,
        ),
        yaxis=dict(
            title=metric_name,
            gridcolor="LightGray",
        ),
    )

    return fig
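The data-preparation part of `create_risk_metrics_plot` (sort quarters, skip quarters without a value, flag out-of-range points) can be sketched as a standalone helper. It assumes the per-quarter shape the plotting code reads, `{metric_name: {"value": float, "in_typical_range": bool}}`; the helper name and sample data are hypothetical.

```python
QUARTER_ORDER = {"Q1": 1, "Q2": 2, "Q3": 3, "Q4": 4}
IN_RANGE, OUT_OF_RANGE = "#1f77b4", "#c0392b"


def align_metric_series(metrics_by_quarter, quarters, metric_name):
    """Return (quarters, values, marker colors) for one metric, skipping gaps."""
    # Unknown quarter labels sort last, as in the plotting function.
    ordered = sorted(quarters, key=lambda q: QUARTER_ORDER.get(q, 5))
    xs, ys, colors = [], [], []
    for q in ordered:
        entry = metrics_by_quarter.get(q, {}).get(metric_name)
        if not entry or entry.get("value") is None:
            continue  # metric not reported this quarter: leave it off the chart
        xs.append(q)
        ys.append(entry["value"])
        # Only an explicit False marks a point as out of range.
        out_of_range = entry.get("in_typical_range") is False
        colors.append(OUT_OF_RANGE if out_of_range else IN_RANGE)
    return xs, ys, colors


# Usage with hypothetical data: Q2 has no NPL Ratio, so it is skipped.
sample = {
    "Q3": {"NPL Ratio": {"value": 1.1, "in_typical_range": False}},
    "Q1": {"NPL Ratio": {"value": 0.9}},
    "Q2": {"CET1 Ratio": {"value": 13.0}},
}
xs, ys, cs = align_metric_series(sample, ["Q3", "Q1", "Q2"], "NPL Ratio")
```

Keeping this step separate from the Plotly call means the quarter ordering and range flags can be unit-tested without rendering anything.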


def run_comprehensive_topic_analysis(
    db_path, bank_name, year, quarter, n_topics=4, display=True
):
    """Build topic mention, topic sentiment and topic sentiment-trend plots for one bank and quarter, and show them if `display` is True"""

    # Create topic mentions plot
    mentions_fig, top_topics = create_topic_mentions_plot(
        db_path,
        bank_name,
        year,
        quarter,
        n_topics,
    )

    # Create topic sentiment plot for the same topics
    sentiment_fig = create_topic_sentiment_plot(
        db_path,
        bank_name,
        year,
        quarter,
        topic_names=top_topics,
    )

    # Create topic sentiment trends plot for the same topics across the year
    trends_fig = create_topic_sentiment_trends_plot(
        db_path,
        bank_name,
        year,
        topic_names=top_topics,
    )

    # Display the figures
    if display:
        if mentions_fig:
            mentions_fig.show()
        if sentiment_fig:
            sentiment_fig.show()
        if trends_fig:
            trends_fig.show()
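`run_comprehensive_topic_analysis` reuses the `top_topics` returned by `create_topic_mentions_plot` so that all three charts cover the same topics. That function is not shown here; judging by its `n_topics` argument, it likely keeps the most frequently mentioned topics. A minimal sketch of such a selection, assuming the mentions are available as a flat list of topic labels (a hypothetical input, not the notebook's schema):

```python
from collections import Counter


def top_n_topics(topic_labels, n_topics=4):
    """Return the n most frequently mentioned topic labels, most common first."""
    # Counter.most_common orders equal counts by first occurrence,
    # so the result is deterministic for a given input order.
    return [topic for topic, _ in Counter(topic_labels).most_common(n_topics)]


# Usage with made-up topic labels.
labels = [
    "Credit Quality", "Capital", "Credit Quality", "Liquidity",
    "Capital", "Credit Quality", "Fees",
]
top_two = top_n_topics(labels, 2)
```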

In [27]:
# Run the topic analysis for each bank and display its plots
run_comprehensive_topic_analysis(DB_PATH, "JPMorgan Chase", 2023, "Q4")
run_comprehensive_topic_analysis(DB_PATH, "UBS", 2023, "Q4")

## 🔍 Testing Hybrid Search (FAISS + BM25)

In [None]:
user_query = "What is the effect of tighter lending standards?"
results = gsib_analysis_pipeline.faiss_manager.hybrid_search(user_query, k=3)


In [None]:
results

[{'vector_embedding_id': 343,
  'preprocessed_text_id': 2058,
  'qa_pair_id': 343,
  'question': "All fair points. And maybe just a follow-up on John's question on the lending environment. You talked about the industry likely pulling back. Are you changing your underwriting standards in any way? Just trying to think through, is there a potential for some market share gains given your strength of capital and liquidity, or how are you thinking about the loan environment?",
  'answer': "Yes. And we always say, right, we underwrite through the cycle. And I think notably, we don't loosen our underwriting standards when all the numbers looked crazy good during the pandemic. And we're not going to like overreact now and tighten unreasonably. Some of that correction happens naturally. Credit metrics deteriorate for borrowers, whether in consumer or wholesale and that might make them leave our pre-existing risk appetite. But we're not running around aggressively tightening standards right now."
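`hybrid_search` merges a FAISS vector ranking with a BM25 keyword ranking, but this section does not show how the two are combined. One common fusion technique is reciprocal rank fusion (RRF), which scores each document by summing `1 / (k + rank)` over the rankings it appears in. The sketch below implements RRF on plain lists of ids; it is an illustration of the technique, not necessarily what `hybrid_search` does, and the ids are made up.

```python
from collections import defaultdict


def reciprocal_rank_fusion(rankings, k=60, top_n=3):
    """Fuse ranked lists of document ids with reciprocal rank fusion.

    Each document scores sum(1 / (k + rank)) over the lists it appears in,
    with rank starting at 1. k=60 is a commonly used default.
    """
    scores = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    # Highest fused score first; ties broken by id for a stable order.
    fused = sorted(scores.items(), key=lambda item: (-item[1], item[0]))
    return [doc_id for doc_id, _ in fused[:top_n]]


faiss_ids = [101, 102, 103]  # hypothetical vector-search ranking
bm25_ids = [101, 104, 102]   # hypothetical BM25 ranking
fused_ids = reciprocal_rank_fusion([faiss_ids, bm25_ids])
```

RRF only uses ranks, so it sidesteps the problem that cosine similarities and BM25 scores live on different scales.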

In [None]:
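# Pass the retrieved Q&A pairs to the LLM as context.
# The results expose "question" and "answer" fields (see the output above);
# `llm` is assumed to be an LLM client with a `generate` method created earlier.
contexts = [f"Q: {result['question']}\nA: {result['answer']}" for result in results]
context = "\n\n".join(contexts)
prompt = f"Answer based on this context:\n\n{context}\n\nQuestion: {user_query}"
response = llm.generate(prompt)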
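A slightly more careful way to build the prompt is to number each retrieved Q&A pair, tell the model to rely only on those excerpts, and stop adding passages once a size budget is reached. The sketch below assumes each result carries `question` and `answer` keys, as in the output above; `build_rag_prompt` and its `max_chars` budget are hypothetical, not part of the notebook or any model's limits.

```python
def build_rag_prompt(results, question, max_chars=6000):
    """Build a grounded prompt from search results, in rank order, within a size budget."""
    passages, used = [], 0
    for i, result in enumerate(results, start=1):
        passage = f"[{i}] Q: {result['question']}\nA: {result['answer']}"
        if used + len(passage) > max_chars:
            break  # stop before the context exceeds the budget
        passages.append(passage)
        used += len(passage)
    context = "\n\n".join(passages)
    return (
        "Answer the question using only the earnings-call excerpts below. "
        "If they do not contain the answer, say so.\n\n"
        f"Excerpts:\n{context}\n\nQuestion: {question}"
    )


# Usage with made-up results.
sample_results = [
    {"question": "Q one?", "answer": "A one."},
    {"question": "Q two?", "answer": "A two."},
]
prompt_text = build_rag_prompt(sample_results, "What changed?")
```

Numbering the passages lets the model cite which excerpt supports its answer, which makes the output easier to check against the transcripts.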