Skip to content
This repository has been archived by the owner on Aug 1, 2022. It is now read-only.

Commit

Permalink
fix: update radicle-link to properly shutdown
Browse files Browse the repository at this point in the history
Update radicle-link to include
radicle-dev/radicle-link#731.

Fixes #2133

Co-Authored-By: Thomas Scholtes <geigerzaehler@axiom.fm>
Signed-off-by: Thomas Scholtes <geigerzaehler@axiom.fm>
  • Loading branch information
rudolfs and geigerzaehler committed Aug 2, 2021
1 parent c5ffde9 commit 06bd982
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 28 deletions.
14 changes: 7 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions proxy/api/Cargo.toml
Expand Up @@ -46,15 +46,15 @@ warp = { version = "0.3", default-features = false }

[dependencies.radicle-daemon]
git = "https://github.com/radicle-dev/radicle-link.git"
rev = "2fe2f9198ae1df5030917ae6b4e7614a2c075437"
rev = "87b3a077f9c821a1c21c8b79b98092239a373bde"

[dependencies.radicle-git-ext]
git = "https://github.com/radicle-dev/radicle-link.git"
rev = "2fe2f9198ae1df5030917ae6b4e7614a2c075437"
rev = "87b3a077f9c821a1c21c8b79b98092239a373bde"

[dependencies.radicle-git-helpers]
git = "https://github.com/radicle-dev/radicle-link.git"
rev = "2fe2f9198ae1df5030917ae6b4e7614a2c075437"
rev = "87b3a077f9c821a1c21c8b79b98092239a373bde"

[dependencies.radicle-avatar]
git = "https://github.com/radicle-dev/radicle-avatar.git"
Expand Down
7 changes: 6 additions & 1 deletion proxy/api/src/context.rs
Expand Up @@ -221,9 +221,14 @@ impl Unsealed {

let peer_control = coco_peer.control();
let run_handle = async move {
if let Err(err) = coco_peer.run().await {
let (shutdown, run) = coco_peer.start();
// We spawn a task for `run` so that it is run to completion even if this future is
// dropped.
let run = tokio::task::spawn(run);
if let Err(err) = run.await {
tracing::error!(?err, "peer run error");
}
drop(shutdown);
};
(peer_control, peer, run_handle)
};
Expand Down
65 changes: 48 additions & 17 deletions proxy/api/src/process.rs
Expand Up @@ -135,12 +135,18 @@ enum RunError {
/// # Errors
///
/// Errors when either the peer or the API error.
#[allow(clippy::too_many_lines)]
async fn run_rigging(
rigging: Rigging,
restart_signal: impl Future<Output = ()> + Send + 'static,
restart_signal: impl Future<Output = ()> + Send + Sync + 'static,
) -> Result<(), RunError> {
// Required for `tokio::select`. We can’t put it on the element directly, though.
#![allow(clippy::unreachable)]
#![allow(clippy::mut_mut)]

let restart_signal = restart_signal.shared();

let (restart_server_signal_tx, restart_server_signal_rx) = tokio::sync::oneshot::channel();

let Rigging {
ctx,
peer,
Expand All @@ -153,14 +159,21 @@ async fn run_rigging(
tracing::info!("starting API");
let api = http::api(server_ctx.clone(), peer_events_sender.clone());
let (_, server) =
warp::serve(api).try_bind_with_graceful_shutdown(server_ctx.http_listen(), async move {
restart_signal.await;
warp::serve(api).try_bind_with_graceful_shutdown(server_ctx.http_listen(), {
let restart_signal = restart_signal.clone();
async move {
futures::future::select(
Box::pin(restart_signal),
Box::pin(restart_server_signal_rx),
)
.await;
}
})?;

let server = server.map(Ok);

if let Some(peer) = peer {
let mut tasks = vec![server.boxed()];
let mut tasks = vec![restart_signal.shared().boxed()];

if let Some(seeds_sender) = seeds_sender {
let mut peer_control = peer.control();
Expand Down Expand Up @@ -191,7 +204,7 @@ async fn run_rigging(
last_seeds = seeds;
}
};
tasks.push(seeds_event_task.map(Ok).boxed());
tasks.push(seeds_event_task.boxed());
}
let peer_event_task = {
let mut peer_events = peer.subscribe();
Expand All @@ -214,17 +227,35 @@ async fn run_rigging(
}
}
};
tasks.push(peer_event_task.map(Ok).boxed());

let peer = async move {
tracing::info!("starting peer");
peer.run().await
};

tasks.push(peer.map_err(RunError::from).boxed());

let (result, _, _) = futures::future::select_all(tasks).await;
result
tasks.push(peer_event_task.boxed());

let mut tasks = futures::future::select_all(tasks).fuse();

tracing::info!("starting peer");
let (peer_shutdown, peer_run) = peer.start();

let peer_run = peer_run.fuse();
futures::pin_mut!(peer_run);
futures::pin_mut!(server);
futures::select! {
_ = tasks => {
let _ = restart_server_signal_tx.send(());
drop(peer_shutdown);
peer_run.await?;
server.await?;
Ok(())
}
result = server => {
drop(peer_shutdown);
peer_run.await?;
result
}
result = peer_run => {
let _ = restart_server_signal_tx.send(());
server.await?;
result.map_err(RunError::Peer)
}
}
} else {
server.await
}
Expand Down

0 comments on commit 06bd982

Please sign in to comment.