Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Commit

Permalink
Zombie connections
Browse files Browse the repository at this point in the history
  • Loading branch information
arkpar committed Feb 20, 2016
1 parent 69df91d commit 6e81d72
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 59 deletions.
6 changes: 3 additions & 3 deletions util/src/network/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,10 +190,10 @@ impl Connection {
/// Register this connection with the IO event loop.
pub fn register_socket<Host: Handler>(&self, reg: Token, event_loop: &mut EventLoop<Host>) -> io::Result<()> {
trace!(target: "net", "connection register; token={:?}", reg);
event_loop.register(&self.socket, reg, self.interest, PollOpt::edge() | PollOpt::oneshot()).or_else(|e| {
if let Err(e) = event_loop.register(&self.socket, reg, self.interest, PollOpt::edge() | PollOpt::oneshot()) {
debug!("Failed to register {:?}, {:?}", reg, e);
Ok(())
})
}
Ok(())
}

/// Update connection registration. Should be called at the end of the IO handler.
Expand Down
73 changes: 47 additions & 26 deletions util/src/network/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ pub struct Handshake {
/// A copy of received encryped auth packet
pub auth_cipher: Bytes,
/// A copy of received encryped ack packet
pub ack_cipher: Bytes
pub ack_cipher: Bytes,
/// This Handshake is marked for deleteion flag
pub expired: bool,
}

const AUTH_PACKET_SIZE: usize = 307;
Expand All @@ -84,6 +86,7 @@ impl Handshake {
remote_nonce: H256::new(),
auth_cipher: Bytes::new(),
ack_cipher: Bytes::new(),
expired: false,
})
}

Expand All @@ -97,6 +100,16 @@ impl Handshake {
self.connection.token()
}

/// Mark this handshake as inactive to be deleted lated.
pub fn set_expired(&mut self) {
self.expired = true;
}

/// Check if this handshake is expired.
pub fn expired(&self) -> bool {
self.expired
}

