From 211223e92865e5ae276333cc91673d764bbc386d Mon Sep 17 00:00:00 2001
From: PerfectLaugh
Date: Thu, 8 Aug 2019 03:09:20 +0800
Subject: [PATCH] Improve overlapped deallocation strategy and docs (#1042)

---
 src/sys/windows/afd.rs      |   2 +
 src/sys/windows/selector.rs | 109 +++++++++++++++++++++++++++---------
 2 files changed, 86 insertions(+), 25 deletions(-)

diff --git a/src/sys/windows/afd.rs b/src/sys/windows/afd.rs
index 88f9d439d..e1f4548f7 100644
--- a/src/sys/windows/afd.rs
+++ b/src/sys/windows/afd.rs
@@ -152,6 +152,7 @@ impl Afd {
     /// This function is unsafe due to memory of `IO_STATUS_BLOCK` still being used by `Afd` instance while `Ok(false)` (`STATUS_PENDING`).
     /// `iosb` needs to be untouched after the call while operation is in effective at ALL TIME except for `cancel` method.
     /// So be careful not to `poll` twice while polling.
+    /// The caller should deallocate their overlapped value on error, to prevent a memory leak.
     pub unsafe fn poll(
         &self,
         info: &mut AfdPollInfo,
@@ -189,6 +190,7 @@ impl Afd {
     ///
     /// This function is unsafe due to memory of `IO_STATUS_BLOCK` still being used by `Afd` instance while `Ok(false)` (`STATUS_PENDING`).
     /// Use it only with request is still being polled so that you have valid `IO_STATUS_BLOCK` to use.
+    /// The caller should NOT deallocate their overlapped value after `cancel`, to prevent a double free.
     pub unsafe fn cancel(&self, iosb: *mut IO_STATUS_BLOCK) -> io::Result<()> {
         if (*iosb).u.Status != STATUS_PENDING {
             return Ok(());
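Note: the contract documented above -- reclaim the overlapped value when the
submission fails synchronously, never after a successful `cancel` -- boils down
to balancing `Arc::into_raw` with exactly one `Arc::from_raw`. Below is a
minimal, self-contained sketch of that discipline; `submit` is a hypothetical
stand-in for `Afd::poll`, not mio's actual API.

    use std::sync::Arc;

    // Hypothetical stand-in for a kernel submission that may fail synchronously.
    fn submit(_overlapped: *const u64) -> Result<(), ()> {
        Err(()) // pretend the submission failed right away
    }

    fn main() {
        let state = Arc::new(42u64);

        // Hand one strong reference to the "kernel" as a raw pointer.
        let overlapped = Arc::into_raw(state.clone());

        if submit(overlapped).is_err() {
            // The submission failed, so the kernel never took ownership:
            // reclaim the reference here or it leaks.
            drop(unsafe { Arc::from_raw(overlapped) });
        }
        // On success we would NOT reclaim here: the completion handler
        // receives the pointer and calls Arc::from_raw exactly once.

        assert_eq!(Arc::strong_count(&state), 1); // balanced, no leak
    }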
diff --git a/src/sys/windows/selector.rs b/src/sys/windows/selector.rs
index 603f1d25b..adec47713 100644
--- a/src/sys/windows/selector.rs
+++ b/src/sys/windows/selector.rs
@@ -33,7 +33,39 @@ use super::SocketState;
 
 const POLL_GROUP__MAX_GROUP_SIZE: usize = 32;
 
-#[derive(PartialEq, Debug, Clone, Copy)]
+/// A deallocation guard for an overlapped `Arc` pointer.
+/// If an error occurs, or the status changes before the overlapped pointer is actually used (or it is never used at all),
+/// dropping this wrapper decreases the reference count of the `Arc`.
+/// Remember to call `forget` once the `Arc` has been consumed, or the reference count is decreased twice, causing a double free.
+#[derive(Debug)]
+struct OverlappedArcWrapper<T>(*const T);
+
+unsafe impl<T> Send for OverlappedArcWrapper<T> {}
+
+impl<T> OverlappedArcWrapper<T> {
+    fn new(arc: &Arc<T>) -> OverlappedArcWrapper<T> {
+        OverlappedArcWrapper(Arc::into_raw(arc.clone()))
+    }
+
+    fn forget(&mut self) {
+        self.0 = 0 as *const T;
+    }
+
+    fn get_ptr(&self) -> *const T {
+        self.0
+    }
+}
+
+impl<T> Drop for OverlappedArcWrapper<T> {
+    fn drop(&mut self) {
+        if self.0 as usize == 0 {
+            return;
+        }
+        drop(unsafe { Arc::from_raw(self.0) });
+    }
+}
+
+#[derive(Debug)]
 enum SockPollStatus {
     Idle,
     Pending,
@@ -45,12 +77,18 @@ pub struct SockState {
     iosb: Pin<Box<IoStatusBlock>>,
     poll_info: AfdPollInfo,
     afd: Arc<Afd>,
+
     raw_socket: RawSocket,
     base_socket: RawSocket,
+
     user_evts: u32,
     pending_evts: u32,
+
     user_data: u64,
+
     poll_status: SockPollStatus,
+    self_wrapped: Option<OverlappedArcWrapper<Mutex<SockState>>>,
+
     delete_pending: bool,
 }
 
@@ -66,6 +104,7 @@ impl SockState {
             pending_evts: 0,
             user_data: 0,
             poll_status: SockPollStatus::Idle,
+            self_wrapped: None,
             delete_pending: false,
         })
     }
@@ -84,25 +123,25 @@ impl SockState {
 
     fn update(&mut self, self_arc: &Arc<Mutex<SockState>>) -> io::Result<()> {
         assert!(!self.delete_pending);
-        if self.poll_status == SockPollStatus::Pending
-            && (self.user_evts & KNOWN_AFD_EVENTS & !self.pending_evts) == 0
-        {
-            /* All the events the user is interested in are already being monitored by
-             * the pending poll operation. It might spuriously complete because of an
-             * event that we're no longer interested in; when that happens we'll submit
-             * a new poll operation with the updated event mask. */
-        } else if self.poll_status == SockPollStatus::Pending {
-            /* A poll operation is already pending, but it's not monitoring for all the
-             * events that the user is interested in. Therefore, cancel the pending
-             * poll operation; when we receive it's completion package, a new poll
-             * operation will be submitted with the correct event mask. */
-            self.cancel()?;
-            return Ok(());
-        } else if self.poll_status == SockPollStatus::Cancelled {
+        if let SockPollStatus::Pending = self.poll_status {
+            if (self.user_evts & KNOWN_AFD_EVENTS & !self.pending_evts) == 0 {
+                /* All the events the user is interested in are already being monitored by
+                 * the pending poll operation. It might spuriously complete because of an
+                 * event that we're no longer interested in; when that happens we'll submit
+                 * a new poll operation with the updated event mask. */
+            } else {
+                /* A poll operation is already pending, but it's not monitoring for all the
+                 * events that the user is interested in. Therefore, cancel the pending
+                 * poll operation; when we receive its completion packet, a new poll
+                 * operation will be submitted with the correct event mask. */
+                self.cancel()?;
+                return Ok(());
+            }
+        } else if let SockPollStatus::Cancelled = self.poll_status {
             /* The poll operation has already been cancelled, we're still waiting for
              * it to return. For now, there's nothing that needs to be done. */
             return Ok(());
-        } else if self.poll_status == SockPollStatus::Idle {
+        } else if let SockPollStatus::Idle = self.poll_status {
             /* No poll operation is pending; start one. */
             self.poll_info.exclusive = 0;
             self.poll_info.number_of_handles = 1;
@@ -113,7 +152,8 @@ impl SockState {
             self.poll_info.handles[0].status = 0;
             self.poll_info.handles[0].events = self.user_evts;
 
-            let overlapped = Arc::into_raw(self_arc.clone()) as *const _ as PVOID;
+            let wrapped_overlapped = OverlappedArcWrapper::new(self_arc);
+            let overlapped = wrapped_overlapped.get_ptr() as *const _ as PVOID;
             let result = unsafe {
                 self.afd
                     .poll(&mut self.poll_info, (*self.iosb).as_mut_ptr(), overlapped)
@@ -133,6 +173,11 @@ impl SockState {
             }
             self.poll_status = SockPollStatus::Pending;
+            if self.self_wrapped.is_some() {
+                // This should never happen: an already-pending overlapped must not be deallocated before feed_event, so fail loudly here.
+                unreachable!();
+            }
+            self.self_wrapped = Some(wrapped_overlapped);
             self.pending_evts = self.user_evts;
         } else {
             unreachable!();
@@ -141,7 +186,10 @@ impl SockState {
 
     fn cancel(&mut self) -> io::Result<()> {
-        assert!(self.poll_status == SockPollStatus::Pending);
+        match self.poll_status {
+            SockPollStatus::Pending => {}
+            _ => unreachable!(),
+        };
         unsafe {
             self.afd.cancel((*self.iosb).as_mut_ptr())?;
         }
 
@@ -152,7 +200,7 @@ impl SockState {
 
     fn mark_delete(&mut self) {
         if !self.delete_pending {
-            if self.poll_status == SockPollStatus::Pending {
+            if let SockPollStatus::Pending = self.poll_status {
                 drop(self.cancel());
             }
 
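Note: `OverlappedArcWrapper` above is an RAII guard: if `update` bails out
before the kernel accepts the pointer, dropping the guard releases the
reference; once the operation is pending, `forget` disarms it. A minimal
sketch of the same pattern with a hypothetical `RawArcGuard` (not mio's
actual type):

    use std::sync::Arc;

    // Guard over a raw Arc reference; releases it on drop unless forgotten.
    struct RawArcGuard<T>(*const T);

    impl<T> RawArcGuard<T> {
        fn new(arc: &Arc<T>) -> Self {
            RawArcGuard(Arc::into_raw(arc.clone()))
        }
        // Call once ownership has truly moved elsewhere (e.g. to the kernel).
        fn forget(&mut self) {
            self.0 = std::ptr::null();
        }
    }

    impl<T> Drop for RawArcGuard<T> {
        fn drop(&mut self) {
            if !self.0.is_null() {
                // Release the reference taken in `new`.
                drop(unsafe { Arc::from_raw(self.0) });
            }
        }
    }

    fn main() {
        let state = Arc::new(String::from("sock"));

        // Error path: the guard drops unforgotten and releases the reference.
        {
            let _guard = RawArcGuard::new(&state);
        }
        assert_eq!(Arc::strong_count(&state), 1);

        // Success path: forget the guard; the completion side releases instead.
        let mut guard = RawArcGuard::new(&state);
        let raw = guard.0;
        guard.forget();
        drop(unsafe { Arc::from_raw(raw) }); // stands in for the completion path
        assert_eq!(Arc::strong_count(&state), 1);
    }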
@@ -160,11 +208,18 @@ impl SockState {
             self.delete_pending = true;
         }
     }
 
+    // This function is called for the overlapped pointer, which was created from an Arc<Mutex<SockState>>. Watch out for reference counting.
     fn feed_event(&mut self) -> Option<Event> {
-        let mut afd_events = 0;
+        if self.self_wrapped.is_some() {
+            // Forget the arced self pointer first; otherwise the reference count is decreased twice (here and by the overlapped wrapper).
+            self.self_wrapped.as_mut().unwrap().forget();
+            self.self_wrapped = None;
+        }
+
         self.poll_status = SockPollStatus::Idle;
         self.pending_evts = 0;
+        let mut afd_events = 0;
         // We use the status info in IO_STATUS_BLOCK to determine the socket poll status. It is unsafe to use a pointer of IO_STATUS_BLOCK.
         unsafe {
             let iosb = &*(*self.iosb).as_ptr();
@@ -191,6 +246,10 @@ impl SockState {
             return None;
         }
 
+        // In mio we have to simulate edge-triggered behavior to match the API usage.
+        // The strategy here is to intercept all reads/writes from the user that could return WouldBlock,
+        // then reregister the socket to reset the interests.
+
         // Reset readable event
         if (afd_events & (KNOWN_AFD_EVENTS & !AFD_POLL_SEND)) != 0 {
             self.user_evts &= !(afd_events & (KNOWN_AFD_EVENTS & !AFD_POLL_SEND));
@@ -433,9 +492,9 @@ impl SelectorInner {
                 });
                 continue;
             }
-            let sock =
-                unsafe { Arc::from_raw(iocp_event.overlapped() as *mut Mutex<SockState>) };
-            let mut sock_guard = sock.lock().unwrap();
+            let sock_arc =
+                unsafe { Arc::from_raw(iocp_event.overlapped() as *const Mutex<SockState>) };
+            let mut sock_guard = sock_arc.lock().unwrap();
             match sock_guard.feed_event() {
                 Some(e) => {
                     events.push(e);
@@ -443,7 +502,7 @@ impl SelectorInner {
                 None => {}
             }
             if !sock_guard.is_pending_deletion() {
-                update_queue.push_back(sock.clone());
+                update_queue.push_back(sock_arc.clone());
             }
         }
     }
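Note: the selector loop above reclaims the kernel's reference with a single
`Arc::from_raw`, which is why `feed_event` must only *forget* the stored
wrapper; reclaiming it a second time would decrement the count twice. A
minimal sketch of that hand-off, using hypothetical types rather than mio's:

    use std::sync::{Arc, Mutex};

    struct SockState {
        // Raw self-reference handed to the pending operation, if any.
        self_raw: Option<*const Mutex<SockState>>,
    }

    fn feed_event(sock: &Arc<Mutex<SockState>>) {
        // The completion path below already reclaimed the kernel's reference,
        // so just forget the stashed pointer instead of reclaiming it again.
        sock.lock().unwrap().self_raw.take();
    }

    fn main() {
        let sock = Arc::new(Mutex::new(SockState { self_raw: None }));

        // "Submit": hand one strong reference to the pending operation.
        let raw = Arc::into_raw(sock.clone());
        sock.lock().unwrap().self_raw = Some(raw);

        // "Completion": reclaim exactly one reference, then clear the stash.
        let completed = unsafe { Arc::from_raw(raw) };
        feed_event(&completed);

        drop(completed);
        assert_eq!(Arc::strong_count(&sock), 1); // balanced: no leak, no double free
    }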