diff --git a/prompting/api/scoring/api.py b/prompting/api/scoring/api.py index 49f930f5f..dfc0b3e5c 100644 --- a/prompting/api/scoring/api.py +++ b/prompting/api/scoring/api.py @@ -83,6 +83,7 @@ async def score_response( seed=int(body.get("seed", 0)), sampling_params=body.get("sampling_parameters", shared_settings.SAMPLING_PARAMS), query=body.get("messages"), + timeout=body.get("timeout", shared_settings.INFERENCE_TIMEOUT), organic=True, ) task_scorer.add_to_queue( @@ -115,6 +116,7 @@ async def score_response( query=search_term, target_results=body.get("target_results", 1), timeout=body.get("timeout", 10), + organic=True, ), response=DendriteResponseEvent( uids=uids, diff --git a/prompting/tasks/task_creation.py b/prompting/tasks/task_creation.py index d31837226..b49e354f7 100644 --- a/prompting/tasks/task_creation.py +++ b/prompting/tasks/task_creation.py @@ -18,7 +18,7 @@ class TaskLoop(AsyncLoopRunner): is_running: bool = False thread: threading.Thread = None - interval: int = 1 + interval: int = 20 task_queue: list | None = [] scoring_queue: list | None = [] miners_dict: dict | None = None diff --git a/shared/settings.py b/shared/settings.py index 27cdba1b7..fc4535547 100644 --- a/shared/settings.py +++ b/shared/settings.py @@ -110,6 +110,10 @@ class SharedSettings(BaseSettings): API_TEST_MODE: bool = Field(False, env="API_TEST_MODE") API_UIDS_EXPLORE: float = Field(0.2, env="API_UIDS_EXPLORE") API_TOP_MINERS_SAMPLE: int = Field(400, env="API_TOP_MINERS_SAMPLE") + API_TOP_MINERS_TO_STREAM: int = Field(10, env="API_TOP_MINERS_TO_STREAM") + API_MIN_MINERS_TO_SAMPLE: int = Field(50, env="API_MIN_UIDS_TO_SAMPLE") + OVERRIDE_AVAILABLE_AXONS: list[str] | None = Field(None, env="OVERRIDE_AVAILABLE_AXONS") + API_ENABLE_BALANCE: bool = Field(True, env="API_ENABLE_BALANCE") # Validator scoring API (.env.validator). 
SCORE_ORGANICS: bool = Field(False, env="SCORE_ORGANICS") diff --git a/validator_api/chat_completion.py b/validator_api/chat_completion.py index 312ce8fc4..fce0aabe2 100644 --- a/validator_api/chat_completion.py +++ b/validator_api/chat_completion.py @@ -147,14 +147,18 @@ async def _collector(idx: int, resp_task: asyncio.Task) -> None: async def get_response_from_miner(body: dict[str, any], uid: int, timeout_seconds: int) -> tuple: """Get response from a single miner.""" - return await make_openai_query( - metagraph=shared_settings.METAGRAPH, - wallet=shared_settings.WALLET, - body=body, - uid=uid, - stream=False, - timeout_seconds=timeout_seconds, - ) + try: + return await make_openai_query( + metagraph=shared_settings.METAGRAPH, + wallet=shared_settings.WALLET, + body=body, + uid=uid, + stream=False, + timeout_seconds=timeout_seconds, + ) + except BaseException as e: + logger.warning(f"Error getting response from miner {uid}: {e}") + return None async def chat_completion( diff --git a/validator_api/gpt_endpoints.py b/validator_api/gpt_endpoints.py index f8179111d..19b16183a 100644 --- a/validator_api/gpt_endpoints.py +++ b/validator_api/gpt_endpoints.py @@ -16,7 +16,6 @@ from validator_api.utils import filter_available_uids router = APIRouter() -N_MINERS = 10 @router.post( @@ -88,7 +87,7 @@ async def completions(request: CompletionsRequest, api_key: str = Depends(valida task=body.get("task"), model=body.get("model"), test=shared_settings.API_TEST_MODE, - n_miners=N_MINERS, + n_miners=shared_settings.API_TOP_MINERS_TO_STREAM, n_top_incentive=shared_settings.API_TOP_MINERS_SAMPLE, explore=shared_settings.API_UIDS_EXPLORE, ) @@ -176,8 +175,7 @@ async def create_response_stream(request): async def submit_chain_of_thought_job( request: CompletionsRequest, background_tasks: BackgroundTasks, api_key: str = Depends(validate_api_key) ): - """ - Submit a Chain-of-Thought inference job to be processed in the background. 
+ """Submit a Chain-of-Thought inference job to be processed in the background. This endpoint accepts the same parameters as the /v1/chat/completions endpoint, but instead of streaming the response, it submits the job to the background and @@ -222,7 +220,10 @@ async def submit_chain_of_thought_job( [int(uid) for uid in body.get("uids")] if body.get("uids") else filter_available_uids( - task=body.get("task"), model=body.get("model"), test=shared_settings.API_TEST_MODE, n_miners=N_MINERS + task=body.get("task"), + model=body.get("model"), + test=shared_settings.API_TEST_MODE, + n_miners=shared_settings.API_TOP_MINERS_TO_STREAM, ) ) diff --git a/validator_api/scoring_queue.py b/validator_api/scoring_queue.py index 4769d9251..1c41a2b36 100644 --- a/validator_api/scoring_queue.py +++ b/validator_api/scoring_queue.py @@ -12,7 +12,7 @@ from shared import settings from shared.epistula import create_header_hook from shared.loop_runner import AsyncLoopRunner -from validator_api.validator_forwarding import ValidatorRegistry +from validator_api.validator_forwarding import Validator, ValidatorRegistry validator_registry = ValidatorRegistry() @@ -33,7 +33,7 @@ class ScoringQueue(AsyncLoopRunner): max_scoring_retries: int = 2 _scoring_lock = asyncio.Lock() _scoring_queue: deque[ScoringPayload] = deque() - _queue_maxlen: int = 20 + _queue_maxlen: int = 50 _min_wait_time: float = 1 async def wait_for_next_execution(self, last_run_time) -> datetime.datetime: @@ -48,7 +48,6 @@ async def wait_for_next_execution(self, last_run_time) -> datetime.datetime: async def run_step(self): """Perform organic scoring: pop queued payload, forward to the validator API.""" - # logger.debug("Running scoring step") async with self._scoring_lock: if not self._scoring_queue: return @@ -60,52 +59,36 @@ async def run_step(self): f"Trying to score organic from {scoring_payload.date}, uids: {uids}. 
" f"Queue size: {len(self._scoring_queue)}" ) + validators: list[Validator] = [] try: - vali_uid, vali_axon, vali_hotkey = validator_registry.get_available_axon() - # get_available_axon() will return None if it cannot find an available validator, which will fail to unpack - url = f"http://{vali_axon}/scoring" + if shared_settings.OVERRIDE_AVAILABLE_AXONS: + for idx, vali_axon in enumerate(shared_settings.OVERRIDE_AVAILABLE_AXONS): + validators.append(Validator(uid=-idx, axon=vali_axon, hotkey=shared_settings.API_HOTKEY, stake=1e6)) + else: + validators = await validator_registry.get_available_axons(balance=shared_settings.API_ENABLE_BALANCE) except Exception as e: logger.exception(f"Could not find available validator scoring endpoint: {e}") + + if validators is None: + logger.warning("No validators are available") + return + try: if hasattr(payload, "to_dict"): payload = payload.to_dict() elif isinstance(payload, BaseModel): payload = payload.model_dump() payload_bytes = json.dumps(payload).encode() + except BaseException as e: + logger.exception(f"Error when encoding payload: {e}") + await asyncio.sleep(0.1) + return - timeout = httpx.Timeout(timeout=120.0, connect=60.0, read=30.0, write=30.0, pool=5.0) - # Add required headers for signature verification - - async with httpx.AsyncClient( - timeout=timeout, - event_hooks={"request": [create_header_hook(shared_settings.WALLET.hotkey, vali_hotkey)]}, - ) as client: - response = await client.post( - url=url, - content=payload_bytes, - headers={"Content-Type": "application/json"}, - ) - validator_registry.update_validators(uid=vali_uid, response_code=response.status_code) - if response.status_code != 200: - # Raise an exception so that the retry logic in the except block handles it. 
- raise Exception(f"Non-200 response: {response.status_code} for uids {uids}") - logger.debug(f"Forwarding response completed with status {response.status_code} to uid {vali_uid}") - except httpx.ConnectError as e: - logger.warning(f"Couldn't connect to validator {url} for Scoring {uids}. Exception: {e}") - except Exception as e: - if scoring_payload.retries < self.max_scoring_retries: - scoring_payload.retries += 1 - async with self._scoring_lock: - self._scoring_queue.appendleft(scoring_payload) - logger.warning( - f"Tried to forward response from {scoring_payload.date} to {url} for uids {uids}. " - f"Queue size: {len(self._scoring_queue)}. Exception: {e}. Queued for retry" - ) - else: - logger.error( - f"Error while forwarding response from {scoring_payload.date} after {scoring_payload.retries} " - f"retries: {e}" - ) + tasks = [] + for _, validator in validators.items(): + task = asyncio.create_task(self._send_result(payload_bytes, scoring_payload, validator, uids)) + tasks.append(task) + await asyncio.gather(*tasks) await asyncio.sleep(0.1) async def append_response( @@ -151,6 +134,47 @@ async def append_response( logger.info(f"Dropping oldest organic from {scoring_payload.date} for uids {uids}") self._scoring_queue.append(scoring_item) + async def _send_result(self, payload_bytes, scoring_payload, validator: Validator, uids): + try: + vali_url = f"http://{validator.axon}/scoring" + timeout = httpx.Timeout(timeout=120.0) + async with httpx.AsyncClient( + timeout=timeout, + event_hooks={"request": [create_header_hook(shared_settings.WALLET.hotkey, validator.hotkey)]}, + ) as client: + response = await client.post( + url=vali_url, + content=payload_bytes, + headers={"Content-Type": "application/json"}, + ) + validator_registry.update_validators(uid=validator.uid, response_code=response.status_code) + if response.status_code != 200: + raise Exception( + f"Status code {response.status_code} response for validator {validator.uid} - {vali_url}: " + 
f"{response.status_code} for uids {len(uids)}" + ) + logger.debug(f"Successfully forwarded response to uid {validator.uid} - {vali_url}") + except httpx.ConnectError as e: + logger.warning( + f"Couldn't connect to validator {validator.uid} {vali_url} for scoring {len(uids)}. Exception: {e}" + ) + except Exception as e: + if shared_settings.API_ENABLE_BALANCE and scoring_payload.retries < self.max_scoring_retries: + scoring_payload.retries += 1 + async with self._scoring_lock: + self._scoring_queue.appendleft(scoring_payload) + logger.warning( + f"Tried to forward response from {scoring_payload.date} " + f"to validator {validator.uid} {vali_url} for {len(uids)} uids. " + f"Queue size: {len(self._scoring_queue)}. Exception: {e}" + ) + else: + logger.warning( + f"Error while forwarding response from {scoring_payload.date} " + f"to validator {validator.uid} {vali_url} for {len(uids)} uids " + f"retries. Queue size: {len(self._scoring_queue)}. Exception: {e}" + ) + @property def size(self) -> int: return len(self._scoring_queue) diff --git a/validator_api/validator_forwarding.py b/validator_api/validator_forwarding.py index 869a9e992..c20927261 100644 --- a/validator_api/validator_forwarding.py +++ b/validator_api/validator_forwarding.py @@ -1,6 +1,7 @@ +import asyncio import random import time -from typing import ClassVar, List, Optional, Tuple +from typing import ClassVar import numpy as np from loguru import logger @@ -54,23 +55,24 @@ class ValidatorRegistry(BaseModel): @model_validator(mode="after") def create_validator_list(cls, v: "ValidatorRegistry", metagraph=shared_settings.METAGRAPH) -> "ValidatorRegistry": - validator_uids = np.where(metagraph.stake >= 100000)[0].tolist() + # TODO: Calculate 1000 tao using subtensor alpha price. 
+        validator_uids = np.where(metagraph.stake >= 16000)[0].tolist()
         validator_axons = [metagraph.axons[uid].ip_str().split("/")[2] for uid in validator_uids]
         validator_stakes = [metagraph.stake[uid] for uid in validator_uids]
         validator_hotkeys = [metagraph.hotkeys[uid] for uid in validator_uids]
-        v.validators = {
-            uid: Validator(uid=uid, stake=stake, axon=axon, hotkey=hotkey)
-            for uid, stake, axon, hotkey in zip(validator_uids, validator_stakes, validator_axons, validator_hotkeys)
-        }
+        v.validators = {}
+        for uid, stake, axon, hotkey in zip(validator_uids, validator_stakes, validator_axons, validator_hotkeys):
+            if hotkey == shared_settings.MC_VALIDATOR_HOTKEY:
+                logger.debug(f"Replacing {uid} axon from {axon} to {shared_settings.MC_VALIDATOR_AXON}")
+                axon = shared_settings.MC_VALIDATOR_AXON
+            v.validators[uid] = Validator(uid=uid, stake=stake, axon=axon, hotkey=hotkey)
         return v
 
-    def get_available_validators(self) -> List[Validator]:
-        """
-        Given a list of validators, return only those that are not in their cooldown period.
-        """
+    async def get_available_validators(self) -> list[int]:
+        """Return the uids of registered validators that are not in their cooldown period."""
         return [uid for uid, validator in self.validators.items() if validator.is_available()]
 
-    def get_available_axon(self) -> Optional[Tuple[int, List[str], str]]:
+    async def get_available_axons(self, balance: bool = True) -> dict[int, Validator] | None:
         """
         Returns a tuple (uid, axon, hotkey) for a randomly selected validator
         based on stake weighting, if spot checking conditions are met. Otherwise, returns None.
@@ -78,19 +80,20 @@ def get_available_axon(self) -> Optional[Tuple[int, List[str], str]]:
         if random.random() < self.spot_checking_rate or not self.validators:
             return None
         for _ in range(self.max_retries):
-            validator_list = self.get_available_validators()
+            validator_list = await self.get_available_validators()
             if validator_list:
                 break
             else:
-                time.sleep(5)
+                await asyncio.sleep(5)
         if not validator_list:
             logger.error(f"Could not find available validator after {self.max_retries}")
             return None
-        weights = [self.validators[uid].stake for uid in validator_list]
-        chosen = self.validators[random.choices(validator_list, weights=weights, k=1)[0]]
-        if chosen.hotkey == shared_settings.MC_VALIDATOR_HOTKEY:
-            chosen.axon = shared_settings.MC_VALIDATOR_AXON
-        return chosen.uid, chosen.axon, chosen.hotkey
+        if balance:
+            weights = [self.validators[uid].stake for uid in validator_list]
+            chosen = {uid: self.validators[uid] for uid in random.choices(validator_list, weights=weights, k=1)}
+        else:
+            chosen = {uid: self.validators[uid] for uid in validator_list}
+        return chosen
 
     def update_validators(self, uid: int, response_code: int) -> None:
         """