Skip to content

Commit

Permalink
Use flex-error for tendermint-rs errors (#923)
Browse files Browse the repository at this point in the history
* Use flex-error for tendermint

* Use flex-error for p2p

* Use flex-error for light-client

* Use flex-error for light_client::predicates

* Fix lint

* Use flex-error for builder and io errors

* Use flex-error for rpc errors

* Use flex-error for protobuf errors

* Use flex-error for abci

* Fix test_bisection_no_witness_left

* Fix build errors in all-features

* Fix failing tests

* Fix more failures

* Fix tungstenite error under wasm target

* Fix incoming_fixtures test

* Fix conflict

* Update flex-error to v0.4.0

* set std feature in flex-error instead of individual crates

* Add flex-error patch to tools/Cargo.toml

* Use published version of flex-error v0.4.1

* Enable flex-error/eyre_tracer feature by default

* Add .changelog entry (#940)

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* flex-error: resolve conflicts with `master` (#945)

* Implement full-duplex secret connection (#938)

* Implement thread-safe cloning of a secret connection

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Expand documentation for SecretConnection on threading considerations

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Extract peer construction into its own method

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Add test for cloned SecretConnection

This adds a `TcpStream`-based test for parallelizing operations on a
`SecretConnection`. I used `TcpStream` instead of the buffered reader in
the other tests because it wasn't feasible to implement the `TryClone`
trait for that buffered pipe implementation.

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Add more messages to test

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Expand comment for clarity

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Add .changelog entry

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Restore half-duplex operations

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Extract encrypt/decrypt fns as independent methods

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Remove unnecessary trait bounds

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Extract send/receive state

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Extract read/write functionality as standalone methods

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Add logic to facilitate splitting SecretConnection into its sending and receiving halves

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Restore split SecretConnection test using new semantics

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Update changelog entry

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Update docs for `SecretConnection`

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Condense error reporting

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Extract TryClone trait into its own crate

As per the discussion at
#938 (comment),
this extracts the `TryClone` trait into a new crate called
`tendermint-std-ext` in the `std-ext` directory.

This new crate is intended to contain any code that we need that extends
the Rust standard library.

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Reorder imports

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Assert validation regardless of debug build

This introduces the internal encryption assertions at runtime regardless
of build type. This may introduce a small performance hit, but it's
probably worth it to ensure correctness.

Effectively this is keeping an eye on the code in the
`encrypt_and_write` fn to ensure its correctness.

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Remove remote_pubkey optionality from sender/receiver halves

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Update SecretConnection docs with comment content

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Fix doc link to TryClone trait

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Fix doc link to TryClone trait

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Add docs on SecretConnection failures and connection integrity

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Synchronize sending/receiving failures to comply with crypto algorithm constraints

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Rename try_split method to split for SecretConnection

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Remove redundant field name prefixes

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Fix broken link in docs

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Fix recent clippy errors on `master` (#941)

* Fix needless borrows in codebase

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Ignore needless collect warning (we do actually seem to need it)

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Remove trailing semicolon in macro to fix docs compiling

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Remove unnecessary macros

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Correct error messages

Signed-off-by: Thane Thomson <connect@thanethomson.com>

Co-authored-by: Thane Thomson <thane@informal.systems>
Co-authored-by: Thane Thomson <connect@thanethomson.com>
  • Loading branch information
3 people committed Aug 7, 2021
1 parent e824bfc commit 877e408
Show file tree
Hide file tree
Showing 111 changed files with 2,321 additions and 2,068 deletions.
5 changes: 5 additions & 0 deletions .changelog/unreleased/breaking-changes/923-flex-error.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
- All crates' error handling has been refactored to make use of
[`flex-error`](https://github.com/informalsystems/flex-error/). This gives
users greater flexibility in terms of the error handling/reporting systems
they want to use and is a critical step towards `no_std` support.
([#923](https://github.com/informalsystems/tendermint-rs/pull/923))
8 changes: 6 additions & 2 deletions abci/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,22 @@ path = "src/application/kvstore/main.rs"
required-features = [ "binary", "kvstore-app" ]

[features]
default = ["std", "eyre_tracer"]
eyre_tracer = ["flex-error/eyre_tracer"]
client = []
echo-app = []
kvstore-app = []
binary = [ "structopt", "tracing-subscriber" ]
std = [
"flex-error/std"
]

[dependencies]
bytes = "1.0"
eyre = "0.6"
prost = "0.7"
tendermint-proto = { version = "0.21.0", path = "../proto" }
thiserror = "1.0"
tracing = "0.1"
flex-error = { version = "0.4.1", default-features = false }

structopt = { version = "0.3", optional = true }
tracing-subscriber = { version = "0.2", optional = true }
25 changes: 10 additions & 15 deletions abci/src/application/kvstore.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! In-memory key/value store ABCI application.

use crate::codec::{encode_varint, MAX_VARINT_LENGTH};
use crate::{Application, Error, Result};
use crate::{Application, Error};
use bytes::BytesMut;
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
Expand All @@ -28,7 +28,7 @@ impl KeyValueStoreApp {
}

/// Attempt to retrieve the value associated with the given key.
pub fn get<K: AsRef<str>>(&self, key: K) -> Result<(i64, Option<String>)> {
pub fn get<K: AsRef<str>>(&self, key: K) -> Result<(i64, Option<String>), Error> {
let (result_tx, result_rx) = channel();
channel_send(
&self.cmd_tx,
Expand All @@ -44,7 +44,7 @@ impl KeyValueStoreApp {
///
/// Optionally returns any pre-existing value associated with the given
/// key.
pub fn set<K, V>(&self, key: K, value: V) -> Result<Option<String>>
pub fn set<K, V>(&self, key: K, value: V) -> Result<Option<String>, Error>
where
K: AsRef<str>,
V: AsRef<str>,
Expand Down Expand Up @@ -202,12 +202,9 @@ impl KeyValueStoreDriver {
}

/// Run the driver in the current thread (blocking).
pub fn run(mut self) -> Result<()> {
pub fn run(mut self) -> Result<(), Error> {
loop {
let cmd = self
.cmd_rx
.recv()
.map_err(|e| Error::ChannelRecv(e.to_string()))?;
let cmd = self.cmd_rx.recv().map_err(Error::channel_recv)?;
match cmd {
Command::GetInfo { result_tx } => {
channel_send(&result_tx, (self.height, self.app_hash.clone()))?
Expand All @@ -232,7 +229,7 @@ impl KeyValueStoreDriver {
}
}

fn commit(&mut self, result_tx: Sender<(i64, Vec<u8>)>) -> Result<()> {
fn commit(&mut self, result_tx: Sender<(i64, Vec<u8>)>) -> Result<(), Error> {
// As in the Go-based key/value store, simply encode the number of
// items as the "app hash"
let mut app_hash = BytesMut::with_capacity(MAX_VARINT_LENGTH);
Expand Down Expand Up @@ -263,12 +260,10 @@ enum Command {
Commit { result_tx: Sender<(i64, Vec<u8>)> },
}

fn channel_send<T>(tx: &Sender<T>, value: T) -> Result<()> {
tx.send(value)
.map_err(|e| Error::ChannelSend(e.to_string()).into())
fn channel_send<T>(tx: &Sender<T>, value: T) -> Result<(), Error> {
tx.send(value).map_err(Error::send)
}

fn channel_recv<T>(rx: &Receiver<T>) -> Result<T> {
rx.recv()
.map_err(|e| Error::ChannelRecv(e.to_string()).into())
fn channel_recv<T>(rx: &Receiver<T>) -> Result<T, Error> {
rx.recv().map_err(Error::channel_recv)
}
52 changes: 27 additions & 25 deletions abci/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Blocking ABCI client.

use crate::codec::ClientCodec;
use crate::{Error, Result};
use crate::Error;
use std::net::{TcpStream, ToSocketAddrs};
use tendermint_proto::abci::{
request, response, RequestApplySnapshotChunk, RequestBeginBlock, RequestCheckTx, RequestCommit,
Expand Down Expand Up @@ -31,8 +31,8 @@ impl ClientBuilder {

/// Client constructor that attempts to connect to the given network
/// address.
pub fn connect<A: ToSocketAddrs>(self, addr: A) -> Result<Client> {
let stream = TcpStream::connect(addr)?;
pub fn connect<A: ToSocketAddrs>(self, addr: A) -> Result<Client, Error> {
let stream = TcpStream::connect(addr).map_err(Error::io)?;
Ok(Client {
codec: ClientCodec::new(stream, self.read_buf_size),
})
Expand All @@ -56,101 +56,103 @@ macro_rules! perform {
($self:expr, $type:ident, $req:expr) => {
match $self.perform(request::Value::$type($req))? {
response::Value::$type(r) => Ok(r),
r => Err(Error::UnexpectedServerResponseType(stringify!($type).to_string(), r).into()),
r => {
Err(Error::unexpected_server_response_type(stringify!($type).to_string(), r).into())
}
}
};
}

impl Client {
/// Ask the ABCI server to echo back a message.
pub fn echo(&mut self, req: RequestEcho) -> Result<ResponseEcho> {
pub fn echo(&mut self, req: RequestEcho) -> Result<ResponseEcho, Error> {
perform!(self, Echo, req)
}

/// Request information about the ABCI application.
pub fn info(&mut self, req: RequestInfo) -> Result<ResponseInfo> {
pub fn info(&mut self, req: RequestInfo) -> Result<ResponseInfo, Error> {
perform!(self, Info, req)
}

/// To be called once upon genesis.
pub fn init_chain(&mut self, req: RequestInitChain) -> Result<ResponseInitChain> {
pub fn init_chain(&mut self, req: RequestInitChain) -> Result<ResponseInitChain, Error> {
perform!(self, InitChain, req)
}

/// Query the application for data at the current or past height.
pub fn query(&mut self, req: RequestQuery) -> Result<ResponseQuery> {
pub fn query(&mut self, req: RequestQuery) -> Result<ResponseQuery, Error> {
perform!(self, Query, req)
}

/// Check the given transaction before putting it into the local mempool.
pub fn check_tx(&mut self, req: RequestCheckTx) -> Result<ResponseCheckTx> {
pub fn check_tx(&mut self, req: RequestCheckTx) -> Result<ResponseCheckTx, Error> {
perform!(self, CheckTx, req)
}

/// Signal the beginning of a new block, prior to any `DeliverTx` calls.
pub fn begin_block(&mut self, req: RequestBeginBlock) -> Result<ResponseBeginBlock> {
pub fn begin_block(&mut self, req: RequestBeginBlock) -> Result<ResponseBeginBlock, Error> {
perform!(self, BeginBlock, req)
}

/// Apply a transaction to the application's state.
pub fn deliver_tx(&mut self, req: RequestDeliverTx) -> Result<ResponseDeliverTx> {
pub fn deliver_tx(&mut self, req: RequestDeliverTx) -> Result<ResponseDeliverTx, Error> {
perform!(self, DeliverTx, req)
}

/// Signal the end of a block.
pub fn end_block(&mut self, req: RequestEndBlock) -> Result<ResponseEndBlock> {
pub fn end_block(&mut self, req: RequestEndBlock) -> Result<ResponseEndBlock, Error> {
perform!(self, EndBlock, req)
}

pub fn flush(&mut self) -> Result<ResponseFlush> {
pub fn flush(&mut self) -> Result<ResponseFlush, Error> {
perform!(self, Flush, RequestFlush {})
}

/// Commit the current state at the current height.
pub fn commit(&mut self) -> Result<ResponseCommit> {
pub fn commit(&mut self) -> Result<ResponseCommit, Error> {
perform!(self, Commit, RequestCommit {})
}

/// Request that the application set an option to a particular value.
pub fn set_option(&mut self, req: RequestSetOption) -> Result<ResponseSetOption> {
pub fn set_option(&mut self, req: RequestSetOption) -> Result<ResponseSetOption, Error> {
perform!(self, SetOption, req)
}

/// Used during state sync to discover available snapshots on peers.
pub fn list_snapshots(&mut self) -> Result<ResponseListSnapshots> {
pub fn list_snapshots(&mut self) -> Result<ResponseListSnapshots, Error> {
perform!(self, ListSnapshots, RequestListSnapshots {})
}

/// Called when bootstrapping the node using state sync.
pub fn offer_snapshot(&mut self, req: RequestOfferSnapshot) -> Result<ResponseOfferSnapshot> {
pub fn offer_snapshot(
&mut self,
req: RequestOfferSnapshot,
) -> Result<ResponseOfferSnapshot, Error> {
perform!(self, OfferSnapshot, req)
}

/// Used during state sync to retrieve chunks of snapshots from peers.
pub fn load_snapshot_chunk(
&mut self,
req: RequestLoadSnapshotChunk,
) -> Result<ResponseLoadSnapshotChunk> {
) -> Result<ResponseLoadSnapshotChunk, Error> {
perform!(self, LoadSnapshotChunk, req)
}

/// Apply the given snapshot chunk to the application's state.
pub fn apply_snapshot_chunk(
&mut self,
req: RequestApplySnapshotChunk,
) -> Result<ResponseApplySnapshotChunk> {
) -> Result<ResponseApplySnapshotChunk, Error> {
perform!(self, ApplySnapshotChunk, req)
}

fn perform(&mut self, req: request::Value) -> Result<response::Value> {
fn perform(&mut self, req: request::Value) -> Result<response::Value, Error> {
self.codec.send(Request { value: Some(req) })?;
let res = self
.codec
.next()
.ok_or(Error::ServerConnectionTerminated)??;
match res.value {
Some(value) => Ok(value),
None => Err(Error::MalformedServerResponse.into()),
}
.ok_or_else(Error::server_connection_terminated)??;
res.value.ok_or_else(Error::malformed_server_response)
}
}
40 changes: 25 additions & 15 deletions abci/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@
//!
//! [tsp]: https://docs.tendermint.com/master/spec/abci/client-server.html#tsp

use crate::Result;
use bytes::{Buf, BufMut, BytesMut};
use prost::Message;
use std::io::{Read, Write};
use std::marker::PhantomData;
use tendermint_proto::abci::{Request, Response};

use crate::error::Error;

/// The maximum number of bytes we expect in a varint. We use this to check if
/// we're encountering a decoding error for a varint.
pub const MAX_VARINT_LENGTH: usize = 16;
Expand Down Expand Up @@ -60,7 +61,7 @@ where
S: Read,
I: Message + Default,
{
type Item = Result<I>;
type Item = Result<I, Error>;

fn next(&mut self) -> Option<Self::Item> {
loop {
Expand All @@ -75,7 +76,7 @@ where
// more
let bytes_read = match self.stream.read(self.read_window.as_mut()) {
Ok(br) => br,
Err(e) => return Some(Err(e.into())),
Err(e) => return Some(Err(Error::io(e))),
};
if bytes_read == 0 {
// The underlying stream terminated
Expand All @@ -93,39 +94,46 @@ where
O: Message,
{
/// Send a message using this codec.
pub fn send(&mut self, message: O) -> Result<()> {
pub fn send(&mut self, message: O) -> Result<(), Error> {
encode_length_delimited(message, &mut self.write_buf)?;
while !self.write_buf.is_empty() {
let bytes_written = self.stream.write(self.write_buf.as_ref())?;
let bytes_written = self
.stream
.write(self.write_buf.as_ref())
.map_err(Error::io)?;

if bytes_written == 0 {
return Err(std::io::Error::new(
return Err(Error::io(std::io::Error::new(
std::io::ErrorKind::WriteZero,
"failed to write to underlying stream",
)
.into());
)));
}
self.write_buf.advance(bytes_written);
}
Ok(self.stream.flush()?)

self.stream.flush().map_err(Error::io)?;

Ok(())
}
}

/// Encode the given message with a length prefix.
pub fn encode_length_delimited<M, B>(message: M, mut dst: &mut B) -> Result<()>
pub fn encode_length_delimited<M, B>(message: M, mut dst: &mut B) -> Result<(), Error>
where
M: Message,
B: BufMut,
{
let mut buf = BytesMut::new();
message.encode(&mut buf)?;
message.encode(&mut buf).map_err(Error::encode)?;

let buf = buf.freeze();
encode_varint(buf.len() as u64, &mut dst);
dst.put(buf);
Ok(())
}

/// Attempt to decode a message of type `M` from the given source buffer.
pub fn decode_length_delimited<M>(src: &mut BytesMut) -> Result<Option<M>>
pub fn decode_length_delimited<M>(src: &mut BytesMut) -> Result<Option<M>, Error>
where
M: Message + Default,
{
Expand All @@ -148,7 +156,9 @@ where
src.advance(delim_len + (encoded_len as usize));

let mut result_bytes = BytesMut::from(tmp.split_to(encoded_len as usize).as_ref());
Ok(Some(M::decode(&mut result_bytes)?))
let res = M::decode(&mut result_bytes).map_err(Error::decode)?;

Ok(Some(res))
}
}

Expand All @@ -158,7 +168,7 @@ pub fn encode_varint<B: BufMut>(val: u64, mut buf: &mut B) {
prost::encoding::encode_varint(val << 1, &mut buf);
}

pub fn decode_varint<B: Buf>(mut buf: &mut B) -> Result<u64> {
let len = prost::encoding::decode_varint(&mut buf)?;
pub fn decode_varint<B: Buf>(mut buf: &mut B) -> Result<u64, Error> {
let len = prost::encoding::decode_varint(&mut buf).map_err(Error::decode)?;
Ok(len >> 1)
}
Loading

0 comments on commit 877e408

Please sign in to comment.