Skip to content

Commit

Permalink
ref(netcheck): Bring back the ActorMessage (#1023)
Browse files Browse the repository at this point in the history
  • Loading branch information
flub committed May 17, 2023
1 parent 7c87b94 commit 4437d7e
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 13 deletions.
5 changes: 3 additions & 2 deletions src/hp/magicsock/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -853,7 +853,7 @@ impl Actor {
);

let (ip_sender, mut ip_receiver) = mpsc::channel(128);
let stun_packet_channel = self.net_checker.get_stun_packet_channel();
let stun_packet_channel = self.net_checker.get_msg_sender();

// Process incoming packets in an independent task of the other work.

Expand All @@ -872,7 +872,8 @@ impl Actor {
conn.enable_stun_packets.load(Ordering::Relaxed);
debug!("on_stun_receive, processing {}", enable_stun_packets);
if enable_stun_packets {
stun_packet_channel.try_send((packet, meta.addr)).ok();
let msg = netcheck::ActorMessage::StunPacket(packet, meta.addr);
stun_packet_channel.try_send(msg).ok();
}
continue;
}
Expand Down
28 changes: 17 additions & 11 deletions src/hp/netcheck.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ impl fmt::Display for Report {
/// do so.
#[derive(Debug)]
pub struct Client {
msg_sender: mpsc::Sender<(Bytes, SocketAddr)>,
msg_sender: mpsc::Sender<ActorMessage>,
actor: Actor,
}

Expand Down Expand Up @@ -181,8 +181,11 @@ impl Client {
Ok(Client { msg_sender, actor })
}

/// Used by [`crate::hp::magicsock::Conn`] to pass received stun packets to the running netcheck actor.
pub fn get_stun_packet_channel(&self) -> mpsc::Sender<(Bytes, SocketAddr)> {
/// Returns the sender which can send messages to the actor.
///
/// Used by [`crate::hp::magicsock::Conn`] to pass received stun packets to the running
/// netcheck actor.
pub(crate) fn get_msg_sender(&self) -> mpsc::Sender<ActorMessage> {
self.msg_sender.clone()
}

Expand Down Expand Up @@ -253,7 +256,7 @@ impl Client {
/// is fine since the socket is already bound so packets will not be lost.
fn spawn_udp_listener(
sock: Arc<UdpSocket>,
sender: mpsc::Sender<(Bytes, SocketAddr)>,
sender: mpsc::Sender<ActorMessage>,
cancel_token: CancellationToken,
) {
let span = debug_span!(
Expand Down Expand Up @@ -291,18 +294,16 @@ impl Client {
async fn recv_stun_once(
sock: &UdpSocket,
buf: &mut [u8],
sender: &mpsc::Sender<(Bytes, SocketAddr)>,
sender: &mpsc::Sender<ActorMessage>,
) -> Result<()> {
let (count, mut from_addr) = sock
.recv_from(buf)
.await
.context("Error reading from stun socket")?;
let payload = &buf[..count];
from_addr.set_ip(to_canonical(from_addr.ip()));
sender
.send((Bytes::from(payload.to_vec()), from_addr))
.await
.context("actor stopped")
let msg = ActorMessage::StunPacket(Bytes::from(payload.to_vec()), from_addr);
sender.send(msg).await.context("actor stopped")
}
}

Expand Down Expand Up @@ -1105,10 +1106,15 @@ impl ProbeReport {
}
}

#[derive(Debug)]
pub(crate) enum ActorMessage {
StunPacket(Bytes, SocketAddr),
}

#[derive(Debug)]
struct Actor {
/// Actor messages channel.
receiver: mpsc::Receiver<(Bytes, SocketAddr)>,
receiver: mpsc::Receiver<ActorMessage>,
reports: Reports,
/// Whether the client should try to reach things other than localhost.
///
Expand Down Expand Up @@ -1157,7 +1163,7 @@ impl Actor {
debug!("incoming stun packet: {:?}", msg);
match msg {
None => bail!("client dropped, abort"),
Some((pkt, source)) =>
Some(ActorMessage::StunPacket(pkt, source)) =>
self.receive_stun_packet(&mut in_flight, &pkt, source),
}
}
Expand Down

0 comments on commit 4437d7e

Please sign in to comment.