Skip to content

Commit

Permalink
add notifier test submodule
Browse files Browse the repository at this point in the history
  • Loading branch information
ecioppettini committed Mar 2, 2021
1 parent 98fabd5 commit c52ea15
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 1 deletion.
@@ -1,4 +1,5 @@
mod configuration_builder;
pub mod notifier;
pub mod process;
pub mod starter;
use chain_core::property::Fragment as _;
Expand Down
@@ -0,0 +1,85 @@
use jormungandr_lib::crypto::hash::Hash;
use serde::Deserialize;
use std::net::SocketAddr;
use std::sync::{Arc, RwLock};
use std::thread::JoinHandle;
use thiserror::Error;
use tungstenite::connect;
use url::Url;

#[derive(Debug, Error)]
pub enum NotifierError {
#[error("could not deserialize response")]
CannotDeserialize(#[from] serde_json::Error),
#[error("could not send reqeuest")]
RequestError(#[from] reqwest::Error),
#[error("hash parse error")]
HashParseError(#[from] chain_crypto::hash::Error),
}

pub fn uri_from_socket_addr(addr: SocketAddr) -> Url {
Url::parse(&format!("ws://{}/notifier", addr)).unwrap()
}

/// Specialized rest api
#[derive(Debug)]
pub struct JormungandrNotifier {
url: Url,
finished: Arc<RwLock<bool>>,
handles: Vec<JoinHandle<()>>,
}

// TODO: maybe this can be shared with the type in jormungandr (that only implements Serialize)
#[derive(Deserialize, Debug)]
pub enum JsonMessage {
NewBlock(Hash),
NewTip(Hash),
}

impl JormungandrNotifier {
pub fn new(url: Url) -> Self {
JormungandrNotifier {
url,
finished: Arc::new(RwLock::new(false)),
handles: Default::default(),
}
}

pub fn new_client<F>(&mut self, mut for_each: F) -> Result<(), ()>
where
F: FnMut(JsonMessage) -> () + Send + 'static,
{
let url = self.url.clone();
let (mut socket, _response) = connect(url).expect("Can't connect to notifier websocket");

// TODO: handle error?

let finished = Arc::clone(&self.finished);

let join = std::thread::spawn(move || loop {
if *finished.read().unwrap() {
break;
}

let msg = socket.read_message().expect("Error reading message");

let json_msg = serde_json::from_str(msg.to_text().expect("message is not text"))
.expect("Deserialization failed");

for_each(json_msg);
});

self.handles.push(join);

Ok(())
}
}

impl Drop for JormungandrNotifier {
fn drop(&mut self) {
*self.finished.write().unwrap() = true;
for handle in self.handles.drain(..) {
handle.join().expect("failed to join thread");
}
}
}
@@ -1,4 +1,4 @@
use super::JormungandrError;
use super::{notifier, JormungandrError};
use crate::common::jcli::JCli;
use assert_fs::TempDir;
use chain_impl_mockchain::fee::LinearFee;
Expand Down Expand Up @@ -66,6 +66,10 @@ impl JormungandrProcess {
JormungandrRest::new_with_cert(self.rest_uri(), cert)
}

pub fn notifier(&self) -> notifier::JormungandrNotifier {
notifier::JormungandrNotifier::new(self.notifier_url())
}

pub fn shutdown(&self) {
let jcli: JCli = Default::default();
jcli.rest().v0().shutdown(self.rest_uri());
Expand Down Expand Up @@ -108,6 +112,10 @@ impl JormungandrProcess {
uri_from_socket_addr(self.rest_socket_addr)
}

pub fn notifier_url(&self) -> url::Url {
notifier::uri_from_socket_addr(self.rest_socket_addr)
}

pub fn fees(&self) -> LinearFee {
self.fees
}
Expand Down
Expand Up @@ -4,6 +4,7 @@ pub mod fragments;
pub mod genesis;
pub mod grpc;
pub mod legacy;
pub mod notifier;
pub mod recovery;
pub mod tls;
pub mod transactions;
Expand Down
@@ -0,0 +1,45 @@
use crate::common::{
jormungandr::{notifier::JsonMessage, ConfigurationBuilder},
startup,
};

use jormungandr_lib::interfaces::ActiveSlotCoefficient;
use std::sync::{Arc, Condvar, Mutex};

#[test]
pub fn notifier_shows_the_same_tip_as_rest() {
let faucet = startup::create_new_account_address();

let mut config = ConfigurationBuilder::new();
config.with_consensus_genesis_praos_active_slot_coeff(ActiveSlotCoefficient::MAXIMUM);

let (jormungandr, _) = startup::start_stake_pool(&[faucet], &[], &mut config).unwrap();

let rest = jormungandr.rest();

let mut notifier = jormungandr.notifier();

#[allow(clippy::mutex_atomic)]
let pair = Arc::new((Mutex::new(0usize), Condvar::new()));
let pair2 = pair.clone();

notifier
.new_client(move |msg| {
if let JsonMessage::NewTip(hash) = msg {
let rest_tip = rest.tip().expect("couldn't get tip from rest");
assert_eq!(hash, rest_tip);

let (lock, cvar) = &*pair2;
let mut done = lock.lock().unwrap();
*done += 1;
cvar.notify_one();
};
})
.expect("couldn't connect client");

let (lock, cvar) = &*pair;
let mut done = lock.lock().unwrap();
while !*done < 5 {
done = cvar.wait(done).unwrap();
}
}

0 comments on commit c52ea15

Please sign in to comment.