Skip to content

Commit

Permalink
Assert commits order in IndexCommitListener
Browse files Browse the repository at this point in the history
  • Loading branch information
tlrx committed Sep 6, 2023
1 parent c9a2555 commit 42c270d
Showing 1 changed file with 35 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardLongFieldRange;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.index.translog.TranslogCorruptedException;
Expand Down Expand Up @@ -360,26 +361,57 @@ private SoftDeletesPolicy newSoftDeletesPolicy() throws IOException {

@Nullable
private CombinedDeletionPolicy.CommitsListener newCommitsListener() {
final Engine.IndexCommitListener listener = engineConfig.getIndexCommitListener();
Engine.IndexCommitListener listener = engineConfig.getIndexCommitListener();
if (listener != null) {
final IndexCommitListener wrappedListener = Assertions.ENABLED ? assertingCommitsOrderListener(listener) : listener;
return new CombinedDeletionPolicy.CommitsListener() {
@Override
public void onNewAcquiredCommit(final IndexCommit commit, final Set<String> additionalFiles) {
final IndexCommitRef indexCommitRef = acquireIndexCommitRef(() -> commit);
var primaryTerm = config().getPrimaryTermSupplier().getAsLong();
assert indexCommitRef.getIndexCommit() == commit;
listener.onNewCommit(shardId, store, primaryTerm, indexCommitRef, additionalFiles);
wrappedListener.onNewCommit(shardId, store, primaryTerm, indexCommitRef, additionalFiles);
}

@Override
public void onDeletedCommit(IndexCommit commit) {
listener.onIndexCommitDelete(shardId, commit);
wrappedListener.onIndexCommitDelete(shardId, commit);
}
};
}
return null;
}

private IndexCommitListener assertingCommitsOrderListener(final IndexCommitListener listener) {
final AtomicLong generation = new AtomicLong(0L);
return new IndexCommitListener() {
@Override
public void onNewCommit(
ShardId shardId,
Store store,
long primaryTerm,
IndexCommitRef indexCommitRef,
Set<String> additionalFiles
) {
final long nextGen = indexCommitRef.getIndexCommit().getGeneration();
final long prevGen = generation.getAndSet(nextGen);
assert prevGen < nextGen
: "Expect new commit generation "
+ nextGen
+ " to be greater than previous commit generation "
+ prevGen
+ " for shard "
+ shardId;
listener.onNewCommit(shardId, store, primaryTerm, indexCommitRef, additionalFiles);
}

@Override
public void onIndexCommitDelete(ShardId shardId, IndexCommit deletedCommit) {
listener.onIndexCommitDelete(shardId, deletedCommit);
}
};
}

@Override
public CompletionStats completionStats(String... fieldNamePatterns) {
return completionStatsCache.get(fieldNamePatterns);
Expand Down

0 comments on commit 42c270d

Please sign in to comment.