diff --git a/iroh/src/magicsock/remote_map/remote_state.rs b/iroh/src/magicsock/remote_map/remote_state.rs index bb45129048..797aa36b14 100644 --- a/iroh/src/magicsock/remote_map/remote_state.rs +++ b/iroh/src/magicsock/remote_map/remote_state.rs @@ -357,23 +357,6 @@ impl RemoteStateActor { } } - async fn send_datagram( - &self, - dst: transports::Addr, - owned_transmit: OwnedTransmit, - ) -> n0_error::Result<()> { - let transmit = transports::Transmit { - ecn: owned_transmit.ecn, - contents: owned_transmit.contents.as_ref(), - segment_size: owned_transmit.segment_size, - }; - self.sender - .send(&dst, None, &transmit) - .await - .with_context(|_| format!("failed to send datagram to {dst:?}"))?; - Ok(()) - } - /// Handles [`RemoteStateMessage::SendDatagram`]. async fn handle_msg_send_datagram(&mut self, transmit: OwnedTransmit) { // Sending datagrams might fail, e.g. because we don't have the right transports set @@ -385,7 +368,7 @@ impl RemoteStateActor { if let Some(addr) = self.selected_path.get() { trace!(?addr, "sending datagram to selected path"); - if let Err(err) = self.send_datagram(addr.clone(), transmit).await { + if let Err(err) = send_datagram(&mut self.sender, addr.clone(), transmit).await { debug!(?addr, "failed to send datagram on selected_path: {err:#}"); } } else { @@ -396,6 +379,7 @@ impl RemoteStateActor { if self.paths.is_empty() { warn!("Cannot send datagrams: No paths to remote endpoint known"); } + for addr in self.paths.addrs() { // We never want to send to our local addresses. // The local address set is updated in the main loop so we can use `peek` here. @@ -407,7 +391,9 @@ impl RemoteStateActor { .any(|a| a.addr == *sockaddr) { trace!(%sockaddr, "not sending datagram to our own address"); - } else if let Err(err) = self.send_datagram(addr.clone(), transmit.clone()).await { + } else if let Err(err) = + send_datagram(&mut self.sender, addr.clone(), transmit.clone()).await + { debug!(?addr, "failed to send datagram: {err:#}"); } } @@ -980,6 +966,24 @@ impl RemoteStateActor { } } +fn send_datagram<'a>( + sender: &'a mut TransportsSender, + dst: transports::Addr, + owned_transmit: OwnedTransmit, +) -> impl Future> + 'a { + std::future::poll_fn(move |cx| { + let transmit = transports::Transmit { + ecn: owned_transmit.ecn, + contents: owned_transmit.contents.as_ref(), + segment_size: owned_transmit.segment_size, + }; + + Pin::new(&mut *sender) + .poll_send(cx, &dst, None, &transmit) + .map(|res| res.with_context(|_| format!("failed to send datagram to {dst:?}"))) + }) +} + /// Messages to send to the [`RemoteStateActor`]. #[derive(derive_more::Debug)] pub(crate) enum RemoteStateMessage { diff --git a/iroh/src/magicsock/transports.rs b/iroh/src/magicsock/transports.rs index 85efb5cf2e..7b95ec8bee 100644 --- a/iroh/src/magicsock/transports.rs +++ b/iroh/src/magicsock/transports.rs @@ -493,62 +493,8 @@ pub(crate) struct TransportsSender { } impl TransportsSender { - #[instrument(skip(self, transmit), fields(len = transmit.contents.len()))] - pub(crate) async fn send( - &self, - dst: &Addr, - src: Option, - transmit: &Transmit<'_>, - ) -> io::Result<()> { - let mut any_match = false; - match dst { - #[cfg(wasm_browser)] - Addr::Ip(..) => return Err(io::Error::other("IP is unsupported in browser")), - #[cfg(not(wasm_browser))] - Addr::Ip(addr) => { - for sender in &self.ip { - if sender.is_valid_send_addr(addr) { - any_match = true; - match sender.send(*addr, src, transmit).await { - Ok(()) => { - trace!("sent"); - return Ok(()); - } - Err(err) => { - warn!("ip failed to send: {:?}", err); - } - } - } - } - } - Addr::Relay(url, endpoint_id) => { - for sender in &self.relay { - if sender.is_valid_send_addr(url, endpoint_id) { - any_match = true; - match sender.send(url.clone(), *endpoint_id, transmit).await { - Ok(()) => { - trace!("sent"); - return Ok(()); - } - Err(err) => { - warn!("relay failed to send: {:?}", err); - } - } - } - } - } - } - if any_match { - Err(io::Error::other("all available transports failed")) - } else { - Err(io::Error::other( - "no transport available for this destination", - )) - } - } - #[instrument(name = "poll_send", skip(self, cx, transmit), fields(len = transmit.contents.len()))] - pub(crate) fn inner_poll_send( + pub(crate) fn poll_send( mut self: Pin<&mut Self>, cx: &mut std::task::Context, dst: &Addr, @@ -789,7 +735,7 @@ impl quinn::UdpSender for MagicSender { match this .sender - .inner_poll_send(cx, &transport_addr, quinn_transmit.src_ip, &transmit) + .poll_send(cx, &transport_addr, quinn_transmit.src_ip, &transmit) { Poll::Ready(Ok(())) => Poll::Ready(Ok(())), Poll::Ready(Err(ref err)) => { diff --git a/iroh/src/magicsock/transports/ip.rs b/iroh/src/magicsock/transports/ip.rs index e2ee9b5ffc..dc4ab293da 100644 --- a/iroh/src/magicsock/transports/ip.rs +++ b/iroh/src/magicsock/transports/ip.rs @@ -195,40 +195,6 @@ impl IpSender { SocketAddr::new(addr.ip().to_canonical(), addr.port()) } - pub(super) async fn send( - &self, - dst: SocketAddr, - src: Option, - transmit: &Transmit<'_>, - ) -> io::Result<()> { - let total_bytes = transmit.contents.len() as u64; - let res = self - .sender - .send(&quinn_udp::Transmit { - destination: Self::canonical_addr(dst), - ecn: transmit.ecn, - contents: transmit.contents, - segment_size: transmit.segment_size, - src_ip: src, - }) - .await; - - match res { - Ok(res) => { - match dst { - SocketAddr::V4(_) => { - self.metrics.send_ipv4.inc_by(total_bytes); - } - SocketAddr::V6(_) => { - self.metrics.send_ipv6.inc_by(total_bytes); - } - } - Ok(res) - } - Err(err) => Err(err), - } - } - pub(super) fn poll_send( mut self: Pin<&mut Self>, cx: &mut std::task::Context, diff --git a/iroh/src/magicsock/transports/relay.rs b/iroh/src/magicsock/transports/relay.rs index 6cdb2a388c..fdaf503fd5 100644 --- a/iroh/src/magicsock/transports/relay.rs +++ b/iroh/src/magicsock/transports/relay.rs @@ -244,32 +244,6 @@ impl RelaySender { true } - pub(super) async fn send( - &self, - dest_url: RelayUrl, - dest_endpoint: EndpointId, - transmit: &Transmit<'_>, - ) -> io::Result<()> { - let contents = datagrams_from_transmit(transmit); - - let item = RelaySendItem { - remote_endpoint: dest_endpoint, - url: dest_url.clone(), - datagrams: contents, - }; - - let Some(sender) = self.sender.get_ref() else { - return Err(io::Error::other("channel closed")); - }; - match sender.send(item).await { - Ok(_) => Ok(()), - Err(mpsc::error::SendError(_)) => Err(io::Error::new( - io::ErrorKind::ConnectionReset, - "channel to actor is closed", - )), - } - } - pub(super) fn poll_send( &mut self, cx: &mut Context,