diff --git a/.gitignore b/.gitignore
index eb2b2dbcd..105e330bd 100644
--- a/.gitignore
+++ b/.gitignore
@@ -8,6 +8,9 @@ __pycache__/
 *.npy
 *.npz
 prompting/storage/
+validator_rewards.jsonl
+test_validator_rewards.jsonl
+uid_tracker.sqlite*
 
 # C extensions
 *.so
diff --git a/prompting/weight_setting/weight_setter.py b/prompting/weight_setting/weight_setter.py
index 6cebbc639..85e1ed053 100644
--- a/prompting/weight_setting/weight_setter.py
+++ b/prompting/weight_setting/weight_setter.py
@@ -1,90 +1,56 @@
 import asyncio
-import os
+from collections import deque
+import datetime
+import json
+from pathlib import Path
+from typing import Any
 
 import bittensor as bt
 import numpy as np
-import pandas as pd
+import numpy.typing as npt
 from loguru import logger
 
 from prompting import __spec_version__
 from prompting.rewards.reward import WeightedRewardEvent
 from prompting.tasks.inference import InferenceTask
+from prompting.tasks.msrv2_task import MSRv2Task
 from prompting.tasks.task_registry import TaskConfig, TaskRegistry
 from prompting.weight_setting.weight_synchronizer import WeightSynchronizer
 from shared import settings
 from shared.loop_runner import AsyncLoopRunner
-from shared.misc import ttl_get_block
 
 shared_settings = settings.shared_settings
 
-FILENAME = "validator_weights.npz"
-WEIGHTS_HISTORY_LENGTH = 24
-PAST_WEIGHTS: list[np.ndarray] = []
-
-
-def apply_reward_func(raw_rewards: np.ndarray, p=0.5):
-    """Apply the reward function to the raw rewards. P adjusts the steepness of the function - p = 0.5 leaves
-    the rewards unchanged, p < 0.5 makes the function more linear (at p=0 all miners with positives reward values get the same reward),
-    p > 0.5 makes the function more exponential (winner takes all).
-    """
-    exponent = (p**6.64385619) * 100  # 6.64385619 = ln(100)/ln(2) -> this way if p=0.5, the exponent is exatly 1
-    raw_rewards = np.array(raw_rewards) / max(1, (np.sum(raw_rewards[raw_rewards > 0]) + 1e-10))
-    positive_rewards = np.clip(raw_rewards, 1e-10, np.inf)
-    normalised_rewards = positive_rewards / np.max(positive_rewards)
-    post_func_rewards = normalised_rewards**exponent
-    all_rewards = post_func_rewards / (np.sum(post_func_rewards) + 1e-10)
-    all_rewards[raw_rewards <= 0] = raw_rewards[raw_rewards <= 0]
-    return all_rewards
-
-
-def save_weights(weights: list[np.ndarray]):
-    """Saves the list of numpy arrays to a file."""
-    # Save all arrays into a single .npz file
-    np.savez_compressed(FILENAME, *weights)
-
 
 async def set_weights(
     weights: np.ndarray,
-    step: int = 0,
     subtensor: bt.Subtensor | None = None,
     metagraph: bt.Metagraph | None = None,
     weight_syncer: WeightSynchronizer | None = None,
 ):
+    """Set the validator weights to the metagraph hotkeys based on the scores it has received from the miners.
+
+    The weights determine the trust and incentive level the validator assigns to miner nodes on the network.
     """
-    Sets the validator weights to the metagraph hotkeys based on the scores it has received from the miners. The weights determine the trust and incentive level the validator assigns to miner nodes on the network.
-    """
-    # Check if self.scores contains any NaN values and log a warning if it does.
     try:
         if any(np.isnan(weights).flatten()):
-            logger.warning(
-                f"Scores contain NaN values. This may be due to a lack of responses from miners, or a bug in your reward functions. Scores: {weights}"
-            )
-
-            # Replace any NaN values with 0
+            logger.warning(f"Scores used for weight setting contain NaN values: {weights}")
             weights = np.nan_to_num(weights, nan=0.0)
 
-        # Calculate the average reward for each uid across non-zero values.
-        PAST_WEIGHTS.append(weights)
-        if len(PAST_WEIGHTS) > WEIGHTS_HISTORY_LENGTH:
-            PAST_WEIGHTS.pop(0)
-        averaged_weights = np.average(np.array(PAST_WEIGHTS), axis=0)
-        save_weights(PAST_WEIGHTS)
+
         try:
-            if (
-                shared_settings.NEURON_DISABLE_SET_WEIGHTS
-            ):  # If weights will not be set on chain, we should not synchronize
-                augmented_weights = averaged_weights
+            if shared_settings.NEURON_DISABLE_SET_WEIGHTS:
+                # If weights will not be set on chain, we should not synchronize.
+                augmented_weights = weights
             else:
                 augmented_weights = await weight_syncer.get_augmented_weights(
-                    weights=averaged_weights, uid=shared_settings.UID
+                    weights=weights, uid=shared_settings.UID
                 )
-        except Exception as ex:
+        except BaseException as ex:
             logger.exception(f"Issue with setting weights: {ex}")
-            augmented_weights = averaged_weights
+            augmented_weights = weights
 
+        # Process the raw weights to final weights, applying subtensor limitations.
-        (
-            processed_weight_uids,
-            processed_weights,
-        ) = bt.utils.weight_utils.process_weights_for_netuid(
+        processed_weight_uids, processed_weights = bt.utils.weight_utils.process_weights_for_netuid(
             uids=shared_settings.METAGRAPH.uids,
             weights=augmented_weights,
             netuid=shared_settings.NETUID,
@@ -93,35 +59,13 @@ async def set_weights(
         )
 
         # Convert to uint16 weights and uids.
-        (
-            uint_uids,
-            uint_weights,
-        ) = bt.utils.weight_utils.convert_weights_and_uids_for_emit(
-            uids=processed_weight_uids, weights=processed_weights
+        uint_uids, uint_weights = bt.utils.weight_utils.convert_weights_and_uids_for_emit(
+            uids=processed_weight_uids,
+            weights=processed_weights
         )
     except Exception as ex:
         logger.exception(f"Issue with setting weights: {ex}")
 
-    # Create a dataframe from weights and uids and save it as a csv file, with the current step as the filename.
-    if shared_settings.LOG_WEIGHTS:
-        try:
-            weights_df = pd.DataFrame(
-                {
-                    "step": step,
-                    "uids": uint_uids,
-                    "weights": processed_weights.flatten(),
-                    "raw_weights": str(list(weights.flatten())),
-                    "averaged_weights": str(list(averaged_weights.flatten())),
-                    "block": ttl_get_block(subtensor=subtensor),
-                }
-            )
-            step_filename = "weights.csv"
-            file_exists = os.path.isfile(step_filename)
-            # Append to the file if it exists, otherwise write a new file.
-            weights_df.to_csv(step_filename, mode="a", index=False, header=not file_exists)
-        except Exception as ex:
-            logger.exception(f"Couldn't write to df: {ex}")
-
     if shared_settings.NEURON_DISABLE_SET_WEIGHTS:
         logger.debug(f"Set weights disabled: {shared_settings.NEURON_DISABLE_SET_WEIGHTS}")
         return
@@ -147,107 +91,189 @@ class WeightSetter(AsyncLoopRunner):
     """The weight setter looks at RewardEvents in the reward_events queue and sets the weights of the miners accordingly."""
 
     sync: bool = True
-    interval: int = 60 * 21  # set rewards every 25 minutes
+    interval: int = 60 * 21
     reward_events: list[list[WeightedRewardEvent]] | None = None
-    subtensor: bt.Subtensor | None = None
-    metagraph: bt.Metagraph | None = None
     weight_dict: dict[int, list[float]] | None = None
    weight_syncer: WeightSynchronizer | None = None
-    # interval: int = 60
+
+    reward_history_path: Path = Path("validator_rewards.jsonl")
+    reward_history_len: int = 24
+    # List of uids info for each epoch, e.g.: [{1: {"reward": 1.0}, 2: {"reward": 3.0}}].
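+    # Bounded by `reward_history_len`, so only the most recent epochs are kept.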
+    reward_history: deque[dict[int, dict[str, Any]]] | None = None
 
     class Config:
         arbitrary_types_allowed = True
 
-    async def start(self, reward_events, weight_dict, name: str | None = None, **kwargs):
+    async def start(
+        self,
+        reward_events: list[list[WeightedRewardEvent]] | None,
+        weight_dict: dict[int, list[float]],
+        name: str | None = None,
+    ):
         self.reward_events = reward_events
-        self.weight_dict = weight_dict
-        global PAST_WEIGHTS
         self.weight_syncer = WeightSynchronizer(
             metagraph=shared_settings.METAGRAPH, wallet=shared_settings.WALLET, weight_dict=weight_dict
         )
+        await self._load_rewards()
+        return await super().start(name=name)
+
+    async def _save_rewards(self, rewards: npt.NDArray[np.float32]):
+        """Persist the latest epoch rewards.
+
+        The snapshot is appended to `reward_history` (bounded by `reward_history_len`) and the JSONL file at
+        `reward_history_path` is rewritten with the current buffer.
+
+        Args:
+            rewards: A one-dimensional array where the index is the uid and the value is its reward.
+        """
+        if not isinstance(self.reward_history, deque):
+            self.reward_history = deque(maxlen=self.reward_history_len)
+
+        snapshot = {int(uid): {"reward": float(r)} for uid, r in enumerate(rewards)}
+        self.reward_history.append(snapshot)
+
+        tmp_path = self.reward_history_path.with_suffix(".jsonl.tmp")
+        block = getattr(shared_settings, "block", 0)
+
         try:
-            with np.load(FILENAME) as data:
-                PAST_WEIGHTS = [data[key] for key in data.files]
-        except FileNotFoundError:
-            logger.info("No weights file found - this is expected on a new validator, starting with empty weights")
-            PAST_WEIGHTS = []
-        except Exception as ex:
-            logger.error(f"Couldn't load weights from file: {ex}")
-        return await super().start(name=name, **kwargs)
+            with tmp_path.open("w", encoding="utf-8") as file:
+                for snap in self.reward_history:
+                    row: dict[str, Any] = {
+                        "ts": datetime.datetime.now(datetime.timezone.utc).isoformat(timespec="seconds") + "Z",
+                        "block": block,
+                        "rewards": {str(k): v["reward"] for k, v in snap.items()},
+                    }
+                    file.write(json.dumps(row, separators=(",", ":")) + "\n")
+            tmp_path.replace(self.reward_history_path)
+        except Exception as exc:
+            logger.error(f"Couldn't persist rewards history: {exc}")
+
+    async def _load_rewards(self):
+        """Load reward snapshots from disk into `reward_history`.
+
+        Only the newest `reward_history_len` rows are retained.
+        """
+        self.reward_history: deque[dict[int, dict[str, Any]]] | None = deque(maxlen=self.reward_history_len)
+        if not self.reward_history_path.exists():
+            logger.info("No rewards file found - starting with empty history.")
+            return
+
+        try:
+            with self.reward_history_path.open("r", encoding="utf-8") as file:
+                for line in file:
+                    data = json.loads(line)
+                    payload = data.get("rewards")
+                    if payload is None:
+                        raise ValueError(f"Malformed weight history file: {data}")
+
+                    self.reward_history.append(
+                        {int(uid): {"reward": float(reward)} for uid, reward in payload.items()}
+                    )
+        except BaseException as exc:
+            self.reward_history: deque[dict[int, dict[str, Any]]] | None = deque(maxlen=self.reward_history_len)
+            logger.error(f"Couldn't load rewards from file, resetting weight history: {exc}")
+
+    @classmethod
+    async def merge_task_rewards(cls, reward_events: list[list[WeightedRewardEvent]]) -> npt.NDArray[np.float32] | None:
+        if len(reward_events) == 0:
+            logger.warning("No reward events in queue, skipping weight setting...")
+            return
+
+        all_uids = range(shared_settings.METAGRAPH.n.item())
+        reward_dict = {uid: 0 for uid in all_uids}
+        logger.info(f"Setting weights for {len(reward_dict)} uids")
+
+        # miner_rewards is a dictionary that separates each task config into a dictionary of uids with their rewards.
+        miner_rewards: dict[TaskConfig, dict[int, dict[str, float]]] = {
+            config: {uid: {"reward": 0, "count": 0} for uid in all_uids} for config in TaskRegistry.task_configs
+        }
+
+        linear_reward_tasks = set([InferenceTask, MSRv2Task])
+        linear_events: list[WeightedRewardEvent] = []
+        for reward_sub_events in reward_events:
+            for reward_event in reward_sub_events:
+                task_config = TaskRegistry.get_task_config(reward_event.task)
+
+                # Linear reward tasks use a different reward model.
+                if task_config.task in linear_reward_tasks:
+                    linear_events.append(reward_event)
+                    continue
+
+                # Give each uid the reward they received.
+                for uid, reward in zip(reward_event.uids, reward_event.rewards):
+                    miner_rewards[task_config][uid]["reward"] += reward * reward_event.weight
+                    miner_rewards[task_config][uid]["count"] += reward_event.weight
+
+        for linear_event in linear_events:
+            task_config = TaskRegistry.get_task_config(linear_event.task)
+            for uid, reward in zip(linear_event.uids, linear_event.rewards):
+                miner_rewards[task_config][uid]["reward"] += reward
+
+        for task_config, rewards in miner_rewards.items():
+            task_rewards = np.array([x["reward"] / max(1, x["count"]) for x in list(rewards.values())])
+            task_uids = np.array(list(rewards.keys()))
+            if task_config.task in linear_reward_tasks:
+                processed_rewards = task_rewards / max(1, (np.sum(task_rewards[task_rewards > 0]) + 1e-10))
+            else:
+                processed_rewards = cls.apply_steepness(
+                    raw_rewards=task_rewards,
+                    steepness=shared_settings.REWARD_STEEPNESS
+                )
+            processed_rewards *= task_config.probability
+
+            for uid, reward in zip(task_uids, processed_rewards):
+                reward_dict[uid] += reward
+
+        final_rewards = np.array(list(reward_dict.values())).astype(np.float32)
+        return final_rewards
+
+    @classmethod
+    def apply_steepness(cls, raw_rewards: npt.NDArray[np.float32], steepness: float = 0.5) -> npt.NDArray[np.float32]:
+        """Apply steepness function to the raw rewards.
+
+        Args:
+            steepness: Adjusts the steepness of the function - a value of 0.5 leaves the rewards unchanged,
+                values < 0.5 make the function more linear (at 0, all miners with positive reward values get the same reward),
+                values > 0.5 make the function more exponential (winner takes all).
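+                For example, with steepness = 0.5 the exponent below becomes 1, so positive rewards are simply normalised to sum to 1.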
+        """
+        # 6.64385619 = ln(100)/ln(2) -> this way if steepness = 0.5, the exponent is exactly 1.
+        exponent = (steepness ** 6.64385619) * 100
+        raw_rewards = np.array(raw_rewards) / max(1, (np.sum(raw_rewards[raw_rewards > 0]) + 1e-10))
+        positive_rewards = np.clip(raw_rewards, 1e-10, np.inf)
+        normalised_rewards = positive_rewards / np.max(positive_rewards)
+        post_func_rewards = normalised_rewards ** exponent
+        all_rewards = post_func_rewards / (np.sum(post_func_rewards) + 1e-10)
+        all_rewards[raw_rewards <= 0] = raw_rewards[raw_rewards <= 0]
+        return all_rewards
 
     async def run_step(self):
         await asyncio.sleep(0.01)
         try:
-            if len(self.reward_events) == 0:
-                logger.warning("No reward events in queue, skipping weight setting...")
+            if self.reward_events is None:
+                logger.error("No reward events were found, skipping weight setting")
+                return
+
+            final_rewards = await self.merge_task_rewards(self.reward_events)
+
+            if final_rewards is None:
+                logger.error("No rewards were found, skipping weight setting")
                 return
 
-            # reward_events is a list of lists of WeightedRewardEvents - the 'sublists' each contain the multiple reward events for a single task
-            self.reward_events: list[list[WeightedRewardEvent]] = self.reward_events  # to get correct typehinting
-
-            # reward_dict = {uid: 0 for uid in get_uids(sampling_mode="all")}
-            all_uids = range(shared_settings.METAGRAPH.n.item())
-            reward_dict = {uid: 0 for uid in all_uids}
-            logger.info(f"Setting weights for {len(reward_dict)} uids")
-            # miner_rewards is a dictionary that separates each task config into a dictionary of uids with their rewards
-            miner_rewards: dict[TaskConfig, dict[int, float]] = {
-                config: {uid: {"reward": 0, "count": 0} for uid in all_uids} for config in TaskRegistry.task_configs
-            }
-
-            inference_events: list[WeightedRewardEvent] = []
-            for reward_events in self.reward_events:
-                await asyncio.sleep(0.01)
-                for reward_event in reward_events:
-                    if np.sum(reward_event.rewards) > 0:
-                        logger.debug("Identified positive reward event")
-                    task_config = TaskRegistry.get_task_config(reward_event.task)
-
-                    # inference task uses a different reward model
-                    if task_config.task == InferenceTask:
-                        inference_events.append(reward_event)
-                        continue
-
-                    # give each uid the reward they received
-                    for uid, reward in zip(reward_event.uids, reward_event.rewards):
-                        miner_rewards[task_config][uid]["reward"] += (
-                            reward * reward_event.weight
-                        )  # TODO: Double check I actually average at the end
-                        miner_rewards[task_config][uid]["count"] += (
-                            1 * reward_event.weight
-                        )  # TODO: Double check I actually average at the end
-
-            for inference_event in inference_events:
-                for uid, reward in zip(inference_event.uids, inference_event.rewards):
-                    miner_rewards[TaskRegistry.get_task_config(InferenceTask)][uid]["reward"] += (
-                        reward  # for inference 2x responses should mean 2x the reward
-                    )
-
-            for task_config, rewards in miner_rewards.items():
-                r = np.array([x["reward"] / max(1, x["count"]) for x in list(rewards.values())])
-                u = np.array(list(rewards.keys()))
-                if task_config.task == InferenceTask:
-                    processed_rewards = r / max(1, (np.sum(r[r > 0]) + 1e-10))
-                else:
-                    processed_rewards = apply_reward_func(raw_rewards=r, p=shared_settings.REWARD_STEEPNESS)
-                processed_rewards *= task_config.probability
-                # update reward dict
-                for uid, reward in zip(u, processed_rewards):
-                    reward_dict[uid] += reward
-            final_rewards = np.array(list(reward_dict.values())).astype(float)
+            await self._save_rewards(final_rewards)
             final_rewards[final_rewards < 0] = 0
             final_rewards /= np.sum(final_rewards) + 1e-10
-        except Exception as ex:
+        except BaseException as ex:
             logger.exception(f"{ex}")
 
-        # set weights on chain
+        # Set weights on chain.
         await set_weights(
             final_rewards,
-            step=self.step,
             subtensor=shared_settings.SUBTENSOR,
-            metagraph=shared_settings.METAGRAPH,
+            metagraph=shared_settings.metagraph_force_sync(),
             weight_syncer=self.weight_syncer,
         )
-        # TODO: empty rewards queue only on weight setting success
-        self.reward_events[:] = []  # empty reward events queue
+        # TODO: Empty rewards queue only on weight setting success.
+        self.reward_events[:] = []
         await asyncio.sleep(0.01)
         return final_rewards
diff --git a/shared/settings.py b/shared/settings.py
index 73497231f..658dbc1e5 100644
--- a/shared/settings.py
+++ b/shared/settings.py
@@ -263,6 +263,10 @@ def SUBTENSOR(self) -> Subtensor:
             self._subtensor = Subtensor(network=subtensor_network)
         return self._subtensor
 
+    def metagraph_force_sync(self) -> Metagraph:
+        self._last_update_time = 0
+        return self.METAGRAPH
+
     @property
     def METAGRAPH(self) -> Metagraph:
         if time.time() - self._last_update_time > 1200:
diff --git a/tests/prompting/test_weight_settings.py b/tests/prompting/test_weight_settings.py
deleted file mode 100644
index 00b923a5f..000000000
--- a/tests/prompting/test_weight_settings.py
+++ /dev/null
@@ -1,145 +0,0 @@
-# ruff: noqa: E402
-import asyncio
-
-import numpy as np
-
-from shared import settings
-
-settings.shared_settings = settings.SharedSettings(mode="mock")
-raw_rewards = np.array([1.0, 2.0, 3.0, 4.0, 5.0])
-from unittest.mock import MagicMock, patch
-
-from prompting.weight_setting.weight_setter import WeightSetter, apply_reward_func
-
-
-def test_apply_reward_func():
-    raw_rewards = np.array([1.0, 2.0, 3.0, 4.0, 5.0])
-
-    # test result is even returned
-    result = apply_reward_func(raw_rewards)
-    assert result is not None, "Result was None"
-
-    # Test with p = 0.5 (no change)
-    result = apply_reward_func(raw_rewards, p=0.5)
-    assert np.allclose(
-        result, raw_rewards / np.sum(raw_rewards), atol=1e-6
-    ), "Result should be unchanged from raw rewards"
-
-    # Test with p = 0 (more linear)
-    result = apply_reward_func(raw_rewards, p=0)
-    assert np.isclose(np.std(result), 0, atol=1e-6), "All rewards should be equal"
-
-    # Test with p = 1 (more exponential)
-    result = apply_reward_func(raw_rewards, p=1)
-    assert result[-1] > 0.9, "Top miner should take vast majority of reward"
-
-    # Test with negative values
-    raw_rewards = np.array([-1.0, 0.0, 1.0, 2.0, 3.0])
-    result = apply_reward_func(raw_rewards, p=0.5)
-    assert result[0] < 0, "Negative reward should remain negative"
-
-
-def test_run_step_with_reward_events():
-    with (
-        patch("shared.uids.get_uids") as mock_get_uids,
-        patch("prompting.weight_setting.weight_setter.TaskRegistry") as MockTaskRegistry,
-        # patch("prompting.weight_setting.weight_setter.mutable_globals") as mock_mutable_globals,
-        patch("prompting.weight_setting.weight_setter.set_weights") as mock_set_weights,
-        patch("prompting.weight_setting.weight_setter.logger") as mock_logger,
-    ):
-
-        class MockTask:
-            pass
-
-        class TaskConfig:
-            def __init__(self, name, probability):
-                self.name = name
-                self.probability = probability
-                self.task = MockTask
-
-        class WeightedRewardEvent:
-            def __init__(self, task, uids, rewards, weight):
-                self.task = task
-                self.uids = uids
-                self.rewards = rewards
-                self.weight = weight
-
-        mock_uids = [1, 2, 3, 4, 5]
-        mock_get_uids.return_value = mock_uids
-
-        # Set up the mock TaskRegistry
-        mock_task_registry = MockTaskRegistry
-        mock_task_registry.task_configs = [
-            TaskConfig(name="Task1", probability=0.5),
-        ]
-        mock_task_registry.get_task_config = MagicMock(return_value=mock_task_registry.task_configs[0])
-
-        # Set up the mock mutable_globals
-
-        weight_setter = WeightSetter()
-        reward_events = [
-            [
-                WeightedRewardEvent(
-                    task=mock_task_registry.task_configs[0], uids=mock_uids, rewards=[1.0, 2.0, 3.0, 4.0, 5.0], weight=1
-                ),
-            ],
-            [
-                WeightedRewardEvent(
-                    task=mock_task_registry.task_configs[0], uids=mock_uids, rewards=[5.0, 4.0, 3.0, 2.0, 1.0], weight=1
-                ),
-            ],
-        ]
-        weight_setter.reward_events = reward_events
-        output = asyncio.run(weight_setter.run_step())
-
-        print(output)
-        mock_set_weights.assert_called_once()
-        call_args = mock_set_weights.call_args[0]
-        assert len([c for c in call_args[0] if c > 0]) == len(mock_uids)
-        assert np.isclose(np.sum(call_args[0]), 1, atol=1e-6)
-
-        # Check that the warning about empty reward events is not logged
-        mock_logger.warning.assert_not_called()
-
-
-# def test_run_step_without_reward_events(weight_setter):
-#     with (
-#         patch("prompting.weight_setter.get_uids") as mock_get_uids,
-#         patch("prompting.weight_setter.TaskRegistry.task_configs", new_callable=property) as mock_task_configs,
-#         patch("prompting.weight_setter.mutable_globals.reward_events") as mock_reward_events,
-#         patch("prompting.weight_setter.set_weights") as mock_set_weights,
-#     ):
-
-#         mock_get_uids.return_value = [1, 2, 3, 4, 5]
-#         mock_task_configs.return_value = [
-#             TaskConfig(name="Task1", probability=0.5),
-#             TaskConfig(name="Task2", probability=0.3),
-#         ]
-#         mock_reward_events.return_value = []
-
-#         weight_setter.run_step()
-
-#         mock_set_weights.assert_not_called()
-
-
-# def test_set_weights():
-#     with (
-#         patch("prompting.weight_setter.settings.SUBTENSOR") as mock_subtensor,
-#         patch("prompting.weight_setter.settings.WALLET") as mock_wallet,
-#         patch("prompting.weight_setter.settings.NETUID") as mock_netuid,
-#         patch("prompting.weight_setter.settings.METAGRAPH") as mock_metagraph,
-#         patch("prompting.weight_setter.pd.DataFrame.to_csv") as mock_to_csv,
-#     ):
-
-#         weights = np.array([0.1, 0.2, 0.3, 0.4])
-#         uids = np.array([1, 2, 3, 4])
-#         mock_metagraph.uids = uids
-
-#         set_weights(weights)
-
-#         # Check if weights were processed and set
-#         mock_subtensor.set_weights.assert_called_once()
-
-#         # Check if weights were logged
-#         if settings.LOG_WEIGHTS:
-#             mock_to_csv.assert_called_once()
diff --git a/tests/prompting/weight_setting/test_weight_setter.py b/tests/prompting/weight_setting/test_weight_setter.py
new file mode 100644
index 000000000..56cdc2957
--- /dev/null
+++ b/tests/prompting/weight_setting/test_weight_setter.py
@@ -0,0 +1,211 @@
+# ruff: noqa: E402
+import asyncio
+from pathlib import Path
+from types import SimpleNamespace
+from unittest.mock import MagicMock, patch
+
+import numpy as np
+import pytest
+from pytest import MonkeyPatch
+
+from shared import settings
+
+settings.shared_settings = settings.SharedSettings(mode="mock")
+
+from prompting.rewards.reward import WeightedRewardEvent
+from prompting.tasks.inference import InferenceTask
+from prompting.tasks.msrv2_task import MSRv2Task
+from prompting.tasks.web_retrieval import WebRetrievalTask
+from prompting.weight_setting import weight_setter
+from prompting.weight_setting.weight_setter import WeightSetter
+
+UIDS: list[int] = list(range(256))
+
+
+def _make_event(task_cls: type, rewards: list[float]) -> WeightedRewardEvent:
+    """Return a fully-populated WeightedRewardEvent for the given task."""
+    return WeightedRewardEvent(
+        weight=1.0,
+        task=task_cls,
+        reward_model_name="test",
+        rewards=rewards,
+        rewards_normalized=rewards,
+        timings=[0.0] * len(rewards),
+        reward_model_type="reward",
+        batch_time=0.0,
+        uids=UIDS,
+        threshold=None,
+        extra_info=None,
+        reward_type="reward",
+    )
+
+
+@pytest.mark.asyncio
+async def test_merge_task_rewards() -> None:
+    negative_uids: set[int] = {0, 1, 2}
+    inference_rewards: list[float] = [-2.0 if uid in negative_uids else 1.0 for uid in UIDS]
+    msrv2_rewards: list[float] = [1.0] * len(UIDS)
+    web_rewards: list[float] = [1.0] * len(UIDS)
+
+    events: list[list[WeightedRewardEvent]] = [
+        [
+            _make_event(InferenceTask(), inference_rewards),
+            _make_event(MSRv2Task(), msrv2_rewards),
+            _make_event(WebRetrievalTask(), web_rewards),
+        ]
+    ]
+
+    final_rewards = await WeightSetter.merge_task_rewards(events)
+
+    assert isinstance(final_rewards, np.ndarray)
+    assert final_rewards.dtype == np.float32
+    assert final_rewards.shape == (len(UIDS),)
+    assert int((final_rewards < 0).sum()) == 3
+
+
+def test_steepness():
+    raw_rewards = np.array([1.0, 2.0, 3.0, 4.0, 5.0])
+
+    # Test result is even returned.
+    result = WeightSetter.apply_steepness(raw_rewards)
+    assert result is not None, "Result was None"
+
+    # Test with steepness = 0.5 (no change).
+    result = WeightSetter.apply_steepness(raw_rewards, steepness=0.5)
+    assert np.allclose(
+        result, raw_rewards / np.sum(raw_rewards), atol=1e-6
+    ), "Result should be unchanged from raw rewards"
+
+    # Test with steepness = 0 (more linear).
+    result = WeightSetter.apply_steepness(raw_rewards, steepness=0)
+    assert np.isclose(np.std(result), 0, atol=1e-6), "All rewards should be equal"
+
+    # Test with steepness = 1 (more exponential).
+    result = WeightSetter.apply_steepness(raw_rewards, steepness=1)
+    assert result[-1] > 0.9, "Top miner should take vast majority of reward"
+
+    # Test with negative values.
+    raw_rewards = np.array([-1.0, 0.0, 1.0, 2.0, 3.0])
+    result = WeightSetter.apply_steepness(raw_rewards, steepness=0.5)
+    assert result[0] < 0, "Negative reward should remain negative"
+
+
+def test_run_step_with_reward_events():
+    with (
+        patch("shared.uids.get_uids") as mock_get_uids,
+        patch("prompting.weight_setting.weight_setter.TaskRegistry") as MockTaskRegistry,
+        patch("prompting.weight_setting.weight_setter.set_weights") as mock_set_weights,
+        patch("prompting.weight_setting.weight_setter.logger") as mock_logger,
+    ):
+
+        class MockTask:
+            pass
+
+        class TaskConfig:
+            def __init__(self, name, probability):
+                self.name = name
+                self.probability = probability
+                self.task = MockTask
+
+        class WeightedRewardEvent:
+            def __init__(self, task, uids, rewards, weight):
+                self.task = task
+                self.uids = uids
+                self.rewards = rewards
+                self.weight = weight
+
+        mock_uids = [1, 2, 3, 4, 5]
+        mock_get_uids.return_value = mock_uids
+
+        # Set up the mock TaskRegistry
+        mock_task_registry = MockTaskRegistry
+        mock_task_registry.task_configs = [
+            TaskConfig(name="Task1", probability=0.5),
+        ]
+        mock_task_registry.get_task_config = MagicMock(return_value=mock_task_registry.task_configs[0])
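+
+        # Use a test-only history path so the run does not overwrite the validator's real rewards file.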
+        weight_setter = WeightSetter(reward_history_path=Path("test_validator_rewards.jsonl"))
+        reward_events = [
+            [
+                WeightedRewardEvent(
+                    task=mock_task_registry.task_configs[0],
+                    uids=mock_uids,
+                    rewards=[1.0, 2.0, 3.0, 4.0, 5.0],
+                    weight=1,
+                ),
+            ],
+            [
+                WeightedRewardEvent(
+                    task=mock_task_registry.task_configs[0],
+                    uids=mock_uids,
+                    rewards=[-5.0, -4.0, -3.0, -2.0, -1.0],
+                    weight=1,
+                ),
+            ],
+        ]
+        weight_setter.reward_events = reward_events
+        asyncio.run(weight_setter.run_step())
+
+        mock_set_weights.assert_called_once()
+        call_args = mock_set_weights.call_args[0]
+        weights = call_args[0]
+
+        assert weights[0] <= 0
+        assert weights[1] <= 0
+        assert weights[2] == 0
+        assert weights[3] >= 0
+        assert weights[4] >= 0
+
+        # Weights are re-normalised to 1.
+        assert np.isclose(weights.sum(), 1.0, atol=1e-6)
+
+        # Check that the warning about empty reward events is not logged.
+        mock_logger.warning.assert_not_called()
+
+
+@pytest.mark.asyncio
+async def test_set_weights(monkeypatch: MonkeyPatch):
+    """`set_weights` calls Subtensor.set_weights with processed vectors."""
+    stub_settings = SimpleNamespace(
+        NEURON_DISABLE_SET_WEIGHTS=False,
+        UID=0,
+        NETUID=42,
+        WALLET="dummy-wallet",
+        METAGRAPH=SimpleNamespace(uids=np.arange(4, dtype=np.uint16)),
+    )
+
+    subtensor_mock = MagicMock()
+    subtensor_mock.set_weights = MagicMock(return_value=(True, "ok"))
+    stub_settings.SUBTENSOR = subtensor_mock
+    monkeypatch.setattr(weight_setter, "shared_settings", stub_settings)
+
+    monkeypatch.setattr(
+        weight_setter.bt.utils.weight_utils,
+        "process_weights_for_netuid",
+        lambda *, uids, weights, **_: (uids, weights),
+    )
+    monkeypatch.setattr(
+        weight_setter.bt.utils.weight_utils,
+        "convert_weights_and_uids_for_emit",
+        lambda uids, weights: (uids.astype(np.uint16), (weights * 65535).astype(np.uint16)),
+    )
+
+    class _Syncer:
+        async def get_augmented_weights(self, *, weights: np.ndarray, uid: int) -> np.ndarray:  # noqa: D401
+            return weights
+
+    raw = np.array([0.1, 0.2, 0.3, 0.4], dtype=np.float32)
+    await weight_setter.set_weights(
+        raw,
+        subtensor=stub_settings.SUBTENSOR,
+        metagraph=stub_settings.METAGRAPH,
+        weight_syncer=_Syncer(),
+    )
+
+    subtensor_mock.set_weights.assert_called_once()
+    call_kwargs = subtensor_mock.set_weights.call_args.kwargs
+
+    expected_uint16 = (raw * 65535).astype(np.uint16)
+    assert np.array_equal(call_kwargs["weights"], expected_uint16)