# Lab 10 - Agent

In [None]:
from tqdm.notebook import tqdm
from datetime import datetime
from IPython.display import display, HTML
import numpy as np
import pickle
import openai
import json
import os
import sqlite3
from datetime import datetime

# Don't include keys like this, use ENV vars!
# Load the notebook settings from config.json, then build the shared OpenAI
# client from the "openai_key" entry.
with open('config.json') as fd:
    conf = json.load(fd)
gpt = openai.OpenAI(api_key=conf["openai_key"])

## Our database schema, using SQLite

In [None]:
# Start every run from a clean slate: remove any database file left over from
# a previous run, then connect (sqlite3.connect creates the file if missing).
db_filename = 'event_planning.db'
try:
    os.remove(db_filename)
    print(f"File '{db_filename}' deleted successfully.")
except FileNotFoundError:
    # Nothing to delete on a first run -- this is the expected case.
    pass
except OSError as e:
    # Best effort: report the problem (e.g. the file is locked or read-only)
    # but keep going and connect to whatever file is there.
    print(f"Could not delete '{db_filename}': {e}")

conn = sqlite3.connect(db_filename)
cursor = conn.cursor()

# Create the schema (events + attendees) if it is not already present
def initialize_database():
    """Create the 'events' and 'attendees' tables when missing, then commit.

    Works on the module-level ``cursor`` and ``conn``; returns nothing.
    """
    events_ddl = '''
    CREATE TABLE IF NOT EXISTS events (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        location TEXT NOT NULL,
        date TEXT NOT NULL
    )
    '''
    attendees_ddl = '''
    CREATE TABLE IF NOT EXISTS attendees (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        event_id INTEGER NOT NULL,
        full_name TEXT NOT NULL,
        FOREIGN KEY (event_id) REFERENCES events (id)
    )
    '''

    # Parent table first, then the table whose foreign key references it
    for ddl in (events_ddl, attendees_ddl):
        cursor.execute(ddl)

    # Persist the schema changes
    conn.commit()

# Initialize the database and create the tables (runs when this cell executes)
initialize_database()

## Calculates the cost of an OpenAI request

In [None]:
def cost(response):
    """Return the dollar cost of a single OpenAI API response.

    Args:
        response: A response object exposing ``model`` and ``usage``.
            ``usage.prompt_tokens`` is required. ``usage.completion_tokens``
            is optional: responses that do not report it (such as embedding
            responses) are billed for input tokens only.

    Returns:
        float: Input cost plus output cost, in US dollars.

    Raises:
        ValueError: If ``response.model`` has no entry in the price table.
    """
    # Prices in dollars per million tokens for each model version
    prices_per_million = {
        "gpt-4o": {"input": 5.00, "output": 15.00},
        "gpt-4o-2024-08-06": {"input": 2.50, "output": 10.00},
        "gpt-4o-2024-05-13": {"input": 5.00, "output": 15.00},
        "gpt-4o-mini": {"input": 0.15, "output": 0.60},
        "gpt-4o-mini-2024-07-18": {"input": 0.15, "output": 0.60},
        "gpt-3.5-turbo": {"input": 0.003, "output": 0.006},
        "davinci-002": {"input": 12.00, "output": 12.00},
        "babbage-002": {"input": 1.60, "output": 1.60},
        "text-embedding-3-small": {"input": 0.020, "output": 0.020},
        "text-embedding-3-large": {"input": 0.130, "output": 0.130},
        "ada-v2": {"input": 0.100, "output": 0.100},
    }

    model_version = response.model
    if model_version not in prices_per_million:
        raise ValueError(f"Pricing information for model '{model_version}' is not available.")
    model_prices = prices_per_million[model_version]

    # Get token usage. completion_tokens may be absent (or None) on usage
    # objects that only report input tokens, so treat that as zero output.
    usage = response.usage
    prompt_tokens = usage.prompt_tokens
    completion_tokens = getattr(usage, "completion_tokens", None) or 0

    # Calculate costs in dollars
    input_cost = (prompt_tokens / 1_000_000) * model_prices["input"]
    output_cost = (completion_tokens / 1_000_000) * model_prices["output"]
    return input_cost + output_cost

