Skip to content

Commit

Permalink
WIP testing
Browse files Browse the repository at this point in the history
  • Loading branch information
gshuflin committed Jul 23, 2020
1 parent 43ebd34 commit 4a3fdd2
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 2 deletions.
4 changes: 2 additions & 2 deletions src/rust/engine/async_semaphore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,10 @@ pub struct Permit {
impl Drop for Permit {
fn drop(&mut self) {
let mut inner = self.inner.lock();
inner.available_permits += 1;
if let Some(waiter) = inner.waiters.front() {
if let Some(waiter) = inner.waiters.get(inner.available_permits) {
waiter.waker.wake_by_ref()
}
inner.available_permits += 1;
}
}

Expand Down
79 changes: 79 additions & 0 deletions src/rust/engine/async_semaphore/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,85 @@ async fn correct_semaphor_slot_ids() {
assert_eq!(id4, 2);
}

#[tokio::test]
async fn correct_semaphor_slot_ids_2() {
let sema = AsyncSemaphore::new(4);
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
let (tx3, rx3) = oneshot::channel();
let (tx4, rx4) = oneshot::channel();
let (tx5, rx5) = oneshot::channel();
let (tx6, rx6) = oneshot::channel();
let (tx7, rx7) = oneshot::channel();

println!("Spawning process 1");
tokio::spawn(sema.clone().with_acquired(move |id| async move {
println!("Exec process 1");
tx1.send(id).unwrap();
delay_for(Duration::from_millis(20)).await;
future::ready(())
}));
println!("Spawning process 2");
tokio::spawn(sema.clone().with_acquired(move |id| async move {
println!("Exec process 2");
tx2.send(id).unwrap();
delay_for(Duration::from_millis(20)).await;
future::ready(())
}));
println!("Spawning process 3");
tokio::spawn(sema.clone().with_acquired(move |id| async move {
println!("Exec process 3");
tx3.send(id).unwrap();
delay_for(Duration::from_millis(20)).await;
future::ready(())
}));
println!("Spawning process 4");
tokio::spawn(sema.clone().with_acquired(move |id| async move {
println!("Exec process 4");
tx4.send(id).unwrap();
delay_for(Duration::from_millis(20)).await;
future::ready(())
}));
println!("Spawning process 5");
tokio::spawn(sema.clone().with_acquired(move |id| async move {
println!("Exec process 5");
tx5.send(id).unwrap();
delay_for(Duration::from_millis(20)).await;
future::ready(())
}));
println!("Spawning process 6");
tokio::spawn(sema.clone().with_acquired(move |id| async move {
println!("Exec process 6");
tx6.send(id).unwrap();
delay_for(Duration::from_millis(20)).await;
future::ready(())
}));
println!("Spawning process 7");
tokio::spawn(sema.clone().with_acquired(move |id| async move {
println!("Exec process 7");
tx7.send(id).unwrap();
delay_for(Duration::from_millis(20)).await;
future::ready(())
}));


let id1 = rx1.await.unwrap();
let id2 = rx2.await.unwrap();
let id3 = rx3.await.unwrap();
let id4 = rx4.await.unwrap();
let id5 = rx5.await.unwrap();
let id6 = rx6.await.unwrap();
let id7 = rx7.await.unwrap();

assert_eq!(id1, 4);
assert_eq!(id2, 3);
assert_eq!(id3, 2);
assert_eq!(id4, 1);
assert_eq!(id5, 4);
assert_eq!(id6, 3);
assert_eq!(id7, 2);
}

#[tokio::test]
async fn at_most_n_acquisitions() {
let sema = AsyncSemaphore::new(1);
Expand Down
2 changes: 2 additions & 0 deletions src/rust/engine/process_execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,8 @@ impl CommandRunner for BoundedCommandRunner {
let name = format!("{}-running", req.workunit_name());

semaphore.with_acquired(move |concurrency_id| {
//TODO should be debug
log::info!("Running {} with concurrency id: {}", desc, concurrency_id);
let mut metadata = WorkunitMetadata::with_level(Level::Info);
metadata.desc = Some(desc);

Expand Down

0 comments on commit 4a3fdd2

Please sign in to comment.