Skip to content

Commit

Permalink
add websocket warp handler
Browse files Browse the repository at this point in the history
  • Loading branch information
ecioppettini committed Mar 2, 2021
1 parent 78d1d6c commit 918004c
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 2 deletions.
1 change: 1 addition & 0 deletions jormungandr/src/rest/context.rs
Expand Up @@ -145,4 +145,5 @@ pub struct FullContext {
pub enclave: Enclave,
pub network_state: NetworkStateR,
pub explorer: Option<crate::explorer::Explorer>,
pub notifier: crate::notifier::Notifier,
}
7 changes: 5 additions & 2 deletions jormungandr/src/rest/mod.rs
Expand Up @@ -2,6 +2,7 @@

pub mod context;
pub mod explorer;
pub mod notifier;
pub mod v0;
mod v1;

Expand Down Expand Up @@ -32,11 +33,13 @@ pub async fn start_rest_server(config: Rest, explorer_enabled: bool, context: Co

let api =
warp::path!("api" / ..).and(v0::filter(context.clone()).or(v1::filter(context.clone())));

let notifier = notifier::filter(context.clone());
if explorer_enabled {
let explorer = explorer::filter(context);
setup_cors(api.or(explorer), config, stopper_rx).await;
setup_cors(api.or(notifier).or(explorer), config, stopper_rx).await;
} else {
setup_cors(api, config, stopper_rx).await;
setup_cors(api.or(notifier), config, stopper_rx).await;
}
}

Expand Down
43 changes: 43 additions & 0 deletions jormungandr/src/rest/notifier/mod.rs
@@ -0,0 +1,43 @@
use crate::rest::{context, ContextLock};
use thiserror::Error;
use warp::{reject::Reject, Filter, Rejection, Reply};

#[derive(Debug, Error)]
pub enum Error {
#[error(transparent)]
Context(#[from] context::Error),
}

impl Reject for Error {}

pub fn filter(
context: ContextLock,
) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
let with_context = warp::any().map(move || context.clone());

let notifier = warp::path!("notifier")
.and(warp::ws())
.and(with_context)
.and_then(handle_connection);

notifier.boxed()
}

async fn handle_connection(
ws: warp::ws::Ws,
context: ContextLock,
) -> Result<impl Reply, Rejection> {
let context = context.read().await;
let full_context = context
.try_full()
.map_err(Error::Context)
.map_err(warp::reject::custom)?;

let notifier: crate::notifier::Notifier = full_context.notifier.clone();

Ok(ws.on_upgrade(move |socket| add_connection(notifier, socket)))
}

async fn add_connection(notifier: crate::notifier::Notifier, socket: warp::ws::WebSocket) {
notifier.new_connection(socket).await;
}

0 comments on commit 918004c

Please sign in to comment.