In [1]:
from openai import OpenAI
import os
from dotenv import load_dotenv, find_dotenv
import os.path
import json

import time
import logging
from datetime import datetime

from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

# If modifying these scopes, delete the file token.json.
SCOPES = ["https://www.googleapis.com/auth/gmail.modify"]

load_dotenv(find_dotenv())
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
client = OpenAI(api_key=OPENAI_API_KEY)

In [2]:
creds = None
  # The file token.json stores the user's access and refresh tokens, and is
  # created automatically when the authorization flow completes for the first
  # time.
""" if os.path.exists("token.json"):
    creds = Credentials.from_authorized_user_file("token.json", SCOPES) """
# If there are no (valid) credentials available, let the user log in.
if not creds or not creds.valid:
    if creds and creds.expired and creds.refresh_token:
        creds.refresh(Request())
    else:
        flow = InstalledAppFlow.from_client_secrets_file(
            "credentials.json", SCOPES
        )
        creds = flow.run_local_server(port=4000)
    # Save the credentials for the next run
    with open("token.json", "w") as token:
        token.write(creds.to_json())

service = None
try:
    # Call the Gmail API
    service = build("gmail", "v1", credentials=creds)

except HttpError as error:
# TODO(developer) - Handle errors from gmail API.
    print(f"An error occurred: {error}")

Please visit this URL to authorize this application: https://accounts.google.com/o/oauth2/auth?response_type=code&client_id=915927531429-jb5jkim9325sqr9bun1hvo1evtc961rq.apps.googleusercontent.com&redirect_uri=http%3A%2F%2Flocalhost%3A4000%2F&scope=https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fgmail.modify&state=VIZaiKEUQ6MVtPUt2lo0sOkyhsTD6I&access_type=offline


In [3]:
#Gmail API Calls
def get_email(id):
    try:
        result = service.users().messages().get(userId='me', id=id).execute()

        if not result:
            print("Message not found.")
            return
        print("Message:" + result)

    except HttpError as error:
    # TODO(developer) - Handle errors from gmail API.
        print(f"An error occurred: {error}")
    return result

def get_email_list():
    try:
        results = service.users().messages().list(userId="me").execute()
        messages = results.get("messages", [])

        if not messages:
            print("No messages found.")
            return
        print("Message:")
        for message in messages:
            print(message)

    except HttpError as error:
    # TODO(developer) - Handle errors from gmail API.
        print(f"An error occurred: {error}")
    return messages

In [4]:
# Creates a new assistant
assistant = client.beta.assistants.create(
    name = "Gmail Helper",
    instructions = "Your go-to assistant for reading and editing emails. Gmail Helper is a personal assistant dedicated to reading and sending emails, primarily through Google Gmail API integration. Its core functionality includes checking the user’s email to read and summarize emails they receive. It is capable of sending emails to the user's contacts but will always seek confirmation from the user before doing so, ensuring that the user has control over what they send. Gmail Helper maintains a professional demeanor, focusing on providing clear, concise, and accurate scheduling information. It prioritizes user privacy and confidentiality.The assistant is also equipped to guide users through the Google Gmail API’s functionalities and troubleshoot common issues, using language tailored to the user’s level of technical expertise. Overall, Gmail Helper adopts a polite, efficient, and accommodating tone, aiming to make email management and event scheduling as smooth and user-friendly as possible.",
    tools = [{
      "type": "function",
      "function": {
        "name": "get_email_list",
        "description": "Get a list of the user's mailbox contents",
      }
    }, {
      "type": "function",
      "function": {
        "name": "get_email",
        "description": "Get the contents of an email",
        "parameters": {
          "type": "object",
          "properties": {
              "id": {
                  "type": "string",
                  "description": "Email ID",
              },
          },
          "required": ["id"],
        }
      } 
  }],
    model = "gpt-4"
)

