Skip to content
This repository has been archived by the owner on Mar 11, 2021. It is now read-only.

Commit

Permalink
separate server from protocol and app
Browse files Browse the repository at this point in the history
  • Loading branch information
gterzian committed Nov 1, 2019
1 parent 62dac56 commit f3ffeab
Show file tree
Hide file tree
Showing 3 changed files with 293 additions and 90 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@ include = ["src/**/*", "Cargo.toml"]

[dependencies]
bytes = "0.4"
crossbeam-channel = "0.3.9"
protobuf = "2.8.1"
byteorder = "1.3.2"
integer-encoding = "1.0.5"
log = "0.4.8"
env_logger = "0.7.0"
tokio = { version = "0.1", default-features = false, features = ["codec", "io", "tcp", "rt-full"] }
tokio = { version = "0.1", default-features = false, features = ["codec", "io", "sync", "tcp", "rt-full"] }
futures = "0.1"

[build-dependencies]
Expand Down
224 changes: 202 additions & 22 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
//!
extern crate byteorder;
extern crate bytes;
#[macro_use]
extern crate crossbeam_channel;
extern crate env_logger;
extern crate futures;
extern crate integer_encoding;
Expand All @@ -30,7 +32,10 @@ extern crate core;
extern crate protobuf;
extern crate tokio;

use crossbeam_channel::{unbounded, Receiver, Sender};
use std::net::SocketAddr;
use std::thread;
use tokio::sync::oneshot;

