Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions apps/hip-3-pusher/config/config.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
stale_price_threshold_seconds = 5

[hyperliquid]
market_name = ""
market_symbol = "BTC"
Expand All @@ -12,12 +14,16 @@ key_path = "/path/to/kms_key.txt"
aws_region_name = "ap-northeast-1"

[lazer]
router_urls = ["wss://pyth-lazer-0.dourolabs.app/v1/stream", "wss://pyth-lazer-1.dourolabs.app/v1/stream"]
lazer_urls = ["wss://pyth-lazer-0.dourolabs.app/v1/stream", "wss://pyth-lazer-1.dourolabs.app/v1/stream"]
api_key = "lazer_api_key"
base_feed_id = 1 # BTC
base_feed_exponent = -8
quote_feed_id = 8 # USDT
quote_feed_exponent = -8

[hermes]
urls = ["wss://hermes.pyth.network/ws"]
base_id = "e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43" # BTC
quote_id = "2b89b9dc8fdf9f34709a5b106b472f0f39bb6ca9ce04b0fd7f2e971688e2e53b" # USDT
hermes_urls = ["wss://hermes.pyth.network/ws"]
base_feed_id = "e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43" # BTC
base_feed_exponent = -8
quote_feed_id = "2b89b9dc8fdf9f34709a5b106b472f0f39bb6ca9ce04b0fd7f2e971688e2e53b" # USDT
quote_feed_exponent = -8
Comment on lines +25 to +29
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll eventually need to split out the price config into its own file and source the metadata (like exponent) from the data sources themselves.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, will need to scale this into a general set of prices and calcs.

