In [1]:
from IPython.display import display, Markdown

In [2]:
import dotenv
import os

# Load variables from a .env file (if one is found) into the process
# environment so the os.getenv lookup below can see them.
dotenv.load_dotenv()


def _getOrThrow(key):
    value = os.getenv(key)
    if value is None:
        raise RuntimeError(f"Missing environment variable: {key}")
    return value


# Resolved once when this cell runs; raises RuntimeError immediately if the
# key is not configured, instead of failing later on the first API call.
OPENAI_API_KEY = _getOrThrow("OPENAI_API_KEY")


In [3]:
import requests


class GptCompleter:
    """Minimal client for the OpenAI chat-completions HTTP endpoint.

    Every request carries a fixed system prompt followed by a single user
    message, with deterministic sampling settings (``temperature`` 0).

    Args:
        system_prompt: Text sent as the ``system`` message of every request.
        model: Model identifier placed in the request payload.
        max_tokens: Value of ``max_tokens`` in the request payload.
        timeout: Seconds passed to ``requests.post`` as ``timeout``. Without
            a timeout a stalled connection would block the cell forever.
    """

    API_URL = "https://api.openai.com/v1/chat/completions"

    def __init__(
        self,
        system_prompt: str,
        model: str = "gpt-4-0125-preview",
        max_tokens: int = 4000,
        timeout: float = 300.0,
    ) -> None:
        self._system_prompt = system_prompt
        self._model = model
        self._max_tokens = max_tokens
        self._timeout = timeout

    def _build_payload(self, prompt: str) -> dict:
        """Return the JSON body for a chat-completion request for ``prompt``."""
        return {
            "model": self._model,
            "max_tokens": self._max_tokens,
            "temperature": 0,
            "top_p": 1,
            "frequency_penalty": 0,
            "presence_penalty": 0,
            "messages": [
                {"role": "system", "content": self._system_prompt},
                {"role": "user", "content": prompt},
            ],
        }

    def complete(self, prompt: str) -> str:
        """Send ``prompt`` as the user message and return the reply text.

        Args:
            prompt: The user message.

        Returns:
            The ``content`` of the first choice in the response.

        Raises:
            requests.HTTPError: If the API answers with an error status.
            requests.Timeout: If the request exceeds the configured timeout.
        """
        response = requests.post(
            self.API_URL,
            json=self._build_payload(prompt),
            headers={
                "Content-Type": "application/json",
                "Authorization": f"Bearer {OPENAI_API_KEY}",
            },
            timeout=self._timeout,
        )
        response.raise_for_status()
        data = response.json()
        return data["choices"][0]["message"]["content"]


In [4]:
from pathlib import Path


# Template for the system prompt. Placeholders: {filenames} receives the
# newline-joined relative paths of all files, {source_files} receives the
# newline-joined SOURCE_PART_TEMPLATE renderings.
# NOTE(review): the constant's name contains a typo ("TEPMPLATE"); it is left
# as is because build_sys_prompt refers to it by this exact name.
SYSTEM_PROMPT_TEPMPLATE = """\
You are a programming assistant. 

When asked to update, please provide only the code that you want to add or change.


Source files are:
```
{filenames}
```

Source code:

{source_files}
"""


# Template for one embedded source file: its relative path followed by its
# contents inside a fenced block.
SOURCE_PART_TEMPLATE = """\
name: `{filename}`

contents:
```
{contents}
```
"""

# Only files with one of these suffixes have their contents embedded.
INCLUDE_SUFFIXES = [".py", ".md", ".json"]
# Directories with these names are not descended into while walking the tree.
EXCLUDE_DIRS = [".git", "__pycache__", ".venv"]


def all_files_recursively(dir=".", exclude_dirs=None):
    """Collect every regular file below ``dir``, descending into subdirectories.

    Args:
        dir: Directory to walk (a ``str`` or ``Path``).
        exclude_dirs: Names of directories that are not descended into.
            ``None`` (the default) means the module-level ``EXCLUDE_DIRS``.

    Returns:
        A list of ``Path`` objects, one per file found. Entries of each
        directory are visited in sorted order, so the result is the same
        on every run.
    """
    if exclude_dirs is None:
        exclude_dirs = EXCLUDE_DIRS
    result = []
    # iterdir() yields entries in arbitrary order; sort for a stable listing.
    for p in sorted(Path(dir).iterdir()):
        if p.is_file():
            result.append(p)
        elif p.is_dir():
            if p.name not in exclude_dirs:
                result.extend(all_files_recursively(p, exclude_dirs))
        # Anything else (dangling symlink, socket, FIFO, ...) is skipped:
        # recursing into it would make iterdir() raise.
    return result


