From c49519bba8d98ffee0c9d2fefec3cb630ce7d0c9 Mon Sep 17 00:00:00 2001 From: Jon Gjengset Date: Wed, 5 Dec 2018 15:24:52 -0500 Subject: [PATCH] Fix issue in newer tokio See https://github.com/tokio-rs/tokio/issues/790. --- noria-server/src/controller/mod.rs | 73 ++++++++++++------------------ noria/src/channel/mod.rs | 2 +- 2 files changed, 29 insertions(+), 46 deletions(-) diff --git a/noria-server/src/controller/mod.rs b/noria-server/src/controller/mod.rs index 37828b20e..c68b00178 100644 --- a/noria-server/src/controller/mod.rs +++ b/noria-server/src/controller/mod.rs @@ -17,6 +17,7 @@ use dataflow::{ }; use failure::{self, ResultExt}; use fnv::{FnvHashMap, FnvHashSet}; +use futures::stream::futures_unordered::FuturesUnordered; use futures::sync::mpsc::UnboundedSender; use futures::{self, Future, Sink, Stream}; use hyper::header::CONTENT_TYPE; @@ -31,7 +32,7 @@ use serde_json; use slog; use std::collections::{HashMap, VecDeque}; use std::fs; -use std::io::{self, ErrorKind}; +use std::io; use std::net::{IpAddr, SocketAddr}; use std::sync::{ atomic::{AtomicUsize, Ordering}, @@ -946,16 +947,6 @@ fn listen_external( server::Server::builder(on).serve(service) } -// NOTE: tokio::net::TcpStream doesn't expose underlying stream :( -fn set_nonblocking(s: &tokio::net::TcpStream, on: bool) { - use std::net::TcpStream; - use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd}; - let t = unsafe { TcpStream::from_raw_fd(s.as_raw_fd()) }; - t.set_nonblocking(on).unwrap(); - // avoid closing on Drop - t.into_raw_fd(); -} - fn instance_campaign( event_tx: UnboundedSender, authority: Arc, @@ -1115,6 +1106,7 @@ struct Replica { coord: Arc, incoming: Valved, + first_byte: FuturesUnordered>>, locals: futures::sync::mpsc::UnboundedReceiver>, inputs: StreamUnordered< DualTcpStream< @@ -1153,6 +1145,7 @@ impl Replica { coord: cc, domain, incoming: valve.wrap(on.incoming()), + first_byte: FuturesUnordered::new(), locals, log: log.new(o!{"id" => id}), inputs: Default::default(), @@ -1301,49 +1294,39 @@ impl Replica { } fn try_new(&mut self) -> io::Result { - 'more: while let Async::Ready(stream) = self.incoming.poll()? { + while let Async::Ready(stream) = self.incoming.poll()? { match stream { - Some(mut stream) => { + Some(stream) => { // we know that any new connection to a domain will first send a one-byte // token to indicate whether the connection is from a base or not. - set_nonblocking(&stream, false); - let mut tag = [0]; - use std::io::Read; - while let Err(e) = stream.read_exact(&mut tag[..]) { - if e.kind() == ErrorKind::WouldBlock { - // TODO: async - continue; - } - - // well.. that failed quickly.. - info!(self.log, "worker discarded new connection: {:?}", e); - continue 'more; - } - - let is_base = tag[0] == CONNECTION_FROM_BASE; - set_nonblocking(&stream, true); - - debug!(self.log, "accepted new connection"; "base" => ?is_base); - let slot = self.inputs.stream_slot(); - let token = slot.token(); - let tcp = if is_base { - DualTcpStream::upgrade(BufStream::new(stream), move |input| { - Box::new(Packet::Input { - inner: input, - src: Some(SourceChannelIdentifier { token }), - senders: Vec::new(), - }) - }) - } else { - BufStream::with_capacities(2 * 1024 * 1024, 4 * 1024, stream).into() - }; - slot.insert(tcp); + self.first_byte + .push(tokio::io::read_exact(stream, vec![0; 1])); } None => { return Ok(false); } } } + + while let Async::Ready(Some((stream, tag))) = self.first_byte.poll()? { + let is_base = tag[0] == CONNECTION_FROM_BASE; + + debug!(self.log, "accepted new connection"; "base" => ?is_base); + let slot = self.inputs.stream_slot(); + let token = slot.token(); + let tcp = if is_base { + DualTcpStream::upgrade(BufStream::new(stream), move |input| { + Box::new(Packet::Input { + inner: input, + src: Some(SourceChannelIdentifier { token }), + senders: Vec::new(), + }) + }) + } else { + BufStream::with_capacities(2 * 1024 * 1024, 4 * 1024, stream).into() + }; + slot.insert(tcp); + } Ok(true) } diff --git a/noria/src/channel/mod.rs b/noria/src/channel/mod.rs index a48a3a95e..5295eb62e 100644 --- a/noria/src/channel/mod.rs +++ b/noria/src/channel/mod.rs @@ -23,7 +23,7 @@ pub mod tcp; pub use self::tcp::{channel, DualTcpStream, TcpReceiver, TcpSender}; pub const CONNECTION_FROM_BASE: u8 = 1; -pub const CONNECTION_FROM_DOMAIN: u8 = 0; +pub const CONNECTION_FROM_DOMAIN: u8 = 2; pub struct Remote; pub struct MaybeLocal;