Skip to content
This repository has been archived by the owner on Aug 1, 2022. It is now read-only.

Commit

Permalink
fix: tests wait until keystore is sealed
Browse files Browse the repository at this point in the history
We ensure that the tests wait until the proxy has been properly sealed
before continuing.

To make this work we also need to ensure that the HTTP server shuts down
properly by terminating any event streams.

Signed-off-by: Thomas Scholtes <geigerzaehler@axiom.fm>
  • Loading branch information
geigerzaehler committed Aug 11, 2021
1 parent 0c8de68 commit 4b17c2e
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 9 deletions.
34 changes: 32 additions & 2 deletions cypress/support/commands.ts
Expand Up @@ -5,15 +5,22 @@
// LICENSE file.

import * as proxy from "../../ui/src/proxy";
import { sleep } from "ui/src/sleep";

const proxyClient = new proxy.Client("http://127.0.0.1:17246");

export const resetProxyState = (): void => {
cy.then(() => proxyClient.control.reset());
cy.then(async () => {
await proxyClient.control.reset();
await waitSealed();
});
};

export const sealKeystore = (): void => {
cy.then(() => proxyClient.control.seal());
cy.then(async () => {
await proxyClient.control.seal();
await waitSealed();
});
};

export const restartAndUnlock = (): void => {
Expand Down Expand Up @@ -195,3 +202,26 @@ function getCurrentTestName() {
testTitles = testTitles.reverse();
return testTitles.join(" -- ");
}

// Wait until the proxy has been re-sealed or reset.
async function waitSealed() {
let remainingTries = 100;
for (;;) {
remainingTries -= 1;
if (remainingTries < 0) {
throw new Error("Waiting for unsealed timed out");
}

try {
await proxyClient.sessionGet();
await sleep(10);
} catch (err) {
if (
err instanceof proxy.ResponseError &&
[404, 403].includes(err.response.status)
) {
return;
}
}
}
}
3 changes: 3 additions & 0 deletions proxy/api/src/context.rs
Expand Up @@ -175,6 +175,8 @@ pub struct Unsealed {
pub auth_token: Arc<RwLock<Option<String>>>,
/// Reference to the key store.
pub keystore: Arc<dyn keystore::Keystore + Send + Sync>,
/// Notification to shutdown the HTTP server
pub shutdown: Arc<tokio::sync::Notify>,
}

/// Context for HTTP request if the coco peer APIs have not been initialized yet.
Expand Down Expand Up @@ -244,6 +246,7 @@ impl Unsealed {
service_handle: service::Handle::dummy(),
auth_token: Arc::new(RwLock::new(None)),
keystore: Arc::new(keystore::memory()),
shutdown: Arc::new(tokio::sync::Notify::new()),
},
run_handle,
))
Expand Down
20 changes: 15 additions & 5 deletions proxy/api/src/http/notification.rs
Expand Up @@ -33,7 +33,7 @@ pub fn local_peer_status_stream(

/// Notification handlers to serve event streams.
mod handler {
use futures::StreamExt as _;
use futures::{future::Either, StreamExt as _};
use warp::{sse, Rejection, Reply};

use crate::{
Expand All @@ -60,15 +60,25 @@ mod handler {
Notification::LocalPeer(event) => Some(sse::Event::default().json_data(event)),
}
};

let shutdown = ctx.shutdown.clone();

let stream = async_stream::stream! {
use tokio::sync::broadcast::error::RecvError;
loop {
match notifications.recv().await {
Ok(notification) => yield notification,
Err(RecvError::Lagged(_)) => {},
Err(RecvError::Closed) => break,
match futures::future::select(Box::pin(notifications.recv()), Box::pin(shutdown.notified())).await {
Either::Left((notification, _)) =>
match notification {
Ok(notification) => yield notification,
Err(RecvError::Lagged(_)) => {},
Err(RecvError::Closed) => break,

}
Either::Right(_) => {
break;
}
}

}
};

Expand Down
10 changes: 9 additions & 1 deletion proxy/api/src/process.rs
Expand Up @@ -155,6 +155,10 @@ async fn run_rigging(

let (peer_events_sender, _) = tokio::sync::broadcast::channel(32);
let server_ctx = ctx.clone();
let ctx_shutdown = match ctx {
context::Context::Sealed(_) => None,
context::Context::Unsealed(ref unsealed) => Some(unsealed.shutdown.clone()),
};

tracing::info!("starting API");
let api = http::api(server_ctx.clone(), peer_events_sender.clone());
Expand All @@ -167,6 +171,9 @@ async fn run_rigging(
Box::pin(restart_server_signal_rx),
)
.await;
if let Some(ctx_shutdown) = ctx_shutdown {
ctx_shutdown.notify_waiters()
}
}
})?;

Expand Down Expand Up @@ -240,9 +247,9 @@ async fn run_rigging(
futures::select! {
_ = tasks => {
let _ = restart_server_signal_tx.send(());
server.await?;
drop(peer_shutdown);
peer_run.await?;
server.await?;
Ok(())
}
result = server => {
Expand Down Expand Up @@ -302,6 +309,7 @@ async fn rig(
service_handle: service_handle.clone(),
auth_token,
keystore: environment.keystore.clone(),
shutdown: Arc::new(tokio::sync::Notify::new()),
});

Ok(Rigging {
Expand Down
2 changes: 1 addition & 1 deletion proxy/api/src/service.rs
Expand Up @@ -199,7 +199,7 @@ impl Handle {
},
},
}
self.reload_notify.notify_one();
self.reload_notify.notify_waiters()
}

/// Create a handle where none of the methods have any effect.
Expand Down

0 comments on commit 4b17c2e

Please sign in to comment.