diff --git a/prompting/api/scoring/api.py b/prompting/api/scoring/api.py index d7c981c17..9618aee68 100644 --- a/prompting/api/scoring/api.py +++ b/prompting/api/scoring/api.py @@ -27,29 +27,34 @@ async def score_response(request: Request, api_key_data: dict = Depends(validate model = None payload: dict[str, Any] = await request.json() body = payload.get("body") - - try: - if body.get("model") is not None: - model = ModelZoo.get_model_by_id(body.get("model")) - except Exception: - logger.warning( - f"Organic request with model {body.get('model')} made but the model cannot be found in model zoo. Skipping scoring." - ) - return uid = int(payload.get("uid")) chunks = payload.get("chunks") - llm_model = ModelZoo.get_model_by_id(model) if (model := body.get("model")) else None + model = body.get("model") + if model: + try: + llm_model = ModelZoo.get_model_by_id(model) + except Exception: + logger.warning( + f"Organic request with model {body.get('model')} made but the model cannot be found in model zoo. Skipping scoring." + ) + return + else: + llm_model = None task = body.get("task") if task == "InferenceTask": logger.info(f"Received Organic InferenceTask with body: {body}") + logger.info(f"With model of type {type(body.get('model'))}") + organic_task = InferenceTask( + messages=body.get("messages"), + llm_model=llm_model, + llm_model_id=body.get("model"), + seed=int(body.get("seed", 0)), + sampling_params=body.get("sampling_parameters", shared_settings.SAMPLING_PARAMS), + query=body.get("messages")[0]["content"], + ) + logger.info(f"Task created: {organic_task}") task_scorer.add_to_queue( - task=InferenceTask( - messages=[msg["content"] for msg in body.get("messages")], - llm_model=llm_model, - llm_model_id=body.get("model"), - seed=int(body.get("seed", 0)), - sampling_params=body.get("sampling_params", {}), - ), + task=organic_task, response=DendriteResponseEvent( uids=[uid], stream_results=[ diff --git a/prompting/datasets/sn13.py b/prompting/datasets/sn13.py index ac1bcf776..0615f8c49 100644 --- a/prompting/datasets/sn13.py +++ b/prompting/datasets/sn13.py @@ -38,11 +38,9 @@ def sample(self) -> ChatEntry: raise self.exception # Randomly select a sample from the dataset. messages = [] - roles = [] for _ in range(4): sample_idx = random.randint(0, len(self.dataset) - 1) if message := self.dataset[sample_idx]["text"]: - roles.append(random.choice(["user", "assistant"])) - messages.append(message) + messages.append({"role": random.choice(["user", "assistant"]), "content": message}) - return ChatEntry(roles=roles, messages=messages, organic=False, source=self._url) + return ChatEntry(messages=messages, organic=False, source=self._url) diff --git a/prompting/llms/model_manager.py b/prompting/llms/model_manager.py index b0f516af2..275b50ce4 100644 --- a/prompting/llms/model_manager.py +++ b/prompting/llms/model_manager.py @@ -136,12 +136,15 @@ def _make_prompt(self, messages: list[dict[str, str]]) -> str: def generate( self, messages: list[str], - roles: list[str], + roles: list[str] | None = None, model: ModelConfig | str | None = None, seed: int = None, sampling_params: Dict[str, float] = None, ) -> str: - dict_messages = [{"content": message, "role": role} for message, role in zip(messages, roles)] + if messages and isinstance(messages[0], dict): + dict_messages = messages + else: + dict_messages = [{"content": message, "role": role} for message, role in zip(messages, roles)] if isinstance(model, str): model = ModelZoo.get_model_by_id(model) diff --git a/prompting/llms/model_zoo.py b/prompting/llms/model_zoo.py index 755dd2beb..dab941616 100644 --- a/prompting/llms/model_zoo.py +++ b/prompting/llms/model_zoo.py @@ -20,11 +20,7 @@ def __hash__(self): class ModelZoo: # Currently, we are only using one single model - the one the validator is running models_configs: ClassVar[list[ModelConfig]] = [ - ModelConfig( - llm_model_id=shared_settings.LLM_MODEL, - reward=1, - min_ram=shared_settings.MAX_ALLOWED_VRAM_GB, - ), + ModelConfig(llm_model_id=shared_settings.LLM_MODEL, reward=1, min_ram=shared_settings.MAX_ALLOWED_VRAM_GB), ] # Code below can be uncommended for testing purposes and demonstrates how we rotate multiple LLMs in the future @@ -46,6 +42,9 @@ def get_random(cls, max_ram: float = np.inf) -> ModelConfig: @classmethod def get_model_by_id(cls, model_id: str) -> ModelConfig: + if not model_id: + logger.error("model_id cannot be None or empty. Returning None...") + return None try: return [model for model in cls.models_configs if model.llm_model_id == model_id][0] except Exception as ex: diff --git a/prompting/miner_availability/miner_availability.py b/prompting/miner_availability/miner_availability.py index 35fc2edaa..dfecac3f9 100644 --- a/prompting/miner_availability/miner_availability.py +++ b/prompting/miner_availability/miner_availability.py @@ -73,16 +73,17 @@ async def run_step(self): responses: list[Dict[str, bool]] = await query_availabilities(uids_to_query, task_config, model_config) for response, uid in zip(responses, uids_to_query): - if not response: - miner_availabilities.miners[uid] = MinerAvailability( - task_availabilities={task: True for task in task_config}, - llm_model_availabilities={model: False for model in model_config}, - ) - else: + try: miner_availabilities.miners[uid] = MinerAvailability( task_availabilities=response["task_availabilities"], llm_model_availabilities=response["llm_model_availabilities"], ) + except Exception: + logger.debug("Availability Response Invalid") + miner_availabilities.miners[uid] = MinerAvailability( + task_availabilities={task: True for task in task_config}, + llm_model_availabilities={model: False for model in model_config}, + ) logger.debug("Miner availabilities updated.") self.current_index = end_index diff --git a/prompting/tasks/base_task.py b/prompting/tasks/base_task.py index 9434df94a..f05796471 100644 --- a/prompting/tasks/base_task.py +++ b/prompting/tasks/base_task.py @@ -47,6 +47,7 @@ def make_reference(self, **kwargs): class BaseTextTask(BaseTask): query: str | None = None + roles: list[str] | None = None messages: list[str] | list[dict] | None = None reference: str | None = None llm_model: ModelConfig = None diff --git a/prompting/tasks/inference.py b/prompting/tasks/inference.py index 3270d30f7..2c83865ff 100644 --- a/prompting/tasks/inference.py +++ b/prompting/tasks/inference.py @@ -43,6 +43,9 @@ class InferenceTask(BaseTextTask): @model_validator(mode="after") def random_llm_model_id(self): + if self.query: # If we are already defining query, as in the case of organics, we also specify model. + return self + if np.random.rand() < 0.2: self.llm_model_id = None else: @@ -52,14 +55,13 @@ def random_llm_model_id(self): def make_query(self, dataset_entry: ChatEntry) -> str: if self.query: return self.query - self.query = dataset_entry.messages[-1] + self.query = dataset_entry.messages self.messages = dataset_entry.messages return self.query def make_reference(self, dataset_entry: ChatEntry) -> str: self.reference = model_manager.generate( messages=dataset_entry.messages, - roles=dataset_entry.roles, model=self.llm_model, seed=self.seed, sampling_params=self.sampling_params, diff --git a/shared/base.py b/shared/base.py index 90eb13358..1d851aea3 100644 --- a/shared/base.py +++ b/shared/base.py @@ -18,8 +18,7 @@ def __hash__(self) -> int: class ChatEntry(DatasetEntry): - messages: list[str] - roles: list[str] + messages: list[dict] organic: bool source: str | None = None query: str | None = None diff --git a/shared/epistula.py b/shared/epistula.py index 5af064856..32b197b90 100644 --- a/shared/epistula.py +++ b/shared/epistula.py @@ -111,12 +111,16 @@ async def merged_stream(responses: list[AsyncGenerator]): logger.error(f"Error while streaming: {e}") -async def query_miners(uids, body: dict[str, Any]) -> list[SynapseStreamResult]: +async def query_miners( + uids, body: dict[str, Any], timeout_seconds: int = shared_settings.NEURON_TIMEOUT +) -> list[SynapseStreamResult]: try: tasks = [] for uid in uids: tasks.append( - asyncio.create_task(make_openai_query(shared_settings.METAGRAPH, shared_settings.WALLET, body, uid)) + asyncio.create_task( + make_openai_query(shared_settings.METAGRAPH, shared_settings.WALLET, timeout_seconds, body, uid) + ) ) responses = await asyncio.gather(*tasks, return_exceptions=True) @@ -197,6 +201,7 @@ async def handle_availability( async def make_openai_query( metagraph: "bt.NonTorchMetagraph", wallet: "bt.wallet", + timeout_seconds: int, body: dict[str, Any], uid: int, stream: bool = False, @@ -206,7 +211,7 @@ async def make_openai_query( base_url=f"http://{axon_info.ip}:{axon_info.port}/v1", api_key="Apex", max_retries=0, - timeout=Timeout(30, connect=5, read=10), + timeout=Timeout(timeout_seconds, connect=5, read=timeout_seconds - 5), http_client=openai.DefaultAsyncHttpxClient( event_hooks={"request": [create_header_hook(wallet.hotkey, axon_info.hotkey)]} ), @@ -217,7 +222,6 @@ async def make_openai_query( messages=body["messages"], stream=True, extra_body=extra_body, - timeout=20, ) if stream: return chat diff --git a/shared/logging.py b/shared/logging.py index 6da5e1de4..d04d68fb1 100644 --- a/shared/logging.py +++ b/shared/logging.py @@ -178,7 +178,7 @@ class RewardLoggingEvent(BaseEvent): reward_events: list[WeightedRewardEvent] task_id: str reference: str - challenge: str + challenge: str | list[dict] task: str task_dict: dict diff --git a/shared/settings.py b/shared/settings.py index e4df024fe..656558f44 100644 --- a/shared/settings.py +++ b/shared/settings.py @@ -57,7 +57,7 @@ class SharedSettings(BaseSettings): LOG_WEIGHTS: bool = Field(False, env="LOG_WEIGHTS") # Neuron parameters. - NEURON_TIMEOUT: int = Field(15, env="NEURON_TIMEOUT") + NEURON_TIMEOUT: int = Field(20, env="NEURON_TIMEOUT") INFERENCE_TIMEOUT: int = Field(60, env="INFERENCE_TIMEOUT") NEURON_DISABLE_SET_WEIGHTS: bool = Field(False, env="NEURON_DISABLE_SET_WEIGHTS") NEURON_MOVING_AVERAGE_ALPHA: float = Field(0.1, env="NEURON_MOVING_AVERAGE_ALPHA") @@ -119,9 +119,8 @@ class SharedSettings(BaseSettings): "temperature": 0.7, "top_p": 0.95, "top_k": 50, - "max_new_tokens": 256, + "max_new_tokens": 512, "do_sample": True, - "seed": None, } MINER_LLM_MODEL: Optional[str] = Field(None, env="MINER_LLM_MODEL") LLM_MODEL_RAM: float = Field(70, env="LLM_MODEL_RAM") diff --git a/validator_api/chat_completion.py b/validator_api/chat_completion.py index fd05a174a..84d8501ce 100644 --- a/validator_api/chat_completion.py +++ b/validator_api/chat_completion.py @@ -1,5 +1,6 @@ import asyncio import json +import math import random from typing import AsyncGenerator, List, Optional @@ -111,11 +112,14 @@ async def chat_completion( # Initialize chunks collection for each miner collected_chunks_list = [[] for _ in selected_uids] + timeout_seconds = int(math.floor(math.log2(body["sampling_parameters"]["max_new_tokens"] / 256))) * 10 + 30 if STREAM: # Create tasks for all miners response_tasks = [ asyncio.create_task( - make_openai_query(shared_settings.METAGRAPH, shared_settings.WALLET, body, uid, stream=True) + make_openai_query( + shared_settings.METAGRAPH, shared_settings.WALLET, timeout_seconds, body, uid, stream=True + ) ) for uid in selected_uids ] diff --git a/validator_api/gpt_endpoints.py b/validator_api/gpt_endpoints.py index b363c1b13..6059005d3 100644 --- a/validator_api/gpt_endpoints.py +++ b/validator_api/gpt_endpoints.py @@ -61,7 +61,9 @@ async def web_retrieval(search_query: str, n_miners: int = 10, uids: list[int] = {"role": "user", "content": search_query}, ], } - stream_results = await query_miners(uids, body) + + timeout_seconds = 30 + stream_results = await query_miners(uids, body, timeout_seconds) results = [ "".join(res.accumulated_chunks) for res in stream_results