Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 39 additions & 26 deletions prompting/weight_setting/weight_setter.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import asyncio
import datetime
import json
from collections import deque
from pathlib import Path
Expand Down Expand Up @@ -89,15 +88,17 @@ class WeightSetter(AsyncLoopRunner):
"""The weight setter looks at RewardEvents in the reward_events queue and sets the weights of the miners accordingly."""

sync: bool = True
interval: int = 60 * 21
interval: int = 60 * 23
reward_events: list[list[WeightedRewardEvent]] | None = None
weight_dict: dict[int, list[float]] | None = None
weight_syncer: WeightSynchronizer | None = None

# Rewards moving average persistency.
reward_history_path: Path = Path("validator_rewards.jsonl")
reward_history_len: int = 24
# List of uids info per each epoch, e.g.: [{1: {"reward": 1.0}, 2: {"reward": 3.0}}].
reward_history: deque[dict[int, dict[str, Any]]] | None = None
# Rewards moving average, 36 epochs = approx. 12 hours.
reward_history_len: int = 36
# List of uids info per epoch, e.g.: [{1: {"reward": 1.0, "hotkey": "ABC"}, 2: {"reward": 3.0, "hotkey": "XYZ"}}].
reward_history: deque[dict[int, dict[str, float | str]]] | None = None

class Config:
arbitrary_types_allowed = True
Expand All @@ -120,17 +121,24 @@ async def _compute_avg_reward(self) -> npt.NDArray[np.float32]:
num_uids = int(shared_settings.METAGRAPH.n.item())
accum = np.zeros(num_uids, dtype=np.float32)
if not isinstance(self.reward_history, deque) or len(self.reward_history) == 0:
logger.warning(f"Empty rewards history, setting zero weights: {self.reward_history}")
logger.error(f"Empty rewards history, setting zero weights: {self.reward_history}")
return accum

for snapshot in self.reward_history:
for uid_str, info in snapshot.items():
accum[int(uid_str)] += float(info["reward"])
# Get current active hotkeys for the current set of UIDs.
active_hotkeys: dict[int, str] = {}
for uid_str, info in self.reward_history[-1].items():
active_hotkeys[int(uid_str)] = info.get("hotkey")

# Accumulate rewards for each epoch only if hotkey was not changed for the given UID.
for epoch_info in self.reward_history:
for uid_str, info in epoch_info.items():
if active_hotkeys[int(uid_str)] == info["hotkey"]:
accum[int(uid_str)] += float(info["reward"])

avg = accum / len(self.reward_history)
return avg