In [5]:
# Assistant class that can make Gmail API calls
class GmailHelper:
    thread_id = None
    assistant_id = None

    def __init__(self):
        self.client = client
        self.model = "gpt-4"
        self.assistant = None
        self.thread = None
        self.run = None

        if GmailHelper.assistant_id:
            self.assistant = self.client.beta.assistants.retrieve(assistant_id=GmailHelper.assistant_id)

        if GmailHelper.thread_id:
            self.thread = self.client.beta.threads.retrieve(thread_id=GmailHelper.thread_id)

    def create_assistant(self, name, instructions, tools):
        if not self.assistant:
            assistant_obj = self.client.beta.assistants.create(
                name=name,
                instructions=instructions,
                tools=tools,
                model=self.model
            )
            GmailHelper.assistant_id = assistant_obj.id
            self.assistant = assistant_obj
            print(f"Assistant created with ID: {self.assistant.id}")

    def create_thread(self):
        if not self.thread:
            thread_obj = self.client.beta.threads.create()
            GmailHelper.thread_id = thread_obj.id
            self.thread = thread_obj
            print(f"Thread created with ID: {self.thread.id}")

    def add_message_to_thread(self, role, content):
        if self.thread:
            self.client.beta.threads.messages.create(
                thread_id = self.thread.id,
                role = role,
                content = content
            )
    
    def run_assistant(self, instructions):
        if self.thread and self.assistant:
            self.run = self.client.beta.threads.runs.create(
                thread_id = self.thread_id,
                assistant_id = self.assistant_id,
                instructions = instructions
            )

    def print_thread(self):
        if self.thread:
            messages = self.client.beta.threads.messages.list(thread_id = self.thread_id)

            for message in messages:
                role = message.role
                content = message.content[0].text.value
                print(role + ': ' + content)

    def call_required_functions(self, required_actions):
        if not self.run:
            return
        tool_outputs = []

        for action in required_actions["tool_calls"]:
            func_name = action["function"]["name"]
            arguments = json.loads(action["function"]["arguments"])

            if func_name == "get_email_list":
                output = get_email_list()
            elif func_name == "get_email":
                output = get_email(id=arguments["id"])

            final_str = ""
            for item in output:
                final_str += "".join(item)

            print(f"Tool output: {output}")
            tool_outputs.append({"tool_call_id": action["id"], "output": final_str})

        self.client.beta.threads.runs.submit_tool_outputs(
            thread_id = self.thread.id,
            run_id = self.run.id,
            tool_outputs = tool_outputs
        )
    
    def wait_for_completion(self):
        if self.thread and self.run:
            while True:
                time.sleep(5)
                run_status = self.client.beta.threads.runs.retrieve(
                    thread_id = self.thread.id,
                    run_id = self.run.id
                )

                if run_status.status == "completed":
                    break
                elif run_status.status == "requires_action":
                    self.call_required_functions(required_actions=run_status.required_action.submit_tool_outputs.model_dump())

    def run_steps(self):
        run_steps = self.client.beta.threads.runs.steps.list(
            thread_id = self.thread.id,
            run_id = self.run.id
        )
        print(f"Run steps: {run_steps}")
        return run_steps.data


In [6]:
manager = GmailHelper()

In [7]:
manager.create_assistant(
    name = "Gmail Helper",
    instructions = "Your go-to assistant for reading and editing emails. Gmail Helper is a personal assistant dedicated to reading and sending emails, primarily through Google Gmail API integration. Its core functionality includes checking the user’s email to read and summarize emails they receive. It is capable of sending emails to the user's contacts but will always seek confirmation from the user before doing so, ensuring that the user has control over what they send. Gmail Helper maintains a professional demeanor, focusing on providing clear, concise, and accurate scheduling information. It prioritizes user privacy and confidentiality.The assistant is also equipped to guide users through the Google Gmail API’s functionalities and troubleshoot common issues, using language tailored to the user’s level of technical expertise. Overall, Gmail Helper adopts a polite, efficient, and accommodating tone, aiming to make email management and event scheduling as smooth and user-friendly as possible.",
    tools = [{
      "type": "function",
      "function": {
        "name": "get_email_list",
        "description": "Get a list of the user's mailbox contents",
      }
    }, {
      "type": "function",
      "function": {
        "name": "get_email",
        "description": "Get the contents of an email",
        "parameters": {
          "type": "object",
          "properties": {
              "id": {
                  "type": "string",
                  "description": "The ID of the email to be retrieved",
              },
          },
          "required": ["id"],
        }
      } 
  }]
)

manager.create_thread()

manager.add_message_to_thread(
    role = "user",
    content = "What are my latest emails?"
)

manager.run_assistant(instructions="Get the user's latest emails")
manager.wait_for_completion()

manager.print_thread()

