A DIY AI-drive Email-Based Meeting Scheduler agent
===


Introduction
---

Large Language Models (LLMs) have unlocked new paradigms in automation and intelligent systems design, where agents when powered by LLMs, can plan, reason, and interact with tools or other agents to achieve complex objectives. The purpose of this notebook is to illustrates these principles through a toy example: the agent reads emails that are requests for scheduling meetings, finds available times in the calendar, schedules them, and sends confirmations to the requestor. This implementation is built from scratch, without relying on popular agentic frameworks like LangChain, CrewAI, AWS Bedrock Agents so that we can understand the principles behind such systems and components involved in builiding such systems. It also demonstrates how (natural) language can be used as a flexible and powerful interface between system components. The example also shows how we can use LLM to determine if we need to abort or terminate prematurely due to failures or exceptional conditions when executing the task plan.

Pre-requisites:
* Google email and calendar python APIs are used to read emails and google calendar to schedule the meetings. Get necessary permissions with Google services and download the authentication credentials as a local JSON file. Please refer to Google's official documentation for step-by-step instructions on how to complete this setup.
* Anthropic Sonnet model is used for LLM through AWS Bedrock conversation API. Enable Anthropic Claude 3 Sonnet model in your AWS account and verify that you are able be able to connect to it via boto library.

Implementation
---

Start importing libraries we would be using.

In [None]:
import base64
import datetime
import email
import json
import os
import os.path
import pickle
import re
import subprocess
import time
import traceback

from typing import Optional, NamedTuple

from dateutil import parser, tz
from datetime import datetime, date, time, timedelta
from email.mime.text import MIMEText

import boto3
import requests
from google.auth.transport.requests import Request
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

GOOGLE_CREDENTALS points to a google authentication credentials in the local file system.

In [None]:
GOOGLE_CREDENTALS = "<full path to google client secret json file, something like client_secret_...apps.googleusercontent.com.json>"

In [None]:
# Maximum number of emails to read to find a meeting request. Gives up if not found any in this range.
MAX_EMAILS_TO_READ = 2

In [None]:
# Anthropic model id, make sure to get permissions to it in your AWS account

MODEL_ID = "anthropic.claude-3-sonnet-20240229-v1:0"

