Skip to content
Permalink
Browse files

Change Connect to use hyper API

The connection maker abstraction is now the Connect trait from hyper.
The non-abstract Destination as the connection target allows defaulting
the origin header to the destination URI.
  • Loading branch information...
mzabaluev committed Jun 12, 2019
1 parent 6266664 commit 88c8dcb18a381211240491c5c81f520b30762024
Showing with 70 additions and 121 deletions.
  1. +1 −1 network-grpc/Cargo.toml
  2. +0 −5 network-grpc/src/client.rs
  3. +69 −35 network-grpc/src/client/connect.rs
  4. +0 −80 network-grpc/src/client/transport.rs
@@ -16,11 +16,11 @@ network-core = { path = "../network-core" }
bytes = "0.4"
futures = "0.1"
http = "0.1.16"
http-connection = "0.1"
hyper = "0.12"
prost = "0.5"
tokio = "0.1"
tower-grpc = "0.1"
tower-http-util = "0.1"
tower-service = "0.2"
tower-util = "0.1"

@@ -1,5 +1,4 @@
mod connect;
mod transport;

use crate::{
convert::{
@@ -25,10 +24,6 @@ use std::marker::PhantomData;

pub use connect::{Connect, ConnectError, ConnectFuture};

pub use transport::TcpConnector;
#[cfg(unix)]
pub use transport::UnixConnector;

/// Traits setting additional bounds for blockchain entities
/// that need to be satisfied for the protocol implementation.
///
@@ -6,19 +6,21 @@ use network_core::gossip;
use futures::prelude::*;
use futures::try_ready;
use http::uri::{self, Uri};
use http_connection::HttpConnection;
use hyper::client::connect::Connect as HyperConnect;
use tower_grpc::BoxBody;
use tower_http_util::connection::HttpMakeConnection;
use tower_hyper::client::ConnectExecutor;
use tower_hyper::util::{Connector, Destination};
use tower_util::MakeService;

use std::{error::Error, fmt, mem};

/// Builder-like API for establishing a protocol client connection.
pub struct Connect<P, A, C, E>
pub struct Connect<P, C, E>
where
P: ProtocolConfig,
{
tower_connect: tower_hyper::client::Connect<A, BoxBody, C, E>,
tower_connect: tower_hyper::client::Connect<Destination, BoxBody, Connector<C>, E>,
origin: Option<Origin>,
node_id: Option<<P::Node as gossip::Node>::Id>,
}
@@ -28,14 +30,14 @@ struct Origin {
authority: uri::Authority,
}

impl<P, A, C, E> Connect<P, A, C, E>
impl<P, C, E> Connect<P, C, E>
where
P: ProtocolConfig,
C: HttpMakeConnection<A> + 'static,
C::Connection: Send + 'static,
E: ConnectExecutor<C::Connection, BoxBody> + Clone,
C: HyperConnect,
C::Transport: HttpConnection,
{
pub fn new(connector: C, executor: E) -> Self {
let connector = Connector::new(connector);
let mut settings = tower_hyper::client::Builder::new();
settings.http2_only(true);
let tower_connect =
@@ -46,7 +48,12 @@ where
node_id: None,
}
}
}

impl<P, C, E> Connect<P, C, E>
where
P: ProtocolConfig,
{
pub fn origin(&mut self, scheme: uri::Scheme, authority: uri::Authority) -> &mut Self {
self.origin = Some(Origin { scheme, authority });
self
@@ -56,26 +63,53 @@ where
self.node_id = Some(id);
self
}
}

pub fn connect(&mut self, target: A) -> ConnectFuture<P, A, C, E> {
let origin_uri = match self.origin {
impl<P, C, E> Connect<P, C, E>
where
P: ProtocolConfig,
C: HyperConnect,
{
fn origin_uri(&self, target: &Destination) -> Result<Uri, ConnectError<C::Error>> {
let mut builder = Uri::builder();
match self.origin {
Some(ref origin) => {
match Uri::builder()
builder
.scheme(origin.scheme.clone())
.authority(origin.authority.clone())
.path_and_query("")
.build()
{
Ok(uri) => uri,
Err(e) => {
return ConnectFuture::error(ConnectError(ErrorKind::InvalidOrigin(e)));
}
}
.authority(origin.authority.clone());
}
None => {
return ConnectFuture::error(ConnectError(ErrorKind::OriginMissing));
builder.scheme(target.scheme());
let host = target.host();
match target.port() {
None => {
builder.authority(host);
}
Some(port) => {
builder.authority(format!("{}:{}", host, port).as_str());
}
}
}
};
builder.path_and_query("");
builder
.build()
.map_err(|e| ConnectError(ErrorKind::InvalidOrigin(e)))
}
}

impl<P, C, E> Connect<P, C, E>
where
P: ProtocolConfig,
C: HyperConnect + 'static,
C::Transport: HttpConnection,
E: ConnectExecutor<C::Transport, BoxBody> + Clone,
{
pub fn connect(&mut self, target: Destination) -> ConnectFuture<P, C, E> {
let origin_uri = match self.origin_uri(&target) {
Ok(uri) => uri,
Err(e) => return ConnectFuture::error(e),
};
let node_id = self.node_id.clone();
let inner = self.tower_connect.make_service(target);
ConnectFuture {
@@ -90,32 +124,35 @@ where

/// Completes with a protocol client Connection when it has been
/// set up.
pub struct ConnectFuture<P, A, C, E>
pub struct ConnectFuture<P, C, E>
where
P: ProtocolConfig,
C: HttpMakeConnection<A>,
C: HyperConnect,
C::Transport: HttpConnection,
{
state: State<P, A, C, E>,
state: State<P, C, E>,
}

enum State<P, A, C, E>
enum State<P, C, E>
where
P: ProtocolConfig,
C: HttpMakeConnection<A>,
C: HyperConnect,
C::Transport: HttpConnection,
{
Connecting {
inner: tower_hyper::client::ConnectFuture<A, BoxBody, C, E>,
inner: tower_hyper::client::ConnectFuture<Destination, BoxBody, Connector<C>, E>,
origin_uri: Uri,
node_id: Option<<P::Node as gossip::Node>::Id>,
},
Error(ConnectError<C::Error>),
Finished,
}

impl<P, A, C, E> ConnectFuture<P, A, C, E>
impl<P, C, E> ConnectFuture<P, C, E>
where
P: ProtocolConfig,
C: HttpMakeConnection<A>,
C: HyperConnect,
C::Transport: HttpConnection,
{
fn error(err: ConnectError<C::Error>) -> Self {
ConnectFuture {
@@ -124,12 +161,12 @@ where
}
}

impl<P, A, C, E> Future for ConnectFuture<P, A, C, E>
impl<P, C, E> Future for ConnectFuture<P, C, E>
where
P: ProtocolConfig,
C: HttpMakeConnection<A>,
C::Connection: Send + 'static,
E: ConnectExecutor<C::Connection, BoxBody>,
C: HyperConnect,
C::Transport: HttpConnection,
E: ConnectExecutor<C::Transport, BoxBody>,
{
type Item = Connection<P>;
type Error = ConnectError<C::Error>;
@@ -168,15 +205,13 @@ pub struct ConnectError<T>(ErrorKind<T>);
#[derive(Debug)]
enum ErrorKind<T> {
Http(tower_hyper::client::ConnectError<T>),
OriginMissing,
InvalidOrigin(http::Error),
}

impl<T> fmt::Display for ConnectError<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self.0 {
ErrorKind::Http(_) => write!(f, "HTTP/2.0 connection error"),
ErrorKind::OriginMissing => write!(f, "request origin not specified"),
ErrorKind::InvalidOrigin(_) => write!(f, "invalid request origin"),
}
}
@@ -189,7 +224,6 @@ where
fn source(&self) -> Option<&(dyn Error + 'static)> {
match self.0 {
ErrorKind::Http(ref e) => Some(e),
ErrorKind::OriginMissing => None,
ErrorKind::InvalidOrigin(ref e) => Some(e),
}
}

This file was deleted.

0 comments on commit 88c8dcb

Please sign in to comment.
You can’t perform that action at this time.