Skip to content

Commit

Permalink
fix: set robust limits for busy a blockchain (#3150)
Browse files Browse the repository at this point in the history
  • Loading branch information
stringhandler committed Aug 12, 2021
2 parents fd6d5c6 + 0f894e5 commit c993780
Show file tree
Hide file tree
Showing 20 changed files with 118 additions and 64 deletions.
8 changes: 6 additions & 2 deletions applications/tari_console_wallet/src/init/mod.rs
Expand Up @@ -348,6 +348,7 @@ pub async fn init_wallet(
let base_node_service_config = BaseNodeServiceConfig::new(
config.wallet_base_node_service_refresh_interval,
config.wallet_base_node_service_request_max_age,
config.base_node_event_channel_size,
);

let factories = CryptoFactories::default();
Expand All @@ -363,17 +364,20 @@ pub async fn init_wallet(
config.transaction_routing_mechanism.clone(),
),
num_confirmations_required: config.transaction_num_confirmations_required,
transaction_event_channel_size: config.transaction_event_channel_size,
..Default::default()
}),
Some(OutputManagerServiceConfig {
base_node_query_timeout: config.base_node_query_timeout,
prevent_fee_gt_amount: config.prevent_fee_gt_amount,
event_channel_size: config.output_manager_event_channel_size,
base_node_update_publisher_channel_size: config.base_node_update_publisher_channel_size,
..Default::default()
}),
config.network.into(),
Some(base_node_service_config),
Some(config.buffer_size_base_node_wallet),
Some(config.buffer_rate_limit_base_node_wallet),
Some(config.buffer_size_console_wallet),
Some(config.buffer_rate_limit_console_wallet),
Some(config.scan_for_utxo_interval),
);
wallet_config.buffer_size = std::cmp::max(BASE_NODE_BUFFER_MIN_SIZE, config.buffer_size_base_node);
Expand Down
Expand Up @@ -63,7 +63,7 @@ impl WalletEventMonitor {
result = transaction_service_events.select_next_some() => {
match result {
Ok(msg) => {
trace!(target: LOG_TARGET, "Wallet Event Monitor received wallet event {:?}", msg);
trace!(target: LOG_TARGET, "Wallet Event Monitor received wallet transaction service event {:?}", msg);
match (*msg).clone() {
TransactionEvent::ReceivedFinalizedTransaction(tx_id) => {
self.trigger_tx_state_refresh(tx_id).await;
Expand Down Expand Up @@ -108,7 +108,7 @@ impl WalletEventMonitor {
result = connectivity_events.select_next_some() => {
match result {
Ok(msg) => {
trace!(target: LOG_TARGET, "Wallet Event Monitor received wallet event {:?}", msg);
trace!(target: LOG_TARGET, "Wallet Event Monitor received wallet connectivity event {:?}", msg);
match &*msg {
ConnectivityEvent::PeerDisconnected(_) |
ConnectivityEvent::ManagedPeerDisconnected(_) |
Expand Down
8 changes: 4 additions & 4 deletions base_layer/core/src/base_node/service/service.rs
Expand Up @@ -544,7 +544,7 @@ async fn handle_outbound_request(
Request::FetchBlocksWithUtxos(_) => {
trace!(
target: LOG_TARGET,
"Timeout for service request ({}) at {:?}",
"Timeout for service request FetchBlocks... ({}) set at {:?}",
request_key,
config.fetch_blocks_timeout
);
Expand All @@ -553,7 +553,7 @@ async fn handle_outbound_request(
Request::FetchMatchingUtxos(_) => {
trace!(
target: LOG_TARGET,
"Timeout for service request ({}) at {:?}",
"Timeout for service request FetchMatchingUtxos ({}) set at {:?}",
request_key,
config.fetch_utxos_timeout
);
Expand All @@ -562,7 +562,7 @@ async fn handle_outbound_request(
_ => {
trace!(
target: LOG_TARGET,
"Timeout for service request ({}) at {:?}",
"Timeout for service request ... ({}) set at {:?}",
request_key,
config.service_request_timeout
);
Expand Down Expand Up @@ -638,7 +638,7 @@ async fn handle_request_timeout(
let _ = reply_tx.send(reply_msg.map_err(|e| {
error!(
target: LOG_TARGET,
"Failed to send outbound request (request key: {}): {:?}", &request_key, e
"Failed to process outbound request (request key: {}): {:?}", &request_key, e
);
e
}));
Expand Down
2 changes: 1 addition & 1 deletion base_layer/core/src/mempool/service/service.rs
Expand Up @@ -475,7 +475,7 @@ async fn handle_request_timeout(
let _ = reply_tx.send(reply_msg.map_err(|e| {
error!(
target: LOG_TARGET,
"Failed to send outbound request (request key: {}): {:?}", &request_key, e
"Failed to process outbound request (request key: {}): {:?}", &request_key, e
);
e
}));
Expand Down
11 changes: 8 additions & 3 deletions base_layer/wallet/src/base_node_service/config.rs
Expand Up @@ -30,6 +30,7 @@ pub struct BaseNodeServiceConfig {
pub base_node_monitor_refresh_interval: Duration,
pub base_node_rpc_pool_size: usize,
pub request_max_age: Duration,
pub event_channel_size: usize,
}

impl Default for BaseNodeServiceConfig {
Expand All @@ -38,21 +39,25 @@ impl Default for BaseNodeServiceConfig {
base_node_monitor_refresh_interval: Duration::from_secs(5),
base_node_rpc_pool_size: 10,
request_max_age: Duration::from_secs(60),
event_channel_size: 250,
}
}
}

impl BaseNodeServiceConfig {
pub fn new(refresh_interval: u64, request_max_age: u64) -> Self {
pub fn new(refresh_interval: u64, request_max_age: u64, event_channel_size: usize) -> Self {
info!(
target: LOG_TARGET,
"Setting new wallet base node service config, refresh interval: {}s, request max age: {}s",
"Setting new wallet base node service config, refresh interval: {}s, request max age: {}s, event channel \
size : {}",
refresh_interval,
request_max_age
request_max_age,
event_channel_size
);
Self {
base_node_monitor_refresh_interval: Duration::from_secs(refresh_interval),
request_max_age: Duration::from_secs(request_max_age),
event_channel_size,
..Default::default()
}
}
Expand Down
2 changes: 1 addition & 1 deletion base_layer/wallet/src/base_node_service/mod.rs
Expand Up @@ -69,7 +69,7 @@ where T: WalletBackend + 'static

let (sender, request_stream) = reply_channel::unbounded();

let (event_publisher, _) = broadcast::channel(200);
let (event_publisher, _) = broadcast::channel(self.config.event_channel_size);

let basenode_service_handle = BaseNodeServiceHandle::new(sender, event_publisher.clone());

Expand Down
4 changes: 4 additions & 0 deletions base_layer/wallet/src/output_manager_service/config.rs
Expand Up @@ -30,6 +30,8 @@ pub struct OutputManagerServiceConfig {
pub prevent_fee_gt_amount: bool,
pub peer_dial_retry_timeout: Duration,
pub seed_word_language: MnemonicLanguage,
pub event_channel_size: usize,
pub base_node_update_publisher_channel_size: usize,
}

impl Default for OutputManagerServiceConfig {
Expand All @@ -40,6 +42,8 @@ impl Default for OutputManagerServiceConfig {
prevent_fee_gt_amount: true,
peer_dial_retry_timeout: Duration::from_secs(20),
seed_word_language: MnemonicLanguage::English,
event_channel_size: 250,
base_node_update_publisher_channel_size: 50,
}
}
}
2 changes: 1 addition & 1 deletion base_layer/wallet/src/output_manager_service/mod.rs
Expand Up @@ -106,7 +106,7 @@ where T: OutputManagerBackend + 'static
);

let (sender, receiver) = reply_channel::unbounded();
let (publisher, _) = broadcast::channel(200);
let (publisher, _) = broadcast::channel(self.config.event_channel_size);

// Register handle before waiting for handles to be ready
let oms_handle = OutputManagerHandle::new(sender, publisher.clone());
Expand Down
3 changes: 2 additions & 1 deletion base_layer/wallet/src/output_manager_service/service.rs
Expand Up @@ -142,7 +142,8 @@ where TBackend: OutputManagerBackend + 'static
shutdown_signal,
};

let (base_node_update_publisher, _) = broadcast::channel(50);
let (base_node_update_publisher, _) =
broadcast::channel(resources.config.base_node_update_publisher_channel_size);

Ok(OutputManagerService {
resources,
Expand Down
Expand Up @@ -264,7 +264,9 @@ where TBackend: OutputManagerBackend + 'static

let mut client = match base_node_connection
.connect_rpc_using_builder(
BaseNodeWalletRpcClient::builder().with_deadline(self.resources.config.base_node_query_timeout),
BaseNodeWalletRpcClient::builder()
.with_deadline(self.resources.config.base_node_query_timeout)
.with_handshake_timeout(self.resources.config.base_node_query_timeout),
)
.await
{
Expand Down
2 changes: 2 additions & 0 deletions base_layer/wallet/src/transaction_service/config.rs
Expand Up @@ -38,6 +38,7 @@ pub struct TransactionServiceConfig {
pub num_confirmations_required: u64,
pub max_tx_query_batch_size: usize,
pub transaction_routing_mechanism: TransactionRoutingMechanism,
pub transaction_event_channel_size: usize,
}

impl Default for TransactionServiceConfig {
Expand All @@ -54,6 +55,7 @@ impl Default for TransactionServiceConfig {
num_confirmations_required: 3,
max_tx_query_batch_size: 5000,
transaction_routing_mechanism: TransactionRoutingMechanism::default(),
transaction_event_channel_size: 1000,
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion base_layer/wallet/src/transaction_service/mod.rs
Expand Up @@ -172,7 +172,7 @@ where T: TransactionBackend + 'static
let base_node_response_stream = self.base_node_response_stream();
let transaction_cancelled_stream = self.transaction_cancelled_stream();

let (publisher, _) = broadcast::channel(200);
let (publisher, _) = broadcast::channel(self.config.transaction_event_channel_size);

let transaction_handle = TransactionServiceHandle::new(sender, publisher.clone());

Expand Down
Expand Up @@ -230,7 +230,8 @@ where TBackend: TransactionBackend + 'static
let mut client = match base_node_connection
.connect_rpc_using_builder(
BaseNodeWalletRpcClient::builder()
.with_deadline(self.resources.config.broadcast_monitoring_timeout),
.with_deadline(self.resources.config.broadcast_monitoring_timeout)
.with_handshake_timeout(self.resources.config.broadcast_monitoring_timeout),
)
.await
{
Expand Down
Expand Up @@ -284,7 +284,9 @@ where TBackend: TransactionBackend + 'static
};
let mut client = match base_node_connection
.connect_rpc_using_builder(
BaseNodeWalletRpcClient::builder().with_deadline(self.resources.config.chain_monitoring_timeout),
BaseNodeWalletRpcClient::builder()
.with_deadline(self.resources.config.chain_monitoring_timeout)
.with_handshake_timeout(self.resources.config.chain_monitoring_timeout),
)
.await
{
Expand Down
Expand Up @@ -258,7 +258,11 @@ where TBackend: TransactionBackend + 'static
};

let mut client = match base_node_connection
.connect_rpc_using_builder(BaseNodeWalletRpcClient::builder().with_deadline(self.timeout))
.connect_rpc_using_builder(
BaseNodeWalletRpcClient::builder()
.with_deadline(self.timeout)
.with_handshake_timeout(self.timeout),
)
.await
{
Ok(c) => c,
Expand Down
63 changes: 41 additions & 22 deletions common/config/presets/tari_config_example.toml
Expand Up @@ -43,16 +43,16 @@ network = "weatherwax"
#liveness_allowlist_cidrs = ["127.0.0.1/32"]

# The buffer size constants for the publish/subscribe connector channel, connecting comms messages to the domain layer:
# - Buffer size for the base node (min value = 30, default value = 100).
#buffer_size_base_node = 100
# - Buffer size for the base node wallet (min value = 300, default value = 1000).
#buffer_size_base_node_wallet = 1000
# - Buffer size for the base node (min value = 30, default value = 1500).
#buffer_size_base_node = 1500
# - Buffer size for the console wallet (min value = 300, default value = 50000).
#buffer_size_console_wallet = 50000
# The rate limit constants for the publish/subscribe connector channel, i.e. maximum amount of inbound messages to
# accept - any rate attemting to exceed this limit will be throttled.
# - Rate limit for the base node (min value = 5, default value = 20).
#buffer_rate_limit_base_node = 20
# - Rate limit for the base node wallet (min value = 5, default value = 20).
#buffer_rate_limit_base_node_wallet = 20
# - Rate limit for the base node (min value = 5, default value = 1000).
#buffer_rate_limit_base_node = 1000
# - Rate limit for the console wallet (min value = 5, default value = 1000).
buffer_rate_limit_console_wallet = 1000
# The message deduplication persistent cache size - messages with these hashes in the cache will only be processed once.
# The cache will also be trimmed down to size periodically (min value = 0, default value = 2500).
dedup_cache_capacity = 25000
Expand All @@ -66,8 +66,9 @@ dedup_cache_capacity = 25000
# The timeout (s) for requesting other base node services (min value = 10 s, default value = 180 s).
#service_request_timeout = 180

# The maximum simultaneous comms RPC sessions allowed. Setting this to -1 will allow unlimited sessions.
# rpc_max_simultaneous_sessions = 1000
# The maximum simultaneous comms RPC sessions allowed (default value = 1000). Setting this to -1 will allow unlimited
# sessions.
rpc_max_simultaneous_sessions = 10000

# Auto Update
#
Expand Down Expand Up @@ -116,20 +117,33 @@ console_wallet_db_file = "wallet/console-wallet.dat"

# This is the timeout period that will be used to monitor TXO queries to the base node (default = 60). Larger values
# are needed for wallets with many (>1000) TXOs to be validated.
base_node_query_timeout = 120
base_node_query_timeout = 180
# The amount of seconds added to the current time (Utc) which will then be used to check if the message has
# expired or not when processing the message (default = 10800).
#saf_expiry_duration = 10800
# This is the number of block confirmations required for a transaction to be considered completely mined and confirmed. (default = 3)
# This is the number of block confirmations required for a transaction to be considered completely mined and
# confirmed. (default = 3)
#transaction_num_confirmations_required = 3
# This is the timeout period that will be used for base node broadcast monitoring tasks (default = 60)
#transaction_broadcast_monitoring_timeout = 60
transaction_broadcast_monitoring_timeout = 180
# This is the timeout period that will be used for chain monitoring tasks (default = 60)
#transaction_chain_monitoring_timeout = 60
# This is the timeout period that will be used for sending transactions directly (default = 20)
#transaction_direct_send_timeout = 20
transaction_direct_send_timeout = 180
# This is the timeout period that will be used for sending transactions via broadcast mode (default = 60)
#transaction_broadcast_send_timeout = 60
transaction_broadcast_send_timeout = 180
# This is the size of the event channel used to communicate transaction status events to the wallet's UI. A busy console
# wallet doing thousands of bulk payments or used for stress testing needs a fairly big size (>10000) (default = 1000).
transaction_event_channel_size = 25000
# This is the size of the event channel used to communicate base node events to the wallet. A busy console
# wallet doing thousands of bulk payments or used for stress testing needs a fairly big size (>3000) (default = 250).
base_node_event_channel_size = 3500
# This is the size of the event channel used to communicate output manager events to the wallet. A busy console
# wallet doing thousands of bulk payments or used for stress testing needs a fairly big size (>3000) (default = 250).
output_manager_event_channel_size = 3500
# This is the size of the event channel used to communicate base node update events to the wallet. A busy console
# wallet doing thousands of bulk payments or used for stress testing needs a fairly big size (>300) (default = 50).
base_node_update_publisher_channel_size = 500
# If a large amount of tiny valued uT UTXOs are used as inputs to a transaction, the fee may be larger than
# the transaction amount. Set this value to `false` to allow spending of "dust" UTXOs for small valued
# transactions (default = true).
Expand All @@ -140,7 +154,7 @@ base_node_query_timeout = 120
#transaction_routing_mechanism = "DirectAndStoreAndForward"

# UTXO scanning service interval (default = 12 hours, i.e. 60 * 60 * 12 seconds)
scan_for_utxo_interval = 60
scan_for_utxo_interval = 180

# When running the console wallet in command mode, use these values to determine what "stage" and timeout to wait
# for sent transactions.
Expand All @@ -151,7 +165,7 @@ scan_for_utxo_interval = 60
# - "MinedUnconfirmed" - The transaction was successfully detected as mined but unconfirmed on the blockchain.
# - "Mined" - The transaction was successfully detected as mined and confirmed on the blockchain.

# The default values are:
# The default values are: "Broadcast", 300
#command_send_wait_stage = "Broadcast"
#command_send_wait_timeout = 300

Expand All @@ -161,9 +175,9 @@ scan_for_utxo_interval = 60

# Configuration for the wallet's base node service
# The refresh interval, defaults to 10 seconds
# base_node_service_refresh_interval = 10
base_node_service_refresh_interval = 30
# The maximum age of service requests in seconds, requests older than this are discarded
# base_node_service_request_max_age = 60
base_node_service_request_max_age = 180

#[base_node.transport.tor]
#control_address = "/ip4/127.0.0.1/tcp/9051"
Expand Down Expand Up @@ -253,6 +267,10 @@ db_type = "lmdb"
# is "0", which indicates an archival node without any pruning.
#pruning_horizon = 0

# The amount of messages that will be permitted in the flood ban timespan of 100s (Default weatherwax = 1000,
# default mainnet = 10000)
flood_ban_max_msg_count = 10000

# The relative path to store persistent data
data_dir = "weatherwax"

Expand Down Expand Up @@ -464,9 +482,10 @@ console_wallet_tor_identity_file = "config/console_wallet_tor.json"
[merge_mining_proxy.weatherwax]

# URL to monerod
monerod_url = "http://monero-stagenet.exan.tech:38081" # stagenet
#monerod_url = "http://18.133.59.45:28081" # testnet
#monerod_url = "http://18.132.124.81:18081" # mainnet
monerod_url = "http://monero-stagenet.exan.tech:38081" # stagenet
#monerod_url = "http://18.133.59.45:28081" # testnet
#monerod_url = "http://18.132.124.81:18081" # mainnet
#monerod_url = "http://monero.exan.tech:18081" # mainnet alternative

# Address of the tari_merge_mining_proxy application
proxy_host_address = "127.0.0.1:7878"
Expand Down

0 comments on commit c993780

Please sign in to comment.