Skip to content

Commit

Permalink
Kill all non-Terminated backends on drone start (#739)
Browse files Browse the repository at this point in the history
Fixes the following bug:
* drone process restarts
* drone executor's `apply_action` receives a `BackendAction::Terminate`
for a backend that's been running since before the restart
* the `self.backends` map does not contain the backend's manager anymore
because the process was restarted
* the drone assumes the backend has been terminated and the backend
continues running

Tested locally: running `docker ps` shows that all previously created
backends are terminated upon drone agent restart.

Creating a unit test for this might involve a side quest because
terminate currently hangs when the backend can't be found. This is a
pre-existing issue outside of the scope of this bug, and I think the
code here is sufficiently straightforward to merge without a dedicated
test.

# Demo
In separate terminals:
```shell
$ ./dev/postgres.sh && ./dev/controller.sh
$ ./dev/drone.sh
$ ./dev/proxy.sh
```

Spawn a bunch of backends:
```shell
$ ./dev/cli.sh connect --cluster 'localhost:9090' --image 'ghcr.io/jamsocket/demo-image-drop-four'
$ ./dev/cli.sh connect --cluster 'localhost:9090' --image 'ghcr.io/jamsocket/demo-image-drop-four'
```

Check what's running in docker:
```shell
$ docker ps
```

Stop the drone process, then restart it and observe that the backends
get killed:
```
$ ./dev/drone.sh 
...
INFO plane::drone::executor: Terminating preexisting backends backends=[(BackendName("21p3dk3zvlm4jl"), Ready { address: Some(BackendAddr(127.0.0.1:53635)) })...
```

Verify all containers have been killed in docker:
```shell
$ docker ps
```

This is idempotent: stopping and restarting the drone multiple times
works without issue.
  • Loading branch information
michaelsilver committed Jun 5, 2024
1 parent a07831a commit ec3e8c7
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 6 deletions.
96 changes: 91 additions & 5 deletions plane/src/drone/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ use crate::{
drone::runtime::Runtime,
names::BackendName,
protocol::{BackendAction, BackendEventId, BackendStateMessage},
types::BackendState,
util::GuardHandle,
types::{BackendState, TerminationKind, TerminationReason},
util::{ExponentialBackoff, GuardHandle},
};
use anyhow::Result;
use chrono::Utc;
use dashmap::DashMap;
use futures_util::StreamExt;
use futures_util::{future::join_all, StreamExt};
use std::{
net::IpAddr,
sync::{Arc, Mutex},
Expand All @@ -24,8 +25,14 @@ pub struct Executor<R: Runtime> {
}

impl<R: Runtime> Executor<R> {
pub fn new(runtime: Arc<R>, state_store: StateStore, ip: IpAddr) -> Self {
pub async fn new(runtime: Arc<R>, state_store: StateStore, ip: IpAddr) -> Self {
let backends: Arc<DashMap<BackendName, Arc<BackendManager<R>>>> = Arc::default();
let state_store = Arc::new(Mutex::new(state_store));

#[allow(clippy::unwrap_used)]
Self::terminate_preexisting_backends(runtime.clone(), state_store.clone())
.await
.expect("Failed to terminate all preexisting backends! Locks may be violated, Drone aborting startup.");

let backend_event_listener = {
let docker = runtime.clone();
Expand Down Expand Up @@ -53,13 +60,92 @@ impl<R: Runtime> Executor<R> {

Self {
runtime,
state_store: Arc::new(Mutex::new(state_store)),
state_store,
backends,
ip,
_backend_event_listener: backend_event_listener,
}
}

// On restart, we want to terminate all existing backends and start fresh.
// This prevents bugs where an agent restart leaves the drone unable to
// terminate old backends.
async fn terminate_preexisting_backends(
runtime: Arc<R>,
state_store: Arc<Mutex<StateStore>>,
) -> Result<()> {
let backends = state_store
.lock()
.expect("State store lock poisoned.")
.active_backends()?;

if !backends.is_empty() {
tracing::info!(?backends, "Terminating preexisting backends");
}
let mut tasks = vec![];
for (backend_id, state) in backends {
let runtime = runtime.clone();
let state_store = state_store.clone();
let state = state.clone();
tasks.push(async move {
state_store
.lock()
.expect("State store lock poisoned.")
.register_event(
&backend_id,
&state.to_terminating(TerminationKind::Hard, TerminationReason::KeyExpired),
Utc::now(),
)
.unwrap_or_else(|_| {
panic!(
"Failed to register backend terminating for backend {:?}",
backend_id
)
});

let mut backoff = ExponentialBackoff::default();
let mut success = false;
for attempt in 1..=10 {
match runtime.terminate(&backend_id, true).await {
Ok(()) => {
success = true;
break;
}
Err(err) => {
tracing::warn!(
?err,
?backend_id,
?attempt,
"Attempt failed to terminate backend"
);
backoff.wait().await;
}
}
}
if !success {
tracing::warn!(
?backend_id,
"Failed to terminate backend after 10 attempts. Marking terminated anyways."
);
}
state_store
.lock()
.expect("State store lock poisoned.")
.register_event(&backend_id, &state.to_terminated(None), Utc::now())
.unwrap_or_else(|_| {
panic!(
"Failed to register backend termination for backend {:?}",
backend_id
)
});
});
}

join_all(tasks).await;

Ok(())
}

pub fn register_listener<F>(&self, listener: F) -> Result<()>
where
F: Fn(BackendStateMessage) + Send + Sync + 'static,
Expand Down
2 changes: 1 addition & 1 deletion plane/src/drone/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ impl Drone {
let state_store = StateStore::new(sqlite_connection)?;

let runtime = Arc::new(runtime);
let executor = Executor::new(runtime, state_store, config.ip);
let executor = Executor::new(runtime, state_store, config.ip).await;

let id = config.name.clone();
let drone_loop = tokio::spawn(drone_loop(id.clone(), connector, executor));
Expand Down
25 changes: 25 additions & 0 deletions plane/src/drone/state_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,31 @@ impl StateStore {

Ok(())
}

/// Lists every backend whose stored state is anything other than `Terminated`.
///
/// Reads the `id` and `state` columns of every row in the `backend` table,
/// decodes the state from its JSON text, and keeps the rows whose decoded
/// state is not `BackendState::Terminated`.
///
/// # Errors
///
/// Returns an error if the query fails, a column cannot be read as a string,
/// the state JSON cannot be decoded, or the id of a non-terminated backend
/// is not a valid `BackendName`.
pub fn active_backends(&self) -> Result<Vec<(BackendName, BackendState)>> {
    let mut statement = self.db_conn.prepare(
        r#"
select "id", "state"
from "backend"
"#,
    )?;

    let mut result_rows = statement.query([])?;
    let mut live = Vec::new();

    while let Some(record) = result_rows.next()? {
        let raw_name: String = record.get(0)?;
        let encoded_state: String = record.get(1)?;
        let decoded: BackendState = serde_json::from_str(&encoded_state)?;

        // Terminated backends are skipped before their id is validated.
        if matches!(decoded, BackendState::Terminated { .. }) {
            continue;
        }

        live.push((BackendName::try_from(raw_name)?, decoded));
    }

    Ok(live)
}
}

#[cfg(test)]
Expand Down

0 comments on commit ec3e8c7

Please sign in to comment.