Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

io: Refactor out usage of Weak in the io handle #4656

Merged
merged 3 commits into from May 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
137 changes: 74 additions & 63 deletions tokio/src/io/driver/mod.rs
Expand Up @@ -18,13 +18,13 @@ mod metrics;

use crate::park::{Park, Unpark};
use crate::util::slab::{self, Slab};
use crate::{loom::sync::Mutex, util::bit};
use crate::{loom::sync::RwLock, util::bit};

use metrics::IoDriverMetrics;

use std::fmt;
use std::io;
use std::sync::{Arc, Weak};
use std::sync::Arc;
use std::time::Duration;

/// I/O driver, backed by Mio.
Expand All @@ -37,10 +37,8 @@ pub(crate) struct Driver {
events: Option<mio::Events>,

/// Primary slab handle containing the state for each resource registered
/// with this driver. During Drop this is moved into the Inner structure, so
/// this is an Option to allow it to be vacated (until Drop this is always
/// Some).
resources: Option<Slab<ScheduledIo>>,
/// with this driver.
resources: Slab<ScheduledIo>,

/// The system event queue.
poll: mio::Poll,
Expand All @@ -52,7 +50,7 @@ pub(crate) struct Driver {
/// A reference to an I/O driver.
#[derive(Clone)]
pub(crate) struct Handle {
inner: Weak<Inner>,
pub(super) inner: Arc<Inner>,
}

#[derive(Debug)]
Expand All @@ -61,20 +59,17 @@ pub(crate) struct ReadyEvent {
pub(crate) ready: Ready,
}

pub(super) struct Inner {
/// Primary slab handle containing the state for each resource registered
/// with this driver.
///
/// The ownership of this slab is moved into this structure during
/// `Driver::drop`, so that `Inner::drop` can notify all outstanding handles
/// without risking new ones being registered in the meantime.
resources: Mutex<Option<Slab<ScheduledIo>>>,
struct IoDispatcher {
allocator: slab::Allocator<ScheduledIo>,
is_shutdown: bool,
}

pub(super) struct Inner {
/// Registers I/O resources.
registry: mio::Registry,

/// Allocates `ScheduledIo` handles when creating new resources.
pub(super) io_dispatch: slab::Allocator<ScheduledIo>,
io_dispatch: RwLock<IoDispatcher>,

/// Used to wake up the reactor from a call to `turn`.
waker: mio::Waker,
Expand Down Expand Up @@ -130,11 +125,10 @@ impl Driver {
tick: 0,
events: Some(mio::Events::with_capacity(1024)),
poll,
resources: Some(slab),
resources: slab,
inner: Arc::new(Inner {
resources: Mutex::new(None),
registry,
io_dispatch: allocator,
io_dispatch: RwLock::new(IoDispatcher::new(allocator)),
waker,
metrics: IoDriverMetrics::default(),
}),
Expand All @@ -149,7 +143,7 @@ impl Driver {
/// to bind them to this event loop.
pub(crate) fn handle(&self) -> Handle {
Handle {
inner: Arc::downgrade(&self.inner),
inner: Arc::clone(&self.inner),
}
}

Expand All @@ -160,7 +154,7 @@ impl Driver {
self.tick = self.tick.wrapping_add(1);

if self.tick == COMPACT_INTERVAL {
self.resources.as_mut().unwrap().compact()
self.resources.compact()
}

let mut events = self.events.take().expect("i/o driver event store missing");
Expand Down Expand Up @@ -194,7 +188,7 @@ impl Driver {
fn dispatch(&mut self, token: mio::Token, ready: Ready) {
let addr = slab::Address::from_usize(ADDRESS.unpack(token.0));

let resources = self.resources.as_mut().unwrap();
let resources = &mut self.resources;

let io = match resources.get(addr) {
Some(io) => io,
Expand All @@ -214,22 +208,7 @@ impl Driver {

impl Drop for Driver {
fn drop(&mut self) {
(*self.inner.resources.lock()) = self.resources.take();
}
}

impl Drop for Inner {
LucioFranco marked this conversation as resolved.
Show resolved Hide resolved
fn drop(&mut self) {
let resources = self.resources.lock().take();

if let Some(mut slab) = resources {
slab.for_each(|io| {
// If a task is waiting on the I/O resource, notify it. The task
// will then attempt to use the I/O resource and fail due to the
// driver being shutdown.
io.shutdown();
});
}
self.shutdown();
}
}

Expand All @@ -251,7 +230,16 @@ impl Park for Driver {
Ok(())
}

fn shutdown(&mut self) {}
fn shutdown(&mut self) {
if self.inner.shutdown() {
self.resources.for_each(|io| {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this statement is commented out, do tests fail? i.e. if there is a memory leak, is it caught my CI? If not, we should add one.

Copy link
Contributor Author

@hidva hidva May 16, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes.

First, we have to comment out clear_wakers().

Second, if we commented out io.shutdown() here, we'd get a memory leak, otherwise we wouldn't.

$ valgrind --error-exitcode=1 --leak-check=full --show-leak-kinds=all ./target/debug/test-mem
==756352== LEAK SUMMARY:
==756352==    definitely lost: 96 bytes in 1 blocks
==756352==    indirectly lost: 20,856 bytes in 35 blocks
==756352==      possibly lost: 0 bytes in 0 blocks
==756352==    still reachable: 9,952 bytes in 69 blocks
==756352==         suppressed: 0 bytes in 0 blocks
==756352==
==756352== For lists of detected and suppressed errors, rerun with: -s
==756352== ERROR SUMMARY: 1 errors from 1 contexts (suppressed: 0 from 0)

(It looks like we don't need to call clear_wakers() in Drop for Registration anymore.

// If a task is waiting on the I/O resource, notify it. The task
// will then attempt to use the I/O resource and fail due to the
// driver being shutdown. And shutdown will clear all wakers.
io.shutdown();
});
}
}
}

impl fmt::Debug for Driver {
Expand Down Expand Up @@ -292,18 +280,11 @@ cfg_not_rt! {

cfg_net! {
cfg_metrics! {
impl Handle {
// TODO: Remove this when handle contains `Arc<Inner>` so that we can return
// &IoDriverMetrics instead of using a closure.
//
// Related issue: https://github.com/tokio-rs/tokio/issues/4509
pub(crate) fn with_io_driver_metrics<F, R>(&self, f: F) -> Option<R>
where
F: Fn(&IoDriverMetrics) -> R,
{
self.inner().map(|inner| f(&inner.metrics))
impl Handle {
pub(crate) fn metrics(&self) -> &IoDriverMetrics {
&self.inner.metrics
}
}
}
}
}

Expand All @@ -318,13 +299,7 @@ impl Handle {
/// blocked in `turn`, then the next call to `turn` will not block and
/// return immediately.
fn wakeup(&self) {
if let Some(inner) = self.inner() {
inner.waker.wake().expect("failed to wake I/O driver");
}
}

pub(super) fn inner(&self) -> Option<Arc<Inner>> {
self.inner.upgrade()
self.inner.waker.wake().expect("failed to wake I/O driver");
}
}

Expand All @@ -340,6 +315,17 @@ impl fmt::Debug for Handle {
}
}

// ===== impl IoDispatcher =====

impl IoDispatcher {
fn new(allocator: slab::Allocator<ScheduledIo>) -> Self {
Self {
allocator,
is_shutdown: false,
}
}
}

// ===== impl Inner =====

impl Inner {
Expand All @@ -351,12 +337,7 @@ impl Inner {
source: &mut impl mio::event::Source,
interest: Interest,
) -> io::Result<slab::Ref<ScheduledIo>> {
let (address, shared) = self.io_dispatch.allocate().ok_or_else(|| {
io::Error::new(
io::ErrorKind::Other,
"reactor at max registered I/O resources",
)
})?;
let (address, shared) = self.allocate()?;

let token = GENERATION.pack(shared.generation(), ADDRESS.pack(address.as_usize(), 0));

Expand All @@ -376,6 +357,36 @@ impl Inner {

Ok(())
}

/// shutdown the dispatcher.
fn shutdown(&self) -> bool {
let mut io = self.io_dispatch.write().unwrap();
hidva marked this conversation as resolved.
Show resolved Hide resolved
if io.is_shutdown {
return false;
}
io.is_shutdown = true;
true
}

fn is_shutdown(&self) -> bool {
return self.io_dispatch.read().unwrap().is_shutdown;
}

fn allocate(&self) -> io::Result<(slab::Address, slab::Ref<ScheduledIo>)> {
let io = self.io_dispatch.read().unwrap();
if io.is_shutdown {
return Err(io::Error::new(
io::ErrorKind::Other,
"failed to find event loop",
));
}
io.allocator.allocate().ok_or_else(|| {
io::Error::new(
io::ErrorKind::Other,
"reactor at max registered I/O resources",
)
})
}
}

impl Direction {
Expand Down
19 changes: 4 additions & 15 deletions tokio/src/io/driver/registration.rs
Expand Up @@ -72,14 +72,7 @@ impl Registration {
interest: Interest,
handle: Handle,
) -> io::Result<Registration> {
let shared = if let Some(inner) = handle.inner() {
inner.add_source(io, interest)?
} else {
return Err(io::Error::new(
io::ErrorKind::Other,
"failed to find event loop",
));
};
let shared = handle.inner.add_source(io, interest)?;

Ok(Registration { handle, shared })
}
Expand All @@ -101,11 +94,7 @@ impl Registration {
///
/// `Err` is returned if an error is encountered.
pub(crate) fn deregister(&mut self, io: &mut impl Source) -> io::Result<()> {
let inner = match self.handle.inner() {
Some(inner) => inner,
None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")),
};
inner.deregister_source(io)
self.handle.inner.deregister_source(io)
}

pub(crate) fn clear_readiness(&self, event: ReadyEvent) {
Expand Down Expand Up @@ -157,7 +146,7 @@ impl Registration {
let coop = ready!(crate::coop::poll_proceed(cx));
let ev = ready!(self.shared.poll_readiness(cx, direction));

if self.handle.inner().is_none() {
if self.handle.inner.is_shutdown() {
return Poll::Ready(Err(gone()));
}

Expand Down Expand Up @@ -235,7 +224,7 @@ cfg_io_readiness! {
pin!(fut);

crate::future::poll_fn(|cx| {
if self.handle.inner().is_none() {
if self.handle.inner.is_shutdown() {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::Other,
crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/metrics/runtime.rs
Expand Up @@ -529,7 +529,7 @@ cfg_net! {
.as_inner()
.io_handle
.as_ref()
.and_then(|h| h.with_io_driver_metrics(f))
.map(|h| f(h.metrics()))
hidva marked this conversation as resolved.
Show resolved Hide resolved
.unwrap_or(0)
}
}
Expand Down