Skip to content

Commit

Permalink
fix: cleanup ping sending logic
Browse files Browse the repository at this point in the history
  • Loading branch information
dignifiedquire committed May 10, 2023
1 parent 1c943a8 commit 7896d37
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 27 deletions.
5 changes: 4 additions & 1 deletion src/hp/magicsock/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -814,7 +814,10 @@ struct Actor {
impl Actor {
#[instrument(level = "error", skip_all, fields(self.name = %self.conn.name))]
async fn run(mut self) -> Result<()> {
let mut cleanup_timer = time::interval(DERP_CLEAN_STALE_INTERVAL);
let mut cleanup_timer = time::interval_at(
time::Instant::now() + DERP_CLEAN_STALE_INTERVAL,
DERP_CLEAN_STALE_INTERVAL,
);
let mut endpoint_heartbeat_timer = time::interval(HEARTBEAT_INTERVAL);
let mut endpoints_update_receiver = self.endpoints_update_state.running.subscribe();
let mut recvs = futures::stream::FuturesUnordered::new();
Expand Down
47 changes: 21 additions & 26 deletions src/hp/magicsock/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -710,35 +710,35 @@ impl Endpoint {
self.pending_cli_pings.clear();
}

/// Send a heartbeat to the peer to keep the connection alive.
fn last_ping(&self, addr: &SocketAddr) -> Option<Instant> {
self.endpoint_state.get(addr).and_then(|ep| ep.last_ping)
}

/// Send a heartbeat to the peer to keep the connection alive, or trigger a full ping
/// if necessary.
pub(super) async fn stayin_alive(&mut self) {
let now = Instant::now();
let (udp_addr, _derp_addr) = self.addr_for_send(&now);
let udp_addr = self.best_addr.as_ref().map(|a| a.addr);

// Send heartbeat ping to keep the current addr going as long as we need it.
if let Some(udp_addr) = udp_addr {
// Send heartbeat ping to keep the current addr going as long as we need it.

if let Some(ep_state) = self.endpoint_state.get(&udp_addr) {
let needs_ping = ep_state
.last_ping
.map(|l| now - l > Duration::from_secs(2))
.unwrap_or(true);
let elapsed = self.last_ping(&udp_addr).map(|l| now - l);
// Send a ping if the last ping is either older than 2 seconds or we don't have one.
let needs_ping = elapsed.map(|e| e >= Duration::from_secs(2)).unwrap_or(true);

if needs_ping {
debug!(
"needs ping {}: {:?} {:?}",
needs_ping,
ep_state.last_ping.map(|l| now - l),
now
);
self.start_ping(udp_addr, now, DiscoPingPurpose::StayinAlive)
.await;
}
if needs_ping {
debug!(
"stayin alive ping for {}: {:?} {:?}",
udp_addr, elapsed, now
);
self.start_ping(udp_addr, now, DiscoPingPurpose::StayinAlive)
.await;
return;
}
}

if udp_addr.is_none() || self.want_full_ping(&now) {
// If we do not have an optimal addr, send pings to all known places.
// If we do not have an optimal addr, send pings to all known places.
if self.want_full_ping(&now) {
debug!("send pings all");
self.send_pings(now, true).await;
}
Expand All @@ -754,11 +754,6 @@ impl Endpoint {
let now = Instant::now();
let (udp_addr, derp_addr) = self.addr_for_send(&now);

// Trigger a round of pings if we haven't had any full pings yet.
if self.last_full_ping.is_none() {
self.stayin_alive().await;
}

debug!(
"sending UDP: {}, DERP: {}",
udp_addr.is_some(),
Expand Down

0 comments on commit 7896d37

Please sign in to comment.