In [None]:
class GoogleServices:

    """
    This class encapsulates all interactions with google services like email and calendar.
    """
    
    # WARNING: If modifying these scopes, delete the cache file token.pickle
    SCOPES = ['https://www.googleapis.com/auth/gmail.readonly',
              'https://www.googleapis.com/auth/gmail.send',
              'https://www.googleapis.com/auth/gmail.modify',
              'https://www.googleapis.com/auth/calendar.events'
             ]

    def __init__(self, credentials_json):
        self.creds = self._google_authenticate(credentials_json, GoogleServices.SCOPES)
        self.gmail_service = self._get_gmail_service(self.creds)
        self.google_calendar_service = self._get_google_calendar_service(self.creds)

    def _get_gmail_service(self, creds):
        return build('gmail', 'v1', credentials=creds)

    def _get_google_calendar_service(self, creds):
        return build("calendar", "v3", credentials=creds)    
    
    def _google_authenticate(self, creds_json, scopes):
        creds = None
        if os.path.exists('token.pickle'):
            with open('token.pickle', 'rb') as token:
                creds = pickle.load(token)
        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
            else:
                flow = InstalledAppFlow.from_client_secrets_file(creds_json, scopes)
                creds = flow.run_local_server(port=0)
            with open('token.pickle', 'wb') as token:
                pickle.dump(creds, token)
    
        return creds

    def _create_message(self, sender, to, subject, message_text):
        message = MIMEText(message_text)
        message['to'] = to
        message['from'] = sender
        message['subject'] = subject
        raw_message = base64.urlsafe_b64encode(message.as_bytes())
        return {'raw': raw_message.decode()}
    
    def send_message(self, sender, to, message_text, message_subject = "Test Email via Gmail API"):        
        message = self._create_message(sender, to, message_subject, message_text)
        message = self.gmail_service.users().messages().send(userId="me", body=message).execute()
        print(f'Message sent: {message["id"]}')

    def mark_message_read(self, msg_id):
        self.gmail_service.users().messages().modify(userId='me', id=msg_id, body={'removeLabelIds': ['UNREAD']}).execute()

    def get_message(self, msg_id):
    
        try:
            message_list = self.gmail_service.users().messages().get(userId='me', id=msg_id, format='raw').execute()
    
            msg_raw = base64.urlsafe_b64decode(message_list['raw'].encode('ASCII'))
    
            msg_str = email.message_from_bytes(msg_raw)

            
            sender = msg_str['from']
            to = msg_str['to']

            content_types = msg_str.get_content_maintype()
            
            if content_types == 'multipart':
                part1, part2 = msg_str.get_payload()
                if self._is_base64(part1.get_payload()):
                    part1_decoded = base64.b64decode(part1.get_payload()) 
                else:
                    part1_decoded = part1.get_payload()
                return {"from": sender, "to" : to, "body": part1_decoded }
            else:
                return {"from": sender, "to" : to, "id": msg_id, "body": msg_str.get_payload() }    
    
        except HttpError as error:
            # TODO - Handle errors from gmail API.
            print(f'An error occurred: {error}')

    def _get_events(self):
        try:
            now = datetime.utcnow().isoformat() + "Z"
            # This is a toy program -- a more robust solution would get all events in a date/time range
            events_result = (
                self.google_calendar_service.events()
                .list(
                    calendarId="primary",
                    timeMin=now,
                    maxResults=25,
                    singleEvents=True,
                    orderBy="startTime",
                )
                .execute()
            )
            events = events_result.get("items", [])
            
            if not events:
                print("No upcoming events found.")
                return []
    
            # Prints the start and name of the next 10 events
            for event in events:
                start = event["start"].get("dateTime", event["start"].get("date"))
    
            return events
            
        except HttpError as error:
            print(f"An error occurred: {error}")
        return []
    
    def simple_events(self):

        events = self._get_events()
        
        summaries = []
        
        for event in events:
            start = event["start"].get("dateTime", event["start"].get("date"))
            end = event["end"].get("dateTime", event["end"].get("date"))
            summary = event.get("summary", "There is no summary for the event")
            desc = event.get("description", "There is no description for the event")
            summaries.append({"start": start, "end": end, "summary": summary, "description": desc})
    
        return summaries

    def _convert_end_datetime(self, start, duration):
        start_dt = parser.isoparse(start)
        delta = timedelta(minutes=int(duration))
        end_dt = start_dt + delta
        return end_dt.isoformat()

    def create_calendar_event(self, summary, start, duration):

        duration = int(duration) # llm response that passed here is string
        
        end = self._convert_end_datetime(start, duration)
        
        event = {
          'summary': summary,
          'description': summary,
          'start': {
            'dateTime': start
          },
          'end': {
            'dateTime': end,
          },
          'visibility': 'public'
        }
        event = self.google_calendar_service.events().insert(calendarId='primary', body=event).execute()
        response = f"Event created: {event.get('htmlLink')}. Event start date/time: {start} and end date/time: {end}"
        return response

    def _is_base64(self, s: str) -> bool:
        # Check valid characters and length
        if not re.fullmatch(r'^[A-Za-z0-9+/]*={0,2}$', s):
            return False
        if len(s) % 4 != 0:
            return False
    
        try:
            # Try decoding and re-encoding to see if matches
            decoded = base64.b64decode(s, validate=True)
            return base64.b64encode(decoded).decode('utf-8').rstrip('=') == s.rstrip('=')
        except Exception:
            return False

