Merged

Changes from all commits (34 commits):
87c7f96 - WIP: add sampling by stake (richwardle, Jan 28, 2025)
65733cb - Serve Validator Scoring Endpoint to Chain (richwardle, Feb 1, 2025)
6961c5e - Intermediary Dump (richwardle, Feb 1, 2025)
134a179 - Create Validator Registry Object (richwardle, Feb 1, 2025)
a11f6e9 - Update Web Retrieval to Use Response Forwarding (richwardle, Feb 3, 2025)
4d68dba - Remove SCORE_ORGANICS parameter (richwardle, Feb 3, 2025)
ffafdda - Remove Unused Parameters (richwardle, Feb 3, 2025)
c790034 - Use Epistula for scroing api (richwardle, Feb 3, 2025)
c5d8fe6 - Switch to Epistula Based Approach (richwardle, Feb 3, 2025)
7056bcf - Merge branch 'staging' into SN1-380-hardcode-api-key (richwardle, Feb 9, 2025)
f0cb9cb - Small syntax fixes (richwardle, Feb 9, 2025)
e27b91f - Linting (richwardle, Feb 10, 2025)
c2ffcb7 - More Linting (richwardle, Feb 10, 2025)
51fdb66 - Checking response status codes (richwardle, Feb 10, 2025)
26b92b3 - Merge branch 'staging' into SN1-380-hardcode-api-key (richwardle, Feb 11, 2025)
dbfcd57 - Merge Fixes (richwardle, Feb 11, 2025)
b6ad8c7 - Big ol fix (richwardle, Feb 11, 2025)
99751f5 - Make scoring queue start (dbobrenko, Feb 11, 2025)
1e83747 - Change to dict rather than list (richwardle, Feb 11, 2025)
9e61534 - Merge branch 'staging' into SN1-380-hardcode-api-key (bkb2135, Feb 11, 2025)
066bc45 - Increase robustness in case of failure (richwardle, Feb 12, 2025)
43fe565 - Fix things lost in merge (richwardle, Feb 12, 2025)
bfefda3 - Linting (richwardle, Feb 12, 2025)
a10148c - Clean up scoring (richwardle, Feb 12, 2025)
6d36e4b - Merge branch 'staging' into SN1-380-hardcode-api-key (bkb2135, Feb 12, 2025)
b4f7f75 - Linting and default mock settings to cuda (richwardle, Feb 13, 2025)
2ab4f3e - Forward timings for scoring (richwardle, Feb 13, 2025)
23e91e1 - Default to empty list for timings for web retrieval (richwardle, Feb 13, 2025)
1e5271c - Final Fixes/ Revert Settings (richwardle, Feb 13, 2025)
5353451 - Merge branch 'staging' into SN1-380-hardcode-api-key (Hollyqui, Feb 14, 2025)
a1ff719 - Linting (richwardle, Feb 14, 2025)
382014f - Merge branch 'staging' into SN1-380-hardcode-api-key (richwardle, Feb 15, 2025)
b3e99b0 - Linting (richwardle, Feb 15, 2025)
5b2145c - Initialize settings to validator if cuda is available (richwardle, Feb 16, 2025)
1 change: 0 additions & 1 deletion .env.api.example
@@ -1,6 +1,5 @@
 API_PORT = "42170" # Port for the API server
 API_HOST = "0.0.0.0" # Host for the API server
 SCORING_KEY = "123" # The scoring key for the validator (must match the scoring key in the .env.validator file)
-SCORE_ORGANICS = True # Whether to score organics
 VALIDATOR_API = "0.0.0.0:8094" # The validator API to forward responses to for scoring
 WORKERS=4
1 change: 0 additions & 1 deletion .env.validator.example
@@ -26,7 +26,6 @@ HF_TOKEN = "your_huggingface_token_here"

 # Scoring API (optional).
 DEPLOY_SCORING_API = true
-SCORING_ADMIN_KEY = "123456"
 SCORING_API_PORT = 8094
 # Scoring key must match the scoring key in the .env.api.
 # SCORING_KEY="..."
2 changes: 1 addition & 1 deletion data/top100k_domains.csv
@@ -99997,4 +99997,4 @@
 "99996","tankspotter.com","4.51"
 "99997","targetshootingapp.com","4.51"
 "99998","tastytalegame.com","4.51"
-"99999","tbscan.com","4.51"
+"99999","tbscan.com","4.51"

(The removed and added rows are identical; the only change is adding a trailing newline at the end of the file.)
23 changes: 21 additions & 2 deletions neurons/validator.py
@@ -4,8 +4,13 @@
 import time

 import loguru
+import netaddr
+import requests
 import torch
 import wandb
+from bittensor.core.extrinsics.serving import serve_extrinsic
+
+from prompting.rewards.scoring import task_scorer

 # ruff: noqa: E402
 from shared import settings
@@ -34,7 +39,6 @@ async def spawn_loops(task_queue, scoring_queue, reward_events):
     # ruff: noqa: E402
     from prompting.llms.model_manager import model_scheduler
     from prompting.miner_availability.miner_availability import availability_checking_loop
-    from prompting.rewards.scoring import task_scorer
     from prompting.tasks.task_creation import task_loop
     from prompting.tasks.task_sending import task_sender
     from prompting.weight_setting.weight_setter import weight_setter
@@ -87,10 +91,25 @@ async def start():
     # TODO: We should not use 2 availability loops for each process, in reality
     # we should only be sharing the miner availability data between processes.
     from prompting.miner_availability.miner_availability import availability_checking_loop
-    from prompting.rewards.scoring import task_scorer

     asyncio.create_task(availability_checking_loop.start())

+    try:
+        external_ip = requests.get("https://checkip.amazonaws.com").text.strip()
+        netaddr.IPAddress(external_ip)
+
+        serve_success = serve_extrinsic(
+            subtensor=settings.shared_settings.SUBTENSOR,
+            wallet=settings.shared_settings.WALLET,
+            ip=external_ip,
+            port=settings.shared_settings.SCORING_API_PORT,
+            protocol=4,
+            netuid=settings.shared_settings.NETUID,
+        )
+
+        logger.debug(f"Serve success: {serve_success}")
+    except Exception as e:
+        logger.warning(f"Failed to serve scoring api to chain: {e}")
     await start_scoring_api(task_scorer, scoring_queue, reward_events)

     while True:
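For context, `serve_extrinsic` publishes the validator's external IP and scoring port to the chain, so API nodes can discover where to forward responses for scoring. A minimal sketch of the lookup side, assuming the standard bittensor metagraph API (network, netuid, and uid below are illustrative):

```python
import bittensor as bt

# Illustrative discovery sketch: read back the endpoint that
# serve_extrinsic published on chain for a given validator.
subtensor = bt.subtensor()                 # defaults to the finney network
metagraph = subtensor.metagraph(netuid=1)  # netuid 1 assumed for SN1

validator_uid = 0                          # assumed: UID of the target validator
axon = metagraph.axons[validator_uid]      # axon info carries the served ip/port
scoring_url = f"http://{axon.ip}:{axon.port}/scoring"
print(f"Forwarding responses to {scoring_url}")
```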
2 changes: 2 additions & 0 deletions prompting/api/api.py
@@ -4,6 +4,8 @@

 from prompting.api.miner_availabilities.api import router as miner_availabilities_router
 from prompting.api.scoring.api import router as scoring_router
+
+# from prompting.rewards.scoring import task_scorer
 from shared import settings

 app = FastAPI()
83 changes: 58 additions & 25 deletions prompting/api/scoring/api.py
@@ -1,7 +1,8 @@
+import time
 import uuid
 from typing import Any

-from fastapi import APIRouter, Depends, Header, HTTPException, Request
+from fastapi import APIRouter, Depends, HTTPException, Request
 from loguru import logger

 from prompting.datasets.random_website import DDGDatasetEntry
@@ -11,13 +12,38 @@
 from shared import settings
 from shared.base import DatasetEntry
 from shared.dendrite import DendriteResponseEvent
-from shared.epistula import SynapseStreamResult
+from shared.epistula import SynapseStreamResult, verify_signature
+from shared.settings import shared_settings

 router = APIRouter()


-def validate_scoring_key(api_key: str = Header(...)):
-    if api_key != settings.shared_settings.SCORING_KEY:
+async def verify_scoring_signature(request: Request):
+    signed_by = request.headers.get("Epistula-Signed-By")
+    signed_for = request.headers.get("Epistula-Signed-For")
+    if signed_for != shared_settings.WALLET.hotkey.ss58_address:
+        raise HTTPException(status_code=400, detail="Bad Request, message is not intended for self")
+    if signed_by != shared_settings.API_HOTKEY:
+        raise HTTPException(status_code=401, detail="Signer not the expected ss58 address")
+
+    body = await request.body()
+    now = time.time()
+    err = verify_signature(
+        request.headers.get("Epistula-Request-Signature"),
+        body,
+        request.headers.get("Epistula-Timestamp"),
+        request.headers.get("Epistula-Uuid"),
+        signed_for,
+        signed_by,
+        now,
+    )
+    if err:
+        logger.error(err)
+        raise HTTPException(status_code=400, detail=err)
+
+
+def validate_scoring_key(request: Request):
+    if request.headers.api_key != settings.shared_settings.SCORING_KEY:
         raise HTTPException(status_code=403, detail="Invalid API key")


@@ -27,54 +53,62 @@ def get_task_scorer(request: Request):

 @router.post("/scoring")
 async def score_response(
-    request: Request, api_key_data: dict = Depends(validate_scoring_key), task_scorer=Depends(get_task_scorer)
+    request: Request, api_key_data: dict = Depends(verify_scoring_signature), task_scorer=Depends(get_task_scorer)
 ):
+    logger.debug("Scoring Request received!!!!!!!!!!!!!!!!")
+    model = None
+    logger.debug("Setted Model to None")
     payload: dict[str, Any] = await request.json()
+    logger.debug(f"Awaited body: {payload}")
     body = payload.get("body")
-    timeout = payload.get("timeout", settings.shared_settings.NEURON_TIMEOUT)
-    uids = payload.get("uid", [])
+    timeout = payload.get("timeout", shared_settings.NEURON_TIMEOUT)
+    uids = payload.get("uids", [])
     chunks = payload.get("chunks", {})
+    timings = payload.get("timings", {})
+    logger.debug("About to check chunks and uids")
     if not uids or not chunks:
         logger.error(f"Either uids: {uids} or chunks: {chunks} is not valid, skipping scoring")
         return
     uids = [int(uid) for uid in uids]
     model = body.get("model")
-    if model:
-        try:
-            llm_model = ModelZoo.get_model_by_id(model)
-        except Exception:
-            logger.warning(
-                f"Organic request with model {body.get('model')} made but the model cannot be found in model zoo. Skipping scoring."
-            )
+    logger.debug("About to check model")
+    if model and model != shared_settings.LLM_MODEL:
+        logger.error(f"Model {model} not available for scoring on this validator.")
         return
-    else:
-        llm_model = None
+    logger.debug("Model has been checked")
+    llm_model = ModelZoo.get_model_by_id(model)
+    logger.debug("Got LLM Model from ModelZoo")
     task_name = body.get("task")
+    logger.debug(f"Task name set: {task_name}")
+    logger.debug(f"Length pre-insertion: {len(task_scorer.scoring_queue)}")
     if task_name == "InferenceTask":
         logger.info(f"Received Organic InferenceTask with body: {body}")
+        logger.info(f"With model of type {type(body.get('model'))}")
         organic_task = InferenceTask(
             messages=body.get("messages"),
-            llm_model=llm_model,
-            llm_model_id=body.get("model"),
+            llm_model_id=llm_model,
             seed=int(body.get("seed", 0)),
-            sampling_params=body.get("sampling_parameters", settings.shared_settings.SAMPLING_PARAMS),
+            sampling_params=body.get("sampling_parameters", shared_settings.SAMPLING_PARAMS),
             query=body.get("messages"),
+            organic=True,
         )
+        logger.info(f"Task created: {organic_task}")

         task_scorer.add_to_queue(
             task=organic_task,
             response=DendriteResponseEvent(
                 uids=uids,
                 stream_results=[SynapseStreamResult(accumulated_chunks=chunks.get(str(uid), None)) for uid in uids],
                 timeout=timeout,
+                stream_results_all_chunks_timings=[timings.get(str(uid), None) for uid in uids],
             ),
             dataset_entry=DatasetEntry(),
-            block=settings.shared_settings.METAGRAPH.block,
+            block=shared_settings.METAGRAPH.block,
             step=-1,
             task_id=str(uuid.uuid4()),
         )

     elif task_name == "WebRetrievalTask":
         logger.info(f"Received Organic WebRetrievalTask with body: {body}")
         try:
@@ -91,15 +125,14 @@ async def score_response(
                 query=search_term,
             ),
             response=DendriteResponseEvent(
-                uids=[uids],
-                stream_results=[
-                    SynapseStreamResult(accumulated_chunks=[chunk for chunk in chunks if chunk is not None])
-                ],
-                timeout=body.get("timeout", settings.shared_settings.NEURON_TIMEOUT),
+                uids=uids,
+                stream_results=[SynapseStreamResult(accumulated_chunks=chunks.get(str(uid), [])) for uid in uids],
+                timeout=body.get("timeout", shared_settings.NEURON_TIMEOUT),
             ),
             dataset_entry=DDGDatasetEntry(search_term=search_term),
-            block=settings.shared_settings.METAGRAPH.block,
+            block=shared_settings.METAGRAPH.block,
             step=-1,
             task_id=str(uuid.uuid4()),
         )
+    logger.debug(f"Length post-insertion: {len(task_scorer.scoring_queue)}")
     logger.info("Organic task appended to scoring queue")
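The hunk above replaces the static `api_key` header check with Epistula request signatures: the validator now verifies who signed the request, who it was signed for, and that the signature matches the body. For orientation, here is a rough sketch of what the sending (API node) side might look like. The header names and checks come from the diff; the keypair type and the exact bytes covered by the signature are assumptions, since the real contract lives in `shared/epistula.py`, which this diff does not show:

```python
import json
import time
import uuid

import requests
from substrateinterface import Keypair  # assumed signing primitive


def post_scored_response(keypair: Keypair, validator_ss58: str, base_url: str, payload: dict):
    """Illustrative sketch: sign the body and attach the Epistula headers
    that verify_scoring_signature checks on the validator side."""
    body = json.dumps(payload).encode()
    headers = {
        "Epistula-Signed-By": keypair.ss58_address,  # must equal API_HOTKEY on the validator
        "Epistula-Signed-For": validator_ss58,       # the validator's own hotkey
        "Epistula-Timestamp": str(time.time()),
        "Epistula-Uuid": str(uuid.uuid4()),
        # Hypothetical signature scheme: the real format is whatever
        # shared.epistula's sign/verify pair implements.
        "Epistula-Request-Signature": f"0x{keypair.sign(body).hex()}",
        "Content-Type": "application/json",
    }
    return requests.post(f"{base_url}/scoring", data=body, headers=headers, timeout=10)
```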
6 changes: 5 additions & 1 deletion prompting/llms/apis/sn19_wrapper.py
@@ -1,6 +1,7 @@
 import json

 import requests
+from loguru import logger
 from tenacity import retry, stop_after_attempt, wait_exponential

 from prompting.llms.apis.llm_messages import LLMMessages
@@ -9,7 +10,6 @@
 shared_settings = settings.shared_settings


-# TODO: key error in response.json() when response is 500
 @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
 def chat_complete(
     messages: LLMMessages,
@@ -38,6 +38,10 @@ def chat_complete(
         "logprobs": logprobs,
     }
     response = requests.post(url, headers=headers, data=json.dumps(data), timeout=30)
+    if not response.status_code == 200:
+        logger.error(f"SN19 API returned status code {response.status_code}")
+        logger.error(f"Response: {response.text}")
+        raise Exception(f"SN19 API returned status code {response.status_code}")
     response_json = response.json()
     try:
         return response_json["choices"][0]["message"].get("content")
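Note the interaction with the tenacity decorator above: raising on a non-200 status now marks the attempt as failed, so `chat_complete` is retried with exponential backoff before the error reaches the caller, which also resolves the old TODO about `response.json()` KeyErrors on 500s. A self-contained sketch of that retry behavior (the function body is a stand-in, not the real SN19 call):

```python
from tenacity import RetryError, retry, stop_after_attempt, wait_exponential


@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
def flaky_call():
    # Raising marks this attempt as failed; tenacity waits (between 4 and 10
    # seconds, growing exponentially) and retries, up to three attempts total.
    raise Exception("SN19 API returned status code 500")


try:
    flaky_call()
except RetryError as e:
    # After the final attempt, tenacity wraps the last exception in RetryError.
    print(f"Gave up after three attempts: {e}")
```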
30 changes: 14 additions & 16 deletions prompting/llms/hf_llm.py
@@ -5,8 +5,7 @@
 from loguru import logger
 from transformers import AutoModelForCausalLM, AutoTokenizer, PreTrainedModel, pipeline

-from shared import settings
-from shared.timer import Timer
+from shared.settings import shared_settings


 class ReproducibleHF:
@@ -31,7 +30,7 @@ def __init__(self, model_id="hugging-quants/Meta-Llama-3.1-70B-Instruct-AWQ-INT4

         self.llm = pipeline("text-generation", model=self.model, tokenizer=self.tokenizer)

-        self.sampling_params = settings.shared_settings.SAMPLING_PARAMS
+        self.sampling_params = shared_settings.SAMPLING_PARAMS

     @torch.inference_mode()
     def generate(self, messages: list[str] | list[dict], sampling_params=None, seed=None):
@@ -46,23 +45,22 @@ def generate(self, messages: list[str] | list[dict], sampling_params=None, seed=
             add_generation_prompt=True,
             return_tensors="pt",
             return_dict=True,
-        ).to(settings.shared_settings.NEURON_DEVICE)
+        ).to(shared_settings.NEURON_DEVICE)

         params = sampling_params if sampling_params else self.sampling_params
         filtered_params = {k: v for k, v in params.items() if k in self.valid_generation_params}

-        with Timer():
-            # Generate with optimized settings
-            outputs = self.model.generate(
-                **inputs.to(settings.shared_settings.NEURON_DEVICE),
-                **filtered_params,
-                eos_token_id=self.tokenizer.eos_token_id,
-            )
-
-            results = self.tokenizer.batch_decode(
-                outputs[:, inputs["input_ids"].shape[1] :],
-                skip_special_tokens=True,
-            )[0]
+        # Generate with optimized settings
+        outputs = self.model.generate(
+            **inputs.to(shared_settings.NEURON_DEVICE),
+            **filtered_params,
+            eos_token_id=self.tokenizer.eos_token_id,
+        )
+
+        results = self.tokenizer.batch_decode(
+            outputs[:, inputs["input_ids"].shape[1] :],
+            skip_special_tokens=True,
+        )[0]

         logger.debug(
             f"""{self.__class__.__name__} queried:
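With the `Timer` wrapper gone, `generate` simply returns the decoded completion. A hypothetical usage sketch (the model id is the default from the diff; the sampling parameters and seed are illustrative, and assume `generate` seeds its RNG from the `seed` argument):

```python
# Hypothetical usage of ReproducibleHF as defined above.
llm = ReproducibleHF(model_id="hugging-quants/Meta-Llama-3.1-70B-Instruct-AWQ-INT4")
response = llm.generate(
    messages=[{"role": "user", "content": "Summarize the change in one sentence."}],
    sampling_params={"temperature": 0.7, "max_new_tokens": 128},  # illustrative values
    seed=42,  # assumed to make decoding reproducible across runs
)
print(response)
```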
27 changes: 14 additions & 13 deletions prompting/rewards/scoring.py
@@ -94,20 +94,21 @@ async def run_step(self) -> RewardLoggingEvent:
                 f"Scored {scoring_config.task.__class__.__name__} {scoring_config.task.task_id} with model "
                 f"{scoring_config.task.llm_model_id}"
             )
-            log_event(
-                RewardLoggingEvent(
-                    response_event=scoring_config.response,
-                    reward_events=reward_events,
-                    reference=scoring_config.task.reference,
-                    challenge=scoring_config.task.query,
-                    task=scoring_config.task.name,
-                    block=scoring_config.block,
-                    step=scoring_config.step,
-                    task_id=scoring_config.task_id,
-                    task_dict=scoring_config.task.model_dump(),
-                    source=scoring_config.dataset_entry.source,
+            if not scoring_config.task.organic:
+                log_event(
+                    RewardLoggingEvent(
+                        response_event=scoring_config.response,
+                        reward_events=reward_events,
+                        reference=scoring_config.task.reference,
+                        challenge=scoring_config.task.query,
+                        task=scoring_config.task.name,
+                        block=scoring_config.block,
+                        step=scoring_config.step,
+                        task_id=scoring_config.task_id,
+                        task_dict=scoring_config.task.model_dump(),
+                        source=scoring_config.dataset_entry.source,
+                    )
                 )
-            )
             await asyncio.sleep(0.01)
2 changes: 2 additions & 0 deletions prompting/tasks/base_task.py
@@ -33,6 +33,7 @@ class BaseTask(BaseModel, ABC):
     query: Any = None
     reference: Any = None
     task_id: str = Field(default_factory=lambda: str(uuid4()), allow_mutation=False)
+    organic: bool = False

     model_config = ConfigDict(arbitrary_types_allowed=True)

@@ -60,6 +61,7 @@ class BaseTextTask(BaseTask):
     sampling_params: dict[str, float] = settings.shared_settings.SAMPLING_PARAMS
     timeout: int = settings.shared_settings.NEURON_TIMEOUT
     max_tokens: int = settings.shared_settings.NEURON_MAX_TOKENS
+    organic: bool = True

     @property
     def task_messages(self) -> list[str] | list[dict]:
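Taken together with the scoring changes above, the `organic` flag gates W&B logging: `BaseTask` defaults it to False, `BaseTextTask` overrides the default, and the scoring endpoint also sets `organic=True` explicitly when it builds tasks from forwarded responses; `run_step` then skips `log_event` for organic tasks. A minimal sketch of the pydantic default-override pattern involved, with stand-in classes:

```python
from pydantic import BaseModel


class Task(BaseModel):
    organic: bool = False  # mirrors BaseTask's new default


class TextTask(Task):
    organic: bool = True  # subclass overrides the default, as BaseTextTask does


synthetic = Task()
forwarded = Task(organic=True)  # the scoring endpoint sets the flag explicitly
print(synthetic.organic, TextTask().organic, forwarded.organic)  # False True True
```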