# metacognitive prompting

### import statements

In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
import os
from dotenv import load_dotenv
import sys
import queue

load_dotenv()

In [None]:
sys.path.append(os.getenv("SYS_PATH"))
print(f'{os.getenv("SYS_PATH")}')
print(sys.path)

In [None]:
from openai import OpenAI, OpenAIError
from ipywidgets import widgets
from IPython.display import display, clear_output
from bson.objectid import ObjectId
from typing import Optional
import threading

from models.user import create_user
from models.chat_message import create_chat_message, ChatMessageData
from models.conversation import create_conversation
from models.chatbot_response import create_chatbot_response
from models.expectation import create_expectation
from models.expectation_revision import create_expectation_revision
from models.violation import create_violation

from utils.model_operations import create_model, get_model, get_chat_history
from services.mock_service import mock_user_data, mock_expectation_data, mock_expectation_revision_data, mock_message_data, mock_violation_data, mock_chatbot_response_data
from utils.helpers import CustomOutput

In [None]:
%run ../services/mongo_service.py

In [None]:
from services.mongo_service import mongo_client, ping_client
db_client = mongo_client()
ping_client(db_client)

In [None]:
text_input = ""
chat_output = CustomOutput()
user_input_queue = queue.Queue()

set_user = None
set_conversation = None
chat_history = []

button_click_event = threading.Event()

In [None]:
test_user = create_user(email=mock_user_data["email"], password=mock_user_data["password"])
print(f"test_user: {test_user}\nis of type{type(test_user)}")
saved_user_id = create_model(collection_name="users", model_data=test_user, client=db_client)

In [None]:
test_conversation = create_conversation(user_id=saved_user_id)
saved_conversation_id = create_model(collection_name="conversations", model_data=test_conversation, client=db_client)

In [None]:
test_expectation = create_expectation(reasoning=mock_expectation_data["reasoning"], user_predictions=mock_expectation_data["user_predictions"], additional_data=mock_expectation_data["additional_data"])
saved_expectation_id = create_model(collection_name="expectations", model_data=test_expectation, client=db_client)

In [None]:
test_expectation_revision = create_expectation_revision(revised_input_possibilities=mock_expectation_revision_data["revised_input_possibilities"], prediction_error=-0.045, initial_expectation_id=saved_expectation_id)
saved_expectation_revision_id = create_model(collection_name="expectation_revisions", model_data=test_expectation_revision, client=db_client)

In [None]:
test_message = create_chat_message(user_id=saved_user_id, content=mock_message_data["content"], conversation_id=saved_conversation_id)
saved_message_id = create_model(collection_name="chat_messages", model_data=test_message, client=db_client)

In [None]:
test_violation = create_violation(last_llm_response_id=None, expectation_id=saved_expectation_id, voe_thought=mock_violation_data["voe_thought"])
saved_violation_id = create_model(collection_name="violations", model_data=test_violation, client=db_client)

In [None]:
get_model(collection_name="users", model_id=saved_user_id, client=db_client)

In [None]:
print(mock_expectation_data["reasoning"])

In [None]:
apikey = os.getenv("OPENAI_API_KEY")
openai_client = OpenAI(
    api_key=apikey
)
print(openai_client)

In [None]:
def stream_chatbot(message) -> str:
    try:
        chat_completion_stream = openai_client.chat.completions.create(
            messages=[
                {
                    "role":"user",
                    "content":message,
                 },
            ],
            model="gpt-3.5-turbo",
            stream=True
        )
        print(chat_completion_stream)
        message = ""
        chat_output.append_stdout(f"first 'message' value in stream_chatbot: {message}", debug=True)
        chat_output.append_stdout(f"Chatbot: ", stream=True)
        for chunk in chat_completion_stream:
            if chunk.choices[0].delta.content is not None:
                chat_output.append_stdout(f"{chunk.choices[0].delta.content}", stream=True)
                message += chunk.choices[0].delta.content
        chat_output.append_stdout(f"\n")
        
        chat_output.append_stdout(f"final 'message' value in stream_chatbot: {message}", debug=True)
        
        return message
    except OpenAIError as e:
        #Handle API error here, e.g. retry or log
        print(f"OpenAI API returned an API Error: {e}")
        raise e

