From 87c7f96dde2bd814df1bbdf40371ca24d5297781 Mon Sep 17 00:00:00 2001 From: richwardle Date: Tue, 28 Jan 2025 16:14:23 +0000 Subject: [PATCH 01/28] WIP: add sampling by stake --- validator_api/utils.py | 11 +++++++++-- validator_api/validator_forwarding.py | 23 +++++++++++++++++++++++ 2 files changed, 32 insertions(+), 2 deletions(-) create mode 100644 validator_api/validator_forwarding.py diff --git a/validator_api/utils.py b/validator_api/utils.py index 4560bbb54..c734746ba 100644 --- a/validator_api/utils.py +++ b/validator_api/utils.py @@ -2,8 +2,10 @@ from loguru import logger from shared.settings import shared_settings +from validator_api.validator_forwarding import ValidatorForwarding - +# make class w/ getter that yields validator_axon (creates from shared_settings) based on criterea (stake*x*Y) + # TODO: Modify this so that all the forwarded responses are sent in a single request. This is both more efficient but # also means that on the validator side all responses are scored at once, speeding up the scoring process. 
async def forward_response(uids: int, body: dict[str, any], chunks: list[str]): @@ -17,7 +19,10 @@ async def forward_response(uids: int, body: dict[str, any], chunks: list[str]): logger.debug(f"Skipping forwarding for non- inference/web retrieval task: {body.get('task')}") return - url = f"http://{shared_settings.VALIDATOR_API}/scoring" + # call - class w/ getter that yields validator_axon based on criterea (stake*x*Y) + # validator_axon = class(shared_settings.METAGRAPH) + + url = ValidatorForwarding.get_validator_axons()[0] payload = {"body": body, "chunks": chunk_dict, "uid": uids} try: timeout = httpx.Timeout(timeout=120.0, connect=60.0, read=30.0, write=30.0, pool=5.0) @@ -34,3 +39,5 @@ async def forward_response(uids: int, body: dict[str, any], chunks: list[str]): except Exception as e: logger.error(f"Tried to forward response to {url} with payload {payload}") logger.exception(f"Error while forwarding response: {e}") + + diff --git a/validator_api/validator_forwarding.py b/validator_api/validator_forwarding.py new file mode 100644 index 000000000..0f9b272fb --- /dev/null +++ b/validator_api/validator_forwarding.py @@ -0,0 +1,23 @@ +from shared.settings import shared_settings + +class ValidatorForwarding: + """ + Class to yield validator axon based on shared settings. 
+ """ + def __call__(self): + return self.get_validator_criterion(shared_settings.METAGRAPH) + + def get_validator_axons(self, metagraph): + # Get all validator axons where validator_permit[uid] is True + validator_axons = [uid for uid in metagraph.validator_permit if metagraph.validator_permit[uid]] + + # First sort validators by stake + stakes = {uid: metagraph.stake[uid] for uid in validator_axons} + sorted_validators = sorted(stakes.items(), key=lambda x: x[1], reverse=True) + + # Get IP addresses + validator_axons = [metagraph.axons[uid].ip_str().split("/") for uid, _ in sorted_validators] + + #add: random.choice by stake + + return validator_axons \ No newline at end of file From 65733cbe89fc6bac658da13f91b079afaab2f379 Mon Sep 17 00:00:00 2001 From: richwardle Date: Sat, 1 Feb 2025 08:01:34 +0000 Subject: [PATCH 02/28] Serve Validator Scoring Endpoint to Chain --- neurons/validator.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/neurons/validator.py b/neurons/validator.py index ef8559a5a..f57b5f4eb 100644 --- a/neurons/validator.py +++ b/neurons/validator.py @@ -84,7 +84,23 @@ async def spawn_loops(task_queue, scoring_queue, reward_events): def start_api(scoring_queue, reward_events): async def start(): from prompting.api.api import start_scoring_api # noqa: F401 + import bittensor as bt + try: + external_ip = requests.get("https://checkip.amazonaws.com").text.strip() + netaddr.IPAddress(external_ip) + + serve_success = serve_extrinsic( + subtensor=shared_settings.SUBTENSOR, + wallet=shared_settings.WALLET, + ip=external_ip, + port=shared_settings.SCORING_API_PORT, + protocol=4, + netuid=shared_settings.NETUID, + ) + + except Exception as e: + logger.warning(f"Failed to serve scoring api to chain: {e}") await start_scoring_api(scoring_queue, reward_events) while True: From 6961c5ef2e947bd0ceed34d5710ecb31b9bd7a4a Mon Sep 17 00:00:00 2001 From: richwardle Date: Sat, 1 Feb 2025 08:21:00 +0000 Subject: [PATCH 03/28] Intermediary Dump --- 
validator_api/validator_forwarding.py | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/validator_api/validator_forwarding.py b/validator_api/validator_forwarding.py index 0f9b272fb..92864a177 100644 --- a/validator_api/validator_forwarding.py +++ b/validator_api/validator_forwarding.py @@ -1,4 +1,28 @@ from shared.settings import shared_settings +from pydantic import BaseModel, model_validator +from shared.misc import cached_property_with_expiration + +class ValidatorRegistry(BaseModel): + """ + This is a class to store the success of forwards to validator axons. + If a validator is routinely failing to respond to scoring requests we should stop sending them to them. + """ + + availability_registry: dict | None = None + + + + @cached_property_with_expiration(expiration_seconds=12000) + def AXON_STAKE_DICT(self, metagraph = shared_settings.METAGRAPH) -> dict: + validator_uids = [uid for uid in metagraph.validator_permit if metagraph.validator_permit[uid]] + validator_axons = [metagraph.axons[uid].ip_str().split("/") for uid in validator_uids] + validator_stakes = [metagraph.stake[uid] for uid in validator_uids] + axon_stake_dict: dict = {axon: stake for axon, stake in zip(validator_axons, validator_stakes)} + + + def get_availabilities(self): + + class ValidatorForwarding: """ From 134a17983a169d5b7a73b2ef0878c121e0f59191 Mon Sep 17 00:00:00 2001 From: richwardle Date: Sat, 1 Feb 2025 19:07:59 +0000 Subject: [PATCH 04/28] Create Validator Registry Object --- validator_api/utils.py | 17 ++++- validator_api/validator_forwarding.py | 90 +++++++++++++++++---------- 2 files changed, 71 insertions(+), 36 deletions(-) diff --git a/validator_api/utils.py b/validator_api/utils.py index c734746ba..5ab3c88fc 100644 --- a/validator_api/utils.py +++ b/validator_api/utils.py @@ -2,7 +2,9 @@ from loguru import logger from shared.settings import shared_settings -from validator_api.validator_forwarding import ValidatorForwarding +from 
validator_api.validator_forwarding import ValidatorRegistry + +validator_registry = ValidatorRegistry() # make class w/ getter that yields validator_axon (creates from shared_settings) based on criterea (stake*x*Y) @@ -21,8 +23,16 @@ async def forward_response(uids: int, body: dict[str, any], chunks: list[str]): # call - class w/ getter that yields validator_axon based on criterea (stake*x*Y) # validator_axon = class(shared_settings.METAGRAPH) + try: + vali_uid, vali_axon = validator_registry.get_available_axon() + except Exception as e: + logger.warning(e) + vali_uid, vali_axon = None, None + if not vali_uid: + logger.warning("Unable to get an available validator, either through spot-checking restrictions or errors, skipping scoring") + return - url = ValidatorForwarding.get_validator_axons()[0] + url = f"http://{vali_axon}/scoring" payload = {"body": body, "chunks": chunk_dict, "uid": uids} try: timeout = httpx.Timeout(timeout=120.0, connect=60.0, read=30.0, write=30.0, pool=5.0) @@ -36,8 +46,11 @@ async def forward_response(uids: int, body: dict[str, any], chunks: list[str]): logger.exception( f"Forwarding response uid {uids} failed with status {response.status_code} and payload {payload}" ) + validator_registry.update_validators(uid = vali_uid, response_code = response.status_code) + except Exception as e: logger.error(f"Tried to forward response to {url} with payload {payload}") logger.exception(f"Error while forwarding response: {e}") + diff --git a/validator_api/validator_forwarding.py b/validator_api/validator_forwarding.py index 92864a177..fa04bceb6 100644 --- a/validator_api/validator_forwarding.py +++ b/validator_api/validator_forwarding.py @@ -1,47 +1,69 @@ from shared.settings import shared_settings from pydantic import BaseModel, model_validator +from typing import ClassVar from shared.misc import cached_property_with_expiration -class ValidatorRegistry(BaseModel): - """ - This is a class to store the success of forwards to validator axons. 
- If a validator is routinely failing to respond to scoring requests we should stop sending them to them. - """ - - availability_registry: dict | None = None +class Validator(BaseModel): + uid: int + stake: float + axon: str + failures: int = 0 - - - @cached_property_with_expiration(expiration_seconds=12000) - def AXON_STAKE_DICT(self, metagraph = shared_settings.METAGRAPH) -> dict: - validator_uids = [uid for uid in metagraph.validator_permit if metagraph.validator_permit[uid]] - validator_axons = [metagraph.axons[uid].ip_str().split("/") for uid in validator_uids] - validator_stakes = [metagraph.stake[uid] for uid in validator_uids] - axon_stake_dict: dict = {axon: stake for axon, stake in zip(validator_axons, validator_stakes)} + # Define a constant for the maximum number of allowed failures. + MAX_FAILURES: ClassVar[int] = 9 + def update_failure(self, status_code: int) -> None: + """ + Update the validator's failure count based on the success status. - def get_availabilities(self): + - If the operation was successful, decrease the failure count (ensuring it doesn't go below 0). + - If the operation failed, increase the failure count. + - If the failure count exceeds MAX_FAILURES, mark the validator as inactive. + """ + if status_code == 200: + self.failures = max(0, self.failures - 1) + else: + self.failures += 1 + if self.failures > self.MAX_FAILURES: + return 1 + return 0 - - -class ValidatorForwarding: +class ValidatorRegistry(BaseModel): """ - Class to yield validator axon based on shared settings. + Class to store the success of forwards to validator axons. + If a validator is routinely failing to respond to scoring requests, + we should stop sending them requests. 
""" - def __call__(self): - return self.get_validator_criterion(shared_settings.METAGRAPH) - - def get_validator_axons(self, metagraph): - # Get all validator axons where validator_permit[uid] is True - validator_axons = [uid for uid in metagraph.validator_permit if metagraph.validator_permit[uid]] + + # Using a default factory ensures validators is always a dict. + validators: Dict[int, Validator] = Field(default_factory=dict) + spot_checking_rate: ClassVar[float] = 0.3 - # First sort validators by stake - stakes = {uid: metagraph.stake[uid] for uid in validator_axons} - sorted_validators = sorted(stakes.items(), key=lambda x: x[1], reverse=True) + @model_validator(mode='after') + def create_validator_list(cls, v: "ValidatorRegistry", metagraph = shared_settings.METAGRAPH) -> "ValidatorRegistry": + validator_uids = [ + uid for uid in metagraph.validator_permit if metagraph.validator_permit[uid] + ] + validator_axons = [ + metagraph.axons[uid].ip_str().split("/") for uid in validator_uids + ] + validator_stakes = [metagraph.stake[uid] for uid in validator_uids] + v.validators = { + uid: Validator(uid, stake, axon) + for uid, stake, axon in zip(validator_uids, validator_stakes, validator_axons) + } + return v - # Get IP addresses - validator_axons = [metagraph.axons[uid].ip_str().split("/") for uid, _ in sorted_validators] - - #add: random.choice by stake + def get_available_axon(self) -> Optional[Tuple[int, list]]: + if random.random() > self.spot_checking_rate or not self.validators: + return None, None + validator_list = list(self.validators.values()) + weights = [v.stake for v in validator_list] + chosen = random.choices(validator_list, weights=weights, k=1)[0] + return chosen.uid, chosen.axon - return validator_axons \ No newline at end of file + def update_validators(self, uid: int, response_code: int) -> None: + if uid in self.validators: + max_failures_reached = self.validators[uid].update_failure(success) + if max_failures_reached: + del 
self.validators[uid] From a11f6e979c61e9644221c83cf433b84523e6b8fc Mon Sep 17 00:00:00 2001 From: richwardle Date: Mon, 3 Feb 2025 10:00:06 +0000 Subject: [PATCH 05/28] Update Web Retrieval to Use Response Forwarding --- validator_api/gpt_endpoints.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/validator_api/gpt_endpoints.py b/validator_api/gpt_endpoints.py index 46ed22889..70ec2afc5 100644 --- a/validator_api/gpt_endpoints.py +++ b/validator_api/gpt_endpoints.py @@ -80,10 +80,7 @@ async def web_retrieval(search_query: str, n_miners: int = 10, uids: list[int] = if len(loaded_results) == 0: raise HTTPException(status_code=500, detail="No miner responded successfully") - for uid, res in zip(uids, stream_results): - asyncio.create_task( - forward_response( - uid=uid, body=body, chunks=res.accumulated_chunks if res and res.accumulated_chunks else [] - ) - ) + + collected_chunks_list = [res.accumulated_chunks if res and res.accumulated_chunks else [] for res in stream_results] + asyncio.create_task(forward_response(uids, body, collected_chunks_list)) return loaded_results From 4d68dbaca59368011c7556a5a336f2b0b659d21c Mon Sep 17 00:00:00 2001 From: richwardle Date: Mon, 3 Feb 2025 10:45:08 +0000 Subject: [PATCH 06/28] Remove SCORE_ORGANICS parameter --- .env.api.example | 1 - shared/settings.py | 5 +---- validator_api/utils.py | 2 -- 3 files changed, 1 insertion(+), 7 deletions(-) diff --git a/.env.api.example b/.env.api.example index dcd8ea1dc..b03d6bac0 100644 --- a/.env.api.example +++ b/.env.api.example @@ -1,5 +1,4 @@ API_PORT = "42170" # Port for the API server API_HOST = "0.0.0.0" # Host for the API server SCORING_KEY = "123" # The scoring key for the validator (must match the scoring key in the .env.validator file) -SCORE_ORGANICS = True # Whether to score organics VALIDATOR_API = "0.0.0.0:8094" # The validator API to forward responses to for scoring diff --git a/shared/settings.py b/shared/settings.py index 70c2e1686..ce3bcc4aa 
100644 --- a/shared/settings.py +++ b/shared/settings.py @@ -89,10 +89,9 @@ class SharedSettings(BaseSettings): SCORING_KEY: str | None = Field(None, env="SCORING_KEY") # Validator scoring API (.env.validator). - DEPLOY_SCORING_API: bool = Field(False, env="DEPLOY_SCORING_API") + DEPLOY_SCORING_API: bool = Field(True, env="DEPLOY_SCORING_API") SCORING_API_PORT: int = Field(8094, env="SCORING_API_PORT") SCORING_ADMIN_KEY: str | None = Field(None, env="SCORING_ADMIN_KEY") - SCORE_ORGANICS: bool = Field(False, env="SCORE_ORGANICS") # API Management (.env.api). API_PORT: int = Field(8005, env="API_PORT") @@ -172,8 +171,6 @@ def validate_mode(cls, v): logger.warning( "No SCORING_KEY found in .env.api file. You must add a scoring key that will allow us to forward miner responses to the validator for scoring." ) - if not os.getenv("SCORE_ORGANICS"): - logger.warning("Not scoring organics. This means that miners may not respond as consistently.") elif v["mode"] == "miner": if not dotenv.load_dotenv(".env.miner"): logger.warning("No .env.miner file found. 
Please create one.") diff --git a/validator_api/utils.py b/validator_api/utils.py index 5ab3c88fc..3d7e8601a 100644 --- a/validator_api/utils.py +++ b/validator_api/utils.py @@ -14,8 +14,6 @@ async def forward_response(uids: int, body: dict[str, any], chunks: list[str]): uids = [int(u) for u in uids] chunk_dict = {u: c for u, c in zip(uids, chunks)} logger.info(f"Forwarding response from uid {uids} to scoring with body: {body} and chunks: {chunks}") - if not shared_settings.SCORE_ORGANICS: - return if body.get("task") != "InferenceTask" and body.get("task") != "WebRetrievalTask": logger.debug(f"Skipping forwarding for non- inference/web retrieval task: {body.get('task')}") From ffafdda6f9059580ac61f990ea946f6f23e749f1 Mon Sep 17 00:00:00 2001 From: richwardle Date: Mon, 3 Feb 2025 13:08:10 +0000 Subject: [PATCH 07/28] Remove Unused Parameters --- .env.validator.example | 1 - shared/settings.py | 1 - 2 files changed, 2 deletions(-) diff --git a/.env.validator.example b/.env.validator.example index 0e1bee89c..0b34af5be 100644 --- a/.env.validator.example +++ b/.env.validator.example @@ -26,7 +26,6 @@ HF_TOKEN = "your_huggingface_token_here" # Scoring API (optional). DEPLOY_SCORING_API = true -SCORING_ADMIN_KEY = "123456" SCORING_API_PORT = 8094 # Scoring key must match the scoring key in the .env.api. # SCORING_KEY="..." diff --git a/shared/settings.py b/shared/settings.py index ce3bcc4aa..8ec4b202d 100644 --- a/shared/settings.py +++ b/shared/settings.py @@ -91,7 +91,6 @@ class SharedSettings(BaseSettings): # Validator scoring API (.env.validator). DEPLOY_SCORING_API: bool = Field(True, env="DEPLOY_SCORING_API") SCORING_API_PORT: int = Field(8094, env="SCORING_API_PORT") - SCORING_ADMIN_KEY: str | None = Field(None, env="SCORING_ADMIN_KEY") # API Management (.env.api). 
API_PORT: int = Field(8005, env="API_PORT") From c790034c5924e50047896df9f18acbd3b1e9a317 Mon Sep 17 00:00:00 2001 From: richwardle Date: Mon, 3 Feb 2025 13:09:47 +0000 Subject: [PATCH 08/28] Use Epistula for scroing api --- prompting/api/scoring/api.py | 8 ++----- validator_api/utils.py | 32 ++++++++++++++++++--------- validator_api/validator_forwarding.py | 6 +++-- 3 files changed, 28 insertions(+), 18 deletions(-) diff --git a/prompting/api/scoring/api.py b/prompting/api/scoring/api.py index c78e68f54..6edf32030 100644 --- a/prompting/api/scoring/api.py +++ b/prompting/api/scoring/api.py @@ -9,6 +9,7 @@ from prompting.rewards.scoring import task_scorer from prompting.tasks.inference import InferenceTask from prompting.tasks.web_retrieval import WebRetrievalTask +from shared.epistula import verify_signature from shared.base import DatasetEntry from shared.dendrite import DendriteResponseEvent from shared.epistula import SynapseStreamResult @@ -17,13 +18,8 @@ router = APIRouter() -def validate_scoring_key(api_key: str = Header(...)): - if api_key != shared_settings.SCORING_KEY: - raise HTTPException(status_code=403, detail="Invalid API key") - - @router.post("/scoring") -async def score_response(request: Request, api_key_data: dict = Depends(validate_scoring_key)): +async def score_response(request: Request, api_key_data: dict = Depends(verify_signature)): model = None payload: dict[str, Any] = await request.json() body = payload.get("body") diff --git a/validator_api/utils.py b/validator_api/utils.py index 3d7e8601a..344431f66 100644 --- a/validator_api/utils.py +++ b/validator_api/utils.py @@ -22,7 +22,7 @@ async def forward_response(uids: int, body: dict[str, any], chunks: list[str]): # call - class w/ getter that yields validator_axon based on criterea (stake*x*Y) # validator_axon = class(shared_settings.METAGRAPH) try: - vali_uid, vali_axon = validator_registry.get_available_axon() + vali_uid, vali_axon, vali_hotkey = validator_registry.get_available_axon() 
except Exception as e: logger.warning(e) vali_uid, vali_axon = None, None @@ -32,11 +32,24 @@ async def forward_response(uids: int, body: dict[str, any], chunks: list[str]): url = f"http://{vali_axon}/scoring" payload = {"body": body, "chunks": chunk_dict, "uid": uids} - try: - timeout = httpx.Timeout(timeout=120.0, connect=60.0, read=30.0, write=30.0, pool=5.0) - async with httpx.AsyncClient(timeout=timeout) as client: + # Create an AsyncClient that attaches the header hook. + # The header hook is created by passing the wallet’s hotkey and the axon’s hotkey. + # Adjust the attribute access as needed depending on how your axon object is defined. + async with httpx.AsyncClient( + timeout=timeout, + event_hooks={ + "request": [ + create_header_hook(shared_settings.WALLET.hotkey, vali_hotkey) + ] + }, + ) as client: + try: response = await client.post( - url, json=payload, headers={"api-key": shared_settings.SCORING_KEY, "Content-Type": "application/json"} + url, + json=payload, + headers={ + "Content-Type": "application/json", + }, ) if response.status_code == 200: logger.info(f"Forwarding response completed with status {response.status_code}") @@ -44,11 +57,10 @@ async def forward_response(uids: int, body: dict[str, any], chunks: list[str]): logger.exception( f"Forwarding response uid {uids} failed with status {response.status_code} and payload {payload}" ) - validator_registry.update_validators(uid = vali_uid, response_code = response.status_code) - - except Exception as e: - logger.error(f"Tried to forward response to {url} with payload {payload}") - logger.exception(f"Error while forwarding response: {e}") + # Update the validator registry with the response status code. 
+ validator_registry.update_validators(uid=vali_uid, response_code=response.status_code) + except Exception as e: + logger.exception(f"Exception during forwarding response: {e}") diff --git a/validator_api/validator_forwarding.py b/validator_api/validator_forwarding.py index fa04bceb6..8af527a18 100644 --- a/validator_api/validator_forwarding.py +++ b/validator_api/validator_forwarding.py @@ -7,6 +7,7 @@ class Validator(BaseModel): uid: int stake: float axon: str + hotkey: str failures: int = 0 # Define a constant for the maximum number of allowed failures. @@ -48,9 +49,10 @@ def create_validator_list(cls, v: "ValidatorRegistry", metagraph = shared_settin metagraph.axons[uid].ip_str().split("/") for uid in validator_uids ] validator_stakes = [metagraph.stake[uid] for uid in validator_uids] + validator_hotkeys = [metagraph.hotkeys[uid] for uid in validator_uids] v.validators = { uid: Validator(uid, stake, axon) - for uid, stake, axon in zip(validator_uids, validator_stakes, validator_axons) + for uid, stake, axon, hotkey in zip(validator_uids, validator_stakes, validator_axons, validator_hotkeys) } return v @@ -60,7 +62,7 @@ def get_available_axon(self) -> Optional[Tuple[int, list]]: validator_list = list(self.validators.values()) weights = [v.stake for v in validator_list] chosen = random.choices(validator_list, weights=weights, k=1)[0] - return chosen.uid, chosen.axon + return chosen.uid, chosen.axon, chosen.hotkey def update_validators(self, uid: int, response_code: int) -> None: if uid in self.validators: From c5d8fe6a6e3666fae4b2e93caea79d1c8ab0815a Mon Sep 17 00:00:00 2001 From: richwardle Date: Mon, 3 Feb 2025 16:39:35 +0000 Subject: [PATCH 09/28] Switch to Epistula Based Approach --- prompting/api/scoring/api.py | 30 ++++++++++++++++++++++++++++++ shared/settings.py | 4 ++-- 2 files changed, 32 insertions(+), 2 deletions(-) diff --git a/prompting/api/scoring/api.py b/prompting/api/scoring/api.py index 6edf32030..48f5e4e45 100644 --- 
a/prompting/api/scoring/api.py +++ b/prompting/api/scoring/api.py @@ -17,6 +17,36 @@ router = APIRouter() +def verify_scoring_signature( + signature: str, + body: bytes, + signed_by: str, +) -> Optional[Annotated[str, "Error Message"]]: + # Basic type checks + if not isinstance(signature, str): + return "Invalid signature type" + if not isinstance(body, bytes): + return "Body is not of type bytes" + if not isinstance(signed_by, str): + return "Invalid 'signed_by' address" + + # Check that the 'signed_by' matches the expected SS58 address + if signed_by != shared_settings.API_HOTKEY: + return "Message not from the expected SS58 address" + + # Build the message to verify + message = f"{sha256(body).hexdigest()}.{uuid}.{timestamp}.{signed_for}" + + # Create a keypair using the 'signed_by' address + keypair = Keypair(ss58_address=signed_by) + + # Verify the signature + verified = keypair.verify(message, signature) + if not verified: + return "Signature mismatch" + + return None + @router.post("/scoring") async def score_response(request: Request, api_key_data: dict = Depends(verify_signature)): diff --git a/shared/settings.py b/shared/settings.py index 8ec4b202d..838081461 100644 --- a/shared/settings.py +++ b/shared/settings.py @@ -85,8 +85,8 @@ class SharedSettings(BaseSettings): DEPLOY_VALIDATOR: bool = Field(True, env="DEPLOY_VALDITAOR") # ==== API ===== - # API key used to access validator organic scoring mechanism (both .env.validator and .env.api). - SCORING_KEY: str | None = Field(None, env="SCORING_KEY") + # Hotkey used to run api, defaults to Macrocosmos + API_HOTKEY: str = Field("5Cg5QgjMfRqBC6bh8X4PDbQi7UzVRn9eyWXsB8gkyfppFPPy", env = "API_HOTKEY") # Validator scoring API (.env.validator). 
DEPLOY_SCORING_API: bool = Field(True, env="DEPLOY_SCORING_API") From f0cb9cb48b22003eaeb205f7f4ff307ef2719c78 Mon Sep 17 00:00:00 2001 From: richwardle Date: Sun, 9 Feb 2025 19:33:24 +0000 Subject: [PATCH 10/28] Small syntax fixes --- shared/uids.py | 14 ++++----- validator_api/miner_availabilities.py | 6 ++-- validator_api/utils.py | 1 + validator_api/validator_forwarding.py | 45 ++++++++++++++++----------- 4 files changed, 39 insertions(+), 27 deletions(-) diff --git a/shared/uids.py b/shared/uids.py index a8c99b5d2..a4555959d 100644 --- a/shared/uids.py +++ b/shared/uids.py @@ -25,17 +25,17 @@ def check_uid_availability( metagraph = shared_settings.METAGRAPH # Filter non serving axons. if not metagraph.axons[uid].is_serving: - logger.debug(f"uid: {uid} is not serving") + # logger.debug(f"uid: {uid} is not serving") return False # Filter validator permit > 1024 stake. if metagraph.validator_permit[uid] and metagraph.S[uid] > shared_settings.NEURON_VPERMIT_TAO_LIMIT: - logger.debug( - f"uid: {uid} has vpermit and stake ({metagraph.S[uid]}) > {shared_settings.NEURON_VPERMIT_TAO_LIMIT}" - ) - logger.debug( - f"uid: {uid} has vpermit and stake ({metagraph.S[uid]}) > {shared_settings.NEURON_VPERMIT_TAO_LIMIT}" - ) + # logger.debug( + # f"uid: {uid} has vpermit and stake ({metagraph.S[uid]}) > {shared_settings.NEURON_VPERMIT_TAO_LIMIT}" + # ) + # logger.debug( + # f"uid: {uid} has vpermit and stake ({metagraph.S[uid]}) > {shared_settings.NEURON_VPERMIT_TAO_LIMIT}" + # ) return False if coldkeys and metagraph.axons[uid].coldkey in coldkeys: diff --git a/validator_api/miner_availabilities.py b/validator_api/miner_availabilities.py index e9ae0dae6..ca1dddd90 100644 --- a/validator_api/miner_availabilities.py +++ b/validator_api/miner_availabilities.py @@ -58,13 +58,15 @@ def get_available_miner(task: Optional[str] = None, model: Optional[str] = None) class MinerAvailabilitiesUpdater(AsyncLoopRunner): - interval: int = 20 + interval: int = 40 async def run_step(self): - 
uids = get_uids(sampling_mode="random", k=100) + uids = get_uids(sampling_mode="random", k=100) # TODO: We should probably not just randomly sample uids, there's likely a better way to do this. + # TODO: Default to highest stake validator's availability api url = f"{shared_settings.VALIDATOR_API}/miner_availabilities/miner_availabilities" try: + # TODO: Need to add some level of ddos protection for this result = requests.post(url, json=uids.tolist(), timeout=10) result.raise_for_status() # Raise an exception for bad status codes diff --git a/validator_api/utils.py b/validator_api/utils.py index 5a70f61e2..03888af12 100644 --- a/validator_api/utils.py +++ b/validator_api/utils.py @@ -4,6 +4,7 @@ from shared.loop_runner import AsyncLoopRunner from shared.settings import shared_settings from validator_api.validator_forwarding import ValidatorRegistry +from pydantic import BaseModel, Field validator_registry = ValidatorRegistry() diff --git a/validator_api/validator_forwarding.py b/validator_api/validator_forwarding.py index 8af527a18..f497c099c 100644 --- a/validator_api/validator_forwarding.py +++ b/validator_api/validator_forwarding.py @@ -1,25 +1,28 @@ +import random +import numpy as np +from typing import ClassVar, Optional, Tuple, List +from pydantic import BaseModel, model_validator, Field from shared.settings import shared_settings -from pydantic import BaseModel, model_validator -from typing import ClassVar from shared.misc import cached_property_with_expiration class Validator(BaseModel): uid: int stake: float - axon: str + axon: List[str] # Changed to List[str] since we split the string hotkey: str failures: int = 0 # Define a constant for the maximum number of allowed failures. MAX_FAILURES: ClassVar[int] = 9 - def update_failure(self, status_code: int) -> None: + def update_failure(self, status_code: int) -> int: """ - Update the validator's failure count based on the success status. + Update the validator's failure count based on the operation status. 
- - If the operation was successful, decrease the failure count (ensuring it doesn't go below 0). + - If the operation was successful (status_code == 200), decrease the failure count (ensuring it doesn't go below 0). - If the operation failed, increase the failure count. - - If the failure count exceeds MAX_FAILURES, mark the validator as inactive. + - If the failure count exceeds MAX_FAILURES, return 1 to indicate the validator should be deactivated. + Otherwise, return 0. """ if status_code == 200: self.failures = max(0, self.failures - 1) @@ -32,40 +35,46 @@ def update_failure(self, status_code: int) -> None: class ValidatorRegistry(BaseModel): """ Class to store the success of forwards to validator axons. - If a validator is routinely failing to respond to scoring requests, - we should stop sending them requests. + Validators that routinely fail to respond to scoring requests are removed. """ # Using a default factory ensures validators is always a dict. - validators: Dict[int, Validator] = Field(default_factory=dict) + validators: dict[int, Validator] = Field(default_factory=dict) spot_checking_rate: ClassVar[float] = 0.3 @model_validator(mode='after') - def create_validator_list(cls, v: "ValidatorRegistry", metagraph = shared_settings.METAGRAPH) -> "ValidatorRegistry": - validator_uids = [ - uid for uid in metagraph.validator_permit if metagraph.validator_permit[uid] - ] + def create_validator_list(cls, v: "ValidatorRegistry", metagraph=shared_settings.METAGRAPH) -> "ValidatorRegistry": + validator_uids = np.where(metagraph.validator_permit)[0].tolist() validator_axons = [ metagraph.axons[uid].ip_str().split("/") for uid in validator_uids ] validator_stakes = [metagraph.stake[uid] for uid in validator_uids] validator_hotkeys = [metagraph.hotkeys[uid] for uid in validator_uids] v.validators = { - uid: Validator(uid, stake, axon) + uid: Validator(uid=uid, stake=stake, axon=axon, hotkey=hotkey) for uid, stake, axon, hotkey in zip(validator_uids, 
validator_stakes, validator_axons, validator_hotkeys) } return v - def get_available_axon(self) -> Optional[Tuple[int, list]]: + def get_available_axon(self) -> Optional[Tuple[int, List[str], str]]: + """ + Returns a tuple (uid, axon, hotkey) for a randomly selected validator based on stake weighting, + if spot checking conditions are met. Otherwise, returns None. + """ if random.random() > self.spot_checking_rate or not self.validators: - return None, None + return None validator_list = list(self.validators.values()) weights = [v.stake for v in validator_list] chosen = random.choices(validator_list, weights=weights, k=1)[0] return chosen.uid, chosen.axon, chosen.hotkey def update_validators(self, uid: int, response_code: int) -> None: + """ + Update a specific validator's failure count based on the response code. + If the validator's failure count exceeds the maximum allowed failures, + the validator is removed from the registry. + """ if uid in self.validators: - max_failures_reached = self.validators[uid].update_failure(success) + max_failures_reached = self.validators[uid].update_failure(response_code) if max_failures_reached: del self.validators[uid] From e27b91f2019d266587e0b348ddb24246e2a8a720 Mon Sep 17 00:00:00 2001 From: richwardle Date: Mon, 10 Feb 2025 14:16:19 +0000 Subject: [PATCH 11/28] Linting --- neurons/validator.py | 9 +++-- prompting/api/scoring/api.py | 54 ++++++++++++--------------- prompting/llms/hf_llm.py | 24 ++++++------ shared/settings.py | 8 ++-- validator_api/gpt_endpoints.py | 4 +- validator_api/miner_availabilities.py | 4 +- validator_api/utils.py | 20 +++++----- validator_api/validator_forwarding.py | 45 +++++++++++++++------- 8 files changed, 89 insertions(+), 79 deletions(-) diff --git a/neurons/validator.py b/neurons/validator.py index 80b6b7509..5fe754917 100644 --- a/neurons/validator.py +++ b/neurons/validator.py @@ -4,8 +4,11 @@ import time import loguru +import netaddr +import requests import torch import wandb +from 
bittensor.core.extrinsics.serving import serve_extrinsic # ruff: noqa: E402 from shared import settings @@ -90,12 +93,11 @@ async def start(): from prompting.miner_availability.miner_availability import availability_checking_loop asyncio.create_task(availability_checking_loop.start()) - import bittensor as bt try: external_ip = requests.get("https://checkip.amazonaws.com").text.strip() netaddr.IPAddress(external_ip) - + serve_success = serve_extrinsic( subtensor=shared_settings.SUBTENSOR, wallet=shared_settings.WALLET, @@ -105,7 +107,8 @@ async def start(): netuid=shared_settings.NETUID, ) - except Exception as e: + logger.debug(f"Serve success: {serve_success}") + except Exception as e: logger.warning(f"Failed to serve scoring api to chain: {e}") await start_scoring_api(scoring_queue, reward_events) diff --git a/prompting/api/scoring/api.py b/prompting/api/scoring/api.py index 2aa79c139..aa6c1efe9 100644 --- a/prompting/api/scoring/api.py +++ b/prompting/api/scoring/api.py @@ -1,7 +1,8 @@ import uuid from typing import Any +import time -from fastapi import APIRouter, Depends, Header, HTTPException, Request +from fastapi import APIRouter, Depends, Request, HTTPException from loguru import logger from prompting.datasets.random_website import DDGDatasetEntry @@ -9,47 +10,40 @@ from prompting.rewards.scoring import task_scorer from prompting.tasks.inference import InferenceTask from prompting.tasks.web_retrieval import WebRetrievalTask -from shared.epistula import verify_signature from shared.base import DatasetEntry from shared.dendrite import DendriteResponseEvent -from shared.epistula import SynapseStreamResult +from shared.epistula import SynapseStreamResult, verify_signature from shared.settings import shared_settings router = APIRouter() -def verify_scoring_signature( - signature: str, - body: bytes, - signed_by: str, -) -> Optional[Annotated[str, "Error Message"]]: - # Basic type checks - if not isinstance(signature, str): - return "Invalid signature type" - if 
not isinstance(body, bytes): - return "Body is not of type bytes" - if not isinstance(signed_by, str): - return "Invalid 'signed_by' address" - # Check that the 'signed_by' matches the expected SS58 address +def verify_scoring_signature(self, request: Request): + signed_by = request.headers.get("Epistula-Signed-By") + signed_for = request.headers.get("Epistula-Signed-For") + if signed_for != shared_settings.WALLET.hotkey.ss58_address: + raise HTTPException(status_code=400, detail="Bad Request, message is not intended for self") if signed_by != shared_settings.API_HOTKEY: - return "Message not from the expected SS58 address" + raise HTTPException(status_code=401, detail="Signer not the expected ss58 address") - # Build the message to verify - message = f"{sha256(body).hexdigest()}.{uuid}.{timestamp}.{signed_for}" - - # Create a keypair using the 'signed_by' address - keypair = Keypair(ss58_address=signed_by) - - # Verify the signature - verified = keypair.verify(message, signature) - if not verified: - return "Signature mismatch" - - return None + body = await request.body() + now = time.time() + err = verify_signature( + request.headers.get("Epistula-Request-Signature"), + body, + request.headers.get("Epistula-Timestamp"), + request.headers.get("Epistula-Uuid"), + signed_for, + signed_by, + now, + ) + if err: + logger.error(err) + raise HTTPException(status_code=400, detail=err) @router.post("/scoring") -async def score_response(request: Request, api_key_data: dict = Depends(verify_signature)): +async def score_response(request: Request, api_key_data: dict = Depends(verify_scoring_signature)): model = None payload: dict[str, Any] = await request.json() body = payload.get("body") diff --git a/prompting/llms/hf_llm.py b/prompting/llms/hf_llm.py index 3e931015f..7671a2e6c 100644 --- a/prompting/llms/hf_llm.py +++ b/prompting/llms/hf_llm.py @@ -6,7 +6,6 @@ from transformers import AutoModelForCausalLM, AutoTokenizer, PreTrainedModel, pipeline from shared.settings 
import shared_settings -from shared.timer import Timer class ReproducibleHF: @@ -51,18 +50,17 @@ def generate(self, messages: list[str] | list[dict], sampling_params=None, seed= params = sampling_params if sampling_params else self.sampling_params filtered_params = {k: v for k, v in params.items() if k in self.valid_generation_params} - with Timer() as timer: - # Generate with optimized settings - outputs = self.model.generate( - **inputs.to(shared_settings.NEURON_DEVICE), - **filtered_params, - eos_token_id=self.tokenizer.eos_token_id, - ) - - results = self.tokenizer.batch_decode( - outputs[:, inputs["input_ids"].shape[1] :], - skip_special_tokens=True, - )[0] + # Generate with optimized settings + outputs = self.model.generate( + **inputs.to(shared_settings.NEURON_DEVICE), + **filtered_params, + eos_token_id=self.tokenizer.eos_token_id, + ) + + results = self.tokenizer.batch_decode( + outputs[:, inputs["input_ids"].shape[1] :], + skip_special_tokens=True, + )[0] logger.debug( f"""{self.__class__.__name__} queried: diff --git a/shared/settings.py b/shared/settings.py index ad9544697..e55e97536 100644 --- a/shared/settings.py +++ b/shared/settings.py @@ -86,16 +86,13 @@ class SharedSettings(BaseSettings): # ==== API ===== # Hotkey used to run api, defaults to Macrocosmos - API_HOTKEY: str = Field("5Cg5QgjMfRqBC6bh8X4PDbQi7UzVRn9eyWXsB8gkyfppFPPy", env = "API_HOTKEY") + API_HOTKEY: str = Field("5Cg5QgjMfRqBC6bh8X4PDbQi7UzVRn9eyWXsB8gkyfppFPPy", env="API_HOTKEY") # Scoring request rate limit in seconds. SCORING_RATE_LIMIT_SEC: float = Field(0.5, env="SCORING_RATE_LIMIT_SEC") # Scoring queue threshold when rate-limit start to kick in, used to query validator API with scoring requests. SCORING_QUEUE_API_THRESHOLD: int = Field(5, env="SCORING_QUEUE_API_THRESHOLD") # Validator scoring API (.env.validator). 
- DEPLOY_SCORING_API: bool = Field(True, env="DEPLOY_SCORING_API") - SCORING_API_PORT: int = Field(8094, env="SCORING_API_PORT") - SCORING_ADMIN_KEY: str | None = Field(None, env="SCORING_ADMIN_KEY") SCORE_ORGANICS: bool = Field(False, env="SCORE_ORGANICS") WORKERS: int = Field(2, env="WORKERS") @@ -103,7 +100,8 @@ class SharedSettings(BaseSettings): API_PORT: int = Field(8005, env="API_PORT") API_HOST: str = Field("0.0.0.0", env="API_HOST") # Validator scoring API address. - VALIDATOR_API: str = Field("0.0.0.0:8094", env="VALIDATOR_API") + # TODO: Choose this dynamically from the network + VALIDATOR_API: str = Field("184.105.5.17:8094", env="VALIDATOR_API") # Used for availability # Default SN1 API address DEFAULT_SN1_API: str = Field("http://sn1.api.macrocosmos.ai:11198/v1", env="DEFAULT_SN1_API") # File with keys used to access API. diff --git a/validator_api/gpt_endpoints.py b/validator_api/gpt_endpoints.py index 0ae9f8164..226601927 100644 --- a/validator_api/gpt_endpoints.py +++ b/validator_api/gpt_endpoints.py @@ -95,10 +95,8 @@ async def web_retrieval(search_query: str, n_miners: int = 10, uids: list[int] = if len(loaded_results) == 0: raise HTTPException(status_code=500, detail="No miner responded successfully") - collected_chunks_list = [res.accumulated_chunks if res and res.accumulated_chunks else [] for res in stream_results] - asyncio.create_task(forward_response(uids, body, collected_chunks_list)) - asyncio.create_task(scoring_queue.scoring_queue.append_response(uids=uids, body=body, chunks=chunks)) + asyncio.create_task(scoring_queue.scoring_queue.append_response(uids=uids, body=body, chunks=collected_chunks_list)) return loaded_results diff --git a/validator_api/miner_availabilities.py b/validator_api/miner_availabilities.py index ca1dddd90..a4d88b774 100644 --- a/validator_api/miner_availabilities.py +++ b/validator_api/miner_availabilities.py @@ -61,7 +61,9 @@ class MinerAvailabilitiesUpdater(AsyncLoopRunner): interval: int = 40 async def 
run_step(self): - uids = get_uids(sampling_mode="random", k=100) # TODO: We should probably not just randomly sample uids, there's likely a better way to do this. + uids = get_uids( + sampling_mode="random", k=100 + ) # TODO: We should probably not just randomly sample uids, there's likely a better way to do this. # TODO: Default to highest stake validator's availability api url = f"{shared_settings.VALIDATOR_API}/miner_availabilities/miner_availabilities" diff --git a/validator_api/utils.py b/validator_api/utils.py index 03888af12..41f5e48a7 100644 --- a/validator_api/utils.py +++ b/validator_api/utils.py @@ -1,15 +1,17 @@ +import httpx import requests from loguru import logger +from shared.epistula import create_header_hook from shared.loop_runner import AsyncLoopRunner from shared.settings import shared_settings from validator_api.validator_forwarding import ValidatorRegistry -from pydantic import BaseModel, Field validator_registry = ValidatorRegistry() # make class w/ getter that yields validator_axon (creates from shared_settings) based on criterea (stake*x*Y) - + + # TODO: Modify this so that all the forwarded responses are sent in a single request. This is both more efficient but # also means that on the validator side all responses are scored at once, speeding up the scoring process. 
async def forward_response(uids: int, body: dict[str, any], chunks: list[str]): @@ -29,21 +31,19 @@ async def forward_response(uids: int, body: dict[str, any], chunks: list[str]): logger.warning(e) vali_uid, vali_axon = None, None if not vali_uid: - logger.warning("Unable to get an available validator, either through spot-checking restrictions or errors, skipping scoring") + logger.warning( + "Unable to get an available validator, either through spot-checking restrictions or errors, skipping scoring" + ) return - url = f"http://{vali_axon}/scoring" + url = f"http://{vali_axon}/scoring" payload = {"body": body, "chunks": chunk_dict, "uid": uids} # Create an AsyncClient that attaches the header hook. # The header hook is created by passing the wallet’s hotkey and the axon’s hotkey. # Adjust the attribute access as needed depending on how your axon object is defined. async with httpx.AsyncClient( - timeout=timeout, - event_hooks={ - "request": [ - create_header_hook(shared_settings.WALLET.hotkey, vali_hotkey) - ] - }, + timeout=httpx.Timeout(connect=5, read=5, write=5), + event_hooks={"request": [create_header_hook(shared_settings.WALLET.hotkey, vali_hotkey)]}, ) as client: try: response = await client.post( diff --git a/validator_api/validator_forwarding.py b/validator_api/validator_forwarding.py index f497c099c..de527bc1d 100644 --- a/validator_api/validator_forwarding.py +++ b/validator_api/validator_forwarding.py @@ -1,16 +1,20 @@ import random +import time +from typing import ClassVar, List, Optional, Tuple + import numpy as np -from typing import ClassVar, Optional, Tuple, List -from pydantic import BaseModel, model_validator, Field +from pydantic import BaseModel, Field, model_validator + from shared.settings import shared_settings -from shared.misc import cached_property_with_expiration + class Validator(BaseModel): uid: int stake: float - axon: List[str] # Changed to List[str] since we split the string + axon: List[str] # Changed to List[str] since we split 
the string hotkey: str - failures: int = 0 + timeout: int = 2 # starting cooldown in seconds; doubles on failure (capped at 86400) + available_at: float = 0.0 # Unix timestamp indicating when the validator is next available # Define a constant for the maximum number of allowed failures. MAX_FAILURES: ClassVar[int] = 9 @@ -24,30 +28,37 @@ def update_failure(self, status_code: int) -> int: - If the failure count exceeds MAX_FAILURES, return 1 to indicate the validator should be deactivated. Otherwise, return 0. """ + current_time = time.time() if status_code == 200: self.failures = max(0, self.failures - 1) + self.timeout = 2 + self.available_at = current_time else: self.failures += 1 - if self.failures > self.MAX_FAILURES: - return 1 - return 0 + self.timeout = min(self.timeout * 2, 86400) + self.available_at = current_time + self.timeout + + def is_available(self): + """ + Check if the validator is available based on its cooldown. + """ + return time.time() >= self.available_at + class ValidatorRegistry(BaseModel): """ Class to store the success of forwards to validator axons. Validators that routinely fail to respond to scoring requests are removed. """ - + # Using a default factory ensures validators is always a dict. 
validators: dict[int, Validator] = Field(default_factory=dict) spot_checking_rate: ClassVar[float] = 0.3 - @model_validator(mode='after') + @model_validator(mode="after") def create_validator_list(cls, v: "ValidatorRegistry", metagraph=shared_settings.METAGRAPH) -> "ValidatorRegistry": validator_uids = np.where(metagraph.validator_permit)[0].tolist() - validator_axons = [ - metagraph.axons[uid].ip_str().split("/") for uid in validator_uids - ] + validator_axons = [metagraph.axons[uid].ip_str().split("/") for uid in validator_uids] validator_stakes = [metagraph.stake[uid] for uid in validator_uids] validator_hotkeys = [metagraph.hotkeys[uid] for uid in validator_uids] v.validators = { @@ -56,6 +67,12 @@ def create_validator_list(cls, v: "ValidatorRegistry", metagraph=shared_settings } return v + def get_available_validators(self) -> List[Validator]: + """ + Given a list of validators, return only those that are not in their cooldown period. + """ + return [v for v in self.validators if v.is_available()] + def get_available_axon(self) -> Optional[Tuple[int, List[str], str]]: """ Returns a tuple (uid, axon, hotkey) for a randomly selected validator based on stake weighting, @@ -63,7 +80,7 @@ def get_available_axon(self) -> Optional[Tuple[int, List[str], str]]: """ if random.random() > self.spot_checking_rate or not self.validators: return None - validator_list = list(self.validators.values()) + validator_list = self.get_available_validators() weights = [v.stake for v in validator_list] chosen = random.choices(validator_list, weights=weights, k=1)[0] return chosen.uid, chosen.axon, chosen.hotkey From c2ffcb789290d82b43118969ee2f7fd3a25ae815 Mon Sep 17 00:00:00 2001 From: richwardle Date: Mon, 10 Feb 2025 14:21:38 +0000 Subject: [PATCH 12/28] More Linting --- prompting/api/scoring/api.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/prompting/api/scoring/api.py b/prompting/api/scoring/api.py index aa6c1efe9..d6d547237 100644 --- 
a/prompting/api/scoring/api.py +++ b/prompting/api/scoring/api.py @@ -1,8 +1,8 @@ +import time import uuid from typing import Any -import time -from fastapi import APIRouter, Depends, Request, HTTPException +from fastapi import APIRouter, Depends, HTTPException, Request from loguru import logger from prompting.datasets.random_website import DDGDatasetEntry @@ -18,7 +18,7 @@ router = APIRouter() -def verify_scoring_signature(self, request: Request): +async def verify_scoring_signature(self, request: Request): signed_by = request.headers.get("Epistula-Signed-By") signed_for = request.headers.get("Epistula-Signed-For") if signed_for != shared_settings.WALLET.hotkey.ss58_address: From 51fdb66be55636f175a7da66385b90b2ad8bdf85 Mon Sep 17 00:00:00 2001 From: richwardle Date: Mon, 10 Feb 2025 17:38:31 +0000 Subject: [PATCH 13/28] Checking response status codes --- api_keys.json | 2 +- prompting/llms/apis/sn19_wrapper.py | 7 +++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/api_keys.json b/api_keys.json index 0967ef424..7fdb8f043 100644 --- a/api_keys.json +++ b/api_keys.json @@ -1 +1 @@ -{} +{"14ff91d2ec775415624c6475d385a441": {"rate_limit": 100, "usage": 0}} \ No newline at end of file diff --git a/prompting/llms/apis/sn19_wrapper.py b/prompting/llms/apis/sn19_wrapper.py index 4d50a280f..43ee327f3 100644 --- a/prompting/llms/apis/sn19_wrapper.py +++ b/prompting/llms/apis/sn19_wrapper.py @@ -1,5 +1,5 @@ import json - +from loguru import logger import requests from tenacity import retry, stop_after_attempt, wait_exponential @@ -7,7 +7,6 @@ from shared.settings import shared_settings -# TODO: key error in response.json() when response is 500 @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10)) def chat_complete( messages: LLMMessages, @@ -36,6 +35,10 @@ def chat_complete( "logprobs": logprobs, } response = requests.post(url, headers=headers, data=json.dumps(data), timeout=30) + if not response.status_code == 200: + 
logger.error(f"SN19 API returned status code {response.status_code}") + logger.error(f"Response: {response.text}") + raise Exception(f"SN19 API returned status code {response.status_code}") response_json = response.json() try: return response_json["choices"][0]["message"].get("content") From dbfcd577af6f56a0a1c9ecb2fe4b527e5a8b400f Mon Sep 17 00:00:00 2001 From: richwardle Date: Tue, 11 Feb 2025 12:30:20 +0000 Subject: [PATCH 14/28] Merge Fixes --- neurons/validator.py | 4 ++-- prompting/api/api.py | 1 + shared/settings.py | 1 + 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/neurons/validator.py b/neurons/validator.py index 635041622..6a75bcf0d 100644 --- a/neurons/validator.py +++ b/neurons/validator.py @@ -13,6 +13,7 @@ # ruff: noqa: E402 from shared import settings from shared.logging import init_wandb +from prompting.rewards.scoring import task_scorer settings.shared_settings = settings.SharedSettings.load(mode="validator") @@ -37,7 +38,7 @@ async def spawn_loops(task_queue, scoring_queue, reward_events): # ruff: noqa: E402 from prompting.llms.model_manager import model_scheduler from prompting.miner_availability.miner_availability import availability_checking_loop - from prompting.rewards.scoring import task_scorer + from prompting.tasks.task_creation import task_loop from prompting.tasks.task_sending import task_sender from prompting.weight_setting.weight_setter import weight_setter @@ -90,7 +91,6 @@ async def start(): # TODO: We should not use 2 availability loops for each process, in reality # we should only be sharing the miner availability data between processes. 
from prompting.miner_availability.miner_availability import availability_checking_loop - from prompting.rewards.scoring import task_scorer asyncio.create_task(availability_checking_loop.start()) diff --git a/prompting/api/api.py b/prompting/api/api.py index 34b5dadc7..48dfb37a3 100644 --- a/prompting/api/api.py +++ b/prompting/api/api.py @@ -4,6 +4,7 @@ from prompting.api.miner_availabilities.api import router as miner_availabilities_router from prompting.api.scoring.api import router as scoring_router +#from prompting.rewards.scoring import task_scorer from shared import settings app = FastAPI() diff --git a/shared/settings.py b/shared/settings.py index e55e97536..dd4aeb6c3 100644 --- a/shared/settings.py +++ b/shared/settings.py @@ -83,6 +83,7 @@ class SharedSettings(BaseSettings): SCORING_QUEUE_LENGTH_THRESHOLD: int = Field(10, env="SCORING_QUEUE_LENGTH_THRESHOLD") HF_TOKEN: Optional[str] = Field(None, env="HF_TOKEN") DEPLOY_VALIDATOR: bool = Field(True, env="DEPLOY_VALDITAOR") + DEPLOY_SCORING_API: bool = Field(True, env="DEPLOY_SCORING_API") # ==== API ===== # Hotkey used to run api, defaults to Macrocosmos From b6ad8c719bcd407f4ff298c2c2d7026435262b55 Mon Sep 17 00:00:00 2001 From: richwardle Date: Tue, 11 Feb 2025 16:49:53 +0000 Subject: [PATCH 15/28] Big ol fix --- neurons/validator.py | 8 ++++---- prompting/api/scoring/api.py | 4 ++-- shared/settings.py | 1 + validator_api/scoring_queue.py | 7 +++++-- validator_api/utils.py | 5 ++--- validator_api/validator_forwarding.py | 5 ++--- 6 files changed, 16 insertions(+), 14 deletions(-) diff --git a/neurons/validator.py b/neurons/validator.py index 6a75bcf0d..e38ec662d 100644 --- a/neurons/validator.py +++ b/neurons/validator.py @@ -99,12 +99,12 @@ async def start(): netaddr.IPAddress(external_ip) serve_success = serve_extrinsic( - subtensor=shared_settings.SUBTENSOR, - wallet=shared_settings.WALLET, + subtensor=settings.shared_settings.SUBTENSOR, + wallet=settings.shared_settings.WALLET, ip=external_ip, - 
port=shared_settings.SCORING_API_PORT, + port=settings.shared_settings.SCORING_API_PORT, protocol=4, - netuid=shared_settings.NETUID, + netuid=settings.shared_settings.NETUID, ) logger.debug(f"Serve success: {serve_success}") diff --git a/prompting/api/scoring/api.py b/prompting/api/scoring/api.py index 4a41efc67..885d87a12 100644 --- a/prompting/api/scoring/api.py +++ b/prompting/api/scoring/api.py @@ -17,8 +17,8 @@ router = APIRouter() -def validate_scoring_key(api_key: str = Header(...)): - if api_key != settings.shared_settings.SCORING_KEY: +def validate_scoring_key(request: Request): + if request.headers.api_key != settings.shared_settings.SCORING_KEY: raise HTTPException(status_code=403, detail="Invalid API key") diff --git a/shared/settings.py b/shared/settings.py index dd4aeb6c3..3ad6825cd 100644 --- a/shared/settings.py +++ b/shared/settings.py @@ -84,6 +84,7 @@ class SharedSettings(BaseSettings): HF_TOKEN: Optional[str] = Field(None, env="HF_TOKEN") DEPLOY_VALIDATOR: bool = Field(True, env="DEPLOY_VALDITAOR") DEPLOY_SCORING_API: bool = Field(True, env="DEPLOY_SCORING_API") + SCORING_API_PORT: int = Field(8095, env = "SCORING_API_PORT") # ==== API ===== # Hotkey used to run api, defaults to Macrocosmos diff --git a/validator_api/scoring_queue.py b/validator_api/scoring_queue.py index 493bc55a8..300b5be9b 100644 --- a/validator_api/scoring_queue.py +++ b/validator_api/scoring_queue.py @@ -9,6 +9,8 @@ from shared import settings from shared.loop_runner import AsyncLoopRunner +from validator_api.validator_forwarding import ValidatorRegistry +validator_registry = ValidatorRegistry() shared_settings = settings.shared_settings @@ -46,8 +48,8 @@ async def run_step(self): payload = scoring_payload.payload uids = payload["uid"] logger.info(f"Received new organic for scoring, uids: {uids}") - - url = f"http://{shared_settings.VALIDATOR_API}/scoring" + vali_uid, vali_axon, vali_hotkey = validator_registry.get_available_axon() + url = f"http://{vali_axon}/scoring" 
try: timeout = httpx.Timeout(timeout=120.0, connect=60.0, read=30.0, write=30.0, pool=5.0) async with httpx.AsyncClient(timeout=timeout) as client: @@ -56,6 +58,7 @@ async def run_step(self): json=payload, headers={"api-key": shared_settings.SCORING_KEY, "Content-Type": "application/json"}, ) + validator_registry.update_validators(uid=vali_uid, response_code=response.status_code) if response.status_code != 200: # Raise an exception so that the retry logic in the except block handles it. raise Exception(f"Non-200 response: {response.status_code} for uids {uids}") diff --git a/validator_api/utils.py b/validator_api/utils.py index 6995d91dc..934243c99 100644 --- a/validator_api/utils.py +++ b/validator_api/utils.py @@ -5,8 +5,7 @@ from shared.epistula import create_header_hook from shared.loop_runner import AsyncLoopRunner from shared.uids import get_uids - -shared_settings = settings.shared_settings +from shared import settings class UpdateMinerAvailabilitiesForAPI(AsyncLoopRunner): @@ -15,7 +14,7 @@ class UpdateMinerAvailabilitiesForAPI(AsyncLoopRunner): async def run_step(self): try: response = requests.post( - f"http://{shared_settings.VALIDATOR_API}/miner_availabilities/miner_availabilities", + f"http://{settings.shared_settings.VALIDATOR_API}/miner_availabilities/miner_availabilities", headers={"accept": "application/json", "Content-Type": "application/json"}, json=get_uids(sampling_mode="all"), timeout=10, diff --git a/validator_api/validator_forwarding.py b/validator_api/validator_forwarding.py index de527bc1d..aa2f36b24 100644 --- a/validator_api/validator_forwarding.py +++ b/validator_api/validator_forwarding.py @@ -92,6 +92,5 @@ def update_validators(self, uid: int, response_code: int) -> None: the validator is removed from the registry. 
""" if uid in self.validators: - max_failures_reached = self.validators[uid].update_failure(response_code) - if max_failures_reached: - del self.validators[uid] + self.validators[uid].update_failure(response_code) + From 99751f55af2b2d6d11eb6d7b3d3767e177a6584e Mon Sep 17 00:00:00 2001 From: Dmytro Bobrenko <17252809+dbobrenko@users.noreply.github.com> Date: Tue, 11 Feb 2025 17:13:51 +0000 Subject: [PATCH 16/28] Make scoring queue start --- shared/settings.py | 2 +- validator_api/api.py | 22 +++++++++++++--------- validator_api/utils.py | 1 - 3 files changed, 14 insertions(+), 11 deletions(-) diff --git a/shared/settings.py b/shared/settings.py index 3ad6825cd..8110a2deb 100644 --- a/shared/settings.py +++ b/shared/settings.py @@ -90,7 +90,7 @@ class SharedSettings(BaseSettings): # Hotkey used to run api, defaults to Macrocosmos API_HOTKEY: str = Field("5Cg5QgjMfRqBC6bh8X4PDbQi7UzVRn9eyWXsB8gkyfppFPPy", env="API_HOTKEY") # Scoring request rate limit in seconds. - SCORING_RATE_LIMIT_SEC: float = Field(0.5, env="SCORING_RATE_LIMIT_SEC") + SCORING_RATE_LIMIT_SEC: float = Field(1, env="SCORING_RATE_LIMIT_SEC") # Scoring queue threshold when rate-limit start to kick in, used to query validator API with scoring requests. SCORING_QUEUE_API_THRESHOLD: int = Field(5, env="SCORING_QUEUE_API_THRESHOLD") diff --git a/validator_api/api.py b/validator_api/api.py index af80c43c4..1ffb7de24 100644 --- a/validator_api/api.py +++ b/validator_api/api.py @@ -17,14 +17,18 @@ @contextlib.asynccontextmanager async def lifespan(app: FastAPI): - # Startup: start the background tasks. 
- background_task = asyncio.create_task(update_miner_availabilities_for_api.start()) - yield - background_task.cancel() + availability_task = asyncio.create_task(update_miner_availabilities_for_api.start()) + scoring_task = asyncio.create_task(scoring_queue.scoring_queue.start()) + try: - await background_task - except asyncio.CancelledError: - pass + yield + finally: + availability_task.cancel() + scoring_task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await availability_task + with contextlib.suppress(asyncio.CancelledError): + await scoring_task # Create the FastAPI app with the lifespan handler. @@ -39,8 +43,8 @@ async def health(): async def main(): - asyncio.create_task(update_miner_availabilities_for_api.start()) - asyncio.create_task(scoring_queue.scoring_queue.start()) + # asyncio.create_task(update_miner_availabilities_for_api.start()) + # asyncio.create_task(scoring_queue.scoring_queue.start()) uvicorn.run( "validator_api.api:app", host=shared_settings.API_HOST, diff --git a/validator_api/utils.py b/validator_api/utils.py index 934243c99..3527bfe03 100644 --- a/validator_api/utils.py +++ b/validator_api/utils.py @@ -1,4 +1,3 @@ -import httpx import requests from loguru import logger From 1e83747f1e341002ac60050a3610115899d74d4c Mon Sep 17 00:00:00 2001 From: richwardle Date: Tue, 11 Feb 2025 19:46:05 +0000 Subject: [PATCH 17/28] Change to dict rather than list --- shared/loop_runner.py | 20 ++++++++++---------- validator_api/scoring_queue.py | 2 +- validator_api/validator_forwarding.py | 17 +++++++++-------- 3 files changed, 20 insertions(+), 19 deletions(-) diff --git a/shared/loop_runner.py b/shared/loop_runner.py index 43d380bd4..06348d233 100644 --- a/shared/loop_runner.py +++ b/shared/loop_runner.py @@ -33,7 +33,7 @@ async def get_time(self): """Get the current time from the time server with a timeout.""" if not self.sync: time = datetime.datetime.now(datetime.timezone.utc) - logger.debug(f"Time: {time}") + # logger.debug(f"Time: 
{time}") return time try: async with aiohttp.ClientSession() as session: @@ -64,13 +64,13 @@ async def wait_for_next_execution(self, last_run_time): next_run = self.next_sync_point(current_time) else: next_run = last_run_time + timedelta(seconds=self.interval) - logger.debug(f"Next run: {next_run}") + # logger.debug(f"Next run: {next_run}") wait_time = (next_run - current_time).total_seconds() if wait_time > 0: - logger.debug( - f"{self.name}: Waiting for {wait_time:.2f} seconds until next {'sync point' if self.sync else 'execution'}" - ) + # logger.debug( + # f"{self.name}: Waiting for {wait_time:.2f} seconds until next {'sync point' if self.sync else 'execution'}" + # ) await asyncio.sleep(wait_time) return next_run @@ -83,14 +83,14 @@ async def run_loop(self): try: while self.running: with profiler.measure(self.name): - logger.debug("Waiting...") + # logger.debug("Waiting...") next_run = await self.wait_for_next_execution(last_run_time) - logger.debug("Wait ended") + # logger.debug("Wait ended") try: - run_results = await self.run_step() - logger.debug(f"Run_results: {run_results}") + await self.run_step() # run_results = await self.run_step() + #logger.debug(f"Run_results: {run_results}") self.step += 1 - logger.debug(f"{self.name}: Step {self.step} completed at {next_run}") + # logger.debug(f"{self.name}: Step {self.step} completed at {next_run}") except Exception as ex: logger.exception(f"Error in loop iteration: {ex}") last_run_time = next_run diff --git a/validator_api/scoring_queue.py b/validator_api/scoring_queue.py index 300b5be9b..190b6f0f5 100644 --- a/validator_api/scoring_queue.py +++ b/validator_api/scoring_queue.py @@ -56,7 +56,7 @@ async def run_step(self): response = await client.post( url=url, json=payload, - headers={"api-key": shared_settings.SCORING_KEY, "Content-Type": "application/json"}, + headers={"Content-Type": "application/json"}, ) validator_registry.update_validators(uid=vali_uid, response_code=response.status_code) if 
response.status_code != 200: diff --git a/validator_api/validator_forwarding.py b/validator_api/validator_forwarding.py index aa2f36b24..4131955b3 100644 --- a/validator_api/validator_forwarding.py +++ b/validator_api/validator_forwarding.py @@ -11,7 +11,7 @@ class Validator(BaseModel): uid: int stake: float - axon: List[str] # Changed to List[str] since we split the string + axon: str hotkey: str timeout: int = 2 # starting cooldown in seconds; doubles on failure (capped at 86400) available_at: float = 0.0 # Unix timestamp indicating when the validator is next available @@ -53,12 +53,13 @@ class ValidatorRegistry(BaseModel): # Using a default factory ensures validators is always a dict. validators: dict[int, Validator] = Field(default_factory=dict) - spot_checking_rate: ClassVar[float] = 0.3 + spot_checking_rate: ClassVar[float] = 0.0 @model_validator(mode="after") def create_validator_list(cls, v: "ValidatorRegistry", metagraph=shared_settings.METAGRAPH) -> "ValidatorRegistry": - validator_uids = np.where(metagraph.validator_permit)[0].tolist() - validator_axons = [metagraph.axons[uid].ip_str().split("/") for uid in validator_uids] + validator_uids = np.where(metagraph.stake >= 100000)[0].tolist() + print(f"uids: {validator_uids}") + validator_axons = [metagraph.axons[uid].ip_str().split("/")[2] for uid in validator_uids] validator_stakes = [metagraph.stake[uid] for uid in validator_uids] validator_hotkeys = [metagraph.hotkeys[uid] for uid in validator_uids] v.validators = { @@ -71,18 +72,18 @@ def get_available_validators(self) -> List[Validator]: """ Given a list of validators, return only those that are not in their cooldown period. 
""" - return [v for v in self.validators if v.is_available()] + return [uid for uid, validator in self.validators.items() if validator.is_available()] def get_available_axon(self) -> Optional[Tuple[int, List[str], str]]: """ Returns a tuple (uid, axon, hotkey) for a randomly selected validator based on stake weighting, if spot checking conditions are met. Otherwise, returns None. """ - if random.random() > self.spot_checking_rate or not self.validators: + if random.random() < self.spot_checking_rate or not self.validators: return None validator_list = self.get_available_validators() - weights = [v.stake for v in validator_list] - chosen = random.choices(validator_list, weights=weights, k=1)[0] + weights = [self.validators[uid].stake for uid in validator_list] + chosen = self.validators[random.choices(validator_list, weights=weights, k=1)[0]] return chosen.uid, chosen.axon, chosen.hotkey def update_validators(self, uid: int, response_code: int) -> None: From 066bc45c7a14dff6a9b1494bf287ea11c0cef202 Mon Sep 17 00:00:00 2001 From: richwardle Date: Wed, 12 Feb 2025 11:40:26 +0000 Subject: [PATCH 18/28] Increase robustness in case of failure --- validator_api/scoring_queue.py | 8 ++++++-- validator_api/validator_forwarding.py | 21 ++++++++++++++------- 2 files changed, 20 insertions(+), 9 deletions(-) diff --git a/validator_api/scoring_queue.py b/validator_api/scoring_queue.py index 190b6f0f5..872c1e181 100644 --- a/validator_api/scoring_queue.py +++ b/validator_api/scoring_queue.py @@ -48,8 +48,12 @@ async def run_step(self): payload = scoring_payload.payload uids = payload["uid"] logger.info(f"Received new organic for scoring, uids: {uids}") - vali_uid, vali_axon, vali_hotkey = validator_registry.get_available_axon() - url = f"http://{vali_axon}/scoring" + try: + vali_uid, vali_axon, vali_hotkey = validator_registry.get_available_axon() + # get_available_axon() will return None if it cannot find an available validator, which will fail to unpack + url = 
f"http://{vali_axon}/scoring" + except Exception as e: + logger.exception(f"Could not find available validator scoring endpoint: {e}") try: timeout = httpx.Timeout(timeout=120.0, connect=60.0, read=30.0, write=30.0, pool=5.0) async with httpx.AsyncClient(timeout=timeout) as client: diff --git a/validator_api/validator_forwarding.py b/validator_api/validator_forwarding.py index 4131955b3..ce33182ae 100644 --- a/validator_api/validator_forwarding.py +++ b/validator_api/validator_forwarding.py @@ -1,5 +1,6 @@ import random import time +from loguru import logger from typing import ClassVar, List, Optional, Tuple import numpy as np @@ -13,11 +14,9 @@ class Validator(BaseModel): stake: float axon: str hotkey: str - timeout: int = 2 # starting cooldown in seconds; doubles on failure (capped at 86400) + timeout: int = 1 # starting cooldown in seconds; doubles on failure (capped at 86400) available_at: float = 0.0 # Unix timestamp indicating when the validator is next available - # Define a constant for the maximum number of allowed failures. - MAX_FAILURES: ClassVar[int] = 9 def update_failure(self, status_code: int) -> int: """ @@ -30,12 +29,11 @@ def update_failure(self, status_code: int) -> int: """ current_time = time.time() if status_code == 200: - self.failures = max(0, self.failures - 1) - self.timeout = 2 + self.timeout = 1 self.available_at = current_time else: self.failures += 1 - self.timeout = min(self.timeout * 2, 86400) + self.timeout = min(self.timeout * 4, 86400) self.available_at = current_time + self.timeout def is_available(self): @@ -54,6 +52,7 @@ class ValidatorRegistry(BaseModel): # Using a default factory ensures validators is always a dict. 
validators: dict[int, Validator] = Field(default_factory=dict) spot_checking_rate: ClassVar[float] = 0.0 + max_retries: ClassVar[int] = 4 @model_validator(mode="after") def create_validator_list(cls, v: "ValidatorRegistry", metagraph=shared_settings.METAGRAPH) -> "ValidatorRegistry": @@ -81,7 +80,15 @@ def get_available_axon(self) -> Optional[Tuple[int, List[str], str]]: """ if random.random() < self.spot_checking_rate or not self.validators: return None - validator_list = self.get_available_validators() + for _ in range(self.max_retries): + validator_list = self.get_available_validators() + if validator_list: + break + else: + time.sleep(5) + if not validator_list: + logger.error(f"Could not find available validator after {self.max_retries}") + return None weights = [self.validators[uid].stake for uid in validator_list] chosen = self.validators[random.choices(validator_list, weights=weights, k=1)[0]] return chosen.uid, chosen.axon, chosen.hotkey From 43fe56523403e3c8882d1bcff94e550f66fa0ce9 Mon Sep 17 00:00:00 2001 From: richwardle Date: Wed, 12 Feb 2025 16:50:27 +0000 Subject: [PATCH 19/28] Fix things lost in merge --- neurons/validator.py | 4 +-- prompting/api/api.py | 3 ++- prompting/api/scoring/api.py | 38 ++++++++++++++++++++++----- prompting/llms/apis/sn19_wrapper.py | 3 ++- prompting/llms/hf_llm.py | 4 +-- shared/loop_runner.py | 4 +-- shared/settings.py | 4 +-- validator_api/gpt_endpoints.py | 2 ++ validator_api/scoring_queue.py | 22 +++++++++++++--- validator_api/utils.py | 4 +-- validator_api/validator_forwarding.py | 13 ++++----- 11 files changed, 70 insertions(+), 31 deletions(-) diff --git a/neurons/validator.py b/neurons/validator.py index e38ec662d..da2fd6146 100644 --- a/neurons/validator.py +++ b/neurons/validator.py @@ -10,10 +10,11 @@ import wandb from bittensor.core.extrinsics.serving import serve_extrinsic +from prompting.rewards.scoring import task_scorer + # ruff: noqa: E402 from shared import settings from shared.logging import init_wandb 
-from prompting.rewards.scoring import task_scorer settings.shared_settings = settings.SharedSettings.load(mode="validator") @@ -38,7 +39,6 @@ async def spawn_loops(task_queue, scoring_queue, reward_events): # ruff: noqa: E402 from prompting.llms.model_manager import model_scheduler from prompting.miner_availability.miner_availability import availability_checking_loop - from prompting.tasks.task_creation import task_loop from prompting.tasks.task_sending import task_sender from prompting.weight_setting.weight_setter import weight_setter diff --git a/prompting/api/api.py b/prompting/api/api.py index 48dfb37a3..6585d529f 100644 --- a/prompting/api/api.py +++ b/prompting/api/api.py @@ -4,7 +4,8 @@ from prompting.api.miner_availabilities.api import router as miner_availabilities_router from prompting.api.scoring.api import router as scoring_router -#from prompting.rewards.scoring import task_scorer + +# from prompting.rewards.scoring import task_scorer from shared import settings app = FastAPI() diff --git a/prompting/api/scoring/api.py b/prompting/api/scoring/api.py index 885d87a12..8ad313a9a 100644 --- a/prompting/api/scoring/api.py +++ b/prompting/api/scoring/api.py @@ -1,10 +1,11 @@ -import time import uuid from typing import Any +import time from fastapi import APIRouter, Depends, HTTPException, Request from loguru import logger + from prompting.datasets.random_website import DDGDatasetEntry from prompting.llms.model_zoo import ModelZoo from prompting.tasks.inference import InferenceTask @@ -12,11 +13,36 @@ from shared import settings from shared.base import DatasetEntry from shared.dendrite import DendriteResponseEvent -from shared.epistula import SynapseStreamResult +from shared.epistula import SynapseStreamResult, verify_signature +from shared.settings import shared_settings router = APIRouter() +async def verify_scoring_signature(self, request: Request): + signed_by = request.headers.get("Epistula-Signed-By") + signed_for = 
request.headers.get("Epistula-Signed-For") + if signed_for != shared_settings.WALLET.hotkey.ss58_address: + raise HTTPException(status_code=400, detail="Bad Request, message is not intended for self") + if signed_by != shared_settings.API_HOTKEY: + raise HTTPException(status_code=401, detail="Signer not the expected ss58 address") + + body = await request.body() + now = time.time() + err = verify_signature( + request.headers.get("Epistula-Request-Signature"), + body, + request.headers.get("Epistula-Timestamp"), + request.headers.get("Epistula-Uuid"), + signed_for, + signed_by, + now, + ) + if err: + logger.error(err) + raise HTTPException(status_code=400, detail=err) + + def validate_scoring_key(request: Request): if request.headers.api_key != settings.shared_settings.SCORING_KEY: raise HTTPException(status_code=403, detail="Invalid API key") @@ -28,7 +54,7 @@ def get_task_scorer(request: Request): @router.post("/scoring") async def score_response( - request: Request, api_key_data: dict = Depends(validate_scoring_key), task_scorer=Depends(get_task_scorer) + request: Request, api_key_data: dict = Depends(verify_scoring_signature), task_scorer=Depends(get_task_scorer) ): model = None payload: dict[str, Any] = await request.json() @@ -92,10 +118,8 @@ async def score_response( query=search_term, ), response=DendriteResponseEvent( - uids=[uids], - stream_results=[ - SynapseStreamResult(accumulated_chunks=[chunk for chunk in chunks if chunk is not None]) - ], + uids=uids, + stream_results=[SynapseStreamResult(accumulated_chunks=chunks.get(str(uid), [])) for uid in uids], timeout=body.get("timeout", settings.shared_settings.NEURON_TIMEOUT), ), dataset_entry=DDGDatasetEntry(search_term=search_term), diff --git a/prompting/llms/apis/sn19_wrapper.py b/prompting/llms/apis/sn19_wrapper.py index 6fe31bdab..376cfe0a0 100644 --- a/prompting/llms/apis/sn19_wrapper.py +++ b/prompting/llms/apis/sn19_wrapper.py @@ -1,6 +1,7 @@ import json -from loguru import logger + import requests 
+from loguru import logger from tenacity import retry, stop_after_attempt, wait_exponential from prompting.llms.apis.llm_messages import LLMMessages diff --git a/prompting/llms/hf_llm.py b/prompting/llms/hf_llm.py index 929027430..7671a2e6c 100644 --- a/prompting/llms/hf_llm.py +++ b/prompting/llms/hf_llm.py @@ -30,7 +30,7 @@ def __init__(self, model_id="hugging-quants/Meta-Llama-3.1-70B-Instruct-AWQ-INT4 self.llm = pipeline("text-generation", model=self.model, tokenizer=self.tokenizer) - self.sampling_params = settings.shared_settings.SAMPLING_PARAMS + self.sampling_params = shared_settings.SAMPLING_PARAMS @torch.inference_mode() def generate(self, messages: list[str] | list[dict], sampling_params=None, seed=None): @@ -45,7 +45,7 @@ def generate(self, messages: list[str] | list[dict], sampling_params=None, seed= add_generation_prompt=True, return_tensors="pt", return_dict=True, - ).to(settings.shared_settings.NEURON_DEVICE) + ).to(shared_settings.NEURON_DEVICE) params = sampling_params if sampling_params else self.sampling_params filtered_params = {k: v for k, v in params.items() if k in self.valid_generation_params} diff --git a/shared/loop_runner.py b/shared/loop_runner.py index 06348d233..cae96ea82 100644 --- a/shared/loop_runner.py +++ b/shared/loop_runner.py @@ -87,8 +87,8 @@ async def run_loop(self): next_run = await self.wait_for_next_execution(last_run_time) # logger.debug("Wait ended") try: - await self.run_step() # run_results = await self.run_step() - #logger.debug(f"Run_results: {run_results}") + await self.run_step() # run_results = await self.run_step() + # logger.debug(f"Run_results: {run_results}") self.step += 1 # logger.debug(f"{self.name}: Step {self.step} completed at {next_run}") except Exception as ex: diff --git a/shared/settings.py b/shared/settings.py index 8110a2deb..627bf1f3f 100644 --- a/shared/settings.py +++ b/shared/settings.py @@ -84,11 +84,11 @@ class SharedSettings(BaseSettings): HF_TOKEN: Optional[str] = Field(None, 
env="HF_TOKEN") DEPLOY_VALIDATOR: bool = Field(True, env="DEPLOY_VALDITAOR") DEPLOY_SCORING_API: bool = Field(True, env="DEPLOY_SCORING_API") - SCORING_API_PORT: int = Field(8095, env = "SCORING_API_PORT") + SCORING_API_PORT: int = Field(8095, env="SCORING_API_PORT") # ==== API ===== # Hotkey used to run api, defaults to Macrocosmos - API_HOTKEY: str = Field("5Cg5QgjMfRqBC6bh8X4PDbQi7UzVRn9eyWXsB8gkyfppFPPy", env="API_HOTKEY") + API_HOTKEY: str = Field("5F4tQyWrhfGVcNhoqeiNsR6KjD4wMZ2kfhLj4oHYuyHbZAc3", env="API_HOTKEY") # Scoring request rate limit in seconds. SCORING_RATE_LIMIT_SEC: float = Field(1, env="SCORING_RATE_LIMIT_SEC") # Scoring queue threshold when rate-limit start to kick in, used to query validator API with scoring requests. diff --git a/validator_api/gpt_endpoints.py b/validator_api/gpt_endpoints.py index 24a2eea13..48f9a80ec 100644 --- a/validator_api/gpt_endpoints.py +++ b/validator_api/gpt_endpoints.py @@ -37,10 +37,12 @@ async def completions(request: Request, api_key: str = Depends(validate_api_key) try: body = await request.json() body["seed"] = int(body.get("seed") or random.randint(0, 1000000)) + logger.debug(f"Received UIDs: {body.get('uids')}") uids = body.get("uids") or filter_available_uids(task=body.get("task"), model=body.get("model")) if not uids: raise HTTPException(status_code=500, detail="No available miners") uids = random.sample(uids, min(len(uids), N_MINERS)) + logger.debug(f"Sampled UIDs: {uids}") # Choose between regular completion and mixture of miners. 
if body.get("test_time_inference", False): diff --git a/validator_api/scoring_queue.py b/validator_api/scoring_queue.py index 872c1e181..430d31354 100644 --- a/validator_api/scoring_queue.py +++ b/validator_api/scoring_queue.py @@ -2,14 +2,18 @@ import datetime from collections import deque from typing import Any +import time +import uuid +from shared.epistula import create_header_hook import httpx from loguru import logger from pydantic import BaseModel from shared import settings from shared.loop_runner import AsyncLoopRunner -from validator_api.validator_forwarding import ValidatorRegistry +from validator_api.validator_forwarding import ValidatorRegistry + validator_registry = ValidatorRegistry() shared_settings = settings.shared_settings @@ -56,23 +60,33 @@ async def run_step(self): logger.exception(f"Could not find available validator scoring endpoint: {e}") try: timeout = httpx.Timeout(timeout=120.0, connect=60.0, read=30.0, write=30.0, pool=5.0) + # Add required headers for signature verification + + logger.debug(f"Forwarding payload to {url}.\n\nPAYLOAD: {payload}") async with httpx.AsyncClient(timeout=timeout) as client: response = await client.post( url=url, json=payload, - headers={"Content-Type": "application/json"}, + # headers=headers, + event_hooks={ + "request": [ + create_header_hook(shared_settings.WALLET.hotkey, vali_hotkey) + ] + }, ) validator_registry.update_validators(uid=vali_uid, response_code=response.status_code) if response.status_code != 200: # Raise an exception so that the retry logic in the except block handles it. 
- raise Exception(f"Non-200 response: {response.status_code} for uids {uids}") + raise Exception(f"Non-200 response: {response.status_code} for validator {vali_uid}") logger.info(f"Forwarding response completed with status {response.status_code}") except Exception as e: if scoring_payload.retries < self.max_scoring_retries: scoring_payload.retries += 1 async with self._scoring_lock: self._scoring_queue.appendleft(scoring_payload) - logger.error(f"Tried to forward response to {url} with payload {payload}. Queued for retry") + logger.error( + f"Tried to forward response to {url} with payload {payload}. Exception: {e}. Queued for retry" + ) else: logger.exception(f"Error while forwarding response after {scoring_payload.retries} retries: {e}") diff --git a/validator_api/utils.py b/validator_api/utils.py index 3527bfe03..d953941b9 100644 --- a/validator_api/utils.py +++ b/validator_api/utils.py @@ -1,13 +1,13 @@ import requests from loguru import logger -from shared.epistula import create_header_hook +from shared import settings from shared.loop_runner import AsyncLoopRunner from shared.uids import get_uids -from shared import settings class UpdateMinerAvailabilitiesForAPI(AsyncLoopRunner): + interval: int = 300 miner_availabilities: dict[int, dict] = {} async def run_step(self): diff --git a/validator_api/validator_forwarding.py b/validator_api/validator_forwarding.py index ce33182ae..bc43a47b4 100644 --- a/validator_api/validator_forwarding.py +++ b/validator_api/validator_forwarding.py @@ -1,9 +1,9 @@ import random import time -from loguru import logger from typing import ClassVar, List, Optional, Tuple import numpy as np +from loguru import logger from pydantic import BaseModel, Field, model_validator from shared.settings import shared_settings @@ -17,7 +17,6 @@ class Validator(BaseModel): timeout: int = 1 # starting cooldown in seconds; doubles on failure (capped at 86400) available_at: float = 0.0 # Unix timestamp indicating when the validator is next available - 
def update_failure(self, status_code: int) -> int: """ Update the validator's failure count based on the operation status. @@ -32,7 +31,6 @@ def update_failure(self, status_code: int) -> int: self.timeout = 1 self.available_at = current_time else: - self.failures += 1 self.timeout = min(self.timeout * 4, 86400) self.available_at = current_time + self.timeout @@ -82,15 +80,15 @@ def get_available_axon(self) -> Optional[Tuple[int, List[str], str]]: return None for _ in range(self.max_retries): validator_list = self.get_available_validators() - if validator_list: + if validator_list: break - else: + else: time.sleep(5) - if not validator_list: + if not validator_list: logger.error(f"Could not find available validator after {self.max_retries}") return None weights = [self.validators[uid].stake for uid in validator_list] - chosen = self.validators[random.choices(validator_list, weights=weights, k=1)[0]] + chosen = self.validators[5] # self.validators[random.choices(validator_list, weights=weights, k=1)[0]] return chosen.uid, chosen.axon, chosen.hotkey def update_validators(self, uid: int, response_code: int) -> None: @@ -101,4 +99,3 @@ def update_validators(self, uid: int, response_code: int) -> None: """ if uid in self.validators: self.validators[uid].update_failure(response_code) - From bfefda3b59a99fc1e6f1fd3096e51af315f18c1e Mon Sep 17 00:00:00 2001 From: richwardle Date: Wed, 12 Feb 2025 16:58:39 +0000 Subject: [PATCH 20/28] Linting --- prompting/api/scoring/api.py | 3 +-- validator_api/scoring_queue.py | 14 +++++--------- 2 files changed, 6 insertions(+), 11 deletions(-) diff --git a/prompting/api/scoring/api.py b/prompting/api/scoring/api.py index 8ad313a9a..4c9548b9a 100644 --- a/prompting/api/scoring/api.py +++ b/prompting/api/scoring/api.py @@ -1,11 +1,10 @@ +import time import uuid from typing import Any -import time from fastapi import APIRouter, Depends, HTTPException, Request from loguru import logger - from prompting.datasets.random_website import 
DDGDatasetEntry from prompting.llms.model_zoo import ModelZoo from prompting.tasks.inference import InferenceTask diff --git a/validator_api/scoring_queue.py b/validator_api/scoring_queue.py index 430d31354..4d17a9fd9 100644 --- a/validator_api/scoring_queue.py +++ b/validator_api/scoring_queue.py @@ -2,15 +2,13 @@ import datetime from collections import deque from typing import Any -import time -import uuid -from shared.epistula import create_header_hook import httpx from loguru import logger from pydantic import BaseModel from shared import settings +from shared.epistula import create_header_hook from shared.loop_runner import AsyncLoopRunner from validator_api.validator_forwarding import ValidatorRegistry @@ -63,16 +61,14 @@ async def run_step(self): # Add required headers for signature verification logger.debug(f"Forwarding payload to {url}.\n\nPAYLOAD: {payload}") - async with httpx.AsyncClient(timeout=timeout) as client: + async with httpx.AsyncClient( + timeout=timeout, + event_hooks={"request": [create_header_hook(shared_settings.WALLET.hotkey, vali_hotkey)]}, + ) as client: response = await client.post( url=url, json=payload, # headers=headers, - event_hooks={ - "request": [ - create_header_hook(shared_settings.WALLET.hotkey, vali_hotkey) - ] - }, ) validator_registry.update_validators(uid=vali_uid, response_code=response.status_code) if response.status_code != 200: From a10148cd41f6c7582aa34dd7aa49bb529639b091 Mon Sep 17 00:00:00 2001 From: richwardle Date: Wed, 12 Feb 2025 18:24:11 +0000 Subject: [PATCH 21/28] Clean up scoring --- prompting/api/scoring/api.py | 40 +++++++++++++++++++--------------- validator_api/scoring_queue.py | 6 ++--- 2 files changed, 26 insertions(+), 20 deletions(-) diff --git a/prompting/api/scoring/api.py b/prompting/api/scoring/api.py index 4c9548b9a..311ffe24a 100644 --- a/prompting/api/scoring/api.py +++ b/prompting/api/scoring/api.py @@ -18,7 +18,7 @@ router = APIRouter() -async def verify_scoring_signature(self, request: 
Request): +async def verify_scoring_signature(request: Request): signed_by = request.headers.get("Epistula-Signed-By") signed_for = request.headers.get("Epistula-Signed-For") if signed_for != shared_settings.WALLET.hotkey.ss58_address: @@ -55,40 +55,44 @@ def get_task_scorer(request: Request): async def score_response( request: Request, api_key_data: dict = Depends(verify_scoring_signature), task_scorer=Depends(get_task_scorer) ): + logger.debug("Scoring Request received!!!!!!!!!!!!!!!!") model = None + logger.debug("Setted Model to None") payload: dict[str, Any] = await request.json() + logger.debug(f"Awaited body: {payload}") body = payload.get("body") - timeout = payload.get("timeout", settings.shared_settings.NEURON_TIMEOUT) - uids = payload.get("uid", []) + timeout = payload.get("timeout", shared_settings.NEURON_TIMEOUT) + uids = payload.get("uids", []) chunks = payload.get("chunks", {}) + logger.debug("About to check chunks and uids") if not uids or not chunks: logger.error(f"Either uids: {uids} or chunks: {chunks} is not valid, skipping scoring") return uids = [int(uid) for uid in uids] model = body.get("model") - if model: - try: - llm_model = ModelZoo.get_model_by_id(model) - except Exception: - logger.warning( - f"Organic request with model {body.get('model')} made but the model cannot be found in model zoo. Skipping scoring." 
- ) + logger.debug("About to check model") + if model and model != shared_settings.LLM_MODEL: + logger.error(f"Model {model} not available for scoring on this validator.") return - else: - llm_model = None + logger.debug("Model has been checked") + llm_model = ModelZoo.get_model_by_id(model) + logger.debug("Got LLM Model from ModelZoo") task_name = body.get("task") + logger.debug(f"Task name set: {task_name}") + logger.debug(f"Length pre-insertion: {len(task_scorer.scoring_queue)}") if task_name == "InferenceTask": logger.info(f"Received Organic InferenceTask with body: {body}") logger.info(f"With model of type {type(body.get('model'))}") organic_task = InferenceTask( messages=body.get("messages"), llm_model=llm_model, - llm_model_id=body.get("model"), + llm_model_id=llm_model, seed=int(body.get("seed", 0)), - sampling_params=body.get("sampling_parameters", settings.shared_settings.SAMPLING_PARAMS), + sampling_params=body.get("sampling_parameters", shared_settings.SAMPLING_PARAMS), query=body.get("messages"), ) logger.info(f"Task created: {organic_task}") + task_scorer.add_to_queue( task=organic_task, response=DendriteResponseEvent( @@ -97,10 +101,11 @@ async def score_response( timeout=timeout, ), dataset_entry=DatasetEntry(), - block=settings.shared_settings.METAGRAPH.block, + block=shared_settings.METAGRAPH.block, step=-1, task_id=str(uuid.uuid4()), ) + elif task_name == "WebRetrievalTask": logger.info(f"Received Organic WebRetrievalTask with body: {body}") try: @@ -119,11 +124,12 @@ async def score_response( response=DendriteResponseEvent( uids=uids, stream_results=[SynapseStreamResult(accumulated_chunks=chunks.get(str(uid), [])) for uid in uids], - timeout=body.get("timeout", settings.shared_settings.NEURON_TIMEOUT), + timeout=body.get("timeout", shared_settings.NEURON_TIMEOUT), ), dataset_entry=DDGDatasetEntry(search_term=search_term), - block=settings.shared_settings.METAGRAPH.block, + block=shared_settings.METAGRAPH.block, step=-1, 
task_id=str(uuid.uuid4()), ) + logger.debug(f"Length post-insertion: {len(task_scorer.scoring_queue)}") logger.info("Organic task appended to scoring queue") diff --git a/validator_api/scoring_queue.py b/validator_api/scoring_queue.py index 4d17a9fd9..736072928 100644 --- a/validator_api/scoring_queue.py +++ b/validator_api/scoring_queue.py @@ -48,7 +48,7 @@ async def run_step(self): scoring_payload = self._scoring_queue.popleft() payload = scoring_payload.payload - uids = payload["uid"] + uids = payload["uids"] logger.info(f"Received new organic for scoring, uids: {uids}") try: vali_uid, vali_axon, vali_hotkey = validator_registry.get_available_axon() @@ -95,8 +95,8 @@ async def append_response(self, uids: list[int], body: dict[str, Any], chunks: l return uids = [int(u) for u in uids] - chunk_dict = {u: c for u, c in zip(uids, chunks)} - payload = {"body": body, "chunks": chunk_dict, "uid": uids} + chunk_dict = {str(u): c for u, c in zip(uids, chunks)} + payload = {"body": body, "chunks": chunk_dict, "uids": uids} scoring_item = ScoringPayload(payload=payload) async with self._scoring_lock: From b4f7f75153c9e4788be0bf531cb9880d6d414482 Mon Sep 17 00:00:00 2001 From: richwardle Date: Thu, 13 Feb 2025 07:46:08 +0000 Subject: [PATCH 22/28] Linting and default mock settings to cuda --- prompting/api/scoring/api.py | 2 +- shared/settings.py | 2 +- validator_api/utils.py | 2 +- validator_api/validator_forwarding.py | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/prompting/api/scoring/api.py b/prompting/api/scoring/api.py index 311ffe24a..2ea2548a7 100644 --- a/prompting/api/scoring/api.py +++ b/prompting/api/scoring/api.py @@ -105,7 +105,7 @@ async def score_response( step=-1, task_id=str(uuid.uuid4()), ) - + elif task_name == "WebRetrievalTask": logger.info(f"Received Organic WebRetrievalTask with body: {body}") try: diff --git a/shared/settings.py b/shared/settings.py index 1ac42f99d..bb1aa16e3 100644 --- a/shared/settings.py +++ 
b/shared/settings.py @@ -233,7 +233,7 @@ def complete_settings(cls, values: dict[str, Any]) -> dict[str, Any]: values["TEST_MINER_IDS"] = str(values["TEST_MINER_IDS"]).split(",") if mode == "mock": values["MOCK"] = True - values["NEURON_DEVICE"] = "cpu" + # values["NEURON_DEVICE"] = "cpu" logger.info("Running in mock mode. Bittensor objects will not be initialized.") return values diff --git a/validator_api/utils.py b/validator_api/utils.py index 430e00ce0..3f774f739 100644 --- a/validator_api/utils.py +++ b/validator_api/utils.py @@ -11,7 +11,7 @@ class UpdateMinerAvailabilitiesForAPI(AsyncLoopRunner): miner_availabilities: dict[int, dict] = {} async def run_step(self): - if shared_settings.API_TEST_MODE: + if settings.shared_settings.API_TEST_MODE: return try: response = requests.post( diff --git a/validator_api/validator_forwarding.py b/validator_api/validator_forwarding.py index bc43a47b4..3bd0d4ba7 100644 --- a/validator_api/validator_forwarding.py +++ b/validator_api/validator_forwarding.py @@ -88,7 +88,7 @@ def get_available_axon(self) -> Optional[Tuple[int, List[str], str]]: logger.error(f"Could not find available validator after {self.max_retries}") return None weights = [self.validators[uid].stake for uid in validator_list] - chosen = self.validators[5] # self.validators[random.choices(validator_list, weights=weights, k=1)[0]] + chosen = self.validators[random.choices(validator_list, weights=weights, k=1)[0]] return chosen.uid, chosen.axon, chosen.hotkey def update_validators(self, uid: int, response_code: int) -> None: From 2ab4f3e52226b2906d2bf9b7bdb3d14baa8cd14f Mon Sep 17 00:00:00 2001 From: richwardle Date: Thu, 13 Feb 2025 08:18:57 +0000 Subject: [PATCH 23/28] Forward timings for scoring --- prompting/api/scoring/api.py | 2 ++ validator_api/chat_completion.py | 35 +++++++++++++++++++++++++++----- validator_api/scoring_queue.py | 7 +++++-- 3 files changed, 37 insertions(+), 7 deletions(-) diff --git a/prompting/api/scoring/api.py 
b/prompting/api/scoring/api.py index 2ea2548a7..3d778a213 100644 --- a/prompting/api/scoring/api.py +++ b/prompting/api/scoring/api.py @@ -64,6 +64,7 @@ async def score_response( timeout = payload.get("timeout", shared_settings.NEURON_TIMEOUT) uids = payload.get("uids", []) chunks = payload.get("chunks", {}) + timings = payload.get("timings", {}) logger.debug("About to check chunks and uids") if not uids or not chunks: logger.error(f"Either uids: {uids} or chunks: {chunks} is not valid, skipping scoring") @@ -99,6 +100,7 @@ async def score_response( uids=uids, stream_results=[SynapseStreamResult(accumulated_chunks=chunks.get(str(uid), None)) for uid in uids], timeout=timeout, + stream_results_all_chunks_timings=[timings.get(str(uid), None) for uid in uids], ), dataset_entry=DatasetEntry(), block=shared_settings.METAGRAPH.block, diff --git a/validator_api/chat_completion.py b/validator_api/chat_completion.py index b44c2b769..fed73e015 100644 --- a/validator_api/chat_completion.py +++ b/validator_api/chat_completion.py @@ -2,6 +2,7 @@ import json import math import random +import time from typing import Any, AsyncGenerator, Callable, List, Optional from fastapi import HTTPException @@ -86,9 +87,14 @@ async def reconstructed_response() -> AsyncGenerator: async def stream_from_first_response( - responses: List[asyncio.Task], collected_chunks_list: List[List[str]], body: dict[str, any], uids: List[int] + responses: List[asyncio.Task], + collected_chunks_list: List[List[str]], + body: dict[str, any], + uids: List[int], + timings_list: List[List[float]], ) -> AsyncGenerator[str, None]: first_valid_response = None + response_start_time = time.monotonic() try: # Keep looping until we find a valid response or run out of tasks while responses and first_valid_response is None: @@ -130,6 +136,7 @@ async def stream_from_first_response( continue chunks_received = True + timings_list[0].append(time.monotonic() - response_start_time) collected_chunks_list[0].append(content) yield 
f"data: {json.dumps(chunk.model_dump())}\n\n" @@ -141,10 +148,21 @@ async def stream_from_first_response( # Continue collecting remaining responses in background for scoring remaining = asyncio.gather(*pending, return_exceptions=True) - remaining_tasks = asyncio.create_task(collect_remaining_responses(remaining, collected_chunks_list, body, uids)) + remaining_tasks = asyncio.create_task( + collect_remaining_responses( + remainging=remaining, + collected_chunks_list=collected_chunks_list, + body=body, + uids=uids, + timings_list=timings_list, + response_start_time=response_start_time, + ) + ) await remaining_tasks asyncio.create_task( - scoring_queue.scoring_queue.append_response(uids=uids, body=body, chunks=collected_chunks_list) + scoring_queue.scoring_queue.append_response( + uids=uids, body=body, chunks=collected_chunks_list, timings=timings_list + ) ) except asyncio.CancelledError: @@ -158,7 +176,12 @@ async def stream_from_first_response( async def collect_remaining_responses( - remaining: asyncio.Task, collected_chunks_list: List[List[str]], body: dict[str, any], uids: List[int] + remaining: asyncio.Task, + collected_chunks_list: List[List[str]], + body: dict[str, any], + uids: List[int], + timings_list: List[List[float]], + response_start_time: float, ): """Collect remaining responses for scoring without blocking the main response.""" try: @@ -175,6 +198,7 @@ async def collect_remaining_responses( content = getattr(chunk.choices[0].delta, "content", None) if content is None: continue + timings_list[i + 1].append(time.monotonic() - response_start_time) collected_chunks_list[i + 1].append(content) except Exception as e: @@ -213,6 +237,7 @@ async def chat_completion( # Initialize chunks collection for each miner collected_chunks_list = [[] for _ in selected_uids] + timings_list = [[] for _ in selected_uids] timeout_seconds = max( 30, max(0, math.floor(math.log2(body["sampling_parameters"].get("max_new_tokens", 256) / 256))) * 10 + 30 @@ -229,7 +254,7 @@ async 
def chat_completion( ] return StreamingResponse( - stream_from_first_response(response_tasks, collected_chunks_list, body, selected_uids), + stream_from_first_response(response_tasks, collected_chunks_list, body, selected_uids, timings_list), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", diff --git a/validator_api/scoring_queue.py b/validator_api/scoring_queue.py index 736072928..d558a2405 100644 --- a/validator_api/scoring_queue.py +++ b/validator_api/scoring_queue.py @@ -86,7 +86,9 @@ async def run_step(self): else: logger.exception(f"Error while forwarding response after {scoring_payload.retries} retries: {e}") - async def append_response(self, uids: list[int], body: dict[str, Any], chunks: list[list[str]]): + async def append_response( + self, uids: list[int], body: dict[str, Any], chunks: list[list[str]], timings: list[list[float]] | None = None + ): if not shared_settings.SCORE_ORGANICS: return @@ -96,7 +98,8 @@ async def append_response(self, uids: list[int], body: dict[str, Any], chunks: l uids = [int(u) for u in uids] chunk_dict = {str(u): c for u, c in zip(uids, chunks)} - payload = {"body": body, "chunks": chunk_dict, "uids": uids} + timing_dict = {str(u): t for u, t in zip(uids, timings)} + payload = {"body": body, "chunks": chunk_dict, "uids": uids, "timings": timing_dict} scoring_item = ScoringPayload(payload=payload) async with self._scoring_lock: From 23e91e10c4d7013d12c377ea22abd6becdc52623 Mon Sep 17 00:00:00 2001 From: richwardle Date: Thu, 13 Feb 2025 08:20:47 +0000 Subject: [PATCH 24/28] Default to empty list for timings for web retrieval --- validator_api/scoring_queue.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/validator_api/scoring_queue.py b/validator_api/scoring_queue.py index d558a2405..fa32e950e 100644 --- a/validator_api/scoring_queue.py +++ b/validator_api/scoring_queue.py @@ -98,7 +98,10 @@ async def append_response( uids = [int(u) for u in uids] chunk_dict = {str(u): c for u, c in 
zip(uids, chunks)} - timing_dict = {str(u): t for u, t in zip(uids, timings)} + if timings: + timing_dict = {str(u): t for u, t in zip(uids, timings)} + else: + timing_dict = {} payload = {"body": body, "chunks": chunk_dict, "uids": uids, "timings": timing_dict} scoring_item = ScoringPayload(payload=payload) From 1e5271c2bc076a5555aa21f2625a514addf31512 Mon Sep 17 00:00:00 2001 From: richwardle Date: Thu, 13 Feb 2025 08:53:06 +0000 Subject: [PATCH 25/28] Final Fixes/ Revert Settings --- prompting/api/scoring/api.py | 1 + prompting/rewards/scoring.py | 27 ++++++++++++++------------- prompting/tasks/base_task.py | 2 ++ shared/settings.py | 2 +- validator_api/chat_completion.py | 2 +- 5 files changed, 19 insertions(+), 15 deletions(-) diff --git a/prompting/api/scoring/api.py b/prompting/api/scoring/api.py index 3d778a213..055e11cd5 100644 --- a/prompting/api/scoring/api.py +++ b/prompting/api/scoring/api.py @@ -91,6 +91,7 @@ async def score_response( seed=int(body.get("seed", 0)), sampling_params=body.get("sampling_parameters", shared_settings.SAMPLING_PARAMS), query=body.get("messages"), + organic = True, ) logger.info(f"Task created: {organic_task}") diff --git a/prompting/rewards/scoring.py b/prompting/rewards/scoring.py index 2ae481434..bf562f076 100644 --- a/prompting/rewards/scoring.py +++ b/prompting/rewards/scoring.py @@ -94,20 +94,21 @@ async def run_step(self) -> RewardLoggingEvent: f"Scored {scoring_config.task.__class__.__name__} {scoring_config.task.task_id} with model " f"{scoring_config.task.llm_model_id}" ) - log_event( - RewardLoggingEvent( - response_event=scoring_config.response, - reward_events=reward_events, - reference=scoring_config.task.reference, - challenge=scoring_config.task.query, - task=scoring_config.task.name, - block=scoring_config.block, - step=scoring_config.step, - task_id=scoring_config.task_id, - task_dict=scoring_config.task.model_dump(), - source=scoring_config.dataset_entry.source, + if not scoring_config.task.organic: + 
log_event( + RewardLoggingEvent( + response_event=scoring_config.response, + reward_events=reward_events, + reference=scoring_config.task.reference, + challenge=scoring_config.task.query, + task=scoring_config.task.name, + block=scoring_config.block, + step=scoring_config.step, + task_id=scoring_config.task_id, + task_dict=scoring_config.task.model_dump(), + source=scoring_config.dataset_entry.source, + ) ) - ) await asyncio.sleep(0.01) diff --git a/prompting/tasks/base_task.py b/prompting/tasks/base_task.py index ed202673d..54e4dda7a 100644 --- a/prompting/tasks/base_task.py +++ b/prompting/tasks/base_task.py @@ -33,6 +33,7 @@ class BaseTask(BaseModel, ABC): query: Any = None reference: Any = None task_id: str = Field(default_factory=lambda: str(uuid4()), allow_mutation=False) + organic: bool = False model_config = ConfigDict(arbitrary_types_allowed=True) @@ -60,6 +61,7 @@ class BaseTextTask(BaseTask): sampling_params: dict[str, float] = settings.shared_settings.SAMPLING_PARAMS timeout: int = settings.shared_settings.NEURON_TIMEOUT max_tokens: int = settings.shared_settings.NEURON_MAX_TOKENS + organic: bool = True @property def task_messages(self) -> list[str] | list[dict]: diff --git a/shared/settings.py b/shared/settings.py index bb1aa16e3..1ac42f99d 100644 --- a/shared/settings.py +++ b/shared/settings.py @@ -233,7 +233,7 @@ def complete_settings(cls, values: dict[str, Any]) -> dict[str, Any]: values["TEST_MINER_IDS"] = str(values["TEST_MINER_IDS"]).split(",") if mode == "mock": values["MOCK"] = True - # values["NEURON_DEVICE"] = "cpu" + values["NEURON_DEVICE"] = "cpu" logger.info("Running in mock mode. 
Bittensor objects will not be initialized.") return values diff --git a/validator_api/chat_completion.py b/validator_api/chat_completion.py index fed73e015..6b0b48607 100644 --- a/validator_api/chat_completion.py +++ b/validator_api/chat_completion.py @@ -150,7 +150,7 @@ async def stream_from_first_response( remaining = asyncio.gather(*pending, return_exceptions=True) remaining_tasks = asyncio.create_task( collect_remaining_responses( - remainging=remaining, + remaining=remaining, collected_chunks_list=collected_chunks_list, body=body, uids=uids, From a1ff7193efa22db46b53fc96aa6d5e62eb99edd6 Mon Sep 17 00:00:00 2001 From: richwardle Date: Fri, 14 Feb 2025 15:46:48 +0000 Subject: [PATCH 26/28] Linting --- prompting/api/scoring/api.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/prompting/api/scoring/api.py b/prompting/api/scoring/api.py index 055e11cd5..631e21420 100644 --- a/prompting/api/scoring/api.py +++ b/prompting/api/scoring/api.py @@ -91,7 +91,7 @@ async def score_response( seed=int(body.get("seed", 0)), sampling_params=body.get("sampling_parameters", shared_settings.SAMPLING_PARAMS), query=body.get("messages"), - organic = True, + organic=True, ) logger.info(f"Task created: {organic_task}") From b3e99b0f0cb0a0c287fd67fb4c00a5b0b21a7d1c Mon Sep 17 00:00:00 2001 From: richwardle Date: Sat, 15 Feb 2025 11:51:06 +0000 Subject: [PATCH 27/28] Linting --- data/top100k_domains.csv | 2 +- validator_api/chat_completion.py | 2 +- validator_api/scoring_queue.py | 2 -- 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/data/top100k_domains.csv b/data/top100k_domains.csv index bc1a6ebd9..8bfa68d9a 100644 --- a/data/top100k_domains.csv +++ b/data/top100k_domains.csv @@ -99997,4 +99997,4 @@ "99996","tankspotter.com","4.51" "99997","targetshootingapp.com","4.51" "99998","tastytalegame.com","4.51" -"99999","tbscan.com","4.51" \ No newline at end of file +"99999","tbscan.com","4.51" diff --git a/validator_api/chat_completion.py 
b/validator_api/chat_completion.py index 4e2857322..c49b5ce9f 100644 --- a/validator_api/chat_completion.py +++ b/validator_api/chat_completion.py @@ -237,7 +237,7 @@ async def chat_completion( STREAM = body.get("stream", False) # Initialize chunks collection for each miner - collected_chunks_list = [[] for _ in selected_uids] + collected_chunks_list = [[] for _ in uids] timings_list = [[] for _ in uids] timeout_seconds = max( diff --git a/validator_api/scoring_queue.py b/validator_api/scoring_queue.py index 7a81b7e21..fa32e950e 100644 --- a/validator_api/scoring_queue.py +++ b/validator_api/scoring_queue.py @@ -16,8 +16,6 @@ shared_settings = settings.shared_settings -from shared.loop_runner import AsyncLoopRunner - class ScoringPayload(BaseModel): payload: dict[str, Any] From 5b2145cbdd6c7da69f948ea5bdb4a130d3714dd8 Mon Sep 17 00:00:00 2001 From: richwardle Date: Sun, 16 Feb 2025 14:39:47 +0000 Subject: [PATCH 28/28] Initialize settings to validator if cuda is available --- shared/misc.py | 12 ++++++++++++ shared/settings.py | 8 +++++--- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/shared/misc.py b/shared/misc.py index 858765b7c..5cd0c25b3 100644 --- a/shared/misc.py +++ b/shared/misc.py @@ -1,4 +1,5 @@ import asyncio +import subprocess import time import traceback from functools import lru_cache, update_wrapper @@ -169,3 +170,14 @@ def wrapper(self): return wrapper return decorator + + +def is_cuda_available(): + try: + # Run nvidia-smi to list available GPUs + result = subprocess.run( + ["nvidia-smi", "-L"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, check=True + ) + return "GPU" in result.stdout + except (subprocess.CalledProcessError, FileNotFoundError): + return False diff --git a/shared/settings.py b/shared/settings.py index 12c3e3f82..52e605ec3 100644 --- a/shared/settings.py +++ b/shared/settings.py @@ -18,7 +18,7 @@ from pydantic import Field, model_validator from pydantic_settings import BaseSettings -from shared.misc import
cached_property_with_expiration +from shared.misc import cached_property_with_expiration, is_cuda_available logging.getLogger("requests").setLevel(logging.WARNING) logging.getLogger("urllib3").setLevel(logging.WARNING) @@ -274,9 +274,11 @@ def DENDRITE(self) -> bt.dendrite: return bt.dendrite(wallet=self.WALLET) -shared_settings: Optional[SharedSettings] = None try: - shared_settings = SharedSettings.load(mode="mock") + if is_cuda_available(): + shared_settings = SharedSettings.load(mode="validator") + else: + shared_settings = SharedSettings.load(mode="mock") pass except Exception as e: logger.exception(f"Error loading settings: {e}")