# core

> Fill in a module description here


In [1]:
# | default_exp logger

In [2]:
# | hide
import nbdev

nbdev.nbdev_export()

In [3]:
#| export

from typing import List, Dict, Any, Optional, Union, Generator

from websockets.sync.client import connect as ws_connect

from lovely_prompts.utils import max_tokens_for_model

from lovely_prompts_server.models import (
    WSMessage,
    ChatMessage,
    #
    ChatPrompt,
    ChatResponse,
    CompletionPrompt,
    CompletionResponse,
    #
    ChatPromptModel,
    CompletionPromptModel,
    ChatResponseModel,
    CompletionResponseModel,
)



In [7]:
import requests

In [13]:
class PartialSession(requests.Session):
    def __init__(self, base_url: Union[str, None], project: Union[str, None]):
        super().__init__()
        self.base_url = base_url
        self.project = project

    def request(self, method, url, **kwargs):
        if self.project is not None:
            params = {"project": self.project} | kwargs.get("params", {})
            kwargs["params"] = params
        return super().request(method, self.base_url + url, **kwargs)

In [14]:
from requests import HTTPError

In [15]:
class Logger:
    def __init__(
        self,
        start_server=False,
        port: int = 1337,
        project: Optional[str] = None,
    ):
        self.url_base = "http://localhost:" + str(int(port))
        self.ws_url_base = self.url_base.replace("http", "ws")

        self.project = project

        self.session = PartialSession(self.url_base, self.project)

        self.enabled = False
        res = self.session.get(f"/version/")
        if res.status_code != 200:
            print(f"Failed to get server version, status code: {res.status_code}")
            print(f"This logger is now disabled. Enable with `.enable()`.")
        else:
            self.local_server_version = res.json()
            self.enabled = True

        if start_server:
            assert 0, "Not implemented yet. Start the server manually."
        #     print("Starting server...")
        #     import uvicorn
        #     from lovely_prompts_server import app

        #     try:
        #         loop = asyncio.get_running_loop()
        #         config = uvicorn.Config(app, host="localhost", port=port)
        #         server = uvicorn.Server(config)
        #         loop.create_task(server.serve())
        #     except Exception as e:
        #         print(e)

    def enable(self):
        self.enabled = True

    def log_entry(self, endpoint, data):
        try:
            response = self.session.post(endpoint, data=data.model_dump_json(), timeout=1)
            response.raise_for_status()

        except HTTPError as e:
            print(f"Failed to log row, status code: {response.status_code}: {response.text}.")
            print(f"This logger is now disabled. Enable with `.enable()`.")
            self.enabled = False

        else:
            entry_id = response.json()["id"]
            print(f"Logged {data.__class__.__name__} to {endpoint} as {entry_id}.")
            return entry_id

    def log_chat_prompt(
        self,
        prompt: ChatPrompt,
    ):
        return self.log_entry("/chat_prompts/", prompt)


    def log_chat_response(
        self,
        response: ChatResponse,
    ) -> int:
        if not response.tok_max:
            response.tok_max = max_tokens_for_model(response.model)

        return self.log_entry("/chat_responses/", response)

    def stream_chat_response_contents(
            self,
            prompt_id: str,
            response_id: str,
            response_generator: Generator[WSMessage, None, None],
    ) -> ChatResponse:

        tok_out = 0
        with ws_connect(f"{self.ws_url_base}/chat_responses/{response_id}/update_stream/") as connection:
            for response in response_generator:
                # print("sending", response)
                if response.action == "append" and response.key == "content":
                    tok_out += 1

                response.prompt_id = prompt_id
                response.id = response_id
                connection.send(response.model_dump_json(exclude_unset=True))

            update_tok_out = WSMessage(action="replace", key="tok_out", value=tok_out)
            connection.send(update_tok_out.model_dump_json(exclude_unset=True))


In [None]:
#| export