## Helpers to process structured OpenAI responses

In [None]:
# Gets the JSON body from a tools function response
def tool_response(response):
    """Decode the arguments of the first tool call in *response*.

    Returns the parsed JSON arguments, or None when the response has no
    usable tool call or its arguments are not valid JSON. On failure the
    response and the error are printed.
    """
    try:
        first_call = response.choices[0].message.tool_calls[0]
        parsed = json.loads(first_call.function.arguments)
    except (AttributeError, IndexError, TypeError, json.JSONDecodeError) as err:
        print(response)
        print(err)
        return None
    return parsed

In [None]:
# Gets the JSON body from a structured format response
def chain_of_thought_response(response):
    """Extract the ``final_answer`` object from a structured JSON response.

    Args:
        response: A chat completion whose first choice's message content is
            a JSON document with a "final_answer" key.

    Returns:
        The value stored under "final_answer", or None when the response is
        malformed (no choices, empty content, invalid JSON, or a missing
        "final_answer" key). On failure the response and error are printed.
    """
    try:
        content = response.choices[0].message.content
        content_json = json.loads(content)
        return content_json["final_answer"]
    # KeyError covers valid JSON that lacks "final_answer"; without it that
    # case would escape this handler and crash the caller.
    except (AttributeError, IndexError, KeyError, TypeError, json.JSONDecodeError) as e:
        print(response)
        print(e)
    return None

## OpenAI completion request methods

In [None]:
def complete_chain_of_thought(prompt,function):
    """Run *prompt* through gpt-4o-mini with a step-by-step JSON response format.

    The response schema asks for a list of reasoning ``steps`` followed by a
    ``final_answer`` shaped like ``function["parameters"]``.

    Args:
        prompt: User message content.
        function: Dict with "name", "description" and "parameters" keys.

    Returns:
        Whatever ``chain_of_thought_response`` extracts from the completion.
    """
    # We use parts of the function for a pre-made chain-of-thought template
    name = function["name"]
    description = function["description"]
    parameters = function["parameters"]

    # One reasoning step: an explanation and the output it produced
    step_schema = {
        "type": "object",
        "properties": {
            "explanation": {"type": "string"},
            "output": {"type": "string"},
        },
        "required": ["explanation", "output"],
        "additionalProperties": False,
    }

    # Whole reply: the steps, then the caller's own parameters schema
    reply_schema = {
        "type": "object",
        "properties": {
            "steps": {"type": "array", "items": step_schema},
            "final_answer": parameters,
        },
        "required": ["steps", "final_answer"],
        "additionalProperties": False,
    }

    messages = [
        {
            "role": "system",
            "content": "You are a helpful event planning assistant. Always analyze and think step-by-step before responding with the final answer.",
        },
        {
            "role": "user",
            "content": prompt,
        },
    ]

    resp = gpt.chat.completions.create(
        messages=messages,
        model="gpt-4o-mini",
        temperature=0,
        response_format={
            "type": "json_schema",
            "json_schema": {
                "name": name,
                "description": description,
                "strict": True,
                "schema": reply_schema,
            },
        },
    )

    return chain_of_thought_response(resp)

In [None]:
def complete_function(prompt,function,chain=False):
    """Get structured arguments for *function* from the model.

    Args:
        prompt: User message content.
        function: Function definition dict (must contain "name"); passed to
            the API as the single available tool.
        chain: When true, delegate to ``complete_chain_of_thought`` instead
            of making a tool call.

    Returns:
        The parsed arguments from ``tool_response`` (or the chain-of-thought
        result when ``chain`` is true); None when parsing fails.
    """
    if chain:
        return complete_chain_of_thought(prompt,function)

    resp = gpt.chat.completions.create(
        messages=[
            {
                "role": "system",
                "content": "You are a helpful event planning assistant.",
            },
            {
                "role": "user",
                "content": prompt,
            }
        ],
        model="gpt-4o",
        temperature=0,
        tools=[
            {
                "type": "function",
                "function": function
            }
        ],
        # Force the model to call our one function. Without this it is free
        # to answer in plain text, which tool_response cannot parse.
        tool_choice={
            "type": "function",
            "function": {"name": function["name"]}
        }
    )

    return tool_response(resp)

