
Conversation

@jingy-li jingy-li commented Apr 18, 2025

Problem Statement

RocksDB keeps newly written data in memory (memtables) before writing it to disk. For example, with 200 partitions at 16MB each, in the extreme case 3.2GB of data (in small key-value pairs) can exist only in memory.

Since snapshots only capture data already written to disk, failing to flush before creating a snapshot means those in-memory records won't be included.
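
For context, here is a minimal, hypothetical sketch of the flush-then-snapshot ordering using the RocksDB Java API (org.rocksdb); the paths and setup are placeholders, not code from this PR:

import org.rocksdb.Checkpoint;
import org.rocksdb.FlushOptions;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class FlushBeforeCheckpointSketch {
  public static void main(String[] args) throws RocksDBException {
    RocksDB.loadLibrary();
    try (Options options = new Options().setCreateIfMissing(true);
        RocksDB db = RocksDB.open(options, "/tmp/rocksdb-demo")) {
      db.put("key".getBytes(), "value".getBytes());

      // Flush memtables to SST files first, so records that only live in memory
      // are on disk before the checkpoint is taken.
      try (FlushOptions flushOptions = new FlushOptions().setWaitForFlush(true)) {
        db.flush(flushOptions);
      }

      // A checkpoint hard-links the existing SST files (plus manifest/WAL);
      // if the WAL is disabled, unflushed records would be missing from the snapshot.
      try (Checkpoint checkpoint = Checkpoint.create(db)) {
        checkpoint.createCheckpoint("/tmp/rocksdb-demo-snapshot");
      }
    }
  }
}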

Previously we created the snapshot by directly calling getPartitionOrThrow(partitionId).createSnapshot(). Now we want to call execSyncOffsetCommandAsync in SIT before creating the snapshot, so this PR restructures the snapshot creation logic:

(1) Add a snapshot creation listener to each partition.
(2) When a snapshot needs to be created, the partition triggers the notifySnapshotCreationListener event.
(3) The listener then executes syncOffsetAndCreateSnapshot, which is overridden in SIT.
(4) syncOffsetAndCreateSnapshot initiates execSyncOffsetCommandAsync to send the SYNC OFFSET command to the drainer. Once that command completes asynchronously, the partition triggers the snapshot creation (see the sketch below).
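
A rough sketch of steps (1) and (2), for illustration only; the listener and partition shapes below are hypothetical simplifications, and only the method names (notifySnapshotCreationListener, syncOffsetAndCreateSnapshot) follow the description above:

import java.util.concurrent.CompletableFuture;

// Hypothetical shapes; the actual Venice classes and signatures may differ.
interface SnapshotCreationListener {
  CompletableFuture<Void> syncOffsetAndCreateSnapshot(int partitionId);
}

class Partition {
  private final int partitionId;
  private volatile SnapshotCreationListener snapshotCreationListener;

  Partition(int partitionId) {
    this.partitionId = partitionId;
  }

  // (1) SIT registers a listener on each partition.
  void setSnapshotCreationListener(SnapshotCreationListener listener) {
    this.snapshotCreationListener = listener;
  }

  // (2) When a snapshot is needed (e.g. after EOP, or on a blob transfer request),
  // the partition notifies the listener instead of calling createSnapshot() directly.
  CompletableFuture<Void> notifySnapshotCreationListener() {
    SnapshotCreationListener listener = this.snapshotCreationListener;
    if (listener == null) {
      return CompletableFuture.completedFuture(null);
    }
    return listener.syncOffsetAndCreateSnapshot(partitionId);
  }
}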

Solution

  1. Add a snapshot creation listener.
  2. Trigger the listener's snapshot-creation event handler in SIT to sync the offset and create the snapshot.

Code changes

  • Added new code behind a config. If so, list the config names and their default values in the PR description.
  • Introduced new log lines.
  • Confirmed if logs need to be rate limited to avoid excessive logging.

Concurrency-Specific Checks

Both reviewer and PR author to verify

  • Code has no race conditions or thread safety issues.
  • Proper synchronization mechanisms (e.g., synchronized, RWLock) are used where needed.
  • No blocking calls inside critical sections that could lead to deadlocks or performance degradation.
  • Verified thread-safe collections are used (e.g., ConcurrentHashMap, CopyOnWriteArrayList).
  • Validated proper exception handling in multi-threaded code to avoid silent thread termination.

How was this PR tested?

  • New unit tests added.
  • New integration tests added.
  • Modified or extended existing tests.
  • Verified backward compatibility (if applicable).

Does this PR introduce any user-facing or breaking changes?

  • No. You can skip the rest of this section.
  • Yes. Clearly explain the behavior change and its impact.

@jingy-li jingy-li requested a review from gaojieliu April 22, 2025 20:37
@jingy-li jingy-li changed the title [server][dvc] flush RocksDB before create snapshot [server][dvc] add pre snapshot creation listener for disk flush and offset sync Apr 22, 2025
}

// 2. Create snapshot for blob transfer
storageEngine.createSnapshot(storagePartitionConfig);
Contributor


So we will create a snapshot for hybrid stores in the Server all the time when this feature is enabled?
Who will clean this up if there is no blob transfer request?
Unused checkpoints can keep stale data lingering around.

LOGGER
    .info("Beginning pre-snapshot offset sync for store: {}, partition: {}", storeNameAndVersion, partitionId);
try {
  syncOffset(storeNameAndVersion, getPartitionConsumptionState(partitionId));
Contributor


This can trigger race conditions.
So far, this function is mainly triggered in the drainer thread, and if we invoke it here, a race condition can happen.
If we make it synchronized, it may not solve all the issues, since we would like to invoke this function as the last step of processing.
Some related javadoc:

/**
     * Syncing offset checking in syncOffset() should be the very last step for processing a record.
     *
     * Check whether offset metadata checkpoint will happen; if so, update the producer states recorded in OffsetRecord
     * with the updated producer states maintained in {@link #drainerDiv}
     */
    if (shouldSyncOffset(partitionConsumptionState, record, leaderProducedRecordContext)) {
      updateOffsetMetadataAndSyncOffset(partitionConsumptionState);
    }

Can we leverage something like this?

CompletableFuture<Void> cmdFuture = storeBufferService.execSyncOffsetCommandAsync(topicPartition, this);
waitForSyncOffsetCmd(cmdFuture, topicPartition);

Contributor Author


Changed syncOffset to the asynchronous execSyncOffsetCommandAsync command.

Because we need to wait for this command to complete before creating a snapshot for the partition, we moved some of the snapshot creation logic to SIT.

The updated approach is as follows:
(1) In SIT, we add a snapshot creation listener for all partitions of all stores.
(2) For batch stores, we notify the listener to trigger snapshot creation after EOP. For hybrid stores, we fetch the partition and then notify the listener based on blob transfer requests.
(3) When the listener receives the notification, it executes syncOffsetAndCreateSnapshot, a method overridden in the SIT class, to synchronize the offset and create the snapshot (a rough sketch follows below).
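
A rough sketch of step (3), again with hypothetical stubs; only execSyncOffsetCommandAsync, syncOffsetAndCreateSnapshot, and createSnapshot come from the discussion above, and the real Venice signatures may differ:

import java.util.concurrent.CompletableFuture;

// Hypothetical SIT-side override, for illustration only.
class StoreIngestionTaskSketch {
  interface StoreBufferServiceStub {
    CompletableFuture<Void> execSyncOffsetCommandAsync(int partitionId);
  }

  interface StorageEngineStub {
    void createSnapshot(int partitionId);
  }

  private final StoreBufferServiceStub storeBufferService;
  private final StorageEngineStub storageEngine;

  StoreIngestionTaskSketch(StoreBufferServiceStub storeBufferService, StorageEngineStub storageEngine) {
    this.storeBufferService = storeBufferService;
    this.storageEngine = storageEngine;
  }

  // Listener callback: enqueue the SYNC OFFSET command to the drainer and only
  // create the snapshot once that command has completed, so the offset checkpoint
  // and the flushed data stay consistent.
  CompletableFuture<Void> syncOffsetAndCreateSnapshot(int partitionId) {
    return storeBufferService.execSyncOffsetCommandAsync(partitionId)
        .thenRun(() -> storageEngine.createSnapshot(partitionId));
  }
}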


@sushantmane sushantmane left a comment


Let’s use Utils.getReplicaId(pubSubTopic, partitionId) to log topic and partition information consistently. A standardized format makes it easier to search and filter logs.
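
For example, the earlier pre-snapshot log line could be written along these lines (a hypothetical fragment, assuming pubSubTopic and partitionId are in scope):

LOGGER.info("Beginning pre-snapshot offset sync for replica: {}", Utils.getReplicaId(pubSubTopic, partitionId));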

@jingy-li jingy-li requested a review from gaojieliu April 29, 2025 19:49
@jingy-li jingy-li changed the title [server][dvc] add pre snapshot creation listener for disk flush and offset sync [server][dvc] add snapshot creation listener for disk flush and offset sync Apr 29, 2025
@jingy-li jingy-li changed the title [server][dvc] add snapshot creation listener for disk flush and offset sync [server][dvc] add snapshot creation listener for sync offset and flush before create snapshot Apr 29, 2025
@jingy-li jingy-li changed the title [server][dvc] add snapshot creation listener for sync offset and flush before create snapshot [server][dvc] add snapshot creation listener for flush and sync offset before create snapshot Apr 30, 2025

github-actions bot commented Oct 3, 2025

Hi there. This pull request has been inactive for 30 days.
To keep our review queue healthy, we plan to close it in 7 days
unless there is new activity. If you are still working on this,
please push a commit, leave a comment, or convert it to draft to
signal intent. Thank you for your time and contributions.

@github-actions github-actions bot added the stale label Oct 3, 2025
@github-actions

Closing this pull request due to 37 days of inactivity.
This is not a judgment on the value of the work. If you would like
to continue, please reopen or open a new PR and we will be happy
to take another look. Thank you again for contributing.
