Skip to content

Commit

Permalink
Bootstrap rpc proxy for light-node API
Browse files Browse the repository at this point in the history
Introducing a proxy which serves the Tendermint RPC API with the help of
connect rpc client. Later to be extended to add light client
functionality. This is in nature equivalent to the Go proxy[0].

Instead of reaching for hyper this change introduces warp to build an
http server. This decision should be evaluated very carefully, while
warp is a thinly wrapper around hyper, it brings a lot of very important
tooling (including websockets), it also has its own set of dependencies.

Also currently unclear if the module that implements the rpc API with
lite client functionality should live in the rpc source tree.

Part of #219

[0] https://github.com/tendermint/tendermint/tree/master/light/proxy
  • Loading branch information
xla committed Jun 16, 2020
1 parent 1df3ac3 commit 740cd1d
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 4 deletions.
9 changes: 5 additions & 4 deletions tendermint/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ codecov = { repository = "..."}
[dependencies]
anomaly = "0.2"
async-trait = "0.1"
async-tungstenite = {version="0.5", features = ["tokio-runtime"]}
bytes = "0.5"
chrono = { version = "0.4", features = ["serde"] }
futures = "0.3"
Expand All @@ -42,6 +43,7 @@ hyper = "0.13"
once_cell = "1.3"
prost-amino = "0.5"
prost-amino-derive = "0.5"
ripemd160 = "0.8"
serde = { version = "1", features = ["derive"] }
serde_json = { version = "1" }
serde_bytes = "0.11"
Expand All @@ -54,12 +56,11 @@ subtle = "2"
subtle-encoding = { version = "0.5", features = ["bech32-preview"] }
tai64 = { version = "3", features = ["chrono"] }
thiserror = "1"
tokio = { version = "0.2", features = ["full"] }
toml = { version = "0.5" }
uuid = { version = "0.8", default-features = false }
warp = { version = "0.2", default-features = false }
zeroize = { version = "1.1", features = ["zeroize_derive"] }
async-tungstenite = {version="0.5", features = ["tokio-runtime"]}
tokio = { version = "0.2", features = ["macros"] }
ripemd160 = "0.8"

[dev-dependencies]
serde_json = "1"
pretty_assertions = "0.5"
1 change: 1 addition & 0 deletions tendermint/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub mod error;
pub mod event_listener;
mod id;
mod method;
mod proxy;
pub mod request;
pub mod response;
mod version;
Expand Down
146 changes: 146 additions & 0 deletions tendermint/src/rpc/proxy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
//! Tendermint RPC Proxy

#![allow(dead_code)]

use warp::{path, reject, Filter, Rejection, Reply};

use crate::rpc;

impl reject::Reject for rpc::Error {}

impl From<rpc::Error> for Rejection {
fn from(err: rpc::Error) -> Self {
reject::custom(err)
}
}

struct Proxy {
client: rpc::Client,
}

impl Proxy {
fn new(client: rpc::Client) -> Self {
Self { client }
}

async fn serve() {
todo!()
}
}

fn filters(client: rpc::Client) -> impl Filter<Extract = impl Reply, Error = Rejection> {
health_filter(client)
}

fn health_filter(client: rpc::Client) -> impl Filter<Extract = impl Reply, Error = Rejection> {
path("health")
.and(warp::get())
.and(path::end())
.and(warp::any().map(move || client.clone()))
.and_then(handler::health)
}

mod handler {
use std::convert::Infallible;
use warp::http::StatusCode;
use warp::{reply, Rejection, Reply};

use crate::rpc;

pub async fn health(client: rpc::Client) -> Result<impl Reply, Rejection> {
client.health().await?;

Ok(reply())
}

pub async fn recover(err: Rejection) -> Result<impl Reply, Infallible> {
// TODO(xla): Log and trace error.
let (status, res) = {
if err.is_not_found() {
(StatusCode::NOT_FOUND, reply::json(&"Not found".to_string()))
} else if let Some(rpc_err) = err.find::<rpc::Error>() {
(StatusCode::INTERNAL_SERVER_ERROR, reply::json(&rpc_err))
} else {
(
StatusCode::INTERNAL_SERVER_ERROR,
reply::json(&"Something went wrong".to_string()),
)
}
};

Ok(reply::with_status(res, status))
}

#[cfg(test)]
mod test {
use futures::stream::TryStreamExt;
use pretty_assertions::assert_eq;
use serde_json::{json, Value};
use warp::{reject, Rejection, Reply as _};

use crate::rpc;

#[tokio::test]
async fn recover_custom() {
let err = rpc::Error::invalid_params("key field must be alphanumeric");
let have: Value = recover(reject::custom(err.clone())).await;
let want = json!({
"code": i32::from(err.code()),
"data": err.data(),
"message": err.message(),
});

assert_eq!(have, want);
}

#[tokio::test]
async fn recover_not_found() {
let have: Value = recover(reject::not_found()).await;
let want = json!("Not found");

assert_eq!(have, want);
}

async fn recover(err: Rejection) -> Value {
let res = super::recover(err).await.unwrap();
let body = res
.into_response()
.body_mut()
.try_fold(Vec::new(), |mut data, chunk| async move {
data.extend_from_slice(&chunk);
Ok(data)
})
.await
.unwrap();

serde_json::from_slice(&body).unwrap()
}
}
}

#[cfg(test)]
mod test {
use warp::http::StatusCode;
use warp::test::request;
use warp::Filter as _;

use crate::rpc;

#[tokio::test]
async fn health() -> Result<(), rpc::Error> {
let client = rpc::Client::new("tcp://127.0.0.1:0".parse().unwrap());
let api = super::filters(client).recover(super::handler::recover);

let res = request().method("GET").path("/health").reply(&api).await;

assert_eq!(
res.status(),
StatusCode::INTERNAL_SERVER_ERROR,
"response status not {}, the body is:\n{:#?}",
StatusCode::OK,
res.body()
);

Ok(())
}
}

0 comments on commit 740cd1d

Please sign in to comment.