Skip to content

Commit

Permalink
Merge pull request #1065 from rust-lang/accurate
Browse files Browse the repository at this point in the history
Limit concurrency of active backends, not Coordinators
  • Loading branch information
shepmaster committed Jun 7, 2024
2 parents 2377c99 + 58201a4 commit debf744
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 48 deletions.
3 changes: 3 additions & 0 deletions compiler/base/orchestrator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ edition = "2021"

[workspace]

[lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(force_docker)'] }

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
asm-cleanup = { path = "../asm-cleanup" }
Expand Down
75 changes: 29 additions & 46 deletions compiler/base/orchestrator/src/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -861,60 +861,23 @@ impl CoordinatorFactory {
CoordinatorId { start, id }
}

pub async fn build<B>(&self) -> LimitedCoordinator<B>
pub async fn build<B>(&self) -> Coordinator<B>
where
B: Backend + From<CoordinatorId>,
{
let semaphore = self.semaphore.clone();
let permit = semaphore
.acquire_owned()
.await
.expect("Unable to acquire permit");

let id = self.next_id();
let backend = B::from(id);

let coordinator = Coordinator::new(backend);

LimitedCoordinator {
coordinator,
_permit: permit,
}
}
}

pub struct LimitedCoordinator<T> {
coordinator: Coordinator<T>,
_permit: OwnedSemaphorePermit,
}

impl<T> LimitedCoordinator<T>
where
T: Backend,
{
pub async fn shutdown(self) -> Result<T> {
self.coordinator.shutdown().await
}
}

impl<T> ops::Deref for LimitedCoordinator<T> {
type Target = Coordinator<T>;

fn deref(&self) -> &Self::Target {
&self.coordinator
}
}

impl<T> ops::DerefMut for LimitedCoordinator<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.coordinator
Coordinator::new(semaphore, backend)
}
}

#[derive(Debug)]
pub struct Coordinator<B> {
semaphore: Arc<Semaphore>,
backend: B,
// Consider making these lazily-created and/or idly time out
stable: OnceCell<Container>,
beta: OnceCell<Container>,
nightly: OnceCell<Container>,
Expand All @@ -934,10 +897,11 @@ impl<B> Coordinator<B>
where
B: Backend,
{
pub fn new(backend: B) -> Self {
pub fn new(semaphore: Arc<Semaphore>, backend: B) -> Self {
let token = CancellationToken::new();

Self {
semaphore,
backend,
stable: OnceCell::new(),
beta: OnceCell::new(),
Expand Down Expand Up @@ -1174,13 +1138,18 @@ where
};

container
.get_or_try_init(|| Container::new(channel, self.token.clone(), &self.backend))
.get_or_try_init(|| {
let semaphore = self.semaphore.clone();
let token = self.token.clone();
Container::new(channel, semaphore, token, &self.backend)
})
.await
}
}

#[derive(Debug)]
struct Container {
permit: OwnedSemaphorePermit,
task: JoinHandle<Result<()>>,
kill_child: Option<Command>,
modify_cargo_toml: ModifyCargoToml,
Expand All @@ -1190,9 +1159,15 @@ struct Container {
impl Container {
async fn new(
channel: Channel,
semaphore: Arc<Semaphore>,
token: CancellationToken,
backend: &impl Backend,
) -> Result<Self> {
let permit = semaphore
.acquire_owned()
.await
.context(AcquirePermitSnafu)?;

let (mut child, kill_child, stdin, stdout) = backend.run_worker_in_background(channel)?;
let IoQueue {
mut tasks,
Expand Down Expand Up @@ -1231,6 +1206,7 @@ impl Container {
.context(CouldNotLoadCargoTomlSnafu)?;

Ok(Container {
permit,
task,
kill_child,
modify_cargo_toml,
Expand Down Expand Up @@ -1907,6 +1883,7 @@ impl Container {

async fn shutdown(self) -> Result<()> {
let Self {
permit,
task,
kill_child,
modify_cargo_toml,
Expand All @@ -1927,7 +1904,10 @@ impl Container {
.context(KillWorkerSnafu)?;
}

task.await.context(ContainerTaskPanickedSnafu)?
let r = task.await;
drop(permit);

r.context(ContainerTaskPanickedSnafu)?
}
}

Expand Down Expand Up @@ -2705,6 +2685,9 @@ pub enum Error {

#[snafu(display("Unable to load original Cargo.toml"))]
CouldNotLoadCargoToml { source: ModifyCargoTomlError },

#[snafu(display("Could not acquire a semaphore permit"))]
AcquirePermit { source: tokio::sync::AcquireError },
}

struct IoQueue {
Expand Down Expand Up @@ -2866,15 +2849,15 @@ mod tests {
static TEST_COORDINATOR_FACTORY: Lazy<CoordinatorFactory> =
Lazy::new(|| CoordinatorFactory::new(*MAX_CONCURRENT_TESTS));

async fn new_coordinator_test() -> LimitedCoordinator<TestBackend> {
async fn new_coordinator_test() -> Coordinator<TestBackend> {
TEST_COORDINATOR_FACTORY.build().await
}

async fn new_coordinator_docker() -> LimitedCoordinator<DockerBackend> {
async fn new_coordinator_docker() -> Coordinator<DockerBackend> {
TEST_COORDINATOR_FACTORY.build().await
}

async fn new_coordinator() -> LimitedCoordinator<impl Backend> {
async fn new_coordinator() -> Coordinator<impl Backend> {
#[cfg(not(force_docker))]
{
new_coordinator_test().await
Expand Down
4 changes: 2 additions & 2 deletions ui/src/server_axum/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{
use axum::extract::ws::{Message, WebSocket};
use futures::{future::Fuse, Future, FutureExt, StreamExt, TryFutureExt};
use orchestrator::{
coordinator::{self, CoordinatorFactory, DockerBackend, LimitedCoordinator},
coordinator::{self, Coordinator, CoordinatorFactory, DockerBackend},
DropErrorDetailsExt,
};
use snafu::prelude::*;
Expand Down Expand Up @@ -222,7 +222,7 @@ pub(crate) async fn handle(

type TaggedError = (Error, Option<Meta>);
type ResponseTx = mpsc::Sender<Result<MessageResponse, TaggedError>>;
type SharedCoordinator = Arc<LimitedCoordinator<DockerBackend>>;
type SharedCoordinator = Arc<Coordinator<DockerBackend>>;

/// Manages a limited amount of access to the `Coordinator`.
///
Expand Down

0 comments on commit debf744

Please sign in to comment.