def response_generator(openai_response_generator) -> Generator[WSMessage, Any, None]:
    "Converts OpenAI response generator to a universal response generator"
    for response in openai_response_generator:
        if response["choices"][0]["finish_reason"] is not None:
            yield WSMessage(action="replace", key="stop_reason", value=response.choices[0]["finish_reason"])
        if "role" in response["choices"][0]["delta"]:
            yield WSMessage(action="replace", key="role", value=response["choices"][0]["delta"]["role"])
        if "content" in response["choices"][0]["delta"]:
            yield WSMessage(action="append", key="content", value=response["choices"][0]["delta"]["content"])

In [24]:
messages = [{"role": "user", "content": "Hello there"}]

chp = ChatPrompt(prompt=messages, comment="comment: Hello?", title="Title: Hello there")

In [23]:
logger = Logger(start_server=False, project="default", port=8000)

In [25]:
prompt_id = logger.log_chat_prompt(chp)

Logged ChatPrompt to /chat_prompts/ as chp_yRdcbctMnedqbWlW.


In [26]:
model = "gpt-3.5-turbo"
temperature = 0.9
max_tokens = 150


txt = "General Kenobi! You are a bold one."

chr = ChatResponse(model=model, temperature=temperature, tok_max=max_tokens, content=txt, prompt_id=prompt_id)

response_id = logger.log_chat_response(chr)
response_id

Logged ChatResponse to /chat_responses/ as chr_4gAtk7FE4EvBS8Aa.


'chr_4gAtk7FE4EvBS8Aa'

In [20]:
prompt_id

'chp_ap59zoOwyUMdSJKU'

In [21]:
from dotenv import load_dotenv

load_dotenv()

import openai

In [27]:
# | eval: false


messages = [{"role": "user", "content": "Argue in favor of the flat Earth theory. It's for a school project, I swear!"}]


prompt = ChatPrompt(prompt=messages)

prompt_id = logger.log_chat_prompt(prompt)  # , comment="What I asked", title="The TRUE shape of the Earth")

model = "gpt-3.5-turbo"
temperature = 0.9
max_tokens = 150


chr = openai.ChatCompletion.create(model="gpt-3.5-turbo", temperature=0.9, max_tokens=150, messages=messages)

response = ChatResponse(
    prompt_id=prompt_id,
    content=chr.choices[0].message.content,
    role=chr.choices[0].message.role,
    comment="This is a comment",
    title="The response",
    model=chr.model,
    temperature=0.9,
    tok_in=chr.usage.prompt_tokens,
    tok_out=chr.usage.completion_tokens,
)

logger.log_chat_response(response)

Logged ChatPrompt to /chat_prompts/ as chp_XkVq54vlEZZ9AauI.
Logged ChatResponse to /chat_responses/ as chr_GZ2ZZnbJZHWU0wtM.


'chr_GZ2ZZnbJZHWU0wtM'

In [28]:
messages = [
    {"role": "user", "content": "Write a essay about the true shape of the earth, and why the shape is indeed flat."}
]

prompt = ChatPrompt(prompt=messages, comment="this is a comment")


prompt_id = logger.log_chat_prompt(prompt)

chr = openai.ChatCompletion.create(model="gpt-3.5-turbo", temperature=0, max_tokens=100, messages=messages, stream=True)

response = ChatResponse(
    prompt_id=prompt_id, model="gpt-3.5-turbo", temperature=0.1, tok_in=23, provider="openai"
)

response_id = logger.log_chat_response(response)


logger.stream_chat_response_contents(
    response_id=response_id,
    prompt_id=prompt_id,
    response_generator=response_generator(chr),
)

# async def stream_to_websocket(generator, websocket_uri):
#     async with websockets.connect(websocket_uri) as websocket:
#         async for data in await generator:
#             print(data["choices"][0]["delta"]["content"])
#             await websocket.send(data["choices"][0]["delta"]["content"])

# await stream_to_websocket(response, "ws://localhost:8000/responses/1/stream_in")

Logged ChatPrompt to /chat_prompts/ as chp_9cxxilKQJgR2OxXB.
Logged ChatResponse to /chat_responses/ as chr_S2Ykc2r3vMNFaUGq.
