From 54d5efcf72ab548209eda2f1ce8ecfaec3f73b7d Mon Sep 17 00:00:00 2001 From: Friedel Ziegelmayer Date: Fri, 15 Dec 2023 20:51:10 +0100 Subject: [PATCH] fix: do not block on network change (#1885) There was a deadlock due to the usage of the inner actor message loop in handling network changes, which would trigger a deadlock. Additionally the following two fixes are included - clearing endpoint state when connectivity is changed - drop incoming messages when the magicsock is overloaded --- iroh-net/src/derp/client_conn.rs | 12 +- iroh-net/src/magicsock.rs | 289 ++++++++++++++++++-- iroh-net/src/magicsock/derp_actor.rs | 13 +- iroh-net/src/magicsock/peer_map/endpoint.rs | 11 + 4 files changed, 298 insertions(+), 27 deletions(-) diff --git a/iroh-net/src/derp/client_conn.rs b/iroh-net/src/derp/client_conn.rs index f5d4792335..b3973106bb 100644 --- a/iroh-net/src/derp/client_conn.rs +++ b/iroh-net/src/derp/client_conn.rs @@ -294,14 +294,14 @@ where _ = done.cancelled() => { trace!("cancelled"); // final flush - self.io.flush().await?; + self.io.flush().await.context("flush")?; return Ok(()); } read_res = self.io.next() => { trace!("handle read"); match read_res { Some(Ok(frame)) => { - self.handle_read(frame).await?; + self.handle_read(frame).await.context("handle_read")?; } Some(Err(err)) => { return Err(err); @@ -325,26 +325,26 @@ where packet = self.send_queue.recv() => { let packet = packet.context("Server.send_queue dropped")?; trace!("send packet"); - self.send_packet(packet).await?; + self.send_packet(packet).await.context("send packet")?; // TODO: stats // record `packet.enqueuedAt` } packet = self.disco_send_queue.recv() => { let packet = packet.context("Server.disco_send_queue dropped")?; trace!("send disco packet"); - self.send_packet(packet).await?; + self.send_packet(packet).await.context("send packet")?; // TODO: stats // record `packet.enqueuedAt` } _ = keep_alive.tick() => { trace!("keep alive"); - self.send_keep_alive().await?; + self.send_keep_alive().await.context("send keep alive")?; } } // TODO: golang batches as many writes as are in all the channels // & then flushes when there is no more work to be done at the moment. // refactor to get something similar - self.io.flush().await?; + self.io.flush().await.context("final flush")?; } } diff --git a/iroh-net/src/magicsock.rs b/iroh-net/src/magicsock.rs index e6c11b394d..0d4c8dea85 100644 --- a/iroh-net/src/magicsock.rs +++ b/iroh-net/src/magicsock.rs @@ -406,7 +406,7 @@ impl Inner { }); Poll::Ready(Err(err)) } else { - debug!( + trace!( node = %public_key.fmt_short(), transmit_count = %transmits.len(), packet_count = &transmits.iter().map(|t| t.segment_size.map(|ss| t.contents.len() / ss).unwrap_or(1)).sum::(), @@ -548,7 +548,7 @@ impl Inner { meta.len = 0; } Some((node_id, quic_mapped_addr)) => { - debug!(src = ?meta.addr, node = %node_id.fmt_short(), count = %quic_packets_count, len = meta.len, "UDP recv quic packets"); + trace!(src = ?meta.addr, node = %node_id.fmt_short(), count = %quic_packets_count, len = meta.len, "UDP recv quic packets"); quic_packets_total += quic_packets_count; meta.addr = quic_mapped_addr.0; } @@ -596,7 +596,7 @@ impl Inner { Ok(Err(err)) => return Poll::Ready(Err(err)), Ok(Ok((node_id, meta, bytes))) => { inc_by!(MagicsockMetrics, recv_data_derp, bytes.len() as _); - debug!(src = %meta.addr, node = %node_id.fmt_short(), count = meta.len / meta.stride, len = meta.len, "recv quic packets from derp"); + trace!(src = %meta.addr, node = %node_id.fmt_short(), count = meta.len / meta.stride, len = meta.len, "recv quic packets from derp"); buf_out[..bytes.len()].copy_from_slice(&bytes); *meta_out = meta; num_msgs += 1; @@ -1143,7 +1143,7 @@ impl MagicSock { let net_checker = netcheck::Client::new(Some(port_mapper.clone()))?; - let (actor_sender, actor_receiver) = mpsc::channel(128); + let (actor_sender, actor_receiver) = mpsc::channel(256); let (derp_actor_sender, derp_actor_receiver) = mpsc::channel(256); let (udp_disco_sender, mut udp_disco_receiver) = mpsc::channel(256); @@ -1403,6 +1403,15 @@ impl MagicSock { .await .ok(); } + + #[cfg(test)] + async fn force_network_change(&self, is_major: bool) { + self.inner + .actor_sender + .send(ActorMessage::ForceNetworkChange(is_major)) + .await + .ok(); + } } #[derive(Debug, Default)] @@ -1530,6 +1539,8 @@ enum ActorMessage { EndpointPingExpired(usize, stun::TransactionId), NetcheckReport(Result>>, &'static str), NetworkChange, + #[cfg(test)] + ForceNetworkChange(bool), } struct Actor { @@ -1632,6 +1643,7 @@ impl Actor { } } _ = save_nodes_timer.tick(), if self.nodes_path.is_some() => { + trace!("tick: nodes_timer"); let path = self.nodes_path.as_ref().expect("precondition: `is_some()`"); self.inner.node_map.prune_inactive(); @@ -1641,6 +1653,7 @@ impl Actor { } } Some(is_major) = link_change_r.recv() => { + trace!("tick: link change {}", is_major); self.handle_network_change(is_major).await; } else => { @@ -1651,20 +1664,13 @@ impl Actor { } async fn handle_network_change(&mut self, is_major: bool) { - info!("link change detected: major? {}", is_major); + debug!("link change detected: major? {}", is_major); if is_major { // Clear DNS cache DNS_RESOLVER.clear_cache(); - - let (s, r) = sync::oneshot::channel(); self.inner.re_stun("link-change-major"); - self.inner - .actor_sender - .send(ActorMessage::RebindAll(s)) - .await - .ok(); - r.await.ok(); + self.rebind_all().await; } else { self.inner.re_stun("link-change-minor"); } @@ -1768,6 +1774,10 @@ impl Actor { ActorMessage::NetworkChange => { self.network_monitor.network_change().await.ok(); } + #[cfg(test)] + ActorMessage::ForceNetworkChange(is_major) => { + self.handle_network_change(is_major).await; + } } false @@ -2217,9 +2227,10 @@ impl Actor { } async fn rebind_all(&mut self) { + trace!("rebind_all"); inc!(MagicsockMetrics, rebind_calls); if let Err(err) = self.rebind(CurrentPortFate::Keep).await { - debug!("{:?}", err); + warn!("unable to rebind: {:?}", err); return; } @@ -2355,7 +2366,7 @@ fn new_re_stun_timer(initial_delay: bool) -> time::Interval { fn bind(port: u16) -> Result<(RebindingUdpConn, Option)> { let pconn4 = RebindingUdpConn::bind(port, IpFamily::V4).context("bind IPv4 failed")?; let ip4_port = pconn4.local_addr()?.port(); - let ip6_port = ip4_port + 1; + let ip6_port = ip4_port.checked_add(1).unwrap_or(ip4_port - 1); let pconn6 = match RebindingUdpConn::bind(ip6_port, IpFamily::V6) { Ok(conn) => Some(conn), @@ -2934,7 +2945,192 @@ pub(crate) mod tests { }; } - for i in 0..10 { + for i in 0..5 { + println!("-- round {}", i + 1); + roundtrip!(m1, m2, b"hello m1"); + roundtrip!(m2, m1, b"hello m2"); + + println!("-- larger data"); + let mut data = vec![0u8; 10 * 1024]; + rand::thread_rng().fill_bytes(&mut data); + roundtrip!(m1, m2, data); + + let mut data = vec![0u8; 10 * 1024]; + rand::thread_rng().fill_bytes(&mut data); + roundtrip!(m2, m1, data); + } + + println!("cleaning up"); + cleanup_mesh(); + Ok(()) + } + + /// Same structure as `test_two_devices_roundtrip_quinn_magic`, but interrupts regularly + /// with (simulated) network changes. + #[tokio::test(flavor = "multi_thread")] + async fn test_two_devices_roundtrip_network_change() -> Result<()> { + setup_multithreaded_logging(); + let (derp_map, region, _cleanup) = run_derper().await?; + + let m1 = MagicStack::new(derp_map.clone()).await?; + let m2 = MagicStack::new(derp_map.clone()).await?; + + let cleanup_mesh = mesh_stacks(vec![m1.clone(), m2.clone()])?; + + // Wait for magicsock to be told about nodes from mesh_stacks. + let m1t = m1.clone(); + let m2t = m2.clone(); + time::timeout(Duration::from_secs(10), async move { + loop { + let ab = m1t.tracked_endpoints().await.contains(&m2t.public()); + let ba = m2t.tracked_endpoints().await.contains(&m1t.public()); + if ab && ba { + break; + } + } + }) + .await + .context("failed to connect nodes")?; + + // msg from m2 -> m1 + macro_rules! roundtrip { + ($a:expr, $b:expr, $msg:expr) => { + let a = $a.clone(); + let b = $b.clone(); + let a_name = stringify!($a); + let b_name = stringify!($b); + println!("{} -> {} ({} bytes)", a_name, b_name, $msg.len()); + println!("[{}] {:?}", a_name, a.endpoint.local_addr()); + println!("[{}] {:?}", b_name, b.endpoint.local_addr()); + + let a_addr = b.endpoint.magic_sock().get_mapping_addr(&a.public()).await.unwrap(); + let b_addr = a.endpoint.magic_sock().get_mapping_addr(&b.public()).await.unwrap(); + let b_node_id = b.endpoint.node_id(); + + println!("{}: {}, {}: {}", a_name, a_addr, b_name, b_addr); + + let b_span = debug_span!("receiver", b_name, %b_addr); + let b_task = tokio::task::spawn( + async move { + println!("[{}] accepting conn", b_name); + let conn = b.endpoint.accept().await.expect("no conn"); + + println!("[{}] connecting", b_name); + let conn = conn + .await + .with_context(|| format!("[{}] connecting", b_name))?; + println!("[{}] accepting bi", b_name); + let (mut send_bi, mut recv_bi) = conn + .accept_bi() + .await + .with_context(|| format!("[{}] accepting bi", b_name))?; + + println!("[{}] reading", b_name); + let val = recv_bi + .read_to_end(usize::MAX) + .await + .with_context(|| format!("[{}] reading to end", b_name))?; + + println!("[{}] replying", b_name); + for chunk in val.chunks(12) { + send_bi + .write_all(chunk) + .await + .with_context(|| format!("[{}] sending chunk", b_name))?; + } + + println!("[{}] finishing", b_name); + send_bi + .finish() + .await + .with_context(|| format!("[{}] finishing", b_name))?; + + let stats = conn.stats(); + assert!(stats.path.lost_packets < 10, "[{}] should not loose many packets", b_name); + + println!("[{}] close", b_name); + conn.close(0u32.into(), b"done"); + println!("[{}] closed", b_name); + + Ok::<_, anyhow::Error>(()) + } + .instrument(b_span), + ); + + let a_span = debug_span!("sender", a_name, %a_addr); + async move { + println!("[{}] connecting to {}", a_name, b_addr); + let node_b_data = NodeAddr::new(b_node_id).with_derp_region(region).with_direct_addresses([b_addr]); + let conn = a + .endpoint + .connect(node_b_data, &ALPN) + .await + .with_context(|| format!("[{}] connect", a_name))?; + + println!("[{}] opening bi", a_name); + let (mut send_bi, mut recv_bi) = conn + .open_bi() + .await + .with_context(|| format!("[{}] open bi", a_name))?; + + println!("[{}] writing message", a_name); + send_bi + .write_all(&$msg[..]) + .await + .with_context(|| format!("[{}] write all", a_name))?; + + println!("[{}] finishing", a_name); + send_bi + .finish() + .await + .with_context(|| format!("[{}] finish", a_name))?; + + println!("[{}] reading_to_end", a_name); + let val = recv_bi + .read_to_end(usize::MAX) + .await + .with_context(|| format!("[{}]", a_name))?; + anyhow::ensure!( + val == $msg, + "expected {}, got {}", + hex::encode($msg), + hex::encode(val) + ); + + let stats = conn.stats(); + assert!(stats.path.lost_packets < 10, "[{}] should not loose many packets", a_name); + + println!("[{}] close", a_name); + conn.close(0u32.into(), b"done"); + println!("[{}] wait idle", a_name); + a.endpoint.endpoint().wait_idle().await; + println!("[{}] waiting for channel", a_name); + b_task.await??; + Ok(()) + } + .instrument(a_span) + .await?; + }; + } + + let offset = || { + let delay = rand::thread_rng().gen_range(10..=500); + Duration::from_millis(delay) + }; + let rounds = 5; + + let m1_t = m1.clone(); + + // only m1 + let t = tokio::task::spawn(async move { + loop { + println!("[m1] network change"); + m1_t.endpoint.magic_sock().force_network_change(true).await; + time::sleep(offset()).await; + } + }); + + for i in 0..rounds { println!("-- round {}", i + 1); roundtrip!(m1, m2, b"hello m1"); roundtrip!(m2, m1, b"hello m2"); @@ -2949,6 +3145,67 @@ pub(crate) mod tests { roundtrip!(m2, m1, data); } + t.abort(); + + let m2_t = m2.clone(); + + // only m2 + let t = tokio::task::spawn(async move { + loop { + println!("[m2] network change"); + m2_t.endpoint.magic_sock().force_network_change(true).await; + time::sleep(offset()).await; + } + }); + + for i in 0..rounds { + println!("-- round {}", i + 1); + roundtrip!(m1, m2, b"hello m1"); + roundtrip!(m2, m1, b"hello m2"); + + println!("-- larger data"); + let mut data = vec![0u8; 10 * 1024]; + rand::thread_rng().fill_bytes(&mut data); + roundtrip!(m1, m2, data); + + let mut data = vec![0u8; 10 * 1024]; + rand::thread_rng().fill_bytes(&mut data); + roundtrip!(m2, m1, data); + } + + t.abort(); + + let m1_t = m1.clone(); + let m2_t = m2.clone(); + + // both + let t = tokio::task::spawn(async move { + loop { + println!("[m1] network change"); + m1_t.endpoint.magic_sock().force_network_change(true).await; + println!("[m2] network change"); + m2_t.endpoint.magic_sock().force_network_change(true).await; + time::sleep(offset()).await; + } + }); + + for i in 0..rounds { + println!("-- round {}", i + 1); + roundtrip!(m1, m2, b"hello m1"); + roundtrip!(m2, m1, b"hello m2"); + + println!("-- larger data"); + let mut data = vec![0u8; 10 * 1024]; + rand::thread_rng().fill_bytes(&mut data); + roundtrip!(m1, m2, data); + + let mut data = vec![0u8; 10 * 1024]; + rand::thread_rng().fill_bytes(&mut data); + roundtrip!(m2, m1, data); + } + + t.abort(); + println!("cleaning up"); cleanup_mesh(); Ok(()) diff --git a/iroh-net/src/magicsock/derp_actor.rs b/iroh-net/src/magicsock/derp_actor.rs index 248af01160..a370e4ddee 100644 --- a/iroh-net/src/magicsock/derp_actor.rs +++ b/iroh-net/src/magicsock/derp_actor.rs @@ -114,6 +114,7 @@ impl ActiveDerp { loop { tokio::select! { Some(msg) = inbox.recv() => { + trace!("tick: inbox: {:?}", msg); match msg { ActiveDerpMessage::GetLastWrite(r) => { r.send(self.last_write).ok(); @@ -146,6 +147,7 @@ impl ActiveDerp { } } msg = self.derp_client_receiver.recv() => { + trace!("tick: derp_client_receiver"); if let Some(msg) = msg { if self.handle_derp_msg(msg).await == ReadResult::Break { // fatal error @@ -233,10 +235,10 @@ impl ActiveDerp { src: source, buf: data, }; - self.msg_sender - .send(ActorMessage::ReceiveDerp(res)) - .await - .ok(); + if let Err(err) = self.msg_sender.try_send(ActorMessage::ReceiveDerp(res)) { + warn!("dropping received DERP packet: {:?}", err); + } + ReadResult::Continue } derp::ReceivedMessage::Ping(data) => { @@ -254,7 +256,8 @@ impl ActiveDerp { self.derp_routes.retain(|peer| peer != &key); ReadResult::Continue } - _ => { + other => { + trace!("ignoring: {:?}", other); // Ignore. ReadResult::Continue } diff --git a/iroh-net/src/magicsock/peer_map/endpoint.rs b/iroh-net/src/magicsock/peer_map/endpoint.rs index 3a6bffe3c9..bb623de0e6 100644 --- a/iroh-net/src/magicsock/peer_map/endpoint.rs +++ b/iroh-net/src/magicsock/peer_map/endpoint.rs @@ -610,6 +610,9 @@ impl Endpoint { pub(super) fn note_connectivity_change(&mut self) { trace!("connectivity changed"); self.best_addr.clear_trust(); + for es in self.direct_addr_state.values_mut() { + es.clear(); + } } /// Handles a Pong message (a reply to an earlier ping). @@ -1064,6 +1067,14 @@ impl EndpointState { } } } + + fn clear(&mut self) { + self.last_ping = None; + self.last_got_ping = None; + self.last_got_ping_tx_id = None; + self.call_me_maybe_time = None; + self.recent_pong = None; + } } #[derive(Debug, Clone, PartialEq, Eq, Hash)]