# 1. Initial Setup

In [None]:
# Standard library imports
import base64
from datetime import datetime, timedelta, timezone
from email import utils
import chime
import os
import os.path
import sqlite3
import time

# Third-party library imports
from bs4 import BeautifulSoup
import dateutil.parser
from dotenv import load_dotenv
import faiss
import numpy as np
import pyttsx3
import speech_recognition as sr
from tzlocal import get_localzone

# Google API imports
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow

# OpenAI and LangChain imports
from langchain.prompts import ChatPromptTemplate
from langchain_openai.embeddings import OpenAIEmbeddings
from openai import OpenAI

In [None]:
# Parameters
# File the FAISS vector index is read from and written to.
INDEX_NAME = "index_email.index"
# SQLite file holding the text stored alongside each indexed vector.
DB_FILE = "index_email_metadata.db"
# Dimensionality used when a brand-new FAISS index is created; presumably matches
# the output size of the configured OpenAI embedding model — TODO confirm.
EMBEDDING_DIM = 1536
K = 25  # Default number of emails fetched by the vector search

# Setting up the model & API key
# Load environment variables from a .env file before the clients are created
# (the OpenAI clients presumably read their API key from the environment).
load_dotenv(override=True)
client = OpenAI()  # chat-completions client used for summarizing and answering
embeddings = OpenAIEmbeddings()  # embedding client used by get_embedding()

# Speech
engine = pyttsx3.init()  # text-to-speech engine used by log_message_with_voice()
recognizer = sr.Recognizer()  # speech-to-text recognizer used by the voice chat

# 2. Utility Functions

## 2.1 Summarize Emails
*Summarize the original email into a shorter, brief version.*

In [None]:
def summerize_email(mail_from, mail_cc, mail_subject, mail_date, mail_body):
    """Ask the chat model for a brief, fixed-format summary of one email.

    The instructions are sent as the system message and the email fields as a
    second message; the text of the model's first choice is returned.

    Args:
        mail_from: Sender header value.
        mail_cc: Cc header value (may be None).
        mail_subject: Subject header value.
        mail_date: Date/time of the email (interpolated with str()).
        mail_body: Plain-text body of the email (may be None).

    Returns:
        The summary text produced by the model.
    """
    # The prompts are assembled line by line so trailing spaces stay visible.
    instructions = (
        "\n"
        "Summerize the given Email in the following format, keep it brief but don't lose much information:\n"
        "\n"
        "OUTPUT FORMAT:\n"
        "<Email Start>\n"
        "Date and Time:  (format: dd-MMM-yyyy HH h:mmtt [with time zone])\n"
        "Sender: \n"
        "CC:\n"
        "Subject:\n"
        "Email Context: \n"
        "<Email End>\n"
    )

    email_fields = (
        "\n"
        "The email is the following: \n"
        "\n"
        f"date and time: {mail_date}\n"
        f"from: {mail_from}\n"
        f"cc: {mail_cc}\n"
        f"subject: {mail_subject}\n"
        f"body: {mail_body}\n"
    )

    conversation = [
        {"role": "system", "content": instructions},
        {"role": "assistant", "content": email_fields},
    ]
    completion = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=conversation,
    )
    first_choice = completion.choices[0]
    return first_choice.message.content

## 2.2 Gmail API Related Functions
*Authentication, fetching emails, email datetime tracking, etc.*

In [None]:
# OAuth scope requested from Google: read-only access to the Gmail mailbox.
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']

