Attempt at fixing sporadic failures of shuttle-deployer #980

Merged
7 commits merged on Jun 27, 2023
43 changes: 26 additions & 17 deletions deployer/src/deployment/deploy_layer.rs
@@ -324,7 +324,7 @@ mod tests {
DatabaseDeletionResponse, DatabaseRequest, DatabaseResponse,
};
use tempfile::Builder;
use tokio::{select, time::sleep};
use tokio::{select, task::JoinSet, time::sleep};
use tonic::transport::Server;
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
use uuid::Uuid;
@@ -574,9 +574,14 @@ mod tests {
async fn test_states(id: &Uuid, expected_states: Vec<StateLog>) {
loop {
let states = RECORDER.lock().unwrap().get_deployment_states(id);
if states == expected_states {
return;
}

if *states == expected_states {
    break;
}

for (actual, expected) in states.iter().zip(&expected_states) {
    if actual != expected {
        return;
    }
}

sleep(Duration::from_millis(250)).await;
@@ -585,7 +590,7 @@

#[tokio::test(flavor = "multi_thread")]
async fn deployment_to_be_queued() {
let deployment_manager = get_deployment_manager().await;
let (_set, deployment_manager) = get_deployment_manager();

let queued = get_queue("sleep-async");
let id = queued.id;
@@ -628,12 +633,8 @@
// Send kill signal
deployment_manager.kill(id).await;

sleep(Duration::from_secs(1)).await;

let states = RECORDER.lock().unwrap().get_deployment_states(&id);

assert_eq!(
*states,
let test = test_states(
&id,
vec![
StateLog {
id,
@@ -659,13 +660,21 @@
id,
state: State::Stopped,
},
]
],
);

select! {
_ = sleep(Duration::from_secs(60)) => {
let states = RECORDER.lock().unwrap().get_deployment_states(&id);
panic!("states should go into 'Stopped' for a valid service: {:#?}", states);
},
_ = test => {}
};
}

#[tokio::test(flavor = "multi_thread")]
async fn deployment_self_stop() {
let deployment_manager = get_deployment_manager().await;
let (_set, deployment_manager) = get_deployment_manager();

let queued = get_queue("self-stop");
let id = queued.id;
@@ -712,7 +721,7 @@

#[tokio::test(flavor = "multi_thread")]
async fn deployment_bind_panic() {
let deployment_manager = get_deployment_manager().await;
let (_set, deployment_manager) = get_deployment_manager();

let queued = get_queue("bind-panic");
let id = queued.id;
@@ -759,7 +768,7 @@

#[tokio::test(flavor = "multi_thread")]
async fn deployment_main_panic() {
let deployment_manager = get_deployment_manager().await;
let (_set, deployment_manager) = get_deployment_manager();

let queued = get_queue("main-panic");
let id = queued.id;
@@ -802,7 +811,7 @@

#[tokio::test]
async fn deployment_from_run() {
let deployment_manager = get_deployment_manager().await;
let (_set, deployment_manager) = get_deployment_manager();

let id = Uuid::new_v4();
deployment_manager
@@ -845,7 +854,7 @@

#[tokio::test]
async fn scope_with_nil_id() {
let deployment_manager = get_deployment_manager().await;
let (_set, deployment_manager) = get_deployment_manager();

let id = Uuid::nil();
deployment_manager
@@ -872,7 +881,7 @@
);
}

async fn get_deployment_manager() -> DeploymentManager {
fn get_deployment_manager() -> (JoinSet<()>, DeploymentManager) {
DeploymentManager::builder()
.build_log_recorder(RECORDER.clone())
.secret_recorder(RECORDER.clone())
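Condensed, the pattern these test changes introduce is: poll the recorder until the expected state sequence appears, and race that poll against a hard timeout so a stuck deployment fails with a useful message instead of hanging. The sketch below illustrates only that shape; `wait_for_states` and the stand-in recorder closure are invented names, not the shuttle test helpers.

```rust
use std::time::Duration;
use tokio::{select, time::sleep};

// Poll the observed state log until it matches the expected sequence.
async fn wait_for_states(expected: &[&str], observed: impl Fn() -> Vec<String>) {
    loop {
        if observed() == expected {
            break;
        }
        sleep(Duration::from_millis(250)).await;
    }
}

#[tokio::test]
async fn deployment_reaches_stopped() {
    // Stand-in for the RECORDER used by the real tests.
    let observed = || vec!["Queued".to_string(), "Stopped".to_string()];

    select! {
        _ = sleep(Duration::from_secs(60)) => {
            panic!("states should go into 'Stopped' for a valid service");
        }
        _ = wait_for_states(&["Queued", "Stopped"], observed) => {}
    }
}
```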
27 changes: 17 additions & 10 deletions deployer/src/deployment/mod.rs
@@ -15,7 +15,10 @@ use crate::{
persistence::{DeploymentUpdater, ResourceManager, SecretGetter, SecretRecorder, State},
RuntimeManager,
};
use tokio::sync::{mpsc, Mutex};
use tokio::{
sync::{mpsc, Mutex},
task::JoinSet,
};
use uuid::Uuid;

use self::{deploy_layer::LogRecorder, gateway_client::BuildQueueClient};
@@ -103,7 +106,7 @@
/// executing/deploying built services. Two multi-producer, single consumer
/// channels are also created which are for moving on-going service
/// deployments between the aforementioned tasks.
pub fn build(self) -> DeploymentManager {
pub fn build(self) -> (JoinSet<()>, DeploymentManager) {
let build_log_recorder = self
.build_log_recorder
.expect("a build log recorder to be set");
@@ -125,8 +128,9 @@
let storage_manager = ArtifactsStorageManager::new(artifacts_path);

let run_send_clone = run_send.clone();
let mut set = JoinSet::new();

tokio::spawn(queue::task(
set.spawn(queue::task(
queue_recv,
run_send_clone,
deployment_updater.clone(),
@@ -135,7 +139,7 @@
storage_manager.clone(),
queue_client,
));
tokio::spawn(run::task(
set.spawn(run::task(
run_recv,
runtime_manager.clone(),
deployment_updater,
@@ -145,12 +149,15 @@
storage_manager.clone(),
));

DeploymentManager {
queue_send,
run_send,
runtime_manager,
storage_manager,
}
(
set,
Review comment from iulianbarbu (Contributor), Jun 23, 2023:
Can we attach this to the deployment manager?

Reply from Kazy (Member, Author):
If we attach it to the deployer, then we have to wrap it in an Arc and a Mutex. I tried to avoid that (no particular reason actually, now that I think about it), but it might be better to wrap it and keep the JoinSet with the manager.

DeploymentManager {
queue_send,
run_send,
runtime_manager,
storage_manager,
},
)
}
}

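For illustration, the alternative discussed in the review thread above (keeping the JoinSet on the DeploymentManager itself rather than returning it in a tuple) might look roughly like the sketch below. This is a hypothetical variant, not the merged code; the struct and method names are invented, and the Arc<Mutex<...>> wrapper is exactly the cost Kazy mentions wanting to avoid.

```rust
use std::sync::Arc;
use tokio::{sync::Mutex, task::JoinSet};

// Hypothetical variant in which the manager owns its background tasks.
#[derive(Clone)]
pub struct DeploymentManagerSketch {
    tasks: Arc<Mutex<JoinSet<()>>>,
    // queue_send, run_send, runtime_manager, etc. elided
}

impl DeploymentManagerSketch {
    /// Drain the queue/run tasks, e.g. during shutdown.
    pub async fn join_background_tasks(&self) {
        let mut tasks = self.tasks.lock().await;
        while let Some(res) = tasks.join_next().await {
            if let Err(err) = res {
                eprintln!("background task failed: {err}");
            }
        }
    }
}
```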
12 changes: 11 additions & 1 deletion deployer/src/deployment/queue.rs
@@ -12,6 +12,7 @@ use opentelemetry::global;
use serde_json::json;
use shuttle_common::claims::Claim;
use shuttle_service::builder::{build_workspace, BuiltService};
use tokio::task::JoinSet;
use tokio::time::{sleep, timeout};
use tracing::{debug, debug_span, error, info, instrument, trace, warn, Instrument, Span};
use tracing_opentelemetry::OpenTelemetrySpanExt;
@@ -40,6 +41,8 @@ pub async fn task(
) {
info!("Queue task started");

let mut tasks = JoinSet::new();

while let Some(queued) = recv.recv().await {
let id = queued.id;

@@ -52,7 +55,7 @@
let storage_manager = storage_manager.clone();
let queue_client = queue_client.clone();

tokio::spawn(async move {
tasks.spawn(async move {
Review comment (Contributor):
The while loop this spawn is in has a 'static lifetime. So I'm wondering if an overflow will eventually happen if tasks are only inserted into the set and are never waited for?

Reply from Kazy (Member, Author), Jun 23, 2023:
Very good point, I think this can happen, yes. If the queue never ends (e.g. in the case of a long-running process), the task set will never be awaited, and the memory will keep growing with each deployment.

I wonder how this could be fixed. I see two solutions, and I don't know if there are simpler ones:

  1. Spawn a thread responsible for running join_next, plus a signal once we're done with the queue to notify that thread that the next time join_next returns None (i.e. the set is empty), it must return.
  2. Instead of doing a while loop on recv.recv(), use a select! between recv.recv(), tasks.join_next(), and the else case where both returned None (see the sketch after this file's diff).

I think the second one is the cleanest, let me know what you think!

Reply (Contributor):
Ideally, we would use something like 2. However, this doesn't seem to crash anything in our stress testing, so we are thinking about merging it without the extra handling for now. There is also a hard limit imposed on the container memory, which will most probably crash the user's deployer but not affect the rest of the system. Also, we will move away from this one-deployer-per-user-project architecture soon, so it is not critical to address this.

let parent_cx = global::get_text_map_propagator(|propagator| {
propagator.extract(&queued.tracing_context)
});
@@ -93,6 +96,13 @@
.await
});
}

while let Some(res) = tasks.join_next().await {
match res {
Ok(_) => (),
Err(err) => error!(error = %err, "an error happened when joining a builder task"),
}
}
}

#[instrument(skip(_id), fields(id = %_id, state = %State::Crashed))]
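A minimal sketch of the second option from the review thread above: reap finished builder tasks with join_next inside a select! while still accepting new work, so the JoinSet cannot grow without bound. The channel type and task body are placeholders; this is not the code that was merged.

```rust
use tokio::{select, sync::mpsc, task::JoinSet};

async fn queue_loop(mut recv: mpsc::Receiver<u64>) {
    let mut tasks: JoinSet<()> = JoinSet::new();

    loop {
        select! {
            // New work arrives: spawn a builder task into the set.
            Some(id) = recv.recv() => {
                tasks.spawn(async move {
                    // build/deploy `id` here
                    let _ = id;
                });
            }
            // A builder task finished: reap it so the set does not keep growing.
            Some(res) = tasks.join_next() => {
                if let Err(err) = res {
                    eprintln!("a builder task panicked or was cancelled: {err}");
                }
            }
            // Channel closed and set drained: nothing left to do.
            else => break,
        }
    }
}
```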
33 changes: 25 additions & 8 deletions deployer/src/deployment/run.rs
@@ -18,7 +18,10 @@ use shuttle_proto::runtime::{
runtime_client::RuntimeClient, LoadRequest, StartRequest, StopReason, SubscribeStopRequest,
SubscribeStopResponse,
};
use tokio::sync::Mutex;
use tokio::{
sync::Mutex,
task::{JoinHandle, JoinSet},
};
use tonic::{transport::Channel, Code};
use tracing::{debug, debug_span, error, info, instrument, trace, warn, Instrument};
use tracing_opentelemetry::OpenTelemetrySpanExt;
Expand All @@ -44,6 +47,8 @@ pub async fn task(
) {
info!("Run task started");

let mut set = JoinSet::new();

while let Some(built) = recv.recv().await {
let id = built.id;

@@ -83,15 +88,15 @@
};
let runtime_manager = runtime_manager.clone();

tokio::spawn(async move {
set.spawn(async move {
Review comment (Contributor):
The same goes for this while loop, which also has a 'static lifetime.

let parent_cx = global::get_text_map_propagator(|propagator| {
propagator.extract(&built.tracing_context)
});
let span = debug_span!("runner");
span.set_parent(parent_cx);

async move {
if let Err(err) = built
match built
.handle(
storage_manager,
secret_getter,
@@ -103,15 +108,27 @@
)
.await
{
start_crashed_cleanup(&id, err)
}
Ok(handle) => handle
.await
.expect("the call to run in built.handle to be done"),
Err(err) => start_crashed_cleanup(&id, err),
};

info!("deployment done");
}
.instrument(span)
.await
});
}

while let Some(res) = set.join_next().await {
match res {
Ok(_) => (),
Err(err) => {
error!(error = %err, "an error happened when joining a deployment run task")
}
}
}
}

#[instrument(skip(active_deployment_getter, runtime_manager))]
@@ -199,7 +216,7 @@ impl Built {
deployment_updater: impl DeploymentUpdater,
kill_old_deployments: impl futures::Future<Output = Result<()>>,
cleanup: impl FnOnce(Option<SubscribeStopResponse>) + Send + 'static,
) -> Result<()> {
) -> Result<JoinHandle<()>> {
// For alpha this is the path to the users project with an embedded runtime.
// For shuttle-next this is the path to the compiled .wasm file, which will be
// used in the load request.
@@ -244,7 +261,7 @@
)
.await?;

tokio::spawn(run(
let handler = tokio::spawn(run(
self.id,
self.service_name,
runtime_client,
Expand All @@ -253,7 +270,7 @@ impl Built {
cleanup,
));

Ok(())
Ok(handler)
}
}

3 changes: 2 additions & 1 deletion deployer/src/lib.rs
@@ -29,7 +29,8 @@ pub async fn start(
runtime_manager: Arc<Mutex<RuntimeManager>>,
args: Args,
) {
let deployment_manager = DeploymentManager::builder()
// when _set is dropped once axum exits, the deployment tasks will be aborted.
let (_set, deployment_manager) = DeploymentManager::builder()
.build_log_recorder(persistence.clone())
.secret_recorder(persistence.clone())
.active_deployment_getter(persistence.clone())
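The `_set` comment relies on JoinSet's drop behaviour: tokio documents that dropping a JoinSet aborts every task still in it. Below is a small self-contained illustration of that behaviour, assumed from the tokio docs rather than taken from shuttle.

```rust
use std::time::Duration;
use tokio::{task::JoinSet, time::sleep};

#[tokio::main]
async fn main() {
    {
        let mut set = JoinSet::new();
        set.spawn(async {
            sleep(Duration::from_secs(3600)).await;
            println!("never printed: the task is aborted when `set` is dropped");
        });
        // `set` goes out of scope here, aborting the long-running task above.
    }

    // Give the runtime a moment; the aborted task never completes.
    sleep(Duration::from_millis(100)).await;
    println!("JoinSet dropped; its tasks were aborted");
}
```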