Skip to content

Commit

Permalink
wip: Adapt the rc internals
Browse files Browse the repository at this point in the history
  • Loading branch information
lu-zero committed Apr 8, 2021
1 parent a9510f0 commit 37bce52
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 36 deletions.
113 changes: 84 additions & 29 deletions src/api/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,32 +29,56 @@ use std::sync::Arc;
/// Endpoint to send previous-pass statistics data
pub struct RcDataSender {
sender: Sender<RcData>,
limit: u64,
count: u64,
pub(crate) limit: u64,
pub(crate) count: u64,
}

impl RcDataSender {
fn new(limit: u64, sender: Sender<RcData>) -> RcDataSender {
Self { sender, limit, count: 0 }
}
fn split_try_send(
&mut self, data: RcData,
) -> Result<(), TrySendError<RcData>> {
match data {
RcData::Summary(_) => self.sender.try_send(data)?,
RcData::Frame(data) => {
for f in data.chunks(crate::rate::TWOPASS_PACKET_SZ) {
self.sender.try_send(RcData::Frame(f.to_vec().into_boxed_slice()))?
}
}
}
Ok(())
}
pub fn try_send(
&mut self, data: RcData,
) -> Result<(), TrySendError<RcData>> {
if self.limit <= self.count {
Err(TrySendError::Disconnected(data))
} else {
let r = self.sender.try_send(data);
let r = self.split_try_send(data);
if r.is_ok() {
self.count += 1;
}
r
}
}
fn split_send(&mut self, data: RcData) -> Result<(), SendError<RcData>> {
match data {
RcData::Summary(_) => self.sender.send(data)?,
RcData::Frame(data) => {
for f in data.chunks(crate::rate::TWOPASS_PACKET_SZ) {
self.sender.send(RcData::Frame(f.to_vec().into_boxed_slice()))?
}
}
}
Ok(())
}
pub fn send(&mut self, data: RcData) -> Result<(), SendError<RcData>> {
if self.limit <= self.count {
Err(SendError(data))
} else {
let r = self.sender.send(data);
let r = self.split_send(data);
if r.is_ok() {
self.count += 1;
}
Expand Down Expand Up @@ -330,7 +354,8 @@ impl Config {
}

trait RcFirstPass {
fn send_pass_data(&mut self, rc_state: &mut RCState);
fn store_pass_data(&mut self, rc_state: &mut RCState);
fn send_pass_data(&mut self);
fn send_pass_summary(&mut self, rc_state: &mut RCState);
}

Expand All @@ -340,29 +365,46 @@ trait RcSecondPass {
) -> Result<(), ()>;
}

impl RcFirstPass for Sender<RcData> {
fn send_pass_data(&mut self, rc_state: &mut RCState) {
struct PassDataSender {
sender: Sender<RcData>,
data: Vec<u8>,
}

impl RcFirstPass for PassDataSender {
fn store_pass_data(&mut self, rc_state: &mut RCState) {
if let Some(data) = rc_state.emit_frame_data() {
let data = data.to_vec().into_boxed_slice();
self.send(RcData::Frame(data)).unwrap();
self.data.extend(data.iter());
} else {
unreachable!(
"The encoder received more frames than its internal
limit allows"
);
}
}

fn send_pass_data(&mut self) {
let data = self.data.to_vec().into_boxed_slice();
self.data.clear();
self.sender.send(RcData::Frame(data)).unwrap();
}
fn send_pass_summary(&mut self, rc_state: &mut RCState) {
let data = rc_state.emit_summary();
let data = data.to_vec().into_boxed_slice();
self.send(RcData::Summary(data)).unwrap();
self.sender.send(RcData::Summary(data)).unwrap();
}
}

impl RcFirstPass for Option<Sender<RcData>> {
fn send_pass_data(&mut self, rc_state: &mut RCState) {
impl RcFirstPass for Option<PassDataSender> {
fn store_pass_data(&mut self, rc_state: &mut RCState) {
match self.as_mut() {
Some(s) => s.store_pass_data(rc_state),
None => {}
}
}

fn send_pass_data(&mut self) {
match self.as_mut() {
Some(s) => s.send_pass_data(rc_state),
Some(s) => s.send_pass_data(),
None => {}
}
}
Expand All @@ -378,16 +420,18 @@ impl RcSecondPass for Receiver<RcData> {
fn feed_pass_data<T: Pixel>(
&mut self, inner: &mut ContextInner<T>,
) -> Result<(), ()> {
while inner.rc_state.twopass_in_frames_needed() > 0
&& !inner.done_processing()
{
if inner.done_processing() {
return Ok(());
}

while inner.rc_state.twopass_in_frames_needed() > 0 {
if let Ok(RcData::Frame(data)) = self.recv() {
inner
.rc_state
.parse_frame_data_packet(data.as_ref())
.unwrap_or_else(|_| todo!("Error reporting"));
} else {
todo!("Error reporting");
todo!("No data reporting");
}
}

Expand Down Expand Up @@ -427,7 +471,10 @@ impl Config {

let (mut send_rc_pass1, rc_data_receiver) = if rc.emit_pass_data {
let (send_rc_pass1, receive_rc_pass1) = unbounded();
(Some(send_rc_pass1), Some(RcDataReceiver(receive_rc_pass1)))
(
Some(PassDataSender { sender: send_rc_pass1, data: Vec::new() }),
Some(RcDataReceiver(receive_rc_pass1)),
)
} else {
(None, None)
};
Expand All @@ -436,7 +483,8 @@ impl Config {
.summary
.is_some()
{
let (frame_limit, pass_limit) =
// the pass data packets are now lumped together to match the packets emitted.
let (frame_limit, _pass_limit) =
rc.summary.as_ref().map(|s| (s.ntus as u64, s.total as u64)).unwrap();

inner.limit = Some(frame_limit);
Expand All @@ -449,7 +497,7 @@ impl Config {
};

(
Some(RcDataSender::new(pass_limit, send_rc_pass2)),
Some(RcDataSender::new(frame_limit, send_rc_pass2)),
Some(receive_rc_pass2),
frame_limit,
)
Expand All @@ -475,17 +523,21 @@ impl Config {
// already.
//
// this call should return either Ok or Err(Encoded)
let has_pass_data = match inner.receive_packet() {
let pass_data_ready = match inner.receive_packet() {
Ok(p) => {
send_packet.send(p).unwrap();
send_rc_pass1.store_pass_data(&mut inner.rc_state);
true
}
Err(EncoderStatus::Encoded) => true,
Err(EncoderStatus::Encoded) => {
send_rc_pass1.store_pass_data(&mut inner.rc_state);
false
}
Err(EncoderStatus::NotReady) => todo!("Error reporting"),
_ => unreachable!(),
};
if has_pass_data {
send_rc_pass1.send_pass_data(&mut inner.rc_state);
if pass_data_ready {
send_rc_pass1.send_pass_data();
}
}

Expand All @@ -499,20 +551,23 @@ impl Config {
loop {
receive_rc_pass2.feed_pass_data(&mut inner).unwrap();
let r = inner.receive_packet();
let has_pass_data = match r {
let pass_data_ready = match r {
Ok(p) => {
// warn!("Sending out {}", p.input_frameno);
send_packet.send(p).unwrap();
send_rc_pass1.store_pass_data(&mut inner.rc_state);
true
}
Err(EncoderStatus::LimitReached) => break,
Err(EncoderStatus::Encoded) => true,
Err(EncoderStatus::Encoded) => {
send_rc_pass1.store_pass_data(&mut inner.rc_state);
false
}
Err(EncoderStatus::NotReady) => todo!("Error reporting"),
_ => unreachable!(),
};

if has_pass_data {
send_rc_pass1.send_pass_data(&mut inner.rc_state);
if pass_data_ready {
send_rc_pass1.send_pass_data();
}
}

Expand Down
27 changes: 21 additions & 6 deletions src/api/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,15 @@ impl<T: Pixel> Context<T> {
///
/// It will return a `RcData::Summary` once the encoder is flushed.
pub fn rc_receive_pass_data(&mut self) -> Option<RcData> {
self.rc_data_receiver.as_mut().and_then(|r| r.recv().ok())
self.rc_data_receiver.as_mut().and_then(|r| {
let data = r.recv().ok();
match data {
Some(RcData::Summary(_)) => println!("-> Summary"),
Some(RcData::Frame(_)) => println!("-> Data"),
_ => println!("-> No data"),
}
data
})
}

/// Lower bound number of pass data packets required to progress the
Expand All @@ -396,11 +404,18 @@ impl<T: Pixel> Context<T> {
/// It should be called iteratively until it returns 0.
///
pub fn rc_second_pass_data_required(&self) -> usize {
if self.packet_receiver.len() > 0 {
0
} else {
1
}
self.rc_data_sender.as_ref().map_or(0, |s| {
let reservoir_delay =
self.config.reservoir_frame_delay.unwrap_or(0) as usize;
if s.len() <= reservoir_delay
&& self.packet_receiver.len() == 0
&& s.limit > s.count
{
1
} else {
0
}
})
}

/// Feed the first pass Rate Control data to the encoder,
Expand Down
2 changes: 1 addition & 1 deletion src/rate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ const TWOPASS_VERSION: i32 = 1;
// + FRAME_NSUBTYPES*(4 byte frame count + 1 byte exp + 8 byte scale_sum)
pub(crate) const TWOPASS_HEADER_SZ: usize = 16 + FRAME_NSUBTYPES * (4 + 1 + 8);
// 4 byte frame type (show_frame and fti jointly coded) + 4 byte log_scale_q24
const TWOPASS_PACKET_SZ: usize = 8;
pub(crate) const TWOPASS_PACKET_SZ: usize = 8;

const SEF_BITS: i64 = 24;

Expand Down

0 comments on commit 37bce52

Please sign in to comment.