From 118cac4f4c6f746532202fd21ca1fda1ebb5354e Mon Sep 17 00:00:00 2001 From: Dmytro Bobrenko <17252809+dbobrenko@users.noreply.github.com> Date: Wed, 5 Feb 2025 08:58:51 +0000 Subject: [PATCH 01/14] Clean llm and sn19 wrappers --- prompting/llms/apis/llm_wrapper.py | 25 +++++++++++-------------- prompting/llms/apis/sn19_wrapper.py | 11 ++++------- 2 files changed, 15 insertions(+), 21 deletions(-) diff --git a/prompting/llms/apis/llm_wrapper.py b/prompting/llms/apis/llm_wrapper.py index d9d40646e..dffcfe3f0 100644 --- a/prompting/llms/apis/llm_wrapper.py +++ b/prompting/llms/apis/llm_wrapper.py @@ -9,12 +9,12 @@ class LLMWrapper: def chat_complete( messages: LLMMessages, - model="chat-llama-3-1-70b", - temperature=0.5, - max_tokens=500, - top_p=1, - stream=False, - logprobs=True, + model: str = "chat-llama-3-1-70b", + temperature: float = 0.5, + max_tokens: int = 500, + top_p: float = 1, + stream: bool = False, + logprobs: bool = True, ) -> str: response: str | None = None if "gpt" not in model.lower() and shared_settings.SN19_API_KEY and shared_settings.SN19_API_URL: @@ -28,16 +28,13 @@ def chat_complete( stream=stream, logprobs=logprobs, ) + logger.debug(f"Generated {len(response)} characters using {model}") + return response except Exception as ex: - logger.exception(ex) - logger.warning("Failed to use SN19 API, falling back to GPT-3.5") - else: - if response is not None: - logger.debug(f"Generated {len(response)} characters using {model}") - return response - logger.warning( - "Failed to use SN19 API (check the SN19_API_KEY and/or SN19_API_URL), " "falling back to GPT-3.5" + logger.error( + "Failed to use SN19 API, falling back to GPT-3.5. " + f"Make sure to setup 'SN19_API_KEY' and 'SN19_API_URL' in .env.validator" ) model = "gpt-3.5-turbo" diff --git a/prompting/llms/apis/sn19_wrapper.py b/prompting/llms/apis/sn19_wrapper.py index 2e1d634df..0b43871ed 100644 --- a/prompting/llms/apis/sn19_wrapper.py +++ b/prompting/llms/apis/sn19_wrapper.py @@ -37,11 +37,8 @@ def chat_complete( "logprobs": logprobs, } response = requests.post(url, headers=headers, data=json.dumps(data), timeout=30) + response_json = response.json() try: - response_json = response.json() - try: - return response_json["choices"][0]["message"].get("content") - except KeyError: - return response_json["choices"][0]["delta"].get("content") - except Exception as e: - logger.exception(f"Error in chat_complete: {e}") + return response_json["choices"][0]["message"].get("content") + except KeyError: + return response_json["choices"][0]["delta"].get("content") From 97c33be7e375b4cfca51190ba3224be2192e7216 Mon Sep 17 00:00:00 2001 From: Dmytro Bobrenko <17252809+dbobrenko@users.noreply.github.com> Date: Wed, 5 Feb 2025 11:41:06 +0000 Subject: [PATCH 02/14] Add scoring queue --- shared/settings.py | 4 ++ validator_api/api.py | 5 +- validator_api/chat_completion.py | 9 +++- validator_api/gpt_endpoints.py | 10 +++- validator_api/scoring_queue.py | 93 ++++++++++++++++++++++++++++++++ validator_api/utils.py | 12 +++++ 6 files changed, 126 insertions(+), 7 deletions(-) create mode 100644 validator_api/scoring_queue.py diff --git a/shared/settings.py b/shared/settings.py index 8bb342caf..80cb4b111 100644 --- a/shared/settings.py +++ b/shared/settings.py @@ -86,6 +86,10 @@ class SharedSettings(BaseSettings): # ==== API ===== # API key used to access validator organic scoring mechanism (both .env.validator and .env.api). SCORING_KEY: str | None = Field(None, env="SCORING_KEY") + # Scoring request rate limit in seconds. + SCORING_RATE_LIMIT_SEC: float = Field(0.5, env="SCORING_RATE_LIMIT_SEC") + # Scoring queue threshold when rate-limit start to kick in, used to query validator API with scoring requests. + SCORING_QUEUE_API_THRESHOLD: int = Field(5, env="SCORING_QUEUE_API_THRESHOLD") # Validator scoring API (.env.validator). DEPLOY_SCORING_API: bool = Field(False, env="DEPLOY_SCORING_API") diff --git a/validator_api/api.py b/validator_api/api.py index a56388d25..29b967356 100644 --- a/validator_api/api.py +++ b/validator_api/api.py @@ -8,6 +8,7 @@ settings.shared_settings = settings.SharedSettings.load(mode="api") shared_settings = settings.shared_settings +from validator_api import scoring_queue from validator_api.api_management import router as api_management_router from validator_api.gpt_endpoints import router as gpt_router from validator_api.utils import update_miner_availabilities_for_api @@ -16,9 +17,6 @@ app.include_router(gpt_router, tags=["GPT Endpoints"]) app.include_router(api_management_router, tags=["API Management"]) -# TODO: This api requests miner availabilities from validator -# TODO: Forward the results from miners to the validator - @app.get("/health") async def health(): @@ -27,6 +25,7 @@ async def health(): async def main(): asyncio.create_task(update_miner_availabilities_for_api.start()) + asyncio.create_task(scoring_queue.scoring_queue.start()) uvicorn.run( app, host=shared_settings.API_HOST, diff --git a/validator_api/chat_completion.py b/validator_api/chat_completion.py index 3d976cfb0..94a308bff 100644 --- a/validator_api/chat_completion.py +++ b/validator_api/chat_completion.py @@ -11,7 +11,7 @@ from shared.epistula import make_openai_query from shared.settings import shared_settings from shared.uids import get_uids -from validator_api.utils import forward_response +from validator_api import scoring_queue async def peek_until_valid_chunk( @@ -143,7 +143,12 @@ async def stream_from_first_response( remaining = asyncio.gather(*pending, return_exceptions=True) remaining_tasks = asyncio.create_task(collect_remaining_responses(remaining, collected_chunks_list, body, uids)) await remaining_tasks - asyncio.create_task(forward_response(uids, body, collected_chunks_list)) + # asyncio.create_task(forward_response(uids, body, collected_chunks_list)) + asyncio.create_task( + scoring_queue.scoring_queue.append_response( + uids=uids, body=body, chunks=collected_chunks_list + ) + ) except asyncio.CancelledError: logger.info("Client disconnected, streaming cancelled") diff --git a/validator_api/gpt_endpoints.py b/validator_api/gpt_endpoints.py index dc6d32a95..c4d9a214f 100644 --- a/validator_api/gpt_endpoints.py +++ b/validator_api/gpt_endpoints.py @@ -12,11 +12,12 @@ from shared.epistula import SynapseStreamResult, query_miners from shared.settings import shared_settings +from validator_api import scoring_queue from validator_api.api_management import _keys from validator_api.chat_completion import chat_completion from validator_api.mixture_of_miners import mixture_of_miners from validator_api.test_time_inference import generate_response -from validator_api.utils import filter_available_uids, forward_response +from validator_api.utils import filter_available_uids router = APIRouter() N_MINERS = 5 @@ -95,7 +96,12 @@ async def web_retrieval(search_query: str, n_miners: int = 10, uids: list[int] = raise HTTPException(status_code=500, detail="No miner responded successfully") chunks = [res.accumulated_chunks if res and res.accumulated_chunks else [] for res in stream_results] - asyncio.create_task(forward_response(uids=uids, body=body, chunks=chunks)) + # asyncio.create_task(forward_response(uids=uids, body=body, chunks=chunks)) + asyncio.create_task( + scoring_queue.scoring_queue.append_response( + uids=uids, body=body, chunks=chunks + ) + ) return loaded_results diff --git a/validator_api/scoring_queue.py b/validator_api/scoring_queue.py new file mode 100644 index 000000000..526f2057b --- /dev/null +++ b/validator_api/scoring_queue.py @@ -0,0 +1,93 @@ +import asyncio +from collections import deque +import datetime +from typing import Any + +import httpx +from loguru import logger +from pydantic import BaseModel + +from shared.loop_runner import AsyncLoopRunner +from shared.settings import shared_settings + + +class ScoringPayload(BaseModel): + payload: dict[str, Any] + query_retries: int = 0 + + +class ScoringQueue(AsyncLoopRunner): + """Performs organic scoring every `interval` seconds.""" + interval: float = shared_settings.SCORING_RATE_LIMIT_SEC + scoring_queue_threshold: int = shared_settings.SCORING_QUEUE_API_THRESHOLD + max_scoring_retries: int = 3 + name = __name__ + _scoring_lock = asyncio.Lock() + _scoring_queue: deque[ScoringPayload] = deque() + + async def wait_for_next_execution(self, last_run_time) -> datetime.datetime: + """If scoring queue is small, execute immediately, otherwise wait until the next execution time.""" + async with self._scoring_lock: + if self.size < self.scoring_queue_threshold: + return datetime.datetime.now() + + return super().wait_for_next_execution(last_run_time) + + async def run_step(self): + """Perform organic scoring: pop queued payload, forward to the validator API.""" + async with self._scoring_lock: + if not self._scoring_queue: + return + scoring_payload = self._scoring_queue.popleft() + + url = f"http://{shared_settings.VALIDATOR_API}/scoring" + payload = scoring_payload.payload + try: + uids = payload["uids"] + timeout = httpx.Timeout(timeout=120.0, connect=60.0, read=30.0, write=30.0, pool=5.0) + async with httpx.AsyncClient(timeout=timeout) as client: + response = await client.post( + url=url, + json=payload, + headers={"api-key": shared_settings.SCORING_KEY, "Content-Type": "application/json"} + ) + if response.status_code != 200: + # Raise an exception so that the retry logic in the except block handles it. + raise Exception(f"Non-200 response: {response.status_code} for uids {uids}") + logger.info(f"Forwarding response completed with status {response.status_code}") + except Exception as e: + if scoring_payload.retries < self.max_scoring_retries: + scoring_payload.retries += 1 + async with self._scoring_lock: + self._scoring_queue.appendleft(scoring_payload) + logger.error(f"Tried to forward response to {url} with payload {payload}. Queued for retry") + else: + logger.exception(f"Error while forwarding response: {e}") + + async def append_response(self, uids: list[int], body: dict[str, any], chunks: list[list[str]]): + uids = [int(u) for u in uids] + chunk_dict = {u: c for u, c in zip(uids, chunks)} + logger.info(f"Forwarding response from uid {uids} to scoring with body: {body} and chunks: {chunks}") + if not shared_settings.SCORE_ORGANICS: + return + + if body.get("task") != "InferenceTask" and body.get("task") != "WebRetrievalTask": + logger.debug(f"Skipping forwarding for non- inference/web retrieval task: {body.get('task')}") + return + + payload = {"body": body, "chunks": chunk_dict, "uid": uids} + scoring_item = ScoringPayload(payload=payload) + + async with self._scoring_lock: + self._scoring_queue.append(scoring_item) + + @property + def size(self) -> int: + return len(self._scoring_queue) + + def __len__(self) -> int: + return self.size + + +# TODO: Leaving it as a global var to make less architecture changes, refactor as DI. +scoring_queue = ScoringQueue() diff --git a/validator_api/utils.py b/validator_api/utils.py index 85a001aef..b04112e68 100644 --- a/validator_api/utils.py +++ b/validator_api/utils.py @@ -1,3 +1,6 @@ +import asyncio +from collections import deque +import datetime import httpx import requests from loguru import logger @@ -7,12 +10,18 @@ from shared.uids import get_uids +_scoring_lock = asyncio.Lock() +_scoring_last_query_time = datetime.datetime.fromtimestamp(0) +_scoring_queue: deque[dict[str, any]] = deque() + + class UpdateMinerAvailabilitiesForAPI(AsyncLoopRunner): miner_availabilities: dict[int, dict] = {} async def run_step(self): try: response = requests.post( + # TODO check if settings changes are working. f"http://{shared_settings.VALIDATOR_API}/miner_availabilities/miner_availabilities", headers={"accept": "application/json", "Content-Type": "application/json"}, json=get_uids(sampling_mode="all"), @@ -82,6 +91,9 @@ async def forward_response(uids: list[int], body: dict[str, any], chunks: list[l url = f"http://{shared_settings.VALIDATOR_API}/scoring" payload = {"body": body, "chunks": chunk_dict, "uid": uids} + + _scoring_queue.append(payload) + _scoring_lock.popleft() try: timeout = httpx.Timeout(timeout=120.0, connect=60.0, read=30.0, write=30.0, pool=5.0) async with httpx.AsyncClient(timeout=timeout) as client: From 7446b5c4302b07c71b66db9f70aea978d06368b8 Mon Sep 17 00:00:00 2001 From: Dmytro Bobrenko <17252809+dbobrenko@users.noreply.github.com> Date: Wed, 5 Feb 2025 11:43:58 +0000 Subject: [PATCH 03/14] Revert non-related files --- prompting/llms/apis/llm_wrapper.py | 25 ++++++++++++++----------- prompting/llms/apis/sn19_wrapper.py | 11 +++++++---- 2 files changed, 21 insertions(+), 15 deletions(-) diff --git a/prompting/llms/apis/llm_wrapper.py b/prompting/llms/apis/llm_wrapper.py index dffcfe3f0..d9d40646e 100644 --- a/prompting/llms/apis/llm_wrapper.py +++ b/prompting/llms/apis/llm_wrapper.py @@ -9,12 +9,12 @@ class LLMWrapper: def chat_complete( messages: LLMMessages, - model: str = "chat-llama-3-1-70b", - temperature: float = 0.5, - max_tokens: int = 500, - top_p: float = 1, - stream: bool = False, - logprobs: bool = True, + model="chat-llama-3-1-70b", + temperature=0.5, + max_tokens=500, + top_p=1, + stream=False, + logprobs=True, ) -> str: response: str | None = None if "gpt" not in model.lower() and shared_settings.SN19_API_KEY and shared_settings.SN19_API_URL: @@ -28,13 +28,16 @@ def chat_complete( stream=stream, logprobs=logprobs, ) - logger.debug(f"Generated {len(response)} characters using {model}") - return response except Exception as ex: - logger.error( - "Failed to use SN19 API, falling back to GPT-3.5. " - f"Make sure to setup 'SN19_API_KEY' and 'SN19_API_URL' in .env.validator" + logger.exception(ex) + logger.warning("Failed to use SN19 API, falling back to GPT-3.5") + else: + if response is not None: + logger.debug(f"Generated {len(response)} characters using {model}") + return response + logger.warning( + "Failed to use SN19 API (check the SN19_API_KEY and/or SN19_API_URL), " "falling back to GPT-3.5" ) model = "gpt-3.5-turbo" diff --git a/prompting/llms/apis/sn19_wrapper.py b/prompting/llms/apis/sn19_wrapper.py index 0b43871ed..2e1d634df 100644 --- a/prompting/llms/apis/sn19_wrapper.py +++ b/prompting/llms/apis/sn19_wrapper.py @@ -37,8 +37,11 @@ def chat_complete( "logprobs": logprobs, } response = requests.post(url, headers=headers, data=json.dumps(data), timeout=30) - response_json = response.json() try: - return response_json["choices"][0]["message"].get("content") - except KeyError: - return response_json["choices"][0]["delta"].get("content") + response_json = response.json() + try: + return response_json["choices"][0]["message"].get("content") + except KeyError: + return response_json["choices"][0]["delta"].get("content") + except Exception as e: + logger.exception(f"Error in chat_complete: {e}") From 1199ae135a5a99d487f0b905221ccfa2691211bb Mon Sep 17 00:00:00 2001 From: Dmytro Bobrenko <17252809+dbobrenko@users.noreply.github.com> Date: Wed, 5 Feb 2025 11:53:20 +0000 Subject: [PATCH 04/14] Remove unused code --- validator_api/utils.py | 44 ------------------------------------------ 1 file changed, 44 deletions(-) diff --git a/validator_api/utils.py b/validator_api/utils.py index b04112e68..5b99c4cbe 100644 --- a/validator_api/utils.py +++ b/validator_api/utils.py @@ -1,7 +1,3 @@ -import asyncio -from collections import deque -import datetime -import httpx import requests from loguru import logger @@ -10,11 +6,6 @@ from shared.uids import get_uids -_scoring_lock = asyncio.Lock() -_scoring_last_query_time = datetime.datetime.fromtimestamp(0) -_scoring_queue: deque[dict[str, any]] = deque() - - class UpdateMinerAvailabilitiesForAPI(AsyncLoopRunner): miner_availabilities: dict[int, dict] = {} @@ -74,38 +65,3 @@ def filter_available_uids(task: str | None = None, model: str | None = None) -> filtered_uids.append(uid) return filtered_uids - - -# TODO: Modify this so that all the forwarded responses are sent in a single request. This is both more efficient but -# also means that on the validator side all responses are scored at once, speeding up the scoring process. -async def forward_response(uids: list[int], body: dict[str, any], chunks: list[list[str]]): - uids = [int(u) for u in uids] - chunk_dict = {u: c for u, c in zip(uids, chunks)} - logger.info(f"Forwarding response from uid {uids} to scoring with body: {body} and chunks: {chunks}") - if not shared_settings.SCORE_ORGANICS: - return - - if body.get("task") != "InferenceTask" and body.get("task") != "WebRetrievalTask": - logger.debug(f"Skipping forwarding for non- inference/web retrieval task: {body.get('task')}") - return - - url = f"http://{shared_settings.VALIDATOR_API}/scoring" - payload = {"body": body, "chunks": chunk_dict, "uid": uids} - - _scoring_queue.append(payload) - _scoring_lock.popleft() - try: - timeout = httpx.Timeout(timeout=120.0, connect=60.0, read=30.0, write=30.0, pool=5.0) - async with httpx.AsyncClient(timeout=timeout) as client: - response = await client.post( - url, json=payload, headers={"api-key": shared_settings.SCORING_KEY, "Content-Type": "application/json"} - ) - if response.status_code == 200: - logger.info(f"Forwarding response completed with status {response.status_code}") - else: - logger.exception( - f"Forwarding response uid {uids} failed with status {response.status_code} and payload {payload}" - ) - except Exception as e: - logger.error(f"Tried to forward response to {url} with payload {payload}") - logger.exception(f"Error while forwarding response: {e}") From 1c2a505f41a1e18d67f7e4e1c1bf13dc061ee865 Mon Sep 17 00:00:00 2001 From: Dmytro Bobrenko <17252809+dbobrenko@users.noreply.github.com> Date: Thu, 6 Feb 2025 01:10:39 +0000 Subject: [PATCH 05/14] Fix scoring queue --- validator_api/scoring_queue.py | 1 - 1 file changed, 1 deletion(-) diff --git a/validator_api/scoring_queue.py b/validator_api/scoring_queue.py index 526f2057b..3eeabe215 100644 --- a/validator_api/scoring_queue.py +++ b/validator_api/scoring_queue.py @@ -21,7 +21,6 @@ class ScoringQueue(AsyncLoopRunner): interval: float = shared_settings.SCORING_RATE_LIMIT_SEC scoring_queue_threshold: int = shared_settings.SCORING_QUEUE_API_THRESHOLD max_scoring_retries: int = 3 - name = __name__ _scoring_lock = asyncio.Lock() _scoring_queue: deque[ScoringPayload] = deque() From 6f37d23714ba86a2001b1734e7e5d17d481cbd8f Mon Sep 17 00:00:00 2001 From: Dmytro Bobrenko <17252809+dbobrenko@users.noreply.github.com> Date: Thu, 6 Feb 2025 01:11:50 +0000 Subject: [PATCH 06/14] Fix pre-commit --- validator_api/chat_completion.py | 4 +--- validator_api/gpt_endpoints.py | 6 +----- validator_api/scoring_queue.py | 7 ++++--- 3 files changed, 6 insertions(+), 11 deletions(-) diff --git a/validator_api/chat_completion.py b/validator_api/chat_completion.py index 94a308bff..84cc49ff6 100644 --- a/validator_api/chat_completion.py +++ b/validator_api/chat_completion.py @@ -145,9 +145,7 @@ async def stream_from_first_response( await remaining_tasks # asyncio.create_task(forward_response(uids, body, collected_chunks_list)) asyncio.create_task( - scoring_queue.scoring_queue.append_response( - uids=uids, body=body, chunks=collected_chunks_list - ) + scoring_queue.scoring_queue.append_response(uids=uids, body=body, chunks=collected_chunks_list) ) except asyncio.CancelledError: diff --git a/validator_api/gpt_endpoints.py b/validator_api/gpt_endpoints.py index c4d9a214f..041f65fbc 100644 --- a/validator_api/gpt_endpoints.py +++ b/validator_api/gpt_endpoints.py @@ -97,11 +97,7 @@ async def web_retrieval(search_query: str, n_miners: int = 10, uids: list[int] = chunks = [res.accumulated_chunks if res and res.accumulated_chunks else [] for res in stream_results] # asyncio.create_task(forward_response(uids=uids, body=body, chunks=chunks)) - asyncio.create_task( - scoring_queue.scoring_queue.append_response( - uids=uids, body=body, chunks=chunks - ) - ) + asyncio.create_task(scoring_queue.scoring_queue.append_response(uids=uids, body=body, chunks=chunks)) return loaded_results diff --git a/validator_api/scoring_queue.py b/validator_api/scoring_queue.py index 3eeabe215..febcf835c 100644 --- a/validator_api/scoring_queue.py +++ b/validator_api/scoring_queue.py @@ -1,6 +1,6 @@ import asyncio -from collections import deque import datetime +from collections import deque from typing import Any import httpx @@ -18,6 +18,7 @@ class ScoringPayload(BaseModel): class ScoringQueue(AsyncLoopRunner): """Performs organic scoring every `interval` seconds.""" + interval: float = shared_settings.SCORING_RATE_LIMIT_SEC scoring_queue_threshold: int = shared_settings.SCORING_QUEUE_API_THRESHOLD max_scoring_retries: int = 3 @@ -48,7 +49,7 @@ async def run_step(self): response = await client.post( url=url, json=payload, - headers={"api-key": shared_settings.SCORING_KEY, "Content-Type": "application/json"} + headers={"api-key": shared_settings.SCORING_KEY, "Content-Type": "application/json"}, ) if response.status_code != 200: # Raise an exception so that the retry logic in the except block handles it. @@ -83,7 +84,7 @@ async def append_response(self, uids: list[int], body: dict[str, any], chunks: l @property def size(self) -> int: return len(self._scoring_queue) - + def __len__(self) -> int: return self.size From 11159ceb0c1527a4d6942f53816d573d2629ea87 Mon Sep 17 00:00:00 2001 From: Dmytro Bobrenko <17252809+dbobrenko@users.noreply.github.com> Date: Thu, 6 Feb 2025 01:15:30 +0000 Subject: [PATCH 07/14] Improve logging --- validator_api/scoring_queue.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/validator_api/scoring_queue.py b/validator_api/scoring_queue.py index febcf835c..a177c419e 100644 --- a/validator_api/scoring_queue.py +++ b/validator_api/scoring_queue.py @@ -7,6 +7,8 @@ from loguru import logger from pydantic import BaseModel +from prompting.tasks.inference import InferenceTask +from prompting.tasks.web_retrieval import WebRetrievalTask from shared.loop_runner import AsyncLoopRunner from shared.settings import shared_settings @@ -62,7 +64,7 @@ async def run_step(self): self._scoring_queue.appendleft(scoring_payload) logger.error(f"Tried to forward response to {url} with payload {payload}. Queued for retry") else: - logger.exception(f"Error while forwarding response: {e}") + logger.exception(f"Error while forwarding response after {scoring_payload.retries} retries: {e}") async def append_response(self, uids: list[int], body: dict[str, any], chunks: list[list[str]]): uids = [int(u) for u in uids] @@ -71,8 +73,8 @@ async def append_response(self, uids: list[int], body: dict[str, any], chunks: l if not shared_settings.SCORE_ORGANICS: return - if body.get("task") != "InferenceTask" and body.get("task") != "WebRetrievalTask": - logger.debug(f"Skipping forwarding for non- inference/web retrieval task: {body.get('task')}") + if body.get("task") != InferenceTask.__name__ and body.get("task") != WebRetrievalTask.__name__: + logger.debug(f"Skipping forwarding for non-inference/web retrieval task: {body.get('task')}") return payload = {"body": body, "chunks": chunk_dict, "uid": uids} From 345cf217fb58ffebbda6181cee63e85e6605c667 Mon Sep 17 00:00:00 2001 From: Dmytro Bobrenko <17252809+dbobrenko@users.noreply.github.com> Date: Thu, 6 Feb 2025 06:09:22 +0000 Subject: [PATCH 08/14] Minor fixes --- validator_api/scoring_queue.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/validator_api/scoring_queue.py b/validator_api/scoring_queue.py index a177c419e..b58f57aa6 100644 --- a/validator_api/scoring_queue.py +++ b/validator_api/scoring_queue.py @@ -15,12 +15,11 @@ class ScoringPayload(BaseModel): payload: dict[str, Any] - query_retries: int = 0 + retries: int = 0 class ScoringQueue(AsyncLoopRunner): """Performs organic scoring every `interval` seconds.""" - interval: float = shared_settings.SCORING_RATE_LIMIT_SEC scoring_queue_threshold: int = shared_settings.SCORING_QUEUE_API_THRESHOLD max_scoring_retries: int = 3 @@ -30,10 +29,11 @@ class ScoringQueue(AsyncLoopRunner): async def wait_for_next_execution(self, last_run_time) -> datetime.datetime: """If scoring queue is small, execute immediately, otherwise wait until the next execution time.""" async with self._scoring_lock: - if self.size < self.scoring_queue_threshold: + if self.scoring_queue_threshold < self.size > 0: + # If scoring queue is small and non-empty, score immediately. return datetime.datetime.now() - return super().wait_for_next_execution(last_run_time) + return await super().wait_for_next_execution(last_run_time) async def run_step(self): """Perform organic scoring: pop queued payload, forward to the validator API.""" @@ -41,11 +41,12 @@ async def run_step(self): if not self._scoring_queue: return scoring_payload = self._scoring_queue.popleft() + uids = payload["uids"] + logger.info(f"Received new organic for scoring, uids: {uids}") url = f"http://{shared_settings.VALIDATOR_API}/scoring" payload = scoring_payload.payload try: - uids = payload["uids"] timeout = httpx.Timeout(timeout=120.0, connect=60.0, read=30.0, write=30.0, pool=5.0) async with httpx.AsyncClient(timeout=timeout) as client: response = await client.post( @@ -67,9 +68,6 @@ async def run_step(self): logger.exception(f"Error while forwarding response after {scoring_payload.retries} retries: {e}") async def append_response(self, uids: list[int], body: dict[str, any], chunks: list[list[str]]): - uids = [int(u) for u in uids] - chunk_dict = {u: c for u, c in zip(uids, chunks)} - logger.info(f"Forwarding response from uid {uids} to scoring with body: {body} and chunks: {chunks}") if not shared_settings.SCORE_ORGANICS: return @@ -77,12 +75,17 @@ async def append_response(self, uids: list[int], body: dict[str, any], chunks: l logger.debug(f"Skipping forwarding for non-inference/web retrieval task: {body.get('task')}") return + uids = [int(u) for u in uids] + chunk_dict = {u: c for u, c in zip(uids, chunks)} payload = {"body": body, "chunks": chunk_dict, "uid": uids} scoring_item = ScoringPayload(payload=payload) async with self._scoring_lock: self._scoring_queue.append(scoring_item) + logger.info(f"Appended responses from uids {uids} into scoring queue with size: {self.size}") + logger.debug(f"Queued responses body: {body}, chunks: {chunks}") + @property def size(self) -> int: return len(self._scoring_queue) From dc8c1e542202202ae2c174711aa6bc133cbe5d0f Mon Sep 17 00:00:00 2001 From: Dmytro Bobrenko <17252809+dbobrenko@users.noreply.github.com> Date: Thu, 6 Feb 2025 06:20:10 +0000 Subject: [PATCH 09/14] Run pre-commit --- validator_api/scoring_queue.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/validator_api/scoring_queue.py b/validator_api/scoring_queue.py index b58f57aa6..0031a7d67 100644 --- a/validator_api/scoring_queue.py +++ b/validator_api/scoring_queue.py @@ -20,6 +20,7 @@ class ScoringPayload(BaseModel): class ScoringQueue(AsyncLoopRunner): """Performs organic scoring every `interval` seconds.""" + interval: float = shared_settings.SCORING_RATE_LIMIT_SEC scoring_queue_threshold: int = shared_settings.SCORING_QUEUE_API_THRESHOLD max_scoring_retries: int = 3 @@ -40,12 +41,13 @@ async def run_step(self): async with self._scoring_lock: if not self._scoring_queue: return + scoring_payload = self._scoring_queue.popleft() + payload = scoring_payload.payload uids = payload["uids"] logger.info(f"Received new organic for scoring, uids: {uids}") url = f"http://{shared_settings.VALIDATOR_API}/scoring" - payload = scoring_payload.payload try: timeout = httpx.Timeout(timeout=120.0, connect=60.0, read=30.0, write=30.0, pool=5.0) async with httpx.AsyncClient(timeout=timeout) as client: @@ -67,7 +69,7 @@ async def run_step(self): else: logger.exception(f"Error while forwarding response after {scoring_payload.retries} retries: {e}") - async def append_response(self, uids: list[int], body: dict[str, any], chunks: list[list[str]]): + async def append_response(self, uids: list[int], body: dict[str, Any], chunks: list[list[str]]): if not shared_settings.SCORE_ORGANICS: return From 0a3119f7a9e73ed6f249a003efaa42e81aa153f0 Mon Sep 17 00:00:00 2001 From: Dmytro Bobrenko <17252809+dbobrenko@users.noreply.github.com> Date: Thu, 6 Feb 2025 07:00:05 +0000 Subject: [PATCH 10/14] Remove tasks deps from API --- validator_api/scoring_queue.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/validator_api/scoring_queue.py b/validator_api/scoring_queue.py index 0031a7d67..52fec30e2 100644 --- a/validator_api/scoring_queue.py +++ b/validator_api/scoring_queue.py @@ -73,7 +73,7 @@ async def append_response(self, uids: list[int], body: dict[str, Any], chunks: l if not shared_settings.SCORE_ORGANICS: return - if body.get("task") != InferenceTask.__name__ and body.get("task") != WebRetrievalTask.__name__: + if body.get("task") != "InferenceTask" and body.get("task") != "WebRetrievalTask": logger.debug(f"Skipping forwarding for non-inference/web retrieval task: {body.get('task')}") return From 420ecbf22e882d19901cf833eb578d9aa8d032d7 Mon Sep 17 00:00:00 2001 From: Dmytro Bobrenko <17252809+dbobrenko@users.noreply.github.com> Date: Thu, 6 Feb 2025 07:00:45 +0000 Subject: [PATCH 11/14] Remove unused imports --- validator_api/scoring_queue.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/validator_api/scoring_queue.py b/validator_api/scoring_queue.py index 52fec30e2..35a007f76 100644 --- a/validator_api/scoring_queue.py +++ b/validator_api/scoring_queue.py @@ -7,8 +7,6 @@ from loguru import logger from pydantic import BaseModel -from prompting.tasks.inference import InferenceTask -from prompting.tasks.web_retrieval import WebRetrievalTask from shared.loop_runner import AsyncLoopRunner from shared.settings import shared_settings From bb3a03bac2ffd8d3371d1cfbddc451c7a41e611a Mon Sep 17 00:00:00 2001 From: Dmytro Bobrenko <17252809+dbobrenko@users.noreply.github.com> Date: Thu, 6 Feb 2025 07:49:28 +0000 Subject: [PATCH 12/14] Fix uid key error --- validator_api/scoring_queue.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/validator_api/scoring_queue.py b/validator_api/scoring_queue.py index 35a007f76..8bcd91943 100644 --- a/validator_api/scoring_queue.py +++ b/validator_api/scoring_queue.py @@ -42,7 +42,7 @@ async def run_step(self): scoring_payload = self._scoring_queue.popleft() payload = scoring_payload.payload - uids = payload["uids"] + uids = payload["uid"] logger.info(f"Received new organic for scoring, uids: {uids}") url = f"http://{shared_settings.VALIDATOR_API}/scoring" From 5635855177e5c59b82cc1487a7258de351cfcb68 Mon Sep 17 00:00:00 2001 From: Dmytro Bobrenko <17252809+dbobrenko@users.noreply.github.com> Date: Thu, 6 Feb 2025 07:54:03 +0000 Subject: [PATCH 13/14] Fix test api script --- scripts/test_api.py | 1 - 1 file changed, 1 deletion(-) diff --git a/scripts/test_api.py b/scripts/test_api.py index 0d6a265ba..bfa40bd3f 100644 --- a/scripts/test_api.py +++ b/scripts/test_api.py @@ -28,7 +28,6 @@ async def make_completion(client: openai.AsyncOpenAI, prompt: str, stream: bool "top_k": 50, "max_new_tokens": 256, "do_sample": True, - "seed": None, }, "task": "InferenceTask", "mixture": False, From f474a614b75495ec2af673538c0ebe9fe9af9aa0 Mon Sep 17 00:00:00 2001 From: Dmytro Bobrenko <17252809+dbobrenko@users.noreply.github.com> Date: Thu, 6 Feb 2025 08:26:21 +0000 Subject: [PATCH 14/14] Clean up the code --- validator_api/chat_completion.py | 1 - validator_api/gpt_endpoints.py | 1 - validator_api/utils.py | 1 - 3 files changed, 3 deletions(-) diff --git a/validator_api/chat_completion.py b/validator_api/chat_completion.py index 84cc49ff6..b44c2b769 100644 --- a/validator_api/chat_completion.py +++ b/validator_api/chat_completion.py @@ -143,7 +143,6 @@ async def stream_from_first_response( remaining = asyncio.gather(*pending, return_exceptions=True) remaining_tasks = asyncio.create_task(collect_remaining_responses(remaining, collected_chunks_list, body, uids)) await remaining_tasks - # asyncio.create_task(forward_response(uids, body, collected_chunks_list)) asyncio.create_task( scoring_queue.scoring_queue.append_response(uids=uids, body=body, chunks=collected_chunks_list) ) diff --git a/validator_api/gpt_endpoints.py b/validator_api/gpt_endpoints.py index 041f65fbc..3dee5274f 100644 --- a/validator_api/gpt_endpoints.py +++ b/validator_api/gpt_endpoints.py @@ -96,7 +96,6 @@ async def web_retrieval(search_query: str, n_miners: int = 10, uids: list[int] = raise HTTPException(status_code=500, detail="No miner responded successfully") chunks = [res.accumulated_chunks if res and res.accumulated_chunks else [] for res in stream_results] - # asyncio.create_task(forward_response(uids=uids, body=body, chunks=chunks)) asyncio.create_task(scoring_queue.scoring_queue.append_response(uids=uids, body=body, chunks=chunks)) return loaded_results diff --git a/validator_api/utils.py b/validator_api/utils.py index 5b99c4cbe..4f570c41d 100644 --- a/validator_api/utils.py +++ b/validator_api/utils.py @@ -12,7 +12,6 @@ class UpdateMinerAvailabilitiesForAPI(AsyncLoopRunner): async def run_step(self): try: response = requests.post( - # TODO check if settings changes are working. f"http://{shared_settings.VALIDATOR_API}/miner_availabilities/miner_availabilities", headers={"accept": "application/json", "Content-Type": "application/json"}, json=get_uids(sampling_mode="all"),