diff --git a/CHANGELOG.md b/CHANGELOG.md index db57363ab605f..8dee8390cbc5e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -174,6 +174,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Handle null partSize in OnDemandBlockSnapshotIndexInput ([#9291](https://github.com/opensearch-project/OpenSearch/issues/9291)) - Fix condition to remove index create block ([#9437](https://github.com/opensearch-project/OpenSearch/pull/9437)) - Add support to clear archived index setting ([#9019](https://github.com/opensearch-project/OpenSearch/pull/9019)) +- [Segment Replication] Fixed bug where replica shard temporarily serves stale data during an engine reset ([#9495](https://github.com/opensearch-project/OpenSearch/pull/9495)) ### Security diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 69cdd80bb5085..5855ed7470559 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -143,8 +143,9 @@ public void testPrimaryStopped_ReplicaPromoted() throws Exception { final ShardRouting replicaShardRouting = getShardRoutingForNodeName(replica); assertNotNull(replicaShardRouting); assertTrue(replicaShardRouting + " should be promoted as a primary", replicaShardRouting.primary()); - refresh(INDEX_NAME); - assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2); + final SearchResponse response = client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(); + // new primary should have at least the doc count from the first set of segments. + assertTrue(response.getHits().getTotalHits().value >= 1); // assert we can index into the new primary. client().prepareIndex(INDEX_NAME).setId("3").setSource("bar", "baz").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index 48556cc6b9709..b529dfbe13bf4 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -39,6 +39,8 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiFunction; import static org.opensearch.index.seqno.SequenceNumbers.MAX_SEQ_NO; @@ -57,6 +59,7 @@ public class NRTReplicationEngine extends Engine { private final CompletionStatsCache completionStatsCache; private final LocalCheckpointTracker localCheckpointTracker; private final WriteOnlyTranslogManager translogManager; + private final Lock flushLock = new ReentrantLock(); protected final ReplicaFileTracker replicaFileTracker; private volatile long lastReceivedPrimaryGen = SequenceNumbers.NO_OPS_PERFORMED; @@ -156,7 +159,7 @@ public synchronized void updateSegments(final SegmentInfos infos) throws IOExcep // a lower gen from a newly elected primary shard that is behind this shard's last commit gen. // In that case we still commit into the next local generation. if (incomingGeneration != this.lastReceivedPrimaryGen) { - commitSegmentInfos(); + flush(false, true); translogManager.getDeletionPolicy().setLocalCheckpointOfSafeCommit(maxSeqNo); translogManager.rollTranslogGeneration(); } @@ -184,7 +187,7 @@ private void commitSegmentInfos(SegmentInfos infos) throws IOException { translogManager.syncTranslog(); } - protected void commitSegmentInfos() throws IOException { + private void commitSegmentInfos() throws IOException { commitSegmentInfos(getLatestSegmentInfos()); } @@ -351,7 +354,27 @@ public boolean shouldPeriodicallyFlush() { } @Override - public void flush(boolean force, boolean waitIfOngoing) throws EngineException {} + public void flush(boolean force, boolean waitIfOngoing) throws EngineException { + ensureOpen(); + // readLock is held here to wait/block any concurrent close that acquires the writeLock. + try (final ReleasableLock lock = readLock.acquire()) { + ensureOpen(); + if (flushLock.tryLock() == false) { + if (waitIfOngoing == false) { + return; + } + flushLock.lock(); + } + // we are now locked. + try { + commitSegmentInfos(); + } catch (IOException e) { + throw new FlushFailedEngineException(shardId, e); + } finally { + flushLock.unlock(); + } + } + } @Override public void forceMerge( @@ -365,6 +388,9 @@ public void forceMerge( @Override public GatedCloseable acquireLastIndexCommit(boolean flushFirst) throws EngineException { + if (flushFirst) { + flush(false, true); + } try { final IndexCommit indexCommit = Lucene.getIndexCommit(lastCommittedSegmentInfos, store.directory()); return new GatedCloseable<>(indexCommit, () -> {}); diff --git a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java index cb93d3a8db20e..22eb5195af507 100644 --- a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java @@ -152,7 +152,7 @@ public void testUpdateSegments_replicaReceivesSISWithLowerGen() throws IOExcepti assertEquals(2, nrtEngine.getLastCommittedSegmentInfos().getGeneration()); // commit the infos to push us to segments_3. - nrtEngine.commitSegmentInfos(); + nrtEngine.flush(); assertEquals(3, nrtEngine.getLastCommittedSegmentInfos().getGeneration()); assertEquals(3, nrtEngine.getLatestSegmentInfos().getGeneration()); @@ -283,7 +283,7 @@ public void testTrimTranslogOps() throws Exception { } } - public void testCommitSegmentInfos() throws Exception { + public void testFlush() throws Exception { // This test asserts that NRTReplication#commitSegmentInfos creates a new commit point with the latest checkpoints // stored in user data. final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); @@ -304,7 +304,7 @@ public void testCommitSegmentInfos() throws Exception { LocalCheckpointTracker localCheckpointTracker = nrtEngine.getLocalCheckpointTracker(); final long maxSeqNo = localCheckpointTracker.getMaxSeqNo(); final long processedCheckpoint = localCheckpointTracker.getProcessedCheckpoint(); - nrtEngine.commitSegmentInfos(); + nrtEngine.flush(); // ensure getLatestSegmentInfos returns an updated infos ref with correct userdata. final SegmentInfos latestSegmentInfos = nrtEngine.getLatestSegmentInfos(); @@ -322,6 +322,10 @@ public void testCommitSegmentInfos() throws Exception { userData = committedInfos.getUserData(); assertEquals(processedCheckpoint, Long.parseLong(userData.get(LOCAL_CHECKPOINT_KEY))); assertEquals(maxSeqNo, Long.parseLong(userData.get(MAX_SEQ_NO))); + + try (final GatedCloseable indexCommit = nrtEngine.acquireLastIndexCommit(true)) { + assertEquals(committedInfos.getGeneration() + 1, indexCommit.get().getGeneration()); + } } } diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java index 8e27c9ff9ae1a..ead9c1c22c931 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java @@ -236,7 +236,7 @@ public void testReplicaCommitsInfosBytesOnRecovery() throws Exception { MatcherAssert.assertThat( "Replica commits infos bytes referencing latest refresh point", latestReplicaCommit.files(true), - containsInAnyOrder("_0.cfe", "_0.si", "_0.cfs", "segments_5") + containsInAnyOrder("_0.cfe", "_0.si", "_0.cfs", "segments_6") ); MatcherAssert.assertThat( "Segments are referenced in memory", @@ -294,20 +294,20 @@ public void testRepicaCleansUpOldCommitsWhenReceivingNew() throws Exception { replicateSegments(primary, shards.getReplicas()); assertDocCount(primary, 1); assertDocCount(replica, 1); - assertEquals("segments_4", replica.store().readLastCommittedSegmentsInfo().getSegmentsFileName()); - assertSingleSegmentFile(replica, "segments_4"); + assertEquals("segments_5", replica.store().readLastCommittedSegmentsInfo().getSegmentsFileName()); + assertSingleSegmentFile(replica, "segments_5"); shards.indexDocs(1); primary.refresh("test"); replicateSegments(primary, shards.getReplicas()); assertDocCount(replica, 2); - assertSingleSegmentFile(replica, "segments_4"); + assertSingleSegmentFile(replica, "segments_5"); shards.indexDocs(1); flushShard(primary); replicateSegments(primary, shards.getReplicas()); assertDocCount(replica, 3); - assertSingleSegmentFile(replica, "segments_5"); + assertSingleSegmentFile(replica, "segments_6"); final Store.RecoveryDiff diff = Store.segmentReplicationDiff(primary.getSegmentMetadataMap(), replica.getSegmentMetadataMap()); assertTrue(diff.missing.isEmpty()); diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 807b4a9cd7482..e8220830063ee 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -34,6 +34,7 @@ import org.opensearch.index.engine.InternalEngineFactory; import org.opensearch.index.engine.NRTReplicationEngine; import org.opensearch.index.engine.NRTReplicationEngineFactory; +import org.opensearch.index.engine.ReadOnlyEngine; import org.opensearch.index.mapper.MapperService; import org.opensearch.index.replication.OpenSearchIndexLevelReplicationTestCase; import org.opensearch.index.replication.TestReplicationSource; @@ -71,6 +72,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import static org.opensearch.index.engine.EngineTestCase.assertAtMostOneLuceneDocumentPerSequenceNumber; @@ -773,6 +775,35 @@ public void testNoDuplicateSeqNo() throws Exception { } } + public void testQueryDuringEngineResetShowsDocs() throws Exception { + final NRTReplicationEngineFactory engineFactory = new NRTReplicationEngineFactory(); + final NRTReplicationEngineFactory spy = spy(engineFactory); + try (ReplicationGroup shards = createGroup(1, settings, indexMapping, spy, createTempDir())) { + final IndexShard primaryShard = shards.getPrimary(); + final IndexShard replicaShard = shards.getReplicas().get(0); + shards.startAll(); + shards.indexDocs(10); + shards.refresh("test"); + replicateSegments(primaryShard, shards.getReplicas()); + shards.assertAllEqual(10); + + final AtomicReference failed = new AtomicReference<>(); + doAnswer(ans -> { + try { + final Engine engineOrNull = replicaShard.getEngineOrNull(); + assertNotNull(engineOrNull); + assertTrue(engineOrNull instanceof ReadOnlyEngine); + shards.assertAllEqual(10); + } catch (Throwable e) { + failed.set(e); + } + return ans.callRealMethod(); + }).when(spy).newReadWriteEngine(any()); + shards.promoteReplicaToPrimary(replicaShard).get(); + assertNull("Expected correct doc count during engine reset", failed.get()); + } + } + /** * Assert persisted and searchable doc counts. This method should not be used while docs are concurrently indexed because * it asserts point in time seqNos are relative to the doc counts.