Skip to content

Commit

Permalink
A partial fix for issue #1191. Do not panic mio on sockstate error, i…
Browse files Browse the repository at this point in the history
…nstead keep the err sock states and just retry them later

Signed-off-by: Daniel Tacalau <dst4096@gmail.com>
  • Loading branch information
dtacalau authored and Thomasdezeeuw committed Mar 1, 2020
1 parent 653b6cf commit 10df3e1
Showing 1 changed file with 28 additions and 7 deletions.
35 changes: 28 additions & 7 deletions src/sys/windows/selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,19 @@ pub struct SockState {
poll_status: SockPollStatus,
delete_pending: bool,

// last raw os error
error: Option<i32>,

pinned: PhantomPinned,
}

impl SockState {
fn update(&mut self, self_arc: &Pin<Arc<Mutex<SockState>>>) -> io::Result<()> {
assert!(!self.delete_pending);

// make sure to reset previous error before a new update
self.error = None;

if let SockPollStatus::Pending = self.poll_status {
if (self.user_evts & afd::KNOWN_EVENTS & !self.pending_evts) == 0 {
/* All the events the user is interested in are already being monitored by
Expand All @@ -122,7 +128,11 @@ impl SockState {
* events that the user is interested in. Therefore, cancel the pending
* poll operation; when we receive it's completion package, a new poll
* operation will be submitted with the correct event mask. */
self.cancel()?;
if let Err(e) = self.cancel() {
self.error = e.raw_os_error();
return Err(e);
}
return Ok(());
}
} else if let SockPollStatus::Cancelled = self.poll_status {
/* The poll operation has already been cancelled, we're still waiting for
Expand Down Expand Up @@ -156,6 +166,7 @@ impl SockState {
self.mark_delete();
return Ok(());
} else {
self.error = e.raw_os_error();
return Err(e);
}
}
Expand All @@ -166,6 +177,7 @@ impl SockState {
} else {
unreachable!("Invalid poll status during update, {:#?}", self)
}

Ok(())
}

Expand Down Expand Up @@ -246,6 +258,10 @@ impl SockState {
self.delete_pending = true;
}
}

fn has_error(&self) -> bool {
self.error.is_some()
}
}

cfg_net! {
Expand All @@ -262,6 +278,7 @@ cfg_net! {
user_data: 0,
poll_status: SockPollStatus::Idle,
delete_pending: false,
error: None,
pinned: PhantomPinned,
})
}
Expand Down Expand Up @@ -455,16 +472,18 @@ impl SelectorInner {

unsafe fn update_sockets_events(&self) -> io::Result<()> {
let mut update_queue = self.update_queue.lock().unwrap();
loop {
let sock = match update_queue.pop_front() {
Some(sock) => sock,
None => break,
};
for sock in update_queue.iter_mut() {
let mut sock_internal = sock.lock().unwrap();
if !sock_internal.is_pending_deletion() {
sock_internal.update(&sock).unwrap();
let _ = sock_internal.update(&sock);
}
}

// remove all sock which do not have error, they have afd op pending
update_queue.retain(|sock| {
sock.lock().unwrap().has_error()
});

self.afd_group.release_unused_afd();
Ok(())
}
Expand Down Expand Up @@ -561,6 +580,8 @@ cfg_net! {
state.lock().unwrap().set_event(event);
}

// FIXME: a sock which has_error true should not be re-added to
// the update queue because it's already there.
self.queue_state(state);
unsafe { self.update_sockets_events_if_polling() }
}
Expand Down

0 comments on commit 10df3e1

Please sign in to comment.