Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 26 additions & 9 deletions third_party/pyth/pyth_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
import threading
import time

PYTH_TEST_SYMBOL_COUNT = int(os.environ.get("PYTH_TEST_SYMBOL_COUNT", "9"))

class PythAccEndpoint(BaseHTTPRequestHandler):
"""
A dumb endpoint to respond with a JSON containing Pyth symbol and mapping addresses
Expand Down Expand Up @@ -60,6 +58,9 @@ def accounts_endpoint():


def add_symbol(num: int):
"""
NOTE: Updates HTTP_ENDPOINT_DATA
"""
symbol_name = f"Test symbol {num}"
# Add a product
prod_pubkey = pyth_admin_run_or_die(
Expand Down Expand Up @@ -96,6 +97,8 @@ def add_symbol(num: int):

sys.stdout.flush()

print(f"New symbol: {num}")

return num

# Fund the publisher
Expand All @@ -122,14 +125,14 @@ def add_symbol(num: int):
"--keypair", PYTH_PUBLISHER_KEYPAIR
], capture_output=True).stdout.strip()

with ThreadPoolExecutor(max_workers=10) as executor:
with ThreadPoolExecutor(max_workers=PYTH_TEST_SYMBOL_COUNT) as executor:
add_symbol_futures = {executor.submit(add_symbol, sym_id) for sym_id in range(PYTH_TEST_SYMBOL_COUNT)}

for future in as_completed(add_symbol_futures):
print(f"Completed {future.result()}")

print(
f"Mock updates ready to roll. Updating every {str(PYTH_PUBLISHER_INTERVAL)} seconds")
f"Mock updates ready to roll. Updating every {str(PYTH_PUBLISHER_INTERVAL_SECS)} seconds")

# Spin off the readiness probe endpoint into a separate thread
readiness_thread = threading.Thread(target=readiness, daemon=True)
Expand All @@ -140,12 +143,26 @@ def add_symbol(num: int):
readiness_thread.start()
http_service.start()

while True:
for sym in HTTP_ENDPOINT_DATA["symbols"]:
publisher_random_update(sym["price"])
next_new_symbol_id = PYTH_TEST_SYMBOL_COUNT
last_new_sym_added_at = time.monotonic()

with ThreadPoolExecutor() as executor: # Used for async adding of products and prices
while True:
for sym in HTTP_ENDPOINT_DATA["symbols"]:
publisher_random_update(sym["price"])

# Add a symbol if new symbol interval configured
if PYTH_NEW_SYMBOL_INTERVAL_SECS > 0:
# Do it if enough time passed
now = time.monotonic()
if (now - last_new_sym_added_at) >= PYTH_NEW_SYMBOL_INTERVAL_SECS:
executor.submit(add_symbol, next_new_symbol_id) # Returns immediately, runs in background
last_sym_added_at = now
next_new_symbol_id += 1

time.sleep(PYTH_PUBLISHER_INTERVAL_SECS)
sys.stdout.flush()

time.sleep(PYTH_PUBLISHER_INTERVAL)
sys.stdout.flush()

readiness_thread.join()
http_service.join()
13 changes: 12 additions & 1 deletion third_party/pyth/pyth_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,18 @@
PYTH_PUBLISHER_KEYPAIR = os.environ.get(
"PYTH_PUBLISHER_KEYPAIR", f"{PYTH_KEY_STORE}/publish_key_pair.json"
)
PYTH_PUBLISHER_INTERVAL = float(os.environ.get("PYTH_PUBLISHER_INTERVAL", "5"))
# How long to sleep between mock Pyth price updates
PYTH_PUBLISHER_INTERVAL_SECS = float(os.environ.get("PYTH_PUBLISHER_INTERVAL_SECS", "5"))
PYTH_TEST_SYMBOL_COUNT = int(os.environ.get("PYTH_TEST_SYMBOL_COUNT", "9"))

# If above 0, adds a new test symbol periodically, waiting at least
# the given number of seconds in between
#
# NOTE: the new symbols are added in the HTTP endpoint used by the
# p2w-attest service in Tilt. You may need to wait to see p2w-attest
# pick up brand new symbols
PYTH_NEW_SYMBOL_INTERVAL_SECS = int(os.environ.get("PYTH_NEW_SYMBOL_INTERVAL_SECS", "120"))

PYTH_MAPPING_KEYPAIR = os.environ.get(
"PYTH_MAPPING_KEYPAIR", f"{PYTH_KEY_STORE}/mapping_key_pair.json"
)
Expand Down