Skip to content

Commit

Permalink
add notifier max connections tests
Browse files Browse the repository at this point in the history
add also setting to configuration builder, in order to test it, and
improve the panic handling of the notifier client
  • Loading branch information
ecioppettini committed Mar 2, 2021
1 parent f20546f commit 6b1f5cc
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 23 deletions.
2 changes: 1 addition & 1 deletion jormungandr-lib/src/interfaces/config/mod.rs
Expand Up @@ -6,7 +6,7 @@ mod secret;
pub use log::{Log, LogEntry, LogOutput};
pub use mempool::{LogMaxEntries, Mempool, PoolMaxEntries};
pub use node::{
Cors, Explorer, LayersConfig, NodeConfig, P2p, Policy, PreferredListConfig, Rest, Tls,
Cors, Explorer, LayersConfig, NodeConfig, Notifier, P2p, Policy, PreferredListConfig, Rest, Tls,
TopicsOfInterest, TrustedPeer,
};
pub use secret::{Bft, GenesisPraos, NodeSecret};
5 changes: 5 additions & 0 deletions jormungandr-lib/src/interfaces/config/node.rs
Expand Up @@ -153,6 +153,11 @@ pub struct LayersConfig {
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct PreferredViewMax(usize);

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct Notifier {
pub max_connections: Option<usize>,
}

impl Default for PreferredViewMax {
fn default() -> Self {
Self(DEFAULT_PREFERRED_VIEW_MAX)
Expand Down
Expand Up @@ -286,6 +286,12 @@ impl ConfigurationBuilder {
self
}

pub fn with_notifier_max_connections(&mut self, max_connections: usize) -> &mut Self {
self.node_config_builder
.with_notifier_max_connections(max_connections);
self
}

pub fn build(&self, temp_dir: &impl PathChild) -> JormungandrParams<NodeConfig> {
let mut node_config = self.node_config_builder.build();

Expand Down
Expand Up @@ -4,7 +4,7 @@ use std::net::SocketAddr;
use std::sync::{Arc, RwLock};
use std::thread::JoinHandle;
use thiserror::Error;
use tungstenite::connect;
use tungstenite::{connect, Message};
use url::Url;

#[derive(Debug, Error)]
Expand Down Expand Up @@ -36,6 +36,11 @@ pub enum JsonMessage {
NewTip(Hash),
}

pub enum NotifierMessage {
JsonMessage(JsonMessage),
MaxConnectionsReached,
}

impl JormungandrNotifier {
pub fn new(url: Url) -> Self {
JormungandrNotifier {
Expand All @@ -47,7 +52,7 @@ impl JormungandrNotifier {

pub fn new_client<F>(&mut self, mut for_each: F) -> Result<(), ()>
where
F: FnMut(JsonMessage) -> () + Send + 'static,
F: FnMut(NotifierMessage) -> bool + Send + 'static,
{
let url = self.url.clone();
let (mut socket, _response) = connect(url).expect("Can't connect to notifier websocket");
Expand All @@ -63,23 +68,47 @@ impl JormungandrNotifier {

let msg = socket.read_message().expect("Error reading message");

let json_msg = serde_json::from_str(msg.to_text().expect("message is not text"))
.expect("Deserialization failed");
match msg {
Message::Text(text) => {
let json_msg: JsonMessage =
serde_json::from_str(&text).expect("Deserialization failed");

if !for_each(NotifierMessage::JsonMessage(json_msg)) {
break;
}
}
Message::Close(close_frame) => {
if let tungstenite::protocol::frame::coding::CloseCode::Library(4000) =
close_frame.expect("no close code").code
{
for_each(NotifierMessage::MaxConnectionsReached);
}

for_each(json_msg);
break;
}
_ => unreachable!("unexpected notifier message"),
}
});

self.handles.push(join);

Ok(())
}

pub fn wait_all(&mut self) -> std::thread::Result<()> {
for handle in self.handles.drain(..) {
handle.join()?;
}

Ok(())
}
}

impl Drop for JormungandrNotifier {
fn drop(&mut self) {
*self.finished.write().unwrap() = true;
for handle in self.handles.drain(..) {
handle.join().expect("failed to join thread");
let _ = handle.join();
}
}
}
@@ -1,10 +1,16 @@
use crate::common::{
jormungandr::{notifier::JsonMessage, ConfigurationBuilder},
jormungandr::{
notifier::{JsonMessage, NotifierMessage},
ConfigurationBuilder,
},
startup,
};

use jormungandr_lib::interfaces::ActiveSlotCoefficient;
use std::sync::{Arc, Condvar, Mutex};
use std::sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc,
};

#[test]
pub fn notifier_shows_the_same_tip_as_rest() {
Expand All @@ -19,27 +25,61 @@ pub fn notifier_shows_the_same_tip_as_rest() {

let mut notifier = jormungandr.notifier();

#[allow(clippy::mutex_atomic)]
let pair = Arc::new((Mutex::new(0usize), Condvar::new()));
let pair2 = pair.clone();
let mut counter = Arc::new(AtomicUsize::new(0));

notifier
.new_client(move |msg| {
if let JsonMessage::NewTip(hash) = msg {
if let NotifierMessage::JsonMessage(JsonMessage::NewTip(hash)) = msg {
let rest_tip = rest.tip().expect("couldn't get tip from rest");
assert_eq!(hash, rest_tip);

let (lock, cvar) = &*pair2;
let mut done = lock.lock().unwrap();
*done += 1;
cvar.notify_one();
let count = Arc::get_mut(&mut counter).unwrap();

let count = count.fetch_add(1, Ordering::AcqRel);

if count == 5 {
return false;
}
};
true
})
.expect("couldn't connect client");

notifier.wait_all().unwrap();
}

#[test]
pub fn notifier_fails_with_more_than_max_connections() {
let faucet = startup::create_new_account_address();

let mut config = ConfigurationBuilder::new();
config
.with_consensus_genesis_praos_active_slot_coeff(ActiveSlotCoefficient::MAXIMUM)
.with_notifier_max_connections(1);

let (jormungandr, _) = startup::start_stake_pool(&[faucet], &[], &mut config).unwrap();

let mut notifier = jormungandr.notifier();

let waiting = Arc::new(AtomicBool::new(true));
let waiting1 = Arc::clone(&waiting);

notifier
.new_client(move |_msg| waiting1.load(Ordering::Acquire))
.expect("couldn't connect client");

notifier
.new_client(move |msg| {
match msg {
NotifierMessage::MaxConnectionsReached => {
waiting.store(false, Ordering::Release);
}
_ => unreachable!("shouldn't be able to connect"),
}

true
})
.expect("couldn't connect client");

let (lock, cvar) = &*pair;
let mut done = lock.lock().unwrap();
while !*done < 5 {
done = cvar.wait(done).unwrap();
}
notifier.wait_all().unwrap();
}
Expand Up @@ -4,7 +4,7 @@ use std::path::PathBuf;

use jormungandr_lib::{
interfaces::{
Explorer, Log, Mempool, NodeConfig, P2p, Policy, Rest, Tls, TopicsOfInterest, TrustedPeer,
Explorer, Log, Mempool, NodeConfig, Notifier, P2p, Policy, Rest, Tls, TopicsOfInterest, TrustedPeer,
},
time::Duration,
};
Expand Down Expand Up @@ -46,8 +46,11 @@ impl NodeConfigBuilder {
listen: format!("{}:{}", DEFAULT_HOST, rest_port.to_string())
.parse()
.unwrap(),
<<<<<<< HEAD:testing/jormungandr-testing-utils/src/testing/configuration/node_config_builder.rs
tls: None,
cors: None,
=======
>>>>>>> add notifier max connections tests:testing/jormungandr-integration-tests/src/common/configuration/node_config_builder.rs
notifier: None,
},
p2p: P2p {
Expand Down Expand Up @@ -118,6 +121,13 @@ impl NodeConfigBuilder {
self
}

pub fn with_notifier_max_connections(&mut self, max_connections: usize) -> &mut Self {
self.rest.notifier.replace(Notifier {
max_connections: Some(max_connections),
});
self
}

pub fn build(&self) -> NodeConfig {
NodeConfig {
storage: self.storage.clone(),
Expand Down

0 comments on commit 6b1f5cc

Please sign in to comment.