async def _save_rewards(self, rewards: npt.NDArray[np.float32]):
async def _update_rewards(self, rewards: npt.NDArray[np.float32]):
"""Persist the latest epoch rewards.

The snapshot is appended to `reward_history` (bounded by `reward_history_len`) and the JSONL file at
Expand All @@ -142,20 +150,22 @@ async def _save_rewards(self, rewards: npt.NDArray[np.float32]):
if not isinstance(self.reward_history, deque):
self.reward_history = deque(maxlen=self.reward_history_len)

snapshot = {int(uid): {"reward": float(r)} for uid, r in enumerate(rewards)}
self.reward_history.append(snapshot)
hkeys = shared_settings.METAGRAPH.hotkeys
epoch_rewards: dict[int, dict[str, float | str]] = {}
for uid, reward in enumerate(rewards):
epoch_rewards[int(uid)] = {"reward": float(reward), "hotkey": hkeys[uid]}
self.reward_history.append(epoch_rewards)

tmp_path = self.reward_history_path.with_suffix(".jsonl.tmp")
block = getattr(shared_settings, "block", 0)
# block = getattr(shared_settings, "block", 0)

# Write results into a tmp file, then move it over the main rewards file to make the write operation atomic.
tmp_path = self.reward_history_path.with_suffix(".jsonl.tmp")
try:
with tmp_path.open("w", encoding="utf-8") as file:
for snap in self.reward_history:
row: dict[str, Any] = {
"ts": datetime.datetime.now(datetime.timezone.utc).isoformat(timespec="seconds") + "Z",
"block": block,
"rewards": {str(k): v["reward"] for k, v in snap.items()},
}
for epoch_rewards in self.reward_history:
row: dict[str, Any] = {}
for uid, info in epoch_rewards.items():
row[str(uid)] = {"reward": float(info["reward"]), "hotkey": info["hotkey"]}
file.write(json.dumps(row, separators=(",", ":")) + "\n")
tmp_path.replace(self.reward_history_path)
except Exception as exc:
Expand All @@ -166,7 +176,7 @@ async def _load_rewards(self):

Only the newest `reward_history_len` rows are retained.
"""
self.reward_history: deque[dict[int, dict[str, Any]]] | None = deque(maxlen=self.reward_history_len)
self.reward_history: deque[dict[int, dict[str, float | str]]] | None = deque(maxlen=self.reward_history_len)
if not self.reward_history_path.exists():
logger.info("No rewards file found - starting with empty history.")
return
Expand All @@ -175,13 +185,16 @@ async def _load_rewards(self):
with self.reward_history_path.open("r", encoding="utf-8") as file:
for line in file:
data = json.loads(line)
payload = data.get("rewards")
if payload is None:
if not data:
raise ValueError(f"Malformed weight history file: {data}")

self.reward_history.append({int(uid): {"reward": float(reward)} for uid, reward in payload.items()})
epoch_rewards: dict[int, dict[str, float | str]] = {}
for uid, info in data.items():
epoch_rewards[int(uid)] = {"reward": float(info["reward"]), "hotkey": info.get("hotkey")}

self.reward_history.append(epoch_rewards)
except BaseException as exc:
self.reward_history: deque[dict[int, dict[str, Any]]] | None = deque(maxlen=self.reward_history_len)
self.reward_history: deque[dict[int, dict[str, float | str]]] | None = deque(maxlen=self.reward_history_len)
logger.error(f"Couldn't load rewards from file, resetting weight history: {exc}")

@classmethod
Expand Down Expand Up @@ -269,7 +282,7 @@ async def run_step(self):
logger.error("No rewards were found, skipping weight setting")
return

await self._save_rewards(final_rewards)
await self._update_rewards(final_rewards)
averaged_rewards = await self._compute_avg_reward()
averaged_rewards[averaged_rewards < 0] = 0
averaged_rewards /= np.sum(averaged_rewards) + 1e-10
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "prompting"
version = "2.19.9"
version = "2.19.10"
description = "Subnetwork 1 runs on Bittensor and is maintained by Macrocosmos. It's an effort to create decentralised AI"
authors = ["Kalei Brady, Dmytro Bobrenko, Felix Quinque, Steffen Cruz, Richard Wardle"]
readme = "README.md"
Expand Down
4 changes: 2 additions & 2 deletions validator_api/chat_completion.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ async def chat_completion(
uids: Optional[list[int]] = None,
num_miners: int = 5,
uid_tracker: UidTracker | None = None,
add_reliable_miners: int = 3,
add_reliable_miners: int = 0,
) -> tuple | StreamingResponse:
# TODO: Add docstring.
"""Handle chat completion with multiple miners in parallel."""
Expand All @@ -252,7 +252,7 @@ async def chat_completion(
primary_uids = filter_available_uids(
task=body.get("task"), model=body.get("model"), test=shared_settings.API_TEST_MODE, n_miners=num_miners
)
if uid_tracker is not None:
if uid_tracker is not None and add_reliable_miners > 0:
# Add reliable uids, or ones with highest success rate to guarantee completed stream.
reliable_uids = await uid_tracker.sample_reliable(
task=TaskType.Inference, amount=add_reliable_miners, success_rate=0.99, add_random_extra=False
Expand Down