# Lab 10 - Agent

In [1]:
from tqdm.notebook import tqdm
from datetime import datetime
from IPython.display import display, HTML
import numpy as np
import pickle
import openai
import json
import os
import sqlite3
from datetime import datetime

#Don't include keys like this, use ENV vars!
with open('config.json') as fd:
    conf = json.loads(fd.read())
    gpt = openai.OpenAI(api_key=conf["openai_key"])

In [2]:
# Connect to SQLite (this will create a new database file if it doesn't exist)
db_filename = 'event_planning.db'
try:
    # Attempt to delete the file
    os.remove(db_filename)
    print(f"File '{db_filename}' deleted successfully.")
except Error:
    pass
    
conn = sqlite3.connect(db_filename)
cursor = conn.cursor()

# Function to initialize the database and create tables
def initialize_database():
    # Create 'events' table
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS events (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        location TEXT NOT NULL,
        date TEXT NOT NULL
    )
    ''')

    # Create 'attendees' table
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS attendees (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        event_id INTEGER NOT NULL,
        full_name TEXT NOT NULL,
        FOREIGN KEY (event_id) REFERENCES events (id)
    )
    ''')

    # Commit the changes
    conn.commit()

# Initialize the database and create the tables
initialize_database()

File 'event_planning.db' deleted successfully.


In [3]:
#Gets the JSON body from a tools function response
def tool_response(response):
    try:
        content = response.choices[0]
        tool_call = content.message.tool_calls[0]
        arguments_str = tool_call.function.arguments
        return json.loads(arguments_str)
    except (AttributeError, IndexError, TypeError, json.JSONDecodeError) as e:
        print(e)
    return None

In [4]:
def complete_function(prompt,function):
    resp = gpt.chat.completions.create(
        messages=[
            {
                "role": "system",
                "content": "You are a helpful event planning assistant.",
            },
            {
                "role": "user",
                "content": prompt,
            }
        ],
        model="gpt-4o",
        temperature=0,
        tools=[
            {
                "type": "function",
                "function": function
            }
        ]
    )

    return tool_response(resp)

In [5]:
def get_classification_prompt(query):
    return f"""#Instructions

For an event management SaaS product, natural language queries need to be classified and routed to the appropriate branch. Classify the following user query:
    
{query}"""

In [6]:
def get_classification_function(methods_enum):
    return {
        "name": "task_type",
        "description": "Classify the task based on the user query.",
        "strict": True,
        "parameters": {
          "type": "object",
          "properties": {
            "task_type": {
              "type": "string",
              "enum": methods_enum
            }
          },
          "required":["task_type"],
          "additionalProperties":False
        }
    }

In [7]:
def classify(methods_enum,query):
    prompt = get_classification_prompt(query)
    function = get_classification_function(methods_enum)
    classification = complete_function(prompt,function)
    if classification:
        task = classification['task_type']
        print(f"""{query} => {task}""")
        return task
    return None

In [8]:
def date_from_string(datestr):
    try:
        date = datetime.strptime(datestr, '%Y-%m-%d')
        return date
    except ValueError as e:
        return None

## Create Event

In [9]:
def get_create_event_prompt(query):
    current_date = datetime.now().strftime('%B %d, %Y')
    return f"""#Instructions

Today's date is {current_date}. For an event management SaaS product, extract the location and the date from the following user query:
    
{query}"""

def get_create_event_function():
    return {
        "name": "event_details",
        "description": "Extract the location and date for an event.",
        "strict": True,
        "parameters": {
          "type": "object",
          "properties": {
            "location": {
              "type": "string"
            },
            "date": {
              "type": "string",
              "description": "The date formatted in YYYY-MM-DD"
            }           
          },
          "required":["location","date"],
          "additionalProperties":False
        }
    }

def create_event_sql(location,date):
    try:
        cursor.execute('''
        INSERT INTO events (location, date)
        VALUES (?, ?)
        ''', (location, date))
        conn.commit()
        print(f"Event '{location}' created successfully.")
    except sqlite3.Error as e:
        print("Error creating event:", e)

# Function to create a new event
def create_event(query):
    args = complete_function(
        get_create_event_prompt(query),
        get_create_event_function()
    )
    print(args)
    if args:
        location = args["location"]
        date = date_from_string(args["date"])
        return create_event_sql(location,date)

In [10]:
#create_event("Book the Ellison Lodge A for December 4th")

## List Events

In [11]:
def get_list_events_prompt(query):
    current_date = datetime.now().strftime('%B %d, %Y')
    return f"""#Instructions

Today's date is {current_date}. For an event management SaaS product, a user is asking to list upcoming events.  An optional date range can be provided.  Extract the date range from the following user query:
    
{query}"""

def get_list_events_function():
    return {
        "name": "events_query",
        "description": "Filter parameters to query an events SQL table based on dates.",
        "strict": True,
        "parameters": {
          "type": "object",
          "properties": {
            "after_date": {
              "type": "string",
              "description": "The date formatted in YYYY-MM-DD"
            },
            "before_date": {
              "type": "string",
              "description": "The date formatted in YYYY-MM-DD"
            }              
          },
          "required":["after_date","before_date"],
          "additionalProperties":False
        }
    }

def list_events_sql(after_date=None,before_date=None):
    query = 'SELECT id,location,date FROM events'
    params = []
    
    # Add conditions for filtering by date
    if after_date and before_date:
        query += ' WHERE date >= ? AND date <= ?'
        params.extend([after_date, before_date])
    elif after_date:
        query += ' WHERE date >= ?'
        params.append(after_date)
    elif before_date:
        query += ' WHERE date <= ?'
        params.append(before_date)
    
    # Execute the query with filters if any
    cursor.execute(query, params)
    events = cursor.fetchall()
    if events:
        for event in events:
            print(f"ID: {event[0]}, Name: {event[1]}, Date: {event[2]}")
        return events
    else:
        print("No events found for the specified date range.")
    

# Function to list all events with optional date filters
def list_events(query):
    args = complete_function(
        get_list_events_prompt(query),
        get_list_events_function()
    )
    print(args)
    if args:
        after_date=date_from_string(args["after_date"])
        before_date=date_from_string(args["before_date"])
        return list_events_sql(after_date=after_date,before_date=before_date)

## Create Attendee

In [12]:
def get_events_enum():
    #after_date = datetime.now()
    #events = list_events_sql(after_date=after_date)
    events = list_events_sql()
    events_enum = [f"""{e[1]} on {e[2]}""" for e in events]
    events_enum.append("other")
    return events_enum

def get_event_id(event_str):
    if event_str == "other":
        #Fallback spotted!  No event found
        return None

    events = list_events_sql()
    for ev in events:
        curr_event_str = f"""{ev[1]} on {ev[2]}"""
        if event_str == curr_event_str:
            #Found the event!
            return ev[0]
            
    return None

In [13]:
def get_create_attendee_prompt(query):
    return f"""#Instructions

For an event management SaaS product, extract the event and the attendee name from the following user query:
    
{query}"""

def get_create_attendee_function(events_enum):
    return {
        "name": "attendee_rsvp_details",
        "description": "Extract the event details and the attendee names.",
        "strict": True,
        "parameters": {
          "type": "object",
          "properties": {
            "event": {
              "type": "string",
              "enum": events_enum
            },
            "attendees": {
              "type": "array",
              "items": {
                "type": "string"
              }
            }
          },
          "required":["event","attendees"],
          "additionalProperties":False
        }
    }

def create_attendee_sql(event_id, full_name):
    try:
        cursor.execute('''
        INSERT INTO attendees (event_id, full_name)
        VALUES (?, ?)
        ''', (event_id, full_name))
        conn.commit()
        print(f"Attendee '{full_name}' added to event ID {event_id}.")
    except sqlite3.Error as e:
        print("Error adding attendee:", e)