In [None]:
def chatbot(user_message) -> str:
    message = stream_chatbot(user_message)
    return message

In [None]:
def print_user_message(user_message):
    chat_output.append_stdout(f"You: {user_message}")

In [None]:
def handle_user_message(user_message):
    chatbot(user_message)

In [None]:
def handle_button_click():
    chat_output.append_stdout(f"handle button clicked", debug=True)
    chat_output.append_stdout(f"text_input.value: {text_input.value}", debug=True)
    user_input_queue.put(text_input.value)
    text_input.value=''
    chat_output.append_stdout(f"handle button clicked after setting queue value", debug=True)
        

In [None]:
def initialize_chat(user_id: ObjectId, conversation_id: Optional[ObjectId] = None):
    global set_user, set_conversation, chat_history
    set_user = get_model(collection_name="users", model_id=saved_user_id, client=db_client)
    if not conversation_id:
        new_conversation = create_conversation(user_id=user_id)
        conversation_id = create_model(collection_name="conversations", model_data=new_conversation, client=db_client)
    set_conversation = get_model(collection_name="conversations", model_id=conversation_id, client=db_client)
    
    chat_history = get_chat_history(conversation_id=conversation_id, client=db_client)
    


In [ ]:
def print_chat_history():
    for item in chat_history:
        if item.get('user_id'):
            print_user_message(item.get('content'))
        else:
            chat_output.append_stdout(f"Chatbot: ", stream=True)
            chat_output.append_stdout(item.get('content'))


In [None]:
def user_prediction_task() -> dict:
    # invoke reasoning process with Engagement Monitor Service
    reasoning = mock_expectation_data["reasoning"]
    
    # invoke user predictions with LLM Service
    user_predictions = mock_expectation_data["user_predictions"]
    
    # invoke vector db fact fetching with Knowledge Assessment Service
    additional_data = mock_expectation_data["additional_data"]
    
    # create and save Expectation
    new_expectation = create_expectation(reasoning=reasoning, user_predictions=user_predictions, additional_data=additional_data)
    expectation_id = create_model(collection_name="expectations", model_data=new_expectation, client=db_client)
    
    # invoke prediction improvement with LLM Service
    improved_predictions = mock_expectation_revision_data["revised_input_possibilities"]
    
    # invoke prediction error calculation with ??? Service
    prediction_error = 0.0
    
    # create and save ExpectationRevision
    new_expectation_revision = create_expectation_revision(revised_input_possibilities=improved_predictions, prediction_error=prediction_error, initial_expectation_id=expectation_id)
    expectation_revision_id = create_model(collection_name="expectation_revisions", model_data=new_expectation_revision, client=db_client)
    
    expectation_revision_dict = get_model(collection_name="expectation_revisions", model_id=expectation_revision_id, client=db_client)
    
    chat_output.append_stdout(f"USER PREDICTION TASK COMPLETED\nexpectation_revision_dict ID: {expectation_revision_dict['_id']}", debug=True)
    
    return expectation_revision_dict
    

In [None]:
def user_input_task() -> dict:
    chat_output.append_stdout(f"USER INPUT TASK", debug=True)
    chat_output.append_stdout(f"user_input_queue: {user_input_queue.__dict__}", debug=True)
    
    try:
        user_input = user_input_queue.get()
        
        print_user_message(user_input)
        
        new_chat_message = create_chat_message(user_id=set_user['_id'], content=user_input, conversation_id=set_conversation['_id'])
        chat_message_id = create_model(collection_name="chat_messages", model_data=new_chat_message, client=db_client)
        chat_message_dict = get_model(collection_name="chat_messages", model_id=chat_message_id, client=db_client)
        
        chat_output.append_stdout(f"USER INPUT TASK COMPLETED\nchat message dict: {chat_message_dict}", debug=True)
        
        return chat_message_dict
    
    except Exception as e:
        raise e

