Support parallel MotionViews and parallelized frame production in video adapters#31
Conversation
|
Warning Rate limit exceeded
Your organization is not enrolled in usage-based pricing. Contact your admin to enable usage-based pricing to continue reviews beyond the rate limit, or try again in 9 minutes and 25 seconds. ⌛ How to resolve this issue?After the wait time has elapsed, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout. Please see our FAQ for further information. ℹ️ Review info⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (5)
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Summary
This PR introduces parallel video frame production across multiple MotionView instances to improve performance. The implementation successfully adds concurrent frame capture and processing using coroutines.
Critical Issues Found
I've identified 4 critical defects that must be addressed before merge:
- Chunk size calculation bug: When
totalFrames < workerCount, some workers receive invalid empty frame ranges (startFrame > endFrame), causing incorrect processing or skipped frames - Race condition in progress reporting: Multiple workers invoke progress callbacks concurrently without ordering, causing out-of-sequence progress updates that break progress tracking
- Shared lazy UUID across calls: The
subDirNamelazy property is reused across multipleproduceVideocalls on the same adapter instance, creating race conditions and file conflicts - Return deleted file on FFmpeg failure: When FFmpeg fails, the code deletes the output file but still returns it, causing crashes for callers expecting valid files
All issues have specific corrected code suggestions attached. Please review and apply the fixes.
You can now have the agent implement changes and create commits directly on your pull request's source branch. Simply comment with /q followed by your request in natural language to ask the agent to make changes.
| throw IllegalStateException("Unable to save frame $frame", e) | ||
| } | ||
|
|
||
| progressListener?.invoke(frame, frameBitmap) |
There was a problem hiding this comment.
🛑 Race Condition: Concurrent progress callbacks without ordering. Multiple parallel workers invoke the progress listener concurrently (line 84), causing out-of-order progress reporting. Frame 100 could be reported before frame 50, breaking progress tracking. Use a thread-safe mechanism to sequence callbacks or document this behavior.
| progressListener?.let { | ||
| it(i, frameBitmap) | ||
| } | ||
| progressListener?.invoke(frame, frameBitmap) |
There was a problem hiding this comment.
🛑 Race Condition: Concurrent progress callbacks without ordering. Multiple parallel workers invoke the progress listener concurrently (line 84), causing out-of-order progress reporting. Frame 100 could be reported before frame 50, breaking progress tracking. Use a thread-safe mechanism to sequence callbacks or document this behavior.
| val safeMotionComposerViews = motionComposerViews.ifEmpty { error("At least one MotionView is required") } | ||
| val workerCount = | ||
| minOf( | ||
| totalFrames.coerceAtLeast(1), | ||
| safeMotionComposerViews.size.coerceAtLeast(1), | ||
| Runtime.getRuntime().availableProcessors().coerceAtLeast(1), | ||
| ) | ||
| val chunkSize = ((totalFrames + workerCount) - 1) / workerCount |
There was a problem hiding this comment.
🛑 Logic Error: Chunk size calculation can create empty ranges. When totalFrames < workerCount, some workers get startFrame > endFrame. For example, with 2 frames and 4 workers, worker 2 gets startFrame=2, endFrame=1 (an invalid empty range). Adjust the calculation to prevent workers from having empty or invalid ranges.
| val safeMotionComposerViews = motionComposerViews.ifEmpty { error("At least one MotionView is required") } | |
| val workerCount = | |
| minOf( | |
| totalFrames.coerceAtLeast(1), | |
| safeMotionComposerViews.size.coerceAtLeast(1), | |
| Runtime.getRuntime().availableProcessors().coerceAtLeast(1), | |
| ) | |
| val chunkSize = ((totalFrames + workerCount) - 1) / workerCount | |
| val safeMotionComposerViews = motionComposerViews.ifEmpty { error("At least one MotionView is required") } | |
| val workerCount = | |
| minOf( | |
| totalFrames.coerceAtLeast(1), | |
| safeMotionComposerViews.size.coerceAtLeast(1), | |
| Runtime.getRuntime().availableProcessors().coerceAtLeast(1), | |
| ) | |
| val chunkSize = if (totalFrames < workerCount) 1 else ((totalFrames + workerCount) - 1) / workerCount |
| val safeMotionComposerViews = motionComposerViews.ifEmpty { error("At least one MotionView is required") } | ||
| val workerCount = | ||
| minOf( | ||
| totalFrames.coerceAtLeast(1), | ||
| safeMotionComposerViews.size.coerceAtLeast(1), | ||
| Runtime.getRuntime().availableProcessors().coerceAtLeast(1), | ||
| ) | ||
| val chunkSize = ((totalFrames + workerCount) - 1) / workerCount |
There was a problem hiding this comment.
🛑 Logic Error: Chunk size calculation can create empty ranges. When totalFrames < workerCount, some workers get startFrame > endFrame. For example, with 2 frames and 4 workers, worker 2 gets startFrame=2, endFrame=1 (an invalid empty range). Adjust the calculation to prevent workers from having empty or invalid ranges.
| val safeMotionComposerViews = motionComposerViews.ifEmpty { error("At least one MotionView is required") } | |
| val workerCount = | |
| minOf( | |
| totalFrames.coerceAtLeast(1), | |
| safeMotionComposerViews.size.coerceAtLeast(1), | |
| Runtime.getRuntime().availableProcessors().coerceAtLeast(1), | |
| ) | |
| val chunkSize = ((totalFrames + workerCount) - 1) / workerCount | |
| val safeMotionComposerViews = motionComposerViews.ifEmpty { error("At least one MotionView is required") } | |
| val workerCount = | |
| minOf( | |
| totalFrames.coerceAtLeast(1), | |
| safeMotionComposerViews.size.coerceAtLeast(1), | |
| Runtime.getRuntime().availableProcessors().coerceAtLeast(1), | |
| ) | |
| val chunkSize = if (totalFrames < workerCount) 1 else ((totalFrames + workerCount) - 1) / workerCount |
There was a problem hiding this comment.
Code Review
This pull request introduces parallel video frame processing by updating the VideoProducerAdapter to accept a list of MotionView objects and utilizing Kotlin coroutines for concurrent execution in the Ffmpeg and Android adapters. While this improves performance, several issues were identified regarding memory management; specifically, capturedBitmap and frameBitmap are not being recycled after use, which could lead to OutOfMemoryError. Furthermore, the progressListener is now invoked concurrently, necessitating thread-safe implementations in the calling code.
| val capturedBitmap = captureFrameBitmap(motionComposerView, frame) | ||
| val frameBitmap: Bitmap = | ||
| capturedBitmap.compressToBitmap(motionConfig.outputQuality) |
There was a problem hiding this comment.
The capturedBitmap returned by captureFrameBitmap is not recycled after being compressed. Since video production can involve hundreds of frames, failing to recycle these bitmaps will lead to rapid memory exhaustion and potential OutOfMemoryError. It should be recycled immediately after compressToBitmap is called.
| val capturedBitmap = captureFrameBitmap(motionComposerView, frame) | |
| val frameBitmap: Bitmap = | |
| capturedBitmap.compressToBitmap(motionConfig.outputQuality) | |
| val capturedBitmap = captureFrameBitmap(motionComposerView, frame) | |
| val frameBitmap: Bitmap = | |
| capturedBitmap.compressToBitmap(motionConfig.outputQuality) | |
| capturedBitmap.recycle() |
| try { | ||
| context.saveBitmapToCacheFolder( | ||
| frameBitmap, | ||
| subDirName, | ||
| String.format(Locale.getDefault(), "%05d.png", frame), | ||
| ) | ||
| } catch (e: Exception) { | ||
| Log.e(TAG, "Error saving frame $frame: ${e.message}", e) | ||
| throw IllegalStateException("Unable to save frame $frame", e) | ||
| } | ||
|
|
||
| progressListener?.invoke(frame, frameBitmap) |
There was a problem hiding this comment.
The frameBitmap should be recycled to free up memory after it has been saved and passed to the progress listener. Additionally, please be aware that progressListener is now invoked concurrently from multiple coroutines. If the listener implementation is not thread-safe (e.g., it updates a UI component or a non-atomic counter), this could lead to race conditions or crashes. Consider documenting this requirement or synchronizing the calls.
try {
context.saveBitmapToCacheFolder(
frameBitmap,
subDirName,
String.format(Locale.getDefault(), "%05d.png", frame),
)
} catch (e: Exception) {
Log.e(TAG, "Error saving frame $frame: ${e.message}", e)
frameBitmap.recycle()
throw IllegalStateException("Unable to save frame $frame", e)
}
progressListener?.invoke(frame, frameBitmap)
frameBitmap.recycle()| val capturedBitmap = captureFrameBitmap(motionComposerView, frame) | ||
| val frameBitmap: Bitmap = | ||
| capturedBitmap.compressToBitmap(motionConfig.outputQuality) |
There was a problem hiding this comment.
The capturedBitmap is not recycled after the compressed version is created. In a parallelized environment producing many frames, this will significantly increase memory pressure. It should be recycled as soon as it is no longer needed.
| val capturedBitmap = captureFrameBitmap(motionComposerView, frame) | |
| val frameBitmap: Bitmap = | |
| capturedBitmap.compressToBitmap(motionConfig.outputQuality) | |
| val capturedBitmap = captureFrameBitmap(motionComposerView, frame) | |
| val frameBitmap: Bitmap = | |
| capturedBitmap.compressToBitmap(motionConfig.outputQuality) | |
| capturedBitmap.recycle() |
| try { | ||
| context.saveBitmapToCacheFolder( | ||
| frameBitmap, | ||
| subDirName, | ||
| String.format(Locale.getDefault(), "%05d.png", frame), | ||
| ) | ||
| } catch (e: Exception) { | ||
| Log.e(TAG, "Error saving frame $frame: ${e.message}", e) | ||
| throw IllegalStateException("Unable to save frame $frame", e) | ||
| } | ||
|
|
||
| progressListener?.let { | ||
| it(i, frameBitmap) | ||
| } | ||
| progressListener?.invoke(frame, frameBitmap) |
There was a problem hiding this comment.
To prevent memory leaks, frameBitmap must be recycled after use. Also, note that the progressListener is called concurrently across different worker threads. Ensure that any implementation of this listener is thread-safe to avoid synchronization issues.
try {
context.saveBitmapToCacheFolder(
frameBitmap,
subDirName,
String.format(Locale.getDefault(), "%05d.png", frame),
)
} catch (e: Exception) {
Log.e(TAG, "Error saving frame $frame: ${e.message}", e)
frameBitmap.recycle()
throw IllegalStateException("Unable to save frame $frame", e)
}
progressListener?.invoke(frame, frameBitmap)
frameBitmap.recycle()
Motivation
Description
VideoProducerAdapter.produceVideosignature to acceptmotionComposerViews: List<MotionView>instead of a singleMotionViewand updated call sites accordingly.AndroidVideoProducerAdapterandFfmpegVideoProducerAdapterto split the total frames into worker chunks, launch concurrent coroutines (Dispatchers.Default) to capture, compress and save frames into a cache subdirectory, and await completion before encoding; addedcaptureFrameBitmaphelper that synchronizes on theMotionViewduringforFrame(...).getViewBitmap()to avoid concurrent access issues.JCodecVideoProducerAdapterto acceptmotionComposerViewsand use the first view (preserving previous behavior) for encoding withAndroidSequenceEncoder.IllegalStateExceptionon failure), ensured cache subdirectory lifecycle (clear/create), and preserved progress callback invocation semantics.MotionVideoProducerto acceptparallelMotionViews(andwith(...)factory) and to pass either the provided parallel views or the singleMotionComposerViewinto the adapter.Testing
./gradlew build, which completed successfully../gradlew test, and the test suite passed locally.Codex Task