From 89cb2b720ffc68b6e5e451cd9988576977258600 Mon Sep 17 00:00:00 2001 From: Lionel Faber Date: Wed, 30 Jun 2021 19:15:54 +0530 Subject: [PATCH] fix(igd): stop renewing port mapping after endpoint.close() is called --- src/endpoint.rs | 13 +++++++++++-- src/igd.rs | 10 +++++++++- 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/src/endpoint.rs b/src/endpoint.rs index 3c3bee06..9b1aec17 100644 --- a/src/endpoint.rs +++ b/src/endpoint.rs @@ -24,6 +24,7 @@ use super::{ use bytes::Bytes; use log::{debug, error, info, trace, warn}; use std::{net::SocketAddr, time::Duration}; +use tokio::sync::broadcast::{self, Receiver, Sender}; use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; use tokio::time::timeout; @@ -81,6 +82,7 @@ pub struct Endpoint { client_cfg: quinn::ClientConfig, bootstrap_nodes: Vec, qp2p_config: Config, + termination_tx: Sender<()>, connection_pool: ConnectionPool, connection_deduplicator: ConnectionDeduplicator, } @@ -118,6 +120,7 @@ impl Endpoint { let (message_tx, message_rx) = mpsc::unbounded_channel(); let (connection_tx, connection_rx) = mpsc::unbounded_channel(); let (disconnection_tx, disconnection_rx) = mpsc::unbounded_channel(); + let (termination_tx, termination_rx) = broadcast::channel(1); let mut endpoint = Self { local_addr, @@ -128,6 +131,7 @@ impl Endpoint { client_cfg, bootstrap_nodes, qp2p_config, + termination_tx, connection_pool: connection_pool.clone(), connection_deduplicator: ConnectionDeduplicator::new(), }; @@ -182,7 +186,7 @@ impl Endpoint { warn!("Public IP address not verified since bootstrap contacts are empty"); } } else { - endpoint.public_addr = Some(endpoint.fetch_public_address().await?); + endpoint.public_addr = Some(endpoint.fetch_public_address(termination_rx).await?); } listen_for_incoming_connections( @@ -219,7 +223,10 @@ impl Endpoint { /// simply build our connection info by querying the underlying bound socket for our address. /// Note that if such an obtained address is of unspecified category we will ignore that as /// such an address cannot be reached and hence not useful. - async fn fetch_public_address(&mut self) -> Result { + async fn fetch_public_address( + &mut self, + #[allow(unused)] termination_rx: Receiver<()>, + ) -> Result { // Skip port forwarding if self.local_addr.ip().is_loopback() || !self.qp2p_config.forward_port { self.public_addr = Some(self.local_addr); @@ -258,6 +265,7 @@ impl Endpoint { self.qp2p_config .upnp_lease_duration .unwrap_or(DEFAULT_UPNP_LEASE_DURATION_SEC), + termination_rx, ), ) .await @@ -493,6 +501,7 @@ impl Endpoint { /// Close all the connections of this endpoint immediately and stop accepting new connections. pub fn close(&self) { + let _ = self.termination_tx.send(()); self.quic_endpoint.close(0_u32.into(), b"") } diff --git a/src/igd.rs b/src/igd.rs index c78d01b2..41cc4b10 100644 --- a/src/igd.rs +++ b/src/igd.rs @@ -12,10 +12,15 @@ use igd::SearchOptions; use log::{debug, info, warn}; use std::net::SocketAddr; use std::time::Duration; +use tokio::sync::broadcast::{error::TryRecvError, Receiver}; use tokio::time::{self, Instant}; /// Automatically forwards a port and setups a tokio task to renew it periodically. -pub async fn forward_port(local_addr: SocketAddr, lease_duration: u32) -> Result { +pub async fn forward_port( + local_addr: SocketAddr, + lease_duration: u32, + mut termination_rx: Receiver<()>, +) -> Result { let igd_res = add_port(local_addr, lease_duration).await; if let Ok(ext_port) = &igd_res { @@ -27,6 +32,9 @@ pub async fn forward_port(local_addr: SocketAddr, lease_duration: u32) -> Result loop { let _ = timer.tick().await; + if let Err(TryRecvError::Empty) = termination_rx.try_recv() { + break; + } debug!("Renewing IGD lease for port {}", local_addr); let renew_res = renew_port(local_addr, ext_port, lease_duration).await;