diff --git a/prompting/weight_setting/weight_setter.py b/prompting/weight_setting/weight_setter.py
index ac4f772e9..8cc7039c5 100644
--- a/prompting/weight_setting/weight_setter.py
+++ b/prompting/weight_setting/weight_setter.py
@@ -1,5 +1,4 @@
 import asyncio
-import datetime
 import json
 from collections import deque
 from pathlib import Path
@@ -89,15 +88,17 @@ class WeightSetter(AsyncLoopRunner):
     """The weight setter looks at RewardEvents in the reward_events queue
     and sets the weights of the miners accordingly."""

     sync: bool = True
-    interval: int = 60 * 21
+    interval: int = 60 * 23
     reward_events: list[list[WeightedRewardEvent]] | None = None
     weight_dict: dict[int, list[float]] | None = None
     weight_syncer: WeightSynchronizer | None = None
+    # Rewards moving average persistence.
    reward_history_path: Path = Path("validator_rewards.jsonl")
-    reward_history_len: int = 24
-    # List of uids info per each epoch, e.g.: [{1: {"reward": 1.0}, 2: {"reward": 3.0}}].
-    reward_history: deque[dict[int, dict[str, Any]]] | None = None
+    # Rewards moving average window: 36 epochs at the 23-minute interval, approx. 14 hours.
+    reward_history_len: int = 36
+    # List of uids info per epoch, e.g.: [{1: {"reward": 1.0, "hotkey": "ABC"}, 2: {"reward": 3.0, "hotkey": "XYZ"}}].
+    reward_history: deque[dict[int, dict[str, float | str]]] | None = None

     class Config:
         arbitrary_types_allowed = True
@@ -120,17 +121,24 @@ async def _compute_avg_reward(self) -> npt.NDArray[np.float32]:
         num_uids = int(shared_settings.METAGRAPH.n.item())
         accum = np.zeros(num_uids, dtype=np.float32)
         if not isinstance(self.reward_history, deque) or len(self.reward_history) == 0:
-            logger.warning(f"Empty rewards history, setting zero weights: {self.reward_history}")
+            logger.error(f"Empty rewards history, setting zero weights: {self.reward_history}")
             return accum

-        for snapshot in self.reward_history:
-            for uid_str, info in snapshot.items():
-                accum[int(uid_str)] += float(info["reward"])
+        # Map each UID to its currently registered hotkey, taken from the latest snapshot.
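+        # The latest snapshot mirrors the current metagraph, since run_step calls _update_rewards right before this method.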
+        active_hotkeys: dict[int, str] = {}
+        for uid_str, info in self.reward_history[-1].items():
+            active_hotkeys[int(uid_str)] = info.get("hotkey")
+
+        # Accumulate rewards across epochs only for UIDs whose hotkey has not changed, so re-registered UIDs start from a clean slate.
+        for epoch_info in self.reward_history:
+            for uid_str, info in epoch_info.items():
+                if active_hotkeys[int(uid_str)] == info["hotkey"]:
+                    accum[int(uid_str)] += float(info["reward"])

         avg = accum / len(self.reward_history)
         return avg

-    async def _save_rewards(self, rewards: npt.NDArray[np.float32]):
+    async def _update_rewards(self, rewards: npt.NDArray[np.float32]):
         """Persist the latest epoch rewards.

         The snapshot is appended to `reward_history` (bounded by `reward_history_len`) and the JSONL file at
@@ -142,20 +150,22 @@ async def _save_rewards(self, rewards: npt.NDArray[np.float32]):
         if not isinstance(self.reward_history, deque):
             self.reward_history = deque(maxlen=self.reward_history_len)

-        snapshot = {int(uid): {"reward": float(r)} for uid, r in enumerate(rewards)}
-        self.reward_history.append(snapshot)
+        hkeys = shared_settings.METAGRAPH.hotkeys
+        epoch_rewards: dict[int, dict[str, float | str]] = {}
+        for uid, reward in enumerate(rewards):
+            epoch_rewards[int(uid)] = {"reward": float(reward), "hotkey": hkeys[uid]}
+        self.reward_history.append(epoch_rewards)

-        tmp_path = self.reward_history_path.with_suffix(".jsonl.tmp")
-        block = getattr(shared_settings, "block", 0)
+        # block = getattr(shared_settings, "block", 0)
+        # Write results into a tmp file, then move it over the main rewards file to make the write operation atomic.
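+        # Path.replace is an atomic rename on POSIX when both paths are on the same filesystem, so readers never observe a partially written file.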
= "2.19.10" description = "Subnetwork 1 runs on Bittensor and is maintained by Macrocosmos. It's an effort to create decentralised AI" authors = ["Kalei Brady, Dmytro Bobrenko, Felix Quinque, Steffen Cruz, Richard Wardle"] readme = "README.md" diff --git a/validator_api/chat_completion.py b/validator_api/chat_completion.py index 81049a935..9efaa183b 100644 --- a/validator_api/chat_completion.py +++ b/validator_api/chat_completion.py @@ -237,7 +237,7 @@ async def chat_completion( uids: Optional[list[int]] = None, num_miners: int = 5, uid_tracker: UidTracker | None = None, - add_reliable_miners: int = 3, + add_reliable_miners: int = 0, ) -> tuple | StreamingResponse: # TODO: Add docstring. """Handle chat completion with multiple miners in parallel.""" @@ -252,7 +252,7 @@ async def chat_completion( primary_uids = filter_available_uids( task=body.get("task"), model=body.get("model"), test=shared_settings.API_TEST_MODE, n_miners=num_miners ) - if uid_tracker is not None: + if uid_tracker is not None and add_reliable_miners > 0: # Add reliable uids, or ones with highest success rate to guarantee completed stream. reliable_uids = await uid_tracker.sample_reliable( task=TaskType.Inference, amount=add_reliable_miners, success_rate=0.99, add_random_extra=False