Skip to content
Permalink
Browse files

Update to API changes in network-grpc

Some useful code reorg in the network module as well.
  • Loading branch information...
mzabaluev committed Jun 12, 2019
1 parent 3e5e425 commit 03587fcd08cf238790b69529ba509663b806d87d

Large diffs are not rendered by default.

@@ -34,6 +34,7 @@ cbor_event = "2.1.2"
cryptoxide = "0.1"
futures = "0.1"
http = "0.1.16"
hyper = "0.12"
mime = "^0.3.7"
tokio = "^0.1.16"
structopt = "^0.2"
@@ -21,6 +21,7 @@ extern crate cryptoxide;
extern crate futures;
extern crate generic_array;
extern crate http;
extern crate hyper;
extern crate jormungandr_utils;
extern crate native_tls;
extern crate network_core;
@@ -13,6 +13,7 @@ use crate::{
use futures::prelude::*;
use network_core::{
client::{block::BlockService, gossip::GossipService, P2pService},
gossip::Node,
subscription::BlockEvent,
};
use slog::Logger;
@@ -215,7 +216,7 @@ pub fn connect(
channels: Channels,
) -> impl Future<Item = (Client<grpc::Connection>, PeerComms), Error = ()> {
let err_logger = state.logger().clone();
grpc::connect(&state)
grpc::connect(state.connection, Some(state.global.as_ref().node.id()))
.map_err(move |err| {
warn!(err_logger, "error connecting to peer: {:?}", err);
})
@@ -1,23 +1,18 @@
use super::super::BlockConfig;
use super::origin_authority;
use super::connect;
use crate::{blockchain::BlockchainR, settings::start::network::Peer};
use blockcfg::Block;
use chain_core::property::HasHeader;
use http::uri;
use network_core::client::block::BlockService as _;
use network_grpc::client::{Connect, Connection, TcpConnector};
use network_grpc::client::Connection;
use slog::Logger;
use std::fmt::Debug;
use tokio::prelude::*;
use tokio::{executor::DefaultExecutor, runtime::current_thread};
use tokio::runtime::current_thread;

pub fn bootstrap_from_peer(peer: Peer, blockchain: BlockchainR, logger: &Logger) {
info!(logger, "connecting to bootstrap peer {}", peer.connection);
let addr = peer.address();
let origin = origin_authority(addr);
let bootstrap = Connect::new(TcpConnector, DefaultExecutor::current())
.origin(uri::Scheme::HTTP, origin)
.connect(addr)
let bootstrap = connect(peer.address(), None)
.map_err(move |e| {
error!(logger, "failed to connect to bootstrap peer: {:?}", e);
})
@@ -1,28 +1,38 @@
use super::origin_authority;
use crate::{
blockcfg::{Block, HeaderHash},
network::{BlockConfig, ConnectionState, FetchBlockError},
network::p2p::topology::NodeId,
network::{BlockConfig, FetchBlockError},
settings::start::network::Peer,
};
use futures::prelude::*;
use http::uri;
use network_core::{client::block::BlockService, gossip::Node};
use network_grpc::client::{Connect, ConnectFuture, TcpConnector};
use http::{HttpTryFrom, Uri};
use hyper::client::connect::{Destination, HttpConnector};
use network_core::client::block::BlockService;
use network_grpc::client::{Connect, ConnectFuture};
use slog::Logger;
use std::{net::SocketAddr, slice};
use std::net::SocketAddr;
use std::slice;
use tokio::{executor::DefaultExecutor, runtime};

pub type Connection = network_grpc::client::Connection<BlockConfig>;

pub fn connect(
state: &ConnectionState,
) -> ConnectFuture<BlockConfig, SocketAddr, TcpConnector, DefaultExecutor> {
let addr = state.connection;
let origin = origin_authority(addr);
Connect::new(TcpConnector, DefaultExecutor::current())
.origin(uri::Scheme::HTTP, origin)
.node_id(state.global.node.id().clone())
.connect(addr)
addr: SocketAddr,
node_id: Option<NodeId>,
) -> ConnectFuture<BlockConfig, HttpConnector, DefaultExecutor> {
let uri = destination_uri(addr);
let mut connector = HttpConnector::new(1);
connector.set_nodelay(true);
let mut builder = Connect::new(connector, DefaultExecutor::current());
if let Some(id) = node_id {
builder.node_id(id);
}
builder.connect(Destination::try_from_uri(uri).unwrap())
}

fn destination_uri(addr: SocketAddr) -> Uri {
let uri = format!("http://{}:{}", addr.ip(), addr.port());
HttpTryFrom::try_from(&uri).unwrap()
}

// Fetches a block from a network peer in a one-off, blocking call.
@@ -33,11 +43,7 @@ pub fn fetch_block(
logger: &Logger,
) -> Result<Block, FetchBlockError> {
info!(logger, "fetching block {} from {}", hash, peer.connection);
let addr = peer.address();
let origin = origin_authority(addr);
let fetch = Connect::new(TcpConnector, DefaultExecutor::current())
.origin(uri::Scheme::HTTP, origin)
.connect(addr)
let fetch = connect(peer.address(), None)
.map_err(|err| FetchBlockError::Connect {
source: Box::new(err),
})
@@ -5,10 +5,6 @@ mod server;
use super::{p2p::topology as p2p, BlockConfig};
use crate::blockcfg::{Block, BlockDate, Header, HeaderHash};

use http::{uri, HttpTryFrom};

use std::net::SocketAddr;

pub use self::bootstrap::bootstrap_from_peer;
pub use self::client::{connect, fetch_block, Connection};
pub use self::server::run_listen_socket;
@@ -21,8 +17,3 @@ impl network_grpc::client::ProtocolConfig for BlockConfig {
type Node = p2p::Node;
type NodeId = p2p::NodeId;
}

fn origin_authority(addr: SocketAddr) -> uri::Authority {
let authority = format!("{}:{}", addr.ip(), addr.port());
HttpTryFrom::try_from(authority.as_str()).unwrap()
}

0 comments on commit 03587fc

Please sign in to comment.
You can’t perform that action at this time.