diff --git a/Cargo.toml b/Cargo.toml index 3718eb8..9672e29 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,12 +13,13 @@ include = ["src/**/*", "Cargo.toml"] [dependencies] bytes = "0.4" +crossbeam-channel = "0.3.9" protobuf = "2.8.1" byteorder = "1.3.2" integer-encoding = "1.0.5" log = "0.4.8" env_logger = "0.7.0" -tokio = { version = "0.1", default-features = false, features = ["codec", "io", "tcp", "rt-full"] } +tokio = { version = "0.1", default-features = false, features = ["codec", "io", "sync", "tcp", "rt-full"] } futures = "0.1" [build-dependencies] diff --git a/examples/counter_app.rs b/examples/counter_app.rs index 3d86ec9..d114f43 100644 --- a/examples/counter_app.rs +++ b/examples/counter_app.rs @@ -31,7 +31,7 @@ fn convert_tx(tx: &[u8]) -> u64 { impl abci::Application for CounterApp { // Validate transactions. Rule: Transactions must be incremental: 1,2,3,4... - fn check_tx(&mut self, req: &RequestCheckTx) -> ResponseCheckTx { + fn check_tx(&mut self, req: RequestCheckTx, mut responder: Responder) { // Get the Tx [u8] and convert to u64 let c = convert_tx(req.get_tx()); let mut resp = ResponseCheckTx::new(); @@ -40,24 +40,29 @@ impl abci::Application for CounterApp { if c != self.count + 1 { resp.set_code(1); resp.set_log(String::from("Count must be incremental!")); - return resp; + } else { + // Update state to keep state correct for next check_tx call + self.count = c; } - // Update state to keep state correct for next check_tx call - self.count = c; - resp + let mut response = Response::new(); + response.set_check_tx(resp); + let _ = responder.respond(response); } - fn deliver_tx(&mut self, req: &RequestDeliverTx) -> ResponseDeliverTx { + fn deliver_tx(&mut self, req: RequestDeliverTx, mut responder: Responder) { // Get the Tx [u8] let c = convert_tx(req.get_tx()); // Update state self.count = c; // Return default code 0 == bueno - ResponseDeliverTx::new() + let res = ResponseDeliverTx::new(); + let mut response = Response::new(); + response.set_deliver_tx(res); + let _ = responder.respond(response); } - fn commit(&mut self, _req: &RequestCommit) -> ResponseCommit { + fn commit(&mut self, _req: RequestCommit, mut responder: Responder) { // Create the response let mut resp = ResponseCommit::new(); // Convert count to bits @@ -65,7 +70,9 @@ impl abci::Application for CounterApp { BigEndian::write_u64(&mut buf, self.count); // Set data so last state is included in the block resp.set_data(buf.to_vec()); - resp + let mut response = Response::new(); + response.set_commit(resp); + let _ = responder.respond(response); } } diff --git a/examples/threaded_counter_app.rs b/examples/threaded_counter_app.rs new file mode 100644 index 0000000..e134943 --- /dev/null +++ b/examples/threaded_counter_app.rs @@ -0,0 +1,157 @@ +extern crate abci; +extern crate byteorder; +extern crate env_logger; + +use abci::*; +use byteorder::{BigEndian, ByteOrder}; +use crossbeam_channel::{unbounded, Receiver, Sender}; +use env_logger::Env; +use std::thread; + +// Simple counter application. Its only state is a u64 count +// We use BigEndian to serialize the data across transactions calls +struct CounterApp { + count: u64, + receiver: Receiver, + exiter: Exiter, +} + +// Convert incoming tx data to the proper BigEndian size. txs.len() > 8 will return 0 +fn convert_tx(tx: &[u8]) -> u64 { + if tx.len() < 8 { + let pad = 8 - tx.len(); + let mut x = vec![0; pad]; + x.extend_from_slice(tx); + return BigEndian::read_u64(x.as_slice()); + } + BigEndian::read_u64(tx) +} + +enum CounterMsg { + CheckTx(RequestCheckTx, Responder), + DeliverTx(RequestDeliverTx, Responder), + Commit(RequestCommit, Responder), + Exit, +} + +/// The front-end to the application, forwards all requests on a channel. +struct CounterProxy { + sender: Sender, + receiver: Option>, +} + +impl CounterProxy { + fn new() -> Self { + let (sender, receiver) = unbounded(); + CounterProxy { + sender, + receiver: Some(receiver), + } + } +} + +impl abci::Application for CounterProxy { + fn start(&mut self, exiter: Exiter) { + let mut app = CounterApp { + count: 0, + receiver: self + .receiver + .take() + .expect("CounterProxy to have a receiver"), + exiter, + }; + thread::spawn(move || while app.run() {}); + } + + fn exit(&mut self) { + let _ = self.sender.send(CounterMsg::Exit); + } + + // Validate transactions. Rule: Transactions must be incremental: 1,2,3,4... + fn check_tx(&mut self, req: RequestCheckTx, responder: Responder) { + let _ = self.sender.send(CounterMsg::CheckTx(req, responder)); + } + + fn deliver_tx(&mut self, req: RequestDeliverTx, responder: Responder) { + let _ = self.sender.send(CounterMsg::DeliverTx(req, responder)); + } + + fn commit(&mut self, req: RequestCommit, responder: Responder) { + let _ = self.sender.send(CounterMsg::Commit(req, responder)); + } +} + +impl CounterApp { + fn run(&mut self) -> bool { + while let Ok(msg) = self.receiver.recv() { + match msg { + CounterMsg::CheckTx(req, responder) => self.check_tx(req, responder), + CounterMsg::DeliverTx(req, responder) => self.deliver_tx(req, responder), + CounterMsg::Commit(req, responder) => self.commit(req, responder), + CounterMsg::Exit => { + // The ABCI layer sent us a message to quit. + return false; + }, + } + // For some reason, stop when the count hits 10, + // signalling the ABCI layer to quit. + if self.count > 10 { + self.exiter.exit(); + return false; + } + } + // Proxy went away, unexpected. + false + } + + // Validate transactions. Rule: Transactions must be incremental: 1,2,3,4... + fn check_tx(&mut self, req: RequestCheckTx, mut responder: Responder) { + // Get the Tx [u8] and convert to u64 + let c = convert_tx(req.get_tx()); + let mut resp = ResponseCheckTx::new(); + + // Validation logic + if c != self.count + 1 { + resp.set_code(1); + resp.set_log(String::from("Count must be incremental!")); + } else { + // Update state to keep state correct for next check_tx call + self.count = c; + } + + let mut response = Response::new(); + response.set_check_tx(resp); + let _ = responder.respond(response); + } + + fn deliver_tx(&mut self, req: RequestDeliverTx, mut responder: Responder) { + // Get the Tx [u8] + let c = convert_tx(req.get_tx()); + // Update state + self.count = c; + // Return default code 0 == bueno + let res = ResponseDeliverTx::new(); + let mut response = Response::new(); + response.set_deliver_tx(res); + let _ = responder.respond(response); + } + + fn commit(&mut self, _req: RequestCommit, mut responder: Responder) { + // Create the response + let mut resp = ResponseCommit::new(); + // Convert count to bits + let mut buf = [0; 8]; + BigEndian::write_u64(&mut buf, self.count); + // Set data so last state is included in the block + resp.set_data(buf.to_vec()); + let mut response = Response::new(); + response.set_commit(resp); + let _ = responder.respond(response); + } +} + +fn main() { + // Run on localhost using default Tendermint port + env_logger::from_env(Env::default().default_filter_or("info")).init(); + abci::run_local(CounterProxy::new()); +} diff --git a/src/lib.rs b/src/lib.rs index 880c348..476f7cb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -21,6 +21,8 @@ //! extern crate byteorder; extern crate bytes; +#[macro_use] +extern crate crossbeam_channel; extern crate env_logger; extern crate futures; extern crate integer_encoding; @@ -30,7 +32,10 @@ extern crate core; extern crate protobuf; extern crate tokio; +use crossbeam_channel::{unbounded, Receiver, Sender}; use std::net::SocketAddr; +use std::thread; +use tokio::sync::oneshot; pub use crate::messages::abci::*; pub use crate::messages::merkle::*; @@ -44,36 +49,55 @@ mod server; /// Main Trait for an ABCI application. Provides generic responses for all callbacks /// Override desired callbacks as needed. Tendermint makes 3 TCP connections to the /// application and does so in a synchonized manner. -pub trait Application { +pub trait Application: 'static + Send { + /// Signal the start of the server to the application, passing along an Exiter. + fn start(&mut self, _exiter: Exiter) {} + /// Signal the exit of the server to the application. + fn exit(&mut self) {} /// Query Connection: Called on startup from Tendermint. The application should normally /// return the last know state so Tendermint can determine if it needs to replay blocks /// to the application. - fn info(&mut self, _req: &RequestInfo) -> ResponseInfo { - ResponseInfo::new() + fn info(&mut self, _req: RequestInfo, mut responder: Responder) { + let mut response = Response::new(); + let res = ResponseInfo::new(); + response.set_info(res); + let _ = responder.respond(response); } /// Query Connection: Set options on the application (rarely used) - fn set_option(&mut self, _req: &RequestSetOption) -> ResponseSetOption { - ResponseSetOption::new() + fn set_option(&mut self, _req: RequestSetOption, mut responder: Responder) { + let mut response = Response::new(); + let options = ResponseSetOption::new(); + response.set_set_option(options); + let _ = responder.respond(response); } /// Query Connection: Query your application. This usually resolves through a merkle tree holding /// the state of the app. - fn query(&mut self, _req: &RequestQuery) -> ResponseQuery { - ResponseQuery::new() + fn query(&mut self, _req: RequestQuery, mut responder: Responder) { + let mut response = Response::new(); + let query = ResponseQuery::new(); + response.set_query(query); + let _ = responder.respond(response); } /// Mempool Connection: Used to validate incoming transactions. If the application reponds /// with a non-zero value, the transaction is added to Tendermint's mempool for processing /// on the deliver_tx call below. - fn check_tx(&mut self, _req: &RequestCheckTx) -> ResponseCheckTx { - ResponseCheckTx::new() + fn check_tx(&mut self, _req: RequestCheckTx, mut responder: Responder) { + let mut response = Response::new(); + let res = ResponseCheckTx::new(); + response.set_check_tx(res); + let _ = responder.respond(response); } /// Consensus Connection: Called once on startup. Usually used to establish initial (genesis) /// state. - fn init_chain(&mut self, _req: &RequestInitChain) -> ResponseInitChain { - ResponseInitChain::new() + fn init_chain(&mut self, _req: RequestInitChain, mut responder: Responder) { + let mut response = Response::new(); + let res = ResponseInitChain::new(); + response.set_init_chain(res); + let _ = responder.respond(response); } /// Consensus Connection: Called at the start of processing a block of transactions @@ -82,40 +106,353 @@ pub trait Application { /// deliver_tx() for each transaction in the block /// end_block() /// commit() - fn begin_block(&mut self, _req: &RequestBeginBlock) -> ResponseBeginBlock { - ResponseBeginBlock::new() + fn begin_block(&mut self, _req: RequestBeginBlock, mut responder: Responder) { + let mut response = Response::new(); + let res = ResponseBeginBlock::new(); + response.set_begin_block(res); + let _ = responder.respond(response); } /// Consensus Connection: Actually processing the transaction, performing some form of a /// state transistion. - fn deliver_tx(&mut self, _p: &RequestDeliverTx) -> ResponseDeliverTx { - ResponseDeliverTx::new() + fn deliver_tx(&mut self, _p: RequestDeliverTx, mut responder: Responder) { + let mut response = Response::new(); + let res = ResponseDeliverTx::new(); + response.set_deliver_tx(res); + let _ = responder.respond(response); } /// Consensus Connection: Called at the end of the block. Often used to update the validator set. - fn end_block(&mut self, _req: &RequestEndBlock) -> ResponseEndBlock { - ResponseEndBlock::new() + fn end_block(&mut self, _req: RequestEndBlock, mut responder: Responder) { + let mut response = Response::new(); + let res = ResponseEndBlock::new(); + response.set_end_block(res); + let _ = responder.respond(response); } /// Consensus Connection: Commit the block with the latest state from the application. - fn commit(&mut self, _req: &RequestCommit) -> ResponseCommit { - ResponseCommit::new() + fn commit(&mut self, _req: RequestCommit, mut responder: Responder) { + let mut response = Response::new(); + let res = ResponseCommit::new(); + response.set_commit(res); + let _ = responder.respond(response); } } +/// To be used by the application to provide a response, +/// possibly asynchronously, for a specific request. +pub struct Responder { + response_sender: Sender<(Response, oneshot::Sender)>, + network_sender: Option>, +} + +impl Responder { + pub fn new( + response_sender: Sender<(Response, oneshot::Sender)>, + network_sender: oneshot::Sender, + ) -> Self { + Responder { + response_sender, + network_sender: Some(network_sender), + } + } + + /// Respond with a response, can be called successfully only once. + /// Sends the response back to the server asynchronously, does not block. + pub fn respond(&mut self, response: Response) -> Result<(), ()> { + if self.network_sender.is_none() { + return Err(()); + } + if let Ok(_) = self.response_sender.send(( + response, + self.network_sender + .take() + .expect("To have a network sender"), + )) { + return Ok(()); + } + return Err(()); + } +} + +/// Exit signal internally used both to stop the protocol and server. +#[derive(Eq, PartialEq)] +pub struct ExitSignal; + +/// A signaling mechanism passed to the application on start-up, +/// to be used to signal the ABCI layer to exit. +pub struct Exiter(Sender); + +impl Exiter { + /// Can be used by the application to signal to the ABCI layer to exit. + pub fn exit(&self) { + let _ = self.0.send(ExitSignal); + } +} + +/// A layer between the application and the networking. +/// Could be used to keep track of order of requests/responses, +/// and provide other guarantees of sanity to the application. +struct Protocol { + network_receiver: Receiver<(Request, oneshot::Sender)>, + response_sender: Sender<(Response, oneshot::Sender)>, + response_receiver: Receiver<(Response, oneshot::Sender)>, + exit_receiver: Receiver, + exit_sender: Sender, + application: Box, + network_exit_sender: Option>, +} + +impl Protocol { + /// Handles messages from the network and application until exit. + fn run(&mut self) -> bool { + enum Incoming { + Network((Request, oneshot::Sender)), + Application((Response, oneshot::Sender)), + } + let incoming = select! { + recv(self.exit_receiver) -> _ => return false, + recv(self.response_receiver) -> msg => { + msg.map(Incoming::Application).expect("Error in handling message from application") + }, + recv(self.network_receiver) -> msg => { + if let Err(_) = msg { + // Server disconnected, exit. + return false; + } + msg.map(Incoming::Network).expect("Error in handling message from network") + }, + }; + match incoming { + Incoming::Application((response, sender)) => { + self.handle_application_response(response, sender) + } + Incoming::Network((request, sender)) => self.handle_network_request(request, sender), + } + true + } + + /// Called once upon start-up. + fn start(&mut self) { + let exiter = Exiter(self.exit_sender.clone()); + self.application.start(exiter); + } + + /// Called once upon exit. + fn exit(&mut self) { + self.application.exit(); + let _ = self + .network_exit_sender + .take() + .expect("To have an network exit sender") + .send(ExitSignal); + } + + /// Handle a message containing a response from the application. + /// Could be used to perform additional sanity checks. + /// Forwards the response to the network. + fn handle_application_response( + &mut self, + response: Response, + sender: oneshot::Sender, + ) { + // TODO: sanity checks on response. + let _ = sender.send(response); + } + + /// Handle a message containing a request from the network. + /// Calls into the application directly as well, with the application either + /// handling these calls in the current thread, or handling them in asynchronous fashion. + fn handle_network_request(&mut self, request: Request, sender: oneshot::Sender) { + // TODO: sanity checks on request. + let mut responder = Responder::new(self.response_sender.clone(), sender); + match request.value { + // Info + Some(Request_oneof_value::info(r)) => { + self.application.info(r, responder); + } + // Init chain + Some(Request_oneof_value::init_chain(r)) => self.application.init_chain(r, responder), + // Set option + Some(Request_oneof_value::set_option(r)) => self.application.set_option(r, responder), + // Query + Some(Request_oneof_value::query(r)) => self.application.query(r, responder), + // Check tx + Some(Request_oneof_value::check_tx(r)) => self.application.check_tx(r, responder), + // Begin block + Some(Request_oneof_value::begin_block(r)) => self.application.begin_block(r, responder), + // Deliver Tx + Some(Request_oneof_value::deliver_tx(r)) => self.application.deliver_tx(r, responder), + // End block + Some(Request_oneof_value::end_block(r)) => self.application.end_block(r, responder), + // Commit + Some(Request_oneof_value::commit(r)) => self.application.commit(r, responder), + // Flush + Some(Request_oneof_value::flush(_)) => { + let mut response = Response::new(); + response.set_flush(ResponseFlush::new()); + let _ = responder.respond(response); + } + // Echo + Some(Request_oneof_value::echo(r)) => { + let mut response = Response::new(); + let echo_msg = r.get_message().to_string(); + let mut echo = ResponseEcho::new(); + echo.set_message(echo_msg); + response.set_echo(echo); + let _ = responder.respond(response); + } + _ => { + let mut response = Response::new(); + let mut re = ResponseException::new(); + re.set_error(String::from("Unrecognized request")); + response.set_exception(re); + let _ = responder.respond(response); + } + } + } +} + +/// Start the protocol, and the application. +/// Returns a sender of networking requests. +pub fn run_protocol( + app: A, + network_exit_sender: oneshot::Sender, +) -> Sender<(Request, oneshot::Sender)> +where + A: Application, +{ + let (network_sender, network_receiver) = unbounded(); + let (response_sender, response_receiver) = unbounded(); + let (exit_sender, exit_receiver) = unbounded(); + let mut protocol = Protocol { + application: Box::new(app), + network_receiver, + response_sender, + response_receiver, + exit_sender, + exit_receiver, + network_exit_sender: Some(network_exit_sender), + }; + thread::spawn(move || { + protocol.start(); + while protocol.run() { + // running. + } + protocol.exit(); + }); + network_sender +} + /// Setup the app and start the server using localhost and default tendermint port 26658 +/// Blocks the current thread. pub fn run_local(app: A) where - A: Application + 'static + Send + Sync, + A: Application, { let addr = "127.0.0.1:26658".parse().unwrap(); run(addr, app); } /// Setup the application and start the server. Use this fn when setting different ip:port. +/// Blocks the current thread. pub fn run(listen_addr: SocketAddr, app: A) where - A: Application + 'static + Send + Sync, + A: Application, { serve(app, listen_addr).unwrap(); } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_protocol_calls_app() { + #[derive(Eq, PartialEq)] + enum Checks { + CalledStart, + SentCheckTx, + SentDeliverTx, + CalledExit, + } + struct ChecksApp { + checks_sender: Sender, + exiter: Option, + }; + impl Application for ChecksApp { + fn check_tx(&mut self, _req: RequestCheckTx, mut responder: Responder) { + let mut response = Response::new(); + let res = ResponseCheckTx::new(); + response.set_check_tx(res); + let _ = responder.respond(response); + let _ = self.checks_sender.send(Checks::SentCheckTx); + } + fn deliver_tx(&mut self, _req: RequestDeliverTx, mut responder: Responder) { + let mut response = Response::new(); + let res = ResponseDeliverTx::new(); + response.set_deliver_tx(res); + let _ = responder.respond(response); + let _ = self.checks_sender.send(Checks::SentDeliverTx); + } + fn start(&mut self, exiter: Exiter) { + self.exiter = Some(exiter); + let _ = self.checks_sender.send(Checks::CalledStart); + } + fn exit(&mut self) { + let _ = self.checks_sender.send(Checks::CalledExit); + } + } + + // Start the protocol and app. + let (checks_sender, checks_receiver) = unbounded(); + let app = ChecksApp { + checks_sender, + exiter: None, + }; + let (exit_sender, mut exit_receiver) = oneshot::channel(); + let protocol_sender = run_protocol(app, exit_sender); + + // Send a check tx req. + let mut request = Request::new(); + request.set_check_tx(RequestCheckTx::new()); + let (response_sender, _) = oneshot::channel(); + let _ = protocol_sender.send((request, response_sender)); + + // Send a deliver tx req. + let mut request = Request::new(); + request.set_deliver_tx(RequestDeliverTx::new()); + let (response_sender, _) = oneshot::channel(); + let _ = protocol_sender.send((request, response_sender)); + + // Start receiving the first three checks. + let mut counter = 0; + while let Ok(msg) = checks_receiver.recv() { + match counter { + 0 => assert!(msg == Checks::CalledStart), + 1 => assert!(msg == Checks::SentCheckTx), + 2 => { + assert!(msg == Checks::SentDeliverTx); + break; + }, + _ => {} + } + counter += 1; + } + + // Simulate an exiting server. + drop(protocol_sender); + + // Check for a last exit message. + let exit = checks_receiver.recv().expect("A last exit message"); + assert!(exit == Checks::CalledExit); + + // Check that the server received the exit message. + loop { + if let Ok(msg) = exit_receiver.try_recv() { + assert!(msg == ExitSignal); + break; + } + } + } +} diff --git a/src/server.rs b/src/server.rs index 0aba324..3950f2c 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,96 +1,133 @@ -use std::net::SocketAddr; -use std::ops::DerefMut; -use std::sync::{Arc, Mutex}; - +use crossbeam_channel::Sender; use env_logger::Env; +use std::net::SocketAddr; use tokio; use tokio::codec::Decoder; use tokio::io; -use tokio::net::TcpListener; +use tokio::net::{tcp::Incoming, TcpListener}; use tokio::prelude::*; +use tokio::runtime::current_thread; +use tokio::sync::oneshot; use crate::codec::ABCICodec; use crate::messages::abci::*; -use crate::Application; +use crate::{run_protocol, Application, ExitSignal}; /// Creates the TCP server and listens for connections from Tendermint pub fn serve(app: A, addr: SocketAddr) -> io::Result<()> where - A: Application + 'static + Send + Sync, + A: Application, { env_logger::from_env(Env::default().default_filter_or("info")) .try_init() .ok(); let listener = TcpListener::bind(&addr).unwrap(); + let (exit_sender, exit_receiver) = oneshot::channel(); + let protocol_sender = run_protocol(app, exit_sender); let incoming = listener.incoming(); - let app = Arc::new(Mutex::new(app)); - let server = incoming - .map_err(|err| panic!("Connection failed: {}", err)) - .for_each(move |socket| { - info!("Got connection! {:?}", socket); - let framed = ABCICodec::new().framed(socket); - let (_writer, reader) = framed.split(); - let app_instance = Arc::clone(&app); - - let responses = reader.map(move |request| { - debug!("Got Request! {:?}", request); - respond(&app_instance, &request) - }); + let mut runtime = current_thread::Runtime::new().expect("To start a runtime"); + let server = Server { + runtime_handle: runtime.handle(), + incoming, + protocol_sender, + exit_receiver, + }; - let writes = responses.fold(_writer, |writer, response| { - debug!("Return Response! {:?}", response); - writer.send(response) - }); - tokio::spawn(writes.then(|_| Ok(()))) - }); - tokio::run(server); + runtime + .block_on(server) + .expect("Runtime to block on server"); Ok(()) } -fn respond(app: &Arc>, request: &Request) -> Response -where - A: Application + 'static + Send + Sync, -{ - let mut guard = app.lock().unwrap(); - let app = guard.deref_mut(); +struct Server { + runtime_handle: current_thread::Handle, + incoming: Incoming, + protocol_sender: Sender<(Request, oneshot::Sender)>, + exit_receiver: oneshot::Receiver, +} - let mut response = Response::new(); +struct Connection { + runtime_handle: current_thread::Handle, + protocol_sender: Sender<(Request, oneshot::Sender)>, + response_receiver: oneshot::Receiver, + response_sender: Option>, + writer: Option>>, + reader: Option>>, +} + +impl Future for Server { + type Item = (); + type Error = (); - match request.value { - // Info - Some(Request_oneof_value::info(ref r)) => response.set_info(app.info(r)), - // Init chain - Some(Request_oneof_value::init_chain(ref r)) => response.set_init_chain(app.init_chain(r)), - // Set option - Some(Request_oneof_value::set_option(ref r)) => response.set_set_option(app.set_option(r)), - // Query - Some(Request_oneof_value::query(ref r)) => response.set_query(app.query(r)), - // Check tx - Some(Request_oneof_value::check_tx(ref r)) => response.set_check_tx(app.check_tx(r)), - // Begin block - Some(Request_oneof_value::begin_block(ref r)) => { - response.set_begin_block(app.begin_block(r)) + fn poll(&mut self) -> Result, Self::Error> { + match self.exit_receiver.poll() { + Ok(Async::Ready(_)) => return Ok(Async::Ready(())), + Ok(Async::NotReady) => {} + Err(_) => return Err(()), } - // Deliver Tx - Some(Request_oneof_value::deliver_tx(ref r)) => response.set_deliver_tx(app.deliver_tx(r)), - // End block - Some(Request_oneof_value::end_block(ref r)) => response.set_end_block(app.end_block(r)), - // Commit - Some(Request_oneof_value::commit(ref r)) => response.set_commit(app.commit(r)), - // Flush - Some(Request_oneof_value::flush(_)) => response.set_flush(ResponseFlush::new()), - // Echo - Some(Request_oneof_value::echo(ref r)) => { - let echo_msg = r.get_message().to_string(); - let mut echo = ResponseEcho::new(); - echo.set_message(echo_msg); - response.set_echo(echo); + match self.incoming.poll() { + Ok(Async::Ready(Some(socket))) => { + let framed = ABCICodec::new().framed(socket); + let (writer, reader) = framed.split(); + let (sender, response_receiver) = oneshot::channel(); + let connection = Connection { + runtime_handle: self.runtime_handle.clone(), + protocol_sender: self.protocol_sender.clone(), + response_receiver, + response_sender: Some(sender), + writer: Some(writer), + reader: Some(reader), + }; + self.runtime_handle + .spawn(connection.then(|_| Ok(()))) + .expect("To spawn a connection"); + } + Ok(Async::NotReady) => {} + Err(_) | Ok(Async::Ready(None)) => { + // Connection closed, I assume this will drop the Server struct, + // which will also shutdown the protocol thread, + // when all protocol senders drop. + return Ok(Async::Ready(())); + } + } + Ok(Async::NotReady) + } +} + +impl Future for Connection { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Result, Self::Error> { + if let Some(reader) = &mut self.reader { + match reader.poll() { + Ok(Async::Ready(Some(request))) => { + let _ = self.protocol_sender.send(( + request, + self.response_sender.take().expect("To have a sender"), + )); + self.reader = None; + }, + Ok(Async::Ready(None)) => { + self.reader = None; + }, + Ok(Async::NotReady) => return Ok(Async::NotReady), + Err(_) => return Err(()), + } } - _ => { - let mut re = ResponseException::new(); - re.set_error(String::from("Unrecognized request")); - response.set_exception(re) + if self.writer.is_some() { + match self.response_receiver.poll() { + Ok(Async::Ready(response)) => { + let writer = self.writer.take().expect("To have a writer"); + let writes = writer.send(response); + self.runtime_handle + .spawn(writes.then(|_| Ok(()))) + .expect("To spawn a writer"); + } + Ok(Async::NotReady) => return Ok(Async::NotReady), + Err(_) => return Err(()), + } } + Ok(Async::Ready(())) } - response }