In [None]:
%pip install langchain-community chromadb ollama

# Need to install Ollama from their website (https://ollama.com/download) and then run
# ollama pull nomic-embed-text
# ollama pull llama2
# ollama serve

In [1]:
# thank you Claude.ai

import os
import mailbox
import re
import time
from typing import List, Dict, Generator
from tqdm import tqdm

from langchain_community.document_loaders import TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.embeddings import OllamaEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_core.prompts import ChatPromptTemplate
from langchain_community.chat_models import ChatOllama
from langchain.chains import RetrievalQA

class LargeGmailMboxRAGAgent:
    def __init__(self, mbox_path: str, embedding_model: str = 'nomic-embed-text', persist_directory: str = "./email_db"):
        """
        Initialize the Gmail Mbox RAG Agent for large files
        
        :param mbox_path: Path to the mbox file
        :param embedding_model: Embedding model to use
        :param persist_directory: Directory to store the vector database
        """
        self.mbox_path = mbox_path
        self.persist_directory = persist_directory
        self.embeddings = OllamaEmbeddings(model=embedding_model)
        self.vectorstore = None
        
        # Create persistence directory if needed
        os.makedirs(persist_directory, exist_ok=True)
        
    def parse_email(self, email_message) -> str:
        """
        Extract text content from an email message
        
        :param email_message: mailbox email message
        :return: Extracted text content
        """
        # Extract metadata
        subject = email_message.get('subject', 'No Subject')
        if subject is None:
            subject = 'No Subject'
        
        from_email = email_message.get('from', 'Unknown')
        date = email_message.get('date', 'Unknown Date')
        
        # Handle encoding issues with subject
        if isinstance(subject, bytes):
            try:
                subject = subject.decode('utf-8', errors='ignore')
            except:
                subject = 'Encoding Error in Subject'
        
        # Extract body
        body = ''
        
        # Handle multipart emails
        if email_message.is_multipart():
            for part in email_message.walk():
                content_type = part.get_content_type()
                if content_type == 'text/plain':
                    try:
                        payload = part.get_payload(decode=True)
                        if payload:
                            body += payload.decode('utf-8', errors='ignore')
                    except Exception as e:
                        pass  # Skip problematic parts
        else:
            try:
                payload = email_message.get_payload(decode=True)
                if payload:
                    body = payload.decode('utf-8', errors='ignore')
            except Exception as e:
                body = 'Error extracting body'
        
        # Combine metadata and body
        return f"From: {from_email}\nDate: {date}\nSubject: {subject}\n\n{body}"
    
    def process_emails_in_batches(self, batch_size: int = 500) -> Generator[List[Dict], None, None]:
        """
        Process mbox file in batches to avoid memory issues
        
        :param batch_size: Number of emails to process in each batch
        :return: Generator yielding batches of email documents
        """
        print(f"Opening mbox file: {self.mbox_path}")
        mbox = mailbox.mbox(self.mbox_path)
        total_emails = len(mbox)
        print(f"Found {total_emails} emails in mbox file")
        
        batch = []
        
        for i, message in tqdm(enumerate(mbox), total=total_emails, desc="Processing emails"):
            try:
                email_text = self.parse_email(message)
                batch.append({
                    'page_content': email_text,
                    'metadata': {
                        'source': self.mbox_path,
                        'date': message.get('date', 'Unknown Date'),
                        'from': message.get('from', 'Unknown'),
                        'subject': message.get('subject', 'No Subject')
                    }
                })
                
                # When batch is full, yield it and clear
                if len(batch) >= batch_size:
                    yield batch
                    batch = []
                    
            except Exception as e:
                print(f"Error processing email {i}: {e}")
        
        # Yield any remaining emails
        if batch:
            yield batch
    
    def create_vector_store(self, chunk_size: int = 1000, chunk_overlap: int = 100, batch_size: int = 500):
        """
        Create vector store from processed emails with batching
        
        :param chunk_size: Size of text chunks
        :param chunk_overlap: Overlap between chunks
        :param batch_size: Number of emails to process in each batch
        """
        # Initialize text splitter
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size, 
            chunk_overlap=chunk_overlap
        )
        
        # Check if vectorstore already exists
        if os.path.exists(os.path.join(self.persist_directory, 'chroma.sqlite3')):
            print(f"Loading existing vector store from {self.persist_directory}")
            self.vectorstore = Chroma(
                persist_directory=self.persist_directory,
                embedding_function=self.embeddings
            )
            return
        
        # Create new vectorstore
        print("Creating new vector store")
        self.vectorstore = Chroma(
            persist_directory=self.persist_directory,
            embedding_function=self.embeddings
        )
        
        # Process emails in batches
        batch_count = 0
        total_docs = 0
        
        for batch in self.process_emails_in_batches(batch_size):
            batch_count += 1
            print(f"Processing batch {batch_count} with {len(batch)} emails")
            
            # Split documents into chunks
            split_docs = text_splitter.create_documents(
                [doc['page_content'] for doc in batch],
                metadatas=[doc['metadata'] for doc in batch]
            )
            
            total_docs += len(split_docs)
            print(f"Created {len(split_docs)} chunks from this batch")
            
            # Add documents to vectorstore
            self.vectorstore.add_documents(split_docs)
            
            # Persist after each batch
            self.vectorstore.persist()
            
            print(f"Total chunks in vector store: {total_docs}")
    
    def create_rag_chain(self, model: str = 'llama2'):
        """
        Create RAG chain for querying emails
        
        :param model: Ollama LLM model to use
        """
        if not self.vectorstore:
            print("Vector store not found. Loading or creating...")
            self.create_vector_store()
        
        # Initialize language model
        llm = ChatOllama(model=model, temperature=0.1)
        
        # Create retriever
        retriever = self.vectorstore.as_retriever(
            search_kwargs={'k': 5}  # Return top 5 most relevant documents
        )
        
        # Custom prompt template
        prompt = ChatPromptTemplate.from_template("""
        You are a helpful AI assistant specialized in analyzing emails.
        
        Context information from emails:
        {context}
        
        User Question: {question}
        
        Based on the context of these emails, provide a comprehensive and precise answer.
        If the information is not found in the provided emails, say "I could not find relevant information in the provided emails."
        Include relevant dates, senders, and subjects when appropriate.
        """)
        
        # Create retrieval QA chain
        self.rag_chain = RetrievalQA.from_chain_type(
            llm=llm,
            chain_type='stuff',
            retriever=retriever,
            return_source_documents=True,
            chain_type_kwargs={"prompt": prompt}
        )
    
    def query_emails(self, query: str):
        """
        Query the email corpus
        
        :param query: User's query
        :return: Answer and source documents
        """
        if not hasattr(self, 'rag_chain'):
            self.create_rag_chain()
        
        start_time = time.time()
        result = self.rag_chain({"query": query, "question": query})
        end_time = time.time()
        
        print(f"Query processed in {end_time - start_time:.2f} seconds")
        
        return result['result'], result['source_documents']

