From 0a1f985a77247f3df48ecf407eb579608babc85a Mon Sep 17 00:00:00 2001 From: Mike Rolish Date: Fri, 31 Oct 2025 11:26:57 -0500 Subject: [PATCH 1/5] feat(hip-3-pusher): Configurable price sources --- apps/hip-3-pusher/config/config.toml | 30 +++-- apps/hip-3-pusher/pyproject.toml | 2 +- apps/hip-3-pusher/src/pusher/config.py | 45 +++++-- .../src/pusher/hermes_listener.py | 12 +- .../src/pusher/hyperliquid_listener.py | 20 +-- .../hip-3-pusher/src/pusher/lazer_listener.py | 15 +-- apps/hip-3-pusher/src/pusher/price_state.py | 122 +++++++++--------- apps/hip-3-pusher/src/pusher/publisher.py | 23 ++-- apps/hip-3-pusher/uv.lock | 2 +- 9 files changed, 153 insertions(+), 118 deletions(-) diff --git a/apps/hip-3-pusher/config/config.toml b/apps/hip-3-pusher/config/config.toml index 20bd0e554b..f9b9f45bf4 100644 --- a/apps/hip-3-pusher/config/config.toml +++ b/apps/hip-3-pusher/config/config.toml @@ -3,8 +3,8 @@ prometheus_port = 9090 [hyperliquid] hyperliquid_ws_urls = ["wss://api.hyperliquid-testnet.xyz/ws"] -market_name = "" -market_symbol = "BTC" +market_name = "pyth" +asset_context_symbols = ["BTC"] use_testnet = false oracle_pusher_key_path = "/path/to/oracle_pusher_key.txt" publish_interval = 3.0 @@ -13,7 +13,6 @@ enable_publish = false [multisig] enable_multisig = false -multisig_address = "0x0000000000000000000000000000000000000005" [kms] enable_kms = false @@ -22,14 +21,23 @@ aws_kms_key_id_path = "/path/to/aws_kms_key_id.txt" [lazer] lazer_urls = ["wss://pyth-lazer-0.dourolabs.app/v1/stream", "wss://pyth-lazer-1.dourolabs.app/v1/stream"] lazer_api_key = "lazer_api_key" -base_feed_id = 1 # BTC -base_feed_exponent = -8 -quote_feed_id = 8 # USDT -quote_feed_exponent = -8 +feed_ids = [1, 8] # BTC, USDT [hermes] hermes_urls = ["wss://hermes.pyth.network/ws"] -base_feed_id = "e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43" # BTC -base_feed_exponent = -8 -quote_feed_id = "2b89b9dc8fdf9f34709a5b106b472f0f39bb6ca9ce04b0fd7f2e971688e2e53b" # USDT -quote_feed_exponent = -8 +feed_ids = [ + "e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43", # BTC + "2b89b9dc8fdf9f34709a5b106b472f0f39bb6ca9ce04b0fd7f2e971688e2e53b" # USDT +] + +[price.oracle] +BTC = [ + { source_type = "single", source = { source_name = "hl_oracle", source_id = "BTC" } }, + { source_type = "pair", base_source = { source_name = "lazer", source_id = 1, exponent = -8 }, quote_source = { source_name = "lazer", source_id = 8, exponent = -8 } }, + { source_type = "pair", base_source = { source_name = "hermes", source_id = "e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43", exponent = -8 }, quote_source = { source_name = "hermes", source_id = "2b89b9dc8fdf9f34709a5b106b472f0f39bb6ca9ce04b0fd7f2e971688e2e53b", exponent = -8 } }, +] + +[price.external] +BTC = [{ source_type = "single", source = { source_name = "hl_mark", source_id = "BTC" } }] +PYTH = [{ source_type = "constant", value = "0.10" }] +FOGO = [{ source_type = "constant", value = "0.01" }] diff --git a/apps/hip-3-pusher/pyproject.toml b/apps/hip-3-pusher/pyproject.toml index 715b53c917..f26077139d 100644 --- a/apps/hip-3-pusher/pyproject.toml +++ b/apps/hip-3-pusher/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "hip-3-pusher" -version = "0.1.7" +version = "0.2.0" description = "Hyperliquid HIP-3 market oracle pusher" readme = "README.md" requires-python = "==3.13.*" diff --git a/apps/hip-3-pusher/src/pusher/config.py b/apps/hip-3-pusher/src/pusher/config.py index 9c7a2e600c..101f03a5b0 100644 --- a/apps/hip-3-pusher/src/pusher/config.py +++ b/apps/hip-3-pusher/src/pusher/config.py @@ -1,6 +1,7 @@ from hyperliquid.utils.constants import MAINNET_API_URL, TESTNET_API_URL from pydantic import BaseModel, FilePath, model_validator from typing import Optional +from typing import Literal, Union STALE_TIMEOUT_SECONDS = 5 @@ -18,25 +19,19 @@ class MultisigConfig(BaseModel): class LazerConfig(BaseModel): lazer_urls: list[str] lazer_api_key: str - base_feed_id: int - base_feed_exponent: int - quote_feed_id: int - quote_feed_exponent: int + feed_ids: list[int] class HermesConfig(BaseModel): hermes_urls: list[str] - base_feed_id: str - base_feed_exponent: int - quote_feed_id: str - quote_feed_exponent: int + feed_ids: list[str] class HyperliquidConfig(BaseModel): hyperliquid_ws_urls: list[str] push_urls: Optional[list[str]] = None market_name: str - market_symbol: str + asset_context_symbols: list[str] use_testnet: bool oracle_pusher_key_path: Optional[FilePath] = None publish_interval: float @@ -50,6 +45,37 @@ def set_default_urls(self): return self +class PriceSource(BaseModel): + source_name: str + source_id: str | int + exponent: Optional[int] = None + + +class SingleSourceConfig(BaseModel): + source_type: Literal["single"] + source: PriceSource + + +class PairSourceConfig(BaseModel): + source_type: Literal["pair"] + base_source: PriceSource + quote_source: PriceSource + + +class ConstantSourceConfig(BaseModel): + source_type: Literal["constant"] + value: str + + +PriceSourceConfig = Union[SingleSourceConfig, PairSourceConfig, ConstantSourceConfig] + + +class PriceConfig(BaseModel): + oracle: dict[str, list[PriceSourceConfig]] = {} + mark: dict[str, list[PriceSourceConfig]] = {} + external: dict[str, list[PriceSourceConfig]] = {} + + class Config(BaseModel): stale_price_threshold_seconds: int prometheus_port: int @@ -58,3 +84,4 @@ class Config(BaseModel): lazer: LazerConfig hermes: HermesConfig multisig: MultisigConfig + price: PriceConfig diff --git a/apps/hip-3-pusher/src/pusher/hermes_listener.py b/apps/hip-3-pusher/src/pusher/hermes_listener.py index cac0768e1c..7262dc6138 100644 --- a/apps/hip-3-pusher/src/pusher/hermes_listener.py +++ b/apps/hip-3-pusher/src/pusher/hermes_listener.py @@ -11,19 +11,20 @@ class HermesListener: + SOURCE_NAME = "hermes" + """ Subscribe to Hermes price updates for needed feeds. """ def __init__(self, config: Config, price_state: PriceState): self.hermes_urls = config.hermes.hermes_urls - self.base_feed_id = config.hermes.base_feed_id - self.quote_feed_id = config.hermes.quote_feed_id + self.feed_ids = config.hermes.feed_ids self.price_state = price_state def get_subscribe_request(self): return { "type": "subscribe", - "ids": [self.base_feed_id, self.quote_feed_id], + "ids": self.feed_ids, "verbose": False, "binary": True, "allow_out_of_order": False, @@ -81,9 +82,6 @@ def parse_hermes_message(self, data): publish_time = price_object["publish_time"] logger.debug("Hermes update: {} {} {} {}", id, price, expo, publish_time) now = time.time() - if id == self.base_feed_id: - self.price_state.hermes_base_price = PriceUpdate(price, now) - if id == self.quote_feed_id: - self.price_state.hermes_quote_price = PriceUpdate(price, now) + self.price_state.state[self.SOURCE_NAME][id] = PriceUpdate(price, now) except Exception as e: logger.error("parse_hermes_message error: {}", e) diff --git a/apps/hip-3-pusher/src/pusher/hyperliquid_listener.py b/apps/hip-3-pusher/src/pusher/hyperliquid_listener.py index d0544c9168..3ef7b1809a 100644 --- a/apps/hip-3-pusher/src/pusher/hyperliquid_listener.py +++ b/apps/hip-3-pusher/src/pusher/hyperliquid_listener.py @@ -16,13 +16,16 @@ class HyperliquidListener: + ORACLE_SOURCE_NAME = "hl_oracle" + MARK_SOURCE_NAME = "hl_mark" + """ Subscribe to any relevant Hyperliquid websocket streams See https://hyperliquid.gitbook.io/hyperliquid-docs/for-developers/api/websocket """ def __init__(self, config: Config, price_state: PriceState): self.hyperliquid_ws_urls = config.hyperliquid.hyperliquid_ws_urls - self.market_symbol = config.hyperliquid.market_symbol + self.asset_context_symbols = config.hyperliquid.asset_context_symbols self.price_state = price_state def get_subscribe_request(self, asset): @@ -44,9 +47,10 @@ async def subscribe_single(self, url): async def subscribe_single_inner(self, url): async with websockets.connect(url) as ws: - subscribe_request = self.get_subscribe_request(self.market_symbol) - await ws.send(json.dumps(subscribe_request)) - logger.info("Sent subscribe request to {}", url) + for symbol in self.asset_context_symbols: + subscribe_request = self.get_subscribe_request(symbol) + await ws.send(json.dumps(subscribe_request)) + logger.info("Sent subscribe request for symbol: {} to {}", symbol, url) # listen for updates while True: @@ -76,10 +80,10 @@ async def subscribe_single_inner(self, url): def parse_hyperliquid_ws_message(self, message): try: ctx = message["data"]["ctx"] + symbol = message["data"]["coin"] now = time.time() - self.price_state.hl_oracle_price = PriceUpdate(ctx["oraclePx"], now) - self.price_state.hl_mark_price = PriceUpdate(ctx["markPx"], now) - logger.debug("on_activeAssetCtx: oraclePx: {} marketPx: {}", self.price_state.hl_oracle_price, - self.price_state.hl_mark_price) + self.price_state.state[self.ORACLE_SOURCE_NAME][symbol] = PriceUpdate(ctx["oraclePx"], now) + self.price_state.state[self.MARK_SOURCE_NAME][symbol] = PriceUpdate(ctx["markPx"], now) + logger.debug("on_activeAssetCtx: oraclePx: {} marketPx: {}", ctx["oraclePx"], ctx["markPx"]) except Exception as e: logger.error("parse_hyperliquid_ws_message error: message: {} e: {}", message, e) diff --git a/apps/hip-3-pusher/src/pusher/lazer_listener.py b/apps/hip-3-pusher/src/pusher/lazer_listener.py index 4bca7b2e45..37ca4eedf8 100644 --- a/apps/hip-3-pusher/src/pusher/lazer_listener.py +++ b/apps/hip-3-pusher/src/pusher/lazer_listener.py @@ -11,21 +11,22 @@ class LazerListener: + SOURCE_NAME = "lazer" + """ Subscribe to Lazer price updates for needed feeds. """ def __init__(self, config: Config, price_state: PriceState): self.lazer_urls = config.lazer.lazer_urls self.api_key = config.lazer.lazer_api_key - self.base_feed_id = config.lazer.base_feed_id - self.quote_feed_id = config.lazer.quote_feed_id + self.feed_ids = config.lazer.feed_ids self.price_state = price_state def get_subscribe_request(self, subscription_id: int): return { "type": "subscribe", "subscriptionId": subscription_id, - "priceFeedIds": [self.base_feed_id, self.quote_feed_id], + "priceFeedIds": self.feed_ids, "properties": ["price"], "formats": [], "deliveryFormat": "json", @@ -54,7 +55,7 @@ async def subscribe_single_inner(self, router_url): subscribe_request = self.get_subscribe_request(1) await ws.send(json.dumps(subscribe_request)) - logger.info("Sent Lazer subscribe request to {}", router_url) + logger.info("Sent Lazer subscribe request to {} feed_ids {}", router_url, self.feed_ids) # listen for updates while True: @@ -89,9 +90,7 @@ def parse_lazer_message(self, data): price = feed_update.get("price", None) if feed_id is None or price is None: continue - if feed_id == self.base_feed_id: - self.price_state.lazer_base_price = PriceUpdate(price, now) - if feed_id == self.quote_feed_id: - self.price_state.lazer_quote_price = PriceUpdate(price, now) + else: + self.price_state.state[self.SOURCE_NAME][feed_id] = PriceUpdate(price, now) except Exception as e: logger.error("parse_lazer_message error: {}", e) diff --git a/apps/hip-3-pusher/src/pusher/price_state.py b/apps/hip-3-pusher/src/pusher/price_state.py index 764ff42015..4d64d0efc5 100644 --- a/apps/hip-3-pusher/src/pusher/price_state.py +++ b/apps/hip-3-pusher/src/pusher/price_state.py @@ -1,18 +1,17 @@ +from dataclasses import dataclass from loguru import logger import time -from pusher.config import Config +from pusher.config import Config, PriceSource, PriceSourceConfig, ConstantSourceConfig, SingleSourceConfig, \ + PairSourceConfig DEFAULT_STALE_PRICE_THRESHOLD_SECONDS = 5 +@dataclass class PriceUpdate: - def __init__(self, price, timestamp): - self.price = price - self.timestamp = timestamp - - def __str__(self): - return f"PriceUpdate(price={self.price}, timestamp={self.timestamp})" + price: float | str + timestamp: float def time_diff(self, now): return now - self.timestamp @@ -24,60 +23,65 @@ class PriceState: """ def __init__(self, config: Config): self.stale_price_threshold_seconds = config.stale_price_threshold_seconds - - self.hl_oracle_price: PriceUpdate | None = None - self.hl_mark_price: PriceUpdate | None = None - - self.lazer_base_price: PriceUpdate | None = None - self.lazer_base_exponent = config.lazer.base_feed_exponent - self.lazer_quote_price: PriceUpdate | None = None - self.lazer_quote_exponent = config.lazer.quote_feed_exponent - - self.hermes_base_price: PriceUpdate | None = None - self.hermes_base_exponent = config.hermes.base_feed_exponent - self.hermes_quote_price: PriceUpdate | None = None - self.hermes_quote_exponent = config.hermes.quote_feed_exponent - - def get_current_oracle_price(self): - now = time.time() - if self.hl_oracle_price: - time_diff = self.hl_oracle_price.time_diff(now) - if time_diff < self.stale_price_threshold_seconds: - return self.hl_oracle_price.price - else: - logger.error("Hyperliquid oracle price stale by {} seconds", time_diff) - else: - logger.error("Hyperliquid oracle price not received yet") - - # fall back to Lazer - if self.lazer_base_price and self.lazer_quote_price: - max_time_diff = max(self.lazer_base_price.time_diff(now), self.lazer_quote_price.time_diff(now)) - if max_time_diff < self.stale_price_threshold_seconds: - return self.get_lazer_price() - else: - logger.error("Lazer price stale by {} seconds", max_time_diff) + self.price_config = config.price + self.state = { + "hl_oracle": {symbol: None for symbol in config.hyperliquid.asset_context_symbols}, + "hl_mark": {symbol: None for symbol in config.hyperliquid.asset_context_symbols}, + "lazer": {feed_id: None for feed_id in config.lazer.feed_ids}, + "hermes": {feed_id: None for feed_id in config.hermes.feed_ids} + } + + def get_all_prices(self, market_name): + logger.debug("state: {}", self.state) + return ( + self.get_prices(self.price_config.oracle, market_name), + self.get_prices(self.price_config.mark, market_name), + self.get_prices(self.price_config.external, market_name) + ) + + def get_prices(self, symbol_configs: dict[str, list[PriceSourceConfig]], market_name: str): + pxs = {} + for symbol in symbol_configs: + for source_config in symbol_configs[symbol]: + # find first valid price in the waterfall + px = self.get_price(source_config) + if px is not None: + pxs[f"{market_name}:{symbol}"] = px + break + return pxs + + def get_price(self, price_source_config: PriceSourceConfig): + if isinstance(price_source_config, ConstantSourceConfig): + return price_source_config.value + elif isinstance(price_source_config, SingleSourceConfig): + return self.get_price_from_single_source(price_source_config.source) + elif isinstance(price_source_config, PairSourceConfig): + return self.get_price_from_pair_source(price_source_config.base_source, price_source_config.quote_source) else: - logger.error("Lazer base/quote prices not received yet") + raise ValueError - # fall back to Hermes - if self.hermes_base_price and self.hermes_quote_price: - max_time_diff = max(self.hermes_base_price.time_diff(now), self.hermes_quote_price.time_diff(now)) - if max_time_diff < self.stale_price_threshold_seconds: - return self.get_hermes_price() - else: - logger.error("Hermes price stale by {} seconds", max_time_diff) + def get_price_from_single_source(self, source: PriceSource): + now = time.time() + update: PriceUpdate | None = self.state.get(source.source_name, {}).get(source.source_id) + if update is None: + logger.warning("source {} id {} is missing", source.source_name, source.source_id) + return None + time_diff = update.time_diff(now) + if time_diff >= self.stale_price_threshold_seconds: + logger.warning("source {} id {} is stale by {} seconds", source.source_name, source.source_id, time_diff) + return None + # valid price found + if source.exponent is not None: + return float(update.price) / (10.0 ** -source.exponent) else: - logger.error("Hermes base/quote prices not received yet") - - logger.error("All prices missing or stale!") - return None + return update.price - def get_hermes_price(self): - base_price = float(self.hermes_base_price.price) / (10.0 ** -self.hermes_base_exponent) - quote_price = float(self.hermes_quote_price.price) / (10.0 ** -self.hermes_quote_exponent) - return str(round(base_price / quote_price, 2)) + def get_price_from_pair_source(self, base_source: PriceSource, quote_source: PriceSource): + base_price = self.get_price_from_single_source(base_source) + if base_price is None: + return None + quote_price = self.get_price_from_single_source(quote_source) + if quote_price is None: + return None - def get_lazer_price(self): - base_price = float(self.lazer_base_price.price) / (10.0 ** -self.lazer_base_exponent) - quote_price = float(self.lazer_quote_price.price) / (10.0 ** -self.lazer_quote_exponent) - return str(round(base_price / quote_price, 2)) + return str(round(float(base_price) / float(quote_price), 2)) diff --git a/apps/hip-3-pusher/src/pusher/publisher.py b/apps/hip-3-pusher/src/pusher/publisher.py index a1bb600d0d..36324e1540 100644 --- a/apps/hip-3-pusher/src/pusher/publisher.py +++ b/apps/hip-3-pusher/src/pusher/publisher.py @@ -53,7 +53,6 @@ def __init__(self, config: Config, price_state: PriceState, metrics: Metrics): self.multisig_address = config.multisig.multisig_address self.market_name = config.hyperliquid.market_name - self.market_symbol = config.hyperliquid.market_symbol self.enable_publish = config.hyperliquid.enable_publish self.price_state = price_state @@ -70,20 +69,16 @@ async def run(self): logger.exception("Publisher.publish() exception: {}", repr(e)) def publish(self): - oracle_pxs = {} - oracle_px = self.price_state.get_current_oracle_price() - if not oracle_px: - logger.error("No valid oracle price available") - self.metrics.no_oracle_price_counter.add(1, self.metrics_labels) - return - else: - logger.debug("Current oracle price: {}", oracle_px) - oracle_pxs[f"{self.market_name}:{self.market_symbol}"] = oracle_px + oracle_pxs, mark_pxs, external_perp_pxs = self.price_state.get_all_prices(self.market_name) + logger.debug("oracle_pxs: {}", oracle_pxs) + logger.debug("mark_pxs: {}", mark_pxs) + logger.debug("external_perp_pxs: {}", external_perp_pxs) - mark_pxs = [] - external_perp_pxs = {} - if self.price_state.hl_mark_price: - external_perp_pxs[f"{self.market_name}:{self.market_symbol}"] = self.price_state.hl_mark_price.price + if not oracle_pxs: + logger.error("No valid oracle prices available") + self.metrics.no_oracle_price_counter.add(1, self.metrics_labels) + # markPxs is a list of dicts of length 0-2, and so can be empty + mark_pxs = [mark_pxs] if mark_pxs else [] # TODO: "Each update can change oraclePx and markPx by at most 1%." # TODO: "The markPx cannot be updated such that open interest would be 10x the open interest cap." diff --git a/apps/hip-3-pusher/uv.lock b/apps/hip-3-pusher/uv.lock index e00ae1b177..541648027e 100644 --- a/apps/hip-3-pusher/uv.lock +++ b/apps/hip-3-pusher/uv.lock @@ -329,7 +329,7 @@ wheels = [ [[package]] name = "hip-3-pusher" -version = "0.1.7" +version = "0.2.0" source = { editable = "." } dependencies = [ { name = "boto3" }, From 9d5023ad823a71cfb9b6ad707c71ee42e6a376de Mon Sep 17 00:00:00 2001 From: Mike Rolish Date: Fri, 31 Oct 2025 23:09:46 -0500 Subject: [PATCH 2/5] Update unit tests --- apps/hip-3-pusher/tests/test_price_state.py | 82 +++++++++++++-------- 1 file changed, 51 insertions(+), 31 deletions(-) diff --git a/apps/hip-3-pusher/tests/test_price_state.py b/apps/hip-3-pusher/tests/test_price_state.py index 21ac0ad034..6427262156 100644 --- a/apps/hip-3-pusher/tests/test_price_state.py +++ b/apps/hip-3-pusher/tests/test_price_state.py @@ -1,18 +1,37 @@ import time -from pusher.config import Config, LazerConfig, HermesConfig +from pusher.config import Config, LazerConfig, HermesConfig, PriceConfig, PriceSource, SingleSourceConfig, \ + PairSourceConfig, HyperliquidConfig from pusher.price_state import PriceState, PriceUpdate +DEX = "pyth" +SYMBOL = "BTC" + def get_config(): config: Config = Config.model_construct() config.stale_price_threshold_seconds = 5 + config.hyperliquid = HyperliquidConfig.model_construct() + config.hyperliquid.asset_context_symbols = [SYMBOL] config.lazer = LazerConfig.model_construct() - config.lazer.base_feed_exponent = -8 - config.lazer.quote_feed_exponent = -8 + config.lazer.feed_ids = [1, 8] config.hermes = HermesConfig.model_construct() - config.hermes.base_feed_exponent = -8 - config.hermes.quote_feed_exponent = -8 + config.hermes.feed_ids = ["e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43", "2b89b9dc8fdf9f34709a5b106b472f0f39bb6ca9ce04b0fd7f2e971688e2e53b"] + config.price = PriceConfig( + oracle={ + SYMBOL: [ + SingleSourceConfig(source_type="single", source=PriceSource(source_name="hl_oracle", source_id="BTC", exponent=None)), + PairSourceConfig(source_type="pair", + base_source=PriceSource(source_name="lazer", source_id=1, exponent=-8), + quote_source=PriceSource(source_name="lazer", source_id=8, exponent=-8)), + PairSourceConfig(source_type="pair", + base_source=PriceSource(source_name="hermes", source_id="e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43", exponent=-8), + quote_source=PriceSource(source_name="hermes", source_id="2b89b9dc8fdf9f34709a5b106b472f0f39bb6ca9ce04b0fd7f2e971688e2e53b", exponent=-8)) + ] + }, + mark={}, + external={} + ) return config @@ -20,25 +39,22 @@ def test_good_hl_price(): config = get_config() price_state = PriceState(config) now = time.time() - price_state.hl_oracle_price = PriceUpdate("110000.0", now - price_state.stale_price_threshold_seconds / 2.0) - - oracle_px = price_state.get_current_oracle_price() - assert oracle_px == price_state.hl_oracle_price.price - assert oracle_px == "110000.0" + price_state.state["hl_oracle"][SYMBOL] = PriceUpdate("110000.0", now - price_state.stale_price_threshold_seconds / 2.0) + oracle_px, _, _ = price_state.get_all_prices(DEX) + assert oracle_px == {f"{DEX}:{SYMBOL}": "110000.0"} def test_fallback_lazer(): config = get_config() price_state = PriceState(config) now = time.time() - price_state.hl_oracle_price = PriceUpdate("110000.0", now - price_state.stale_price_threshold_seconds - 1.0) - price_state.lazer_base_price = PriceUpdate("11050000000000", now - price_state.stale_price_threshold_seconds / 2.0) - price_state.lazer_quote_price = PriceUpdate("99000000", now - price_state.stale_price_threshold_seconds / 2.0) + price_state.state["hl_oracle"][SYMBOL] = PriceUpdate("110000.0", now - price_state.stale_price_threshold_seconds - 1.0) + price_state.state["lazer"][1] = PriceUpdate("11050000000000", now - price_state.stale_price_threshold_seconds / 2.0) + price_state.state["lazer"][8] = PriceUpdate("99000000", now - price_state.stale_price_threshold_seconds / 2.0) - oracle_px = price_state.get_current_oracle_price() - assert oracle_px == price_state.get_lazer_price() - assert oracle_px == "111616.16" + oracle_px, _, _ = price_state.get_all_prices(DEX) + assert oracle_px == {f"{DEX}:{SYMBOL}": "111616.16"} @@ -46,25 +62,29 @@ def test_fallback_hermes(): config = get_config() price_state = PriceState(config) now = time.time() - price_state.hl_oracle_price = PriceUpdate("110000.0", now - price_state.stale_price_threshold_seconds - 1.0) - price_state.lazer_base_price = PriceUpdate("11050000000000", now - price_state.stale_price_threshold_seconds - 1.0) - price_state.lazer_quote_price = PriceUpdate("99000000", now - price_state.stale_price_threshold_seconds / 2.0) - price_state.hermes_base_price = PriceUpdate("11100000000000", now - price_state.stale_price_threshold_seconds / 2.0) - price_state.hermes_quote_price = PriceUpdate("98000000", now - price_state.stale_price_threshold_seconds / 2.0) + price_state.state["hl_oracle"][SYMBOL] = PriceUpdate("110000.0", now - price_state.stale_price_threshold_seconds - 1.0) + price_state.state["lazer"][1] = PriceUpdate("11050000000000", now - price_state.stale_price_threshold_seconds - 1.0) + price_state.state["lazer"][8] = PriceUpdate("99000000", now - price_state.stale_price_threshold_seconds / 2.0) + price_state.state["hermes"]["e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43"] = \ + PriceUpdate("11100000000000", now - price_state.stale_price_threshold_seconds / 2.0) + price_state.state["hermes"]["2b89b9dc8fdf9f34709a5b106b472f0f39bb6ca9ce04b0fd7f2e971688e2e53b"] = \ + PriceUpdate("98000000", now - price_state.stale_price_threshold_seconds / 2.0) - oracle_px = price_state.get_current_oracle_price() - assert oracle_px == price_state.get_hermes_price() - assert oracle_px == "113265.31" + oracle_px, _, _ = price_state.get_all_prices(DEX) + assert oracle_px == {f"{DEX}:{SYMBOL}": "113265.31"} def test_all_fail(): config = get_config() price_state = PriceState(config) now = time.time() - price_state.hl_oracle_price = PriceUpdate("110000.0", now - price_state.stale_price_threshold_seconds - 1.0) - price_state.hl_oracle_price = PriceUpdate("110000.0", now - price_state.stale_price_threshold_seconds - 1.0) - price_state.lazer_base_price = PriceUpdate("11050000000000", now - price_state.stale_price_threshold_seconds - 1.0) - price_state.lazer_quote_price = PriceUpdate("99000000", now - price_state.stale_price_threshold_seconds - 1.0) - price_state.hermes_base_price = PriceUpdate("11100000000000", now - price_state.stale_price_threshold_seconds - 1.0) - price_state.hermes_quote_price = PriceUpdate("98000000", now - price_state.stale_price_threshold_seconds - 1.0) - assert price_state.get_current_oracle_price() is None + price_state.state["hl_oracle"][SYMBOL] = PriceUpdate("110000.0", now - price_state.stale_price_threshold_seconds - 1.0) + price_state.state["lazer"][1] = PriceUpdate("11050000000000", now - price_state.stale_price_threshold_seconds - 1.0) + price_state.state["lazer"][8] = PriceUpdate("99000000", now - price_state.stale_price_threshold_seconds - 1.0) + price_state.state["hermes"]["e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43"] = \ + PriceUpdate("11100000000000", now - price_state.stale_price_threshold_seconds - 1.0) + price_state.state["hermes"]["2b89b9dc8fdf9f34709a5b106b472f0f39bb6ca9ce04b0fd7f2e971688e2e53b"] = \ + PriceUpdate("98000000", now - price_state.stale_price_threshold_seconds - 1.0) + + oracle_px, _, _ = price_state.get_all_prices(DEX) + assert oracle_px == {} From 5386800cefb5de06f61338b118d8a21fe3e1fb1d Mon Sep 17 00:00:00 2001 From: Mike Rolish Date: Wed, 5 Nov 2025 16:33:17 -0600 Subject: [PATCH 3/5] Added SEDA poller --- .../{config.toml => config.sample.toml} | 0 apps/hip-3-pusher/pyproject.toml | 1 + apps/hip-3-pusher/src/pusher/config.py | 19 ++++- apps/hip-3-pusher/src/pusher/main.py | 3 + apps/hip-3-pusher/src/pusher/price_state.py | 3 +- apps/hip-3-pusher/src/pusher/seda_listener.py | 70 +++++++++++++++++++ apps/hip-3-pusher/uv.lock | 61 ++++++++++++++++ 7 files changed, 154 insertions(+), 3 deletions(-) rename apps/hip-3-pusher/config/{config.toml => config.sample.toml} (100%) create mode 100644 apps/hip-3-pusher/src/pusher/seda_listener.py diff --git a/apps/hip-3-pusher/config/config.toml b/apps/hip-3-pusher/config/config.sample.toml similarity index 100% rename from apps/hip-3-pusher/config/config.toml rename to apps/hip-3-pusher/config/config.sample.toml diff --git a/apps/hip-3-pusher/pyproject.toml b/apps/hip-3-pusher/pyproject.toml index f26077139d..1802a322f0 100644 --- a/apps/hip-3-pusher/pyproject.toml +++ b/apps/hip-3-pusher/pyproject.toml @@ -7,6 +7,7 @@ requires-python = "==3.13.*" dependencies = [ "boto3~=1.40.38", "cryptography~=46.0.1", + "httpx~=0.28.1", "hyperliquid-python-sdk~=0.19.0", "loguru~=0.7.3", "opentelemetry-exporter-prometheus~=0.58b0", diff --git a/apps/hip-3-pusher/src/pusher/config.py b/apps/hip-3-pusher/src/pusher/config.py index 101f03a5b0..c57ae88fcf 100644 --- a/apps/hip-3-pusher/src/pusher/config.py +++ b/apps/hip-3-pusher/src/pusher/config.py @@ -1,7 +1,7 @@ from hyperliquid.utils.constants import MAINNET_API_URL, TESTNET_API_URL from pydantic import BaseModel, FilePath, model_validator from typing import Optional -from typing import Literal, Union +from typing import Literal STALE_TIMEOUT_SECONDS = 5 @@ -45,6 +45,20 @@ def set_default_urls(self): return self +class SedaFeedConfig(BaseModel): + exec_program_id: str + exec_inputs: str + + +class SedaConfig(BaseModel): + url: str + api_key_path: Optional[FilePath] = None + poll_interval: float + poll_failure_interval: float + poll_timeout: float + feeds: dict[str, SedaFeedConfig] + + class PriceSource(BaseModel): source_name: str source_id: str | int @@ -67,7 +81,7 @@ class ConstantSourceConfig(BaseModel): value: str -PriceSourceConfig = Union[SingleSourceConfig, PairSourceConfig, ConstantSourceConfig] +PriceSourceConfig = SingleSourceConfig | PairSourceConfig | ConstantSourceConfig class PriceConfig(BaseModel): @@ -83,5 +97,6 @@ class Config(BaseModel): kms: KMSConfig lazer: LazerConfig hermes: HermesConfig + seda: SedaConfig multisig: MultisigConfig price: PriceConfig diff --git a/apps/hip-3-pusher/src/pusher/main.py b/apps/hip-3-pusher/src/pusher/main.py index 558053c8c1..8eeb2e3e2a 100644 --- a/apps/hip-3-pusher/src/pusher/main.py +++ b/apps/hip-3-pusher/src/pusher/main.py @@ -9,6 +9,7 @@ from pusher.hyperliquid_listener import HyperliquidListener from pusher.lazer_listener import LazerListener from pusher.hermes_listener import HermesListener +from pusher.seda_listener import SedaListener from pusher.price_state import PriceState from pusher.publisher import Publisher from pusher.metrics import Metrics @@ -48,12 +49,14 @@ async def main(): hyperliquid_listener = HyperliquidListener(config, price_state) lazer_listener = LazerListener(config, price_state) hermes_listener = HermesListener(config, price_state) + seda_listener = SedaListener(config, price_state) await asyncio.gather( publisher.run(), hyperliquid_listener.subscribe_all(), lazer_listener.subscribe_all(), hermes_listener.subscribe_all(), + seda_listener.run(), ) logger.info("Exiting hip-3-pusher..") diff --git a/apps/hip-3-pusher/src/pusher/price_state.py b/apps/hip-3-pusher/src/pusher/price_state.py index 4d64d0efc5..e92e5e358b 100644 --- a/apps/hip-3-pusher/src/pusher/price_state.py +++ b/apps/hip-3-pusher/src/pusher/price_state.py @@ -28,7 +28,8 @@ def __init__(self, config: Config): "hl_oracle": {symbol: None for symbol in config.hyperliquid.asset_context_symbols}, "hl_mark": {symbol: None for symbol in config.hyperliquid.asset_context_symbols}, "lazer": {feed_id: None for feed_id in config.lazer.feed_ids}, - "hermes": {feed_id: None for feed_id in config.hermes.feed_ids} + "hermes": {feed_id: None for feed_id in config.hermes.feed_ids}, + "seda": {feed_name: None for feed_name in config.seda.feeds}, } def get_all_prices(self, market_name): diff --git a/apps/hip-3-pusher/src/pusher/seda_listener.py b/apps/hip-3-pusher/src/pusher/seda_listener.py new file mode 100644 index 0000000000..932740508e --- /dev/null +++ b/apps/hip-3-pusher/src/pusher/seda_listener.py @@ -0,0 +1,70 @@ +import asyncio +import datetime +import httpx +import json +from loguru import logger +from pathlib import Path + +from pusher.config import Config, SedaFeedConfig +from pusher.price_state import PriceState, PriceUpdate + + +class SedaListener: + SOURCE_NAME = "seda" + + """ + Subscribe to SEDA price updates for needed feeds. + """ + def __init__(self, config: Config, price_state: PriceState): + self.url = config.seda.url + self.api_key = Path(config.seda.api_key_path).read_text().strip() + self.feeds = config.seda.feeds + self.poll_interval = config.seda.poll_interval + self.poll_failure_interval = config.seda.poll_failure_interval + self.poll_timeout = config.seda.poll_timeout + self.price_state = price_state + + async def run(self): + await asyncio.gather(*[self._run_single(feed_name, self.feeds[feed_name]) for feed_name in self.feeds]) + + async def _run_single(self, feed_name: str, feed_config: SedaFeedConfig) -> None: + headers = { + "Accept": "application/json", + "Authorization": f"Bearer {self.api_key}", + } + params = { + "execProgramId": feed_config.exec_program_id, + "execInputs": feed_config.exec_inputs, + "encoding": "utf8", + } + + async with httpx.AsyncClient(timeout=self.poll_timeout) as client: + while True: + result = await self._poll(client, headers, params) + if result["ok"]: + self._parse_seda_message(feed_name, result["json"]) + else: + logger.error("SEDA poll request for {} failed: {}", feed_name, result) + + await asyncio.sleep(self.poll_interval if result.get("ok") else self.poll_failure_interval) + + async def _poll(self, + client: httpx.AsyncClient, + headers: dict[str, str], + params: dict[str, str], + ) -> dict: + try: + resp = await client.get(self.url, headers=headers, params=params) + resp.raise_for_status() + return {"ok": True, "status": resp.status_code, "json": resp.json()} + except httpx.HTTPStatusError as e: + return {"ok": False, "status": e.response.status_code, "error": str(e)} + except Exception as e: + return {"ok": False, "status": None, "error": str(e)} + + def _parse_seda_message(self, feed_name, message): + result = json.loads(message["data"]["result"]) + price = result["composite_rate"] + timestamp = datetime.datetime.fromisoformat(result["timestamp"]).timestamp() + logger.debug("Parsed SEDA update for feed: {} price: {} timestamp: {}", feed_name, price, timestamp) + self.price_state.state[self.SOURCE_NAME][feed_name] = PriceUpdate(price, timestamp) diff --git a/apps/hip-3-pusher/uv.lock b/apps/hip-3-pusher/uv.lock index 541648027e..3a4cdc3219 100644 --- a/apps/hip-3-pusher/uv.lock +++ b/apps/hip-3-pusher/uv.lock @@ -11,6 +11,19 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/78/b6/6307fbef88d9b5ee7421e68d78a9f162e0da4900bc5f5793f6d3d0e34fb8/annotated_types-0.7.0-py3-none-any.whl", hash = "sha256:1f02e8b43a8fbbc3f3e0d4f0f4bfc8131bcb4eebe8849b8e5c773f3a1c582a53", size = 13643, upload-time = "2024-05-20T21:33:24.1Z" }, ] +[[package]] +name = "anyio" +version = "4.11.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "idna" }, + { name = "sniffio" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/c6/78/7d432127c41b50bccba979505f272c16cbcadcc33645d5fa3a738110ae75/anyio-4.11.0.tar.gz", hash = "sha256:82a8d0b81e318cc5ce71a5f1f8b5c4e63619620b63141ef8c995fa0db95a57c4", size = 219094, upload-time = "2025-09-23T09:19:12.58Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/15/b3/9b1a8074496371342ec1e796a96f99c82c945a339cd81a8e73de28b4cf9e/anyio-4.11.0-py3-none-any.whl", hash = "sha256:0287e96f4d26d4149305414d4e3bc32f0dcd0862365a4bddea19d7a1ec38c4fc", size = 109097, upload-time = "2025-09-23T09:19:10.601Z" }, +] + [[package]] name = "bitarray" version = "3.7.1" @@ -318,6 +331,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/bf/4d/257cdc01ada430b8e84b9f2385c2553f33218f5b47da9adf0a616308d4b7/eth_utils-5.3.1-py3-none-any.whl", hash = "sha256:1f5476d8f29588d25b8ae4987e1ffdfae6d4c09026e476c4aad13b32dda3ead0", size = 102529, upload-time = "2025-08-27T16:37:15.449Z" }, ] +[[package]] +name = "h11" +version = "0.16.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/01/ee/02a2c011bdab74c6fb3c75474d40b3052059d95df7e73351460c8588d963/h11-0.16.0.tar.gz", hash = "sha256:4e35b956cf45792e4caa5885e69fba00bdbc6ffafbfa020300e549b208ee5ff1", size = 101250, upload-time = "2025-04-24T03:35:25.427Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/04/4b/29cac41a4d98d144bf5f6d33995617b185d14b22401f75ca86f384e87ff1/h11-0.16.0-py3-none-any.whl", hash = "sha256:63cf8bbe7522de3bf65932fda1d9c2772064ffb3dae62d55932da54b31cb6c86", size = 37515, upload-time = "2025-04-24T03:35:24.344Z" }, +] + [[package]] name = "hexbytes" version = "1.3.1" @@ -334,6 +356,7 @@ source = { editable = "." } dependencies = [ { name = "boto3" }, { name = "cryptography" }, + { name = "httpx" }, { name = "hyperliquid-python-sdk" }, { name = "loguru" }, { name = "opentelemetry-exporter-prometheus" }, @@ -352,6 +375,7 @@ dev = [ requires-dist = [ { name = "boto3", specifier = "~=1.40.38" }, { name = "cryptography", specifier = "~=46.0.1" }, + { name = "httpx", specifier = "~=0.28.1" }, { name = "hyperliquid-python-sdk", specifier = "~=0.19.0" }, { name = "loguru", specifier = "~=0.7.3" }, { name = "opentelemetry-exporter-prometheus", specifier = "~=0.58b0" }, @@ -364,6 +388,34 @@ requires-dist = [ [package.metadata.requires-dev] dev = [{ name = "pytest", specifier = "~=8.4.2" }] +[[package]] +name = "httpcore" +version = "1.0.9" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "certifi" }, + { name = "h11" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/06/94/82699a10bca87a5556c9c59b5963f2d039dbd239f25bc2a63907a05a14cb/httpcore-1.0.9.tar.gz", hash = "sha256:6e34463af53fd2ab5d807f399a9b45ea31c3dfa2276f15a2c3f00afff6e176e8", size = 85484, upload-time = "2025-04-24T22:06:22.219Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/7e/f5/f66802a942d491edb555dd61e3a9961140fd64c90bce1eafd741609d334d/httpcore-1.0.9-py3-none-any.whl", hash = "sha256:2d400746a40668fc9dec9810239072b40b4484b640a8c38fd654a024c7a1bf55", size = 78784, upload-time = "2025-04-24T22:06:20.566Z" }, +] + +[[package]] +name = "httpx" +version = "0.28.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "anyio" }, + { name = "certifi" }, + { name = "httpcore" }, + { name = "idna" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/b1/df/48c586a5fe32a0f01324ee087459e112ebb7224f646c0b5023f5e79e9956/httpx-0.28.1.tar.gz", hash = "sha256:75e98c5f16b0f35b567856f597f06ff2270a374470a5c2392242528e3e3e42fc", size = 141406, upload-time = "2024-12-06T15:37:23.222Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/2a/39/e50c7c3a983047577ee07d2a9e53faf5a69493943ec3f6a384bdc792deb2/httpx-0.28.1-py3-none-any.whl", hash = "sha256:d909fcccc110f8c7faf814ca82a9a4d816bc5a6dbfea25d6591d6985b8ba59ad", size = 73517, upload-time = "2024-12-06T15:37:21.509Z" }, +] + [[package]] name = "hyperliquid-python-sdk" version = "0.19.0" @@ -732,6 +784,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/b7/ce/149a00dd41f10bc29e5921b496af8b574d8413afcd5e30dfa0ed46c2cc5e/six-1.17.0-py2.py3-none-any.whl", hash = "sha256:4721f391ed90541fddacab5acf947aa0d3dc7d27b2e1e8eda2be8970586c3274", size = 11050, upload-time = "2024-12-04T17:35:26.475Z" }, ] +[[package]] +name = "sniffio" +version = "1.3.1" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/a2/87/a6771e1546d97e7e041b6ae58d80074f81b7d5121207425c964ddf5cfdbd/sniffio-1.3.1.tar.gz", hash = "sha256:f4324edc670a0f49750a81b895f35c3adb843cca46f0530f79fc1babb23789dc", size = 20372, upload-time = "2024-02-25T23:20:04.057Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/e9/44/75a9c9421471a6c4805dbf2356f7c181a29c1879239abab1ea2cc8f38b40/sniffio-1.3.1-py3-none-any.whl", hash = "sha256:2f6da418d1f1e0fddd844478f41680e794e6051915791a034ff65e5f100525a2", size = 10235, upload-time = "2024-02-25T23:20:01.196Z" }, +] + [[package]] name = "tenacity" version = "9.1.2" From f1bd5656bfb382098ed75adb21bbe48ec3092ce9 Mon Sep 17 00:00:00 2001 From: Mike Rolish Date: Thu, 6 Nov 2025 10:51:06 -0600 Subject: [PATCH 4/5] Refactor price source state objects --- .../src/pusher/hermes_listener.py | 10 ++--- .../src/pusher/hyperliquid_listener.py | 14 +++--- .../hip-3-pusher/src/pusher/lazer_listener.py | 10 ++--- apps/hip-3-pusher/src/pusher/main.py | 8 ++-- apps/hip-3-pusher/src/pusher/price_state.py | 45 +++++++++++++++---- apps/hip-3-pusher/src/pusher/seda_listener.py | 8 ++-- 6 files changed, 59 insertions(+), 36 deletions(-) diff --git a/apps/hip-3-pusher/src/pusher/hermes_listener.py b/apps/hip-3-pusher/src/pusher/hermes_listener.py index 7262dc6138..ab25a35c50 100644 --- a/apps/hip-3-pusher/src/pusher/hermes_listener.py +++ b/apps/hip-3-pusher/src/pusher/hermes_listener.py @@ -7,19 +7,17 @@ from pusher.config import Config, STALE_TIMEOUT_SECONDS from pusher.exception import StaleConnectionError -from pusher.price_state import PriceState, PriceUpdate +from pusher.price_state import PriceSourceState, PriceUpdate class HermesListener: - SOURCE_NAME = "hermes" - """ Subscribe to Hermes price updates for needed feeds. """ - def __init__(self, config: Config, price_state: PriceState): + def __init__(self, config: Config, hermes_state: PriceSourceState): self.hermes_urls = config.hermes.hermes_urls self.feed_ids = config.hermes.feed_ids - self.price_state = price_state + self.hermes_state = hermes_state def get_subscribe_request(self): return { @@ -82,6 +80,6 @@ def parse_hermes_message(self, data): publish_time = price_object["publish_time"] logger.debug("Hermes update: {} {} {} {}", id, price, expo, publish_time) now = time.time() - self.price_state.state[self.SOURCE_NAME][id] = PriceUpdate(price, now) + self.hermes_state.put(id, PriceUpdate(price, now)) except Exception as e: logger.error("parse_hermes_message error: {}", e) diff --git a/apps/hip-3-pusher/src/pusher/hyperliquid_listener.py b/apps/hip-3-pusher/src/pusher/hyperliquid_listener.py index 3ef7b1809a..916da2e177 100644 --- a/apps/hip-3-pusher/src/pusher/hyperliquid_listener.py +++ b/apps/hip-3-pusher/src/pusher/hyperliquid_listener.py @@ -7,7 +7,7 @@ from pusher.config import Config, STALE_TIMEOUT_SECONDS from pusher.exception import StaleConnectionError -from pusher.price_state import PriceState, PriceUpdate +from pusher.price_state import PriceSourceState, PriceUpdate # This will be in config, but note here. # Other RPC providers exist but so far we've seen their support is incomplete. @@ -16,17 +16,15 @@ class HyperliquidListener: - ORACLE_SOURCE_NAME = "hl_oracle" - MARK_SOURCE_NAME = "hl_mark" - """ Subscribe to any relevant Hyperliquid websocket streams See https://hyperliquid.gitbook.io/hyperliquid-docs/for-developers/api/websocket """ - def __init__(self, config: Config, price_state: PriceState): + def __init__(self, config: Config, hl_oracle_state: PriceSourceState, hl_mark_state: PriceSourceState): self.hyperliquid_ws_urls = config.hyperliquid.hyperliquid_ws_urls self.asset_context_symbols = config.hyperliquid.asset_context_symbols - self.price_state = price_state + self.hl_oracle_state = hl_oracle_state + self.hl_mark_state = hl_mark_state def get_subscribe_request(self, asset): return { @@ -82,8 +80,8 @@ def parse_hyperliquid_ws_message(self, message): ctx = message["data"]["ctx"] symbol = message["data"]["coin"] now = time.time() - self.price_state.state[self.ORACLE_SOURCE_NAME][symbol] = PriceUpdate(ctx["oraclePx"], now) - self.price_state.state[self.MARK_SOURCE_NAME][symbol] = PriceUpdate(ctx["markPx"], now) + self.hl_oracle_state.put(symbol, PriceUpdate(ctx["oraclePx"], now)) + self.hl_mark_state.put(symbol, PriceUpdate(ctx["markPx"], now)) logger.debug("on_activeAssetCtx: oraclePx: {} marketPx: {}", ctx["oraclePx"], ctx["markPx"]) except Exception as e: logger.error("parse_hyperliquid_ws_message error: message: {} e: {}", message, e) diff --git a/apps/hip-3-pusher/src/pusher/lazer_listener.py b/apps/hip-3-pusher/src/pusher/lazer_listener.py index 37ca4eedf8..74e0623270 100644 --- a/apps/hip-3-pusher/src/pusher/lazer_listener.py +++ b/apps/hip-3-pusher/src/pusher/lazer_listener.py @@ -7,20 +7,18 @@ from pusher.config import Config, STALE_TIMEOUT_SECONDS from pusher.exception import StaleConnectionError -from pusher.price_state import PriceState, PriceUpdate +from pusher.price_state import PriceSourceState, PriceUpdate class LazerListener: - SOURCE_NAME = "lazer" - """ Subscribe to Lazer price updates for needed feeds. """ - def __init__(self, config: Config, price_state: PriceState): + def __init__(self, config: Config, lazer_state: PriceSourceState): self.lazer_urls = config.lazer.lazer_urls self.api_key = config.lazer.lazer_api_key self.feed_ids = config.lazer.feed_ids - self.price_state = price_state + self.lazer_state = lazer_state def get_subscribe_request(self, subscription_id: int): return { @@ -91,6 +89,6 @@ def parse_lazer_message(self, data): if feed_id is None or price is None: continue else: - self.price_state.state[self.SOURCE_NAME][feed_id] = PriceUpdate(price, now) + self.lazer_state.put(feed_id, PriceUpdate(price, now)) except Exception as e: logger.error("parse_lazer_message error: {}", e) diff --git a/apps/hip-3-pusher/src/pusher/main.py b/apps/hip-3-pusher/src/pusher/main.py index 8eeb2e3e2a..006d4b7edb 100644 --- a/apps/hip-3-pusher/src/pusher/main.py +++ b/apps/hip-3-pusher/src/pusher/main.py @@ -46,10 +46,10 @@ async def main(): metrics = Metrics(config) publisher = Publisher(config, price_state, metrics) - hyperliquid_listener = HyperliquidListener(config, price_state) - lazer_listener = LazerListener(config, price_state) - hermes_listener = HermesListener(config, price_state) - seda_listener = SedaListener(config, price_state) + hyperliquid_listener = HyperliquidListener(config, price_state.hl_oracle_state, price_state.hl_mark_state) + lazer_listener = LazerListener(config, price_state.lazer_state) + hermes_listener = HermesListener(config, price_state.hermes_state) + seda_listener = SedaListener(config, price_state.seda_state) await asyncio.gather( publisher.run(), diff --git a/apps/hip-3-pusher/src/pusher/price_state.py b/apps/hip-3-pusher/src/pusher/price_state.py index e92e5e358b..33ee51e4c5 100644 --- a/apps/hip-3-pusher/src/pusher/price_state.py +++ b/apps/hip-3-pusher/src/pusher/price_state.py @@ -17,23 +17,52 @@ def time_diff(self, now): return now - self.timestamp +class PriceSourceState: + def __init__(self, name: str): + self.name = name + self.state: dict[str, PriceUpdate] = {} + + def __repr__(self): + return f"PriceSourceState(name={self.name} state={self.state})" + + def get(self, symbol: str) -> PriceUpdate | None: + return self.state.get(symbol) + + def put(self, symbol: str, value: PriceUpdate): + self.state[symbol] = value + + class PriceState: + HL_ORACLE = "hl_oracle" + HL_MARK = "hl_mark" + LAZER = "lazer" + HERMES = "hermes" + SEDA = "seda" + """ Maintain latest prices seen across listeners and publisher. """ def __init__(self, config: Config): self.stale_price_threshold_seconds = config.stale_price_threshold_seconds self.price_config = config.price - self.state = { - "hl_oracle": {symbol: None for symbol in config.hyperliquid.asset_context_symbols}, - "hl_mark": {symbol: None for symbol in config.hyperliquid.asset_context_symbols}, - "lazer": {feed_id: None for feed_id in config.lazer.feed_ids}, - "hermes": {feed_id: None for feed_id in config.hermes.feed_ids}, - "seda": {feed_name: None for feed_name in config.seda.feeds}, + + self.hl_oracle_state = PriceSourceState(self.HL_ORACLE) + self.hl_mark_state = PriceSourceState(self.HL_MARK) + self.lazer_state = PriceSourceState(self.LAZER) + self.hermes_state = PriceSourceState(self.HERMES) + self.seda_state = PriceSourceState(self.SEDA) + + self.all_states = { + self.HL_ORACLE: self.hl_oracle_state, + self.HL_MARK: self.hl_mark_state, + self.LAZER: self.lazer_state, + self.HERMES: self.hermes_state, + self.SEDA: self.seda_state, } def get_all_prices(self, market_name): - logger.debug("state: {}", self.state) + logger.debug("get_all_prices state: {}", self.all_states) + return ( self.get_prices(self.price_config.oracle, market_name), self.get_prices(self.price_config.mark, market_name), @@ -63,7 +92,7 @@ def get_price(self, price_source_config: PriceSourceConfig): def get_price_from_single_source(self, source: PriceSource): now = time.time() - update: PriceUpdate | None = self.state.get(source.source_name, {}).get(source.source_id) + update: PriceUpdate | None = self.all_states.get(source.source_name, {}).get(source.source_id) if update is None: logger.warning("source {} id {} is missing", source.source_name, source.source_id) return None diff --git a/apps/hip-3-pusher/src/pusher/seda_listener.py b/apps/hip-3-pusher/src/pusher/seda_listener.py index 932740508e..5bfdfbebcf 100644 --- a/apps/hip-3-pusher/src/pusher/seda_listener.py +++ b/apps/hip-3-pusher/src/pusher/seda_listener.py @@ -6,7 +6,7 @@ from pathlib import Path from pusher.config import Config, SedaFeedConfig -from pusher.price_state import PriceState, PriceUpdate +from pusher.price_state import PriceSourceState, PriceUpdate class SedaListener: @@ -15,14 +15,14 @@ class SedaListener: """ Subscribe to SEDA price updates for needed feeds. """ - def __init__(self, config: Config, price_state: PriceState): + def __init__(self, config: Config, seda_state: PriceSourceState): self.url = config.seda.url self.api_key = Path(config.seda.api_key_path).read_text().strip() self.feeds = config.seda.feeds self.poll_interval = config.seda.poll_interval self.poll_failure_interval = config.seda.poll_failure_interval self.poll_timeout = config.seda.poll_timeout - self.price_state = price_state + self.seda_state = seda_state async def run(self): await asyncio.gather(*[self._run_single(feed_name, self.feeds[feed_name]) for feed_name in self.feeds]) @@ -67,4 +67,4 @@ def _parse_seda_message(self, feed_name, message): price = result["composite_rate"] timestamp = datetime.datetime.fromisoformat(result["timestamp"]).timestamp() logger.debug("Parsed SEDA update for feed: {} price: {} timestamp: {}", feed_name, price, timestamp) - self.price_state.state[self.SOURCE_NAME][feed_name] = PriceUpdate(price, timestamp) + self.seda_state.put(feed_name, PriceUpdate(price, timestamp)) From 54df5b2459b8b8052aaecac747e87bb4ad14b3fe Mon Sep 17 00:00:00 2001 From: Mike Rolish Date: Thu, 6 Nov 2025 11:48:34 -0600 Subject: [PATCH 5/5] update unit tests to new state --- apps/hip-3-pusher/tests/test_price_state.py | 48 +++++++++++++-------- 1 file changed, 30 insertions(+), 18 deletions(-) diff --git a/apps/hip-3-pusher/tests/test_price_state.py b/apps/hip-3-pusher/tests/test_price_state.py index 6427262156..52e023119a 100644 --- a/apps/hip-3-pusher/tests/test_price_state.py +++ b/apps/hip-3-pusher/tests/test_price_state.py @@ -36,22 +36,28 @@ def get_config(): def test_good_hl_price(): + """ + Pass through fresh HL oracle price. + """ config = get_config() price_state = PriceState(config) now = time.time() - price_state.state["hl_oracle"][SYMBOL] = PriceUpdate("110000.0", now - price_state.stale_price_threshold_seconds / 2.0) + price_state.hl_oracle_state.put(SYMBOL, PriceUpdate("110000.0", now - price_state.stale_price_threshold_seconds / 2.0)) oracle_px, _, _ = price_state.get_all_prices(DEX) assert oracle_px == {f"{DEX}:{SYMBOL}": "110000.0"} def test_fallback_lazer(): + """ + HL oracle price is stale, so fall back to fresh Lazer price. + """ config = get_config() price_state = PriceState(config) now = time.time() - price_state.state["hl_oracle"][SYMBOL] = PriceUpdate("110000.0", now - price_state.stale_price_threshold_seconds - 1.0) - price_state.state["lazer"][1] = PriceUpdate("11050000000000", now - price_state.stale_price_threshold_seconds / 2.0) - price_state.state["lazer"][8] = PriceUpdate("99000000", now - price_state.stale_price_threshold_seconds / 2.0) + price_state.hl_oracle_state.put(SYMBOL, PriceUpdate("110000.0", now - price_state.stale_price_threshold_seconds - 1.0)) + price_state.lazer_state.put(1, PriceUpdate("11050000000000", now - price_state.stale_price_threshold_seconds / 2.0)) + price_state.lazer_state.put(8, PriceUpdate("99000000", now - price_state.stale_price_threshold_seconds / 2.0)) oracle_px, _, _ = price_state.get_all_prices(DEX) assert oracle_px == {f"{DEX}:{SYMBOL}": "111616.16"} @@ -59,32 +65,38 @@ def test_fallback_lazer(): def test_fallback_hermes(): + """ + HL oracle price and Lazer prices are stale, so fall back to fresh Hermes price. + """ config = get_config() price_state = PriceState(config) now = time.time() - price_state.state["hl_oracle"][SYMBOL] = PriceUpdate("110000.0", now - price_state.stale_price_threshold_seconds - 1.0) - price_state.state["lazer"][1] = PriceUpdate("11050000000000", now - price_state.stale_price_threshold_seconds - 1.0) - price_state.state["lazer"][8] = PriceUpdate("99000000", now - price_state.stale_price_threshold_seconds / 2.0) - price_state.state["hermes"]["e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43"] = \ - PriceUpdate("11100000000000", now - price_state.stale_price_threshold_seconds / 2.0) - price_state.state["hermes"]["2b89b9dc8fdf9f34709a5b106b472f0f39bb6ca9ce04b0fd7f2e971688e2e53b"] = \ - PriceUpdate("98000000", now - price_state.stale_price_threshold_seconds / 2.0) + price_state.hl_oracle_state.put(SYMBOL, PriceUpdate("110000.0", now - price_state.stale_price_threshold_seconds - 1.0)) + price_state.lazer_state.put(1, PriceUpdate("11050000000000", now - price_state.stale_price_threshold_seconds - 1.0)) + price_state.lazer_state.put(8, PriceUpdate("99000000", now - price_state.stale_price_threshold_seconds / 2.0)) + price_state.hermes_state.put("e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43", + PriceUpdate("11100000000000", now - price_state.stale_price_threshold_seconds / 2.0)) + price_state.hermes_state.put("2b89b9dc8fdf9f34709a5b106b472f0f39bb6ca9ce04b0fd7f2e971688e2e53b", + PriceUpdate("98000000", now - price_state.stale_price_threshold_seconds / 2.0)) oracle_px, _, _ = price_state.get_all_prices(DEX) assert oracle_px == {f"{DEX}:{SYMBOL}": "113265.31"} def test_all_fail(): + """ + All prices are stale, so return nothing. + """ config = get_config() price_state = PriceState(config) now = time.time() - price_state.state["hl_oracle"][SYMBOL] = PriceUpdate("110000.0", now - price_state.stale_price_threshold_seconds - 1.0) - price_state.state["lazer"][1] = PriceUpdate("11050000000000", now - price_state.stale_price_threshold_seconds - 1.0) - price_state.state["lazer"][8] = PriceUpdate("99000000", now - price_state.stale_price_threshold_seconds - 1.0) - price_state.state["hermes"]["e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43"] = \ - PriceUpdate("11100000000000", now - price_state.stale_price_threshold_seconds - 1.0) - price_state.state["hermes"]["2b89b9dc8fdf9f34709a5b106b472f0f39bb6ca9ce04b0fd7f2e971688e2e53b"] = \ - PriceUpdate("98000000", now - price_state.stale_price_threshold_seconds - 1.0) + price_state.hl_oracle_state.put(SYMBOL, PriceUpdate("110000.0", now - price_state.stale_price_threshold_seconds - 1.0)) + price_state.lazer_state.put(1, PriceUpdate("11050000000000", now - price_state.stale_price_threshold_seconds - 1.0)) + price_state.lazer_state.put(8, PriceUpdate("99000000", now - price_state.stale_price_threshold_seconds - 1.0)) + price_state.hermes_state.put("e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43", + PriceUpdate("11100000000000", now - price_state.stale_price_threshold_seconds - 1.0)) + price_state.hermes_state.put("2b89b9dc8fdf9f34709a5b106b472f0f39bb6ca9ce04b0fd7f2e971688e2e53b", + PriceUpdate("98000000", now - price_state.stale_price_threshold_seconds - 1.0)) oracle_px, _, _ = price_state.get_all_prices(DEX) assert oracle_px == {}