diff --git a/scripts/test_api.py b/scripts/test_api.py index 0d6a265ba..bfa40bd3f 100644 --- a/scripts/test_api.py +++ b/scripts/test_api.py @@ -28,7 +28,6 @@ async def make_completion(client: openai.AsyncOpenAI, prompt: str, stream: bool "top_k": 50, "max_new_tokens": 256, "do_sample": True, - "seed": None, }, "task": "InferenceTask", "mixture": False, diff --git a/shared/settings.py b/shared/settings.py index 14e7006ec..eaf5a8e02 100644 --- a/shared/settings.py +++ b/shared/settings.py @@ -86,6 +86,10 @@ class SharedSettings(BaseSettings): # ==== API ===== # API key used to access validator organic scoring mechanism (both .env.validator and .env.api). SCORING_KEY: str | None = Field(None, env="SCORING_KEY") + # Scoring request rate limit in seconds. + SCORING_RATE_LIMIT_SEC: float = Field(0.5, env="SCORING_RATE_LIMIT_SEC") + # Scoring queue threshold when rate-limit start to kick in, used to query validator API with scoring requests. + SCORING_QUEUE_API_THRESHOLD: int = Field(5, env="SCORING_QUEUE_API_THRESHOLD") # Validator scoring API (.env.validator). DEPLOY_SCORING_API: bool = Field(False, env="DEPLOY_SCORING_API") diff --git a/validator_api/api.py b/validator_api/api.py index e96b313c6..af80c43c4 100644 --- a/validator_api/api.py +++ b/validator_api/api.py @@ -9,6 +9,7 @@ settings.shared_settings = settings.SharedSettings.load(mode="api") shared_settings = settings.shared_settings +from validator_api import scoring_queue from validator_api.api_management import router as api_management_router from validator_api.gpt_endpoints import router as gpt_router from validator_api.utils import update_miner_availabilities_for_api @@ -38,6 +39,8 @@ async def health(): async def main(): + asyncio.create_task(update_miner_availabilities_for_api.start()) + asyncio.create_task(scoring_queue.scoring_queue.start()) uvicorn.run( "validator_api.api:app", host=shared_settings.API_HOST, diff --git a/validator_api/chat_completion.py b/validator_api/chat_completion.py index 3d976cfb0..b44c2b769 100644 --- a/validator_api/chat_completion.py +++ b/validator_api/chat_completion.py @@ -11,7 +11,7 @@ from shared.epistula import make_openai_query from shared.settings import shared_settings from shared.uids import get_uids -from validator_api.utils import forward_response +from validator_api import scoring_queue async def peek_until_valid_chunk( @@ -143,7 +143,9 @@ async def stream_from_first_response( remaining = asyncio.gather(*pending, return_exceptions=True) remaining_tasks = asyncio.create_task(collect_remaining_responses(remaining, collected_chunks_list, body, uids)) await remaining_tasks - asyncio.create_task(forward_response(uids, body, collected_chunks_list)) + asyncio.create_task( + scoring_queue.scoring_queue.append_response(uids=uids, body=body, chunks=collected_chunks_list) + ) except asyncio.CancelledError: logger.info("Client disconnected, streaming cancelled") diff --git a/validator_api/gpt_endpoints.py b/validator_api/gpt_endpoints.py index dc6d32a95..3dee5274f 100644 --- a/validator_api/gpt_endpoints.py +++ b/validator_api/gpt_endpoints.py @@ -12,11 +12,12 @@ from shared.epistula import SynapseStreamResult, query_miners from shared.settings import shared_settings +from validator_api import scoring_queue from validator_api.api_management import _keys from validator_api.chat_completion import chat_completion from validator_api.mixture_of_miners import mixture_of_miners from validator_api.test_time_inference import generate_response -from validator_api.utils import filter_available_uids, forward_response +from validator_api.utils import filter_available_uids router = APIRouter() N_MINERS = 5 @@ -95,7 +96,7 @@ async def web_retrieval(search_query: str, n_miners: int = 10, uids: list[int] = raise HTTPException(status_code=500, detail="No miner responded successfully") chunks = [res.accumulated_chunks if res and res.accumulated_chunks else [] for res in stream_results] - asyncio.create_task(forward_response(uids=uids, body=body, chunks=chunks)) + asyncio.create_task(scoring_queue.scoring_queue.append_response(uids=uids, body=body, chunks=chunks)) return loaded_results diff --git a/validator_api/scoring_queue.py b/validator_api/scoring_queue.py new file mode 100644 index 000000000..8bcd91943 --- /dev/null +++ b/validator_api/scoring_queue.py @@ -0,0 +1,98 @@ +import asyncio +import datetime +from collections import deque +from typing import Any + +import httpx +from loguru import logger +from pydantic import BaseModel + +from shared.loop_runner import AsyncLoopRunner +from shared.settings import shared_settings + + +class ScoringPayload(BaseModel): + payload: dict[str, Any] + retries: int = 0 + + +class ScoringQueue(AsyncLoopRunner): + """Performs organic scoring every `interval` seconds.""" + + interval: float = shared_settings.SCORING_RATE_LIMIT_SEC + scoring_queue_threshold: int = shared_settings.SCORING_QUEUE_API_THRESHOLD + max_scoring_retries: int = 3 + _scoring_lock = asyncio.Lock() + _scoring_queue: deque[ScoringPayload] = deque() + + async def wait_for_next_execution(self, last_run_time) -> datetime.datetime: + """If scoring queue is small, execute immediately, otherwise wait until the next execution time.""" + async with self._scoring_lock: + if self.scoring_queue_threshold < self.size > 0: + # If scoring queue is small and non-empty, score immediately. + return datetime.datetime.now() + + return await super().wait_for_next_execution(last_run_time) + + async def run_step(self): + """Perform organic scoring: pop queued payload, forward to the validator API.""" + async with self._scoring_lock: + if not self._scoring_queue: + return + + scoring_payload = self._scoring_queue.popleft() + payload = scoring_payload.payload + uids = payload["uid"] + logger.info(f"Received new organic for scoring, uids: {uids}") + + url = f"http://{shared_settings.VALIDATOR_API}/scoring" + try: + timeout = httpx.Timeout(timeout=120.0, connect=60.0, read=30.0, write=30.0, pool=5.0) + async with httpx.AsyncClient(timeout=timeout) as client: + response = await client.post( + url=url, + json=payload, + headers={"api-key": shared_settings.SCORING_KEY, "Content-Type": "application/json"}, + ) + if response.status_code != 200: + # Raise an exception so that the retry logic in the except block handles it. + raise Exception(f"Non-200 response: {response.status_code} for uids {uids}") + logger.info(f"Forwarding response completed with status {response.status_code}") + except Exception as e: + if scoring_payload.retries < self.max_scoring_retries: + scoring_payload.retries += 1 + async with self._scoring_lock: + self._scoring_queue.appendleft(scoring_payload) + logger.error(f"Tried to forward response to {url} with payload {payload}. Queued for retry") + else: + logger.exception(f"Error while forwarding response after {scoring_payload.retries} retries: {e}") + + async def append_response(self, uids: list[int], body: dict[str, Any], chunks: list[list[str]]): + if not shared_settings.SCORE_ORGANICS: + return + + if body.get("task") != "InferenceTask" and body.get("task") != "WebRetrievalTask": + logger.debug(f"Skipping forwarding for non-inference/web retrieval task: {body.get('task')}") + return + + uids = [int(u) for u in uids] + chunk_dict = {u: c for u, c in zip(uids, chunks)} + payload = {"body": body, "chunks": chunk_dict, "uid": uids} + scoring_item = ScoringPayload(payload=payload) + + async with self._scoring_lock: + self._scoring_queue.append(scoring_item) + + logger.info(f"Appended responses from uids {uids} into scoring queue with size: {self.size}") + logger.debug(f"Queued responses body: {body}, chunks: {chunks}") + + @property + def size(self) -> int: + return len(self._scoring_queue) + + def __len__(self) -> int: + return self.size + + +# TODO: Leaving it as a global var to make less architecture changes, refactor as DI. +scoring_queue = ScoringQueue() diff --git a/validator_api/utils.py b/validator_api/utils.py index 85a001aef..4f570c41d 100644 --- a/validator_api/utils.py +++ b/validator_api/utils.py @@ -1,4 +1,3 @@ -import httpx import requests from loguru import logger @@ -65,35 +64,3 @@ def filter_available_uids(task: str | None = None, model: str | None = None) -> filtered_uids.append(uid) return filtered_uids - - -# TODO: Modify this so that all the forwarded responses are sent in a single request. This is both more efficient but -# also means that on the validator side all responses are scored at once, speeding up the scoring process. -async def forward_response(uids: list[int], body: dict[str, any], chunks: list[list[str]]): - uids = [int(u) for u in uids] - chunk_dict = {u: c for u, c in zip(uids, chunks)} - logger.info(f"Forwarding response from uid {uids} to scoring with body: {body} and chunks: {chunks}") - if not shared_settings.SCORE_ORGANICS: - return - - if body.get("task") != "InferenceTask" and body.get("task") != "WebRetrievalTask": - logger.debug(f"Skipping forwarding for non- inference/web retrieval task: {body.get('task')}") - return - - url = f"http://{shared_settings.VALIDATOR_API}/scoring" - payload = {"body": body, "chunks": chunk_dict, "uid": uids} - try: - timeout = httpx.Timeout(timeout=120.0, connect=60.0, read=30.0, write=30.0, pool=5.0) - async with httpx.AsyncClient(timeout=timeout) as client: - response = await client.post( - url, json=payload, headers={"api-key": shared_settings.SCORING_KEY, "Content-Type": "application/json"} - ) - if response.status_code == 200: - logger.info(f"Forwarding response completed with status {response.status_code}") - else: - logger.exception( - f"Forwarding response uid {uids} failed with status {response.status_code} and payload {payload}" - ) - except Exception as e: - logger.error(f"Tried to forward response to {url} with payload {payload}") - logger.exception(f"Error while forwarding response: {e}")