# import libraries and setup environment

In [1]:
from openai import OpenAI 
import json 
from pprint import pprint
import time


In [2]:
# check open ai version 
import openai

print(openai.__version__)


1.3.7


In [3]:
# loads API keys from a local file
with open('api_keys.json', 'r') as file:
    config = json.load(file)

openai_api_key = config['OPENAI_API_KEY']

In [4]:
client = OpenAI(api_key=openai_api_key)

# Setup tooling 

In [5]:
tools = [{'type': 'retrieval'}]
tools

[{'type': 'retrieval'}]

In [6]:
# Custom function to Send an email
direction = {
    "name": "direction_to_move",
    "description": "Moves the forward, backward, left or right",
    "parameters": {
        "type": "object",
        "properties": {
            "direction" : {
                "type": "string",
                "description" : "choose the direction to move the robot, the options are forward, backward, left or right"
            },
            "speed": {
                "type": "integer",
                "description" : "speed of the robot, the range is between 0 and 100, if not option is given, choose the default 50"

            },
        }
    },
    "required": ["direction", "speed"]
}


In [7]:

tools.append({'type': 'function', 'function': direction})

print(tools)

[{'type': 'retrieval'}, {'type': 'function', 'function': {'name': 'direction_to_move', 'description': 'Moves the forward, backward, left or right', 'parameters': {'type': 'object', 'properties': {'direction': {'type': 'string', 'description': 'choose the direction to move the robot, the options are forward, backward, left or right'}, 'speed': {'type': 'integer', 'description': 'speed of the robot, the range is between 0 and 100, if not option is given, choose the default 50'}}}, 'required': ['direction', 'speed']}}]


# Create a robot agent (openai assistant)

In [None]:
# you can ignore this one as its already been created 
assistant = client.beta.assistants.create(
    name="Robot_Agent",
    instructions="you area four wheeled robot that can move forward, backward, left, and right, you are awaiting instructions from the user.",
    tools = tools,
    model="gpt-4-1106-preview"
)
print(assistant)

In [8]:
#call this anytime you want to update an existing assistant
assistant = client.beta.assistants.update (
    "asst_9AIjIzXOSklI4nDl9LC7ICMR",
    instructions="you area four wheeled robot that can move forward, backward, left, and right, you are awaiting instructions from the user.",
    tools=tools,
    model ="gpt-4-1106-preview")

print(assistant)

Assistant(id='asst_9AIjIzXOSklI4nDl9LC7ICMR', created_at=1701704986, description=None, file_ids=[], instructions='you area four wheeled robot that can move forward, backward, left, and right, you are awaiting instructions from the user.', metadata={}, model='gpt-4-1106-preview', name='Robot_Agent', object='assistant', tools=[ToolRetrieval(type='retrieval'), ToolFunction(function=FunctionDefinition(name='direction_to_move', parameters={'type': 'object', 'properties': {'direction': {'type': 'string', 'description': 'choose the direction to move the robot, the options are forward, backward, left or right'}, 'speed': {'type': 'integer', 'description': 'speed of the robot, the range is between 0 and 100, if not option is given, choose the default 50'}}}, description='Moves the forward, backward, left or right'), type='function')])


In [10]:
#to retrieve all your defined assistants
my_assistants = client.beta.assistants.list(
    order="desc",
    limit = "20"
)

pprint(my_assistants.data)

