Improve overlapped deallocation strategy and docs (#1042)
PerfectLaugh authored and carllerche committed Aug 7, 2019
1 parent cd7331a commit 211223e
Showing 2 changed files with 86 additions and 25 deletions.
2 changes: 2 additions & 0 deletions src/sys/windows/afd.rs
@@ -152,6 +152,7 @@ impl Afd {
/// This function is unsafe because the memory of `IO_STATUS_BLOCK` is still used by the `Afd` instance while `Ok(false)` (`STATUS_PENDING`) is returned.
/// `iosb` must remain untouched after the call at ALL TIMES while the operation is in effect, except by the `cancel` method.
/// So be careful not to `poll` twice while a poll is already pending.
/// The caller should deallocate their overlapped value on error to prevent a memory leak.
pub unsafe fn poll(
&self,
info: &mut AfdPollInfo,
@@ -189,6 +190,7 @@ impl Afd {
///
/// This function is unsafe because the memory of `IO_STATUS_BLOCK` is still used by the `Afd` instance while `Ok(false)` (`STATUS_PENDING`) is returned.
/// Use it only while the request is still being polled, so that you have a valid `IO_STATUS_BLOCK` to use.
/// The caller should NOT deallocate their overlapped value after the `cancel`, to prevent a double free.
pub unsafe fn cancel(&self, iosb: *mut IO_STATUS_BLOCK) -> io::Result<()> {
if (*iosb).u.Status != STATUS_PENDING {
return Ok(());
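To make the ownership rules in these docs concrete, here is a minimal, self-contained sketch of the caller-side contract. `FakeAfd`, its `poll` signature, and `submit` are invented stand-ins for illustration, not mio's actual API: on error the caller must reclaim the reference that was leaked into the raw overlapped pointer, while a pending (or cancelled) operation still delivers a completion that will reclaim it later.

```rust
use std::io;
use std::sync::{Arc, Mutex};

struct FakeAfd;

impl FakeAfd {
    /// Stand-in for Afd::poll: Ok(false) means STATUS_PENDING, i.e. the
    /// kernel keeps using the overlapped pointer and will post a completion.
    fn poll(&self, _overlapped: *const Mutex<u32>, fail: bool) -> io::Result<bool> {
        if fail {
            Err(io::Error::new(io::ErrorKind::Other, "poll failed"))
        } else {
            Ok(false)
        }
    }
}

fn submit(afd: &FakeAfd, state: &Arc<Mutex<u32>>, fail: bool) -> io::Result<()> {
    // Leak one strong reference into the raw pointer handed to the kernel.
    let overlapped = Arc::into_raw(state.clone());
    match afd.poll(overlapped, fail) {
        // Pending: the completion handler will call Arc::from_raw later.
        // After a `cancel`, that completion STILL arrives, so the reference
        // must not be reclaimed here either.
        Ok(_) => Ok(()),
        Err(e) => {
            // The operation never started, so no completion will arrive;
            // reclaim the reference or it leaks.
            drop(unsafe { Arc::from_raw(overlapped) });
            Err(e)
        }
    }
}

fn main() {
    let state = Arc::new(Mutex::new(0u32));
    assert!(submit(&FakeAfd, &state, true).is_err());
    assert_eq!(Arc::strong_count(&state), 1); // the error path leaked nothing
}
```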
109 changes: 84 additions & 25 deletions src/sys/windows/selector.rs
@@ -33,7 +33,39 @@ use super::SocketState;

const POLL_GROUP__MAX_GROUP_SIZE: usize = 32;

#[derive(PartialEq, Debug, Clone, Copy)]
/// A deallocation wrapper for the overlapped pointer.
/// If an error occurs, or the status changes before the overlapped pointer is actually used (or it is never used at all),
/// this wrapper decreases the reference count of the `Arc` when it is dropped.
/// Remember to call `forget` once you have consumed the `Arc`, or the reference count is decreased twice, causing a double free.
#[derive(Debug)]
struct OverlappedArcWrapper<T>(*const T);

unsafe impl<T> Send for OverlappedArcWrapper<T> {}

impl<T> OverlappedArcWrapper<T> {
fn new(arc: &Arc<T>) -> OverlappedArcWrapper<T> {
OverlappedArcWrapper(Arc::into_raw(arc.clone()))
}

fn forget(&mut self) {
self.0 = 0 as *const T;
}

fn get_ptr(&self) -> *const T {
self.0
}
}

impl<T> Drop for OverlappedArcWrapper<T> {
fn drop(&mut self) {
if self.0 as usize == 0 {
return;
}
drop(unsafe { Arc::from_raw(self.0) });
}
}

#[derive(Debug)]
enum SockPollStatus {
Idle,
Pending,
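The wrapper relies on `Arc::into_raw` keeping the strong count elevated until a matching `Arc::from_raw`. A minimal sketch of that invariant, independent of the selector code:

```rust
use std::sync::Arc;

fn main() {
    let state = Arc::new(42u32);
    assert_eq!(Arc::strong_count(&state), 1);

    // into_raw consumes a clone without decreasing the count: the raw
    // pointer now "owns" one strong reference, exactly what the wrapper
    // stores in its field.
    let raw = Arc::into_raw(state.clone());
    assert_eq!(Arc::strong_count(&state), 2);

    // What OverlappedArcWrapper::drop does on the error path: reclaim the
    // reference so it is released instead of leaking.
    drop(unsafe { Arc::from_raw(raw) });
    assert_eq!(Arc::strong_count(&state), 1);
}
```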
@@ -45,12 +77,18 @@ pub struct SockState {
iosb: Pin<Box<IoStatusBlock>>,
poll_info: AfdPollInfo,
afd: Arc<Afd>,

raw_socket: RawSocket,
base_socket: RawSocket,

user_evts: u32,
pending_evts: u32,

user_data: u64,

poll_status: SockPollStatus,
self_wrapped: Option<OverlappedArcWrapper<Mutex<SockState>>>,

delete_pending: bool,
}

@@ -66,6 +104,7 @@ impl SockState {
pending_evts: 0,
user_data: 0,
poll_status: SockPollStatus::Idle,
self_wrapped: None,
delete_pending: false,
})
}
@@ -84,25 +123,25 @@ impl SockState {
fn update(&mut self, self_arc: &Arc<Mutex<SockState>>) -> io::Result<()> {
assert!(!self.delete_pending);

if self.poll_status == SockPollStatus::Pending
&& (self.user_evts & KNOWN_AFD_EVENTS & !self.pending_evts) == 0
{
/* All the events the user is interested in are already being monitored by
* the pending poll operation. It might spuriously complete because of an
* event that we're no longer interested in; when that happens we'll submit
* a new poll operation with the updated event mask. */
} else if self.poll_status == SockPollStatus::Pending {
/* A poll operation is already pending, but it's not monitoring for all the
* events that the user is interested in. Therefore, cancel the pending
* poll operation; when we receive its completion packet, a new poll
* operation will be submitted with the correct event mask. */
self.cancel()?;
return Ok(());
} else if self.poll_status == SockPollStatus::Cancelled {
if let SockPollStatus::Pending = self.poll_status {
if (self.user_evts & KNOWN_AFD_EVENTS & !self.pending_evts) == 0 {
/* All the events the user is interested in are already being monitored by
* the pending poll operation. It might spuriously complete because of an
* event that we're no longer interested in; when that happens we'll submit
* a new poll operation with the updated event mask. */
} else {
/* A poll operation is already pending, but it's not monitoring for all the
* events that the user is interested in. Therefore, cancel the pending
* poll operation; when we receive its completion packet, a new poll
* operation will be submitted with the correct event mask. */
self.cancel()?;
return Ok(());
}
} else if let SockPollStatus::Cancelled = self.poll_status {
/* The poll operation has already been cancelled, we're still waiting for
* it to return. For now, there's nothing that needs to be done. */
return Ok(());
} else if self.poll_status == SockPollStatus::Idle {
} else if let SockPollStatus::Idle = self.poll_status {
/* No poll operation is pending; start one. */
self.poll_info.exclusive = 0;
self.poll_info.number_of_handles = 1;
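The branching in `update` above is effectively a small state machine. Below is a condensed, hypothetical sketch of the same decision tree; the enum is redeclared and the string return values are invented so the example stands alone:

```rust
enum SockPollStatus {
    Idle,
    Pending,
    Cancelled,
}

/// What update() decides for each state (return values invented for clarity).
fn next_action(status: &SockPollStatus, pending_covers_user_evts: bool) -> &'static str {
    match status {
        // The pending poll already watches everything the user cares about.
        SockPollStatus::Pending if pending_covers_user_evts => "do nothing",
        // Cancel; a fresh poll is submitted when the completion arrives.
        SockPollStatus::Pending => "cancel and resubmit on completion",
        // Cancellation already in flight; just wait for it to return.
        SockPollStatus::Cancelled => "wait",
        // Nothing outstanding: start a new poll now.
        SockPollStatus::Idle => "start a new poll",
    }
}

fn main() {
    assert_eq!(next_action(&SockPollStatus::Pending, true), "do nothing");
    assert_eq!(next_action(&SockPollStatus::Cancelled, false), "wait");
    assert_eq!(next_action(&SockPollStatus::Idle, false), "start a new poll");
}
```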
@@ -113,7 +152,8 @@ impl SockState {
self.poll_info.handles[0].status = 0;
self.poll_info.handles[0].events = self.user_evts;

let overlapped = Arc::into_raw(self_arc.clone()) as *const _ as PVOID;
let wrapped_overlapped = OverlappedArcWrapper::new(self_arc);
let overlapped = wrapped_overlapped.get_ptr() as *const _ as PVOID;
let result = unsafe {
self.afd
.poll(&mut self.poll_info, (*self.iosb).as_mut_ptr(), overlapped)
@@ -133,6 +173,11 @@
}

self.poll_status = SockPollStatus::Pending;
if self.self_wrapped.is_some() {
// This should never happen. An already pending overlapped value cannot be deallocated before feed_event consumes it, so this state is unreachable.
unreachable!();
}
self.self_wrapped = Some(wrapped_overlapped);
self.pending_evts = self.user_evts;
} else {
unreachable!();
@@ -141,7 +186,10 @@
}

fn cancel(&mut self) -> io::Result<()> {
assert!(self.poll_status == SockPollStatus::Pending);
match self.poll_status {
SockPollStatus::Pending => {}
_ => unreachable!(),
};
unsafe {
self.afd.cancel((*self.iosb).as_mut_ptr())?;
}
@@ -152,19 +200,26 @@

fn mark_delete(&mut self) {
if !self.delete_pending {
if self.poll_status == SockPollStatus::Pending {
if let SockPollStatus::Pending = self.poll_status {
drop(self.cancel());
}

self.delete_pending = true;
}
}

// This function is called when the overlapped pointer is consumed as an Arc<Mutex<SockState>>. Watch out for the reference counting.
fn feed_event(&mut self) -> Option<Event> {
let mut afd_events = 0;
if self.self_wrapped.is_some() {
// Forget our Arc-ed self first; otherwise the overlapped's reference count is decreased twice.
self.self_wrapped.as_mut().unwrap().forget();
self.self_wrapped = None;
}

self.poll_status = SockPollStatus::Idle;
self.pending_evts = 0;

let mut afd_events = 0;
// We use the status info in IO_STATUS_BLOCK to determine the socket poll status. It is unsafe to use a raw pointer to the IO_STATUS_BLOCK.
unsafe {
let iosb = &*(*self.iosb).as_ptr();
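The hand-off around `feed_event` is easiest to see end to end. This self-contained sketch redeclares a minimal copy of the wrapper to show why `forget` must run once the selector has reclaimed the `Arc` from the raw overlapped pointer:

```rust
use std::sync::Arc;

struct OverlappedArcWrapper<T>(*const T);

impl<T> OverlappedArcWrapper<T> {
    fn new(arc: &Arc<T>) -> Self {
        OverlappedArcWrapper(Arc::into_raw(arc.clone()))
    }
    fn forget(&mut self) {
        self.0 = std::ptr::null();
    }
    fn get_ptr(&self) -> *const T {
        self.0
    }
}

impl<T> Drop for OverlappedArcWrapper<T> {
    fn drop(&mut self) {
        if self.0.is_null() {
            return;
        }
        drop(unsafe { Arc::from_raw(self.0) });
    }
}

fn main() {
    let state = Arc::new(1u32);

    // Completion path: the selector reclaims the Arc from the raw pointer
    // (as SelectorInner does), so the wrapper must forget its copy first;
    // otherwise dropping it would release the same reference a second time.
    let mut wrapper = OverlappedArcWrapper::new(&state);
    let reclaimed = unsafe { Arc::from_raw(wrapper.get_ptr()) };
    wrapper.forget();
    drop(wrapper); // no-op now
    drop(reclaimed);
    assert_eq!(Arc::strong_count(&state), 1);
}
```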
@@ -191,6 +246,10 @@
return None;
}

// In mio, we have to simulate edge-triggered behavior to match the API usage.
// The strategy here is to intercept every read/write from the user that could return WouldBlock,
// then reregister the socket to reset the interests.

// Reset readable event
if (afd_events & (KNOWN_AFD_EVENTS & !AFD_POLL_SEND)) != 0 {
self.user_evts &= !(afd_events & (KNOWN_AFD_EVENTS & !AFD_POLL_SEND));
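A hypothetical sketch of that interest-reset trick; the flag values and function names are invented for illustration and do not correspond to mio's real AFD constants:

```rust
// Invented flag values standing in for the AFD_POLL_* constants.
const EVT_READ: u32 = 0b01;
const EVT_WRITE: u32 = 0b10;

/// Report the events that fired and strip them from the interest mask, so
/// they are not polled for again until the user reregisters.
fn simulate_edge(user_evts: &mut u32, fired: u32) -> u32 {
    let reported = fired & *user_evts;
    *user_evts &= !reported;
    reported
}

fn main() {
    let mut interests = EVT_READ | EVT_WRITE;
    // Readable fires once...
    assert_eq!(simulate_edge(&mut interests, EVT_READ), EVT_READ);
    // ...and is not reported again until the socket is reregistered.
    assert_eq!(simulate_edge(&mut interests, EVT_READ), 0);
    interests |= EVT_READ; // reregistration restores the interest
    assert_eq!(simulate_edge(&mut interests, EVT_READ), EVT_READ);
}
```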
@@ -433,17 +492,17 @@ impl SelectorInner {
});
continue;
}
let sock =
unsafe { Arc::from_raw(iocp_event.overlapped() as *mut Mutex<SockState>) };
let mut sock_guard = sock.lock().unwrap();
let sock_arc =
unsafe { Arc::from_raw(iocp_event.overlapped() as *const Mutex<SockState>) };
let mut sock_guard = sock_arc.lock().unwrap();
match sock_guard.feed_event() {
Some(e) => {
events.push(e);
}
None => {}
}
if !sock_guard.is_pending_deletion() {
update_queue.push_back(sock.clone());
update_queue.push_back(sock_arc.clone());
}
}
}