# Authenticate and create the service
def authenticate_gmail():
    """Return an authorized Gmail API service object.

    Credentials are loaded from ``token.json`` when that file exists. If they
    are missing or no longer valid, an expired token with a refresh token is
    refreshed silently; only when that is impossible (or fails) is the
    interactive browser flow based on ``credentials.json`` started. Whatever
    credentials result are written back to ``token.json``.

    Returns:
        The object returned by ``build('gmail', 'v1', credentials=...)``.
    """
    # Local import: google.auth is already a dependency of this file, and
    # keeping the import here makes this block self-contained.
    from google.auth.exceptions import RefreshError

    token_file = 'token.json'
    creds = None

    if os.path.exists(token_file):
        creds = Credentials.from_authorized_user_file(token_file, SCOPES)

    if not creds or not creds.valid:
        refreshed = False
        if creds and creds.expired and creds.refresh_token:
            # Avoid forcing the user through the browser consent screen every
            # time the short-lived access token expires.
            try:
                creds.refresh(Request())
                refreshed = True
            except RefreshError as error:
                # E.g. the refresh token was revoked: fall back to a new login.
                print(f'Token refresh failed, re-authenticating: {error}')
        if not refreshed:
            flow = InstalledAppFlow.from_client_secrets_file('credentials.json', SCOPES)
            creds = flow.run_local_server(port=0)
        # Persist the new/refreshed credentials for the next run.
        with open(token_file, 'w') as token:
            token.write(creds.to_json())

    service = build('gmail', 'v1', credentials=creds)
    return service

def clean_html(html_content):
    """Parse *html_content* and return only its text.

    Text fragments are stripped and joined with newline characters.
    """
    parsed_document = BeautifulSoup(html_content, 'html.parser')
    visible_text = parsed_document.get_text("\n", strip=True)
    return visible_text

def get_plain_text_body(parts):
    """Extract the readable body from a list of Gmail MIME parts.

    The parts (and any nested ``parts`` lists) are walked in document order.
    The first non-empty ``text/plain`` body found anywhere in the tree wins;
    only when there is none is the first ``text/html`` body cleaned with
    ``clean_html`` and returned instead.

    Args:
        parts: Iterable of part dicts, each optionally holding ``mimeType``,
            ``body`` (with base64url ``data``) and nested ``parts``.

    Returns:
        The body text, or None when no text part carries data.
    """
    plain_text, html_source = _collect_text_parts(parts)
    if plain_text:
        return plain_text
    if html_source is None:
        return None
    # HTML is cleaned lazily, only when it is actually the chosen fallback.
    return clean_html(html_source)


def _collect_text_parts(parts):
    """Return (first non-empty plain text, first HTML source) in document order."""
    plain_text = None
    html_source = None
    for part in parts:
        if 'parts' in part:
            nested_plain, nested_html = _collect_text_parts(part['parts'])
            # Keep what was already found; a nested HTML alternative must not
            # displace a plain-text sibling.
            plain_text = plain_text or nested_plain
            if html_source is None:
                html_source = nested_html
            continue

        data = part.get('body', {}).get('data')
        if not data:
            # No inline data on this part (for example an attachment stub).
            continue

        mime_type = part.get('mimeType')
        if mime_type == 'text/plain' and not plain_text:
            plain_text = _decode_body_data(data)
        elif mime_type == 'text/html' and html_source is None:
            html_source = _decode_body_data(data)
    return plain_text, html_source


def _decode_body_data(data):
    """Decode base64url body data to str, replacing bytes that are not valid UTF-8."""
    return base64.urlsafe_b64decode(data).decode('utf-8', errors='replace')

def get_message_details(service, user_id, msg_id):
    """Fetch one Gmail message and return its key headers and text body.

    Header names are matched case-insensitively (mail header field names are
    not case-sensitive) and stored under the canonical keys ``From``, ``Cc``,
    ``Subject`` and ``Date``; a header that is absent from the message is
    absent from the result. ``Body`` is always present and is None when the
    message carries no inline text.

    Args:
        service: Gmail API service object (as returned by authenticate_gmail).
        user_id: Gmail user id, e.g. ``'me'``.
        msg_id: Id of the message to fetch.

    Returns:
        A dict of the details, or None if anything went wrong (the error is
        printed).
    """
    canonical_names = {'from': 'From', 'cc': 'Cc', 'subject': 'Subject', 'date': 'Date'}
    try:
        message = service.users().messages().get(userId=user_id, id=msg_id, format='full').execute()
        payload = message['payload']

        details = {}
        for header in payload['headers']:
            canonical = canonical_names.get(header['name'].lower())
            if canonical is not None:
                details[canonical] = header['value']

        if 'parts' in payload:
            details['Body'] = get_plain_text_body(payload['parts'])
        elif 'data' in payload['body']:
            # errors='replace' keeps bodies in other charsets readable
            # instead of discarding the whole message.
            body = base64.urlsafe_b64decode(payload['body']['data']).decode('utf-8', errors='replace')
            if payload.get('mimeType') == 'text/plain':
                # Plain text must not go through the HTML parser: it would
                # drop anything that looks like a tag, e.g. "<john@example.com>".
                details['Body'] = body
            else:
                details['Body'] = clean_html(body)
        else:
            details['Body'] = None

        return details
    except Exception as error:
        # Best-effort boundary: one unreadable message must not stop the caller.
        print(f'An error occurred: {error}')
        return None

def list_messages(service, user_id, query=''):
    """Collect every message stub matching *query*, following all result pages.

    Args:
        service: Gmail API service object.
        user_id: Gmail user id, e.g. ``'me'``.
        query: Gmail search query string; empty means no filter.

    Returns:
        A list with the ``messages`` entries of every page, or None if an
        error occurred (the error is printed).
    """
    try:
        collected = []
        page_request = service.users().messages().list(userId=user_id, q=query)
        # list_next() hands back None once there is no further page.
        while page_request is not None:
            page = page_request.execute()
            collected.extend(page.get('messages', []))
            page_request = service.users().messages().list_next(page_request, page)
    except Exception as error:
        print(f'An error occurred: {error}')
        return None
    return collected

# Get the timestamp of the last fetched email (stored as text in last_checked.txt)
def get_last_checked_time():
    """Return the datetime saved in ``last_checked.txt``.

    A missing or empty file means "never checked" and yields
    1970-01-01 00:00:00 UTC. The result is always timezone-aware so it can be
    compared with the aware datetimes parsed from email headers.

    Returns:
        A timezone-aware ``datetime``.

    Raises:
        ValueError: If the file holds text that cannot be parsed as a date
            (raised by ``dateutil.parser.parse``).
    """
    never_checked = datetime(1970, 1, 1, 0, 0, 0, tzinfo=timezone.utc)
    try:
        with open('last_checked.txt', 'r') as file:
            stored = file.read().strip()
    except FileNotFoundError:
        return never_checked

    if not stored:
        # An empty file (e.g. an interrupted write) is treated like no file.
        return never_checked

    parsed = dateutil.parser.parse(stored)
    if parsed.tzinfo is None:
        # NOTE(review): a stored value without an offset is assumed to be UTC;
        # comparing a naive datetime with aware ones would raise TypeError.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed

# Persist the timestamp of the last fetched email as text (str(timestamp))
def update_last_checked_time(timestamp):
    """Overwrite ``last_checked.txt`` with the string form of *timestamp*."""
    with open('last_checked.txt', mode='w') as state_file:
        # print() converts with str(); end='' keeps the file free of a newline.
        print(timestamp, end='', file=state_file)

## 2.3 Embedding, Vector/Meta Store Operations

In [None]:
# Embedding
def get_embedding(text):
    """Embed *text* and return it as a single-row float32 matrix.

    The dtype is set explicitly because FAISS operates on 32-bit floats,
    whereas a NumPy array built from Python floats defaults to float64.

    Args:
        text: The string to embed.

    Returns:
        A NumPy array of shape (1, d), where d is the embedding length.
    """
    vector = np.asarray(embeddings.embed_query(text), dtype=np.float32)
    return vector.reshape(1, -1)

# Vector Store
def get_index():
    """Return the FAISS index: loaded from INDEX_NAME if present, else a new one.

    A new index is an L2 flat index of dimension EMBEDDING_DIM; it is not
    written to disk here. A status line is printed in both cases (in the
    original these prints sat after the ``return`` and never ran).
    """
    if os.path.exists(INDEX_NAME):
        # Load the existing Faiss index
        index = faiss.read_index(INDEX_NAME)
        print("Vector store loaded from existing file.")
        return index

    # Create a new Faiss index
    index = faiss.IndexFlatL2(EMBEDDING_DIM)
    print("New vector store created.")
    return index

def initiate_meta_store():
    """Open the SQLite metadata store and make sure its table exists.

    Usage: ``conn, cursor = initiate_meta_store()``

    Returns:
        A ``(connection, cursor)`` pair for the database at DB_FILE.
    """
    create_table_sql = '''
        CREATE TABLE IF NOT EXISTS Metadata (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            text TEXT NOT NULL
        )
        '''
    connection = sqlite3.connect(DB_FILE)
    db_cursor = connection.cursor()
    db_cursor.execute(create_table_sql)
    return connection, db_cursor

def terminate_meta_store(conn):
    """Commit pending changes on *conn* and close it.

    The connection is closed even when the commit raises, so a failed commit
    does not leak the connection; the commit error still propagates.

    Args:
        conn: An open connection as returned by initiate_meta_store().
    """
    try:
        conn.commit()
    finally:
        conn.close()

def insert_email_record(full_email, index, cursor):
    """Add one email text to both stores.

    The text's embedding is appended to *index* first, then the text itself is
    inserted as a new row of the Metadata table through *cursor*. Nothing is
    committed here.

    Args:
        full_email: The text to embed and store.
        index: Vector index exposing ``add``.
        cursor: SQLite cursor on the metadata store.
    """
    index.add(get_embedding(full_email))
    row_values = (full_email,)
    cursor.execute("INSERT INTO Metadata (text) VALUES (?)", row_values)

## 2.4 Speech and Logging

In [None]:
def log_message_with_voice(text):
    """Print *text* with the KARVIS prefix and speak it aloud.

    The utterance is queued first, the line is printed, and then the speech
    engine is run until it has finished.
    """
    engine.say(text)
    print(f"## KARVIS: {text}")
    engine.runAndWait()

# 3. Service Functions

## 3.1 Load Emails
*Read emails, compute their embeddings and insert them into the vector store, and store the summarized text in the metadata store.*

In [None]:
#Read Emails
def load_emails():
    """Fetch emails newer than the last checked time and index them.

    Each new email is summarized with summerize_email(); the summary's
    embedding goes into the FAISS index and the summary text into the SQLite
    metadata store. Afterwards both stores are saved and, if any newer email
    was seen, ``last_checked.txt`` is updated to the newest email's datetime.

    If processing fails part-way, the database connection is closed without
    committing and the index is not written, so neither store is left with a
    partial batch; the error is re-raised.
    """
    service = authenticate_gmail()
    latest_timestamp = get_last_checked_time()

    # Gmail's `after:` filter works on dates only, so query from one day
    # earlier and filter by exact time below. The date is formatted outside
    # the f-string: reusing the same quote inside an f-string expression is a
    # SyntaxError before Python 3.12.
    since_date = (latest_timestamp - timedelta(days=1)).strftime('%Y/%m/%d')
    query = f'after:{since_date}'
    messages = list_messages(service, 'me', query)
    if not messages:
        print('(EMAILS LOADER): No messages found.')
        return

    session_latest_timestamp = latest_timestamp
    index = get_index()
    conn, cursor = initiate_meta_store()
    inserted_count = 0
    try:
        for msg in messages:
            details = get_message_details(service, 'me', msg['id'])
            if not details:
                continue

            try:
                message_datetime = utils.parsedate_to_datetime(details['Date'])
            except (KeyError, TypeError, ValueError) as e:
                # Missing or malformed Date header: skip this email only.
                print(f"(EMAILS LOADER): Error parsing date: {e}")
                continue
            if message_datetime.tzinfo is None:
                # parsedate_to_datetime returns a naive datetime for a "-0000"
                # zone, which denotes UTC; make it aware so it is comparable.
                message_datetime = message_datetime.replace(tzinfo=timezone.utc)

            # Filter by time (Gmail API doesn't query by time, only by date).
            # NOTE(review): `break` presumes the listing is newest-first, so
            # everything after the first already-seen email is old — confirm.
            if message_datetime <= session_latest_timestamp:
                break

            # Email/Record Logic
            mail_from = details.get('From')
            mail_cc = details.get('Cc')
            mail_subject = details.get('Subject')
            mail_body = details.get('Body')

            full_email = summerize_email(mail_from, mail_cc, mail_subject, message_datetime, mail_body)
            insert_email_record(full_email, index, cursor)

            inserted_count += 1
            print(f"(EMAILS LOADER): Email # {inserted_count} is detected and inserted: ({message_datetime}), ({mail_subject}).")

            # Track the newest email datetime seen in this run.
            if message_datetime > latest_timestamp:
                latest_timestamp = message_datetime
    except BaseException:
        # Closing without commit discards the uncommitted rows; the index is
        # not written either, so both stores stay as they were.
        conn.close()
        raise

    #Save and Close Connections
    terminate_meta_store(conn)
    faiss.write_index(index, INDEX_NAME)
    print("(EMAILS LOADER): Vector store and metadata saved.")
    # Update the last checked time only if new email(s) were found
    if latest_timestamp > session_latest_timestamp:
        update_last_checked_time(latest_timestamp)

# Run one synchronization when this cell is executed.
load_emails()

## 3.2 Content Search
*Search the (K) most similar emails in the vector store and return their stored text.*

In [None]:
def Vector_Search(query, demo = False, k = K):
    """Return the stored texts of the (up to) *k* emails most similar to *query*.

    The query is embedded, the FAISS index is searched, and each hit is mapped
    to its row in the SQLite Metadata table (row id = index position + 1).
    Fewer than *k* texts are returned when the index holds fewer vectors or a
    row is missing.

    Args:
        query: Free-text search query.
        demo: When True, print the retrieved texts and the distances.
        k: Number of nearest neighbours to request.

    Returns:
        A list of the matching texts, nearest first.
    """
    # Vector store
    index = get_index()
    # Meta store
    conn, cursor = initiate_meta_store()
    try:
        # Generate the embedding for the query
        query_embedding = get_embedding(query)
        # Perform the search and retrieve (k) best matches
        distances, indices = index.search(query_embedding, k)
        # Retrieve the stored texts from SQLite
        decoded_texts = []
        for idx in indices[0]:
            if idx < 0:
                # FAISS pads the result with -1 when it has fewer than k vectors.
                continue
            # int(): sqlite3 cannot bind NumPy integer types. SQLite ids are 1-based.
            cursor.execute("SELECT text FROM Metadata WHERE id=?", (int(idx) + 1,))
            row = cursor.fetchone()
            if row is not None:
                decoded_texts.append(row[0])
    finally:
        # Close the database connection even if the search fails
        conn.close()

    # Testing/Demo
    if demo:
        # Output the results
        print("Decoded texts of nearest neighbors:")
        for text in decoded_texts:
            print("*********************************************")
            print("########",text[31:56])
            print(text)
        print("*********************************************")
        print("Distances to nearest neighbors:", distances)

    return decoded_texts

#query = "XXXXXXXXXXXXXXX"
#Vector_Search(query,demo = True)

## 3.3 RAG query to LLM model
*Construct the prompt with the related context and send it to the LLM.*

In [None]:
def ask_question(question, messages=None):
    """Send *question* to the chat model, with retrieved emails as context.

    On the first turn (``messages is None``) the emails most similar to the
    question are retrieved with Vector_Search() and placed, together with the
    current local datetime, in the system message. On later turns the given
    history is reused and no new retrieval happens.

    Args:
        question: The user's question.
        messages: Conversation history from a previous call, or None to start
            a new conversation. The list is extended in place.

    Returns:
        ``(messages, assistant_reply)``: the updated history (including the
        new user message and the reply) and the reply text.
    """
    if messages is None:
        related_emails = Vector_Search(question)
        # Each sentence ends with a space: adjacent literals are concatenated
        # verbatim, and without it the sentences run together ("emails.Below").
        system_content = (
            "You are an AI assistant with access to a collection of emails. "
            "Below, you'll find the most relevant emails retrieved for the user's question. Your job is to answer the question based on the provided emails. "
            "If you cannot find the answer, please politely inform the user. "
            "Answer in a very short brief, and informative manner."
        )

        local_timezone = get_localzone()

        # A blank line separates the datetime line from the first email.
        assistant_content = (
            f"Today's Datetime is {datetime.now(local_timezone)}\n\n"
            + "".join(
                f"Email({number}):\n\n{email}\n\n"
                for number, email in enumerate(related_emails, start=1)
            )
        )

        # Initialize messages with the instructions plus the retrieved context
        messages = [
            {"role": "system", "content": system_content + "\n\n" + assistant_content},
        ]

    # Append the new user message
    messages.append({"role": "user", "content": question})

    # Get the assistant's response
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=messages
    )
    assistant_reply = response.choices[0].message.content

    # Append assistant's reply to messages
    messages.append({"role": "assistant", "content": assistant_reply})

    return messages, assistant_reply  # Return messages for the next turn

