From 0dfc333345740028f9c3ca4e7fd9d6537e28220f Mon Sep 17 00:00:00 2001 From: Merle Breitkreuz Date: Wed, 11 Nov 2020 14:11:08 +0100 Subject: [PATCH] fix(proxy): do not drop event tasks (#1217) --- proxy/api/src/process.rs | 74 +++++++++++++++++-------------- proxy/coco/src/spawn_abortable.rs | 1 + 2 files changed, 41 insertions(+), 34 deletions(-) diff --git a/proxy/api/src/process.rs b/proxy/api/src/process.rs index c32ba824e8..c438cc8715 100644 --- a/proxy/api/src/process.rs +++ b/proxy/api/src/process.rs @@ -1,6 +1,6 @@ //! Provides [`run`] to run the proxy process. use futures::prelude::*; -use std::{sync::Arc, time::Duration}; +use std::{future::Future, sync::Arc, time::Duration}; use thiserror::Error; use tokio::{ signal::unix::{signal, SignalKind}, @@ -79,7 +79,7 @@ pub async fn run(args: Args) -> Result<(), Box> { } } -/// Error running either the peer or the API. +/// Error running either the peer, the event tasks or the API. #[derive(Debug, Error)] enum RunError { /// The peer errored @@ -89,6 +89,10 @@ enum RunError { /// Warp errored #[error(transparent)] Warp(#[from] warp::Error), + + /// Event task aborted + #[error(transparent)] + SpawnAbortable(#[from] coco::SpawnAbortableError), } /// Run the API and peer. @@ -110,38 +114,13 @@ async fn run_rigging( seeds_sender, } = rigging; - if let Some(seeds_sender) = seeds_sender { - let seeds_store = ctx.store().clone(); - coco::SpawnAbortable::new(async move { - let mut last_seeds: Vec = vec![]; - let mut timer = tokio::time::interval(Duration::from_secs(1)); - - loop { - let _timestamp = timer.tick().await; - - let seeds = session_seeds(&seeds_store) - .await - .expect("Failed to read session store"); - - if seeds == last_seeds { - continue; - } - - if seeds_sender.broadcast(seeds.clone()).is_err() { - break; - } - - last_seeds = seeds; - } - }); - } - let subscriptions = notification::Subscriptions::default(); let peer_subscriptions = subscriptions.clone(); + let server_ctx = ctx.clone(); let server = async move { log::info!("starting API"); - let api = http::api(ctx, subscriptions.clone()); + let api = http::api(server_ctx, subscriptions.clone()); let (_, server) = warp::serve(api).try_bind_with_graceful_shutdown( ([127, 0, 0, 1], PORT), async move { @@ -155,7 +134,8 @@ async fn run_rigging( }; if let Some(peer) = peer { - coco::SpawnAbortable::new({ + let mut tasks = vec![server.boxed()]; + let peer_event_task = coco::SpawnAbortable::new({ let mut peer_events = peer.subscribe(); async move { @@ -171,15 +151,41 @@ async fn run_rigging( } } }); + tasks.push(peer_event_task.map_err(RunError::from).boxed()); let peer = async move { log::info!("starting peer"); peer.into_running().await }; + tasks.push(peer.map_err(RunError::from).boxed()); - let result = tokio::select! { - server_status = server => server_status, - peer_status = peer => Ok(peer_status?), - }; + if let Some(seeds_sender) = seeds_sender { + let seeds_store = ctx.store().clone(); + let seeds_event_task = coco::SpawnAbortable::new(async move { + let mut last_seeds: Vec = vec![]; + let mut timer = tokio::time::interval(Duration::from_secs(1)); + + loop { + let _timestamp = timer.tick().await; + + let seeds = session_seeds(&seeds_store) + .await + .expect("Failed to read session store"); + + if seeds == last_seeds { + continue; + } + + if seeds_sender.broadcast(seeds.clone()).is_err() { + break; + } + + last_seeds = seeds; + } + }); + tasks.push(seeds_event_task.map_err(RunError::from).boxed()); + } + + let (result, _, _) = futures::future::select_all(tasks).await; result } else { server.await diff --git a/proxy/coco/src/spawn_abortable.rs b/proxy/coco/src/spawn_abortable.rs index 6f67fe1968..c525a577f8 100644 --- a/proxy/coco/src/spawn_abortable.rs +++ b/proxy/coco/src/spawn_abortable.rs @@ -26,6 +26,7 @@ pub enum Error { /// Stop-gap until we can abort [`JoinHandle`]s directly: /// tokio-rs@cbb14a7bb9a13363e1abee8caff2bad1f996c263 #[allow(clippy::missing_docs_in_private_items)] +#[must_use = "keep the task running"] pub struct SpawnAbortable { join_handle: JoinHandle>, abort_handle: future::AbortHandle,