# OpenAI Assistants
> Functionality for making calls to OpenAI Assistants

In [None]:
#| default_exp assistant
import time

In [None]:
%load_ext autoreload
%autoreload

In [None]:
#| hide
from nbdev.showdoc import *

In [None]:
import os
import getpass
import time

In [None]:
#| export
class OpenAIAssistantManager:
    """Stateful convenience wrapper around a client's beta Assistants endpoints.

    The manager remembers the most recently created/retrieved assistant and the
    most recently created thread (``current_assistant`` / ``current_thread``)
    so that follow-up calls do not need explicit IDs.
    """

    def __init__(self, client):
        """Store the API client; no assistant or thread is active initially.

        Args:
            client: Object exposing ``files``, ``beta.assistants`` and
                ``beta.threads`` (presumably an ``openai.OpenAI`` instance --
                confirm against the caller).
        """
        self.client = client
        self.current_assistant = None
        self.current_thread = None

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    def _require_thread(self):
        """Return the active thread or raise if none has been created."""
        if self.current_thread is None:
            raise RuntimeError("No active thread. Create a thread first.")
        return self.current_thread

    @staticmethod
    def _as_id_list(file_id):
        """Wrap a single string ID in a list; pass any other value through."""
        return [file_id] if isinstance(file_id, str) else file_id

    def _create_assistant(self, name, description, instructions, model, tools, file_id):
        """Create an assistant, remember it as current, print its ID, return it."""
        assistant_kwargs = {
            "name": name,
            "description": description,
            "instructions": instructions,
            "model": model,
            # Copy so the (mutable) default list in the public signatures is
            # never handed out to, or mutated by, downstream code.
            "tools": list(tools) if tools else [],
        }

        if file_id:
            assistant_kwargs["file_ids"] = self._as_id_list(file_id)

        self.current_assistant = self.client.beta.assistants.create(**assistant_kwargs)
        print(self.current_assistant.id)
        return self.current_assistant

    # ------------------------------------------------------------------
    # Files and assistants
    # ------------------------------------------------------------------
    def add_file(self, file_path, purpose='assistants'):
        """Upload the file at ``file_path`` and return the client's response.

        Args:
            file_path: Path of the file to open in binary mode and upload.
            purpose: Value forwarded as the ``purpose`` of the upload.
        """
        with open(file_path, "rb") as file_data:
            return self.client.files.create(file=file_data, purpose=purpose)

    def create_assistant(self, name="Custom Teacher Utterances Classifier",
                            description="A tool for classifying teacher utterances into the categories OTR (opportunity to respond), PRS (praise), REP (reprimand), or NEU (neutral).",
                            instructions="""You are the co-founder of an ed-tech startup training an automated teacher feedback tool to classify utterances made. I am going to provide several sentences. 
                                            Please classify each sentence as one of the following: OTR (opportunity to respond), PRS (praise), REP (reprimand), or NEU (neutral)
        
                                            user: Can someone give me an example of a pronoun?
                                            assistant: OTR
                                            user: That's right, 'he' is a pronoun because it can take the place of a noun.
                                            assistant: PRS
                                            user: You need to keep quiet while someone else is reading.
                                            assistant: REP
                                            user: A pronoun is a word that can take the place of a noun.
                                            assistant: NEU

                                            Only answer with the following labels: OTR, PRS, REP, NEU""",
                            model="gpt-4-1106-preview",
                            tools=[{"type": "code_interpreter"}], file_id=None):
        """Create the few-shot classifier assistant and make it current.

        Args:
            name: Assistant display name.
            description: Short description of the assistant.
            instructions: System-style instructions, including few-shot examples.
            model: Model identifier to use.
            tools: List of tool specs; a falsy value results in no tools.
            file_id: Optional file ID (string) or collection of IDs to attach.

        Returns:
            The assistant object returned by the client. Its ID is also printed.
        """
        return self._create_assistant(name, description, instructions, model, tools, file_id)

    def create_custom_assistant(self, name="Custom Teacher Utterances Classifier",
                                description="A custom tool for classifying teacher utterances using gpt-3.5-turbo.",
                                instructions="""You are the co-founder of an ed-tech startup training an automated teacher feedback tool to classify utterances made. I am going to provide several sentences. 
                                            Please classify each sentence as one of the following: OTR (opportunity to respond), PRS (praise), REP (reprimand), or NEU (neutral)
                                            Only answer with the following labels: OTR, PRS, REP, NEU""",
                                model="gpt-3.5-turbo",
                                tools=[],
                                file_id=None):
        """Create a zero-shot classifier assistant (no tools by default).

        Same contract as ``create_assistant``; only the defaults differ.
        """
        return self._create_assistant(name, description, instructions, model, tools, file_id)

    def retrieve_assistant(self, assistant_id):
        """Fetch an existing assistant by ID and make it the current one."""
        self.current_assistant = self.client.beta.assistants.retrieve(assistant_id)
        return self.current_assistant

    # ------------------------------------------------------------------
    # Threads and messages
    # ------------------------------------------------------------------
    def create_thread(self, user_message, file_id=None):
        """Create a thread seeded with one user message and make it current.

        Args:
            user_message: Text content of the initial user message.
            file_id: Optional file ID (string) or collection of IDs to attach
                to the message.
        """
        message = {
            "role": "user",
            "content": user_message
        }

        if file_id:
            # Accept a single ID or a collection, matching create_assistant.
            message["file_ids"] = self._as_id_list(file_id)

        self.current_thread = self.client.beta.threads.create(messages=[message])
        return self.current_thread

    def delete_thread(self, user_message=None, file_id=None):
        """Delete the current thread and clear it from the manager.

        Args:
            user_message: Unused; kept only for backward compatibility.
            file_id: Unused; kept only for backward compatibility.

        Returns:
            The thread object that was deleted.

        Raises:
            RuntimeError: If there is no active thread.
        """
        thread = self._require_thread()
        self.client.beta.threads.delete(thread_id=thread.id)
        # Drop the stale handle so later calls fail fast with a clear error
        # instead of targeting a deleted thread.
        self.current_thread = None
        return thread

    def send_message(self, message_content):
        """Append a user message to the current thread (does not start a run).

        Raises:
            RuntimeError: If there is no active thread.
        """
        thread = self._require_thread()
        return self.client.beta.threads.messages.create(
            thread_id=thread.id,
            role="user",
            content=message_content,
        )

    def list_messages(self):
        """Return the client's message listing for the current thread.

        Raises:
            RuntimeError: If there is no active thread.
        """
        thread = self._require_thread()
        return self.client.beta.threads.messages.list(thread_id=thread.id)

    def retrieve_message(self, message_content):
        """Fetch a single message from the current thread.

        Args:
            message_content: The ID of the message to retrieve. (The parameter
                name is historical and kept for backward compatibility.)

        Raises:
            RuntimeError: If there is no active thread.
        """
        thread = self._require_thread()
        return self.client.beta.threads.messages.retrieve(message_content, thread_id=thread.id)

    def get_response(self):
        """Return the current thread's messages, requested with ``order="asc"``.

        Raises:
            RuntimeError: If there is no active thread.
        """
        thread = self._require_thread()
        return self.client.beta.threads.messages.list(thread_id=thread.id, order="asc")

    # ------------------------------------------------------------------
    # Runs
    # ------------------------------------------------------------------
    def submit_message(self, user_message):
        """Post a user message, start a run, and block until it stops pending.

        Returns:
            The run object once its status is no longer queued/in progress.

        Raises:
            RuntimeError: If the assistant or the thread is not initialised.
        """
        if self.current_thread is None or self.current_assistant is None:
            raise RuntimeError("Assistant and Thread must be initialized before submitting a message.")

        # Submit the user message
        self.send_message(user_message)

        # Create a run for the submitted message
        run = self.client.beta.threads.runs.create(
            thread_id=self.current_thread.id,
            assistant_id=self.current_assistant.id,
        )

        # Wait for the run to complete before returning
        return self.wait_on_run(run)

    def create_thread_and_run(self, user_input):
        """Start a fresh empty thread, submit ``user_input`` and wait for the run.

        Returns:
            Tuple ``(thread, run)``.
        """
        # Create a new thread for each input
        self.current_thread = self.client.beta.threads.create()

        # submit_message already waits for the run, so no second wait is needed.
        completed_run = self.submit_message(user_input)

        return self.current_thread, completed_run

    def wait_on_run(self, run, poll_interval=0.5):
        """Poll ``run`` until its status is neither "queued" nor "in_progress".

        Args:
            run: Run object with ``id`` and ``status`` attributes.
            poll_interval: Seconds to sleep before each status refresh.

        Returns:
            The most recently retrieved run object (or ``run`` itself if it
            was not pending to begin with).
        """
        # Local import: the notebook's top-level `import time` cells are not
        # visibly marked for export, so the method keeps its own import.
        import time

        pending_statuses = ("queued", "in_progress")
        while run.status in pending_statuses:
            # Sleep first: `run` was just fetched, so an immediate re-fetch is
            # wasted, and sleeping after a terminal status only adds latency.
            time.sleep(poll_interval)
            run = self.client.beta.threads.runs.retrieve(
                thread_id=self.current_thread.id,
                run_id=run.id,
            )
        return run