In [None]:
class Llm:
    """
    Just a wrapper to AWS Bedrock API
    """
    
    def __init__(self, runtime, model):
        self.runtime = runtime
        self.model = model

    def invoke(self, sys_prompt, user_prompt, verbose=False):
        messages = [
            {
                "role": "user",
                "content": [{ 
                    "text": user_prompt,
                }]
            }
        ]
        system = [
            {
                "text": sys_prompt
            }
        ]
    
        if (verbose):
            print("--------")        
            print(sys_prompt)
            print(user_prompt)
    
        infcfg = {
            'maxTokens': 4096,
            'temperature': 0
        }
        response = self.runtime.converse(
            messages=messages,
            system=system,
            modelId = self.model, 
            inferenceConfig = infcfg)
        if verbose:
            print(response)
            print("--------")
        return response["output"]["message"]["content"][0]["text"]

    def __call__(self, sys_prompt, user_prompt, verbose=False):
        return self.invoke(sys_prompt, user_prompt, verbose)

**Notes**
* Opted to use global functions for most interactions with the LLM and tools to keep the code minimal and easy to follow, avoiding extra layers of abstraction that could add cognitive overhead—especially when studying how prompts are constructed and used. That said, it's easy to imagine these functions being wrapped in Tool or Agent classes if desired.
* State refers to a dictionary that stores variables shared across tools and not in LLM prompt construction.
* AgentMemory is a data structure that keeps a record of the tools invoked, along with their parameters and responses. This information is used in various LLM prompts to help guide the model in generating more appropriate and context-aware responses.
* All tools take these 3 parameters in addition to their own specific ones: (1) llm - instance of LLM (2) google_services - instance of google service class defined above (3) state - instance of state dictionary

In [None]:
class State(dict):
    """
    Holds any data that may be required to be accessed between executing
    the task plan steps or between the tools.
    """
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

In [None]:
class AgentMemory:

    """
    Keeps a record of the tools invoked, along with their parameters and responses. 
    This information is used in various LLM prompts as part of context.
    """
    
    def __init__(self):
        self.clear()

    def add(self, task, response):
        self.tasks.append(task)
        self.responses.append(response)

    def clear(self):
        self.tasks = []
        self.responses = []

    def format(self):
        data = []
        for a, r in zip(self.tasks, self.responses):
            item = f"<tool_invoked>{a}</tool_invoked>\n<tool_response>{r}</tool_response>"
            data.append(item)
        return "<items>"+"".join(data)+"</items>"

In [None]:
def extract_between_tags(tag, string, strip = False):
    """
    For Anthropic Claude models, wrapping output in XML (a similar markup) improves reliability of output parsing; 
    keeps free-form output separate from structured output (like json) and makes it easy to parse output without 
    relying on complex or brittle regular expressions. Following is a simple function that extracts content from given XML from the output.
    """
    ext_list = re.findall(f"<{tag}>(.+?)</{tag}>", string, re.DOTALL)
    if strip:
        ext_list = [e.strip() for e in ext_list]
    return ext_list

In [None]:
def check_meeting_request(llm, email_body):

    """
    A function that uses an LLM to parse email body (which is in natural language)
    and determine if it is a request for meeting and if so, return the meeting details
    in a structured output that could be used by a deterministic code.
    """
    
    system_prompt = """
        You are an intelligent AI assistant designed to analyze the content of an email.
        
        Your task:
        - Carefully read the provided email body.
        - Determine if the email is a request for a meeting.
        - If it is a meeting request, extract the following details:
            - Subject of the meeting: a short phrase describing what the meeting is about.
            - Proposed time or date: when the sender suggests holding the meeting (if mentioned).
        - If the email is not a meeting request, respond with:
            <output>
            {
                "is_meeting_request": false
            }
            </output>
        
        Extraction Guidelines:
        - The subject should be derived from the purpose or key topic mentioned in the email.
        - The time or date may be explicitly stated (e.g., “next Wednesday at 3 PM”) or implied (e.g., “early next week”). Extract this as-is, do not perform any computation on this.
        - If the time is not given or not clear, leave the time field empty.
        
        Output format:
        <output>
        {
            "is_meeting_request": true,
            "meeting_subject": "short subject line or key topic",
            "proposed_time": "proposed date/time string or empty if not specified",
            "duration": "duration or `60 minutes` if not specified"
        }
        </output>
    """

    user_prompt = f"""
        Here is an email body:
        "{email_body}"
        
        Please determine if this is a meeting request, and if yes, extract the meeting subject and proposed time.
    """

    response = llm(system_prompt, user_prompt, True)

    output = extract_between_tags("output", " ".join(response.split("\n")))[0]
    return json.loads(output)