Assistant created with ID: asst_OMFTNVmqjwCsGsMHrEnbDybo
Thread created with ID: thread_OYfqnz1kqZRiTvrrFUPLLHPS
Message:
{'id': '18df22a75161b7f1', 'threadId': '18df22a3997106a8'}
{'id': '18df225f27e1f449', 'threadId': '18df225f27e1f449'}
{'id': '18df22407a8640fc', 'threadId': '18df222e7e511d23'}
{'id': '18de8b8714db0604', 'threadId': '18de8b7251cdf5a7'}
{'id': '18de893c00e18a79', 'threadId': '18de88a6d840c251'}
{'id': '18de885f11826280', 'threadId': '18de883f12d7b3a5'}
{'id': '18de880ae1b5fef8', 'threadId': '18de87e58a4c9ee6'}
{'id': '18de87dc25afef15', 'threadId': '18de87dc25afef15'}
{'id': '18de87bebf3201ef', 'threadId': '18de87a30d46742c'}
{'id': '18de879583e990e2', 'threadId': '18de877096d1555d'}
{'id': '18de876f41ab2bd5', 'threadId': '18de8767d1a745dd'}
{'id': '18de8730936d61a5', 'threadId': '18de87072eb6b747'}
{'id': '18de86add7cd9bc2', 'threadId': '18de86925a76db73'}
{'id': '18de86817af56850', 'threadId': '18de86817af56850'}
{'id': '18de77c3870f442f', 'threadId': '18de777fb32a

In [8]:
thread = client.beta.threads.create()
print(thread)

Thread(id='thread_NTgsiCvKzUXfhABucDUy2hjq', created_at=1709316563, metadata={}, object='thread')


In [9]:
message = client.beta.threads.messages.create(
    thread_id = thread.id,
    role = "user",
    content = "Show me my latest email."
)

In [10]:
print(message)

ThreadMessage(id='msg_8dTHykcE4gUTknMR7YMuKact', assistant_id=None, content=[MessageContentText(text=Text(annotations=[], value='Show me my latest email.'), type='text')], created_at=1709316565, file_ids=[], metadata={}, object='thread.message', role='user', run_id=None, thread_id='thread_NTgsiCvKzUXfhABucDUy2hjq')


In [11]:
run = client.beta.threads.runs.create(
    thread_id = thread.id,
    assistant_id = assistant.id
)

In [12]:
run = client.beta.threads.runs.retrieve(
    thread_id = thread.id,
    run_id = run.id
)

In [13]:
def wait_for_run_completion(client, thread_id, run_id, sleep_interval=5):
    """

    Waits for a run to complete and prints the elapsed time.:param client: The OpenAI client object.
    :param thread_id: The ID of the thread.
    :param run_id: The ID of the run.
    :param sleep_interval: Time in seconds to wait between checks.
    """
    while True:
        try:
            run = client.beta.threads.runs.retrieve(thread_id=thread_id, run_id=run_id)
            if run.completed_at:
                elapsed_time = run.completed_at - run.created_at
                formatted_elapsed_time = time.strftime(
                    "%H:%M:%S", time.gmtime(elapsed_time)
                )
                print(f"Run completed in {formatted_elapsed_time}")
                logging.info(f"Run completed in {formatted_elapsed_time}")
                # Get messages here once Run is completed!
                messages = client.beta.threads.messages.list(thread_id=thread_id)
                last_message = messages.data[0]
                response = last_message.content[0].text.value
                print(f"Assistant Response: {response}")
                return response
        except Exception as e:
            logging.error(f"An error occurred while retrieving the run: {e}")
            break
        logging.info("Waiting for run to complete...")
        time.sleep(sleep_interval)

In [14]:
def checkingStatus(thread, run):
    status = run.status
    print('Current status: ' + status)
    if status == 'completed':
        messages_list = assistant.beta.threads.messages.list(thread)
        messages = [message.content for message in messages_list.body.data]
        return messages
    elif status == 'requires_action':
        print('requires_action.. looking for a function')
        if run.required_action.type == 'submit_tool_outputs':
            print('submit tool outputs ... ')
            tool_calls = run.required_action.submit_tool_outputs.tool_calls
            available_functions = {
                "get_email_list": get_email_list,
                "get_email": get_email
            }
            function_name = tool_calls[0].function.name
            function_to_call = available_functions[function_name]
            function_args = json.loads(tool_calls[0].function.arguments)
            api_response = get_email_list()
            if(function_name == 'get_email'):
                api_response = function_to_call(id=function_args.get("id"))
            #print('Query to search for: ' + parsed_args['query'])
            run = client.beta.threads.runs.submit_tool_outputs(thread_id=thread.id, run_id=run.id, tool_outputs=[{"tool_call_id": tool_calls[0].id, "output": json.dumps(api_response)},])
            wait_for_run_completion(client, thread, run)
checkingStatus(thread, run)

Current status: in_progress


In [15]:
messages = client.beta.threads.messages.list(
    thread_id = thread.id
)

In [16]:
for message in reversed(messages.data):
    print(message.role + ': ' + message.content[0].text.value)

user: Show me my latest email.


In [17]:
service.close()