In [1]:
import openai
from dotenv import load_dotenv
import os
import psycopg2
import uuid
import tiktoken

In [2]:
# Read the .env file so its entries become available through os.getenv
load_dotenv()

# API key used by the openai client
openai.api_key = os.getenv("OPENAI_API_KEY")

# PostgreSQL connection settings, keyed by the keyword names that
# psycopg2.connect(**db_config) receives
db_config = {
    option: os.getenv(env_var)
    for option, env_var in (
        ("host", "DB_HOST"),
        ("database", "DB_NAME"),
        ("user", "DB_USER"),
        ("password", "DB_PASSWORD"),
    )
}

# Default settings for chat-completion requests
completion_config = {
    "GPT_MODEL": "gpt-4o-mini",
    "TEMPERATURE": 0,
    "MAX_TOKENS": 500,
    # Adjust based on the model's token limit
    "MAX_INPUT_TOKENS": 8000,
}

In [3]:
# test db connection
# psycopg2's `with connection:` block only ends the transaction (commit or
# rollback); it does not close the connection. Close it explicitly so this
# check does not leave a session open on the server.
conn = psycopg2.connect(**db_config)
try:
    with conn.cursor() as cursor:
        cursor.execute("SELECT version();")
        version = cursor.fetchone()
        print("PostgreSQL Version:", version[0])
finally:
    conn.close()

PostgreSQL Version: PostgreSQL 16.4 (Ubuntu 16.4-0ubuntu0.24.04.2) on x86_64-pc-linux-gnu, compiled by gcc (Ubuntu 13.2.0-23ubuntu4) 13.2.0, 64-bit


In [None]:
# Tokenizer for the default model in completion_config; complete() uses it
# to count the tokens of the outgoing messages.
encoding = tiktoken.encoding_for_model(completion_config["GPT_MODEL"])

In [None]:
def complete(messages, **kwargs):
    """Send a chat-completion request after checking the token budget.

    Args:
        messages: List of chat message dicts. The "content" entry of each
            message is tokenized; a missing or ``None`` content counts as
            zero tokens.
        **kwargs: Optional per-call overrides of the defaults stored in
            ``completion_config``: ``model``, ``temperature``,
            ``max_tokens`` and ``max_input_tokens``. Other keys are ignored.

    Returns:
        The object returned by ``openai.ChatCompletion.create``, or ``None``
        if the call raised ``openai.error.OpenAIError`` (the error is printed).

    Raises:
        ValueError: If the message tokens plus ``max_tokens`` exceed
            ``max_input_tokens``.
    """
    # Load configuration settings and allow overrides via kwargs
    model = kwargs.get("model", completion_config["GPT_MODEL"])
    temperature = kwargs.get("temperature", completion_config["TEMPERATURE"])
    max_tokens = kwargs.get("max_tokens", completion_config["MAX_TOKENS"])
    max_input_tokens = kwargs.get("max_input_tokens", completion_config["MAX_INPUT_TOKENS"])

    # Count the tokens of the outgoing messages. The module-level `encoding`
    # is the tokenizer of the default model, also when `model` is overridden.
    total_tokens = sum(len(encoding.encode(msg.get("content") or "")) for msg in messages)
    # Budget = prompt tokens plus the tokens reserved for the reply
    token_usage = total_tokens + max_tokens

    # Reject requests over the limit first, so the "90%" warning is only
    # printed for requests that are actually sent.
    if token_usage > max_input_tokens:
        raise ValueError("Input messages exceed the maximum allowed token limit.")

    # Check if token usage is close to the limit (90% warning)
    warning_threshold = int(max_input_tokens * 0.9)
    if token_usage > warning_threshold:
        print(f"WARNING: Token usage is at {token_usage}/{max_input_tokens} tokens (90% of the limit).")

    # Make the API call
    try:
        return openai.ChatCompletion.create(
            model=model,
            messages=messages,
            temperature=temperature,
            max_tokens=max_tokens,
        )
    except openai.error.OpenAIError as e:
        print("Error during API call:", e)
        return None