In [None]:
def format_message_body(llm, message):
    """
    Use LLM to create a nice natural language description of scheduled meeting details
    to be sent as reply to meeting schedule emails.
    """
    
    s = """
    Format the given message in readable English. Use bullet points and other formatting techiniques as necessary.
    Be succinct.
    Do not include explanations — only the formatted text.
    """
    return llm(s, message)

In [None]:
class Event(NamedTuple):
    """Represents an event with start and end date/time."""
    start_datetime: datetime
    end_datetime: datetime

def generate_available_slots(constraints):
    """
    Finds the earliest available timeslot within the specified date range.
    
    Args:
        start_date: The start date of the range to search
        end_date: The end date of the range to search (inclusive)
        events: List of existing events that might block timeslots
        duration_minutes: Required duration of the timeslot in minutes
    
    Returns:
        The start time of the earliest available timeslot, or None if none found

    Warning: This function is not thoroughly vetted and there could be edge cases for which this won't work. The
        purpose of this function is only to illustrate how a scheduling agent works and this function just 
        serves to help it as a demo only.
    """

    current_dt = parser.isoparse(constraints["currentDateTime"]).astimezone()
    tzinfo = current_dt.tzinfo
    
    start_date = parser.isoparse(constraints["dateRange"]["start"]).astimezone()
    end_date = parser.isoparse(constraints["dateRange"]["end"]).astimezone()

    business_hours = constraints["businessHours"]


    # Office hours constraints
    OFFICE_START_TIME = datetime.strptime(business_hours["start"], "%H:%M").time()
    OFFICE_END_TIME = datetime.strptime(business_hours["end"], "%H:%M").time()
    
    # Lunch break constraints
    LUNCH_START_TIME = datetime.strptime(business_hours["lunchBreakStart"], "%H:%M").time()
    LUNCH_END_TIME = datetime.strptime(business_hours["lunchBreakEnd"], "%H:%M").time()
    
    # Convert duration to timedelta object
    required_duration = timedelta(minutes=constraints["requiredDurationMinutes"])

    if required_duration <= timedelta(0):
        return None
        
    events = [Event(start_datetime=parser.isoparse(event["start"]).astimezone(), end_datetime=parser.isoparse(event["end"]).astimezone())
        for event in constraints["existingEventsSummary"] ]

    # Sort events by start time
    sorted_events = sorted(events, key=lambda e: e.start_datetime)

    print(f"All sorted events {sorted_events}")
    
    # Iterate through each day in the date range
    current_date = start_date
    while current_date <= end_date:

        # Create time blocks for this day
        day_start = datetime.combine(current_date, OFFICE_START_TIME, tzinfo)
        lunch_start = datetime.combine(current_date, LUNCH_START_TIME, tzinfo)
        lunch_end = datetime.combine(current_date, LUNCH_END_TIME, tzinfo)
        day_end = datetime.combine(current_date, OFFICE_END_TIME, tzinfo)

        if current_date >= end_date:
            print("Could not find any")
            break
            
        print(f"\nChecking for day {day_start} - {day_end}\n\n")

        # Filter events for this day
        day_events = []
        for e in sorted_events:
            print(f"\Checking  event : {e.start_datetime} - {e.end_datetime}")
            
            if e.end_datetime <= day_start:
                continue
            if e.start_datetime >= day_end:
                continue
            day_events.append(e)
            print(f"\tOverlapping event of the day: {e.start_datetime} - {e.end_datetime}")
            
        print(f"\tExisting events on this day = {day_events}")
        
        # Create a list of busy periods (including events and lunch)
        busy_periods = []
        
        # Add lunch as a busy period
        busy_periods.append((lunch_start, lunch_end))
        
        # Add events as busy periods
        for event in day_events:
            # Clip event times to office hours for this day
            event_start = max(event.start_datetime, day_start)
            event_end = min(event.end_datetime, day_end)
            
            # Only add if event is within office hours
            if event_start < event_end:
                busy_periods.append((event_start, event_end))
        
        # Sort busy periods by start time
        busy_periods.sort()


        # Merge overlapping busy periods
        if busy_periods:
            merged_periods = [busy_periods[0]]
            for current_start, current_end in busy_periods[1:]:
                prev_start, prev_end = merged_periods[-1]
                if current_start <= prev_end:
                    # Periods overlap, merge them
                    merged_periods[-1] = (prev_start, max(prev_end, current_end))
                else:
                    # No overlap, add as new period
                    merged_periods.append((current_start, current_end))
            busy_periods = merged_periods

        print(f"\tBusy periods = {busy_periods}")
        

        # Find gaps between busy periods that are large enough
        current_time = day_start
        for busy_start, busy_end in busy_periods:
            print(f"\tChecking if current time {current_time} falls in busy period: {busy_start} - {busy_end}")
            # If there's a gap before this busy period
            if current_time < busy_start:
                gap_duration = busy_start - current_time
                if gap_duration >= required_duration:
                    return current_time.isoformat()
            
            # Move current time to end of this busy period
            current_time = busy_end
        
        # Check for gap after last busy period
        if current_time < day_end:
            gap_duration = day_end - current_time
            if gap_duration >= required_duration:
                return current_times.isoformat()
        
        # Move to next day
        current_date += timedelta(days=1)
    
    # No available timeslot found
    return None

