Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion apex/common/async_chain.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ def __init__(self, coldkey: str, hotkey: str, netuid: int, network: list[str] |
if isinstance(network, str):
network = [network]
self._network: list[str] = network

self._coldkey = coldkey
self._hotkey = hotkey
self._netuid = netuid
Expand Down
1 change: 1 addition & 0 deletions apex/common/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class Config(BaseModel):
chain: ConfigClass = Field(default_factory=ConfigClass)
websearch: ConfigClass = Field(default_factory=ConfigClass)
logger_db: ConfigClass = Field(default_factory=ConfigClass)
weight_syncer: ConfigClass = Field(default_factory=ConfigClass)
miner_sampler: ConfigClass = Field(default_factory=ConfigClass)
miner_scorer: ConfigClass = Field(default_factory=ConfigClass)
llm: ConfigClass = Field(default_factory=ConfigClass)
Expand Down
8 changes: 8 additions & 0 deletions apex/common/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@
TEMPERATURE: float = 0.1
WEBPAGE_MAXSIZE: int = 500
VALIDATOR_REFERENCE_LABEL = "Validator"
VALIDATOR_VERIFIED_HOTKEYS = {
"5CGLCBndTR1BvQZzn429ckT8GyxduzyjMgt4K1UVTYa8gKfb": "167.99.236.79:8001", # Macrocosmos.
"5CUbyC2Ez7tWYYmnFSSwjqkw26dFNo9cXH8YmcxBSfxi2XSG": None, # Yuma.
"5C8Em1kDZi5rxgDN4zZtfoT7dUqJ7FFbTzS3yTP5GPgVUsn1": None, # RoundTable21.
"5HmkM6X1D3W3CuCSPuHhrbYyZNBy2aGAiZy9NczoJmtY25H7": None, # Crucible.
"5GeR3cDuuFKJ7p66wKGjY65MWjWnYqffq571ZMV4gKMnJqK5": None, # OTF.
"5D1saVvssckE1XoPwPzdHrqYZtvBJ3vESsrPNxZ4zAxbKGs1": None, # Rizzo.
}


_ENGLISH_WORDS: tuple[str, ...] | None = None
Expand Down
151 changes: 84 additions & 67 deletions apex/common/epistula.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
# import json
import json
import time
from hashlib import sha256
from math import ceil
from typing import Any
from typing import Annotated, Any
from uuid import uuid4

# from fastapi import HTTPException, Request
# from loguru import logger
from fastapi import HTTPException, Request
from loguru import logger
from substrateinterface import Keypair

from apex.common.async_chain import AsyncChain
from apex.common.constants import VALIDATOR_VERIFIED_HOTKEYS


async def generate_header(
hotkey: Keypair,
Expand Down Expand Up @@ -38,66 +41,80 @@ async def generate_header(
return headers


# def verify_signature(
# signature: str, body: bytes, timestamp: int, uuid: str, signed_for: str, signed_by: str, now: float
# ) -> Annotated[str, "Error Message"] | None:
# if not isinstance(signature, str):
# return "Invalid Signature"
# timestamp = int(timestamp)
# if not isinstance(timestamp, int):
# return "Invalid Timestamp"
# if not isinstance(signed_by, str):
# return "Invalid Sender key"
# if not isinstance(signed_for, str):
# return "Invalid receiver key"
# if not isinstance(uuid, str):
# return "Invalid uuid"
# if not isinstance(body, bytes):
# return "Body is not of type bytes"
# allowed_delta_ms = 8000
# keypair = Keypair(ss58_address=signed_by)
# if timestamp + allowed_delta_ms < now:
# return "Request is too stale"
# message = f"{sha256(body).hexdigest()}.{uuid}.{timestamp}.{signed_for}"
# verified = keypair.verify(message, signature)
# if not verified:
# return "Signature Mismatch"
# return None
#
#
# async def verify_weight_signature(request: Request):
# signed_by = request.headers.get("Epistula-Signed-By")
# signed_for = request.headers.get("Epistula-Signed-For")
# if not signed_by or not signed_for:
# raise HTTPException(400, "Missing Epistula-Signed-* headers")
#
# if signed_for != shared_settings.WALLET.hotkey.ss58_address:
# logger.error("Bad Request, message is not intended for self")
# raise HTTPException(status_code=400, detail="Bad Request, message is not intended for self")
# validator_hotkeys = [shared_settings.METAGRAPH.hotkeys[uid] for uid in WHITELISTED_VALIDATORS_UIDS]
# if signed_by not in validator_hotkeys:
# logger.error(f"Signer not the expected ss58 address: {signed_by}")
# raise HTTPException(status_code=401, detail="Signer not the expected ss58 address")
#
# now = time.time()
# body: bytes = await request.body()
# try:
# payload = json.loads(body)
# except json.JSONDecodeError:
# raise HTTPException(400, "Invalid JSON body")
#
# if payload.get("uid") != get_uid_from_hotkey(signed_by):
# raise HTTPException(400, "Invalid uid in body")
#
# err = verify_signature(
# request.headers.get("Epistula-Request-Signature"),
# body,
# request.headers.get("Epistula-Timestamp"),
# request.headers.get("Epistula-Uuid"),
# signed_for,
# signed_by,
# now,
# )
# if err:
# logger.error(err)
# raise HTTPException(status_code=400, detail=err)
def verify_signature(
signature: str | None,
body: bytes,
timestamp: str | None,
uuid: str | None,
signed_for: str,
signed_by: str,
now: float,
) -> Annotated[str, "Error Message"] | None:
if not isinstance(signature, str):
return "Invalid Signature"
if not isinstance(timestamp, str) or not timestamp.isdigit():
return "Invalid Timestamp"
timestamp_as_int = int(timestamp)
if not isinstance(signed_by, str):
return "Invalid Sender key"
if not isinstance(signed_for, str):
return "Invalid receiver key"
if not isinstance(uuid, str):
return "Invalid uuid"
if not isinstance(body, bytes):
return "Body is not of type bytes"
allowed_delta_ms = 8000
keypair = Keypair(ss58_address=signed_by)
if timestamp_as_int + allowed_delta_ms < now:
return "Request is too stale"
message = f"{sha256(body).hexdigest()}.{uuid}.{timestamp}.{signed_for}"
verified = keypair.verify(message, signature)
if not verified:
return "Signature Mismatch"
return None


async def verify_validator_signature(request: Request, chain: AsyncChain, min_stake: float = 1024) -> None:
signed_by = request.headers.get("Epistula-Signed-By")
signed_for = request.headers.get("Epistula-Signed-For")
if not signed_by or not signed_for:
logger.error("Missing Epistula-Signed-* headers")
raise HTTPException(400, "Missing Epistula-Signed-* headers")

wallet = chain.wallet
if signed_for != wallet.hotkey.ss58_address:
logger.error("Bad Request, message is not intended for self")
raise HTTPException(status_code=400, detail="Bad Request, message is not intended for self")

is_validator = True
if min_stake > 0:
metagraph = await chain.metagraph()
try:
caller_uid = metagraph.hotkeys.index(signed_by)
except ValueError as exc:
raise HTTPException(status_code=401, detail="Signer is not in metagraph") from exc
is_validator = metagraph.stake[caller_uid] > min_stake

if signed_by not in VALIDATOR_VERIFIED_HOTKEYS and not is_validator:
logger.error(f"Signer not the expected ss58 address: {signed_by}")
raise HTTPException(status_code=401, detail="Signer not the expected ss58 address")

now = time.time()
body: bytes = await request.body()
try:
json.loads(body)
except json.JSONDecodeError as exc:
raise HTTPException(400, "Invalid JSON body") from exc

err = verify_signature(
request.headers.get("Epistula-Request-Signature"),
body,
request.headers.get("Epistula-Timestamp"),
request.headers.get("Epistula-Uuid"),
signed_for,
signed_by,
now,
)
if err:
logger.error(err)
raise HTTPException(status_code=400, detail=err)
22 changes: 19 additions & 3 deletions apex/validator/miner_scorer.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,27 @@

from apex.common.async_chain import AsyncChain
from apex.common.constants import VALIDATOR_REFERENCE_LABEL
from apex.validator.weight_syncer import WeightSyncer

# Scoring moving average in hours. Set to be: immunity_period - post_reg_threshold.
SCORE_MA_WINDOW_HOURS = 23.75
SCORE_INTERVAL_DEFAULT = 22 * 60


class MinerScorer:
def __init__(self, chain: AsyncChain, interval: float = SCORE_INTERVAL_DEFAULT, debug: bool = False):
def __init__(
self,
chain: AsyncChain,
weight_syncer: WeightSyncer | None = None,
interval: float = SCORE_INTERVAL_DEFAULT,
debug: bool = False,
):
self.chain = chain
self.interval = interval
self._running = True
self._debug = debug
self._weight_syncer = weight_syncer
self._debug_rewards_path = Path("debug_rewards.jsonl")
self._running = True

async def start_loop(self) -> None:
self._running = True
Expand Down Expand Up @@ -108,12 +116,20 @@ async def set_scores(self) -> bool:
with self._debug_rewards_path.open("a+") as fh:
record_str: str = json.dumps(record)
fh.write(f"{record_str}\n")
# TODO: Flush the db only on set_weights_result is True.

if self._weight_syncer is not None:
try:
hkey_agg_rewards = await self._weight_syncer.compute_weighted_rewards(hkey_agg_rewards)
except BaseException as exc:
logger.error(f"Failed to compute weighted average rewards over the network, skipping: {exc}")

if hkey_agg_rewards:
rewards_array = np.array(list(hkey_agg_rewards.values()))
logger.debug(f"Setting weights, reward mean={rewards_array.mean():.4f} min={rewards_array.min():.4f}")
else:
logger.warning(f"Setting empty rewards: {hkey_agg_rewards}")

# TODO: Flush the db only on set_weights_result is True.
set_weights_result = await self.chain.set_weights(hkey_agg_rewards)

# 4. Flush all deletions in a single commit.
Expand Down
Loading