Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions prompting/api/scoring/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ async def score_response(
seed=int(body.get("seed", 0)),
sampling_params=body.get("sampling_parameters", shared_settings.SAMPLING_PARAMS),
query=body.get("messages"),
timeout=body.get("timeout", shared_settings.INFERENCE_TIMEOUT),
organic=True,
)
task_scorer.add_to_queue(
Expand Down Expand Up @@ -115,6 +116,7 @@ async def score_response(
query=search_term,
target_results=body.get("target_results", 1),
timeout=body.get("timeout", 10),
organic=True,
),
response=DendriteResponseEvent(
uids=uids,
Expand Down
2 changes: 1 addition & 1 deletion prompting/tasks/task_creation.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
class TaskLoop(AsyncLoopRunner):
is_running: bool = False
thread: threading.Thread = None
interval: int = 1
interval: int = 20
task_queue: list | None = []
scoring_queue: list | None = []
miners_dict: dict | None = None
Expand Down
4 changes: 4 additions & 0 deletions shared/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ class SharedSettings(BaseSettings):
API_TEST_MODE: bool = Field(False, env="API_TEST_MODE")
API_UIDS_EXPLORE: float = Field(0.2, env="API_UIDS_EXPLORE")
API_TOP_MINERS_SAMPLE: int = Field(400, env="API_TOP_MINERS_SAMPLE")
API_TOP_MINERS_TO_STREAM: int = Field(10, env="API_TOP_MINERS_TO_STREAM")
API_MIN_MINERS_TO_SAMPLE: int = Field(50, env="API_MIN_UIDS_TO_SAMPLE")
OVERRIDE_AVAILABLE_AXONS: list[str] | None = Field(None, env="OVERRIDE_AVAILABLE_AXONS")
API_ENABLE_BALANCE: bool = Field(True, env="API_ENABLE_BALANCE")

# Validator scoring API (.env.validator).
SCORE_ORGANICS: bool = Field(False, env="SCORE_ORGANICS")
Expand Down
20 changes: 12 additions & 8 deletions validator_api/chat_completion.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,14 +147,18 @@ async def _collector(idx: int, resp_task: asyncio.Task) -> None:

async def get_response_from_miner(body: dict[str, any], uid: int, timeout_seconds: int) -> tuple:
"""Get response from a single miner."""
return await make_openai_query(
metagraph=shared_settings.METAGRAPH,
wallet=shared_settings.WALLET,
body=body,
uid=uid,
stream=False,
timeout_seconds=timeout_seconds,
)
try:
return await make_openai_query(
metagraph=shared_settings.METAGRAPH,
wallet=shared_settings.WALLET,
body=body,
uid=uid,
stream=False,
timeout_seconds=timeout_seconds,
)
except BaseException as e:
logger.warning(f"Error getting response from miner {uid}: {e}")
return None


async def chat_completion(
Expand Down
11 changes: 6 additions & 5 deletions validator_api/gpt_endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
from validator_api.utils import filter_available_uids

router = APIRouter()
N_MINERS = 10


@router.post(
Expand Down Expand Up @@ -88,7 +87,7 @@ async def completions(request: CompletionsRequest, api_key: str = Depends(valida
task=body.get("task"),
model=body.get("model"),
test=shared_settings.API_TEST_MODE,
n_miners=N_MINERS,
n_miners=shared_settings.API_TOP_MINERS_TO_STREAM,
n_top_incentive=shared_settings.API_TOP_MINERS_SAMPLE,
explore=shared_settings.API_UIDS_EXPLORE,
)
Expand Down Expand Up @@ -176,8 +175,7 @@ async def create_response_stream(request):
async def submit_chain_of_thought_job(
request: CompletionsRequest, background_tasks: BackgroundTasks, api_key: str = Depends(validate_api_key)
):
"""
Submit a Chain-of-Thought inference job to be processed in the background.
"""Submit a Chain-of-Thought inference job to be processed in the background.

This endpoint accepts the same parameters as the /v1/chat/completions endpoint,
but instead of streaming the response, it submits the job to the background and
Expand Down Expand Up @@ -222,7 +220,10 @@ async def submit_chain_of_thought_job(
[int(uid) for uid in body.get("uids")]
if body.get("uids")
else filter_available_uids(
task=body.get("task"), model=body.get("model"), test=shared_settings.API_TEST_MODE, n_miners=N_MINERS
task=body.get("task"),
model=body.get("model"),
test=shared_settings.API_TEST_MODE,
n_miners=shared_settings.API_TOP_MINERS_TO_STREAM,
)
)

Expand Down
102 changes: 63 additions & 39 deletions validator_api/scoring_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from shared import settings
from shared.epistula import create_header_hook
from shared.loop_runner import AsyncLoopRunner
from validator_api.validator_forwarding import ValidatorRegistry
from validator_api.validator_forwarding import Validator, ValidatorRegistry

validator_registry = ValidatorRegistry()

Expand All @@ -33,7 +33,7 @@ class ScoringQueue(AsyncLoopRunner):
max_scoring_retries: int = 2
_scoring_lock = asyncio.Lock()
_scoring_queue: deque[ScoringPayload] = deque()
_queue_maxlen: int = 20
_queue_maxlen: int = 50
_min_wait_time: float = 1

async def wait_for_next_execution(self, last_run_time) -> datetime.datetime:
Expand All @@ -48,7 +48,6 @@ async def wait_for_next_execution(self, last_run_time) -> datetime.datetime:

async def run_step(self):
"""Perform organic scoring: pop queued payload, forward to the validator API."""
# logger.debug("Running scoring step")
async with self._scoring_lock:
if not self._scoring_queue:
return
Expand All @@ -60,52 +59,36 @@ async def run_step(self):
f"Trying to score organic from {scoring_payload.date}, uids: {uids}. "
f"Queue size: {len(self._scoring_queue)}"
)
validators: list[Validator] = []
try:
vali_uid, vali_axon, vali_hotkey = validator_registry.get_available_axon()
# get_available_axon() will return None if it cannot find an available validator, which will fail to unpack
url = f"http://{vali_axon}/scoring"
if shared_settings.OVERRIDE_AVAILABLE_AXONS:
for idx, vali_axon in enumerate(shared_settings.OVERRIDE_AVAILABLE_AXONS):
validators.append(Validator(uid=-idx, axon=vali_axon, hotkey=shared_settings.API_HOTKEY, stake=1e6))
else:
validators = await validator_registry.get_available_axons(balance=shared_settings.API_ENABLE_BALANCE)
except Exception as e:
logger.exception(f"Could not find available validator scoring endpoint: {e}")

if validators is None:
logger.warning("No validators are available")
return

try:
if hasattr(payload, "to_dict"):
payload = payload.to_dict()
elif isinstance(payload, BaseModel):
payload = payload.model_dump()
payload_bytes = json.dumps(payload).encode()
except BaseException as e:
logger.exception(f"Error when encoding payload: {e}")
await asyncio.sleep(0.1)
return

timeout = httpx.Timeout(timeout=120.0, connect=60.0, read=30.0, write=30.0, pool=5.0)
# Add required headers for signature verification

async with httpx.AsyncClient(
timeout=timeout,
event_hooks={"request": [create_header_hook(shared_settings.WALLET.hotkey, vali_hotkey)]},
) as client:
response = await client.post(
url=url,
content=payload_bytes,
headers={"Content-Type": "application/json"},
)
validator_registry.update_validators(uid=vali_uid, response_code=response.status_code)
if response.status_code != 200:
# Raise an exception so that the retry logic in the except block handles it.
raise Exception(f"Non-200 response: {response.status_code} for uids {uids}")
logger.debug(f"Forwarding response completed with status {response.status_code} to uid {vali_uid}")
except httpx.ConnectError as e:
logger.warning(f"Couldn't connect to validator {url} for Scoring {uids}. Exception: {e}")
except Exception as e:
if scoring_payload.retries < self.max_scoring_retries:
scoring_payload.retries += 1
async with self._scoring_lock:
self._scoring_queue.appendleft(scoring_payload)
logger.warning(
f"Tried to forward response from {scoring_payload.date} to {url} for uids {uids}. "
f"Queue size: {len(self._scoring_queue)}. Exception: {e}. Queued for retry"
)
else:
logger.error(
f"Error while forwarding response from {scoring_payload.date} after {scoring_payload.retries} "
f"retries: {e}"
)
tasks = []
for _, validator in validators.items():
task = asyncio.create_task(self._send_result(payload_bytes, scoring_payload, validator, uids))
tasks.append(task)
await asyncio.gather(*tasks)
await asyncio.sleep(0.1)

async def append_response(
Expand Down Expand Up @@ -151,6 +134,47 @@ async def append_response(
logger.info(f"Dropping oldest organic from {scoring_payload.date} for uids {uids}")
self._scoring_queue.append(scoring_item)

async def _send_result(self, payload_bytes, scoring_payload, validator: Validator, uids):
try:
vali_url = f"http://{validator.axon}/scoring"
timeout = httpx.Timeout(timeout=120.0)
async with httpx.AsyncClient(
timeout=timeout,
event_hooks={"request": [create_header_hook(shared_settings.WALLET.hotkey, validator.hotkey)]},
) as client:
response = await client.post(
url=vali_url,
content=payload_bytes,
headers={"Content-Type": "application/json"},
)
validator_registry.update_validators(uid=validator.uid, response_code=response.status_code)
if response.status_code != 200:
raise Exception(
f"Status code {response.status_code} response for validator {validator.uid} - {vali_url}: "
f"{response.status_code} for uids {len(uids)}"
)
logger.debug(f"Successfully forwarded response to uid {validator.uid} - {vali_url}")
except httpx.ConnectError as e:
logger.warning(
f"Couldn't connect to validator {validator.uid} {vali_url} for scoring {len(uids)}. Exception: {e}"
)
except Exception as e:
if shared_settings.API_ENABLE_BALANCE and scoring_payload.retries < self.max_scoring_retries:
scoring_payload.retries += 1
async with self._scoring_lock:
self._scoring_queue.appendleft(scoring_payload)
logger.warning(
f"Tried to forward response from {scoring_payload.date} "
f"to validator {validator.uid} {vali_url} for {len(uids)} uids. "
f"Queue size: {len(self._scoring_queue)}. Exception: {e}"
)
else:
logger.warning(
f"Error while forwarding response from {scoring_payload.date} "
f"to validator {validator.uid} {vali_url} for {len(uids)} uids "
f"retries. Queue size: {len(self._scoring_queue)}. Exception: {e}"
)

@property
def size(self) -> int:
return len(self._scoring_queue)
Expand Down
39 changes: 21 additions & 18 deletions validator_api/validator_forwarding.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import random
import time
from typing import ClassVar, List, Optional, Tuple
from typing import ClassVar

import numpy as np
from loguru import logger
Expand Down Expand Up @@ -54,43 +55,45 @@ class ValidatorRegistry(BaseModel):

@model_validator(mode="after")
def create_validator_list(cls, v: "ValidatorRegistry", metagraph=shared_settings.METAGRAPH) -> "ValidatorRegistry":
validator_uids = np.where(metagraph.stake >= 100000)[0].tolist()
# TODO: Calculate 1000 tao using subtensor alpha price.
validator_uids = np.where(metagraph.stake >= 16000)[0].tolist()
validator_axons = [metagraph.axons[uid].ip_str().split("/")[2] for uid in validator_uids]
validator_stakes = [metagraph.stake[uid] for uid in validator_uids]
validator_hotkeys = [metagraph.hotkeys[uid] for uid in validator_uids]
v.validators = {
uid: Validator(uid=uid, stake=stake, axon=axon, hotkey=hotkey)
for uid, stake, axon, hotkey in zip(validator_uids, validator_stakes, validator_axons, validator_hotkeys)
}
v.validators = {}
for uid, stake, axon, hotkey in zip(validator_uids, validator_stakes, validator_axons, validator_hotkeys):
if hotkey == shared_settings.MC_VALIDATOR_HOTKEY:
logger.debug(f"Replacing {uid} axon from {axon} to {shared_settings.MC_VALIDATOR_AXON}")
axon = shared_settings.MC_VALIDATOR_AXON
v.validators[uid] = Validator(uid=uid, stake=stake, axon=axon, hotkey=hotkey)
return v

def get_available_validators(self) -> List[Validator]:
"""
Given a list of validators, return only those that are not in their cooldown period.
"""
async def get_available_validators(self) -> list[Validator]:
"""Given a list of validators, return only those that are not in their cooldown period."""
return [uid for uid, validator in self.validators.items() if validator.is_available()]

def get_available_axon(self) -> Optional[Tuple[int, List[str], str]]:
async def get_available_axons(self, balance: bool = True) -> list[Validator] | None:
"""
Returns a tuple (uid, axon, hotkey) for a randomly selected validator based on stake weighting,
if spot checking conditions are met. Otherwise, returns None.
"""
if random.random() < self.spot_checking_rate or not self.validators:
return None
for _ in range(self.max_retries):
validator_list = self.get_available_validators()
validator_list = await self.get_available_validators()
if validator_list:
break
else:
time.sleep(5)
await asyncio.sleep(5)
if not validator_list:
logger.error(f"Could not find available validator after {self.max_retries}")
return None
weights = [self.validators[uid].stake for uid in validator_list]
chosen = self.validators[random.choices(validator_list, weights=weights, k=1)[0]]
if chosen.hotkey == shared_settings.MC_VALIDATOR_HOTKEY:
chosen.axon = shared_settings.MC_VALIDATOR_AXON
return chosen.uid, chosen.axon, chosen.hotkey
if balance:
weights = [self.validators[uid].stake for uid in validator_list]
chosen = [self.validators[random.choices(validator_list, weights=weights, k=1)[0]]]
else:
chosen = self.validators
return chosen

def update_validators(self, uid: int, response_code: int) -> None:
"""
Expand Down