# 4. Controller Functions
**Program Entry Point**

## 4.1 Text Chat

In [None]:
def start_chat():
    """Run the interactive text chat loop.

    Emails are synchronized once at start. Typing "new conversation" re-syncs
    and makes the next question start a fresh conversation (with a new
    retrieval); typing "exit" ends the loop. Any other input is sent to
    ask_question(), continuing the current conversation when there is one.
    """
    print("(System): Starting a new conversaion..")
    print('(System): type "exit" to quit the chat.')
    print('(System): type "new conversation" to start over')

    new_conversation = True
    load_emails()

    while True:
        query = input("## You: ")
        command = query.lower()

        if command == "new conversation":
            print("(System): Starting a new conversation..")
            new_conversation = True
            load_emails()
            continue

        if command == "exit":
            print("(System): Exiting... Thank you!")
            break

        if new_conversation:
            messages, assistant_reply = ask_question(query)
        else:
            messages, assistant_reply = ask_question(query, messages=messages)
        print("## Model:", assistant_reply)

        # On the first turn, this exact reply keeps the conversation "new" so
        # the next question triggers a fresh retrieval.
        if new_conversation and assistant_reply == "I can't find the answer.":
            continue

        new_conversation = False
        
# Start the text chat when this cell is executed (blocks until "exit" is typed).
start_chat()

## 4.2 Voice Chat

In [None]:
# Ping tone setting: select the sound theme used by chime.success() in KARVIS()
chime.theme('big-sur')

def KARVIS():
    """Run the voice chat loop.

    Emails are synchronized once at start. Each round plays a ping, records
    from the microphone, and transcribes the audio. Saying "new conversation"
    re-syncs and resets the conversation; saying "exit" ends the loop. Any
    other utterance is sent to ask_question() and the reply is printed and
    spoken. Recognition failures are announced and the loop continues.
    """
    print("(System): Starting a new conversaion..")
    print('(System): type "exit" to quit the chat.')
    print('(System): type "new conversation" to start over')

    new_conversation = True
    load_emails()

    while True:
        with sr.Microphone() as source:
            print("(System): Listening...")
            chime.success()  # Ping to indicate listening
            time.sleep(0.5)
            audio = recognizer.listen(source)

        try:
            query = recognizer.recognize_google(audio)
            print(f"## You   : {query}")
            command = query.lower()

            if command == "new conversation":
                log_message_with_voice("Starting a new conversation..")
                new_conversation = True
                load_emails()
                continue

            if command == "exit":
                log_message_with_voice("Exiting... Thank you!")
                break

            if new_conversation:
                messages, assistant_reply = ask_question(query)
            else:
                messages, assistant_reply = ask_question(query, messages=messages)
            log_message_with_voice(assistant_reply)

            # On the first turn, this exact reply keeps the conversation "new"
            # so the next question triggers a fresh retrieval.
            if new_conversation and assistant_reply == "I can't find the answer.":
                continue

            new_conversation = False

        except sr.UnknownValueError:
            log_message_with_voice("Could not understand audio")
        except sr.RequestError:
            log_message_with_voice("Could not request results; check your internet connection")

# Start the voice chat when this cell is executed (blocks until "exit" is heard).
KARVIS()
    
