Skip to content
Merged
1 change: 0 additions & 1 deletion scripts/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ async def make_completion(client: openai.AsyncOpenAI, prompt: str, stream: bool
"top_k": 50,
"max_new_tokens": 256,
"do_sample": True,
"seed": None,
},
"task": "InferenceTask",
"mixture": False,
Expand Down
4 changes: 4 additions & 0 deletions shared/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ class SharedSettings(BaseSettings):
# ==== API =====
# API key used to access validator organic scoring mechanism (both .env.validator and .env.api).
SCORING_KEY: str | None = Field(None, env="SCORING_KEY")
# Scoring request rate limit in seconds.
SCORING_RATE_LIMIT_SEC: float = Field(0.5, env="SCORING_RATE_LIMIT_SEC")
# Scoring queue threshold when rate-limit start to kick in, used to query validator API with scoring requests.
SCORING_QUEUE_API_THRESHOLD: int = Field(5, env="SCORING_QUEUE_API_THRESHOLD")

# Validator scoring API (.env.validator).
DEPLOY_SCORING_API: bool = Field(False, env="DEPLOY_SCORING_API")
Expand Down
3 changes: 3 additions & 0 deletions validator_api/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
settings.shared_settings = settings.SharedSettings.load(mode="api")
shared_settings = settings.shared_settings

from validator_api import scoring_queue
from validator_api.api_management import router as api_management_router
from validator_api.gpt_endpoints import router as gpt_router
from validator_api.utils import update_miner_availabilities_for_api
Expand Down Expand Up @@ -38,6 +39,8 @@ async def health():


