Skip to content

Commit

Permalink
Merge pull request #342 from jaspervdm/futures0.3
Browse files Browse the repository at this point in the history
Update hyper/tokio/futures dependencies
  • Loading branch information
jaspervdm committed Feb 24, 2020
2 parents 9213559 + 6024d8e commit bed60df
Show file tree
Hide file tree
Showing 9 changed files with 261 additions and 954 deletions.
662 changes: 142 additions & 520 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 3 additions & 5 deletions controller/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ edition = "2018"
[dependencies]
failure = "0.1"
failure_derive = "0.1"
futures = "0.1"
hyper = "0.12"
futures = "0.3"
hyper = "0.13"
rand = "0.5"
serde = "1"
serde_derive = "1"
Expand All @@ -23,9 +23,7 @@ log = "0.4"
prettytable-rs = "0.7"
ring = "0.16"
term = "0.5"
tokio = "= 0.1.11"
tokio-core = "0.1"
tokio-retry = "0.1"
tokio = { version = "0.2", features = ["full"] }
uuid = { version = "0.7", features = ["serde", "v4"] }
url = "1.7.0"
chrono = { version = "0.4.4", features = ["serde"] }
Expand Down
16 changes: 4 additions & 12 deletions impls/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,17 @@ edition = "2018"
blake2-rfc = "0.2"
failure = "0.1"
failure_derive = "0.1"
futures = "0.1"
futures = "0.3"
rand = "0.5"
semver = "0.9"
serde = "1"
serde_derive = "1"
serde_json = "1"
log = "0.4"
ring = "0.16"
tokio = "= 0.1.11"
tokio-core = "0.1"
tokio-retry = "0.1"
tokio = { version = "0.2", features = ["full"] }
uuid = { version = "0.7", features = ["serde", "v4"] }
chrono = { version = "0.4.4", features = ["serde"] }
jsonrpc-client-core = "0.5.0"
jsonrpc-client-http = "0.5.0"

#http client (copied from grin)
http = "0.1.5"
Expand All @@ -36,12 +32,8 @@ hyper-timeout = "0.3"

#Socks/Tor
byteorder = "1"
hyper = "0.12"
#hyper-tls = "0.1"
tokio-tcp = "0.1"
tokio-io = "0.1"
#native-tls = "0.1"
#tokio-tls = "0.1"
hyper = "0.13"
hyper-socks2 = "0.4"
ed25519-dalek = "1.0.0-pre.1"
data-encoding = "2"
regex = "1.3"
Expand Down
211 changes: 72 additions & 139 deletions impls/src/client_utils/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,11 @@

//! High level JSON/HTTP client API

use crate::client_utils::Socksv5Connector;
use crate::util::to_base64;
use failure::{Backtrace, Context, Fail, ResultExt};
use futures::future::result;
use futures::future::{err, ok, Either};
use futures::stream::Stream;
use http::uri::{InvalidUri, Uri};
use hyper::body;
use hyper::header::{ACCEPT, AUTHORIZATION, CONTENT_TYPE, USER_AGENT};
use hyper::rt::Future;
use hyper::{self, Body, Request};
use hyper::{self, Body, Client as HyperClient, Request, Uri};
use hyper_rustls;
use hyper_timeout::TimeoutConnector;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -89,8 +84,6 @@ impl From<Context<ErrorKind>> for Error {
}
}

pub type ClientResponseFuture<T> = Box<dyn Future<Item = T, Error = Error> + Send>;

pub struct Client {
/// Whether to use socks proxy
pub use_socks: bool,
Expand Down Expand Up @@ -120,18 +113,16 @@ impl Client {
/// Helper function to easily issue an async HTTP GET request against a given
/// URL that returns a future. Handles request building, JSON deserialization
/// and response code checking.
pub fn _get_async<'a, T>(
pub async fn _get_async<'a, T>(
&self,
url: &'a str,
api_secret: Option<String>,
) -> ClientResponseFuture<T>
) -> Result<T, Error>
where
for<'de> T: Deserialize<'de> + Send + 'static,
{
match self.build_request(url, "GET", api_secret, None) {
Ok(req) => Box::new(self.handle_request_async(req)),
Err(e) => Box::new(err(e)),
}
self.handle_request_async(self.build_request(url, "GET", api_secret, None)?)
.await
}

