# Email Search AI

<img src="email_search_ai.png" style="display:block; margin-left:auto; margin-right:auto;"/>

## Problem Statement

In enterprise environments, email threads often contain critical discussions, decisions, and context across multiple stakeholders. However, locating specific information within large, unstructured, and nested email threads is time-consuming and inefficient using conventional keyword-based search tools. Professionals face challenges in extracting relevant insights without wading through entire conversations manually. There is a pressing need for an intelligent system that enables **semantic search** and **automated summarization** of email threads.

## Project Overview

**HelpMate_Email Search_AI** is an end-to-end **RAG-based AI assistant** designed for semantic search and summarization of email threads. It uses **Sentence Transformers** for creating dense **vector embeddings** of email content, stores them in a **ChromaDB vector store**, and enables **semantic index search**. Upon receiving a user query, it retrieves the most relevant chunks using **vector similarity**, improves result quality through **cross-encoder-based reranking**, and finally, generates context-aware responses using **OpenAI's LLM (e.g., GPT-3.5-turbo)**.

The system employs a **Retrieval-Augmented Generation (RAG)** architecture with **caching** for efficiency and performance.

## Project Objectives

- **Semantic Understanding of Emails**: Use **Sentence Transformers (all-MiniLM-L6-v2)** to convert email chunks into vector representations capturing semantic meaning.

- **Vector Database Indexing**: Store email embeddings in **ChromaDB**, enabling fast approximate nearest neighbor (ANN) vector search.

- **Semantic Search & Retrieval**: Support user queries via **embedding-based similarity search** across indexed email chunks.

- **Result Reranking**: Improve retrieval accuracy with **cross-encoder reranking (ms-marco-MiniLM-L-6-v2)**, scoring relevance between query and result pairs.

- **Contextual Answer Generation**: Use a **Retrieval-Augmented Generation (RAG)** pipeline to feed retrieved results into OpenAI GPT models for answer synthesis.

- **Query Caching**: Implement a file-based **caching** layer to store query results and avoid repeated computations.

## Functional Features


| **Component**                   | **Description**                                                                 | **Technology Used**                             |
|--------------------------------|---------------------------------------------------------------------------------|-------------------------------------------------|
| **Email Preprocessing**        | Cleans raw email bodies by removing quoted replies and normalizing text         | `Regex`, custom cleaning                        |
| **Chunking**                   | Splits cleaned emails into overlapping token-limited chunks                     | Custom logic                                    |
| **Embeddings**                 | Transforms email chunks into dense semantic vectors                             | `SentenceTransformer` (`all-MiniLM-L6-v2`)      |
| **Vector Indexing**            | Stores and indexes embeddings for fast similarity search                        | `ChromaDB`                                      |
| **Query Embedding**            | Embeds natural language queries for semantic comparison                         | `SentenceTransformer`                           |
| **Initial Vector Search**      | Retrieves top-N similar email chunks using ANN search                           | `ChromaDB`                                      |
| **Reranking**                  | Reorders retrieved results by true semantic relevance                           | `CrossEncoder` (`ms-marco-MiniLM-L-6-v2`)       |
| **Retrieval-Augmented Generation (RAG)** | Combines retrieved chunks with the query to form a prompt for GPT      | `OpenAI GPT-3.5-turbo`                          |
| **Answer Generation**          | Synthesizes a coherent answer based on context                                  | `OpenAI Chat Completion API`                    |
| **Caching**                    | Stores query results using hashed query keys for faster repeat access           | JSON file-based custom `Cache` class            |


In [1]:
# Import the libraries
import os
import re
import pandas as pd
from typing import List, Dict
from sentence_transformers import SentenceTransformer
import chromadb
from chromadb.config import Settings
from chromadb.utils import embedding_functions


In [None]:
#Install required modules
!pip3 install -r requirements.txt

In [2]:
#Let's check the version of OpenAI
import openai
print(openai.__version__)

1.95.1


In [3]:
# Read key from text file
with open("openai_key.txt", "r") as f:
    api_key = f.read().strip()

In [4]:
# Pass key to OpenAI client
from openai import OpenAI
client = OpenAI(api_key=api_key)

In [5]:
# Read the input dataset
df_email_thread = pd.read_csv("email_dataset/email_threads.csv")
df_email_thread.head()

|   | thread_id | subject | from | to | timestamp | body |
|---|-----------|---------|------|----|-----------|------|
| 0 | 1001 | Project Falcon Delay | alice@example.com | bob@example.com | 2025-07-01 09:15:00 | Hi Bob,\nWe are experiencing delays in Project... |
| 1 | 1001 | Project Falcon Delay | bob@example.com | alice@example.com | 2025-07-01 10:00:00 | Thanks for the update. Can you send revised ti... |
| 2 | 1001 | Project Falcon Delay | alice@example.com | bob@example.com | 2025-07-01 11:30:00 | Sure. Revised delivery expected by July 14th.\... |
| 3 | 1002 | Q3 Marketing Budget | carol@example.com | team@example.com | 2025-06-15 14:30:00 | Finance approved 10% increase in marketing for... |
| 4 | 1002 | Q3 Marketing Budget | dave@example.com | team@example.com | 2025-06-15 15:00:00 | Thanks Carol. Please proceed accordingly.\n-Dave |


In [9]:
df_email_thread.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 22 entries, 0 to 21
Data columns (total 6 columns):
 #   Column     Non-Null Count  Dtype 
---  ------     --------------  ----- 
 0   thread_id  22 non-null     int64 
 1   subject    22 non-null     object
 2   from       22 non-null     object
 3   to         22 non-null     object
 4   timestamp  22 non-null     object
 5   body       22 non-null     object
dtypes: int64(1), object(5)
memory usage: 1.2+ KB


**Description**:
The email threads file (`email_threads.csv`) holds one row per email message: the thread it belongs to, its subject, sender, recipients, timestamp, and body text.

**Columns**:
- **thread_id**: A unique identifier for each email thread.
- **subject**: Subject of the email thread.
- **timestamp**: Timestamp indicating when the message was sent.
- **from**: Sender of the email.
- **to**: List of recipients of the email.
- **body**: Content of the email message.

## Overall structure of the code

## Embedding Layer

<img src="embedding_layer_flow.jpg" style="display:block; margin-left:auto; margin-right:auto;"/>

In [13]:
# src/embedding_layer.py

# Import the libraries
import os
import re
import pandas as pd
from typing import List, Dict
from sentence_transformers import SentenceTransformer
import chromadb
from chromadb.config import Settings
from chromadb.utils import embedding_functions

# === CLEANING FUNCTIONS ===

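def clean_email_body(body: str) -> str:
    """Flatten an email body to a single line and drop reply attribution lines."""
    body = re.sub(r'\s+', ' ', body)  # collapse newlines and repeated whitespace
    # Remove "On <date>, <sender> wrote:" attributions. The non-greedy match stops
    # at the first "wrote:" instead of swallowing everything up to the last one.
    # Note: only the attribution is removed; any quoted text after it is kept.
    body = re.sub(r'On .*? wrote:', '', body)
    return body.strip()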


# === CHUNKING STRATEGY ===

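def chunk_text(text: str, max_tokens: int = 512, overlap: int = 50) -> List[str]:
    """Split text into overlapping chunks; "tokens" here are whitespace-separated words."""
    if overlap >= max_tokens:
        raise ValueError("overlap must be smaller than max_tokens")
    words = text.split()
    chunks = []
    # Each window starts (max_tokens - overlap) words after the previous one.
    for i in range(0, len(words), max_tokens - overlap):
        window = words[i:i + max_tokens]
        if len(window) > 10:  # skip fragments of 10 words or fewer
            chunks.append(' '.join(window))
    return chunks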


# === EMBEDDING + CHROMA ===

class EmbeddingProcessor:
    def __init__(self, client, chroma_path: str = "chroma_db", model_name: str = "all-MiniLM-L6-v2"):
        self.model = SentenceTransformer(model_name)
        self.client = client
        self.chroma_collection = self.client.get_or_create_collection(name="email_chunks")

    def process_emails(self, df: pd.DataFrame):
        all_chunks = []

        for idx, row in df.iterrows():
            cleaned_body = clean_email_body(row['body'])
            chunks = chunk_text(cleaned_body)

            for i, chunk in enumerate(chunks):
                chunk_id = f"{row['thread_id']}_{idx}_{i}"
                all_chunks.append({
                    "id": chunk_id,
                    "text": chunk,
                    "metadata": {
                        "thread_id": row["thread_id"],
                        "subject": row["subject"],
                        "from": row["from"],
                        "timestamp": row["timestamp"]
                    }
                })

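        # Nothing to embed or store if every email was filtered out during chunking.
        if not all_chunks:
            print("No chunks to embed.")
            return

        print(f"Embedding {len(all_chunks)} chunks...")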
        embeddings = self.model.encode([c['text'] for c in all_chunks], show_progress_bar=True).tolist()

        self.chroma_collection.add(
            documents=[c['text'] for c in all_chunks],
            embeddings=embeddings,
            metadatas=[c['metadata'] for c in all_chunks],
            ids=[c['id'] for c in all_chunks]
        )
        

    def get_collection(self):
        return self.chroma_collection
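
The overlapping-window arithmetic in `chunk_text` can be illustrated on a tiny input. This is a minimal sketch, not the notebook's code: `sliding_window_chunks` is a hypothetical helper, the sizes are deliberately small, and it omits the notebook's rule that drops chunks of 10 words or fewer.

```python
from typing import List


def sliding_window_chunks(words: List[str], size: int, overlap: int) -> List[List[str]]:
    """Split a word list into windows of `size` words that share `overlap` words."""
    if not 0 <= overlap < size:
        raise ValueError("overlap must be non-negative and smaller than size")
    stride = size - overlap  # how far each window start advances
    return [words[start:start + size] for start in range(0, len(words), stride)]


# Ten placeholder words, windows of 4 words, 1 word shared between neighbours.
words = [f"w{i}" for i in range(10)]
for window in sliding_window_chunks(words, size=4, overlap=1):
    print(window)
```

With `size=4` and `overlap=1` the stride is 3, so windows start at words 0, 3, 6, and 9, and the last window holds a single leftover word. In the notebook such a short trailing fragment would be discarded by the minimum-length check.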


## Cache Layer

<img src="cache_layer_flow1.jpg" style="display:block; margin-left:auto; margin-right:auto;"/>

In [17]:
# src/cache.py

import json
import os


class Cache:
    def __init__(self, cache_file: str):
        self.cache_file = cache_file
        if os.path.exists(cache_file):
            with open(cache_file, "r") as f:
                self.cache = json.load(f)
        else:
            self.cache = {}

    def contains(self, key: str) -> bool:
        return key in self.cache

    def get(self, key: str):
        return self.cache.get(key, None)

    def set(self, key: str, value):
        self.cache[key] = value
        self._save()

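    def _save(self):
        # Create the parent directory (e.g. "cache/") if it does not exist yet;
        # otherwise the first write fails with FileNotFoundError.
        cache_dir = os.path.dirname(self.cache_file)
        if cache_dir:
            os.makedirs(cache_dir, exist_ok=True)
        with open(self.cache_file, "w") as f:
            json.dump(self.cache, f, indent=2)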
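
The cache stores results under hashed query keys. As a rough sketch of how such a key can be built, the helper below hashes a normalized query together with any search parameters. The name `make_cache_key`, the normalization step, and the parameter names are assumptions for illustration, not part of the notebook's `Cache` class.

```python
import hashlib
import json
from typing import Any


def make_cache_key(query: str, **params: Any) -> str:
    """Hypothetical helper: hash the normalized query together with its search parameters."""
    payload = {
        "query": " ".join(query.lower().split()),  # collapse case and whitespace
        "params": params,
    }
    # sort_keys keeps the serialization stable regardless of keyword order
    encoded = json.dumps(payload, sort_keys=True).encode("utf-8")
    return hashlib.md5(encoded).hexdigest()


key_a = make_cache_key("Project Falcon delay", top_k=3, filter_thread_id=1001)
key_b = make_cache_key("  project falcon   DELAY ", filter_thread_id=1001, top_k=3)
key_c = make_cache_key("Project Falcon delay", top_k=3, filter_thread_id=1002)

print(key_a == key_b)  # same query after normalization, same parameters
print(key_a == key_c)  # different thread filter
```

Including the parameters in the key matters whenever the same question is asked with different filters: a key built from the query text alone would return the first cached answer for all of them.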


## Utils

In [20]:
import numpy as np

def convert_np_types(obj):
    if isinstance(obj, dict):
        return {k: convert_np_types(v) for k, v in obj.items()}
    elif isinstance(obj, list):
        return [convert_np_types(i) for i in obj]
    elif isinstance(obj, np.floating):  # covers np.float16/32/64
        return float(obj)
    elif isinstance(obj, np.integer):  # covers all NumPy integer widths
        return int(obj)
    else:
        return obj


## Search Layer

<img src="search_layer_flow.jpg" style="display:block; margin-left:auto; margin-right:auto;"/>

In [24]:
# src/search_layer.py

import hashlib
import json
import os
from typing import List, Dict

import numpy as np
from sentence_transformers import SentenceTransformer, CrossEncoder
import chromadb
from chromadb.config import Settings

#from .cache import Cache

# === CONFIGURATION ===
CACHE_PATH = "cache/search_cache.json"
CHROMA_PATH = "chroma_db"

class SearchEngine:
    def __init__(
        self,
        client,
        embedding_model_name: str = "all-MiniLM-L6-v2",
        cross_encoder_model_name: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"
    ):
        self.embedder = SentenceTransformer(embedding_model_name)
        self.reranker = CrossEncoder(cross_encoder_model_name)
        self.cache = Cache(CACHE_PATH)

        self.client = client
        self.collection = self.client.get_or_create_collection(name="email_chunks")

    def embed_query(self, query: str) -> List[float]:
        return self.embedder.encode(query).tolist()

    def search(self, query: str, top_k: int = 5, filter_thread_id: int = None) -> List[Dict]:
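        # Key the cache on every argument that affects the result, so the same
        # query with a different top_k or thread filter is not served a stale hit.
        cache_key = f"{query}|{top_k}|{filter_thread_id}"
        query_hash = hashlib.md5(cache_key.encode()).hexdigest()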

        if self.cache.contains(query_hash):
            return self.cache.get(query_hash)

        query_embedding = self.embed_query(query)

        search_args = {
                            "query_embeddings": [query_embedding],
                            "n_results": top_k * 2,  # get more to allow for reranking
                      }
        
        if filter_thread_id is not None:
            search_args["where"] = {"thread_id": int(filter_thread_id)}

        results = self.collection.query(**search_args)

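        documents = results["documents"][0]
        metadatas = results["metadatas"][0]

        # Nothing matched (for example, the thread filter excluded every chunk).
        if not documents:
            return []

        # === Re-ranking ===
        pairs = [(query, doc) for doc in documents]
        scores = self.reranker.predict(pairs)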

        reranked = sorted(zip(documents, metadatas, scores), key=lambda x: x[2], reverse=True)

        top_chunks = [
            {"chunk": doc, "metadata": meta, "score": score}
            for doc, meta, score in reranked[:top_k]
        ]

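        # Convert NumPy scalars to plain Python types so the result is JSON-serializable.
        scored_chunks = convert_np_types(top_chunks)

        self.cache.set(query_hash, scored_chunks)
        # Return the converted list so fresh and cached results have identical types.
        return scored_chunks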
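
The two-stage pattern used by `search` (over-fetch `top_k * 2` candidates, rescore them, keep the best `top_k`) can be sketched without any model. In this sketch both scorers are toy word-overlap functions standing in for the embedding search and the cross-encoder; they are assumptions chosen only so the example runs on its own, and `retrieve_then_rerank` is a hypothetical name.

```python
from typing import Callable, Dict, List


def retrieve_then_rerank(
    query: str,
    candidates: List[str],
    first_stage: Callable[[str, str], float],
    second_stage: Callable[[str, str], float],
    top_k: int = 2,
) -> List[Dict]:
    """Over-fetch with a cheap scorer, then reorder the shortlist with a costlier one."""
    ranked = sorted(candidates, key=lambda doc: first_stage(query, doc), reverse=True)
    shortlist = ranked[: top_k * 2]  # fetch extra candidates to give the reranker room
    rescored = [{"chunk": doc, "score": second_stage(query, doc)} for doc in shortlist]
    rescored.sort(key=lambda item: item["score"], reverse=True)
    return rescored[:top_k]


def word_overlap(query: str, doc: str) -> float:
    """Toy first-stage scorer: number of distinct words shared by query and document."""
    return float(len(set(query.lower().split()) & set(doc.lower().split())))


def overlap_ratio(query: str, doc: str) -> float:
    """Toy second-stage scorer: shared words divided by document length."""
    doc_words = doc.lower().split()
    return word_overlap(query, doc) / len(doc_words) if doc_words else 0.0


docs = [
    "project falcon delay due to supplier issues",
    "remote work extension request",
    "the project delay was discussed at length in the falcon weekly review meeting today",
    "expense report clarification",
]
results = retrieve_then_rerank("project falcon delay", docs, word_overlap, overlap_ratio)
for item in results:
    print(round(item["score"], 3), item["chunk"])
```

Both Falcon documents tie in the first stage (three shared words each); the second stage separates them by favouring the shorter, more focused one.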


## Generation Layer

<img src="generation_layer_flow.jpg" style="display:block; margin-left:auto; margin-right:auto;"/>

In [64]:
# src/generation_layer.py

import openai
from typing import List, Dict
import os

# The API key is already configured on the `client` created in cell In [4];
# generate_answer() below reuses that client.

# === Prompt Template ===

def build_prompt(query: str, chunks: List[Dict], few_shot: bool = False) -> str:
    prompt = "You are an assistant that summarizes and extracts insights from corporate email threads.\n"
    prompt += "Given a user query and the relevant email thread excerpts, answer the question concisely and accurately. Answer the question based on what is explicitly stated in the emails.\n\n"

    if few_shot:
        prompt += (
            "Example:\n"
            "Query: What was the decision on the marketing budget for Q2?\n"
            "Context:\n"
            "- The marketing team proposed a 20% increase for digital campaigns.\n"
            "- Finance approved a 10% increase after negotiation.\n"
            "Answer: A 10% increase in the Q2 marketing budget was approved after negotiation.\n\n"
        )

    prompt += f"Query: {query}\nContext:\n"
    for chunk in chunks:
        prompt += f"- {chunk['chunk']}\n"
    prompt += "\nAnswer:"
    return prompt


# === Generator Function ===

def generate_answer(query: str, chunks: List[Dict], model: str = "gpt-3.5-turbo") -> str:
    prompt = build_prompt(query, chunks, few_shot=True)

    try:
        response = client.chat.completions.create(
            model=model,
            messages=[
                {"role": "system", "content": "You are a helpful assistant."},
                {"role": "user", "content": prompt}
            ],
            temperature=0.3,
            max_tokens=300
        )
        answer = response.choices[0].message.content.strip()
        return answer
    except Exception as e:
        print(f"Error calling OpenAI API: {e}")
        return "Sorry, I couldn't generate a response due to an error."


In [30]:
# main.py

#import pandas as pd
#from src.embedding_layer import EmbeddingProcessor
#from src.search_layer import SearchEngine
#from src.generation_layer import generate_answer

# === CONFIG ===
DATA_PATH = "email_dataset/email_threads.csv"
TOP_K = 3

# === LOAD DATA ===
print("Loading dataset...")
df = pd.read_csv(DATA_PATH).dropna(subset=["body"])

chroma_client = chromadb.Client(Settings(persist_directory="chroma_db"))

# === EMBEDDING PHASE ===
print("Embedding data...")
embedder = EmbeddingProcessor(client=chroma_client)
embedder.process_emails(df)

# === SEARCH PHASE ===
search_engine = SearchEngine(client=chroma_client)

# === QUERIES TO TEST ===
queries = [
    "What summary does the thread provide about delays in project delivery?",
    "What decision was made about budget increase in email thread about resource allocation?",
    "What strategy was proposed in thread_id 100 regarding risk management?"
]

# === RUN PIPELINE ===
for query in queries:
    print(f"\n=== QUERY: {query} ===")

    # Search
    top_chunks = search_engine.search(query, top_k=TOP_K)

    # Print top chunks
    print("\nTop Retrieved Chunks:")
    for i, chunk in enumerate(top_chunks):
        print(f"\n--- Chunk {i+1} ---")
        print(f"{chunk['chunk']}")
        print(f"Metadata: {chunk['metadata']}")

    # Generate Answer
    answer = generate_answer(query, top_chunks)
    print("\nGenerated Answer:")
    print(answer)


Loading dataset...


Failed to send telemetry event ClientStartEvent: capture() takes 1 positional argument but 3 were given


Embedding data...


Failed to send telemetry event ClientCreateCollectionEvent: capture() takes 1 positional argument but 3 were given


Embedding 10 chunks...


Batches: 100%|██████████| 1/1 [00:01<00:00,  1.34s/it]
Failed to send telemetry event CollectionAddEvent: capture() takes 1 positional argument but 3 were given
Failed to send telemetry event ClientCreateCollectionEvent: capture() takes 1 positional argument but 3 were given



=== QUERY: What summary does the thread provide about delays in project delivery? ===

Top Retrieved Chunks:

--- Chunk 1 ---
Hi Bob, We are experiencing delays in Project Falcon due to supplier issues. Expect 2-week delay. Regards, Alice
Metadata: {'from': 'alice@example.com', 'subject': 'Project Falcon Delay', 'thread_id': 1001, 'timestamp': '2025-07-01 09:15:00'}

--- Chunk 2 ---
Requesting remote work extension till July 31 due to personal reasons. -Tina
Metadata: {'from': 'tina@example.com', 'subject': 'Remote Work Extension Request', 'thread_id': 1009, 'timestamp': '2025-07-12 09:30:00'}

--- Chunk 3 ---
Thanks for the update. Can you send revised timeline? Thanks, Bob
Metadata: {'from': 'bob@example.com', 'subject': 'Project Falcon Delay', 'thread_id': 1001, 'timestamp': '2025-07-01 10:00:00'}

Generated Answer:
Delays in Project Falcon are due to supplier issues, resulting in a 2-week delay. Tina requested a remote work extension until July 31.

=== QUERY: What decision was ma

### Query-wise execution and generated results

### Self-designed Queries

Here are the three required queries (the first two are also used in main.py; the third replaces main.py's risk-management query):

1. What summary does the thread provide about delays in project delivery?

2. What decision was made about budget increase in email thread about resource allocation?

3. Was the procurement request for new laptops approved?

### Query1 : What summary does the thread provide about delays in project delivery?

In [32]:
query1 = "What summary does the thread provide about delays in project delivery?"
TOP_K = 3

print(f"\n=== QUERY: {query1} ===")

# Search Layer outputs
top_chunks1 = search_engine.search(query1, top_k=TOP_K)

# Print top chunks
print("\nTop Retrieved Chunks:")
for i, chunk in enumerate(top_chunks1):
    print(f"\n--- Chunk {i+1} ---")
    print(f"{chunk['chunk']}")
    print(f"Metadata: {chunk['metadata']}")


=== QUERY: What summary does the thread provide about delays in project delivery? ===

Top Retrieved Chunks:

--- Chunk 1 ---
Hi Bob, We are experiencing delays in Project Falcon due to supplier issues. Expect 2-week delay. Regards, Alice
Metadata: {'from': 'alice@example.com', 'subject': 'Project Falcon Delay', 'thread_id': 1001, 'timestamp': '2025-07-01 09:15:00'}

--- Chunk 2 ---
Requesting remote work extension till July 31 due to personal reasons. -Tina
Metadata: {'from': 'tina@example.com', 'subject': 'Remote Work Extension Request', 'thread_id': 1009, 'timestamp': '2025-07-12 09:30:00'}

--- Chunk 3 ---
Thanks for the update. Can you send revised timeline? Thanks, Bob
Metadata: {'from': 'bob@example.com', 'subject': 'Project Falcon Delay', 'thread_id': 1001, 'timestamp': '2025-07-01 10:00:00'}


In [34]:
# Generative layer outputs
open_ai_output1 = generate_answer(query1, top_chunks1)
print("\nGenerated Layer Output:")
print(open_ai_output1)


Generated Layer Output:
The thread indicates delays in Project Falcon due to supplier issues, resulting in a 2-week delay. Tina requested a remote work extension until July 31.


### Query2 : What decision was made about budget increase in email thread about resource allocation?

In [37]:
query2 = "What decision was made about budget increase in email thread about resource allocation?"
TOP_K = 3

print(f"\n=== QUERY: {query2} ===")

# Search Layer outputs
top_chunks2 = search_engine.search(query2, top_k=TOP_K)

# Print top chunks
print("\nTop Retrieved Chunks:")
for i, chunk in enumerate(top_chunks2):
    print(f"\n--- Chunk {i+1} ---")
    print(f"{chunk['chunk']}")
    print(f"Metadata: {chunk['metadata']}")


=== QUERY: What decision was made about budget increase in email thread about resource allocation? ===

Top Retrieved Chunks:

--- Chunk 1 ---
Finance approved 10% increase in marketing for Q3. Please adjust campaigns accordingly. Carol
Metadata: {'from': 'carol@example.com', 'subject': 'Q3 Marketing Budget', 'thread_id': 1002, 'timestamp': '2025-06-15 14:30:00'}

--- Chunk 2 ---
Thanks for the update. Can you send revised timeline? Thanks, Bob
Metadata: {'from': 'bob@example.com', 'subject': 'Project Falcon Delay', 'thread_id': 1001, 'timestamp': '2025-07-01 10:00:00'}

--- Chunk 3 ---
Some receipts were missing. Please upload them to get full reimbursement. -Accounts
Metadata: {'from': 'accounts@example.com', 'subject': 'Expense Report Clarification', 'thread_id': 1010, 'timestamp': '2025-08-01 14:30:00'}


In [70]:
# Generative layer outputs
open_ai_output2 = generate_answer(query2, top_chunks2)
print("\nGenerated Layer Output:")
print(open_ai_output2)


Generated Layer Output:
A 10% increase in marketing budget for Q3 was approved by Finance.


### Query3 : Was the procurement request for new laptops approved?

In [66]:
query3 = "Was the procurement request for new laptops approved?"
TOP_K = 3

print(f"\n=== QUERY: {query3} ===")

# Search Layer outputs
top_chunks3 = search_engine.search(query3, top_k=TOP_K)

# Print top chunks
print("\nTop Retrieved Chunks:")
for i, chunk in enumerate(top_chunks3):
    print(f"\n--- Chunk {i+1} ---")
    print(f"{chunk['chunk']}")
    print(f"Metadata: {chunk['metadata']}")


=== QUERY: Was the procurement request for new laptops approved? ===

Top Retrieved Chunks:

--- Chunk 1 ---
Requesting approval for purchasing new laptops for dev team. Cost: ₹3,00,000. -Admin
Metadata: {'from': 'admin@example.com', 'subject': 'Procurement Request Approval', 'thread_id': 1008, 'timestamp': '2025-07-28 10:00:00'}

--- Chunk 2 ---
Requesting remote work extension till July 31 due to personal reasons. -Tina
Metadata: {'from': 'tina@example.com', 'subject': 'Remote Work Extension Request', 'thread_id': 1009, 'timestamp': '2025-07-12 09:30:00'}

--- Chunk 3 ---
Kavita Sharma will join as intern in product team starting 5th Aug. Recruiter
Metadata: {'from': 'recruiter@example.com', 'subject': 'New Intern Joining', 'thread_id': 1004, 'timestamp': '2025-08-01 12:00:00'}


In [68]:
# Generative layer outputs
open_ai_output3 = generate_answer(query3, top_chunks3)
print("\nGenerated Layer Output:")
print(open_ai_output3)


Generated Layer Output:
The procurement request for new laptops for the dev team at a cost of ₹3,00,000 was not explicitly addressed in the email thread excerpts provided.


## Batch Evaluator

In [None]:
import pandas as pd
from tqdm import tqdm
from evaluate import load
#from src.search_layer import SearchEngine
#from src.generation_layer import generate_answer

# Load ROUGE metric
rouge = load("rouge")

# Load datasets
threads_df = pd.read_csv("email_dataset/email_threads.csv")
summaries_df = pd.read_csv("email_dataset/email_summaries.csv")

# Initialize search engine
#search_engine = SearchEngine()

# Number of samples to test (limit for speed, e.g., 10)
N = 10

# Collect results
results = []

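# Cap the loop at the number of available summaries to avoid an IndexError.
for i in tqdm(range(min(N, len(summaries_df)))):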
    row = summaries_df.iloc[i]
    thread_id = row['thread_id']
    reference_summary = row['summary']

    # Get full email body for the thread
    thread_emails = threads_df[threads_df['thread_id'] == thread_id]
    email_texts = thread_emails['body'].tolist()

    if not email_texts:
        continue

    query = "Summarize the key decisions made in this email thread."
    # Perform search on chunks
    top_chunks = search_engine.search(query, top_k=3, filter_thread_id=thread_id)

    # If no chunks found, skip
    if not top_chunks:
        continue

    # Generate answer from chunks
    try:
        generated_summary = generate_answer(query, top_chunks)
    except Exception as e:
        print(f"Error generating summary: {e}")
        continue

    # Compute ROUGE scores
    rouge_scores = rouge.compute(predictions=[generated_summary], references=[reference_summary])

    results.append({
        "thread_id": thread_id,
        "query": query,
        "reference_summary": reference_summary,
        "generated_summary": generated_summary,
        "rouge1": rouge_scores["rouge1"],
        "rouge2": rouge_scores["rouge2"],
        "rougeL": rouge_scores["rougeL"]
    })

# Save to CSV
results_df = pd.DataFrame(results)
results_df.to_csv("batch_evaluation_results.csv", index=False)

print("✅ Batch evaluation complete. Results saved to `batch_evaluation_results.csv`.")
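
To make the reported scores easier to interpret, here is a minimal sketch of what ROUGE-1 measures: unigram overlap between the generated and reference summaries, expressed as precision, recall, and F1. The function `rouge1` is a hypothetical simplification that tokenizes on whitespace; the `evaluate` metric applies its own tokenization and options, so its numbers will generally differ.

```python
from collections import Counter
from typing import Dict


def rouge1(prediction: str, reference: str) -> Dict[str, float]:
    """Simplified ROUGE-1: clipped unigram overlap with whitespace tokenization."""
    pred_counts = Counter(prediction.lower().split())
    ref_counts = Counter(reference.lower().split())
    # Counter intersection keeps, per word, the smaller of the two counts.
    overlap = sum((pred_counts & ref_counts).values())
    precision = overlap / max(sum(pred_counts.values()), 1)
    recall = overlap / max(sum(ref_counts.values()), 1)
    f1 = 0.0 if overlap == 0 else 2 * precision * recall / (precision + recall)
    return {"precision": precision, "recall": recall, "f1": f1}


scores = rouge1(
    prediction="a 10% increase was approved",
    reference="finance approved a 10% increase",
)
print(scores)
```

Four of the five predicted words appear in the five-word reference, so precision and recall are both 4/5 here. ROUGE-2 applies the same idea to word pairs, and ROUGE-L uses the longest common subsequence instead of n-gram counts.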


## Future Enhancements

**Embedding Layer Enhancements:**

- Parallelize or batch chunking and embedding for large datasets.

- Support multilingual email embedding using a multilingual transformer model (e.g., distiluse-base-multilingual-cased).

- Add logging and error handling during embedding and chunking.

- Deduplicate similar chunks before storing in the vector DB to reduce redundancy.

- Store additional metadata (e.g., department, priority) to enable advanced filtering during search.

**Cache Layer Enhancements:**

- Replace JSON with Redis or SQLite for faster lookup and persistence in multi-user environments.

- Add cache eviction policy (e.g., LRU) to avoid unlimited growth.

- Track cache hit/miss stats for performance analytics.

- Encrypt cache contents if storing sensitive queries or responses.
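
As one possible shape for the eviction policy mentioned above, the sketch below keeps an in-memory cache bounded with least-recently-used (LRU) eviction. `LRUCache` and `max_entries` are hypothetical names; the class mirrors the `contains` / `get` / `set` interface of the notebook's `Cache` but does not persist to disk.

```python
from collections import OrderedDict
from typing import Any, Optional


class LRUCache:
    """Hypothetical bounded cache that evicts the least recently used entry."""

    def __init__(self, max_entries: int = 128):
        if max_entries < 1:
            raise ValueError("max_entries must be at least 1")
        self.max_entries = max_entries
        self._entries: "OrderedDict[str, Any]" = OrderedDict()

    def contains(self, key: str) -> bool:
        return key in self._entries

    def get(self, key: str) -> Optional[Any]:
        if key not in self._entries:
            return None
        self._entries.move_to_end(key)  # mark as most recently used
        return self._entries[key]

    def set(self, key: str, value: Any) -> None:
        self._entries[key] = value
        self._entries.move_to_end(key)
        while len(self._entries) > self.max_entries:
            self._entries.popitem(last=False)  # drop the least recently used entry


cache = LRUCache(max_entries=2)
cache.set("a", 1)
cache.set("b", 2)
cache.get("a")      # "a" becomes the most recently used entry
cache.set("c", 3)   # exceeds the limit, so "b" is evicted
print(cache.contains("a"), cache.contains("b"), cache.contains("c"))
```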

**Search Layer Enhancements:**

- Improve reranking with better cross-encoders like bge-reranker or cohere models.

- Add semantic filters beyond thread_id (e.g., date, sender, topic).

- Support multi-query or follow-up query handling (e.g., thread-based QA).

- Paginate results and allow sorting based on relevance, timestamp, etc.

- Expose the search as an API with configurable parameters.

**Generation Layer Enhancements:**

- Use function calling / structured output instead of plain text (for automation).

- Support custom prompt templates per use case (summarization, classification, etc.).

- Switch to a self-hosted model (e.g., LLaMA 3, Mistral) for cost and privacy control.

- Limit token count dynamically to avoid truncation of large prompts.

- Stream responses (for example, when using GPT-4-turbo) for better UX.

**Overall Architecture Enhancements:**

- Centralized logging and monitoring (e.g., using logging, Sentry, or Prometheus).

- Unit and integration tests for all layers to ensure robustness.

- Add retry mechanisms for external API calls (OpenAI, Chroma).

- Implement role-based access control (RBAC) if deployed in an enterprise environment.

- Deploy as a containerized microservice (Docker + FastAPI) with endpoints for embedding, search, and generation.

- Add a front-end interface for uploading emails, searching threads, and viewing generated insights.
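
The retry item above could take roughly the following form: a small wrapper that retries a callable with exponential backoff. This is a sketch under assumptions, not an implementation from the notebook. `call_with_retry`, its parameters, and the injectable `sleep` argument are hypothetical, and a production version would usually catch only the specific transient error types raised by the API client rather than every exception.

```python
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


def call_with_retry(
    func: Callable[[], T],
    max_attempts: int = 3,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func(), retrying on any exception with exponentially growing delays."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except Exception:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error to the caller
            sleep(base_delay * 2 ** (attempt - 1))  # 0.5s, 1.0s, 2.0s, ...
    raise RuntimeError("unreachable")  # keeps type checkers satisfied


# Demo: a stand-in for an API call that fails twice before succeeding.
attempts = {"count": 0}
delays: List[float] = []


def flaky_call() -> str:
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("temporary failure")
    return "ok"


# Record the delays instead of really sleeping so the demo runs instantly.
result = call_with_retry(flaky_call, sleep=delays.append)
print(result, attempts["count"], delays)
```

In the notebook, the callable passed in would wrap the `client.chat.completions.create(...)` call inside `generate_answer`.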