In [None]:
def read_email(llm, google_services, state, subject):

    """
    A tool to read unread emails from google email inbox and generate structured output
    for meeting requests.
    """
    
    search_id = google_services.gmail_service.users().messages().list(userId='me', labelIds=['INBOX', 'UNREAD']).execute()

    number_result = search_id['resultSizeEstimate']
    
    if number_result > 0:
        message_ids = search_id['messages']

        for ids in message_ids[:MAX_EMAILS_TO_READ]:
            print("Getting message with id:", ids['id'])
            message = google_services.get_message(ids['id'] )
            mtg_req = check_meeting_request(llm, message['body'])
            if mtg_req['is_meeting_request']:
                mtg_req['from'] = message['from']
                mtg_req['to'] = message['to']
                # Let's store the message id in the state so later steps can use it (for example to mark it as read)
                # It could potentially be stored in the mtg_request and passed on to subsequent steps,
                # but that could pollute the LLM prompt context
                state['message_id'] = ids['id']
                return [mtg_req]

    print('There were 0 results for that search string')
    return "There are no pending emails that require meeting scheduling"

In [None]:
def get_calendar_availability(llm, google_services, state, criteria):

    """
    A tool that goes through user's calendar, find current scheduled events, and based on them
    find the next available time in the calendar.

    Criteria is read from the email of meeting request and is in natural language
    description. First uses LLM to convert this loosedly given criteria into a
    structured set of constraints (JSON). Then searches the calendar for available
    slots.
    """
    
    events = google_services.simple_events()
    events_summary = json.dumps(events) # "\n".join(_events_as_nls(events))

    current = datetime.now().isoformat()

    system_prompt = f"""
    You are an intelligent AI assistant helping with calendar scheduling.
    
    <instructions>
    - You will not perform time calculations.
    - You are given current calendar events and the user's criteria.
    - Your task is to:
        - Identify constraints (date ranges, duration, preferred times of day).
        - Structure those constraints clearly for backend processing.
        - Specify required offsets (e.g., "at least 1 hour from now") and business-hour boundaries.
        - Highlight exclusions (e.g., lunch breaks, existing events).
    - Output constraints as a structured JSON object.
    - Do not guess or calculate times — just reason and structure the information.
    - Fill the "start" and "end" fields in "dateRange" with date/time in RFC3339 format. Make sure use the same timezone as given in the current date/time.
    - Note that current date/time is given in RFC3339 format.
    - When generating dates, always use an explicit offset from UTC.
    
    Current date and time: '{current}'
    
    Output format:
    <constraints>
    {{
        "currentDateTime": "{current}",
        "businessHours": {{
            "start": "08:00",
            "end": "16:30",
            "lunchBreakStart": "12:00",
            "lunchBreakEnd": "13:30"
        }},
        "minOffsetFromNowMinutes": 60,
        "requiredDurationMinutes": "parse duration from criteria converting into minutes",
        "dateRange": {{ "start": "fill start date in RFC3339 format", "end": "fill end date in RFC3339 format" }},
        "userPreferences": "{criteria}",
        "existingEventsSummary": {events_summary}
    }}
    </constraints>
    """

    user_prompt = f"""
    Here are the current calendar events: {events_summary}
    User criteria: {criteria}
    Please extract scheduling constraints for backend processing.
    """
    
    llm_slots = llm(system_prompt, user_prompt, True)
    constraints_str  = extract_between_tags("constraints", " ".join(llm_slots.split("\n")))[0]
    constraints_json = json.loads(constraints_str)

    available_slots = generate_available_slots(constraints_json)

    if available_slots is not None:
        first_available_slot = available_slots
    else:
        first_available_slot = "No slots available. Cannot reserve a time in calendar. Respond to email saying that no slots available in near future and check back again in a week."
    return f"<get_calendar_availability_response><constraints>{constraints_json}</constraints><first_available_slot>{first_available_slot}</first_available_slot></get_calendar_availability_response>"

