Merged
20 changes: 16 additions & 4 deletions prompting/rewards/scoring.py
@@ -1,6 +1,7 @@
import asyncio
import copy
import threading
import time
from multiprocessing.managers import AcquirerProxy

from loguru import logger
@@ -30,6 +31,7 @@ class TaskScorer(AsyncLoopRunner):
scoring_queue: list | None = None
reward_events: list | None = None
task_queue: list | None = None
expiry_time: int = 60 * 60 * 20
model_config = ConfigDict(arbitrary_types_allowed=True)

async def start(
@@ -70,12 +72,22 @@ def add_to_queue(
async def run_step(self) -> RewardLoggingEvent:
await asyncio.sleep(0.1)

if not self.scoring_queue:
scoring_config: ScoringConfig | None = None
while self.scoring_queue:
# Pop the oldest item from the queue.
config = self.scoring_queue.pop(0)
# Check if the config is recent enough to be processed.
if config.created_at >= time.time() - self.expiry_time:
scoring_config = config
break
# Otherwise, the old config is discarded and we continue to the next one.
else:
logger.debug(
f"Discarding old scoring config for {config.task.__class__.__name__} created at {config.created_at}"
)
if not scoring_config:
return

# TODO: Filter based on active models before selecting an item to score.
scoring_config: ScoringConfig = self.scoring_queue.pop(0)

# here we generate the actual reference
with Timer(label=f"Generating reference for {scoring_config.task.__class__.__name__}"):
await scoring_config.task.make_reference(
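Note: with the change above, `run_step` pops queue entries oldest-first and discards any `ScoringConfig` older than `expiry_time` (60 * 60 * 20 seconds, i.e. 20 hours) before picking one to score. A minimal standalone sketch of the same pop-until-fresh pattern, using a hypothetical `Item` dataclass and `pop_fresh` helper (not code from this repository):

import time
from dataclasses import dataclass, field

@dataclass
class Item:
    name: str
    created_at: float = field(default_factory=time.time)

EXPIRY_TIME = 60 * 60 * 20  # 20 hours, mirroring TaskScorer.expiry_time

def pop_fresh(queue: list[Item]) -> Item | None:
    # Pop items oldest-first, discarding anything older than EXPIRY_TIME.
    while queue:
        item = queue.pop(0)
        if item.created_at >= time.time() - EXPIRY_TIME:
            return item  # fresh enough to be scored
        # Stale entry: drop it silently and keep scanning the queue.
    return None  # queue exhausted, nothing fresh to score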
4 changes: 3 additions & 1 deletion prompting/rewards/scoring_config.py
@@ -1,4 +1,5 @@
from dataclasses import dataclass
import time
from dataclasses import dataclass, field

from prompting.tasks.base_task import BaseTextTask
from shared.base import DatasetEntry
@@ -13,3 +14,4 @@ class ScoringConfig:
block: int
step: int
task_id: str
created_at: float = field(default_factory=time.time)
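The `created_at` field uses `field(default_factory=time.time)` so the timestamp is captured when each `ScoringConfig` instance is created; a plain `created_at: float = time.time()` default would be evaluated once at class-definition time and shared by every instance. A small illustrative sketch with a hypothetical `Stamped` dataclass (not part of the codebase):

import time
from dataclasses import dataclass, field

@dataclass
class Stamped:
    per_instance: float = field(default_factory=time.time)  # evaluated for every new instance
    at_definition: float = time.time()  # evaluated once, when the class body runs

a = Stamped()
time.sleep(0.01)
b = Stamped()
assert b.per_instance > a.per_instance    # each object gets its own timestamp
assert b.at_definition == a.at_definition  # the plain default is shared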
25 changes: 21 additions & 4 deletions prompting/weight_setting/weight_setter.py
@@ -23,7 +23,7 @@


async def set_weights(
weights: np.ndarray,
weights: npt.NDArray[np.float32],
subtensor: bt.Subtensor | None = None,
metagraph: bt.Metagraph | None = None,
weight_syncer: WeightSynchronizer | None = None,
@@ -115,6 +115,21 @@ async def start(
await self._load_rewards()
return await super().start(name=name)

async def _compute_avg_reward(self) -> npt.NDArray[np.float32]:
"""Compute reward average based on the `reward_history` and `reward_average_len` window."""
num_uids = int(shared_settings.METAGRAPH.n.item())
accum = np.zeros(num_uids, dtype=np.float32)
if not isinstance(self.reward_history, deque) or len(self.reward_history) == 0:
logger.warning(f"Empty rewards history, setting zero weights: {self.reward_history}")
return accum

for snapshot in self.reward_history:
for uid_str, info in snapshot.items():
accum[int(uid_str)] += float(info["reward"])

avg = accum / len(self.reward_history)
return avg

async def _save_rewards(self, rewards: npt.NDArray[np.float32]):
"""Persist the latest epoch rewards.

@@ -255,14 +270,16 @@ async def run_step(self):
return

await self._save_rewards(final_rewards)
final_rewards[final_rewards < 0] = 0
final_rewards /= np.sum(final_rewards) + 1e-10
averaged_rewards = await self._compute_avg_reward()
averaged_rewards[averaged_rewards < 0] = 0
averaged_rewards /= np.sum(averaged_rewards) + 1e-10
except BaseException as ex:
logger.exception(f"{ex}")
return

# Set weights on chain.
await set_weights(
final_rewards,
averaged_rewards,
subtensor=shared_settings.SUBTENSOR,
metagraph=shared_settings.metagraph_force_sync(),
weight_syncer=self.weight_syncer,
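With these changes the weight setter derives the on-chain weights from an average over the reward snapshots held in `reward_history`, rather than from the latest epoch's `final_rewards` alone; negative averages are clipped to zero and the vector is normalized before being passed to `set_weights`. A condensed sketch of that post-processing as a standalone helper (simplified, not the class method itself):

import numpy as np
import numpy.typing as npt

def average_and_normalize(history: list[dict[int, dict[str, float]]], num_uids: int) -> npt.NDArray[np.float32]:
    # Accumulate per-UID rewards over every snapshot in the history window.
    accum = np.zeros(num_uids, dtype=np.float32)
    if not history:
        return accum  # empty history -> zero weights, as in _compute_avg_reward
    for snapshot in history:
        for uid, info in snapshot.items():
            accum[int(uid)] += float(info["reward"])
    avg = accum / len(history)
    avg[avg < 0] = 0  # negative averages are clipped before normalization
    avg /= np.sum(avg) + 1e-10  # epsilon keeps the division safe when all rewards are zero
    return avg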
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "prompting"
version = "2.19.8"
version = "2.19.9"
description = "Subnetwork 1 runs on Bittensor and is maintained by Macrocosmos. It's an effort to create decentralised AI"
authors = ["Kalei Brady, Dmytro Bobrenko, Felix Quinque, Steffen Cruz, Richard Wardle"]
readme = "README.md"
36 changes: 34 additions & 2 deletions tests/prompting/weight_setting/test_weight_setter.py
@@ -1,5 +1,6 @@
# ruff: noqa: E402
import asyncio
from collections import deque
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import MagicMock, patch
@@ -90,7 +91,7 @@ def test_steepness():
assert result[0] < 0, "Negative reward should remain negative"


def test_run_step_with_reward_events():
def test_run_step_with_reward_events(tmp_path: Path):
with (
patch("shared.uids.get_uids") as mock_get_uids,
patch("prompting.weight_setting.weight_setter.TaskRegistry") as MockTaskRegistry,
@@ -126,7 +127,7 @@ def __init__(self, task, uids, rewards, weight):

# Set up the mock mutable_globals.

weight_setter = WeightSetter(reward_history_path=Path("test_validator_rewards.jsonl"))
weight_setter = WeightSetter(reward_history_path=tmp_path / "test_validator_rewards.jsonl")
reward_events = [
[
WeightedRewardEvent(
@@ -165,6 +166,37 @@ def __init__(self, task, uids, rewards, weight):
mock_logger.warning.assert_not_called()


def _make_snapshot(values: list[float]) -> dict[int, dict[str, float]]:
return {uid: {"reward": v} for uid, v in enumerate(values)}


@pytest.mark.asyncio
async def test_avg_reward_non_empty(tmp_path: Path) -> None:
"""Mean over two snapshots equals manual average."""
ws = WeightSetter(reward_history_path=tmp_path / "test_validator_rewards.jsonl")
ws.reward_history_len = 10
ws.reward_history = deque(maxlen=10)
rewards = list(range(256))
ws.reward_history.append(_make_snapshot(rewards))
ws.reward_history.append(_make_snapshot(rewards[::-1]))

result = await ws._compute_avg_reward()

expected = np.full(256, 255 / 2, dtype=np.float32)
assert result.dtype == np.float32
assert np.allclose(result, expected, atol=1e-6)


@pytest.mark.asyncio
async def test_avg_reward_empty(monkeypatch: MonkeyPatch, tmp_path: Path) -> None:
"""Empty history returns a zero vector."""
ws = WeightSetter(reward_history_path=tmp_path / "test_validator_rewards.jsonl")
ws.reward_history_len = 10
ws.reward_history = deque(maxlen=10)
result = await ws._compute_avg_reward()
assert np.array_equal(result, np.zeros(256, dtype=np.float32))


@pytest.mark.asyncio
async def test_set_weights(monkeypatch: MonkeyPatch):
"""`set_weights` calls Subtensor.set_weights with processed vectors."""
2 changes: 1 addition & 1 deletion validator_api/chat_completion.py
@@ -237,7 +237,7 @@ async def chat_completion(
uids: Optional[list[int]] = None,
num_miners: int = 5,
uid_tracker: UidTracker | None = None,
add_reliable_miners: int = 1,
add_reliable_miners: int = 3,
) -> tuple | StreamingResponse:
# TODO: Add docstring.
"""Handle chat completion with multiple miners in parallel."""