Skip to content
This repository has been archived by the owner on Mar 11, 2021. It is now read-only.

Commit

Permalink
add a way to check the response in the responder
Browse files Browse the repository at this point in the history
  • Loading branch information
gterzian committed Nov 4, 2019
1 parent fe72b65 commit b171b03
Showing 1 changed file with 80 additions and 9 deletions.
89 changes: 80 additions & 9 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,27 +141,71 @@ pub trait Application: 'static + Send {
}
}

/// Represents the type of response that can be send with a Responder.
pub enum ResponseType {
Info,
SetOption,
Query,
CheckTx,
InitChain,
BeginBlock,
DeliverTx,
EndBlock,
Commit,
Flush,
Echo,
Exception,
}

/// To be used by the application to provide a response,
/// possibly asynchronously, for a specific request.
pub struct Responder {
response_sender: Sender<(Response, oneshot::Sender<Response>)>,
network_sender: oneshot::Sender<Response>,
response_type: ResponseType,
}

impl Responder {
pub fn new(
response_sender: Sender<(Response, oneshot::Sender<Response>)>,
network_sender: oneshot::Sender<Response>,
response_type: ResponseType,
) -> Self {
Responder {
response_sender,
network_sender: network_sender,
response_type,
}
}

fn check_response(&self, response: &Response) -> Result<(), ()> {
let has_expected = match self.response_type {
ResponseType::Info => response.has_info(),
ResponseType::SetOption => response.has_set_option(),
ResponseType::Query => response.has_query(),
ResponseType::CheckTx => response.has_check_tx(),
ResponseType::InitChain => response.has_init_chain(),
ResponseType::BeginBlock => response.has_begin_block(),
ResponseType::DeliverTx => response.has_deliver_tx(),
ResponseType::EndBlock => response.has_end_block(),
ResponseType::Commit => response.has_commit(),
ResponseType::Flush => response.has_flush(),
ResponseType::Echo => response.has_echo(),
ResponseType::Exception => response.has_exception(),
};
if !has_expected {
return Err(());
}
Ok(())
}

/// Respond with a response, consuming itself.
/// Sends the response back to the server asynchronously, does not block.
/// Checks that the response matches the request for which the responder was created.
pub fn respond(self, response: Response) -> Result<(), ()> {
if self.check_response(&response).is_err() {
return Err(())
}
if self
.response_sender
.send((response, self.network_sender))
Expand Down Expand Up @@ -284,36 +328,62 @@ impl Protocol {
/// handling these calls in the current thread, or handling them in asynchronous fashion.
fn handle_network_request(&mut self, request: Request, sender: oneshot::Sender<Response>) {
// TODO: sanity checks on request.
let responder = Responder::new(self.response_sender.clone(), sender);
match request.value {
// Info
Some(Request_oneof_value::info(r)) => {
let responder = Responder::new(self.response_sender.clone(), sender, ResponseType::Info);
self.application.info(r, responder);
}
// Init chain
Some(Request_oneof_value::init_chain(r)) => self.application.init_chain(r, responder),
Some(Request_oneof_value::init_chain(r)) => {
let responder = Responder::new(self.response_sender.clone(), sender, ResponseType::InitChain);
self.application.init_chain(r, responder);
},
// Set option
Some(Request_oneof_value::set_option(r)) => self.application.set_option(r, responder),
Some(Request_oneof_value::set_option(r)) => {
let responder = Responder::new(self.response_sender.clone(), sender, ResponseType::SetOption);
self.application.set_option(r, responder);
},
// Query
Some(Request_oneof_value::query(r)) => self.application.query(r, responder),
Some(Request_oneof_value::query(r)) => {
let responder = Responder::new(self.response_sender.clone(), sender, ResponseType::Query);
self.application.query(r, responder);
},
// Check tx
Some(Request_oneof_value::check_tx(r)) => self.application.check_tx(r, responder),
Some(Request_oneof_value::check_tx(r)) => {
let responder = Responder::new(self.response_sender.clone(), sender, ResponseType::CheckTx);
self.application.check_tx(r, responder);
},
// Begin block
Some(Request_oneof_value::begin_block(r)) => self.application.begin_block(r, responder),
Some(Request_oneof_value::begin_block(r)) => {
let responder = Responder::new(self.response_sender.clone(), sender, ResponseType::BeginBlock);
self.application.begin_block(r, responder);
},
// Deliver Tx
Some(Request_oneof_value::deliver_tx(r)) => self.application.deliver_tx(r, responder),
Some(Request_oneof_value::deliver_tx(r)) => {
let responder = Responder::new(self.response_sender.clone(), sender, ResponseType::DeliverTx);
self.application.deliver_tx(r, responder);
},
// End block
Some(Request_oneof_value::end_block(r)) => self.application.end_block(r, responder),
Some(Request_oneof_value::end_block(r)) => {
let responder = Responder::new(self.response_sender.clone(), sender, ResponseType::EndBlock);
self.application.end_block(r, responder);
},
// Commit
Some(Request_oneof_value::commit(r)) => self.application.commit(r, responder),
Some(Request_oneof_value::commit(r)) => {
let responder = Responder::new(self.response_sender.clone(), sender, ResponseType::Commit);
self.application.commit(r, responder);
},
// Flush
Some(Request_oneof_value::flush(_)) => {
let responder = Responder::new(self.response_sender.clone(), sender, ResponseType::Flush);
let mut response = Response::new();
response.set_flush(ResponseFlush::new());
let _ = responder.respond(response);
}
// Echo
Some(Request_oneof_value::echo(r)) => {
let responder = Responder::new(self.response_sender.clone(), sender, ResponseType::Echo);
let mut response = Response::new();
let echo_msg = r.get_message().to_string();
let mut echo = ResponseEcho::new();
Expand All @@ -322,6 +392,7 @@ impl Protocol {
let _ = responder.respond(response);
}
_ => {
let responder = Responder::new(self.response_sender.clone(), sender, ResponseType::Exception);
let mut response = Response::new();
let mut re = ResponseException::new();
re.set_error(String::from("Unrecognized request"));
Expand Down

0 comments on commit b171b03

Please sign in to comment.