Skip to content

Add Remote Keypair Loader, a new keypair hotloading mechanism #51

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Mar 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 16 additions & 50 deletions config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ key_store.root_path = "/path/to/keystore"

### Optional fields ###

# Path to publisher identity keypair. When not found, the network will expect a remote-loaded keypair. see remote_keypair_loader options for details.
# key_store.publish_keypair_path = "publish_key_pair.json" # I exist, remote loading disabled
# key_store.publish_keypair_path = "none" # I do not exist, remote loading activated for the network

# The interval with which to poll account information.
# oracle.poll_interval_duration = "2m"

Expand Down Expand Up @@ -56,59 +60,21 @@ key_store.root_path = "/path/to/keystore"
# a value at least as large as (number of products published / number of products in a batch).
# exporter.transaction_monitor.max_transactions = "100"

# Configuration for the optional secondary network this agent will publish data to. In most cases this should be a Solana endpoint.
# [secondary_network]
### Required fields ###

# HTTP(S) endpoint of the RPC node. The Solana public RPC endpoints are rate-limited, so a private node should be used.
# rpc_url = "http://api.devnet.solana.com"

# WS(S) endpoint of the RRC node. This is used to subscribe to account changes on the network.
# This can be omitted when oracle.subscriber_enabled is set to false.
# wss_url = "ws://api.devnet.solana.com"

# Timeout for the requests to the RPC
# rpc_timeout = "10s"

# Path to the key store.
# key_store.root_path = "/path/to/keystore"

### Optional fields ###

# The interval with which to poll account information.
# oracle.poll_interval_duration = "2m"

# Whether subscribing to account updates over websocket is enabled
# oracle.subscriber_enabled = true

# Duration of the interval at which to refresh the cached network state (current slot and blockhash).
# It is recommended to set this to slightly less than the network's block time,
# as the slot fetched will be used as the time of the price update.
# exporter.refresh_network_state_interval_duration = "200ms"

# Duration of the interval at which to publish updates
# exporter.publish_interval_duration = "1s"

# Age after which a price update is considered stale and not published
# exporter.staleness_threshold = "5s"

# Maximum size of a batch
# exporter.max_batch_size = 12

# Maximum number of compute units requested by each update_price transaction
# exporter.compute_unit_limit = 20000
# Where to serve the quick-access dashboard and metrics.
# metrics_server.bind_address = "127.0.0.1:8888"

# Price per compute unit offered for update_price transactions
# exporter.compute_unit_price_micro_lamports =
# Where to serve the remote keypair loading endpoint, under "/primary/load_keypair" and "/secondary/load_keypair"
#
# NOTE: non-loopback addresses must be used carefully, making sure the
# connection is not exposed for unauthorized access.
# remote_keypair_loader.bind_address = "127.0.0.1:9001"

# Duration of the interval with which to poll the status of transactions.
# It is recommended to set this to a value close to exporter.publish_interval_duration
# exporter.transaction_monitor.poll_interval_duration = "4s"
# How much whole SOL must a keypair hold to be considered valid for use on a given network. Disabled with 0
# remote_keypair_loader.primary_min_keypair_balance_sol = 1
# remote_keypair_loader.secondary_min_keypair_balance_sol = 1

# Maximum number of recent transactions to monitor. When this number is exceeded,
# the oldest transactions are no longer monitored. It is recommended to set this to
# a value at least as large as (number of products published / number of products in a batch).
# exporter.transaction_monitor.max_transactions = "100"
# Configuration for the optional secondary network this agent will publish data to. In most cases this should be a Solana endpoint. The options correspond to the ones in primary_network
# [secondary_network]