/// Helper function to easily issue a HTTP GET request
Expand Down Expand Up @@ -165,21 +156,19 @@ impl Client {
/// provided JSON object as body on a given URL that returns a future. Handles
/// request building, JSON serialization and deserialization, and response code
/// checking.
pub fn post_async<IN, OUT>(
pub async fn post_async<IN, OUT>(
&self,
url: &str,
input: &IN,
api_secret: Option<String>,
) -> ClientResponseFuture<OUT>
) -> Result<OUT, Error>
where
IN: Serialize,
OUT: Send + 'static,
for<'de> OUT: Deserialize<'de>,
{
match self.create_post_request(url, api_secret, input) {
Ok(req) => Box::new(self.handle_request_async(req)),
Err(e) => Box::new(err(e)),
}
self.handle_request_async(self.create_post_request(url, api_secret, input)?)
.await
}

/// Helper function to easily issue a HTTP POST request with the provided JSON
Expand All @@ -204,19 +193,18 @@ impl Client {
/// provided JSON object as body on a given URL that returns a future. Handles
/// request building, JSON serialization and deserialization, and response code
/// checking.
pub fn _post_no_ret_async<IN>(
pub async fn _post_no_ret_async<IN>(
&self,
url: &str,
api_secret: Option<String>,
input: &IN,
) -> ClientResponseFuture<()>
) -> Result<(), Error>
where
IN: Serialize,
{
match self.create_post_request(url, api_secret, input) {
Ok(req) => Box::new(self.send_request_async(req).and_then(|_| ok(()))),
Err(e) => Box::new(err(e)),
}
self.send_request_async(self.create_post_request(url, api_secret, input)?)
.await?;
Ok(())
}

fn build_request(
Expand All @@ -226,14 +214,13 @@ impl Client {
api_secret: Option<String>,
body: Option<String>,
) -> Result<Request<Body>, Error> {
let uri = url.parse::<Uri>().map_err::<Error, _>(|e: InvalidUri| {
e.context(ErrorKind::Argument(format!("Invalid url {}", url)))
.into()
})?;
let uri: Uri = url
.parse()
.map_err(|_| ErrorKind::RequestError(format!("Invalid url {}", url)))?;
let mut builder = Request::builder();
if let Some(api_secret) = api_secret {
let basic_auth = format!("Basic {}", to_base64(&format!("grin:{}", api_secret)));
builder.header(AUTHORIZATION, basic_auth);
builder = builder.header(AUTHORIZATION, basic_auth);
}

builder
Expand Down Expand Up @@ -277,124 +264,70 @@ impl Client {
})
}

fn handle_request_async<T>(&self, req: Request<Body>) -> ClientResponseFuture<T>
async fn handle_request_async<T>(&self, req: Request<Body>) -> Result<T, Error>
where
for<'de> T: Deserialize<'de> + Send + 'static,
{
Box::new(self.send_request_async(req).and_then(|data| {
serde_json::from_str(&data).map_err(|e| {
e.context(ErrorKind::ResponseError("Cannot parse response".to_owned()))
.into()
})
}))
let data = self.send_request_async(req).await?;
let ser = serde_json::from_str(&data)
.map_err(|e| e.context(ErrorKind::ResponseError("Cannot parse response".to_owned())))?;
Ok(ser)
}

fn send_request_async(
&self,
req: Request<Body>,
) -> Box<dyn Future<Item = String, Error = Error> + Send> {
//TODO: redundant code, enjoy figuring out type params for dynamic dispatch of client
match self.use_socks {
false => {
let https = hyper_rustls::HttpsConnector::new(1);
let mut connector = TimeoutConnector::new(https);
connector.set_connect_timeout(Some(Duration::from_secs(20)));
connector.set_read_timeout(Some(Duration::from_secs(20)));
connector.set_write_timeout(Some(Duration::from_secs(20)));
let client = hyper::Client::builder().build::<_, hyper::Body>(connector);
Box::new(
client
.request(req)
.map_err(|e| {
ErrorKind::RequestError(format!("Cannot make request: {}", e)).into()
})
.and_then(|resp| {
if !resp.status().is_success() {
Either::A(err(ErrorKind::RequestError(format!(
"Wrong response code: {} with data {:?}",
resp.status(),
resp.body()
))
.into()))
} else {
Either::B(
resp.into_body()
.map_err(|e| {
ErrorKind::RequestError(format!(
"Cannot read response body: {}",
e
))
.into()
})
.concat2()
.and_then(|ch| {
ok(String::from_utf8_lossy(&ch.to_vec()).to_string())
}),
)
}
}),
)
}
true => {
let addr = match self.socks_proxy_addr {
Some(a) => a,
None => {
return Box::new(result(Err(ErrorKind::RequestError(
"Can't parse Socks proxy address".to_string(),
)
.into())))
}
};
let socks_connector = Socksv5Connector::new(addr);
let mut connector = TimeoutConnector::new(socks_connector);
connector.set_connect_timeout(Some(Duration::from_secs(20)));
connector.set_read_timeout(Some(Duration::from_secs(20)));
connector.set_write_timeout(Some(Duration::from_secs(20)));
let client = hyper::Client::builder().build::<_, hyper::Body>(connector);
Box::new(
client
.request(req)
.map_err(|e| {
ErrorKind::RequestError(format!("Cannot make request: {}", e)).into()
})
.and_then(|resp| {
if !resp.status().is_success() {
Either::A(err(ErrorKind::RequestError(format!(
"Wrong response code: {} with data {:?}",
resp.status(),
resp.body()
))
.into()))
} else {
Either::B(
resp.into_body()
.map_err(|e| {
ErrorKind::RequestError(format!(
"Cannot read response body: {}",
e
))
.into()
})
.concat2()
.and_then(|ch| {
ok(String::from_utf8_lossy(&ch.to_vec()).to_string())
}),
)
}
}),
)
}
}
async fn send_request_async(&self, req: Request<Body>) -> Result<String, Error> {
let resp = if !self.use_socks {
let https = hyper_rustls::HttpsConnector::new();
let mut connector = TimeoutConnector::new(https);
connector.set_connect_timeout(Some(Duration::from_secs(20)));
connector.set_read_timeout(Some(Duration::from_secs(20)));
connector.set_write_timeout(Some(Duration::from_secs(20)));
let client = HyperClient::builder().build::<_, Body>(connector);

client.request(req).await
} else {
let addr = self.socks_proxy_addr.ok_or_else(|| {
ErrorKind::RequestError("Missing Socks proxy address".to_string())
})?;
let auth = format!("{}:{}", addr.ip(), addr.port());

let https = hyper_rustls::HttpsConnector::new();
let socks = hyper_socks2::SocksConnector {
proxy_addr: hyper::Uri::builder()
.scheme("socks5")
.authority(auth.as_str())
.path_and_query("/")
.build()
.map_err(|_| {
ErrorKind::RequestError("Can't parse Socks proxy address".to_string())
})?,
auth: None,
connector: https,
};
let mut connector = TimeoutConnector::new(socks);
connector.set_connect_timeout(Some(Duration::from_secs(20)));
connector.set_read_timeout(Some(Duration::from_secs(20)));
connector.set_write_timeout(Some(Duration::from_secs(20)));
let client = HyperClient::builder().build::<_, Body>(connector);

client.request(req).await
};
let resp =
resp.map_err(|e| ErrorKind::RequestError(format!("Cannot make request: {}", e)))?;

let raw = body::to_bytes(resp)
.await
.map_err(|e| ErrorKind::RequestError(format!("Cannot read response body: {}", e)))?;

Ok(String::from_utf8_lossy(&raw).to_string())
}

pub fn send_request(&self, req: Request<Body>) -> Result<String, Error> {
let task = self.send_request_async(req);
let mut rt = Builder::new()
.core_threads(1)
.basic_scheduler()
.enable_all()
.build()
.context(ErrorKind::Internal("can't create Tokio runtime".to_owned()))?;
let res = rt.block_on(task);
let _ = rt.shutdown_now().wait();
res
rt.block_on(task)
}
}
2 changes: 0 additions & 2 deletions impls/src/client_utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,5 @@

mod client;
pub mod json_rpc;
mod socksv5;

pub use self::socksv5::Socksv5Connector;
pub use client::{Client, Error as ClientError};
Loading

0 comments on commit bed60df

Please sign in to comment.