Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge abci-rs in tendermint-rs repository #489

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
[workspace]

members = [
"abci",
"light-client",
"light-node",
"proto",
Expand Down
1 change: 1 addition & 0 deletions abci/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/src/proto/*.rs
44 changes: 44 additions & 0 deletions abci/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
[package]
name = "abci"
version = "0.10.0"
authors = ["Devashish Dixit <devashishdxt@gmail.com>"]
license = "MIT/Apache-2.0"
description = "A Rust crate for creating ABCI applications"
homepage = "https://github.com/devashishdxt/abci-rs"
repository = "https://github.com/devashishdxt/abci-rs"
categories = ["network-programming"]
keywords = ["blockchain", "tendermint", "abci"]
readme = "README.md"
include = ["Cargo.toml", "src/**/*.rs", "README.md"]
edition = "2018"

[lib]
name = "abci"

[package.metadata.docs.rs]
features = ["doc"]
rustdoc-args = ["--cfg", "feature=\"doc\""]

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
async-std = { version = "1.6", optional = true }
async-trait = "0.1"
bytes = "0.5"
integer-encoding = { version = "1.1", optional = true }
prost = "0.6"
prost-types = "0.6"
tendermint-proto = { path = "../proto" }
tokio = { version = "0.2", optional = true, features = ["io-util", "sync", "tcp", "stream", "rt-core", "uds"] }
tracing = { version = "0.1", features = ["log"] }
tracing-futures = "0.2"

[dev-dependencies]
tokio = { version = "0.2", features = ["macros"] }
tracing-subscriber = "0.2"

[features]
default = ["use-tokio"]
doc = []
use-async-std = ["async-std", "integer-encoding/futures_async"]
use-tokio = ["tokio", "integer-encoding/tokio_async"]
83 changes: 83 additions & 0 deletions abci/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# abci

[![Continuous Integration](https://github.com/devashishdxt/abci-rs/workflows/Continuous%20Integration/badge.svg)](https://github.com/devashishdxt/abci-rs/actions?query=workflow%3A%22Continuous+Integration%22)
[![Crates.io](https://img.shields.io/crates/v/abci-rs)](https://crates.io/crates/abci-rs)
[![Documentation](https://docs.rs/abci-rs/badge.svg)](https://docs.rs/abci-rs)
[![License](https://img.shields.io/crates/l/abci-rs)](https://github.com/devashishdxt/abci-rs/blob/master/LICENSE-MIT)

A Rust crate for creating ABCI applications.

## ABCI Overview

ABCI is the interface between Tendermint (a state-machine replication engine) and your application (the actual state
machine). It consists of a set of methods, where each method has a corresponding `Request` and `Response` message type.
Tendermint calls the ABCI methods on the ABCI application by sending the `Request` messages and receiving the `Response`
messages in return.

ABCI methods are split across 4 separate ABCI connections:

- `Consensus` Connection: `InitChain`, `BeginBlock`, `DeliverTx`, `EndBlock`, `Commit`
- `Mempool` Connection: `CheckTx`
- `Info` Connection: `Info`, `SetOption`, `Query`
- `Snapshot` Connection: `ListSnapshots`, `LoadSnapshotChunk`, `OfferSnapshot`, `ApplySnapshotChunk`

Additionally, there is a `Flush` method that is called on every connection, and an `Echo` method that is just for
debugging.

To know more about ABCI protocol specifications, go to official ABCI [documentation](https://tendermint.com/docs/spec/abci/).

## Usage

Add `abci` in your `Cargo.toml`'s `dependencies` section:

```toml
[dependencies]
abci = "0.10"
```

Each ABCI application has to implement three core traits corresponding to all three ABCI connections, `Consensus`,
`Mempool` and `Info`.

> Note: Implementations of these traits are expected to be `Send + Sync` and methods take immutable reference of `self`.
So, internal mutability must be handled using thread safe (`Arc`, `Mutex`, etc.) constructs.

After implementing all three above mentioned `trait`s, you can create a `Server` object and use `Server::run()` to start
ABCI application.

`Server::run()` is an `async` function and returns a `Future`. So, you'll need an executor to drive `Future` returned
from `Server::run()`. `async-std` and `tokio` are two popular options. In `counter` example, we use `tokio`'s executor.

To know more, go to `examples/` to see a sample ABCI application.

### Documentation

- [`master`](https://devashishdxt.github.io/abci-rs/abci/)
- [`release`](https://docs.rs/abci-rs/)

### Features

- `use-tokio`: Enables `tokio` backend for running ABCI TCP/UDS server
- **Enabled** by default.
- `use-async-std`: Enables `async-std` backend for running ABCI TCP/UDS server
- **Disabled** by default.

> Features `use-tokio` and `use-async-std` are mutually exclusive, i.e., only one of them can be enabled at a time.
Compilation will fail if either both of them are enabled or none of them are enabled.

## Minimum Supported Versions

- Tendermint: [`v0.33.6`](https://github.com/tendermint/tendermint/releases/tag/v0.33.6)

## License

Licensed under either of

- Apache License, Version 2.0 ([LICENSE-APACHE](LICENSE-APACHE))
- MIT license ([LICENSE-MIT](LICENSE-MIT))

at your option.

## Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as
defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.
213 changes: 213 additions & 0 deletions abci/examples/counter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
use std::{
net::SocketAddr,
sync::{Arc, Mutex},
};

use abci::{async_trait, Consensus, Info, Mempool, Server, Snapshot};
use tendermint_proto::abci::*;
use tracing::{subscriber::set_global_default, Level};
use tracing_subscriber::FmtSubscriber;

/// Simple counter
#[derive(Debug, Default, Clone)]
pub struct CounterState {
block_height: i64,
app_hash: Vec<u8>,
counter: u64,
}

#[derive(Debug)]
pub struct ConsensusConnection {
committed_state: Arc<Mutex<CounterState>>,
current_state: Arc<Mutex<Option<CounterState>>>,
}

impl ConsensusConnection {
pub fn new(
committed_state: Arc<Mutex<CounterState>>,
current_state: Arc<Mutex<Option<CounterState>>>,
) -> Self {
Self {
committed_state,
current_state,
}
}
}

#[async_trait]
impl Consensus for ConsensusConnection {
async fn init_chain(&self, _init_chain_request: RequestInitChain) -> ResponseInitChain {
Default::default()
}

async fn begin_block(&self, _begin_block_request: RequestBeginBlock) -> ResponseBeginBlock {
let committed_state = self.committed_state.lock().unwrap().clone();

let mut current_state = self.current_state.lock().unwrap();
*current_state = Some(committed_state);

Default::default()
}

async fn deliver_tx(&self, deliver_tx_request: RequestDeliverTx) -> ResponseDeliverTx {
let new_counter = parse_bytes_to_counter(&deliver_tx_request.tx);

if new_counter.is_err() {
let mut error = ResponseDeliverTx::default();
error.code = 1;
error.codespace = "Parsing error".to_owned();
error.log = "Transaction should be 8 bytes long".to_owned();
error.info = "Transaction is big-endian encoding of 64-bit integer".to_owned();

return error;
}

let new_counter = new_counter.unwrap();

let mut current_state_lock = self.current_state.lock().unwrap();
let mut current_state = current_state_lock.as_mut().unwrap();

if current_state.counter + 1 != new_counter {
let mut error = ResponseDeliverTx::default();
error.code = 2;
error.codespace = "Validation error".to_owned();
error.log = "Only consecutive integers are allowed".to_owned();
error.info = "Numbers to counter app should be supplied in increasing order of consecutive integers staring from 1".to_owned();

return error;
}

current_state.counter = new_counter;

Default::default()
}

async fn end_block(&self, end_block_request: RequestEndBlock) -> ResponseEndBlock {
let mut current_state_lock = self.current_state.lock().unwrap();
let mut current_state = current_state_lock.as_mut().unwrap();

current_state.block_height = end_block_request.height;
current_state.app_hash = current_state.counter.to_be_bytes().to_vec();

Default::default()
}

async fn commit(&self, _commit_request: RequestCommit) -> ResponseCommit {
let current_state = self.current_state.lock().unwrap().as_ref().unwrap().clone();
let mut committed_state = self.committed_state.lock().unwrap();
*committed_state = current_state;

ResponseCommit {
data: (*committed_state).app_hash.clone(),
retain_height: 0,
}
}
}

#[derive(Debug)]
pub struct MempoolConnection {
state: Arc<Mutex<Option<CounterState>>>,
}

impl MempoolConnection {
pub fn new(state: Arc<Mutex<Option<CounterState>>>) -> Self {
Self { state }
}
}

#[async_trait]
impl Mempool for MempoolConnection {
async fn check_tx(&self, check_tx_request: RequestCheckTx) -> ResponseCheckTx {
let new_counter = parse_bytes_to_counter(&check_tx_request.tx);

if new_counter.is_err() {
let mut error = ResponseCheckTx::default();
error.code = 1;
error.codespace = "Parsing error".to_owned();
error.log = "Transaction should be 8 bytes long".to_owned();
error.info = "Transaction is big-endian encoding of 64-bit integer".to_owned();

return error;
}

let new_counter = new_counter.unwrap();

let state_lock = self.state.lock().unwrap();
let state = state_lock.as_ref().unwrap();

if state.counter + 1 != new_counter {
let mut error = ResponseCheckTx::default();
error.code = 2;
error.codespace = "Validation error".to_owned();
error.log = "Only consecutive integers are allowed".to_owned();
error.info = "Numbers to counter app should be supplied in increasing order of consecutive integers staring from 1".to_owned();

return error;
} else {
Default::default()
}
}
}

pub struct InfoConnection {
state: Arc<Mutex<CounterState>>,
}

impl InfoConnection {
pub fn new(state: Arc<Mutex<CounterState>>) -> Self {
Self { state }
}
}

#[async_trait]
impl Info for InfoConnection {
async fn info(&self, _info_request: RequestInfo) -> ResponseInfo {
let state = self.state.lock().unwrap();

ResponseInfo {
data: Default::default(),
version: Default::default(),
app_version: Default::default(),
last_block_height: (*state).block_height,
last_block_app_hash: (*state).app_hash.clone(),
}
}
}

pub struct SnapshotConnection;

#[async_trait]
impl Snapshot for SnapshotConnection {}

fn parse_bytes_to_counter(bytes: &[u8]) -> Result<u64, ()> {
if bytes.len() != 8 {
return Err(());
}

let mut counter_bytes = [0; 8];
counter_bytes.copy_from_slice(bytes);

Ok(u64::from_be_bytes(counter_bytes))
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
let subscriber = FmtSubscriber::builder()
.with_max_level(Level::DEBUG)
.finish();
set_global_default(subscriber).unwrap();

let committed_state: Arc<Mutex<CounterState>> = Default::default();
let current_state: Arc<Mutex<Option<CounterState>>> = Default::default();

let consensus = ConsensusConnection::new(committed_state.clone(), current_state.clone());
let mempool = MempoolConnection::new(current_state.clone());
let info = InfoConnection::new(committed_state.clone());
let snapshot = SnapshotConnection;

let server = Server::new(consensus, mempool, info, snapshot)?;

server
.run("127.0.0.1:26658".parse::<SocketAddr>().unwrap())
.await
}
Loading