Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixes in async_semaphore for concurrency ID calculation #10436

Merged
merged 2 commits into from
Jul 24, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 12 additions & 3 deletions src/rust/engine/async_semaphore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ struct Waiter {

struct Inner {
waiters: VecDeque<Waiter>,
available_ids: VecDeque<usize>,
available_permits: usize,
// Used as the source of id in Waiters's because
// it is monotonically increasing, and only incremented under the mutex lock.
Expand All @@ -55,9 +56,15 @@ pub struct AsyncSemaphore {

impl AsyncSemaphore {
pub fn new(permits: usize) -> AsyncSemaphore {
let mut available_ids = VecDeque::new();
for id in 1..=permits {
available_ids.push_back(id);
}

AsyncSemaphore {
inner: Arc::new(Mutex::new(Inner {
waiters: VecDeque::new(),
available_ids,
available_permits: permits,
next_waiter_id: 0,
})),
Expand Down Expand Up @@ -99,10 +106,11 @@ pub struct Permit {
impl Drop for Permit {
fn drop(&mut self) {
let mut inner = self.inner.lock();
inner.available_permits += 1;
if let Some(waiter) = inner.waiters.front() {
inner.available_ids.push_back(self.id);
if let Some(waiter) = inner.waiters.get(inner.available_permits) {
waiter.waker.wake_by_ref()
}
inner.available_permits += 1;
}
}

Expand Down Expand Up @@ -158,7 +166,8 @@ impl Future for PermitFuture {
// queue, so we don't have to waste time searching for it in the Drop
// handler.
self.waiter_id = None;
Some(inner.available_permits)
let id = inner.available_ids.pop_front().unwrap_or(0); // The unwrap_or case should never happen.
Copy link
Sponsor Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would probably be less error prone for this to either:

  1. replace the inner.available_permits counter entirely, so that we can be sure that we don't promise a permit unless there is an available_id.
  2. be decoupled from the semaphore and completely independent (such that it generated tokens lazily if it was empty, counting upward or something).

But if you trust that the test is sufficient, it's not a blocker per-se.

Some(id)
} else {
// Don't issue a permit to this task if it isn't at the head of the line,
// we added it as a waiter above.
Expand Down
92 changes: 85 additions & 7 deletions src/rust/engine/async_semaphore/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,91 @@ async fn correct_semaphor_slot_ids() {
let id3 = rx3.await.unwrap();
let id4 = rx4.await.unwrap();

// Process 1 should get ID 2, then process 2 should run with id 1 and complete, then process 3
// should run in the same "slot" as process 2 and get the same id. Process 4 is scheduled
// later and gets put into "slot" 2.
assert_eq!(id1, 2);
assert_eq!(id2, 1);
assert_eq!(id3, 1);
assert_eq!(id4, 2);
// Process 1 should get ID 1, then process 2 should run with id 2 and complete, then process 3
// should run in the same "slot" as process 2 and get the same id (2). Process 4 is scheduled
// later and gets put into "slot" 1.
assert_eq!(id1, 1);
assert_eq!(id2, 2);
assert_eq!(id3, 2);
assert_eq!(id4, 1);
}

#[tokio::test]
async fn correct_semaphor_slot_ids_2() {
let sema = AsyncSemaphore::new(4);
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
let (tx3, rx3) = oneshot::channel();
let (tx4, rx4) = oneshot::channel();
let (tx5, rx5) = oneshot::channel();
let (tx6, rx6) = oneshot::channel();
let (tx7, rx7) = oneshot::channel();

println!("Spawning process 1");
tokio::spawn(sema.clone().with_acquired(move |id| async move {
println!("Exec process 1");
tx1.send(id).unwrap();
delay_for(Duration::from_millis(20)).await;
future::ready(())
}));
println!("Spawning process 2");
tokio::spawn(sema.clone().with_acquired(move |id| async move {
println!("Exec process 2");
tx2.send(id).unwrap();
delay_for(Duration::from_millis(20)).await;
future::ready(())
}));
println!("Spawning process 3");
tokio::spawn(sema.clone().with_acquired(move |id| async move {
println!("Exec process 3");
tx3.send(id).unwrap();
delay_for(Duration::from_millis(20)).await;
future::ready(())
}));
println!("Spawning process 4");
tokio::spawn(sema.clone().with_acquired(move |id| async move {
println!("Exec process 4");
tx4.send(id).unwrap();
delay_for(Duration::from_millis(20)).await;
future::ready(())
}));
println!("Spawning process 5");
tokio::spawn(sema.clone().with_acquired(move |id| async move {
println!("Exec process 5");
tx5.send(id).unwrap();
delay_for(Duration::from_millis(20)).await;
future::ready(())
}));
println!("Spawning process 6");
tokio::spawn(sema.clone().with_acquired(move |id| async move {
println!("Exec process 6");
tx6.send(id).unwrap();
delay_for(Duration::from_millis(20)).await;
future::ready(())
}));
println!("Spawning process 7");
tokio::spawn(sema.clone().with_acquired(move |id| async move {
println!("Exec process 7");
tx7.send(id).unwrap();
delay_for(Duration::from_millis(20)).await;
future::ready(())
}));

let id1 = rx1.await.unwrap();
let id2 = rx2.await.unwrap();
let id3 = rx3.await.unwrap();
let id4 = rx4.await.unwrap();
let id5 = rx5.await.unwrap();
let id6 = rx6.await.unwrap();
let id7 = rx7.await.unwrap();

assert_eq!(id1, 1);
assert_eq!(id2, 2);
assert_eq!(id3, 3);
assert_eq!(id4, 4);
assert_eq!(id5, 1);
assert_eq!(id6, 2);
assert_eq!(id7, 3);
}

#[tokio::test]
Expand Down
5 changes: 5 additions & 0 deletions src/rust/engine/process_execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,11 @@ impl CommandRunner for BoundedCommandRunner {
let name = format!("{}-running", req.workunit_name());

semaphore.with_acquired(move |concurrency_id| {
log::debug!(
"Running {} under semaphor with concurrency id: {}",
stuhood marked this conversation as resolved.
Show resolved Hide resolved
desc,
concurrency_id
);
let mut metadata = WorkunitMetadata::with_level(Level::Info);
metadata.desc = Some(desc);

Expand Down