def build_sys_prompt(dir="."):
    """Render the system prompt describing the source tree under ``dir``.

    Every file returned by ``all_files_recursively`` is listed by its path
    relative to ``dir``; files whose suffix is in ``INCLUDE_SUFFIXES`` also
    have their contents embedded.

    Args:
        dir: Root directory of the project to describe.

    Returns:
        The formatted system prompt string. A file that cannot be read is
        reported on stdout and left out of the embedded sources, so the
        function always returns a string rather than ``None``.
    """
    all_files = all_files_recursively(dir)
    filenames = [str(p.relative_to(dir)) for p in all_files]
    parts = []
    for p in all_files:
        if p.suffix not in INCLUDE_SUFFIXES:
            continue
        try:
            # Explicit encoding: the platform default is not UTF-8 everywhere.
            contents = p.read_text(encoding="utf-8") or "EMPTY FILE"
        except (OSError, UnicodeError) as e:
            # One unreadable file should not discard the whole prompt.
            print(f"Error reading {p}: {e} (skipped)")
            continue
        parts.append(
            SOURCE_PART_TEMPLATE.format(
                filename=p.relative_to(dir), contents=contents
            )
        )
    return SYSTEM_PROMPT_TEPMPLATE.format(
        filenames="\n".join(filenames), source_files="\n".join(parts)
    )



In [None]:
# Preview the system prompt built from the current directory, rendered as Markdown.
display(Markdown(build_sys_prompt()))

In [6]:
# Question sent as the user message.
QUERY = """\
Please suggest how to implement Thread creation in RuntimeManager.
"""
# Build the system prompt from the current directory, send the query, and
# render the model's reply as Markdown.
res = GptCompleter(build_sys_prompt()).complete(QUERY)
display(Markdown(res))

To implement thread creation in the `RuntimeManager` class for starting bots, you can use the `threading` module in Python. The idea is to create a new thread for each bot that needs to be started, and then run the bot's start method within that thread. This allows each bot to operate concurrently without blocking the main execution thread of your application.

Here's how you can implement the `start` method in the `RuntimeManager` class to create and start a new thread for each bot:

```python
from threading import Thread

class RuntimeManager:
    def __init__(self, master_config: MasterConfig):
        self._master_config = check_required(master_config, "main_config", MasterConfig)
        self._threads = {}  # Keep track of threads by bot_id

    def start_all(self):
        for bot_conf in self._master_config.bots():
            self.start(bot_conf)

    def stop_all(self):
        for bot_id in list(self._threads.keys()):
            self.stop(bot_id)

    def start(self, bot_conf: BotConf):
        if isinstance(bot_conf, TgBotConf):
            # Define the target function for the thread
            def target():
                from app.src.server.tg_bot import TgBot  # Import here to avoid circular imports
                bot = TgBot(bot_conf)
                bot.start()

            # Create and start a new thread for the bot
            thread = Thread(target=target, name=bot_conf.bot_id(), daemon=True)
            thread.start()
            self._threads[bot_conf.bot_id()] = thread
        else:
            raise NotImplementedError(
                f"Bot type {bot_conf.__class__.__name__} is not supported"
            )
        
    def stop(self, bot_id: str):
        # Here you would implement logic to gracefully stop the bot
        # For now, we'll just remove it from the threads dictionary
        if bot_id in self._threads:
            # Implement stopping logic here if necessary
            del self._threads[bot_id]
```

This implementation adds a `_threads` dictionary to the `RuntimeManager` class to keep track of the threads by their `bot_id`. When `start` is called for a bot configuration, it checks if the bot is of type `TgBotConf`, and if so, it creates a new thread targeting a function that initializes and starts the bot. The thread is then started and stored in the `_threads` dictionary.

The `stop` method is a placeholder where you would implement the logic to gracefully stop a bot. For now, it simply removes the bot's thread from the `_threads` dictionary. You would need to add more logic here to actually stop the bot's execution, depending on how your bot is designed to handle shutdown signals.

This approach allows each bot to run in its own thread, enabling concurrent operation of multiple bots within your application.