# Configuration for the JRPC API
[pythd_adapter]
Expand Down
377 changes: 243 additions & 134 deletions integration-tests/poetry.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions integration-tests/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ program-admin = { git = "https://github.com/pyth-network/program-admin.git", bra
pytest = "^7.2"
pytest-asyncio = "^0.18.3"
pre-commit = "^2.21.0"
requests = "^2.28.2"
jsonrpc_websocket = "^3.1.4"

[build-system]
Expand Down
62 changes: 56 additions & 6 deletions integration-tests/tests/test_integration.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import json
import os
import requests
import time
from typing import Any, List
import pytest
Expand Down Expand Up @@ -140,12 +141,19 @@ def spawn(self, cmd, log_dir=None):

with open(stdout_path, 'w') as stdout:
with open(stderr_path, 'w') as stderr:
with subprocess.Popen(cmd.split(), stdout=stdout, stderr=stderr, env=env) as process:
LOGGER.debug(
"Spawned subprocess with command %s logging to %s", cmd, log_dir)
yield
process.terminate()
process.wait()
process = subprocess.Popen(cmd.split(), stdout=stdout, stderr=stderr, env=env)
LOGGER.debug(
"Spawned subprocess with command %s logging to %s", cmd, log_dir)
yield

process.poll() # fills return code if available

if process.returncode is not None and process.returncode != 0:
LOGGER.error("Spawned process \"%s\" finished with error code %d before teardown. See logs in %s", cmd, process.returncode, log_dir)

process.terminate()
process.wait()

stderr.flush()
stdout.flush()

Expand Down Expand Up @@ -288,6 +296,9 @@ def agent_publish_keypair(self, agent_keystore_path, sync_accounts):

LOGGER.debug("Airdropping SOL to publish keypair at %s", path)
self.run(f"solana airdrop 100 -k {path} -u localhost")
address = self.run(f"solana address -k {path} -u localhost")
balance = self.run(f"solana balance -k {path} -u localhost")
LOGGER.debug(f"Publisher {address.stdout.strip()} balance: {balance.stdout.strip()}")
time.sleep(8)

@pytest.fixture
Expand All @@ -312,13 +323,39 @@ def agent(self, sync_accounts, agent_keystore, tmp_path):
time.sleep(3)
yield

@pytest.fixture
def agent_hotload(self, sync_accounts, agent_keystore, agent_keystore_path, tmp_path):
"""
Spawns an agent without a publish keypair, used for keypair hotloading testing
"""
os.remove(os.path.join(agent_keystore_path, "publish_key_pair.json"))

LOGGER.debug("Building hotload agent binary")
self.run("cargo build --release")

log_dir = os.path.join(tmp_path, "agent_logs")
LOGGER.debug("Launching hotload agent logging to %s", log_dir)

os.environ["RUST_BACKTRACE"] = "full"
os.environ["RUST_LOG"] = "debug"
with self.spawn("../target/release/agent --config agent_conf.toml", log_dir=log_dir):
time.sleep(3)
yield

@pytest_asyncio.fixture
async def client(self, agent):
client = PythAgentClient(address="ws://localhost:8910")
await client.connect()
yield client
await client.close()

@pytest_asyncio.fixture
async def client_hotload(self, agent_hotload):
client = PythAgentClient(address="ws://localhost:8910")
await client.connect()
yield client
await client.close()


class TestUpdatePrice(PythTest):

Expand Down Expand Up @@ -348,3 +385,16 @@ async def test_update_price_simple(self, client: PythAgentClient):
assert price_account["price"] == 42
assert price_account["conf"] == 2
assert price_account["status"] == "trading"

@pytest.mark.asyncio
async def test_update_price_simple_with_keypair_hotload(self, client_hotload: PythAgentClient):
# Hotload the keypair into running agent
hl_request = requests.post("http://localhost:9001/primary/load_keypair", json=PUBLISHER_KEYPAIR)

# Verify succesful hotload
assert hl_request.status_code == 200

LOGGER.info("Publisher keypair hotload OK")

# Continue normally with the existing simple scenario
await self.test_update_price_simple(client_hotload)
38 changes: 30 additions & 8 deletions src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ Note that there is an Oracle and Exporter for each network, but only one Local S

pub mod metrics;
pub mod pythd;
pub mod remote_keypair_loader;
pub mod solana;
pub mod store;
use {
Expand Down Expand Up @@ -99,12 +100,15 @@ impl Agent {
mpsc::channel(self.config.channel_capacities.local_store);
let (pythd_adapter_tx, pythd_adapter_rx) =
mpsc::channel(self.config.channel_capacities.pythd_adapter);
let (primary_keypair_loader_tx, primary_keypair_loader_rx) = mpsc::channel(10);
let (secondary_keypair_loader_tx, secondary_keypair_loader_rx) = mpsc::channel(10);

// Spawn the primary network
jhs.extend(network::spawn_network(
self.config.primary_network.clone(),
local_store_tx.clone(),
primary_oracle_updates_tx,
primary_keypair_loader_tx,
logger.new(o!("primary" => true)),
)?);

Expand All @@ -114,6 +118,7 @@ impl Agent {
config.clone(),
local_store_tx.clone(),
secondary_oracle_updates_tx,
secondary_keypair_loader_tx,
logger.new(o!("primary" => false)),
)?);
}
Expand Down Expand Up @@ -153,9 +158,25 @@ impl Agent {
self.config.metrics_server.bind_address,
local_store_tx,
global_store_lookup_tx,
logger,
logger.clone(),
)));

// Spawn the remote keypair loader endpoint for both networks
jhs.append(
&mut remote_keypair_loader::RemoteKeypairLoader::spawn(
primary_keypair_loader_rx,
secondary_keypair_loader_rx,
self.config.primary_network.rpc_url.clone(),
self.config
.secondary_network
.as_ref()
.map(|c| c.rpc_url.clone()),
self.config.remote_keypair_loader.clone(),
logger,
)
.await,
);

// Wait for all tasks to complete
join_all(jhs).await;

Expand All @@ -164,11 +185,11 @@ impl Agent {
}

pub mod config {

use {
super::{
metrics,
pythd,
remote_keypair_loader,
solana::network,
},
anyhow::{
Expand All @@ -188,12 +209,13 @@ pub mod config {
#[derive(Default, Deserialize, Debug)]
#[serde(default)]
pub struct Config {
pub channel_capacities: ChannelCapacities,
pub primary_network: network::Config,
pub secondary_network: Option<network::Config>,
pub pythd_adapter: pythd::adapter::Config,
pub pythd_api_server: pythd::api::rpc::Config,
pub metrics_server: metrics::Config,
pub channel_capacities: ChannelCapacities,
pub primary_network: network::Config,
pub secondary_network: Option<network::Config>,
pub pythd_adapter: pythd::adapter::Config,
pub pythd_api_server: pythd::api::rpc::Config,
pub metrics_server: metrics::Config,
pub remote_keypair_loader: remote_keypair_loader::Config,
}

impl Config {
Expand Down
Loading