From 1309210d5d473981c5fa17589ba48395dd95293e Mon Sep 17 00:00:00 2001 From: Dmytro Bobrenko <17252809+dbobrenko@users.noreply.github.com> Date: Tue, 20 May 2025 09:54:36 +0000 Subject: [PATCH] Fix API; bump vllm 0.8.5 --- README.md | 2 +- neurons/validator.py | 10 ++--- scripts/install.sh | 2 +- shared/misc.py | 13 +++---- shared/settings.py | 6 +-- validator_api/chat_completion.py | 18 ++------- .../deep_research/orchestrator_v2.py | 39 +++++++++---------- validator_api/scoring_queue.py | 4 +- validator_api/serializers.py | 8 ++++ 9 files changed, 46 insertions(+), 56 deletions(-) diff --git a/README.md b/README.md index 4f0cae728..e0740c607 100644 --- a/README.md +++ b/README.md @@ -63,7 +63,7 @@ Subnet 1 provides access to decentralized, competitive intelligence. As its spee By combining real‑time web search with LLM‑grade synthesis in a single, developer‑friendly endpoint, the service sits squarely at the intersection of two explosive trends — AI‑native application stacks and the API‑first economy. Demand spans customer-facing chatbots, autonomous agents, research copilots, and data‑driven decision dashboards, giving the platform both horizontal reach and strong pricing power. Early adopters report dramatic boosts in user engagement and analyst productivity, and investors view the recurring‑revenue potential as "SaaS‑plus," since the engine continuously improves with every query. In short, the product is meeting a surging, underserved need at precisely the right moment, positioning it for rapid, sustainable growth. ### Indicators that this subnet's value will rise -Subnet 1 provides access to decentralized, competitive intelligence. As its speed of inference continues to rise, and miner efficiencies compound, Apex is able to provide fast, flexible, and efficient inference at scale. With the market for AI applications only growing, the ability to provide fast, efficient inference while keeping cost per token low will become increasingly desirable, putting Apex in a strong position to benefit - and making it a strong investment proposition. +Subnet 1 provides access to decentralized, competitive intelligence. As its speed of inference continues to rise, and miner efficiencies compound, Apex is able to provide fast, flexible, and efficient inference at scale. With the market for AI applications only growing, the ability to provide fast, efficient inference while keeping cost per token low will become increasingly desirable, putting Apex in a strong position to benefit - and making it a strong investment proposition. By combining real‑time web search with LLM‑grade synthesis in a single, developer‑friendly endpoint, the service sits squarely at the intersection of two explosive trends — AI‑native application stacks and the API‑first economy. Demand spans customer-facing chatbots, autonomous agents, research copilots, and data‑driven decision dashboards, giving the platform both horizontal reach and strong pricing power. Early adopters report dramatic boosts in user engagement and analyst productivity, and investors view the recurring‑revenue potential as “SaaS‑plus,” since the engine continuously improves with every query. In short, the product is meeting a surging, underserved need at precisely the right moment, positioning it for rapid, sustainable growth. diff --git a/neurons/validator.py b/neurons/validator.py index b7a5bea5f..f9a26afa6 100644 --- a/neurons/validator.py +++ b/neurons/validator.py @@ -16,11 +16,13 @@ from bittensor.core.extrinsics.serving import serve_extrinsic from loguru import logger -from prompting.llms.model_manager import AsyncModelScheduler, ModelManager -from prompting.rewards.scoring import task_scorer - # ruff: noqa: E402 from shared import settings + +settings.shared_settings = settings.SharedSettings.load(mode="validator") + +from prompting.llms.model_manager import AsyncModelScheduler, ModelManager +from prompting.rewards.scoring import task_scorer from shared.logging import init_wandb @@ -44,8 +46,6 @@ def init_process_logging(name: str): print(f"Failed to initialize logging for process {os.getpid()}: {e}") -settings.shared_settings = settings.SharedSettings.load(mode="validator") - from prompting.llms.utils import GPUInfo logger.remove() diff --git a/scripts/install.sh b/scripts/install.sh index f7f8f76d2..ab32a7e17 100644 --- a/scripts/install.sh +++ b/scripts/install.sh @@ -8,7 +8,7 @@ poetry config virtualenvs.in-project true # Install the project dependencies poetry install --extras "validator" --no-cache -poetry run pip install vllm==0.8.3 +poetry run pip install vllm==0.8.5 # Check if jq is installed and install it if not if ! command -v jq &> /dev/null diff --git a/shared/misc.py b/shared/misc.py index 013949a33..020a84b05 100644 --- a/shared/misc.py +++ b/shared/misc.py @@ -1,6 +1,5 @@ import asyncio import functools -import subprocess import time import traceback from functools import lru_cache, update_wrapper @@ -175,12 +174,10 @@ def wrapper(self): return decorator -def is_cuda_available(): +def is_cuda_available() -> bool: try: - # Run nvidia-smi to list available GPUs - result = subprocess.run( - ["nvidia-smi", "-L"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, check=True - ) - return "GPU" in result.stdout - except (subprocess.CalledProcessError, FileNotFoundError): + import torch + + return torch.cuda.is_available() + except ImportError: return False diff --git a/shared/settings.py b/shared/settings.py index fc4535547..6d662cddb 100644 --- a/shared/settings.py +++ b/shared/settings.py @@ -67,6 +67,7 @@ class SharedSettings(BaseSettings): # Neuron parameters. NEURON_TIMEOUT: int = Field(20, env="NEURON_TIMEOUT") INFERENCE_TIMEOUT: int = Field(30, env="INFERENCE_TIMEOUT") + MAX_TIMEOUT: int = Field(240, env="INFERENCE_TIMEOUT") NEURON_DISABLE_SET_WEIGHTS: bool = Field(False, env="NEURON_DISABLE_SET_WEIGHTS") NEURON_MOVING_AVERAGE_ALPHA: float = Field(0.1, env="NEURON_MOVING_AVERAGE_ALPHA") NEURON_DECAY_ALPHA: float = Field(0.001, env="NEURON_DECAY_ALPHA") @@ -252,7 +253,7 @@ def complete_settings(cls, values: dict[str, Any]) -> dict[str, Any]: raise ValueError("NETUID must be specified") values["TEST"] = netuid != 1 if values.get("TEST_MINER_IDS"): - values["TEST_MINER_IDS"] = str(values["TEST_MINER_IDS"]).split(",") + values["TEST_MINER_IDS"] = values["TEST_MINER_IDS"] if mode == "mock": values["MOCK"] = True values["NEURON_DEVICE"] = "cpu" @@ -331,5 +332,4 @@ def block(self) -> int: pass except Exception as e: logger.exception(f"Error loading settings: {e}") - shared_settings = None -logger.info("Shared settings loaded.") +shared_settings = None diff --git a/validator_api/chat_completion.py b/validator_api/chat_completion.py index fce0aabe2..a4be428ed 100644 --- a/validator_api/chat_completion.py +++ b/validator_api/chat_completion.py @@ -1,6 +1,5 @@ import asyncio import json -import math import random import time from typing import Any, AsyncGenerator, Optional @@ -182,26 +181,15 @@ async def chat_completion( uids = random.sample(uids, min(len(uids), num_miners)) STREAM = body.get("stream", False) + timeout_seconds = body.get("timeout", shared_settings.INFERENCE_TIMEOUT) + timeout_seconds = max(timeout_seconds, shared_settings.INFERENCE_TIMEOUT) + timeout_seconds = min(timeout_seconds, shared_settings.MAX_TIMEOUT) # Initialize chunks collection for each miner collected_chunks_list = [[] for _ in uids] collected_chunks_raw_list = [[] for _ in uids] timings_list = [[] for _ in uids] - timeout_seconds = max( - 30, - max( - 0, - math.floor( - math.log2( - body.get("sampling_parameters", shared_settings.SAMPLING_PARAMS).get("max_new_tokens", 256) / 256 - ) - ), - ) - * 10 - + 30, - ) - if STREAM: # Create tasks for all miners response_tasks = [ diff --git a/validator_api/deep_research/orchestrator_v2.py b/validator_api/deep_research/orchestrator_v2.py index f5b4fe4af..3e39680c7 100644 --- a/validator_api/deep_research/orchestrator_v2.py +++ b/validator_api/deep_research/orchestrator_v2.py @@ -14,6 +14,8 @@ from validator_api.serializers import CompletionsRequest, WebRetrievalRequest from validator_api.web_retrieval import web_retrieval +STEP_MAX_RETRIES = 10 + def make_chunk(text): chunk = json.dumps({"choices": [{"delta": {"content": text}}]}) @@ -61,14 +63,15 @@ async def search_web(question: str, n_results: int = 2, completions=None) -> dic ) # Perform web search - for i in range(3): + search_results = None + for i in range(STEP_MAX_RETRIES): try: search_results = await web_retrieval(WebRetrievalRequest(search_query=optimized_query, n_results=n_results)) if search_results.results: break except BaseException: logger.warning(f"Try {i+1} failed") - if not search_results.results: + if search_results is None or not search_results.results: search_results = {"results": []} # Generate referenced answer @@ -119,7 +122,7 @@ async def search_web(question: str, n_results: int = 2, completions=None) -> dic @retry( - stop=stop_after_attempt(3), + stop=stop_after_attempt(STEP_MAX_RETRIES), wait=wait_exponential(multiplier=1, min=2, max=5), retry=retry_if_exception_type(json.JSONDecodeError), ) @@ -139,7 +142,7 @@ async def make_mistral_request_with_json( @retry( - stop=stop_after_attempt(3), + stop=stop_after_attempt(STEP_MAX_RETRIES), wait=wait_exponential(multiplier=1, min=2, max=5), retry=retry_if_exception_type(BaseException), ) @@ -151,24 +154,24 @@ async def make_mistral_request( model = "mrfakename/mistral-small-3.1-24b-instruct-2503-hf" temperature = 0.15 top_p = 1 - max_tokens = 128000 + max_tokens = 8192 sample_params = { "top_p": top_p, "max_tokens": max_tokens, "temperature": temperature, "do_sample": False, } - logger.info(f"Making request to Mistral API with model: {model}") request = CompletionsRequest( messages=messages, model=model, stream=True, sampling_parameters=sample_params, + timeout=90, ) # Iterate over the response then collect the content response = await completions(request) response_content = await extract_content_from_stream(response) - logger.info(f"Response content: {response_content}") + logger.debug(f"Response content: {response_content}") if not response_content: raise ValueError(f"No response content received from Mistral API, response: {response}") if "Error" in response_content: @@ -288,12 +291,7 @@ def __init__(self, **data): async def assess_question_suitability( self, question: str, completions: Callable[[CompletionsRequest], Awaitable[StreamingResponse]] = None ) -> dict: - logger.info(f"assess_question_suitability: {question}") - logger.info(f"completions: {completions}") - logger.info(f"self.completions: {self.completions}") - - """ - Assesses whether a question is suitable for deep research or if it can be answered directly. + """Assess whether a question is suitable for deep research or if it can be answered directly. Args: question: The user's question to assess @@ -304,7 +302,7 @@ async def assess_question_suitability( - reason: Explanation of the assessment - direct_answer: Simple answer if question doesn't need deep research """ - + logger.debug(f"Assess question suitability: {question}") assessment_prompt = f"""You are part of Apex, a Deep Research Assistant. Your purpose is to assess whether a question is suitable for deep research or if it can be answered directly. The current date and time is {get_current_datetime_str()}. Task: @@ -360,7 +358,7 @@ async def assess_question_suitability( "direct_answer": None, } - @with_retries(max_retries=3) + @with_retries(max_retries=STEP_MAX_RETRIES) async def plan_tool_executions(self) -> list[ToolRequest]: """Uses mistral LLM to plan which tools to execute for the current step""" logger.info(f"Planning tool executions for step {self.current_step}") @@ -467,7 +465,6 @@ async def run(self, messages): # Always take the last user message as the question question = messages[-1]["content"] - logger.info(f"self.completions: {self.completions}") # First assess if the question is suitable for deep research question_assessment = await self.assess_question_suitability(question, self.completions) @@ -485,7 +482,7 @@ async def run(self, messages): for step in range(self.max_steps): self.current_step = step + 1 - logger.info(f"Step {step + 1}/{self.max_steps}") + logger.debug(f"Step {step + 1}/{self.max_steps}") # Plan and execute tools for this step yield make_chunk(f"\n## Step {step + 1}: Planning Tools\n") @@ -517,7 +514,7 @@ async def run(self, messages): yield make_chunk(f"\n# Final Answer\n{final_answer}\n") yield "data: [DONE]\n\n" - @with_retries(max_retries=3) + @with_retries(max_retries=STEP_MAX_RETRIES) async def generate_todo_list(self): """Uses mistral LLM to generate a todo list for the Chain of Thought process""" logger.info("Generating initial todo list") @@ -555,7 +552,7 @@ async def generate_todo_list(self): self.todo_list = response return self.todo_list - @with_retries(max_retries=3) + @with_retries(max_retries=STEP_MAX_RETRIES) async def do_thinking(self) -> Step: """Uses mistral LLM to generate thinking/reasoning tokens in line with the todo list""" logger.info(f"Analyzing step {self.current_step}") @@ -612,7 +609,7 @@ async def do_thinking(self) -> Step: logger.error(f"Missing required key in thinking output: {e}") raise - @with_retries(max_retries=3) + @with_retries(max_retries=STEP_MAX_RETRIES) async def update_todo_list(self): """Uses mistral LLM to update the todo list based on the steps taken""" logger.info("Updating todo list") @@ -677,7 +674,7 @@ async def update_todo_list(self): logger.error(f"Missing required key in updated todo list: {e}") raise - @with_retries(max_retries=3) + @with_retries(max_retries=STEP_MAX_RETRIES) async def generate_final_answer(self): """Uses mistral LLM to generate a final answer to the user's request""" logger.info("Generating final answer") diff --git a/validator_api/scoring_queue.py b/validator_api/scoring_queue.py index 1c41a2b36..84422f428 100644 --- a/validator_api/scoring_queue.py +++ b/validator_api/scoring_queue.py @@ -59,11 +59,11 @@ async def run_step(self): f"Trying to score organic from {scoring_payload.date}, uids: {uids}. " f"Queue size: {len(self._scoring_queue)}" ) - validators: list[Validator] = [] + validators: dict[int, Validator] = {} try: if shared_settings.OVERRIDE_AVAILABLE_AXONS: for idx, vali_axon in enumerate(shared_settings.OVERRIDE_AVAILABLE_AXONS): - validators.append(Validator(uid=-idx, axon=vali_axon, hotkey=shared_settings.API_HOTKEY, stake=1e6)) + validators[-idx] = Validator(uid=-idx, axon=vali_axon, hotkey=shared_settings.API_HOTKEY, stake=1e6) else: validators = await validator_registry.get_available_axons(balance=shared_settings.API_ENABLE_BALANCE) except Exception as e: diff --git a/validator_api/serializers.py b/validator_api/serializers.py index 852323d29..363a345b4 100644 --- a/validator_api/serializers.py +++ b/validator_api/serializers.py @@ -3,8 +3,11 @@ from pydantic import BaseModel, Field, model_validator +from shared import settings from validator_api.job_store import JobStatus +shared_settings = settings.shared_settings + class CompletionsRequest(BaseModel): """Request model for the /v1/chat/completions endpoint.""" @@ -83,6 +86,11 @@ def add_tools(self): } ], ) + timeout: Optional[int] = Field( + default=shared_settings.INFERENCE_TIMEOUT, + description="Timeout in seconds for the request. If not provided, a default timeout will be used.", + example=30, + ) class WebRetrievalRequest(BaseModel):