diff --git a/Cargo.lock b/Cargo.lock index 745a67e4..abbda64b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1240,7 +1240,7 @@ dependencies = [ "indexmap", "slab", "tokio", - "tokio-util 0.7.3", + "tokio-util", "tracing", ] @@ -2384,7 +2384,7 @@ dependencies = [ [[package]] name = "pyth-agent" -version = "0.1.2" +version = "0.1.3" dependencies = [ "anyhow", "async-trait", @@ -2417,7 +2417,7 @@ dependencies = [ "tokio", "tokio-retry", "tokio-stream", - "tokio-util 0.7.3", + "tokio-util", "tracing", "warp", ] @@ -2731,7 +2731,7 @@ dependencies = [ "serde_urlencoded", "tokio", "tokio-rustls", - "tokio-util 0.7.3", + "tokio-util", "tower-service", "url", "wasm-bindgen", @@ -4233,19 +4233,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "tokio-tungstenite" -version = "0.15.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "511de3f85caf1c98983545490c3d09685fa8eb634e57eec22bb4db271f46cbd8" -dependencies = [ - "futures-util", - "log", - "pin-project", - "tokio", - "tungstenite 0.14.0", -] - [[package]] name = "tokio-tungstenite" version = "0.16.1" @@ -4276,20 +4263,6 @@ dependencies = [ "webpki-roots", ] -[[package]] -name = "tokio-util" -version = "0.6.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "36943ee01a6d67977dd3f84a5a1d2efeb4ada3a1ae771cadfaa535d9d9fc6507" -dependencies = [ - "bytes", - "futures-core", - "futures-sink", - "log", - "pin-project-lite", - "tokio", -] - [[package]] name = "tokio-util" version = "0.7.3" @@ -4372,25 +4345,6 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" -[[package]] -name = "tungstenite" -version = "0.14.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a0b2d8558abd2e276b0a8df5c05a2ec762609344191e5fd23e292c910e9165b5" -dependencies = [ - "base64 0.13.0", - "byteorder", - "bytes", - "http", - "httparse", - "log", - "rand 0.8.5", - "sha-1 0.9.8", - "thiserror", - "url", - "utf-8", -] - [[package]] name = "tungstenite" version = "0.16.0" @@ -4576,9 +4530,9 @@ dependencies = [ [[package]] name = "warp" -version = "0.3.2" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3cef4e1e9114a4b7f1ac799f16ce71c14de5778500c5450ec6b7b920c55b587e" +checksum = "ed7b8be92646fc3d18b06147664ebc5f48d222686cb11a8755e561a735aacc6d" dependencies = [ "bytes", "futures-channel", @@ -4592,14 +4546,15 @@ dependencies = [ "multipart", "percent-encoding", "pin-project", + "rustls-pemfile 0.2.1", "scoped-tls", "serde", "serde_json", "serde_urlencoded", "tokio", "tokio-stream", - "tokio-tungstenite 0.15.0", - "tokio-util 0.6.10", + "tokio-tungstenite 0.17.2", + "tokio-util", "tower-service", "tracing", ] diff --git a/Cargo.toml b/Cargo.toml index b2648476..6ce9413a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pyth-agent" -version = "0.1.2" +version = "0.1.3" edition = "2021" [[bin]] @@ -11,7 +11,7 @@ path = "src/bin/agent.rs" anyhow = "1.0.55" serde = { version = "1.0.136", features = ["derive"] } async-trait = "0.1.52" -warp = { version = "0.3.1", features = ["websocket"] } +warp = { version = "0.3.3", features = ["websocket"] } tokio = { version = "1.0", features = ["full"] } tokio-stream = "0.1.1" futures-util = { version = "0.3", default-features = false, features = [ diff --git a/config/config.toml b/config/config.toml index ca4dfbb4..cf928bc7 100644 --- a/config/config.toml +++ b/config/config.toml @@ -67,6 +67,9 @@ key_store.root_path = "/path/to/keystore" # This can be omitted when oracle.subscriber_enabled is set to false. # wss_url = "ws://api.devnet.solana.com" +# Timeout for the requests to the RPC +# rpc_timeout = "10s" + # Path to the key store. # key_store.root_path = "/path/to/keystore" diff --git a/src/agent.rs b/src/agent.rs index 128035e7..bfff806c 100644 --- a/src/agent.rs +++ b/src/agent.rs @@ -103,7 +103,7 @@ impl Agent { self.config.primary_network.clone(), local_store_tx.clone(), primary_oracle_updates_tx, - logger.clone(), + logger.new(o!("primary" => true)), )?); // Spawn the secondary network, if needed @@ -112,7 +112,7 @@ impl Agent { config.clone(), local_store_tx.clone(), secondary_oracle_updates_tx, - logger.clone(), + logger.new(o!("primary" => false)), )?); } diff --git a/src/agent/solana.rs b/src/agent/solana.rs index 6b56d8e8..a45bdeff 100644 --- a/src/agent/solana.rs +++ b/src/agent/solana.rs @@ -24,6 +24,7 @@ pub mod network { Serialize, }, slog::Logger, + std::time::Duration, tokio::{ sync::{ mpsc, @@ -38,25 +39,29 @@ pub mod network { #[serde(default)] pub struct Config { /// HTTP RPC endpoint - pub rpc_url: String, + pub rpc_url: String, /// WSS RPC endpoint - pub wss_url: String, + pub wss_url: String, + /// Timeout for the requests to the RPC + #[serde(with = "humantime_serde")] + pub rpc_timeout: Duration, /// Keystore - pub key_store: key_store::Config, + pub key_store: key_store::Config, /// Configuration for the Oracle reading data from this network - pub oracle: oracle::Config, + pub oracle: oracle::Config, /// Configuration for the Exporter publishing data to this network - pub exporter: exporter::Config, + pub exporter: exporter::Config, } impl Default for Config { fn default() -> Self { Self { - rpc_url: "http://localhost:8899".to_string(), - wss_url: "ws://localhost:8900".to_string(), - key_store: Default::default(), - oracle: Default::default(), - exporter: Default::default(), + rpc_url: "http://localhost:8899".to_string(), + wss_url: "ws://localhost:8900".to_string(), + rpc_timeout: Duration::from_secs(10), + key_store: Default::default(), + oracle: Default::default(), + exporter: Default::default(), } } } @@ -72,6 +77,7 @@ pub mod network { config.oracle.clone(), &config.rpc_url, &config.wss_url, + config.rpc_timeout, KeyStore::new(config.key_store.clone())?, global_store_update_tx, logger.clone(), @@ -81,6 +87,7 @@ pub mod network { let exporter_jhs = exporter::spawn_exporter( config.exporter, &config.rpc_url, + config.rpc_timeout, KeyStore::new(config.key_store.clone())?, local_store_tx, logger, diff --git a/src/agent/solana/exporter.rs b/src/agent/solana/exporter.rs index 7ce715a7..ed10c5cc 100644 --- a/src/agent/solana/exporter.rs +++ b/src/agent/solana/exporter.rs @@ -130,6 +130,7 @@ impl Default for Config { pub fn spawn_exporter( config: Config, rpc_url: &str, + rpc_timeout: Duration, key_store: KeyStore, local_store_tx: Sender, logger: Logger, @@ -137,7 +138,8 @@ pub fn spawn_exporter( // Create and spawn the network state querier let (network_state_tx, network_state_rx) = watch::channel(Default::default()); let mut network_state_querier = NetworkStateQuerier::new( - &rpc_url, + rpc_url, + rpc_timeout, time::interval(config.refresh_network_state_interval_duration), network_state_tx, logger.clone(), @@ -150,6 +152,7 @@ pub fn spawn_exporter( let mut transaction_monitor = TransactionMonitor::new( config.transaction_monitor.clone(), rpc_url, + rpc_timeout, transactions_rx, logger.clone(), ); @@ -159,6 +162,7 @@ pub fn spawn_exporter( let mut exporter = Exporter::new( config, rpc_url, + rpc_timeout, key_store, local_store_tx, network_state_rx, @@ -206,6 +210,7 @@ impl Exporter { pub fn new( config: Config, rpc_url: &str, + rpc_timeout: Duration, key_store: KeyStore, local_store_tx: Sender, network_state_rx: watch::Receiver, @@ -214,7 +219,7 @@ impl Exporter { ) -> Self { let publish_interval = time::interval(config.publish_interval_duration); Exporter { - rpc_client: RpcClient::new(rpc_url.to_string()), + rpc_client: RpcClient::new_with_timeout(rpc_url.to_string(), rpc_timeout), config, publish_interval, key_store, @@ -445,12 +450,13 @@ struct NetworkStateQuerier { impl NetworkStateQuerier { pub fn new( rpc_endpoint: &str, + rpc_timeout: Duration, query_interval: Interval, network_state_tx: watch::Sender, logger: Logger, ) -> Self { NetworkStateQuerier { - rpc_client: RpcClient::new(rpc_endpoint.to_string()), + rpc_client: RpcClient::new_with_timeout(rpc_endpoint.to_string(), rpc_timeout), query_interval, network_state_tx, logger, @@ -558,11 +564,12 @@ mod transaction_monitor { pub fn new( config: Config, rpc_url: &str, + rpc_timeout: Duration, transactions_rx: mpsc::Receiver, logger: Logger, ) -> Self { let poll_interval = time::interval(config.poll_interval_duration); - let rpc_client = RpcClient::new(rpc_url.to_string()); + let rpc_client = RpcClient::new_with_timeout(rpc_url.to_string(), rpc_timeout); TransactionMonitor { config, rpc_client, diff --git a/src/agent/solana/oracle.rs b/src/agent/solana/oracle.rs index 80748f7d..12c8e431 100644 --- a/src/agent/solana/oracle.rs +++ b/src/agent/solana/oracle.rs @@ -123,6 +123,7 @@ pub fn spawn_oracle( config: Config, rpc_url: &str, wss_url: &str, + rpc_timeout: Duration, key_store: KeyStore, global_store_update_tx: mpsc::Sender, logger: Logger, @@ -135,6 +136,7 @@ pub fn spawn_oracle( let subscriber = Subscriber::new( rpc_url.to_string(), wss_url.to_string(), + rpc_timeout, config.commitment, key_store.program_key.clone(), updates_tx, @@ -149,6 +151,7 @@ pub fn spawn_oracle( data_tx, key_store.mapping_key, rpc_url, + rpc_timeout, config.commitment, config.poll_interval_duration, logger.clone(), @@ -337,12 +340,16 @@ impl Poller { data_tx: mpsc::Sender, mapping_account_key: Pubkey, rpc_url: &str, + rpc_timeout: Duration, commitment: CommitmentLevel, poll_interval_duration: Duration, logger: Logger, ) -> Self { - let rpc_client = - RpcClient::new_with_commitment(rpc_url.to_string(), CommitmentConfig { commitment }); + let rpc_client = RpcClient::new_with_timeout_and_commitment( + rpc_url.to_string(), + rpc_timeout, + CommitmentConfig { commitment }, + ); let poll_interval = tokio::time::interval(poll_interval_duration); Poller { @@ -513,6 +520,7 @@ mod subscriber { BlockchainShadow, SyncOptions, }, + std::time::Duration, tokio::sync::{ broadcast, mpsc, @@ -527,6 +535,9 @@ mod subscriber { /// WSS RPC endpoint wss_url: String, + /// Timeout for RPC requests + rpc_timeout: Duration, + /// Commitment level used to read account data commitment: CommitmentLevel, @@ -544,6 +555,7 @@ mod subscriber { pub fn new( rpc_url: String, wss_url: String, + rpc_timeout: Duration, commitment: CommitmentLevel, account_key: Pubkey, updates_tx: mpsc::Sender<(Pubkey, solana_sdk::account::Account)>, @@ -552,6 +564,7 @@ mod subscriber { Subscriber { rpc_url, wss_url, + rpc_timeout, commitment, account_key, updates_tx, @@ -597,6 +610,7 @@ mod subscriber { self.wss_url.clone(), ), commitment: self.commitment, + rpc_timeout: self.rpc_timeout, max_lag: Some(10000), ..SyncOptions::default() }, diff --git a/src/bin/agent.rs b/src/bin/agent.rs index dbf52e05..1a3b7614 100644 --- a/src/bin/agent.rs +++ b/src/bin/agent.rs @@ -32,7 +32,7 @@ async fn main() { let logger = slog::Logger::root( slog_async::Async::default( LogBuilder::new( - slog_term::CompactFormat::new(slog_term::TermDecorator::new().stdout().build()) + slog_term::FullFormat::new(slog_term::TermDecorator::new().stdout().build()) .build() .fuse(), )