## Task Classification

In [None]:
def get_classification_prompt(query):
    """Build the prompt asking the model to classify *query* into a task type."""
    instructions = (
        "For an event management SaaS product, natural language queries need "
        "to be classified and routed to the appropriate branch. "
        "Classify the following user query:"
    )
    return f"# Instructions\n\n{instructions}\n    \n{query}"

In [None]:
def get_classification_function(methods_enum):
    """Return the function schema whose single ``task_type`` field is limited to *methods_enum*."""
    task_type_field = {
        "type": "string",
        "enum": methods_enum,
    }
    parameters = {
        "type": "object",
        "properties": {"task_type": task_type_field},
        "required": ["task_type"],
        "additionalProperties": False,
    }
    return {
        "name": "task_type",
        "description": "Classify the task based on the user query.",
        "strict": True,
        "parameters": parameters,
    }

In [None]:
def classify(methods_enum,query):
    """Ask the model which entry of *methods_enum* best matches *query*.

    Prints "<query> => <task>" and returns the task string, or returns None
    when no classification could be obtained.
    """
    classification = complete_function(
        get_classification_prompt(query),
        get_classification_function(methods_enum),
    )
    if not classification:
        return None

    task = classification['task_type']
    print(f"{query} => {task}")
    return task

In [None]:
def date_from_string(datestr):
    """Parse a 'YYYY-MM-DD' string into a ``datetime``.

    Args:
        datestr: The text to parse.

    Returns:
        A ``datetime`` at midnight of that day, or None when *datestr* is
        not a string or does not match the 'YYYY-MM-DD' format.
    """
    try:
        return datetime.strptime(datestr, '%Y-%m-%d')
    # ValueError: wrong format; TypeError: not a string at all (e.g. None)
    except (ValueError, TypeError):
        return None

## Create Event

In [None]:
def get_create_event_prompt(query):
    """Build the prompt that asks for an event's location and date, anchored to today's date."""
    today = datetime.now().strftime('%B %d, %Y')
    instructions = (
        f"Today's date is {today}. For an event management SaaS product, "
        "extract the location and the date from the following user query. "
        "For dates that don't specify a year, always choose a date in the future:"
    )
    return "\n\n".join(["# Instructions", instructions, "## User Query", f"{query}"])
def get_create_event_function():
    """Return the function schema describing an event's ``location`` and ``date``."""
    properties = {
        "location": {"type": "string"},
        "date": {
            "type": "string",
            "description": "The date formatted in YYYY-MM-DD",
        },
    }
    parameters = {
        "type": "object",
        "properties": properties,
        # Every declared field is required, in declaration order
        "required": list(properties),
        "additionalProperties": False,
    }
    return {
        "name": "event_details",
        "description": "Extract the location and date for an event.",
        "strict": True,
        "parameters": parameters,
    }

def create_event_sql(location,date):
    """Insert one row into the ``events`` table and commit.

    Args:
        location: Event location text.
        date: Event date. A ``datetime`` is stored as 'YYYY-MM-DD HH:MM:SS'
            text; any other value is handed to SQLite unchanged.

    Returns:
        None. SQLite errors (for example a NULL date violating the NOT NULL
        constraint) are printed rather than raised.
    """
    # Convert explicitly rather than relying on sqlite3's implicit datetime
    # adapter, which is deprecated as of Python 3.12. isoformat(" ") yields
    # the same text that adapter produced, so stored values are unchanged.
    if isinstance(date, datetime):
        date = date.isoformat(" ")

    try:
        cursor.execute('''
        INSERT INTO events (location, date)
        VALUES (?, ?)
        ''', (location, date))
        conn.commit()
        print(f"Event '{location}' created successfully.")
    except sqlite3.Error as e:
        print("Error creating event:", e)

