Skip to content
This repository has been archived by the owner on Aug 1, 2022. It is now read-only.

Commit

Permalink
fix: update radicle-link to properly shutdown
Browse files Browse the repository at this point in the history
Update radicle-link to include radicle-dev/radicle-link#731.
Fixes #2133

Signed-off-by: Thomas Scholtes <geigerzaehler@axiom.fm>
  • Loading branch information
geigerzaehler committed Jul 21, 2021
1 parent 8750756 commit 5e5660b
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 63 deletions.
9 changes: 5 additions & 4 deletions .github/workflows/build.yaml
@@ -1,13 +1,12 @@
name: build
on:
push:
branches: [master]
branches: ['**']
pull_request:
jobs:
build-linux:
# If this was triggered by a pull request, only run it if it
# originates from a fork.
if: github.event_name != 'pull_request' || github.event.pull_request.head.repo.full_name != 'radicle-dev/radicle-upstream'
runs-on: ubuntu-latest
container:
image: "gcr.io/opensourcecoin/radicle-upstream:0.12.0"
Expand All @@ -19,16 +18,18 @@ jobs:
path: |
~/cache/yarn
~/cache/cypress
key: build-${{ runner.os }}-yarn-v1-${{ hashFiles('yarn.lock') }}
key: build-${{ runner.os }}-yarn-v2-${{ hashFiles('yarn.lock') }}
restore-keys: |
build-${{ runner.os }}-yarn-v1-
build-${{ runner.os }}-yarn-v2-
- name: Cache Rust
uses: actions/cache@v2
with:
path: |
~/cache/cargo
~/cache/proxy-target
key: build-${{ runner.os }}-rust-v1-${{ hashFiles('Cargo.lock') }}
restore-keys: |
build-${{ runner.os }}-rust-v1-
- run: ci/run.sh
- uses: actions/upload-artifact@v2
if: always()
Expand Down
62 changes: 31 additions & 31 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions proxy/api/Cargo.toml
Expand Up @@ -46,15 +46,15 @@ warp = { version = "0.3", default-features = false }

[dependencies.radicle-daemon]
git = "https://github.com/radicle-dev/radicle-link.git"
rev = "276df21c303653c2fb0faa4ecc423f9ca573600e"
branch = "thomas/daemon-cleanup"

[dependencies.radicle-git-ext]
git = "https://github.com/radicle-dev/radicle-link.git"
rev = "276df21c303653c2fb0faa4ecc423f9ca573600e"
branch = "thomas/daemon-cleanup"

[dependencies.radicle-git-helpers]
git = "https://github.com/radicle-dev/radicle-link.git"
rev = "276df21c303653c2fb0faa4ecc423f9ca573600e"
branch = "thomas/daemon-cleanup"

[dependencies.radicle-avatar]
git = "https://github.com/radicle-dev/radicle-avatar.git"
Expand Down
2 changes: 1 addition & 1 deletion proxy/api/src/context.rs
Expand Up @@ -221,7 +221,7 @@ impl Unsealed {

let peer_control = coco_peer.control();
let run_handle = async move {
if let Err(err) = coco_peer.run().await {
if let Err(err) = coco_peer.run(futures::future::pending()).await {
log::error!("peer run error: {:?}", err);
}
};
Expand Down
61 changes: 37 additions & 24 deletions proxy/api/src/process.rs
Expand Up @@ -137,10 +137,14 @@ enum RunError {
/// Errors when either the peer or the API error.
async fn run_rigging(
rigging: Rigging,
restart_signal: impl Future<Output = ()> + Send + 'static,
restart_signal: impl Future<Output = ()> + Send + Sync + 'static,
) -> Result<(), RunError> {
// Required for `tokio::select`. We can’t put it on the element directly, though.
#![allow(clippy::unreachable)]
let (restart_signal_tx, restart_signal_rx) = tokio::sync::oneshot::channel();
let restart_signal =
futures::future::select(Box::pin(restart_signal), Box::pin(restart_signal_rx))
.map(|_| {})
.shared();

let Rigging {
ctx,
peer,
Expand All @@ -151,23 +155,27 @@ async fn run_rigging(
let peer_subscriptions = subscriptions.clone();
let server_ctx = ctx.clone();

let server = async move {
log::info!("starting API");
let api = http::api(server_ctx.clone(), subscriptions.clone());
let (_, server) = warp::serve(api).try_bind_with_graceful_shutdown(
server_ctx.http_listen(),
async move {
restart_signal.await;
subscriptions.clear().await;
},
)?;

server.await;
Ok(())
let server = {
let restart_signal = restart_signal.clone();

async move {
log::info!("starting API");
let api = http::api(server_ctx.clone(), subscriptions.clone());
let (_, server) = warp::serve(api).try_bind_with_graceful_shutdown(
server_ctx.http_listen(),
async move {
restart_signal.await;
subscriptions.clear().await;
},
)?;

server.await;
Ok(())
}
};

if let Some(peer) = peer {
let mut tasks = vec![server.boxed()];
let mut tasks = vec![server.boxed(), restart_signal.clone().map(Ok).boxed()];

if let Some(seeds_sender) = seeds_sender {
let mut peer_control = peer.control();
Expand Down Expand Up @@ -223,15 +231,20 @@ async fn run_rigging(
};
tasks.push(peer_event_task.map(Ok).boxed());

let peer = async move {
log::info!("starting peer");
peer.run().await
};
log::info!("starting peer");
let peer_run = peer.run(restart_signal.clone());

tasks.push(peer.map_err(RunError::from).boxed());
let tasks = futures::future::select_all(tasks);

let (result, _, _) = futures::future::select_all(tasks).await;
result
futures::pin_mut!(peer_run);
match future::select(tasks, peer_run).await {
future::Either::Left(((result, _, _), peer_run)) => {
let _ = restart_signal_tx.send(());
peer_run.await?;
result
},
future::Either::Right((result, _tasks)) => result.map_err(RunError::Peer),
}
} else {
server.await
}
Expand Down

0 comments on commit 5e5660b

Please sign in to comment.