In [None]:
def label_data(data, label):
    """Return the user-supplied label for a piece of text.

    The text itself is accepted only so call sites can pair it with its
    label; it does not influence the result.

    Args:
    data (str): Text being labelled (not inspected).
    label (str): Label chosen by the user.

    Returns:
    str: ``label``, unchanged.
    """
    assigned_label = label
    return assigned_label

In [None]:
def process_lines(lines, assistant_manager, context = ""):
    """Classify a batch of utterances in one assistant round-trip.

    A new thread is created from ``context`` plus a formatting instruction,
    all utterances are submitted as a single message, and the assistant's
    reply is split into one label per non-blank line.

    Args:
        lines: Iterable of utterance strings.
        assistant_manager: Object providing ``create_thread(text)``,
            ``submit_message(text)`` and ``get_response()``.
        context: Optional prompt text placed before the formatting instruction.

    Returns:
        List of ``(line, label)`` tuples. If the assistant returns fewer or
        more labels than there are lines, the result is truncated to the
        shorter of the two.

    Raises:
        RuntimeError: If the thread contains no assistant message after the run.
    """
    # Materialise once: `lines` is consumed twice (join and zip), which would
    # silently yield nothing for a one-shot iterator.
    lines = list(lines)
    if not lines:
        return []

    # NOTE(review): the trailing "\n" below is a real newline character, not
    # the two-character text "\n" -- confirm that is the intended prompt.
    additional_context = "Return a list of labels for each utterance separated by \n"
    context += additional_context
    assistant_manager.create_thread(context)

    all_lines = "\n ".join(lines)
    completed_run = assistant_manager.submit_message(all_lines)

    # Only trust a message written by the assistant; if the run failed, the
    # last message in the thread is the user's own input.
    assistant_messages = [
        msg for msg in assistant_manager.get_response()
        if getattr(msg, "role", None) == "assistant"
    ]
    if not assistant_messages:
        status = getattr(completed_run, "status", "unknown")
        raise RuntimeError(f"No assistant reply found in thread (run status: {status}).")

    assistant_message = assistant_messages[-1].content[0].text.value

    # Drop blank lines and surrounding whitespace so labels stay aligned with
    # their utterances.
    labels = [label.strip() for label in assistant_message.splitlines() if label.strip()]

    # Pair each utterance with its label.
    return list(zip(lines, labels))



