Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 23 additions & 19 deletions iroh/src/magicsock/remote_map/remote_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,23 +357,6 @@ impl RemoteStateActor {
}
}

async fn send_datagram(
&self,
dst: transports::Addr,
owned_transmit: OwnedTransmit,
) -> n0_error::Result<()> {
let transmit = transports::Transmit {
ecn: owned_transmit.ecn,
contents: owned_transmit.contents.as_ref(),
segment_size: owned_transmit.segment_size,
};
self.sender
.send(&dst, None, &transmit)
.await
.with_context(|_| format!("failed to send datagram to {dst:?}"))?;
Ok(())
}

/// Handles [`RemoteStateMessage::SendDatagram`].
async fn handle_msg_send_datagram(&mut self, transmit: OwnedTransmit) {
// Sending datagrams might fail, e.g. because we don't have the right transports set
Expand All @@ -385,7 +368,7 @@ impl RemoteStateActor {
if let Some(addr) = self.selected_path.get() {
trace!(?addr, "sending datagram to selected path");

if let Err(err) = self.send_datagram(addr.clone(), transmit).await {
if let Err(err) = send_datagram(&mut self.sender, addr.clone(), transmit).await {
debug!(?addr, "failed to send datagram on selected_path: {err:#}");
}
} else {
Expand All @@ -396,6 +379,7 @@ impl RemoteStateActor {
if self.paths.is_empty() {
warn!("Cannot send datagrams: No paths to remote endpoint known");
}

for addr in self.paths.addrs() {
// We never want to send to our local addresses.
// The local address set is updated in the main loop so we can use `peek` here.
Expand All @@ -407,7 +391,9 @@ impl RemoteStateActor {
.any(|a| a.addr == *sockaddr)
{
trace!(%sockaddr, "not sending datagram to our own address");
} else if let Err(err) = self.send_datagram(addr.clone(), transmit.clone()).await {
} else if let Err(err) =
send_datagram(&mut self.sender, addr.clone(), transmit.clone()).await
{
debug!(?addr, "failed to send datagram: {err:#}");
}
}
Expand Down Expand Up @@ -980,6 +966,24 @@ impl RemoteStateActor {
}
}

fn send_datagram<'a>(
sender: &'a mut TransportsSender,
dst: transports::Addr,
owned_transmit: OwnedTransmit,
) -> impl Future<Output = n0_error::Result<()>> + 'a {
std::future::poll_fn(move |cx| {
let transmit = transports::Transmit {
ecn: owned_transmit.ecn,
contents: owned_transmit.contents.as_ref(),
segment_size: owned_transmit.segment_size,
};

Pin::new(&mut *sender)
.poll_send(cx, &dst, None, &transmit)
.map(|res| res.with_context(|_| format!("failed to send datagram to {dst:?}")))
})
}

/// Messages to send to the [`RemoteStateActor`].
#[derive(derive_more::Debug)]
pub(crate) enum RemoteStateMessage {
Expand Down
58 changes: 2 additions & 56 deletions iroh/src/magicsock/transports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -493,62 +493,8 @@ pub(crate) struct TransportsSender {
}

impl TransportsSender {
#[instrument(skip(self, transmit), fields(len = transmit.contents.len()))]
pub(crate) async fn send(
&self,
dst: &Addr,
src: Option<IpAddr>,
transmit: &Transmit<'_>,
) -> io::Result<()> {
let mut any_match = false;
match dst {
#[cfg(wasm_browser)]
Addr::Ip(..) => return Err(io::Error::other("IP is unsupported in browser")),
#[cfg(not(wasm_browser))]
Addr::Ip(addr) => {
for sender in &self.ip {
if sender.is_valid_send_addr(addr) {
any_match = true;
match sender.send(*addr, src, transmit).await {
Ok(()) => {
trace!("sent");
return Ok(());
}
Err(err) => {
warn!("ip failed to send: {:?}", err);
}
}
}
}
}
Addr::Relay(url, endpoint_id) => {
for sender in &self.relay {
if sender.is_valid_send_addr(url, endpoint_id) {
any_match = true;
match sender.send(url.clone(), *endpoint_id, transmit).await {
Ok(()) => {
trace!("sent");
return Ok(());
}
Err(err) => {
warn!("relay failed to send: {:?}", err);
}
}
}
}
}
}
if any_match {
Err(io::Error::other("all available transports failed"))
} else {
Err(io::Error::other(
"no transport available for this destination",
))
}
}

#[instrument(name = "poll_send", skip(self, cx, transmit), fields(len = transmit.contents.len()))]
pub(crate) fn inner_poll_send(
pub(crate) fn poll_send(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context,
dst: &Addr,
Expand Down Expand Up @@ -789,7 +735,7 @@ impl quinn::UdpSender for MagicSender {

match this
.sender
.inner_poll_send(cx, &transport_addr, quinn_transmit.src_ip, &transmit)
.poll_send(cx, &transport_addr, quinn_transmit.src_ip, &transmit)
{
Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
Poll::Ready(Err(ref err)) => {
Expand Down
34 changes: 0 additions & 34 deletions iroh/src/magicsock/transports/ip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,40 +195,6 @@ impl IpSender {
SocketAddr::new(addr.ip().to_canonical(), addr.port())
}

pub(super) async fn send(
&self,
dst: SocketAddr,
src: Option<IpAddr>,
transmit: &Transmit<'_>,
) -> io::Result<()> {
let total_bytes = transmit.contents.len() as u64;
let res = self
.sender
.send(&quinn_udp::Transmit {
destination: Self::canonical_addr(dst),
ecn: transmit.ecn,
contents: transmit.contents,
segment_size: transmit.segment_size,
src_ip: src,
})
.await;

match res {
Ok(res) => {
match dst {
SocketAddr::V4(_) => {
self.metrics.send_ipv4.inc_by(total_bytes);
}
SocketAddr::V6(_) => {
self.metrics.send_ipv6.inc_by(total_bytes);
}
}
Ok(res)
}
Err(err) => Err(err),
}
}

pub(super) fn poll_send(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context,
Expand Down
26 changes: 0 additions & 26 deletions iroh/src/magicsock/transports/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,32 +244,6 @@ impl RelaySender {
true
}

pub(super) async fn send(
&self,
dest_url: RelayUrl,
dest_endpoint: EndpointId,
transmit: &Transmit<'_>,
) -> io::Result<()> {
let contents = datagrams_from_transmit(transmit);

let item = RelaySendItem {
remote_endpoint: dest_endpoint,
url: dest_url.clone(),
datagrams: contents,
};

let Some(sender) = self.sender.get_ref() else {
return Err(io::Error::other("channel closed"));
};
match sender.send(item).await {
Ok(_) => Ok(()),
Err(mpsc::error::SendError(_)) => Err(io::Error::new(
io::ErrorKind::ConnectionReset,
"channel to actor is closed",
)),
}
}

pub(super) fn poll_send(
&mut self,
cx: &mut Context,
Expand Down
Loading