Skip to content

Commit

Permalink
feat: add a message repeater to the relay
Browse files Browse the repository at this point in the history
  • Loading branch information
jpraynaud committed Apr 16, 2024
1 parent 709ece1 commit 8a8d71c
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 0 deletions.
1 change: 1 addition & 0 deletions mithril-relay/src/lib.rs
Expand Up @@ -5,6 +5,7 @@ mod commands;
/// Peer to peer module
pub mod p2p;
mod relay;
mod repeater;

pub use commands::Args;
pub use commands::RelayCommands;
Expand Down
110 changes: 110 additions & 0 deletions mithril-relay/src/repeater.rs
@@ -0,0 +1,110 @@
use anyhow::anyhow;
use mithril_common::StdResult;
use slog_scope::debug;
use std::{fmt::Debug, sync::Arc, time::Duration};
use tokio::sync::{mpsc::UnboundedSender, Mutex};

/// A message repeater will send a message to a channel at a given delay
pub struct MessageRepeater<M: Clone + Debug + Sync + Send + 'static> {
message: Arc<Mutex<Option<M>>>,
tx_message: UnboundedSender<M>,
delay: Duration,
}

impl<M: Clone + Debug + Sync + Send + 'static> MessageRepeater<M> {
/// Factory for MessageRepeater
pub fn new(tx_message: UnboundedSender<M>, delay: Duration) -> Self {
Self {
message: Arc::new(Mutex::new(None)),
tx_message,
delay,
}
}

/// Set the message to repeat
pub async fn set_message(&self, message: M) {
debug!("MessageRepeater: set message"; "message" => format!("{:#?}", message));
*self.message.lock().await = Some(message);
}

/// Start repeating the message if any
pub async fn repeat_message(&self) -> StdResult<()> {
tokio::time::sleep(self.delay).await;
match self.message.lock().await.as_ref() {
Some(message) => {
debug!("MessageRepeater: repeat message"; "message" => format!("{:#?}", message));
self.tx_message
.send(message.clone())
.map_err(|e| anyhow!(e))?
}
None => {
debug!("MessageRepeater: no message to repeat");
}
}

Ok(())
}
}

#[cfg(test)]
mod tests {
use tokio::{sync::mpsc, time};

use super::*;

#[tokio::test]
async fn should_repeat_message_when_exists() {
let (tx, mut rx) = mpsc::unbounded_channel();
let delay = Duration::from_millis(100);
let repeater = MessageRepeater::new(tx, delay);

let message = "Hello, world!";
repeater.set_message(message.to_string()).await;
repeater.repeat_message().await.unwrap();

let received = rx.recv().await.unwrap();
assert_eq!(message, received);
}

#[tokio::test]
async fn should_repeat_message_when_exists_with_expected_delay() {
let (tx, _rx) = mpsc::unbounded_channel();
let delay = Duration::from_secs(1);
let repeater = MessageRepeater::new(tx, delay);

let message = "Hello, world!";
repeater.set_message(message.to_string()).await;

let result = tokio::select! {
_ = time::sleep(delay-Duration::from_millis(100)) => {Err(anyhow!("Timeout"))}
_ = repeater.repeat_message() => {Ok(())}
};

result.expect_err("should have timed out");
}

#[tokio::test]
async fn should_do_nothing_when_message_not_exists() {
let (tx, rx) = mpsc::unbounded_channel::<String>();
let delay = Duration::from_millis(100);
let repeater = MessageRepeater::new(tx, delay);

repeater.repeat_message().await.unwrap();

assert!(rx.is_empty());
}

#[tokio::test]
async fn should_do_nothing_when_message_not_exists_with_expected_delay() {
let (tx, _rx) = mpsc::unbounded_channel::<String>();
let delay = Duration::from_secs(1);
let repeater = MessageRepeater::new(tx, delay);

let result = tokio::select! {
_ = time::sleep(delay-Duration::from_millis(100)) => {Err(anyhow!("Timeout"))}
_ = repeater.repeat_message() => {Ok(())}
};

result.expect_err("should have timed out");
}
}

0 comments on commit 8a8d71c

Please sign in to comment.