Merged
2 changes: 1 addition & 1 deletion README.md
@@ -63,7 +63,7 @@ Subnet 1 provides access to decentralized, competitive intelligence. As its spee
By combining real‑time web search with LLM‑grade synthesis in a single, developer‑friendly endpoint, the service sits squarely at the intersection of two explosive trends — AI‑native application stacks and the API‑first economy. Demand spans customer-facing chatbots, autonomous agents, research copilots, and data‑driven decision dashboards, giving the platform both horizontal reach and strong pricing power. Early adopters report dramatic boosts in user engagement and analyst productivity, and investors view the recurring‑revenue potential as "SaaS‑plus," since the engine continuously improves with every query. In short, the product is meeting a surging, underserved need at precisely the right moment, positioning it for rapid, sustainable growth.

### Indicators that this subnet's value will rise
Subnet 1 provides access to decentralized, competitive intelligence. As its speed of inference continues to rise, and miner efficiencies compound, Apex is able to provide fast, flexible, and efficient inference at scale. With the market for AI applications only growing, the ability to provide fast, efficient inference while keeping cost per token low will become increasingly desirable, putting Apex in a strong position to benefit - and making it a strong investment proposition.
Subnet 1 provides access to decentralized, competitive intelligence. As its speed of inference continues to rise, and miner efficiencies compound, Apex is able to provide fast, flexible, and efficient inference at scale. With the market for AI applications only growing, the ability to provide fast, efficient inference while keeping cost per token low will become increasingly desirable, putting Apex in a strong position to benefit - and making it a strong investment proposition.

By combining real‑time web search with LLM‑grade synthesis in a single, developer‑friendly endpoint, the service sits squarely at the intersection of two explosive trends — AI‑native application stacks and the API‑first economy. Demand spans customer-facing chatbots, autonomous agents, research copilots, and data‑driven decision dashboards, giving the platform both horizontal reach and strong pricing power. Early adopters report dramatic boosts in user engagement and analyst productivity, and investors view the recurring‑revenue potential as “SaaS‑plus,” since the engine continuously improves with every query. In short, the product is meeting a surging, underserved need at precisely the right moment, positioning it for rapid, sustainable growth.

10 changes: 5 additions & 5 deletions neurons/validator.py
@@ -16,11 +16,13 @@
from bittensor.core.extrinsics.serving import serve_extrinsic
from loguru import logger

from prompting.llms.model_manager import AsyncModelScheduler, ModelManager
from prompting.rewards.scoring import task_scorer

# ruff: noqa: E402
from shared import settings

settings.shared_settings = settings.SharedSettings.load(mode="validator")

from prompting.llms.model_manager import AsyncModelScheduler, ModelManager
from prompting.rewards.scoring import task_scorer
from shared.logging import init_wandb


@@ -44,8 +46,6 @@ def init_process_logging(name: str):
print(f"Failed to initialize logging for process {os.getpid()}: {e}")


settings.shared_settings = settings.SharedSettings.load(mode="validator")

from prompting.llms.utils import GPUInfo

logger.remove()
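The import reshuffle here moves the settings bootstrap ahead of the prompting imports, so `settings.shared_settings` is already populated when those modules are first loaded. A condensed sketch of the resulting ordering, abbreviated from the diff rather than reproducing the full module:

```python
# ruff: noqa: E402  -- imports deliberately follow the settings bootstrap
from shared import settings

# Populate the global settings object first...
settings.shared_settings = settings.SharedSettings.load(mode="validator")

# ...and only then import modules that may read settings.shared_settings at
# import time; loading them earlier could hand them a still-unset (None) object.
from prompting.llms.model_manager import AsyncModelScheduler, ModelManager
from prompting.rewards.scoring import task_scorer
from shared.logging import init_wandb
```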
2 changes: 1 addition & 1 deletion scripts/install.sh
@@ -8,7 +8,7 @@ poetry config virtualenvs.in-project true

# Install the project dependencies
poetry install --extras "validator" --no-cache
poetry run pip install vllm==0.8.3
poetry run pip install vllm==0.8.5

# Check if jq is installed and install it if not
if ! command -v jq &> /dev/null
13 changes: 5 additions & 8 deletions shared/misc.py
@@ -1,6 +1,5 @@
import asyncio
import functools
import subprocess
import time
import traceback
from functools import lru_cache, update_wrapper
@@ -175,12 +174,10 @@ def wrapper(self):
return decorator


def is_cuda_available():
def is_cuda_available() -> bool:
try:
# Run nvidia-smi to list available GPUs
result = subprocess.run(
["nvidia-smi", "-L"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, check=True
)
return "GPU" in result.stdout
except (subprocess.CalledProcessError, FileNotFoundError):
import torch

return torch.cuda.is_available()
except ImportError:
return False
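The nvidia-smi subprocess probe is replaced by torch's own device check, with a lazy import so the helper still returns False when torch is absent. For context, a self-contained sketch of the new helper plus an illustrative call site; the device-string usage is an example, not repository code:

```python
def is_cuda_available() -> bool:
    """Return True if torch is installed and can see at least one CUDA device."""
    try:
        import torch  # imported lazily so the helper works without torch installed

        return torch.cuda.is_available()
    except ImportError:
        return False


# Illustrative usage: pick a device string based on the probe.
device = "cuda" if is_cuda_available() else "cpu"
print(device)
```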
6 changes: 3 additions & 3 deletions shared/settings.py
@@ -67,6 +67,7 @@ class SharedSettings(BaseSettings):
# Neuron parameters.
NEURON_TIMEOUT: int = Field(20, env="NEURON_TIMEOUT")
INFERENCE_TIMEOUT: int = Field(30, env="INFERENCE_TIMEOUT")
MAX_TIMEOUT: int = Field(240, env="INFERENCE_TIMEOUT")
NEURON_DISABLE_SET_WEIGHTS: bool = Field(False, env="NEURON_DISABLE_SET_WEIGHTS")
NEURON_MOVING_AVERAGE_ALPHA: float = Field(0.1, env="NEURON_MOVING_AVERAGE_ALPHA")
NEURON_DECAY_ALPHA: float = Field(0.001, env="NEURON_DECAY_ALPHA")
@@ -252,7 +253,7 @@ def complete_settings(cls, values: dict[str, Any]) -> dict[str, Any]:
raise ValueError("NETUID must be specified")
values["TEST"] = netuid != 1
if values.get("TEST_MINER_IDS"):
values["TEST_MINER_IDS"] = str(values["TEST_MINER_IDS"]).split(",")
values["TEST_MINER_IDS"] = values["TEST_MINER_IDS"]
if mode == "mock":
values["MOCK"] = True
values["NEURON_DEVICE"] = "cpu"
@@ -331,5 +332,4 @@ def block(self) -> int:
pass
except Exception as e:
logger.exception(f"Error loading settings: {e}")
shared_settings = None
logger.info("Shared settings loaded.")
shared_settings = None
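The new MAX_TIMEOUT field supplies the upper bound that the chat_completion.py change below clamps client-supplied timeouts against. A minimal, standard-library sketch of the default-plus-environment-override behaviour such a field is meant to provide; the environment variable name and the resolution code are illustrative, not the repository's pydantic wiring:

```python
import os

DEFAULT_MAX_TIMEOUT = 240  # mirrors the field default in the diff


def resolve_max_timeout() -> int:
    """Return the environment override when present, otherwise the default."""
    return int(os.environ.get("MAX_TIMEOUT", DEFAULT_MAX_TIMEOUT))


print(resolve_max_timeout())  # 240 unless MAX_TIMEOUT is set in the environment
```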
18 changes: 3 additions & 15 deletions validator_api/chat_completion.py
@@ -1,6 +1,5 @@
import asyncio
import json
import math
import random
import time
from typing import Any, AsyncGenerator, Optional
@@ -182,26 +181,15 @@ async def chat_completion(
uids = random.sample(uids, min(len(uids), num_miners))

STREAM = body.get("stream", False)
timeout_seconds = body.get("timeout", shared_settings.INFERENCE_TIMEOUT)
timeout_seconds = max(timeout_seconds, shared_settings.INFERENCE_TIMEOUT)
timeout_seconds = min(timeout_seconds, shared_settings.MAX_TIMEOUT)

# Initialize chunks collection for each miner
collected_chunks_list = [[] for _ in uids]
collected_chunks_raw_list = [[] for _ in uids]
timings_list = [[] for _ in uids]

timeout_seconds = max(
30,
max(
0,
math.floor(
math.log2(
body.get("sampling_parameters", shared_settings.SAMPLING_PARAMS).get("max_new_tokens", 256) / 256
)
),
)
* 10
+ 30,
)

if STREAM:
# Create tasks for all miners
response_tasks = [
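The removed log2-based heuristic gives way to clamping the client-supplied timeout between the configured floor and ceiling. A minimal sketch of the resulting behaviour; the numeric settings are illustrative defaults, not values read from the repository:

```python
INFERENCE_TIMEOUT = 30   # illustrative floor, seconds
MAX_TIMEOUT = 240        # illustrative ceiling, seconds


def clamp_timeout(requested: int | None) -> int:
    """Never run shorter than the floor or longer than the ceiling."""
    requested = requested if requested is not None else INFERENCE_TIMEOUT
    return min(max(requested, INFERENCE_TIMEOUT), MAX_TIMEOUT)


assert clamp_timeout(None) == 30   # missing timeout falls back to the floor
assert clamp_timeout(5) == 30      # too small -> raised to the floor
assert clamp_timeout(90) == 90     # in range -> kept as requested
assert clamp_timeout(600) == 240   # too large -> capped at the ceiling
```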
39 changes: 18 additions & 21 deletions validator_api/deep_research/orchestrator_v2.py
@@ -14,6 +14,8 @@
from validator_api.serializers import CompletionsRequest, WebRetrievalRequest
from validator_api.web_retrieval import web_retrieval

STEP_MAX_RETRIES = 10


def make_chunk(text):
chunk = json.dumps({"choices": [{"delta": {"content": text}}]})
@@ -61,14 +63,15 @@ async def search_web(question: str, n_results: int = 2, completions=None) -> dic
)

# Perform web search
for i in range(3):
search_results = None
for i in range(STEP_MAX_RETRIES):
try:
search_results = await web_retrieval(WebRetrievalRequest(search_query=optimized_query, n_results=n_results))
if search_results.results:
break
except BaseException:
logger.warning(f"Try {i+1} failed")
if not search_results.results:
if search_results is None or not search_results.results:
search_results = {"results": []}

# Generate referenced answer
@@ -119,7 +122,7 @@ async def search_web(question: str, n_results: int = 2, completions=None) -> dic


@retry(
stop=stop_after_attempt(3),
stop=stop_after_attempt(STEP_MAX_RETRIES),
wait=wait_exponential(multiplier=1, min=2, max=5),
retry=retry_if_exception_type(json.JSONDecodeError),
)
@@ -139,7 +142,7 @@ async def make_mistral_request_with_json(


@retry(
stop=stop_after_attempt(3),
stop=stop_after_attempt(STEP_MAX_RETRIES),
wait=wait_exponential(multiplier=1, min=2, max=5),
retry=retry_if_exception_type(BaseException),
)
@@ -151,24 +154,24 @@ async def make_mistral_request(
model = "mrfakename/mistral-small-3.1-24b-instruct-2503-hf"
temperature = 0.15
top_p = 1
max_tokens = 128000
max_tokens = 8192
sample_params = {
"top_p": top_p,
"max_tokens": max_tokens,
"temperature": temperature,
"do_sample": False,
}
logger.info(f"Making request to Mistral API with model: {model}")
request = CompletionsRequest(
messages=messages,
model=model,
stream=True,
sampling_parameters=sample_params,
timeout=90,
)
# Iterate over the response then collect the content
response = await completions(request)
response_content = await extract_content_from_stream(response)
logger.info(f"Response content: {response_content}")
logger.debug(f"Response content: {response_content}")
if not response_content:
raise ValueError(f"No response content received from Mistral API, response: {response}")
if "Error" in response_content:
@@ -288,12 +291,7 @@ def __init__(self, **data):
async def assess_question_suitability(
self, question: str, completions: Callable[[CompletionsRequest], Awaitable[StreamingResponse]] = None
) -> dict:
logger.info(f"assess_question_suitability: {question}")
logger.info(f"completions: {completions}")
logger.info(f"self.completions: {self.completions}")

"""
Assesses whether a question is suitable for deep research or if it can be answered directly.
"""Assess whether a question is suitable for deep research or if it can be answered directly.

Args:
question: The user's question to assess
@@ -304,7 +302,7 @@ async def assess_question_suitability(
- reason: Explanation of the assessment
- direct_answer: Simple answer if question doesn't need deep research
"""

logger.debug(f"Assess question suitability: {question}")
assessment_prompt = f"""You are part of Apex, a Deep Research Assistant. Your purpose is to assess whether a question is suitable for deep research or if it can be answered directly. The current date and time is {get_current_datetime_str()}.

Task:
@@ -360,7 +358,7 @@ async def assess_question_suitability(
"direct_answer": None,
}

@with_retries(max_retries=3)
@with_retries(max_retries=STEP_MAX_RETRIES)
async def plan_tool_executions(self) -> list[ToolRequest]:
"""Uses mistral LLM to plan which tools to execute for the current step"""
logger.info(f"Planning tool executions for step {self.current_step}")
@@ -467,7 +465,6 @@ async def run(self, messages):

# Always take the last user message as the question
question = messages[-1]["content"]
logger.info(f"self.completions: {self.completions}")
# First assess if the question is suitable for deep research
question_assessment = await self.assess_question_suitability(question, self.completions)

@@ -485,7 +482,7 @@

for step in range(self.max_steps):
self.current_step = step + 1
logger.info(f"Step {step + 1}/{self.max_steps}")
logger.debug(f"Step {step + 1}/{self.max_steps}")

# Plan and execute tools for this step
yield make_chunk(f"\n## Step {step + 1}: Planning Tools\n")
@@ -517,7 +514,7 @@ async def run(self, messages):
yield make_chunk(f"\n# Final Answer\n{final_answer}\n")
yield "data: [DONE]\n\n"

@with_retries(max_retries=3)
@with_retries(max_retries=STEP_MAX_RETRIES)
async def generate_todo_list(self):
"""Uses mistral LLM to generate a todo list for the Chain of Thought process"""
logger.info("Generating initial todo list")
@@ -555,7 +552,7 @@ async def generate_todo_list(self):
self.todo_list = response
return self.todo_list

@with_retries(max_retries=3)
@with_retries(max_retries=STEP_MAX_RETRIES)
async def do_thinking(self) -> Step:
"""Uses mistral LLM to generate thinking/reasoning tokens in line with the todo list"""
logger.info(f"Analyzing step {self.current_step}")
@@ -612,7 +609,7 @@ async def do_thinking(self) -> Step:
logger.error(f"Missing required key in thinking output: {e}")
raise

@with_retries(max_retries=3)
@with_retries(max_retries=STEP_MAX_RETRIES)
async def update_todo_list(self):
"""Uses mistral LLM to update the todo list based on the steps taken"""
logger.info("Updating todo list")
@@ -677,7 +674,7 @@ async def update_todo_list(self):
logger.error(f"Missing required key in updated todo list: {e}")
raise

@with_retries(max_retries=3)
@with_retries(max_retries=STEP_MAX_RETRIES)
async def generate_final_answer(self):
"""Uses mistral LLM to generate a final answer to the user's request"""
logger.info("Generating final answer")
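The scattered retry counts are consolidated into a single STEP_MAX_RETRIES constant, used both by the manual web-search loop and by the tenacity decorators. A minimal sketch of the decorator pattern, assuming the tenacity package and an illustrative JSON-parsing step:

```python
import json

from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential

STEP_MAX_RETRIES = 10  # single knob for every retried orchestration step


@retry(
    stop=stop_after_attempt(STEP_MAX_RETRIES),
    wait=wait_exponential(multiplier=1, min=2, max=5),
    retry=retry_if_exception_type(json.JSONDecodeError),
)
def parse_step_output(raw: str) -> dict:
    """Retry only while the model output is not yet valid JSON."""
    return json.loads(raw)


print(parse_step_output('{"done": true}'))  # {'done': True}
```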
4 changes: 2 additions & 2 deletions validator_api/scoring_queue.py
@@ -59,11 +59,11 @@ async def run_step(self):
f"Trying to score organic from {scoring_payload.date}, uids: {uids}. "
f"Queue size: {len(self._scoring_queue)}"
)
validators: list[Validator] = []
validators: dict[int, Validator] = {}
try:
if shared_settings.OVERRIDE_AVAILABLE_AXONS:
for idx, vali_axon in enumerate(shared_settings.OVERRIDE_AVAILABLE_AXONS):
validators.append(Validator(uid=-idx, axon=vali_axon, hotkey=shared_settings.API_HOTKEY, stake=1e6))
validators[-idx] = Validator(uid=-idx, axon=vali_axon, hotkey=shared_settings.API_HOTKEY, stake=1e6)
else:
validators = await validator_registry.get_available_axons(balance=shared_settings.API_ENABLE_BALANCE)
except Exception as e:
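Switching `validators` from a list to a dict keyed by uid allows direct lookups later in the step. A minimal sketch of the construction plus an illustrative lookup; the dataclass stand-in, axon addresses, and hotkey are placeholders, not repository data:

```python
from dataclasses import dataclass


@dataclass
class Validator:
    uid: int
    axon: str
    hotkey: str
    stake: float


override_axons = ["axon-a:8091", "axon-b:8091"]  # placeholder axon addresses
validators: dict[int, Validator] = {}
for idx, axon in enumerate(override_axons):
    # Same keying scheme as the diff: synthetic non-positive uids for overrides.
    validators[-idx] = Validator(uid=-idx, axon=axon, hotkey="api-hotkey", stake=1e6)

print(validators[-1].axon)  # direct lookup by uid instead of scanning a list
```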
8 changes: 8 additions & 0 deletions validator_api/serializers.py
@@ -3,8 +3,11 @@

from pydantic import BaseModel, Field, model_validator

from shared import settings
from validator_api.job_store import JobStatus

shared_settings = settings.shared_settings


class CompletionsRequest(BaseModel):
"""Request model for the /v1/chat/completions endpoint."""
@@ -83,6 +86,11 @@ def add_tools(self):
}
],
)
timeout: Optional[int] = Field(
default=shared_settings.INFERENCE_TIMEOUT,
description="Timeout in seconds for the request. If not provided, a default timeout will be used.",
example=30,
)


class WebRetrievalRequest(BaseModel):
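The new optional `timeout` field lets callers set a per-request timeout that defaults to the shared INFERENCE_TIMEOUT. A self-contained sketch of the same pattern; the trimmed-down model and the default value are illustrative stand-ins:

```python
from typing import Optional

from pydantic import BaseModel, Field

INFERENCE_TIMEOUT = 30  # illustrative stand-in for shared_settings.INFERENCE_TIMEOUT


class CompletionsRequestSketch(BaseModel):
    """Trimmed-down request model showing only the new timeout field."""

    timeout: Optional[int] = Field(
        default=INFERENCE_TIMEOUT,
        description="Timeout in seconds for the request.",
    )


print(CompletionsRequestSketch().timeout)            # 30 (default)
print(CompletionsRequestSketch(timeout=90).timeout)  # 90 (client override)
```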