In [None]:
def send_email(llm, google_services, state, sender, to, subject, body):
    formatted_body  = format_message_body(llm, body)
    return google_services.send_message(sender, to, formatted_body, subject)

In [None]:
def schedule_meeting(llm, google_services, state, subject, startDateTime, duration):
    print(f"Will mark message with id : {state['message_id']} as read")
    google_services.mark_message_read(state['message_id'])
    return google_services.create_calendar_event(subject, startDateTime, duration)

In [None]:
# Description of tools availables. In a practical system, use something like OpenAPI speficiation and tool calling features of LLMs (like those provided by LLM Conversation API)

tool_definitions = """
    <tools>
        <tool>read_email — Read email; requires parameter `subject`</tool>
        <tool>get_calendar_availability - Get the availability day abd time in the calendar based on proposed date/time (such as as week of the day,  daypart, expressions like next week, dayparts like morning / evening and so on) and/or duration (like 30 minutes, 2 hours and so on); requires parameter `criteria`</tool>
        <tool>schedule_meeting - Schedues time on calendar for a meeting; requires parameters: `subject`, `startDateTime`, `duration` in minutes</tool>        
        <tool>send_email — Send email; requires parameters `sender`, `to`, `subject`, `body`</tool>
    </tools>
"""

In [None]:
def task_planner_agent(tool_definitions, objective):
    s = f"""
    You are a Smart AI Task Planner. Your responsibility is to create a structured, step-by-step list of tasks that accomplish a user's objective.
    Each task involves invoking one of the available tools with appropriate parameters.
    You will not execute the tools — only build the plan.
    
    <instructions>
        - Analyze the user's objective and break it down into tasks.
        - Select appropriate tool(s) from the <tools> list for each task.
        - Each tool may require one or more parameters
        - Use parameters from:
            - The user query
            - Stored memory values
        - Do not guess parameter values.
        - Maintain correct order of tasks so outputs from previous tool invocations are always available before dependent tasks are executed.
    </instructions>

    {tool_definitions}

    <checklist>
      - All tool parameter_name and parameter_values are resolved through analizing user objective or memory.
      - The plan is ordered logically and cleanly.
      - Provide the final plan wrapped in a <plan> XML tag in the described JSON format.
      - Choose the correct tools from the provided list.
      - Create a step-by-step plan showing the order of tool calls.
      - For each tool, list its parameters using parameter_name and parameter_value.
    </checklist>

    <output>
        Output Format:
        Output the plan inside a <plan> XML tag.
        The plan is a JSON array of tool calls, where each tool call contains:
        "name": The tool name
        "parameters": An array of objects, each with:
            "parameter_name"
            "parameter_value"
        Example output structure:
            <plan>
            [
              {{
                "tool": "name of the tool",
                "parameters": [
                    {{ "parameter_name": "parameter_name", "parameter_value": "parameter_value" }}
                  ]
              }}
            ]
            </plan>

    </output>
    """
    p = f"""
        {objective}
    """
    return llm(s, p)

