Skip to content

Commit

Permalink
Merge pull request #375 from teozkr/bugfix/runner-enqueue-scheduling
Browse files Browse the repository at this point in the history
Runtime: Always wake runner immediately once a new task is enqueued
  • Loading branch information
clux committed Jan 6, 2021
2 parents b059b66 + ef8f2b6 commit 72261c1
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ UNRELEASED
* docs: fix broken documentation for `kube` 0.46.0 - #367
* bug: `AttachParams` now fixes owned method chaining (slightly breaks from 0.46 if using &mut ref before) - #364
* feat: `AttachParams::interactive_tty` convenience method added - #364
* bug: fix `Runner` (and thus `Controller` and `applier`) not waking correctly when starting new tasks - #375

0.46.0 / 2021-01-02
===================
Expand Down
41 changes: 39 additions & 2 deletions kube-runtime/src/controller/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ where
slots.insert(msg, msg_fut).is_none(),
"Runner tried to replace a running future.. please report this as a kube-rs bug!"
);
cx.waker().wake_by_ref();
}
Poll::Ready(Some(Err(err))) => break Poll::Ready(Some(Err(err))),
Poll::Ready(None) => {
Expand All @@ -86,9 +87,16 @@ where
mod tests {
use super::Runner;
use crate::scheduler::{scheduler, ScheduleRequest};
use futures::{channel::mpsc, poll, SinkExt, TryStreamExt};
use futures::{
channel::{mpsc, oneshot},
poll, SinkExt, TryStreamExt,
};
use std::{cell::RefCell, time::Duration};
use tokio::time::{pause, sleep, Instant};
use tokio::{
runtime::Handle,
task::yield_now,
time::{pause, sleep, timeout, Instant},
};

#[tokio::test]
async fn runner_should_never_run_two_instances_at_once() {
Expand Down Expand Up @@ -128,4 +136,33 @@ mod tests {
// Validate that we actually ran both requests
assert_eq!(count, 2);
}

// Test MUST be single-threaded to be consistent, since it concerns a relatively messy
// interplay between multiple tasks
#[tokio::test(flavor = "current_thread")]
async fn runner_should_wake_when_scheduling_messages() {
// pause();
let (mut sched_tx, sched_rx) = mpsc::unbounded();
let (result_tx, result_rx) = oneshot::channel();
let mut runner = Runner::new(scheduler(sched_rx), |msg: &u8| futures::future::ready(*msg));
// Start a background task that starts listening /before/ we enqueue the message
// We can't just use Stream::poll_next(), since that bypasses the waker system
Handle::current().spawn(async move { result_tx.send(runner.try_next().await.unwrap()).unwrap() });
// Ensure that the background task actually gets to initiate properly, and starts polling the runner
yield_now().await;
sched_tx
.send(ScheduleRequest {
message: 8,
run_at: Instant::now(),
})
.await
.unwrap();
// Eventually the background task should finish up and report the message received,
// a timeout here *should* mean that the background task isn't getting awoken properly
// when the new message is ready.
assert_eq!(
timeout(Duration::from_secs(1), result_rx).await.unwrap().unwrap(),
Some(8)
);
}
}

0 comments on commit 72261c1

Please sign in to comment.