Skip to content

Cursor catchup mechanism#21

Merged
p14n merged 10 commits intomainfrom
cursor-catchup-mechanism
Mar 10, 2025
Merged

Cursor catchup mechanism#21
p14n merged 10 commits intomainfrom
cursor-catchup-mechanism

Conversation

@p14n
Copy link
Owner

@p14n p14n commented Mar 10, 2025

Addresses #22

Summary by Sourcery

Implements a catchup mechanism for persistent subscribers to handle gaps in the event sequence. This includes creating a CatchupServer to fetch missing events and updating the database setup to include a messages table.

New Features:

  • Introduces a catchup mechanism to request and process batches of messages from the server to fill gaps in the event sequence for persistent subscribers.
  • Adds a CatchupServer component responsible for fetching events from the database within a specified range.

Tests:

  • Adds CatchupServerTest to verify the functionality of the CatchupServer, including fetching events and handling invalid parameters.

@sourcery-ai
Copy link
Contributor

sourcery-ai bot commented Mar 10, 2025

Reviewer's Guide by Sourcery

This pull request introduces a catchup mechanism to handle gaps in event sequences. It includes the implementation of a CatchupServer to fetch events within a specified range, database setup changes, and associated tests.

Sequence diagram for Catchup Mechanism

sequenceDiagram
    participant PC as Persistent Consumer
    participant PR as Processor
    participant CC as Catchup Client
    participant CS as Catchup Server
    PC->>PR: New event received (idn)
    PR->>PR: Check CHWM (Contiguous High Water Mark)
    alt CHWM != idn-1
        PR->>CC: Trigger catchup mechanism
        CC->>CS: Request batch of messages (CHWM+1 to min idn)
        CS->>CC: Batch of messages
        CC->>PR: Fill in the gap
        PR->>PR: Update CHWM
        PR->>PR: Restart processor
    else CHWM == idn-1
        PR->>PR: Update CHWM
        PR->>BF: Process event
    end
Loading

Updated class diagram for DatabaseSetup

classDiagram
    class DatabaseSetup {
        -String jdbcUrl
        -String username
        -String password
        +DatabaseSetup(String jdbcUrl, String username, String password)
        +createSchemaIfNotExists() DatabaseSetup
        +createTableIfNotExists(String topic) DatabaseSetup
        +createMessagesTableIfNotExists() DatabaseSetup
        -getConnection() Connection
    }
Loading

Class diagram for CatchupServer

classDiagram
    class CatchupServer {
        -String topic
        -DataSource dataSource
        +CatchupServer(String topic, DataSource dataSource)
        +fetchEvents(long start, long end, int maxResults) List<Event>
    }
Loading

File-Level Changes

Change Details Files
Introduced a catchup mechanism to handle gaps in the event sequence for persistent consumers/processors.
  • Implemented logic to detect gaps in the event sequence.
  • Implemented logic to request a batch of messages from the server to fill the gaps.
  • Implemented logic to update the Contiguous High Water Mark (CHWM) as gaps are filled.
  • Implemented logic to stop the catchup mechanism when it detects it is overwriting live messages.
  • Added diagrams illustrating local and remote constant consumption with and without catchup.
README.md
Added a method to create a messages table if it doesn't exist.
  • Added a createMessagesTableIfNotExists method to the DatabaseSetup class.
  • The method creates a table named postevent.messages with columns for event data.
src/main/java/com/p14n/postevent/DatabaseSetup.java
Refactored PersistentSubscriberTest to use DatabaseSetup for schema and table creation.
  • Replaced manual schema and table creation with calls to DatabaseSetup methods.
  • Ensures the test environment is properly initialized with the required database schema and tables.
src/test/java/com/p14n/postevent/PersistentSubscriberTest.java
Implemented CatchupServer to fetch events within a specified range.
  • Created a CatchupServer class with a fetchEvents method.
  • The fetchEvents method retrieves events from the database within a given ID range, limiting the number of results.
  • Added validation to ensure start is less than or equal to end and maxResults is greater than zero.
src/main/java/com/p14n/postevent/CatchupServer.java
Created CatchupServerTest to test the CatchupServer functionality.
  • Added tests to verify the fetchEvents method returns the correct events.
  • Added tests to verify the maxResults parameter is respected.
  • Added tests to verify invalid parameters throw exceptions.
src/test/java/com/p14n/postevent/CatchupServerTest.java

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!
  • Generate a plan of action for an issue: Comment @sourcery-ai plan on
    an issue to generate a plan of action for it.

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

Copy link
Contributor

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @p14n - I've reviewed your changes - here's some feedback:

Overall Comments:

  • Consider adding a method to DatabaseSetup to drop the schema for testing purposes.
  • The createSchemaIfNotExists and createTableIfNotExists methods in DatabaseSetup could be combined into a single method for better readability.
Here's what I looked at during the review
  • 🟡 General issues: 2 issues found
  • 🟢 Security: all looks good
  • 🟡 Testing: 1 issue found
  • 🟢 Complexity: all looks good
  • 🟢 Documentation: all looks good

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Comment on lines +48 to +49
@Test
public void testFetchEvents() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): Add a test case for fetching zero events.

It's important to test the scenario where no events are available within the specified range. This ensures the fetchEvents method handles this case gracefully and returns an empty list.

Suggested implementation:

    }

    @Test
    public void testFetchZeroEvents() throws Exception {
        // Do not publish any events. Verify fetchEvents returns an empty list.
        List<Event> events = catchupServer.fetchEvents();
        assertTrue(events.isEmpty(), "Expected no events to be fetched when none have been published");
    }

Make sure that the fetchEvents method is accessible and does not require any additional parameters for this test.
If fetchEvents requires parameters (for example, a time range or offset), modify the test accordingly and pass the appropriate values.

* The catchup mechanism looks for the next gap (CHWM+1 upwards) and repeats until there are no gaps
* The catchup mechanism restarts the processor
* Request a batch of messages from the server
* Write each message to the consumer
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): Inaccurate description of catchup mechanism

The catchup mechanism writes messages to the database, not directly to the consumer. The consumer then reads from the database.

@p14n p14n merged commit 6a72dfe into main Mar 10, 2025
1 check passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant