In [None]:
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain.tools import tool
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode, tools_condition,InjectedState
from langchain_core.messages import (
    HumanMessage,
    ToolMessage,
)
from langgraph.types import Command
from langgraph.checkpoint.memory import MemorySaver
from langchain_core.tools.base import InjectedToolCallId

from typing_extensions import TypedDict
from typing import Annotated
#get graph visuals
from IPython.display import Image, display
from langchain_core.runnables.graph import MermaidDrawMethod
import os
from dotenv import load_dotenv 

import os.path
import base64
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from email.message import EmailMessage
from langgraph.store.memory import InMemoryStore


# Load variables through python-dotenv first, then read the Gemini API key
# from the process environment (None when the variable is not set).
load_dotenv()
GOOGLE_API_KEY = os.environ.get('google_api_key')

In [2]:
# Model identifier and the chat-model client used by the agent built later on.
GEMINI_MODEL = 'gemini-2.0-flash'
llm = ChatGoogleGenerativeAI(
    model=GEMINI_MODEL,
    temperature=0.3,
    google_api_key=GOOGLE_API_KEY,
)

In [3]:
class State(TypedDict):
    """
    A dictionary representing the state of the agent.

    This is the schema handed to StateGraph; the chatbot node and the tools
    read from it and write updates to it.
    """
    # Conversation history; add_messages is the reducer applied to updates of this key.
    messages: Annotated[list, add_messages]
    # Fetched emails keyed by Gmail message id (written by get_new_mail / show_inbox).
    inbox: dict
    # Pending outgoing email, stored by create_email as {"raw": <base64url MIME message>}.
    current_draft: dict
    # Not written by any active tool in the visible part of this notebook;
    # only the commented-out list_drafts tool populated it.
    drafts: dict

In [19]:
# In-memory LangGraph store. gmail_agent._setup reads this module-level name and
# passes it to graph_builder.compile(store=store), so this cell has to run
# before the agent is constructed.
store=InMemoryStore()

NameError: name 'InMemoryStore' is not defined

In [4]:
# Build the Gmail API client (`service`) from previously authorized user
# credentials. Every Gmail tool below relies on `service` existing, so this
# cell fails loudly instead of leaving it undefined.
TOKEN_PATH = "token.json"

if not os.path.exists(TOKEN_PATH):
    # Previously `creds` was simply left undefined in this case and the cell
    # died with an unrelated NameError inside build().
    raise FileNotFoundError(
        f"{TOKEN_PATH} not found: complete the Google OAuth flow to create it "
        "before building the Gmail client."
    )

creds = Credentials.from_authorized_user_file(TOKEN_PATH)
try:
    # create gmail api client
    service = build("gmail", "v1", credentials=creds)
except HttpError as error:
    print(f"An error occurred: {error}")
    # Re-raise: swallowing the error would leave `service` undefined and make
    # every tool fail later with a confusing NameError.
    raise

In [5]:
# import google.auth
# from googleapiclient.discovery import build
# from googleapiclient.errors import HttpError


# def show_chatty_threads():
#   """Display threads with long conversations(>= 3 messages)
#   Return: None

#   Load pre-authorized user credentials from the environment.
#   TODO(developer) - See https://developers.google.com/identity
#   for guides on implementing OAuth2 for the application.
#   """
# #   creds, _ = google.auth.default()
#   if os.path.exists("token.json"):
#     creds = Credentials.from_authorized_user_file("token.json", SCOPES)
#   try:
#     # create gmail api client
#     service = build("gmail", "v1", credentials=creds)

#     # pylint: disable=maybe-no-member
#     # pylint: disable:R1710
#     threads = (
#         service.users().threads().list(userId="me").execute().get("threads", [])
#     )
#     for thread in threads:
#       tdata = (
#           service.users().threads().get(userId="me", id=thread["id"]).execute()
#       )
#       nmsgs = len(tdata["messages"])

#       # skip if <3 msgs in thread
#       if nmsgs > 2:
#         msg = tdata["messages"][0]["payload"]
#         subject = ""
#         for header in msg["headers"]:
#           if header["name"] == "Subject":
#             subject = header["value"]
#             break
#         if subject:  # skip if no Subject line
#           print(f"- {subject}, {nmsgs}")
#     return threads

#   except HttpError as error:
#     print(f"An error occurred: {error}")


# if __name__ == "__main__":
#   show_chatty_threads()

In [6]:
def _decode_gmail_body(payload):
    """Return the decoded text body of a Gmail message payload, or None.

    The locations are tried in the same order as before: the first part, the
    first part nested inside the first part, then the payload's own body.
    """
    first_part = (payload.get('parts') or [{}])[0]
    nested_part = (first_part.get('parts') or [{}])[0]
    for candidate in (first_part, nested_part, payload):
        data = (candidate.get('body') or {}).get('data')
        if data is None:
            continue
        try:
            return base64.urlsafe_b64decode(data.encode("utf-8")).decode("utf-8")
        except ValueError:
            # binascii.Error (bad base64) and UnicodeDecodeError are both
            # ValueError subclasses; fall through to the next location.
            continue
    return None


def _summarize_gmail_message(mdata):
    """Flatten a full-format Gmail message resource into the dict kept in the inbox."""
    payload = mdata.get('payload') or {}
    headers = {h.get('name'): h.get('value') for h in payload.get('headers') or []}
    body = _decode_gmail_body(payload)
    return {'From': headers.get('From'),
            'To': headers.get('To'),
            'Date': headers.get('Date'),
            'label': mdata.get('labelIds'),
            'subject': headers.get('Subject'),
            'Snippet': mdata.get('snippet'),
            'email_id': mdata.get('id'),
            'thread': mdata.get('threadId'),
            # An undecodable body used to leave `body` unbound (crash on the
            # first email, or the previous email's body reused afterwards).
            'body': body if body is not None else '',
            }


@tool
def get_new_mail(maxResults: int ,tool_call_id: Annotated[str, InjectedToolCallId])-> Command:
    """
    Tool to get the latest emails
    args: maxResults: int - the number of emails to return
    return: a dict structured as such {email_id:{email content}}
    """
    # NOTE: the docstring above is kept verbatim; @tool uses it as the tool
    # description shown to the model.
    listing = service.users().messages().list(userId='me', includeSpamTrash=False, maxResults=maxResults).execute()
    messages = {}
    for ref in listing.get('messages', []):
        mdata = service.users().messages().get(userId="me", id=ref["id"], format='full').execute()
        messages[mdata.get('id')] = _summarize_gmail_message(mdata)
    # ToolMessage content must be text; convert the dict explicitly instead of
    # relying on implicit coercion.
    return Command(update={'inbox': messages,
                'messages': [ToolMessage(str(messages), tool_call_id=tool_call_id)]})

In [7]:
@tool
def create_email(receiver: str, content: str, email_subject:str, tool_call_id: Annotated[str, InjectedToolCallId]) -> Command:
    """
    Tool to create a draft of an email
    to create an email you have to make a draft using this tool
    agrs: receiver - the email adress of the person to send the email to
          content - the body of the email
          email_subject - the subject of the email
    """
    # NOTE: the docstring above is left verbatim because @tool uses it as the
    # tool description shown to the model.
    mime = EmailMessage()
    mime.set_content(content)
    # Headers are assigned in the original order (To, From, Subject) so the
    # serialized message stays the same.
    for header_name, header_value in (("To", receiver), ("From", "me"), ("Subject", email_subject)):
        mime[header_name] = header_value

    # base64url-encode the MIME bytes; send_email later passes this dict as the
    # request body of messages().send().
    draft_payload = {"raw": base64.urlsafe_b64encode(mime.as_bytes()).decode()}
    preview = ToolMessage(
        f'display this as is: {mime}, always show the email address the email is sent to and ask if it should be sent.',
        tool_call_id=tool_call_id,
    )
    return Command(update={'current_draft': draft_payload, 'messages': [preview]})


        

In [8]:
@tool
def verify_draft(state: Annotated[dict, InjectedState],tool_call_id: Annotated[str, InjectedToolCallId]) -> Command:
    """tool to verify the current draft
    
    args: none
    """
    draft = state.get('current_draft') or {}
    # create_email stores the draft as {'raw': ...}. The old lookup went through
    # draft['message']['raw'] and therefore always failed on None. The nested
    # shape is still accepted in case a draft was stored that way.
    raw = draft.get('raw') or (draft.get('message') or {}).get('raw')
    if not raw:
        return Command(update={'messages':[ToolMessage('there is no current draft to verify, create one first.',tool_call_id=tool_call_id)]})
    decoded = base64.urlsafe_b64decode(raw.encode("utf-8")).decode("utf-8")
    return Command(update={'messages':[ToolMessage(f'display this draft: {decoded} and ask if it should be sent.',tool_call_id=tool_call_id)]})

In [9]:
# draft = (
#             service.users()
#             .drafts()
#             .create(userId="me", body=create_message)
#             .execute()
#         )

#         current_draft={}
#         d=service.users().drafts().get(userId="me", id=draft['id'] , format='full' ).execute()
#         current_draft['id']=d['id']
#         current_draft['snippet']=d['message']['snippet']
#         headers={h['name']:h['value'] for h in d['message']['payload']['headers']}
#         current_draft['To']=headers['To']
#         try:
#             current_draft['body']=base64.urlsafe_b64decode(d['message']['payload']['parts'][0]['body']['data'].encode("utf-8")).decode("utf-8")
#         except:
#             try:
#                 current_draft['body']=base64.urlsafe_b64decode(d['message']['payload']['parts'][0]['parts'][0]['body']['data'].encode("utf-8")).decode("utf-8")
#             except:
#                 current_draft['body']=base64.urlsafe_b64decode(d['message']['payload']['body']['data'].encode("utf-8")).decode("utf-8")
        
#         return Command(update={'current_draft':current_draft,
#                 'messages': [ToolMessage(f'successfully created a draft ',tool_call_id=tool_call_id)]})
#     except:
#         return Command(update={'current_draft':draft,
#             'messages': [ToolMessage(f'created draft but failed to make it more readable ',tool_call_id=tool_call_id)]})

In [10]:
# @tool
# def show_current_draft(state: Annotated[dict, InjectedState], tool_call_id: Annotated[str, InjectedToolCallId]) -> Command:
#     """
#     tool to get the info and show the current_draft
#     args: none
#     returns: the current draft
#     """
#     try:
#         return Command(update={'messages': [ToolMessage(f'{state['current_draft']} ',tool_call_id=tool_call_id)]})
#     except: 
#         return Command(update={'messages': [ToolMessage(f'failed to get the current draft ',tool_call_id=tool_call_id)]})

In [11]:
@tool
def send_email(state: Annotated[dict, InjectedState],tool_call_id: Annotated[str, InjectedToolCallId]) -> Command:
    """
    Tool to send the current draft
    args: - none
    """
    create_message = state.get('current_draft')
    if not create_message:
        # A missing draft is reported as such rather than as a send failure.
        return Command(update={'messages': [ToolMessage('failed to send draft: there is no current draft, create one first.',tool_call_id=tool_call_id)]})
    try:
        # pylint: disable=E1101
        service.users().messages().send(userId="me", body=create_message).execute()
    except Exception as error:
        # Still best-effort (the model is told about the failure), but the
        # reason is reported and KeyboardInterrupt/SystemExit are no longer
        # swallowed as they were with a bare `except:`.
        return Command(update={'messages': [ToolMessage(f'failed to send draft: {error}',tool_call_id=tool_call_id)]})
    return Command(update={'messages': [ToolMessage('email sent! ',tool_call_id=tool_call_id)]})

In [12]:
def _fetch_inbox_summaries(max_results):
    """Fetch up to max_results messages and return {email_id: summary dict}."""

    def decode_body(payload):
        # Same lookup order as before: first part, first nested part, then the
        # payload's own body. Returns None when nothing can be decoded.
        first_part = (payload.get('parts') or [{}])[0]
        nested_part = (first_part.get('parts') or [{}])[0]
        for candidate in (first_part, nested_part, payload):
            data = (candidate.get('body') or {}).get('data')
            if data is None:
                continue
            try:
                return base64.urlsafe_b64decode(data.encode("utf-8")).decode("utf-8")
            except ValueError:
                # Covers binascii.Error and UnicodeDecodeError.
                continue
        return None

    listing = service.users().messages().list(userId='me', includeSpamTrash=False, maxResults=max_results).execute()
    summaries = {}
    for ref in listing.get('messages', []):
        mdata = service.users().messages().get(userId="me", id=ref["id"], format='full').execute()
        payload = mdata.get('payload') or {}
        headers = {h.get('name'): h.get('value') for h in payload.get('headers') or []}
        body = decode_body(payload)
        email_id = mdata.get('id')
        summaries[email_id] = {'From': headers.get('From'),
                               'To': headers.get('To'),
                               'Date': headers.get('Date'),
                               'label': mdata.get('labelIds'),
                               'subject': headers.get('Subject'),
                               'Snippet': mdata.get('snippet'),
                               'email_id': email_id,
                               'thread': mdata.get('threadId'),
                               # Previously left unbound when no location decoded.
                               'body': body if body is not None else '',
                               }
    return summaries


@tool
def show_inbox(state: Annotated[dict, InjectedState],tool_call_id: Annotated[str, InjectedToolCallId]) -> Command:
    """
    tool to get the info and show the emails in the inbox
    args: none
    returns: the emails in the inbox
    """
    # NOTE: the docstring above is kept verbatim; @tool uses it as the tool
    # description shown to the model.
    cached = state.get('inbox')
    if cached is not None:
        # Inbox already in state: show it without calling Gmail again. This now
        # returns a Command like every other tool, as the annotation promises.
        return Command(update={'messages': [ToolMessage(str(cached), tool_call_id=tool_call_id)]})

    messages = _fetch_inbox_summaries(10)
    # The return used to sit inside the fetch loop, so only the first email was
    # ever returned and an empty mailbox returned None.
    return Command(update={'inbox': messages,
                'messages': [ToolMessage(str(messages), tool_call_id=tool_call_id)]})
        

In [13]:
@tool
def display_email(id: str, state: Annotated[dict, InjectedState],tool_call_id: Annotated[str, InjectedToolCallId]) -> Command:
    """
    tool to display a specific email
    args: id - the id associated with the email to read
    returns: the body of the email
    """
    # `id` shadows the builtin but is the tool's public argument name, so it stays.
    inbox = state.get('inbox') or {}
    email = inbox.get(str(id))
    if email is None:
        # Previously an unknown id fell through and the tool returned None.
        return Command(update={'messages': [ToolMessage(f'failed to get the email: no email with id {id} in the inbox, load the inbox first.',tool_call_id=tool_call_id)]})
    # ToolMessage content must be text; convert the stored dict explicitly.
    return Command(update={'messages': [ToolMessage(str(email),tool_call_id=tool_call_id)]})

In [14]:
# @tool
# def list_drafts(tool_call_id: Annotated[str, InjectedToolCallId]) -> Command:
#     """
#     tool to get and list all the drafts
#     args: - none
#     """
#     try:
#         draft=service.users().drafts().list(userId='me', includeSpamTrash=False ).execute()
#         drafts={}
#         for id in draft['drafts']:
#             try:
#                 d=service.users().drafts().get(userId="me", id=id['id'] , format='full' ).execute()
#                 id=d['id']
#                 snippet=d['message']['snippet']
#                 headers={h['name']:h['value'] for h in d['message']['payload']['headers']}
#                 receiver=headers['To']
#                 try:
#                     body=base64.urlsafe_b64decode(d['message']['payload']['parts'][0]['body']['data'].encode("utf-8")).decode("utf-8")
#                 except:
#                     try:
#                         body=base64.urlsafe_b64decode(d['message']['payload']['parts'][0]['parts'][0]['body']['data'].encode("utf-8")).decode("utf-8")
#                     except:
#                         body=base64.urlsafe_b64decode(d['message']['payload']['body']['data'].encode("utf-8")).decode("utf-8")
            
#                 drafts[id]={
#                                 'To':receiver,
#                                 'Snippet':snippet,
#                                 'draft_id':id,
#                                 'body':body
#                                 }
#             except:
#                 drafts[id]=d
#         try:
#             return Command(update={'drafts':drafts,
#                 'messages': [ToolMessage(f'{drafts} ',tool_call_id=tool_call_id)]})
#         except: 
#             return Command(update={'messages': [ToolMessage(f'failed to get the drafts',tool_call_id=tool_call_id)]})
#     except:
#         return Command(update={'messages': [ToolMessage(f'no drafts to get',tool_call_id=tool_call_id)]})


In [None]:
class gmail_agent:
    """Wrapper around a compiled LangGraph chatbot/tools graph for Gmail.

    Args:
        llm: chat model exposing ``bind_tools`` and ``invoke``.
        thread_id: checkpointer thread used for every call; defaults to "1",
            the value that used to be hard-coded in each method.
    """

    def __init__(self, llm, thread_id: str = "1"):
        self.thread_id = thread_id
        self.agent = self._setup(llm)

    def _thread_config(self):
        """Return the run config selecting this agent's conversation thread."""
        return {"configurable": {"thread_id": self.thread_id}}

    def _setup(self, llm):
        """Build and compile the chatbot <-> tools graph."""
        langgraph_tools = [get_new_mail, create_email, display_email, show_inbox, verify_draft, send_email]

        graph_builder = StateGraph(State)

        # Modification: tell the LLM which tools it can call
        llm_with_tools = llm.bind_tools(langgraph_tools)
        tool_node = ToolNode(tools=langgraph_tools)

        def chatbot(state: State):
            """ emails assistant that answers user questions about their emails.
            Depending on the request, leverage which tools to use if necessary."""
            return {"messages": [llm_with_tools.invoke(state['messages'])]}

        graph_builder.add_node("chatbot", chatbot)
        graph_builder.add_node("tools", tool_node)
        # Any time a tool is called, we return to the chatbot to decide the next step
        graph_builder.set_entry_point("chatbot")
        graph_builder.add_edge("tools", "chatbot")
        graph_builder.add_conditional_edges(
            "chatbot",
            tools_condition,
        )
        memory = MemorySaver()
        # `store` is the module-level InMemoryStore created in an earlier cell.
        return graph_builder.compile(checkpointer=memory, store=store)

    def display_graph(self):
        """Render the graph as a Mermaid PNG in the notebook output."""
        png = self.agent.get_graph().draw_mermaid_png(
            draw_method=MermaidDrawMethod.API,
        )
        return display(Image(png))

    def stream(self, input: str):
        """Send ``input`` as a user message and pretty-print the latest message of each streamed state."""
        input_message = HumanMessage(content=input)
        for event in self.agent.stream({"messages": [input_message]}, self._thread_config(), stream_mode="values"):
            event["messages"][-1].pretty_print()

    def chat(self, input: str):
        """Send ``input`` as a user message and return the content of the final message."""
        # Wrapped in a list for consistency with stream().
        response = self.agent.invoke({'messages': [HumanMessage(content=str(input))]}, self._thread_config())
        return response['messages'][-1].content

    def get_state(self, state_val: str):
        """Return the value stored under ``state_val`` in the thread's current state.

        Raises:
            KeyError: if that key is not present in the state values.
        """
        return self.agent.get_state(self._thread_config()).values[state_val]

In [16]:
# Build the agent around the chat model (`llm`) configured earlier in the notebook.
agent=gmail_agent(llm)

In [18]:
# Smoke test: ask for the inbox; stream() pretty-prints the latest message of each streamed state.
agent.stream('show the inbox')


show the inbox
Tool Calls:
  show_inbox (22e84e34-fa31-4df3-ba4a-4bde8b031498)
 Call ID: 22e84e34-fa31-4df3-ba4a-4bde8b031498
  Args:
Name: show_inbox

{"19549310ad6bb546": {"From": "LinkedIn <jobs-noreply@linkedin.com>", "To": "tristan padiou <padioutristan@gmail.com>", "Date": "Thu, 27 Feb 2025 20:55:03 +0000 (UTC)", "label": ["UNREAD", "CATEGORY_UPDATES", "INBOX"], "subject": "New jobs similar to Manager, Data Science - Marketing and Sales at Figma", "Snippet": "Jobs similar to Manager, Data Science - Marketing and Sales at Figma ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏", "email_id": "19549310ad6bb546", "thread": "19549310ad6bb546", "body": "Jobs similar to Manager, Data Science - Marketing and Sales at Figma https://www.linkedin.com/comm/jobs/view/4093029955?refId=D10UFDAmQMC8KHVSd0SrQQ%3D%3D&trackingId=jJJf94w1S12kapgwh%2B04Og%3D%3D&lipi=urn%3Ali%3Apage%3Aemail_email_jobs_viewed_job_reminde