def create_attendee(query):  
    events_enum = get_events_enum()
    if events_enum:        
        args = complete_function(
            get_create_attendee_prompt(query),
            get_create_attendee_function(events_enum)
        )
        print(args)
        if args:
            event_id = get_event_id(args["event"])
            if event_id:
                names = args["attendees"]
                for name in names:
                    create_attendee_sql(event_id,name)
    else:
        print("[create_attendee] No events found!  Please create an event first")
        return None

In [14]:
def get_list_attendees_prompt(query):
    return f"""#Instructions

For an event management SaaS product, extract the event from the following user query:
    
{query}"""

def get_list_attendees_function(events_enum):
    return {
        "name": "event_query",
        "description": "Identify the event details",
        "strict": True,
        "parameters": {
          "type": "object",
          "properties": {
            "event": {
              "type": "string",
              "enum": events_enum
            }
          },
          "required":["event"],
          "additionalProperties":False
        }
    }

# Function to list attendees for a specific event
def list_attendees_sql(event_id):
    cursor.execute('SELECT full_name FROM attendees WHERE event_id = ?', (event_id,))
    attendees = cursor.fetchall()
    if attendees:
        for attendee in attendees:
            print(f"ID: {attendee[0]}, Event ID: {attendee[1]}, Name: {attendee[2]}")
        return attendees
    else:
        print(f"No attendees found for event ID {event_id}.")

def list_attendees(query):
    events_enum = get_events_enum()
    if events_enum:
        args = complete_function(
            get_list_attendees_prompt(query),
            get_list_attendees_function(events_enum)
        )
        if args:
            event_id = get_event_id(args["event"])
            return list_attendees_sql(event_id)
            
    else:
       print("[list_attendees] No events found!  Please create an event first")

# Main Agent Loop

1. Accept a user query
2. Classify the query task (create event, list events, etc..)
3. Call appropriate task method
4. Repeat

In [15]:
def fallback(query):
    print("Fallback!",query)
    
def run(query):
    methods = {
        "create_event":create_event,
        "list_events":list_events,
        "create_attendee":create_attendee,
        "list_attendees":list_attendees,
        "other":fallback
    }
    print('--------')
    methods_enum = list(methods.keys())
    method_key = classify(methods_enum, query)
    method = methods[method_key]
    #print(method)
    method(query)
    print('--------')

In [16]:
run("Book the Ellison Lodge A for December 4th")
run("Jane Doe is coming to the party on 12/4")
run("Pencil in highland park gazebo for june 11th")
run("Leah, Alice, and Fred are all coming to ellison in december")
run("Who is coming to highland park?")
run("What spaces are available?")
run("What events are coming up?")

--------
Book the Ellison Lodge A for December 4th => create_event
{'location': 'Ellison Lodge A', 'date': '2023-12-04'}
Event 'Ellison Lodge A' created successfully.
--------
--------
Jane Doe is coming to the party on 12/4 => create_attendee
ID: 1, Name: Ellison Lodge A, Date: 2023-12-04 00:00:00
{'event': 'Ellison Lodge A on 2023-12-04 00:00:00', 'attendees': ['Jane Doe']}
ID: 1, Name: Ellison Lodge A, Date: 2023-12-04 00:00:00
Attendee 'Jane Doe' added to event ID 1.
--------
--------
Pencil in highland park gazebo for june 11th => create_event
{'location': 'highland park gazebo', 'date': '2024-06-11'}
Event 'highland park gazebo' created successfully.
--------
--------
Leah, Alice, and Fred are all coming to ellison in december => create_attendee
ID: 1, Name: Ellison Lodge A, Date: 2023-12-04 00:00:00
ID: 2, Name: highland park gazebo, Date: 2024-06-11 00:00:00
{'event': 'Ellison Lodge A on 2023-12-04 00:00:00', 'attendees': ['Leah', 'Alice', 'Fred']}
ID: 1, Name: Ellison Lodge A,