diff --git a/tendermint/Cargo.toml b/tendermint/Cargo.toml index 640565eaa..c47cac900 100644 --- a/tendermint/Cargo.toml +++ b/tendermint/Cargo.toml @@ -33,6 +33,7 @@ codecov = { repository = "..."} [dependencies] anomaly = "0.2" async-trait = "0.1" +async-tungstenite = {version="0.5", features = ["tokio-runtime"]} bytes = "0.5" chrono = { version = "0.4", features = ["serde"] } futures = "0.3" @@ -42,6 +43,7 @@ hyper = "0.13" once_cell = "1.3" prost-amino = "0.5" prost-amino-derive = "0.5" +ripemd160 = "0.8" serde = { version = "1", features = ["derive"] } serde_json = { version = "1" } serde_bytes = "0.11" @@ -54,12 +56,11 @@ subtle = "2" subtle-encoding = { version = "0.5", features = ["bech32-preview"] } tai64 = { version = "3", features = ["chrono"] } thiserror = "1" +tokio = { version = "0.2", features = ["full"] } toml = { version = "0.5" } uuid = { version = "0.8", default-features = false } +warp = { version = "0.2", default-features = false } zeroize = { version = "1.1", features = ["zeroize_derive"] } -async-tungstenite = {version="0.5", features = ["tokio-runtime"]} -tokio = { version = "0.2", features = ["macros"] } -ripemd160 = "0.8" [dev-dependencies] -serde_json = "1" +pretty_assertions = "0.5" diff --git a/tendermint/src/rpc.rs b/tendermint/src/rpc.rs index 175c52bfc..ec818fc28 100644 --- a/tendermint/src/rpc.rs +++ b/tendermint/src/rpc.rs @@ -10,6 +10,7 @@ pub mod error; pub mod event_listener; mod id; mod method; +mod proxy; pub mod request; pub mod response; mod version; diff --git a/tendermint/src/rpc/proxy.rs b/tendermint/src/rpc/proxy.rs new file mode 100644 index 000000000..fb722805e --- /dev/null +++ b/tendermint/src/rpc/proxy.rs @@ -0,0 +1,146 @@ +//! Tendermint RPC Proxy + +#![allow(dead_code)] + +use warp::{path, reject, Filter, Rejection, Reply}; + +use crate::rpc; + +impl reject::Reject for rpc::Error {} + +impl From for Rejection { + fn from(err: rpc::Error) -> Self { + reject::custom(err) + } +} + +struct Proxy { + client: rpc::Client, +} + +impl Proxy { + fn new(client: rpc::Client) -> Self { + Self { client } + } + + async fn serve() { + todo!() + } +} + +fn filters(client: rpc::Client) -> impl Filter { + health_filter(client) +} + +fn health_filter(client: rpc::Client) -> impl Filter { + path("health") + .and(warp::get()) + .and(path::end()) + .and(warp::any().map(move || client.clone())) + .and_then(handler::health) +} + +mod handler { + use std::convert::Infallible; + use warp::http::StatusCode; + use warp::{reply, Rejection, Reply}; + + use crate::rpc; + + pub async fn health(client: rpc::Client) -> Result { + client.health().await?; + + Ok(reply()) + } + + pub async fn recover(err: Rejection) -> Result { + // TODO(xla): Log and trace error. + let (status, res) = { + if err.is_not_found() { + (StatusCode::NOT_FOUND, reply::json(&"Not found".to_string())) + } else if let Some(rpc_err) = err.find::() { + (StatusCode::INTERNAL_SERVER_ERROR, reply::json(&rpc_err)) + } else { + ( + StatusCode::INTERNAL_SERVER_ERROR, + reply::json(&"Something went wrong".to_string()), + ) + } + }; + + Ok(reply::with_status(res, status)) + } + + #[cfg(test)] + mod test { + use futures::stream::TryStreamExt; + use pretty_assertions::assert_eq; + use serde_json::{json, Value}; + use warp::{reject, Rejection, Reply as _}; + + use crate::rpc; + + #[tokio::test] + async fn recover_custom() { + let err = rpc::Error::invalid_params("key field must be alphanumeric"); + let have: Value = recover(reject::custom(err.clone())).await; + let want = json!({ + "code": i32::from(err.code()), + "data": err.data(), + "message": err.message(), + }); + + assert_eq!(have, want); + } + + #[tokio::test] + async fn recover_not_found() { + let have: Value = recover(reject::not_found()).await; + let want = json!("Not found"); + + assert_eq!(have, want); + } + + async fn recover(err: Rejection) -> Value { + let res = super::recover(err).await.unwrap(); + let body = res + .into_response() + .body_mut() + .try_fold(Vec::new(), |mut data, chunk| async move { + data.extend_from_slice(&chunk); + Ok(data) + }) + .await + .unwrap(); + + serde_json::from_slice(&body).unwrap() + } + } +} + +#[cfg(test)] +mod test { + use warp::http::StatusCode; + use warp::test::request; + use warp::Filter as _; + + use crate::rpc; + + #[tokio::test] + async fn health() -> Result<(), rpc::Error> { + let client = rpc::Client::new("tcp://127.0.0.1:0".parse().unwrap()); + let api = super::filters(client).recover(super::handler::recover); + + let res = request().method("GET").path("/health").reply(&api).await; + + assert_eq!( + res.status(), + StatusCode::INTERNAL_SERVER_ERROR, + "response status not {}, the body is:\n{:#?}", + StatusCode::OK, + res.body() + ); + + Ok(()) + } +}