[Assistant(id='asst_9AIjIzXOSklI4nDl9LC7ICMR', created_at=1701704986, description=None, file_ids=[], instructions='you area four wheeled robot that can move forward, backward, left, and right, you are awaiting instructions from the user.', metadata={}, model='gpt-4-1106-preview', name='Robot_Agent', object='assistant', tools=[ToolRetrieval(type='retrieval'), ToolFunction(function=FunctionDefinition(name='direction_to_move', parameters={'type': 'object', 'properties': {'direction': {'type': 'string', 'description': 'choose the direction to move the robot, the options are forward, backward, left or right'}, 'speed': {'type': 'integer', 'description': 'speed of the robot, the range is between 0 and 100, if not option is given, choose the default 50'}}}, description='Moves the forward, backward, left or right'), type='function')]),
 Assistant(id='asst_7PKKgdYEgLT8V3vGPmZVreVV', created_at=1699368801, description=None, file_ids=['file-2A71jKOJp89WGd3Ua2mfCKHO', 'file-2mCiMGLV3KjUPnVstX4Teuz


# Create a the thread for the robot agent

A Thread represents a conversation session between the assistant and a user. It stores Messages, providing context for ongoing interactions.

In [11]:
# Creates a new conversation
thread = client.beta.threads.create()

In [12]:
# Create a new message in the thread as the user (role) 
# this doesnt run the model yet
message = client.beta.threads.messages.create(
    thread_id = thread.id,
    role = 'user',
    content='please move forward, then backward, then turn left, then turn right'
)
print(message)

ThreadMessage(id='msg_jQ1Ad82bkcGpWio92vPSbDsJ', assistant_id=None, content=[MessageContentText(text=Text(annotations=[], value='please move forward, then backward, then turn left, then turn right'), type='text')], created_at=1701708241, file_ids=[], metadata={}, object='thread.message', role='user', run_id=None, thread_id='thread_QdAC4o0lxc8iZlSI3VEzUlYr')


In [None]:
# this runs the model with the input from thread 
run = client.beta.threads.runs.create(
    thread_id = thread.id,
    assistant_id = assistant.id,
)

print(run.id)

In [None]:
# This retrieves the output from the model
run = client.beta.threads.runs.retrieve(
    thread_id = thread.id,
    run_id = run.id
)

print(run.status)

In [None]:
# This checks the run status and waits until the run is completed or has failed or requires action
# and then prints the status.  Basically it waits until the model has finished running
while run.status not in ["completed", "failed", "requires_action"]:
    run = client.beta.threads.runs.retrieve(
thread_id = thread.id,
run_id = run.id
)

print(run.status)
time.sleep(10)
print(run.status)

In [None]:
# This retrieves the output from the model
messages = client.beta.threads.messages.list(
    thread_id = thread.id,
)

for each in messages:
  pprint(each.role + ":" + each.content[0].text.value)

# Extract the tool from the robot agent 

In [None]:
# This runs the model with the input from thread
tools_to_call = run.required_action.submit_tool_outputs.tool_calls
print(len(tools_to_call))
print(tools_to_call)

In [None]:
# prints the 1st tool call name and arguments (there could be more than one tool call so this wont work for all)
print(tools_to_call[0].function.name)
print(tools_to_call[0].function.arguments)

In [None]:
# this prints for each tool call the name and arguments
tools_output_array = []
for each_tool in tools_to_call:
  tool_call_id = each_tool.id
  function_name = each_tool.function.name
  function_arg = each_tool.function.arguments
  print("Tool ID:" + tool_call_id)
  print("Function to Call:" + function_name )
  print("Parameters to use:" + function_arg)

  if (function_name == 'direction_to_move'):
    output=True

  tools_output_array.append({"tool_call_id": tool_call_id, "output": output})

print(tools_output_array)

# Use the tool and perform a action 

In [None]:
tool_calls = run.required_action.submit_tool_outputs.tool_calls
tool_calls

In [None]:
json.loads(tool_calls[0].function.arguments)["direction"]


In [None]:
def direction_to_move(direction, speed=50):
    return f"move {direction} at speed {speed}"

In [None]:
import json

def get_outputs_for_tool_call(tool_call):
    # calls apis, would need to change to call the function
    direction_to_move_robot = json.loads(tool_call.function.arguments)["direction"]
    #speed_to_move = json.loads(tool_calls[0].function.arguments)["speed"]
    robot_status = direction_to_move(direction_to_move_robot)
    return {
        "tool_call_id": tool_call.id,
        "output": robot_status
        }
tool_calls = run.required_action.submit_tool_outputs.tool_calls
tool_outputs = map(get_outputs_for_tool_call, tool_calls)


In [None]:
tool_outputs = list(tool_outputs)


In [None]:
tool_outputs

In [None]:
run = client.beta.threads.runs.submit_tool_outputs(
    thread_id=thread.id,
    run_id = run.id,
    tool_outputs = tool_outputs
)

In [None]:
# This retrieves the output from the model
messages = client.beta.threads.messages.list(
    thread_id = thread.id,
)

for each in messages:
  pprint(each.role + ":" + each.content[0].text.value)

# removes the agent (assistant)

In [None]:
# remove assistant 
response = client.beta.assistants.delete(assistant.id)
print(response)

<?xml version="1.0" encoding="UTF-8"?>
<DeviceInfo version="2.0" xmlns="http://www.hikvision.com/ver20/XMLSchema">
<deviceName>IP CAMERA</deviceName>
<deviceID>95690000-2059-11b3-8149-08a189a203a9</deviceID>
<deviceDescription>IPCamera</deviceDescription>
<deviceLocation>hangzhou</deviceLocation>
<systemContact>Hikvision.China</systemContact>
<model>DS-2CD2686G2-IZS</model>
<serialNumber>DS-2CD2686G2-IZS20210329AAWRF74547550</serialNumber>
<macAddress>08:a1:89:a2:03:a9</macAddress>
<firmwareVersion>V5.5.112</firmwareVersion>
<firmwareReleasedDate>build 210204</firmwareReleasedDate>
<encoderVersion>V7.3</encoderVersion>
<encoderReleasedDate>build 210204</encoderReleasedDate>
<bootVersion>V1.3.4</bootVersion>
<bootReleasedDate>100316</bootReleasedDate>
<hardwareVersion>0x0</hardwareVersion>
<deviceType>IPCamera</deviceType>
<telecontrolID>88</telecontrolID>
<supportBeep>false</supportBeep>
<supportVideoLoss>false</supportVideoLoss>
<firmwareVersionInfo>B-R-G5-0</firmwareVersionInfo>
</De