fix(recorder): prevent close hang and fix corrupt frame splits#5926
Conversation
|
|
||
|
|
||
| # --------------------------------------------------------------------------- | ||
| # Group 5: _split_frame (encode-path helper) | ||
| # --------------------------------------------------------------------------- | ||
|
|
||
|
|
||
| def _ramp_frame(num_samples: int, num_channels: int, sample_rate: int = 24000) -> rtc.AudioFrame: | ||
| """A frame whose samples are a monotonic ramp, so splits can be checked for alignment.""" | ||
| arr = np.arange(num_samples * num_channels, dtype=np.int16) | ||
| return rtc.AudioFrame( | ||
| data=arr.tobytes(), | ||
| num_channels=num_channels, | ||
| samples_per_channel=num_samples, | ||
| sample_rate=sample_rate, | ||
| ) | ||
|
|
||
|
|
||
| @pytest.mark.parametrize("num_channels", [1, 2]) | ||
| @pytest.mark.parametrize("fraction", [0.25, 0.5, 0.75]) | ||
| def test_split_frame_is_consistent_and_lossless(num_channels: int, fraction: float) -> None: | ||
| """`rtc.AudioFrame.data` is a memoryview of int16 *samples*, not bytes. | ||
|
|
||
| A split must keep each half's data length in sync with its samples_per_channel and | ||
| must neither drop nor duplicate samples. This guards the regression where the helper | ||
| indexed the buffer in bytes and produced corrupt frames on interrupted/paused playback. | ||
| """ | ||
| n = 240 | ||
| frame = _ramp_frame(n, num_channels) | ||
| left, right = _split_frame(frame, frame.duration * fraction) | ||
|
|
||
| # each half is internally consistent | ||
| assert len(left.data) == left.samples_per_channel * left.num_channels | ||
| assert len(right.data) == right.samples_per_channel * right.num_channels | ||
|
|
||
| # no samples lost or duplicated across the split | ||
| assert left.samples_per_channel + right.samples_per_channel == n | ||
| recon = np.concatenate( | ||
| [ | ||
| np.frombuffer(bytes(left.data), dtype=np.int16), | ||
| np.frombuffer(bytes(right.data), dtype=np.int16), | ||
| ] | ||
| ) | ||
| assert np.array_equal(recon, np.arange(n * num_channels, dtype=np.int16)) | ||
|
|
||
|
|
||
| def test_split_frame_boundaries() -> None: | ||
| """Splitting at or beyond the edges returns an empty half and the original.""" | ||
| frame = _ramp_frame(100, 1) | ||
|
|
||
| empty, whole = _split_frame(frame, 0.0) | ||
| assert empty.samples_per_channel == 0 and len(empty.data) == 0 | ||
| assert whole.samples_per_channel == 100 | ||
|
|
||
| whole2, empty2 = _split_frame(frame, frame.duration * 2) | ||
| assert whole2.samples_per_channel == 100 | ||
| assert empty2.samples_per_channel == 0 and len(empty2.data) == 0 |
There was a problem hiding this comment.
Are those tests really useful? They're self-contained, and don't even import framework code
There was a problem hiding this comment.
nvm i see it imports _split_frame
2dca404 to
8b35e16
Compare
The recorder's encode thread only resolved `_close_fut` at the end of its normal loop. If the thread died on an unhandled exception, the future was never resolved and `RecorderIO.aclose()` blocked forever on `await asyncio.shield(self._close_fut)`, stalling session shutdown until the caller's aclose watchdog fired (~60s). Because the encoder runs on a daemon thread, any exception went to threading.excepthook (stderr) rather than the structured logger, so the failure was invisible in observability exports. - Wrap the encode loop in try/except/finally so `_close_fut` is always resolved (idempotent `_resolve_close_fut`), even when encoding raises. - Log encoder exceptions via `logger.exception` instead of losing them. - Cancel the previously-leaked `_forward_atask` in `aclose()`. Separately, `_split_frame` treated `rtc.AudioFrame.data` as a byte buffer, but it is a memoryview of int16 samples (format "h") that is indexed in samples, not bytes. It over-sliced by 2x, producing frames whose data length disagreed with `samples_per_channel`. This path runs only on interrupted playback and pause/silence insertion, corrupting the recording around those points. Index the buffer in samples and set each half's sample count correctly. Add regression tests for `_split_frame` (the encode path had no coverage). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
8b35e16 to
6d787c1
Compare
| input_rate=input_rate, | ||
| output_rate=self._sample_rate, | ||
| num_channels=num_channels, | ||
| try: |
There was a problem hiding this comment.
🔴 try/except/finally does not cover initialization code, leaving _close_fut unresolved on early failure
The try/except/finally block added for error handling in _encode_thread starts at line 205, but the initialization code that can fail—self._output_path.parent.mkdir() (line 153), av.open() (line 155), container.add_stream() (line 165)—all execute before the try block. If any of these raise an exception, the finally block never runs, so _close_fut is never resolved. This causes aclose() to hang forever at await asyncio.shield(self._close_fut) (recorder_io.py:90), and since aclose holds self._lock, the entire RecorderIO instance becomes deadlocked. The PR's intent was specifically to add error handling to the encode thread, but the scope is too narrow to cover these early failure points.
Prompt for agents
The try/except/finally block in _encode_thread (line 205) needs to be moved earlier to cover all the initialization code that can fail. Lines 152-179 contain statements that can raise exceptions (mkdir, av.open, container.add_stream, etc.), but they are outside the try block. If any of them fail, the finally block on line 293 never runs, _close_fut is never resolved, and aclose() hangs forever.
The fix is to move the try: statement to just after the constant definitions (after line 150), so that all code that can potentially fail is within the try/except/finally scope. This ensures the finally block always resolves _close_fut, even if initialization fails. Note that moving the try earlier also means the container variable may not exist in the except/finally block, so you should initialize container = None before the try and only use 'with container:' inside the try body if container was successfully opened, or restructure accordingly.
Was this helpful? React with 👍 or 👎 to provide feedback.
Summary
Two distinct bugs in
RecorderIO, both surfaced by a real session where a SIP caller hung up mid-goodbye while the "caller silence check-in" pause/resume feature was active. Symptom: session shutdown stalled ~60s and the recording was incomplete/garbled.Bug 1 —
aclose()hangs forever if the encode thread dies_encode_threadonly resolved_close_futat the end of its normal loop. If the thread raised anywhere in the loop, the future stayed unresolved andRecorderIO.aclose()blocked onawait asyncio.shield(self._close_fut), stalling session shutdown until the caller's aclose watchdog fired (~60s). Because the encoder runs on adaemonthread, the exception went tothreading.excepthook(stderr) instead of the structured logger, so it was invisible in observability exports.try/except/finallyso_close_futis always resolved (idempotent_resolve_close_fut), even when encoding raises.logger.exceptioninstead of losing them._forward_ataskinaclose().Bug 2 —
_split_framecorrupts audio on interrupted/paused playbackrtc.AudioFrame.datais a memoryview of int16 samples (format="h") — indexed in samples, not bytes._split_framecomputedbytes_per_sample = num_channels * 2and over-sliced by 2×, producing frames whosedatalength disagreed withsamples_per_channel. This path runs only on interrupted playback and pause/silence insertion, corrupting the recording around those points.split = samples_needed * num_channels), clampsamples_needed, and set each half's sample count correctly.Tests
_split_frameregression tests (consistency + lossless round-trip for mono/stereo, plus boundaries). The encode path previously had no coverage.tests/test_recording.pypass; lint clean.Note
The structural fix (Bug 1) makes the hang impossible regardless of which exception killed the thread, and the new
logger.exceptionwill surface the precise traceback if it recurs. Bug 2 is a proven correctness bug on the interrupted/paused code path exercised by this session.🤖 Generated with Claude Code