In [None]:
def create_executable_plan(tool_definitions, objective):
    llm_plan = task_planner_agent(tool_definitions, objective)
    print(llm_plan)
    all_tasks  = extract_between_tags("plan", " ".join(llm_plan.split("\n")))[0]
    tasks = json.loads(all_tasks)
    return tasks

In [None]:
def get_tool_select_system_prompt(tool_definitions, agent_memory):
    tool_select_system_prompt = f"""
    You are an AI assistant helping the user accomplish a task by populating the parameter values of a tool.
    
    <instructions>
        - Analyze the user's current tool.
        - Reference the memory of previous tool executions and responses to inform parameter values.
        - Choose the most appropriate parameter_names and parameter_values.
        - You are only going to format the tool given in user's query. You are not going to select a different tool.
    </instructions>

    You have access to the previous calls to tools and their responses:
    <memory>
        {agent_memory.format()}
    </memory>
    
    <checklist>
    - Parameter values can come from:
      - The current user query
      - The output of previously used tools (referenced from memory)
    
    - If a tool does not require parameters, just leave parameter_name, parameter_value as empty strings.
    - Only output one tool call at a time (if needed).
    - If no tool is required for the user's query, return an empty tool value.
    </checklist>

    {tool_definitions}

    <output>
    Output only JSON in the following format:
    
    
    {{
      "tool": "tool_name",
      "parameters": [
        {{
          "parameter_name": "name_of_parameter",
          "parameter_value": "value_from_memory"
        }}
      ]
    }}

    </output>

    No explanations or text outside the JSON.
    """

    return tool_select_system_prompt

In [None]:
def should_continue_with_plan(llm, plan, tool_definitions, agent_memory):
    system_prompt = """

        You are an AI assistant helping the user accomplish an objective. A plan is created to achieve the objective in the
        form of sequence of tools to be invoked. 
        Your goal is to analyze the plan, the tools executed so far, and the most recent tool execution and its response. Then you come up with
        saying whether user should continue with the next task in the plan or stop because there is no way to continue based on the most recent tool
        and its response.
        
        
        <background>
        - Review the tools already executed and their outputs.
        - Compare that to the original tool sequence.
        - Decide if the remaining sequence should be modified by:
          - Removing redundant or no-longer-relevant tools
          - Reordering for better flow        
          - Adding new tools that are now needed based on output or failure
        </instructions>
        
        <guidelines>
        - Only keep tools that are still necessary.
        - Add new tools if new subtasks have emerged.
        - Make sure dependencies between tools are preserved.
        </guidelines>

        Here are the tools:
        {tool_definitions}

    Output only JSON using the <output> tags as given below:   

    <output>
    {{
      "continue": "yes or no"
    }}
    </output>

    Think step by step, output your thoughts in <thinking> tags.

    """

    user_prompt = f"""
    <plan>
      {plan}
    </plan>

    <tools_executed_and_responses>
            {agent_memory.format()}
    </tools_executed_and_responses>

    Last item in the tools_executed_and_responses points to the most recent tool execution and its response.
    """

    response = llm(system_prompt, user_prompt, True)

    flag_str  = extract_between_tags("output", " ".join(response.split("\n")))[0]
    flag = json.loads(flag_str)    

    if flag["continue"] == "yes":
        return True

    return False

