diff --git a/prompting/rewards/scoring.py b/prompting/rewards/scoring.py
index fa7126715..84ebaf5c8 100644
--- a/prompting/rewards/scoring.py
+++ b/prompting/rewards/scoring.py
@@ -1,6 +1,7 @@
 import asyncio
 import copy
 import threading
+import time
 from multiprocessing.managers import AcquirerProxy

 from loguru import logger
@@ -30,6 +31,7 @@ class TaskScorer(AsyncLoopRunner):
     scoring_queue: list | None = None
     reward_events: list | None = None
     task_queue: list | None = None
+    expiry_time: int = 60 * 60 * 20
     model_config = ConfigDict(arbitrary_types_allowed=True)

     async def start(
@@ -70,12 +72,22 @@ def add_to_queue(

     async def run_step(self) -> RewardLoggingEvent:
         await asyncio.sleep(0.1)
-        if not self.scoring_queue:
+        scoring_config: ScoringConfig | None = None
+        while self.scoring_queue:
+            # Pop the oldest item from the queue.
+            config = self.scoring_queue.pop(0)
+            # Check if the config is recent enough to be processed.
+            if config.created_at >= time.time() - self.expiry_time:
+                scoring_config = config
+                break
+            # Otherwise, the old config is discarded and we continue to the next one.
+            else:
+                logger.debug(
+                    f"Discarding old scoring config for {config.task.__class__.__name__} created at {config.created_at}"
+                )
+        if not scoring_config:
             return

-        # TODO: Filter based on active models before selecting an item to score.
-        scoring_config: ScoringConfig = self.scoring_queue.pop(0)
-
         # here we generate the actual reference
         with Timer(label=f"Generating reference for {scoring_config.task.__class__.__name__}"):
             await scoring_config.task.make_reference(
diff --git a/prompting/rewards/scoring_config.py b/prompting/rewards/scoring_config.py
index 1606b21b3..fb176c304 100644
--- a/prompting/rewards/scoring_config.py
+++ b/prompting/rewards/scoring_config.py
@@ -1,4 +1,5 @@
-from dataclasses import dataclass
+import time
+from dataclasses import dataclass, field

 from prompting.tasks.base_task import BaseTextTask
 from shared.base import DatasetEntry
@@ -13,3 +14,4 @@ class ScoringConfig:
     block: int
     step: int
     task_id: str
+    created_at: float = field(default_factory=time.time)
diff --git a/prompting/weight_setting/weight_setter.py b/prompting/weight_setting/weight_setter.py
index 56df4c1e3..ac4f772e9 100644
--- a/prompting/weight_setting/weight_setter.py
+++ b/prompting/weight_setting/weight_setter.py
@@ -23,7 +23,7 @@
 async def set_weights(
-    weights: np.ndarray,
+    weights: npt.NDArray[np.float32],
     subtensor: bt.Subtensor | None = None,
     metagraph: bt.Metagraph | None = None,
     weight_syncer: WeightSynchronizer | None = None,
@@ -115,6 +115,21 @@ async def start(
         await self._load_rewards()
         return await super().start(name=name)

+    async def _compute_avg_reward(self) -> npt.NDArray[np.float32]:
+        """Compute reward average based on the `reward_history` and `reward_average_len` window."""
+        num_uids = int(shared_settings.METAGRAPH.n.item())
+        accum = np.zeros(num_uids, dtype=np.float32)
+        if not isinstance(self.reward_history, deque) or len(self.reward_history) == 0:
+            logger.warning(f"Empty rewards history, setting zero weights: {self.reward_history}")
+            return accum
+
+        for snapshot in self.reward_history:
+            for uid_str, info in snapshot.items():
+                accum[int(uid_str)] += float(info["reward"])
+
+        avg = accum / len(self.reward_history)
+        return avg
+
     async def _save_rewards(self, rewards: npt.NDArray[np.float32]):
         """Persist the latest epoch rewards.
@@ -255,14 +270,16 @@ async def run_step(self):
                 return

             await self._save_rewards(final_rewards)
-            final_rewards[final_rewards < 0] = 0
-            final_rewards /= np.sum(final_rewards) + 1e-10
+            averaged_rewards = await self._compute_avg_reward()
+            averaged_rewards[averaged_rewards < 0] = 0
+            averaged_rewards /= np.sum(averaged_rewards) + 1e-10
         except BaseException as ex:
             logger.exception(f"{ex}")
+            return

         # Set weights on chain.
         await set_weights(
-            final_rewards,
+            averaged_rewards,
             subtensor=shared_settings.SUBTENSOR,
             metagraph=shared_settings.metagraph_force_sync(),
             weight_syncer=self.weight_syncer,
diff --git a/pyproject.toml b/pyproject.toml
index 69ef99829..49e971c9e 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "prompting"
-version = "2.19.8"
+version = "2.19.9"
 description = "Subnetwork 1 runs on Bittensor and is maintained by Macrocosmos. It's an effort to create decentralised AI"
 authors = ["Kalei Brady, Dmytro Bobrenko, Felix Quinque, Steffen Cruz, Richard Wardle"]
 readme = "README.md"
diff --git a/tests/prompting/weight_setting/test_weight_setter.py b/tests/prompting/weight_setting/test_weight_setter.py
index 56cdc2957..0609d558d 100644
--- a/tests/prompting/weight_setting/test_weight_setter.py
+++ b/tests/prompting/weight_setting/test_weight_setter.py
@@ -1,5 +1,6 @@
 # ruff: noqa: E402
 import asyncio
+from collections import deque
 from pathlib import Path
 from types import SimpleNamespace
 from unittest.mock import MagicMock, patch
@@ -90,7 +91,7 @@ def test_steepness():
     assert result[0] < 0, "Negative reward should remain negative"


-def test_run_step_with_reward_events():
+def test_run_step_with_reward_events(tmp_path: Path):
     with (
         patch("shared.uids.get_uids") as mock_get_uids,
         patch("prompting.weight_setting.weight_setter.TaskRegistry") as MockTaskRegistry,
@@ -126,7 +127,7 @@ def __init__(self, task, uids, rewards, weight):

         # Set up the mock mutable_globals.
-        weight_setter = WeightSetter(reward_history_path=Path("test_validator_rewards.jsonl"))
+        weight_setter = WeightSetter(reward_history_path=tmp_path / "test_validator_rewards.jsonl")
         reward_events = [
             [
                 WeightedRewardEvent(
@@ -165,6 +166,37 @@ def __init__(self, task, uids, rewards, weight):
         mock_logger.warning.assert_not_called()


+def _make_snapshot(values: list[float]) -> dict[int, dict[str, float]]:
+    return {uid: {"reward": v} for uid, v in enumerate(values)}
+
+
+@pytest.mark.asyncio
+async def test_avg_reward_non_empty(tmp_path: Path) -> None:
+    """Mean over two snapshots equals manual average."""
+    ws = WeightSetter(reward_history_path=tmp_path / "test_validator_rewards.jsonl")
+    ws.reward_history_len = 10
+    ws.reward_history = deque(maxlen=10)
+    rewards = list(range(256))
+    ws.reward_history.append(_make_snapshot(rewards))
+    ws.reward_history.append(_make_snapshot(rewards[::-1]))
+
+    result = await ws._compute_avg_reward()
+
+    expected = np.full(256, 255 / 2, dtype=np.float32)
+    assert result.dtype == np.float32
+    assert np.allclose(result, expected, atol=1e-6)
+
+
+@pytest.mark.asyncio
+async def test_avg_reward_empty(monkeypatch: MonkeyPatch, tmp_path: Path) -> None:
+    """Empty history returns a zero vector."""
+    ws = WeightSetter(reward_history_path=tmp_path / "test_validator_rewards.jsonl")
+    ws.reward_history_len = 10
+    ws.reward_history = deque(maxlen=10)
+    result = await ws._compute_avg_reward()
+    assert np.array_equal(result, np.zeros(256, dtype=np.float32))
+
+
 @pytest.mark.asyncio
 async def test_set_weights(monkeypatch: MonkeyPatch):
     """`set_weights` calls Subtensor.set_weights with processed vectors."""
diff --git a/validator_api/chat_completion.py b/validator_api/chat_completion.py
index 9e411f767..81049a935 100644
--- a/validator_api/chat_completion.py
+++ b/validator_api/chat_completion.py
@@ -237,7 +237,7 @@ async def chat_completion(
     uids: Optional[list[int]] = None,
     num_miners: int = 5,
     uid_tracker: UidTracker | None = None,
-    add_reliable_miners: int = 1,
+    add_reliable_miners: int = 3,
 ) -> tuple | StreamingResponse:
     # TODO: Add docstring.
     """Handle chat completion with multiple miners in parallel."""