From 6efdb5d07dc1187a9f1bd20e806579a1a63c7ce5 Mon Sep 17 00:00:00 2001 From: Dmytro Bobrenko <17252809+dbobrenko@users.noreply.github.com> Date: Wed, 25 Jun 2025 17:03:25 +0000 Subject: [PATCH 1/5] Fix rewards running history --- .gitignore | 3 + prompting/tasks/task_creation.py | 2 +- prompting/weight_setting/weight_setter.py | 342 ++++++++++-------- shared/settings.py | 4 + tests/prompting/test_weight_settings.py | 145 -------- .../weight_setting/test_weight_setter.py | 215 +++++++++++ 6 files changed, 412 insertions(+), 299 deletions(-) delete mode 100644 tests/prompting/test_weight_settings.py create mode 100644 tests/prompting/weight_setting/test_weight_setter.py diff --git a/.gitignore b/.gitignore index eb2b2dbcd..105e330bd 100644 --- a/.gitignore +++ b/.gitignore @@ -8,6 +8,9 @@ __pycache__/ *.npy *.npz prompting/storage/ +validator_rewards.jsonl +test_validator_rewards.jsonl +uid_tracker.sqlite* # C extensions *.so diff --git a/prompting/tasks/task_creation.py b/prompting/tasks/task_creation.py index b49e354f7..098dc647e 100644 --- a/prompting/tasks/task_creation.py +++ b/prompting/tasks/task_creation.py @@ -18,7 +18,7 @@ class TaskLoop(AsyncLoopRunner): is_running: bool = False thread: threading.Thread = None - interval: int = 20 + interval: int = 2 task_queue: list | None = [] scoring_queue: list | None = [] miners_dict: dict | None = None diff --git a/prompting/weight_setting/weight_setter.py b/prompting/weight_setting/weight_setter.py index 6cebbc639..879f1dba6 100644 --- a/prompting/weight_setting/weight_setter.py +++ b/prompting/weight_setting/weight_setter.py @@ -1,90 +1,56 @@ import asyncio -import os +from collections import deque +import datetime +import json +from pathlib import Path +from typing import Any import bittensor as bt import numpy as np -import pandas as pd +import numpy.typing as npt from loguru import logger from prompting import __spec_version__ from prompting.rewards.reward import WeightedRewardEvent from prompting.tasks.inference import InferenceTask +from prompting.tasks.msrv2_task import MSRv2Task from prompting.tasks.task_registry import TaskConfig, TaskRegistry from prompting.weight_setting.weight_synchronizer import WeightSynchronizer from shared import settings from shared.loop_runner import AsyncLoopRunner -from shared.misc import ttl_get_block shared_settings = settings.shared_settings -FILENAME = "validator_weights.npz" -WEIGHTS_HISTORY_LENGTH = 24 -PAST_WEIGHTS: list[np.ndarray] = [] - - -def apply_reward_func(raw_rewards: np.ndarray, p=0.5): - """Apply the reward function to the raw rewards. P adjusts the steepness of the function - p = 0.5 leaves - the rewards unchanged, p < 0.5 makes the function more linear (at p=0 all miners with positives reward values get the same reward), - p > 0.5 makes the function more exponential (winner takes all). 
- """ - exponent = (p**6.64385619) * 100 # 6.64385619 = ln(100)/ln(2) -> this way if p=0.5, the exponent is exatly 1 - raw_rewards = np.array(raw_rewards) / max(1, (np.sum(raw_rewards[raw_rewards > 0]) + 1e-10)) - positive_rewards = np.clip(raw_rewards, 1e-10, np.inf) - normalised_rewards = positive_rewards / np.max(positive_rewards) - post_func_rewards = normalised_rewards**exponent - all_rewards = post_func_rewards / (np.sum(post_func_rewards) + 1e-10) - all_rewards[raw_rewards <= 0] = raw_rewards[raw_rewards <= 0] - return all_rewards - - -def save_weights(weights: list[np.ndarray]): - """Saves the list of numpy arrays to a file.""" - # Save all arrays into a single .npz file - np.savez_compressed(FILENAME, *weights) - async def set_weights( weights: np.ndarray, - step: int = 0, subtensor: bt.Subtensor | None = None, metagraph: bt.Metagraph | None = None, weight_syncer: WeightSynchronizer | None = None, ): + """Set the validator weights to the metagraph hotkeys based on the scores it has received from the miners. + + The weights determine the trust and incentive level the validator assigns to miner nodes on the network. """ - Sets the validator weights to the metagraph hotkeys based on the scores it has received from the miners. The weights determine the trust and incentive level the validator assigns to miner nodes on the network. - """ - # Check if self.scores contains any NaN values and log a warning if it does. try: if any(np.isnan(weights).flatten()): - logger.warning( - f"Scores contain NaN values. This may be due to a lack of responses from miners, or a bug in your reward functions. Scores: {weights}" - ) - - # Replace any NaN values with 0 + logger.warning(f"Scores used for weight setting contain NaN values: {weights}") weights = np.nan_to_num(weights, nan=0.0) - # Calculate the average reward for each uid across non-zero values. - PAST_WEIGHTS.append(weights) - if len(PAST_WEIGHTS) > WEIGHTS_HISTORY_LENGTH: - PAST_WEIGHTS.pop(0) - averaged_weights = np.average(np.array(PAST_WEIGHTS), axis=0) - save_weights(PAST_WEIGHTS) + try: - if ( - shared_settings.NEURON_DISABLE_SET_WEIGHTS - ): # If weights will not be set on chain, we should not synchronize - augmented_weights = averaged_weights + if shared_settings.NEURON_DISABLE_SET_WEIGHTS: + # If weights will not be set on chain, we should not synchronize. + augmented_weights = weights else: augmented_weights = await weight_syncer.get_augmented_weights( - weights=averaged_weights, uid=shared_settings.UID + weights=weights, uid=shared_settings.UID ) - except Exception as ex: + except BaseException as ex: logger.exception(f"Issue with setting weights: {ex}") - augmented_weights = averaged_weights + augmented_weights = weights + # Process the raw weights to final_weights via subtensor limitations. - ( - processed_weight_uids, - processed_weights, - ) = bt.utils.weight_utils.process_weights_for_netuid( + processed_weight_uids, processed_weights = bt.utils.weight_utils.process_weights_for_netuid( uids=shared_settings.METAGRAPH.uids, weights=augmented_weights, netuid=shared_settings.NETUID, @@ -93,35 +59,13 @@ async def set_weights( ) # Convert to uint16 weights and uids. 
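+        # Note: the chain encodes weights as uint16, so values emitted below are quantised to 1/65535 steps of the largest weight.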
-    (
-        uint_uids,
-        uint_weights,
-    ) = bt.utils.weight_utils.convert_weights_and_uids_for_emit(
-        uids=processed_weight_uids, weights=processed_weights
+    uint_uids, uint_weights = bt.utils.weight_utils.convert_weights_and_uids_for_emit(
+        uids=processed_weight_uids,
+        weights=processed_weights
     )
     except Exception as ex:
         logger.exception(f"Issue with setting weights: {ex}")
 
-    # Create a dataframe from weights and uids and save it as a csv file, with the current step as the filename.
-    if shared_settings.LOG_WEIGHTS:
-        try:
-            weights_df = pd.DataFrame(
-                {
-                    "step": step,
-                    "uids": uint_uids,
-                    "weights": processed_weights.flatten(),
-                    "raw_weights": str(list(weights.flatten())),
-                    "averaged_weights": str(list(averaged_weights.flatten())),
-                    "block": ttl_get_block(subtensor=subtensor),
-                }
-            )
-            step_filename = "weights.csv"
-            file_exists = os.path.isfile(step_filename)
-            # Append to the file if it exists, otherwise write a new file.
-            weights_df.to_csv(step_filename, mode="a", index=False, header=not file_exists)
-        except Exception as ex:
-            logger.exception(f"Couldn't write to df: {ex}")
-
     if shared_settings.NEURON_DISABLE_SET_WEIGHTS:
         logger.debug(f"Set weights disabled: {shared_settings.NEURON_DISABLE_SET_WEIGHTS}")
         return
@@ -147,107 +91,199 @@ class WeightSetter(AsyncLoopRunner):
     """The weight setter looks at RewardEvents in the reward_events queue and sets the weights of the miners accordingly."""
 
     sync: bool = True
-    interval: int = 60 * 21  # set rewards every 25 minutes
+    # interval: int = 60 * 21
+    interval: int = 60 * 3
     reward_events: list[list[WeightedRewardEvent]] | None = None
-    subtensor: bt.Subtensor | None = None
-    metagraph: bt.Metagraph | None = None
     weight_dict: dict[int, list[float]] | None = None
     weight_syncer: WeightSynchronizer | None = None
-    # interval: int = 60
+
+    reward_history_path: Path = Path("validator_rewards.jsonl")
+    reward_history_len: int = 24
+    # List of uids info per each epoch, e.g.: [{1: {"reward": 1.0}, 2: {"reward": 3.0}}].
+    reward_history: deque[dict[int, dict[str, Any]]] | None = None
 
     class Config:
         arbitrary_types_allowed = True
 
-    async def start(self, reward_events, weight_dict, name: str | None = None, **kwargs):
+    async def start(
+        self,
+        reward_events: list[list[WeightedRewardEvent]] | None,
+        weight_dict: dict[int, list[float]],
+        name: str | None = None,
+    ):
         self.reward_events = reward_events
-        self.weight_dict = weight_dict
-        global PAST_WEIGHTS
         self.weight_syncer = WeightSynchronizer(
             metagraph=shared_settings.METAGRAPH, wallet=shared_settings.WALLET, weight_dict=weight_dict
         )
+        await self._load_rewards()
+        return await super().start(name=name)
+
+    async def _save_rewards(self, rewards: npt.NDArray[np.float32]):
+        """Persist the latest epoch rewards.
+
+        The snapshot is appended to `reward_history` (bounded by `reward_history_len`) and the JSONL file at
+        `reward_history_path` is rewritten with the current buffer.
+
+        Args:
+            rewards: A one-dimensional array where the index is the uid and the value is its reward.
+ """ + epoch_rewards = {int(uid): {"reward": float(r)} for uid, r in enumerate(rewards)} + + if not isinstance(self.reward_history, deque): + self.reward_history = deque(maxlen=self.reward_history_len) + self.reward_history.append(epoch_rewards) + + try: + block = shared_settings.block + with self.reward_history_path.open("w", encoding="utf-8") as file: + for snapshot in self.reward_history: + row: dict[str, Any] = { + "ts": datetime.datetime.now(datetime.timezone.utc).isoformat(timespec="seconds") + "Z", + "block": block, + "rewards": {str(uid): v["reward"] for uid, v in snapshot.items()}, + } + file.write(json.dumps(row, separators=(",", ":")) + "\n") + except BaseException as exc: + logger.error(f"Couldn't write rewards history: {exc}") + + async def _load_rewards(self): + """Load reward snapshots from disk into `reward_history`. + + Only the newest `reward_average_len` rows are retained. + """ + self.reward_history: deque[dict[int, dict[str, Any]]] | None = deque(maxlen=self.reward_history_len) + if not self.reward_history_path.exists(): + logger.info("No rewards file found - starting with empty history.") + return + try: - with np.load(FILENAME) as data: - PAST_WEIGHTS = [data[key] for key in data.files] - except FileNotFoundError: - logger.info("No weights file found - this is expected on a new validator, starting with empty weights") - PAST_WEIGHTS = [] - except Exception as ex: - logger.error(f"Couldn't load weights from file: {ex}") - return await super().start(name=name, **kwargs) + with self.reward_history_path.open("r", encoding="utf-8") as file: + for line in file: + data = json.loads(line) + payload = data.get("rewards") + if payload is None: + raise ValueError(f"Malformed weight history file: {data}") + + self.reward_history.append( + {int(uid): {"reward": float(reward)} for uid, reward in payload.items()} + ) + except BaseException as exc: + self.reward_history: deque[dict[int, dict[str, Any]]] | None = deque(maxlen=self.reward_history_len) + logger.error(f"Couldn't load rewards from file, resetting weight history: {exc}") + + @classmethod + async def merge_task_rewards(cls, reward_events: list[list[WeightedRewardEvent]]) -> npt.NDArray[np.float32] | None: + if len(reward_events) == 0: + logger.warning("No reward events in queue, skipping weight setting...") + return + + all_uids = range(shared_settings.METAGRAPH.n.item()) + reward_dict = {uid: 0 for uid in all_uids} + logger.info(f"Setting weights for {len(reward_dict)} uids") + + # miner_rewards is a dictionary that separates each task config into a dictionary of uids with their rewards. + miner_rewards: dict[TaskConfig, dict[int, dict[str, int]]] = { + config: {uid: {"reward": 0, "count": 0} for uid in all_uids} for config in TaskRegistry.task_configs + } + + linear_reward_tasks = set([InferenceTask, MSRv2Task]) + linear_events: list[WeightedRewardEvent] = [] + for reward_sub_events in reward_events: + await asyncio.sleep(0.01) + for reward_event in reward_sub_events: + task_config = TaskRegistry.get_task_config(reward_event.task) + + # Inference task uses a different reward model. + if task_config.task in linear_reward_tasks: + linear_events.append(reward_event) + continue + + # Give each uid the reward they received. 
+                for uid, reward in zip(reward_event.uids, reward_event.rewards):
+                    miner_rewards[task_config][uid]["reward"] += reward * reward_event.weight
+                    miner_rewards[task_config][uid]["count"] += reward_event.weight
+
+        for linear_event in linear_events:
+            task_config = TaskRegistry.get_task_config(linear_event.task)
+            for uid, reward in zip(linear_event.uids, linear_event.rewards):
+                miner_rewards[task_config][uid]["reward"] += reward
+
+        for task_config, rewards in miner_rewards.items():
+            task_rewards = np.array([x["reward"] / max(1, x["count"]) for x in list(rewards.values())])
+            task_uids = np.array(list(rewards.keys()))
+            if task_config.task in linear_reward_tasks:
+                processed_rewards = task_rewards / max(1, (np.sum(task_rewards[task_rewards > 0]) + 1e-10))
+            else:
+                processed_rewards = cls.apply_steepness(
+                    raw_rewards=task_rewards,
+                    steepness=shared_settings.REWARD_STEEPNESS
+                )
+            processed_rewards *= task_config.probability
+
+            for uid, reward in zip(task_uids, processed_rewards):
+                reward_dict[uid] += reward
+
+        if shared_settings.LOG_WEIGHTS:
+            try:
+                with open("rewards.jsonl", "a+") as f:
+                    for config, rewards in miner_rewards.items():
+                        task_name = config.task.__name__ if hasattr(config.task, "__name__") else str(config.task)
+                        rewards_str = {str(uid): reward_info["reward"] for uid, reward_info in rewards.items()}
+                        record = {"task_config": task_name, "rewards": rewards_str}
+                        f.write(json.dumps(record) + "\n")
+            except Exception as e:
+                logger.exception(f"Failed to save miner_rewards to miner_rewards.jsonl: {e}")
+
+        final_rewards = np.array(list(reward_dict.values())).astype(np.float32)
+        return final_rewards
+
+    @classmethod
+    def apply_steepness(cls, raw_rewards: npt.NDArray[np.float32], steepness: float = 0.5) -> npt.NDArray[np.float32]:
+        """Apply steepness function to the raw rewards.
+
+        Args:
+            steepness: Adjusts the steepness of the function - p = 0.5 leaves the rewards unchanged,
+            p < 0.5 makes the function more linear (at p=0 all miners with positive reward values get the same reward),
+            p > 0.5 makes the function more exponential (winner takes all).
+        """
+        # 6.64385619 = ln(100)/ln(2) -> this way if p = 0.5, the exponent is exactly 1.
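+        # steepness=0 -> exponent 0 (all positive rewards equalised); steepness=0.5 -> exponent 1 (rewards unchanged); steepness=1 -> exponent 100 (winner takes all).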
+ exponent = (steepness ** 6.64385619) * 100 + raw_rewards = np.array(raw_rewards) / max(1, (np.sum(raw_rewards[raw_rewards > 0]) + 1e-10)) + positive_rewards = np.clip(raw_rewards, 1e-10, np.inf) + normalised_rewards = positive_rewards / np.max(positive_rewards) + post_func_rewards = normalised_rewards ** exponent + all_rewards = post_func_rewards / (np.sum(post_func_rewards) + 1e-10) + all_rewards[raw_rewards <= 0] = raw_rewards[raw_rewards <= 0] + return all_rewards async def run_step(self): await asyncio.sleep(0.01) try: - if len(self.reward_events) == 0: - logger.warning("No reward events in queue, skipping weight setting...") + if self.reward_events is None: + logger.error(f"No rewards evants were found, skipping weight setting") + return + + final_rewards = await self.merge_task_rewards(self.reward_events) + + if final_rewards is None: + logger.error(f"No rewards were found, skipping weight setting") return - # reward_events is a list of lists of WeightedRewardEvents - the 'sublists' each contain the multiple reward events for a single task - self.reward_events: list[list[WeightedRewardEvent]] = self.reward_events # to get correct typehinting - - # reward_dict = {uid: 0 for uid in get_uids(sampling_mode="all")} - all_uids = range(shared_settings.METAGRAPH.n.item()) - reward_dict = {uid: 0 for uid in all_uids} - logger.info(f"Setting weights for {len(reward_dict)} uids") - # miner_rewards is a dictionary that separates each task config into a dictionary of uids with their rewards - miner_rewards: dict[TaskConfig, dict[int, float]] = { - config: {uid: {"reward": 0, "count": 0} for uid in all_uids} for config in TaskRegistry.task_configs - } - - inference_events: list[WeightedRewardEvent] = [] - for reward_events in self.reward_events: - await asyncio.sleep(0.01) - for reward_event in reward_events: - if np.sum(reward_event.rewards) > 0: - logger.debug("Identified positive reward event") - task_config = TaskRegistry.get_task_config(reward_event.task) - - # inference task uses a different reward model - if task_config.task == InferenceTask: - inference_events.append(reward_event) - continue - - # give each uid the reward they received - for uid, reward in zip(reward_event.uids, reward_event.rewards): - miner_rewards[task_config][uid]["reward"] += ( - reward * reward_event.weight - ) # TODO: Double check I actually average at the end - miner_rewards[task_config][uid]["count"] += ( - 1 * reward_event.weight - ) # TODO: Double check I actually average at the end - - for inference_event in inference_events: - for uid, reward in zip(inference_event.uids, inference_event.rewards): - miner_rewards[TaskRegistry.get_task_config(InferenceTask)][uid]["reward"] += ( - reward # for inference 2x responses should mean 2x the reward - ) - for task_config, rewards in miner_rewards.items(): - r = np.array([x["reward"] / max(1, x["count"]) for x in list(rewards.values())]) - u = np.array(list(rewards.keys())) - if task_config.task == InferenceTask: - processed_rewards = r / max(1, (np.sum(r[r > 0]) + 1e-10)) - else: - processed_rewards = apply_reward_func(raw_rewards=r, p=shared_settings.REWARD_STEEPNESS) - processed_rewards *= task_config.probability - # update reward dict - for uid, reward in zip(u, processed_rewards): - reward_dict[uid] += reward - final_rewards = np.array(list(reward_dict.values())).astype(float) + await self._save_rewards(final_rewards) final_rewards[final_rewards < 0] = 0 final_rewards /= np.sum(final_rewards) + 1e-10 - except Exception as ex: + except BaseException as ex: 
logger.exception(f"{ex}") - # set weights on chain + # Set weights on chain. await set_weights( final_rewards, - step=self.step, subtensor=shared_settings.SUBTENSOR, - metagraph=shared_settings.METAGRAPH, + metagraph=shared_settings.metagraph_force_sync(), weight_syncer=self.weight_syncer, ) - # TODO: empty rewards queue only on weight setting success - self.reward_events[:] = [] # empty reward events queue + # TODO: Empty rewards queue only on weight setting success. + self.reward_events[:] = [] await asyncio.sleep(0.01) return final_rewards diff --git a/shared/settings.py b/shared/settings.py index 73497231f..658dbc1e5 100644 --- a/shared/settings.py +++ b/shared/settings.py @@ -263,6 +263,10 @@ def SUBTENSOR(self) -> Subtensor: self._subtensor = Subtensor(network=subtensor_network) return self._subtensor + def metagraph_force_sync(self) -> Metagraph: + self._last_update_time = 0 + return self.METAGRAPH + @property def METAGRAPH(self) -> Metagraph: if time.time() - self._last_update_time > 1200: diff --git a/tests/prompting/test_weight_settings.py b/tests/prompting/test_weight_settings.py deleted file mode 100644 index 00b923a5f..000000000 --- a/tests/prompting/test_weight_settings.py +++ /dev/null @@ -1,145 +0,0 @@ -# ruff: noqa: E402 -import asyncio - -import numpy as np - -from shared import settings - -settings.shared_settings = settings.SharedSettings(mode="mock") -raw_rewards = np.array([1.0, 2.0, 3.0, 4.0, 5.0]) -from unittest.mock import MagicMock, patch - -from prompting.weight_setting.weight_setter import WeightSetter, apply_reward_func - - -def test_apply_reward_func(): - raw_rewards = np.array([1.0, 2.0, 3.0, 4.0, 5.0]) - - # test result is even returned - result = apply_reward_func(raw_rewards) - assert result is not None, "Result was None" - - # Test with p = 0.5 (no change) - result = apply_reward_func(raw_rewards, p=0.5) - assert np.allclose( - result, raw_rewards / np.sum(raw_rewards), atol=1e-6 - ), "Result should be unchanged from raw rewards" - - # Test with p = 0 (more linear) - result = apply_reward_func(raw_rewards, p=0) - assert np.isclose(np.std(result), 0, atol=1e-6), "All rewards should be equal" - - # Test with p = 1 (more exponential) - result = apply_reward_func(raw_rewards, p=1) - assert result[-1] > 0.9, "Top miner should take vast majority of reward" - - # Test with negative values - raw_rewards = np.array([-1.0, 0.0, 1.0, 2.0, 3.0]) - result = apply_reward_func(raw_rewards, p=0.5) - assert result[0] < 0, "Negative reward should remain negative" - - -def test_run_step_with_reward_events(): - with ( - patch("shared.uids.get_uids") as mock_get_uids, - patch("prompting.weight_setting.weight_setter.TaskRegistry") as MockTaskRegistry, - # patch("prompting.weight_setting.weight_setter.mutable_globals") as mock_mutable_globals, - patch("prompting.weight_setting.weight_setter.set_weights") as mock_set_weights, - patch("prompting.weight_setting.weight_setter.logger") as mock_logger, - ): - - class MockTask: - pass - - class TaskConfig: - def __init__(self, name, probability): - self.name = name - self.probability = probability - self.task = MockTask - - class WeightedRewardEvent: - def __init__(self, task, uids, rewards, weight): - self.task = task - self.uids = uids - self.rewards = rewards - self.weight = weight - - mock_uids = [1, 2, 3, 4, 5] - mock_get_uids.return_value = mock_uids - - # Set up the mock TaskRegistry - mock_task_registry = MockTaskRegistry - mock_task_registry.task_configs = [ - TaskConfig(name="Task1", probability=0.5), - ] - 
mock_task_registry.get_task_config = MagicMock(return_value=mock_task_registry.task_configs[0]) - - # Set up the mock mutable_globals - - weight_setter = WeightSetter() - reward_events = [ - [ - WeightedRewardEvent( - task=mock_task_registry.task_configs[0], uids=mock_uids, rewards=[1.0, 2.0, 3.0, 4.0, 5.0], weight=1 - ), - ], - [ - WeightedRewardEvent( - task=mock_task_registry.task_configs[0], uids=mock_uids, rewards=[5.0, 4.0, 3.0, 2.0, 1.0], weight=1 - ), - ], - ] - weight_setter.reward_events = reward_events - output = asyncio.run(weight_setter.run_step()) - - print(output) - mock_set_weights.assert_called_once() - call_args = mock_set_weights.call_args[0] - assert len([c for c in call_args[0] if c > 0]) == len(mock_uids) - assert np.isclose(np.sum(call_args[0]), 1, atol=1e-6) - - # Check that the warning about empty reward events is not logged - mock_logger.warning.assert_not_called() - - -# def test_run_step_without_reward_events(weight_setter): -# with ( -# patch("prompting.weight_setter.get_uids") as mock_get_uids, -# patch("prompting.weight_setter.TaskRegistry.task_configs", new_callable=property) as mock_task_configs, -# patch("prompting.weight_setter.mutable_globals.reward_events") as mock_reward_events, -# patch("prompting.weight_setter.set_weights") as mock_set_weights, -# ): - -# mock_get_uids.return_value = [1, 2, 3, 4, 5] -# mock_task_configs.return_value = [ -# TaskConfig(name="Task1", probability=0.5), -# TaskConfig(name="Task2", probability=0.3), -# ] -# mock_reward_events.return_value = [] - -# weight_setter.run_step() - -# mock_set_weights.assert_not_called() - - -# def test_set_weights(): -# with ( -# patch("prompting.weight_setter.settings.SUBTENSOR") as mock_subtensor, -# patch("prompting.weight_setter.settings.WALLET") as mock_wallet, -# patch("prompting.weight_setter.settings.NETUID") as mock_netuid, -# patch("prompting.weight_setter.settings.METAGRAPH") as mock_metagraph, -# patch("prompting.weight_setter.pd.DataFrame.to_csv") as mock_to_csv, -# ): - -# weights = np.array([0.1, 0.2, 0.3, 0.4]) -# uids = np.array([1, 2, 3, 4]) -# mock_metagraph.uids = uids - -# set_weights(weights) - -# # Check if weights were processed and set -# mock_subtensor.set_weights.assert_called_once() - -# # Check if weights were logged -# if settings.LOG_WEIGHTS: -# mock_to_csv.assert_called_once() diff --git a/tests/prompting/weight_setting/test_weight_setter.py b/tests/prompting/weight_setting/test_weight_setter.py new file mode 100644 index 000000000..348f913ab --- /dev/null +++ b/tests/prompting/weight_setting/test_weight_setter.py @@ -0,0 +1,215 @@ +# ruff: noqa: E402 +import asyncio +from pathlib import Path +from unittest.mock import MagicMock, patch +from unittest.mock import AsyncMock, patch +import pytest +from pytest import MonkeyPatch +from types import SimpleNamespace + +import numpy as np + +from prompting.weight_setting.weight_setter import set_weights + +from shared import settings +settings.shared_settings = settings.SharedSettings(mode="mock") + +from prompting.weight_setting import weight_setter +from prompting.tasks.inference import InferenceTask +from prompting.tasks.msrv2_task import MSRv2Task +from prompting.tasks.web_retrieval import WebRetrievalTask +from prompting.weight_setting.weight_setter import WeightSetter +from prompting.rewards.reward import WeightedRewardEvent + + +UIDS: list[int] = list(range(256)) + + +def _make_event(task_cls: type, rewards: list[float]) -> WeightedRewardEvent: + """Return a fully-populated WeightedRewardEvent for the given 
task.""" + return WeightedRewardEvent( + weight=1.0, + task=task_cls, + reward_model_name="test", + rewards=rewards, + rewards_normalized=rewards, + timings=[0.0] * len(rewards), + reward_model_type="reward", + batch_time=0.0, + uids=UIDS, + threshold=None, + extra_info=None, + reward_type="reward", + ) + + +@pytest.mark.asyncio +async def test_merge_task_rewards() -> None: + negative_uids: set[int] = {0, 1, 2} + inference_rewards: list[float] = [ + -2.0 if uid in negative_uids else 1.0 for uid in UIDS + ] + msrv2_rewards: list[float] = [1.0] * len(UIDS) + web_rewards: list[float] = [1.0] * len(UIDS) + + events: list[list[WeightedRewardEvent]] = [ + [ + _make_event(InferenceTask(), inference_rewards), + _make_event(MSRv2Task(), msrv2_rewards), + _make_event(WebRetrievalTask(), web_rewards), + ] + ] + + final_rewards = await WeightSetter.merge_task_rewards(events) + + assert isinstance(final_rewards, np.ndarray) + assert final_rewards.dtype == np.float32 + assert final_rewards.shape == (len(UIDS),) + assert int((final_rewards < 0).sum()) == 3 + + +def test_steepness(): + raw_rewards = np.array([1.0, 2.0, 3.0, 4.0, 5.0]) + + # Test result is even returned. + result = WeightSetter.apply_steepness(raw_rewards) + assert result is not None, "Result was None" + + # Test with p = 0.5 (no change). + result = WeightSetter.apply_steepness(raw_rewards, steepness=0.5) + assert np.allclose( + result, raw_rewards / np.sum(raw_rewards), atol=1e-6 + ), "Result should be unchanged from raw rewards" + + # Test with p = 0 (more linear). + result = WeightSetter.apply_steepness(raw_rewards, steepness=0) + assert np.isclose(np.std(result), 0, atol=1e-6), "All rewards should be equal" + + # Test with p = 1 (more exponential). + result = WeightSetter.apply_steepness(raw_rewards, steepness=1) + assert result[-1] > 0.9, "Top miner should take vast majority of reward" + + # Test with negative values. + raw_rewards = np.array([-1.0, 0.0, 1.0, 2.0, 3.0]) + result = WeightSetter.apply_steepness(raw_rewards, steepness=0.5) + assert result[0] < 0, "Negative reward should remain negative" + + +def test_run_step_with_reward_events(): + with ( + patch("shared.uids.get_uids") as mock_get_uids, + patch("prompting.weight_setting.weight_setter.TaskRegistry") as MockTaskRegistry, + patch("prompting.weight_setting.weight_setter.set_weights") as mock_set_weights, + patch("prompting.weight_setting.weight_setter.logger") as mock_logger, + ): + class MockTask: + pass + + class TaskConfig: + def __init__(self, name, probability): + self.name = name + self.probability = probability + self.task = MockTask + + class WeightedRewardEvent: + def __init__(self, task, uids, rewards, weight): + self.task = task + self.uids = uids + self.rewards = rewards + self.weight = weight + + mock_uids = [1, 2, 3, 4, 5] + mock_get_uids.return_value = mock_uids + + # Set up the mock TaskRegistry + mock_task_registry = MockTaskRegistry + mock_task_registry.task_configs = [ + TaskConfig(name="Task1", probability=0.5), + ] + mock_task_registry.get_task_config = MagicMock(return_value=mock_task_registry.task_configs[0]) + + # Set up the mock mutable_globals. 
+ + weight_setter = WeightSetter(reward_history_path=Path("test_validator_rewards.jsonl")) + reward_events = [ + [ + WeightedRewardEvent( + task=mock_task_registry.task_configs[0], + uids=mock_uids, + rewards=[1.0, 2.0, 3.0, 4.0, 5.0], + weight=1, + ), + ], + [ + WeightedRewardEvent( + task=mock_task_registry.task_configs[0], + uids=mock_uids, + rewards=[-5.0, -4.0, -3.0, -2.0, -1.0], + weight=1, + ), + ], + ] + weight_setter.reward_events = reward_events + asyncio.run(weight_setter.run_step()) + + mock_set_weights.assert_called_once() + call_args = mock_set_weights.call_args[0] + weights = call_args[0] + + assert weights[0] <= 0 + assert weights[1] <= 0 + assert weights[2] == 0 + assert weights[3] >= 0 + assert weights[4] >= 0 + + # Weights are re-normalised to 1. + assert np.isclose(weights.sum(), 1.0, atol=1e-6) + + # Check that the warning about empty reward events is not logged. + mock_logger.warning.assert_not_called() + + +@pytest.mark.asyncio +async def test_set_weights(monkeypatch: MonkeyPatch): + """`set_weights` calls Subtensor.set_weights with processed vectors.""" + stub_settings = SimpleNamespace( + NEURON_DISABLE_SET_WEIGHTS=False, + UID=0, + NETUID=42, + WALLET="dummy-wallet", + METAGRAPH=SimpleNamespace(uids=np.arange(4, dtype=np.uint16)), + ) + + subtensor_mock = MagicMock() + subtensor_mock.set_weights = MagicMock(return_value=(True, "ok")) + stub_settings.SUBTENSOR = subtensor_mock + monkeypatch.setattr(weight_setter, "shared_settings", stub_settings) + + monkeypatch.setattr( + weight_setter.bt.utils.weight_utils, + "process_weights_for_netuid", + lambda *, uids, weights, **_: (uids, weights), + ) + monkeypatch.setattr( + weight_setter.bt.utils.weight_utils, + "convert_weights_and_uids_for_emit", + lambda uids, weights: (uids.astype(np.uint16), (weights * 65535).astype(np.uint16)), + ) + + class _Syncer: + async def get_augmented_weights(self, *, weights: np.ndarray, uid: int) -> np.ndarray: # noqa: D401 + return weights + + raw = np.array([0.1, 0.2, 0.3, 0.4], dtype=np.float32) + await weight_setter.set_weights( + raw, + subtensor=stub_settings.SUBTENSOR, + metagraph=stub_settings.METAGRAPH, + weight_syncer=_Syncer(), + ) + + subtensor_mock.set_weights.assert_called_once() + call_kwargs = subtensor_mock.set_weights.call_args.kwargs + + expected_uint16 = (raw * 65535).astype(np.uint16) + assert np.array_equal(call_kwargs["weights"], expected_uint16) From 30193a3edd7a317ca3727e96523c4d0a73aa0d5a Mon Sep 17 00:00:00 2001 From: Dmytro Bobrenko <17252809+dbobrenko@users.noreply.github.com> Date: Wed, 25 Jun 2025 17:07:03 +0000 Subject: [PATCH 2/5] Remove debug code --- prompting/tasks/task_creation.py | 2 +- prompting/weight_setting/weight_setter.py | 11 ----------- 2 files changed, 1 insertion(+), 12 deletions(-) diff --git a/prompting/tasks/task_creation.py b/prompting/tasks/task_creation.py index 098dc647e..b49e354f7 100644 --- a/prompting/tasks/task_creation.py +++ b/prompting/tasks/task_creation.py @@ -18,7 +18,7 @@ class TaskLoop(AsyncLoopRunner): is_running: bool = False thread: threading.Thread = None - interval: int = 2 + interval: int = 20 task_queue: list | None = [] scoring_queue: list | None = [] miners_dict: dict | None = None diff --git a/prompting/weight_setting/weight_setter.py b/prompting/weight_setting/weight_setter.py index 879f1dba6..51cc25d94 100644 --- a/prompting/weight_setting/weight_setter.py +++ b/prompting/weight_setting/weight_setter.py @@ -223,17 +223,6 @@ async def merge_task_rewards(cls, reward_events: list[list[WeightedRewardEvent]] for uid, 
reward in zip(task_uids, processed_rewards): reward_dict[uid] += reward - if shared_settings.LOG_WEIGHTS: - try: - with open("rewards.jsonl", "a+") as f: - for config, rewards in miner_rewards.items(): - task_name = config.task.__name__ if hasattr(config.task, "__name__") else str(config.task) - rewards_str = {str(uid): reward_info["reward"] for uid, reward_info in rewards.items()} - record = {"task_config": task_name, "rewards": rewards_str} - f.write(json.dumps(record) + "\n") - except Exception as e: - logger.exception(f"Failed to save miner_rewards to miner_rewards.jsonl: {e}") - final_rewards = np.array(list(reward_dict.values())).astype(np.float32) return final_rewards From 86013fad2390d14cfe1eb4221338768c8e90c166 Mon Sep 17 00:00:00 2001 From: Dmytro Bobrenko <17252809+dbobrenko@users.noreply.github.com> Date: Wed, 25 Jun 2025 17:07:40 +0000 Subject: [PATCH 3/5] Run formatter --- prompting/weight_setting/weight_setter.py | 26 +++++++------------ .../weight_setting/test_weight_setter.py | 20 ++++++-------- 2 files changed, 18 insertions(+), 28 deletions(-) diff --git a/prompting/weight_setting/weight_setter.py b/prompting/weight_setting/weight_setter.py index 51cc25d94..d6c771d3e 100644 --- a/prompting/weight_setting/weight_setter.py +++ b/prompting/weight_setting/weight_setter.py @@ -1,7 +1,7 @@ import asyncio -from collections import deque import datetime import json +from collections import deque from pathlib import Path from typing import Any @@ -42,9 +42,7 @@ async def set_weights( # If weights will not be set on chain, we should not synchronize. augmented_weights = weights else: - augmented_weights = await weight_syncer.get_augmented_weights( - weights=weights, uid=shared_settings.UID - ) + augmented_weights = await weight_syncer.get_augmented_weights(weights=weights, uid=shared_settings.UID) except BaseException as ex: logger.exception(f"Issue with setting weights: {ex}") augmented_weights = weights @@ -60,8 +58,7 @@ async def set_weights( # Convert to uint16 weights and uids. uint_uids, uint_weights = bt.utils.weight_utils.convert_weights_and_uids_for_emit( - uids=processed_weight_uids, - weights=processed_weights + uids=processed_weight_uids, weights=processed_weights ) except Exception as ex: logger.exception(f"Issue with setting weights: {ex}") @@ -117,7 +114,7 @@ async def start( ) await self._load_rewards() return await super().start(name=name) - + async def _save_rewards(self, rewards: npt.NDArray[np.float32]): """Persist the latest epoch rewards. 
@@ -164,9 +161,7 @@ async def _load_rewards(self): if payload is None: raise ValueError(f"Malformed weight history file: {data}") - self.reward_history.append( - {int(uid): {"reward": float(reward)} for uid, reward in payload.items()} - ) + self.reward_history.append({int(uid): {"reward": float(reward)} for uid, reward in payload.items()}) except BaseException as exc: self.reward_history: deque[dict[int, dict[str, Any]]] | None = deque(maxlen=self.reward_history_len) logger.error(f"Couldn't load rewards from file, resetting weight history: {exc}") @@ -215,8 +210,7 @@ async def merge_task_rewards(cls, reward_events: list[list[WeightedRewardEvent]] processed_rewards = task_rewards / max(1, (np.sum(task_rewards[task_rewards > 0]) + 1e-10)) else: processed_rewards = cls.apply_steepness( - raw_rewards=task_rewards, - steepness=shared_settings.REWARD_STEEPNESS + raw_rewards=task_rewards, steepness=shared_settings.REWARD_STEEPNESS ) processed_rewards *= task_config.probability @@ -236,11 +230,11 @@ def apply_steepness(cls, raw_rewards: npt.NDArray[np.float32], steepness: float p > 0.5 makes the function more exponential (winner takes all). """ # 6.64385619 = ln(100)/ln(2) -> this way if p = 0.5, the exponent is exactly 1. - exponent = (steepness ** 6.64385619) * 100 + exponent = (steepness**6.64385619) * 100 raw_rewards = np.array(raw_rewards) / max(1, (np.sum(raw_rewards[raw_rewards > 0]) + 1e-10)) positive_rewards = np.clip(raw_rewards, 1e-10, np.inf) normalised_rewards = positive_rewards / np.max(positive_rewards) - post_func_rewards = normalised_rewards ** exponent + post_func_rewards = normalised_rewards**exponent all_rewards = post_func_rewards / (np.sum(post_func_rewards) + 1e-10) all_rewards[raw_rewards <= 0] = raw_rewards[raw_rewards <= 0] return all_rewards @@ -249,13 +243,13 @@ async def run_step(self): await asyncio.sleep(0.01) try: if self.reward_events is None: - logger.error(f"No rewards evants were found, skipping weight setting") + logger.error("No rewards evants were found, skipping weight setting") return final_rewards = await self.merge_task_rewards(self.reward_events) if final_rewards is None: - logger.error(f"No rewards were found, skipping weight setting") + logger.error("No rewards were found, skipping weight setting") return await self._save_rewards(final_rewards) diff --git a/tests/prompting/weight_setting/test_weight_setter.py b/tests/prompting/weight_setting/test_weight_setter.py index 348f913ab..56cdc2957 100644 --- a/tests/prompting/weight_setting/test_weight_setter.py +++ b/tests/prompting/weight_setting/test_weight_setter.py @@ -1,26 +1,23 @@ # ruff: noqa: E402 import asyncio from pathlib import Path -from unittest.mock import MagicMock, patch -from unittest.mock import AsyncMock, patch -import pytest -from pytest import MonkeyPatch from types import SimpleNamespace +from unittest.mock import MagicMock, patch import numpy as np - -from prompting.weight_setting.weight_setter import set_weights +import pytest +from pytest import MonkeyPatch from shared import settings + settings.shared_settings = settings.SharedSettings(mode="mock") -from prompting.weight_setting import weight_setter +from prompting.rewards.reward import WeightedRewardEvent from prompting.tasks.inference import InferenceTask from prompting.tasks.msrv2_task import MSRv2Task from prompting.tasks.web_retrieval import WebRetrievalTask +from prompting.weight_setting import weight_setter from prompting.weight_setting.weight_setter import WeightSetter -from prompting.rewards.reward import 
WeightedRewardEvent - UIDS: list[int] = list(range(256)) @@ -46,9 +43,7 @@ def _make_event(task_cls: type, rewards: list[float]) -> WeightedRewardEvent: @pytest.mark.asyncio async def test_merge_task_rewards() -> None: negative_uids: set[int] = {0, 1, 2} - inference_rewards: list[float] = [ - -2.0 if uid in negative_uids else 1.0 for uid in UIDS - ] + inference_rewards: list[float] = [-2.0 if uid in negative_uids else 1.0 for uid in UIDS] msrv2_rewards: list[float] = [1.0] * len(UIDS) web_rewards: list[float] = [1.0] * len(UIDS) @@ -102,6 +97,7 @@ def test_run_step_with_reward_events(): patch("prompting.weight_setting.weight_setter.set_weights") as mock_set_weights, patch("prompting.weight_setting.weight_setter.logger") as mock_logger, ): + class MockTask: pass From e232793c17cd5c6179a295ea7fda174549c95bd0 Mon Sep 17 00:00:00 2001 From: Dmytro Bobrenko <17252809+dbobrenko@users.noreply.github.com> Date: Wed, 25 Jun 2025 17:11:46 +0000 Subject: [PATCH 4/5] Reset debug code --- prompting/weight_setting/weight_setter.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/prompting/weight_setting/weight_setter.py b/prompting/weight_setting/weight_setter.py index d6c771d3e..5ba01606f 100644 --- a/prompting/weight_setting/weight_setter.py +++ b/prompting/weight_setting/weight_setter.py @@ -88,8 +88,7 @@ class WeightSetter(AsyncLoopRunner): """The weight setter looks at RewardEvents in the reward_events queue and sets the weights of the miners accordingly.""" sync: bool = True - # interval: int = 60 * 21 - interval: int = 60 * 3 + interval: int = 60 * 21 reward_events: list[list[WeightedRewardEvent]] | None = None weight_dict: dict[int, list[float]] | None = None weight_syncer: WeightSynchronizer | None = None From 900fc90696b59ca217404bf0a8194547c1953925 Mon Sep 17 00:00:00 2001 From: Dmytro Bobrenko <17252809+dbobrenko@users.noreply.github.com> Date: Wed, 25 Jun 2025 19:45:29 +0000 Subject: [PATCH 5/5] Make save rewards safer --- prompting/weight_setting/weight_setter.py | 46 +++++++++++++---------- 1 file changed, 27 insertions(+), 19 deletions(-) diff --git a/prompting/weight_setting/weight_setter.py b/prompting/weight_setting/weight_setter.py index 5ba01606f..85e1ed053 100644 --- a/prompting/weight_setting/weight_setter.py +++ b/prompting/weight_setting/weight_setter.py @@ -1,7 +1,7 @@ import asyncio +from collections import deque import datetime import json -from collections import deque from pathlib import Path from typing import Any @@ -42,7 +42,9 @@ async def set_weights( # If weights will not be set on chain, we should not synchronize. augmented_weights = weights else: - augmented_weights = await weight_syncer.get_augmented_weights(weights=weights, uid=shared_settings.UID) + augmented_weights = await weight_syncer.get_augmented_weights( + weights=weights, uid=shared_settings.UID + ) except BaseException as ex: logger.exception(f"Issue with setting weights: {ex}") augmented_weights = weights @@ -58,7 +60,8 @@ async def set_weights( # Convert to uint16 weights and uids. uint_uids, uint_weights = bt.utils.weight_utils.convert_weights_and_uids_for_emit( - uids=processed_weight_uids, weights=processed_weights + uids=processed_weight_uids, + weights=processed_weights ) except Exception as ex: logger.exception(f"Issue with setting weights: {ex}") @@ -123,24 +126,27 @@ async def _save_rewards(self, rewards: npt.NDArray[np.float32]): Args: rewards: A one-dimensional array where the index is the uid and the value is its reward. 
""" - epoch_rewards = {int(uid): {"reward": float(r)} for uid, r in enumerate(rewards)} - if not isinstance(self.reward_history, deque): self.reward_history = deque(maxlen=self.reward_history_len) - self.reward_history.append(epoch_rewards) + + snapshot = {int(uid): {"reward": float(r)} for uid, r in enumerate(rewards)} + self.reward_history.append(snapshot) + + tmp_path = self.reward_history_path.with_suffix(".jsonl.tmp") + block = getattr(shared_settings, "block", 0) try: - block = shared_settings.block - with self.reward_history_path.open("w", encoding="utf-8") as file: - for snapshot in self.reward_history: + with tmp_path.open("w", encoding="utf-8") as file: + for snap in self.reward_history: row: dict[str, Any] = { "ts": datetime.datetime.now(datetime.timezone.utc).isoformat(timespec="seconds") + "Z", "block": block, - "rewards": {str(uid): v["reward"] for uid, v in snapshot.items()}, + "rewards": {str(k): v["reward"] for k, v in snap.items()}, } file.write(json.dumps(row, separators=(",", ":")) + "\n") - except BaseException as exc: - logger.error(f"Couldn't write rewards history: {exc}") + tmp_path.replace(self.reward_history_path) + except Exception as exc: + logger.error(f"Couldn't persist rewards history: {exc}") async def _load_rewards(self): """Load reward snapshots from disk into `reward_history`. @@ -160,7 +166,9 @@ async def _load_rewards(self): if payload is None: raise ValueError(f"Malformed weight history file: {data}") - self.reward_history.append({int(uid): {"reward": float(reward)} for uid, reward in payload.items()}) + self.reward_history.append( + {int(uid): {"reward": float(reward)} for uid, reward in payload.items()} + ) except BaseException as exc: self.reward_history: deque[dict[int, dict[str, Any]]] | None = deque(maxlen=self.reward_history_len) logger.error(f"Couldn't load rewards from file, resetting weight history: {exc}") @@ -183,7 +191,6 @@ async def merge_task_rewards(cls, reward_events: list[list[WeightedRewardEvent]] linear_reward_tasks = set([InferenceTask, MSRv2Task]) linear_events: list[WeightedRewardEvent] = [] for reward_sub_events in reward_events: - await asyncio.sleep(0.01) for reward_event in reward_sub_events: task_config = TaskRegistry.get_task_config(reward_event.task) @@ -209,7 +216,8 @@ async def merge_task_rewards(cls, reward_events: list[list[WeightedRewardEvent]] processed_rewards = task_rewards / max(1, (np.sum(task_rewards[task_rewards > 0]) + 1e-10)) else: processed_rewards = cls.apply_steepness( - raw_rewards=task_rewards, steepness=shared_settings.REWARD_STEEPNESS + raw_rewards=task_rewards, + steepness=shared_settings.REWARD_STEEPNESS ) processed_rewards *= task_config.probability @@ -229,11 +237,11 @@ def apply_steepness(cls, raw_rewards: npt.NDArray[np.float32], steepness: float p > 0.5 makes the function more exponential (winner takes all). """ # 6.64385619 = ln(100)/ln(2) -> this way if p = 0.5, the exponent is exactly 1. 
-        exponent = (steepness**6.64385619) * 100
+        exponent = (steepness ** 6.64385619) * 100
         raw_rewards = np.array(raw_rewards) / max(1, (np.sum(raw_rewards[raw_rewards > 0]) + 1e-10))
         positive_rewards = np.clip(raw_rewards, 1e-10, np.inf)
         normalised_rewards = positive_rewards / np.max(positive_rewards)
-        post_func_rewards = normalised_rewards**exponent
+        post_func_rewards = normalised_rewards ** exponent
         all_rewards = post_func_rewards / (np.sum(post_func_rewards) + 1e-10)
         all_rewards[raw_rewards <= 0] = raw_rewards[raw_rewards <= 0]
         return all_rewards
@@ -242,13 +250,13 @@ async def run_step(self):
         await asyncio.sleep(0.01)
         try:
             if self.reward_events is None:
-                logger.error("No rewards evants were found, skipping weight setting")
+                logger.error("No reward events were found, skipping weight setting")
                 return
 
             final_rewards = await self.merge_task_rewards(self.reward_events)
 
             if final_rewards is None:
                 logger.error("No rewards were found, skipping weight setting")
                 return
 
             await self._save_rewards(final_rewards)
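The write-then-rename pattern adopted in PATCH 5 can be illustrated in isolation. Below is a minimal standalone sketch of the same idea, assuming only the standard library; `atomic_write_jsonl` and its arguments are illustrative names, not part of the patch:

```python
import json
from pathlib import Path
from typing import Any


def atomic_write_jsonl(path: Path, rows: list[dict[str, Any]]) -> None:
    # Write the full buffer to a sibling temp file first; a crash mid-write
    # leaves the previously persisted file untouched.
    tmp = path.with_suffix(".jsonl.tmp")
    with tmp.open("w", encoding="utf-8") as fh:
        for row in rows:
            fh.write(json.dumps(row, separators=(",", ":")) + "\n")
    # Path.replace wraps os.replace, which atomically swaps the target on POSIX,
    # so readers see either the old snapshot or the new one, never a torn file.
    tmp.replace(path)


if __name__ == "__main__":
    atomic_write_jsonl(Path("validator_rewards.jsonl"), [{"block": 0, "rewards": {"0": 1.0}}])
```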