Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: prevent enqueuing into closed ReadableStream #1781

Open
wants to merge 2 commits into
base: main
Choose a base branch
from

Conversation

CodeMan62
Copy link

@CodeMan62 CodeMan62 commented Mar 10, 2025

Closes #1751

✅ Checklist

  • I have followed every step in the contributing guide
  • The PR title follows the convention.
  • I ran and tested the code works

Testing

  1. Ran package tests using:
pnpm run test --filter @trigger.dev/core
  1. Verified that the ReadableStream properly handles closure states
  2. Confirmed that the stream cleanup works as expected
  3. Tested the realtime runs functionality to ensure no errors after extended periods

Changelog

  • Added #isStreamClosed flag to track stream state
  • Added checks to prevent enqueueing data after stream closure
  • Improved stream cleanup in the cancel handler
  • Enhanced error handling to properly mark stream as closed
  • Added proper cleanup of resources in stop() method
  • Fixed the "Cannot enqueue a chunk into a closed readable stream" error that occurred after 5-15 minutes

Screenshots

N/A - This is a backend fix for stream handling, no UI changes.

💯

Summary by CodeRabbit

  • Bug Fixes

    • Improved streaming reliability by ensuring that no additional messages are processed once a stream is stopped or canceled.
    • Enhanced error handling, resulting in a more stable and predictable streaming experience.
  • Refactor

    • Optimized stream lifecycle management for improved performance and consistency.

Copy link

changeset-bot bot commented Mar 10, 2025

⚠️ No Changeset found

Latest commit: 0967cad

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.

This PR includes no changesets

When changesets are added to this PR, you'll see the packages that this PR includes changesets for and the associated semver types

Click here to learn what changesets are, and how to add one.

Click here if you're a maintainer who wants to add a changeset to this PR

Copy link
Contributor

coderabbitai bot commented Mar 10, 2025

Walkthrough

The changes add a private boolean property #isStreamClosed to the ReadableShapeStream class to track the state of the stream. The stop, cancel, and transform methods have been modified to check or update this flag, ensuring no further message processing occurs when the stream is closed. Error handling in the transform method now wraps processing logic in a try-catch block, logging errors and calling an error handler that also sets the stream to a closed state and unsubscribes from the stream.

Changes

File Path Change Summary
packages/core/.../apiClient/stream.ts Added a private boolean property #isStreamClosed. Modified the stop, cancel, and transform methods to check/update this flag, and enhanced error handling with a try-catch block that logs errors and unsubscribes via the error handler.

Sequence Diagram(s)

Loading
sequenceDiagram
    participant C as Client
    participant S as ReadableShapeStream
    participant U as Unsubscribe
    participant EH as ErrorHandler

    C->>S: stop()
    S->>S: set #isStreamClosed = true
    S->>U: call unsubscribe function

    C->>S: cancel()
    S->>S: set #isStreamClosed = true
    S->>U: call unsubscribe function

    S->>S: transform(message)
    alt Stream Open
        S->>S: Process message
    else Stream Closed
        S->>S: Skip processing
    end

    S->>S: Error in transform
    alt Error Caught
        S->>EH: invoke error handler
        S->>U: call unsubscribe function
    end

Poem

I'm a rabbit with a happy beat,
In code streams, I'm light on my feet.
When the stream is closed, I hop away,
Unsubscribing troubles, safe and gray.
Errors caught keep my trail so neat!
🐰✨

Warning

There were issues while running some tools. Please review the errors and either fix the tool’s configuration or disable the tool if it’s a critical failure.

🔧 ESLint

If the error stems from missing dependencies, add them to the package.json file. For unrecoverable errors (e.g., due to private dependencies), disable the tool in the CodeRabbit configuration.

Scope: all 2 workspace projects
 ERR_PNPM_OPTIONAL_DEPS_REQUIRE_PROD_DEPS  Optional dependencies cannot be installed without production dependencies


📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 6cf86d5 and e56bc39.

📒 Files selected for processing (1)
  • packages/core/src/v3/apiClient/stream.ts (3 hunks)
🔇 Additional comments (8)
packages/core/src/v3/apiClient/stream.ts (8)

92-92: Excellent addition of a stream state tracking flag.

The new #isStreamClosed boolean property is a clean solution for tracking the stream's closed state. This will serve as the foundation for preventing data from being enqueued into a closed ReadableStream.


94-97: Good implementation in the stop method.

Setting the #isStreamClosed flag to true before calling the unsubscribe function ensures that no further messages will be processed after stopping the stream, which properly addresses the issue of enqueueing into a closed stream.


106-110: Critical fix to prevent enqueueing after stream closure.

This check prevents the controller from enqueueing messages when the stream is closed, directly addressing the "Cannot enqueue a chunk into a closed readable stream" error mentioned in the PR objectives.


114-117: Proper resource cleanup in cancel handler.

The updated cancel method now sets the stream as closed and calls the unsubscribe function, ensuring proper cleanup when the consumer cancels the stream. This prevents potential memory leaks and lingering connections.


123-125: Effective early check for closed stream state.

This early return pattern efficiently prevents unnecessary message processing when the stream is already closed, improving performance and preventing potential errors.


127-170: Robust error handling with try-catch implementation.

Wrapping the message processing logic in a try-catch block improves error resilience. The error handling appropriately logs the error and calls the error handler, ensuring that errors during processing don't crash the application and are properly handled.


158-165: Additional safety check before enqueueing rows.

While this check might seem redundant with the early return at lines 123-125, it provides an extra layer of protection specifically for the row enqueueing process. This defensive programming approach ensures that even if the stream becomes closed during message processing, no rows will be enqueued.


214-215: Complete cleanup on error.

Setting #isStreamClosed to true and calling unsubscribe in the error handler ensures proper cleanup when errors occur. This prevents resource leaks and further attempts to enqueue data after an error has occurred.

✨ Finishing Touches
  • 📝 Generate Docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

coderabbitai bot commented Mar 12, 2025

🧩 Analysis chain

I'll review this PR for you. Let me gather more information about the changes first.


🏁 Script executed:

#!/bin/bash
# Get the PR diff to examine the changes
git diff HEAD^ HEAD

Length of output: 209


I need to first check out the PR to properly review the changes. Let me try again.


🏁 Script executed:

#!/bin/bash
# First, let's check if we can fetch the PR
git fetch origin pull/1781/head:pr-1781
git checkout pr-1781

# Let's look at the diff
git diff origin/main...pr-1781

Length of output: 272


🏁 Script executed:

#!/bin/bash
# Let's check the commit history on this PR branch to understand what changes were made
git log -n 5 --oneline

# Let's also check what files were changed in the PR
git diff --name-only HEAD~1 HEAD

# Now, let's look at the specific file mentioned in the summary
cat packages/core/src/v3/apiClient/stream.ts

Length of output: 6943


@CodeMan62: Yes, this PR looks excellent. The changes properly address the issue of enqueueing data into closed ReadableStreams by:

  1. Adding the #isStreamClosed flag to track stream state
  2. Adding appropriate checks before enqueueing data
  3. Ensuring the flag is set to true in all necessary places (stop(), cancel handler, error handler)
  4. Improving error handling with proper stream closure

These changes will effectively prevent the "Cannot enqueue a chunk into a closed readable stream" error that was occurring after 5-15 minutes of operation.

✅ Actions performed

Comments resolved.

@CodeMan62
Copy link
Author

@ericallam can you check this PR and tell me if there is any problem in it

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Realtime bug: Failed to execute 'enqueue' on ReadableStreamDefaultController
1 participant