From 42c270dd9ffc5ac1cca89591b31f0172c7aa4123 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Wed, 6 Sep 2023 15:31:08 +0200 Subject: [PATCH] Assert commits order in IndexCommitListener --- .../index/engine/InternalEngine.java | 38 +++++++++++++++++-- 1 file changed, 35 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 363a71719efbe..94e9bbdc22917 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -88,6 +88,7 @@ import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardLongFieldRange; +import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogConfig; import org.elasticsearch.index.translog.TranslogCorruptedException; @@ -360,26 +361,57 @@ private SoftDeletesPolicy newSoftDeletesPolicy() throws IOException { @Nullable private CombinedDeletionPolicy.CommitsListener newCommitsListener() { - final Engine.IndexCommitListener listener = engineConfig.getIndexCommitListener(); + Engine.IndexCommitListener listener = engineConfig.getIndexCommitListener(); if (listener != null) { + final IndexCommitListener wrappedListener = Assertions.ENABLED ? assertingCommitsOrderListener(listener) : listener; return new CombinedDeletionPolicy.CommitsListener() { @Override public void onNewAcquiredCommit(final IndexCommit commit, final Set additionalFiles) { final IndexCommitRef indexCommitRef = acquireIndexCommitRef(() -> commit); var primaryTerm = config().getPrimaryTermSupplier().getAsLong(); assert indexCommitRef.getIndexCommit() == commit; - listener.onNewCommit(shardId, store, primaryTerm, indexCommitRef, additionalFiles); + wrappedListener.onNewCommit(shardId, store, primaryTerm, indexCommitRef, additionalFiles); } @Override public void onDeletedCommit(IndexCommit commit) { - listener.onIndexCommitDelete(shardId, commit); + wrappedListener.onIndexCommitDelete(shardId, commit); } }; } return null; } + private IndexCommitListener assertingCommitsOrderListener(final IndexCommitListener listener) { + final AtomicLong generation = new AtomicLong(0L); + return new IndexCommitListener() { + @Override + public void onNewCommit( + ShardId shardId, + Store store, + long primaryTerm, + IndexCommitRef indexCommitRef, + Set additionalFiles + ) { + final long nextGen = indexCommitRef.getIndexCommit().getGeneration(); + final long prevGen = generation.getAndSet(nextGen); + assert prevGen < nextGen + : "Expect new commit generation " + + nextGen + + " to be greater than previous commit generation " + + prevGen + + " for shard " + + shardId; + listener.onNewCommit(shardId, store, primaryTerm, indexCommitRef, additionalFiles); + } + + @Override + public void onIndexCommitDelete(ShardId shardId, IndexCommit deletedCommit) { + listener.onIndexCommitDelete(shardId, deletedCommit); + } + }; + } + @Override public CompletionStats completionStats(String... fieldNamePatterns) { return completionStatsCache.get(fieldNamePatterns);