Networking refactoring #1172
/// Get remote peer address
pub fn remote_addr(&self) -> io::Result<SocketAddr> {
self.socket.peer_addr()
}

/// Get remote peer address string
pub fn remote_addr_str(&self) -> String {
self.socket.peer_addr().map(|a| a.to_string()).unwrap_or_else(|_| "Unkwnown".to_owned())
typo.
}
}
}
self.num_sessions.fetch_add(1, AtomicOrdering::Relaxed);
Shouldn't it be `SeqCst` (or at least something more than `Relaxed`)?
Isn't using `Relaxed` here a potential overflow (when `fetch_sub` is called)?
(sorry if it's a lame question :))
Decrement is always atomic. `Relaxed` is fine here because we don't use the returned value, and other threads that read the value do not require any synchronization on it.
I wonder if the `Relaxed` ordering would lead to potentially incorrect values being returned from `session_count`. This doesn't pose a problem on the x86 or x86_64 memory model, so any bugs introduced from `Relaxed` atomics could be hidden from most of us.
The performance gains from `Relaxed` vs `SeqCst` are not that significant, so why not trend towards the safe side?
There is no synchronization on the session count, so there is no potential for races. It is perfectly fine if other threads see an updated value with a delay.
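To make the point in this thread concrete, here is a minimal sketch (not the PR's code; the function names are hypothetical) of a counter updated with `Relaxed` from several threads. The read-modify-write itself is atomic under every ordering, so no increment is lost. The second function illustrates the overflow question raised earlier: `fetch_sub` wraps around on underflow regardless of the ordering chosen, so that risk depends on increments and decrements being balanced, not on `Relaxed` vs `SeqCst`.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Spawns `threads` workers; each bumps a shared counter `per_thread` times
/// using `Relaxed`, and the total is read after all workers are joined.
fn count_with_relaxed(threads: usize, per_thread: usize) -> usize {
    let counter = Arc::new(AtomicUsize::new(0));
    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let counter = Arc::clone(&counter);
            thread::spawn(move || {
                for _ in 0..per_thread {
                    // Atomic under any ordering; `Relaxed` only gives up
                    // ordering guarantees relative to other memory locations.
                    counter.fetch_add(1, Ordering::Relaxed);
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    // Joining a thread synchronizes with it, so every increment is visible here.
    counter.load(Ordering::Relaxed)
}

/// Decrementing a zero counter wraps around, even with `SeqCst`.
fn underflow_wraps() -> usize {
    let n = AtomicUsize::new(0);
    n.fetch_sub(1, Ordering::SeqCst);
    n.load(Ordering::SeqCst)
}
```

Calling `count_with_relaxed(4, 1000)` yields 4000, and `underflow_wraps()` yields `usize::MAX`. What `Relaxed` does not promise is that a reader on another thread observes the new count at any particular moment relative to other writes, which is the delay described in the reply above.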
@@ -477,19 +474,19 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
}

fn have_session(&self, id: &NodeId) -> bool {
self.sessions.read().unwrap().iter().any(|e| e.lock().unwrap().info.id.eq(&id))
self.sessions.read().unwrap().iter().any(|e| e.lock().unwrap().info.id.eq(&Some(id.clone())))
eq vs ==?
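For reference, a small sketch of the alternatives this comment hints at. It assumes, based on the changed line, that `info.id` is now an `Option<NodeId>`; the `NodeId` and `SessionInfo` types below are hypothetical stand-ins, not the project's definitions. All three forms return the same result; the last one also avoids the clone.

```rust
// Hypothetical stand-in for the real node identifier type.
#[derive(Clone, Debug, PartialEq)]
struct NodeId(u64);

// Hypothetical stand-in: only the field relevant to the comparison.
struct SessionInfo {
    id: Option<NodeId>,
}

/// Method-call form, as written in the diff.
fn matches_with_eq(info: &SessionInfo, id: &NodeId) -> bool {
    info.id.eq(&Some(id.clone()))
}

/// Operator form; `==` desugars to the same `PartialEq::eq` call.
fn matches_with_operator(info: &SessionInfo, id: &NodeId) -> bool {
    info.id == Some(id.clone())
}

/// Compares by reference: `as_ref` turns `&Option<NodeId>` into
/// `Option<&NodeId>`, so no clone of the id is needed.
fn matches_without_clone(info: &SessionInfo, id: &NodeId) -> bool {
    info.id.as_ref() == Some(id)
}
```

A session whose id is still `None` (for example, one that has not finished its handshake) compares unequal to any id in all three variants.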
@@ -205,7 +205,7 @@ impl Connection {
/// Update connection registration. Should be called at the end of the IO handler.
pub fn update_socket<Host: Handler>(&self, reg: Token, event_loop: &mut EventLoop<Host>) -> io::Result<()> {
trace!(target: "network", "connection reregister; token={:?}", reg);
event_loop.reregister( &self.socket, reg, self.interest, PollOpt::edge() | PollOpt::oneshot()).or_else(|e| {
event_loop.reregister( &self.socket, reg, self.interest, PollOpt::edge() /* | PollOpt::oneshot() */ ).or_else(|e| { // TODO: oneshot is broken on windows
what ramifications are there for other platforms with this removal?
No major issues, except some IO events are now generated earlier than required, and automatically. "oneshot" mode requires updating registration after each event handler and gives more precise control so that events don't queue up, but in our case it does not matter much since the handler is not a bottleneck.
Would be lovely to see some integration tests for this :-)
There are integration tests in |
@@ -790,12 +722,13 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
if s.is_ready() {
for (p, _) in self.handlers.read().unwrap().iter() {
if s.have_capability(p) {
self.num_sessions.fetch_sub(1, AtomicOrdering::SeqCst);
to_disconnect.push(p);
trace/debug to help with the reason of disconnect?
The reason is logged before calling `kill_connection`.
Refactoring required for Windows compatibility. Windows async IO does not support changing the registration token ID for a socket, and that's what we were doing when promoting a `Handshake` object to a `Session`. This refactors the `Session`s to manage the handshake and use the same registration ID.