Skip to content

Commit

Permalink
fix: handle hairpining timeout properly (#1049)
Browse files Browse the repository at this point in the history
* fix: handle hairpining timeout properly

* refactor: simplify hair pin tracking
  • Loading branch information
dignifiedquire committed May 25, 2023
1 parent a548371 commit 3867b72
Showing 1 changed file with 35 additions and 33 deletions.
68 changes: 35 additions & 33 deletions src/hp/netcheck.rs
Original file line number Diff line number Diff line change
Expand Up @@ -491,8 +491,8 @@ async fn dns_lookup(
struct ReportState {
hair_tx: stun::TransactionId,
got_hair_stun: broadcast::Receiver<SocketAddr>,
// notified on hair pin timeout
hair_timeout: Arc<sync::Notify>,
/// Expires when hairpinning times out. Set when a hairpin was sent.
hair_timeout: Option<Pin<Box<time::Sleep>>>,
pc4: Option<Arc<UdpSocket>>,
pc6: Option<Arc<UdpSocket>>,
pc4_hair: Arc<UdpSocket>,
Expand All @@ -502,7 +502,6 @@ struct ReportState {
wait_port_map: wg::AsyncWaitGroup,
/// The report which will be returned.
report: Arc<RwLock<Report>>,
sent_hair_check: bool,
got_ep4: Option<SocketAddr>,
timers: JoinSet<()>,
plan: ProbePlan,
Expand Down Expand Up @@ -712,7 +711,9 @@ impl ReportState {
debug!("aborting {} probes, already done", probes.len());
drop(probes);

self.wait_hair_check().await;
if let Some(hair_pin) = self.wait_hair_check().await {
self.report.write().await.hair_pinning = Some(hair_pin);
}
debug!("hair_check done");

if !skip_external_network && port_mapper.is_some() {
Expand Down Expand Up @@ -763,52 +764,54 @@ impl ReportState {
self.report.read().await.udp
}

fn sent_hair_check(&self) -> bool {
self.hair_timeout.is_some()
}

/// Reports whether executing the given probe would yield any new information.
/// The given node is provided just because the sole caller already has it
/// and it saves a lookup.
async fn start_hair_check(&mut self, dst: SocketAddr) {
if self.sent_hair_check || self.incremental {
if self.sent_hair_check() || self.incremental {
return;
}
self.sent_hair_check = true;
if let Err(err) = self
match self
.pc4_hair
.send_to(&stun::request(self.hair_tx), dst)
.await
{
debug!("failed to send haircheck to {}: {:?}", dst, err);
Ok(_) => {
debug!("sent haircheck to {}", dst);
self.hair_timeout = Some(Box::pin(time::sleep(HAIRPIN_CHECK_TIMEOUT)));
}
Err(err) => {
debug!("failed to send haircheck to {}: {:?}", dst, err);
}
}

debug!("sent haircheck to {}", dst);

let timeout = self.hair_timeout.clone();
tokio::task::spawn(async move {
time::sleep(HAIRPIN_CHECK_TIMEOUT).await;
timeout.notify_waiters();
});
}

async fn wait_hair_check(&mut self) {
async fn wait_hair_check(&mut self) -> Option<bool> {
let last = self.last.as_deref();
if self.incremental {
if let Some(last) = last {
let last_val = last.hair_pinning;
self.report.write().await.hair_pinning = last_val;
return last.hair_pinning;
}
return;
}
if !self.sent_hair_check {
return;
return None;
}

tokio::select! {
_ = self.got_hair_stun.recv() => {
self.report.write().await.hair_pinning = Some(true);
}
_ = self.hair_timeout.notified() => {
debug!("hair_check timeout");
self.report.write().await.hair_pinning = Some(false);
match self.hair_timeout {
Some(ref mut hair_timeout) => {
tokio::select! {
biased;
_ = self.got_hair_stun.recv() => {
Some(true)
}
_ = hair_timeout => {
debug!("hair_check timeout");
Some(false)
}
}
}
None => None,
}
}

Expand Down Expand Up @@ -1425,11 +1428,10 @@ impl Actor {
pc4,
pc6,
pc4_hair: Arc::new(pc4_hair),
hair_timeout: Arc::new(sync::Notify::new()),
hair_timeout: None,
stop_probe: Arc::new(sync::Notify::new()),
wait_port_map: wg::AsyncWaitGroup::new(),
report: Default::default(),
sent_hair_check: false,
got_ep4: None,
timers: Default::default(),
hair_tx,
Expand Down

0 comments on commit 3867b72

Please sign in to comment.