Skip to content

Commit

Permalink
Fix issue in newer tokio
Browse files Browse the repository at this point in the history
  • Loading branch information
jonhoo committed Dec 5, 2018
1 parent 4b11795 commit c49519b
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 46 deletions.
73 changes: 28 additions & 45 deletions noria-server/src/controller/mod.rs
Expand Up @@ -17,6 +17,7 @@ use dataflow::{
};
use failure::{self, ResultExt};
use fnv::{FnvHashMap, FnvHashSet};
use futures::stream::futures_unordered::FuturesUnordered;
use futures::sync::mpsc::UnboundedSender;
use futures::{self, Future, Sink, Stream};
use hyper::header::CONTENT_TYPE;
Expand All @@ -31,7 +32,7 @@ use serde_json;
use slog;
use std::collections::{HashMap, VecDeque};
use std::fs;
use std::io::{self, ErrorKind};
use std::io;
use std::net::{IpAddr, SocketAddr};
use std::sync::{
atomic::{AtomicUsize, Ordering},
Expand Down Expand Up @@ -946,16 +947,6 @@ fn listen_external<A: Authority + 'static>(
server::Server::builder(on).serve(service)
}

// NOTE: tokio::net::TcpStream doesn't expose underlying stream :(
fn set_nonblocking(s: &tokio::net::TcpStream, on: bool) {
use std::net::TcpStream;
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd};
let t = unsafe { TcpStream::from_raw_fd(s.as_raw_fd()) };
t.set_nonblocking(on).unwrap();
// avoid closing on Drop
t.into_raw_fd();
}

fn instance_campaign<A: Authority + 'static>(
event_tx: UnboundedSender<Event>,
authority: Arc<A>,
Expand Down Expand Up @@ -1115,6 +1106,7 @@ struct Replica {
coord: Arc<ChannelCoordinator>,

incoming: Valved<tokio::net::tcp::Incoming>,
first_byte: FuturesUnordered<tokio::io::ReadExact<tokio::net::tcp::TcpStream, Vec<u8>>>,
locals: futures::sync::mpsc::UnboundedReceiver<Box<Packet>>,
inputs: StreamUnordered<
DualTcpStream<
Expand Down Expand Up @@ -1153,6 +1145,7 @@ impl Replica {
coord: cc,
domain,
incoming: valve.wrap(on.incoming()),
first_byte: FuturesUnordered::new(),
locals,
log: log.new(o!{"id" => id}),
inputs: Default::default(),
Expand Down Expand Up @@ -1301,49 +1294,39 @@ impl Replica {
}

fn try_new(&mut self) -> io::Result<bool> {
'more: while let Async::Ready(stream) = self.incoming.poll()? {
while let Async::Ready(stream) = self.incoming.poll()? {
match stream {
Some(mut stream) => {
Some(stream) => {
// we know that any new connection to a domain will first send a one-byte
// token to indicate whether the connection is from a base or not.
set_nonblocking(&stream, false);
let mut tag = [0];
use std::io::Read;
while let Err(e) = stream.read_exact(&mut tag[..]) {
if e.kind() == ErrorKind::WouldBlock {
// TODO: async
continue;
}

// well.. that failed quickly..
info!(self.log, "worker discarded new connection: {:?}", e);
continue 'more;
}

let is_base = tag[0] == CONNECTION_FROM_BASE;
set_nonblocking(&stream, true);

debug!(self.log, "accepted new connection"; "base" => ?is_base);
let slot = self.inputs.stream_slot();
let token = slot.token();
let tcp = if is_base {
DualTcpStream::upgrade(BufStream::new(stream), move |input| {
Box::new(Packet::Input {
inner: input,
src: Some(SourceChannelIdentifier { token }),
senders: Vec::new(),
})
})
} else {
BufStream::with_capacities(2 * 1024 * 1024, 4 * 1024, stream).into()
};
slot.insert(tcp);
self.first_byte
.push(tokio::io::read_exact(stream, vec![0; 1]));
}
None => {
return Ok(false);
}
}
}

while let Async::Ready(Some((stream, tag))) = self.first_byte.poll()? {
let is_base = tag[0] == CONNECTION_FROM_BASE;

debug!(self.log, "accepted new connection"; "base" => ?is_base);
let slot = self.inputs.stream_slot();
let token = slot.token();
let tcp = if is_base {
DualTcpStream::upgrade(BufStream::new(stream), move |input| {
Box::new(Packet::Input {
inner: input,
src: Some(SourceChannelIdentifier { token }),
senders: Vec::new(),
})
})
} else {
BufStream::with_capacities(2 * 1024 * 1024, 4 * 1024, stream).into()
};
slot.insert(tcp);
}
Ok(true)
}

Expand Down
2 changes: 1 addition & 1 deletion noria/src/channel/mod.rs
Expand Up @@ -23,7 +23,7 @@ pub mod tcp;
pub use self::tcp::{channel, DualTcpStream, TcpReceiver, TcpSender};

pub const CONNECTION_FROM_BASE: u8 = 1;
pub const CONNECTION_FROM_DOMAIN: u8 = 0;
pub const CONNECTION_FROM_DOMAIN: u8 = 2;

pub struct Remote;
pub struct MaybeLocal;
Expand Down

0 comments on commit c49519b

Please sign in to comment.