
Commit b9e0025: Merge pull request #559 from macrocosm-os/SN1-380-hardcode-api-key
Create SN1 Loadbalancer
2 parents 9f10d3f + 5b2145c

File tree: 20 files changed, +327 -108 lines

.env.api.example

Lines changed: 0 additions & 1 deletion
@@ -1,6 +1,5 @@
 API_PORT = "42170" # Port for the API server
 API_HOST = "0.0.0.0" # Host for the API server
 SCORING_KEY = "123" # The scoring key for the validator (must match the scoring key in the .env.validator file)
-SCORE_ORGANICS = True # Whether to score organics
 VALIDATOR_API = "0.0.0.0:8094" # The validator API to forward responses to for scoring
 WORKERS=4

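Note: with SCORE_ORGANICS removed from the example, scoring of organic responses is presumably no longer an opt-in toggle for API deployments; the remaining keys (SCORING_KEY, VALIDATOR_API) still wire the API to the validator's scoring endpoint.
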
.env.validator.example

Lines changed: 0 additions & 1 deletion
@@ -26,7 +26,6 @@ HF_TOKEN = "your_huggingface_token_here"
 
 # Scoring API (optional).
 DEPLOY_SCORING_API = true
-SCORING_ADMIN_KEY = "123456"
 SCORING_API_PORT = 8094
 # Scoring key must match the scoring key in the .env.api.
 # SCORING_KEY="..."

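Note: dropping SCORING_ADMIN_KEY fits the PR's theme; a static admin key is presumably superseded by the Epistula signature check added to the scoring route in prompting/api/scoring/api.py below.
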
data/top100k_domains.csv

Lines changed: 1 addition & 1 deletion
@@ -99997,4 +99997,4 @@
 "99996","tankspotter.com","4.51"
 "99997","targetshootingapp.com","4.51"
 "99998","tastytalegame.com","4.51"
-"99999","tbscan.com","4.51"
+"99999","tbscan.com","4.51"

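The deleted and re-added rows are textually identical, so this hunk is most likely a newline-at-end-of-file fix rather than a data change.
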
neurons/validator.py

Lines changed: 21 additions & 2 deletions
@@ -4,8 +4,13 @@
 import time
 
 import loguru
+import netaddr
+import requests
 import torch
 import wandb
+from bittensor.core.extrinsics.serving import serve_extrinsic
+
+from prompting.rewards.scoring import task_scorer
 
 # ruff: noqa: E402
 from shared import settings
@@ -34,7 +39,6 @@ async def spawn_loops(task_queue, scoring_queue, reward_events):
     # ruff: noqa: E402
     from prompting.llms.model_manager import model_scheduler
     from prompting.miner_availability.miner_availability import availability_checking_loop
-    from prompting.rewards.scoring import task_scorer
     from prompting.tasks.task_creation import task_loop
     from prompting.tasks.task_sending import task_sender
     from prompting.weight_setting.weight_setter import weight_setter
@@ -87,10 +91,25 @@ async def start():
     # TODO: We should not use 2 availability loops for each process, in reality
     # we should only be sharing the miner availability data between processes.
     from prompting.miner_availability.miner_availability import availability_checking_loop
-    from prompting.rewards.scoring import task_scorer
 
     asyncio.create_task(availability_checking_loop.start())
 
+    try:
+        external_ip = requests.get("https://checkip.amazonaws.com").text.strip()
+        netaddr.IPAddress(external_ip)
+
+        serve_success = serve_extrinsic(
+            subtensor=settings.shared_settings.SUBTENSOR,
+            wallet=settings.shared_settings.WALLET,
+            ip=external_ip,
+            port=settings.shared_settings.SCORING_API_PORT,
+            protocol=4,
+            netuid=settings.shared_settings.NETUID,
+        )
+
+        logger.debug(f"Serve success: {serve_success}")
+    except Exception as e:
+        logger.warning(f"Failed to serve scoring api to chain: {e}")
     await start_scoring_api(task_scorer, scoring_queue, reward_events)
 
     while True:

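The new startup block resolves the validator's public IP (netaddr.IPAddress raises on a malformed address, so a bad checkip response falls through to the warning) and publishes the scoring API's host and port on chain via serve_extrinsic. A peer can then discover the endpoint from the metagraph; a minimal consumer-side sketch, assuming a synced bittensor metagraph and the standard axon_info fields (the netuid, network, and hotkey values are illustrative, not from the PR):

import bittensor as bt

mg = bt.metagraph(netuid=1, network="finney")  # hypothetical netuid/network
validator_hotkey = "5F..."  # hypothetical ss58 address of the serving validator

uid = mg.hotkeys.index(validator_hotkey)  # locate the validator in the metagraph
axon = mg.axons[uid]                      # axon_info written by serve_extrinsic
scoring_url = f"http://{axon.ip}:{axon.port}/scoring"
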
prompting/api/api.py

Lines changed: 2 additions & 0 deletions
@@ -4,6 +4,8 @@
 
 from prompting.api.miner_availabilities.api import router as miner_availabilities_router
 from prompting.api.scoring.api import router as scoring_router
+
+# from prompting.rewards.scoring import task_scorer
 from shared import settings
 
 app = FastAPI()

prompting/api/scoring/api.py

Lines changed: 58 additions & 25 deletions
@@ -1,7 +1,8 @@
+import time
 import uuid
 from typing import Any
 
-from fastapi import APIRouter, Depends, Header, HTTPException, Request
+from fastapi import APIRouter, Depends, HTTPException, Request
 from loguru import logger
 
 from prompting.datasets.random_website import DDGDatasetEntry
@@ -11,13 +12,38 @@
 from shared import settings
 from shared.base import DatasetEntry
 from shared.dendrite import DendriteResponseEvent
-from shared.epistula import SynapseStreamResult
+from shared.epistula import SynapseStreamResult, verify_signature
+from shared.settings import shared_settings
 
 router = APIRouter()
 
 
-def validate_scoring_key(api_key: str = Header(...)):
-    if api_key != settings.shared_settings.SCORING_KEY:
+async def verify_scoring_signature(request: Request):
+    signed_by = request.headers.get("Epistula-Signed-By")
+    signed_for = request.headers.get("Epistula-Signed-For")
+    if signed_for != shared_settings.WALLET.hotkey.ss58_address:
+        raise HTTPException(status_code=400, detail="Bad Request, message is not intended for self")
+    if signed_by != shared_settings.API_HOTKEY:
+        raise HTTPException(status_code=401, detail="Signer not the expected ss58 address")
+
+    body = await request.body()
+    now = time.time()
+    err = verify_signature(
+        request.headers.get("Epistula-Request-Signature"),
+        body,
+        request.headers.get("Epistula-Timestamp"),
+        request.headers.get("Epistula-Uuid"),
+        signed_for,
+        signed_by,
+        now,
+    )
+    if err:
+        logger.error(err)
+        raise HTTPException(status_code=400, detail=err)
+
+
+def validate_scoring_key(request: Request):
+    if request.headers.api_key != settings.shared_settings.SCORING_KEY:
         raise HTTPException(status_code=403, detail="Invalid API key")
 
 
@@ -27,54 +53,62 @@ def get_task_scorer(request: Request):
 
 @router.post("/scoring")
 async def score_response(
-    request: Request, api_key_data: dict = Depends(validate_scoring_key), task_scorer=Depends(get_task_scorer)
+    request: Request, api_key_data: dict = Depends(verify_scoring_signature), task_scorer=Depends(get_task_scorer)
 ):
+    logger.debug("Scoring Request received!!!!!!!!!!!!!!!!")
     model = None
+    logger.debug("Setted Model to None")
     payload: dict[str, Any] = await request.json()
+    logger.debug(f"Awaited body: {payload}")
     body = payload.get("body")
-    timeout = payload.get("timeout", settings.shared_settings.NEURON_TIMEOUT)
-    uids = payload.get("uid", [])
+    timeout = payload.get("timeout", shared_settings.NEURON_TIMEOUT)
+    uids = payload.get("uids", [])
     chunks = payload.get("chunks", {})
+    timings = payload.get("timings", {})
+    logger.debug("About to check chunks and uids")
     if not uids or not chunks:
         logger.error(f"Either uids: {uids} or chunks: {chunks} is not valid, skipping scoring")
         return
     uids = [int(uid) for uid in uids]
     model = body.get("model")
-    if model:
-        try:
-            llm_model = ModelZoo.get_model_by_id(model)
-        except Exception:
-            logger.warning(
-                f"Organic request with model {body.get('model')} made but the model cannot be found in model zoo. Skipping scoring."
-            )
+    logger.debug("About to check model")
+    if model and model != shared_settings.LLM_MODEL:
+        logger.error(f"Model {model} not available for scoring on this validator.")
         return
-    else:
-        llm_model = None
+    logger.debug("Model has been checked")
+    llm_model = ModelZoo.get_model_by_id(model)
+    logger.debug("Got LLM Model from ModelZoo")
     task_name = body.get("task")
+    logger.debug(f"Task name set: {task_name}")
+    logger.debug(f"Length pre-insertion: {len(task_scorer.scoring_queue)}")
     if task_name == "InferenceTask":
         logger.info(f"Received Organic InferenceTask with body: {body}")
         logger.info(f"With model of type {type(body.get('model'))}")
         organic_task = InferenceTask(
             messages=body.get("messages"),
             llm_model=llm_model,
-            llm_model_id=body.get("model"),
+            llm_model_id=llm_model,
             seed=int(body.get("seed", 0)),
-            sampling_params=body.get("sampling_parameters", settings.shared_settings.SAMPLING_PARAMS),
+            sampling_params=body.get("sampling_parameters", shared_settings.SAMPLING_PARAMS),
             query=body.get("messages"),
+            organic=True,
         )
         logger.info(f"Task created: {organic_task}")
+
        task_scorer.add_to_queue(
             task=organic_task,
             response=DendriteResponseEvent(
                 uids=uids,
                 stream_results=[SynapseStreamResult(accumulated_chunks=chunks.get(str(uid), None)) for uid in uids],
                 timeout=timeout,
+                stream_results_all_chunks_timings=[timings.get(str(uid), None) for uid in uids],
             ),
             dataset_entry=DatasetEntry(),
-            block=settings.shared_settings.METAGRAPH.block,
+            block=shared_settings.METAGRAPH.block,
             step=-1,
             task_id=str(uuid.uuid4()),
         )
+
     elif task_name == "WebRetrievalTask":
         logger.info(f"Received Organic WebRetrievalTask with body: {body}")
         try:
@@ -91,15 +125,14 @@ async def score_response(
                 query=search_term,
             ),
             response=DendriteResponseEvent(
-                uids=[uids],
-                stream_results=[
-                    SynapseStreamResult(accumulated_chunks=[chunk for chunk in chunks if chunk is not None])
-                ],
-                timeout=body.get("timeout", settings.shared_settings.NEURON_TIMEOUT),
+                uids=uids,
+                stream_results=[SynapseStreamResult(accumulated_chunks=chunks.get(str(uid), [])) for uid in uids],
+                timeout=body.get("timeout", shared_settings.NEURON_TIMEOUT),
             ),
             dataset_entry=DDGDatasetEntry(search_term=search_term),
-            block=settings.shared_settings.METAGRAPH.block,
+            block=shared_settings.METAGRAPH.block,
             step=-1,
             task_id=str(uuid.uuid4()),
         )
+    logger.debug(f"Length post-insertion: {len(task_scorer.scoring_queue)}")
     logger.info("Organic task appended to scoring queue")

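Authentication here moves from a shared api_key header to Epistula request signatures: the handler now rejects anything not addressed to the validator's own hotkey or not signed by the expected API hotkey (API_HOTKEY), and verifies the signature over the raw body. Note also the payload key change from uid to uids, the new per-chunk timings, and that validate_scoring_key remains defined but is no longer wired to the route. A hypothetical client-side sketch; it assumes shared.epistula also exposes a generate_header(hotkey, body, signed_for) helper that emits the Epistula-* headers verify_signature expects (if no such helper exists, the headers must be constructed to match the checks above):

import json

import requests
from shared.epistula import generate_header  # assumed helper, not shown in this diff

# wallet, validator_ss58, validator_ip, and scoring_port are illustrative placeholders.
body_bytes = json.dumps(payload).encode()  # payload: dict with body, uids, chunks, timings, timeout
headers = generate_header(wallet.hotkey, body_bytes, signed_for=validator_ss58)
resp = requests.post(f"http://{validator_ip}:{scoring_port}/scoring", data=body_bytes, headers=headers, timeout=10)
resp.raise_for_status()
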
prompting/llms/apis/sn19_wrapper.py

Lines changed: 5 additions & 1 deletion
@@ -1,6 +1,7 @@
 import json
 
 import requests
+from loguru import logger
 from tenacity import retry, stop_after_attempt, wait_exponential
 
 from prompting.llms.apis.llm_messages import LLMMessages
@@ -9,7 +10,6 @@
 shared_settings = settings.shared_settings
 
 
-# TODO: key error in response.json() when response is 500
 @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
 def chat_complete(
     messages: LLMMessages,
@@ -38,6 +38,10 @@ def chat_complete(
         "logprobs": logprobs,
     }
    response = requests.post(url, headers=headers, data=json.dumps(data), timeout=30)
+    if not response.status_code == 200:
+        logger.error(f"SN19 API returned status code {response.status_code}")
+        logger.error(f"Response: {response.text}")
+        raise Exception(f"SN19 API returned status code {response.status_code}")
     response_json = response.json()
     try:
         return response_json["choices"][0]["message"].get("content")

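Because chat_complete is already wrapped in tenacity's @retry, raising on a non-200 response converts what the removed TODO described (a KeyError inside response.json() on a 500) into up to three attempts with exponential backoff. A self-contained illustration of that retry behavior:

from tenacity import retry, stop_after_attempt, wait_exponential

@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
def call_api():
    # Any exception raised here triggers a retry; once the three attempts
    # are exhausted, tenacity raises a RetryError wrapping the last exception.
    raise Exception("SN19 API returned status code 500")
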
prompting/llms/hf_llm.py

Lines changed: 14 additions & 16 deletions
@@ -5,8 +5,7 @@
 from loguru import logger
 from transformers import AutoModelForCausalLM, AutoTokenizer, PreTrainedModel, pipeline
 
-from shared import settings
-from shared.timer import Timer
+from shared.settings import shared_settings
 
 
 class ReproducibleHF:
@@ -31,7 +30,7 @@ def __init__(self, model_id="hugging-quants/Meta-Llama-3.1-70B-Instruct-AWQ-INT4
 
         self.llm = pipeline("text-generation", model=self.model, tokenizer=self.tokenizer)
 
-        self.sampling_params = settings.shared_settings.SAMPLING_PARAMS
+        self.sampling_params = shared_settings.SAMPLING_PARAMS
 
     @torch.inference_mode()
     def generate(self, messages: list[str] | list[dict], sampling_params=None, seed=None):
@@ -46,23 +45,22 @@ def generate(self, messages: list[str] | list[dict], sampling_params=None, seed=
             add_generation_prompt=True,
             return_tensors="pt",
             return_dict=True,
-        ).to(settings.shared_settings.NEURON_DEVICE)
+        ).to(shared_settings.NEURON_DEVICE)
 
         params = sampling_params if sampling_params else self.sampling_params
         filtered_params = {k: v for k, v in params.items() if k in self.valid_generation_params}
 
-        with Timer():
-            # Generate with optimized settings
-            outputs = self.model.generate(
-                **inputs.to(settings.shared_settings.NEURON_DEVICE),
-                **filtered_params,
-                eos_token_id=self.tokenizer.eos_token_id,
-            )
-
-            results = self.tokenizer.batch_decode(
-                outputs[:, inputs["input_ids"].shape[1] :],
-                skip_special_tokens=True,
-            )[0]
+        # Generate with optimized settings
+        outputs = self.model.generate(
+            **inputs.to(shared_settings.NEURON_DEVICE),
+            **filtered_params,
+            eos_token_id=self.tokenizer.eos_token_id,
+        )
+
+        results = self.tokenizer.batch_decode(
+            outputs[:, inputs["input_ids"].shape[1] :],
+            skip_special_tokens=True,
+        )[0]
 
         logger.debug(
             f"""{self.__class__.__name__} queried:

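Behavior is otherwise unchanged: the Timer context manager (which only wrapped the call for timing) is dropped, the generate body is dedented accordingly, and settings access switches to the direct shared_settings import. A hypothetical usage sketch of the class as diffed, with the default model id from the constructor:

# Illustrative only; requires a GPU host with the quantized model available.
llm = ReproducibleHF(model_id="hugging-quants/Meta-Llama-3.1-70B-Instruct-AWQ-INT4")
out = llm.generate(
    [{"role": "user", "content": "Say hello in one sentence."}],
    seed=42,  # the class aims for reproducible output given the same seed and params
)
print(out)
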
prompting/rewards/scoring.py

Lines changed: 14 additions & 13 deletions
@@ -94,20 +94,21 @@ async def run_step(self) -> RewardLoggingEvent:
                 f"Scored {scoring_config.task.__class__.__name__} {scoring_config.task.task_id} with model "
                 f"{scoring_config.task.llm_model_id}"
             )
-            log_event(
-                RewardLoggingEvent(
-                    response_event=scoring_config.response,
-                    reward_events=reward_events,
-                    reference=scoring_config.task.reference,
-                    challenge=scoring_config.task.query,
-                    task=scoring_config.task.name,
-                    block=scoring_config.block,
-                    step=scoring_config.step,
-                    task_id=scoring_config.task_id,
-                    task_dict=scoring_config.task.model_dump(),
-                    source=scoring_config.dataset_entry.source,
+            if not scoring_config.task.organic:
+                log_event(
+                    RewardLoggingEvent(
+                        response_event=scoring_config.response,
+                        reward_events=reward_events,
+                        reference=scoring_config.task.reference,
+                        challenge=scoring_config.task.query,
+                        task=scoring_config.task.name,
+                        block=scoring_config.block,
+                        step=scoring_config.step,
+                        task_id=scoring_config.task_id,
+                        task_dict=scoring_config.task.model_dump(),
+                        source=scoring_config.dataset_entry.source,
+                    )
                 )
-            )
         await asyncio.sleep(0.01)
 
 
prompting/tasks/base_task.py
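Combined with the organic=True flag that the scoring API sets on incoming tasks, this guard keeps organic traffic out of the RewardLoggingEvent stream: organic responses are still scored, they just no longer emit logging events.
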

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ class BaseTask(BaseModel, ABC):
3333
query: Any = None
3434
reference: Any = None
3535
task_id: str = Field(default_factory=lambda: str(uuid4()), allow_mutation=False)
36+
organic: bool = False
3637

3738
model_config = ConfigDict(arbitrary_types_allowed=True)
3839

@@ -60,6 +61,7 @@ class BaseTextTask(BaseTask):
6061
sampling_params: dict[str, float] = settings.shared_settings.SAMPLING_PARAMS
6162
timeout: int = settings.shared_settings.NEURON_TIMEOUT
6263
max_tokens: int = settings.shared_settings.NEURON_MAX_TOKENS
64+
organic: bool = True
6365

6466
@property
6567
def task_messages(self) -> list[str] | list[dict]:

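The flag defaults differ by level: BaseTask starts at False while BaseTextTask overrides to True, and the scoring API passes organic=True explicitly when building InferenceTask. A stripped-down reproduction of the semantics, assuming the pydantic models behave as diffed:

from pydantic import BaseModel

class BaseTask(BaseModel):
    organic: bool = False

class BaseTextTask(BaseTask):
    organic: bool = True

assert BaseTask().organic is False
assert BaseTextTask().organic is True
assert BaseTextTask(organic=False).organic is False  # explicit value still wins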