Skip to content

Commit

Permalink
Add peer heartbeat monitor (#157)
Browse files Browse the repository at this point in the history
The commit adds peer heartbeat monitoring functionality that
registers each heartbeat received and removes peers that stopped
missed all heartbeat opportunities in current monitoring period.

The heartbeat-based peer removal will take up to 2x of monitoring
period duration to detect a lost peer.
  • Loading branch information
mkamonMdt committed Jun 11, 2024
1 parent d8eee8a commit 92aeb8f
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 0 deletions.
18 changes: 18 additions & 0 deletions videocall-client/src/client/video_call_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,24 @@ impl VideoCallClient {
})
},
on_connection_lost: self.options.on_connection_lost.clone(),
peer_monitor: {
let inner = Rc::downgrade(&self.inner);
let on_connection_lost = self.options.on_connection_lost.clone();
Callback::from(move |_| {
if let Some(inner) = Weak::upgrade(&inner) {
match inner.try_borrow_mut() {
Ok(mut inner) => {
inner.peer_decode_manager.run_peer_monitor();
on_connection_lost.emit(());
},
Err(_) => {
error!("Unable to borrow inner -- not starting peer monitor");
}
}
}
})
},

};
info!(
"webtransport connect = {}",
Expand Down
7 changes: 7 additions & 0 deletions videocall-client/src/connection/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ enum Status {
pub struct Connection {
task: Rc<Task>,
heartbeat: Option<Interval>,
heartbeat_monitor: Option<Interval>,
status: Rc<Cell<Status>>,
aes: Rc<Aes128State>,
}
Expand Down Expand Up @@ -53,13 +54,16 @@ impl Connection {
Callback::from(move |_| status.set(Status::Closed)),
);
}
let monitor = options.peer_monitor.clone();
let mut connection = Self {
task: Rc::new(Task::connect(webtransport, options)?),
heartbeat: None,
heartbeat_monitor: Some(Interval::new(5000, move || {monitor.emit(());})),
status,
aes,
};
connection.start_heartbeat(userid);

Ok(connection)
}

Expand Down Expand Up @@ -96,6 +100,9 @@ impl Connection {
if let Some(heartbeat) = self.heartbeat.take() {
heartbeat.cancel();
}
if let Some(heartbeat_monitor) = self.heartbeat_monitor.take() {
heartbeat_monitor.cancel();
}
}

pub fn send_packet(&self, packet: PacketWrapper) {
Expand Down
1 change: 1 addition & 0 deletions videocall-client/src/connection/webmedia.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub struct ConnectOptions {
pub on_inbound_media: Callback<PacketWrapper>,
pub on_connected: Callback<()>,
pub on_connection_lost: Callback<()>,
pub peer_monitor: Callback<()>,
}

pub(super) trait WebMedia<TASK> {
Expand Down
20 changes: 20 additions & 0 deletions videocall-client/src/decode/hash_map_with_ordered_keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,24 @@ impl<K: Ord + Hash + Clone, V> HashMapWithOrderedKeys<K, V> {
pub fn ordered_keys(&self) -> &Vec<K> {
&self.keys
}

pub fn remove_if<F>(&mut self, predicate: F)
where
F: Fn(&mut V) -> bool
{
let mut keys_to_remove = Vec::new();

for key in &self.keys{
if let Some(value) = self.map.get_mut(key) {
if !predicate(value){
keys_to_remove.push(key.clone());
}
}
}

for key in &keys_to_remove {
self.map.remove(&key);
self.keys.retain(|k| k != key);
}
}
}
24 changes: 24 additions & 0 deletions videocall-client/src/decode/peer_decode_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ pub struct Peer {
pub video_canvas_id: String,
pub screen_canvas_id: String,
pub aes: Option<Aes128State>,
heartbeat_count: u8,
}

impl Peer {
Expand All @@ -75,6 +76,7 @@ impl Peer {
video_canvas_id,
screen_canvas_id,
aes,
heartbeat_count: 1
}
}

Expand Down Expand Up @@ -152,6 +154,19 @@ impl Peer {
)),
}
}

fn on_heartbeat(&mut self) {
self.heartbeat_count += 1;
}

pub fn check_heartbeat(&mut self) -> bool {
if self.heartbeat_count != 0 {
self.heartbeat_count = 0;
return true;
}
debug!("---@@@--- detected heartbeat stop for {}", self.email.clone());
return false;
}
}

fn parse_media_packet(data: &[u8]) -> Result<Arc<MediaPacket>, PeerDecodeError> {
Expand Down Expand Up @@ -186,11 +201,20 @@ impl PeerDecodeManager {
self.connected_peers.get(key)
}

pub fn run_peer_monitor(&mut self) {
let pred = |peer: &mut Peer| peer.check_heartbeat();
self.connected_peers.remove_if(pred);
}

pub fn decode(&mut self, response: PacketWrapper) -> Result<(), PeerDecodeError> {
let packet = Arc::new(response);
let email = packet.email.clone();
if let Some(peer) = self.connected_peers.get_mut(&email) {
match peer.decode(&packet) {
Ok((MediaType::HEARTBEAT, _)) => {
peer.on_heartbeat();
Ok(())
},
Ok((media_type, decode_status)) => {
if decode_status.first_frame {
self.on_first_frame.emit((email.clone(), media_type));
Expand Down

0 comments on commit 92aeb8f

Please sign in to comment.