Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/signers resend timed out messages #4737

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/bitcoin-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ jobs:
- tests::signer::stackerdb_sign_after_signer_reboot
- tests::nakamoto_integrations::stack_stx_burn_op_integration_test
- tests::signer::stackerdb_delayed_dkg
- tests::signer::stackerdb_resend_messages
# Do not run this one until we figure out why it fails in CI
# - tests::neon_integrations::bitcoin_reorg_flap
steps:
Expand Down
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion stacks-signer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ path = "src/main.rs"

[dependencies]
backoff = "0.4"
chrono = { version = "0.4.19", features = ["serde"] }
clarity = { path = "../clarity" }
clap = { version = "4.1.1", features = ["derive", "env"] }
hashbrown = { workspace = true }
Expand Down Expand Up @@ -51,7 +52,7 @@ num-traits = "0.2.18"

[dependencies.rusqlite]
version = "=0.24.2"
features = ["blob", "serde_json", "i128_blob", "bundled", "trace"]
features = ["blob", "serde_json", "i128_blob", "bundled", "trace", "chrono"]

[dependencies.serde_json]
version = "1.0"
Expand Down
1 change: 1 addition & 0 deletions stacks-signer/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,7 @@ pub(crate) mod tests {
tx_fee_ustx: config.tx_fee_ustx,
max_tx_fee_ustx: config.max_tx_fee_ustx,
db_path: config.db_path.clone(),
response_wait_timeout: config.response_wait_timeout,
}
}

Expand Down
28 changes: 28 additions & 0 deletions stacks-signer/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ use crate::signer::SignerSlotID;
const EVENT_TIMEOUT_MS: u64 = 5000;
// Default transaction fee to use in microstacks (if unspecified in the config file)
const TX_FEE_USTX: u64 = 10_000;
/// Default time, in milliseconds, to wait for all listening parties' responses before resending
/// messages. Used when `response_wait_timeout_ms` is not set in the config file.
const RESPONSE_WAIT_TIMEOUT_MS: u64 = 100_000;

#[derive(thiserror::Error, Debug)]
/// An error occurred parsing the provided configuration
Expand Down Expand Up @@ -149,6 +151,8 @@ pub struct SignerConfig {
pub max_tx_fee_ustx: Option<u64>,
/// The path to the signer's database file
pub db_path: PathBuf,
/// The time to wait for all listening parties' responses before resending messages
pub response_wait_timeout: Duration,
}

/// The parsed configuration for the signer
Expand Down Expand Up @@ -186,6 +190,8 @@ pub struct GlobalConfig {
pub auth_password: String,
/// The path to the signer's database file
pub db_path: PathBuf,
/// The time to wait for all listening parties' responses before resending messages
pub response_wait_timeout: Duration,
}

/// Internal struct for loading up the config file
Expand Down Expand Up @@ -221,6 +227,8 @@ struct RawConfigFile {
pub auth_password: String,
/// The path to the signer's database file or :memory: for an in-memory database
pub db_path: String,
/// The time in milliseconds to wait for all listening parties responses before attempting to resend messages
pub response_wait_timeout_ms: Option<u64>,
}

impl RawConfigFile {
Expand Down Expand Up @@ -296,6 +304,11 @@ impl TryFrom<RawConfigFile> for GlobalConfig {
let dkg_private_timeout = raw_data.dkg_private_timeout_ms.map(Duration::from_millis);
let nonce_timeout = raw_data.nonce_timeout_ms.map(Duration::from_millis);
let sign_timeout = raw_data.sign_timeout_ms.map(Duration::from_millis);
let response_wait_timeout = Duration::from_millis(
raw_data
.response_wait_timeout_ms
.unwrap_or(RESPONSE_WAIT_TIMEOUT_MS),
);
let db_path = raw_data.db_path.into();

Ok(Self {
Expand All @@ -315,6 +328,7 @@ impl TryFrom<RawConfigFile> for GlobalConfig {
max_tx_fee_ustx: raw_data.max_tx_fee_ustx,
auth_password: raw_data.auth_password,
db_path,
response_wait_timeout,
})
}
}
Expand Down Expand Up @@ -384,6 +398,7 @@ pub fn build_signer_config_tomls(
mut port_start: usize,
max_tx_fee_ustx: Option<u64>,
tx_fee_ustx: Option<u64>,
response_wait_timeout: Option<u64>,
) -> Vec<String> {
let mut signer_config_tomls = vec![];

Expand Down Expand Up @@ -434,6 +449,14 @@ max_tx_fee_ustx = {max_tx_fee_ustx}
r#"
{signer_config_toml}
tx_fee_ustx = {tx_fee_ustx}
"#
)
}
// Append the optional resend timeout. The key must be `response_wait_timeout_ms` so that it
// matches the `RawConfigFile` field the parser reads; the value is in milliseconds.
if let Some(response_wait_timeout) = response_wait_timeout {
    signer_config_toml = format!(
        r#"
{signer_config_toml}
response_wait_timeout_ms = {response_wait_timeout}
"#
    )
}
Expand Down Expand Up @@ -469,6 +492,7 @@ mod tests {
3000,
None,
None,
None,
);

let config =
Expand Down Expand Up @@ -501,6 +525,7 @@ mod tests {
3000,
None,
None,
None,
);

let config =
Expand All @@ -526,6 +551,7 @@ mod tests {
3000,
max_tx_fee_ustx,
tx_fee_ustx,
None,
);

let config =
Expand All @@ -546,6 +572,7 @@ mod tests {
3000,
max_tx_fee_ustx,
None,
None,
);

let config =
Expand All @@ -570,6 +597,7 @@ mod tests {
3000,
None,
tx_fee_ustx,
None,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: can you add a test here to ensure that the duration config field is set correctly when deserializing from a string?

);

let config =
Expand Down
1 change: 1 addition & 0 deletions stacks-signer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ fn handle_generate_files(args: GenerateFilesArgs) {
3000,
None,
None,
None,
);
debug!("Built {:?} signer config tomls.", signer_config_tomls.len());
for (i, file_contents) in signer_config_tomls.iter().enumerate() {
Expand Down
2 changes: 2 additions & 0 deletions stacks-signer/src/runloop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ impl RunLoop {
tx_fee_ustx: self.config.tx_fee_ustx,
max_tx_fee_ustx: self.config.max_tx_fee_ustx,
db_path: self.config.db_path.clone(),
response_wait_timeout: self.config.response_wait_timeout,
})
}

Expand Down Expand Up @@ -417,6 +418,7 @@ impl SignerRunLoop<Vec<OperationResult>, RunLoopCommand> for RunLoop {
signer.commands.push_back(command.command);
}
}
signer.resend_outbound_messages(self.config.response_wait_timeout);
// After processing event, run the next command for each signer
signer.process_next_command(&self.stacks_client, current_reward_cycle);
}
Expand Down
92 changes: 87 additions & 5 deletions stacks-signer/src/signer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
use std::collections::VecDeque;
use std::path::PathBuf;
use std::sync::mpsc::Sender;
use std::time::Instant;
use std::time::{Duration, Instant};

use blockstack_lib::chainstate::burn::ConsensusHashExtensions;
use blockstack_lib::chainstate::nakamoto::signer_set::NakamotoSigners;
Expand Down Expand Up @@ -47,7 +47,7 @@ use wsts::state_machine::coordinator::fire::Coordinator as FireCoordinator;
use wsts::state_machine::coordinator::{
Config as CoordinatorConfig, Coordinator, State as CoordinatorState,
};
use wsts::state_machine::signer::Signer as SignerStateMachine;
use wsts::state_machine::signer::{Signer as SignerStateMachine, State as SignerState};
use wsts::state_machine::{OperationResult, SignError};
use wsts::traits::Signer as _;
use wsts::v2;
Expand Down Expand Up @@ -409,6 +409,39 @@ impl Signer {
self.coordinator_selector.last_message_time = Some(Instant::now());
}

/// Resend outbound messages if no state change has occurred within the configured timeout
///
/// Does nothing while the signer is in `State::Idle`. Otherwise it looks up the outbound message
/// record stored in the signer DB for this signer's reward cycle. If that record is older than
/// `response_wait_timeout` and the coordinator and signer state strings recorded with it still
/// match the current states, the recorded messages are sent again. Failures to read from or
/// update the signer DB are logged and not returned to the caller.
pub fn resend_outbound_messages(&mut self, response_wait_timeout: Duration) {
// We do not need to resend outbound messages if we are in the Idle state.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be the case then that self.signer_db.outbound_messages_lookup(self.reward_cycle) would return None if the state were Idle?

if self.state == State::Idle {
return;
}
// Fetch the outbound message record saved for this reward cycle, if any.
match self.signer_db.outbound_messages_lookup(self.reward_cycle) {
Ok(Some(info)) => {
// Resend only when the record has outlived the timeout AND neither state machine has
// moved on since the record was written (compared via their string labels).
if info.insertion_time < chrono::Utc::now() - response_wait_timeout
jcnelson marked this conversation as resolved.
Show resolved Hide resolved
&& info.coordinator_state
== coordinator_state_to_string(&self.coordinator.state)
&& info.signer_state == signer_state_to_string(&self.state_machine.state)
{
info!("{self}: timed out waiting for responses. Resending {:?} outbound messages", info.outbound_messages.len();
"coordinator_state" => info.coordinator_state,
"signer_state" => info.signer_state
);
self.send_outbound_messages(info.outbound_messages);
// NOTE(review): presumably this refreshes the stored insertion time so the next resend
// waits a full timeout again - confirm against the SignerDb implementation.
if let Err(e) = self
.signer_db
.update_outbound_messages_time(self.reward_cycle)
{
error!("{self}: Failed to update outbound messages in DB: {e:?}");
}
}
}
// No record stored for this reward cycle: nothing to resend.
Ok(None) => {}
Err(e) => {
error!("{self}: failed to read outbound messages from signer DB. {e:?}");
}
}
}