In [None]:
def voe_task(expectation_id: ObjectId) -> dict:
    try:
        # invoke violation of expectation with LLM service
        voe_thought = mock_violation_data['voe_thought']
        
        # implement DB call
        last_llm_response = None
        
        new_violation = create_violation(last_llm_response_id=last_llm_response, expectation_id=expectation_id, voe_thought=voe_thought)
        violation_id = create_model(collection_name="violations", model_data=new_violation, client=db_client)
        violation_dict = get_model(collection_name="violations", model_id=violation_id, client=db_client)
        
        return violation_dict
    
    except Exception as e:
        raise e

In [None]:
def chatbot_response_task(response_to_id: ObjectId):
    try:
        user_message_dict = get_model(collection_name="chat_messages", model_id=response_to_id, client=db_client)
        chat_output.append_stdout(f"{user_message_dict}", debug=True)
        user_message_model: ChatMessageData = user_message_dict["model"]
        chat_output.append_stdout(f"user_message_model: {user_message_model}",debug=True)
        
        # invoke prompt creation with useful information
        
        # invoke chatbot response with LLM Service
        generated_response = chatbot(user_message_model.content)
        chat_output.append_stdout(f"generated_response: {generated_response}", debug=True)
        
        # save final API output as "content"
        content = generated_response
        
        new_chatbot_response = create_chatbot_response(content=content, conversation_id=set_conversation['_id'], response_to_id=response_to_id)
        response_id = create_model(collection_name="chatbot_responses", model_data=new_chatbot_response, client=db_client)
        response_dict = get_model(collection_name="chatbot_responses", model_id=response_id, client=db_client)
        
        return response_dict
    
    except Exception as e:
        raise e

In [None]:
def conversation_loop():
    global text_input
    active = True
    text_input = widgets.Text(placeholder='Type your message here...')
    submit_button = widgets.Button(description='Submit')
    exit_button = widgets.Button(description='Exit')

    def on_submit_button_clicked(b):
        with chat_output:
            # invoke user input task and return user message
            handle_button_click()
            if active:
                message_dict = user_input_task()
                chat_output.append_stdout(f"Received message_dict: {message_dict}", debug=True)                
                message_id = message_dict["_id"]
    
                # invoke user prediction task and return prediction object
                expectation_dict = user_prediction_task()
                chat_output.append_stdout(f"Received prediction: {expectation_dict}", debug=True)
                expectation_id = expectation_dict["_id"]
    
                # invoke violation of expectation task and return violation object
                violation_dict = voe_task(expectation_id=expectation_id)
                chat_output.append_stdout(f"Received violation: {violation_dict}", debug=True)
                
                # invoke vector db fact storing with Knowledge Assessment Service
                
                # invoke chatbot response task and return chatbot message
                chatbot_response_dict = chatbot_response_task(message_id)
                chat_output.append_stdout(f"Received response: {chatbot_response_dict}", debug=True)

    submit_button.on_click(on_submit_button_clicked)

    def on_exit_button_clicked(b):
        nonlocal active
        with chat_output:
            active = False
            chat_output.clear_output()
            chat_output.append_stdout("Exiting conversation loop...")

    exit_button.on_click(on_exit_button_clicked)

    display(widgets.VBox([chat_output, text_input, submit_button, exit_button]))

  

In [None]:
def main():
    initialize_chat(saved_user_id, ObjectId('65f1c96cc63f2e5bb56d6976'))
    chat_output.append_stdout(f"user: {set_user}\nconversation: {set_conversation}")
    print_chat_history()
    conversation_loop()

if __name__ == "__main__": main()