feat: add async Stream API and enhance ergonomics#1
Conversation
isdaniel
commented
Dec 30, 2025
- Add EventStream and EventStreamRef implementing futures::Stream
- Add into_stream() and stream() methods for ergonomic event iteration
- Update ChangeEvent to have non-optional LSN field for consistency
- Add futures dependency for Stream trait support
- Enhance documentation with Stream API examples
- Improve error messages using inline format strings
- Add comprehensive unit tests for stream functionality
- Update README with dual API examples (Stream vs Polling)
- Add AsRef<[u8]> implementation for BufferWriter
- Refactor clippy warnings and improve code quality
- Add EventStream and EventStreamRef implementing futures::Stream - Add into_stream() and stream() methods for ergonomic event iteration - Update ChangeEvent to have non-optional LSN field for consistency - Add futures dependency for Stream trait support - Enhance documentation with Stream API examples - Improve error messages using inline format strings - Add comprehensive unit tests for stream functionality - Update README with dual API examples (Stream vs Polling) - Add AsRef<[u8]> implementation for BufferWriter - Refactor clippy warnings and improve code quality
Summary of ChangesHello @isdaniel, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly enhances the Highlights
🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request primarily introduces a new Stream API for consuming PostgreSQL logical replication events, leveraging the futures::Stream trait for a more ergonomic, iterator-like interface. Key changes include adding the futures crate and related dependencies (futures-channel, futures-executor, futures-macro, futures-task, futures-util, pin-utils, slab), and updating README.md and lib.rs documentation to showcase both the new Stream API and the existing Polling API. The LogicalReplicationStream now provides into_stream() and stream() methods to create EventStream and EventStreamRef types, which implement futures::Stream and handle automatic retries, reconnections, and cancellation. Additionally, the ChangeEvent struct was modified to make the lsn field non-optional, and its constructors were updated accordingly. Minor code style improvements were made across several files, primarily updating format! macros to use the shorthand {var} syntax and refining test assertions. Review comments highlighted that the tokio::time::sleep call in the retry logic should be wrapped in a tokio::select! to ensure responsive cancellation during backoff, and that futures::Stream implementations should return Poll::Ready(None) instead of Some(Err(ReplicationError::Cancelled(...))) for graceful termination upon cancellation.
| // Exponential backoff before retry | ||
| let delay = Duration::from_millis(1000 * (1 << (attempt - 1))); | ||
| debug!("Waiting {:?} before retry attempt {}", delay, attempt + 1); | ||
| tokio::time::sleep(delay).await; |
There was a problem hiding this comment.
The tokio::time::sleep call in the retry logic doesn't handle cancellation. If the stream is cancelled while in this sleep, it will wait for the full delay before checking the cancellation token again in the next loop iteration. This can delay graceful shutdown.
To make cancellation more responsive, you can use tokio::select! to race the sleep future against the cancellation token's cancelled() future.
tokio::select! {
biased;
_ = tokio::time::sleep(delay) => {},
_ = cancellation_token.cancelled() => {
return Err(ReplicationError::cancelled("Operation cancelled during retry backoff"));
}
}| if self.cancellation_token.is_cancelled() { | ||
| return Poll::Ready(Some(Err(ReplicationError::Cancelled( | ||
| "Stream cancelled".to_string(), | ||
| )))); | ||
| } |
There was a problem hiding this comment.
When the stream is cancelled, you're returning Some(Err(ReplicationError::Cancelled(...))). While this works, it's more idiomatic for a Rust stream that is gracefully terminated to return Poll::Ready(None). This signals the end of the stream to consumers like while let Some(...) loops without them needing to handle a specific error case for cancellation.
if self.cancellation_token.is_cancelled() {
return Poll::Ready(None);
}| if self.cancellation_token.is_cancelled() { | ||
| return Poll::Ready(Some(Err(ReplicationError::Cancelled( | ||
| "Stream cancelled".to_string(), | ||
| )))); | ||
| } |
There was a problem hiding this comment.
When the stream is cancelled, you're returning Some(Err(ReplicationError::Cancelled(...))). While this works, it's more idiomatic for a Rust stream that is gracefully terminated to return Poll::Ready(None). This signals the end of the stream to consumers like while let Some(...) loops without them needing to handle a specific error case for cancellation. This is the same feedback as for EventStream.
if self.cancellation_token.is_cancelled() {
return Poll::Ready(None);
}available. The stream now automatically creates and manages a SharedLsnFeedback instance that is directly accessible via the `shared_lsn_feedback` field. Changes: - Remove `set_shared_lsn_feedback()` method - feedback is now auto-created - Make `shared_lsn_feedback` field public for direct access - Update all documentation and examples to reflect new usage pattern - Simplify feedback handling logic by removing Option wrapper - Improve Stream cancellation: return None instead of Error on cancel - Add cancellation support during retry backoff periods - Update README.md and lib.rs with new API examples Breaking Changes: - Remove `set_shared_lsn_feedback()` method (replaced by direct field access) - Change `shared_lsn_feedback` from `Option<Arc<SharedLsnFeedback>>` to `Arc<SharedLsnFeedback>`