# Create a new event from a natural-language query
def create_event(query,chain=False):
    """Extract a location and date from *query* and store them as an event.

    The extracted arguments are printed. Nothing is stored when the model
    yields no arguments; otherwise the result of ``create_event_sql`` is
    returned.
    """
    prompt = get_create_event_prompt(query)
    function = get_create_event_function()
    args = complete_function(prompt, function, chain=chain)
    print(args)
    if not args:
        return None

    location = args["location"]
    return create_event_sql(location, date_from_string(args["date"]))

In [None]:
#create_event("Book the Ellison Lodge A for December 4th",chain=True)

## List Events

In [None]:
def get_list_events_prompt(query):
    """Build the prompt that asks for the date range of an event listing, anchored to today's date."""
    today = datetime.now().strftime('%B %d, %Y')
    instructions = (
        f"Today's date is {today}. For an event management SaaS product, "
        "a user is asking to list upcoming events for which an optional date "
        "range can be provided. For dates that don't specify a year, always "
        "choose a date in the future. Extract the date range from the "
        "following user query:"
    )
    return f"# Instructions\n\n{instructions}\n    \n{query}"
def get_list_events_function():
    """Return the function schema for an ``after_date`` / ``before_date`` filter."""
    # Both bounds share the same shape; build a separate dict for each
    properties = {
        field: {
            "type": "string",
            "description": "The date formatted in YYYY-MM-DD",
        }
        for field in ("after_date", "before_date")
    }
    parameters = {
        "type": "object",
        "properties": properties,
        "required": list(properties),
        "additionalProperties": False,
    }
    return {
        "name": "events_query",
        "description": "Filter parameters to query an events SQL table based on dates.",
        "strict": True,
        "parameters": parameters,
    }

def list_events_sql(after_date=None,before_date=None,output=True):
    """Fetch events, optionally restricted to an inclusive date range.

    Args:
        after_date: Lower bound (inclusive); a ``datetime`` or a
            'YYYY-MM-DD' string. Falsy means no lower bound.
        before_date: Upper bound (inclusive); same accepted forms.
        output: When true, print each matching event, or a notice when
            nothing matches. When false, nothing is printed.

    Returns:
        A list of ``(id, location, date)`` rows, or None when no event matches.
    """
    def _as_day(value):
        # Bind plain 'YYYY-MM-DD' text instead of relying on sqlite3's
        # implicit datetime adapter (deprecated as of Python 3.12).
        if isinstance(value, datetime):
            return value.strftime('%Y-%m-%d')
        return value

    # Compare calendar days via SQLite's date() on both sides, so a stored
    # 'YYYY-MM-DD HH:MM:SS' value and a 'YYYY-MM-DD' bound still match on the
    # boundary day (a raw text comparison would not).
    conditions = []
    params = []
    if after_date:
        conditions.append('date(date) >= date(?)')
        params.append(_as_day(after_date))
    if before_date:
        conditions.append('date(date) <= date(?)')
        params.append(_as_day(before_date))

    query = 'SELECT id,location,date FROM events'
    if conditions:
        query += ' WHERE ' + ' AND '.join(conditions)

    cursor.execute(query, params)
    events = cursor.fetchall()
    if not events:
        # Honor output=False here too, so internal lookups stay silent
        if output:
            print("No events found for the specified date range.")
        return None

    if output:
        for ev in events:
            print(f"ID: {ev[0]}, Location: {ev[1]}, Date: {ev[2]}")
    return events

# List events, with date filters taken from a natural-language query
def list_events(query,chain=False):
    """Extract a date range from *query* and list the matching events.

    The extracted arguments are printed. Returns the result of
    ``list_events_sql``, or None when the model yields no arguments.
    """
    args = complete_function(
        get_list_events_prompt(query),
        get_list_events_function(),
        chain=chain,
    )
    print(args)
    if not args:
        return None

    # Bounds that fail to parse become None, i.e. "no bound"
    date_range = {
        bound: date_from_string(args[bound])
        for bound in ("after_date", "before_date")
    }
    return list_events_sql(**date_range)

