Address PR review feedback: enhance safety, documentation, and test coverage for stream lock detection#761
Conversation
|
|
The latest updates on your projects. Learn more about Vercel for GitHub.
|
…est coverage Co-authored-by: TooTallNate <71256+TooTallNate@users.noreply.github.com>
|
Just as a heads up, I was blocked by some firewall rules while working on your feedback. Expand below for details. |
d15050d
into
12-20-fix_stream_serialization_to_resolve_when_user_releases_lock_instead_of_waiting_for_stream_to_close
There was a problem hiding this comment.
Pull request overview
This PR addresses review feedback on the stream serialization lock detection implementation by enhancing safety, documentation, and test coverage. The changes focus on making the polling mechanism more robust and well-documented.
Key changes:
- Added comprehensive documentation explaining the 100ms polling interval rationale and tradeoffs
- Implemented protection against concurrent polling operations on the same stream state
- Fixed a race condition by moving the
streamEndedcheck to immediately after read operations - Enhanced error handling with detailed comments in lock detection functions
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
packages/core/src/flushable-stream.ts |
Added polling interval documentation, concurrent polling protection via interval ID tracking, race condition fix, and enhanced error handling with split try-catch blocks |
packages/core/src/flushable-stream.test.ts |
Added 6 new test cases covering write errors, pollReadableLock behavior, concurrent writes, duplicate polling prevention, and stream termination scenarios |
docs/content/docs/foundations/streaming.mdx |
Clarified lock re-acquisition behavior and that explicit close is recommended but not required to prevent hanging |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
|
||
| <Callout type="warn"> | ||
| **The lock MUST be released to prevent the function from hanging.** If you acquire a lock but never release it, the serverless function will remain active until it times out, even after the step returns and the workflow continues. | ||
| **The lock MUST be released to prevent the function from hanging.** If you acquire a lock but never release it, the serverless function will remain active until it times out, even after the step returns and the workflow continues. Releasing the lock is sufficient; while explicitly closing the stream is safe and recommended for cleanup, it is not required to prevent the function from hanging. |
There was a problem hiding this comment.
The sentence structure is confusing with multiple nested clauses. Consider breaking this into two clearer sentences: "The lock MUST be released to prevent the function from hanging. If you acquire a lock but never release it, the serverless function will remain active until it times out, even after the step returns and the workflow continues. Releasing the lock is sufficient; explicitly closing the stream is safe and recommended for cleanup but not required to prevent hanging."
| **The lock MUST be released to prevent the function from hanging.** If you acquire a lock but never release it, the serverless function will remain active until it times out, even after the step returns and the workflow continues. Releasing the lock is sufficient; while explicitly closing the stream is safe and recommended for cleanup, it is not required to prevent the function from hanging. | |
| **The lock MUST be released to prevent the function from hanging.** If you acquire a lock but never release it, the serverless function will remain active until it times out, even after the step returns and the workflow continues. Releasing the lock is sufficient. Explicitly closing the stream is safe and recommended for cleanup, but it is not required to prevent the function from hanging. |
| it('should prevent multiple simultaneous polling operations on writable', async () => { | ||
| const { readable, writable } = new TransformStream<string, string>(); | ||
| const mockSink = new WritableStream<string>(); | ||
| const state = createFlushableState(); | ||
|
|
||
| // Start piping in background | ||
| flushablePipe(readable, mockSink, state).catch(() => {}); | ||
|
|
||
| // Start polling multiple times | ||
| pollWritableLock(writable, state); | ||
| pollWritableLock(writable, state); | ||
| pollWritableLock(writable, state); | ||
|
|
||
| // Should only have one interval active | ||
| expect(state.writablePollingInterval).toBeDefined(); | ||
|
|
||
| // Write and release to clean up | ||
| const userWriter = writable.getWriter(); | ||
| await userWriter.write('data'); | ||
| userWriter.releaseLock(); | ||
|
|
||
| // Wait for cleanup | ||
| await new Promise((r) => setTimeout(r, LOCK_POLL_INTERVAL_MS + 50)); | ||
| }); |
There was a problem hiding this comment.
The test validates that duplicate polling calls don't create multiple intervals, but it doesn't verify that the interval is actually cleaned up after the operation completes. Consider adding an assertion after the wait to check that state.writablePollingInterval is undefined after cleanup, which would confirm the interval was properly cleared.
| it('should prevent multiple simultaneous polling operations on readable', async () => { | ||
| let controller: ReadableStreamDefaultController<string>; | ||
| const source = new ReadableStream<string>({ | ||
| start(c) { | ||
| controller = c; | ||
| }, | ||
| }); | ||
|
|
||
| const mockSink = new WritableStream<string>(); | ||
| const state = createFlushableState(); | ||
|
|
||
| // Start piping in background | ||
| flushablePipe(source, mockSink, state).catch(() => {}); | ||
|
|
||
| // Start polling multiple times | ||
| pollReadableLock(source, state); | ||
| pollReadableLock(source, state); | ||
| pollReadableLock(source, state); | ||
|
|
||
| // Should only have one interval active | ||
| expect(state.readablePollingInterval).toBeDefined(); | ||
|
|
||
| // Close to clean up | ||
| controller?.close(); | ||
|
|
||
| // Wait for cleanup | ||
| await new Promise((r) => setTimeout(r, 100)); | ||
| }); |
There was a problem hiding this comment.
The test validates that duplicate polling calls don't create multiple intervals, but it doesn't verify that the interval is actually cleaned up after the operation completes. Consider adding an assertion after the wait to check that state.readablePollingInterval is undefined after cleanup, which would confirm the interval was properly cleared.
|
|
||
| <Callout type="warn"> | ||
| **Once a lock is released, no further writes to that stream from that step are allowed.** The framework uses lock release as the signal that the step is done interacting with the stream. Make sure all writes are complete before releasing the lock. | ||
| **Once a lock is released, no further writes to that stream from that step are allowed.** The framework uses lock release as the signal that the step is done interacting with the stream. Make sure all writes are complete before releasing the lock, and do not rely on re-acquiring a lock on the same stream within the same step after it has been released, even if internal implementation details might technically allow it. |
There was a problem hiding this comment.
The sentence is quite long and has some redundancy. Consider simplifying to: "The framework uses lock release as the signal that the step is done interacting with the stream. Make sure all writes are complete before releasing the lock. Re-acquiring a lock on the same stream within the same step after it has been released is not supported."
| **Once a lock is released, no further writes to that stream from that step are allowed.** The framework uses lock release as the signal that the step is done interacting with the stream. Make sure all writes are complete before releasing the lock, and do not rely on re-acquiring a lock on the same stream within the same step after it has been released, even if internal implementation details might technically allow it. | |
| **Once a lock is released, no further writes to that stream from that step are allowed.** The framework uses lock release as the signal that the step is done interacting with the stream. Make sure all writes are complete before releasing the lock. Re-acquiring a lock on the same stream within the same step after it has been released is not supported. |
…overage for stream lock detection (#761) * Initial plan * Address PR review comments: improve docs, add safety checks, expand test coverage Co-authored-by: TooTallNate <71256+TooTallNate@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: TooTallNate <71256+TooTallNate@users.noreply.github.com>
Description
Addresses review feedback on the stream serialization lock detection implementation. Key improvements:
Documentation
LOCK_POLL_INTERVAL_MSpolling rationale: Web Streams API provides no lock-release event, requiring periodic reader/writer acquisition attempts. 100ms balances detection latency against serverless execution cost.close()is recommended but not required to prevent hangingSafety & Correctness
FlushableStreamState)state.streamEndedimmediately after read, before starting write operationsstate.reject()+ re-throw) for different caller patternsTest Coverage
pollReadableLockbehavior (previously onlypollWritableLocktested)How did you test your changes?
Added 6 new test cases covering error handling, concurrent operations, and edge cases in
flushable-stream.test.ts. All tests validate the polling mechanism and state management under various failure scenarios.Verified build and lint pass for core package.
PR Checklist - Required to merge
pnpm changesetwas run to create a changelog for this PRpnpm changeset --emptyif you are changing documentation or workbench appsgit commit --signoffon your commits)💡 You can make Copilot smarter by setting up custom instructions, customizing its development environment and configuring Model Context Protocol (MCP) servers. Learn more Copilot coding agent tips in the docs.