4 changes: 2 additions & 2 deletions apps/hip-3-pusher/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[project]
name = "hip-3-pusher"
version = "0.1.0"
description = "Add your description here"
version = "0.1.1"
description = "Hyperliquid HIP-3 market oracle pusher"
readme = "README.md"
requires-python = ">=3.13"
dependencies = [
Expand Down
17 changes: 9 additions & 8 deletions apps/hip-3-pusher/src/hermes_listener.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import json
from loguru import logger
import time
import websockets

from price_state import PriceState
Expand All @@ -9,26 +10,25 @@
class HermesListener:
"""
Subscribe to Hermes price updates for needed feeds.
TODO: Will need to handle specific conversions/factors and exponents.
"""
def __init__(self, config, price_state: PriceState):
self.urls = config["hermes"]["urls"]
self.base_id = config["hermes"]["base_id"]
self.quote_id = config["hermes"]["quote_id"]
self.hermes_urls = config["hermes"]["hermes_urls"]
self.base_feed_id = config["hermes"]["base_feed_id"]
self.quote_feed_id = config["hermes"]["quote_feed_id"]
self.price_state = price_state

def get_subscribe_request(self):
return {
"type": "subscribe",
"ids": [self.base_id, self.quote_id],
"ids": [self.base_feed_id, self.quote_feed_id],
"verbose": False,
"binary": True,
"allow_out_of_order": False,
"ignore_invalid_price_ids": False,
}

async def subscribe_all(self):
await asyncio.gather(*(self.subscribe_single(url) for url in self.urls))
await asyncio.gather(*(self.subscribe_single(url) for url in self.hermes_urls))

async def subscribe_single(self, url):
while True:
Expand Down Expand Up @@ -71,9 +71,10 @@ def parse_hermes_message(self, data):
expo = price_object["expo"]
publish_time = price_object["publish_time"]
logger.debug("Hermes update: {} {} {} {}", id, price, expo, publish_time)
if id == self.base_id:
if id == self.base_feed_id:
self.price_state.hermes_base_price = price
if id == self.quote_id:
if id == self.quote_feed_id:
self.price_state.hermes_quote_price = price
self.price_state.latest_hermes_timestamp = time.time()
except Exception as e:
logger.error("parse_hermes_message error: {}", e)
8 changes: 5 additions & 3 deletions apps/hip-3-pusher/src/hyperliquid_listener.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from loguru import logger
import time

from hyperliquid.info import Info
from hyperliquid.utils.constants import TESTNET_API_URL, MAINNET_API_URL
Expand Down Expand Up @@ -28,6 +29,7 @@ def on_activeAssetCtx(self, message):
:return: None
"""
ctx = message["data"]["ctx"]
self.price_state.latest_oracle_price = ctx["oraclePx"]
self.price_state.latest_mark_price = ctx["markPx"]
logger.debug("on_activeAssetCtx: oraclePx: {} marketPx: {}", self.price_state.latest_oracle_price, self.price_state.latest_mark_price)
self.price_state.hl_oracle_price = ctx["oraclePx"]
self.price_state.hl_mark_price = ctx["markPx"]
logger.debug("on_activeAssetCtx: oraclePx: {} marketPx: {}", self.price_state.hl_oracle_price, self.price_state.hl_mark_price)
self.price_state.latest_hl_timestamp = time.time()
Comment on lines +32 to +35
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would recommend storing the timestamp together with the price in one type to enforce they are updated together. something like

class PriceUpdate:
    asset_id: str
    price: float
    timestamp: int

11 changes: 6 additions & 5 deletions apps/hip-3-pusher/src/lazer_listener.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import json
from loguru import logger
import time
import websockets

from price_state import PriceState
Expand All @@ -9,11 +10,10 @@
class LazerListener:
"""
Subscribe to Lazer price updates for needed feeds.
TODO: Will need to handle specific conversions/factors and exponents.
"""
def __init__(self, config, price_state: PriceState):
self.router_urls = config["lazer"]["router_urls"]
self.api_key = config["lazer"]["api_key"]
self.lazer_urls = config["lazer"]["lazer_urls"]
self.api_key = config["lazer"]["lazer_api_key"]
self.base_feed_id = config["lazer"]["base_feed_id"]
self.quote_feed_id = config["lazer"]["quote_feed_id"]
self.price_state = price_state
Expand All @@ -32,7 +32,7 @@ def get_subscribe_request(self, subscription_id: int):
}

async def subscribe_all(self):
await asyncio.gather(*(self.subscribe_single(router_url) for router_url in self.router_urls))
await asyncio.gather(*(self.subscribe_single(router_url) for router_url in self.lazer_urls))

async def subscribe_single(self, router_url):
while True:
Expand All @@ -52,7 +52,7 @@ async def subscribe_single_inner(self, router_url):
subscribe_request = self.get_subscribe_request(1)

await ws.send(json.dumps(subscribe_request))
logger.info("Sent Lazer subscribe request to {}", self.router_urls[0])
logger.info("Sent Lazer subscribe request to {}", self.lazer_urls[0])

# listen for updates
async for message in ws:
Expand Down Expand Up @@ -83,5 +83,6 @@ def parse_lazer_message(self, data):
self.price_state.lazer_base_price = price
if feed_id == self.quote_feed_id:
self.price_state.lazer_quote_price = price
self.price_state.latest_lazer_timestamp = time.time()
except Exception as e:
logger.error("parse_lazer_message error: {}", e)
23 changes: 18 additions & 5 deletions apps/hip-3-pusher/src/main.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import argparse
import asyncio
from loguru import logger
import os
import sys
import toml

from hyperliquid_listener import HyperliquidListener
Expand All @@ -23,25 +25,36 @@ def load_config():
return config


def init_logging():
logger.remove()
log_level = os.getenv("LOG_LEVEL", "INFO").upper()
# serialize=True if we want json logging
logger.add(sys.stderr, level=log_level, serialize=False)


async def main():
logger.info("Starting hip3-agent...")
init_logging()
logger.info("Starting hip-3-pusher...")
config = load_config()

price_state = PriceState()
price_state = PriceState(config)
publisher = Publisher(config, price_state)
hyperliquid_listener = HyperliquidListener(config, price_state)
lazer_listener = LazerListener(config, price_state)
hermes_listener = HermesListener(config, price_state)

# TODO: Probably pull this out of the sdk.
# TODO: Probably pull this out of the sdk so we can handle reconnects.
hyperliquid_listener.subscribe()
await asyncio.gather(
publisher.run(),
lazer_listener.subscribe_all(),
hermes_listener.subscribe_all(),
)
logger.info("Exiting hip3-agent...")
logger.info("Exiting hip-3-pusher..")


if __name__ == "__main__":
asyncio.run(main())
try:
asyncio.run(main())
except Exception as e:
logger.exception("Uncaught exception, exiting: {}", e)
68 changes: 65 additions & 3 deletions apps/hip-3-pusher/src/price_state.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,73 @@
from loguru import logger
import time

DEFAULT_STALE_PRICE_THRESHOLD_SECONDS = 5


class PriceState:
"""
Maintain latest prices seen across listeners and publisher.
"""
def __init__(self):
self.latest_oracle_price = None
self.latest_mark_price = None
def __init__(self, config):
self.stale_price_threshold_seconds = config.get("stale_price_threshold_seconds", DEFAULT_STALE_PRICE_THRESHOLD_SECONDS)
now = time.time()

self.hl_oracle_price = None
self.hl_mark_price = None
self.latest_hl_timestamp = now

self.lazer_base_price = None
self.lazer_base_exponent = config["lazer"]["base_feed_exponent"]
self.lazer_quote_price = None
self.lazer_quote_exponent = config["lazer"]["quote_feed_exponent"]
self.latest_lazer_timestamp = now

self.hermes_base_price = None
self.hermes_base_exponent = config["hermes"]["base_feed_exponent"]
self.hermes_quote_price = None
self.hermes_quote_exponent = config["hermes"]["quote_feed_exponent"]
self.latest_hermes_timestamp = now

def get_current_oracle_price(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worth thinking through how this evolves as we need to support pushing multiple prices. Also, we might want to have this orchestration in a different module and split up the state to be managed by the individual data sources instead of a single mono-state.

now = time.time()
if self.hl_oracle_price:
time_diff = now - self.latest_hl_timestamp
if time_diff < self.stale_price_threshold_seconds:
return self.hl_oracle_price
else:
logger.error("Hyperliquid oracle price stale by {} seconds", time_diff)
else:
logger.error("Hyperliquid oracle price not received yet")

# fall back to Hermes
if self.hermes_base_price and self.hermes_quote_price:
time_diff = now - self.latest_hermes_timestamp
if time_diff < self.stale_price_threshold_seconds:
return self.get_hermes_price()
else:
logger.error("Hermes price stale by {} seconds", time_diff)
else:
logger.error("Hermes base/quote prices not received yet")

# fall back to Lazer
if self.lazer_base_price and self.lazer_quote_price:
time_diff = now - self.latest_lazer_timestamp
if time_diff < self.stale_price_threshold_seconds:
return self.get_lazer_price()
else:
logger.error("Lazer price stale by {} seconds", time_diff)
else:
logger.error("Lazer base/quote prices not received yet")

logger.error("All prices missing or stale!")
return None
Comment on lines +42 to +63
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason we fall back to Hermes before Lazer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I think I just mixed up the order you guys suggested for whatever reason. Dunno if currently we trust one more than the other.


def get_hermes_price(self):
base_price = float(self.hermes_base_price) / (10.0 ** -self.hermes_base_exponent)
quote_price = float(self.hermes_quote_price) / (10.0 ** -self.hermes_quote_exponent)
return str(round(base_price / quote_price, 2))

def get_lazer_price(self):
base_price = float(self.lazer_base_price) / (10.0 ** -self.lazer_base_exponent)
quote_price = float(self.lazer_quote_price) / (10.0 ** -self.lazer_quote_exponent)
return str(round(base_price / quote_price, 2))
16 changes: 11 additions & 5 deletions apps/hip-3-pusher/src/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,21 @@ def __init__(self, config: dict, price_state: PriceState):
async def run(self):
while True:
await asyncio.sleep(self.publish_interval)
logger.debug("publish price_state: {}", vars(self.price_state))

oracle_pxs = {}
oracle_px = self.price_state.get_current_oracle_price()
if not oracle_px:
logger.error("No valid oracle price available!")
return
else:
logger.debug("Current oracle price: {}", oracle_px)
oracle_pxs[self.market_symbol] = oracle_px

mark_pxs = []
#if self.price_state.hl_mark_price:
# mark_pxs.append({self.market_symbol: self.price_state.hl_mark_price})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not publish the mark prices?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They told us not to for now. Note that this will also include the actual HIP-3's market data in the calculation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are just mark prices for the HyperCore perps which are useful data points but separate.


external_perp_pxs = {}
if self.price_state.latest_oracle_price:
oracle_pxs[self.market_symbol] = self.price_state.latest_oracle_price
if self.price_state.latest_mark_price:
mark_pxs.append({self.market_symbol: self.price_state.latest_mark_price})

if self.enable_publish:
if self.enable_kms:
Expand Down
2 changes: 1 addition & 1 deletion apps/hip-3-pusher/uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.