From cbb855f63cf5a1e075db88ede315f46fee338ce3 Mon Sep 17 00:00:00 2001 From: Thane Thomson Date: Thu, 28 Jan 2021 11:56:58 -0500 Subject: [PATCH 01/25] Add minimal blocking ABCI library Signed-off-by: Thane Thomson --- Cargo.toml | 1 + abci/Cargo.toml | 19 ++++ abci/src/application.rs | 41 +++++++++ abci/src/application/echo.rs | 15 +++ abci/src/client.rs | 69 ++++++++++++++ abci/src/codec.rs | 173 +++++++++++++++++++++++++++++++++++ abci/src/lib.rs | 19 ++++ abci/src/result.rs | 28 ++++++ abci/src/server.rs | 106 +++++++++++++++++++++ abci/tests/integration.rs | 24 +++++ 10 files changed, 495 insertions(+) create mode 100644 abci/Cargo.toml create mode 100644 abci/src/application.rs create mode 100644 abci/src/application/echo.rs create mode 100644 abci/src/client.rs create mode 100644 abci/src/codec.rs create mode 100644 abci/src/lib.rs create mode 100644 abci/src/result.rs create mode 100644 abci/src/server.rs create mode 100644 abci/tests/integration.rs diff --git a/Cargo.toml b/Cargo.toml index 3541dc634..034bac036 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,7 @@ [workspace] members = [ + "abci", "light-client", "light-node", "p2p", diff --git a/abci/Cargo.toml b/abci/Cargo.toml new file mode 100644 index 000000000..d1997b9e2 --- /dev/null +++ b/abci/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "tendermint-abci" +version = "0.1.0" +authors = ["Thane Thomson "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[features] +client = [] +echo-app = [] + +[dependencies] +bytes = "1.0" +eyre = "0.6" +log = "0.4" +prost = "0.7" +tendermint-proto = { version = "0.17.1", path = "../proto" } +thiserror = "1.0" diff --git a/abci/src/application.rs b/abci/src/application.rs new file mode 100644 index 000000000..d6699f6ab --- /dev/null +++ b/abci/src/application.rs @@ -0,0 +1,41 @@ +//! ABCI application interface. + +#[cfg(feature = "echo-app")] +pub mod echo; + +use tendermint_proto::abci::request::Value; +use tendermint_proto::abci::{response, Request, RequestEcho, Response, ResponseEcho}; + +/// An ABCI application. +pub trait Application: Send + Clone + 'static { + fn echo(&self, request: RequestEcho) -> ResponseEcho { + ResponseEcho { + message: request.message, + } + } + + /// Executes the relevant application method based on the type of the + /// request, and produces the corresponding response. + fn handle(&self, request: Request) -> Response { + Response { + value: Some(match request.value.unwrap() { + Value::Echo(req) => response::Value::Echo(self.echo(req)), + _ => unimplemented!(), + // Value::Flush(_) => {} + // Value::Info(_) => {} + // Value::SetOption(_) => {} + // Value::InitChain(_) => {} + // Value::Query(_) => {} + // Value::BeginBlock(_) => {} + // Value::CheckTx(_) => {} + // Value::DeliverTx(_) => {} + // Value::EndBlock(_) => {} + // Value::Commit(_) => {} + // Value::ListSnapshots(_) => {} + // Value::OfferSnapshot(_) => {} + // Value::LoadSnapshotChunk(_) => {} + // Value::ApplySnapshotChunk(_) => {} + }), + } + } +} diff --git a/abci/src/application/echo.rs b/abci/src/application/echo.rs new file mode 100644 index 000000000..825c2d8f0 --- /dev/null +++ b/abci/src/application/echo.rs @@ -0,0 +1,15 @@ +//! Trivial ABCI echo application + +use crate::Application; + +/// Trivial echo application, mainly for testing purposes. +#[derive(Clone)] +pub struct EchoApp; + +impl Default for EchoApp { + fn default() -> Self { + Self {} + } +} + +impl Application for EchoApp {} diff --git a/abci/src/client.rs b/abci/src/client.rs new file mode 100644 index 000000000..020a6aa16 --- /dev/null +++ b/abci/src/client.rs @@ -0,0 +1,69 @@ +//! Blocking ABCI client. + +use crate::codec::ClientCodec; +use crate::{Error, Result}; +use std::net::{TcpStream, ToSocketAddrs}; +use tendermint_proto::abci::request; +use tendermint_proto::abci::response::Value; +use tendermint_proto::abci::{Request, RequestEcho, ResponseEcho}; + +/// The size of the read buffer for the client in its receiving of responses +/// from the server. +pub const DEFAULT_CLIENT_READ_BUF_SIZE: usize = 1024; + +/// Builder for a blocking ABCI client. +pub struct ClientBuilder { + read_buf_size: usize, +} + +impl ClientBuilder { + /// Builder constructor. + pub fn new(read_buf_size: usize) -> Self { + Self { read_buf_size } + } + + /// Client constructor that attempts to connect to the given network + /// address. + pub fn connect(self, addr: A) -> Result { + let stream = TcpStream::connect(addr)?; + Ok(Client { + codec: ClientCodec::new(stream, self.read_buf_size), + }) + } +} + +impl Default for ClientBuilder { + fn default() -> Self { + Self { + read_buf_size: DEFAULT_CLIENT_READ_BUF_SIZE, + } + } +} + +/// Blocking ABCI client. +pub struct Client { + codec: ClientCodec, +} + +impl Client { + /// Ask the ABCI server to echo back a message. + pub fn echo(&mut self, request: RequestEcho) -> Result { + self.codec.send(Request { + value: Some(request::Value::Echo(request)), + })?; + let response = self + .codec + .next() + .ok_or(Error::ServerConnectionTerminated)??; + match response.value { + Some(value) => match value { + Value::Echo(response) => Ok(response), + _ => Err(Error::UnexpectedServerResponseType( + "Echo".to_string(), + value, + )), + }, + None => Err(Error::MalformedServerResponse), + } + } +} diff --git a/abci/src/codec.rs b/abci/src/codec.rs new file mode 100644 index 000000000..e1c00e2c4 --- /dev/null +++ b/abci/src/codec.rs @@ -0,0 +1,173 @@ +//! Encoding/decoding mechanisms for ABCI requests and responses. +//! +//! Implements the [Tendermint Socket Protocol][tsp]. +//! +//! [tsp]: https://docs.tendermint.com/master/spec/abci/client-server.html#tsp + +use crate::Result; +use bytes::{Buf, BufMut, BytesMut}; +use prost::Message; +use std::io::{Read, Write}; +use std::marker::PhantomData; +use tendermint_proto::abci::{Request, Response}; + +// The maximum number of bytes we expect in a varint. We use this to check if +// we're encountering a decoding error for a varint. +const MAX_VARINT_LENGTH: usize = 16; + +/// The server receives incoming requests, and sends outgoing responses. +pub type ServerCodec = Codec; + +/// The client sends outgoing requests, and receives incoming responses. +pub type ClientCodec = Codec; + +/// Allows for iteration over `S` to produce instances of `I`, as well as +/// sending instances of `O`. +/// +/// `S` must implement [`std::io::Read`] and [`std::io::Write`]. +pub struct Codec { + stream: S, + // Long-running read buffer + read_buf: BytesMut, + // Fixed-length read window + read_window: Vec, + write_buf: BytesMut, + _incoming: PhantomData, + _outgoing: PhantomData, +} + +impl Codec +where + S: Read + Write, + I: Message + Default, + O: Message, +{ + /// Constructor. + pub fn new(stream: S, read_buf_size: usize) -> Self { + Self { + stream, + read_buf: BytesMut::new(), + read_window: vec![0_u8; read_buf_size], + write_buf: BytesMut::new(), + _incoming: Default::default(), + _outgoing: Default::default(), + } + } +} + +// Iterating over a codec produces instances of `Result`. +impl Iterator for Codec +where + S: Read, + I: Message + Default, +{ + type Item = Result; + + fn next(&mut self) -> Option { + loop { + // Try to decode an incoming message from our buffer first + match decode_length_delimited::(&mut self.read_buf) { + Ok(opt) => { + if let Some(incoming) = opt { + return Some(Ok(incoming)); + } + } + Err(e) => return Some(Err(e)), + } + + // If we don't have enough data to decode a message, try to read + // more + let bytes_read = match self.stream.read(self.read_window.as_mut()) { + Ok(br) => br, + Err(e) => return Some(Err(e.into())), + }; + if bytes_read == 0 { + // The underlying stream terminated + return None; + } + self.read_buf + .extend_from_slice(&self.read_window[..bytes_read]); + } + } +} + +impl Codec +where + S: Write, + O: Message, +{ + /// Send a message using this codec. + pub fn send(&mut self, message: O) -> Result<()> { + encode_length_delimited(message, &mut self.write_buf)?; + while !self.write_buf.is_empty() { + let bytes_written = self.stream.write(self.write_buf.as_ref())?; + if bytes_written == 0 { + return Err(std::io::Error::new( + std::io::ErrorKind::WriteZero, + "failed to write to underlying stream", + ) + .into()); + } + self.write_buf.advance(bytes_written); + } + Ok(self.stream.flush()?) + } +} + +/// Encode the given message with a length prefix. +pub fn encode_length_delimited(message: M, mut dst: &mut B) -> Result<()> +where + M: Message, + B: BufMut, +{ + let mut buf = BytesMut::new(); + message.encode(&mut buf)?; + let buf = buf.freeze(); + encode_varint(buf.len() as u64, &mut dst); + dst.put(buf); + Ok(()) +} + +/// Attempt to decode a message of type `M` from the given source buffer. +pub fn decode_length_delimited(src: &mut BytesMut) -> Result> +where + M: Message + Default, +{ + let src_len = src.len(); + let mut tmp = src.clone().freeze(); + let encoded_len = match decode_varint(&mut tmp) { + Ok(len) => len, + Err(e) => { + return if src_len <= MAX_VARINT_LENGTH { + // We've potentially only received a partial length delimiter + Ok(None) + } else { + Err(e) + }; + } + }; + let remaining = tmp.remaining() as u64; + if remaining < encoded_len { + // We don't have enough data yet to decode the entire message + Ok(None) + } else { + let delim_len = src_len - tmp.remaining(); + // We only advance the source buffer once we're sure we have enough + // data to try to decode the result. + src.advance(delim_len + (encoded_len as usize)); + + let mut result_bytes = BytesMut::from(tmp.split_to(encoded_len as usize).as_ref()); + Ok(Some(M::decode(&mut result_bytes)?)) + } +} + +// encode_varint and decode_varint will be removed once +// https://github.com/tendermint/tendermint/issues/5783 lands in Tendermint. +fn encode_varint(val: u64, mut buf: &mut B) { + prost::encoding::encode_varint(val << 1, &mut buf); +} + +fn decode_varint(mut buf: &mut B) -> Result { + let len = prost::encoding::decode_varint(&mut buf)?; + Ok(len >> 1) +} diff --git a/abci/src/lib.rs b/abci/src/lib.rs new file mode 100644 index 000000000..70b88de0f --- /dev/null +++ b/abci/src/lib.rs @@ -0,0 +1,19 @@ +//! ABCI framework for building Tendermint applications in Rust. + +mod application; +#[cfg(feature = "client")] +mod client; +mod codec; +mod result; +mod server; + +// Common exports +pub use application::Application; +#[cfg(feature = "client")] +pub use client::{Client, ClientBuilder}; +pub use result::{Error, Result}; +pub use server::{Server, ServerBuilder}; + +// Example applications +#[cfg(feature = "echo-app")] +pub use application::echo::EchoApp; diff --git a/abci/src/result.rs b/abci/src/result.rs new file mode 100644 index 000000000..d9e7ef202 --- /dev/null +++ b/abci/src/result.rs @@ -0,0 +1,28 @@ +//! tendermint-abci error and result handling. + +use thiserror::Error; + +/// Result type used throughout the crate. +pub type Result = std::result::Result; + +/// Errors that can be produced by tendermint-abci. +#[derive(Debug, Error)] +pub enum Error { + #[error("I/O error")] + Io(#[from] std::io::Error), + + #[error("protobuf encoding error")] + ProtobufEncode(#[from] prost::EncodeError), + + #[error("protobuf decoding error")] + ProtobufDecode(#[from] prost::DecodeError), + + #[error("server connection terminated")] + ServerConnectionTerminated, + + #[error("malformed server response")] + MalformedServerResponse, + + #[error("unexpected server response type: expected {0}, but got {1:?}")] + UnexpectedServerResponseType(String, tendermint_proto::abci::response::Value), +} diff --git a/abci/src/server.rs b/abci/src/server.rs new file mode 100644 index 000000000..8078a3467 --- /dev/null +++ b/abci/src/server.rs @@ -0,0 +1,106 @@ +//! ABCI application server interface. + +use crate::codec::ServerCodec; +use crate::{Application, Result}; +use log::{error, info}; +use std::net::{TcpListener, TcpStream, ToSocketAddrs}; +use std::thread; + +/// The size of the read buffer for each incoming connection to the ABCI +/// server. +pub const DEFAULT_SERVER_READ_BUF_SIZE: usize = 1024 * 1024; + +/// Allows us to configure and construct an ABCI server. +pub struct ServerBuilder { + read_buf_size: usize, +} + +impl ServerBuilder { + /// Builder constructor. + pub fn new(read_buf_size: usize) -> Self { + Self { read_buf_size } + } + + /// Constructor for an ABCI server. + pub fn bind(self, addr: Addr, app: App) -> Result> + where + Addr: ToSocketAddrs, + App: Application, + { + let listener = TcpListener::bind(addr)?; + let local_addr = listener.local_addr()?.to_string(); + Ok(Server { + app, + listener, + local_addr, + read_buf_size: self.read_buf_size, + }) + } +} + +impl Default for ServerBuilder { + fn default() -> Self { + Self { + read_buf_size: DEFAULT_SERVER_READ_BUF_SIZE, + } + } +} + +/// A TCP-based server for serving a specific ABCI application. +pub struct Server { + app: App, + listener: TcpListener, + local_addr: String, + read_buf_size: usize, +} + +impl Server { + /// Initiate a blocking listener for incoming connections. + pub fn listen(self) -> Result<()> { + loop { + let (stream, addr) = self.listener.accept()?; + let addr = addr.to_string(); + info!("Incoming connection from: {}", addr); + self.spawn_client_handler(stream, addr); + } + } + + /// Getter for this server's local address. + pub fn local_addr(&self) -> String { + self.local_addr.clone() + } + + fn spawn_client_handler(&self, stream: TcpStream, addr: String) { + let app = self.app.clone(); + let read_buf_size = self.read_buf_size; + let _ = thread::spawn(move || Self::handle_client(stream, addr, app, read_buf_size)); + } + + fn handle_client(stream: TcpStream, addr: String, app: App, read_buf_size: usize) { + let mut codec = ServerCodec::new(stream, read_buf_size); + info!("Listening for incoming requests from {}", addr); + loop { + let request = match codec.next() { + Some(result) => match result { + Ok(r) => r, + Err(e) => { + error!( + "Failed to read incoming request from client {}: {:?}", + addr, e + ); + return; + } + }, + None => { + info!("Client {} terminated stream", addr); + return; + } + }; + let response = app.handle(request); + if let Err(e) = codec.send(response) { + error!("Failed sending response to client {}: {:?}", addr, e); + return; + } + } + } +} diff --git a/abci/tests/integration.rs b/abci/tests/integration.rs new file mode 100644 index 000000000..21dbf7f40 --- /dev/null +++ b/abci/tests/integration.rs @@ -0,0 +1,24 @@ +//! Integration tests for ABCI client/server. + +#[cfg(all(feature = "client", feature = "echo-app"))] +mod integration_tests { + use tendermint_abci::{ClientBuilder, EchoApp, ServerBuilder}; + use tendermint_proto::abci::RequestEcho; + + #[test] + fn echo() { + let server = ServerBuilder::default() + .bind("127.0.0.1:0", EchoApp::default()) + .unwrap(); + let server_addr = server.local_addr(); + let _ = std::thread::spawn(move || server.listen()); + let mut client = ClientBuilder::default().connect(server_addr).unwrap(); + + let response = client + .echo(RequestEcho { + message: "Hello ABCI!".to_string(), + }) + .unwrap(); + assert_eq!(response.message, "Hello ABCI!"); + } +} From f766e16aa323bec8b70b8d2af089e20d87d399f3 Mon Sep 17 00:00:00 2001 From: Thane Thomson Date: Thu, 28 Jan 2021 17:54:21 -0500 Subject: [PATCH 02/25] Expand API to implement in-memory key/value store app Signed-off-by: Thane Thomson --- abci/Cargo.toml | 1 + abci/src/application.rs | 175 ++++++++++++-- abci/src/application/kvstore.rs | 263 +++++++++++++++++++++ abci/src/client.rs | 120 ++++++++-- abci/src/codec.rs | 10 +- abci/src/lib.rs | 2 + abci/src/result.rs | 6 + abci/tests/{integration.rs => echo_app.rs} | 2 +- abci/tests/kvstore_app.rs | 42 ++++ 9 files changed, 584 insertions(+), 37 deletions(-) create mode 100644 abci/src/application/kvstore.rs rename abci/tests/{integration.rs => echo_app.rs} (96%) create mode 100644 abci/tests/kvstore_app.rs diff --git a/abci/Cargo.toml b/abci/Cargo.toml index d1997b9e2..0a1ed892c 100644 --- a/abci/Cargo.toml +++ b/abci/Cargo.toml @@ -9,6 +9,7 @@ edition = "2018" [features] client = [] echo-app = [] +kvstore-app = [] [dependencies] bytes = "1.0" diff --git a/abci/src/application.rs b/abci/src/application.rs index d6699f6ab..abc9c0da7 100644 --- a/abci/src/application.rs +++ b/abci/src/application.rs @@ -2,39 +2,182 @@ #[cfg(feature = "echo-app")] pub mod echo; +#[cfg(feature = "kvstore-app")] +pub mod kvstore; use tendermint_proto::abci::request::Value; -use tendermint_proto::abci::{response, Request, RequestEcho, Response, ResponseEcho}; +use tendermint_proto::abci::{ + response, Request, RequestApplySnapshotChunk, RequestBeginBlock, RequestCheckTx, + RequestDeliverTx, RequestEcho, RequestEndBlock, RequestInfo, RequestInitChain, + RequestLoadSnapshotChunk, RequestOfferSnapshot, RequestQuery, RequestSetOption, Response, + ResponseApplySnapshotChunk, ResponseBeginBlock, ResponseCheckTx, ResponseCommit, + ResponseDeliverTx, ResponseEcho, ResponseEndBlock, ResponseFlush, ResponseInfo, + ResponseInitChain, ResponseListSnapshots, ResponseLoadSnapshotChunk, ResponseOfferSnapshot, + ResponseQuery, ResponseSetOption, +}; /// An ABCI application. pub trait Application: Send + Clone + 'static { + /// Echo back the same message as provided in the request. fn echo(&self, request: RequestEcho) -> ResponseEcho { ResponseEcho { message: request.message, } } + /// Provide information about the ABCI application. + fn info(&self, _request: RequestInfo) -> ResponseInfo { + ResponseInfo { + data: "".to_string(), + version: "".to_string(), + app_version: 0, + last_block_height: 0, + last_block_app_hash: vec![], + } + } + + /// Called once upon genesis. + fn init_chain(&self, _request: RequestInitChain) -> ResponseInitChain { + ResponseInitChain { + consensus_params: None, + validators: vec![], + app_hash: vec![], + } + } + + /// Query the application for data at the current or past height. + fn query(&self, _request: RequestQuery) -> ResponseQuery { + ResponseQuery { + code: 0, + log: "".to_string(), + info: "".to_string(), + index: 0, + key: vec![], + value: vec![], + proof_ops: None, + height: 0, + codespace: "".to_string(), + } + } + + /// Check the given transaction before putting it into the local mempool. + fn check_tx(&self, _request: RequestCheckTx) -> ResponseCheckTx { + ResponseCheckTx { + code: 0, + data: vec![], + log: "".to_string(), + info: "".to_string(), + gas_wanted: 0, + gas_used: 0, + events: vec![], + codespace: "".to_string(), + } + } + + /// Signals the beginning of a new block, prior to any `DeliverTx` calls. + fn begin_block(&self, _request: RequestBeginBlock) -> ResponseBeginBlock { + ResponseBeginBlock { events: vec![] } + } + + /// Apply a transaction to the application's state. + fn deliver_tx(&self, _request: RequestDeliverTx) -> ResponseDeliverTx { + ResponseDeliverTx { + code: 0, + data: vec![], + log: "".to_string(), + info: "".to_string(), + gas_wanted: 0, + gas_used: 0, + events: vec![], + codespace: "".to_string(), + } + } + + /// Signals the end of a block. + fn end_block(&self, _request: RequestEndBlock) -> ResponseEndBlock { + ResponseEndBlock { + validator_updates: vec![], + consensus_param_updates: None, + events: vec![], + } + } + + /// Signals that messages queued on the client should be flushed to the server. + fn flush(&self) -> ResponseFlush { + ResponseFlush {} + } + + /// Commit the current state at the current height. + fn commit(&self) -> ResponseCommit { + ResponseCommit { + data: vec![], + retain_height: 0, + } + } + + /// Allows the Tendermint node to request that the application set an + /// option to a particular value. + fn set_option(&self, _request: RequestSetOption) -> ResponseSetOption { + ResponseSetOption { + code: 0, + log: "".to_string(), + info: "".to_string(), + } + } + + /// Used during state sync to discover available snapshots on peers. + fn list_snapshots(&self) -> ResponseListSnapshots { + ResponseListSnapshots { snapshots: vec![] } + } + + /// Called when bootstrapping the node using state sync. + fn offer_snapshot(&self, _request: RequestOfferSnapshot) -> ResponseOfferSnapshot { + ResponseOfferSnapshot { result: 0 } + } + + /// Used during state sync to retrieve chunks of snapshots from peers. + fn load_snapshot_chunk(&self, _request: RequestLoadSnapshotChunk) -> ResponseLoadSnapshotChunk { + ResponseLoadSnapshotChunk { chunk: vec![] } + } + + /// Apply the given snapshot chunk to the application's state. + fn apply_snapshot_chunk( + &self, + _request: RequestApplySnapshotChunk, + ) -> ResponseApplySnapshotChunk { + ResponseApplySnapshotChunk { + result: 0, + refetch_chunks: vec![], + reject_senders: vec![], + } + } + /// Executes the relevant application method based on the type of the /// request, and produces the corresponding response. fn handle(&self, request: Request) -> Response { Response { value: Some(match request.value.unwrap() { Value::Echo(req) => response::Value::Echo(self.echo(req)), - _ => unimplemented!(), - // Value::Flush(_) => {} - // Value::Info(_) => {} - // Value::SetOption(_) => {} - // Value::InitChain(_) => {} - // Value::Query(_) => {} - // Value::BeginBlock(_) => {} - // Value::CheckTx(_) => {} - // Value::DeliverTx(_) => {} - // Value::EndBlock(_) => {} - // Value::Commit(_) => {} - // Value::ListSnapshots(_) => {} - // Value::OfferSnapshot(_) => {} - // Value::LoadSnapshotChunk(_) => {} - // Value::ApplySnapshotChunk(_) => {} + Value::Flush(_) => response::Value::Flush(self.flush()), + Value::Info(req) => response::Value::Info(self.info(req)), + Value::SetOption(req) => response::Value::SetOption(self.set_option(req)), + Value::InitChain(req) => response::Value::InitChain(self.init_chain(req)), + Value::Query(req) => response::Value::Query(self.query(req)), + Value::BeginBlock(req) => response::Value::BeginBlock(self.begin_block(req)), + Value::CheckTx(req) => response::Value::CheckTx(self.check_tx(req)), + Value::DeliverTx(req) => response::Value::DeliverTx(self.deliver_tx(req)), + Value::EndBlock(req) => response::Value::EndBlock(self.end_block(req)), + Value::Commit(_) => response::Value::Commit(self.commit()), + Value::ListSnapshots(_) => response::Value::ListSnapshots(self.list_snapshots()), + Value::OfferSnapshot(req) => { + response::Value::OfferSnapshot(self.offer_snapshot(req)) + } + Value::LoadSnapshotChunk(req) => { + response::Value::LoadSnapshotChunk(self.load_snapshot_chunk(req)) + } + Value::ApplySnapshotChunk(req) => { + response::Value::ApplySnapshotChunk(self.apply_snapshot_chunk(req)) + } }), } } diff --git a/abci/src/application/kvstore.rs b/abci/src/application/kvstore.rs new file mode 100644 index 000000000..47871a9db --- /dev/null +++ b/abci/src/application/kvstore.rs @@ -0,0 +1,263 @@ +//! In-memory key/value store ABCI application. + +use crate::codec::{encode_varint, MAX_VARINT_LENGTH}; +use crate::{Application, Error, Result}; +use log::debug; +use std::collections::HashMap; +use std::sync::mpsc::{channel, Receiver, Sender}; +use tendermint_proto::abci::{ + Event, EventAttribute, RequestCheckTx, RequestDeliverTx, RequestInfo, RequestQuery, + ResponseCheckTx, ResponseCommit, ResponseDeliverTx, ResponseInfo, ResponseQuery, +}; + +/// In-memory, hashmap-backed key/value store ABCI application. +/// +/// This structure effectively just serves as a handle to the actual key/value +/// store - the [`KeyValueStoreDriver`]. +#[derive(Debug, Clone)] +pub struct KeyValueStoreApp { + cmd_tx: Sender, +} + +impl KeyValueStoreApp { + /// Constructor. + pub fn new() -> (Self, KeyValueStoreDriver) { + let (cmd_tx, cmd_rx) = channel(); + (Self { cmd_tx }, KeyValueStoreDriver::new(cmd_rx)) + } + + /// Attempt to retrieve the value associated with the given key. + pub fn get>(&self, key: K) -> Result<(i64, Option)> { + let (result_tx, result_rx) = channel(); + channel_send( + &self.cmd_tx, + Command::Get { + key: key.as_ref().to_string(), + result_tx, + }, + )?; + channel_recv(&result_rx) + } + + /// Attempt to set the value associated with the given key. + /// + /// Optionally returns any pre-existing value associated with the given + /// key. + pub fn set(&self, key: K, value: V) -> Result> + where + K: AsRef, + V: AsRef, + { + let (result_tx, result_rx) = channel(); + channel_send( + &self.cmd_tx, + Command::Set { + key: key.as_ref().to_string(), + value: value.as_ref().to_string(), + result_tx, + }, + )?; + channel_recv(&result_rx) + } +} + +impl Application for KeyValueStoreApp { + fn info(&self, request: RequestInfo) -> ResponseInfo { + debug!("Tendermint version: {}", request.version); + debug!("Block version: {}", request.block_version); + debug!("P2P version: {}", request.p2p_version); + + let (result_tx, result_rx) = channel(); + channel_send(&self.cmd_tx, Command::GetInfo { result_tx }).unwrap(); + let (last_block_height, last_block_app_hash) = channel_recv(&result_rx).unwrap(); + + ResponseInfo { + data: "kvstore-rs".to_string(), + version: "0.1.0".to_string(), + app_version: 1, + last_block_height, + last_block_app_hash, + } + } + + fn query(&self, request: RequestQuery) -> ResponseQuery { + let key = match String::from_utf8(request.data.clone()) { + Ok(s) => s, + Err(e) => panic!("Failed to intepret key as UTF-8: {}", e), + }; + debug!("Attempting to get key: {}", key); + match self.get(key.clone()) { + Ok((height, value_opt)) => match value_opt { + Some(value) => ResponseQuery { + code: 0, + log: "exists".to_string(), + info: "".to_string(), + index: 0, + key: request.data, + value: value.into_bytes(), + proof_ops: None, + height, + codespace: "".to_string(), + }, + None => ResponseQuery { + code: 0, + log: "does not exist".to_string(), + info: "".to_string(), + index: 0, + key: request.data, + value: vec![], + proof_ops: None, + height, + codespace: "".to_string(), + }, + }, + Err(e) => panic!("Failed to get key \"{}\": {:?}", key, e), + } + } + + fn check_tx(&self, _request: RequestCheckTx) -> ResponseCheckTx { + ResponseCheckTx { + code: 0, + data: vec![], + log: "".to_string(), + info: "".to_string(), + gas_wanted: 1, + gas_used: 0, + events: vec![], + codespace: "".to_string(), + } + } + + fn deliver_tx(&self, request: RequestDeliverTx) -> ResponseDeliverTx { + let tx = String::from_utf8(request.tx).unwrap(); + let tx_parts = tx.split('=').collect::>(); + let (key, value) = if tx_parts.len() == 2 { + (tx_parts[0], tx_parts[1]) + } else { + (tx.as_ref(), tx.as_ref()) + }; + let _ = self.set(key, value).unwrap(); + ResponseDeliverTx { + code: 0, + data: vec![], + log: "".to_string(), + info: "".to_string(), + gas_wanted: 0, + gas_used: 0, + events: vec![Event { + r#type: "app".to_string(), + attributes: vec![ + EventAttribute { + key: "key".as_bytes().to_owned(), + value: key.as_bytes().to_owned(), + index: true, + }, + EventAttribute { + key: "index_key".as_bytes().to_owned(), + value: "index is working".as_bytes().to_owned(), + index: true, + }, + EventAttribute { + key: "noindex_key".as_bytes().to_owned(), + value: "index is working".as_bytes().to_owned(), + index: false, + }, + ], + }], + codespace: "".to_string(), + } + } + + fn commit(&self) -> ResponseCommit { + let (result_tx, result_rx) = channel(); + channel_send(&self.cmd_tx, Command::Commit { result_tx }).unwrap(); + let (height, app_hash) = channel_recv(&result_rx).unwrap(); + ResponseCommit { + data: app_hash, + retain_height: height - 1, + } + } +} + +/// Manages key/value store state. +#[derive(Debug)] +pub struct KeyValueStoreDriver { + store: HashMap, + height: i64, + app_hash: Vec, + cmd_rx: Receiver, +} + +impl KeyValueStoreDriver { + fn new(cmd_rx: Receiver) -> Self { + Self { + store: HashMap::new(), + height: 0, + app_hash: vec![0_u8; MAX_VARINT_LENGTH], + cmd_rx, + } + } + + /// Run the driver in the current thread (blocking). + pub fn run(mut self) -> Result<()> { + loop { + let cmd = self + .cmd_rx + .recv() + .map_err(|e| Error::ChannelRecv(e.to_string()))?; + debug!("Received driver command: {:?}", cmd); + match cmd { + Command::GetInfo { result_tx } => { + channel_send(&result_tx, (self.height, self.app_hash.clone()))? + } + Command::Get { key, result_tx } => channel_send( + &result_tx, + (self.height, self.store.get(&key).map(Clone::clone)), + )?, + Command::Set { + key, + value, + result_tx, + } => channel_send(&result_tx, self.store.insert(key, value))?, + Command::Commit { result_tx } => self.commit(result_tx)?, + } + } + } + + fn commit(&mut self, result_tx: Sender<(i64, Vec)>) -> Result<()> { + // As in the Go-based key/value store, simply encode the number of + // items as the "app hash" + encode_varint(self.store.len() as u64, &mut self.app_hash); + self.height += 1; + channel_send(&result_tx, (self.height, self.app_hash.clone())) + } +} + +#[derive(Debug, Clone)] +enum Command { + /// Get the height of the last commit. + GetInfo { result_tx: Sender<(i64, Vec)> }, + /// Get the key associated with `key`. + Get { + key: String, + result_tx: Sender<(i64, Option)>, + }, + /// Set the value of `key` to to `value`. + Set { + key: String, + value: String, + result_tx: Sender>, + }, + /// Commit the current state of the application, which involves recomputing + /// the application's hash. + Commit { result_tx: Sender<(i64, Vec)> }, +} + +fn channel_send(tx: &Sender, value: T) -> Result<()> { + tx.send(value) + .map_err(|e| Error::ChannelSend(e.to_string())) +} + +fn channel_recv(rx: &Receiver) -> Result { + rx.recv().map_err(|e| Error::ChannelRecv(e.to_string())) +} diff --git a/abci/src/client.rs b/abci/src/client.rs index 020a6aa16..386f69a45 100644 --- a/abci/src/client.rs +++ b/abci/src/client.rs @@ -3,8 +3,15 @@ use crate::codec::ClientCodec; use crate::{Error, Result}; use std::net::{TcpStream, ToSocketAddrs}; -use tendermint_proto::abci::request; -use tendermint_proto::abci::response::Value; +use tendermint_proto::abci::{ + request, response, RequestApplySnapshotChunk, RequestBeginBlock, RequestCheckTx, RequestCommit, + RequestDeliverTx, RequestEndBlock, RequestFlush, RequestInfo, RequestInitChain, + RequestListSnapshots, RequestLoadSnapshotChunk, RequestOfferSnapshot, RequestQuery, + RequestSetOption, ResponseApplySnapshotChunk, ResponseBeginBlock, ResponseCheckTx, + ResponseCommit, ResponseDeliverTx, ResponseEndBlock, ResponseFlush, ResponseInfo, + ResponseInitChain, ResponseListSnapshots, ResponseLoadSnapshotChunk, ResponseOfferSnapshot, + ResponseQuery, ResponseSetOption, +}; use tendermint_proto::abci::{Request, RequestEcho, ResponseEcho}; /// The size of the read buffer for the client in its receiving of responses @@ -45,24 +52,107 @@ pub struct Client { codec: ClientCodec, } +macro_rules! perform { + ($self:expr, $type:ident, $req:expr) => { + match $self.perform(request::Value::$type($req))? { + response::Value::$type(r) => Ok(r), + r => Err(Error::UnexpectedServerResponseType( + stringify!($type).to_string(), + r, + )), + } + }; +} + impl Client { /// Ask the ABCI server to echo back a message. - pub fn echo(&mut self, request: RequestEcho) -> Result { - self.codec.send(Request { - value: Some(request::Value::Echo(request)), - })?; - let response = self + pub fn echo(&mut self, req: RequestEcho) -> Result { + perform!(self, Echo, req) + } + + /// Request information about the ABCI application. + pub fn info(&mut self, req: RequestInfo) -> Result { + perform!(self, Info, req) + } + + /// To be called once upon genesis. + pub fn init_chain(&mut self, req: RequestInitChain) -> Result { + perform!(self, InitChain, req) + } + + /// Query the application for data at the current or past height. + pub fn query(&mut self, req: RequestQuery) -> Result { + perform!(self, Query, req) + } + + /// Check the given transaction before putting it into the local mempool. + pub fn check_tx(&mut self, req: RequestCheckTx) -> Result { + perform!(self, CheckTx, req) + } + + /// Signal the beginning of a new block, prior to any `DeliverTx` calls. + pub fn begin_block(&mut self, req: RequestBeginBlock) -> Result { + perform!(self, BeginBlock, req) + } + + /// Apply a transaction to the application's state. + pub fn deliver_tx(&mut self, req: RequestDeliverTx) -> Result { + perform!(self, DeliverTx, req) + } + + /// Signal the end of a block. + pub fn end_block(&mut self, req: RequestEndBlock) -> Result { + perform!(self, EndBlock, req) + } + + pub fn flush(&mut self) -> Result { + perform!(self, Flush, RequestFlush {}) + } + + /// Commit the current state at the current height. + pub fn commit(&mut self) -> Result { + perform!(self, Commit, RequestCommit {}) + } + + /// Request that the application set an option to a particular value. + pub fn set_option(&mut self, req: RequestSetOption) -> Result { + perform!(self, SetOption, req) + } + + /// Used during state sync to discover available snapshots on peers. + pub fn list_snapshots(&mut self) -> Result { + perform!(self, ListSnapshots, RequestListSnapshots {}) + } + + /// Called when bootstrapping the node using state sync. + pub fn offer_snapshot(&mut self, req: RequestOfferSnapshot) -> Result { + perform!(self, OfferSnapshot, req) + } + + /// Used during state sync to retrieve chunks of snapshots from peers. + pub fn load_snapshot_chunk( + &mut self, + req: RequestLoadSnapshotChunk, + ) -> Result { + perform!(self, LoadSnapshotChunk, req) + } + + /// Apply the given snapshot chunk to the application's state. + pub fn apply_snapshot_chunk( + &mut self, + req: RequestApplySnapshotChunk, + ) -> Result { + perform!(self, ApplySnapshotChunk, req) + } + + fn perform(&mut self, req: request::Value) -> Result { + self.codec.send(Request { value: Some(req) })?; + let res = self .codec .next() .ok_or(Error::ServerConnectionTerminated)??; - match response.value { - Some(value) => match value { - Value::Echo(response) => Ok(response), - _ => Err(Error::UnexpectedServerResponseType( - "Echo".to_string(), - value, - )), - }, + match res.value { + Some(value) => Ok(value), None => Err(Error::MalformedServerResponse), } } diff --git a/abci/src/codec.rs b/abci/src/codec.rs index e1c00e2c4..19a0c3b46 100644 --- a/abci/src/codec.rs +++ b/abci/src/codec.rs @@ -11,9 +11,9 @@ use std::io::{Read, Write}; use std::marker::PhantomData; use tendermint_proto::abci::{Request, Response}; -// The maximum number of bytes we expect in a varint. We use this to check if -// we're encountering a decoding error for a varint. -const MAX_VARINT_LENGTH: usize = 16; +/// The maximum number of bytes we expect in a varint. We use this to check if +/// we're encountering a decoding error for a varint. +pub const MAX_VARINT_LENGTH: usize = 16; /// The server receives incoming requests, and sends outgoing responses. pub type ServerCodec = Codec; @@ -163,11 +163,11 @@ where // encode_varint and decode_varint will be removed once // https://github.com/tendermint/tendermint/issues/5783 lands in Tendermint. -fn encode_varint(val: u64, mut buf: &mut B) { +pub fn encode_varint(val: u64, mut buf: &mut B) { prost::encoding::encode_varint(val << 1, &mut buf); } -fn decode_varint(mut buf: &mut B) -> Result { +pub fn decode_varint(mut buf: &mut B) -> Result { let len = prost::encoding::decode_varint(&mut buf)?; Ok(len >> 1) } diff --git a/abci/src/lib.rs b/abci/src/lib.rs index 70b88de0f..65449502a 100644 --- a/abci/src/lib.rs +++ b/abci/src/lib.rs @@ -17,3 +17,5 @@ pub use server::{Server, ServerBuilder}; // Example applications #[cfg(feature = "echo-app")] pub use application::echo::EchoApp; +#[cfg(feature = "kvstore-app")] +pub use application::kvstore::{KeyValueStoreApp, KeyValueStoreDriver}; diff --git a/abci/src/result.rs b/abci/src/result.rs index d9e7ef202..84b34d73e 100644 --- a/abci/src/result.rs +++ b/abci/src/result.rs @@ -25,4 +25,10 @@ pub enum Error { #[error("unexpected server response type: expected {0}, but got {1:?}")] UnexpectedServerResponseType(String, tendermint_proto::abci::response::Value), + + #[error("channel send error: {0}")] + ChannelSend(String), + + #[error("channel receive error: {0}")] + ChannelRecv(String), } diff --git a/abci/tests/integration.rs b/abci/tests/echo_app.rs similarity index 96% rename from abci/tests/integration.rs rename to abci/tests/echo_app.rs index 21dbf7f40..f0306099b 100644 --- a/abci/tests/integration.rs +++ b/abci/tests/echo_app.rs @@ -1,7 +1,7 @@ //! Integration tests for ABCI client/server. #[cfg(all(feature = "client", feature = "echo-app"))] -mod integration_tests { +mod echo_app_integration { use tendermint_abci::{ClientBuilder, EchoApp, ServerBuilder}; use tendermint_proto::abci::RequestEcho; diff --git a/abci/tests/kvstore_app.rs b/abci/tests/kvstore_app.rs new file mode 100644 index 000000000..62a1c51d6 --- /dev/null +++ b/abci/tests/kvstore_app.rs @@ -0,0 +1,42 @@ +//! Key/value store application integration tests. + +#[cfg(all(feature = "client", feature = "kvstore-app"))] +mod kvstore_app_integration { + use std::thread; + use tendermint_abci::{ClientBuilder, KeyValueStoreApp, ServerBuilder}; + use tendermint_proto::abci::{RequestDeliverTx, RequestEcho, RequestQuery}; + + #[test] + fn happy_path() { + let (app, driver) = KeyValueStoreApp::new(); + let server = ServerBuilder::default().bind("127.0.0.1:0", app).unwrap(); + let server_addr = server.local_addr(); + thread::spawn(move || driver.run()); + thread::spawn(move || server.listen()); + + let mut client = ClientBuilder::default().connect(server_addr).unwrap(); + let res = client + .echo(RequestEcho { + message: "Hello ABCI!".to_string(), + }) + .unwrap(); + assert_eq!(res.message, "Hello ABCI!"); + + client + .deliver_tx(RequestDeliverTx { + tx: "test-key=test-value".as_bytes().to_owned(), + }) + .unwrap(); + client.commit().unwrap(); + + let res = client + .query(RequestQuery { + data: "test-key".as_bytes().to_owned(), + path: "".to_string(), + height: 0, + prove: false, + }) + .unwrap(); + assert_eq!(res.value, "test-value".as_bytes().to_owned()); + } +} From 247e0547102a92f304e9b2fabd91b474759698cb Mon Sep 17 00:00:00 2001 From: Thane Thomson Date: Thu, 28 Jan 2021 18:26:38 -0500 Subject: [PATCH 03/25] Add kvstore-rs ABCI app Signed-off-by: Thane Thomson --- abci/Cargo.toml | 8 +++++++ abci/src/application/kvstore.rs | 32 ++++++++++++++++++---------- abci/src/application/kvstore/main.rs | 14 ++++++++++++ abci/src/codec.rs | 1 + abci/src/server.rs | 1 + 5 files changed, 45 insertions(+), 11 deletions(-) create mode 100644 abci/src/application/kvstore/main.rs diff --git a/abci/Cargo.toml b/abci/Cargo.toml index 0a1ed892c..0f6ae94e9 100644 --- a/abci/Cargo.toml +++ b/abci/Cargo.toml @@ -6,10 +6,16 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[[bin]] +name = "kvstore-rs" +path = "src/application/kvstore/main.rs" +required-features = [ "binary", "kvstore-app" ] + [features] client = [] echo-app = [] kvstore-app = [] +binary = [ "env_logger" ] [dependencies] bytes = "1.0" @@ -18,3 +24,5 @@ log = "0.4" prost = "0.7" tendermint-proto = { version = "0.17.1", path = "../proto" } thiserror = "1.0" + +env_logger = { version = "0.8", optional = true } diff --git a/abci/src/application/kvstore.rs b/abci/src/application/kvstore.rs index 47871a9db..ffbd2be77 100644 --- a/abci/src/application/kvstore.rs +++ b/abci/src/application/kvstore.rs @@ -2,7 +2,8 @@ use crate::codec::{encode_varint, MAX_VARINT_LENGTH}; use crate::{Application, Error, Result}; -use log::debug; +use bytes::BytesMut; +use log::{debug, info}; use std::collections::HashMap; use std::sync::mpsc::{channel, Receiver, Sender}; use tendermint_proto::abci::{ @@ -63,9 +64,10 @@ impl KeyValueStoreApp { impl Application for KeyValueStoreApp { fn info(&self, request: RequestInfo) -> ResponseInfo { - debug!("Tendermint version: {}", request.version); - debug!("Block version: {}", request.block_version); - debug!("P2P version: {}", request.p2p_version); + debug!( + "Got info request. Tendermint version: {}; Block version: {}; P2P version: {}", + request.version, request.block_version, request.p2p_version + ); let (result_tx, result_rx) = channel(); channel_send(&self.cmd_tx, Command::GetInfo { result_tx }).unwrap(); @@ -172,6 +174,7 @@ impl Application for KeyValueStoreApp { let (result_tx, result_rx) = channel(); channel_send(&self.cmd_tx, Command::Commit { result_tx }).unwrap(); let (height, app_hash) = channel_recv(&result_rx).unwrap(); + info!("Committed height {}", height); ResponseCommit { data: app_hash, retain_height: height - 1, @@ -205,20 +208,25 @@ impl KeyValueStoreDriver { .cmd_rx .recv() .map_err(|e| Error::ChannelRecv(e.to_string()))?; - debug!("Received driver command: {:?}", cmd); match cmd { Command::GetInfo { result_tx } => { channel_send(&result_tx, (self.height, self.app_hash.clone()))? } - Command::Get { key, result_tx } => channel_send( - &result_tx, - (self.height, self.store.get(&key).map(Clone::clone)), - )?, + Command::Get { key, result_tx } => { + debug!("Getting value for \"{}\"", key); + channel_send( + &result_tx, + (self.height, self.store.get(&key).map(Clone::clone)), + )?; + } Command::Set { key, value, result_tx, - } => channel_send(&result_tx, self.store.insert(key, value))?, + } => { + debug!("Setting \"{}\" = \"{}\"", key, value); + channel_send(&result_tx, self.store.insert(key, value))?; + } Command::Commit { result_tx } => self.commit(result_tx)?, } } @@ -227,7 +235,9 @@ impl KeyValueStoreDriver { fn commit(&mut self, result_tx: Sender<(i64, Vec)>) -> Result<()> { // As in the Go-based key/value store, simply encode the number of // items as the "app hash" - encode_varint(self.store.len() as u64, &mut self.app_hash); + let mut app_hash = BytesMut::with_capacity(MAX_VARINT_LENGTH); + encode_varint(self.store.len() as u64, &mut app_hash); + self.app_hash = app_hash.to_vec(); self.height += 1; channel_send(&result_tx, (self.height, self.app_hash.clone())) } diff --git a/abci/src/application/kvstore/main.rs b/abci/src/application/kvstore/main.rs new file mode 100644 index 000000000..c2bd3fdc3 --- /dev/null +++ b/abci/src/application/kvstore/main.rs @@ -0,0 +1,14 @@ +//! In-memory key/value store application for Tendermint. + +use tendermint_abci::{KeyValueStoreApp, ServerBuilder}; + +fn main() { + env_logger::init(); + + let (app, driver) = KeyValueStoreApp::new(); + let server = ServerBuilder::default() + .bind("127.0.0.1:26658", app) + .unwrap(); + std::thread::spawn(move || driver.run()); + server.listen().unwrap(); +} diff --git a/abci/src/codec.rs b/abci/src/codec.rs index 19a0c3b46..93063912d 100644 --- a/abci/src/codec.rs +++ b/abci/src/codec.rs @@ -18,6 +18,7 @@ pub const MAX_VARINT_LENGTH: usize = 16; /// The server receives incoming requests, and sends outgoing responses. pub type ServerCodec = Codec; +#[cfg(feature = "client")] /// The client sends outgoing requests, and receives incoming responses. pub type ClientCodec = Codec; diff --git a/abci/src/server.rs b/abci/src/server.rs index 8078a3467..4dde9bdb9 100644 --- a/abci/src/server.rs +++ b/abci/src/server.rs @@ -29,6 +29,7 @@ impl ServerBuilder { { let listener = TcpListener::bind(addr)?; let local_addr = listener.local_addr()?.to_string(); + info!("ABCI server running at {}", local_addr); Ok(Server { app, listener, From 14a5cd1cf3e3306f23b8380eec035b0eb9567cd4 Mon Sep 17 00:00:00 2001 From: Thane Thomson Date: Thu, 28 Jan 2021 18:37:06 -0500 Subject: [PATCH 04/25] Add rudimentary README Signed-off-by: Thane Thomson --- abci/README.md | 97 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 97 insertions(+) create mode 100644 abci/README.md diff --git a/abci/README.md b/abci/README.md new file mode 100644 index 000000000..2d6b9693b --- /dev/null +++ b/abci/README.md @@ -0,0 +1,97 @@ +## tendermint-abci (WIP) + +[![Crate][crate-image]][crate-link] +[![Docs][docs-image]][docs-link] +[![Build Status][build-image]][build-link] +[![Audit Status][audit-image]][audit-link] +[![Apache 2.0 Licensed][license-image]][license-link] +![Rust Stable][rustc-image] + +ABCI framework for building applications for Tendermint in Rust. + +## Requirements + +- The latest stable version of Rust + +## Examples + +A trivial in-memory key/value store application is provided. To run this +application, from the `tendermint-abci` crate's directory: + +```bash +# Set your logging level through RUST_LOG (e.g. RUST_LOG=info) +# Binds to 127.0.0.1:26658 +RUST_LOG=debug cargo run --bin kvstore-rs --features binary,kvstore-app + +# Reset and run your Tendermint node (binds RPC to 127.0.0.1:26657 by default) +tendermint unsafe_reset_all && tendermint start + +# Submit a key/value pair (set "somekey" to "somevalue") +curl 'http://127.0.0.1:26657/broadcast_tx_async?tx="somekey=somevalue"' + +#{ +# "jsonrpc": "2.0", +# "id": -1, +# "result": { +# "code": 0, +# "data": "", +# "log": "", +# "codespace": "", +# "hash": "17ED61261A5357FEE7ACDE4FAB154882A346E479AC236CFB2F22A2E8870A9C3D" +# } +#} + +# Query for the value we just submitted ("736f6d656b6579" is the hex +# representation of "somekey") +curl 'http://127.0.0.1:26657/abci_query?data=0x736f6d656b6579' + +#{ +# "jsonrpc": "2.0", +# "id": -1, +# "result": { +# "response": { +# "code": 0, +# "log": "exists", +# "info": "", +# "index": "0", +# "key": "c29tZWtleQ==", +# "value": "c29tZXZhbHVl", +# "proofOps": null, +# "height": "189", +# "codespace": "" +# } +# } +#} +``` + +## License + +Copyright © 2021 Informal Systems + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use the files in this repository except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +[//]: # (badges) + +[crate-image]: https://img.shields.io/crates/v/tendermint-abci.svg +[crate-link]: https://crates.io/crates/tendermint-abci +[docs-image]: https://docs.rs/tendermint-abci/badge.svg +[docs-link]: https://docs.rs/tendermint-abci/ +[build-image]: https://github.com/informalsystems/tendermint-rs/workflows/Rust/badge.svg +[build-link]: https://github.com/informalsystems/tendermint-rs/actions?query=workflow%3ARust +[audit-image]: https://github.com/informalsystems/tendermint-rs/workflows/Audit-Check/badge.svg +[audit-link]: https://github.com/informalsystems/tendermint-rs/actions?query=workflow%3AAudit-Check +[license-image]: https://img.shields.io/badge/license-Apache2.0-blue.svg +[license-link]: https://github.com/informalsystems/tendermint-rs/blob/master/LICENSE +[rustc-image]: https://img.shields.io/badge/rustc-stable-blue.svg + +[//]: # (general links) From de60b3066fe0bf44506c55fdc1ac86656b06b248 Mon Sep 17 00:00:00 2001 From: Thane Thomson Date: Fri, 29 Jan 2021 11:19:32 -0500 Subject: [PATCH 05/25] Bump proto version dependency to v0.18.0 Signed-off-by: Thane Thomson --- abci/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/abci/Cargo.toml b/abci/Cargo.toml index 0f6ae94e9..ff0277559 100644 --- a/abci/Cargo.toml +++ b/abci/Cargo.toml @@ -22,7 +22,7 @@ bytes = "1.0" eyre = "0.6" log = "0.4" prost = "0.7" -tendermint-proto = { version = "0.17.1", path = "../proto" } +tendermint-proto = { version = "0.18.0", path = "../proto" } thiserror = "1.0" env_logger = { version = "0.8", optional = true } From 71fb034d9fdf51d3f845c21d3e2e375e76181dc6 Mon Sep 17 00:00:00 2001 From: Thane Thomson Date: Fri, 29 Jan 2021 11:23:31 -0500 Subject: [PATCH 06/25] Replace manual default structs with Default::default() Signed-off-by: Thane Thomson --- abci/src/application.rs | 79 +++++++---------------------------------- 1 file changed, 13 insertions(+), 66 deletions(-) diff --git a/abci/src/application.rs b/abci/src/application.rs index abc9c0da7..04b6b9b12 100644 --- a/abci/src/application.rs +++ b/abci/src/application.rs @@ -27,79 +27,37 @@ pub trait Application: Send + Clone + 'static { /// Provide information about the ABCI application. fn info(&self, _request: RequestInfo) -> ResponseInfo { - ResponseInfo { - data: "".to_string(), - version: "".to_string(), - app_version: 0, - last_block_height: 0, - last_block_app_hash: vec![], - } + Default::default() } /// Called once upon genesis. fn init_chain(&self, _request: RequestInitChain) -> ResponseInitChain { - ResponseInitChain { - consensus_params: None, - validators: vec![], - app_hash: vec![], - } + Default::default() } /// Query the application for data at the current or past height. fn query(&self, _request: RequestQuery) -> ResponseQuery { - ResponseQuery { - code: 0, - log: "".to_string(), - info: "".to_string(), - index: 0, - key: vec![], - value: vec![], - proof_ops: None, - height: 0, - codespace: "".to_string(), - } + Default::default() } /// Check the given transaction before putting it into the local mempool. fn check_tx(&self, _request: RequestCheckTx) -> ResponseCheckTx { - ResponseCheckTx { - code: 0, - data: vec![], - log: "".to_string(), - info: "".to_string(), - gas_wanted: 0, - gas_used: 0, - events: vec![], - codespace: "".to_string(), - } + Default::default() } /// Signals the beginning of a new block, prior to any `DeliverTx` calls. fn begin_block(&self, _request: RequestBeginBlock) -> ResponseBeginBlock { - ResponseBeginBlock { events: vec![] } + Default::default() } /// Apply a transaction to the application's state. fn deliver_tx(&self, _request: RequestDeliverTx) -> ResponseDeliverTx { - ResponseDeliverTx { - code: 0, - data: vec![], - log: "".to_string(), - info: "".to_string(), - gas_wanted: 0, - gas_used: 0, - events: vec![], - codespace: "".to_string(), - } + Default::default() } /// Signals the end of a block. fn end_block(&self, _request: RequestEndBlock) -> ResponseEndBlock { - ResponseEndBlock { - validator_updates: vec![], - consensus_param_updates: None, - events: vec![], - } + Default::default() } /// Signals that messages queued on the client should be flushed to the server. @@ -109,35 +67,28 @@ pub trait Application: Send + Clone + 'static { /// Commit the current state at the current height. fn commit(&self) -> ResponseCommit { - ResponseCommit { - data: vec![], - retain_height: 0, - } + Default::default() } /// Allows the Tendermint node to request that the application set an /// option to a particular value. fn set_option(&self, _request: RequestSetOption) -> ResponseSetOption { - ResponseSetOption { - code: 0, - log: "".to_string(), - info: "".to_string(), - } + Default::default() } /// Used during state sync to discover available snapshots on peers. fn list_snapshots(&self) -> ResponseListSnapshots { - ResponseListSnapshots { snapshots: vec![] } + Default::default() } /// Called when bootstrapping the node using state sync. fn offer_snapshot(&self, _request: RequestOfferSnapshot) -> ResponseOfferSnapshot { - ResponseOfferSnapshot { result: 0 } + Default::default() } /// Used during state sync to retrieve chunks of snapshots from peers. fn load_snapshot_chunk(&self, _request: RequestLoadSnapshotChunk) -> ResponseLoadSnapshotChunk { - ResponseLoadSnapshotChunk { chunk: vec![] } + Default::default() } /// Apply the given snapshot chunk to the application's state. @@ -145,11 +96,7 @@ pub trait Application: Send + Clone + 'static { &self, _request: RequestApplySnapshotChunk, ) -> ResponseApplySnapshotChunk { - ResponseApplySnapshotChunk { - result: 0, - refetch_chunks: vec![], - reject_senders: vec![], - } + Default::default() } /// Executes the relevant application method based on the type of the From 569f6aee73f2f7d7ba0a6ece2f24182e7c001605 Mon Sep 17 00:00:00 2001 From: Thane Thomson Date: Fri, 29 Jan 2021 12:24:45 -0500 Subject: [PATCH 07/25] Enable debug logging for all incoming ABCI requests Signed-off-by: Thane Thomson --- abci/src/application.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/abci/src/application.rs b/abci/src/application.rs index 04b6b9b12..9253b12b4 100644 --- a/abci/src/application.rs +++ b/abci/src/application.rs @@ -102,6 +102,7 @@ pub trait Application: Send + Clone + 'static { /// Executes the relevant application method based on the type of the /// request, and produces the corresponding response. fn handle(&self, request: Request) -> Response { + log::debug!("Incoming request: {:?}", request); Response { value: Some(match request.value.unwrap() { Value::Echo(req) => response::Value::Echo(self.echo(req)), From 4497249112ab320ddc8c5b3bfacc375ade73b1d3 Mon Sep 17 00:00:00 2001 From: Thane Thomson Date: Fri, 29 Jan 2021 14:54:55 -0500 Subject: [PATCH 08/25] Improve CLI UX Signed-off-by: Thane Thomson --- abci/Cargo.toml | 5 ++-- abci/src/application/kvstore/main.rs | 36 ++++++++++++++++++++++++++-- 2 files changed, 37 insertions(+), 4 deletions(-) diff --git a/abci/Cargo.toml b/abci/Cargo.toml index ff0277559..49ae3dc1f 100644 --- a/abci/Cargo.toml +++ b/abci/Cargo.toml @@ -15,7 +15,7 @@ required-features = [ "binary", "kvstore-app" ] client = [] echo-app = [] kvstore-app = [] -binary = [ "env_logger" ] +binary = [ "simple_logger", "structopt" ] [dependencies] bytes = "1.0" @@ -25,4 +25,5 @@ prost = "0.7" tendermint-proto = { version = "0.18.0", path = "../proto" } thiserror = "1.0" -env_logger = { version = "0.8", optional = true } +simple_logger = { version = "1.11", optional = true } +structopt = { version = "0.3", optional = true } diff --git a/abci/src/application/kvstore/main.rs b/abci/src/application/kvstore/main.rs index c2bd3fdc3..9e7cd61f1 100644 --- a/abci/src/application/kvstore/main.rs +++ b/abci/src/application/kvstore/main.rs @@ -1,13 +1,45 @@ //! In-memory key/value store application for Tendermint. +use log::LevelFilter; +use simple_logger::SimpleLogger; +use structopt::StructOpt; use tendermint_abci::{KeyValueStoreApp, ServerBuilder}; +#[derive(Debug, StructOpt)] +struct Opt { + /// Bind the TCP server to this host. + #[structopt(short, long, default_value = "127.0.0.1")] + host: String, + + /// Bind the TCP server to this port. + #[structopt(short, long, default_value = "26658")] + port: u16, + + /// Increase output logging verbosity to DEBUG level. + #[structopt(short, long)] + verbose: bool, + + /// Suppress all output logging (overrides --verbose). + #[structopt(short, long)] + quiet: bool, +} + fn main() { - env_logger::init(); + let opt: Opt = Opt::from_args(); + SimpleLogger::new() + .with_level(if opt.quiet { + LevelFilter::Off + } else if opt.verbose { + LevelFilter::Debug + } else { + LevelFilter::Info + }) + .init() + .unwrap(); let (app, driver) = KeyValueStoreApp::new(); let server = ServerBuilder::default() - .bind("127.0.0.1:26658", app) + .bind(format!("{}:{}", opt.host, opt.port), app) .unwrap(); std::thread::spawn(move || driver.run()); server.listen().unwrap(); From e877c908d25e75272e25674db88b7abab872f7b5 Mon Sep 17 00:00:00 2001 From: Thane Thomson Date: Fri, 29 Jan 2021 14:57:04 -0500 Subject: [PATCH 09/25] Allow for read buffer size customization Signed-off-by: Thane Thomson --- abci/src/application/kvstore/main.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/abci/src/application/kvstore/main.rs b/abci/src/application/kvstore/main.rs index 9e7cd61f1..fa21894d9 100644 --- a/abci/src/application/kvstore/main.rs +++ b/abci/src/application/kvstore/main.rs @@ -15,6 +15,11 @@ struct Opt { #[structopt(short, long, default_value = "26658")] port: u16, + /// The default server read buffer size, in bytes, for each incoming client + /// connection. + #[structopt(short, long, default_value = "1048576")] + read_buf_size: usize, + /// Increase output logging verbosity to DEBUG level. #[structopt(short, long)] verbose: bool, @@ -38,7 +43,7 @@ fn main() { .unwrap(); let (app, driver) = KeyValueStoreApp::new(); - let server = ServerBuilder::default() + let server = ServerBuilder::new(opt.read_buf_size) .bind(format!("{}:{}", opt.host, opt.port), app) .unwrap(); std::thread::spawn(move || driver.run()); From d4eaa56122bbddfa4ea824b751dec91376ce6958 Mon Sep 17 00:00:00 2001 From: Thane Thomson Date: Mon, 8 Feb 2021 09:35:57 -0500 Subject: [PATCH 10/25] Add crate description Signed-off-by: Thane Thomson --- abci/Cargo.toml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/abci/Cargo.toml b/abci/Cargo.toml index 49ae3dc1f..1f6415672 100644 --- a/abci/Cargo.toml +++ b/abci/Cargo.toml @@ -3,6 +3,10 @@ name = "tendermint-abci" version = "0.1.0" authors = ["Thane Thomson "] edition = "2018" +description = """ + tendermint-abci provides a simple framework with which to build low-level + applications on top of Tendermint. + """ # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html From 7cee7a97ba0bc7b3162b9f4339991a6aba58ed2a Mon Sep 17 00:00:00 2001 From: Thane Thomson Date: Mon, 8 Feb 2021 09:50:41 -0500 Subject: [PATCH 11/25] Update README for ABCI crate Signed-off-by: Thane Thomson --- abci/README.md | 27 +++++++++++++++++++++++---- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/abci/README.md b/abci/README.md index 2d6b9693b..cf9bccd2c 100644 --- a/abci/README.md +++ b/abci/README.md @@ -1,4 +1,4 @@ -## tendermint-abci (WIP) +## tendermint-abci [![Crate][crate-image]][crate-link] [![Docs][docs-image]][docs-link] @@ -7,16 +7,31 @@ [![Apache 2.0 Licensed][license-image]][license-link] ![Rust Stable][rustc-image] -ABCI framework for building applications for Tendermint in Rust. +[ABCI] framework for building low-level applications for Tendermint in Rust. ## Requirements - The latest stable version of Rust +## API + +At present, this crate only exposes a synchronous, blocking API based on Rust's +standard library's networking capabilities. `async` client/server support is +planned in future updates. + +The primary trait to be implemented by an ABCI application is the +[`Application`] trait. One of the core ideas here is that an ABCI application +must be able to be cloned for use in different threads, since Tendermint opens +4 connections to the ABCI server. See the [spec][tendermint-abci-spec] for +details. + ## Examples -A trivial in-memory key/value store application is provided. To run this -application, from the `tendermint-abci` crate's directory: +See [`src/application`](./src/application/) for some example applications +written using this crate. + +To run the key/value store example application, from the `tendermint-abci` +crate's directory: ```bash # Set your logging level through RUST_LOG (e.g. RUST_LOG=info) @@ -95,3 +110,7 @@ limitations under the License. [rustc-image]: https://img.shields.io/badge/rustc-stable-blue.svg [//]: # (general links) + +[ABCI]: https://docs.tendermint.com/master/spec/abci/ +[`Application`]: ./src/application.rs +[tendermint-abci-spec]: https://github.com/tendermint/spec/blob/master/spec/abci/abci.md From b429e5bbcd0b4fe06df3ebbce563b5e912c1897d Mon Sep 17 00:00:00 2001 From: Thane Thomson Date: Tue, 9 Feb 2021 09:35:50 -0500 Subject: [PATCH 12/25] Add ABCI integration test for minimal ABCI crate (#797) * Add integration testing utility for ABCI key/value store Signed-off-by: Thane Thomson * Add hacky bash script to demonstrate parallel execution Signed-off-by: Thane Thomson * Created abci test harness (#800) * Created abci test harness * cargo make additions and docs Co-authored-by: Greg Szabo <16846635+greg-szabo@users.noreply.github.com> --- .cargo/config | 1 + .dockerignore | 19 ++ tools/Cargo.toml | 1 + tools/abci-test/Cargo.toml | 20 ++ tools/abci-test/Makefile.toml | 52 +++++ tools/abci-test/src/main.rs | 179 ++++++++++++++++++ tools/docker/README.md | 20 ++ .../.gitignore | 0 tools/docker/abci-harness-0.34.0/Dockerfile | 31 +++ tools/docker/abci-harness-0.34.0/entrypoint | 40 ++++ tools/docker/tendermint-0.34.0/.gitignore | 1 + .../Dockerfile | 0 .../entrypoint | 0 13 files changed, 364 insertions(+) create mode 100644 .dockerignore create mode 100644 tools/abci-test/Cargo.toml create mode 100644 tools/abci-test/Makefile.toml create mode 100644 tools/abci-test/src/main.rs rename tools/docker/{tendermint-v0.34.0 => abci-harness-0.34.0}/.gitignore (100%) create mode 100644 tools/docker/abci-harness-0.34.0/Dockerfile create mode 100755 tools/docker/abci-harness-0.34.0/entrypoint create mode 100644 tools/docker/tendermint-0.34.0/.gitignore rename tools/docker/{tendermint-v0.34.0 => tendermint-0.34.0}/Dockerfile (100%) rename tools/docker/{tendermint-v0.34.0 => tendermint-0.34.0}/entrypoint (100%) diff --git a/.cargo/config b/.cargo/config index 06136c7ca..3a3045ebd 100644 --- a/.cargo/config +++ b/.cargo/config @@ -2,4 +2,5 @@ build-all = "build --workspace --all-targets --" build-wasm-tendermint = "build -p tendermint --manifest-path tendermint/Cargo.toml --target wasm32-unknown-unknown --release --no-default-features --" build-wasm-light-client = "build -p tendermint-light-client --manifest-path light-client/Cargo.toml --target wasm32-unknown-unknown --release --no-default-features --" +build-abci = "build --manifest-path abci/Cargo.toml --bin kvstore-rs --features binary,kvstore-app" test-all-features = "test --all-features --no-fail-fast" diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 000000000..46c3ebed2 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,19 @@ +# Generated by Cargo +# will have compiled files and executables +/target/ + +# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries +# More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html +Cargo.lock + +# These are backup files generated by rustfmt +**/*.rs.bk + +# These are log files emitted by model-based tests +**/*.log + +# RPC probe results +/rpc-probe/probe-results/ + +# Proptest regressions dumps +**/*.proptest-regressions diff --git a/tools/Cargo.toml b/tools/Cargo.toml index 07b2e5dd6..ce4a0ef08 100644 --- a/tools/Cargo.toml +++ b/tools/Cargo.toml @@ -1,6 +1,7 @@ [workspace] members = [ + "abci-test", "kvstore-test", "proto-compiler", "rpc-probe" diff --git a/tools/abci-test/Cargo.toml b/tools/abci-test/Cargo.toml new file mode 100644 index 000000000..f9e5dca5c --- /dev/null +++ b/tools/abci-test/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "abci-test" +version = "0.18.0" +authors = ["Thane Thomson "] +edition = "2018" +description = """ + abci-test provides some end-to-end integration testing between + tendermint-abci and a full Tendermint node. + """ + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +futures = "0.3" +log = "0.4" +simple_logger = "1.11" +structopt = "0.3" +tendermint = { version = "0.18.0", path = "../../tendermint" } +tendermint-rpc = { version = "0.18.0", path = "../../rpc", features = [ "websocket-client" ] } +tokio = { version = "1", features = ["full"] } diff --git a/tools/abci-test/Makefile.toml b/tools/abci-test/Makefile.toml new file mode 100644 index 000000000..a33e677fd --- /dev/null +++ b/tools/abci-test/Makefile.toml @@ -0,0 +1,52 @@ +[env] +CONTAINER_NAME = "abci-test" +DOCKER_IMAGE = "informaldev/abci-harness:0.34.0" +HOST_RPC_PORT = 26657 +CARGO_MAKE_WAIT_MILLISECONDS = 3500 + +# abci-test infrastructure: +# cargo make build-linux-abci - build the ABCI app using Docker (helpful on a Mac) +# cargo make - run the test harness and all tests. Expects a Linux ABCI app already built. +# cargo make docker-up-debug - troubleshoot the infra setup (useful to see docker error messages or the kvstore log). + +[tasks.default] +clear = true +dependencies = [ "docker-up", "wait", "test", "docker-down" ] + +[tasks.build-linux-abci] +command = "docker" +args = [ "run", "--rm", "--volume", "${CARGO_MAKE_WORKSPACE_WORKING_DIRECTORY}/..:/usr/src/myapp", "--workdir", "/usr/src/myapp", "rust:latest", "cargo", "build-abci" ] + +[tasks.docker-down] +dependencies = [ "docker-stop", "docker-rm" ] + +[tasks.docker-up] +command = "docker" +args = ["run", "--name", "${CONTAINER_NAME}", "--rm", "--publish", "26657:${HOST_RPC_PORT}", "--volume", "${CARGO_MAKE_WORKSPACE_WORKING_DIRECTORY}/../target/debug:/abci", "--detach", "${DOCKER_IMAGE}", "--verbose" ] +dependencies = ["docker-up-stop-old", "docker-up-rm-old"] + +[tasks.docker-up-debug] +command = "docker" +args = ["run", "--name", "${CONTAINER_NAME}", "--rm", "--publish", "26657:${HOST_RPC_PORT}", "--volume", "${CARGO_MAKE_WORKSPACE_WORKING_DIRECTORY}/../target/debug:/abci", "${DOCKER_IMAGE}", "--verbose" ] +dependencies = ["docker-up-stop-old", "docker-up-rm-old"] + +[tasks.test] +args = ["run", "--all-features"] + +[tasks.docker-stop] +command = "docker" +args = ["stop", "${CONTAINER_NAME}"] +ignore_errors = true +private = true + +[tasks.docker-rm] +command = "docker" +args = ["rm", "--force", "${CONTAINER_NAME}"] +ignore_errors = true +private = true + +[tasks.docker-up-stop-old] +alias = "docker-stop" + +[tasks.docker-up-rm-old] +alias = "docker-rm" diff --git a/tools/abci-test/src/main.rs b/tools/abci-test/src/main.rs new file mode 100644 index 000000000..35f225583 --- /dev/null +++ b/tools/abci-test/src/main.rs @@ -0,0 +1,179 @@ +//! ABCI key/value store integration test application. + +use futures::StreamExt; +use log::{debug, error, info, LevelFilter}; +use simple_logger::SimpleLogger; +use structopt::StructOpt; +use tendermint::abci::Transaction; +use tendermint::net::Address; +use tendermint_rpc::event::EventData; +use tendermint_rpc::query::EventType; +use tendermint_rpc::{Client, SubscriptionClient, WebSocketClient}; +use tokio::time::Duration; + +#[derive(Debug, StructOpt)] +/// A harness for testing tendermint-abci through a full Tendermint node +/// running our in-memory key/value store application (kvstore-rs). +struct Opt { + /// Tendermint RPC host address. + #[structopt(short, long, default_value = "127.0.0.1")] + host: String, + + /// Tendermint RPC port. + #[structopt(short, long, default_value = "26657")] + port: u16, + + #[structopt(short, long)] + verbose: bool, +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let opt: Opt = Opt::from_args(); + SimpleLogger::new() + .with_level(if opt.verbose { + LevelFilter::Debug + } else { + LevelFilter::Info + }) + .init() + .unwrap(); + + info!("Connecting to Tendermint node at {}:{}", opt.host, opt.port); + let (mut client, driver) = WebSocketClient::new(Address::Tcp { + peer_id: None, + host: opt.host, + port: opt.port, + }) + .await?; + let driver_handle = tokio::spawn(async move { driver.run().await }); + let result = run_tests(&mut client).await; + client.close()?; + driver_handle.await??; + + match result { + Ok(_) => { + info!("Success!"); + Ok(()) + } + Err(e) => { + error!("Test failed: {:?}", e); + Err(e) + } + } +} + +async fn run_tests(client: &mut WebSocketClient) -> Result<(), Box> { + info!("Checking ABCI application version"); + let abci_info = client.abci_info().await?; + debug!("Received: {:?}", abci_info); + if abci_info.data != "kvstore-rs" { + fail("abci_info", "data", "kvstore-rs", abci_info.data)?; + } + + info!("Subscribing to transaction events"); + let mut tx_subs = client.subscribe(EventType::Tx.into()).await?; + + info!("Submitting a transaction"); + let raw_tx_key = "test-key".as_bytes().to_vec(); + let raw_tx_value = "test-value".as_bytes().to_vec(); + let mut raw_tx = raw_tx_key.clone(); + raw_tx.push('=' as u8); + raw_tx.extend(raw_tx_value.clone()); + + let _ = client + .broadcast_tx_async(Transaction::from(raw_tx.clone())) + .await?; + + info!("Checking for transaction events"); + let tx = tokio::time::timeout(Duration::from_secs(3), tx_subs.next()) + .await? + .ok_or_else(|| { + fail( + "transaction subscription", + "transaction", + "returned", + "nothing", + ) + .unwrap_err() + })??; + debug!("Got event: {:?}", tx); + match tx.data { + EventData::Tx { tx_result } => { + if tx_result.tx != raw_tx { + fail("transaction subscription", "tx", raw_tx, tx_result.tx)?; + } + } + _ => fail( + "transaction subscription", + "event data", + "of type Tx", + tx.data, + )?, + } + // Terminate our transaction subscription + drop(tx_subs); + + info!("Waiting for at least one block to pass to ensure transaction has been committed"); + let mut new_block_subs = client.subscribe(EventType::NewBlock.into()).await?; + let _ = new_block_subs.next().await.ok_or_else(|| { + fail("new block subscription", "event", "returned", "nothing").unwrap_err() + })??; + drop(new_block_subs); + + info!( + "Querying for the value associated with key {}", + String::from_utf8(raw_tx_key.clone()).unwrap() + ); + let res = client + .abci_query(None, raw_tx_key.clone(), None, false) + .await?; + if res.key != raw_tx_key { + fail("abci_query", "key", raw_tx_key, res.key)?; + } + if res.value != raw_tx_value { + fail("abci_query", "value", raw_tx_value, res.value)?; + } + + Ok(()) +} + +fn fail( + ctx: S1, + what: S2, + expected: S3, + actual: S4, +) -> Result<(), Box> +where + S1: ToString, + S2: ToString, + S3: std::fmt::Debug, + S4: std::fmt::Debug, +{ + Err(Box::new(AssertionError { + ctx: ctx.to_string(), + what: what.to_string(), + expected: format!("{:?}", expected), + actual: format!("{:?}", actual), + })) +} + +#[derive(Debug)] +struct AssertionError { + ctx: String, + what: String, + expected: String, + actual: String, +} + +impl std::fmt::Display for AssertionError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "for {}, expected {} to be {}, but got {}", + self.ctx, self.what, self.expected, self.actual + ) + } +} + +impl std::error::Error for AssertionError {} diff --git a/tools/docker/README.md b/tools/docker/README.md index c3f90bfdc..b1dfeddae 100644 --- a/tools/docker/README.md +++ b/tools/docker/README.md @@ -24,3 +24,23 @@ Both wallets have `uatom`, `stake` and `n0token` added. Both wallets have an initial signed transaction created for easier population of the network before testing. These transactions will send uatom tokens from c0 -> c1 and vice versa. They are both signed as `sequence 0` in the wallet, so they can only be executed as the first transaction of the corresponding wallet. + +# abci-harness +This image is used during CI testing in the abci-rs crate. +It tests compatibility with the Tendermint Go implementation. +It derives from the Tendermint Docker image above, but it expects a volume attached at `/abci` that contains the ABCI +application to be tested. The name of the ABCI application is `kvstore-rs` by default. This can be changed by setting the +`ABCI_APP` environment variable. + +The image will fire up a Tendermint node (auto-creating the configuration) and then execute the ABCI application +from the attached volume. It logs the Tendermint node log into kvstore-rs.tendermint and the ABCI application log into +kvstore-rs.log on the attached volume. + +This image has both the `muslc` and `glibc` libraries installed for easy testing of dynamically linked binaries. + +Example: +```bash +docker run -it --rm -v $PWD/target/debug:/abci -p 26657:26657 informaldev/abci-harness:0.34.0 +``` + +The image version reflects the Tendermint Go binary version. diff --git a/tools/docker/tendermint-v0.34.0/.gitignore b/tools/docker/abci-harness-0.34.0/.gitignore similarity index 100% rename from tools/docker/tendermint-v0.34.0/.gitignore rename to tools/docker/abci-harness-0.34.0/.gitignore diff --git a/tools/docker/abci-harness-0.34.0/Dockerfile b/tools/docker/abci-harness-0.34.0/Dockerfile new file mode 100644 index 000000000..3dde3efdc --- /dev/null +++ b/tools/docker/abci-harness-0.34.0/Dockerfile @@ -0,0 +1,31 @@ +FROM alpine:3.13.1 +LABEL maintainer="hello@informal.systems" + +ENV TMHOME=/tendermint +#GLIBC for Alpine from: https://github.com/sgerrand/alpine-pkg-glibc +RUN wget https://alpine-pkgs.sgerrand.com/sgerrand.rsa.pub \ + -O /etc/apk/keys/sgerrand.rsa.pub && \ + wget https://github.com/sgerrand/alpine-pkg-glibc/releases/download/2.32-r0/glibc-2.32-r0.apk \ + https://github.com/sgerrand/alpine-pkg-glibc/releases/download/2.32-r0/glibc-bin-2.32-r0.apk \ + https://github.com/sgerrand/alpine-pkg-glibc/releases/download/2.32-r0/glibc-i18n-2.32-r0.apk && \ + apk add --no-cache glibc-2.32-r0.apk glibc-bin-2.32-r0.apk glibc-i18n-2.32-r0.apk && \ + rm glibc-2.32-r0.apk glibc-bin-2.32-r0.apk glibc-i18n-2.32-r0.apk && \ + /usr/glibc-compat/bin/localedef -i en_US -f UTF-8 en_US.UTF-8 && \ + apk --no-cache add jq bash file && \ + wget https://github.com/freshautomations/sconfig/releases/download/v0.1.0/sconfig_linux_amd64 \ + -O /usr/bin/sconfig && \ + chmod 755 /usr/bin/sconfig && \ + addgroup tendermint && \ + adduser -S -G tendermint tendermint -h "$TMHOME" +USER tendermint +WORKDIR $TMHOME + +EXPOSE 26656 26657 26658 26660 +STOPSIGNAL SIGTERM + +ARG TENDERMINT=tendermint +COPY $TENDERMINT /usr/bin/tendermint + +COPY entrypoint /usr/bin/entrypoint +ENTRYPOINT ["/usr/bin/entrypoint"] +VOLUME [ "$TMHOME", "/abci" ] diff --git a/tools/docker/abci-harness-0.34.0/entrypoint b/tools/docker/abci-harness-0.34.0/entrypoint new file mode 100755 index 000000000..a5ed98bb2 --- /dev/null +++ b/tools/docker/abci-harness-0.34.0/entrypoint @@ -0,0 +1,40 @@ +#!/usr/bin/env sh +set -euo pipefail + +ABCI_PATH="/abci/${ABCI_APP:-kvstore-rs}" + +if [ ! -x "${ABCI_PATH}" ]; then + echo "Could not find executable ABCI app at ${ABCI_PATH} ." + echo "Add a volume with the file and use the ABCI_APP environment variable to point to a different file." + exit 1 +else + FILE_TYPE="$(file -b "${ABCI_PATH}")" + if [ -n "${FILE_TYPE##ELF 64-bit*}" ]; then + echo "File is not an ELF 64-bit binary (${FILE_TYPE})." + echo "Build the ABCI application for Linux using Docker:" + echo "docker run -it --rm --user \"\$(id -u)\":\"\$(id -g)\" -v \"\$PWD\":/usr/src/myapp -w /usr/src/myapp rust:latest cargo build-abci" + exit 1 + fi +fi + +if [ ! -d "${TMHOME}/config" ]; then + + echo "Running tendermint init to create configuration." + /usr/bin/tendermint init + + sconfig -s ${TMHOME}/config/config.toml \ + moniker=${MONIKER:-dockernode} \ + consensus.timeout_commit=500ms \ + rpc.laddr=tcp://0.0.0.0:26657 \ + p2p.addr_book_strict=false \ + instrumentation.prometheus=true + + sconfig -s ${TMHOME}/config/genesis.json \ + chain_id=${CHAIN_ID:-dockerchain} \ + consensus_params.block.time_iota_ms=500 + +fi + +exec /usr/bin/tendermint node 2>&1 > "${ABCI_PATH}.tendermint" & + +exec "${ABCI_PATH}" "$@" 2>&1 | tee "${ABCI_PATH}.log" diff --git a/tools/docker/tendermint-0.34.0/.gitignore b/tools/docker/tendermint-0.34.0/.gitignore new file mode 100644 index 000000000..0c8c39bef --- /dev/null +++ b/tools/docker/tendermint-0.34.0/.gitignore @@ -0,0 +1 @@ +tendermint \ No newline at end of file diff --git a/tools/docker/tendermint-v0.34.0/Dockerfile b/tools/docker/tendermint-0.34.0/Dockerfile similarity index 100% rename from tools/docker/tendermint-v0.34.0/Dockerfile rename to tools/docker/tendermint-0.34.0/Dockerfile diff --git a/tools/docker/tendermint-v0.34.0/entrypoint b/tools/docker/tendermint-0.34.0/entrypoint similarity index 100% rename from tools/docker/tendermint-v0.34.0/entrypoint rename to tools/docker/tendermint-0.34.0/entrypoint From 7e106f8acb0bb369e17aedd045e592856147b171 Mon Sep 17 00:00:00 2001 From: Thane Thomson Date: Tue, 9 Feb 2021 14:12:51 -0500 Subject: [PATCH 13/25] Update abci/src/codec.rs Co-authored-by: Romain Ruetschi --- abci/src/codec.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/abci/src/codec.rs b/abci/src/codec.rs index 93063912d..2f85e5fac 100644 --- a/abci/src/codec.rs +++ b/abci/src/codec.rs @@ -68,12 +68,9 @@ where loop { // Try to decode an incoming message from our buffer first match decode_length_delimited::(&mut self.read_buf) { - Ok(opt) => { - if let Some(incoming) = opt { - return Some(Ok(incoming)); - } - } + Ok(Some(incoming)) => return Some(Ok(incoming)), Err(e) => return Some(Err(e)), + _ => (), // not enough data to decode a message, let's continue. } // If we don't have enough data to decode a message, try to read From 88f32470912b966680776e42a157df10990d250d Mon Sep 17 00:00:00 2001 From: Thane Thomson Date: Tue, 9 Feb 2021 14:15:19 -0500 Subject: [PATCH 14/25] Apply suggestion from https://github.com/informalsystems/tendermint-rs/pull/794\#discussion_r573100911 Signed-off-by: Thane Thomson --- abci/src/codec.rs | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/abci/src/codec.rs b/abci/src/codec.rs index 2f85e5fac..6ae623b0b 100644 --- a/abci/src/codec.rs +++ b/abci/src/codec.rs @@ -135,14 +135,9 @@ where let mut tmp = src.clone().freeze(); let encoded_len = match decode_varint(&mut tmp) { Ok(len) => len, - Err(e) => { - return if src_len <= MAX_VARINT_LENGTH { - // We've potentially only received a partial length delimiter - Ok(None) - } else { - Err(e) - }; - } + // We've potentially only received a partial length delimiter + Err(_) if src_len <= MAX_VARINT_LENGTH => return Ok(None), + Err(e) => return Err(e), }; let remaining = tmp.remaining() as u64; if remaining < encoded_len { From a7d290386a657c78846ef163e3da76b85cc20432 Mon Sep 17 00:00:00 2001 From: Thane Thomson Date: Tue, 9 Feb 2021 17:34:41 -0500 Subject: [PATCH 15/25] Refactor error handing and expose eyre::Result as crate default Result type Signed-off-by: Thane Thomson --- abci/src/application/kvstore.rs | 5 +++-- abci/src/client.rs | 7 ++----- abci/src/{result.rs => error.rs} | 14 +------------- abci/src/lib.rs | 7 +++++-- 4 files changed, 11 insertions(+), 22 deletions(-) rename abci/src/{result.rs => error.rs} (59%) diff --git a/abci/src/application/kvstore.rs b/abci/src/application/kvstore.rs index ffbd2be77..919917445 100644 --- a/abci/src/application/kvstore.rs +++ b/abci/src/application/kvstore.rs @@ -265,9 +265,10 @@ enum Command { fn channel_send(tx: &Sender, value: T) -> Result<()> { tx.send(value) - .map_err(|e| Error::ChannelSend(e.to_string())) + .map_err(|e| Error::ChannelSend(e.to_string()).into()) } fn channel_recv(rx: &Receiver) -> Result { - rx.recv().map_err(|e| Error::ChannelRecv(e.to_string())) + rx.recv() + .map_err(|e| Error::ChannelRecv(e.to_string()).into()) } diff --git a/abci/src/client.rs b/abci/src/client.rs index 386f69a45..d8cf0db23 100644 --- a/abci/src/client.rs +++ b/abci/src/client.rs @@ -56,10 +56,7 @@ macro_rules! perform { ($self:expr, $type:ident, $req:expr) => { match $self.perform(request::Value::$type($req))? { response::Value::$type(r) => Ok(r), - r => Err(Error::UnexpectedServerResponseType( - stringify!($type).to_string(), - r, - )), + r => Err(Error::UnexpectedServerResponseType(stringify!($type).to_string(), r).into()), } }; } @@ -153,7 +150,7 @@ impl Client { .ok_or(Error::ServerConnectionTerminated)??; match res.value { Some(value) => Ok(value), - None => Err(Error::MalformedServerResponse), + None => Err(Error::MalformedServerResponse.into()), } } } diff --git a/abci/src/result.rs b/abci/src/error.rs similarity index 59% rename from abci/src/result.rs rename to abci/src/error.rs index 84b34d73e..a66021cd5 100644 --- a/abci/src/result.rs +++ b/abci/src/error.rs @@ -1,22 +1,10 @@ -//! tendermint-abci error and result handling. +//! tendermint-abci errors use thiserror::Error; -/// Result type used throughout the crate. -pub type Result = std::result::Result; - /// Errors that can be produced by tendermint-abci. #[derive(Debug, Error)] pub enum Error { - #[error("I/O error")] - Io(#[from] std::io::Error), - - #[error("protobuf encoding error")] - ProtobufEncode(#[from] prost::EncodeError), - - #[error("protobuf decoding error")] - ProtobufDecode(#[from] prost::DecodeError), - #[error("server connection terminated")] ServerConnectionTerminated, diff --git a/abci/src/lib.rs b/abci/src/lib.rs index 65449502a..d33b55bf2 100644 --- a/abci/src/lib.rs +++ b/abci/src/lib.rs @@ -4,14 +4,17 @@ mod application; #[cfg(feature = "client")] mod client; mod codec; -mod result; +mod error; mod server; +// Re-exported +pub use eyre::Result; + // Common exports pub use application::Application; #[cfg(feature = "client")] pub use client::{Client, ClientBuilder}; -pub use result::{Error, Result}; +pub use error::Error; pub use server::{Server, ServerBuilder}; // Example applications From 98d03688715cfae2619a140921c0dcff4f788a10 Mon Sep 17 00:00:00 2001 From: Thane Thomson Date: Tue, 9 Feb 2021 17:49:31 -0500 Subject: [PATCH 16/25] Refactor to use tracing instead of log Signed-off-by: Thane Thomson --- abci/Cargo.toml | 6 +++--- abci/src/application.rs | 2 +- abci/src/application/kvstore.rs | 2 +- abci/src/application/kvstore/main.rs | 21 +++++++++------------ abci/src/server.rs | 2 +- 5 files changed, 15 insertions(+), 18 deletions(-) diff --git a/abci/Cargo.toml b/abci/Cargo.toml index 1f6415672..1653c9b06 100644 --- a/abci/Cargo.toml +++ b/abci/Cargo.toml @@ -19,15 +19,15 @@ required-features = [ "binary", "kvstore-app" ] client = [] echo-app = [] kvstore-app = [] -binary = [ "simple_logger", "structopt" ] +binary = [ "structopt", "tracing-subscriber" ] [dependencies] bytes = "1.0" eyre = "0.6" -log = "0.4" prost = "0.7" tendermint-proto = { version = "0.18.0", path = "../proto" } thiserror = "1.0" +tracing = "0.1" -simple_logger = { version = "1.11", optional = true } structopt = { version = "0.3", optional = true } +tracing-subscriber = { version = "0.2", optional = true } diff --git a/abci/src/application.rs b/abci/src/application.rs index 9253b12b4..aa8ce5c36 100644 --- a/abci/src/application.rs +++ b/abci/src/application.rs @@ -102,7 +102,7 @@ pub trait Application: Send + Clone + 'static { /// Executes the relevant application method based on the type of the /// request, and produces the corresponding response. fn handle(&self, request: Request) -> Response { - log::debug!("Incoming request: {:?}", request); + tracing::debug!("Incoming request: {:?}", request); Response { value: Some(match request.value.unwrap() { Value::Echo(req) => response::Value::Echo(self.echo(req)), diff --git a/abci/src/application/kvstore.rs b/abci/src/application/kvstore.rs index 919917445..6a1cf5097 100644 --- a/abci/src/application/kvstore.rs +++ b/abci/src/application/kvstore.rs @@ -3,13 +3,13 @@ use crate::codec::{encode_varint, MAX_VARINT_LENGTH}; use crate::{Application, Error, Result}; use bytes::BytesMut; -use log::{debug, info}; use std::collections::HashMap; use std::sync::mpsc::{channel, Receiver, Sender}; use tendermint_proto::abci::{ Event, EventAttribute, RequestCheckTx, RequestDeliverTx, RequestInfo, RequestQuery, ResponseCheckTx, ResponseCommit, ResponseDeliverTx, ResponseInfo, ResponseQuery, }; +use tracing::{debug, info}; /// In-memory, hashmap-backed key/value store ABCI application. /// diff --git a/abci/src/application/kvstore/main.rs b/abci/src/application/kvstore/main.rs index fa21894d9..5eba12d75 100644 --- a/abci/src/application/kvstore/main.rs +++ b/abci/src/application/kvstore/main.rs @@ -1,9 +1,8 @@ //! In-memory key/value store application for Tendermint. -use log::LevelFilter; -use simple_logger::SimpleLogger; use structopt::StructOpt; use tendermint_abci::{KeyValueStoreApp, ServerBuilder}; +use tracing_subscriber::filter::LevelFilter; #[derive(Debug, StructOpt)] struct Opt { @@ -31,16 +30,14 @@ struct Opt { fn main() { let opt: Opt = Opt::from_args(); - SimpleLogger::new() - .with_level(if opt.quiet { - LevelFilter::Off - } else if opt.verbose { - LevelFilter::Debug - } else { - LevelFilter::Info - }) - .init() - .unwrap(); + let log_level = if opt.quiet { + LevelFilter::OFF + } else if opt.verbose { + LevelFilter::DEBUG + } else { + LevelFilter::INFO + }; + tracing_subscriber::fmt().with_max_level(log_level).init(); let (app, driver) = KeyValueStoreApp::new(); let server = ServerBuilder::new(opt.read_buf_size) diff --git a/abci/src/server.rs b/abci/src/server.rs index 4dde9bdb9..53c498cd7 100644 --- a/abci/src/server.rs +++ b/abci/src/server.rs @@ -2,9 +2,9 @@ use crate::codec::ServerCodec; use crate::{Application, Result}; -use log::{error, info}; use std::net::{TcpListener, TcpStream, ToSocketAddrs}; use std::thread; +use tracing::{error, info}; /// The size of the read buffer for each incoming connection to the ABCI /// server. From 7950597149e914b65fbe633706f03ece3359552c Mon Sep 17 00:00:00 2001 From: Thane Thomson Date: Thu, 11 Feb 2021 13:17:22 -0500 Subject: [PATCH 17/25] Add newline Signed-off-by: Thane Thomson --- tools/docker/tendermint-0.34.0/.gitignore | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/docker/tendermint-0.34.0/.gitignore b/tools/docker/tendermint-0.34.0/.gitignore index 0c8c39bef..9059c6848 100644 --- a/tools/docker/tendermint-0.34.0/.gitignore +++ b/tools/docker/tendermint-0.34.0/.gitignore @@ -1 +1 @@ -tendermint \ No newline at end of file +tendermint From ad2501afe2731b722124369e6469321d751adf4c Mon Sep 17 00:00:00 2001 From: Thane Thomson Date: Thu, 11 Feb 2021 13:18:55 -0500 Subject: [PATCH 18/25] Remove comment relating to constraints on Codec struct params Signed-off-by: Thane Thomson --- abci/src/codec.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/abci/src/codec.rs b/abci/src/codec.rs index 6ae623b0b..919e2e798 100644 --- a/abci/src/codec.rs +++ b/abci/src/codec.rs @@ -24,8 +24,6 @@ pub type ClientCodec = Codec; /// Allows for iteration over `S` to produce instances of `I`, as well as /// sending instances of `O`. -/// -/// `S` must implement [`std::io::Read`] and [`std::io::Write`]. pub struct Codec { stream: S, // Long-running read buffer From fefa9bc16ed388a33ddb20c5ea63cda5de1df253 Mon Sep 17 00:00:00 2001 From: Thane Thomson Date: Thu, 18 Feb 2021 14:14:11 -0500 Subject: [PATCH 19/25] Version tendermint-abci crate in line with other tendermint-rs crates Signed-off-by: Thane Thomson --- abci/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/abci/Cargo.toml b/abci/Cargo.toml index 1653c9b06..4318ad2c4 100644 --- a/abci/Cargo.toml +++ b/abci/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "tendermint-abci" -version = "0.1.0" +version = "0.18.1" authors = ["Thane Thomson "] edition = "2018" description = """ From 723c5068f218e25beffa6c0575cbad7a5f820b32 Mon Sep 17 00:00:00 2001 From: Thane Thomson Date: Thu, 18 Feb 2021 14:14:19 -0500 Subject: [PATCH 20/25] Update CHANGELOG Signed-off-by: Thane Thomson --- CHANGELOG.md | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index adfe82ed2..a620ec1bc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,12 @@ +## Unreleased + +### FEATURES + +* `[tendermint-abci]` Release minimal framework for building ABCI applications + in Rust ([#794]) + +[#794]: https://github.com/informalsystems/tendermint-rs/pull/794 + ## v0.18.1 *Feb 10, 2021* From a77b8da35b5f74f3cbf5c1646e4e49462620d74c Mon Sep 17 00:00:00 2001 From: Thane Thomson Date: Thu, 18 Feb 2021 14:33:19 -0500 Subject: [PATCH 21/25] Expand crate documentation Signed-off-by: Thane Thomson --- abci/src/application.rs | 4 ++++ abci/src/server.rs | 36 +++++++++++++++++++++++++++++++++++- 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/abci/src/application.rs b/abci/src/application.rs index aa8ce5c36..c33ac8a09 100644 --- a/abci/src/application.rs +++ b/abci/src/application.rs @@ -17,6 +17,10 @@ use tendermint_proto::abci::{ }; /// An ABCI application. +/// +/// Applications are `Send` + `Clone` + `'static` because they are cloned for +/// each incoming connection to the ABCI server. It is up to the application +/// developer to manage shared state between these clones of their application. pub trait Application: Send + Clone + 'static { /// Echo back the same message as provided in the request. fn echo(&self, request: RequestEcho) -> ResponseEcho { diff --git a/abci/src/server.rs b/abci/src/server.rs index 53c498cd7..2ae0a7872 100644 --- a/abci/src/server.rs +++ b/abci/src/server.rs @@ -7,21 +7,50 @@ use std::thread; use tracing::{error, info}; /// The size of the read buffer for each incoming connection to the ABCI -/// server. +/// server (1MB). pub const DEFAULT_SERVER_READ_BUF_SIZE: usize = 1024 * 1024; /// Allows us to configure and construct an ABCI server. +/// +/// ## Example +/// +/// ```rust +/// use tendermint_abci::{EchoApp, ServerBuilder, ClientBuilder}; +/// use tendermint_proto::abci::RequestEcho; +/// +/// let server = ServerBuilder::default() +/// .bind("127.0.0.1:26658", EchoApp::default()) +/// .unwrap(); +/// let server_addr = server.local_addr(); +/// std::thread::spawn(move || server.listen().unwrap()); +/// +/// let mut client = ClientBuilder::default() +/// .connect(server_addr) +/// .unwrap(); +/// +/// let message = String::from("Hello ABCI!"); +/// let response = client.echo(RequestEcho { message: message.clone() }).unwrap(); +/// assert_eq!(response.message, message); +/// ``` pub struct ServerBuilder { read_buf_size: usize, } impl ServerBuilder { /// Builder constructor. + /// + /// Allows you to specify the read buffer size used when reading chunks of + /// incoming data from the client. This needs to be tuned for your + /// application. pub fn new(read_buf_size: usize) -> Self { Self { read_buf_size } } /// Constructor for an ABCI server. + /// + /// Binds the server to the given address. You must subsequently call the + /// [`Server::listen`] method in order for incoming connections' requests + /// to be routed to the specified ABCI application. pub fn bind(self, addr: Addr, app: App) -> Result> where Addr: ToSocketAddrs, @@ -48,6 +77,11 @@ impl Default for ServerBuilder { } /// A TCP-based server for serving a specific ABCI application. +/// +/// Each incoming connection is handled in a separate thread. The ABCI +/// application is cloned for access in each thread. It is up to the +/// application developer to manage shared state across these different +/// threads. pub struct Server { app: App, listener: TcpListener, From 17ee9d901fe6c98bcdfa17a3637546a95b691bdd Mon Sep 17 00:00:00 2001 From: Thane Thomson Date: Thu, 18 Feb 2021 14:38:57 -0500 Subject: [PATCH 22/25] Extract request dispatch functionality from Application trait Signed-off-by: Thane Thomson --- abci/src/application.rs | 14 ++++++++++++-- abci/src/server.rs | 1 + 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/abci/src/application.rs b/abci/src/application.rs index c33ac8a09..e201e14bd 100644 --- a/abci/src/application.rs +++ b/abci/src/application.rs @@ -19,8 +19,9 @@ use tendermint_proto::abci::{ /// An ABCI application. /// /// Applications are `Send` + `Clone` + `'static` because they are cloned for -/// each incoming connection to the ABCI server. It is up to the application -/// developer to manage shared state between these clones of their application. +/// each incoming connection to the ABCI [`Server`]. It is up to the +/// application developer to manage shared state between these clones of their +/// application. pub trait Application: Send + Clone + 'static { /// Echo back the same message as provided in the request. fn echo(&self, request: RequestEcho) -> ResponseEcho { @@ -102,9 +103,18 @@ pub trait Application: Send + Clone + 'static { ) -> ResponseApplySnapshotChunk { Default::default() } +} +/// Provides a mechanism for the [`Server`] to execute incoming requests while +/// expecting the correct response types. +pub trait RequestDispatcher { /// Executes the relevant application method based on the type of the /// request, and produces the corresponding response. + fn handle(&self, request: Request) -> Response; +} + +// Implement `RequestDispatcher` for all `Application`s. +impl RequestDispatcher for A { fn handle(&self, request: Request) -> Response { tracing::debug!("Incoming request: {:?}", request); Response { diff --git a/abci/src/server.rs b/abci/src/server.rs index 2ae0a7872..405a1f586 100644 --- a/abci/src/server.rs +++ b/abci/src/server.rs @@ -1,5 +1,6 @@ //! ABCI application server interface. +use crate::application::RequestDispatcher; use crate::codec::ServerCodec; use crate::{Application, Result}; use std::net::{TcpListener, TcpStream, ToSocketAddrs}; From a30d19d699193c2cc8941d8c2a3aa41a017671a8 Mon Sep 17 00:00:00 2001 From: Thane Thomson Date: Thu, 18 Feb 2021 14:43:20 -0500 Subject: [PATCH 23/25] Move ABCI server example to crate root Signed-off-by: Thane Thomson --- abci/src/lib.rs | 29 +++++++++++++++++++++++++++++ abci/src/server.rs | 21 --------------------- 2 files changed, 29 insertions(+), 21 deletions(-) diff --git a/abci/src/lib.rs b/abci/src/lib.rs index d33b55bf2..07b731684 100644 --- a/abci/src/lib.rs +++ b/abci/src/lib.rs @@ -1,4 +1,33 @@ //! ABCI framework for building Tendermint applications in Rust. +//! +//! ## Example +//! +//! The following example shows how to use the trivial [`EchoApp`] to +//! instantiate an ABCI server. +//! +//! **NOTE**: `EchoApp` won't work when coupled with a real Tendermint node, as +//! it does not implement the minimum ABCI application functionality required +//! by Tendermint. See the [`KeyValueStoreApp`] for a more functional ABCI +//! application. +//! +//! ```rust +//! use tendermint_abci::{EchoApp, ServerBuilder, ClientBuilder}; +//! use tendermint_proto::abci::RequestEcho; +//! +//! let server = ServerBuilder::default() +//! .bind("127.0.0.1:26658", EchoApp::default()) +//! .unwrap(); +//! let server_addr = server.local_addr(); +//! std::thread::spawn(move || server.listen().unwrap()); +//! +//! let mut client = ClientBuilder::default() +//! .connect(server_addr) +//! .unwrap(); +//! +//! let message = String::from("Hello ABCI!"); +//! let response = client.echo(RequestEcho { message: message.clone() }).unwrap(); +//! assert_eq!(response.message, message); +//! ``` mod application; #[cfg(feature = "client")] diff --git a/abci/src/server.rs b/abci/src/server.rs index 405a1f586..b9e6dc51b 100644 --- a/abci/src/server.rs +++ b/abci/src/server.rs @@ -12,27 +12,6 @@ use tracing::{error, info}; pub const DEFAULT_SERVER_READ_BUF_SIZE: usize = 1024 * 1024; /// Allows us to configure and construct an ABCI server. -/// -/// ## Example -/// -/// ```rust -/// use tendermint_abci::{EchoApp, ServerBuilder, ClientBuilder}; -/// use tendermint_proto::abci::RequestEcho; -/// -/// let server = ServerBuilder::default() -/// .bind("127.0.0.1:26658", EchoApp::default()) -/// .unwrap(); -/// let server_addr = server.local_addr(); -/// std::thread::spawn(move || server.listen().unwrap()); -/// -/// let mut client = ClientBuilder::default() -/// .connect(server_addr) -/// .unwrap(); -/// -/// let message = String::from("Hello ABCI!"); -/// let response = client.echo(RequestEcho { message: message.clone() }).unwrap(); -/// assert_eq!(response.message, message); -/// ``` pub struct ServerBuilder { read_buf_size: usize, } From 56a339f22eb6a140d1c44cde5af857608ba7082c Mon Sep 17 00:00:00 2001 From: Thane Thomson Date: Thu, 18 Feb 2021 14:53:56 -0500 Subject: [PATCH 24/25] Fix broken link in docs Signed-off-by: Thane Thomson --- abci/src/application.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/abci/src/application.rs b/abci/src/application.rs index e201e14bd..b4a848608 100644 --- a/abci/src/application.rs +++ b/abci/src/application.rs @@ -22,6 +22,8 @@ use tendermint_proto::abci::{ /// each incoming connection to the ABCI [`Server`]. It is up to the /// application developer to manage shared state between these clones of their /// application. +/// +/// [`Server`]: crate::Server pub trait Application: Send + Clone + 'static { /// Echo back the same message as provided in the request. fn echo(&self, request: RequestEcho) -> ResponseEcho { From 5f8cbe2474093a1205c369a7a5d9bc969d4b5f90 Mon Sep 17 00:00:00 2001 From: Thane Thomson Date: Thu, 18 Feb 2021 14:54:21 -0500 Subject: [PATCH 25/25] Replace EchoApp example with KeyValueStoreApp example Signed-off-by: Thane Thomson --- abci/src/lib.rs | 62 ++++++++++++++++++++++++++++++++++--------------- 1 file changed, 43 insertions(+), 19 deletions(-) diff --git a/abci/src/lib.rs b/abci/src/lib.rs index 07b731684..356574372 100644 --- a/abci/src/lib.rs +++ b/abci/src/lib.rs @@ -1,32 +1,56 @@ -//! ABCI framework for building Tendermint applications in Rust. +//! ABCI framework for building [Tendermint] applications in Rust. //! -//! ## Example +//! [Tendermint]: https://tendermint.com //! -//! The following example shows how to use the trivial [`EchoApp`] to -//! instantiate an ABCI server. +//! ## Example //! -//! **NOTE**: `EchoApp` won't work when coupled with a real Tendermint node, as -//! it does not implement the minimum ABCI application functionality required -//! by Tendermint. See the [`KeyValueStoreApp`] for a more functional ABCI -//! application. +//! The following example is adapted from our integration test suite to +//! demonstrate how to instantiate an ABCI application, server and client and +//! have them interact. In practice, the client interaction will be performed +//! by a full Tendermint node. //! //! ```rust -//! use tendermint_abci::{EchoApp, ServerBuilder, ClientBuilder}; -//! use tendermint_proto::abci::RequestEcho; +//! use tendermint_abci::{KeyValueStoreApp, ServerBuilder, ClientBuilder}; +//! use tendermint_proto::abci::{RequestEcho, RequestDeliverTx, RequestQuery}; //! -//! let server = ServerBuilder::default() -//! .bind("127.0.0.1:26658", EchoApp::default()) -//! .unwrap(); +//! // Create our key/value store application +//! let (app, driver) = KeyValueStoreApp::new(); +//! // Create our server, binding it to TCP port 26658 on localhost and +//! // supplying it with our key/value store application +//! let server = ServerBuilder::default().bind("127.0.0.1:26658", app).unwrap(); //! let server_addr = server.local_addr(); -//! std::thread::spawn(move || server.listen().unwrap()); //! -//! let mut client = ClientBuilder::default() -//! .connect(server_addr) +//! // We want the driver and the server to run in the background while we +//! // interact with them via the client in the foreground +//! std::thread::spawn(move || driver.run()); +//! std::thread::spawn(move || server.listen()); +//! +//! let mut client = ClientBuilder::default().connect(server_addr).unwrap(); +//! let res = client +//! .echo(RequestEcho { +//! message: "Hello ABCI!".to_string(), +//! }) //! .unwrap(); +//! assert_eq!(res.message, "Hello ABCI!"); //! -//! let message = String::from("Hello ABCI!"); -//! let response = client.echo(RequestEcho { message: message.clone() }).unwrap(); -//! assert_eq!(response.message, message); +//! // Deliver a transaction and then commit the transaction +//! client +//! .deliver_tx(RequestDeliverTx { +//! tx: "test-key=test-value".as_bytes().to_owned(), +//! }) +//! .unwrap(); +//! client.commit().unwrap(); +//! +//! // We should be able to query for the data we just delivered above +//! let res = client +//! .query(RequestQuery { +//! data: "test-key".as_bytes().to_owned(), +//! path: "".to_string(), +//! height: 0, +//! prove: false, +//! }) +//! .unwrap(); +//! assert_eq!(res.value, "test-value".as_bytes().to_owned()); //! ``` mod application;