diff --git a/.github/workflows/integration-test.yml b/.github/workflows/integration-test.yml index 2ba30cf..ce733ec 100644 --- a/.github/workflows/integration-test.yml +++ b/.github/workflows/integration-test.yml @@ -48,6 +48,8 @@ jobs: - 'project.toml' - 'requirements.txt' - 'logger_config.yaml' + - 'logger_config_em.yaml' + - 'supervisord.conf' - '.github/workflows/integration-test.yml' integration: @@ -181,6 +183,7 @@ jobs: pip install -r requirements.txt cp example.env .env echo "NEXTCLOUD_URL=http://localhost:8080" >> .env + python3 -u ./main_em.py > em_backend_logs 2>&1 & python3 -u ./main.py > backend_logs 2>&1 & echo $! > ../pid.txt # Save the process ID (PID) @@ -260,7 +263,8 @@ jobs: run: | cat data/nextcloud.log echo '--------------------------------------------------' - cat context_chat_backend/backend_logs || echo "No backend logs" + cat context_chat_backend/backend_logs || echo "No main backend logs" + cat context_chat_backend/em_backend_logs || echo "No em backend logs" echo '--------------------------------------------------' tail -v -n +1 context_chat_backend/persistent_storage/logs/* || echo "No logs in logs directory" diff --git a/Dockerfile b/Dockerfile index b6fd2ce..5823ef4 100644 --- a/Dockerfile +++ b/Dockerfile @@ -46,6 +46,8 @@ COPY main.py . COPY main_em.py . COPY config.?pu.yaml . COPY logger_config.yaml . +COPY logger_config_em.yaml . COPY hwdetect.sh . 
+COPY supervisord.conf /etc/supervisor/supervisord.conf -ENTRYPOINT [ "./dockerfile_scripts/entrypoint.sh" ] +ENTRYPOINT ["supervisord", "-c", "/etc/supervisor/supervisord.conf"] diff --git a/context_chat_backend/dyn_loader.py b/context_chat_backend/dyn_loader.py index 274a402..79f4577 100644 --- a/context_chat_backend/dyn_loader.py +++ b/context_chat_backend/dyn_loader.py @@ -6,17 +6,11 @@ import gc import logging -import multiprocessing as mp -import os -import signal -import subprocess from abc import ABC, abstractmethod -from datetime import datetime from time import sleep, time from typing import Any import httpx -import psutil import torch from fastapi import FastAPI from langchain.llms.base import LLM @@ -39,45 +33,21 @@ def load(self) -> Any: def offload(self): ... -pid = mp.Value('i', 0) class EmbeddingModelLoader(Loader): def __init__(self, config: TConfig): self.config = config - logfile_path = os.path.join( - os.environ['EM_SERVER_LOG_PATH'], - f'embedding_server_{datetime.now().strftime("%Y-%m-%d")}.log', - ) - self.logfile = open(logfile_path, 'a+') def load(self): - global pid - emconf = self.config.embedding - - # start the embedding server if workers are > 0 - if emconf.workers > 0: - # compare with None, as PID can be 0, you never know - if pid.value > 0 and psutil.pid_exists(pid.value): - return - - proc = subprocess.Popen( # noqa: S603 - ['./main_em.py'], - stdout=self.logfile, - stderr=self.logfile, - stdin=None, - close_fds=True, - env=os.environ, - ) - pid.value = proc.pid - last_resp, last_exc = None, None # poll for heartbeat try_ = 0 - while try_ < 20: - with httpx.Client() as client: + with httpx.Client() as client: + while try_ < 20: try: # test the server is up + # todo: replace with a tcp connection check response = client.post( f'{emconf.protocol}://{emconf.host}:{emconf.port}/v1/embeddings', json={'input': 'hello'}, @@ -98,10 +68,7 @@ def load(self): ) from last_exc def offload(self): - global pid - if pid.value > 0 and 
psutil.pid_exists(pid.value): - os.kill(pid.value, signal.SIGTERM) - self.logfile.close() + ... class VectorDBLoader(Loader): diff --git a/context_chat_backend/logger.py b/context_chat_backend/logger.py index 7c9de70..0ceefe6 100644 --- a/context_chat_backend/logger.py +++ b/context_chat_backend/logger.py @@ -84,8 +84,8 @@ def _prepare_log_dict(self, record: logging.LogRecord): return message -def get_logging_config() -> dict: - with open('logger_config.yaml') as f: +def get_logging_config(config_path: str) -> dict: + with open(config_path) as f: try: yaml = YAML(typ='safe') config: dict = yaml.load(f) diff --git a/context_chat_backend/ocs_utils.py b/context_chat_backend/ocs_utils.py index f19fcb5..2c1ecd0 100644 --- a/context_chat_backend/ocs_utils.py +++ b/context_chat_backend/ocs_utils.py @@ -15,7 +15,7 @@ logger = logging.getLogger('ccb.ocs_utils') -def _sign_request(headers: dict, username: str = '') -> None: +def sign_request(headers: dict, username: str = '') -> None: headers['EX-APP-ID'] = getenv('APP_ID') headers['EX-APP-VERSION'] = getenv('APP_VERSION') headers['OCS-APIRequest'] = 'true' @@ -124,7 +124,7 @@ def ocs_call( headers.update({'Content-Type': 'application/json'}) data_bytes = json.dumps(json_data).encode('utf-8') - _sign_request(headers, kwargs.get('username', '')) + sign_request(headers, kwargs.get('username', '')) with httpx.Client(verify=verify_ssl) as client: ret = client.request( diff --git a/dockerfile_scripts/install_deps.sh b/dockerfile_scripts/install_deps.sh index 8b80c0d..2ac49ac 100755 --- a/dockerfile_scripts/install_deps.sh +++ b/dockerfile_scripts/install_deps.sh @@ -4,4 +4,4 @@ # SPDX-License-Identifier: AGPL-3.0-or-later # apt-get update -apt-get install -y --no-install-recommends vim git pciutils libgomp1 libreoffice +apt-get install -y --no-install-recommends vim git pciutils libgomp1 libreoffice supervisor diff --git a/hwdetect.sh b/hwdetect.sh index 181ac65..c997fc1 100755 --- a/hwdetect.sh +++ b/hwdetect.sh @@ -47,6 +47,7 
@@ if [ "$1" = "config" ]; then exit 0 fi +# todo: use fallback config instead of copying it to persistent storage echo "Copying config file to the persistent storage..." if [ "$accel" = "CUDA" ]; then cp "config.gpu.yaml" "$APP_PERSISTENT_STORAGE/config.yaml" diff --git a/logger_config_em.yaml b/logger_config_em.yaml new file mode 100644 index 0000000..dfb430e --- /dev/null +++ b/logger_config_em.yaml @@ -0,0 +1,57 @@ +# +# SPDX-FileCopyrightText: 2022 MCODING, LLC +# SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors +# SPDX-License-Identifier: AGPL-3.0-or-later +# + +version: 1 +disable_existing_loggers: false + +formatters: + default: + format: '%(asctime)s: [%(levelname)s|%(module)s]: %(message)s' + datefmt: '%Y-%m-%dT%H:%M:%S%z' + + json: + (): context_chat_backend.logger.JSONFormatter + fmt_keys: + timestamp: timestamp + level: levelname + logger: name + message: message + filename: filename + function: funcName + line: lineno + thread_name: threadName + pid: process + + +handlers: + stderr: + class: logging.StreamHandler + level: WARNING + formatter: default + stream: ext://sys.stderr + + file_json: + class: logging.handlers.RotatingFileHandler + level: DEBUG + formatter: json + filename: logs/em_server.log + maxBytes: 20971520 + backupCount: 10 + + +loggers: + root: + level: WARNING + handlers: + - stderr + - file_json + + emserver: + level: DEBUG + handlers: + - stderr + - file_json + propagate: false diff --git a/main.py b/main.py index 3b3a259..746c280 100755 --- a/main.py +++ b/main.py @@ -13,6 +13,8 @@ from context_chat_backend.utils import to_int # isort: skip from context_chat_backend.logger import get_logging_config, setup_logging # isort: skip +LOGGER_CONFIG_NAME = 'logger_config.yaml' + def _setup_log_levels(debug: bool): ''' Set log levels for the modules at once for a cleaner usage later. 
@@ -40,7 +42,7 @@ def _setup_log_levels(debug: bool): if __name__ == '__main__': - logging_config = get_logging_config() + logging_config = get_logging_config(LOGGER_CONFIG_NAME) setup_logging(logging_config) app_config: TConfig = app.extra['CONFIG'] _setup_log_levels(app_config.debug) diff --git a/main_em.py b/main_em.py index 8576ff4..fa36b94 100755 --- a/main_em.py +++ b/main_em.py @@ -3,76 +3,73 @@ # SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors # SPDX-License-Identifier: AGPL-3.0-or-later # +import logging import os -import signal -import threading -import time +from time import sleep +import httpx import uvicorn -from llama_cpp.server.app import create_app -from llama_cpp.server.settings import ModelSettings, ServerSettings -from starlette.datastructures import URL -from starlette.types import ASGIApp, Receive, Scope, Send -from context_chat_backend.types import TConfig # isort: skip from context_chat_backend.config_parser import get_config # isort: skip +from context_chat_backend.logger import get_logging_config, setup_logging # isort: skip +from context_chat_backend.ocs_utils import sign_request # isort: skip from context_chat_backend.setup_functions import ensure_config_file, setup_env_vars # isort: skip -last_time_lock = threading.Lock() -last_time = 0 -holding_cnt = 0 - -def die_on_time(app_config: TConfig): - global last_time - while True: - time.sleep(60) - with last_time_lock: - if holding_cnt <= 0 and time.time() - last_time > app_config.embedding.offload_after_mins * 60: - print('Killing the embedding server due to inactivity', flush=True) - os.kill(os.getpid(), signal.SIGTERM) - - -class LastAccessMiddleware: - ''' - Records last access time of the request to the embeddings route. 
- ''' - def __init__(self, app: ASGIApp) -> None: - self.app = app - - async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: - global last_time, holding_cnt - if scope['type'] != 'http': - await self.app(scope, receive, send) - return - - url = URL(scope=scope) - - if url.path == '/heartbeat': - await send({'type': 'http.response.start', 'status': 200}) - await send({'type': 'http.response.body', 'body': b'OK'}) - return - - if url.path == '/v1/embeddings': - try: - with last_time_lock: - holding_cnt += 1 - await self.app(scope, receive, send) - finally: - with last_time_lock: - last_time = time.time() - holding_cnt -= 1 +LOGGER_CONFIG_NAME = 'logger_config_em.yaml' +# todo: config and env var for this +MODEL_ALIAS = 'em_model' +STARTUP_CHECK_SEC = 10 if __name__ == '__main__': - # todo + # initial buffer + sleep(STARTUP_CHECK_SEC) + setup_env_vars() ensure_config_file() - app_config = get_config(os.environ['CC_CONFIG_PATH']) em_conf = app_config.embedding + + if em_conf.workers <= 0: + print('No embedding workers configured, exiting...', flush=True) + exit(0) + print('Embedder config:\n' + em_conf.model_dump_json(indent=2), flush=True) + logging_config = get_logging_config(LOGGER_CONFIG_NAME) + setup_logging(logging_config) + logger = logging.getLogger('emserver') + if app_config.debug: + logger.setLevel(logging.DEBUG) + + _max_tries = 120 # 20 minutes max + _enabled = False + _headers = {} + sign_request(_headers) + # wait for the main process to be ready, check the /enabled endpoint + while _max_tries > 0: + with httpx.Client() as client: + ret = client.get(f'http://{os.environ["APP_HOST"]}:{os.environ["APP_PORT"]}/enabled', headers=_headers) + if ret.status_code != 200: + print(f'Error checking main app status: {ret.text}', flush=True) + sleep(STARTUP_CHECK_SEC) + _max_tries -= 1 + continue + + if not ret.json().get('enabled', False): + print('Main app is not enabled, sleeping for a while...', flush=True) + sleep(STARTUP_CHECK_SEC) + 
_max_tries -= 1 + continue + + _enabled = True + break + + if not _enabled: + print('Failed waiting for the main app to be enabled, exiting...', flush=True) + exit(0) + # update model path to be in the persistent storage if it is not already valid if 'model' not in em_conf.llama: raise ValueError('Error: "model" key not found in embedding->llama\'s config') @@ -88,31 +85,36 @@ async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: if not os.path.isfile(em_conf.llama['model']): raise ValueError('Error: Model file not found at the updated path') + # delayed import for libcuda.so.1 to be available + from llama_cpp.server.app import create_app + from llama_cpp.server.settings import ModelSettings, ServerSettings + server_settings = ServerSettings( host=em_conf.host, port=em_conf.port, ) - model_settings = [ModelSettings(model_alias='em_model', embedding=True, **em_conf.llama)] + model_settings = [ModelSettings(model_alias=MODEL_ALIAS, embedding=True, **em_conf.llama)] app = create_app( server_settings=server_settings, model_settings=model_settings, ) - app.add_middleware(LastAccessMiddleware) - # start the last time thread - last_time_thread = threading.Thread(target=die_on_time, args=(app_config,)) - last_time_thread.start() - with last_time_lock: - last_time = time.time() + uv_log_config = uvicorn.config.LOGGING_CONFIG # pyright: ignore[reportAttributeAccessIssue] + uv_log_config['formatters']['json'] = logging_config['formatters']['json'] + uv_log_config['handlers']['file_json'] = logging_config['handlers']['file_json'] + + uv_log_config['loggers']['uvicorn']['handlers'].append('file_json') + uv_log_config['loggers']['uvicorn.access']['handlers'].append('file_json') uvicorn.run( + # todo: use string import of the app app=app, host=em_conf.host, port=em_conf.port, http='h11', interface='asgi3', - # todo - log_level=('warning', 'trace')[app_config.debug], + log_config=uv_log_config, + log_level=app_config.uvicorn_log_level, 
use_colors=bool(app_config.use_colors and os.getenv('CI', 'false') == 'false'), workers=em_conf.workers, ) diff --git a/supervisord.conf b/supervisord.conf new file mode 100644 index 0000000..22f52d2 --- /dev/null +++ b/supervisord.conf @@ -0,0 +1,37 @@ +# SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors +# SPDX-License-Identifier: AGPL-3.0-or-later + +[supervisord] +nodaemon=true +logfile=/var/log/supervisor/supervisord.log +pidfile=/var/run/supervisord.pid +childlogdir=/var/log/supervisor +logfile_maxbytes=50MB +logfile_backups=10 +loglevel=error +user=root + +#[supervisorctl] +#serverurl=unix:///var/run/supervisor.sock + +[program:app] +stdout_logfile=/dev/stdout +stdout_logfile_maxbytes=0 +stderr_logfile=/dev/stderr +stderr_logfile_maxbytes=0 +autostart=true +autorestart=true +startsecs=120 +directory=/app +command=/app/dockerfile_scripts/entrypoint.sh + +[program:embedding_server] +stdout_logfile=/dev/stdout +stdout_logfile_maxbytes=0 +stderr_logfile=/dev/stderr +stderr_logfile_maxbytes=0 +autostart=true +autorestart=unexpected +startsecs=120 +directory=/app +command=python3 -u /app/main_em.py