Skip to content
This repository has been archived by the owner on Aug 1, 2022. It is now read-only.

Commit

Permalink
fix(proxy): do not drop event tasks (#1217)
Browse files Browse the repository at this point in the history
  • Loading branch information
Merle Breitkreuz committed Nov 11, 2020
1 parent b850f31 commit 0dfc333
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 34 deletions.
74 changes: 40 additions & 34 deletions proxy/api/src/process.rs
@@ -1,6 +1,6 @@
//! Provides [`run`] to run the proxy process.
use futures::prelude::*;
use std::{sync::Arc, time::Duration};
use std::{future::Future, sync::Arc, time::Duration};
use thiserror::Error;
use tokio::{
signal::unix::{signal, SignalKind},
Expand Down Expand Up @@ -79,7 +79,7 @@ pub async fn run(args: Args) -> Result<(), Box<dyn std::error::Error>> {
}
}

/// Error running either the peer or the API.
/// Error running either the peer, the event tasks or the API.
#[derive(Debug, Error)]
enum RunError {
/// The peer errored
Expand All @@ -89,6 +89,10 @@ enum RunError {
/// Warp errored
#[error(transparent)]
Warp(#[from] warp::Error),

/// Event task aborted
#[error(transparent)]
SpawnAbortable(#[from] coco::SpawnAbortableError),
}

/// Run the API and peer.
Expand All @@ -110,38 +114,13 @@ async fn run_rigging(
seeds_sender,
} = rigging;

if let Some(seeds_sender) = seeds_sender {
let seeds_store = ctx.store().clone();
coco::SpawnAbortable::new(async move {
let mut last_seeds: Vec<seed::Seed> = vec![];
let mut timer = tokio::time::interval(Duration::from_secs(1));

loop {
let _timestamp = timer.tick().await;

let seeds = session_seeds(&seeds_store)
.await
.expect("Failed to read session store");

if seeds == last_seeds {
continue;
}

if seeds_sender.broadcast(seeds.clone()).is_err() {
break;
}

last_seeds = seeds;
}
});
}

let subscriptions = notification::Subscriptions::default();
let peer_subscriptions = subscriptions.clone();
let server_ctx = ctx.clone();

let server = async move {
log::info!("starting API");
let api = http::api(ctx, subscriptions.clone());
let api = http::api(server_ctx, subscriptions.clone());
let (_, server) = warp::serve(api).try_bind_with_graceful_shutdown(
([127, 0, 0, 1], PORT),
async move {
Expand All @@ -155,7 +134,8 @@ async fn run_rigging(
};

if let Some(peer) = peer {
coco::SpawnAbortable::new({
let mut tasks = vec![server.boxed()];
let peer_event_task = coco::SpawnAbortable::new({
let mut peer_events = peer.subscribe();

async move {
Expand All @@ -171,15 +151,41 @@ async fn run_rigging(
}
}
});
tasks.push(peer_event_task.map_err(RunError::from).boxed());
let peer = async move {
log::info!("starting peer");
peer.into_running().await
};
tasks.push(peer.map_err(RunError::from).boxed());

let result = tokio::select! {
server_status = server => server_status,
peer_status = peer => Ok(peer_status?),
};
if let Some(seeds_sender) = seeds_sender {
let seeds_store = ctx.store().clone();
let seeds_event_task = coco::SpawnAbortable::new(async move {
let mut last_seeds: Vec<seed::Seed> = vec![];
let mut timer = tokio::time::interval(Duration::from_secs(1));

loop {
let _timestamp = timer.tick().await;

let seeds = session_seeds(&seeds_store)
.await
.expect("Failed to read session store");

if seeds == last_seeds {
continue;
}

if seeds_sender.broadcast(seeds.clone()).is_err() {
break;
}

last_seeds = seeds;
}
});
tasks.push(seeds_event_task.map_err(RunError::from).boxed());
}

let (result, _, _) = futures::future::select_all(tasks).await;
result
} else {
server.await
Expand Down
1 change: 1 addition & 0 deletions proxy/coco/src/spawn_abortable.rs
Expand Up @@ -26,6 +26,7 @@ pub enum Error {
/// Stop-gap until we can abort [`JoinHandle`]s directly:
/// tokio-rs@cbb14a7bb9a13363e1abee8caff2bad1f996c263
#[allow(clippy::missing_docs_in_private_items)]
#[must_use = "keep the task running"]
pub struct SpawnAbortable<T> {
join_handle: JoinHandle<Result<T, future::Aborted>>,
abort_handle: future::AbortHandle,
Expand Down

0 comments on commit 0dfc333

Please sign in to comment.