# Example usage
if __name__ == '__main__':
    print("Processing...\n")
    # Initialize the agent
    gmail_agent = LargeGmailMboxRAGAgent('/Users/jigneshjain/Documents/top_100_emails.mbox', persist_directory="/Users/jigneshjain/Documents/my_large_email_db")
    
    # Create vector store
    gmail_agent.create_vector_store(batch_size=250)  # Process 250 emails at a time
    
    # Example query
    query = "Summarize unread emails"
    answer, sources = gmail_agent.query_emails(query)
    
    print("\nAnswer:", answer)
    print("\nSource Documents:")
    for i, doc in enumerate(sources[:3]):  # Show first 3 sources
        print(f"\nSource {i+1}:")
        print(f"- Subject: {doc.metadata.get('subject', 'Unknown')}")
        print(f"- From: {doc.metadata.get('from', 'Unknown')}")
        print(f"- Date: {doc.metadata.get('date', 'Unknown')}")

Processing...

Creating new vector store


  self.embeddings = OllamaEmbeddings(model=embedding_model)
  self.vectorstore = Chroma(


Opening mbox file: /Users/jigneshjain/Documents/top_100_emails.mbox
Found 100 emails in mbox file


Processing emails: 100%|███████████████████████████████████████████████████████████████████████████████████████████| 100/100 [00:00<00:00, 507.55it/s]


Processing batch 1 with 100 emails
Created 1761 chunks from this batch


  self.vectorstore.persist()
  llm = ChatOllama(model=model, temperature=0.1)
  result = self.rag_chain({"query": query, "question": query})


Total chunks in vector store: 1761
Query processed in 9.64 seconds

Answer: The email is an auto-generated newsletter from Axis Bank, informing the recipient about their subscription to the TLDR newsletter. The email provides options for managing subscriptions or unsubscribing from future emails.

Based on the context of these emails, I could not find any specific information related to summarizing unread emails. The email solely focuses on providing instructions for managing subscriptions and unsubscribing from the newsletter. Therefore, I cannot provide a summary of unread emails based on this email chain.

Source Documents:

Source 1:
- Subject: Prime =?UTF-8?B?4oK5MjEsNDAwIEFubnVhbCBTYXZpbmdzISDwn5Kw?=
- From: IndusInd Bank <indusind_bank@indusind.com>
- Date: Wed, 15 Jan 2025 04:58:24 +0000 (UTC)

Source 2:
- Subject: Your transfer is confirmed
- From: "Xe Money Transfer" <xe@service.xe.com>
- Date: Thu, 20 Feb 2025 04:24:20 +0000

Source 3:
- Subject: DeepSeek accelerates =?utf-8

In [3]:
query = "Summarize all emails from Avalon bay"
answer, sources = gmail_agent.query_emails(query)

print("\nAnswer:", answer)

Query processed in 11.18 seconds

Answer: Avalon Bay sends emails to residents of Avalon Esterra Park regarding various events and services. The emails are sent from different addresses, including [avalonesterrapark@avalonbay.com](mailto:avalonesterrapark@avalonbay.com) and include the resident's name in the "To" field.

The emails provide information about upcoming events, such as a Breakfast on the Go session today at 9:30 AM, and refer residents to their team for more details. They also offer insurance benefits and a referral program.

The emails are sent from Avalon Bay Communities, Inc., with the email address [avalonbay.com](mailto:avalonbay.com). The emails include the date, subject, and sender's name and email address.

Based on the provided emails, I could not find any specific information about the residents' personal details or preferences.
