Skip to content

Augment DST#37

Merged
p14n merged 16 commits intomainfrom
augment-dst
Mar 28, 2025
Merged

Augment DST#37
p14n merged 16 commits intomainfrom
augment-dst

Conversation

@p14n
Copy link
Owner

@p14n p14n commented Mar 28, 2025

Addresses #33

Summary by Sourcery

Augment the distributed system's event processing capabilities with improved event ordering, tracking, and synchronization mechanisms

New Features:

  • Implement a deterministic event processing mechanism that ensures events are processed in order
  • Add a test executor for deterministic event processing testing

Enhancements:

  • Improve event processing reliability by adding high watermark (HWM) tracking for events
  • Enhance logging and error handling in event processing workflows
  • Add synchronization mechanisms to prevent race conditions in event delivery

Build:

  • Update Java compatibility to version 21
  • Add jqwik testing framework to support property-based testing

Tests:

  • Introduce a new deterministic consumer test that validates event ordering and delivery
  • Create a test async executor to simulate controlled event processing

@sourcery-ai
Copy link
Contributor

sourcery-ai bot commented Mar 28, 2025

Reviewer's Guide by Sourcery

This pull request introduces several enhancements to the event processing pipeline. It ensures events are processed in the correct order by verifying the existence of previous events, prevents concurrent catch-up operations for the same topic, and improves logging for better traceability. Additionally, it ensures that the HWM is initialized only once and events are published only after a successful HWM update.

Sequence diagram for Event Processing with HWM Check

sequenceDiagram
    participant OP as OrderedProcessor
    participant DB as Database
    participant MB as MessageBroker

    OP->DB: Check for unprocessed prior events
    DB-->OP: Returns if prior events exist
    alt Prior events exist
        OP->OP: Skip event
    else No prior events
        OP->DB: Update event status to 'p'
        DB-->OP: Returns update status
        alt Update failed
            OP->OP: Skip event
        else Update successful
            OP->DB: Check if previous event exists (HWM check)
            DB-->OP: Returns if previous event exists
            alt Previous event does not exist
                OP->OP: Ignore event
            else Previous event exists
                OP->DB: Commit transaction
                DB-->OP: Acknowledge commit
                OP->MB: Publish event
                MB->MB: Deliver to subscribers
            end
        end
    end
Loading

Updated class diagram for Event

classDiagram
    class Event {
        String id
        String source
        String type
        String datacontenttype
        String dataschema
        String subject
        byte[] data
        Instant time
        Long idn
        String topic
        +create(String id, String source, String type, String datacontenttype, String dataschema, String subject, byte[] data) Event
        +create(String id, String source, String type, String datacontenttype, String dataschema, String subject, byte[] data, Instant time, Long idn, String topic) Event
    }
Loading

File-Level Changes

Change Details Files
Added a check to ensure that the previous event in a sequence has been processed before processing the current event.
  • Added a previousEventExists method to check if the previous event (based on idn) has reached the client.
  • Log a message if the previous event has not reached the client, and skip processing the current event.
  • Modified the process method to include the previousEventExists check.
src/main/java/com/p14n/postevent/processor/OrderedProcessor.java
Implemented a mechanism to ensure that catch-up operations are not run concurrently for the same topic.
  • Added signals and running fields to track catchup requests and execution status.
  • Modified onMessage to use synchronization to prevent concurrent catchup executions.
  • The catchup method is now re-triggered if new signals are received during its execution.
src/main/java/com/p14n/postevent/catchup/CatchupService.java
Ensured that the HWM (High Water Mark) is initialized only once for each topic.
  • Modified the SQL query in initializeHwm to use ON CONFLICT DO NOTHING to prevent duplicate HWM entries.
  • Call getCurrentHwm after the insert to ensure the HWM is initialized.
src/main/java/com/p14n/postevent/catchup/CatchupService.java
The broker now publishes events only if the HWM was updated.
  • The PersistentBroker now publishes events to the target broker only if the HWM was successfully updated.
  • The number of updated rows is checked to determine if the HWM was updated.
src/main/java/com/p14n/postevent/catchup/PersistentBroker.java
Added logging to track message reception in the gRPC server.
  • Added a log message in subscribeToEvents to indicate when a message is received for a specific topic.
src/main/java/com/p14n/postevent/broker/grpc/MessageBrokerGrpcServer.java
The transactional broker is now passed an async executor.
  • The TransactionalBroker constructor now takes an AsyncExecutor.
  • The ConsumerClient now passes the AsyncExecutor to the TransactionalBroker.
src/main/java/com/p14n/postevent/ConsumerClient.java
src/main/java/com/p14n/postevent/broker/DefaultMessageBroker.java

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!
  • Generate a plan of action for an issue: Comment @sourcery-ai plan on
    an issue to generate a plan of action for it.

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

Copy link
Contributor

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @p14n - I've reviewed your changes - here's some feedback:

Overall Comments:

  • Consider using a dedicated thread pool for asynchronous tasks to better manage resources.
  • The logging could be improved by including more context, such as the topic name.
Here's what I looked at during the review
  • 🟡 General issues: 5 issues found
  • 🟢 Security: all looks good
  • 🟢 Testing: all looks good
  • 🟡 Complexity: 1 issue found
  • 🟢 Documentation: all looks good

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

try (ResultSet rs = stmt.executeQuery()) {
if (rs.next()) {
int hwm = rs.getInt(1);
System.err.println("HWM: " + hwm + " IDN: " + event.idn());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: Avoid using System.err.println for logging in production code.

Using the project's Logger would ensure consistent log formatting and levels. Consider replacing System.err.println with a logger call.

Suggested change
System.err.println("HWM: " + hwm + " IDN: " + event.idn());
LOGGER.debug("HWM: {} IDN: {}", hwm, event.idn());

public void publish(InT message) {

if(!canProcess(message)){
System.err.println("DMB got event " + message);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: Use the configured Logger instead of System.err.println.

Utilizing the Logger facilitates centralized log management and better control over logging levels. This change will enhance maintainability.

Suggested implementation:

package com.p14n.postevent.broker;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DefaultMessageBroker<InT, OutT> {

    private static final Logger logger = LoggerFactory.getLogger(DefaultMessageBroker.class);
    @Override
    public void publish(InT message) {

        logger.info("DMB got event {}", message);
        if (!canProcess(message)) {
            return;
        }
        // Deliver to all subscribers
        for (MessageSubscriber<OutT> subscriber : subscribers) {
            logger.info("TO ASYNC {}", message);

            asyncExecutor.submit(() -> {
                try {
                    subscriber.onMessage(convert(message));

Comment on lines +189 to +194
signals.set(0);
catchup(message.topic);
running.set(false);
if (signals.get() > 0) {
onMessage(message);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (performance): Recursion in onMessage may risk stack overflows.

Re-invoking onMessage recursively when signals are pending could lead to deep recursion under heavy load. Consider an iterative approach or scheduling a new execution to handle pending signals.

Suggested change
signals.set(0);
catchup(message.topic);
running.set(false);
if (signals.get() > 0) {
onMessage(message);
}
do {
signals.set(0);
catchup(message.topic);
} while (signals.get() > 0);
running.set(false);


// Forward to actual subscriber after successful persistence
targetBroker.publish(event);
System.err.println("PB got event " + event.id());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: Replace System.err.println with proper logging.

To maintain logging consistency and log level management across modules, it is advisable to use the established Logger instead of System.err.println.

Suggested implementation:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// other imports...
public class PersistentBroker {
    private static final Logger LOGGER = LoggerFactory.getLogger(PersistentBroker.class);
            LOGGER.info("PB got event {}", event.id());

If PersistentBroker already has a logger instance declared, remove the duplicate declaration. Also, adjust the logger level if info is not appropriate.

Consumer<ChangeEvent<String, String>> consumer = record -> {
try {
Event event = changeEventToEvent(record);
System.err.println("LC got event " + event.id());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: Utilize the Logger rather than System.err.println for client notifications.

Switching to the Logger will keep logging behavior consistent across components and allow for better control over output, especially in production environments.

Suggested implementation:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// (existing imports)
public class LocalConsumer {

    private static final Logger logger = LoggerFactory.getLogger(LocalConsumer.class);
                logger.info("LC got event " + event.id());

If the project uses a different logging framework (e.g., Log4j2) adjust the imports and Logger instantiation accordingly. Ensure that the Logger configuration is set properly in your application configuration.

private AtomicInteger signals = new AtomicInteger(0);
private AtomicBoolean running = new AtomicBoolean(false);

@Override
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (complexity): Consider using a loop with a try-finally block to process pending signals, which avoids recursion and reduces lock contention by using a dedicated lock and ensuring the running flag is reset even if an exception occurs.

The recursive call and mix of atomic flags can be simplified by processing pending signals in a single loop, preventing recursion and reducing lock contention. One option is to use a loop within a try-finally block to drain the queued signals. For example:

@Override
public void onMessage(SystemEvent message) {
    if (Objects.requireNonNull(message) == SystemEvent.CatchupRequired) {
        signals.incrementAndGet();
        synchronized (this) { // Using 'this' as the lock instead of running
            if (running.get()) {
                return;
            }
            running.set(true);
        }
        try {
            while (signals.getAndSet(0) > 0) {
                catchup(message.topic);
            }
        } finally {
            running.set(false);
        }
    }
}

Actionable Steps:

  1. Replace the recursive call with a while loop to process any queued catchup signals.
  2. Use a dedicated lock (e.g., synchronizing on this) to ensure mutual exclusion.
  3. Use a try-finally block to guarantee that the running flag is reset even if an exception occurs.

This keeps all functionality intact while reducing complexity and potential concurrency pitfalls.

@p14n p14n merged commit 737cf28 into main Mar 28, 2025
1 check passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant