Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 4 additions & 43 deletions crates/nodes/src/audio/codecs/flac.rs
Comment thread
staging-devin-ai-integration[bot] marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -96,17 +96,12 @@ impl ProcessorNode for FlacDecoderNode {
.with_boundaries(streamkit_core::metrics::HISTOGRAM_BOUNDARIES_FILE_OPERATION.to_vec())
.build();

// Create channels for communication with the blocking task.
// This must be bounded to provide backpressure and prevent unbounded buffering.
let (stream_tx, stream_rx) = mpsc::channel::<Bytes>(get_stream_channel_capacity());
let (result_tx, mut result_rx) = mpsc::channel::<DecodeResult>(DECODER_CHANNEL_CAPACITY);

// Spawn blocking task that will decode as data streams in
let decode_duration_histogram_clone = decode_duration_histogram.clone();
let decode_task = tokio::task::spawn_blocking(move || {
let decode_start_time = Instant::now();

// Create streaming reader that will block waiting for data from the channel
let reader = StreamingReader::new(stream_rx);

let result = decode_flac_streaming_incremental(reader, &result_tx);
Expand All @@ -120,12 +115,9 @@ impl ProcessorNode for FlacDecoderNode {

state_helpers::emit_running(&context.state_tx, &node_name);

// Stats tracking
let mut stats_tracker = NodeStatsTracker::new(node_name.clone(), context.stats_tx.clone());

// Stream input data to decoder as it arrives.
// This is a separate task so the main loop can keep draining decode results even when
// the stream channel is full (avoids deadlocks).
// Separate input task to avoid deadlocks when the stream channel is full.
let mut input_task = tokio::spawn(async move {
let stream_tx = stream_tx;
while let Some(packet) = input_rx.recv().await {
Expand All @@ -139,7 +131,6 @@ impl ProcessorNode for FlacDecoderNode {
});
let mut input_done = false;

// Process input and results concurrently
loop {
tokio::select! {
maybe_result = result_rx.recv() => {
Expand Down Expand Up @@ -173,10 +164,7 @@ impl ProcessorNode for FlacDecoderNode {
state_helpers::emit_failed(&context.state_tx, &node_name, &err_msg);
return Err(StreamKitError::Runtime(err_msg));
}
None => {
// Result channel closed, blocking task is done
break;
}
None => break,
}
}
Some(control_msg) = context.control_rx.recv() => {
Expand All @@ -186,15 +174,14 @@ impl ProcessorNode for FlacDecoderNode {
break;
}
}
// EOF or upstream closed — keep draining decode results until
// the blocking task closes the result channel.
_ = &mut input_task, if !input_done => {
// Input finished (EOF or upstream closed). Keep draining decode results until
// the blocking task closes the result channel.
input_done = true;
}
Comment thread
staging-devin-ai-integration[bot] marked this conversation as resolved.
}
}

// Wait for the blocking task to complete
let _ = decode_task.await;

state_helpers::emit_stopped(&context.state_tx, &node_name, "input_closed");
Expand All @@ -204,25 +191,19 @@ impl ProcessorNode for FlacDecoderNode {
}
}

// Type alias for decode result to simplify complex signatures
type DecodeResult = Result<(Vec<f32>, u32, u16), String>;

/// Decodes FLAC data incrementally from a streaming reader
/// Decodes and emits frames as soon as FLAC packets are available
#[allow(clippy::cognitive_complexity)] // Decoder state machine is inherently complex
fn decode_flac_streaming_incremental(
reader: StreamingReader,
result_tx: &mpsc::Sender<DecodeResult>,
Comment on lines 196 to 199
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

📝 Info: Removed doc comments on internal functions are acceptable

Several /// doc comments were removed from non-public functions (e.g. decode_flac_streaming_incremental, decode_mp3_streaming_incremental, demux_wav_streaming_incremental). CONTRIBUTING.md requires "doc comments for public APIs" but these are all fn (not pub fn), so removing the doc comments does not violate the rule. The #[allow(clippy::cognitive_complexity)] annotations on these functions still retain their inline rationale comments.

Open in Devin Review (Staging)

Was this helpful? React with 👍 or 👎 to provide feedback.

Debug

Playground

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct — these are all crate-internal functions, so the /// doc comments were removed intentionally as redundant.

) -> Result<(), String> {
// Wrap the streaming reader in ReadOnlySource, then MediaSourceStream
let source = ReadOnlySource::new(reader);
let mss = MediaSourceStream::new(Box::new(source), MediaSourceStreamOptions::default());

// Create a hint for FLAC format
let mut hint = Hint::new();
hint.with_extension("flac");

// Probe the media source
let format_opts = FormatOptions::default();
let metadata_opts = MetadataOptions::default();
let probed = symphonia::default::get_probe()
Expand All @@ -231,12 +212,10 @@ fn decode_flac_streaming_incremental(

let mut format_reader = probed.format;

// Get the default track
let track = format_reader
.default_track()
.ok_or_else(|| "No default track found in FLAC".to_string())?;

// Get codec parameters
let codec_params = &track.codec_params;
let sample_rate =
codec_params.sample_rate.ok_or_else(|| "No sample rate found in FLAC".to_string())?;
Expand All @@ -251,23 +230,18 @@ fn decode_flac_streaming_incremental(
channels
);

// Create decoder
let decoder_opts = DecoderOptions::default();
let mut decoder = symphonia::default::get_codecs()
.make(codec_params, &decoder_opts)
.map_err(|e| format!("Failed to create FLAC decoder: {e}"))?;

// Get the track ID for filtering
let track_id = track.id;

// Decode packets and rechunk for output
// Use VecDeque for O(1) front removal instead of O(n) Vec::drain
let mut sample_buf: Option<SampleBuffer<f32>> = None;
let mut rechunk_buffer: VecDeque<f32> = VecDeque::new();
let mut frame_count = 0;

loop {
// Read next packet - this will block waiting for more data from the stream
let packet = match format_reader.next_packet() {
Ok(packet) => packet,
Err(Error::IoError(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
Expand All @@ -280,32 +254,25 @@ fn decode_flac_streaming_incremental(
},
};

// Filter packets by track ID
if packet.track_id() != track_id {
continue;
}

// Decode the packet
match decoder.decode(&packet) {
Ok(audio_buf) => {
// Initialize sample buffer on first decode
if sample_buf.is_none() {
let spec = *audio_buf.spec();
let duration = audio_buf.capacity() as u64;
sample_buf = Some(SampleBuffer::<f32>::new(duration, spec));
}

// Copy decoded audio and rechunk for output
if let Some(buf) = &mut sample_buf {
buf.copy_interleaved_ref(audio_buf);
rechunk_buffer.extend(buf.samples().iter().copied());

// Send fixed-size chunks as they become available
while rechunk_buffer.len() >= OUTPUT_FRAME_SIZE {
// Drain from front - O(1) amortized with VecDeque
let chunk: Vec<f32> = rechunk_buffer.drain(..OUTPUT_FRAME_SIZE).collect();

// Use blocking_send - more efficient than Handle::block_on
if result_tx.blocking_send(Ok((chunk, sample_rate, channels))).is_err() {
tracing::info!(
"Result channel closed after sending {} frames ({} samples total). Stopping decode.",
Expand All @@ -327,7 +294,6 @@ fn decode_flac_streaming_incremental(
}
},
Err(Error::DecodeError(err)) => {
// Log and continue to next packet - no explicit continue needed
tracing::warn!("FLAC decode error (continuing): {}", err);
},
Err(e) => {
Expand All @@ -336,7 +302,6 @@ fn decode_flac_streaming_incremental(
}
}

// Send any remaining samples as a final frame
if !rechunk_buffer.is_empty() {
tracing::debug!("Sending final FLAC frame with {} samples", rechunk_buffer.len());
let final_chunk: Vec<f32> = rechunk_buffer.into_iter().collect();
Expand Down Expand Up @@ -412,15 +377,13 @@ mod tests {

let (context, mock_sender, mut state_rx) = create_test_context(inputs, 10);

// Create FLAC decoder node
let node = FlacDecoderNode::new(FlacDecoderConfig::default()).unwrap();

let node_handle = tokio::spawn(async move { Box::new(node).run(context).await });

assert_state_initializing(&mut state_rx).await;
assert_state_running(&mut state_rx).await;

// Read and send FLAC test file
let flac_data = read_sample_file("sample.flac");
let packet = create_test_binary_packet(flac_data);
input_tx.send(packet).await.unwrap();
Expand All @@ -429,7 +392,6 @@ mod tests {
assert_state_stopped(&mut state_rx).await;
node_handle.await.unwrap().unwrap();

// Verify output
let output_packets = mock_sender.get_packets_for_pin("out").await;
assert!(!output_packets.is_empty(), "Expected at least one output packet");

Expand All @@ -448,7 +410,6 @@ mod tests {

#[tokio::test]
async fn test_flac_multiple_packets() {
// Test that decoder can handle data split across multiple packets
let (input_tx, input_rx) = mpsc::channel(10);
let mut inputs = HashMap::new();
inputs.insert("in".to_string(), input_rx);
Expand Down
2 changes: 0 additions & 2 deletions crates/nodes/src/audio/codecs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,12 @@

use streamkit_core::NodeRegistry;

// Declare the submodules for each codec.
pub mod flac;
pub mod mp3;
pub mod opus;

/// Registers all available audio codec nodes with the engine's registry.
pub fn register_audio_codecs(registry: &mut NodeRegistry) {
// Call the registration function from each submodule.
opus::register_opus_nodes(registry);
mp3::register_mp3_nodes(registry);
flac::register_flac_nodes(registry);
Expand Down
Loading
Loading