diff --git a/.env.api.example b/.env.api.example index c9c7c37ff..a2f8fdc2b 100644 --- a/.env.api.example +++ b/.env.api.example @@ -1,6 +1,5 @@ API_PORT = "42170" # Port for the API server API_HOST = "0.0.0.0" # Host for the API server SCORING_KEY = "123" # The scoring key for the validator (must match the scoring key in the .env.validator file) -SCORE_ORGANICS = True # Whether to score organics VALIDATOR_API = "0.0.0.0:8094" # The validator API to forward responses to for scoring WORKERS=4 diff --git a/.env.validator.example b/.env.validator.example index 0e1bee89c..0b34af5be 100644 --- a/.env.validator.example +++ b/.env.validator.example @@ -26,7 +26,6 @@ HF_TOKEN = "your_huggingface_token_here" # Scoring API (optional). DEPLOY_SCORING_API = true -SCORING_ADMIN_KEY = "123456" SCORING_API_PORT = 8094 # Scoring key must match the scoring key in the .env.api. # SCORING_KEY="..." diff --git a/data/top100k_domains.csv b/data/top100k_domains.csv index bc1a6ebd9..8bfa68d9a 100644 --- a/data/top100k_domains.csv +++ b/data/top100k_domains.csv @@ -99997,4 +99997,4 @@ "99996","tankspotter.com","4.51" "99997","targetshootingapp.com","4.51" "99998","tastytalegame.com","4.51" -"99999","tbscan.com","4.51" \ No newline at end of file +"99999","tbscan.com","4.51" diff --git a/neurons/validator.py b/neurons/validator.py index dbd229004..da2fd6146 100644 --- a/neurons/validator.py +++ b/neurons/validator.py @@ -4,8 +4,13 @@ import time import loguru +import netaddr +import requests import torch import wandb +from bittensor.core.extrinsics.serving import serve_extrinsic + +from prompting.rewards.scoring import task_scorer # ruff: noqa: E402 from shared import settings @@ -34,7 +39,6 @@ async def spawn_loops(task_queue, scoring_queue, reward_events): # ruff: noqa: E402 from prompting.llms.model_manager import model_scheduler from prompting.miner_availability.miner_availability import availability_checking_loop - from prompting.rewards.scoring import task_scorer from 
prompting.tasks.task_creation import task_loop from prompting.tasks.task_sending import task_sender from prompting.weight_setting.weight_setter import weight_setter @@ -87,10 +91,25 @@ async def start(): # TODO: We should not use 2 availability loops for each process, in reality # we should only be sharing the miner availability data between processes. from prompting.miner_availability.miner_availability import availability_checking_loop - from prompting.rewards.scoring import task_scorer asyncio.create_task(availability_checking_loop.start()) + try: + external_ip = requests.get("https://checkip.amazonaws.com").text.strip() + netaddr.IPAddress(external_ip) + + serve_success = serve_extrinsic( + subtensor=settings.shared_settings.SUBTENSOR, + wallet=settings.shared_settings.WALLET, + ip=external_ip, + port=settings.shared_settings.SCORING_API_PORT, + protocol=4, + netuid=settings.shared_settings.NETUID, + ) + + logger.debug(f"Serve success: {serve_success}") + except Exception as e: + logger.warning(f"Failed to serve scoring api to chain: {e}") await start_scoring_api(task_scorer, scoring_queue, reward_events) while True: diff --git a/prompting/api/api.py b/prompting/api/api.py index 80fdd6841..cd40d8621 100644 --- a/prompting/api/api.py +++ b/prompting/api/api.py @@ -4,6 +4,8 @@ from prompting.api.miner_availabilities.api import router as miner_availabilities_router from prompting.api.scoring.api import router as scoring_router + +# from prompting.rewards.scoring import task_scorer from shared import settings app = FastAPI() diff --git a/prompting/api/scoring/api.py b/prompting/api/scoring/api.py index d52ba74bc..631e21420 100644 --- a/prompting/api/scoring/api.py +++ b/prompting/api/scoring/api.py @@ -1,7 +1,8 @@ +import time import uuid from typing import Any -from fastapi import APIRouter, Depends, Header, HTTPException, Request +from fastapi import APIRouter, Depends, HTTPException, Request from loguru import logger from prompting.datasets.random_website import 
DDGDatasetEntry @@ -11,13 +12,38 @@ from shared import settings from shared.base import DatasetEntry from shared.dendrite import DendriteResponseEvent -from shared.epistula import SynapseStreamResult +from shared.epistula import SynapseStreamResult, verify_signature +from shared.settings import shared_settings router = APIRouter() -def validate_scoring_key(api_key: str = Header(...)): - if api_key != settings.shared_settings.SCORING_KEY: +async def verify_scoring_signature(request: Request): + signed_by = request.headers.get("Epistula-Signed-By") + signed_for = request.headers.get("Epistula-Signed-For") + if signed_for != shared_settings.WALLET.hotkey.ss58_address: + raise HTTPException(status_code=400, detail="Bad Request, message is not intended for self") + if signed_by != shared_settings.API_HOTKEY: + raise HTTPException(status_code=401, detail="Signer not the expected ss58 address") + + body = await request.body() + now = time.time() + err = verify_signature( + request.headers.get("Epistula-Request-Signature"), + body, + request.headers.get("Epistula-Timestamp"), + request.headers.get("Epistula-Uuid"), + signed_for, + signed_by, + now, + ) + if err: + logger.error(err) + raise HTTPException(status_code=400, detail=err) + + +def validate_scoring_key(request: Request): + if request.headers.api_key != settings.shared_settings.SCORING_KEY: raise HTTPException(status_code=403, detail="Invalid API key") @@ -27,54 +53,62 @@ def get_task_scorer(request: Request): @router.post("/scoring") async def score_response( - request: Request, api_key_data: dict = Depends(validate_scoring_key), task_scorer=Depends(get_task_scorer) + request: Request, api_key_data: dict = Depends(verify_scoring_signature), task_scorer=Depends(get_task_scorer) ): + logger.debug("Scoring Request received!!!!!!!!!!!!!!!!") model = None + logger.debug("Setted Model to None") payload: dict[str, Any] = await request.json() + logger.debug(f"Awaited body: {payload}") body = payload.get("body") - timeout 
= payload.get("timeout", settings.shared_settings.NEURON_TIMEOUT) - uids = payload.get("uid", []) + timeout = payload.get("timeout", shared_settings.NEURON_TIMEOUT) + uids = payload.get("uids", []) chunks = payload.get("chunks", {}) + timings = payload.get("timings", {}) + logger.debug("About to check chunks and uids") if not uids or not chunks: logger.error(f"Either uids: {uids} or chunks: {chunks} is not valid, skipping scoring") return uids = [int(uid) for uid in uids] model = body.get("model") - if model: - try: - llm_model = ModelZoo.get_model_by_id(model) - except Exception: - logger.warning( - f"Organic request with model {body.get('model')} made but the model cannot be found in model zoo. Skipping scoring." - ) + logger.debug("About to check model") + if model and model != shared_settings.LLM_MODEL: + logger.error(f"Model {model} not available for scoring on this validator.") return - else: - llm_model = None + logger.debug("Model has been checked") + llm_model = ModelZoo.get_model_by_id(model) + logger.debug("Got LLM Model from ModelZoo") task_name = body.get("task") + logger.debug(f"Task name set: {task_name}") + logger.debug(f"Length pre-insertion: {len(task_scorer.scoring_queue)}") if task_name == "InferenceTask": logger.info(f"Received Organic InferenceTask with body: {body}") logger.info(f"With model of type {type(body.get('model'))}") organic_task = InferenceTask( messages=body.get("messages"), llm_model=llm_model, - llm_model_id=body.get("model"), + llm_model_id=llm_model, seed=int(body.get("seed", 0)), - sampling_params=body.get("sampling_parameters", settings.shared_settings.SAMPLING_PARAMS), + sampling_params=body.get("sampling_parameters", shared_settings.SAMPLING_PARAMS), query=body.get("messages"), + organic=True, ) logger.info(f"Task created: {organic_task}") + task_scorer.add_to_queue( task=organic_task, response=DendriteResponseEvent( uids=uids, stream_results=[SynapseStreamResult(accumulated_chunks=chunks.get(str(uid), None)) for uid in 
uids], timeout=timeout, + stream_results_all_chunks_timings=[timings.get(str(uid), None) for uid in uids], ), dataset_entry=DatasetEntry(), - block=settings.shared_settings.METAGRAPH.block, + block=shared_settings.METAGRAPH.block, step=-1, task_id=str(uuid.uuid4()), ) + elif task_name == "WebRetrievalTask": logger.info(f"Received Organic WebRetrievalTask with body: {body}") try: @@ -91,15 +125,14 @@ async def score_response( query=search_term, ), response=DendriteResponseEvent( - uids=[uids], - stream_results=[ - SynapseStreamResult(accumulated_chunks=[chunk for chunk in chunks if chunk is not None]) - ], - timeout=body.get("timeout", settings.shared_settings.NEURON_TIMEOUT), + uids=uids, + stream_results=[SynapseStreamResult(accumulated_chunks=chunks.get(str(uid), [])) for uid in uids], + timeout=body.get("timeout", shared_settings.NEURON_TIMEOUT), ), dataset_entry=DDGDatasetEntry(search_term=search_term), - block=settings.shared_settings.METAGRAPH.block, + block=shared_settings.METAGRAPH.block, step=-1, task_id=str(uuid.uuid4()), ) + logger.debug(f"Length post-insertion: {len(task_scorer.scoring_queue)}") logger.info("Organic task appended to scoring queue") diff --git a/prompting/llms/apis/sn19_wrapper.py b/prompting/llms/apis/sn19_wrapper.py index deecfe3d2..376cfe0a0 100644 --- a/prompting/llms/apis/sn19_wrapper.py +++ b/prompting/llms/apis/sn19_wrapper.py @@ -1,6 +1,7 @@ import json import requests +from loguru import logger from tenacity import retry, stop_after_attempt, wait_exponential from prompting.llms.apis.llm_messages import LLMMessages @@ -9,7 +10,6 @@ shared_settings = settings.shared_settings -# TODO: key error in response.json() when response is 500 @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10)) def chat_complete( messages: LLMMessages, @@ -38,6 +38,10 @@ def chat_complete( "logprobs": logprobs, } response = requests.post(url, headers=headers, data=json.dumps(data), timeout=30) + if not response.status_code 
== 200: + logger.error(f"SN19 API returned status code {response.status_code}") + logger.error(f"Response: {response.text}") + raise Exception(f"SN19 API returned status code {response.status_code}") response_json = response.json() try: return response_json["choices"][0]["message"].get("content") diff --git a/prompting/llms/hf_llm.py b/prompting/llms/hf_llm.py index 1b35b2ed8..7671a2e6c 100644 --- a/prompting/llms/hf_llm.py +++ b/prompting/llms/hf_llm.py @@ -5,8 +5,7 @@ from loguru import logger from transformers import AutoModelForCausalLM, AutoTokenizer, PreTrainedModel, pipeline -from shared import settings -from shared.timer import Timer +from shared.settings import shared_settings class ReproducibleHF: @@ -31,7 +30,7 @@ def __init__(self, model_id="hugging-quants/Meta-Llama-3.1-70B-Instruct-AWQ-INT4 self.llm = pipeline("text-generation", model=self.model, tokenizer=self.tokenizer) - self.sampling_params = settings.shared_settings.SAMPLING_PARAMS + self.sampling_params = shared_settings.SAMPLING_PARAMS @torch.inference_mode() def generate(self, messages: list[str] | list[dict], sampling_params=None, seed=None): @@ -46,23 +45,22 @@ def generate(self, messages: list[str] | list[dict], sampling_params=None, seed= add_generation_prompt=True, return_tensors="pt", return_dict=True, - ).to(settings.shared_settings.NEURON_DEVICE) + ).to(shared_settings.NEURON_DEVICE) params = sampling_params if sampling_params else self.sampling_params filtered_params = {k: v for k, v in params.items() if k in self.valid_generation_params} - with Timer(): - # Generate with optimized settings - outputs = self.model.generate( - **inputs.to(settings.shared_settings.NEURON_DEVICE), - **filtered_params, - eos_token_id=self.tokenizer.eos_token_id, - ) - - results = self.tokenizer.batch_decode( - outputs[:, inputs["input_ids"].shape[1] :], - skip_special_tokens=True, - )[0] + # Generate with optimized settings + outputs = self.model.generate( + **inputs.to(shared_settings.NEURON_DEVICE), + 
**filtered_params, + eos_token_id=self.tokenizer.eos_token_id, + ) + + results = self.tokenizer.batch_decode( + outputs[:, inputs["input_ids"].shape[1] :], + skip_special_tokens=True, + )[0] logger.debug( f"""{self.__class__.__name__} queried: diff --git a/prompting/rewards/scoring.py b/prompting/rewards/scoring.py index 2ae481434..bf562f076 100644 --- a/prompting/rewards/scoring.py +++ b/prompting/rewards/scoring.py @@ -94,20 +94,21 @@ async def run_step(self) -> RewardLoggingEvent: f"Scored {scoring_config.task.__class__.__name__} {scoring_config.task.task_id} with model " f"{scoring_config.task.llm_model_id}" ) - log_event( - RewardLoggingEvent( - response_event=scoring_config.response, - reward_events=reward_events, - reference=scoring_config.task.reference, - challenge=scoring_config.task.query, - task=scoring_config.task.name, - block=scoring_config.block, - step=scoring_config.step, - task_id=scoring_config.task_id, - task_dict=scoring_config.task.model_dump(), - source=scoring_config.dataset_entry.source, + if not scoring_config.task.organic: + log_event( + RewardLoggingEvent( + response_event=scoring_config.response, + reward_events=reward_events, + reference=scoring_config.task.reference, + challenge=scoring_config.task.query, + task=scoring_config.task.name, + block=scoring_config.block, + step=scoring_config.step, + task_id=scoring_config.task_id, + task_dict=scoring_config.task.model_dump(), + source=scoring_config.dataset_entry.source, + ) ) - ) await asyncio.sleep(0.01) diff --git a/prompting/tasks/base_task.py b/prompting/tasks/base_task.py index ed202673d..54e4dda7a 100644 --- a/prompting/tasks/base_task.py +++ b/prompting/tasks/base_task.py @@ -33,6 +33,7 @@ class BaseTask(BaseModel, ABC): query: Any = None reference: Any = None task_id: str = Field(default_factory=lambda: str(uuid4()), allow_mutation=False) + organic: bool = False model_config = ConfigDict(arbitrary_types_allowed=True) @@ -60,6 +61,7 @@ class BaseTextTask(BaseTask): 
sampling_params: dict[str, float] = settings.shared_settings.SAMPLING_PARAMS timeout: int = settings.shared_settings.NEURON_TIMEOUT max_tokens: int = settings.shared_settings.NEURON_MAX_TOKENS + organic: bool = True @property def task_messages(self) -> list[str] | list[dict]: diff --git a/shared/loop_runner.py b/shared/loop_runner.py index 43d380bd4..cae96ea82 100644 --- a/shared/loop_runner.py +++ b/shared/loop_runner.py @@ -33,7 +33,7 @@ async def get_time(self): """Get the current time from the time server with a timeout.""" if not self.sync: time = datetime.datetime.now(datetime.timezone.utc) - logger.debug(f"Time: {time}") + # logger.debug(f"Time: {time}") return time try: async with aiohttp.ClientSession() as session: @@ -64,13 +64,13 @@ async def wait_for_next_execution(self, last_run_time): next_run = self.next_sync_point(current_time) else: next_run = last_run_time + timedelta(seconds=self.interval) - logger.debug(f"Next run: {next_run}") + # logger.debug(f"Next run: {next_run}") wait_time = (next_run - current_time).total_seconds() if wait_time > 0: - logger.debug( - f"{self.name}: Waiting for {wait_time:.2f} seconds until next {'sync point' if self.sync else 'execution'}" - ) + # logger.debug( + # f"{self.name}: Waiting for {wait_time:.2f} seconds until next {'sync point' if self.sync else 'execution'}" + # ) await asyncio.sleep(wait_time) return next_run @@ -83,14 +83,14 @@ async def run_loop(self): try: while self.running: with profiler.measure(self.name): - logger.debug("Waiting...") + # logger.debug("Waiting...") next_run = await self.wait_for_next_execution(last_run_time) - logger.debug("Wait ended") + # logger.debug("Wait ended") try: - run_results = await self.run_step() - logger.debug(f"Run_results: {run_results}") + await self.run_step() # run_results = await self.run_step() + # logger.debug(f"Run_results: {run_results}") self.step += 1 - logger.debug(f"{self.name}: Step {self.step} completed at {next_run}") + # logger.debug(f"{self.name}: Step 
{self.step} completed at {next_run}") except Exception as ex: logger.exception(f"Error in loop iteration: {ex}") last_run_time = next_run diff --git a/shared/misc.py b/shared/misc.py index 858765b7c..5cd0c25b3 100644 --- a/shared/misc.py +++ b/shared/misc.py @@ -1,4 +1,5 @@ import asyncio +import subprocess import time import traceback from functools import lru_cache, update_wrapper @@ -169,3 +170,14 @@ def wrapper(self): return wrapper return decorator + + +def is_cuda_available(): + try: + # Run nvidia-smi to list available GPUs + result = subprocess.run( + ["nvidia-smi", "-L"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, check=True + ) + return "GPU" in result.stdout + except (subprocess.CalledProcessError, FileNotFoundError): + return False diff --git a/shared/settings.py b/shared/settings.py index 1486aa21d..52e605ec3 100644 --- a/shared/settings.py +++ b/shared/settings.py @@ -18,7 +18,7 @@ from pydantic import Field, model_validator from pydantic_settings import BaseSettings -from shared.misc import cached_property_with_expiration +from shared.misc import cached_property_with_expiration, is_cuda_available logging.getLogger("requests").setLevel(logging.WARNING) logging.getLogger("urllib3").setLevel(logging.WARNING) @@ -83,10 +83,12 @@ class SharedSettings(BaseSettings): SCORING_QUEUE_LENGTH_THRESHOLD: int = Field(10, env="SCORING_QUEUE_LENGTH_THRESHOLD") HF_TOKEN: Optional[str] = Field(None, env="HF_TOKEN") DEPLOY_VALIDATOR: bool = Field(True, env="DEPLOY_VALDITAOR") + DEPLOY_SCORING_API: bool = Field(True, env="DEPLOY_SCORING_API") + SCORING_API_PORT: int = Field(8095, env="SCORING_API_PORT") # ==== API ===== - # API key used to access validator organic scoring mechanism (both .env.validator and .env.api). 
- SCORING_KEY: str | None = Field(None, env="SCORING_KEY") + # Hotkey used to run api, defaults to Macrocosmos + API_HOTKEY: str = Field("5F4tQyWrhfGVcNhoqeiNsR6KjD4wMZ2kfhLj4oHYuyHbZAc3", env="API_HOTKEY") # Scoring request rate limit in seconds. SCORING_RATE_LIMIT_SEC: float = Field(5, env="SCORING_RATE_LIMIT_SEC") # Scoring queue threshold when rate-limit start to kick in, used to query validator API with scoring requests. @@ -94,9 +96,6 @@ class SharedSettings(BaseSettings): API_TEST_MODE: bool = Field(False, env="API_TEST_MODE") # Validator scoring API (.env.validator). - DEPLOY_SCORING_API: bool = Field(False, env="DEPLOY_SCORING_API") - SCORING_API_PORT: int = Field(8094, env="SCORING_API_PORT") - SCORING_ADMIN_KEY: str | None = Field(None, env="SCORING_ADMIN_KEY") SCORE_ORGANICS: bool = Field(False, env="SCORE_ORGANICS") WORKERS: int = Field(1, env="WORKERS") @@ -104,7 +103,8 @@ class SharedSettings(BaseSettings): API_PORT: int = Field(8005, env="API_PORT") API_HOST: str = Field("0.0.0.0", env="API_HOST") # Validator scoring API address. - VALIDATOR_API: str = Field("0.0.0.0:8094", env="VALIDATOR_API") + # TODO: Choose this dynamically from the network + VALIDATOR_API: str = Field("184.105.5.17:8094", env="VALIDATOR_API") # Used for availability # Default SN1 API address DEFAULT_SN1_API: str = Field("http://sn1.api.macrocosmos.ai:11198/v1", env="DEFAULT_SN1_API") # File with keys used to access API. @@ -180,8 +180,6 @@ def validate_mode(cls, v): logger.warning( "No SCORING_KEY found in .env.api file. You must add a scoring key that will allow us to forward miner responses to the validator for scoring." ) - if not os.getenv("SCORE_ORGANICS"): - logger.warning("Not scoring organics. This means that miners may not respond as consistently.") elif v["mode"] == "miner": if not dotenv.load_dotenv(".env.miner"): logger.warning("No .env.miner file found. 
Please create one.") @@ -276,9 +274,11 @@ def DENDRITE(self) -> bt.dendrite: return bt.dendrite(wallet=self.WALLET) -shared_settings: Optional[SharedSettings] = None try: - shared_settings = SharedSettings.load(mode="mock") + if is_cuda_available(): + shared_settings = SharedSettings.load(mode="validator") + else: + shared_settings = SharedSettings.load(mode="mock") pass except Exception as e: logger.exception(f"Error loading settings: {e}") diff --git a/shared/uids.py b/shared/uids.py index 52fdd110a..8de376ebe 100644 --- a/shared/uids.py +++ b/shared/uids.py @@ -27,7 +27,7 @@ def check_uid_availability( metagraph = shared_settings.METAGRAPH # Filter non serving axons. if not metagraph.axons[uid].is_serving: - logger.debug(f"uid: {uid} is not serving") + # logger.debug(f"uid: {uid} is not serving") return False # Filter validator permit > 1024 stake. diff --git a/validator_api/chat_completion.py b/validator_api/chat_completion.py index 677dc1b09..c49b5ce9f 100644 --- a/validator_api/chat_completion.py +++ b/validator_api/chat_completion.py @@ -2,6 +2,7 @@ import json import math import random +import time from typing import Any, AsyncGenerator, Callable, List, Optional from fastapi import HTTPException @@ -89,9 +90,14 @@ async def reconstructed_response() -> AsyncGenerator: async def stream_from_first_response( - responses: List[asyncio.Task], collected_chunks_list: List[List[str]], body: dict[str, any], uids: List[int] + responses: List[asyncio.Task], + collected_chunks_list: List[List[str]], + body: dict[str, any], + uids: List[int], + timings_list: List[List[float]], ) -> AsyncGenerator[str, None]: first_valid_response = None + response_start_time = time.monotonic() try: # Keep looping until we find a valid response or run out of tasks while responses and first_valid_response is None: @@ -133,6 +139,7 @@ async def stream_from_first_response( continue chunks_received = True + timings_list[0].append(time.monotonic() - response_start_time) 
collected_chunks_list[0].append(content) yield f"data: {json.dumps(chunk.model_dump())}\n\n" @@ -144,10 +151,21 @@ async def stream_from_first_response( # Continue collecting remaining responses in background for scoring remaining = asyncio.gather(*pending, return_exceptions=True) - remaining_tasks = asyncio.create_task(collect_remaining_responses(remaining, collected_chunks_list, body, uids)) + remaining_tasks = asyncio.create_task( + collect_remaining_responses( + remaining=remaining, + collected_chunks_list=collected_chunks_list, + body=body, + uids=uids, + timings_list=timings_list, + response_start_time=response_start_time, + ) + ) await remaining_tasks asyncio.create_task( - scoring_queue.scoring_queue.append_response(uids=uids, body=body, chunks=collected_chunks_list) + scoring_queue.scoring_queue.append_response( + uids=uids, body=body, chunks=collected_chunks_list, timings=timings_list + ) ) except asyncio.CancelledError: @@ -161,7 +179,12 @@ async def stream_from_first_response( async def collect_remaining_responses( - remaining: asyncio.Task, collected_chunks_list: List[List[str]], body: dict[str, any], uids: List[int] + remaining: asyncio.Task, + collected_chunks_list: List[List[str]], + body: dict[str, any], + uids: List[int], + timings_list: List[List[float]], + response_start_time: float, ): """Collect remaining responses for scoring without blocking the main response.""" try: @@ -178,6 +201,7 @@ async def collect_remaining_responses( content = getattr(chunk.choices[0].delta, "content", None) if content is None: continue + timings_list[i + 1].append(time.monotonic() - response_start_time) collected_chunks_list[i + 1].append(content) except Exception as e: @@ -214,6 +238,7 @@ async def chat_completion( # Initialize chunks collection for each miner collected_chunks_list = [[] for _ in uids] + timings_list = [[] for _ in uids] timeout_seconds = max( 30, max(0, math.floor(math.log2(body["sampling_parameters"].get("max_new_tokens", 256) / 256))) * 10 + 30 
@@ -230,7 +255,7 @@ async def chat_completion( ] return StreamingResponse( - stream_from_first_response(response_tasks, collected_chunks_list, body, uids), + stream_from_first_response(response_tasks, collected_chunks_list, body, uids, timings_list), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", diff --git a/validator_api/gpt_endpoints.py b/validator_api/gpt_endpoints.py index b7070945d..814aec02b 100644 --- a/validator_api/gpt_endpoints.py +++ b/validator_api/gpt_endpoints.py @@ -42,7 +42,6 @@ async def completions(request: Request, api_key: str = Depends(validate_api_key) ) if not uids: raise HTTPException(status_code=500, detail="No available miners") - # Choose between regular completion and mixture of miners. if body.get("test_time_inference", False): return await test_time_inference(body["messages"], body.get("model", None)) @@ -98,8 +97,8 @@ async def web_retrieval(search_query: str, n_miners: int = 10, n_results: int = if len(loaded_results) == 0: raise HTTPException(status_code=500, detail="No miner responded successfully") - chunks = [res.accumulated_chunks if res and res.accumulated_chunks else [] for res in stream_results] - asyncio.create_task(scoring_queue.scoring_queue.append_response(uids=uids, body=body, chunks=chunks)) + collected_chunks_list = [res.accumulated_chunks if res and res.accumulated_chunks else [] for res in stream_results] + asyncio.create_task(scoring_queue.scoring_queue.append_response(uids=uids, body=body, chunks=collected_chunks_list)) return loaded_results diff --git a/validator_api/miner_availabilities.py b/validator_api/miner_availabilities.py index 177b9d24d..eb650211e 100644 --- a/validator_api/miner_availabilities.py +++ b/validator_api/miner_availabilities.py @@ -61,13 +61,17 @@ def get_available_miner(task: Optional[str] = None, model: Optional[str] = None) class MinerAvailabilitiesUpdater(AsyncLoopRunner): - interval: int = 20 + interval: int = 40 async def run_step(self): - uids = 
get_uids(sampling_mode="random", k=100) + uids = get_uids( + sampling_mode="random", k=100 + ) # TODO: We should probably not just randomly sample uids, there's likely a better way to do this. + # TODO: Default to highest stake validator's availability api url = f"{shared_settings.VALIDATOR_API}/miner_availabilities/miner_availabilities" try: + # TODO: Need to add some level of ddos protection for this result = requests.post(url, json=uids.tolist(), timeout=10) result.raise_for_status() # Raise an exception for bad status codes diff --git a/validator_api/scoring_queue.py b/validator_api/scoring_queue.py index ac1ff60f9..fa32e950e 100644 --- a/validator_api/scoring_queue.py +++ b/validator_api/scoring_queue.py @@ -8,10 +8,13 @@ from pydantic import BaseModel from shared import settings +from shared.epistula import create_header_hook +from shared.loop_runner import AsyncLoopRunner +from validator_api.validator_forwarding import ValidatorRegistry -shared_settings = settings.shared_settings +validator_registry = ValidatorRegistry() -from shared.loop_runner import AsyncLoopRunner +shared_settings = settings.shared_settings class ScoringPayload(BaseModel): @@ -45,32 +48,47 @@ async def run_step(self): scoring_payload = self._scoring_queue.popleft() payload = scoring_payload.payload - uids = payload["uid"] + uids = payload["uids"] logger.info(f"Received new organic for scoring, uids: {uids}") - - url = f"http://{shared_settings.VALIDATOR_API}/scoring" + try: + vali_uid, vali_axon, vali_hotkey = validator_registry.get_available_axon() + # get_available_axon() will return None if it cannot find an available validator, which will fail to unpack + url = f"http://{vali_axon}/scoring" + except Exception as e: + logger.exception(f"Could not find available validator scoring endpoint: {e}") try: timeout = httpx.Timeout(timeout=120.0, connect=60.0, read=30.0, write=30.0, pool=5.0) - async with httpx.AsyncClient(timeout=timeout) as client: + # Add required headers for signature 
verification + + logger.debug(f"Forwarding payload to {url}.\n\nPAYLOAD: {payload}") + async with httpx.AsyncClient( + timeout=timeout, + event_hooks={"request": [create_header_hook(shared_settings.WALLET.hotkey, vali_hotkey)]}, + ) as client: response = await client.post( url=url, json=payload, - headers={"api-key": shared_settings.SCORING_KEY, "Content-Type": "application/json"}, + # headers=headers, ) + validator_registry.update_validators(uid=vali_uid, response_code=response.status_code) if response.status_code != 200: # Raise an exception so that the retry logic in the except block handles it. - raise Exception(f"Non-200 response: {response.status_code} for uids {uids}") + raise Exception(f"Non-200 response: {response.status_code} for validator {vali_uid}") logger.info(f"Forwarding response completed with status {response.status_code}") except Exception as e: if scoring_payload.retries < self.max_scoring_retries: scoring_payload.retries += 1 async with self._scoring_lock: self._scoring_queue.appendleft(scoring_payload) - logger.error(f"Tried to forward response to {url} with payload {payload}. Queued for retry") + logger.error( + f"Tried to forward response to {url} with payload {payload}. Exception: {e}. 
Queued for retry" + ) else: logger.exception(f"Error while forwarding response after {scoring_payload.retries} retries: {e}") - async def append_response(self, uids: list[int], body: dict[str, Any], chunks: list[list[str]]): + async def append_response( + self, uids: list[int], body: dict[str, Any], chunks: list[list[str]], timings: list[list[float]] | None = None + ): if not shared_settings.SCORE_ORGANICS: return @@ -79,8 +97,12 @@ async def append_response(self, uids: list[int], body: dict[str, Any], chunks: l return uids = [int(u) for u in uids] - chunk_dict = {u: c for u, c in zip(uids, chunks)} - payload = {"body": body, "chunks": chunk_dict, "uid": uids} + chunk_dict = {str(u): c for u, c in zip(uids, chunks)} + if timings: + timing_dict = {str(u): t for u, t in zip(uids, timings)} + else: + timing_dict = {} + payload = {"body": body, "chunks": chunk_dict, "uids": uids, "timings": timing_dict} scoring_item = ScoringPayload(payload=payload) async with self._scoring_lock: diff --git a/validator_api/utils.py b/validator_api/utils.py index a24c708a6..ee25e2dbf 100644 --- a/validator_api/utils.py +++ b/validator_api/utils.py @@ -7,18 +7,17 @@ from shared.loop_runner import AsyncLoopRunner from shared.uids import get_uids -shared_settings = settings.shared_settings - class UpdateMinerAvailabilitiesForAPI(AsyncLoopRunner): + interval: int = 300 miner_availabilities: dict[int, dict] = {} async def run_step(self): - if shared_settings.API_TEST_MODE: + if settings.shared_settings.API_TEST_MODE: return try: response = requests.post( - f"http://{shared_settings.VALIDATOR_API}/miner_availabilities/miner_availabilities", + f"http://{settings.shared_settings.VALIDATOR_API}/miner_availabilities/miner_availabilities", headers={"accept": "application/json", "Content-Type": "application/json"}, json=get_uids(sampling_mode="all"), timeout=10, diff --git a/validator_api/validator_forwarding.py b/validator_api/validator_forwarding.py new file mode 100644 index 000000000..3bd0d4ba7 
--- /dev/null
+++ b/validator_api/validator_forwarding.py
@@ -0,0 +1,113 @@
+import random
+import time
+from typing import ClassVar, List, Optional, Tuple
+
+import numpy as np
+from loguru import logger
+from pydantic import BaseModel, Field, model_validator
+
+from shared.settings import shared_settings
+
+
+class Validator(BaseModel):
+    """A validator scoring endpoint together with its retry cooldown state."""
+
+    uid: int
+    stake: float
+    axon: str
+    hotkey: str
+    timeout: int = 1  # current cooldown in seconds; quadruples on failure (capped at 86400)
+    available_at: float = 0.0  # Unix timestamp indicating when the validator is next available
+
+    def update_failure(self, status_code: int) -> None:
+        """
+        Update the validator's cooldown based on the status code of the last request.
+
+        - On success (status_code == 200) the cooldown is reset to 1 second and the
+          validator becomes available immediately.
+        - On any other status code the cooldown is multiplied by 4 (capped at 86400
+          seconds) and the validator is unavailable until that cooldown has elapsed.
+        """
+        current_time = time.time()
+        if status_code == 200:
+            self.timeout = 1
+            self.available_at = current_time
+        else:
+            self.timeout = min(self.timeout * 4, 86400)
+            self.available_at = current_time + self.timeout
+
+    def is_available(self) -> bool:
+        """
+        Check if the validator is available based on its cooldown.
+        """
+        return time.time() >= self.available_at
+
+
+class ValidatorRegistry(BaseModel):
+    """
+    Class to store the success of forwards to validator axons.
+    Validators that fail to respond to scoring requests are put on an increasing
+    cooldown and are skipped until it expires; they are never removed.
+    """
+
+    # Using a default factory ensures validators is always a dict.
+    validators: dict[int, Validator] = Field(default_factory=dict)
+    spot_checking_rate: ClassVar[float] = 0.0
+    max_retries: ClassVar[int] = 4
+
+    # NOTE(review): pydantic documents mode="after" validators as instance methods
+    # (`def f(self)`); confirm the `(cls, v)` form is supported by the pinned version.
+    # The `metagraph` default is evaluated once, when this class body is executed.
+    @model_validator(mode="after")
+    def create_validator_list(cls, v: "ValidatorRegistry", metagraph=shared_settings.METAGRAPH) -> "ValidatorRegistry":
+        """Populate `validators` with every metagraph uid whose stake is at least 100000."""
+        validator_uids = np.where(metagraph.stake >= 100000)[0].tolist()
+        logger.debug(f"uids: {validator_uids}")
+        # presumably ip_str() yields "/ipv4/<ip>:<port>", making index 2 "<ip>:<port>" - TODO confirm
+        validator_axons = [metagraph.axons[uid].ip_str().split("/")[2] for uid in validator_uids]
+        validator_stakes = [metagraph.stake[uid] for uid in validator_uids]
+        validator_hotkeys = [metagraph.hotkeys[uid] for uid in validator_uids]
+        v.validators = {
+            uid: Validator(uid=uid, stake=stake, axon=axon, hotkey=hotkey)
+            for uid, stake, axon, hotkey in zip(validator_uids, validator_stakes, validator_axons, validator_hotkeys)
+        }
+        return v
+
+    def get_available_validators(self) -> List[int]:
+        """
+        Return the uids of the registered validators that are not in their cooldown period.
+        """
+        return [uid for uid, validator in self.validators.items() if validator.is_available()]
+
+    def get_available_axon(self) -> Optional[Tuple[int, str, str]]:
+        """
+        Returns a tuple (uid, axon, hotkey) for a validator chosen at random, weighted by stake,
+        among those not in cooldown. Returns None when the spot-checking draw falls below
+        `spot_checking_rate`, when the registry is empty, or when no validator becomes
+        available within `max_retries` attempts.
+        """
+        if random.random() < self.spot_checking_rate or not self.validators:
+            return None
+        validator_list: List[int] = []
+        for attempt in range(self.max_retries):
+            validator_list = self.get_available_validators()
+            if validator_list:
+                break
+            if attempt < self.max_retries - 1:
+                # NOTE(review): this blocks the calling thread; it is invoked from the async
+                # scoring-queue step, so consider an awaitable sleep there instead.
+                time.sleep(5)
+        if not validator_list:
+            logger.error(f"Could not find available validator after {self.max_retries}")
+            return None
+        weights = [self.validators[uid].stake for uid in validator_list]
+        chosen = self.validators[random.choices(validator_list, weights=weights, k=1)[0]]
+        return chosen.uid, chosen.axon, chosen.hotkey
+
+    def update_validators(self, uid: int, response_code: int) -> None:
+        """
+        Update a specific validator's cooldown based on the response code.
+        Unknown uids are ignored.
+        """
+        if uid in self.validators:
+            self.validators[uid].update_failure(response_code)