pub use crate::messages::abci::*;
pub use crate::messages::merkle::*;
Expand All @@ -44,36 +49,51 @@ mod server;
/// Main Trait for an ABCI application. Provides generic responses for all callbacks
/// Override desired callbacks as needed. Tendermint makes 3 TCP connections to the
/// application and does so in a synchonized manner.
pub trait Application {
pub trait Application: 'static + Send {
/// Query Connection: Called on startup from Tendermint. The application should normally
/// return the last know state so Tendermint can determine if it needs to replay blocks
/// to the application.
fn info(&mut self, _req: &RequestInfo) -> ResponseInfo {
ResponseInfo::new()
fn info(&mut self, _req: RequestInfo, mut responder: Responder) {
let mut response = Response::new();
let res = ResponseInfo::new();
response.set_info(res);
let _ = responder.respond(response);
}

/// Query Connection: Set options on the application (rarely used)
fn set_option(&mut self, _req: &RequestSetOption) -> ResponseSetOption {
ResponseSetOption::new()
fn set_option(&mut self, _req: RequestSetOption, mut responder: Responder) {
let mut response = Response::new();
let options = ResponseSetOption::new();
response.set_set_option(options);
let _ = responder.respond(response);
}

/// Query Connection: Query your application. This usually resolves through a merkle tree holding
/// the state of the app.
fn query(&mut self, _req: &RequestQuery) -> ResponseQuery {
ResponseQuery::new()
fn query(&mut self, _req: RequestQuery, mut responder: Responder) {
let mut response = Response::new();
let query = ResponseQuery::new();
response.set_query(query);
let _ = responder.respond(response);
}

/// Mempool Connection: Used to validate incoming transactions. If the application reponds
/// with a non-zero value, the transaction is added to Tendermint's mempool for processing
/// on the deliver_tx call below.
fn check_tx(&mut self, _req: &RequestCheckTx) -> ResponseCheckTx {
ResponseCheckTx::new()
fn check_tx(&mut self, _req: RequestCheckTx, mut responder: Responder) {
let mut response = Response::new();
let res = ResponseCheckTx::new();
response.set_check_tx(res);
let _ = responder.respond(response);
}

/// Consensus Connection: Called once on startup. Usually used to establish initial (genesis)
/// state.
fn init_chain(&mut self, _req: &RequestInitChain) -> ResponseInitChain {
ResponseInitChain::new()
fn init_chain(&mut self, _req: RequestInitChain, mut responder: Responder) {
let mut response = Response::new();
let res = ResponseInitChain::new();
response.set_init_chain(res);
let _ = responder.respond(response);
}

/// Consensus Connection: Called at the start of processing a block of transactions
Expand All @@ -82,31 +102,189 @@ pub trait Application {
/// deliver_tx() for each transaction in the block
/// end_block()
/// commit()
fn begin_block(&mut self, _req: &RequestBeginBlock) -> ResponseBeginBlock {
ResponseBeginBlock::new()
fn begin_block(&mut self, _req: RequestBeginBlock, mut responder: Responder) {
let mut response = Response::new();
let res = ResponseBeginBlock::new();
response.set_begin_block(res);
let _ = responder.respond(response);
}

/// Consensus Connection: Actually processing the transaction, performing some form of a
/// state transistion.
fn deliver_tx(&mut self, _p: &RequestDeliverTx) -> ResponseDeliverTx {
ResponseDeliverTx::new()
fn deliver_tx(&mut self, _p: RequestDeliverTx, mut responder: Responder) {
let mut response = Response::new();
let res = ResponseDeliverTx::new();
response.set_deliver_tx(res);
let _ = responder.respond(response);
}

/// Consensus Connection: Called at the end of the block. Often used to update the validator set.
fn end_block(&mut self, _req: &RequestEndBlock) -> ResponseEndBlock {
ResponseEndBlock::new()
fn end_block(&mut self, _req: RequestEndBlock, mut responder: Responder) {
let mut response = Response::new();
let res = ResponseEndBlock::new();
response.set_end_block(res);
let _ = responder.respond(response);
}

/// Consensus Connection: Commit the block with the latest state from the application.
fn commit(&mut self, _req: &RequestCommit) -> ResponseCommit {
ResponseCommit::new()
fn commit(&mut self, _req: RequestCommit, mut responder: Responder) {
let mut response = Response::new();
let res = ResponseCommit::new();
response.set_commit(res);
let _ = responder.respond(response);
}
}

pub struct Responder {
response_sender: Sender<(Response, oneshot::Sender<Response>)>,
network_sender: Option<oneshot::Sender<Response>>,
}

impl Responder {
pub fn new(
response_sender: Sender<(Response, oneshot::Sender<Response>)>,
network_sender: oneshot::Sender<Response>,
) -> Self {
Responder {
response_sender,
network_sender: Some(network_sender),
}
}

/// Respond with a response, can be called successfully only once.
pub fn respond(&mut self, response: Response) -> Result<(), ()> {
if self.network_sender.is_none() {
return Err(());
}
if let Ok(_) = self.response_sender.send((
response,
self.network_sender
.take()
.expect("To have a network sender"),
)) {
return Ok(());
}
return Err(());
}
}

struct Protocol {
network_receiver: Receiver<(Request, oneshot::Sender<Response>)>,
response_sender: Sender<(Response, oneshot::Sender<Response>)>,
response_receiver: Receiver<(Response, oneshot::Sender<Response>)>,
application: Box<dyn Application>,
}

impl Protocol {
fn run(&mut self) -> bool {
enum Incoming {
Network((Request, oneshot::Sender<Response>)),
Application((Response, oneshot::Sender<Response>)),
}
let incoming = select! {
recv(self.response_receiver) -> msg => {
msg.map(Incoming::Application).expect("Error in handling message from application")
},
recv(self.network_receiver) -> msg => {
if let Err(_) = msg {
// Server disconnected, exit.
return false;
}
msg.map(Incoming::Network).expect("Error in handling message from network")
},
};
match incoming {
Incoming::Application((response, sender)) => {
self.handle_application_response(response, sender)
}
Incoming::Network((request, sender)) => self.handle_network_request(request, sender),
}
true
}

fn handle_application_response(
&mut self,
response: Response,
sender: oneshot::Sender<Response>,
) {
// TODO: sanity checks on response.
let _ = sender.send(response);
}

fn handle_network_request(&mut self, request: Request, sender: oneshot::Sender<Response>) {
// TODO: sanity checks on request.
let mut responder = Responder::new(self.response_sender.clone(), sender);
match request.value {
// Info
Some(Request_oneof_value::info(r)) => {
self.application.info(r, responder);
}
// Init chain
Some(Request_oneof_value::init_chain(r)) => self.application.init_chain(r, responder),
// Set option
Some(Request_oneof_value::set_option(r)) => self.application.set_option(r, responder),
// Query
Some(Request_oneof_value::query(r)) => self.application.query(r, responder),
// Check tx
Some(Request_oneof_value::check_tx(r)) => self.application.check_tx(r, responder),
// Begin block
Some(Request_oneof_value::begin_block(r)) => self.application.begin_block(r, responder),
// Deliver Tx
Some(Request_oneof_value::deliver_tx(r)) => self.application.deliver_tx(r, responder),
// End block
Some(Request_oneof_value::end_block(r)) => self.application.end_block(r, responder),
// Commit
Some(Request_oneof_value::commit(r)) => self.application.commit(r, responder),
// Flush
Some(Request_oneof_value::flush(_)) => {
let mut response = Response::new();
response.set_flush(ResponseFlush::new());
let _ = responder.respond(response);
}
// Echo
Some(Request_oneof_value::echo(r)) => {
let mut response = Response::new();
let echo_msg = r.get_message().to_string();
let mut echo = ResponseEcho::new();
echo.set_message(echo_msg);
response.set_echo(echo);
let _ = responder.respond(response);
}
_ => {
let mut response = Response::new();
let mut re = ResponseException::new();
re.set_error(String::from("Unrecognized request"));
response.set_exception(re);
let _ = responder.respond(response);
}
}
}
}

pub fn run_protocol<A>(app: A) -> Sender<(Request, oneshot::Sender<Response>)>
where
A: Application,
{
let (network_sender, network_receiver) = unbounded();
let (response_sender, response_receiver) = unbounded();
let mut protocol = Protocol {
application: Box::new(app),
network_receiver,
response_sender,
response_receiver,
};
thread::spawn(move || {
while protocol.run() {
// running.
}
});
network_sender
}

/// Setup the app and start the server using localhost and default tendermint port 26658
pub fn run_local<A>(app: A)
where
A: Application + 'static + Send + Sync,
A: Application,
{
let addr = "127.0.0.1:26658".parse().unwrap();
run(addr, app);
Expand All @@ -115,7 +293,9 @@ where
/// Setup the application and start the server. Use this fn when setting different ip:port.
pub fn run<A>(listen_addr: SocketAddr, app: A)
where
A: Application + 'static + Send + Sync,
A: Application,
{
serve(app, listen_addr).unwrap();
thread::spawn(move || {
serve(app, listen_addr).unwrap();
});
}

0 comments on commit f3ffeab

Please sign in to comment.