diff --git a/.changelog/unreleased/improvements/814-clonable-secret-conn.md b/.changelog/unreleased/improvements/814-clonable-secret-conn.md new file mode 100644 index 000000000..6e7fa027b --- /dev/null +++ b/.changelog/unreleased/improvements/814-clonable-secret-conn.md @@ -0,0 +1,4 @@ +- `[tendermint-p2p]` The `SecretConnection` can now be split into two halves to + facilitate full-duplex communication (must be facilitated by using each half + in a separate thread). + ([#938](https://github.com/informalsystems/tendermint-rs/pull/938)) diff --git a/Cargo.toml b/Cargo.toml index 712da3286..4a3a782c9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ members = [ "pbt-gen", "proto", "rpc", + "std-ext", "tendermint", "test", "testgen" diff --git a/light-client/src/components/verifier.rs b/light-client/src/components/verifier.rs index 2416ca99c..57a6e40cf 100644 --- a/light-client/src/components/verifier.rs +++ b/light-client/src/components/verifier.rs @@ -119,8 +119,8 @@ impl Verifier for ProdVerifier { &*self.voting_power_calculator, &*self.commit_validator, &*self.hasher, - &trusted, - &untrusted, + trusted, + untrusted, options, now, ) diff --git a/light-client/src/macros.rs b/light-client/src/macros.rs index a5d5aa94f..284ebe651 100644 --- a/light-client/src/macros.rs +++ b/light-client/src/macros.rs @@ -4,7 +4,7 @@ #[macro_export] macro_rules! bail { ($kind:expr) => { - return Err($kind.into()); + return Err($kind.into()) }; } diff --git a/light-client/src/peer_list.rs b/light-client/src/peer_list.rs index d4e567414..a0e408e30 100644 --- a/light-client/src/peer_list.rs +++ b/light-client/src/peer_list.rs @@ -111,7 +111,7 @@ impl PeerList { /// - The given peer id must not be the primary peer id. /// - The given peer must be in the witness list #[pre(faulty_witness != self.primary && self.witnesses.contains(&faulty_witness))] - #[post(Self::invariant(&self))] + #[post(Self::invariant(self))] pub fn replace_faulty_witness(&mut self, faulty_witness: PeerId) -> Option { let mut result = None; @@ -133,7 +133,7 @@ impl PeerList { /// /// ## Errors /// - If there are no witness left, returns `ErrorKind::NoWitnessLeft`. - #[post(ret.is_ok() ==> Self::invariant(&self))] + #[post(ret.is_ok() ==> Self::invariant(self))] pub fn replace_faulty_primary( &mut self, primary_error: Option, diff --git a/light-client/src/predicates.rs b/light-client/src/predicates.rs index a73afa6a9..3a0ade065 100644 --- a/light-client/src/predicates.rs +++ b/light-client/src/predicates.rs @@ -233,10 +233,10 @@ pub fn verify( vp.is_header_from_past(&untrusted.signed_header.header, options.clock_drift, now)?; // Ensure the header validator hashes match the given validators - vp.validator_sets_match(&untrusted, &*hasher)?; + vp.validator_sets_match(untrusted, &*hasher)?; // Ensure the header next validator hashes match the given next validators - vp.next_validators_match(&untrusted, &*hasher)?; + vp.next_validators_match(untrusted, &*hasher)?; // Ensure the header matches the commit vp.header_matches_commit(&untrusted.signed_header, hasher)?; @@ -259,7 +259,7 @@ pub fn verify( if untrusted.height() == trusted_next_height { // If the untrusted block is the very next block after the trusted block, // check that their (next) validator sets hashes match. - vp.valid_next_validator_set(&untrusted, trusted)?; + vp.valid_next_validator_set(untrusted, trusted)?; } else { // Otherwise, ensure that the untrusted block has a greater height than // the trusted block. diff --git a/light-client/src/store/memory.rs b/light-client/src/store/memory.rs index 6af7389c5..300adb2ea 100644 --- a/light-client/src/store/memory.rs +++ b/light-client/src/store/memory.rs @@ -81,6 +81,7 @@ impl LightStore for MemoryStore { .map(|(_, e)| e.light_block.clone()) } + #[allow(clippy::needless_collect)] fn all(&self, status: Status) -> Box> { let light_blocks: Vec<_> = self .store diff --git a/light-client/src/supervisor.rs b/light-client/src/supervisor.rs index 4176070c4..8047f8fc5 100644 --- a/light-client/src/supervisor.rs +++ b/light-client/src/supervisor.rs @@ -319,7 +319,7 @@ impl Supervisor { .collect(); self.fork_detector - .detect_forks(verified_block, &trusted_block, witnesses) + .detect_forks(verified_block, trusted_block, witnesses) } /// Run the supervisor event loop in the same thread. diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml index 09dea5d5d..d5d9e31fa 100644 --- a/p2p/Cargo.toml +++ b/p2p/Cargo.toml @@ -50,6 +50,7 @@ flex-error = { version = "0.4.1", default-features = false } # path dependencies tendermint = { path = "../tendermint", version = "0.21.0" } tendermint-proto = { path = "../proto", version = "0.21.0" } +tendermint-std-ext = { path = "../std-ext", version = "0.21.0" } # optional dependencies prost-amino = { version = "0.6", optional = true } diff --git a/p2p/src/error.rs b/p2p/src/error.rs index 38ac222cc..3f609f524 100644 --- a/p2p/src/error.rs +++ b/p2p/src/error.rs @@ -63,5 +63,15 @@ define_error! { SmallOutputBuffer | _ | { "output buffer is too small" }, + TransportClone + { detail: String } + | e | { format_args!("failed to clone underlying transport: {}", e.detail) } + + } +} + +impl From for Error { + fn from(e: std::io::Error) -> Self { + Self::io(e) } } diff --git a/p2p/src/secret_connection.rs b/p2p/src/secret_connection.rs index a02e4f24f..18581a1b6 100644 --- a/p2p/src/secret_connection.rs +++ b/p2p/src/secret_connection.rs @@ -1,12 +1,12 @@ //! `SecretConnection`: Transport layer encryption for Tendermint P2P connections. -use std::{ - cmp, - convert::{TryFrom, TryInto}, - io::{self, Read, Write}, - marker::{Send, Sync}, - slice, -}; +use std::cmp; +use std::convert::{TryFrom, TryInto}; +use std::io::{self, Read, Write}; +use std::marker::{Send, Sync}; +use std::slice; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; use crate::error::Error; use chacha20poly1305::{ @@ -20,6 +20,7 @@ use subtle::ConstantTimeEq; use x25519_dalek::{EphemeralSecret, PublicKey as EphemeralPublic}; use tendermint_proto as proto; +use tendermint_std_ext::TryClone; pub use self::{ kdf::Kdf, @@ -205,16 +206,59 @@ impl Handshake { } } +// Macro usage allows us to avoid unnecessarily cloning the Arc +// that indicates whether we need to terminate the connection. +// +// Limitation: this only checks once prior to the execution of an I/O operation +// whether we need to terminate. This should be sufficient for our purposes +// though. +macro_rules! checked_io { + ($term:expr, $f:expr) => {{ + if $term.load(Ordering::SeqCst) { + return Err(io::Error::new( + io::ErrorKind::Other, + "secret connection was terminated elsewhere by previous error", + )); + } + let result = { $f }; + if result.is_err() { + $term.store(true, Ordering::SeqCst); + } + result + }}; +} + /// Encrypted connection between peers in a Tendermint network. -pub struct SecretConnection { +/// +/// ## Connection integrity and failures +/// +/// Due to the underlying encryption mechanism (currently [RFC 8439]), when a +/// read or write failure occurs, it is necessary to disconnect from the remote +/// peer and attempt to reconnect. +/// +/// ## Half- and full-duplex connections +/// By default, a `SecretConnection` facilitates half-duplex operations (i.e. +/// one can either read from the connection or write to it at a given time, but +/// not both simultaneously). +/// +/// If, however, the underlying I/O handler class implements +/// [`tendermint_std_ext::TryClone`], then you can use +/// [`SecretConnection::split`] to split the `SecretConnection` into its +/// sending and receiving halves. Each of these halves can then be used in a +/// separate thread to facilitate full-duplex communication. +/// +/// ## Contracts +/// +/// When reading data, data smaller than [`DATA_MAX_SIZE`] is read atomically. +/// +/// [RFC 8439]: https://www.rfc-editor.org/rfc/rfc8439.html +pub struct SecretConnection { io_handler: IoHandler, protocol_version: Version, - recv_nonce: Nonce, - send_nonce: Nonce, - recv_cipher: ChaCha20Poly1305, - send_cipher: ChaCha20Poly1305, remote_pubkey: Option, - recv_buffer: Vec, + send_state: SendState, + recv_state: ReceiveState, + terminate: Arc, } impl SecretConnection { @@ -249,12 +293,17 @@ impl SecretConnection { let mut sc = Self { io_handler, protocol_version, - recv_buffer: vec![], - recv_nonce: Nonce::default(), - send_nonce: Nonce::default(), - recv_cipher: h.state.recv_cipher.clone(), - send_cipher: h.state.send_cipher.clone(), remote_pubkey: None, + send_state: SendState { + cipher: h.state.send_cipher.clone(), + nonce: Nonce::default(), + }, + recv_state: ReceiveState { + cipher: h.state.recv_cipher.clone(), + nonce: Nonce::default(), + buffer: vec![], + }, + terminate: Arc::new(AtomicBool::new(false)), }; // Share each other's pubkey & challenge signature. @@ -272,165 +321,129 @@ impl SecretConnection { sc.remote_pubkey = Some(remote_pubkey); Ok(sc) } - - /// Encrypt AEAD authenticated data - #[allow(clippy::cast_possible_truncation)] - fn encrypt( - &self, - chunk: &[u8], - sealed_frame: &mut [u8; TAG_SIZE + TOTAL_FRAME_SIZE], - ) -> Result<(), Error> { - debug_assert!(!chunk.is_empty(), "chunk is empty"); - debug_assert!( - chunk.len() <= TOTAL_FRAME_SIZE - DATA_LEN_SIZE, - "chunk is too big: {}! max: {}", - chunk.len(), - DATA_MAX_SIZE, - ); - sealed_frame[..DATA_LEN_SIZE].copy_from_slice(&(chunk.len() as u32).to_le_bytes()); - sealed_frame[DATA_LEN_SIZE..DATA_LEN_SIZE + chunk.len()].copy_from_slice(chunk); - - let tag = self - .send_cipher - .encrypt_in_place_detached( - GenericArray::from_slice(self.send_nonce.to_bytes()), - b"", - &mut sealed_frame[..TOTAL_FRAME_SIZE], - ) - .map_err(Error::aead)?; - - sealed_frame[TOTAL_FRAME_SIZE..].copy_from_slice(tag.as_slice()); - - Ok(()) - } - - /// Decrypt AEAD authenticated data - fn decrypt(&self, ciphertext: &[u8], out: &mut [u8]) -> Result { - if ciphertext.len() < TAG_SIZE { - return Err(Error::short_ciphertext(TAG_SIZE)); - } - - // Split ChaCha20 ciphertext from the Poly1305 tag - let (ct, tag) = ciphertext.split_at(ciphertext.len() - TAG_SIZE); - - if out.len() < ct.len() { - return Err(Error::small_output_buffer()); - } - - let in_out = &mut out[..ct.len()]; - in_out.copy_from_slice(ct); - - self.recv_cipher - .decrypt_in_place_detached( - GenericArray::from_slice(self.recv_nonce.to_bytes()), - b"", - in_out, - tag.into(), - ) - .map_err(Error::aead)?; - - Ok(in_out.len()) - } } -impl Read for SecretConnection +impl SecretConnection where - IoHandler: Read + Write + Send + Sync, + IoHandler: TryClone, + ::Error: std::error::Error + Send + Sync + 'static, { - // CONTRACT: data smaller than DATA_MAX_SIZE is read atomically. - fn read(&mut self, data: &mut [u8]) -> io::Result { - if !self.recv_buffer.is_empty() { - let n = cmp::min(data.len(), self.recv_buffer.len()); - data.copy_from_slice(&self.recv_buffer[..n]); - let mut leftover_portion = vec![ - 0; - self.recv_buffer - .len() - .checked_sub(n) - .expect("leftover calculation failed") - ]; - leftover_portion.clone_from_slice(&self.recv_buffer[n..]); - self.recv_buffer = leftover_portion; - - return Ok(n); - } + /// For secret connections whose underlying I/O layer implements + /// [`tendermint_std_ext::TryClone`], this attempts to split such a + /// connection into its sending and receiving halves. + /// + /// This facilitates full-duplex communications when each half is used in + /// a separate thread. + /// + /// ## Errors + /// Fails when the `try_clone` operation for the underlying I/O handler + /// fails. + pub fn split(self) -> Result<(Sender, Receiver), Error> { + let remote_pubkey = self.remote_pubkey.expect("remote_pubkey to be initialized"); + Ok(( + Sender { + io_handler: self + .io_handler + .try_clone() + .map_err(|e| Error::transport_clone(e.to_string()))?, + remote_pubkey, + state: self.send_state, + terminate: self.terminate.clone(), + }, + Receiver { + io_handler: self.io_handler, + remote_pubkey, + state: self.recv_state, + terminate: self.terminate, + }, + )) + } +} - let mut sealed_frame = [0_u8; TAG_SIZE + TOTAL_FRAME_SIZE]; - self.io_handler.read_exact(&mut sealed_frame)?; +impl Read for SecretConnection { + fn read(&mut self, data: &mut [u8]) -> io::Result { + checked_io!( + self.terminate, + read_and_decrypt(&mut self.io_handler, &mut self.recv_state, data) + ) + } +} - // decrypt the frame - let mut frame = [0_u8; TOTAL_FRAME_SIZE]; - let res = self.decrypt(&sealed_frame, &mut frame); +impl Write for SecretConnection { + fn write(&mut self, data: &[u8]) -> io::Result { + checked_io!( + self.terminate, + encrypt_and_write(&mut self.io_handler, &mut self.send_state, data) + ) + } - if let Err(err) = res { - return Err(io::Error::new(io::ErrorKind::Other, err.to_string())); - } + fn flush(&mut self) -> io::Result<()> { + checked_io!(self.terminate, self.io_handler.flush()) + } +} - self.recv_nonce.increment(); - // end decryption +// Sending state for a `SecretConnection`. +struct SendState { + cipher: ChaCha20Poly1305, + nonce: Nonce, +} - let chunk_length = u32::from_le_bytes(frame[..4].try_into().expect("chunk framing failed")); +// Receiving state for a `SecretConnection`. +struct ReceiveState { + cipher: ChaCha20Poly1305, + nonce: Nonce, + buffer: Vec, +} - if chunk_length as usize > DATA_MAX_SIZE { - return Err(io::Error::new( - io::ErrorKind::Other, - format!("chunk is too big: {}! max: {}", chunk_length, DATA_MAX_SIZE), - )); - } +/// The sending end of a [`SecretConnection`]. +pub struct Sender { + io_handler: IoHandler, + remote_pubkey: PublicKey, + state: SendState, + terminate: Arc, +} - let mut chunk = vec![0; chunk_length as usize]; - chunk.clone_from_slice( - &frame[DATA_LEN_SIZE - ..(DATA_LEN_SIZE - .checked_add(chunk_length as usize) - .expect("chunk size addition overflow"))], - ); +impl Sender { + /// Returns the remote pubkey. Panics if there's no key. + pub const fn remote_pubkey(&self) -> PublicKey { + self.remote_pubkey + } +} - let n = cmp::min(data.len(), chunk.len()); - data[..n].copy_from_slice(&chunk[..n]); - self.recv_buffer.copy_from_slice(&chunk[n..]); +impl Write for Sender { + fn write(&mut self, buf: &[u8]) -> io::Result { + checked_io!( + self.terminate, + encrypt_and_write(&mut self.io_handler, &mut self.state, buf) + ) + } - Ok(n) + fn flush(&mut self) -> io::Result<()> { + checked_io!(self.terminate, self.io_handler.flush()) } } -impl Write for SecretConnection -where - IoHandler: Read + Write + Send + Sync, -{ - // Writes encrypted frames of `TAG_SIZE` + `TOTAL_FRAME_SIZE` - // CONTRACT: data smaller than DATA_MAX_SIZE is read atomically. - fn write(&mut self, data: &[u8]) -> io::Result { - let mut n = 0_usize; - let mut data_copy = data; - while !data_copy.is_empty() { - let chunk: &[u8]; - if DATA_MAX_SIZE < data.len() { - chunk = &data[..DATA_MAX_SIZE]; - data_copy = &data_copy[DATA_MAX_SIZE..]; - } else { - chunk = data_copy; - data_copy = &[0_u8; 0]; - } - let sealed_frame = &mut [0_u8; TAG_SIZE + TOTAL_FRAME_SIZE]; - let res = self.encrypt(chunk, sealed_frame); - if let Err(err) = res { - return Err(io::Error::new(io::ErrorKind::Other, err.to_string())); - } - self.send_nonce.increment(); - // end encryption - - self.io_handler.write_all(&sealed_frame[..])?; - n = n - .checked_add(chunk.len()) - .expect("overflow when adding chunk lenghts"); - } +/// The receiving end of a [`SecretConnection`]. +pub struct Receiver { + io_handler: IoHandler, + remote_pubkey: PublicKey, + state: ReceiveState, + terminate: Arc, +} - Ok(n) +impl Receiver { + /// Returns the remote pubkey. Panics if there's no key. + pub const fn remote_pubkey(&self) -> PublicKey { + self.remote_pubkey } +} - fn flush(&mut self) -> io::Result<()> { - self.io_handler.flush() +impl Read for Receiver { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + checked_io!( + self.terminate, + read_and_decrypt(&mut self.io_handler, &mut self.state, buf) + ) } } @@ -444,17 +457,13 @@ fn share_eph_pubkey( // TODO(ismail): on the go side this is done in parallel, here we do send and receive after // each other. thread::spawn would require a static lifetime. // Should still work though. - handler - .write_all(&protocol_version.encode_initial_handshake(local_eph_pubkey)) - .map_err(Error::io)?; + handler.write_all(&protocol_version.encode_initial_handshake(local_eph_pubkey))?; let mut response_len = 0_u8; - handler - .read_exact(slice::from_mut(&mut response_len)) - .map_err(Error::io)?; + handler.read_exact(slice::from_mut(&mut response_len))?; let mut buf = vec![0; response_len as usize]; - handler.read_exact(&mut buf).map_err(Error::io)?; + handler.read_exact(&mut buf)?; protocol_version.decode_initial_handshake(&buf) } @@ -477,11 +486,10 @@ fn share_auth_signature( .protocol_version .encode_auth_signature(pubkey, local_signature); - sc.write_all(&buf).map_err(Error::io)?; + sc.write_all(&buf)?; let mut buf = vec![0; sc.protocol_version.auth_sig_msg_response_len()]; - sc.read_exact(&mut buf).map_err(Error::io)?; - + sc.read_exact(&mut buf)?; sc.protocol_version.decode_auth_signature(&buf) } @@ -494,3 +502,164 @@ pub fn sort32(first: [u8; 32], second: [u8; 32]) -> ([u8; 32], [u8; 32]) { (second, first) } } + +/// Encrypt AEAD authenticated data +#[allow(clippy::cast_possible_truncation)] +fn encrypt( + chunk: &[u8], + send_cipher: &ChaCha20Poly1305, + send_nonce: &Nonce, + sealed_frame: &mut [u8; TAG_SIZE + TOTAL_FRAME_SIZE], +) -> Result<(), Error> { + assert!(!chunk.is_empty(), "chunk is empty"); + assert!( + chunk.len() <= TOTAL_FRAME_SIZE - DATA_LEN_SIZE, + "chunk is too big: {}! max: {}", + chunk.len(), + DATA_MAX_SIZE, + ); + sealed_frame[..DATA_LEN_SIZE].copy_from_slice(&(chunk.len() as u32).to_le_bytes()); + sealed_frame[DATA_LEN_SIZE..DATA_LEN_SIZE + chunk.len()].copy_from_slice(chunk); + + let tag = send_cipher + .encrypt_in_place_detached( + GenericArray::from_slice(send_nonce.to_bytes()), + b"", + &mut sealed_frame[..TOTAL_FRAME_SIZE], + ) + .map_err(Error::aead)?; + + sealed_frame[TOTAL_FRAME_SIZE..].copy_from_slice(tag.as_slice()); + + Ok(()) +} + +// Writes encrypted frames of `TAG_SIZE` + `TOTAL_FRAME_SIZE` +fn encrypt_and_write( + io_handler: &mut IoHandler, + send_state: &mut SendState, + data: &[u8], +) -> io::Result { + let mut n = 0_usize; + let mut data_copy = data; + while !data_copy.is_empty() { + let chunk: &[u8]; + if DATA_MAX_SIZE < data.len() { + chunk = &data[..DATA_MAX_SIZE]; + data_copy = &data_copy[DATA_MAX_SIZE..]; + } else { + chunk = data_copy; + data_copy = &[0_u8; 0]; + } + let sealed_frame = &mut [0_u8; TAG_SIZE + TOTAL_FRAME_SIZE]; + encrypt(chunk, &send_state.cipher, &send_state.nonce, sealed_frame) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; + send_state.nonce.increment(); + // end encryption + + io_handler.write_all(&sealed_frame[..])?; + n = n + .checked_add(chunk.len()) + .expect("overflow when adding chunk lengths"); + } + + Ok(n) +} + +/// Decrypt AEAD authenticated data +fn decrypt( + ciphertext: &[u8], + recv_cipher: &ChaCha20Poly1305, + recv_nonce: &Nonce, + out: &mut [u8], +) -> Result { + if ciphertext.len() < TAG_SIZE { + return Err(Error::short_ciphertext(TAG_SIZE)); + } + + // Split ChaCha20 ciphertext from the Poly1305 tag + let (ct, tag) = ciphertext.split_at(ciphertext.len() - TAG_SIZE); + + if out.len() < ct.len() { + return Err(Error::small_output_buffer()); + } + + let in_out = &mut out[..ct.len()]; + in_out.copy_from_slice(ct); + + recv_cipher + .decrypt_in_place_detached( + GenericArray::from_slice(recv_nonce.to_bytes()), + b"", + in_out, + tag.into(), + ) + .map_err(Error::aead)?; + + Ok(in_out.len()) +} + +fn read_and_decrypt( + io_handler: &mut IoHandler, + recv_state: &mut ReceiveState, + data: &mut [u8], +) -> io::Result { + if !recv_state.buffer.is_empty() { + let n = cmp::min(data.len(), recv_state.buffer.len()); + data.copy_from_slice(&recv_state.buffer[..n]); + let mut leftover_portion = vec![ + 0; + recv_state + .buffer + .len() + .checked_sub(n) + .expect("leftover calculation failed") + ]; + leftover_portion.clone_from_slice(&recv_state.buffer[n..]); + recv_state.buffer = leftover_portion; + + return Ok(n); + } + + let mut sealed_frame = [0_u8; TAG_SIZE + TOTAL_FRAME_SIZE]; + io_handler.read_exact(&mut sealed_frame)?; + + // decrypt the frame + let mut frame = [0_u8; TOTAL_FRAME_SIZE]; + let res = decrypt( + &sealed_frame, + &recv_state.cipher, + &recv_state.nonce, + &mut frame, + ); + + if let Err(err) = res { + return Err(io::Error::new(io::ErrorKind::Other, err.to_string())); + } + + recv_state.nonce.increment(); + // end decryption + + let chunk_length = u32::from_le_bytes(frame[..4].try_into().expect("chunk framing failed")); + + if chunk_length as usize > DATA_MAX_SIZE { + return Err(io::Error::new( + io::ErrorKind::Other, + format!("chunk is too big: {}! max: {}", chunk_length, DATA_MAX_SIZE), + )); + } + + let mut chunk = vec![0; chunk_length as usize]; + chunk.clone_from_slice( + &frame[DATA_LEN_SIZE + ..(DATA_LEN_SIZE + .checked_add(chunk_length as usize) + .expect("chunk size addition overflow"))], + ); + + let n = cmp::min(data.len(), chunk.len()); + data[..n].copy_from_slice(&chunk[..n]); + recv_state.buffer.copy_from_slice(&chunk[n..]); + + Ok(n) +} diff --git a/p2p/src/transport.rs b/p2p/src/transport.rs index 9f731ae9b..fc35f3a37 100644 --- a/p2p/src/transport.rs +++ b/p2p/src/transport.rs @@ -16,7 +16,7 @@ where /// List of addresses to be communicated as publicly reachable to other nodes, which in turn /// can use that to share with third parties. /// - /// TODO(xla): Dependning on where this information is going to be disseminated it might be + /// TODO(xla): Depending on where this information is going to be disseminated it might be /// better placed in a higher-level protocol. What stands in opposition to that is the fact /// that advertised addresses will be helpful for hole punching and other involved network /// traversals. @@ -140,6 +140,6 @@ where /// /// # Errors /// - /// * If resource allocation fails for lack of priviliges or being not available. + /// * If resource allocation fails for lack of privileges or being not available. fn bind(self, bind_info: BindInfo) -> Result<(Self::Endpoint, Self::Incoming)>; } diff --git a/release.sh b/release.sh index 3fd1bf424..828815aab 100755 --- a/release.sh +++ b/release.sh @@ -36,7 +36,7 @@ set -e # A space-separated list of all the crates we want to publish, in the order in # which they must be published. It's important to respect this order, since # each subsequent crate depends on one or more of the preceding ones. -DEFAULT_CRATES="tendermint-proto tendermint tendermint-abci tendermint-rpc tendermint-p2p tendermint-light-client tendermint-light-client-js tendermint-testgen" +DEFAULT_CRATES="tendermint-proto tendermint-std-ext tendermint tendermint-abci tendermint-rpc tendermint-p2p tendermint-light-client tendermint-light-client-js tendermint-testgen" # Allows us to override the crates we want to publish. CRATES=${*:-${DEFAULT_CRATES}} diff --git a/std-ext/Cargo.toml b/std-ext/Cargo.toml new file mode 100644 index 000000000..979b750ac --- /dev/null +++ b/std-ext/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "tendermint-std-ext" +version = "0.21.0" +edition = "2018" +license = "Apache-2.0" +homepage = "https://www.tendermint.com/" +repository = "https://github.com/informalsystems/tendermint-rs" +readme = "README.md" +keywords = ["blockchain", "cosmos", "tendermint"] +categories = ["development-tools"] +authors = ["Informal Systems "] + +description = """ + tendermint-std-ext contains extensions to the Rust standard library for use + from tendermint-rs. + """ + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] diff --git a/std-ext/README.md b/std-ext/README.md new file mode 100644 index 000000000..145ecd2b8 --- /dev/null +++ b/std-ext/README.md @@ -0,0 +1,24 @@ +[![Crate][crate-image]][crate-link] +[![Docs][docs-image]][docs-link] + +See the [repo root] for build status, license, rust version, etc. + +# tendermint-std-ext + +Extensions to the [Rust standard library][std] for use by Tendermint in Rust. + +## Documentation + +See documentation on [crates.io][docs-link]. + +[//]: # (badges) + +[crate-image]: https://img.shields.io/crates/v/tendermint-std-ext.svg +[crate-link]: https://crates.io/crates/tendermint-std-ext +[docs-image]: https://docs.rs/tendermint-std-ext/badge.svg +[docs-link]: https://docs.rs/tendermint-std-ext/ + +[//]: # (general links) + +[repo root]: https://github.com/informalsystems/tendermint-rs +[std]: https://doc.rust-lang.org/std/ diff --git a/std-ext/src/lib.rs b/std-ext/src/lib.rs new file mode 100644 index 000000000..6fb7389c2 --- /dev/null +++ b/std-ext/src/lib.rs @@ -0,0 +1,8 @@ +//! Extensions to the [Rust standard library][std] for use by [tendermint-rs]. +//! +//! [std]: https://doc.rust-lang.org/std/ +//! [tendermint-rs]: https://github.com/informalsystems/tendermint-rs/ + +mod try_clone; + +pub use try_clone::TryClone; diff --git a/std-ext/src/try_clone.rs b/std-ext/src/try_clone.rs new file mode 100644 index 000000000..2ea599a76 --- /dev/null +++ b/std-ext/src/try_clone.rs @@ -0,0 +1,27 @@ +//! Rust standard library types that can be fallibly cloned. + +use std::net::TcpStream; + +/// Types that can be cloned where success is not guaranteed can implement this +/// trait. +pub trait TryClone: Sized { + /// The type of error that can be returned when an attempted clone + /// operation fails. + type Error: std::error::Error; + + /// Attempt to clone this instance. + /// + /// # Errors + /// Can fail if the underlying instance cannot be cloned (e.g. the OS could + /// be out of file descriptors, or some low-level OS-specific error could + /// be produced). + fn try_clone(&self) -> Result; +} + +impl TryClone for TcpStream { + type Error = std::io::Error; + + fn try_clone(&self) -> Result { + TcpStream::try_clone(self) + } +} diff --git a/tendermint/src/abci/tag.rs b/tendermint/src/abci/tag.rs index 657a19eec..e610c6d34 100644 --- a/tendermint/src/abci/tag.rs +++ b/tendermint/src/abci/tag.rs @@ -82,7 +82,7 @@ mod test { #[test] fn tag_serde() { let json = r#"{"key": "cGFja2V0X3RpbWVvdXRfaGVpZ2h0", "value": "MC00ODQw"}"#; - let tag: Tag = serde_json::from_str(&json).unwrap(); + let tag: Tag = serde_json::from_str(json).unwrap(); assert_eq!("packet_timeout_height", tag.key.0); assert_eq!("0-4840", tag.value.0); } diff --git a/tendermint/src/chain/id.rs b/tendermint/src/chain/id.rs index b6a75e465..312a7d89f 100644 --- a/tendermint/src/chain/id.rs +++ b/tendermint/src/chain/id.rs @@ -55,7 +55,7 @@ impl Id { /// Get the chain ID as a raw bytes. pub fn as_bytes(&self) -> &[u8] { - &self.0.as_str().as_bytes() + self.0.as_str().as_bytes() } } diff --git a/test/Cargo.toml b/test/Cargo.toml index d9d2dad9a..f9bbaa5a8 100644 --- a/test/Cargo.toml +++ b/test/Cargo.toml @@ -15,7 +15,7 @@ test = true [dev-dependencies] ed25519-dalek = "1" -eyre = "0.6" +flex-error = "0.4.2" flume = "0.10" rand_core = { version = "0.5", features = ["std"] } readwrite = "^0.1.1" diff --git a/test/src/test/unit/p2p/secret_connection.rs b/test/src/test/unit/p2p/secret_connection.rs index 505be5f84..797a29d60 100644 --- a/test/src/test/unit/p2p/secret_connection.rs +++ b/test/src/test/unit/p2p/secret_connection.rs @@ -1,5 +1,6 @@ use std::io::Read as _; use std::io::Write as _; +use std::net::{TcpListener, TcpStream}; use std::thread; use ed25519_dalek::{self as ed25519}; @@ -19,16 +20,12 @@ fn test_handshake() { let (pipe1, pipe2) = pipe::async_bipipe_buffered(); let peer1 = thread::spawn(|| { - let mut csprng = OsRng {}; - let privkey1: ed25519::Keypair = ed25519::Keypair::generate(&mut csprng); - let conn1 = SecretConnection::new(pipe2, privkey1, Version::V0_34); + let conn1 = new_peer_conn(pipe2); assert!(conn1.is_ok()); }); let peer2 = thread::spawn(|| { - let mut csprng = OsRng {}; - let privkey2: ed25519::Keypair = ed25519::Keypair::generate(&mut csprng); - let conn2 = SecretConnection::new(pipe1, privkey2, Version::V0_34); + let conn2 = new_peer_conn(pipe1); assert!(conn2.is_ok()); }); @@ -43,10 +40,7 @@ fn test_read_write_single_message() { let (pipe1, pipe2) = pipe::async_bipipe_buffered(); let sender = thread::spawn(move || { - let mut csprng = OsRng {}; - let privkey1: ed25519::Keypair = ed25519::Keypair::generate(&mut csprng); - let mut conn1 = - SecretConnection::new(pipe2, privkey1, Version::V0_34).expect("handshake to succeed"); + let mut conn1 = new_peer_conn(pipe2).expect("handshake to succeed"); conn1 .write_all(MESSAGE.as_bytes()) @@ -54,10 +48,7 @@ fn test_read_write_single_message() { }); let receiver = thread::spawn(move || { - let mut csprng = OsRng {}; - let privkey2: ed25519::Keypair = ed25519::Keypair::generate(&mut csprng); - let mut conn2 = - SecretConnection::new(pipe1, privkey2, Version::V0_34).expect("handshake to succeed"); + let mut conn2 = new_peer_conn(pipe1).expect("handshake to succeed"); let mut buf = [0; MESSAGE.len()]; conn2 @@ -111,3 +102,90 @@ fn test_sort() { assert_eq!(t1, *t3); assert_eq!(t2, *t4); } + +#[test] +fn test_split_secret_connection() { + const MESSAGES_1_TO_2: &[&str] = &["one", "three", "five", "seven"]; + const MESSAGES_2_TO_1: &[&str] = &["two", "four", "six", "eight"]; + let peer1_listener = TcpListener::bind("127.0.0.1:0").expect("to be able to bind to 127.0.0.1"); + let peer1_addr = peer1_listener.local_addr().unwrap(); + println!("peer1 bound to {:?}", peer1_addr); + + let peer1 = thread::spawn(move || { + let stream = peer1_listener + .incoming() + .next() + .unwrap() + .expect("an incoming TCP stream from peer 2"); + let mut conn_to_peer2 = new_peer_conn(stream).expect("handshake to succeed"); + println!("peer1 handshake concluded"); + for msg_counter in 0..MESSAGES_1_TO_2.len() { + // Peer 1 sends first + conn_to_peer2 + .write_all(MESSAGES_1_TO_2[msg_counter].as_bytes()) + .expect("to write message successfully to peer 2"); + // Peer 1 expects a response + let mut buf = [0u8; 10]; + let br = conn_to_peer2 + .read(&mut buf) + .expect("to read a message from peer 2"); + let msg = String::from_utf8_lossy(&buf[0..br]).to_string(); + println!("Got message from peer2: {}", msg); + assert_eq!(msg, MESSAGES_2_TO_1[msg_counter]); + } + }); + + // Peer 2 attempts to initiate the secret connection to peer 1 + let peer2_to_peer1 = TcpStream::connect(peer1_addr).expect("to be able to connect to peer 1"); + println!("peer2 connected to peer1"); + let conn_to_peer1 = new_peer_conn(peer2_to_peer1).expect("handshake to succeed"); + println!("peer2 handshake concluded"); + + let (mut write_conn, mut read_conn) = conn_to_peer1 + .split() + .expect("to be able to clone the underlying TcpStream"); + let (write_tx, write_rx) = std::sync::mpsc::channel::(); + + // We spawn a standalone thread that makes use of peer2's secret connection + // purely to write outgoing messages. + let peer2_writer = thread::spawn(move || { + for _ in 0..MESSAGES_2_TO_1.len() { + let msg = write_rx + .recv() + .expect("to successfully receive a message to be sent to peer1"); + write_conn + .write_all(msg.as_bytes()) + .expect("to be able to write to peer 1"); + } + }); + + for msg_counter in 0..MESSAGES_2_TO_1.len() { + // Wait for peer 1 to send first + let mut buf = [0u8; 10]; + let br = read_conn + .read(&mut buf) + .expect("to receive a message from peer 1"); + let msg = String::from_utf8_lossy(&buf[0..br]).to_string(); + println!("Got message from peer1: {}", msg); + assert_eq!(msg, MESSAGES_1_TO_2[msg_counter]); + write_tx + .send(MESSAGES_2_TO_1[msg_counter].to_string()) + .expect("to be able to communicate with peer2's writer thread"); + } + + peer2_writer + .join() + .expect("peer 2's writer thread to run to completion"); + peer1.join().expect("peer 1's thread to run to completion") +} + +fn new_peer_conn( + io_handler: IoHandler, +) -> Result, tendermint_p2p::error::Error> +where + IoHandler: std::io::Read + std::io::Write + Send + Sync, +{ + let mut csprng = OsRng {}; + let privkey1: ed25519::Keypair = ed25519::Keypair::generate(&mut csprng); + SecretConnection::new(io_handler, privkey1, Version::V0_34) +} diff --git a/testgen/src/tester.rs b/testgen/src/tester.rs index cc03d340c..0a9dc2114 100644 --- a/testgen/src/tester.rs +++ b/testgen/src/tester.rs @@ -269,7 +269,7 @@ impl Tester { T: 'static + DeserializeOwned + UnwindSafe, F: Fn(T) + UnwindSafe + RefUnwindSafe + 'static, { - let test_fn = move |_path: &str, input: &str| match parse_as::(&input) { + let test_fn = move |_path: &str, input: &str| match parse_as::(input) { Ok(test_case) => Tester::capture_test(|| { test(test_case); }), @@ -288,7 +288,7 @@ impl Tester { { let test_env = self.env().unwrap(); let output_env = self.output_env().unwrap(); - let test_fn = move |path: &str, input: &str| match parse_as::(&input) { + let test_fn = move |path: &str, input: &str| match parse_as::(input) { Ok(test_case) => Tester::capture_test(|| { // It is OK to unwrap() here: in case of unwrapping failure, the test will fail. let dir = TempDir::new().unwrap(); @@ -311,7 +311,7 @@ impl Tester { T: 'static + DeserializeOwned, F: Fn(T) -> Vec<(String, String)> + 'static, { - let batch_fn = move |_path: &str, input: &str| match parse_as::(&input) { + let batch_fn = move |_path: &str, input: &str| match parse_as::(input) { Ok(test_batch) => Some(batch(test_batch)), Err(_) => None, }; diff --git a/testgen/src/validator_set.rs b/testgen/src/validator_set.rs index 44da67af4..0acfed561 100644 --- a/testgen/src/validator_set.rs +++ b/testgen/src/validator_set.rs @@ -48,7 +48,7 @@ impl Generator for ValidatorSet { } fn generate(&self) -> Result { - let vals = generate_validators(&self.validators.as_ref().unwrap())?; + let vals = generate_validators(self.validators.as_ref().unwrap())?; Ok(validator::Set::without_proposer(vals)) } }