# Part 7

Universal code that we will use for the entire notebook.

In [1]:
# required libraries
from openai import OpenAI

# exception type caught by the cells that guard API calls
from openai import OpenAIError

# libraries needed for streaming output
from typing_extensions import override
from openai import AssistantEventHandler

# additional libraries
import time


In [2]:
# Create the OpenAI API client; every later cell talks to the API through
# this `client` object.
# No key is passed explicitly here: this assumes you have the OPENAI_API_KEY
# environment variable set.
client = OpenAI()

## Creating Assistants, Threads, and Messages Review
Let's create a new assistant, thread, and some messages for us to use later on and to review the code for creating them.

### Creating an Assistant
First, let's make an Assistant we can use to communicate with our run.

In [3]:

# Register the demo assistant. The metadata is a handful of arbitrary
# string key/value pairs; temperature and top_p are set explicitly so they
# show up in the printed object.
assistant = client.beta.assistants.create(
    name="Run Tester Assistant",
    instructions="You are a helpful assistant.",
    model="gpt-4-turbo",
    temperature=1,
    top_p=1,
    metadata={
        "holds_threads": "True",
        "likes_threads": "True",
        "holds_messages": "True",
        "likes_messages": "True",
    },
)

# Dump the whole object first, leave a gap, then show the name and metadata
# on their own lines.
print(assistant)
print("\n\n")
print(assistant.name, assistant.metadata, sep="\n")

Assistant(id='asst_OBpBoTcdpHy8H8b4I5i4fW9H', created_at=1715601574, description=None, instructions='You are a helpful assistant.', metadata={'holds_threads': 'True', 'likes_threads': 'True', 'holds_messages': 'True', 'likes_messages': 'True'}, model='gpt-4-turbo', name='Run Tester Assistant', object='assistant', tools=[], response_format='auto', temperature=1.0, tool_resources=ToolResources(code_interpreter=None, file_search=None), top_p=1.0)



Run Tester Assistant
{'holds_threads': 'True', 'likes_threads': 'True', 'holds_messages': 'True', 'likes_messages': 'True'}


### Creating a Thread
Now, let's create a Thread that can be used to hold our messages.

In [4]:
# Open a new thread and keep a handle to it in `thread`.
# Its metadata carries a user identifier.
thread = client.beta.threads.create(metadata={"user": "abc123"})

# Show the thread object returned by the API.
print(thread)


Thread(id='thread_8t0UWIg9frDPQy6yURFwIgN0', created_at=1715601574, metadata={'user': 'abc123'}, object='thread', tool_resources=ToolResources(code_interpreter=None, file_search=None))


### Creating a Message
Finally, let's create a Message and add it to the Thread so that we can use it later.

In [5]:
# Post a user message into the thread created earlier.
message = client.beta.threads.messages.create(
    role="user",  # who is posting the message
    thread_id=thread.id,  # which thread receives it
    metadata={"key": "value"},  # arbitrary key-value data stored with the message
    content="Tell me what a penguin is in 50 words or less.",  # the text itself
)

# Dump the full message object, followed by a blank gap for readability.
print(message)
print("\n")

# Then the individual pieces, one per line: the message id, the raw list of
# content blocks, the text of the first content block, and the role.
print(
    message.id,
    message.content,
    message.content[0].text.value,
    message.role,
    sep="\n",
)

Message(id='msg_Hx86HSFHRo7S3NjwVetxWwsd', assistant_id=None, attachments=[], completed_at=None, content=[TextContentBlock(text=Text(annotations=[], value='Tell me what a penguin is in 50 words or less.'), type='text')], created_at=1715601574, incomplete_at=None, incomplete_details=None, metadata={'key': 'value'}, object='thread.message', role='user', run_id=None, status=None, thread_id='thread_8t0UWIg9frDPQy6yURFwIgN0')


msg_Hx86HSFHRo7S3NjwVetxWwsd
[TextContentBlock(text=Text(annotations=[], value='Tell me what a penguin is in 50 words or less.'), type='text')]
Tell me what a penguin is in 50 words or less.
user


## Creating Runs
Runs are the engine that makes things happen with the LLM. 

In [6]:

# Start a run of the assistant on the thread and keep the returned object.
run = client.beta.threads.runs.create(thread_id=thread.id, assistant_id=assistant.id)

# Show the run exactly as the create call returned it.
print(run)


Run(id='run_Y2eihObklxCN9VXWybwjn9We', assistant_id='asst_OBpBoTcdpHy8H8b4I5i4fW9H', cancelled_at=None, completed_at=None, created_at=1715601575, expires_at=1715602175, failed_at=None, incomplete_details=None, instructions='You are a helpful assistant.', last_error=None, max_completion_tokens=None, max_prompt_tokens=None, metadata={}, model='gpt-4-turbo', object='thread.run', required_action=None, response_format='auto', started_at=None, status='queued', thread_id='thread_8t0UWIg9frDPQy6yURFwIgN0', tool_choice='auto', tools=[], truncation_strategy=TruncationStrategy(type='auto', last_messages=None), usage=None, temperature=1.0, top_p=1.0, tool_resources={})


### Getting the Run Status
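You can't just keep looking at the run object returned by `create` to get the current status: it is a snapshot taken when the run was created and it does not update itself.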

In [7]:
# `run` is still the object returned by the create call in the previous cell;
# nothing here re-fetches it, so the status printed below is the one captured
# at creation time rather than the run's current state.
print(run)
print("\n")
print(run.status)

Run(id='run_Y2eihObklxCN9VXWybwjn9We', assistant_id='asst_OBpBoTcdpHy8H8b4I5i4fW9H', cancelled_at=None, completed_at=None, created_at=1715601575, expires_at=1715602175, failed_at=None, incomplete_details=None, instructions='You are a helpful assistant.', last_error=None, max_completion_tokens=None, max_prompt_tokens=None, metadata={}, model='gpt-4-turbo', object='thread.run', required_action=None, response_format='auto', started_at=None, status='queued', thread_id='thread_8t0UWIg9frDPQy6yURFwIgN0', tool_choice='auto', tools=[], truncation_strategy=TruncationStrategy(type='auto', last_messages=None), usage=None, temperature=1.0, top_p=1.0, tool_resources={})


queued


You have to get the run status by retrieving the run manually

In [8]:
# Ask the API for the run again instead of relying on the object we got back
# when the run was created.
runstatus = client.beta.threads.runs.retrieve(run_id=run.id, thread_id=thread.id)

# Full object, a blank gap, then id / last_error / status on separate lines.
print(runstatus)
print("\n")
print(runstatus.id, runstatus.last_error, runstatus.status, sep="\n")

Run(id='run_Y2eihObklxCN9VXWybwjn9We', assistant_id='asst_OBpBoTcdpHy8H8b4I5i4fW9H', cancelled_at=None, completed_at=None, created_at=1715601575, expires_at=1715602175, failed_at=None, incomplete_details=None, instructions='You are a helpful assistant.', last_error=None, max_completion_tokens=None, max_prompt_tokens=None, metadata={}, model='gpt-4-turbo', object='thread.run', required_action=None, response_format='auto', started_at=None, status='queued', thread_id='thread_8t0UWIg9frDPQy6yURFwIgN0', tool_choice='auto', tools=[], truncation_strategy=TruncationStrategy(type='auto', last_messages=None), usage=None, temperature=1.0, top_p=1.0, tool_resources={})


run_Y2eihObklxCN9VXWybwjn9We
None
queued


Or you can poll the status manually with a loop

In [9]:
# Sleep for 10 seconds to let the run created above finish before the next
# cell starts another run on the same thread (this matters when you execute
# the entire notebook in one go).
# NOTE(review): a fixed delay is a best-effort wait; it does not verify that
# the earlier run has actually finished.
time.sleep(10)

In [10]:
import openai
import time

# Start one more run on the same thread so there is something to poll.
another_run = client.beta.threads.runs.create(
    thread_id=thread.id,
    assistant_id=assistant.id,
)

# Re-fetch the run until its status is no longer "queued" or "in_progress",
# logging every status seen while waiting.
while True:
    run_status = client.beta.threads.runs.retrieve(
        run_id=another_run.id,
        thread_id=thread.id,
    )
    if run_status.status not in ("queued", "in_progress"):
        break
    print(f"Run status: {run_status.status}")
    time.sleep(1)  # one-second pause between polls

# Report the status that ended the loop.
print("Final status:", run_status.status)


Run status: queued
Run status: in_progress
Run status: in_progress
Run status: in_progress
Final status: completed


The best option is to save yourself some code and just use the "create_and_poll" method to do the same thing. 

When interacting with the API some actions such as starting a Run and adding files to vector stores are asynchronous and take time to complete. The SDK includes helper functions which will poll the status until it reaches a terminal state and then return the resulting object. If an API method results in an action which could benefit from polling there will be a corresponding version of the method ending in '_and_poll'.

In [11]:
# Start a run and let the SDK helper do the polling; the call hands back the
# run object once polling is over.
auto_run_and_poll = client.beta.threads.runs.create_and_poll(
    thread_id=thread.id,
    assistant_id=assistant.id,
)

# Full object, a blank gap, then the id and the status on separate lines.
print(auto_run_and_poll)
print("\n")
print(auto_run_and_poll.id, auto_run_and_poll.status, sep="\n")

Run(id='run_qUnLdiGmLfNWu5VjMHtKycbn', assistant_id='asst_OBpBoTcdpHy8H8b4I5i4fW9H', cancelled_at=None, completed_at=1715601594, created_at=1715601590, expires_at=None, failed_at=None, incomplete_details=None, instructions='You are a helpful assistant.', last_error=None, max_completion_tokens=None, max_prompt_tokens=None, metadata={}, model='gpt-4-turbo', object='thread.run', required_action=None, response_format='auto', started_at=1715601590, status='completed', thread_id='thread_8t0UWIg9frDPQy6yURFwIgN0', tool_choice='auto', tools=[], truncation_strategy=TruncationStrategy(type='auto', last_messages=None), usage=Usage(completion_tokens=70, prompt_tokens=188, total_tokens=258), temperature=1.0, top_p=1.0, tool_resources={})


run_qUnLdiGmLfNWu5VjMHtKycbn
completed


### Creating a Simple Streaming Run
Instead of having to wait for the entire run to finish before getting a result, it is best to stream output to the user for interactive sessions.

In [12]:
# Create a new run with the stream option set to True, so the call returns an
# iterable of events instead of a single Run object.
stream_run = client.beta.threads.runs.create(
    thread_id=thread.id,
    assistant_id=assistant.id,
    stream=True,
)

# Walk the event stream.
# Payloads that expose `status` are printed as id + status; the others are
# expected to be delta events and are printed as id + delta. A payload may
# have neither `id` nor `delta` (for example an error event), so both are read
# defensively: the loop then prints the payload itself instead of dying with
# an AttributeError in the middle of the stream.
for event in stream_run:
    event_data = event.data
    event_id = getattr(event_data, "id", None)
    if hasattr(event_data, "status"):
        print(event_id)
        print(event_data.status)
    else:
        print(f"Event ID: {event_id} does not have a status attribute.")
        print(getattr(event_data, "delta", event_data))
    print("---------------\n")


run_jRf8v5e9wscmGtaVYv2oU2Vk
queued
---------------

run_jRf8v5e9wscmGtaVYv2oU2Vk
queued
---------------

run_jRf8v5e9wscmGtaVYv2oU2Vk
in_progress
---------------

step_g70kJOU9twsIZ2r5LTP8LNPi
in_progress
---------------

step_g70kJOU9twsIZ2r5LTP8LNPi
in_progress
---------------

msg_IbwhML6JqEVD65mhGggmv9Rc
in_progress
---------------

msg_IbwhML6JqEVD65mhGggmv9Rc
in_progress
---------------

Event ID: msg_IbwhML6JqEVD65mhGggmv9Rc does not have a status attribute.
MessageDelta(content=[TextDeltaBlock(index=0, type='text', text=TextDelta(annotations=[], value='A'))], role=None)
---------------

Event ID: msg_IbwhML6JqEVD65mhGggmv9Rc does not have a status attribute.
MessageDelta(content=[TextDeltaBlock(index=0, type='text', text=TextDelta(annotations=None, value=' p'))], role=None)
---------------

Event ID: msg_IbwhML6JqEVD65mhGggmv9Rc does not have a status attribute.
MessageDelta(content=[TextDeltaBlock(index=0, type='text', text=TextDelta(annotations=None, value='enguin'))], role=

### Dealing with Thread Locks

#### Notification of Failed Run
You can't have more than one active run on a thread at the same time. To deal with situations where this might occur, you can check whether a run is already taking place and report clearly why your own run was not started.

In [13]:
# Create a run to interfere with the run we try to start further down.
# This is a plain (non-streaming) create: nothing in this cell ever reads this
# run's events, and an event stream that is opened but never consumed or
# closed just keeps its HTTP connection hanging.
interference_run = client.beta.threads.runs.create(
    thread_id=thread.id,
    assistant_id=assistant.id,
)

# Function to check if there is an active run
def check_active_run(thread_id: str):
    """Return the id of a run that is still active on the thread, if any.

    Args:
        thread_id: ID of the thread whose runs are inspected.

    Returns:
        The id of the first listed run whose status is non-terminal, or
        ``None`` when no such run is listed or the runs could not be fetched.
    """
    # Every non-terminal status keeps the thread busy, not only "queued" and
    # "in_progress": a run waiting on tool output or in the middle of being
    # cancelled also blocks a new run from starting.
    active_statuses = ("queued", "in_progress", "requires_action", "cancelling")
    try:
        # Fetch runs for the thread and pick the first one that is still active
        listed_runs = client.beta.threads.runs.list(thread_id=thread_id)
        return next(
            (run.id for run in listed_runs.data if run.status in active_statuses),
            None,
        )
    except OpenAIError as e:
        # Best effort: report the failure and let the caller carry on as if no
        # active run had been found.
        print(f"Failed to check runs: {e}")
        return None

# Get the active run ID, if any
active_run_id = check_active_run(thread.id)
if active_run_id:
    # Someone else holds the thread: report it instead of creating a run.
    print(f"Thread already has an active run: {active_run_id}. Please wait until it completes.")
else:
    try:
        # Attempt to create a new run if no active run exists
        stream_run = client.beta.threads.runs.create(
            thread_id=thread.id,
            assistant_id=assistant.id,
            stream=True,
        )

        # Continue processing the new run.
        # Payloads with a `status` are printed as id + status, the rest as
        # id + delta. Both `id` and `delta` are read defensively so that a
        # payload lacking them (for example an error event) is printed as-is
        # instead of raising AttributeError mid-stream.
        for event in stream_run:
            event_data = event.data
            event_id = getattr(event_data, "id", None)
            if hasattr(event_data, "status"):
                print(event_id)
                print(event_data.status)
            else:
                print(f"Event ID: {event_id} does not have a status attribute.")
                print(getattr(event_data, "delta", event_data))
            print("---------------\n")

    except OpenAIError as e:
        # The check above and the create call are two separate requests, so
        # another run can still slip in between them; the API then rejects
        # the create call and we end up here.
        print(f"Error occurred: {e}")
        if "already has an active run" in str(e):
            print("A run is already active on this thread. Please wait until it completes.")

Thread already has an active run: run_JZBvfwHn7Pzr9pPA470fhfWY. Please wait until it completes.


In [14]:
# Sleep for 10 seconds to let the run created above finish before the next
# cell starts a new run on the same thread (this matters when you execute the
# entire notebook in one go).
# NOTE(review): a fixed delay is a best-effort wait; it does not verify that
# the earlier run has actually finished.
time.sleep(10)

#### Waiting Your Turn
A better strategy is to check whether a run is already in progress, poll until it completes, and only then start your own run.

In [15]:

# Create a run to interfere with the run we try to start further down.
# This is a plain (non-streaming) create: nothing in this cell ever reads this
# run's events, and an event stream that is opened but never consumed or
# closed just keeps its HTTP connection hanging.
interference_run = client.beta.threads.runs.create(
    thread_id=thread.id,
    assistant_id=assistant.id,
)

# Function to check if there is an active run
def check_active_run(thread_id: str) -> bool:
    """Tell whether the thread currently has a run that is still active.

    Note: this redefines the helper of the same name from the earlier cell.
    That version returns the active run's id; this one returns a boolean so it
    can drive a ``while`` loop directly.

    Args:
        thread_id: ID of the thread whose runs are inspected.

    Returns:
        ``True`` if any listed run has a non-terminal status, ``False`` when
        none does or when the runs could not be fetched.
    """
    # Every non-terminal status keeps the thread busy, not only "queued" and
    # "in_progress": a run waiting on tool output or in the middle of being
    # cancelled also blocks a new run from starting.
    active_statuses = ("queued", "in_progress", "requires_action", "cancelling")
    try:
        # Fetch runs for the thread and check their status
        listed_runs = client.beta.threads.runs.list(thread_id=thread_id)
        return any(run.status in active_statuses for run in listed_runs.data)
    except OpenAIError as e:
        # Best effort: report the failure and answer "not busy" so the caller
        # does not wait forever on a check that cannot succeed.
        print(f"Failed to check runs: {e}")
        return False

# Wait until there is no active run
while check_active_run(thread.id):
    print("Waiting for the existing run to complete...")
    time.sleep(10)  # Wait for 10 seconds before checking again

# Once no active runs are detected, proceed to create a new run
try:
    print("Creating a new run...")
    stream_run = client.beta.threads.runs.create(
        thread_id=thread.id,
        assistant_id=assistant.id,
        stream=True,
    )

    # Continue processing the new run.
    # Payloads with a `status` are printed as id + status, the rest as
    # id + delta. Both `id` and `delta` are read defensively so that a payload
    # lacking them (for example an error event) is printed as-is instead of
    # raising AttributeError mid-stream.
    for event in stream_run:
        event_data = event.data
        event_id = getattr(event_data, "id", None)
        if hasattr(event_data, "status"):
            print(event_id)
            print(event_data.status)
        else:
            print(f"Event ID: {event_id} does not have a status attribute.")
            print(getattr(event_data, "delta", event_data))
        print("---------------\n")

except OpenAIError as e:
    # Waiting first makes a conflict unlikely, but the check and the create
    # call are still separate requests, so a failure is reported here.
    print(f"Error occurred: {e}")


Waiting for the existing run to complete...
Creating a new run...
run_8Dop5QdL25XCtKtU86M0HzmM
queued
---------------

run_8Dop5QdL25XCtKtU86M0HzmM
queued
---------------

run_8Dop5QdL25XCtKtU86M0HzmM
in_progress
---------------

step_miKsQeXbwOM15cgmztLsopH6
in_progress
---------------

step_miKsQeXbwOM15cgmztLsopH6
in_progress
---------------

msg_xZhTAIJFA7W2wdZs0roU0gs0
in_progress
---------------

msg_xZhTAIJFA7W2wdZs0roU0gs0
in_progress
---------------

Event ID: msg_xZhTAIJFA7W2wdZs0roU0gs0 does not have a status attribute.
MessageDelta(content=[TextDeltaBlock(index=0, type='text', text=TextDelta(annotations=[], value='A'))], role=None)
---------------

Event ID: msg_xZhTAIJFA7W2wdZs0roU0gs0 does not have a status attribute.
MessageDelta(content=[TextDeltaBlock(index=0, type='text', text=TextDelta(annotations=None, value=' p'))], role=None)
---------------

Event ID: msg_xZhTAIJFA7W2wdZs0roU0gs0 does not have a status attribute.
MessageDelta(content=[TextDeltaBlock(index=0, type

## Getting the Output from the Model

### Output After Polling
If we do polling, we can get the output by fetching the last Assistant message from the run when it is done. The last message should be the Assistant's answer to our query. 

In [16]:
# Create a single run and let the SDK helper poll it for us
fresh_run = client.beta.threads.runs.create_and_poll(
    assistant_id=assistant.id,
    thread_id=thread.id,
)

# Retrieve messages from the thread
messages = client.beta.threads.messages.list(thread_id=thread.id)

# Get the assistant message produced by this run: the first entry in the
# listing whose role is "assistant" and whose run_id matches.
# (Assumes the listing is newest-first, so the first match is the latest
# one - TODO confirm.)
# A generator expression is used instead of a `for message in ...` loop so
# that the notebook-level `message` created in the earlier "Creating a
# Message" cell is not silently overwritten.
latest_assistant_message = next(
    (
        thread_message
        for thread_message in messages.data
        if thread_message.role == "assistant" and thread_message.run_id == fresh_run.id
    ),
    None,
)

# Print the latest response
if latest_assistant_message:
    print("Output:\n" + latest_assistant_message.content[0].text.value)
else:
    print("No assistant message found for the run.")

Output:
A penguin is a flightless bird native primarily to the Southern Hemisphere, especially Antarctica. Characterized by their black and white plumage and upright posture, penguins are excellent swimmers, using their wings as flippers to navigate icy waters. They primarily feed on fish and live in large, social colonies.


### Output While Streaming
Conversely, when streaming, we want to show the output right away. We do this by using events from the stream to show the output as it is produced.

In [17]:
# First, we create an EventHandler class to define
# how we want to handle the events in the response stream.
# Normally, you would define this class at the top of your script or in a separate file.
class EventHandler(AssistantEventHandler):
    """Print assistant output to stdout as stream events arrive.

    Text is echoed fragment by fragment; tool calls are announced by type, and
    for code-interpreter calls the input and any log outputs are echoed too.
    """

    @override
    def on_text_created(self, text) -> None:
        """Print the prefix that introduces a new piece of assistant text."""
        print("\nassistant text > ", end="", flush=True)

    @override
    def on_text_delta(self, delta, snapshot):
        """Print one text fragment without a trailing newline."""
        # flush=True so each fragment shows up immediately.
        print(delta.value, end="", flush=True)

    @override
    def on_tool_call_created(self, tool_call):
        """Announce a new tool call by printing its type."""
        print(f"\nassistant tool > {tool_call.type}\n", flush=True)

    @override
    def on_tool_call_delta(self, delta, snapshot):
        """Echo code-interpreter input and log outputs; ignore other tool types."""
        if delta.type == 'code_interpreter':
            if delta.code_interpreter.input:
                print(delta.code_interpreter.input, end="", flush=True)
            if delta.code_interpreter.outputs:
                print("\n\noutput >", flush=True)
                for output in delta.code_interpreter.outputs:
                    # Only "logs" outputs are printed; other output types are skipped.
                    if output.type == "logs":
                        print(f"\n{output.logs}", flush=True)

In [20]:
 
# Next, create the Run through the SDK's `stream` helper and hand it a fresh
# `EventHandler`, which prints the response as the events come in.
with client.beta.threads.runs.stream(
    event_handler=EventHandler(),
    assistant_id=assistant.id,
    thread_id=thread.id,
) as stream:
    # Stay inside the `with` block until the stream reports it is done.
    stream.until_done()


assistant text > A penguin is a flightless bird native to mostly the Southern Hemisphere, particularly Antarctica. Known for their distinct black and white plumage and ability to endure cold climates, they are superb swimmers and divers. Penguins feed on krill, fish, and squid. They live in large colonies for breeding and molting.

## Modifying Assistants and Threads with Runs
You can make changes in real time to your Runs that override or modify existing settings on Assistants and Threads.

### Runs and Assistants
Let's see how we can make changes to the Assistant settings for our Runs.

In [25]:
# Show the Assistant instructions
# (f-strings rather than "+" so a missing/None instructions value is printed
# instead of raising TypeError.)
print(f"Assistant Instructions: {assistant.instructions}")
print("\n")

# Create a run that passes extra instructions for this run only; the
# Assistant object itself is not modified by this call.
additional_instruction_run = client.beta.threads.runs.create_and_poll(
    assistant_id=assistant.id,
    thread_id=thread.id,
    additional_instructions="That speaks like a pirate.",
)

# Show the instructions the run reports
print(f"Run Instructions: {additional_instruction_run.instructions}")
print("\n")

# Retrieve messages from the thread
messages = client.beta.threads.messages.list(thread_id=thread.id)

# Get the assistant message produced by this run: the first entry in the
# listing whose role is "assistant" and whose run_id matches.
# (Assumes the listing is newest-first, so the first match is the latest
# one - TODO confirm.)
# A generator expression is used instead of a `for message in ...` loop so
# that the notebook-level `message` created in the earlier "Creating a
# Message" cell is not silently overwritten.
latest_assistant_message = next(
    (
        thread_message
        for thread_message in messages.data
        if thread_message.role == "assistant"
        and thread_message.run_id == additional_instruction_run.id
    ),
    None,
)

# Print the latest response
if latest_assistant_message:
    print("Output:\n" + latest_assistant_message.content[0].text.value.strip())
else:
    print("No assistant message found for the run.")

# add some space
print("\n")

# Print the original assistant instructions again to show that the local
# `assistant` object still carries the same instructions after the run.
print(f"Assistant Instructions: {assistant.instructions}")

Assistant Instructions: You are a helpful assistant.


Run Instructions: You are a helpful assistant. That speaks like a pirate.


Output:
In the heart of a vibrant forest lived a magical frog named Gizmo with the power to change colors at will. One autumn day, a lost, small bear cub stumbled upon Gizmo, distraught and crying. Sensing the cub’s distress, Gizmo transformed into a soothing blue and sang a magical, calming melody. The sound guided the cub's mother back to her baby. Thankful and amazed, the bear family watched as Gizmo faded back into the green forest landscape, promising to always guard the creatures of his home. Gizmo became a legend, a guardian symbolizing hope and protection within the whispers of the forest.


Assistant Instructions: You are a helpful assistant.


## Controlling Output
We can determine the temperature and top_p for our runs and override the Assistant settings.

In [23]:
# Show the Assistant's instructions and its own temperature setting
print(assistant.instructions)
print(assistant.temperature)

# The same prompt is sent for every run so that temperature is the only thing
# that differs between them. (Prompt typo fixed: "100 words or less".)
story_prompt = "Write a story about a magical frog in a forest in 100 words or less."

# Temperatures to compare: first 0, then 1.2, both passed per run.
run_temperatures = (0, 1.2)

# For each temperature, use the `stream` SDK helper with the `EventHandler`
# class to create the Run and stream the response. Each run also adds the
# prompt to the thread as a new user message via `additional_messages`.
for run_index, run_temperature in enumerate(run_temperatures):
    if run_index:
        # Blank space between one streamed answer and the next.
        print("\n")

    with client.beta.threads.runs.stream(
        thread_id=thread.id,
        assistant_id=assistant.id,
        additional_messages=[
            {
                "role": "user",
                "content": story_prompt,
            }
        ],
        temperature=run_temperature,
        event_handler=EventHandler(),
    ) as stream:
        stream.until_done()

You are a helpful assistant.
1.0

assistant text > In the heart of an enchanted forest, there lived a magical frog named Jasper who could turn invisible. Jasper loved playing tricks on the other forest creatures, hiding and then reappearing with a loud "Ribbit!" One day, while invisible, he stumbled upon a lost, scared kitten. Realizing the kitten needed help, Jasper decided to use his magic for good. He guided the kitten out of the forest, staying visible to comfort her. The grateful kitten nuzzled him as they reached the edge. From that day on, Jasper used his powers to help others, becoming a beloved guardian of the forest.


assistant text > Deep within a verdant forest lived a magical frog named Theo, who had the gift of elemental control. Shunning his powers, Theo preferred simple solitude—until a fire threatened his woodland home. As the inferno closed in, Theo leaped into action, summoning rain to quench the flames and winds to guide the smoke away. His actions saved countless 