/// Execute the given command and update state accordingly
fn execute_command(&mut self, stacks_client: &StacksClient, command: &Command) {
match command {
Expand Down Expand Up @@ -738,8 +771,26 @@ impl Signer {
self.save_signer_state()
.unwrap_or_else(|_| panic!("{self}: Failed to save signer state"));
}
self.send_outbound_messages(signer_outbound_messages);
self.send_outbound_messages(coordinator_outbound_messages);
let mut outbound_messages = Vec::with_capacity(
signer_outbound_messages.len() + coordinator_outbound_messages.len(),
);
outbound_messages.extend(signer_outbound_messages);
outbound_messages.extend(coordinator_outbound_messages);
self.send_outbound_messages(outbound_messages.clone());
if !outbound_messages.is_empty() || self.state == State::Idle {
debug!(
"{self}: Saving outbound {:?} message(s) to SignerDB.",
outbound_messages.len()
);
if let Err(e) = self.signer_db.insert_outbound_messages(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I'm understanding this correctly, couldn't this overwrite existing messages that we would've wanted to re-send after a timeout?

For example:

  • Signer sends outbound A at X
  • Signer sends outbound B at Y
  • At Y + timeout, state hasn't changed since A, but we only send messages B

If I'm understanding that correctly, then maybe a better implementation would be to insert these messages with the existing outbound messages, but only if the signer's state hasn't changed

self.reward_cycle,
&self.coordinator.state,
&self.state_machine.state,
&outbound_messages,
) {
error!("{self}: Failed to save outbound message info. {e:?}");
}
}
}

/// Validate a signature share request, updating its message where appropriate.
Expand Down Expand Up @@ -1374,7 +1425,7 @@ impl Signer {
return Ok(());
}
// Check stackerdb for any missed DKG messages to catch up our state.
self.read_dkg_stackerdb_messages(&stacks_client, res, current_reward_cycle)?;
self.read_dkg_stackerdb_messages(stacks_client, res, current_reward_cycle)?;
// Check if we should still queue DKG
if !self.should_queue_dkg(stacks_client)? {
return Ok(());
Expand Down Expand Up @@ -1605,6 +1656,37 @@ impl Signer {
}
}

/// Convert the signer state to a string
///
/// Maps each `SignerState` variant to its variant name and returns it as an owned `String`.
/// `resend_outbound_messages` compares this label against the signer state recorded alongside
/// the stored outbound messages to decide whether the state has changed.
pub fn signer_state_to_string(state: &SignerState) -> String {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: can these methods here instead be part of a Display impl for SignerState and CoordinatorState?

match state {
SignerState::Idle => "Idle",
SignerState::DkgPublicDistribute => "DkgPublicDistribute",
SignerState::DkgPublicGather => "DkgPublicGather",
SignerState::DkgPrivateDistribute => "DkgPrivateDistribute",
SignerState::DkgPrivateGather => "DkgPrivateGather",
SignerState::SignGather => "SignGather",
}
.to_string()
}

/// Convert the coordinator state to a string
///
/// Maps each `CoordinatorState` variant to its variant name and returns it as an owned `String`.
/// Any payload carried by the nonce / signature-share variants is ignored, so two states that
/// differ only in payload produce the same label.
///
/// Every variant must map to a distinct label: `resend_outbound_messages` compares this label
/// against the one stored with the outbound messages to detect whether the coordinator has
/// changed state since they were sent.
pub fn coordinator_state_to_string(state: &CoordinatorState) -> String {
    match state {
        CoordinatorState::Idle => "Idle",
        CoordinatorState::DkgPublicDistribute => "DkgPublicDistribute",
        CoordinatorState::DkgPublicGather => "DkgPublicGather",
        // Previously mislabelled as "DkgPrivateGather", which made this state
        // indistinguishable from the real `DkgPrivateGather` state.
        CoordinatorState::DkgPrivateDistribute => "DkgPrivateDistribute",
        CoordinatorState::DkgPrivateGather => "DkgPrivateGather",
        CoordinatorState::DkgEndDistribute => "DkgEndDistribute",
        CoordinatorState::DkgEndGather => "DkgEndGather",
        CoordinatorState::NonceRequest(_, _) => "NonceRequest",
        CoordinatorState::NonceGather(_, _) => "NonceGather",
        CoordinatorState::SigShareRequest(_, _) => "SigShareRequest",
        CoordinatorState::SigShareGather(_, _) => "SigShareGather",
    }
    .to_string()
}

fn load_encrypted_signer_state<S: SignerStateStorage>(
storage: S,
id: S::IdType,
Expand Down