In [None]:
def increase_batch_size(batch_size, accuracy):
    """Choose the next batch size from the latest accuracy.

    Returns 10 when ``accuracy`` is above 0.90 and 5 when it is at or below
    0.90. If neither comparison holds (e.g. ``accuracy`` is NaN), the
    incoming ``batch_size`` is returned unchanged.
    """
    threshold = 0.90
    if accuracy > threshold:
        return 10
    if accuracy <= threshold:
        return 5
    return batch_size

In [None]:
def collect_labels_for_initial_data(initial_data, get_label=None):
    """Gather a label for each seed utterance and build a few-shot context.

    Args:
        initial_data: Iterable of utterance strings to label.
        get_label: Optional callable taking one utterance and returning its
            label. When omitted, the label is read interactively via
            ``input()``.

    Returns:
        Tuple ``(all_labeled_data, context)`` where ``all_labeled_data`` is a
        list of ``(utterance, label)`` pairs and ``context`` is prompt text
        containing one ``user:`` / ``assistant:`` example per pair.
    """
    if get_label is None:
        # Default label source: ask the person running the notebook.
        def get_label(utterance):
            return input(f"Label for '{utterance}': ").strip()

    context_parts = ["Here are more examples of how to classify utterances:"]
    all_labeled_data = []
    for data in initial_data:
        # label_data requires both the text and its label.
        label = label_data(data, get_label(data))
        context_parts.append(f"\nuser: '{data}'\nassistant: '{label}'")
        all_labeled_data.append((data, label))
    context_parts.append("\nI am going to provide several more sentences...")
    return all_labeled_data, "".join(context_parts)



In [None]:
#| hide
import nbdev; nbdev.nbdev_export()