Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 22 additions & 17 deletions prompting/api/scoring/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,29 +27,34 @@ async def score_response(request: Request, api_key_data: dict = Depends(validate
model = None
payload: dict[str, Any] = await request.json()
body = payload.get("body")

try:
if body.get("model") is not None:
model = ModelZoo.get_model_by_id(body.get("model"))
except Exception:
logger.warning(
f"Organic request with model {body.get('model')} made but the model cannot be found in model zoo. Skipping scoring."
)
return
uid = int(payload.get("uid"))
chunks = payload.get("chunks")
llm_model = ModelZoo.get_model_by_id(model) if (model := body.get("model")) else None
model = body.get("model")
if model:
try:
llm_model = ModelZoo.get_model_by_id(model)
except Exception:
logger.warning(
f"Organic request with model {body.get('model')} made but the model cannot be found in model zoo. Skipping scoring."
)
return
else:
llm_model = None
task = body.get("task")
if task == "InferenceTask":
logger.info(f"Received Organic InferenceTask with body: {body}")
logger.info(f"With model of type {type(body.get('model'))}")
organic_task = InferenceTask(
messages=body.get("messages"),
llm_model=llm_model,
llm_model_id=body.get("model"),
seed=int(body.get("seed", 0)),
sampling_params=body.get("sampling_parameters", shared_settings.SAMPLING_PARAMS),
query=body.get("messages")[0]["content"],
)
logger.info(f"Task created: {organic_task}")
task_scorer.add_to_queue(
task=InferenceTask(
messages=[msg["content"] for msg in body.get("messages")],
llm_model=llm_model,
llm_model_id=body.get("model"),
seed=int(body.get("seed", 0)),
sampling_params=body.get("sampling_params", {}),
),
task=organic_task,
response=DendriteResponseEvent(
uids=[uid],
stream_results=[
Expand Down
6 changes: 2 additions & 4 deletions prompting/datasets/sn13.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,9 @@ def sample(self) -> ChatEntry:
raise self.exception
# Randomly select a sample from the dataset.
messages = []
roles = []
for _ in range(4):
sample_idx = random.randint(0, len(self.dataset) - 1)
if message := self.dataset[sample_idx]["text"]:
roles.append(random.choice(["user", "assistant"]))
messages.append(message)
messages.append({"role": random.choice(["user", "assistant"]), "content": message})

return ChatEntry(roles=roles, messages=messages, organic=False, source=self._url)
return ChatEntry(messages=messages, organic=False, source=self._url)
7 changes: 5 additions & 2 deletions prompting/llms/model_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,15 @@ def _make_prompt(self, messages: list[dict[str, str]]) -> str:
def generate(
self,
messages: list[str],
roles: list[str],
roles: list[str] | None = None,
model: ModelConfig | str | None = None,
seed: int = None,
sampling_params: Dict[str, float] = None,
) -> str:
dict_messages = [{"content": message, "role": role} for message, role in zip(messages, roles)]
if messages and isinstance(messages[0], dict):
dict_messages = messages
else:
dict_messages = [{"content": message, "role": role} for message, role in zip(messages, roles)]

if isinstance(model, str):
model = ModelZoo.get_model_by_id(model)
Expand Down
9 changes: 4 additions & 5 deletions prompting/llms/model_zoo.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,7 @@ def __hash__(self):
class ModelZoo:
# Currently, we are only using a single model - the one the validator is running
models_configs: ClassVar[list[ModelConfig]] = [
ModelConfig(
llm_model_id=shared_settings.LLM_MODEL,
reward=1,
min_ram=shared_settings.MAX_ALLOWED_VRAM_GB,
),
ModelConfig(llm_model_id=shared_settings.LLM_MODEL, reward=1, min_ram=shared_settings.MAX_ALLOWED_VRAM_GB),
]

# Code below can be uncommented for testing purposes and demonstrates how we rotate multiple LLMs in the future
Expand All @@ -46,6 +42,9 @@ def get_random(cls, max_ram: float = np.inf) -> ModelConfig:

@classmethod
def get_model_by_id(cls, model_id: str) -> ModelConfig:
if not model_id:
logger.error("model_id cannot be None or empty. Returning None...")
return None
try:
return [model for model in cls.models_configs if model.llm_model_id == model_id][0]
except Exception as ex:
Expand Down
13 changes: 7 additions & 6 deletions prompting/miner_availability/miner_availability.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,16 +73,17 @@ async def run_step(self):
responses: list[Dict[str, bool]] = await query_availabilities(uids_to_query, task_config, model_config)

for response, uid in zip(responses, uids_to_query):
if not response:
miner_availabilities.miners[uid] = MinerAvailability(
task_availabilities={task: True for task in task_config},
llm_model_availabilities={model: False for model in model_config},
)
else:
try:
miner_availabilities.miners[uid] = MinerAvailability(
task_availabilities=response["task_availabilities"],
llm_model_availabilities=response["llm_model_availabilities"],
)
except Exception:
logger.debug("Availability Response Invalid")
miner_availabilities.miners[uid] = MinerAvailability(
task_availabilities={task: True for task in task_config},
llm_model_availabilities={model: False for model in model_config},
)

logger.debug("Miner availabilities updated.")
self.current_index = end_index
Expand Down
1 change: 1 addition & 0 deletions prompting/tasks/base_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def make_reference(self, **kwargs):

class BaseTextTask(BaseTask):
query: str | None = None
roles: list[str] | None = None
messages: list[str] | list[dict] | None = None
reference: str | None = None
llm_model: ModelConfig = None
Expand Down
6 changes: 4 additions & 2 deletions prompting/tasks/inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ class InferenceTask(BaseTextTask):

@model_validator(mode="after")
def random_llm_model_id(self):
if self.query: # If we are already defining query, as in the case of organics, we also specify model.
return self

if np.random.rand() < 0.2:
self.llm_model_id = None
else:
Expand All @@ -52,14 +55,13 @@ def random_llm_model_id(self):
def make_query(self, dataset_entry: ChatEntry) -> str:
    """Build the task query from a dataset entry.

    If a query was already supplied (the organic-request path), it is kept
    as-is. Otherwise the full chat history is stored in ``self.messages``
    and the query is set to the content string of the last message, so the
    declared ``str`` return type holds (``dataset_entry.messages`` is a
    ``list[dict]`` of ``{"role": ..., "content": ...}`` entries).

    Args:
        dataset_entry: Sampled chat entry providing the message history.

    Returns:
        The query string for this task.
    """
    if self.query:
        return self.query
    self.messages = dataset_entry.messages
    # Assigning the whole message list here would violate the str contract;
    # use the last message's content as the query text.
    self.query = dataset_entry.messages[-1]["content"]
    return self.query

def make_reference(self, dataset_entry: ChatEntry) -> str:
self.reference = model_manager.generate(
messages=dataset_entry.messages,
roles=dataset_entry.roles,
model=self.llm_model,
seed=self.seed,
sampling_params=self.sampling_params,
Expand Down
3 changes: 1 addition & 2 deletions shared/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ def __hash__(self) -> int:


class ChatEntry(DatasetEntry):
messages: list[str]
roles: list[str]
messages: list[dict]
organic: bool
source: str | None = None
query: str | None = None
Expand Down
12 changes: 8 additions & 4 deletions shared/epistula.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,16 @@ async def merged_stream(responses: list[AsyncGenerator]):
logger.error(f"Error while streaming: {e}")


async def query_miners(uids, body: dict[str, Any]) -> list[SynapseStreamResult]:
async def query_miners(
uids, body: dict[str, Any], timeout_seconds: int = shared_settings.NEURON_TIMEOUT
) -> list[SynapseStreamResult]:
try:
tasks = []
for uid in uids:
tasks.append(
asyncio.create_task(make_openai_query(shared_settings.METAGRAPH, shared_settings.WALLET, body, uid))
asyncio.create_task(
make_openai_query(shared_settings.METAGRAPH, shared_settings.WALLET, timeout_seconds, body, uid)
)
)
responses = await asyncio.gather(*tasks, return_exceptions=True)

Expand Down Expand Up @@ -197,6 +201,7 @@ async def handle_availability(
async def make_openai_query(
metagraph: "bt.NonTorchMetagraph",
wallet: "bt.wallet",
timeout_seconds: int,
body: dict[str, Any],
uid: int,
stream: bool = False,
Expand All @@ -206,7 +211,7 @@ async def make_openai_query(
base_url=f"http://{axon_info.ip}:{axon_info.port}/v1",
api_key="Apex",
max_retries=0,
timeout=Timeout(30, connect=5, read=10),
timeout=Timeout(timeout_seconds, connect=5, read=timeout_seconds - 5),
http_client=openai.DefaultAsyncHttpxClient(
event_hooks={"request": [create_header_hook(wallet.hotkey, axon_info.hotkey)]}
),
Expand All @@ -217,7 +222,6 @@ async def make_openai_query(
messages=body["messages"],
stream=True,
extra_body=extra_body,
timeout=20,
)
if stream:
return chat
Expand Down
2 changes: 1 addition & 1 deletion shared/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ class RewardLoggingEvent(BaseEvent):
reward_events: list[WeightedRewardEvent]
task_id: str
reference: str
challenge: str
challenge: str | list[dict]
task: str
task_dict: dict

Expand Down
5 changes: 2 additions & 3 deletions shared/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class SharedSettings(BaseSettings):
LOG_WEIGHTS: bool = Field(False, env="LOG_WEIGHTS")

# Neuron parameters.
NEURON_TIMEOUT: int = Field(15, env="NEURON_TIMEOUT")
NEURON_TIMEOUT: int = Field(20, env="NEURON_TIMEOUT")
INFERENCE_TIMEOUT: int = Field(60, env="INFERENCE_TIMEOUT")
NEURON_DISABLE_SET_WEIGHTS: bool = Field(False, env="NEURON_DISABLE_SET_WEIGHTS")
NEURON_MOVING_AVERAGE_ALPHA: float = Field(0.1, env="NEURON_MOVING_AVERAGE_ALPHA")
Expand Down Expand Up @@ -119,9 +119,8 @@ class SharedSettings(BaseSettings):
"temperature": 0.7,
"top_p": 0.95,
"top_k": 50,
"max_new_tokens": 256,
"max_new_tokens": 512,
"do_sample": True,
"seed": None,
}
MINER_LLM_MODEL: Optional[str] = Field(None, env="MINER_LLM_MODEL")
LLM_MODEL_RAM: float = Field(70, env="LLM_MODEL_RAM")
Expand Down
6 changes: 5 additions & 1 deletion validator_api/chat_completion.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import json
import math
import random
from typing import AsyncGenerator, List, Optional

Expand Down Expand Up @@ -111,11 +112,14 @@ async def chat_completion(
# Initialize chunks collection for each miner
collected_chunks_list = [[] for _ in selected_uids]

timeout_seconds = int(math.floor(math.log2(body["sampling_parameters"]["max_new_tokens"] / 256))) * 10 + 30
if STREAM:
# Create tasks for all miners
response_tasks = [
asyncio.create_task(
make_openai_query(shared_settings.METAGRAPH, shared_settings.WALLET, body, uid, stream=True)
make_openai_query(
shared_settings.METAGRAPH, shared_settings.WALLET, timeout_seconds, body, uid, stream=True
)
)
for uid in selected_uids
]
Expand Down
4 changes: 3 additions & 1 deletion validator_api/gpt_endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ async def web_retrieval(search_query: str, n_miners: int = 10, uids: list[int] =
{"role": "user", "content": search_query},
],
}
stream_results = await query_miners(uids, body)

timeout_seconds = 30
stream_results = await query_miners(uids, body, timeout_seconds)
results = [
"".join(res.accumulated_chunks)
for res in stream_results
Expand Down
Loading