Skip to content

Commit

Permalink
Expand WebSockets event listener capabilities
Browse files Browse the repository at this point in the history
Closes #311.

Adds responding to WebSockets ping messages during the attempt to fetch
the next event from the event stream.

I debated whether or not to wrap the fetching of the next message in a
finite or an infinite loop, but wrapping it in a finite loop would just
push that same responsibility further up the stack. Using this approach
allows us to do away with the optionality of a `ResultEvent` and
simplifies the method signature.

Also, I added in a `close` method for the listener to allow one to
gracefully close the connection (avoids those ugly `websocket: close
1006 (abnormal closure): unexpected EOF` messages in the Tendermint
logs).

Signed-off-by: Thane Thomson <connect@thanethomson.com>
  • Loading branch information
thanethomson committed Jul 17, 2020
1 parent 02fa831 commit ba0674e
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 32 deletions.
110 changes: 83 additions & 27 deletions rpc/src/client/event_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,14 @@
// TODO(ismail): document fields or re-use the abci types
#![allow(missing_docs)]

use async_tungstenite::{tokio::connect_async, tokio::TokioAdapter, tungstenite::Message};
use async_tungstenite::{
tokio::connect_async, tokio::TokioAdapter, tungstenite::protocol::frame::coding::CloseCode,
tungstenite::protocol::CloseFrame, tungstenite::Error as tungsteniteError,
tungstenite::Message,
};
use futures::prelude::*;
use serde::{Deserialize, Serialize};
use std::borrow::Cow;
use std::collections::HashMap;
use std::error::Error as stdError;
use tokio::net::TcpStream;
Expand All @@ -25,12 +30,12 @@ use crate::{endpoint::subscribe, Error as RPCError};
pub enum EventSubscription {
/// Subscribe to all transactions
TransactionSubscription,
///Subscribe to all blocks
/// Subscribe to all blocks
BlockSubscription,
}

impl EventSubscription {
///Convert the query enum to a string
/// Convert the query enum to a string
pub fn as_str(&self) -> &str {
match self {
EventSubscription::TransactionSubscription => "tm.event='Tx'",
Expand Down Expand Up @@ -83,33 +88,84 @@ impl EventListener {
Ok(())
}

/// Get the next event from the websocket
pub async fn get_event(&mut self) -> Result<Option<ResultEvent>, RPCError> {
let msg = self
.socket
.next()
.await
.ok_or_else(|| RPCError::websocket_error("web socket closed"))??;
/// Get the next event from the websocket. Automatically handles websocket
/// protocol details, like responding to ping messages, so it can either
/// produce events or errors.
pub async fn get_event(&mut self) -> Result<ResultEvent, RPCError> {
loop {
let msg = self
.socket
.next()
.await
.ok_or_else(|| RPCError::websocket_error("web socket closed"))??;

if let Ok(result_event) = serde_json::from_str::<WrappedResultEvent>(&msg.to_string()) {
// if we get an rpc error here, we will bubble it up:
return Ok(Some(result_event.into_result()?));
match msg {
Message::Text(msg_text) => {
match serde_json::from_str::<WrappedResultEvent>(msg_text.as_str()) {
// if we get an rpc error here, we will bubble it up:
Ok(result_event) => return Ok(result_event.into_result()?),
Err(e) => {
return Err(RPCError::new(
Code::Other(-1),
Some(format!(
"failed to decode incoming message as an event: {}",
e
)),
))
}
}
}
Message::Ping(_) => {
self.socket
.send(Message::Pong(Vec::new()))
.await
.or_else(|e| {
Err(RPCError::websocket_error(format!(
"failed to send pong response: {}",
e
)))
})?
}
Message::Pong(_) => (),
Message::Close(_) => return Err(RPCError::websocket_error("web socket closed")),
Message::Binary(_) => {
return Err(RPCError::websocket_error(
"received unexpected binary websocket message",
))
}
}
}
dbg!("We did not receive a valid JSONRPC wrapped ResultEvent!");
if serde_json::from_str::<String>(&msg.to_string()).is_ok() {
// FIXME(ismail): Until this is a proper websocket client
// (or the endpoint moved to grpc in tendermint), we accept whatever was read here
// dbg! it out and return None below.
dbg!("Instead of JSONRPC wrapped ResultEvent, we got:");
dbg!(&msg.to_string());
return Ok(None);
}

/// Attempts to gracefully close the websocket connection and consumes the
/// listener.
pub async fn close(mut self) -> Result<(), RPCError> {
let _ = self
.socket
.close(Some(CloseFrame {
code: CloseCode::Normal,
reason: Cow::from("client closed connection"),
}))
.await
.or_else(|e| {
Err(RPCError::websocket_error(format!(
"failed to close web socket connection: {}",
e
)))
})?;
// try to gracefully close the connection
match self.socket.next().await {
Some(r) => match r {
// we didn't get the connection closed message we wanted, so force connection closure
Ok(_) => Ok(()),
Err(e) => match e {
// this is what we want
tungsteniteError::ConnectionClosed | tungsteniteError::AlreadyClosed => Ok(()),
_ => return Err(RPCError::websocket_error(e.to_string())),
},
},
None => Ok(()),
}
dbg!("received neither event nor generic string message:");
dbg!(&msg.to_string());
Err(RPCError::new(
Code::Other(-1),
Some("received neither event nor generic string message".to_string()),
))
}
}

Expand Down
11 changes: 6 additions & 5 deletions tendermint/tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,10 @@ mod rpc {
// client.subscribe("tm.event='NewBlock'".to_owned()).await.unwrap();

// Loop here is helpful when debugging parsing of JSON events
// loop{
let maybe_result_event = client.get_event().await.unwrap();
dbg!(&maybe_result_event);
// }
let result_event = maybe_result_event.expect("unexpected msg read");
//loop{
//for _ in 0..5 {
let result_event = client.get_event().await.unwrap();
dbg!(&result_event);
match result_event.data {
event_listener::TMEventData::EventDataNewBlock(nb) => {
dbg!("got EventDataNewBlock: {:?}", nb);
Expand All @@ -175,5 +174,7 @@ mod rpc {
panic!("got a GenericJSONEvent: {:?}", v);
}
}
//}
client.close().await.unwrap();
}
}

0 comments on commit ba0674e

Please sign in to comment.