async def main():
asyncio.create_task(update_miner_availabilities_for_api.start())
asyncio.create_task(scoring_queue.scoring_queue.start())
uvicorn.run(
"validator_api.api:app",
host=shared_settings.API_HOST,
Expand Down
6 changes: 4 additions & 2 deletions validator_api/chat_completion.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from shared.epistula import make_openai_query
from shared.settings import shared_settings
from shared.uids import get_uids
from validator_api.utils import forward_response
from validator_api import scoring_queue


async def peek_until_valid_chunk(
Expand Down Expand Up @@ -143,7 +143,9 @@ async def stream_from_first_response(
remaining = asyncio.gather(*pending, return_exceptions=True)
remaining_tasks = asyncio.create_task(collect_remaining_responses(remaining, collected_chunks_list, body, uids))
await remaining_tasks
asyncio.create_task(forward_response(uids, body, collected_chunks_list))
asyncio.create_task(
scoring_queue.scoring_queue.append_response(uids=uids, body=body, chunks=collected_chunks_list)
)

except asyncio.CancelledError:
logger.info("Client disconnected, streaming cancelled")
Expand Down
5 changes: 3 additions & 2 deletions validator_api/gpt_endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@

from shared.epistula import SynapseStreamResult, query_miners
from shared.settings import shared_settings
from validator_api import scoring_queue
from validator_api.api_management import _keys
from validator_api.chat_completion import chat_completion
from validator_api.mixture_of_miners import mixture_of_miners
from validator_api.test_time_inference import generate_response
from validator_api.utils import filter_available_uids, forward_response
from validator_api.utils import filter_available_uids

router = APIRouter()
N_MINERS = 5
Expand Down Expand Up @@ -95,7 +96,7 @@ async def web_retrieval(search_query: str, n_miners: int = 10, uids: list[int] =
raise HTTPException(status_code=500, detail="No miner responded successfully")

chunks = [res.accumulated_chunks if res and res.accumulated_chunks else [] for res in stream_results]
asyncio.create_task(forward_response(uids=uids, body=body, chunks=chunks))
asyncio.create_task(scoring_queue.scoring_queue.append_response(uids=uids, body=body, chunks=chunks))
return loaded_results


Expand Down
98 changes: 98 additions & 0 deletions validator_api/scoring_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import asyncio
import datetime
from collections import deque
from typing import Any

import httpx
from loguru import logger
from pydantic import BaseModel

from shared.loop_runner import AsyncLoopRunner
from shared.settings import shared_settings


class ScoringPayload(BaseModel):
payload: dict[str, Any]
retries: int = 0


class ScoringQueue(AsyncLoopRunner):
"""Performs organic scoring every `interval` seconds."""

interval: float = shared_settings.SCORING_RATE_LIMIT_SEC
scoring_queue_threshold: int = shared_settings.SCORING_QUEUE_API_THRESHOLD
max_scoring_retries: int = 3
_scoring_lock = asyncio.Lock()
_scoring_queue: deque[ScoringPayload] = deque()

async def wait_for_next_execution(self, last_run_time) -> datetime.datetime:
"""If scoring queue is small, execute immediately, otherwise wait until the next execution time."""
async with self._scoring_lock:
if self.scoring_queue_threshold < self.size > 0:
# If scoring queue is small and non-empty, score immediately.
return datetime.datetime.now()

return await super().wait_for_next_execution(last_run_time)

async def run_step(self):
"""Perform organic scoring: pop queued payload, forward to the validator API."""
async with self._scoring_lock:
if not self._scoring_queue:
return

scoring_payload = self._scoring_queue.popleft()
payload = scoring_payload.payload
uids = payload["uid"]
logger.info(f"Received new organic for scoring, uids: {uids}")

url = f"http://{shared_settings.VALIDATOR_API}/scoring"
try:
timeout = httpx.Timeout(timeout=120.0, connect=60.0, read=30.0, write=30.0, pool=5.0)
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.post(
url=url,
json=payload,
headers={"api-key": shared_settings.SCORING_KEY, "Content-Type": "application/json"},
)
if response.status_code != 200:
# Raise an exception so that the retry logic in the except block handles it.
raise Exception(f"Non-200 response: {response.status_code} for uids {uids}")
logger.info(f"Forwarding response completed with status {response.status_code}")
except Exception as e:
if scoring_payload.retries < self.max_scoring_retries:
scoring_payload.retries += 1
async with self._scoring_lock:
self._scoring_queue.appendleft(scoring_payload)
logger.error(f"Tried to forward response to {url} with payload {payload}. Queued for retry")
else:
logger.exception(f"Error while forwarding response after {scoring_payload.retries} retries: {e}")

async def append_response(self, uids: list[int], body: dict[str, Any], chunks: list[list[str]]):
if not shared_settings.SCORE_ORGANICS:
return

if body.get("task") != "InferenceTask" and body.get("task") != "WebRetrievalTask":
logger.debug(f"Skipping forwarding for non-inference/web retrieval task: {body.get('task')}")
return

uids = [int(u) for u in uids]
chunk_dict = {u: c for u, c in zip(uids, chunks)}
payload = {"body": body, "chunks": chunk_dict, "uid": uids}
scoring_item = ScoringPayload(payload=payload)

async with self._scoring_lock:
self._scoring_queue.append(scoring_item)

logger.info(f"Appended responses from uids {uids} into scoring queue with size: {self.size}")
logger.debug(f"Queued responses body: {body}, chunks: {chunks}")

@property
def size(self) -> int:
return len(self._scoring_queue)

def __len__(self) -> int:
return self.size


# TODO: Leaving it as a global var to make less architecture changes, refactor as DI.
scoring_queue = ScoringQueue()
33 changes: 0 additions & 33 deletions validator_api/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import httpx
import requests
from loguru import logger

Expand Down Expand Up @@ -65,35 +64,3 @@ def filter_available_uids(task: str | None = None, model: str | None = None) ->
filtered_uids.append(uid)

return filtered_uids


# TODO: Modify this so that all the forwarded responses are sent in a single request. This is both more efficient but
# also means that on the validator side all responses are scored at once, speeding up the scoring process.
async def forward_response(uids: list[int], body: dict[str, any], chunks: list[list[str]]):
uids = [int(u) for u in uids]
chunk_dict = {u: c for u, c in zip(uids, chunks)}
logger.info(f"Forwarding response from uid {uids} to scoring with body: {body} and chunks: {chunks}")
if not shared_settings.SCORE_ORGANICS:
return

if body.get("task") != "InferenceTask" and body.get("task") != "WebRetrievalTask":
logger.debug(f"Skipping forwarding for non- inference/web retrieval task: {body.get('task')}")
return

url = f"http://{shared_settings.VALIDATOR_API}/scoring"
payload = {"body": body, "chunks": chunk_dict, "uid": uids}
try:
timeout = httpx.Timeout(timeout=120.0, connect=60.0, read=30.0, write=30.0, pool=5.0)
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.post(
url, json=payload, headers={"api-key": shared_settings.SCORING_KEY, "Content-Type": "application/json"}
)
if response.status_code == 200:
logger.info(f"Forwarding response completed with status {response.status_code}")
else:
logger.exception(
f"Forwarding response uid {uids} failed with status {response.status_code} and payload {payload}"
)
except Exception as e:
logger.error(f"Tried to forward response to {url} with payload {payload}")
logger.exception(f"Error while forwarding response: {e}")