Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve overlapped deallocation strategy and docs #1042

Merged
merged 2 commits into from
Aug 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/sys/windows/afd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ impl Afd {
/// This function is unsafe due to memory of `IO_STATUS_BLOCK` still being used by `Afd` instance while `Ok(false)` (`STATUS_PENDING`).
/// `iosb` needs to be untouched after the call while operation is in effective at ALL TIME except for `cancel` method.
/// So be careful not to `poll` twice while polling.
/// User should deallocate there overlapped value when error to prevent memory leak.
pub unsafe fn poll(
&self,
info: &mut AfdPollInfo,
Expand Down Expand Up @@ -189,6 +190,7 @@ impl Afd {
///
/// This function is unsafe due to memory of `IO_STATUS_BLOCK` still being used by `Afd` instance while `Ok(false)` (`STATUS_PENDING`).
/// Use it only with request is still being polled so that you have valid `IO_STATUS_BLOCK` to use.
/// User should NOT deallocate there overlapped value after the `cancel` to prevent double free.
pub unsafe fn cancel(&self, iosb: *mut IO_STATUS_BLOCK) -> io::Result<()> {
if (*iosb).u.Status != STATUS_PENDING {
return Ok(());
Expand Down
109 changes: 84 additions & 25 deletions src/sys/windows/selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,39 @@ use super::SocketState;

const POLL_GROUP__MAX_GROUP_SIZE: usize = 32;

#[derive(PartialEq, Debug, Clone, Copy)]
/// This is the deallocation wrapper for overlapped pointer.
/// In case of error or status changing before the overlapped pointer is actually used(or not even being used),
/// this wrapper will decrease the reference count of Arc if being dropped.
/// Remember call `forget` if you have used the Arc, or you could decrease the reference count by two causing double free.
#[derive(Debug)]
struct OverlappedArcWrapper<T>(*const T);

unsafe impl<T> Send for OverlappedArcWrapper<T> {}

impl<T> OverlappedArcWrapper<T> {
fn new(arc: &Arc<T>) -> OverlappedArcWrapper<T> {
OverlappedArcWrapper(Arc::into_raw(arc.clone()))
}

fn forget(&mut self) {
self.0 = 0 as *const T;
}

fn get_ptr(&self) -> *const T {
self.0
}
}

impl<T> Drop for OverlappedArcWrapper<T> {
fn drop(&mut self) {
if self.0 as usize == 0 {
return;
}
drop(unsafe { Arc::from_raw(self.0) });
}
}

#[derive(Debug)]
enum SockPollStatus {
Idle,
Pending,
Expand All @@ -45,12 +77,18 @@ pub struct SockState {
iosb: Pin<Box<IoStatusBlock>>,
poll_info: AfdPollInfo,
afd: Arc<Afd>,

raw_socket: RawSocket,
base_socket: RawSocket,

user_evts: u32,
pending_evts: u32,

user_data: u64,

poll_status: SockPollStatus,
self_wrapped: Option<OverlappedArcWrapper<Mutex<SockState>>>,

delete_pending: bool,
}

Expand All @@ -66,6 +104,7 @@ impl SockState {
pending_evts: 0,
user_data: 0,
poll_status: SockPollStatus::Idle,
self_wrapped: None,
delete_pending: false,
})
}
Expand All @@ -84,25 +123,25 @@ impl SockState {
fn update(&mut self, self_arc: &Arc<Mutex<SockState>>) -> io::Result<()> {
assert!(!self.delete_pending);

if self.poll_status == SockPollStatus::Pending
&& (self.user_evts & KNOWN_AFD_EVENTS & !self.pending_evts) == 0
{
/* All the events the user is interested in are already being monitored by
* the pending poll operation. It might spuriously complete because of an
* event that we're no longer interested in; when that happens we'll submit
* a new poll operation with the updated event mask. */
} else if self.poll_status == SockPollStatus::Pending {
/* A poll operation is already pending, but it's not monitoring for all the
* events that the user is interested in. Therefore, cancel the pending
* poll operation; when we receive it's completion package, a new poll
* operation will be submitted with the correct event mask. */
self.cancel()?;
return Ok(());
} else if self.poll_status == SockPollStatus::Cancelled {
if let SockPollStatus::Pending = self.poll_status {
if (self.user_evts & KNOWN_AFD_EVENTS & !self.pending_evts) == 0 {
/* All the events the user is interested in are already being monitored by
* the pending poll operation. It might spuriously complete because of an
* event that we're no longer interested in; when that happens we'll submit
* a new poll operation with the updated event mask. */
} else {
/* A poll operation is already pending, but it's not monitoring for all the
* events that the user is interested in. Therefore, cancel the pending
* poll operation; when we receive it's completion package, a new poll
* operation will be submitted with the correct event mask. */
self.cancel()?;
return Ok(());
}
} else if let SockPollStatus::Cancelled = self.poll_status {
/* The poll operation has already been cancelled, we're still waiting for
* it to return. For now, there's nothing that needs to be done. */
return Ok(());
} else if self.poll_status == SockPollStatus::Idle {
} else if let SockPollStatus::Idle = self.poll_status {
/* No poll operation is pending; start one. */
self.poll_info.exclusive = 0;
self.poll_info.number_of_handles = 1;
Expand All @@ -113,7 +152,8 @@ impl SockState {
self.poll_info.handles[0].status = 0;
self.poll_info.handles[0].events = self.user_evts;

let overlapped = Arc::into_raw(self_arc.clone()) as *const _ as PVOID;
let wrapped_overlapped = OverlappedArcWrapper::new(self_arc);
let overlapped = wrapped_overlapped.get_ptr() as *const _ as PVOID;
let result = unsafe {
self.afd
.poll(&mut self.poll_info, (*self.iosb).as_mut_ptr(), overlapped)
Expand All @@ -133,6 +173,11 @@ impl SockState {
}

self.poll_status = SockPollStatus::Pending;
if self.self_wrapped.is_some() {
// This shouldn't be happening. We cannot deallocate already pending overlapped before feed_event so we need to stand out here to declare unreachable.
unreachable!();
}
self.self_wrapped = Some(wrapped_overlapped);
self.pending_evts = self.user_evts;
} else {
unreachable!();
Expand All @@ -141,7 +186,10 @@ impl SockState {
}

fn cancel(&mut self) -> io::Result<()> {
assert!(self.poll_status == SockPollStatus::Pending);
match self.poll_status {
SockPollStatus::Pending => {}
_ => unreachable!(),
};
unsafe {
self.afd.cancel((*self.iosb).as_mut_ptr())?;
}
Expand All @@ -152,19 +200,26 @@ impl SockState {

fn mark_delete(&mut self) {
if !self.delete_pending {
if self.poll_status == SockPollStatus::Pending {
if let SockPollStatus::Pending = self.poll_status {
drop(self.cancel());
}

self.delete_pending = true;
}
}

// This is the function called from the overlapped using as Arc<Mutex<SockState>>. Watch out for reference counting.
fn feed_event(&mut self) -> Option<Event> {
let mut afd_events = 0;
if self.self_wrapped.is_some() {
// Forget our arced-self first. We will decrease the reference count by two if we don't do this on overlapped.
self.self_wrapped.as_mut().unwrap().forget();
self.self_wrapped = None;
}

self.poll_status = SockPollStatus::Idle;
self.pending_evts = 0;

let mut afd_events = 0;
// We use the status info in IO_STATUS_BLOCK to determine the socket poll status. It is unsafe to use a pointer of IO_STATUS_BLOCK.
unsafe {
let iosb = &*(*self.iosb).as_ptr();
Expand All @@ -191,6 +246,10 @@ impl SockState {
return None;
}

// In mio, we have to simulate Edge-triggered behavior to match API usage.
// The strategy here is to intercept all read/write from user that could cause WouldBlock usage,
// then reregister the socket to reset the interests.

// Reset readable event
if (afd_events & (KNOWN_AFD_EVENTS & !AFD_POLL_SEND)) != 0 {
self.user_evts &= !(afd_events & (KNOWN_AFD_EVENTS & !AFD_POLL_SEND));
Expand Down Expand Up @@ -433,17 +492,17 @@ impl SelectorInner {
});
continue;
}
let sock =
unsafe { Arc::from_raw(iocp_event.overlapped() as *mut Mutex<SockState>) };
let mut sock_guard = sock.lock().unwrap();
let sock_arc =
unsafe { Arc::from_raw(iocp_event.overlapped() as *const Mutex<SockState>) };
let mut sock_guard = sock_arc.lock().unwrap();
match sock_guard.feed_event() {
Some(e) => {
events.push(e);
}
None => {}
}
if !sock_guard.is_pending_deletion() {
update_queue.push_back(sock.clone());
update_queue.push_back(sock_arc.clone());
}
}
}
Expand Down