Skip to content
This repository was archived by the owner on Aug 23, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions src/mock/mock_time.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::sync::Mutex;
use std::time::{Duration, SystemTime};
use tokio::sync::Mutex;

/// MockTime is a helper to replace SystemTime::now() for testing purposes.
pub struct MockTime {
Expand All @@ -16,20 +16,20 @@ impl Default for MockTime {

impl MockTime {
/// set_now sets the current time.
pub async fn set_now(&self, now: SystemTime) {
let mut cur_now = self.cur_now.lock().await;
pub fn set_now(&self, now: SystemTime) {
let mut cur_now = self.cur_now.lock().unwrap();
*cur_now = now;
}

/// now returns the current time.
pub async fn now(&self) -> SystemTime {
let cur_now = self.cur_now.lock().await;
pub fn now(&self) -> SystemTime {
let cur_now = self.cur_now.lock().unwrap();
*cur_now
}

/// advance advances duration d
pub async fn advance(&mut self, d: Duration) {
let mut cur_now = self.cur_now.lock().await;
pub fn advance(&mut self, d: Duration) {
let mut cur_now = self.cur_now.lock().unwrap();
*cur_now = cur_now.checked_add(d).unwrap_or(*cur_now);
}
}
13 changes: 8 additions & 5 deletions src/nack/generator/generator_stream.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::sync::Mutex;

use super::*;

use crate::nack::UINT16SIZE_HALF;

use util::Unmarshal;
Expand Down Expand Up @@ -135,13 +138,13 @@ impl GeneratorStream {
}
}

pub(super) async fn missing_seq_numbers(&self, skip_last_n: u16) -> Vec<u16> {
let internal = self.internal.lock().await;
pub(super) fn missing_seq_numbers(&self, skip_last_n: u16) -> Vec<u16> {
let internal = self.internal.lock().unwrap();
internal.missing_seq_numbers(skip_last_n)
}

pub(super) async fn add(&self, seq: u16) {
let mut internal = self.internal.lock().await;
pub(super) fn add(&self, seq: u16) {
let mut internal = self.internal.lock().unwrap();
internal.add(seq);
}
}
Expand All @@ -155,7 +158,7 @@ impl RTPReader for GeneratorStream {

let mut b = &buf[..n];
let pkt = rtp::packet::Packet::unmarshal(&mut b)?;
self.add(pkt.header.sequence_number).await;
self.add(pkt.header.sequence_number);

Ok((n, attr))
}
Expand Down
2 changes: 1 addition & 1 deletion src/nack/generator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ impl Generator {
let mut nacks = vec![];
let streams = internal.streams.lock().await;
for (ssrc, stream) in streams.iter() {
let missing = stream.missing_seq_numbers(internal.skip_last_n).await;
let missing = stream.missing_seq_numbers(internal.skip_last_n);
if missing.is_empty(){
continue;
}
Expand Down
5 changes: 3 additions & 2 deletions src/report/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use rtp::packetizer::FnTimeGen;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, SystemTime};
use tokio::sync::{mpsc, Mutex};
use waitgroup::WaitGroup;

Expand All @@ -13,6 +12,8 @@ use crate::{Interceptor, InterceptorBuilder};
use receiver::{ReceiverReport, ReceiverReportInternal};
use sender::{SenderReport, SenderReportInternal};

type FnTimeGen = Arc<dyn Fn() -> SystemTime + Sync + 'static + Send>;

/// ReceiverBuilder can be used to configure ReceiverReport Interceptor.
#[derive(Default)]
pub struct ReportBuilder {
Expand Down
12 changes: 7 additions & 5 deletions src/report/receiver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl RTCPReader for ReceiverReportRtcpReader {
let pkts = rtcp::packet::unmarshal(&mut b)?;

let now = if let Some(f) = &self.internal.now {
f().await
f()
} else {
SystemTime::now()
};
Expand All @@ -48,7 +48,7 @@ impl RTCPReader for ReceiverReportRtcpReader {
m.get(&sr.ssrc).cloned()
};
if let Some(stream) = stream {
stream.process_sender_report(now, sr).await;
stream.process_sender_report(now, sr);
}
}
}
Expand Down Expand Up @@ -96,17 +96,19 @@ impl ReceiverReport {
loop {
tokio::select! {
_ = ticker.tick() =>{
// TODO(cancel safety): This branch isn't cancel safe

let now = if let Some(f) = &internal.now {
f().await
}else{
f()
} else {
SystemTime::now()
};
let streams:Vec<Arc<ReceiverStream>> = {
let m = internal.streams.lock().await;
m.values().cloned().collect()
};
for stream in streams {
let pkt = stream.generate_report(now).await;
let pkt = stream.generate_report(now);

let a = Attributes::new();
if let Err(err) = rtcp_writer.write(&[Box::new(pkt)], &a).await{
Expand Down
21 changes: 9 additions & 12 deletions src/report/receiver/receiver_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::*;
use crate::{Attributes, RTPReader};

use async_trait::async_trait;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::time::SystemTime;
use util::Unmarshal;

Expand Down Expand Up @@ -184,25 +184,22 @@ impl ReceiverStream {
}
}

pub(crate) async fn process_rtp(&self, now: SystemTime, pkt: &rtp::packet::Packet) {
let mut internal = self.internal.lock().await;
pub(crate) fn process_rtp(&self, now: SystemTime, pkt: &rtp::packet::Packet) {
let mut internal = self.internal.lock().unwrap();
internal.process_rtp(now, pkt);
}

pub(crate) async fn process_sender_report(
pub(crate) fn process_sender_report(
&self,
now: SystemTime,
sr: &rtcp::sender_report::SenderReport,
) {
let mut internal = self.internal.lock().await;
let mut internal = self.internal.lock().unwrap();
internal.process_sender_report(now, sr);
}

pub(crate) async fn generate_report(
&self,
now: SystemTime,
) -> rtcp::receiver_report::ReceiverReport {
let mut internal = self.internal.lock().await;
pub(crate) fn generate_report(&self, now: SystemTime) -> rtcp::receiver_report::ReceiverReport {
let mut internal = self.internal.lock().unwrap();
internal.generate_report(now)
}
}
Expand All @@ -217,11 +214,11 @@ impl RTPReader for ReceiverStream {
let mut b = &buf[..n];
let pkt = rtp::packet::Packet::unmarshal(&mut b)?;
let now = if let Some(f) = &self.now {
f().await
f()
} else {
SystemTime::now()
};
self.process_rtp(now, &pkt).await;
self.process_rtp(now, &pkt);

Ok((n, attr))
}
Expand Down
Loading