Skip to content

Commit

Permalink
fix(net): stop deleting endpoints we want to keep (#1567)
Browse files Browse the repository at this point in the history
also simplifies the general logic around endpoint management
  • Loading branch information
dignifiedquire committed Oct 3, 2023
1 parent 06b4ac7 commit 96cd106
Showing 1 changed file with 38 additions and 117 deletions.
155 changes: 38 additions & 117 deletions iroh-net/src/magicsock/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,30 +459,7 @@ impl Endpoint {
return msgs;
}
self.last_full_ping.replace(now);

// first cleanout out all old endpoints
self.endpoint_state.retain(|ep, st| {
if st.should_delete() {
// Inlined delete_endpoint
if self
.best_addr
.as_ref()
.map(|a| ep == &a.addr)
.unwrap_or_default()
{
// we no longer rely on a direct connection
if self.best_addr.is_some() {
inc!(MagicsockMetrics, num_direct_conns_removed);
if self.derp_region.is_some() {
inc!(MagicsockMetrics, num_relay_conns_added);
}
}
self.best_addr = None;
}
return false;
}
true
});
self.cleanup_endpoint_state();

let pings: Vec<_> = self
.endpoint_state
Expand Down Expand Up @@ -557,54 +534,13 @@ impl Endpoint {
self.derp_region = n.derp_region;
}

for st in self.endpoint_state.values_mut() {
st.index = Index::Deleted; // assume deleted until updated in next loop
}
for (i, ep) in n
.direct_addresses
.iter()
.take(u16::MAX as usize)
.enumerate()
{
let index = Index::Some(i);
let ep = SendAddr::Udp(*ep);
if let Some(st) = self.endpoint_state.get_mut(&ep) {
st.index = index
} else {
self.endpoint_state.insert(
ep,
EndpointState {
index,
..Default::default()
},
);
}
for addr in n.direct_addresses.iter() {
let addr = SendAddr::Udp(*addr);
self.endpoint_state.entry(addr).or_default();
}

// Now delete anything unless it's still in the network map or was a recently discovered endpoint.
self.endpoint_state.retain(|ep, st| {
if st.should_delete() {
// Inlined delete_endpoint
if self
.best_addr
.as_ref()
.map(|a| ep == &a.addr)
.unwrap_or_default()
{
if self.best_addr.is_some() {
// we no long rely on a direct connection
inc!(MagicsockMetrics, num_direct_conns_removed);
if self.derp_region.is_some() {
// we only have a relay connection to the peer
inc!(MagicsockMetrics, num_relay_conns_added);
}
}
self.best_addr = None;
}
return false;
}
true
});
// Delete outdated endpoints
self.cleanup_endpoint_state();
}

/// Clears all the endpoint's p2p state, reverting it to a DERP-only endpoint.
Expand Down Expand Up @@ -667,29 +603,7 @@ impl Endpoint {
// If for some reason this gets very large, do some cleanup.
let size = self.endpoint_state.len();
if size > 100 {
self.endpoint_state.retain(|ep, st| {
if st.should_delete() {
// Inlined delete_endpoint
if self
.best_addr
.as_ref()
.map(|a| ep == &a.addr)
.unwrap_or_default()
{
// no longer relying on a direct connection, remove conn count
if self.best_addr.is_some() {
inc!(MagicsockMetrics, num_direct_conns_removed);
if self.derp_region.is_some() {
// we now rely on a relay connection, add a relay count
inc!(MagicsockMetrics, num_relay_conns_added);
}
}
self.best_addr = None;
}
return false;
}
true
});
self.cleanup_endpoint_state();
let size2 = self.endpoint_state.len();
info!(
"disco: addCandidateEndpoint pruned candidate set from {} to {} entries",
Expand All @@ -700,6 +614,26 @@ impl Endpoint {
false
}

/// Drops every entry of `endpoint_state` whose state reports `should_delete()`.
///
/// If one of the dropped entries is the address currently stored in
/// `best_addr`, `best_addr` is reset to `None` and the connection metrics are
/// updated. A `best_addr` that points at an endpoint which is *kept* is left
/// untouched, even when other endpoints are removed in the same pass.
fn cleanup_endpoint_state(&mut self) {
    self.endpoint_state.retain(|ep, st| {
        if !st.should_delete() {
            return true;
        }

        // Only look at `best_addr` here; it must not be cleared unless the
        // endpoint being removed is actually the one it refers to.
        let removes_best_addr = self.best_addr.as_ref().map(|a| &a.addr) == Some(ep);
        if removes_best_addr {
            self.best_addr = None;
            // no longer relying on a direct connection, remove conn count
            inc!(MagicsockMetrics, num_direct_conns_removed);
            if self.derp_region.is_some() {
                // we now rely on a relay connection, add a relay count
                inc!(MagicsockMetrics, num_relay_conns_added);
            }
        }
        false
    });
}

/// Called when connectivity changes enough that we should question our earlier
/// assumptions about which paths work.
pub(super) fn note_connectivity_change(&mut self) {
Expand Down Expand Up @@ -880,20 +814,20 @@ impl Endpoint {
// Delete any prior CallMeMaybe endpoints that weren't included in this message.
self.is_call_me_maybe_ep.retain(|ep, want| {
if !*want {
if self.best_addr.as_ref().map(|a| &a.addr) == Some(ep) {
if self.best_addr.is_some() {
if let Some(best_addr) = self.best_addr.take() {
if *ep == best_addr.addr {
// no longer relying on the direct connection
inc!(MagicsockMetrics, num_direct_conns_removed);
if self.derp_region.is_some() {
// we are now relying on the relay connection, add a relay conn
inc!(MagicsockMetrics, num_relay_conns_added);
}
}
self.best_addr = None;
}
return false;
false
} else {
true
}
true
});

// Zero out all the last_ping times to force send_pings to send new ones,
Expand Down Expand Up @@ -1087,6 +1021,7 @@ impl PeerMap {
/// Add the contact information for a peer.
pub fn add_peer_addr(&mut self, peer_addr: PeerAddr) {
let PeerAddr { peer_id, info } = peer_addr;

if self.endpoint_for_node_key(&peer_id).is_none() {
info!(%peer_id, "inserting peer's endpoint in PeerMap");
self.insert_endpoint(Options {
Expand Down Expand Up @@ -1275,9 +1210,6 @@ struct EndpointState {
recent_pongs: Vec<PongReply>,
/// Index into recentPongs of most recent; older before, wrapped
recent_pong: usize,

/// Index in nodecfg.Node.Endpoints; meaningless if last_got_ping non-zero.
index: Index,
}

/// The type of connection we have to the endpoint.
Expand Down Expand Up @@ -1312,13 +1244,6 @@ pub struct EndpointInfo {
pub latency: Option<Duration>,
}

#[derive(Default, Debug, PartialEq, Eq, Clone, Copy, Hash)]
enum Index {
#[default]
Deleted,
Some(usize),
}

impl EndpointState {
fn add_pong_reply(&mut self, r: PongReply) {
let n = self.recent_pongs.len();
Expand All @@ -1340,13 +1265,13 @@ impl EndpointState {
if self.call_me_maybe_time.is_some() {
return false;
}
if self.last_got_ping.is_none() {
// This was an endpoint from the network map. Is it still in the network map?
return self.index == Index::Deleted;
if let Some(last_got_ping) = self.last_got_ping {
// Receiving no pings anymore, probably gone
return last_got_ping.elapsed() > SESSION_ACTIVE_TIMEOUT;
}

// This was an endpoint discovered at runtime.
self.last_got_ping.as_ref().unwrap().elapsed() > SESSION_ACTIVE_TIMEOUT
// keep by default
false
}

/// Returns the most recent pong if available.
Expand Down Expand Up @@ -1446,7 +1371,6 @@ mod tests {
let endpoint_state = HashMap::from([(
SendAddr::Udp(socket_addr),
EndpointState {
index: Index::Some(0),
last_ping: None,
last_got_ping: None,
last_got_ping_tx_id: None,
Expand Down Expand Up @@ -1490,7 +1414,6 @@ mod tests {
let endpoint_state = HashMap::from([(
SendAddr::Derp(0),
EndpointState {
index: Index::Some(1),
last_ping: None,
last_got_ping: None,
last_got_ping_tx_id: None,
Expand Down Expand Up @@ -1554,7 +1477,6 @@ mod tests {
(
SendAddr::Udp(socket_addr),
EndpointState {
index: Index::Some(0),
last_ping: None,
last_got_ping: None,
last_got_ping_tx_id: None,
Expand All @@ -1571,7 +1493,6 @@ mod tests {
(
SendAddr::Derp(0),
EndpointState {
index: Index::Some(1),
last_ping: None,
last_got_ping: None,
last_got_ping_tx_id: None,
Expand Down

0 comments on commit 96cd106

Please sign in to comment.