diff --git a/apps/hip-3-pusher/config/config.toml b/apps/hip-3-pusher/config/config.toml index 15e751c212..dbbd5c3fae 100644 --- a/apps/hip-3-pusher/config/config.toml +++ b/apps/hip-3-pusher/config/config.toml @@ -1,3 +1,5 @@ +stale_price_threshold_seconds = 5 + [hyperliquid] market_name = "" market_symbol = "BTC" @@ -12,12 +14,16 @@ key_path = "/path/to/kms_key.txt" aws_region_name = "ap-northeast-1" [lazer] -router_urls = ["wss://pyth-lazer-0.dourolabs.app/v1/stream", "wss://pyth-lazer-1.dourolabs.app/v1/stream"] +lazer_urls = ["wss://pyth-lazer-0.dourolabs.app/v1/stream", "wss://pyth-lazer-1.dourolabs.app/v1/stream"] api_key = "lazer_api_key" base_feed_id = 1 # BTC +base_feed_exponent = -8 quote_feed_id = 8 # USDT +quote_feed_exponent = -8 [hermes] -urls = ["wss://hermes.pyth.network/ws"] -base_id = "e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43" # BTC -quote_id = "2b89b9dc8fdf9f34709a5b106b472f0f39bb6ca9ce04b0fd7f2e971688e2e53b" # USDT +hermes_urls = ["wss://hermes.pyth.network/ws"] +base_feed_id = "e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43" # BTC +base_feed_exponent = -8 +quote_feed_id = "2b89b9dc8fdf9f34709a5b106b472f0f39bb6ca9ce04b0fd7f2e971688e2e53b" # USDT +quote_feed_exponent = -8 diff --git a/apps/hip-3-pusher/pyproject.toml b/apps/hip-3-pusher/pyproject.toml index f8049120f0..1c557ead0f 100644 --- a/apps/hip-3-pusher/pyproject.toml +++ b/apps/hip-3-pusher/pyproject.toml @@ -1,7 +1,7 @@ [project] name = "hip-3-pusher" -version = "0.1.0" -description = "Add your description here" +version = "0.1.1" +description = "Hyperliquid HIP-3 market oracle pusher" readme = "README.md" requires-python = ">=3.13" dependencies = [ diff --git a/apps/hip-3-pusher/src/hermes_listener.py b/apps/hip-3-pusher/src/hermes_listener.py index 16634b0fb5..02adb2e1aa 100644 --- a/apps/hip-3-pusher/src/hermes_listener.py +++ b/apps/hip-3-pusher/src/hermes_listener.py @@ -1,6 +1,7 @@ import asyncio import json from loguru import logger +import time import websockets from price_state import PriceState @@ -9,18 +10,17 @@ class HermesListener: """ Subscribe to Hermes price updates for needed feeds. - TODO: Will need to handle specific conversions/factors and exponents. """ def __init__(self, config, price_state: PriceState): - self.urls = config["hermes"]["urls"] - self.base_id = config["hermes"]["base_id"] - self.quote_id = config["hermes"]["quote_id"] + self.hermes_urls = config["hermes"]["hermes_urls"] + self.base_feed_id = config["hermes"]["base_feed_id"] + self.quote_feed_id = config["hermes"]["quote_feed_id"] self.price_state = price_state def get_subscribe_request(self): return { "type": "subscribe", - "ids": [self.base_id, self.quote_id], + "ids": [self.base_feed_id, self.quote_feed_id], "verbose": False, "binary": True, "allow_out_of_order": False, @@ -28,7 +28,7 @@ def get_subscribe_request(self): } async def subscribe_all(self): - await asyncio.gather(*(self.subscribe_single(url) for url in self.urls)) + await asyncio.gather(*(self.subscribe_single(url) for url in self.hermes_urls)) async def subscribe_single(self, url): while True: @@ -71,9 +71,10 @@ def parse_hermes_message(self, data): expo = price_object["expo"] publish_time = price_object["publish_time"] logger.debug("Hermes update: {} {} {} {}", id, price, expo, publish_time) - if id == self.base_id: + if id == self.base_feed_id: self.price_state.hermes_base_price = price - if id == self.quote_id: + if id == self.quote_feed_id: self.price_state.hermes_quote_price = price + self.price_state.latest_hermes_timestamp = time.time() except Exception as e: logger.error("parse_hermes_message error: {}", e) diff --git a/apps/hip-3-pusher/src/hyperliquid_listener.py b/apps/hip-3-pusher/src/hyperliquid_listener.py index dbc30ebd98..aed7e4ea74 100644 --- a/apps/hip-3-pusher/src/hyperliquid_listener.py +++ b/apps/hip-3-pusher/src/hyperliquid_listener.py @@ -1,4 +1,5 @@ from loguru import logger +import time from hyperliquid.info import Info from hyperliquid.utils.constants import TESTNET_API_URL, MAINNET_API_URL @@ -28,6 +29,7 @@ def on_activeAssetCtx(self, message): :return: None """ ctx = message["data"]["ctx"] - self.price_state.latest_oracle_price = ctx["oraclePx"] - self.price_state.latest_mark_price = ctx["markPx"] - logger.debug("on_activeAssetCtx: oraclePx: {} marketPx: {}", self.price_state.latest_oracle_price, self.price_state.latest_mark_price) + self.price_state.hl_oracle_price = ctx["oraclePx"] + self.price_state.hl_mark_price = ctx["markPx"] + logger.debug("on_activeAssetCtx: oraclePx: {} marketPx: {}", self.price_state.hl_oracle_price, self.price_state.hl_mark_price) + self.price_state.latest_hl_timestamp = time.time() diff --git a/apps/hip-3-pusher/src/lazer_listener.py b/apps/hip-3-pusher/src/lazer_listener.py index 759fd172eb..58631c512d 100644 --- a/apps/hip-3-pusher/src/lazer_listener.py +++ b/apps/hip-3-pusher/src/lazer_listener.py @@ -1,6 +1,7 @@ import asyncio import json from loguru import logger +import time import websockets from price_state import PriceState @@ -9,11 +10,10 @@ class LazerListener: """ Subscribe to Lazer price updates for needed feeds. - TODO: Will need to handle specific conversions/factors and exponents. """ def __init__(self, config, price_state: PriceState): - self.router_urls = config["lazer"]["router_urls"] - self.api_key = config["lazer"]["api_key"] + self.lazer_urls = config["lazer"]["lazer_urls"] + self.api_key = config["lazer"]["lazer_api_key"] self.base_feed_id = config["lazer"]["base_feed_id"] self.quote_feed_id = config["lazer"]["quote_feed_id"] self.price_state = price_state @@ -32,7 +32,7 @@ def get_subscribe_request(self, subscription_id: int): } async def subscribe_all(self): - await asyncio.gather(*(self.subscribe_single(router_url) for router_url in self.router_urls)) + await asyncio.gather(*(self.subscribe_single(router_url) for router_url in self.lazer_urls)) async def subscribe_single(self, router_url): while True: @@ -52,7 +52,7 @@ async def subscribe_single_inner(self, router_url): subscribe_request = self.get_subscribe_request(1) await ws.send(json.dumps(subscribe_request)) - logger.info("Sent Lazer subscribe request to {}", self.router_urls[0]) + logger.info("Sent Lazer subscribe request to {}", self.lazer_urls[0]) # listen for updates async for message in ws: @@ -83,5 +83,6 @@ def parse_lazer_message(self, data): self.price_state.lazer_base_price = price if feed_id == self.quote_feed_id: self.price_state.lazer_quote_price = price + self.price_state.latest_lazer_timestamp = time.time() except Exception as e: logger.error("parse_lazer_message error: {}", e) diff --git a/apps/hip-3-pusher/src/main.py b/apps/hip-3-pusher/src/main.py index 9762e362cd..5b4f703bde 100644 --- a/apps/hip-3-pusher/src/main.py +++ b/apps/hip-3-pusher/src/main.py @@ -1,6 +1,8 @@ import argparse import asyncio from loguru import logger +import os +import sys import toml from hyperliquid_listener import HyperliquidListener @@ -23,25 +25,36 @@ def load_config(): return config +def init_logging(): + logger.remove() + log_level = os.getenv("LOG_LEVEL", "INFO").upper() + # serialize=True if we want json logging + logger.add(sys.stderr, level=log_level, serialize=False) + + async def main(): - logger.info("Starting hip3-agent...") + init_logging() + logger.info("Starting hip-3-pusher...") config = load_config() - price_state = PriceState() + price_state = PriceState(config) publisher = Publisher(config, price_state) hyperliquid_listener = HyperliquidListener(config, price_state) lazer_listener = LazerListener(config, price_state) hermes_listener = HermesListener(config, price_state) - # TODO: Probably pull this out of the sdk. + # TODO: Probably pull this out of the sdk so we can handle reconnects. hyperliquid_listener.subscribe() await asyncio.gather( publisher.run(), lazer_listener.subscribe_all(), hermes_listener.subscribe_all(), ) - logger.info("Exiting hip3-agent...") + logger.info("Exiting hip-3-pusher..") if __name__ == "__main__": - asyncio.run(main()) + try: + asyncio.run(main()) + except Exception as e: + logger.exception("Uncaught exception, exiting: {}", e) diff --git a/apps/hip-3-pusher/src/price_state.py b/apps/hip-3-pusher/src/price_state.py index 038a865a63..f7d3a13650 100644 --- a/apps/hip-3-pusher/src/price_state.py +++ b/apps/hip-3-pusher/src/price_state.py @@ -1,11 +1,73 @@ +from loguru import logger +import time + +DEFAULT_STALE_PRICE_THRESHOLD_SECONDS = 5 + + class PriceState: """ Maintain latest prices seen across listeners and publisher. """ - def __init__(self): - self.latest_oracle_price = None - self.latest_mark_price = None + def __init__(self, config): + self.stale_price_threshold_seconds = config.get("stale_price_threshold_seconds", DEFAULT_STALE_PRICE_THRESHOLD_SECONDS) + now = time.time() + + self.hl_oracle_price = None + self.hl_mark_price = None + self.latest_hl_timestamp = now + self.lazer_base_price = None + self.lazer_base_exponent = config["lazer"]["base_feed_exponent"] self.lazer_quote_price = None + self.lazer_quote_exponent = config["lazer"]["quote_feed_exponent"] + self.latest_lazer_timestamp = now + self.hermes_base_price = None + self.hermes_base_exponent = config["hermes"]["base_feed_exponent"] self.hermes_quote_price = None + self.hermes_quote_exponent = config["hermes"]["quote_feed_exponent"] + self.latest_hermes_timestamp = now + + def get_current_oracle_price(self): + now = time.time() + if self.hl_oracle_price: + time_diff = now - self.latest_hl_timestamp + if time_diff < self.stale_price_threshold_seconds: + return self.hl_oracle_price + else: + logger.error("Hyperliquid oracle price stale by {} seconds", time_diff) + else: + logger.error("Hyperliquid oracle price not received yet") + + # fall back to Hermes + if self.hermes_base_price and self.hermes_quote_price: + time_diff = now - self.latest_hermes_timestamp + if time_diff < self.stale_price_threshold_seconds: + return self.get_hermes_price() + else: + logger.error("Hermes price stale by {} seconds", time_diff) + else: + logger.error("Hermes base/quote prices not received yet") + + # fall back to Lazer + if self.lazer_base_price and self.lazer_quote_price: + time_diff = now - self.latest_lazer_timestamp + if time_diff < self.stale_price_threshold_seconds: + return self.get_lazer_price() + else: + logger.error("Lazer price stale by {} seconds", time_diff) + else: + logger.error("Lazer base/quote prices not received yet") + + logger.error("All prices missing or stale!") + return None + + def get_hermes_price(self): + base_price = float(self.hermes_base_price) / (10.0 ** -self.hermes_base_exponent) + quote_price = float(self.hermes_quote_price) / (10.0 ** -self.hermes_quote_exponent) + return str(round(base_price / quote_price, 2)) + + def get_lazer_price(self): + base_price = float(self.lazer_base_price) / (10.0 ** -self.lazer_base_exponent) + quote_price = float(self.lazer_quote_price) / (10.0 ** -self.lazer_quote_exponent) + return str(round(base_price / quote_price, 2)) diff --git a/apps/hip-3-pusher/src/publisher.py b/apps/hip-3-pusher/src/publisher.py index 94c801c14a..53d2911fe5 100644 --- a/apps/hip-3-pusher/src/publisher.py +++ b/apps/hip-3-pusher/src/publisher.py @@ -47,15 +47,21 @@ def __init__(self, config: dict, price_state: PriceState): async def run(self): while True: await asyncio.sleep(self.publish_interval) - logger.debug("publish price_state: {}", vars(self.price_state)) oracle_pxs = {} + oracle_px = self.price_state.get_current_oracle_price() + if not oracle_px: + logger.error("No valid oracle price available!") + return + else: + logger.debug("Current oracle price: {}", oracle_px) + oracle_pxs[self.market_symbol] = oracle_px + mark_pxs = [] + #if self.price_state.hl_mark_price: + # mark_pxs.append({self.market_symbol: self.price_state.hl_mark_price}) + external_perp_pxs = {} - if self.price_state.latest_oracle_price: - oracle_pxs[self.market_symbol] = self.price_state.latest_oracle_price - if self.price_state.latest_mark_price: - mark_pxs.append({self.market_symbol: self.price_state.latest_mark_price}) if self.enable_publish: if self.enable_kms: diff --git a/apps/hip-3-pusher/uv.lock b/apps/hip-3-pusher/uv.lock index f724f4fbec..8a7bee3ba6 100644 --- a/apps/hip-3-pusher/uv.lock +++ b/apps/hip-3-pusher/uv.lock @@ -365,7 +365,7 @@ wheels = [ [[package]] name = "hip-3-pusher" -version = "0.1.0" +version = "0.1.1" source = { virtual = "." } dependencies = [ { name = "asn1crypto" },