diff --git a/prompting/datasets/random_website.py b/prompting/datasets/random_website.py
index ae70ed41f..044ad7dec 100644
--- a/prompting/datasets/random_website.py
+++ b/prompting/datasets/random_website.py
@@ -1,10 +1,10 @@
 import random
+from functools import lru_cache
 from typing import Optional
 
 import trafilatura
 from loguru import logger
 
-# from duckduckgo_search import DDGS
 from prompting.base.duckduckgo_patch import PatchedDDGS
 from prompting.datasets.utils import ENGLISH_WORDS
 from shared.base import BaseDataset, Context, DatasetEntry
@@ -38,6 +38,7 @@ def search_random_term(self, retries: int = 3) -> tuple[Optional[str], Optional[
         return None, None
 
     @staticmethod
+    @lru_cache(maxsize=1000)
     def extract_website_content(url: str) -> Optional[str]:
         try:
             website = trafilatura.fetch_url(url)
diff --git a/prompting/llms/apis/llm_wrapper.py b/prompting/llms/apis/llm_wrapper.py
index d9d40646e..c1bc0cc33 100644
--- a/prompting/llms/apis/llm_wrapper.py
+++ b/prompting/llms/apis/llm_wrapper.py
@@ -7,6 +7,7 @@
 
 
 class LLMWrapper:
+    @staticmethod
     def chat_complete(
         messages: LLMMessages,
         model="chat-llama-3-1-70b",
@@ -29,27 +30,23 @@ def chat_complete(
                 logprobs=logprobs,
             )
-        except Exception as ex:
-            logger.exception(ex)
-            logger.warning("Failed to use SN19 API, falling back to GPT-3.5")
-        else:
-            if response is not None:
-                logger.debug(f"Generated {len(response)} characters using {model}")
-                return response
-            logger.warning(
-                "Failed to use SN19 API (check the SN19_API_KEY and/or SN19_API_URL), " "falling back to GPT-3.5"
+        except Exception:
+            logger.error(
+                "Failed to use SN19 API, falling back to GPT-3.5. "
+                "Make sure to specify 'SN19_API_KEY' and 'SN19_API_URL' in .env.validator"
             )
-        model = "gpt-3.5-turbo"
-        response, _ = openai_client.chat_complete(
-            messages=messages,
-            model=model,
-            temperature=temperature,
-            max_tokens=max_tokens,
-            top_p=top_p,
-            stream=stream,
-            logprobs=logprobs,
-        )
-        response = response.choices[0].message.content
+        if response is None:
+            model = "gpt-3.5-turbo"
+            response, _ = openai_client.chat_complete(
+                messages=messages,
+                model=model,
+                temperature=temperature,
+                max_tokens=max_tokens,
+                top_p=top_p,
+                stream=stream,
+                logprobs=logprobs,
+            )
+            response = response.choices[0].message.content
 
         logger.debug(f"Generated {len(response)} characters using {model}")
         return response
diff --git a/prompting/llms/apis/sn19_wrapper.py b/prompting/llms/apis/sn19_wrapper.py
index 2e1d634df..4d50a280f 100644
--- a/prompting/llms/apis/sn19_wrapper.py
+++ b/prompting/llms/apis/sn19_wrapper.py
@@ -1,7 +1,6 @@
 import json
 
 import requests
-from loguru import logger
 from tenacity import retry, stop_after_attempt, wait_exponential
 
 from prompting.llms.apis.llm_messages import LLMMessages
@@ -37,11 +36,8 @@ def chat_complete(
         "logprobs": logprobs,
     }
     response = requests.post(url, headers=headers, data=json.dumps(data), timeout=30)
+    response_json = response.json()
     try:
-        response_json = response.json()
-        try:
-            return response_json["choices"][0]["message"].get("content")
-        except KeyError:
-            return response_json["choices"][0]["delta"].get("content")
-    except Exception as e:
-        logger.exception(f"Error in chat_complete: {e}")
+        return response_json["choices"][0]["message"].get("content")
+    except KeyError:
+        return response_json["choices"][0]["delta"].get("content")
diff --git a/prompting/llms/hf_llm.py b/prompting/llms/hf_llm.py
index 934071e3a..3e931015f 100644
--- a/prompting/llms/hf_llm.py
+++ b/prompting/llms/hf_llm.py
@@ -65,12 +65,12 @@ def generate(self, messages: list[str] | list[dict], sampling_params=None, seed=
         )[0]
 
         logger.debug(
-            f"""REPRODUCIBLEHF WAS QUERIED:
-            PROMPT: {messages}\n\n
-            RESPONSES: {results}\n\n
-            SAMPLING PARAMS: {params}\n\n
-            SEED: {seed}\n\n
-            TIME FOR RESPONSE: {timer.elapsed_time}"""
+            f"""{self.__class__.__name__} queried:
+            prompt: {messages}\n
+            responses: {results}\n
+            sampling params: {params}\n
+            seed: {seed}
+            """
         )
 
         return results if len(results) > 1 else results[0]
diff --git a/prompting/rewards/inference_reward_model.py b/prompting/rewards/inference_reward_model.py
index e174f55f9..d6ccbfb99 100644
--- a/prompting/rewards/inference_reward_model.py
+++ b/prompting/rewards/inference_reward_model.py
@@ -6,7 +6,11 @@
 
 class InferenceRewardModel(BaseRewardModel):
     def reward(
-        self, reference: str, response_event: DendriteResponseEvent, model_id: str | None = None
+        self,
+        reference: str,
+        response_event: DendriteResponseEvent,
+        model_id: str | None = None,
+        **kwargs,
     ) -> BatchRewardOutput:
         """Gives an exact reward of 1 if the response matches the reference, 0 otherwise"""
         if model_id:
diff --git a/prompting/rewards/multi_choice.py b/prompting/rewards/multi_choice.py
index 40948ad79..13050d70f 100644
--- a/prompting/rewards/multi_choice.py
+++ b/prompting/rewards/multi_choice.py
@@ -42,6 +42,9 @@ def process_predictions(self, predictions: dict[str, float]) -> dict[str, float]
         }
 
         total = sum(valid_choices.values())
+        if np.isclose(total, 0.0):
+            raise ValueError(f"Values sum up to 0, total={total}")
+
         if not np.isclose(total, 1.0):
             valid_choices = {k: v / total for k, v in valid_choices.items()}
 
diff --git a/prompting/rewards/scoring.py b/prompting/rewards/scoring.py
index 6b7d45bec..2ae481434 100644
--- a/prompting/rewards/scoring.py
+++ b/prompting/rewards/scoring.py
@@ -51,7 +51,7 @@ def add_to_queue(
                 task_id=task_id,
             )
         )
-        logger.debug(f"SCORING: Added to queue: {task.__class__.__name__}. Queue size: {len(self.scoring_queue)}")
+        logger.debug(f"Added to queue: {task.__class__.__name__}. Queue size: {len(self.scoring_queue)}")
 
     async def run_step(self) -> RewardLoggingEvent:
         await asyncio.sleep(0.1)
@@ -79,7 +79,8 @@ async def run_step(self) -> RewardLoggingEvent:
         # and there we then calculate the reward
         reward_pipeline = TaskRegistry.get_task_reward(scoring_config.task)
         logger.debug(
-            f"""{len(scoring_config.response.completions)} completions to score for task {scoring_config.task}"""
+            f"{len(scoring_config.response.completions)} completions to score for task "
+            f"{scoring_config.task.__class__.__name__}"
         )
         reward_events = reward_pipeline.apply(
             response_event=scoring_config.response,
@@ -89,11 +90,9 @@ async def run_step(self) -> RewardLoggingEvent:
             task=scoring_config.task,
         )
         self.reward_events.append(reward_events)
-        # logger.debug(
-        #     f"REFERENCE: {scoring_config.task.reference}\n\n||||RESPONSES: {scoring_config.response.completions}"
-        # )
         logger.debug(
-            f"SCORING: Scored {scoring_config.task.__class__.__name__} {scoring_config.task.task_id} with model {scoring_config.task.llm_model_id} with reward"
+            f"Scored {scoring_config.task.__class__.__name__} {scoring_config.task.task_id} with model "
+            f"{scoring_config.task.llm_model_id}"
         )
         log_event(
             RewardLoggingEvent(
@@ -109,7 +108,6 @@ async def run_step(self) -> RewardLoggingEvent:
                     source=scoring_config.dataset_entry.source,
                 )
             )
-        logger.info("Adding scores to rewards_and_uids")
 
         await asyncio.sleep(0.01)
diff --git a/prompting/rewards/web_retrieval.py b/prompting/rewards/web_retrieval.py
index 69de0c42c..dcd6fade3 100644
--- a/prompting/rewards/web_retrieval.py
+++ b/prompting/rewards/web_retrieval.py
@@ -1,13 +1,4 @@
-"""Expected miner's response is a JSON object with the following keys: url, content, relevant.
-
-Example response:
-{
-    "url": "https://www.example.com",
-    "content": "This is the content of the website. This is the section we are interested in.",
-    "relevant": "This is the section we are interested in.",
-}
-"""
-
+"""Expected miner's response is a JSON object with the following keys: url, content, relevant for each website."""
 import json
 
 import numpy as np
@@ -27,9 +18,9 @@
 
 
 class WebsiteResult(BaseModel):
-    url: str
-    content: str
-    relevant: str
+    url: str | None
+    content: str | None
+    relevant: str | None
 
 
 class WebRetrievalRewardModel(RelevanceRewardModel):
diff --git a/prompting/tasks/task_sending.py b/prompting/tasks/task_sending.py
index 36075e7a3..41042c693 100644
--- a/prompting/tasks/task_sending.py
+++ b/prompting/tasks/task_sending.py
@@ -70,14 +70,12 @@ async def collect_responses(task: BaseTextTask) -> DendriteResponseEvent | None:
         body["target_results"] = task.target_results
         body["timeout"] = task.timeout
 
-    logger.info(f"🔍 SENDING TASK {task.task_id} WITH BODY: {body}")
+    logger.info(f"🔍 Sending task {task.task_id} with body: {body}")
     stream_results = await query_miners(uids, body)
     logger.debug(f"🔍 Collected responses from {len(stream_results)} miners")
 
     log_stream_results(stream_results)
 
-    logger.debug("🔍 Creating response event")
-
     response_event = DendriteResponseEvent(
         stream_results=stream_results,
         uids=uids,
diff --git a/prompting/weight_setting/weight_setter.py b/prompting/weight_setting/weight_setter.py
index f37102409..74e92e0b5 100644
--- a/prompting/weight_setting/weight_setter.py
+++ b/prompting/weight_setting/weight_setter.py
@@ -153,7 +153,7 @@ async def start(self, reward_events, name: str | None = None):
     try:
         with np.load(FILENAME) as data:
             PAST_WEIGHTS = [data[key] for key in data.files]
-        logger.debug(f"Loaded Past Weights: {PAST_WEIGHTS}")
+        logger.debug(f"Loaded persistent weights of length: {len(PAST_WEIGHTS)}")
     except FileNotFoundError:
         logger.info("No weights file found - this is expected on a new validator, starting with empty weights")
         PAST_WEIGHTS = []
@@ -164,7 +164,6 @@ async def start(self, reward_events, name: str | None = None):
     async def run_step(self):
         await asyncio.sleep(0.01)
         try:
-            logger.info("Reward setting loop running")
             if len(self.reward_events) == 0:
                 logger.warning("No reward events in queue, skipping weight setting...")
                 return
@@ -183,8 +182,6 @@ async def run_step(self):
                 config: {uid: {"reward": 0, "count": 0} for uid in range(1024)} for config in TaskRegistry.task_configs
             }
 
-            logger.debug(f"Miner rewards before processing: {miner_rewards}")
-
             inference_events: list[WeightedRewardEvent] = []
             for reward_events in self.reward_events:
                 await asyncio.sleep(0.01)
@@ -207,8 +204,6 @@ async def run_step(self):
                             1 * reward_event.weight
                         )  # TODO: Double check I actually average at the end
 
-            logger.debug(f"Miner rewards after processing: {miner_rewards}")
-
             for inference_event in inference_events:
                 for uid, reward in zip(inference_event.uids, inference_event.rewards):
                     llm_model = inference_event.task.llm_model_id
@@ -233,9 +228,16 @@ async def run_step(self):
             final_rewards = np.array(list(reward_dict.values())).astype(float)
             final_rewards[final_rewards < 0] = 0
             final_rewards /= np.sum(final_rewards) + 1e-10
-            logger.debug(f"Final reward dict: {final_rewards}")
         except Exception as ex:
             logger.exception(f"{ex}")
+
+        mean_value = final_rewards.mean()
+        min_value = final_rewards.min()
+        max_value = final_rewards.max()
+        length = len(final_rewards)
+        logger.debug(
+            f"Reward stats. Mean: {mean_value:.2f}; Min: {min_value:.4f}; Max: {max_value:.4f}; Count: {length}"
+        )
         # set weights on chain
         set_weights(
             final_rewards, step=self.step, subtensor=shared_settings.SUBTENSOR, metagraph=shared_settings.METAGRAPH
diff --git a/pyproject.toml b/pyproject.toml
index 12a4c79f4..907a17080 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "prompting"
-version = "2.17.0"
+version = "2.17.1"
 description = "Subnetwork 1 runs on Bittensor and is maintained by Macrocosmos. It's an effort to create decentralised AI"
 authors = ["Kalei Brady, Dmytro Bobrenko, Felix Quinque, Steffen Cruz, Richard Wardle"]
 readme = "README.md"
diff --git a/shared/logging.py b/shared/logging.py
index 038910cb7..283c2541c 100644
--- a/shared/logging.py
+++ b/shared/logging.py
@@ -159,16 +159,17 @@ class ValidatorLoggingEvent(BaseEvent):
     def __str__(self):
         sample_completions = [completion for completion in self.response_event.completions if len(completion) > 0]
+        forward_time = round(self.forward_time, 4) if self.forward_time else self.forward_time
         return f"""ValidatorLoggingEvent:
             Block: {self.block}
             Step: {self.step}
-            Step Time: {self.step_time}
-            forward_time: {self.forward_time}
-            task_id: {self.task_id}
+            Step time: {self.step_time:.4f}
+            Forward time: {forward_time}
+            Task id: {self.task_id}
             Number of total completions: {len(self.response_event.completions)}
             Number of non-empty completions: {len(sample_completions)}
-            Sample Completions: {sample_completions[:5]}...
-        """
+            Sample 1 completion: {sample_completions[:1]}
+        """
 
 
 class RewardLoggingEvent(BaseEvent):
diff --git a/shared/loop_runner.py b/shared/loop_runner.py
index af63570ee..43d380bd4 100644
--- a/shared/loop_runner.py
+++ b/shared/loop_runner.py
@@ -60,7 +60,6 @@ def next_sync_point(self, current_time):
     async def wait_for_next_execution(self, last_run_time):
         """Wait until the next execution time, either synced or based on last run."""
         current_time = await self.get_time()
-        logger.debug("Current time")
         if self.sync:
             next_run = self.next_sync_point(current_time)
         else:
diff --git a/shared/profiling.py b/shared/profiling.py
index 868f62d5f..c6ed8cd0e 100644
--- a/shared/profiling.py
+++ b/shared/profiling.py
@@ -23,7 +23,7 @@ def __init__(self):
                 "thread_ids": set(),
             }
         )
-        self.start_time = time.time()
+        self.start_time = time.perf_counter()
         self._active_measurements = set()
         self.process = psutil.Process()
         # Initialize process CPU times
@@ -59,8 +59,8 @@ def measure(self, loop_name):
 
     async def print_stats(self):
         while True:
-            await asyncio.sleep(60)  # Report every minute
-            total_runtime = time.time() - self.start_time
+            await asyncio.sleep(5 * 60)  # Report every 5 minutes
+            total_runtime = time.perf_counter() - self.start_time
 
             logging.info("\n=== Loop Profiling Stats ===")
             logging.info(f"Total wall clock time: {total_runtime:.2f}s")
diff --git a/shared/settings.py b/shared/settings.py
index eaf5a8e02..ab61340df 100644
--- a/shared/settings.py
+++ b/shared/settings.py
@@ -55,6 +55,7 @@ class SharedSettings(BaseSettings):
 
     # Neuron parameters.
     NEURON_TIMEOUT: int = Field(20, env="NEURON_TIMEOUT")
+    INFERENCE_TIMEOUT: int = Field(60, env="INFERENCE_TIMEOUT")
     NEURON_DISABLE_SET_WEIGHTS: bool = Field(False, env="NEURON_DISABLE_SET_WEIGHTS")
     NEURON_MOVING_AVERAGE_ALPHA: float = Field(0.1, env="NEURON_MOVING_AVERAGE_ALPHA")
     NEURON_DECAY_ALPHA: float = Field(0.001, env="NEURON_DECAY_ALPHA")
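
Note on the random_website.py change: a minimal sketch (not part of the patch) of the caching behavior that @lru_cache introduces. trafilatura.fetch_url and trafilatura.extract are the library's real entry points; the class name and URL below are illustrative.

from functools import lru_cache
from typing import Optional

import trafilatura


class WebsiteDataset:
    @staticmethod
    @lru_cache(maxsize=1000)
    def extract_website_content(url: str) -> Optional[str]:
        # The network fetch runs once per distinct URL; later calls are served from the cache.
        website = trafilatura.fetch_url(url)
        return trafilatura.extract(website) if website else None


content = WebsiteDataset.extract_website_content("https://example.com")  # fetches
content = WebsiteDataset.extract_website_content("https://example.com")  # cache hit, no fetch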
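
Note on the llm_wrapper.py change: a condensed, runnable sketch of the reworked fallback control flow. The stub functions stand in for the real SN19 and OpenAI clients; the point is that the GPT-3.5 path now runs whenever response is still None, whether the SN19 call raised or simply returned nothing.

from typing import Optional


def sn19_chat_complete(messages: list[str]) -> Optional[str]:
    # Stub for the SN19 API call: may raise, or return None on a bad reply.
    return None


def openai_chat_complete(messages: list[str]) -> str:
    # Stub for the OpenAI client call plus unpacking the first choice.
    return "fallback completion"


def chat_complete(messages: list[str]) -> str:
    response: Optional[str] = None
    try:
        response = sn19_chat_complete(messages)
    except Exception:
        print("Failed to use SN19 API, falling back to GPT-3.5")
    if response is None:  # reached on exception and on an empty SN19 result alike
        response = openai_chat_complete(messages)
    return response


print(chat_complete(["hello"]))  # -> "fallback completion"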
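
Note on the multi_choice.py change: the guard turns an all-zero probability vector into a hard error instead of silently dividing by a near-zero sum. A self-contained sketch of the same normalization logic, with hypothetical inputs:

import numpy as np


def normalize(valid_choices: dict[str, float]) -> dict[str, float]:
    total = sum(valid_choices.values())
    if np.isclose(total, 0.0):
        # Degenerate prediction: nothing meaningful to renormalize, so fail loudly.
        raise ValueError(f"Values sum up to 0, total={total}")
    if not np.isclose(total, 1.0):
        valid_choices = {k: v / total for k, v in valid_choices.items()}
    return valid_choices


print(normalize({"A": 0.2, "B": 0.6}))  # {'A': 0.25, 'B': 0.75}
# normalize({"A": 0.0, "B": 0.0})  # raises ValueError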
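
Note on the web_retrieval.py change: loosening the WebsiteResult fields to str | None lets validation accept explicit None values (e.g., when a miner returns no usable URL) instead of raising. A small sketch, assuming a recent pydantic:

from pydantic import BaseModel


class WebsiteResult(BaseModel):
    url: str | None
    content: str | None
    relevant: str | None


WebsiteResult(url=None, content=None, relevant=None)  # validates after the change
# With the previous `url: str` annotations, the same call raised a ValidationError.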
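
Note on the shared/profiling.py change: time.perf_counter() is monotonic and high-resolution, so elapsed-time arithmetic is immune to wall-clock adjustments (e.g., NTP corrections) that can skew time.time() deltas. A minimal illustration:

import time

start = time.perf_counter()
time.sleep(0.1)
elapsed = time.perf_counter() - start  # monotonic: never negative, unaffected by clock changes
print(f"elapsed: {elapsed:.3f}s")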