From bf7ede736399e44c42e7d42cec85ba7978767a35 Mon Sep 17 00:00:00 2001 From: Jon Gjengset Date: Thu, 23 Jan 2020 17:19:08 -0500 Subject: [PATCH] preemption: add yield points to some leaf futures --- tokio/src/io/registration.rs | 6 ++++++ tokio/src/process/mod.rs | 3 +++ tokio/src/sync/mpsc/chan.rs | 3 +++ tokio/src/sync/oneshot.rs | 6 ++++++ tokio/src/sync/semaphore_ll.rs | 6 ++++++ tokio/src/task/join.rs | 3 +++ tokio/src/time/driver/registration.rs | 3 +++ 7 files changed, 30 insertions(+) diff --git a/tokio/src/io/registration.rs b/tokio/src/io/registration.rs index 16e8fe4d675..4c5cfe27d75 100644 --- a/tokio/src/io/registration.rs +++ b/tokio/src/io/registration.rs @@ -139,6 +139,9 @@ impl Registration { /// /// This function will panic if called from outside of a task context. pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll> { + // Keep track of task budget + ready!(crate::league::poll_cooperate(cx)); + let v = self.poll_ready(Direction::Read, Some(cx))?; match v { Some(v) => Poll::Ready(Ok(v)), @@ -190,6 +193,9 @@ impl Registration { /// /// This function will panic if called from outside of a task context. pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll> { + // Keep track of task budget + ready!(crate::league::poll_cooperate(cx)); + let v = self.poll_ready(Direction::Write, Some(cx))?; match v { Some(v) => Poll::Ready(Ok(v)), diff --git a/tokio/src/process/mod.rs b/tokio/src/process/mod.rs index 6562d9289a1..2e46a87f069 100644 --- a/tokio/src/process/mod.rs +++ b/tokio/src/process/mod.rs @@ -700,6 +700,9 @@ where type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + // Keep track of task budget + ready!(crate::league::poll_cooperate(cx)); + let ret = Pin::new(&mut self.inner).poll(cx); if let Poll::Ready(Ok(_)) = ret { diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index 847a0b70844..77cc6b1bd95 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -265,6 +265,9 @@ where pub(crate) fn recv(&mut self, cx: &mut Context<'_>) -> Poll> { use super::block::Read::*; + // Keep track of task budget + ready!(crate::league::poll_cooperate(cx)); + self.inner.rx_fields.with_mut(|rx_fields_ptr| { let rx_fields = unsafe { &mut *rx_fields_ptr }; diff --git a/tokio/src/sync/oneshot.rs b/tokio/src/sync/oneshot.rs index 5bbea381adf..ffbc9818727 100644 --- a/tokio/src/sync/oneshot.rs +++ b/tokio/src/sync/oneshot.rs @@ -196,6 +196,9 @@ impl Sender { #[doc(hidden)] // TODO: remove pub fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()> { + // Keep track of task budget + ready!(crate::league::poll_cooperate(cx)); + let inner = self.inner.as_ref().unwrap(); let mut state = State::load(&inner.state, Acquire); @@ -544,6 +547,9 @@ impl Inner { } fn poll_recv(&self, cx: &mut Context<'_>) -> Poll> { + // Keep track of task budget + ready!(crate::league::poll_cooperate(cx)); + // Load the state let mut state = State::load(&self.state, Acquire); diff --git a/tokio/src/sync/semaphore_ll.rs b/tokio/src/sync/semaphore_ll.rs index 6550f13d3ab..1e4db845b3c 100644 --- a/tokio/src/sync/semaphore_ll.rs +++ b/tokio/src/sync/semaphore_ll.rs @@ -185,6 +185,9 @@ impl Semaphore { num_permits: u16, permit: &mut Permit, ) -> Poll> { + // Keep track of task budget + ready!(crate::league::poll_cooperate(cx)); + self.poll_acquire2(num_permits, || { let waiter = permit.waiter.get_or_insert_with(|| Box::new(Waiter::new())); @@ -630,6 +633,9 @@ impl Permit { match self.state { Waiting(requested) => { + // Keep track of task budget + ready!(crate::league::poll_cooperate(cx)); + // There must be a waiter let waiter = self.waiter.as_ref().unwrap(); diff --git a/tokio/src/task/join.rs b/tokio/src/task/join.rs index 8a8f25714e5..a5dfcead68b 100644 --- a/tokio/src/task/join.rs +++ b/tokio/src/task/join.rs @@ -102,6 +102,9 @@ impl Future for JoinHandle { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { use std::mem::MaybeUninit; + // Keep track of task budget + ready!(crate::league::poll_cooperate(cx)); + // Raw should always be set let raw = self.raw.as_ref().unwrap(); diff --git a/tokio/src/time/driver/registration.rs b/tokio/src/time/driver/registration.rs index 141ca0154c1..bbe256fb399 100644 --- a/tokio/src/time/driver/registration.rs +++ b/tokio/src/time/driver/registration.rs @@ -39,6 +39,9 @@ impl Registration { } pub(crate) fn poll_elapsed(&self, cx: &mut task::Context<'_>) -> Poll> { + // Keep track of task budget + ready!(crate::league::poll_cooperate(cx)); + self.entry.poll_elapsed(cx) } }