/// Start a handhsake
pub fn start<Message>(&mut self, io: &IoContext<Message>, host: &HostInfo, originated: bool) -> Result<(), UtilError> where Message: Send + Clone{
self.originated = originated;
Expand All @@ -118,47 +131,55 @@ impl Handshake {

/// Readable IO handler. Drives the state change.
pub fn readable<Message>(&mut self, io: &IoContext<Message>, host: &HostInfo) -> Result<(), UtilError> where Message: Send + Clone {
io.clear_timer(self.connection.token).unwrap();
match self.state {
HandshakeState::ReadingAuth => {
if let Some(data) = try!(self.connection.readable()) {
try!(self.read_auth(host, &data));
try!(self.write_ack());
};
},
HandshakeState::ReadingAck => {
if let Some(data) = try!(self.connection.readable()) {
try!(self.read_ack(host, &data));
self.state = HandshakeState::StartSession;
};
},
HandshakeState::StartSession => {},
_ => { panic!("Unexpected state"); }
}
if self.state != HandshakeState::StartSession {
try!(io.update_registration(self.connection.token));
if !self.expired() {
io.clear_timer(self.connection.token).unwrap();
match self.state {
HandshakeState::ReadingAuth => {
if let Some(data) = try!(self.connection.readable()) {
try!(self.read_auth(host, &data));
try!(self.write_ack());
};
},
HandshakeState::ReadingAck => {
if let Some(data) = try!(self.connection.readable()) {
try!(self.read_ack(host, &data));
self.state = HandshakeState::StartSession;
};
},
HandshakeState::StartSession => {},
_ => { panic!("Unexpected state"); }
}
if self.state != HandshakeState::StartSession {
try!(io.update_registration(self.connection.token));
}
}
Ok(())
}

/// Writabe IO handler.
pub fn writable<Message>(&mut self, io: &IoContext<Message>, _host: &HostInfo) -> Result<(), UtilError> where Message: Send + Clone {
io.clear_timer(self.connection.token).unwrap();
try!(self.connection.writable());
if self.state != HandshakeState::StartSession {
io.update_registration(self.connection.token).unwrap();
if !self.expired() {
io.clear_timer(self.connection.token).unwrap();
try!(self.connection.writable());
if self.state != HandshakeState::StartSession {
io.update_registration(self.connection.token).unwrap();
}
}
Ok(())
}

/// Register the socket with the event loop
pub fn register_socket<Host:Handler<Timeout=Token>>(&self, reg: Token, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> {
try!(self.connection.register_socket(reg, event_loop));
if !self.expired() {
try!(self.connection.register_socket(reg, event_loop));
}
Ok(())
}

pub fn update_socket<Host:Handler<Timeout=Token>>(&self, reg: Token, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> {
try!(self.connection.update_socket(reg, event_loop));
if !self.expired() {
try!(self.connection.update_socket(reg, event_loop));
}
Ok(())
}

Expand Down
71 changes: 42 additions & 29 deletions util/src/network/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,11 +190,11 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone
let session = { self.sessions.read().unwrap().get(peer).cloned() };
if let Some(session) = session {
session.lock().unwrap().deref_mut().send_packet(self.protocol, packet_id as u8, &data).unwrap_or_else(|e| {
warn!(target: "net", "Send error: {:?}", e);
warn!(target: "network", "Send error: {:?}", e);
}); //TODO: don't copy vector data
try!(self.io.update_registration(peer));
} else {
trace!(target: "net", "Send: Peer no longer exist")
trace!(target: "network", "Send: Peer no longer exist")
}
Ok(())
}
Expand Down Expand Up @@ -470,18 +470,18 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
.take(min(MAX_HANDSHAKES_PER_ROUND, handshake_limit - handshake_count)) {
self.connect_peer(&id, io);
}
debug!(target: "net", "Connecting peers: {} sessions, {} pending", self.session_count(), self.handshake_count());
debug!(target: "network", "Connecting peers: {} sessions, {} pending", self.session_count(), self.handshake_count());
}

#[cfg_attr(feature="dev", allow(single_match))]
fn connect_peer(&self, id: &NodeId, io: &IoContext<NetworkIoMessage<Message>>) {
if self.have_session(id)
{
trace!("Aborted connect. Node already connected.");
trace!(target: "network", "Aborted connect. Node already connected.");
return;
}
if self.connecting_to(id) {
trace!("Aborted connect. Node already connecting.");
trace!(target: "network", "Aborted connect. Node already connecting.");
return;
}

Expand All @@ -493,7 +493,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
node.endpoint.address
}
else {
debug!("Connection to expired node aborted");
debug!(target: "network", "Connection to expired node aborted");
return;
}
};
Expand All @@ -515,16 +515,16 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
if handshakes.insert_with(|token| {
let mut handshake = Handshake::new(token, id, socket, &nonce, self.stats.clone()).expect("Can't create handshake");
handshake.start(io, &self.info.read().unwrap(), id.is_some()).and_then(|_| io.register_stream(token)).unwrap_or_else (|e| {
debug!(target: "net", "Handshake create error: {:?}", e);
debug!(target: "network", "Handshake create error: {:?}", e);
});
Arc::new(Mutex::new(handshake))
}).is_none() {
debug!("Max handshakes reached");
debug!(target: "network", "Max handshakes reached");
}
}

fn accept(&self, io: &IoContext<NetworkIoMessage<Message>>) {
trace!(target: "net", "accept");
trace!(target: "network", "Accepting incoming connection");
loop {
let socket = match self.tcp_listener.lock().unwrap().accept() {
Ok(None) => break,
Expand All @@ -544,7 +544,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
if let Some(handshake) = handshake {
let mut h = handshake.lock().unwrap();
if let Err(e) = h.writable(io, &self.info.read().unwrap()) {
trace!(target: "net", "Handshake write error: {}: {:?}", token, e);
trace!(target: "network", "Handshake write error: {}: {:?}", token, e);
}
}
}
Expand All @@ -554,9 +554,9 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
if let Some(session) = session {
let mut s = session.lock().unwrap();
if let Err(e) = s.writable(io, &self.info.read().unwrap()) {
trace!(target: "net", "Session write error: {}: {:?}", token, e);
trace!(target: "network", "Session write error: {}: {:?}", token, e);
}
io.update_registration(token).unwrap_or_else(|e| debug!(target: "net", "Session registration error: {:?}", e));
io.update_registration(token).unwrap_or_else(|e| debug!(target: "network", "Session registration error: {:?}", e));
}
}

Expand All @@ -571,7 +571,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
if let Some(handshake) = handshake {
let mut h = handshake.lock().unwrap();
if let Err(e) = h.readable(io, &self.info.read().unwrap()) {
debug!(target: "net", "Handshake read error: {}: {:?}", token, e);
debug!(target: "network", "Handshake read error: {}: {:?}", token, e);
kill = true;
}
if h.done() {
Expand All @@ -585,7 +585,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
self.start_session(token, io);
return;
}
io.update_registration(token).unwrap_or_else(|e| debug!(target: "net", "Token registration error: {:?}", e));
io.update_registration(token).unwrap_or_else(|e| debug!(target: "network", "Token registration error: {:?}", e));
}

fn session_readable(&self, token: StreamToken, io: &IoContext<NetworkIoMessage<Message>>) {
Expand All @@ -597,7 +597,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
let mut s = session.lock().unwrap();
match s.readable(io, &self.info.read().unwrap()) {
Err(e) => {
debug!(target: "net", "Session read error: {}: {:?}", token, e);
debug!(target: "network", "Session read error: {}: {:?}", token, e);
kill = true;
},
Ok(SessionData::Ready) => {
Expand All @@ -613,7 +613,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
packet_id,
}) => {
match self.handlers.read().unwrap().get(protocol) {
None => { warn!(target: "net", "No handler found for protocol: {:?}", protocol) },
None => { warn!(target: "network", "No handler found for protocol: {:?}", protocol) },
Some(_) => packet_data = Some((protocol, packet_id, data)),
}
},
Expand All @@ -631,7 +631,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
let h = self.handlers.read().unwrap().get(p).unwrap().clone();
h.read(&NetworkContext::new(io, p, Some(token), self.sessions.clone()), &token, packet_id, &data[1..]);
}
io.update_registration(token).unwrap_or_else(|e| debug!(target: "net", "Token registration error: {:?}", e));
io.update_registration(token).unwrap_or_else(|e| debug!(target: "network", "Token registration error: {:?}", e));
}

fn start_session(&self, token: StreamToken, io: &IoContext<NetworkIoMessage<Message>>) {
Expand All @@ -643,19 +643,24 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
// turn a handshake into a session
let mut sessions = self.sessions.write().unwrap();
let mut h = handshakes.get_mut(token).unwrap().lock().unwrap();
if h.expired {
return;
}
let originated = h.originated;
let mut session = match Session::new(&mut h, &self.info.read().unwrap()) {
Ok(s) => s,
Err(e) => {
debug!("Session creation error: {:?}", e);
debug!(target: "network", "Session creation error: {:?}", e);
return;
}
};
let result = sessions.insert_with(move |session_token| {
session.set_token(session_token);
io.deregister_stream(token).expect("Error deleting handshake registration");
h.set_expired();
io.register_stream(session_token).expect("Error creating session registration");
self.stats.inc_sessions();
trace!(target: "network", "Creating session {} -> {}", token, session_token);
if !originated {
// Add it no node table
if let Ok(address) = session.remote_addr() {
Expand Down Expand Up @@ -684,26 +689,34 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
FIRST_HANDSHAKE ... LAST_HANDSHAKE => {
let handshakes = self.handshakes.write().unwrap();
if let Some(handshake) = handshakes.get(token).cloned() {
failure_id = Some(handshake.lock().unwrap().id().clone());
let mut handshake = handshake.lock().unwrap();
if !handshake.expired() {
handshake.set_expired();
failure_id = Some(handshake.id().clone());
io.deregister_stream(token).expect("Error deregistering stream");
}
}
},
FIRST_SESSION ... LAST_SESSION => {
let sessions = self.sessions.write().unwrap();
if let Some(session) = sessions.get(token).cloned() {
let s = session.lock().unwrap();
if s.is_ready() {
for (p, _) in self.handlers.read().unwrap().iter() {
if s.have_capability(p) {
to_disconnect.push(p);
let mut s = session.lock().unwrap();
if !s.expired() {
if s.is_ready() {
for (p, _) in self.handlers.read().unwrap().iter() {
if s.have_capability(p) {
to_disconnect.push(p);
}
}
}
s.set_expired();
failure_id = Some(s.id().clone());
io.deregister_stream(token).expect("Error deregistering stream");
}
failure_id = Some(s.id().clone());
}
},
_ => {},
}
io.deregister_stream(token).expect("Error deregistering stream");
if let Some(id) = failure_id {
if remote {
self.nodes.write().unwrap().note_failure(&id);
Expand Down Expand Up @@ -758,11 +771,11 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
}

fn stream_hup(&self, io: &IoContext<NetworkIoMessage<Message>>, stream: StreamToken) {
trace!(target: "net", "Hup: {}", stream);
trace!(target: "network", "Hup: {}", stream);
match stream {
FIRST_SESSION ... LAST_SESSION => self.connection_closed(stream, io),
FIRST_HANDSHAKE ... LAST_HANDSHAKE => self.connection_closed(stream, io),
_ => warn!(target: "net", "Unexpected hup"),
_ => warn!(target: "network", "Unexpected hup"),
};
}

Expand Down Expand Up @@ -810,7 +823,7 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
},
_ => match self.timers.read().unwrap().get(&token).cloned() {
Some(timer) => match self.handlers.read().unwrap().get(timer.protocol).cloned() {
None => { warn!(target: "net", "No handler found for protocol: {:?}", timer.protocol) },
None => { warn!(target: "network", "No handler found for protocol: {:?}", timer.protocol) },
Some(h) => { h.timeout(&NetworkContext::new(io, timer.protocol, None, self.sessions.clone()), timer.token); }
},
None => { warn!("Unknown timer token: {}", token); } // timer is not registerd through us
Expand Down

0 comments on commit 6e81d72

Please sign in to comment.