From 39c98522de205c63ec2caef769273b62ea9d31aa Mon Sep 17 00:00:00 2001 From: bkb2135 Date: Sat, 9 Aug 2025 10:27:27 +0100 Subject: [PATCH 01/18] Add Spec Version for Weight Set --- apex/__init__.py | 11 +++++++++++ apex/common/async_chain.py | 2 ++ 2 files changed, 13 insertions(+) diff --git a/apex/__init__.py b/apex/__init__.py index 93de2e624..ceb6e43cb 100644 --- a/apex/__init__.py +++ b/apex/__init__.py @@ -7,6 +7,17 @@ __version__ = version("apex") +def _version_to_int(version_str: str) -> int: + version_split = version_str.split(".") + ["0", "0"] # in case a version doesn't have third element, e.g. 3.0 + major = int(version_split[0]) + minor = int(version_split[1]) + patch = int(version_split[2]) + return (10000 * major) + (100 * minor) + patch + + +__spec_version__ = _version_to_int(__version__) + + def setup_logger(log_file_path: str | Path | None = None, level: str = "INFO") -> Any: """Set up the loguru logger with optional file logging and specified log level. diff --git a/apex/common/async_chain.py b/apex/common/async_chain.py index 28453f86a..cd948ad11 100644 --- a/apex/common/async_chain.py +++ b/apex/common/async_chain.py @@ -5,6 +5,7 @@ from bittensor.core.metagraph import AsyncMetagraph from loguru import logger +from apex import __spec_version__ from apex.common.utils import async_cache _METAGRAPH_TTL: int = 10 * 60 @@ -132,6 +133,7 @@ async def set_weights(self, rewards: dict[str, float]) -> bool: netuid=self._netuid, uids=list(weights.keys()), weights=list(weights.values()), + version_key=__spec_version__, wait_for_inclusion=True, wait_for_finalization=True, ) From cc0484012a4ca51c6297daa58f841bbbafbe3368 Mon Sep 17 00:00:00 2001 From: bkb2135 Date: Sat, 9 Aug 2025 10:59:55 +0100 Subject: [PATCH 02/18] Update unit tests --- tests/common/mock_async_chain.py | 2 ++ tests/common/test_async_chain.py | 4 ++++ 2 files changed, 6 insertions(+) diff --git a/tests/common/mock_async_chain.py b/tests/common/mock_async_chain.py index 2161c344b..7bbbb0980 100644 --- a/tests/common/mock_async_chain.py +++ b/tests/common/mock_async_chain.py @@ -54,6 +54,7 @@ async def set_weights( netuid: int, uids: Iterable[int], weights: Iterable[float], + version_key: int, wait_for_inclusion: bool, wait_for_finalization: bool, ) -> bool: @@ -62,6 +63,7 @@ async def set_weights( "netuid": netuid, "uids": list(uids), "weights": list(weights), + "version_key": version_key, "wait_for_inclusion": wait_for_inclusion, "wait_for_finalization": wait_for_finalization, } diff --git a/tests/common/test_async_chain.py b/tests/common/test_async_chain.py index 6d83fa262..a16d88ad2 100644 --- a/tests/common/test_async_chain.py +++ b/tests/common/test_async_chain.py @@ -121,6 +121,10 @@ async def test_set_weights_happy_path(monkeypatch): assert stub.last_set_weights is not None assert stub.last_set_weights["uids"] == [2] assert stub.last_set_weights["weights"] == [0.7] + # ensure we pass spec version as version_key + from apex import __spec_version__ + + assert stub.last_set_weights["version_key"] == __spec_version__ @pytest.mark.asyncio From 7c153f104b87309e7e9ac2da2f9acd27fe4791f1 Mon Sep 17 00:00:00 2001 From: Dmytro Bobrenko <17252809+dbobrenko@users.noreply.github.com> Date: Sat, 9 Aug 2025 12:10:29 +0200 Subject: [PATCH 03/18] Add logs, reduce frequency --- apex/validator/miner_sampler.py | 5 +++-- apex/validator/miner_scorer.py | 4 ++-- apex/validator/pipeline.py | 6 +++--- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/apex/validator/miner_sampler.py b/apex/validator/miner_sampler.py index d8c71d16a..95f1543fc 100644 --- a/apex/validator/miner_sampler.py +++ b/apex/validator/miner_sampler.py @@ -38,7 +38,7 @@ def __init__( self, chain: AsyncChain, sample_mode: Literal["random", "sequential"] = "sequential", - sample_size: int = 50, + sample_size: int = 100, logger_db: LoggerDB | None = None, available_uids: Sequence[int] | None = None, available_addresses: Sequence[str] | None = None, @@ -151,9 +151,10 @@ async def query_generators(self, query: str) -> MinerGeneratorResults: hotkeys: list[str] = [] tasks: list[Coroutine[str, str, Any]] = [] + + logger.debug(f"Querying {len(miner_information)} miner generators") for miner_info in miner_information: hotkeys.append(miner_info.hotkey) - logger.debug(f"Querying miner generator at {miner_info.address} with uid: {miner_info.uid}") tasks.append(self.query_miners(body=body, endpoint=miner_info.address, hotkey=miner_info.hotkey)) generator_results = await asyncio.gather(*tasks) return MinerGeneratorResults(query=query, generator_hotkeys=hotkeys, generator_results=generator_results) diff --git a/apex/validator/miner_scorer.py b/apex/validator/miner_scorer.py index 5ee18c646..c2108a110 100644 --- a/apex/validator/miner_scorer.py +++ b/apex/validator/miner_scorer.py @@ -29,9 +29,9 @@ async def start_loop(self) -> None: self._running = True while self._running: await asyncio.sleep(self.interval) + logger.debug("Attempting to set weights") success = await self.set_scores() - if not success: - logger.error("Failed to set weights") + logger.log("INFO" if success else "ERROR", f"Set weights: {'success' if success else 'fail'}") async def shutdown(self) -> None: self._running = False diff --git a/apex/validator/pipeline.py b/apex/validator/pipeline.py index ea8dfaf9a..24d5b8118 100644 --- a/apex/validator/pipeline.py +++ b/apex/validator/pipeline.py @@ -26,10 +26,10 @@ def __init__( deep_research: DeepResearchBase, logger_apex: LoggerApex | None = None, num_consumers: int = 10, - timeout_consumer: float = 60, - timeout_producer: float = 6, + timeout_consumer: float = 180, + timeout_producer: float = 18, queue_size: int = 10_000, - redundancy_rate: float = 0.1, # The rate that references are generated in addition to generator steps + redundancy_rate: float = 0.05, # The rate that references are generated in addition to generator steps reference_rate: float = 0.5, # The rate that references are generated as opposed to generator steps ): self.config = config From 4b3562498cb9860348eb86fe05479d77a690b127 Mon Sep 17 00:00:00 2001 From: Dmytro Bobrenko <17252809+dbobrenko@users.noreply.github.com> Date: Sat, 9 Aug 2025 12:28:20 +0200 Subject: [PATCH 04/18] Add verbose logs --- README.md | 2 +- apex/validator/miner_scorer.py | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 0192cf38d..64ad1e079 100644 --- a/README.md +++ b/README.md @@ -40,7 +40,7 @@ Subnet 1 is the most intelligent inference model on Bittensor. As the first agen 3. **Install the project and its development dependencies:** ```bash - uv venv && uv python install 3.11 && uv python pin 3.11 && uv venv --python=3.11 && uv pip install -e '.[dev]' + uv venv --python=3.11 && uv pip install '.[dev]' ``` 4. **Activate python environment:** diff --git a/apex/validator/miner_scorer.py b/apex/validator/miner_scorer.py index c2108a110..df35a62aa 100644 --- a/apex/validator/miner_scorer.py +++ b/apex/validator/miner_scorer.py @@ -50,6 +50,7 @@ async def set_scores(self) -> bool: expose each one as plain python objects so that downstream code can work with them, and remove rows that are older than the time window. """ + logger.debug("Retrieving miner's performance history") async with self._db() as conn: # type: aiosqlite.Connection # Calculate the cutoff timestamp (current time - window hours). cutoff_timestamp = int(time.time() - SCORE_MA_WINDOW_HOURS * 3600) @@ -70,6 +71,7 @@ async def set_scores(self) -> bool: return False # 2. Iterate over the in-memory list so that the caller can process freely. + logger.debug("Pre-processing miner's rewards") hkey_agg_rewards: dict[str, float] = {} for generator_hotkey, generator_score, disc_hotkeys_json, disc_scores_json in rows: # Deserialize JSON columns. @@ -88,6 +90,7 @@ async def set_scores(self) -> bool: hkey_agg_rewards[hotkey] = float(hkey_agg_rewards.get(hotkey, 0.0)) + float(reward) # 3. Delete rows that are older than the time window. + logger.debug("Cleaning up miner's outdated history") await conn.execute( "DELETE FROM discriminator_results WHERE timestamp < ?", (cutoff_timestamp,), @@ -102,8 +105,10 @@ async def set_scores(self) -> bool: record_str: str = json.dumps(record) fh.write(f"{record_str}\n") # TODO: Flush the db only on set_weights_result is True. + logger.debug("Setting weights") set_weights_result = await self.chain.set_weights(hkey_agg_rewards) # 4. Flush all deletions in a single commit. + logger.debug("Updating rewards DB") await conn.commit() return set_weights_result From 915671f458a2d7d9e39567bd46de1ad5a3858aec Mon Sep 17 00:00:00 2001 From: Dmytro Bobrenko <17252809+dbobrenko@users.noreply.github.com> Date: Sat, 9 Aug 2025 13:05:38 +0200 Subject: [PATCH 05/18] Add logs file and fix tests --- .gitignore | 1 + apex/__init__.py | 2 +- apex/validator/miner_scorer.py | 5 ++++- 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/.gitignore b/.gitignore index a68384611..70cbedd8b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +*.log requirements.txt **/*.ipynb debug_rewards.jsonl diff --git a/apex/__init__.py b/apex/__init__.py index ceb6e43cb..680ddbc64 100644 --- a/apex/__init__.py +++ b/apex/__init__.py @@ -44,4 +44,4 @@ def setup_logger(log_file_path: str | Path | None = None, level: str = "INFO") - return logger -setup_logger(level="DEBUG") +setup_logger(log_file_path="logs.log", level="DEBUG") diff --git a/apex/validator/miner_scorer.py b/apex/validator/miner_scorer.py index df35a62aa..46a225b78 100644 --- a/apex/validator/miner_scorer.py +++ b/apex/validator/miner_scorer.py @@ -31,7 +31,10 @@ async def start_loop(self) -> None: await asyncio.sleep(self.interval) logger.debug("Attempting to set weights") success = await self.set_scores() - logger.log("INFO" if success else "ERROR", f"Set weights: {'success' if success else 'fail'}") + if success: + logger.info(f"Set weights: {success}") + else: + logger.error("Failed to set weights") async def shutdown(self) -> None: self._running = False From 7c0219a87211178af0a93c39792f9675ae24bdc4 Mon Sep 17 00:00:00 2001 From: Dmytro Bobrenko <17252809+dbobrenko@users.noreply.github.com> Date: Sat, 9 Aug 2025 13:06:07 +0200 Subject: [PATCH 06/18] Bump patch version --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index ec1d2dc28..31ed9eca4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "apex" -version = "3.0.0" +version = "3.0.1" description = "Bittensor Subnet 1: Apex" readme = "README.md" requires-python = "~=3.11" From dad2cb86124fc7df78f1a77a33f48aa2faf5bca2 Mon Sep 17 00:00:00 2001 From: Dmytro Bobrenko <17252809+dbobrenko@users.noreply.github.com> Date: Sat, 9 Aug 2025 14:07:39 +0200 Subject: [PATCH 07/18] Add more verbose logs --- apex/validator/miner_sampler.py | 5 ++++- validator.py | 10 ++++++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/apex/validator/miner_sampler.py b/apex/validator/miner_sampler.py index 95f1543fc..1014ce8a0 100644 --- a/apex/validator/miner_sampler.py +++ b/apex/validator/miner_sampler.py @@ -124,6 +124,9 @@ async def _sample_miners(self) -> list[MinerInfo]: else: raise ValueError(f"Unknown sampling mode: {self._sample_mode}") + logger.debug( + f"Sampled uids (sample size = {self._sample_size}): {sorted([miner.uid for miner in miners_sample])}" + ) return miners_sample async def query_miners(self, body: dict[str, Any], endpoint: str, hotkey: str | None = None) -> str: @@ -134,7 +137,7 @@ async def query_miners(self, body: dict[str, Any], endpoint: str, hotkey: str | self._chain.wallet.hotkey, body=json.dumps(body).encode("utf-8"), signed_for=hotkey ) async with session.post( - endpoint + "/v1/chat/completions", + f"{endpoint}/v1/chat/completions", headers=headers, json=body, ) as resp: diff --git a/validator.py b/validator.py index 754afa9a0..87f72a903 100644 --- a/validator.py +++ b/validator.py @@ -4,6 +4,7 @@ from loguru import logger +from apex import __version__ from apex.common.async_chain import AsyncChain from apex.common.config import Config from apex.services.deep_research.deep_research_langchain import DeepResearchLangchain @@ -32,25 +33,33 @@ async def read_args() -> argparse.Namespace: async def main() -> None: args = await read_args() config = Config.from_file(path=args.config) + logger.debug(f"Starting validator v{__version__} with config: {args.config}") chain = AsyncChain(**config.chain.kwargs) await chain.start() + logger.debug(f"Connected to the chain with coldkey '{chain.coldkey[:3]}**', hotkey '{chain.hotkey[:3]}**'") logger_db = LoggerDB(**config.logger_db.kwargs) asyncio.create_task(logger_db.start_loop()) + logger.debug(f"Started DB at: '{logger_db.db_path}") # logger_apex = LoggerApex(async_chain=chain) websearch = WebSearchTavily(**config.websearch.kwargs) + logger.debug("Started web search tool") miner_sampler = MinerSampler(chain=chain, logger_db=logger_db, **config.miner_sampler.kwargs) + logger.debug("Started miner sampler") miner_scorer = MinerScorer(chain=chain, **config.miner_scorer.kwargs) asyncio.create_task(miner_scorer.start_loop()) + logger.debug(f"Started miner scorer with interval={miner_scorer.interval}") llm = LLM(**config.llm.kwargs) + logger.debug("Started LLM provider") deep_research = DeepResearchLangchain(websearch=websearch, **config.deep_research.kwargs) + logger.debug("Started Deep Researcher") pipeline = Pipeline( config=config, @@ -62,6 +71,7 @@ async def main() -> None: **config.pipeline.kwargs, ) try: + logger.debug("Starting pipeline loop...") await pipeline.start_loop() except KeyboardInterrupt: logger.warning("Keyboard interrupt caught, exiting validator") From 7355c967526519f7ec2ced7b9be1407819b7e670 Mon Sep 17 00:00:00 2001 From: Dmytro Bobrenko <17252809+dbobrenko@users.noreply.github.com> Date: Sat, 9 Aug 2025 14:10:25 +0200 Subject: [PATCH 08/18] Reduce loops to 5 --- apex/validator/pipeline.py | 2 +- validator.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/apex/validator/pipeline.py b/apex/validator/pipeline.py index 24d5b8118..bc4adbc25 100644 --- a/apex/validator/pipeline.py +++ b/apex/validator/pipeline.py @@ -25,7 +25,7 @@ def __init__( llm: LLMBase, deep_research: DeepResearchBase, logger_apex: LoggerApex | None = None, - num_consumers: int = 10, + num_consumers: int = 5, timeout_consumer: float = 180, timeout_producer: float = 18, queue_size: int = 10_000, diff --git a/validator.py b/validator.py index 87f72a903..bc1161e19 100644 --- a/validator.py +++ b/validator.py @@ -37,11 +37,11 @@ async def main() -> None: chain = AsyncChain(**config.chain.kwargs) await chain.start() - logger.debug(f"Connected to the chain with coldkey '{chain.coldkey[:3]}**', hotkey '{chain.hotkey[:3]}**'") + logger.debug(f"Connected to the chain with coldkey '{chain.coldkey[:3]}***', hotkey '{chain.hotkey[:3]}***'") logger_db = LoggerDB(**config.logger_db.kwargs) asyncio.create_task(logger_db.start_loop()) - logger.debug(f"Started DB at: '{logger_db.db_path}") + logger.debug(f"Started DB at: '{logger_db.db_path}'") # logger_apex = LoggerApex(async_chain=chain) From 7c22d89b26e466fd820475fe2cef5876e170e380 Mon Sep 17 00:00:00 2001 From: Dmytro Bobrenko <17252809+dbobrenko@users.noreply.github.com> Date: Sat, 9 Aug 2025 14:31:47 +0200 Subject: [PATCH 09/18] Adjust logging --- validator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/validator.py b/validator.py index bc1161e19..5146ab1a6 100644 --- a/validator.py +++ b/validator.py @@ -37,7 +37,7 @@ async def main() -> None: chain = AsyncChain(**config.chain.kwargs) await chain.start() - logger.debug(f"Connected to the chain with coldkey '{chain.coldkey[:3]}***', hotkey '{chain.hotkey[:3]}***'") + logger.debug(f"Connected to the chain with coldkey '{chain.coldkey[:3]}***', hotkey '{chain.hotkey[:2]}***'") logger_db = LoggerDB(**config.logger_db.kwargs) asyncio.create_task(logger_db.start_loop()) From 06c43fcc83c008a2dceb906d4eee354b7d97dae7 Mon Sep 17 00:00:00 2001 From: Dmytro Bobrenko <17252809+dbobrenko@users.noreply.github.com> Date: Sat, 9 Aug 2025 14:33:54 +0200 Subject: [PATCH 10/18] Adjust timeouts --- apex/validator/pipeline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apex/validator/pipeline.py b/apex/validator/pipeline.py index bc4adbc25..023b9fa51 100644 --- a/apex/validator/pipeline.py +++ b/apex/validator/pipeline.py @@ -27,7 +27,7 @@ def __init__( logger_apex: LoggerApex | None = None, num_consumers: int = 5, timeout_consumer: float = 180, - timeout_producer: float = 18, + timeout_producer: float = 36, queue_size: int = 10_000, redundancy_rate: float = 0.05, # The rate that references are generated in addition to generator steps reference_rate: float = 0.5, # The rate that references are generated as opposed to generator steps From cebee2153e2a70f5da673e2d31752131d66a3238 Mon Sep 17 00:00:00 2001 From: Dmytro Bobrenko <17252809+dbobrenko@users.noreply.github.com> Date: Sat, 9 Aug 2025 16:31:29 +0200 Subject: [PATCH 11/18] Fix sampling --- apex/validator/miner_sampler.py | 17 ++++++++++------- tests/validator/test_miner_sampler.py | 12 ++++++++---- 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/apex/validator/miner_sampler.py b/apex/validator/miner_sampler.py index 1014ce8a0..665956479 100644 --- a/apex/validator/miner_sampler.py +++ b/apex/validator/miner_sampler.py @@ -2,6 +2,7 @@ import json import random import time +from collections import deque from collections.abc import Coroutine, Sequence from typing import Any, Literal @@ -69,7 +70,8 @@ def __init__( if self._available_uids and self._available_addresses: equal_length = len(self._available_uids) == len(self._available_addresses) assert equal_length, "Test UIDs and addresses must be the same length." - self._remaining_epoch_miners: set[MinerInfo] = set() + self._epoch_deque: deque[MinerInfo] = deque() + self._sample_lock = asyncio.Lock() @async_cache(_TTL_UIDS_RESYNC) async def _get_all_miners(self) -> list[MinerInfo]: @@ -114,12 +116,13 @@ async def _sample_miners(self) -> list[MinerInfo]: miners_sample = random.sample(miners, self._sample_size) elif self._sample_mode == "sequential": - if len(self._remaining_epoch_miners) < self._sample_size: - self._remaining_epoch_miners = set(miners) - logger.debug(f"Starting new miner sampling epoch, miners amount: {len(self._remaining_epoch_miners)}") - indices_sample = sorted(random.sample(range(len(self._remaining_epoch_miners)), self._sample_size)) - miners_sample = [miners[i] for i in indices_sample] - self._remaining_epoch_miners -= set(miners_sample) + async with self._sample_lock: + miners_sample: list[MinerInfo] = [] + while len(miners_sample) < self._sample_size: + if not self._epoch_deque: + # Get shuffled deque of miners. + self._epoch_deque: deque[MinerInfo] = deque(random.sample(miners, len(miners))) + miners_sample.append(self._epoch_deque.popleft()) else: raise ValueError(f"Unknown sampling mode: {self._sample_mode}") diff --git a/tests/validator/test_miner_sampler.py b/tests/validator/test_miner_sampler.py index d2b8c04ba..d184ed41f 100644 --- a/tests/validator/test_miner_sampler.py +++ b/tests/validator/test_miner_sampler.py @@ -166,20 +166,24 @@ async def test_sample_miners_sequential(monkeypatch: MagicMock, miner_sampler: M monkeypatch.setattr(miner_sampler, "_get_all_miners", AsyncMock(return_value=all_miners)) # 1st call in epoch. - with patch("random.sample", return_value=[0, 2]): + with patch( + "random.sample", + return_value=[MinerInfo(uid=1, address="", hotkey="1"), MinerInfo(uid=5, address="", hotkey="5")], + ): miners1 = await miner_sampler._sample_miners() assert len(miners1) == 2 assert {m.uid for m in miners1} == {all_miners[0].uid, all_miners[2].uid} - assert len(miner_sampler._remaining_epoch_miners) == 1 # 2nd call, new epoch starts as remaining (1) < sample_size (2). - with patch("random.sample", return_value=[1, 2]): + with patch( + "random.sample", + return_value=[MinerInfo(uid=3, address="", hotkey="3"), MinerInfo(uid=5, address="", hotkey="5")], + ): miners2 = await miner_sampler._sample_miners() assert len(miners2) == 2 assert {m.uid for m in miners2} == {all_miners[1].uid, all_miners[2].uid} - assert len(miner_sampler._remaining_epoch_miners) == 1 @pytest.mark.asyncio From 784715938445ef12bd8ca9523a8f404ea8c43015 Mon Sep 17 00:00:00 2001 From: Dmytro Bobrenko <17252809+dbobrenko@users.noreply.github.com> Date: Sat, 9 Aug 2025 17:16:05 +0200 Subject: [PATCH 12/18] Add error handling --- apex/common/async_chain.py | 26 +++++++++++++------------- apex/validator/miner_scorer.py | 5 +++-- apex/validator/pipeline.py | 26 ++++++++++++++++---------- 3 files changed, 32 insertions(+), 25 deletions(-) diff --git a/apex/common/async_chain.py b/apex/common/async_chain.py index cd948ad11..e9b91d0b2 100644 --- a/apex/common/async_chain.py +++ b/apex/common/async_chain.py @@ -112,22 +112,22 @@ def network(self) -> list[str]: return self._network async def set_weights(self, rewards: dict[str, float]) -> bool: - metagraph = await self.metagraph() - subtensor = await self.subtensor() - weights: dict[int, float] = {} + try: + metagraph = await self.metagraph() + subtensor = await self.subtensor() + weights: dict[int, float] = {} - for hotkey, reward in rewards.items(): - try: - idx = metagraph.hotkeys.index(hotkey) - except ValueError: - # Hotkey not found in the metagraph (e.g., deregistered). Skip it. - continue + for hotkey, reward in rewards.items(): + try: + idx = metagraph.hotkeys.index(hotkey) + except ValueError: + # Hotkey not found in the metagraph (e.g., deregistered). Skip it. + continue - uid = metagraph.uids[idx] - weights[uid] = reward + uid = metagraph.uids[idx] + weights[uid] = reward - # Set the weights. - try: + # Set the weights. result = await subtensor.set_weights( wallet=self._wallet, netuid=self._netuid, diff --git a/apex/validator/miner_scorer.py b/apex/validator/miner_scorer.py index 46a225b78..0828217ff 100644 --- a/apex/validator/miner_scorer.py +++ b/apex/validator/miner_scorer.py @@ -7,6 +7,7 @@ from pathlib import Path import aiosqlite +import numpy as np from loguru import logger from apex.common.async_chain import AsyncChain @@ -32,7 +33,7 @@ async def start_loop(self) -> None: logger.debug("Attempting to set weights") success = await self.set_scores() if success: - logger.info(f"Set weights: {success}") + logger.info("Successfully set weights") else: logger.error("Failed to set weights") @@ -108,7 +109,7 @@ async def set_scores(self) -> bool: record_str: str = json.dumps(record) fh.write(f"{record_str}\n") # TODO: Flush the db only on set_weights_result is True. - logger.debug("Setting weights") + logger.debug(f"Setting weights, mean reward={np.mean(list(hkey_agg_rewards.values())):.4f}") set_weights_result = await self.chain.set_weights(hkey_agg_rewards) # 4. Flush all deletions in a single commit. diff --git a/apex/validator/pipeline.py b/apex/validator/pipeline.py index 023b9fa51..48f8427a8 100644 --- a/apex/validator/pipeline.py +++ b/apex/validator/pipeline.py @@ -81,21 +81,27 @@ async def run_single(self, task: QueryTask) -> str: logger.debug("Generating task query") query = await generate_query(llm=self.llm, websearch=self.websearch) + reference = None + tool_history = [] if random.random() < self.reference_rate: + try: + generator_results = None + ground_truth = 0 + logger.debug(f"Generating task reference for query: {query[:20]}..") + reference, tool_history = await generate_reference(llm=self.deep_research, query=query) + except BaseException as exc: + logger.exception(f"Failed to generate reference: {exc}") + + if reference is None: ground_truth = 1 logger.debug(f"Querying generators with query: {query[:20]}..") generator_results = await self.miner_registry.query_generators(query=query) if random.random() < self.redundancy_rate: - logger.debug(f"Generating redundant task reference for query: {query[:20]}..") - reference, tool_history = await generate_reference(llm=self.deep_research, query=query) - else: - reference = None - tool_history = [] - else: - generator_results = None - ground_truth = 0 - logger.debug(f"Generating task reference for query: {query[:20]}..") - reference, tool_history = await generate_reference(llm=self.deep_research, query=query) + try: + logger.debug(f"Generating redundant task reference for query: {query[:20]}..") + reference, tool_history = await generate_reference(llm=self.deep_research, query=query) + except BaseException as exc: + logger.warning(f"Failed to generate redundant reference: {exc}") discriminator_results = await self.miner_registry.query_discriminators( query=query, generator_results=generator_results, reference=reference, ground_truth=ground_truth From d8e96d6cd3341dc47d4cd7f69087d43530046d69 Mon Sep 17 00:00:00 2001 From: Dmytro Bobrenko <17252809+dbobrenko@users.noreply.github.com> Date: Sat, 9 Aug 2025 17:20:47 +0200 Subject: [PATCH 13/18] Reduce logs persistency --- apex/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apex/__init__.py b/apex/__init__.py index 680ddbc64..367567422 100644 --- a/apex/__init__.py +++ b/apex/__init__.py @@ -39,7 +39,7 @@ def setup_logger(log_file_path: str | Path | None = None, level: str = "INFO") - # Add file handler if a path is provided. if log_file_path: file_log_format = "{time:YYYY-MM-DD HH:mm:ss} [{file}:{line}] {message}" - logger.add(str(log_file_path), level=level, format=file_log_format, rotation="10 MB", retention="7 days") + logger.add(str(log_file_path), level=level, format=file_log_format, rotation="5 MB", retention="3 days") return logger From 86e61c3b1a45f227bb8725d5f53292e89115f0d5 Mon Sep 17 00:00:00 2001 From: Dmytro Bobrenko <17252809+dbobrenko@users.noreply.github.com> Date: Sat, 9 Aug 2025 19:05:59 +0200 Subject: [PATCH 14/18] Fix set weights result parsing --- apex/common/async_chain.py | 9 ++++----- apex/validator/miner_scorer.py | 3 ++- validator.py | 5 ++++- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/apex/common/async_chain.py b/apex/common/async_chain.py index e9b91d0b2..c02db9bbf 100644 --- a/apex/common/async_chain.py +++ b/apex/common/async_chain.py @@ -128,7 +128,7 @@ async def set_weights(self, rewards: dict[str, float]) -> bool: weights[uid] = reward # Set the weights. - result = await subtensor.set_weights( + success, err = await subtensor.set_weights( wallet=self._wallet, netuid=self._netuid, uids=list(weights.keys()), @@ -137,10 +137,9 @@ async def set_weights(self, rewards: dict[str, float]) -> bool: wait_for_inclusion=True, wait_for_finalization=True, ) - if not result: - logger.error(f"Error setting weights: {result}") - return False - return True + if not success: + logger.error(f"Error setting weights: {err}") + return success except BaseException as exc: logger.exception(f"Error setting weights: {exc}") return False diff --git a/apex/validator/miner_scorer.py b/apex/validator/miner_scorer.py index 0828217ff..655a06333 100644 --- a/apex/validator/miner_scorer.py +++ b/apex/validator/miner_scorer.py @@ -109,7 +109,8 @@ async def set_scores(self) -> bool: record_str: str = json.dumps(record) fh.write(f"{record_str}\n") # TODO: Flush the db only on set_weights_result is True. - logger.debug(f"Setting weights, mean reward={np.mean(list(hkey_agg_rewards.values())):.4f}") + rewards_array = np.array(list(hkey_agg_rewards.values())) + logger.debug(f"Setting weights, reward mean={rewards_array.mean():.4f} min={rewards_array.min():.4f}") set_weights_result = await self.chain.set_weights(hkey_agg_rewards) # 4. Flush all deletions in a single commit. diff --git a/validator.py b/validator.py index 5146ab1a6..b0ae22229 100644 --- a/validator.py +++ b/validator.py @@ -37,7 +37,10 @@ async def main() -> None: chain = AsyncChain(**config.chain.kwargs) await chain.start() - logger.debug(f"Connected to the chain with coldkey '{chain.coldkey[:3]}***', hotkey '{chain.hotkey[:2]}***'") + logger.debug( + f"Connected to the chain netuid={chain.netuid} with coldkey '{chain.coldkey[:2]}***', " + f"hotkey '{chain.hotkey[:2]}***'" + ) logger_db = LoggerDB(**config.logger_db.kwargs) asyncio.create_task(logger_db.start_loop()) From 53a5f29294d66ff87e5b443f426419e27fe89b2e Mon Sep 17 00:00:00 2001 From: Dmytro Bobrenko <17252809+dbobrenko@users.noreply.github.com> Date: Sat, 9 Aug 2025 20:29:07 +0200 Subject: [PATCH 15/18] Revert sample size to 50 --- apex/validator/miner_sampler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apex/validator/miner_sampler.py b/apex/validator/miner_sampler.py index 665956479..3b900ec91 100644 --- a/apex/validator/miner_sampler.py +++ b/apex/validator/miner_sampler.py @@ -39,7 +39,7 @@ def __init__( self, chain: AsyncChain, sample_mode: Literal["random", "sequential"] = "sequential", - sample_size: int = 100, + sample_size: int = 50, logger_db: LoggerDB | None = None, available_uids: Sequence[int] | None = None, available_addresses: Sequence[str] | None = None, From 4583804d4c750d5b80ec3453a0db5a6b540e0ab5 Mon Sep 17 00:00:00 2001 From: Dmytro Bobrenko <17252809+dbobrenko@users.noreply.github.com> Date: Sat, 9 Aug 2025 22:55:06 +0200 Subject: [PATCH 16/18] Fix mypy --- apex/common/async_chain.py | 7 +++---- apex/validator/miner_sampler.py | 8 ++++---- apex/validator/pipeline.py | 2 +- 3 files changed, 8 insertions(+), 9 deletions(-) diff --git a/apex/common/async_chain.py b/apex/common/async_chain.py index c02db9bbf..dbaa73ade 100644 --- a/apex/common/async_chain.py +++ b/apex/common/async_chain.py @@ -111,7 +111,7 @@ def netuid(self) -> int: def network(self) -> list[str]: return self._network - async def set_weights(self, rewards: dict[str, float]) -> bool: + async def set_weights(self, rewards: dict[str, float]) -> bool: # type: ignore try: metagraph = await self.metagraph() subtensor = await self.subtensor() @@ -127,7 +127,6 @@ async def set_weights(self, rewards: dict[str, float]) -> bool: uid = metagraph.uids[idx] weights[uid] = reward - # Set the weights. success, err = await subtensor.set_weights( wallet=self._wallet, netuid=self._netuid, @@ -138,10 +137,10 @@ async def set_weights(self, rewards: dict[str, float]) -> bool: wait_for_finalization=True, ) if not success: - logger.error(f"Error setting weights: {err}") + logger.error(f"Error during weight set: {err}") return success except BaseException as exc: - logger.exception(f"Error setting weights: {exc}") + logger.exception(f"Error during weight set: {exc}") return False async def mask_network(self) -> list[str]: diff --git a/apex/validator/miner_sampler.py b/apex/validator/miner_sampler.py index 3b900ec91..f4948efe2 100644 --- a/apex/validator/miner_sampler.py +++ b/apex/validator/miner_sampler.py @@ -112,16 +112,16 @@ async def _get_all_miners(self) -> list[MinerInfo]: async def _sample_miners(self) -> list[MinerInfo]: miners = await self._get_all_miners() + miners_sample: list[MinerInfo] = [] if self._sample_mode == "random": miners_sample = random.sample(miners, self._sample_size) elif self._sample_mode == "sequential": async with self._sample_lock: - miners_sample: list[MinerInfo] = [] while len(miners_sample) < self._sample_size: if not self._epoch_deque: # Get shuffled deque of miners. - self._epoch_deque: deque[MinerInfo] = deque(random.sample(miners, len(miners))) + self._epoch_deque = deque(random.sample(miners, len(miners))) miners_sample.append(self._epoch_deque.popleft()) else: @@ -224,7 +224,7 @@ async def query_discriminators( choice_content = "None" parsed_discriminator_results.append(choice_content) - # Apply scoring logic based on selected generator type + # Apply scoring logic based on selected generator type. if choice_content == str(ground_truth): discriminator_score = score_per_miner else: @@ -232,7 +232,7 @@ async def query_discriminators( discriminator_results_float.append(discriminator_score) - # Generator result is 1 minus sum of discriminator results + # Generator result is 1 minus sum of discriminator results. generator_result_float = 1.0 - sum(discriminator_results_float) miner_discriminator_results = MinerDiscriminatorResults( query=query, diff --git a/apex/validator/pipeline.py b/apex/validator/pipeline.py index 48f8427a8..26b55ca1c 100644 --- a/apex/validator/pipeline.py +++ b/apex/validator/pipeline.py @@ -82,7 +82,7 @@ async def run_single(self, task: QueryTask) -> str: query = await generate_query(llm=self.llm, websearch=self.websearch) reference = None - tool_history = [] + tool_history: list[dict[str, str]] = [] if random.random() < self.reference_rate: try: generator_results = None From 8fdecf6d6d6673f0a1bca9ab10d867665ad06c3d Mon Sep 17 00:00:00 2001 From: Dmytro Bobrenko <17252809+dbobrenko@users.noreply.github.com> Date: Sat, 9 Aug 2025 23:04:45 +0200 Subject: [PATCH 17/18] Fix mypy --- .pre-commit-config.yaml | 19 +++++++++++++++++++ apex/common/async_chain.py | 4 ++-- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 7852ed5cc..e43ae52d9 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -20,4 +20,23 @@ repos: hooks: - id: ruff args: [--fix] + stages: [pre-commit] - id: ruff-format + stages: [pre-commit] + +- repo: local + hooks: + - id: mypy + name: mypy + entry: mypy . + language: system + pass_filenames: false + stages: [pre-commit] + + # Run tests on push so local "push" matches CI. + - id: pytest + name: pytest + entry: pytest tests/ --verbose --failed-first --exitfirst --disable-warnings + language: system + pass_filenames: false + stages: [pre-push] diff --git a/apex/common/async_chain.py b/apex/common/async_chain.py index dbaa73ade..a87d96a88 100644 --- a/apex/common/async_chain.py +++ b/apex/common/async_chain.py @@ -111,7 +111,7 @@ def netuid(self) -> int: def network(self) -> list[str]: return self._network - async def set_weights(self, rewards: dict[str, float]) -> bool: # type: ignore + async def set_weights(self, rewards: dict[str, float]) -> bool: try: metagraph = await self.metagraph() subtensor = await self.subtensor() @@ -138,7 +138,7 @@ async def set_weights(self, rewards: dict[str, float]) -> bool: # type: ignore ) if not success: logger.error(f"Error during weight set: {err}") - return success + return bool(success) except BaseException as exc: logger.exception(f"Error during weight set: {exc}") return False From 710e4d1a494426692e027319ff35d22663f46e98 Mon Sep 17 00:00:00 2001 From: Dmytro Bobrenko <17252809+dbobrenko@users.noreply.github.com> Date: Sat, 9 Aug 2025 23:15:20 +0200 Subject: [PATCH 18/18] Fix tests --- .pre-commit-config.yaml | 3 +-- apex/validator/miner_scorer.py | 7 +++++-- tests/common/mock_async_chain.py | 4 ++-- tests/common/test_async_chain.py | 4 +--- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index e43ae52d9..08c1e7862 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -33,10 +33,9 @@ repos: pass_filenames: false stages: [pre-commit] - # Run tests on push so local "push" matches CI. - id: pytest name: pytest entry: pytest tests/ --verbose --failed-first --exitfirst --disable-warnings language: system pass_filenames: false - stages: [pre-push] + stages: [pre-commit] diff --git a/apex/validator/miner_scorer.py b/apex/validator/miner_scorer.py index 655a06333..6539e6689 100644 --- a/apex/validator/miner_scorer.py +++ b/apex/validator/miner_scorer.py @@ -109,8 +109,11 @@ async def set_scores(self) -> bool: record_str: str = json.dumps(record) fh.write(f"{record_str}\n") # TODO: Flush the db only on set_weights_result is True. - rewards_array = np.array(list(hkey_agg_rewards.values())) - logger.debug(f"Setting weights, reward mean={rewards_array.mean():.4f} min={rewards_array.min():.4f}") + if hkey_agg_rewards: + rewards_array = np.array(list(hkey_agg_rewards.values())) + logger.debug(f"Setting weights, reward mean={rewards_array.mean():.4f} min={rewards_array.min():.4f}") + else: + logger.warning(f"Setting empty rewards: {hkey_agg_rewards}") set_weights_result = await self.chain.set_weights(hkey_agg_rewards) # 4. Flush all deletions in a single commit. diff --git a/tests/common/mock_async_chain.py b/tests/common/mock_async_chain.py index 7bbbb0980..1c0312fdd 100644 --- a/tests/common/mock_async_chain.py +++ b/tests/common/mock_async_chain.py @@ -57,7 +57,7 @@ async def set_weights( version_key: int, wait_for_inclusion: bool, wait_for_finalization: bool, - ) -> bool: + ) -> tuple[bool, str | None]: self.last_set_weights = { "wallet": wallet, "netuid": netuid, @@ -67,7 +67,7 @@ async def set_weights( "wait_for_inclusion": wait_for_inclusion, "wait_for_finalization": wait_for_finalization, } - return self.weights_result + return self.weights_result, "" def patch_wallet(monkeypatch: pytest.MonkeyPatch) -> None: diff --git a/tests/common/test_async_chain.py b/tests/common/test_async_chain.py index a16d88ad2..85092880c 100644 --- a/tests/common/test_async_chain.py +++ b/tests/common/test_async_chain.py @@ -2,6 +2,7 @@ import pytest +from apex import __spec_version__ from apex.common.async_chain import AsyncChain # noqa: E402 from tests.common.mock_async_chain import DummyMetagraph, DummySubtensor, patch_subtensor, patch_wallet @@ -121,9 +122,6 @@ async def test_set_weights_happy_path(monkeypatch): assert stub.last_set_weights is not None assert stub.last_set_weights["uids"] == [2] assert stub.last_set_weights["weights"] == [0.7] - # ensure we pass spec version as version_key - from apex import __spec_version__ - assert stub.last_set_weights["version_key"] == __spec_version__