In [None]:
def invoke_tool(llm, google_services, state, func_name, params):
    args = { param['parameter_name'] : param['parameter_value'] for param in params }
    args["llm"] = llm
    args["google_services"] =  google_services
    args["state"] = state
    return globals()[func_name](**args)

In [None]:
def execute_task(llm, google_services, state, tools, tool_definitions, task, agent_memory):

    task_str = json.dumps(task)
    tool_select_system_prompt = get_tool_select_system_prompt(tool_definitions, agent_memory)
    tool_to_invoke = llm(tool_select_system_prompt, task_str)

    tool_to_invoke_json = json.loads(tool_to_invoke)

    print(f"Invoking tool {tool_to_invoke_json}")

    tool = tool_to_invoke_json.get("tool")
    parameters = tool_to_invoke_json.get("parameters", [])

    if tool in tools:
        response = invoke_tool(llm, google_services, state, tool, parameters)
        agent_memory.add(tool_to_invoke, f"task `{tool}` completed, response = {response}")
    else:
        print("Unknown tool: ", tool)

    return
    

In [None]:
def execute_plan(llm, google_services, state, tools, tool_definitions, tasks, agent_memory):
    print("----\nInitial memory:", agent_memory.format())

    for i, task in enumerate(tasks):
        print("--------\nNext task to execute: ", task, "\n\n")
        go = input("Continue?")
        if go == "yes":
            pass
        else:
            break
        
        execute_task(llm, google_services, state, tools, tool_definitions, task, agent_memory)

        # In general, depending on response (for example as simple as success vs. failure)
        # from tool after execute_task you might want to do somethng different.
        # There are atleast a couple of options (a) if-condition, but that is hard-wiring logic 
        # and need do in advance (b) have a replan prompt that helps to decide to do something else --
        # like calling a different tool or asking to abort. But this could lead to hallucination
        # and other gotchas -- requires more testing with plenty of datasets.
        # With replanning, we may do something like this:
        # pending_tasks = replan(tool_definitions, agent_memory, pending_tasks, task)

        print("Memory after execution:", agent_memory.format())
        print("\n\n-----------\n\n")

        if i < len(tasks) - 1:
            print("Checking if should continue with the plan...")
            continue_flag = should_continue_with_plan(llm, tasks, tool_definitions, agent_memory)
    
            if not continue_flag:
                print("Agent determined there is no point continuing, so aborting the plan execution")
                break

In [None]:
def get_email_schedule_objective(subject):
    objective = f"""
    Here is my objective:
        - I have a email requesting scheduling a meeting with me.
        - Read the email with subject: {subject} and extract what the meeting is about, and when they wanted to schedule the meeting.
        - Then check my calendar, to find available slots based on the criteria given in the email.
        - Schedule the meeting in the calendar with the earliest available slot.
        - Finally send a confirmation email to the original requestor using appropriate subject and body based on previous steps.
    """
    return objective

In [None]:
tools = ["send_email", "read_email", "get_calendar_availability", "schedule_meeting"]

In this toy example, notice how objective and tools are lined up well with each other. This helps to reduce hallucinations when generating execution plan.

In a more complex application the tools may be determined through a shortlisting process (which itself could leverage LLM/RAG techniques) for the current objective.

In [None]:
bedrock_runtime_client = boto3.client(service_name="bedrock-runtime", region_name="us-east-1")

In [None]:
llm = Llm(bedrock_runtime_client, MODEL_ID)

In [None]:
google_services = GoogleServices(GOOGLE_CREDENTALS)

In [None]:
objective = get_email_schedule_objective("schedule meeting")

In [None]:
tasks = create_executable_plan(tool_definitions, objective)

In [None]:
agent_memory = AgentMemory()
state = State()

In [None]:
execute_plan(llm, google_services, state, tools, tool_definitions, tasks, agent_memory)

Exercise
---

Modify should_continue_with_plan to be a richer replanning sub-agent. For example, if there are no timeslots available, instead of aborting, alter the plan (i.e., re-plan) to directly send email indicating so and asking to check back later.