In [None]:
# Example: a free-text query with no explicit dates, sent through the chain-of-thought path
list_events("highland",chain=True)

## Create Attendee

In [None]:
def get_events_enum():
    """Return the labels the model may choose from when naming an event.

    Each stored event becomes "<location> on <date>", and "other" is added
    as the no-match fallback.

    Returns:
        list[str]: The labels, or an empty list when there are no events, so
        callers can test the result for truthiness.
    """
    # list_events_sql returns None (not an empty list) when nothing is stored
    events = list_events_sql(output=False)
    if not events:
        return []

    events_enum = [f"{ev[1]} on {ev[2]}" for ev in events]
    events_enum.append("other")
    return events_enum

def get_event_id(event_str):
    """Map an event label of the form "<location> on <date>" back to its id.

    Args:
        event_str: A label, or "other".

    Returns:
        The id of the first event whose label equals *event_str*; None for
        "other", for an unknown label, or when no events exist.
    """
    if event_str == "other":
        # Fallback spotted!  No event found
        return None

    # list_events_sql returns None when the table is empty; iterate safely
    events = list_events_sql(output=False) or []
    for ev in events:
        if event_str == f"{ev[1]} on {ev[2]}":
            # Found the event!
            return ev[0]
    return None

In [None]:
def get_create_attendee_prompt(query):
    """Build the prompt that asks for the event and attendee name(s), anchored to today's date."""
    today = datetime.now().strftime('%B %d, %Y')
    instructions = (
        f"Today's date is {today}. For an event management SaaS product, "
        "extract the event and the attendee name from the following user query:"
    )
    return f"# Instructions\n\n{instructions}\n    \n{query}"
def get_create_attendee_function(events_enum):
    """Return the function schema for an RSVP: one event from *events_enum* plus a list of attendee names."""
    event_field = {
        "type": "string",
        "enum": events_enum,
    }
    attendees_field = {
        "type": "array",
        "items": {"type": "string"},
    }
    parameters = {
        "type": "object",
        "properties": {
            "event": event_field,
            "attendees": attendees_field,
        },
        "required": ["event", "attendees"],
        "additionalProperties": False,
    }
    return {
        "name": "attendee_rsvp_details",
        "description": "Extract the event details and the attendee names.",
        "strict": True,
        "parameters": parameters,
    }

def create_attendee_sql(event_id, full_name):
    """Insert one attendee row for *event_id* and commit.

    Prints a confirmation on success; SQLite errors are printed rather than
    raised. Returns nothing.
    """
    insert_sql = '''
        INSERT INTO attendees (event_id, full_name)
        VALUES (?, ?)
        '''
    try:
        cursor.execute(insert_sql, (event_id, full_name))
        conn.commit()
    except sqlite3.Error as err:
        print("Error adding attendee:", err)
    else:
        print(f"Attendee '{full_name}' added to event ID {event_id}.")

def create_attendee(query,chain=False):
    """Extract an event and attendee names from *query* and store the attendees.

    The extracted arguments are printed. Nothing is stored when the model
    yields no arguments or the chosen event cannot be resolved to an id.
    Always returns None.
    """
    events_enum = get_events_enum()
    if not events_enum:
        print("[create_attendee] No events found!  Please create an event first")
        return None

    args = complete_function(
        get_create_attendee_prompt(query),
        get_create_attendee_function(events_enum),
        chain=chain,
    )
    print(args)
    if not args:
        return None

    event_id = get_event_id(args["event"])
    if not event_id:
        return None

    # One row per extracted name
    for name in args["attendees"]:
        create_attendee_sql(event_id,name)

## List Attendees

In [None]:
def get_list_attendees_prompt(query):
    """Build the prompt that asks which event a query refers to, anchored to today's date."""
    today = datetime.now().strftime('%B %d, %Y')
    instructions = (
        f"Today's date is {today}. For an event management SaaS product, "
        "extract the event from the following user query:"
    )
    return f"# Instructions\n\n{instructions}\n    \n{query}"
