diff --git a/README.md b/README.md index 14e649773..3d55e67d5 100644 --- a/README.md +++ b/README.md @@ -35,7 +35,7 @@ Validators and miners are based on large language models (LLM). The validation p
-**[For Validators](./assets/validator.md)** · **[For Miners](./assets/miner.md)** · **[API Documentation](./validator_api/API_docs.md)** +**[For Validators](./docs/validator.md)** · **[For Miners](./docs/epistula_miner.md)** · **[API Documentation](./docs/API_docs.md)**
@@ -66,7 +66,7 @@ The miner is given a complex problem that requires multiple steps to solve. Each # API Documentation -For detailed information on the available API endpoints, request/response formats, and usage examples, please refer to the [API Documentation](./validator_api/API_docs.md). +For detailed information on the available API endpoints, request/response formats, and usage examples, please refer to the [API Documentation](./docs/API_docs.md). # Contribute
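Note: the recurring change throughout this diff replaces `from shared.settings import shared_settings` with `from shared import settings` plus attribute access (`settings.shared_settings`). Below is a minimal, self-contained sketch of why that matters; it is not code from the repository, and the stand-in module is hypothetical. It only illustrates that a `from ... import name` binding is a snapshot taken at import time, while attribute access always resolves the current value, so a later `settings.shared_settings = SharedSettings.load(mode="validator")` is visible to every module that reads the attribute.

```python
# Minimal sketch (not repository code): why `settings.shared_settings` attribute
# access sees a reloaded config, while `from shared.settings import shared_settings`
# keeps serving whatever object was bound at import time.
import types

# Hypothetical stand-in for the `shared.settings` module.
settings = types.ModuleType("settings")
settings.shared_settings = {"mode": "default"}   # value present at import time

# Equivalent to `from shared.settings import shared_settings`: a one-time binding.
shared_settings_snapshot = settings.shared_settings

# Later reassignment, e.g. settings.shared_settings = SharedSettings.load(mode="validator").
settings.shared_settings = {"mode": "validator"}

print(shared_settings_snapshot)   # {'mode': 'default'}   -> stale snapshot
print(settings.shared_settings)   # {'mode': 'validator'} -> always the current object
```

This is also why several files in the diff keep a module-level `shared_settings = settings.shared_settings` alias: that alias is itself a snapshot, which is only safe where the settings object is not expected to be reloaded after import.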
diff --git a/validator_api/API_docs.md b/docs/API_docs.md similarity index 100% rename from validator_api/API_docs.md rename to docs/API_docs.md diff --git a/neurons/miners/epistula_miner/README.md b/docs/epistula_miner.md similarity index 100% rename from neurons/miners/epistula_miner/README.md rename to docs/epistula_miner.md diff --git a/assets/validator.md b/docs/validator.md similarity index 100% rename from assets/validator.md rename to docs/validator.md diff --git a/neurons/miners/epistula_miner/web_retrieval.py b/neurons/miners/epistula_miner/web_retrieval.py index 344c2046e..6a695d317 100644 --- a/neurons/miners/epistula_miner/web_retrieval.py +++ b/neurons/miners/epistula_miner/web_retrieval.py @@ -7,7 +7,7 @@ from openai import OpenAI from prompting.base.duckduckgo_patch import PatchedDDGS -from shared.settings import shared_settings +from shared import settings # Import the patched DDGS and use that @@ -56,7 +56,7 @@ async def get_websites_with_similarity( List of dictionaries containing website URLs and their best matching chunks """ logger.debug("Getting results") - ddgs = PatchedDDGS(proxy=shared_settings.PROXY_URL, verify=False) + ddgs = PatchedDDGS(proxy=settings.shared_settings.PROXY_URL, verify=False) results = list(ddgs.text(query)) logger.debug(f"Got {len(results)} results") urls = [r["href"] for r in results][:n_results] @@ -66,7 +66,7 @@ async def get_websites_with_similarity( extracted = await asyncio.gather(*[extract_content(c) for c in content]) # Create embeddings - client = OpenAI(api_key=shared_settings.OPENAI_API_KEY) + client = OpenAI(api_key=settings.shared_settings.OPENAI_API_KEY) query_embedding = client.embeddings.create(model="text-embedding-ada-002", input=query).data[0].embedding # Process each website results_with_similarity = [] diff --git a/neurons/validator.py b/neurons/validator.py index 308c794fa..dbd229004 100644 --- a/neurons/validator.py +++ b/neurons/validator.py @@ -9,8 +9,8 @@ # ruff: noqa: E402 from shared import settings +from shared.logging import init_wandb -shared_settings = settings.shared_settings settings.shared_settings = settings.SharedSettings.load(mode="validator") @@ -26,13 +26,12 @@ def create_loop_process(task_queue, scoring_queue, reward_events): + settings.shared_settings = settings.SharedSettings.load(mode="validator") + if settings.shared_settings.WANDB_ON: + init_wandb(neuron="validator") + async def spawn_loops(task_queue, scoring_queue, reward_events): # ruff: noqa: E402 - wandb.setup() - from shared import settings - - settings.shared_settings = settings.SharedSettings.load(mode="validator") - from prompting.llms.model_manager import model_scheduler from prompting.miner_availability.miner_availability import availability_checking_loop from prompting.rewards.scoring import task_scorer @@ -88,10 +87,11 @@ async def start(): # TODO: We should not use 2 availability loops for each process, in reality # we should only be sharing the miner availability data between processes. 
from prompting.miner_availability.miner_availability import availability_checking_loop + from prompting.rewards.scoring import task_scorer asyncio.create_task(availability_checking_loop.start()) - await start_scoring_api(scoring_queue, reward_events) + await start_scoring_api(task_scorer, scoring_queue, reward_events) while True: await asyncio.sleep(10) @@ -100,23 +100,6 @@ async def start(): asyncio.run(start()) -# def create_task_loop(task_queue, scoring_queue): -# async def start(task_queue, scoring_queue): -# logger.info("Starting AvailabilityCheckingLoop...") -# asyncio.create_task(availability_checking_loop.start()) - -# logger.info("Starting TaskSender...") -# asyncio.create_task(task_sender.start(task_queue, scoring_queue)) - -# logger.info("Starting TaskLoop...") -# asyncio.create_task(task_loop.start(task_queue, scoring_queue)) -# while True: -# await asyncio.sleep(10) -# logger.debug("Running task loop...") - -# asyncio.run(start(task_queue, scoring_queue)) - - async def main(): # will start checking the availability of miners at regular intervals, needed for API and Validator with torch.multiprocessing.Manager() as manager: @@ -130,7 +113,7 @@ async def main(): try: # # Start checking the availability of miners at regular intervals - if shared_settings.DEPLOY_SCORING_API: + if settings.shared_settings.DEPLOY_SCORING_API: # Use multiprocessing to bypass API blocking issue api_process = mp.Process(target=start_api, args=(scoring_queue, reward_events), name="API_Process") api_process.start() @@ -152,17 +135,17 @@ async def main(): while True: await asyncio.sleep(30) if ( - shared_settings.SUBTENSOR.get_current_block() - - shared_settings.METAGRAPH.last_update[shared_settings.UID] + settings.shared_settings.SUBTENSOR.get_current_block() + - settings.shared_settings.METAGRAPH.last_update[settings.shared_settings.UID] > 500 and step > 120 ): + current_block = settings.shared_settings.SUBTENSOR.get_current_block() + last_update_block = settings.shared_settings.METAGRAPH.last_update[settings.shared_settings.UID] logger.warning( - f"UPDATES HAVE STALED FOR {shared_settings.SUBTENSOR.get_current_block() - shared_settings.METAGRAPH.last_update[shared_settings.UID]} BLOCKS AND {step} STEPS" - ) - logger.warning( - f"STALED: {shared_settings.SUBTENSOR.get_current_block()}, {shared_settings.METAGRAPH.block}" + f"UPDATES HAVE STALED FOR {current_block - last_update_block} BLOCKS AND {step} STEPS" ) + logger.warning(f"STALED: {current_block}, {settings.shared_settings.METAGRAPH.block}") sys.exit(1) step += 1 diff --git a/prompting/api/api.py b/prompting/api/api.py index 825b19c7b..34b5dadc7 100644 --- a/prompting/api/api.py +++ b/prompting/api/api.py @@ -4,8 +4,7 @@ from prompting.api.miner_availabilities.api import router as miner_availabilities_router from prompting.api.scoring.api import router as scoring_router -from prompting.rewards.scoring import task_scorer -from shared.settings import shared_settings +from shared import settings app = FastAPI() app.include_router(miner_availabilities_router, prefix="/miner_availabilities", tags=["miner_availabilities"]) @@ -18,10 +17,17 @@ def health(): return {"status": "healthy"} -async def start_scoring_api(scoring_queue, reward_events): - task_scorer.scoring_queue = scoring_queue - task_scorer.reward_events = reward_events - logger.info(f"Starting Scoring API on https://0.0.0.0:{shared_settings.SCORING_API_PORT}") +async def start_scoring_api(task_scorer, scoring_queue, reward_events): + # We pass an object of task scorer then override it's 
attributes to ensure that they are managed by mp + app.state.task_scorer = task_scorer + app.state.task_scorer.scoring_queue = scoring_queue + app.state.task_scorer.reward_events = reward_events + + logger.info(f"Starting Scoring API on https://0.0.0.0:{settings.shared_settings.SCORING_API_PORT}") uvicorn.run( - "prompting.api.api:app", host="0.0.0.0", port=shared_settings.SCORING_API_PORT, loop="asyncio", reload=False + "prompting.api.api:app", + host="0.0.0.0", + port=settings.shared_settings.SCORING_API_PORT, + loop="asyncio", + reload=False, ) diff --git a/prompting/api/scoring/api.py b/prompting/api/scoring/api.py index 1364f75f6..d52ba74bc 100644 --- a/prompting/api/scoring/api.py +++ b/prompting/api/scoring/api.py @@ -6,28 +6,33 @@ from prompting.datasets.random_website import DDGDatasetEntry from prompting.llms.model_zoo import ModelZoo -from prompting.rewards.scoring import task_scorer from prompting.tasks.inference import InferenceTask from prompting.tasks.web_retrieval import WebRetrievalTask +from shared import settings from shared.base import DatasetEntry from shared.dendrite import DendriteResponseEvent from shared.epistula import SynapseStreamResult -from shared.settings import shared_settings router = APIRouter() def validate_scoring_key(api_key: str = Header(...)): - if api_key != shared_settings.SCORING_KEY: + if api_key != settings.shared_settings.SCORING_KEY: raise HTTPException(status_code=403, detail="Invalid API key") +def get_task_scorer(request: Request): + return request.app.state.task_scorer + + @router.post("/scoring") -async def score_response(request: Request, api_key_data: dict = Depends(validate_scoring_key)): +async def score_response( + request: Request, api_key_data: dict = Depends(validate_scoring_key), task_scorer=Depends(get_task_scorer) +): model = None payload: dict[str, Any] = await request.json() body = payload.get("body") - timeout = payload.get("timeout", shared_settings.NEURON_TIMEOUT) + timeout = payload.get("timeout", settings.shared_settings.NEURON_TIMEOUT) uids = payload.get("uid", []) chunks = payload.get("chunks", {}) if not uids or not chunks: @@ -54,7 +59,7 @@ async def score_response(request: Request, api_key_data: dict = Depends(validate llm_model=llm_model, llm_model_id=body.get("model"), seed=int(body.get("seed", 0)), - sampling_params=body.get("sampling_parameters", shared_settings.SAMPLING_PARAMS), + sampling_params=body.get("sampling_parameters", settings.shared_settings.SAMPLING_PARAMS), query=body.get("messages"), ) logger.info(f"Task created: {organic_task}") @@ -66,7 +71,7 @@ async def score_response(request: Request, api_key_data: dict = Depends(validate timeout=timeout, ), dataset_entry=DatasetEntry(), - block=shared_settings.METAGRAPH.block, + block=settings.shared_settings.METAGRAPH.block, step=-1, task_id=str(uuid.uuid4()), ) @@ -90,10 +95,10 @@ async def score_response(request: Request, api_key_data: dict = Depends(validate stream_results=[ SynapseStreamResult(accumulated_chunks=[chunk for chunk in chunks if chunk is not None]) ], - timeout=body.get("timeout", shared_settings.NEURON_TIMEOUT), + timeout=body.get("timeout", settings.shared_settings.NEURON_TIMEOUT), ), dataset_entry=DDGDatasetEntry(search_term=search_term), - block=shared_settings.METAGRAPH.block, + block=settings.shared_settings.METAGRAPH.block, step=-1, task_id=str(uuid.uuid4()), ) diff --git a/prompting/datasets/random_website.py b/prompting/datasets/random_website.py index ae70ed41f..4bdaca3a1 100644 --- a/prompting/datasets/random_website.py +++ 
b/prompting/datasets/random_website.py @@ -1,14 +1,14 @@ import random +from functools import lru_cache from typing import Optional import trafilatura from loguru import logger -# from duckduckgo_search import DDGS from prompting.base.duckduckgo_patch import PatchedDDGS from prompting.datasets.utils import ENGLISH_WORDS +from shared import settings from shared.base import BaseDataset, Context, DatasetEntry -from shared.settings import shared_settings MAX_CHARS = 5000 @@ -25,7 +25,7 @@ class DDGDataset(BaseDataset): english_words: list[str] = None def search_random_term(self, retries: int = 3) -> tuple[Optional[str], Optional[list[dict[str, str]]]]: - ddg = PatchedDDGS(proxy=shared_settings.PROXY_URL, verify=False) + ddg = PatchedDDGS(proxy=settings.shared_settings.PROXY_URL, verify=False) for _ in range(retries): random_words = " ".join(random.sample(ENGLISH_WORDS, 3)) try: @@ -38,6 +38,7 @@ def search_random_term(self, retries: int = 3) -> tuple[Optional[str], Optional[ return None, None @staticmethod + @lru_cache(maxsize=1000) def extract_website_content(url: str) -> Optional[str]: try: website = trafilatura.fetch_url(url) diff --git a/prompting/llms/apis/gpt_wrapper.py b/prompting/llms/apis/gpt_wrapper.py index c6c6e313a..05a58fb65 100644 --- a/prompting/llms/apis/gpt_wrapper.py +++ b/prompting/llms/apis/gpt_wrapper.py @@ -5,7 +5,9 @@ from pydantic import BaseModel from prompting.llms.apis.llm_messages import LLMMessage, LLMMessages -from shared.settings import shared_settings +from shared import settings + +shared_settings = settings.shared_settings class GPT(BaseModel): diff --git a/prompting/llms/apis/llm_wrapper.py b/prompting/llms/apis/llm_wrapper.py index d9d40646e..128af345a 100644 --- a/prompting/llms/apis/llm_wrapper.py +++ b/prompting/llms/apis/llm_wrapper.py @@ -3,10 +3,13 @@ from prompting.llms.apis.gpt_wrapper import openai_client from prompting.llms.apis.llm_messages import LLMMessages from prompting.llms.apis.sn19_wrapper import chat_complete -from shared.settings import shared_settings +from shared import settings + +shared_settings = settings.shared_settings class LLMWrapper: + @staticmethod def chat_complete( messages: LLMMessages, model="chat-llama-3-1-70b", @@ -29,27 +32,23 @@ def chat_complete( logprobs=logprobs, ) - except Exception as ex: - logger.exception(ex) - logger.warning("Failed to use SN19 API, falling back to GPT-3.5") - else: - if response is not None: - logger.debug(f"Generated {len(response)} characters using {model}") - return response - logger.warning( - "Failed to use SN19 API (check the SN19_API_KEY and/or SN19_API_URL), " "falling back to GPT-3.5" + except Exception: + logger.error( + "Failed to use SN19 API, falling back to GPT-3.5. 
" + "Make sure to specify 'SN19_API_KEY' and 'SN19_API_URL' in .env.validator" ) - model = "gpt-3.5-turbo" - response, _ = openai_client.chat_complete( - messages=messages, - model=model, - temperature=temperature, - max_tokens=max_tokens, - top_p=top_p, - stream=stream, - logprobs=logprobs, - ) - response = response.choices[0].message.content + if response is None: + model = "gpt-3.5-turbo" + response, _ = openai_client.chat_complete( + messages=messages, + model=model, + temperature=temperature, + max_tokens=max_tokens, + top_p=top_p, + stream=stream, + logprobs=logprobs, + ) + response = response.choices[0].message.content logger.debug(f"Generated {len(response)} characters using {model}") return response diff --git a/prompting/llms/apis/sn19_wrapper.py b/prompting/llms/apis/sn19_wrapper.py index 2e1d634df..deecfe3d2 100644 --- a/prompting/llms/apis/sn19_wrapper.py +++ b/prompting/llms/apis/sn19_wrapper.py @@ -1,11 +1,12 @@ import json import requests -from loguru import logger from tenacity import retry, stop_after_attempt, wait_exponential from prompting.llms.apis.llm_messages import LLMMessages -from shared.settings import shared_settings +from shared import settings + +shared_settings = settings.shared_settings # TODO: key error in response.json() when response is 500 @@ -37,11 +38,8 @@ def chat_complete( "logprobs": logprobs, } response = requests.post(url, headers=headers, data=json.dumps(data), timeout=30) + response_json = response.json() try: - response_json = response.json() - try: - return response_json["choices"][0]["message"].get("content") - except KeyError: - return response_json["choices"][0]["delta"].get("content") - except Exception as e: - logger.exception(f"Error in chat_complete: {e}") + return response_json["choices"][0]["message"].get("content") + except KeyError: + return response_json["choices"][0]["delta"].get("content") diff --git a/prompting/llms/hf_llm.py b/prompting/llms/hf_llm.py index 934071e3a..1b35b2ed8 100644 --- a/prompting/llms/hf_llm.py +++ b/prompting/llms/hf_llm.py @@ -5,7 +5,7 @@ from loguru import logger from transformers import AutoModelForCausalLM, AutoTokenizer, PreTrainedModel, pipeline -from shared.settings import shared_settings +from shared import settings from shared.timer import Timer @@ -31,7 +31,7 @@ def __init__(self, model_id="hugging-quants/Meta-Llama-3.1-70B-Instruct-AWQ-INT4 self.llm = pipeline("text-generation", model=self.model, tokenizer=self.tokenizer) - self.sampling_params = shared_settings.SAMPLING_PARAMS + self.sampling_params = settings.shared_settings.SAMPLING_PARAMS @torch.inference_mode() def generate(self, messages: list[str] | list[dict], sampling_params=None, seed=None): @@ -46,15 +46,15 @@ def generate(self, messages: list[str] | list[dict], sampling_params=None, seed= add_generation_prompt=True, return_tensors="pt", return_dict=True, - ).to(shared_settings.NEURON_DEVICE) + ).to(settings.shared_settings.NEURON_DEVICE) params = sampling_params if sampling_params else self.sampling_params filtered_params = {k: v for k, v in params.items() if k in self.valid_generation_params} - with Timer() as timer: + with Timer(): # Generate with optimized settings outputs = self.model.generate( - **inputs.to(shared_settings.NEURON_DEVICE), + **inputs.to(settings.shared_settings.NEURON_DEVICE), **filtered_params, eos_token_id=self.tokenizer.eos_token_id, ) @@ -65,12 +65,12 @@ def generate(self, messages: list[str] | list[dict], sampling_params=None, seed= )[0] logger.debug( - f"""REPRODUCIBLEHF WAS QUERIED: - PROMPT: 
{messages}\n\n - RESPONSES: {results}\n\n - SAMPLING PARAMS: {params}\n\n - SEED: {seed}\n\n - TIME FOR RESPONSE: {timer.elapsed_time}""" + f"""{self.__class__.__name__} queried: + prompt: {messages}\n + responses: {results}\n + sampling params: {params}\n + seed: {seed} + """ ) return results if len(results) > 1 else results[0] diff --git a/prompting/llms/model_manager.py b/prompting/llms/model_manager.py index c431dcd88..4c02d8779 100644 --- a/prompting/llms/model_manager.py +++ b/prompting/llms/model_manager.py @@ -9,8 +9,8 @@ from prompting.llms.hf_llm import ReproducibleHF from prompting.llms.model_zoo import ModelConfig, ModelZoo from prompting.llms.utils import GPUInfo +from shared import settings from shared.loop_runner import AsyncLoopRunner -from shared.settings import shared_settings # This maintains a list of tasks for which we need to generate references. Since # we can only generate the references, when the correct model is loaded, we work @@ -20,7 +20,7 @@ class ModelManager(BaseModel): always_active_models: list[ModelConfig] = [] - total_ram: float = shared_settings.LLM_MODEL_RAM + total_ram: float = settings.shared_settings.LLM_MODEL_RAM active_models: dict[ModelConfig, ReproducibleHF] = {} used_ram: float = 0.0 model_config = ConfigDict(arbitrary_types_allowed=True) @@ -71,7 +71,7 @@ def load_model(self, model_config: ModelConfig, force: bool = True): model = ReproducibleHF( model=model_config.llm_model_id, gpu_memory_utilization=model_config.min_ram / GPUInfo.free_memory, - max_model_len=shared_settings.LLM_MAX_MODEL_LEN, + max_model_len=settings.shared_settings.LLM_MAX_MODEL_LEN, ) self.active_models[model_config] = model diff --git a/prompting/llms/model_zoo.py b/prompting/llms/model_zoo.py index dab941616..d078a0380 100644 --- a/prompting/llms/model_zoo.py +++ b/prompting/llms/model_zoo.py @@ -4,7 +4,7 @@ from loguru import logger from pydantic import BaseModel, ConfigDict -from shared.settings import shared_settings +from shared import settings class ModelConfig(BaseModel): @@ -20,7 +20,11 @@ def __hash__(self): class ModelZoo: # Currently, we are only using one single model - the one the validator is running models_configs: ClassVar[list[ModelConfig]] = [ - ModelConfig(llm_model_id=shared_settings.LLM_MODEL, reward=1, min_ram=shared_settings.MAX_ALLOWED_VRAM_GB), + ModelConfig( + llm_model_id=settings.shared_settings.LLM_MODEL, + reward=1, + min_ram=settings.shared_settings.MAX_ALLOWED_VRAM_GB, + ), ] # Code below can be uncommended for testing purposes and demonstrates how we rotate multiple LLMs in the future diff --git a/prompting/miner_availability/miner_availability.py b/prompting/miner_availability/miner_availability.py index 49b7df825..1b2534f26 100644 --- a/prompting/miner_availability/miner_availability.py +++ b/prompting/miner_availability/miner_availability.py @@ -9,11 +9,13 @@ from prompting.llms.model_zoo import ModelZoo from prompting.tasks.base_task import BaseTask from prompting.tasks.task_registry import TaskRegistry +from shared import settings from shared.epistula import query_availabilities from shared.loop_runner import AsyncLoopRunner -from shared.settings import shared_settings from shared.uids import get_uids +shared_settings = settings.shared_settings + task_config: dict[str, bool] = {str(task_config.task.__name__): True for task_config in TaskRegistry.task_configs} model_config: dict[str, bool] = {conf.llm_model_id: False for conf in ModelZoo.models_configs} diff --git a/prompting/rewards/exact_match.py b/prompting/rewards/exact_match.py 
index ebcc2c691..3b192ea92 100644 --- a/prompting/rewards/exact_match.py +++ b/prompting/rewards/exact_match.py @@ -2,9 +2,10 @@ from loguru import logger from prompting.rewards.reward import BaseRewardModel, BatchRewardOutput +from shared import settings from shared.dendrite import DendriteResponseEvent -from shared.settings import shared_settings +shared_settings = settings.shared_settings INCORRECT_PENALTY = 3 INCOMPLETE_PENALTY = 1 diff --git a/prompting/rewards/inference_reward_model.py b/prompting/rewards/inference_reward_model.py index e174f55f9..d6ccbfb99 100644 --- a/prompting/rewards/inference_reward_model.py +++ b/prompting/rewards/inference_reward_model.py @@ -6,7 +6,11 @@ class InferenceRewardModel(BaseRewardModel): def reward( - self, reference: str, response_event: DendriteResponseEvent, model_id: str | None = None + self, + reference: str, + response_event: DendriteResponseEvent, + model_id: str | None = None, + **kwargs, ) -> BatchRewardOutput: """Gives an exact reward of 1 if the response matches the reference, 0 otherwise""" if model_id: diff --git a/prompting/rewards/multi_choice.py b/prompting/rewards/multi_choice.py index 40948ad79..13050d70f 100644 --- a/prompting/rewards/multi_choice.py +++ b/prompting/rewards/multi_choice.py @@ -42,6 +42,9 @@ def process_predictions(self, predictions: dict[str, float]) -> dict[str, float] } total = sum(valid_choices.values()) + if np.isclose(total, 0.0): + raise ValueError(f"Values sum up to 0, total={total}") + if not np.isclose(total, 1.0): valid_choices = {k: v / total for k, v in valid_choices.items()} diff --git a/prompting/rewards/relevance.py b/prompting/rewards/relevance.py index 8d1f60dec..9288a007f 100644 --- a/prompting/rewards/relevance.py +++ b/prompting/rewards/relevance.py @@ -7,8 +7,10 @@ from scipy import spatial from prompting.rewards.reward import BaseRewardModel, BatchRewardOutput +from shared import settings from shared.dendrite import DendriteResponseEvent -from shared.settings import shared_settings + +shared_settings = settings.shared_settings MODEL = AnglE.from_pretrained("WhereIsAI/UAE-Large-V1", pooling_strategy="cls", device=shared_settings.NEURON_DEVICE) if shared_settings.NEURON_DEVICE.startswith("cuda"): diff --git a/prompting/rewards/scoring.py b/prompting/rewards/scoring.py index 6b7d45bec..2ae481434 100644 --- a/prompting/rewards/scoring.py +++ b/prompting/rewards/scoring.py @@ -51,7 +51,7 @@ def add_to_queue( task_id=task_id, ) ) - logger.debug(f"SCORING: Added to queue: {task.__class__.__name__}. Queue size: {len(self.scoring_queue)}") + logger.debug(f"Added to queue: {task.__class__.__name__}. 
Queue size: {len(self.scoring_queue)}") async def run_step(self) -> RewardLoggingEvent: await asyncio.sleep(0.1) @@ -79,7 +79,8 @@ async def run_step(self) -> RewardLoggingEvent: # and there we then calculate the reward reward_pipeline = TaskRegistry.get_task_reward(scoring_config.task) logger.debug( - f"""{len(scoring_config.response.completions)} completions to score for task {scoring_config.task}""" + f"{len(scoring_config.response.completions)} completions to score for task " + f"{scoring_config.task.__class__.__name__}" ) reward_events = reward_pipeline.apply( response_event=scoring_config.response, @@ -89,11 +90,9 @@ async def run_step(self) -> RewardLoggingEvent: task=scoring_config.task, ) self.reward_events.append(reward_events) - # logger.debug( - # f"REFERENCE: {scoring_config.task.reference}\n\n||||RESPONSES: {scoring_config.response.completions}" - # ) logger.debug( - f"SCORING: Scored {scoring_config.task.__class__.__name__} {scoring_config.task.task_id} with model {scoring_config.task.llm_model_id} with reward" + f"Scored {scoring_config.task.__class__.__name__} {scoring_config.task.task_id} with model " + f"{scoring_config.task.llm_model_id}" ) log_event( RewardLoggingEvent( @@ -109,7 +108,6 @@ async def run_step(self) -> RewardLoggingEvent: source=scoring_config.dataset_entry.source, ) ) - logger.info("Adding scores to rewards_and_uids") await asyncio.sleep(0.01) diff --git a/prompting/rewards/web_retrieval.py b/prompting/rewards/web_retrieval.py index dc736d686..6fa5f7bd3 100644 --- a/prompting/rewards/web_retrieval.py +++ b/prompting/rewards/web_retrieval.py @@ -1,13 +1,4 @@ -"""Expected miner's response is a JSON object with the following keys: url, content, relevant. - -Example response: -{ - "url": "https://www.example.com", - "content": "This is the content of the website. 
This is the section we are interested in.", - "relevant": "This is the section we are interested in.", -} -""" - +"""Expected miner's response is a JSON object with the following keys: url, content, relevant for each website.""" import json import numpy as np @@ -23,13 +14,13 @@ from shared.dendrite import DendriteResponseEvent MIN_RELEVANT_CHARS = 300 -MIN_MATCH_THRESHOLD = 90 +MIN_MATCH_THRESHOLD = 98 class WebsiteResult(BaseModel): - url: str - content: str - relevant: str + url: str | None + content: str | None + relevant: str | None class WebRetrievalRewardModel(RelevanceRewardModel): @@ -51,7 +42,7 @@ def score_website_result( logger.debug(f"Failed to extract miner's content from website: {response_url}") return 0 - if fuzz.token_set_ratio(response_content, reference_website_content) < MIN_MATCH_THRESHOLD: + if fuzz.ratio(response_content, reference_website_content) < MIN_MATCH_THRESHOLD: logger.info("Miner returned text that doesn't match the website, scoring 0") return 0 @@ -61,9 +52,9 @@ def score_website_result( f"{len(response_relevant)} > {len(response_content)}" ) return 0 - # if len(response_relevant) < MIN_RELEVANT_CHARS: - # logger.info(f"Relevant section is too short (<{MIN_RELEVANT_CHARS} chars)") - # return 0 + + if response_relevant not in response_content: + return 0 return self._cosine_similarity(content1=dataset_entry.query, content2=response_relevant) diff --git a/prompting/tasks/base_task.py b/prompting/tasks/base_task.py index 2b7ac1d7e..0014fbcd5 100644 --- a/prompting/tasks/base_task.py +++ b/prompting/tasks/base_task.py @@ -11,8 +11,8 @@ from prompting.llms.apis.llm_wrapper import LLMWrapper from prompting.llms.model_manager import model_manager from prompting.llms.model_zoo import ModelConfig +from shared import settings from shared.base import DatasetEntry -from shared.settings import shared_settings def CHATTENSOR_SYSTEM_PROMPT(): @@ -57,8 +57,9 @@ class BaseTextTask(BaseTask): reference_system_prompt: ClassVar[str | None] = None augmentation_system_prompt: ClassVar[str | None] = None dataset_entry: DatasetEntry | None = None - sampling_params: dict[str, float] = shared_settings.SAMPLING_PARAMS - timeout: int = shared_settings.NEURON_TIMEOUT + sampling_params: dict[str, float] = settings.shared_settings.SAMPLING_PARAMS + timeout: int = settings.shared_settings.NEURON_TIMEOUT + max_tokens: int = settings.shared_settings.NEURON_MAX_TOKENS @model_validator(mode="after") def get_model_id_and_seed(self) -> "BaseTextTask": @@ -75,7 +76,7 @@ def make_reference(self, dataset_entry: DatasetEntry) -> str: def generate_reference(self, messages: list[str]) -> str: """Generates a reference answer to be used for scoring miner completions""" logger.info("๐Ÿค– Generating reference...") - self.reference = model_manager.get_model(shared_settings.LLM_MODEL).generate( + self.reference = model_manager.get_model(settings.shared_settings.LLM_MODEL).generate( messages=messages ) # This should be a list of dict if self.reference is None: @@ -110,7 +111,7 @@ def augment_query( LLMMessage(role="system", content=self.augmentation_system_prompt), LLMMessage(role="user", content=query), ), - max_tokens=shared_settings.NEURON_MAX_TOKENS, + max_tokens=self.max_tokens, ) self.query = challenge return challenge diff --git a/prompting/tasks/inference.py b/prompting/tasks/inference.py index 6633a5e02..3bacd228f 100644 --- a/prompting/tasks/inference.py +++ b/prompting/tasks/inference.py @@ -2,7 +2,6 @@ from typing import ClassVar import numpy as np -from loguru import logger from pydantic 
import Field, model_validator from prompting.datasets.sn13 import ChatEntry @@ -12,7 +11,9 @@ from prompting.rewards.penalty import PenaltyModel from prompting.rewards.reward import BaseRewardConfig, BaseRewardModel from prompting.tasks.base_task import BaseTextTask -from shared.settings import shared_settings +from shared import settings + +shared_settings = settings.shared_settings class InferenceRewardConfig(BaseRewardConfig): @@ -75,10 +76,6 @@ def make_query(self, dataset_entry: ChatEntry) -> str: return self.query def make_reference(self, dataset_entry: ChatEntry) -> str: - logger.info(f"GENERATING REFERENCE FOR TASK {self.task_id}") - logger.info(f"MODEL: {self.llm_model}") - logger.info(f"SAMPLING PARAMS: {self.sampling_params}") - logger.info(f"MESSAGES: {dataset_entry.messages}") self.reference = model_manager.generate( messages=self.messages, model=self.llm_model, diff --git a/prompting/tasks/task_creation.py b/prompting/tasks/task_creation.py index 64a6f19aa..624e0894a 100644 --- a/prompting/tasks/task_creation.py +++ b/prompting/tasks/task_creation.py @@ -6,10 +6,12 @@ from prompting.miner_availability.miner_availability import miner_availabilities from prompting.tasks.task_registry import TaskRegistry +from shared import settings # from shared.logging import ErrorLoggingEvent, ValidatorLoggingEvent from shared.loop_runner import AsyncLoopRunner -from shared.settings import shared_settings + +shared_settings = settings.shared_settings RETRIES = 3 @@ -36,7 +38,8 @@ async def run_step(self): return None await asyncio.sleep(0.1) try: - # Getting task & Dataset + task = None + # Getting task and dataset for i in range(RETRIES): try: logger.debug(f"Retry: {i}") diff --git a/prompting/tasks/task_sending.py b/prompting/tasks/task_sending.py index 36075e7a3..486ecb9df 100644 --- a/prompting/tasks/task_sending.py +++ b/prompting/tasks/task_sending.py @@ -13,13 +13,15 @@ from prompting.tasks.base_task import BaseTextTask from prompting.tasks.inference import InferenceTask from prompting.tasks.web_retrieval import WebRetrievalTask +from shared import settings from shared.dendrite import DendriteResponseEvent, SynapseStreamResult from shared.epistula import query_miners from shared.logging import ErrorLoggingEvent, ValidatorLoggingEvent from shared.loop_runner import AsyncLoopRunner -from shared.settings import shared_settings from shared.timer import Timer +shared_settings = settings.shared_settings + NEURON_SAMPLE_SIZE = 100 @@ -70,14 +72,12 @@ async def collect_responses(task: BaseTextTask) -> DendriteResponseEvent | None: body["target_results"] = task.target_results body["timeout"] = task.timeout - logger.info(f"๐Ÿ” SENDING TASK {task.task_id} WITH BODY: {body}") + logger.info(f"๐Ÿ” Sending task {task.task_id} with body: {body}") stream_results = await query_miners(uids, body) logger.debug(f"๐Ÿ” Collected responses from {len(stream_results)} miners") log_stream_results(stream_results) - logger.debug("๐Ÿ” Creating response event") - response_event = DendriteResponseEvent( stream_results=stream_results, uids=uids, diff --git a/prompting/weight_setting/weight_setter.py b/prompting/weight_setting/weight_setter.py index f37102409..fb99f7db4 100644 --- a/prompting/weight_setting/weight_setter.py +++ b/prompting/weight_setting/weight_setter.py @@ -11,10 +11,12 @@ from prompting.rewards.reward import WeightedRewardEvent from prompting.tasks.inference import InferenceTask from prompting.tasks.task_registry import TaskConfig, TaskRegistry +from shared import settings from shared.logging 
import WeightSetEvent, log_event from shared.loop_runner import AsyncLoopRunner from shared.misc import ttl_get_block -from shared.settings import shared_settings + +shared_settings = settings.shared_settings FILENAME = "validator_weights.npz" WEIGHTS_HISTORY_LENGTH = 24 @@ -153,7 +155,7 @@ async def start(self, reward_events, name: str | None = None): try: with np.load(FILENAME) as data: PAST_WEIGHTS = [data[key] for key in data.files] - logger.debug(f"Loaded Past Weights: {PAST_WEIGHTS}") + logger.debug(f"Loaded persistent weights of length: {len(PAST_WEIGHTS)}") except FileNotFoundError: logger.info("No weights file found - this is expected on a new validator, starting with empty weights") PAST_WEIGHTS = [] @@ -164,7 +166,6 @@ async def start(self, reward_events, name: str | None = None): async def run_step(self): await asyncio.sleep(0.01) try: - logger.info("Reward setting loop running") if len(self.reward_events) == 0: logger.warning("No reward events in queue, skipping weight setting...") return @@ -183,8 +184,6 @@ async def run_step(self): config: {uid: {"reward": 0, "count": 0} for uid in range(1024)} for config in TaskRegistry.task_configs } - logger.debug(f"Miner rewards before processing: {miner_rewards}") - inference_events: list[WeightedRewardEvent] = [] for reward_events in self.reward_events: await asyncio.sleep(0.01) @@ -207,8 +206,6 @@ async def run_step(self): 1 * reward_event.weight ) # TODO: Double check I actually average at the end - logger.debug(f"Miner rewards after processing: {miner_rewards}") - for inference_event in inference_events: for uid, reward in zip(inference_event.uids, inference_event.rewards): llm_model = inference_event.task.llm_model_id @@ -233,9 +230,16 @@ async def run_step(self): final_rewards = np.array(list(reward_dict.values())).astype(float) final_rewards[final_rewards < 0] = 0 final_rewards /= np.sum(final_rewards) + 1e-10 - logger.debug(f"Final reward dict: {final_rewards}") except Exception as ex: logger.exception(f"{ex}") + + mean_value = final_rewards.mean() + min_value = final_rewards.min() + max_value = final_rewards.max() + length = len(final_rewards) + logger.debug( + f"Reward stats. Mean: {mean_value:.2f}; Min: {min_value:.4f}; Max: {max_value:.4f}; Count: {length}" + ) # set weights on chain set_weights( final_rewards, step=self.step, subtensor=shared_settings.SUBTENSOR, metagraph=shared_settings.METAGRAPH diff --git a/pyproject.toml b/pyproject.toml index 12a4c79f4..907a17080 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "prompting" -version = "2.17.0" +version = "2.17.1" description = "Subnetwork 1 runs on Bittensor and is maintained by Macrocosmos. 
It's an effort to create decentralised AI" authors = ["Kalei Brady, Dmytro Bobrenko, Felix Quinque, Steffen Cruz, Richard Wardle"] readme = "README.md" diff --git a/scripts/test_api.py b/scripts/test_api.py index 0d6a265ba..bfa40bd3f 100644 --- a/scripts/test_api.py +++ b/scripts/test_api.py @@ -28,7 +28,6 @@ async def make_completion(client: openai.AsyncOpenAI, prompt: str, stream: bool "top_k": 50, "max_new_tokens": 256, "do_sample": True, - "seed": None, }, "task": "InferenceTask", "mixture": False, diff --git a/scripts/test_api_empty_responses.py b/scripts/test_api_empty_responses.py new file mode 100644 index 000000000..4510b2463 --- /dev/null +++ b/scripts/test_api_empty_responses.py @@ -0,0 +1,312 @@ +""" +Example usage: +python scripts/test_api_empty_responses.py --concurrent 4 --queries 1000 --out test + +Results will be stored in test/ + +Additional requirements: +pip install plotly kaleido +""" +import argparse +import asyncio +import csv +import random +import sys +import time +from pathlib import Path + +import nltk +import openai +import pandas as pd +import plotly.graph_objects as go +from httpx import HTTPStatusError, Timeout +from nltk.corpus import words + +nltk.download("words") +word_list = words.words() + + +def approximate_tokens(text: str) -> int: + """Approximate the number of tokens from a given string. + + Approach: + - Split by whitespace to get a word count. + - Multiply by ~1.3 to get approximate amount of tokens. + """ + words_in_text = text.split() + return int(len(words_in_text) * 1.3) + + +async def make_completion( + client: openai.AsyncOpenAI, + prompt: str, + stream: bool = True, + seed: str = "1759348", +) -> dict: + """Make a completion request to the API. + + Measures: + - Time to first token. + - Time to full response. + - Approximate tokens received. + - Status code. + + Returns a dictionary with measurements. + """ + start_time = time.perf_counter() + time_to_first_token = None + total_approx_tokens = 0 + status_code = None + + try: + result = await client.chat.completions.create( + model=None, + messages=[{"role": "user", "content": prompt}], + stream=stream, + extra_body={ + "seed": seed, + "sampling_parameters": { + "temperature": 0.7, + "top_p": 0.95, + "top_k": 50, + "max_new_tokens": 256, + "do_sample": True, + "seed": None, + }, + "task": "InferenceTask", + "mixture": False, + }, + ) + + if not stream: + # TODO: Non-streaming part is not tested. 
+ raise NotImplementedError("Implement non-streaming mode") + text = result + total_latency = time.perf_counter() - start_time + total_approx_tokens = approximate_tokens(text) + status_code = 200 + else: + # If streaming, measure time-to-first-token and accumulate approximate tokens + async for chunk in result: + if hasattr(result, "response") and result.response is not None: + status_code = result.response.status_code + + delta_content = chunk.choices[0].delta.content + if delta_content: + if time_to_first_token is None: + time_to_first_token = time.perf_counter() - start_time + total_approx_tokens += approximate_tokens(delta_content) + + total_latency = time.perf_counter() - start_time + + if time_to_first_token is None: + time_to_first_token = total_latency + + if status_code is None: + status_code = 200 + + if total_latency > 0: + tokens_per_second = total_approx_tokens / total_latency + else: + tokens_per_second = 0.0 + + return { + "time_to_first_token": time_to_first_token, + "total_latency": total_latency, + "tokens_per_second": tokens_per_second, + "status_code": status_code, + "success": True, + "total_approx_tokens": total_approx_tokens, + } + + except HTTPStatusError as e: + return { + "time_to_first_token": None, + "total_latency": None, + "tokens_per_second": 0, + "status_code": e.response.status_code, + "success": False, + "total_approx_tokens": 0, + } + except Exception as e: + print(f"Unexpected error: {e}", file=sys.stderr) + return { + "time_to_first_token": None, + "total_latency": None, + "tokens_per_second": 0, + "status_code": 0, + "success": False, + "total_approx_tokens": 0, + } + + +async def run_stress_test( + api_key: str, output_dir: str, concurrent: int = 4, queries: int = 1000, url: str = "http://0.0.0.0:8005/v1" +): + """Run a stress test by sending concurrent API requests. + + Args: + api_key (str): API key for authentication. + output_dir (str): Directory to save outputs. + concurrent (int): Number of concurrent workers. + queries (int): Number of queries per worker. + url (str): API endpoint URL. + + Measures: + - Total successes and failures. + - Success rate. + - Cumulative failures over query execution time. + Saves to result.csv and generates a Plotly fail rate chart. + + Additionally, prints the total number of empty or errored queries to the console. + """ + client = openai.AsyncOpenAI( + base_url=url, + max_retries=0, + timeout=Timeout(120, connect=120, read=120), + api_key=api_key, + ) + client._client.headers["api-key"] = api_key + word = random.choice(word_list) + prompt = f"Write a short story about {word}." + + total_queries = concurrent * queries + success_count = 0 + fail_count = 0 + fail_list = [] + + print( + f"\nStarting stress test with {concurrent} concurrent workers, each performing {queries} queries (Total: {total_queries} queries)." 
+ ) + + semaphore = asyncio.Semaphore(concurrent) + + async def worker(worker_id: int): + nonlocal success_count, fail_count + for i in range(queries): + async with semaphore: + if i % 10 == 0: + print(f"Worker {worker_id}: {i} / {queries}") + response = await make_completion(client, prompt=prompt, stream=True) + if response["success"] and response["total_approx_tokens"] > 0: + success_count += 1 + fail_list.append(0) + else: + fail_count += 1 + fail_list.append(1) + + # Launch all workers + workers = [asyncio.create_task(worker(w_id)) for w_id in range(concurrent)] + await asyncio.gather(*workers) + + # Calculate success rate + success_rate = (success_count / total_queries) * 100 if total_queries > 0 else 0.0 + + # Save result.csv + result_data = { + "concurrent": concurrent, + "queries": queries, + "success": success_count, + "fail": fail_count, + "success_rate": round(success_rate, 2), + } + + output_dir = Path(output_dir) + output_dir.mkdir(parents=True, exist_ok=True) + + csv_file = output_dir / "result.csv" + with open(csv_file, "w", newline="", encoding="utf-8") as f: + writer = csv.DictWriter(f, fieldnames=["concurrent", "queries", "success", "fail", "success_rate"]) + writer.writeheader() + writer.writerow(result_data) + + print("\nSaved result.csv with the following data:") + print(result_data) + + print(f"Total number of empty or errored queries: {fail_count}") + + # Create DataFrame for plotting + fail_df = pd.DataFrame({"fail": fail_list}) + fail_df["cumulative_fail"] = fail_df["fail"].cumsum() + fail_df["query_number"] = fail_df.index + 1 + + # Generate the fail rate chart using Plotly with dark background + fig = go.Figure() + + fig.add_trace( + go.Scatter( + x=fail_df["query_number"], + y=fail_df["cumulative_fail"], + mode="lines", + name="Cumulative Failed/Empty Queries", + line=dict(color="red"), + hoverinfo="x+y", + ) + ) + + fig.update_layout( + template="plotly_dark", + title="Cumulative Failed/Empty Queries Over Time", + xaxis_title="Query Number", + yaxis_title="Number of Failed/Empty Queries", + autosize=False, + width=1200, + height=600, + ) + + # Save the plot to the output directory + plot_file = output_dir / "fail_rate_chart.png" + try: + fig.write_image(plot_file) + print(f"Saved fail rate chart to {plot_file}") + except Exception as e: + print(f"Failed to save fail rate chart as image: {e}") + + # Optionally display the figure + fig.show() + + +def parse_arguments(): + parser = argparse.ArgumentParser(description="Run a stress test against the specified API endpoint.") + + parser.add_argument( + "--key", + type=str, + # Specify your API key, current is left here just for local testings. 
+ default="0566dbe21ee33bba9419549716cd6f1f", + help="API key for authentication (default: 0566dbe21ee33bba9419549716cd6f1f).", + ) + + parser.add_argument( + "--url", + type=str, + default="http://0.0.0.0:8005/v1", + help="URL of the API endpoint to test (default: http://0.0.0.0:8005/v1).", + ) + + parser.add_argument( + "--out", + type=str, + default="stress_test", + help="Output directory for storing test results (default: stress_test).", + ) + + parser.add_argument("--concurrent", type=int, default=4, help="Number of concurrent workers to query (default: 4).") + + parser.add_argument("--queries", type=int, default=1000, help="Number of queries per worker (default: 1000).") + + return parser.parse_args() + + +if __name__ == "__main__": + args = parse_arguments() + + asyncio.run( + run_stress_test( + api_key=args.key, + url=args.url, + output_dir=args.out, + concurrent=args.concurrent, + queries=args.queries, + ) + ) diff --git a/scripts/stress_test_api.py b/scripts/test_api_load.py similarity index 78% rename from scripts/stress_test_api.py rename to scripts/test_api_load.py index aabf0c747..350a33d3c 100644 --- a/scripts/stress_test_api.py +++ b/scripts/test_api_load.py @@ -1,8 +1,19 @@ +""" +Example usage: +python scripts/test_api_load.py --levels 4 --out stress_test --key API_KEY + +Results will be stored in stress_test/ + +Additional requirements: +pip install plotly kaleido +""" +import argparse import asyncio import csv import random import sys import time +from pathlib import Path import nltk import openai @@ -16,8 +27,7 @@ def approximate_tokens(text: str) -> int: - """ - Approximate the number of tokens from a given string. + """Approximate the number of tokens from a given string. Approach: - Split by whitespace to get a word count. @@ -29,13 +39,13 @@ def approximate_tokens(text: str) -> int: def get_color_for_code(code: int) -> str: - """ - Return a color string for a given status code. - - 200 -> green - - 4xx -> crimson - - 5xx -> darkred - - 0 -> firebrick (means unknown error in this script) - - else -> red + """Return a color string for a given status code. + + - 200 -> green + - 4xx -> crimson + - 5xx -> darkred + - 0 -> firebrick (means unknown error in this script) + - else -> red """ if code == 200: return "green" @@ -55,14 +65,15 @@ async def make_completion( stream: bool = True, seed: str = "1759348", ) -> dict: - """ - Make a completion request to the API, measuring: + """Make a completion request to the API. + + Measures: - Time to first token - Time to full response - Approximate tokens received - Status code - Returns a dictionary with these measurements. + Returns a dictionary with measurements. 
""" start_time = time.perf_counter() time_to_first_token = None @@ -126,7 +137,7 @@ async def make_completion( "tokens_per_second": tokens_per_second, "status_code": status_code, "success": True, - "total_approx_tokens": total_approx_tokens, # Added for empty response tracking + "total_approx_tokens": total_approx_tokens, } except HTTPStatusError as e: @@ -136,23 +147,22 @@ async def make_completion( "tokens_per_second": 0, "status_code": e.response.status_code, "success": False, - "total_approx_tokens": 0, # Assuming zero tokens on failure + "total_approx_tokens": 0, } except Exception as e: - # For other errors, we record status_code=0 - print(f"Unexpected error: {e}", file=sys.stderr) # Optional: Better error logging + print(f"Unexpected error: {e}", file=sys.stderr) return { "time_to_first_token": None, "total_latency": None, "tokens_per_second": 0, "status_code": 0, "success": False, - "total_approx_tokens": 0, # Assuming zero tokens on failure + "total_approx_tokens": 0, } -async def run_stress_test(api_key: str, url: str = "http://0.0.0.0:8005/v1"): - """Run a stress test by sending concurrent requests at levels 1, 2, 4, 8, etc. +async def run_stress_test(api_key: str, output_dir: str, levels: int = 10, url: str = "http://0.0.0.0:8005/v1"): + """Run a stress test by sending exponentially increasing amount of concurrent requests till `2**levels`. Measures: - Time to first token @@ -171,8 +181,7 @@ async def run_stress_test(api_key: str, url: str = "http://0.0.0.0:8005/v1"): client._client.headers["api-key"] = api_key word = random.choice(word_list) prompt = f"Write a short story about {word}." - # concurrency_levels = [2**i for i in range(0, 11)] - concurrency_levels = [2**i for i in range(0, 8)] + concurrency_levels = [2**i for i in range(0, levels)] results = [] for concurrency in concurrency_levels: @@ -238,7 +247,11 @@ async def run_stress_test(api_key: str, url: str = "http://0.0.0.0:8005/v1"): field_names = list(csv_rows[0].keys()) - with open("stress_test.csv", "w", newline="", encoding="utf-8") as f: + output_dir = Path(output_dir) + if not output_dir.exists(): + output_dir.mkdir(exist_ok=True, parents=True) + + with open(output_dir / "stress_test.csv", "w", newline="", encoding="utf-8") as f: writer = csv.DictWriter(f, fieldnames=field_names) writer.writeheader() writer.writerows(csv_rows) @@ -259,7 +272,7 @@ async def run_stress_test(api_key: str, url: str = "http://0.0.0.0:8005/v1"): ) fig1.update_xaxes(title_text="Concurrent Queries") fig1.update_yaxes(title_text="Avg. Tokens/Second (Approx.)") - fig1.write_image("stress_test_tokens_per_second.png") + fig1.write_image(output_dir / "stress_test_tokens_per_second.png") fig1.show() # 2. First-Token Latency vs. Concurrency. @@ -272,7 +285,7 @@ async def run_stress_test(api_key: str, url: str = "http://0.0.0.0:8005/v1"): ) fig2.update_xaxes(title_text="Concurrent Queries") fig2.update_yaxes(title_text="Avg. Latency to First Token (s)") - fig2.write_image("stress_test_first_token_latency.png") + fig2.write_image(output_dir / "stress_test_first_token_latency.png") fig2.show() # 3. Full Response Latency vs. Concurrency. @@ -285,7 +298,7 @@ async def run_stress_test(api_key: str, url: str = "http://0.0.0.0:8005/v1"): ) fig3.update_xaxes(title_text="Concurrent Queries") fig3.update_yaxes(title_text="Avg. Total Latency (s)") - fig3.write_image("stress_test_full_response_latency.png") + fig3.write_image(output_dir / "stress_test_full_response_latency.png") fig3.show() # 4. Status Code Counts vs. Concurrency. 
@@ -318,7 +331,7 @@ async def run_stress_test(api_key: str, url: str = "http://0.0.0.0:8005/v1"): ) fig4.update_xaxes(title_text="Concurrent Queries") fig4.update_yaxes(title_text="Count of Responses") - fig4.write_image("stress_test_status_codes.png") + fig4.write_image(output_dir / "stress_test_status_codes.png") fig4.show() # 5. Empty Responses vs. Concurrency. @@ -332,12 +345,50 @@ async def run_stress_test(api_key: str, url: str = "http://0.0.0.0:8005/v1"): ) fig5.update_xaxes(title_text="Concurrent Queries") fig5.update_yaxes(title_text="Count of Empty Responses") - fig5.write_image("stress_test_empty_responses.png") + fig5.write_image(output_dir / "stress_test_empty_responses.png") fig5.show() print("All plots saved to .png files and displayed.") +def parse_arguments(): + parser = argparse.ArgumentParser(description="Run a stress test against the specified API endpoint.") + + parser.add_argument( + "--key", + type=str, + # Specify your API key, current is left here just for local testings. + default="0566dbe21ee33bba9419549716cd6f1f", + help="API key for authentication.", + ) + + parser.add_argument( + "--url", + type=str, + default="http://0.0.0.0:8005/v1", + help="URL of the API endpoint to test (default: http://0.0.0.0:8005/v1).", + ) + + parser.add_argument( + "--out", + type=str, + default="stress_test", + help="Output directory for storing test results (default: stress_test).", + ) + + parser.add_argument("--levels", type=int, default=10, help="Number of stress test levels to execute (default: 10).") + + return parser.parse_args() + + if __name__ == "__main__": - # Replace api_key and url with appropriate values. - asyncio.run(run_stress_test(api_key="0566dbe21ee33bba9419549716cd6f1f", url="http://0.0.0.0:8005/v1")) + args = parse_arguments() + + asyncio.run( + run_stress_test( + api_key=args.key, + url=args.url, + output_dir=args.out, + levels=args.levels, + ) + ) diff --git a/shared/epistula.py b/shared/epistula.py index b2621b696..2ed2d6e07 100644 --- a/shared/epistula.py +++ b/shared/epistula.py @@ -1,5 +1,6 @@ import asyncio import json +import random import time from hashlib import sha256 from math import ceil @@ -16,8 +17,10 @@ from openai.types.chat.chat_completion_message import ChatCompletionMessage from substrateinterface import Keypair +from shared import settings from shared.dendrite import SynapseStreamResult -from shared.settings import shared_settings + +shared_settings = settings.shared_settings # from openai.types import Com @@ -207,6 +210,7 @@ async def make_openai_query( uid: int, stream: bool = False, ) -> tuple[ChatCompletion, list, list] | AsyncGenerator: + body["seed"] = body.get("seed", random.randint(0, 1000000)) axon_info = metagraph.axons[uid] miner = openai.AsyncOpenAI( base_url=f"http://{axon_info.ip}:{axon_info.port}/v1", diff --git a/shared/logging.py b/shared/logging.py index 038910cb7..8787d8526 100644 --- a/shared/logging.py +++ b/shared/logging.py @@ -13,9 +13,10 @@ import prompting from prompting.rewards.reward import WeightedRewardEvent from prompting.tasks.task_registry import TaskRegistry +from shared import settings from shared.dendrite import DendriteResponseEvent -from shared.settings import shared_settings +# TODO: Get rid of global variables. 
WANDB: Run @@ -71,29 +72,24 @@ def should_reinit_wandb(): current_time = datetime.now() elapsed_time = current_time - wandb_start_time # Check if more than 24 hours have passed - if elapsed_time > timedelta(hours=shared_settings.MAX_WANDB_DURATION): + if elapsed_time > timedelta(hours=settings.shared_settings.MAX_WANDB_DURATION): return True return False -def init_wandb(reinit=False, neuron: Literal["validator", "miner"] = "validator", custom_tags: list = []): +def init_wandb(reinit=False, neuron: Literal["validator", "miner", "api"] = "validator", custom_tags: list = []): """Starts a new wandb run.""" global WANDB tags = [ - f"Wallet: {shared_settings.WALLET.hotkey.ss58_address}", + f"Wallet: {settings.shared_settings.WALLET.hotkey.ss58_address}", f"Version: {prompting.__version__}", - # str(prompting.__spec_version__), - f"Netuid: {shared_settings.NETUID}", + f"Netuid: {settings.shared_settings.NETUID}", ] - if shared_settings.MOCK: + if settings.shared_settings.MOCK: tags.append("Mock") - if shared_settings.NEURON_DISABLE_SET_WEIGHTS: - tags.append("disable_set_weights") - tags += [ - f"Neuron UID: {shared_settings.METAGRAPH.hotkeys.index(shared_settings.WALLET.hotkey.ss58_address)}", - f"Time: {datetime.now().strftime('%Y_%m_%d_%H_%M_%S')}", - ] + if settings.shared_settings.NEURON_DISABLE_SET_WEIGHTS: + tags.append("Disable weights set") tags += custom_tags @@ -102,26 +98,31 @@ def init_wandb(reinit=False, neuron: Literal["validator", "miner"] = "validator" task_list.append(task_config.task.__name__) wandb_config = { - "HOTKEY_SS58": shared_settings.WALLET.hotkey.ss58_address, - "NETUID": shared_settings.NETUID, + "HOTKEY_SS58": settings.shared_settings.WALLET.hotkey.ss58_address, + "NETUID": settings.shared_settings.NETUID, "wandb_start_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "TASKS": task_list, } - wandb.login(anonymous="allow", key=shared_settings.WANDB_API_KEY, verify=True) + wandb.login(anonymous="allow", key=settings.shared_settings.WANDB_API_KEY, verify=True) logger.info( - f"Logging in to wandb on entity: {shared_settings.WANDB_ENTITY} and project: {shared_settings.WANDB_PROJECT_NAME}" + f"Logging in to wandb on entity: {settings.shared_settings.WANDB_ENTITY} and project: " + f"{settings.shared_settings.WANDB_PROJECT_NAME}" ) + wandb_run_name = f"{neuron}{settings.shared_settings.UID}-{datetime.now().strftime('%Y%m%d_%H%M%S')}" + + # Initialize the wandb run with the custom name. 
WANDB = wandb.init( reinit=reinit, - project=shared_settings.WANDB_PROJECT_NAME, - entity=shared_settings.WANDB_ENTITY, - mode="offline" if shared_settings.WANDB_OFFLINE else "online", - dir=shared_settings.SAVE_PATH, + name=wandb_run_name, + project=settings.shared_settings.WANDB_PROJECT_NAME, + entity=settings.shared_settings.WANDB_ENTITY, + mode="offline" if settings.shared_settings.WANDB_OFFLINE else "online", + dir=settings.shared_settings.SAVE_PATH, tags=tags, - notes=shared_settings.WANDB_NOTES, + notes=settings.shared_settings.WANDB_NOTES, config=wandb_config, ) - signature = shared_settings.WALLET.hotkey.sign(WANDB.id.encode()).hex() + signature = settings.shared_settings.WALLET.hotkey.sign(WANDB.id.encode()).hex() wandb_config["SIGNATURE"] = signature WANDB.config.update(wandb_config) logger.success(f"Started a new wandb run {WANDB.name} ") @@ -159,16 +160,17 @@ class ValidatorLoggingEvent(BaseEvent): def __str__(self): sample_completions = [completion for completion in self.response_event.completions if len(completion) > 0] + forward_time = round(self.forward_time, 4) if self.forward_time else self.forward_time return f"""ValidatorLoggingEvent: Block: {self.block} Step: {self.step} - Step Time: {self.step_time} - forward_time: {self.forward_time} - task_id: {self.task_id} + Step time: {self.step_time:.4f} + Forward time: {forward_time} + Task id: {self.task_id} Number of total completions: {len(self.response_event.completions)} Number of non-empty completions: {len(sample_completions)} - Sample Completions: {sample_completions[:5]}... - """ + Sample 1 completion: {sample_completions[:1]} + """ class RewardLoggingEvent(BaseEvent): @@ -216,10 +218,10 @@ class MinerLoggingEvent(BaseEvent): def log_event(event: BaseEvent): - if not shared_settings.LOGGING_DONT_SAVE_EVENTS: + if not settings.shared_settings.LOGGING_DONT_SAVE_EVENTS: logger.info(f"{event}") - if shared_settings.WANDB_ON: + if settings.shared_settings.WANDB_ON: if should_reinit_wandb(): reinit_wandb() unpacked_event = unpack_events(event) @@ -240,7 +242,3 @@ def unpack_events(event: BaseEvent) -> dict[str, Any]: def convert_arrays_to_lists(data: dict) -> dict: return {key: value.tolist() if hasattr(value, "tolist") else value for key, value in data.items()} - - -if shared_settings.WANDB_ON and not shared_settings.MOCK: - init_wandb() diff --git a/shared/loop_runner.py b/shared/loop_runner.py index af63570ee..43d380bd4 100644 --- a/shared/loop_runner.py +++ b/shared/loop_runner.py @@ -60,7 +60,6 @@ def next_sync_point(self, current_time): async def wait_for_next_execution(self, last_run_time): """Wait until the next execution time, either synced or based on last run.""" current_time = await self.get_time() - logger.debug("Current time") if self.sync: next_run = self.next_sync_point(current_time) else: diff --git a/shared/profiling.py b/shared/profiling.py index 868f62d5f..c6ed8cd0e 100644 --- a/shared/profiling.py +++ b/shared/profiling.py @@ -23,7 +23,7 @@ def __init__(self): "thread_ids": set(), } ) - self.start_time = time.time() + self.start_time = time.perf_counter() self._active_measurements = set() self.process = psutil.Process() # Initialize process CPU times @@ -59,8 +59,8 @@ def measure(self, loop_name): async def print_stats(self): while True: - await asyncio.sleep(60) # Report every minute - total_runtime = time.time() - self.start_time + await asyncio.sleep(5 * 60) # Report every 5 minutes + total_runtime = time.perf_counter() - self.start_time logging.info("\n=== Loop Profiling Stats ===") 
logging.info(f"Total wall clock time: {total_runtime:.2f}s") diff --git a/shared/settings.py b/shared/settings.py index 8bb342caf..ab61340df 100644 --- a/shared/settings.py +++ b/shared/settings.py @@ -55,6 +55,7 @@ class SharedSettings(BaseSettings): # Neuron parameters. NEURON_TIMEOUT: int = Field(20, env="NEURON_TIMEOUT") + INFERENCE_TIMEOUT: int = Field(60, env="INFERENCE_TIMEOUT") NEURON_DISABLE_SET_WEIGHTS: bool = Field(False, env="NEURON_DISABLE_SET_WEIGHTS") NEURON_MOVING_AVERAGE_ALPHA: float = Field(0.1, env="NEURON_MOVING_AVERAGE_ALPHA") NEURON_DECAY_ALPHA: float = Field(0.001, env="NEURON_DECAY_ALPHA") @@ -86,13 +87,17 @@ class SharedSettings(BaseSettings): # ==== API ===== # API key used to access validator organic scoring mechanism (both .env.validator and .env.api). SCORING_KEY: str | None = Field(None, env="SCORING_KEY") + # Scoring request rate limit in seconds. + SCORING_RATE_LIMIT_SEC: float = Field(0.5, env="SCORING_RATE_LIMIT_SEC") + # Scoring queue threshold when rate-limit start to kick in, used to query validator API with scoring requests. + SCORING_QUEUE_API_THRESHOLD: int = Field(5, env="SCORING_QUEUE_API_THRESHOLD") # Validator scoring API (.env.validator). DEPLOY_SCORING_API: bool = Field(False, env="DEPLOY_SCORING_API") SCORING_API_PORT: int = Field(8094, env="SCORING_API_PORT") SCORING_ADMIN_KEY: str | None = Field(None, env="SCORING_ADMIN_KEY") SCORE_ORGANICS: bool = Field(False, env="SCORE_ORGANICS") - WORKERS: int = Field(4, env="WORKERS") + WORKERS: int = Field(2, env="WORKERS") # API Management (.env.api). API_PORT: int = Field(8005, env="API_PORT") diff --git a/shared/uids.py b/shared/uids.py index a8c99b5d2..00b1a22bf 100644 --- a/shared/uids.py +++ b/shared/uids.py @@ -4,7 +4,9 @@ import numpy as np from loguru import logger -from shared.settings import shared_settings +from shared import settings + +shared_settings = settings.shared_settings def check_uid_availability( diff --git a/test.md b/test.md new file mode 100644 index 000000000..e69de29bb diff --git a/validator_api/api.py b/validator_api/api.py index a56388d25..af80c43c4 100644 --- a/validator_api/api.py +++ b/validator_api/api.py @@ -1,4 +1,5 @@ import asyncio +import contextlib import uvicorn from fastapi import FastAPI @@ -8,17 +9,29 @@ settings.shared_settings = settings.SharedSettings.load(mode="api") shared_settings = settings.shared_settings +from validator_api import scoring_queue from validator_api.api_management import router as api_management_router from validator_api.gpt_endpoints import router as gpt_router from validator_api.utils import update_miner_availabilities_for_api -app = FastAPI() + +@contextlib.asynccontextmanager +async def lifespan(app: FastAPI): + # Startup: start the background tasks. + background_task = asyncio.create_task(update_miner_availabilities_for_api.start()) + yield + background_task.cancel() + try: + await background_task + except asyncio.CancelledError: + pass + + +# Create the FastAPI app with the lifespan handler. 
+app = FastAPI(lifespan=lifespan) app.include_router(gpt_router, tags=["GPT Endpoints"]) app.include_router(api_management_router, tags=["API Management"]) -# TODO: This api requests miner availabilities from validator -# TODO: Forward the results from miners to the validator - @app.get("/health") async def health(): @@ -27,12 +40,15 @@ async def health(): async def main(): asyncio.create_task(update_miner_availabilities_for_api.start()) + asyncio.create_task(scoring_queue.scoring_queue.start()) uvicorn.run( - app, + "validator_api.api:app", host=shared_settings.API_HOST, port=shared_settings.API_PORT, log_level="debug", timeout_keep_alive=60, + workers=shared_settings.WORKERS, + reload=False, ) diff --git a/validator_api/api_management.py b/validator_api/api_management.py index 109efe57e..fa57f27e3 100644 --- a/validator_api/api_management.py +++ b/validator_api/api_management.py @@ -4,7 +4,9 @@ from fastapi import APIRouter, Depends, Header, HTTPException from loguru import logger -from shared.settings import shared_settings +from shared import settings + +shared_settings = settings.shared_settings router = APIRouter() @@ -26,7 +28,6 @@ def save_api_keys(api_keys): # Use lifespan to initialize API keys _keys = load_api_keys() logger.info(f"Loaded API keys: {_keys}") -save_api_keys(_keys) # Dependency to validate the admin key diff --git a/validator_api/chat_completion.py b/validator_api/chat_completion.py index 3d976cfb0..b44c2b769 100644 --- a/validator_api/chat_completion.py +++ b/validator_api/chat_completion.py @@ -11,7 +11,7 @@ from shared.epistula import make_openai_query from shared.settings import shared_settings from shared.uids import get_uids -from validator_api.utils import forward_response +from validator_api import scoring_queue async def peek_until_valid_chunk( @@ -143,7 +143,9 @@ async def stream_from_first_response( remaining = asyncio.gather(*pending, return_exceptions=True) remaining_tasks = asyncio.create_task(collect_remaining_responses(remaining, collected_chunks_list, body, uids)) await remaining_tasks - asyncio.create_task(forward_response(uids, body, collected_chunks_list)) + asyncio.create_task( + scoring_queue.scoring_queue.append_response(uids=uids, body=body, chunks=collected_chunks_list) + ) except asyncio.CancelledError: logger.info("Client disconnected, streaming cancelled") diff --git a/validator_api/gpt_endpoints.py b/validator_api/gpt_endpoints.py index dc6d32a95..119937542 100644 --- a/validator_api/gpt_endpoints.py +++ b/validator_api/gpt_endpoints.py @@ -10,13 +10,16 @@ from openai.types.chat.chat_completion_chunk import ChatCompletionChunk, Choice, ChoiceDelta from starlette.responses import StreamingResponse +from shared import settings from shared.epistula import SynapseStreamResult, query_miners -from shared.settings import shared_settings +from validator_api import scoring_queue from validator_api.api_management import _keys from validator_api.chat_completion import chat_completion from validator_api.mixture_of_miners import mixture_of_miners from validator_api.test_time_inference import generate_response -from validator_api.utils import filter_available_uids, forward_response +from validator_api.utils import filter_available_uids + +shared_settings = settings.shared_settings router = APIRouter() N_MINERS = 5 @@ -95,14 +98,14 @@ async def web_retrieval(search_query: str, n_miners: int = 10, uids: list[int] = raise HTTPException(status_code=500, detail="No miner responded successfully") chunks = [res.accumulated_chunks if res and 
res.accumulated_chunks else [] for res in stream_results] - asyncio.create_task(forward_response(uids=uids, body=body, chunks=chunks)) + asyncio.create_task(scoring_queue.scoring_queue.append_response(uids=uids, body=body, chunks=chunks)) return loaded_results @router.post("/test_time_inference") async def test_time_inference(messages: list[dict], model: str = None): - async def create_response_stream(user_query): - async for steps, total_thinking_time in generate_response(user_query): + async def create_response_stream(messages): + async for steps, total_thinking_time in generate_response(messages): if total_thinking_time is not None: logger.info(f"**Total thinking time: {total_thinking_time:.2f} seconds**") yield steps, total_thinking_time @@ -110,10 +113,8 @@ async def create_response_stream(user_query): # Create a streaming response that yields each step async def stream_steps(): try: - query = messages[-1]["content"] - logger.info(f"Query: {query}") i = 0 - async for steps, thinking_time in create_response_stream(query): + async for steps, thinking_time in create_response_stream(messages): i += 1 yield "data: " + ChatCompletionChunk( id=str(uuid.uuid4()), diff --git a/validator_api/miner_availabilities.py b/validator_api/miner_availabilities.py index e9ae0dae6..c58d96c30 100644 --- a/validator_api/miner_availabilities.py +++ b/validator_api/miner_availabilities.py @@ -6,10 +6,11 @@ from loguru import logger from pydantic import BaseModel +from shared import settings from shared.loop_runner import AsyncLoopRunner -from shared.settings import shared_settings from shared.uids import get_uids +shared_settings = settings.shared_settings router = APIRouter() diff --git a/validator_api/mixture_of_miners.py b/validator_api/mixture_of_miners.py index 4326a01a0..8d797f38f 100644 --- a/validator_api/mixture_of_miners.py +++ b/validator_api/mixture_of_miners.py @@ -10,19 +10,6 @@ from shared.uids import get_uids from validator_api.chat_completion import chat_completion, get_response_from_miner -DEFAULT_SYSTEM_PROMPT = """You have been provided with a set of responses from various open-source models to the latest user query. -Your task is to synthesize these responses into a single, high-quality and concise response. -It is crucial to follow the provided instuctions or examples in the given prompt if any, and ensure the answer is in correct and expected format. -Critically evaluate the information provided in these responses, recognizing that some of it may be biased or incorrect. -Your response should not simply replicate the given answers but should offer a refined and accurate reply to the instruction. -Ensure your response is well-structured, coherent, and adheres to the highest standards of accuracy and reliability. -Responses from models:""" - -TASK_SYSTEM_PROMPT = { - None: DEFAULT_SYSTEM_PROMPT, - # Add more task-specific system prompts here. -} - NUM_MIXTURE_MINERS = 8 TOP_INCENTIVE_POOL = 100 @@ -52,7 +39,7 @@ async def mixture_of_miners(body: dict[str, any], uids: list[int]) -> tuple | St body_first_step = copy.deepcopy(body) body_first_step["stream"] = False # Get multiple miners if not uids: uids = get_uids(sampling_mode="top_incentive", k=NUM_MIXTURE_MINERS) if len(uids) == 0: @@ -74,15 +61,16 @@ async def mixture_of_miners(body: dict[str, any], uids: list[int]) -> tuple | St # Extract completions from the responses.
completions = ["".join(response[1]) for response in valid_responses if response and len(response) > 1] - task_name = body.get("task") - system_prompt = TASK_SYSTEM_PROMPT.get(task_name, DEFAULT_SYSTEM_PROMPT) - - # Aggregate responses into one system prompt. - agg_system_prompt = system_prompt + "\n" + "\n".join([f"{i+1}. {comp}" for i, comp in enumerate(completions)]) + logger.debug(f"Using Mixture of Miners with {len(completions)} miners") - # Prepare new messages with the aggregated system prompt. - new_messages = [{"role": "system", "content": agg_system_prompt}] - new_messages.extend([msg for msg in body["messages"] if msg["role"] != "system"]) + new_messages = body["messages"] + [ + { + "role": "assistant", + "content": "I have received the following responses from various models:\n" + + "\n".join([f"{i+1}. {comp}" for i, comp in enumerate(completions)]) + + "\nNow I will synthesize them into a single, high-quality and concise response to the user's query.", + }, + ] # Update the body with the new messages. final_body = copy.deepcopy(body) diff --git a/validator_api/scoring_queue.py b/validator_api/scoring_queue.py new file mode 100644 index 000000000..493bc55a8 --- /dev/null +++ b/validator_api/scoring_queue.py @@ -0,0 +1,100 @@ +import asyncio +import datetime +from collections import deque +from typing import Any + +import httpx +from loguru import logger +from pydantic import BaseModel + +from shared import settings +from shared.loop_runner import AsyncLoopRunner + +shared_settings = settings.shared_settings + + +class ScoringPayload(BaseModel): + payload: dict[str, Any] + retries: int = 0 + + +class ScoringQueue(AsyncLoopRunner): + """Performs organic scoring every `interval` seconds.""" + + interval: float = shared_settings.SCORING_RATE_LIMIT_SEC + scoring_queue_threshold: int = shared_settings.SCORING_QUEUE_API_THRESHOLD + max_scoring_retries: int = 3 + _scoring_lock = asyncio.Lock() + _scoring_queue: deque[ScoringPayload] = deque() + + async def wait_for_next_execution(self, last_run_time) -> datetime.datetime: + """If scoring queue is small, execute immediately, otherwise wait until the next execution time.""" + async with self._scoring_lock: + if self.scoring_queue_threshold < self.size > 0: + # If scoring queue is small and non-empty, score immediately. + return datetime.datetime.now() + + return await super().wait_for_next_execution(last_run_time) + + async def run_step(self): + """Perform organic scoring: pop queued payload, forward to the validator API.""" + async with self._scoring_lock: + if not self._scoring_queue: + return + + scoring_payload = self._scoring_queue.popleft() + payload = scoring_payload.payload + uids = payload["uid"] + logger.info(f"Received new organic for scoring, uids: {uids}") + + url = f"http://{shared_settings.VALIDATOR_API}/scoring" + try: + timeout = httpx.Timeout(timeout=120.0, connect=60.0, read=30.0, write=30.0, pool=5.0) + async with httpx.AsyncClient(timeout=timeout) as client: + response = await client.post( + url=url, + json=payload, + headers={"api-key": shared_settings.SCORING_KEY, "Content-Type": "application/json"}, + ) + if response.status_code != 200: + # Raise an exception so that the retry logic in the except block handles it. 
+ raise Exception(f"Non-200 response: {response.status_code} for uids {uids}") + logger.info(f"Forwarding response completed with status {response.status_code}") + except Exception as e: + if scoring_payload.retries < self.max_scoring_retries: + scoring_payload.retries += 1 + async with self._scoring_lock: + self._scoring_queue.appendleft(scoring_payload) + logger.error(f"Tried to forward response to {url} with payload {payload}. Queued for retry") + else: + logger.exception(f"Error while forwarding response after {scoring_payload.retries} retries: {e}") + + async def append_response(self, uids: list[int], body: dict[str, Any], chunks: list[list[str]]): + if not shared_settings.SCORE_ORGANICS: + return + + if body.get("task") != "InferenceTask" and body.get("task") != "WebRetrievalTask": + logger.debug(f"Skipping forwarding for non-inference/web retrieval task: {body.get('task')}") + return + + uids = [int(u) for u in uids] + chunk_dict = {u: c for u, c in zip(uids, chunks)} + payload = {"body": body, "chunks": chunk_dict, "uid": uids} + scoring_item = ScoringPayload(payload=payload) + + async with self._scoring_lock: + self._scoring_queue.append(scoring_item) + + logger.info(f"Appended responses from uids {uids} into scoring queue with size: {self.size}") + logger.debug(f"Queued responses body: {body}, chunks: {chunks}") + + @property + def size(self) -> int: + return len(self._scoring_queue) + + def __len__(self) -> int: + return self.size + + +# TODO: Leaving it as a global var to make less architecture changes, refactor as DI. +scoring_queue = ScoringQueue() diff --git a/validator_api/test_time_inference.py b/validator_api/test_time_inference.py index 6a5d81d94..f9bbc0167 100644 --- a/validator_api/test_time_inference.py +++ b/validator_api/test_time_inference.py @@ -83,7 +83,7 @@ async def make_api_call(messages, max_tokens, model=None, is_final_answer=False) time.sleep(1) # Wait for 1 second before retrying -async def generate_response(prompt): +async def generate_response(original_messages: list[dict[str, str]]): messages = [ { "role": "system", @@ -120,7 +120,7 @@ async def generate_response(prompt): """, } ] - messages += [{"role": "user", "content": prompt}] + messages += original_messages messages += [ { "role": "assistant", diff --git a/validator_api/utils.py b/validator_api/utils.py index 85a001aef..d9e7a2714 100644 --- a/validator_api/utils.py +++ b/validator_api/utils.py @@ -1,11 +1,12 @@ -import httpx import requests from loguru import logger +from shared import settings from shared.loop_runner import AsyncLoopRunner -from shared.settings import shared_settings from shared.uids import get_uids +shared_settings = settings.shared_settings + class UpdateMinerAvailabilitiesForAPI(AsyncLoopRunner): miner_availabilities: dict[int, dict] = {} @@ -65,35 +66,3 @@ def filter_available_uids(task: str | None = None, model: str | None = None) -> filtered_uids.append(uid) return filtered_uids - - -# TODO: Modify this so that all the forwarded responses are sent in a single request. This is both more efficient but -# also means that on the validator side all responses are scored at once, speeding up the scoring process. 
-async def forward_response(uids: list[int], body: dict[str, any], chunks: list[list[str]]): - uids = [int(u) for u in uids] - chunk_dict = {u: c for u, c in zip(uids, chunks)} - logger.info(f"Forwarding response from uid {uids} to scoring with body: {body} and chunks: {chunks}") - if not shared_settings.SCORE_ORGANICS: - return - - if body.get("task") != "InferenceTask" and body.get("task") != "WebRetrievalTask": - logger.debug(f"Skipping forwarding for non- inference/web retrieval task: {body.get('task')}") - return - - url = f"http://{shared_settings.VALIDATOR_API}/scoring" - payload = {"body": body, "chunks": chunk_dict, "uid": uids} - try: - timeout = httpx.Timeout(timeout=120.0, connect=60.0, read=30.0, write=30.0, pool=5.0) - async with httpx.AsyncClient(timeout=timeout) as client: - response = await client.post( - url, json=payload, headers={"api-key": shared_settings.SCORING_KEY, "Content-Type": "application/json"} - ) - if response.status_code == 200: - logger.info(f"Forwarding response completed with status {response.status_code}") - else: - logger.exception( - f"Forwarding response uid {uids} failed with status {response.status_code} and payload {payload}" - ) - except Exception as e: - logger.error(f"Tried to forward response to {url} with payload {payload}") - logger.exception(f"Error while forwarding response: {e}")