From f3ffeab1bcfb37d0c49d681a03390d28b4957aa2 Mon Sep 17 00:00:00 2001 From: Gregory Terzian Date: Fri, 1 Nov 2019 15:29:58 +0800 Subject: [PATCH] separate server from protocol and app --- Cargo.toml | 3 +- src/lib.rs | 224 +++++++++++++++++++++++++++++++++++++++++++++----- src/server.rs | 156 ++++++++++++++++++++--------------- 3 files changed, 293 insertions(+), 90 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3718eb8..9672e29 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,12 +13,13 @@ include = ["src/**/*", "Cargo.toml"] [dependencies] bytes = "0.4" +crossbeam-channel = "0.3.9" protobuf = "2.8.1" byteorder = "1.3.2" integer-encoding = "1.0.5" log = "0.4.8" env_logger = "0.7.0" -tokio = { version = "0.1", default-features = false, features = ["codec", "io", "tcp", "rt-full"] } +tokio = { version = "0.1", default-features = false, features = ["codec", "io", "sync", "tcp", "rt-full"] } futures = "0.1" [build-dependencies] diff --git a/src/lib.rs b/src/lib.rs index 880c348..34f3a44 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -21,6 +21,8 @@ //! extern crate byteorder; extern crate bytes; +#[macro_use] +extern crate crossbeam_channel; extern crate env_logger; extern crate futures; extern crate integer_encoding; @@ -30,7 +32,10 @@ extern crate core; extern crate protobuf; extern crate tokio; +use crossbeam_channel::{unbounded, Receiver, Sender}; use std::net::SocketAddr; +use std::thread; +use tokio::sync::oneshot; pub use crate::messages::abci::*; pub use crate::messages::merkle::*; @@ -44,36 +49,51 @@ mod server; /// Main Trait for an ABCI application. Provides generic responses for all callbacks /// Override desired callbacks as needed. Tendermint makes 3 TCP connections to the /// application and does so in a synchonized manner. -pub trait Application { +pub trait Application: 'static + Send { /// Query Connection: Called on startup from Tendermint. The application should normally /// return the last know state so Tendermint can determine if it needs to replay blocks /// to the application. - fn info(&mut self, _req: &RequestInfo) -> ResponseInfo { - ResponseInfo::new() + fn info(&mut self, _req: RequestInfo, mut responder: Responder) { + let mut response = Response::new(); + let res = ResponseInfo::new(); + response.set_info(res); + let _ = responder.respond(response); } /// Query Connection: Set options on the application (rarely used) - fn set_option(&mut self, _req: &RequestSetOption) -> ResponseSetOption { - ResponseSetOption::new() + fn set_option(&mut self, _req: RequestSetOption, mut responder: Responder) { + let mut response = Response::new(); + let options = ResponseSetOption::new(); + response.set_set_option(options); + let _ = responder.respond(response); } /// Query Connection: Query your application. This usually resolves through a merkle tree holding /// the state of the app. - fn query(&mut self, _req: &RequestQuery) -> ResponseQuery { - ResponseQuery::new() + fn query(&mut self, _req: RequestQuery, mut responder: Responder) { + let mut response = Response::new(); + let query = ResponseQuery::new(); + response.set_query(query); + let _ = responder.respond(response); } /// Mempool Connection: Used to validate incoming transactions. If the application reponds /// with a non-zero value, the transaction is added to Tendermint's mempool for processing /// on the deliver_tx call below. - fn check_tx(&mut self, _req: &RequestCheckTx) -> ResponseCheckTx { - ResponseCheckTx::new() + fn check_tx(&mut self, _req: RequestCheckTx, mut responder: Responder) { + let mut response = Response::new(); + let res = ResponseCheckTx::new(); + response.set_check_tx(res); + let _ = responder.respond(response); } /// Consensus Connection: Called once on startup. Usually used to establish initial (genesis) /// state. - fn init_chain(&mut self, _req: &RequestInitChain) -> ResponseInitChain { - ResponseInitChain::new() + fn init_chain(&mut self, _req: RequestInitChain, mut responder: Responder) { + let mut response = Response::new(); + let res = ResponseInitChain::new(); + response.set_init_chain(res); + let _ = responder.respond(response); } /// Consensus Connection: Called at the start of processing a block of transactions @@ -82,31 +102,189 @@ pub trait Application { /// deliver_tx() for each transaction in the block /// end_block() /// commit() - fn begin_block(&mut self, _req: &RequestBeginBlock) -> ResponseBeginBlock { - ResponseBeginBlock::new() + fn begin_block(&mut self, _req: RequestBeginBlock, mut responder: Responder) { + let mut response = Response::new(); + let res = ResponseBeginBlock::new(); + response.set_begin_block(res); + let _ = responder.respond(response); } /// Consensus Connection: Actually processing the transaction, performing some form of a /// state transistion. - fn deliver_tx(&mut self, _p: &RequestDeliverTx) -> ResponseDeliverTx { - ResponseDeliverTx::new() + fn deliver_tx(&mut self, _p: RequestDeliverTx, mut responder: Responder) { + let mut response = Response::new(); + let res = ResponseDeliverTx::new(); + response.set_deliver_tx(res); + let _ = responder.respond(response); } /// Consensus Connection: Called at the end of the block. Often used to update the validator set. - fn end_block(&mut self, _req: &RequestEndBlock) -> ResponseEndBlock { - ResponseEndBlock::new() + fn end_block(&mut self, _req: RequestEndBlock, mut responder: Responder) { + let mut response = Response::new(); + let res = ResponseEndBlock::new(); + response.set_end_block(res); + let _ = responder.respond(response); } /// Consensus Connection: Commit the block with the latest state from the application. - fn commit(&mut self, _req: &RequestCommit) -> ResponseCommit { - ResponseCommit::new() + fn commit(&mut self, _req: RequestCommit, mut responder: Responder) { + let mut response = Response::new(); + let res = ResponseCommit::new(); + response.set_commit(res); + let _ = responder.respond(response); + } +} + +pub struct Responder { + response_sender: Sender<(Response, oneshot::Sender)>, + network_sender: Option>, +} + +impl Responder { + pub fn new( + response_sender: Sender<(Response, oneshot::Sender)>, + network_sender: oneshot::Sender, + ) -> Self { + Responder { + response_sender, + network_sender: Some(network_sender), + } } + + /// Respond with a response, can be called successfully only once. + pub fn respond(&mut self, response: Response) -> Result<(), ()> { + if self.network_sender.is_none() { + return Err(()); + } + if let Ok(_) = self.response_sender.send(( + response, + self.network_sender + .take() + .expect("To have a network sender"), + )) { + return Ok(()); + } + return Err(()); + } +} + +struct Protocol { + network_receiver: Receiver<(Request, oneshot::Sender)>, + response_sender: Sender<(Response, oneshot::Sender)>, + response_receiver: Receiver<(Response, oneshot::Sender)>, + application: Box, +} + +impl Protocol { + fn run(&mut self) -> bool { + enum Incoming { + Network((Request, oneshot::Sender)), + Application((Response, oneshot::Sender)), + } + let incoming = select! { + recv(self.response_receiver) -> msg => { + msg.map(Incoming::Application).expect("Error in handling message from application") + }, + recv(self.network_receiver) -> msg => { + if let Err(_) = msg { + // Server disconnected, exit. + return false; + } + msg.map(Incoming::Network).expect("Error in handling message from network") + }, + }; + match incoming { + Incoming::Application((response, sender)) => { + self.handle_application_response(response, sender) + } + Incoming::Network((request, sender)) => self.handle_network_request(request, sender), + } + true + } + + fn handle_application_response( + &mut self, + response: Response, + sender: oneshot::Sender, + ) { + // TODO: sanity checks on response. + let _ = sender.send(response); + } + + fn handle_network_request(&mut self, request: Request, sender: oneshot::Sender) { + // TODO: sanity checks on request. + let mut responder = Responder::new(self.response_sender.clone(), sender); + match request.value { + // Info + Some(Request_oneof_value::info(r)) => { + self.application.info(r, responder); + } + // Init chain + Some(Request_oneof_value::init_chain(r)) => self.application.init_chain(r, responder), + // Set option + Some(Request_oneof_value::set_option(r)) => self.application.set_option(r, responder), + // Query + Some(Request_oneof_value::query(r)) => self.application.query(r, responder), + // Check tx + Some(Request_oneof_value::check_tx(r)) => self.application.check_tx(r, responder), + // Begin block + Some(Request_oneof_value::begin_block(r)) => self.application.begin_block(r, responder), + // Deliver Tx + Some(Request_oneof_value::deliver_tx(r)) => self.application.deliver_tx(r, responder), + // End block + Some(Request_oneof_value::end_block(r)) => self.application.end_block(r, responder), + // Commit + Some(Request_oneof_value::commit(r)) => self.application.commit(r, responder), + // Flush + Some(Request_oneof_value::flush(_)) => { + let mut response = Response::new(); + response.set_flush(ResponseFlush::new()); + let _ = responder.respond(response); + } + // Echo + Some(Request_oneof_value::echo(r)) => { + let mut response = Response::new(); + let echo_msg = r.get_message().to_string(); + let mut echo = ResponseEcho::new(); + echo.set_message(echo_msg); + response.set_echo(echo); + let _ = responder.respond(response); + } + _ => { + let mut response = Response::new(); + let mut re = ResponseException::new(); + re.set_error(String::from("Unrecognized request")); + response.set_exception(re); + let _ = responder.respond(response); + } + } + } +} + +pub fn run_protocol(app: A) -> Sender<(Request, oneshot::Sender)> +where + A: Application, +{ + let (network_sender, network_receiver) = unbounded(); + let (response_sender, response_receiver) = unbounded(); + let mut protocol = Protocol { + application: Box::new(app), + network_receiver, + response_sender, + response_receiver, + }; + thread::spawn(move || { + while protocol.run() { + // running. + } + }); + network_sender } /// Setup the app and start the server using localhost and default tendermint port 26658 pub fn run_local(app: A) where - A: Application + 'static + Send + Sync, + A: Application, { let addr = "127.0.0.1:26658".parse().unwrap(); run(addr, app); @@ -115,7 +293,9 @@ where /// Setup the application and start the server. Use this fn when setting different ip:port. pub fn run(listen_addr: SocketAddr, app: A) where - A: Application + 'static + Send + Sync, + A: Application, { - serve(app, listen_addr).unwrap(); + thread::spawn(move || { + serve(app, listen_addr).unwrap(); + }); } diff --git a/src/server.rs b/src/server.rs index 0aba324..a28c039 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,96 +1,118 @@ -use std::net::SocketAddr; -use std::ops::DerefMut; -use std::sync::{Arc, Mutex}; - +use crossbeam_channel::Sender; use env_logger::Env; +use std::net::SocketAddr; use tokio; use tokio::codec::Decoder; use tokio::io; -use tokio::net::TcpListener; +use tokio::net::{tcp::Incoming, TcpListener}; use tokio::prelude::*; +use tokio::runtime::current_thread; +use tokio::sync::oneshot; use crate::codec::ABCICodec; use crate::messages::abci::*; -use crate::Application; +use crate::{run_protocol, Application}; /// Creates the TCP server and listens for connections from Tendermint pub fn serve(app: A, addr: SocketAddr) -> io::Result<()> where - A: Application + 'static + Send + Sync, + A: Application, { env_logger::from_env(Env::default().default_filter_or("info")) .try_init() .ok(); let listener = TcpListener::bind(&addr).unwrap(); + let protocol_sender = run_protocol(app); let incoming = listener.incoming(); - let app = Arc::new(Mutex::new(app)); - let server = incoming - .map_err(|err| panic!("Connection failed: {}", err)) - .for_each(move |socket| { - info!("Got connection! {:?}", socket); - let framed = ABCICodec::new().framed(socket); - let (_writer, reader) = framed.split(); - let app_instance = Arc::clone(&app); - - let responses = reader.map(move |request| { - debug!("Got Request! {:?}", request); - respond(&app_instance, &request) - }); + let mut runtime = current_thread::Runtime::new().expect("To start a runtime"); + let server = Server { + runtime_handle: runtime.handle(), + incoming, + protocol_sender, + }; - let writes = responses.fold(_writer, |writer, response| { - debug!("Return Response! {:?}", response); - writer.send(response) - }); - tokio::spawn(writes.then(|_| Ok(()))) - }); - tokio::run(server); + runtime + .block_on(server) + .expect("Runtime to block on server"); Ok(()) } -fn respond(app: &Arc>, request: &Request) -> Response -where - A: Application + 'static + Send + Sync, -{ - let mut guard = app.lock().unwrap(); - let app = guard.deref_mut(); +struct Server { + runtime_handle: current_thread::Handle, + incoming: Incoming, + protocol_sender: Sender<(Request, oneshot::Sender)>, +} - let mut response = Response::new(); +struct Connection { + runtime_handle: current_thread::Handle, + protocol_sender: Sender<(Request, oneshot::Sender)>, + response_receiver: oneshot::Receiver, + response_sender: Option>, + writer: Option>>, + reader: Option>>, +} + +impl Future for Server { + type Item = (); + type Error = (); - match request.value { - // Info - Some(Request_oneof_value::info(ref r)) => response.set_info(app.info(r)), - // Init chain - Some(Request_oneof_value::init_chain(ref r)) => response.set_init_chain(app.init_chain(r)), - // Set option - Some(Request_oneof_value::set_option(ref r)) => response.set_set_option(app.set_option(r)), - // Query - Some(Request_oneof_value::query(ref r)) => response.set_query(app.query(r)), - // Check tx - Some(Request_oneof_value::check_tx(ref r)) => response.set_check_tx(app.check_tx(r)), - // Begin block - Some(Request_oneof_value::begin_block(ref r)) => { - response.set_begin_block(app.begin_block(r)) + fn poll(&mut self) -> Result, Self::Error> { + match self.incoming.poll() { + Ok(Async::Ready(Some(socket))) => { + let framed = ABCICodec::new().framed(socket); + let (writer, reader) = framed.split(); + let (sender, response_receiver) = oneshot::channel(); + let connection = Connection { + runtime_handle: self.runtime_handle.clone(), + protocol_sender: self.protocol_sender.clone(), + response_receiver, + response_sender: Some(sender), + writer: Some(writer), + reader: Some(reader), + }; + self.runtime_handle + .spawn(connection.then(|_| Ok(()))) + .expect("To spawn a connection"); + } + Ok(Async::NotReady) => {} + Err(_) | Ok(Async::Ready(None)) => { + // Connection closed, I assume this will drop the Server struct, + // which will also shutdown the protocol thread, + // when all protocol senders drop. + return Ok(Async::Ready(())); + } } - // Deliver Tx - Some(Request_oneof_value::deliver_tx(ref r)) => response.set_deliver_tx(app.deliver_tx(r)), - // End block - Some(Request_oneof_value::end_block(ref r)) => response.set_end_block(app.end_block(r)), - // Commit - Some(Request_oneof_value::commit(ref r)) => response.set_commit(app.commit(r)), - // Flush - Some(Request_oneof_value::flush(_)) => response.set_flush(ResponseFlush::new()), - // Echo - Some(Request_oneof_value::echo(ref r)) => { - let echo_msg = r.get_message().to_string(); - let mut echo = ResponseEcho::new(); - echo.set_message(echo_msg); - response.set_echo(echo); + Ok(Async::NotReady) + } +} + +impl Future for Connection { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Result, Self::Error> { + if let Some(reader) = &mut self.reader { + if let Ok(Async::Ready(Some(request))) = reader.poll() { + let _ = self.protocol_sender.send(( + request, + self.response_sender.take().expect("To have a sender"), + )); + self.reader = None; + } } - _ => { - let mut re = ResponseException::new(); - re.set_error(String::from("Unrecognized request")); - response.set_exception(re) + if self.writer.is_some() { + match self.response_receiver.poll() { + Ok(Async::Ready(response)) => { + let writer = self.writer.take().expect("To have a writer"); + let writes = writer.send(response); + self.runtime_handle + .spawn(writes.then(|_| Ok(()))) + .expect("To spawn a writer"); + } + Ok(Async::NotReady) => return Ok(Async::NotReady), + Err(_) => return Err(()), + } } + Ok(Async::Ready(())) } - response }