Skip to content

Commit

Permalink
add max_connections to settings
Browse files Browse the repository at this point in the history
  • Loading branch information
ecioppettini committed Mar 2, 2021
1 parent ddb9545 commit 4fcdc00
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 11 deletions.
10 changes: 10 additions & 0 deletions jormungandr-lib/src/interfaces/config/node.rs
Expand Up @@ -17,6 +17,9 @@ pub struct Rest {
/// Enables CORS if provided
#[serde(skip_serializing_if = "Option::is_none")]
pub cors: Option<Cors>,
/// Change notifier api settings if provided
#[serde(skip_serializing_if = "Option::is_none")]
pub notifier: Option<Notifier>,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
Expand Down Expand Up @@ -86,6 +89,13 @@ impl AsRef<str> for CorsOrigin {
}
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct Notifier {
/// Limit on the number of simultaneous connections.
/// If not specified, an internal default limit is used.
pub max_connections: Option<usize>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct P2p {
/// The public address to which other peers may connect to
Expand Down
8 changes: 7 additions & 1 deletion jormungandr/src/main.rs
Expand Up @@ -129,7 +129,13 @@ fn start_services(bootstrapped_node: BootstrappedNode) -> Result<(), start_up::E
let event_notifier = {
let (msgbox, queue) = async_msg::channel(NOTIFIER_TASK_QUEUE_LEN);

let mut notifier = notifier::Notifier::new(None);
let max_connections = bootstrapped_node
.settings
.rest
.and_then(|settings| settings.notifier)
.and_then(|settings| settings.max_connections);

let mut notifier = notifier::Notifier::new(max_connections);

let context = notifier.clone();

Expand Down
35 changes: 25 additions & 10 deletions jormungandr/src/notifier/mod.rs
Expand Up @@ -2,29 +2,35 @@ use crate::utils::async_msg::{channel, MessageBox, MessageQueue};
use crate::utils::task::TokioServiceInfo;
use chain_impl_mockchain::header::HeaderId;
use futures::{SinkExt, StreamExt};
use slog::Logger;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use thiserror::Error;

#[derive(Clone)]
pub struct Notifier {
next_user_id: Arc<AtomicUsize>,
clients: Arc<tokio::sync::Mutex<Clients>>,
max_connections: u32,
max_connections: usize,
}

// TODO: move to intercom?
pub enum Message {
NewBlock(HeaderId),
NewTip(HeaderId),
}

type Clients = std::collections::HashMap<usize, warp::ws::WebSocket>;

const MAX_CONNECTIONS_DEFAULT: u32 = 3;
// TODO: define this in Settings
const MAX_CONNECTIONS_DEFAULT: usize = 255;

// error codes in 4000-4999 are reserved for private use.
// I couldn't find an error code for max connections
const MAX_CONNECTIONS_ERROR_CLOSE_CODE: u16 = 4000;
const MAX_CONNECTIONS_ERROR_REASON: &str = "MAX CONNECTIONS reached";

impl Notifier {
pub fn new(max_connections: Option<u32>) -> Notifier {
pub fn new(max_connections: Option<usize>) -> Notifier {
Notifier {
next_user_id: Arc::new(AtomicUsize::new(1)),
clients: Default::default(),
Expand All @@ -35,6 +41,7 @@ impl Notifier {
pub async fn start(&mut self, info: TokioServiceInfo, queue: MessageQueue<Message>) {
let clients1 = self.clients.clone();
let clients2 = self.clients.clone();
let logger = info.logger();

// TODO: what limit should I put in there?
let (deleted_msgbox, deleted_queue) = channel::<usize>(32);
Expand All @@ -48,7 +55,12 @@ impl Notifier {
.for_each(|input| {
info.spawn(
"notifier send new messages",
process_message(clients1.clone(), input, deleted_msgbox.clone()),
process_message(
logger.clone(),
clients1.clone(),
input,
deleted_msgbox.clone(),
),
);
futures::future::ready(())
})
Expand All @@ -74,7 +86,7 @@ impl Notifier {
if let Some(mut ws) = rejected {
let close_msg = warp::ws::Message::close_with(
MAX_CONNECTIONS_ERROR_CLOSE_CODE,
"MAX CONNECTIONS reached",
MAX_CONNECTIONS_ERROR_REASON,
);
if ws.send(close_msg).await.is_ok() {
let _ = ws.close().await;
Expand All @@ -84,6 +96,7 @@ impl Notifier {
}

async fn process_message(
logger: Logger,
clients: Arc<tokio::sync::Mutex<Clients>>,
msg: Message,
mut disconnected: MessageBox<usize>,
Expand All @@ -96,10 +109,12 @@ async fn process_message(
let dead = async move { notify_all(clients, warp_msg).await };

for id in dead.await {
disconnected
.send(id)
.await
.expect("couldn't push disconnected client id");
disconnected.send(id).await.unwrap_or_else(|err| {
error!(
logger,
"notifier error when adding id to disconnected: {}", err
);
});
}
}

Expand Down
1 change: 1 addition & 0 deletions jormungandr/src/settings/start/mod.rs
Expand Up @@ -128,6 +128,7 @@ impl RawSettings {
listen: cmd_listen,
tls: None,
cors: None,
notifier: None,
}),
(None, None) => None,
}
Expand Down
Expand Up @@ -313,6 +313,7 @@ impl Prepare for Rest {
listen: context.generate_new_rest_listen_address(),
tls: None,
cors: None,
notifier: None,
}
}
}
Expand Down
Expand Up @@ -90,6 +90,7 @@ impl LegacyNodeConfigConverter {
listen: source.rest.listen,
cors: None,
tls: None,
notifier: None,
},
p2p: P2p {
trusted_peers,
Expand Down Expand Up @@ -145,6 +146,7 @@ impl LegacyNodeConfigConverter {
listen: source.rest.listen,
cors: None,
tls: None,
notifier: None,
},
p2p: P2p {
trusted_peers,
Expand Down
Expand Up @@ -48,6 +48,7 @@ impl NodeConfigBuilder {
.unwrap(),
tls: None,
cors: None,
notifier: None,
},
p2p: P2p {
trusted_peers: vec![],
Expand Down

0 comments on commit 4fcdc00

Please sign in to comment.