In [None]:
class Assistant:
    """Chat assistant that stores conversations and messages in PostgreSQL.

    Each instance holds a session ID (a UUID string) that is written with
    every conversation it creates. Connection settings are read from the
    module-level ``db_config``; completion defaults from ``completion_config``.
    """

    def __init__(self):
        self.session_id = None
        self.messages = []  # history sent with the most recent query
        self.start_new_session()

    def start_new_session(self):
        """Start a new session with a unique session ID."""
        self.session_id = str(uuid.uuid4())
        print(f"Started a new session with ID: {self.session_id}")

    def _execute(self, sql, params=None, fetch=None):
        """Run one statement on a fresh connection and always close it.

        Args:
            sql: SQL text with ``%s`` placeholders.
            params: Values bound to the placeholders.
            fetch: ``"one"`` to return ``cursor.fetchone()``, ``"all"`` to
                return ``cursor.fetchall()``, ``None`` to return nothing.
        """
        conn = psycopg2.connect(**db_config)
        try:
            # `with conn` commits on success and rolls back on error, but it
            # does not close the connection; the `finally` below does.
            with conn:
                with conn.cursor() as cursor:
                    cursor.execute(sql, params)
                    if fetch == "one":
                        return cursor.fetchone()
                    if fetch == "all":
                        return cursor.fetchall()
                    return None
        finally:
            conn.close()

    def create_conversation(self, user_id, system_prompt):
        """Create a new conversation with the given system prompt.

        Returns:
            The ``id`` of the inserted ``conversations`` row.
        """
        row = self._execute(
            """
            INSERT INTO conversations (user_id, session_id, system_prompt, depth, conv_status)
            VALUES (%s, %s, %s, %s, %s)
            RETURNING id;
            """,
            (user_id, self.session_id, system_prompt, 0, 'active'),
            fetch="one",
        )
        conversation_id = row[0]
        print(f"Created a new conversation with ID: {conversation_id}")
        return conversation_id

    def add_message(self, conversation_id, role, content, GPTmodel=None):
        """Store a message in the database together with its token count.

        Args:
            conversation_id: Conversation the message belongs to.
            role: Chat role stored with the message (e.g. "user").
            content: Message text.
            GPTmodel: Model whose tokenizer is used for counting; defaults to
                ``completion_config["GPT_MODEL"]``.
        """
        if GPTmodel is None:
            GPTmodel = completion_config["GPT_MODEL"]

        # Count tokens before opening a connection so a tokenizer error
        # cannot leave a transaction open.
        token_count = len(tiktoken.encoding_for_model(GPTmodel).encode(content))

        self._execute(
            """
            INSERT INTO messages (conversation_id, role, content, token_count)
            VALUES (%s, %s, %s, %s);
            """,
            (conversation_id, role, content, token_count),
        )
        print(f"Added message to conversation {conversation_id} with {token_count} tokens.")

    def load_history(self, conversation_id):
        """Load the entire message history of a conversation, ordered by id.

        Returns:
            A list of ``{"role": ..., "content": ...}`` dicts.
        """
        rows = self._execute(
            """
            SELECT role, content FROM messages
            WHERE conversation_id = %s
            ORDER BY id;
            """,
            (conversation_id,),
            fetch="all",
        )
        return [{"role": role, "content": content} for role, content in rows]

    def query(self, conversation_id, user_input, **kwargs):
        """Handle a user query and make the API call.

        Args:
            conversation_id: Conversation to append to.
            user_input: Text of the user's message.
            **kwargs: Overrides forwarded to ``complete`` (e.g. ``model``,
                ``temperature``, ``max_tokens``, ``max_input_tokens``).

        Returns:
            The assistant's reply text, or a fixed error string if the
            request could not be completed.
        """
        error_reply = "An error occurred while processing your request."
        # None lets add_message fall back to the configured default model
        model = kwargs.get("model")

        self.add_message(conversation_id, "user", user_input, model)

        # Load the full conversation history
        self.messages = self.load_history(conversation_id)

        # Query the API
        try:
            completion = complete(self.messages, **kwargs)
            if completion is None:
                # complete() already printed the API error
                return error_reply
            content = completion.choices[0].message["content"]

            # Add the assistant's response to the conversation history
            self.add_message(conversation_id, "assistant", content, model)

            # Print and return the assistant's response
            print(content)
            return content

        except Exception as e:
            print("Error during API call:", e)
            return error_reply

    def replay_with_modified_prompt(self, conversation_id, new_system_prompt, user_id, **kwargs):
        """Replay the user messages of a conversation under a new system prompt.

        Args:
            conversation_id: Conversation whose user messages are replayed.
            new_system_prompt: System prompt for the new conversation.
            user_id: Owner of the new conversation.
            **kwargs: Overrides forwarded to ``query`` for every replayed
                message.

        Returns:
            The ID of the newly created conversation.
        """
        # Load the original conversation history
        history = self.load_history(conversation_id)

        # Create a new conversation under the current session
        new_conversation_id = self.create_conversation(user_id, new_system_prompt)

        # Add the new system prompt
        self.add_message(new_conversation_id, "system", new_system_prompt, kwargs.get("model"))

        # Replay user messages with the new system prompt
        for message in history:
            if message["role"] == "user":
                self.query(new_conversation_id, message["content"], **kwargs)

        print(f"Replayed conversation created with ID: {new_conversation_id}")
        return new_conversation_id
        

In [None]:
# Create the assistant; its constructor starts a new session and prints the session ID.
bot = Assistant()