def get_list_attendees_function(events_enum):
    """Return the function schema that selects a single event from *events_enum*."""
    event_field = {
        "type": "string",
        "enum": events_enum,
    }
    parameters = {
        "type": "object",
        "properties": {"event": event_field},
        "required": ["event"],
        "additionalProperties": False,
    }
    return {
        "name": "event_query",
        "description": "Identify the event details",
        "strict": True,
        "parameters": parameters,
    }

# List the attendees registered for one event
def list_attendees_sql(event_id):
    """Print and return the ``(id, full_name)`` rows for *event_id*.

    Returns the list of rows, or None (after printing a notice) when the
    event has no attendees.
    """
    cursor.execute('SELECT id, full_name FROM attendees WHERE event_id = ?', (event_id,))
    rows = cursor.fetchall()
    if not rows:
        print(f"No attendees found for event ID {event_id}.")
        return None

    for attendee_id, full_name in rows:
        print(f"ID: {attendee_id}, Name: {full_name}")
    return rows

def list_attendees(query,chain=False):
    """Identify the event named in *query* and list its attendees.

    Args:
        query: Natural-language request.
        chain: Passed through to ``complete_function``.

    Returns:
        The rows from ``list_attendees_sql``, or None when there are no
        events, the model yields no arguments, or the query cannot be matched
        to a stored event.
    """
    events_enum = get_events_enum()
    if not events_enum:
        print("[list_attendees] No events found!  Please create an event first")
        return None

    args = complete_function(
        get_list_attendees_prompt(query),
        get_list_attendees_function(events_enum),
        chain=chain
    )
    if not args:
        return None

    event_id = get_event_id(args["event"])
    if event_id is None:
        # The model picked "other" or an unknown label. Say so instead of
        # querying with a NULL id and reporting "event ID None".
        print("[list_attendees] Could not match the query to an existing event")
        return None
    return list_attendees_sql(event_id)

# Main Agent Loop

1. Accept a user query
2. Classify the query task (create event, list events, etc.)
3. Call appropriate task method
4. Repeat

In [None]:
def fallback(query,chain=False):
    """Handle a query that matches no known task by printing it.

    ``chain`` is accepted and ignored so that ``run`` can call every task
    handler with the same arguments.
    """
    print("Fallback!",query)

def run(query,chain=True):
    """Classify *query* and dispatch it to the matching task handler.

    Args:
        query: Natural-language request from the user.
        chain: Passed to the handler as its ``chain`` keyword.

    Returns:
        None. Handlers report their results by printing.
    """
    methods = {
        "create_event":create_event,
        "list_events":list_events,
        "create_attendee":create_attendee,
        "list_attendees":list_attendees,
        "other":fallback
    }
    print('--------')
    methods_enum = list(methods.keys())
    method_key = classify(methods_enum, query)
    # classify returns None when no classification could be obtained; route
    # that (and any unexpected label) to the fallback instead of a KeyError.
    method = methods.get(method_key, fallback)
    method(query,chain=chain)

In [None]:
# Example session: each call classifies its query and dispatches to a task handler
run("Book the Ellison Lodge A for December 4th")
run("Jane Doe is coming to the party on 12/4")
run("Pencil in highland park gazebo for june 11th")
run("Leah, Alice, and Fred are all coming to ellison in december")
run("Who is coming in december?")
run("What events are coming up?")

In [None]:
# Example query that names the event only by its location
run("Steve's coming to highland park!")

In [None]:
# Example of a terse, keyword-style query
run("highland list")

In [None]:
# Another terse, keyword-style query
run("highland attendees")

## A (very) basic agent text input UI

In [None]:
# Minimal read-dispatch loop: keep sending user input to run() until told to stop.
while True:
    try:
        user_input = input("Enter a string (type 'exit' to quit): ").strip()
    except (EOFError, KeyboardInterrupt):
        # stdin was closed or the prompt was interrupted: leave quietly
        break
    if user_input.lower() == "exit":
        break
    if not user_input:
        # Don't spend an API call classifying an empty query
        continue
    run(user_input)