Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions src/transports/ice/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub struct IceConn {
pub dtls_receiver: RwLock<Option<Weak<dyn PacketReceiver>>>,
pub rtp_receiver: RwLock<Option<Weak<dyn PacketReceiver>>>,
pub latch_on_rtp: AtomicBool,
pub rtp_latched: AtomicBool,
}

impl IceConn {
Expand All @@ -30,6 +31,7 @@ impl IceConn {
dtls_receiver: RwLock::new(None),
rtp_receiver: RwLock::new(None),
latch_on_rtp: AtomicBool::new(false),
rtp_latched: AtomicBool::new(false),
})
}

Expand Down Expand Up @@ -164,13 +166,12 @@ impl PacketReceiver for IceConn {
}
} else if (128..192).contains(&first_byte) {
// RTP / RTCP
if self.latch_on_rtp.load(Ordering::Relaxed) && addr != current_remote {
if self.latch_on_rtp.load(Ordering::Relaxed)
&& addr != current_remote
&& !self.rtp_latched.load(Ordering::Relaxed)
{
*self.remote_addr.write().unwrap() = addr;
let current_rtcp = *self.remote_rtcp_addr.read().unwrap();
if let Some(mut rtcp_addr) = current_rtcp {
rtcp_addr.set_ip(addr.ip());
*self.remote_rtcp_addr.write().unwrap() = Some(rtcp_addr);
}
self.rtp_latched.store(true, Ordering::Relaxed);
}
let receiver = {
let rx_lock = self.rtp_receiver.read().unwrap();
Expand Down
60 changes: 28 additions & 32 deletions src/transports/ice/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use anyhow::{Context, Result, anyhow, bail};
use tokio::net::{UdpSocket, lookup_host};
use tokio::sync::{Mutex, broadcast, mpsc, oneshot, watch};
use tokio::time::timeout;
use tracing::{debug, instrument, trace};
use tracing::{debug, instrument, trace, warn};

#[cfg(any(test, feature = "simulator"))]
use self::stun::random_u32;
Expand Down Expand Up @@ -1219,36 +1219,16 @@ async fn handle_packet(
match StunMessage::decode(packet) {
Ok(msg) => {
if msg.class == StunClass::Request {
// Start of Latching Logic for RTP mode
if inner.config.transport_mode == crate::TransportMode::Rtp
&& inner.config.enable_latching
&& !inner.config.enable_ice_lite
{
let mut update = false;
{
let selected_pair = inner.selected_pair.lock().unwrap();
if let Some(pair) = selected_pair.as_ref() {
if pair.remote.address != addr
&& msg.method == StunMethod::Binding
&& pair.remote.address.port() == addr.port()
// Match port only
{
debug!(
"RTP Latching: Switching remote address from {} to {} based on STUN Binding Request",
pair.remote.address, addr
);
update = true;
}
}
}

if update {
let mut pair = inner.selected_pair.lock().unwrap().clone().unwrap();
pair.remote.address = addr;

*inner.selected_pair.lock().unwrap() = Some(pair.clone());
// This send should trigger the pair_monitor in PeerConnection via watcher
let _ = inner.selected_pair_notifier.send(Some(pair.clone()));
}
warn!(
remote_addr = %addr,
method = ?msg.method,
class = ?msg.class,
"Rejecting STUN on RTP transport without ICE-lite"
);
return;
}
handle_stun_request(&sender, &msg, addr, inner).await;
} else if msg.class == StunClass::SuccessResponse {
Expand Down Expand Up @@ -1976,13 +1956,29 @@ impl IceGatherer {

async fn bind_socket(&self, ip: IpAddr) -> Result<UdpSocket> {
if let (Some(start), Some(end)) = (self.config.rtp_start_port, self.config.rtp_end_port) {
for port in start..=end {
let start = start.saturating_add(start % 2);
let end = end - (end % 2);

if start > end {
bail!("No usable even RTP ports in range {}..={}", start, end);
}

let port_count = (((end - start) / 2) + 1) as u64;
let start_index = (random_u64() % port_count) as u16;
let mut port = start + (start_index * 2);

for _ in 0..port_count {
match UdpSocket::bind(SocketAddr::new(ip, port)).await {
Ok(socket) => return Ok(socket),
Err(_) => continue,
Err(_) => {
port = port.saturating_add(2);
if port > end {
port = start;
}
}
}
}
bail!("No available ports in range {}..{}", start, end)
bail!("No available even RTP ports in range {}..={}", start, end)
} else {
UdpSocket::bind(SocketAddr::new(ip, 0))
.await
Expand Down