From 5df95b031cd5cfb7c0a95152c24e8ed0da6028f8 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Fri, 24 Nov 2023 22:46:23 +0530 Subject: [PATCH 01/11] [Remote Store] Drain ongoing refreshes, translog uploads during primary relocation Signed-off-by: Ashish Singh --- .../CloseableRetryableRefreshListener.java | 19 +++++++--- .../opensearch/index/shard/IndexShard.java | 26 ++++++++++--- .../translog/InternalTranslogManager.java | 7 ++++ .../index/translog/NoOpTranslogManager.java | 7 ++++ .../index/translog/RemoteFsTranslog.java | 38 +++++++++++++++++-- .../opensearch/index/translog/Translog.java | 7 ++++ .../index/translog/TranslogManager.java | 6 +++ ...loseableRetryableRefreshListenerTests.java | 26 ++++++------- 8 files changed, 108 insertions(+), 28 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/shard/CloseableRetryableRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/CloseableRetryableRefreshListener.java index 3ee74e5267718..6b9c619e3141a 100644 --- a/server/src/main/java/org/opensearch/index/shard/CloseableRetryableRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/CloseableRetryableRefreshListener.java @@ -10,10 +10,11 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.search.ReferenceManager; +import org.opensearch.common.lease.Releasable; +import org.opensearch.common.lease.Releasables; import org.opensearch.common.unit.TimeValue; import org.opensearch.threadpool.ThreadPool; -import java.io.Closeable; import java.io.IOException; import java.util.Objects; import java.util.concurrent.Semaphore; @@ -26,7 +27,7 @@ * is closed, all the permits are acquired and there are no available permits to afterRefresh. This abstract class provides * necessary abstract methods to schedule retry. */ -public abstract class CloseableRetryableRefreshListener implements ReferenceManager.RefreshListener, Closeable { +public abstract class CloseableRetryableRefreshListener implements ReferenceManager.RefreshListener { /** * Total permits = 1 ensures that there is only single instance of runAfterRefreshWithPermit that is running at a time. @@ -184,18 +185,24 @@ private void scheduleRetry(boolean afterRefreshSuccessful, boolean didRefresh) { */ protected abstract boolean performAfterRefreshWithPermit(boolean didRefresh); - @Override - public final void close() throws IOException { + public final Releasable drainRefreshes() { try { if (semaphore.tryAcquire(TOTAL_PERMITS, 10, TimeUnit.MINUTES)) { boolean result = closed.compareAndSet(false, true); assert result && semaphore.availablePermits() == 0; getLogger().info("All permits are acquired and refresh listener is closed"); + return Releasables.releaseOnce(() -> { + semaphore.release(TOTAL_PERMITS); + boolean wasClosed = closed.getAndSet(false); + assert semaphore.availablePermits() == TOTAL_PERMITS : "Available permits is " + semaphore.availablePermits(); + assert wasClosed : "RefreshListener is not closed before reopening it"; + getLogger().info("All permits are released and refresh listener is open"); + }); } else { - throw new TimeoutException("timeout while closing gated refresh listener"); + throw new TimeoutException("Timeout while acquiring all permits"); } } catch (InterruptedException | TimeoutException e) { - throw new RuntimeException("Failed to close the closeable retryable listener", e); + throw new RuntimeException("Failed to acquire all permits", e); } } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 405845d58f227..4a3e6cbb7d266 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -851,6 +851,9 @@ public void relocated( final Runnable performSegRep ) throws IllegalIndexShardStateException, IllegalStateException, InterruptedException { assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting; + // The below list of releasable ensures that if the relocation does not happen, we undo the activity of close and + // acquire all permits. This will ensure that the remote store uploads can still be done by the existing primary shard. + List releasablesOnNoHandoff = new ArrayList<>(); try (Releasable forceRefreshes = refreshListeners.forceRefreshes()) { indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> { forceRefreshes.close(); @@ -863,11 +866,15 @@ public void relocated( maybeSync(); } - // Ensures all in-flight remote store operations drain, before we perform the handoff. - internalRefreshListener.stream() - .filter(refreshListener -> refreshListener instanceof Closeable) - .map(refreshListener -> (Closeable) refreshListener) - .close(); + // Ensures all in-flight remote store refreshes drain, before we perform the performSegRep. + for (ReferenceManager.RefreshListener refreshListener : internalRefreshListener) { + if (refreshListener instanceof CloseableRetryableRefreshListener) { + releasablesOnNoHandoff.add(((CloseableRetryableRefreshListener) refreshListener).drainRefreshes()); + } + } + + // Ensure all in-flight remote store translog upload drains, before we perform the performSegRep. + releasablesOnNoHandoff.add(getEngineOrNull().translogManager().drainSyncToStore()); // no shard operation permits are being held here, move state from started to relocated assert indexShardOperationPermits.getActiveOperationsCount() == OPERATIONS_BLOCKED @@ -902,6 +909,15 @@ public void relocated( // Fail primary relocation source and target shards. failShard("timed out waiting for relocation hand-off to complete", null); throw new IndexShardClosedException(shardId(), "timed out waiting for relocation hand-off to complete"); + } finally { + // If the primary mode is still true after the end of handoff attempt, it basically means that the relocation + // failed. The existing primary will continue to be the primary, so we need to allow the segments and translog + // upload to resume. + if (replicationTracker.isPrimaryMode()) { + for (Releasable releasable : releasablesOnNoHandoff) { + releasable.close(); + } + } } } diff --git a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java index 4d0fc13d433c6..eabf08d85fa7c 100644 --- a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.store.AlreadyClosedException; +import org.opensearch.common.lease.Releasable; import org.opensearch.common.logging.Loggers; import org.opensearch.common.util.concurrent.ReleasableLock; import org.opensearch.common.util.io.IOUtils; @@ -301,10 +302,16 @@ public void setMinSeqNoToKeep(long seqNo) { translog.setMinSeqNoToKeep(seqNo); } + @Override public void onDelete() { translog.onDelete(); } + @Override + public Releasable drainSyncToStore() { + return translog.drainSyncToStore(); + } + @Override public Translog.TranslogGeneration getTranslogGeneration() { return translog.getGeneration(); diff --git a/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java index 3e6a8e69edfbb..33b64a20e934b 100644 --- a/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java @@ -8,6 +8,7 @@ package org.opensearch.index.translog; +import org.opensearch.common.lease.Releasable; import org.opensearch.common.util.concurrent.ReleasableLock; import org.opensearch.core.index.shard.ShardId; @@ -121,8 +122,14 @@ public Translog.Snapshot newChangesSnapshot(long fromSeqNo, long toSeqNo, boolea throw new UnsupportedOperationException("Translog snapshot unsupported with no-op translogs"); } + @Override public void onDelete() {} + @Override + public Releasable drainSyncToStore() { + return () -> {}; + } + @Override public Translog.TranslogGeneration getTranslogGeneration() { return null; diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 8fb420e8fa1da..0a4c4e5801cad 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.Logger; import org.opensearch.common.SetOnce; import org.opensearch.common.lease.Releasable; +import org.opensearch.common.lease.Releasables; import org.opensearch.common.logging.Loggers; import org.opensearch.common.util.concurrent.ReleasableLock; import org.opensearch.common.util.io.IOUtils; @@ -38,6 +39,8 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.function.BooleanSupplier; import java.util.function.LongConsumer; import java.util.function.LongSupplier; @@ -53,7 +56,6 @@ public class RemoteFsTranslog extends Translog { private final Logger logger; - private final BlobStoreRepository blobStoreRepository; private final TranslogTransferManager translogTransferManager; private final FileTransferTracker fileTransferTracker; private final BooleanSupplier primaryModeSupplier; @@ -75,6 +77,10 @@ public class RemoteFsTranslog extends Translog { // Semaphore used to allow only single remote generation to happen at a time private final Semaphore remoteGenerationDeletionPermits = new Semaphore(REMOTE_DELETION_PERMITS); + // These permits exist to allow any inflight background triggered upload. + private static final int UPLOAD_PERMITS = 1; + private final Semaphore uploadPermits = new Semaphore(UPLOAD_PERMITS); + public RemoteFsTranslog( TranslogConfig config, String translogUUID, @@ -89,7 +95,6 @@ public RemoteFsTranslog( ) throws IOException { super(config, translogUUID, deletionPolicy, globalCheckpointSupplier, primaryTermSupplier, persistedSequenceNumberConsumer); logger = Loggers.getLogger(getClass(), shardId); - this.blobStoreRepository = blobStoreRepository; this.primaryModeSupplier = primaryModeSupplier; this.remoteTranslogTransferTracker = remoteTranslogTransferTracker; fileTransferTracker = new FileTransferTracker(shardId, remoteTranslogTransferTracker); @@ -321,8 +326,13 @@ private boolean upload(long primaryTerm, long generation, long maxSeqNo) throws // below ensures that the real primary only is uploading. Before the primary mode is set as true for the new // primary, the engine is reset to InternalEngine which also initialises the RemoteFsTranslog which in turns // downloads all the translogs from remote store and does a flush before the relocation finishes. - if (primaryModeSupplier.getAsBoolean() == false) { - logger.debug("skipped uploading translog for {} {}", primaryTerm, generation); + if (primaryModeSupplier.getAsBoolean() == false || uploadPermits.tryAcquire(1) == false) { + logger.debug( + "skipped uploading translog for {} {} uploadPermits={}", + primaryTerm, + generation, + uploadPermits.availablePermits() + ); // NO-OP return true; } @@ -341,6 +351,8 @@ private boolean upload(long primaryTerm, long generation, long maxSeqNo) throws transferSnapshotProvider, new RemoteFsTranslogTransferListener(generation, primaryTerm, maxSeqNo) ); + } finally { + uploadPermits.release(1); } } @@ -423,6 +435,24 @@ protected void setMinSeqNoToKeep(long seqNo) { this.minSeqNoToKeep = seqNo; } + @Override + protected Releasable drainSyncToStore() { + try { + if (uploadPermits.tryAcquire(UPLOAD_PERMITS, 1, TimeUnit.MINUTES)) { + logger.info("All permits acquired"); + return Releasables.releaseOnce(() -> { + uploadPermits.release(UPLOAD_PERMITS); + assert uploadPermits.availablePermits() == UPLOAD_PERMITS : "Available permits is " + uploadPermits.availablePermits(); + logger.info("All permits released"); + }); + } else { + throw new TimeoutException("Timeout while acquiring all permits"); + } + } catch (TimeoutException | InterruptedException e) { + throw new RuntimeException("Failed to acquire all permits", e); + } + } + @Override public void trimUnreferencedReaders() throws IOException { // clean up local translog files and updates readers diff --git a/server/src/main/java/org/opensearch/index/translog/Translog.java b/server/src/main/java/org/opensearch/index/translog/Translog.java index 8b4662238ed25..6e08d7c4d7bdb 100644 --- a/server/src/main/java/org/opensearch/index/translog/Translog.java +++ b/server/src/main/java/org/opensearch/index/translog/Translog.java @@ -1817,6 +1817,13 @@ protected void setMinSeqNoToKeep(long seqNo) {} protected void onDelete() {} + /** + * Drains ongoing syncs to the underlying store. It returns a releasable which can be closed to resume the syncs back. + */ + protected Releasable drainSyncToStore() { + return () -> {}; + } + /** * deletes all files associated with a reader. package-private to be able to simulate node failures at this point */ diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogManager.java b/server/src/main/java/org/opensearch/index/translog/TranslogManager.java index 4279e0289c1dc..d66f12dc3da81 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogManager.java @@ -9,6 +9,7 @@ package org.opensearch.index.translog; import org.opensearch.common.annotation.PublicApi; +import org.opensearch.common.lease.Releasable; import java.io.IOException; import java.util.stream.Stream; @@ -135,5 +136,10 @@ public interface TranslogManager { */ void onDelete(); + /** + * Drains ongoing syncs to the underlying store. It returns a releasable which can be closed to resume the syncs back. + */ + Releasable drainSyncToStore(); + Translog.TranslogGeneration getTranslogGeneration(); } diff --git a/server/src/test/java/org/opensearch/index/shard/CloseableRetryableRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/CloseableRetryableRefreshListenerTests.java index 01242063caa77..cc192e4eefa0a 100644 --- a/server/src/test/java/org/opensearch/index/shard/CloseableRetryableRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/CloseableRetryableRefreshListenerTests.java @@ -66,7 +66,7 @@ protected Logger getLogger() { // Second invocation of afterRefresh method testRefreshListener.afterRefresh(true); assertEquals(0, countDownLatch.getCount()); - testRefreshListener.close(); + testRefreshListener.drainRefreshes(); } /** @@ -98,7 +98,7 @@ protected Logger getLogger() { assertEquals(initialCount - refreshCount, countDownLatch.getCount()); // Closing the refresh listener so that no further afterRefreshes are executed going forward - testRefreshListener.close(); + testRefreshListener.drainRefreshes(); for (int i = 0; i < initialCount - refreshCount; i++) { testRefreshListener.afterRefresh(true); @@ -129,7 +129,7 @@ protected Logger getLogger() { }; testRefreshListener.afterRefresh(true); assertEquals(initialCount - 1, countDownLatch.getCount()); - testRefreshListener.close(); + testRefreshListener.drainRefreshes(); testRefreshListener = new CloseableRetryableRefreshListener(threadPool) { @Override @@ -148,7 +148,7 @@ protected Logger getLogger() { }; testRefreshListener.afterRefresh(true); assertEquals(initialCount - 2, countDownLatch.getCount()); - testRefreshListener.close(); + testRefreshListener.drainRefreshes(); testRefreshListener = new CloseableRetryableRefreshListener(threadPool) { @Override @@ -172,7 +172,7 @@ protected Logger getLogger() { }; testRefreshListener.afterRefresh(true); assertEquals(initialCount - 3, countDownLatch.getCount()); - testRefreshListener.close(); + testRefreshListener.drainRefreshes(); testRefreshListener = new CloseableRetryableRefreshListener(threadPool) { @Override @@ -196,7 +196,7 @@ protected Logger getLogger() { }; testRefreshListener.afterRefresh(true); assertEquals(initialCount - 4, countDownLatch.getCount()); - testRefreshListener.close(); + testRefreshListener.drainRefreshes(); } /** @@ -237,7 +237,7 @@ protected boolean isRetryEnabled() { }; testRefreshListener.afterRefresh(true); assertBusy(() -> assertEquals(0, countDownLatch.getCount())); - testRefreshListener.close(); + testRefreshListener.drainRefreshes(); } /** @@ -272,7 +272,7 @@ protected Logger getLogger() { } }; testRefreshListener.afterRefresh(randomBoolean()); - testRefreshListener.close(); + testRefreshListener.drainRefreshes(); assertNotEquals(0, countDownLatch.getCount()); } @@ -307,7 +307,7 @@ protected Logger getLogger() { }); thread.start(); assertBusy(() -> assertEquals(0, countDownLatch.getCount())); - testRefreshListener.close(); + testRefreshListener.drainRefreshes(); } public void testScheduleRetryAfterClose() throws Exception { @@ -358,8 +358,8 @@ protected TimeValue getNextRetryInterval() { Thread thread2 = new Thread(() -> { try { Thread.sleep(500); - testRefreshListener.close(); - } catch (IOException | InterruptedException e) { + testRefreshListener.drainRefreshes(); + } catch (InterruptedException e) { throw new AssertionError(e); } }); @@ -408,7 +408,7 @@ protected boolean isRetryEnabled() { testRefreshListener.afterRefresh(true); testRefreshListener.afterRefresh(true); assertBusy(() -> assertEquals(3, runCount.get())); - testRefreshListener.close(); + testRefreshListener.drainRefreshes(); } public void testExceptionDuringThreadPoolSchedule() throws Exception { @@ -450,7 +450,7 @@ protected boolean isRetryEnabled() { assertThrows(RuntimeException.class, () -> testRefreshListener.afterRefresh(true)); assertBusy(() -> assertFalse(testRefreshListener.getRetryScheduledStatus())); assertEquals(1, runCount.get()); - testRefreshListener.close(); + testRefreshListener.drainRefreshes(); } @After From 11899bef393d21fc8ed05e8cf06bb0b8c9cbd829 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Sat, 25 Nov 2023 10:27:21 +0530 Subject: [PATCH 02/11] Prevent remote upload before CM starts the primary shard Signed-off-by: Ashish Singh --- .../org/opensearch/index/shard/IndexShard.java | 13 +++++++++++-- .../index/shard/RemoteStoreRefreshListener.java | 9 ++++----- .../shard/RemoteStoreRefreshListenerTests.java | 14 +++++++------- 3 files changed, 22 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 4a3e6cbb7d266..24430999c55d6 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -3887,10 +3887,10 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro circuitBreakerService, globalCheckpointSupplier, replicationTracker::getRetentionLeases, - () -> getOperationPrimaryTerm(), + this::getOperationPrimaryTerm, tombstoneDocSupplier(), isReadOnlyReplica, - replicationTracker::isPrimaryMode, + this::isStartedPrimary, translogFactorySupplier.apply(indexSettings, shardRouting), isTimeSeriesDescSortOptimizationEnabled() ? DataStream.TIMESERIES_LEAF_SORTER : null // DESC @timestamp default order for // timeseries @@ -3905,6 +3905,15 @@ public boolean isRemoteTranslogEnabled() { return indexSettings() != null && indexSettings().isRemoteTranslogStoreEnabled(); } + /** + * This checks if we are in state to upload to remote store. Until the cluster-manager informs the shard through + * cluster state, the shard will not be in STARTED state. This method is used to prevent pre-emptive segment or + * translog uploads. + */ + public boolean isStartedPrimary() { + return getReplicationTracker().isPrimaryMode() && state() == IndexShardState.STARTED; + } + /** * @return true if segment reverse search optimization is enabled for time series based workload. */ diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index b02d858ce8b39..17d1d3a01b196 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -487,8 +487,7 @@ private void initializeRemoteDirectoryOnTermUpdate() throws IOException { * @return true iff primaryMode is true and index shard is not in closed state. */ private boolean isReadyForUpload() { - boolean isReady = (indexShard.getReplicationTracker().isPrimaryMode() && indexShard.state() != IndexShardState.CLOSED) - || isLocalOrSnapshotRecovery(); + boolean isReady = indexShard.isStartedPrimary() || isLocalOrSnapshotRecovery(); if (isReady == false) { StringBuilder sb = new StringBuilder("Skipped syncing segments with"); @@ -501,11 +500,11 @@ private boolean isReadyForUpload() { if (indexShard.getEngineOrNull() != null) { sb.append(" engineType=").append(indexShard.getEngine().getClass().getSimpleName()); } - if (isLocalOrSnapshotRecovery() == false) { + if (indexShard.recoveryState() != null) { sb.append(" recoverySourceType=").append(indexShard.recoveryState().getRecoverySource().getType()); sb.append(" primary=").append(indexShard.shardRouting.primary()); } - logger.trace(sb.toString()); + logger.info(sb.toString()); } return isReady; } @@ -514,8 +513,8 @@ private boolean isLocalOrSnapshotRecovery() { // In this case when the primary mode is false, we need to upload segments to Remote Store // This is required in case of snapshots/shrink/ split/clone where we need to durable persist // all segments to remote before completing the recovery to ensure durability. - return (indexShard.state() == IndexShardState.RECOVERING && indexShard.shardRouting.primary()) + && indexShard.recoveryState() != null && (indexShard.recoveryState().getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS || indexShard.recoveryState().getRecoverySource().getType() == RecoverySource.Type.SNAPSHOT); } diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index 74da9a3fff19c..811d6a722d0f6 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -320,7 +320,7 @@ public void testRefreshSuccessOnFirstAttempt() throws Exception { // This is the case of isRetry=false, shouldRetry=false // Succeed on 1st attempt int succeedOnAttempt = 1; - // We spy on IndexShard.getReplicationTracker() to validate that we have tried running remote time as per the expectation. + // We spy on IndexShard.isPrimaryStarted() to validate that we have tried running remote time as per the expectation. CountDownLatch refreshCountLatch = new CountDownLatch(succeedOnAttempt); // We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload. // Value has been set as 3 as during a successful upload IndexShard.getEngine() is hit thrice and with mockito we are counting down @@ -341,7 +341,7 @@ public void testRefreshSuccessOnSecondAttempt() throws Exception { // This covers 2 cases - 1) isRetry=false, shouldRetry=true 2) isRetry=true, shouldRetry=false // Succeed on 2nd attempt int succeedOnAttempt = 2; - // We spy on IndexShard.getReplicationTracker() to validate that we have tried running remote time as per the expectation. + // We spy on IndexShard.isPrimaryStarted() to validate that we have tried running remote time as per the expectation. CountDownLatch refreshCountLatch = new CountDownLatch(succeedOnAttempt); // We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload. // Value has been set as 3 as during a successful upload IndexShard.getEngine() is hit thrice and with mockito we are counting down @@ -365,7 +365,7 @@ public void testRefreshSuccessOnSecondAttempt() throws Exception { public void testRefreshSuccessAfterFailureInFirstAttemptAfterSnapshotAndMetadataUpload() throws Exception { int succeedOnAttempt = 1; int checkpointPublishSucceedOnAttempt = 2; - // We spy on IndexShard.getReplicationTracker() to validate that we have tried running remote time as per the expectation. + // We spy on IndexShard.isPrimaryStarted() to validate that we have tried running remote time as per the expectation. CountDownLatch refreshCountLatch = new CountDownLatch(succeedOnAttempt); // We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload. // Value has been set as 6 as during a successful upload IndexShard.getEngine() is hit thrice and here we are running the flow twice @@ -387,7 +387,7 @@ public void testRefreshSuccessOnThirdAttempt() throws Exception { // This covers 3 cases - 1) isRetry=false, shouldRetry=true 2) isRetry=true, shouldRetry=false 3) isRetry=True, shouldRetry=true // Succeed on 3rd attempt int succeedOnAttempt = 3; - // We spy on IndexShard.getReplicationTracker() to validate that we have tried running remote time as per the expectation. + // We spy on IndexShard.isPrimaryStarted() to validate that we have tried running remote time as per the expectation. CountDownLatch refreshCountLatch = new CountDownLatch(succeedOnAttempt); // We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload. // Value has been set as 3 as during a successful upload IndexShard.getEngine() is hit thrice and with mockito we are counting down @@ -479,6 +479,7 @@ private Tuple mockIn IndexShard shard = mock(IndexShard.class); Store store = mock(Store.class); when(shard.store()).thenReturn(store); + when(shard.state()).thenReturn(IndexShardState.STARTED); when(store.directory()).thenReturn(indexShard.store().directory()); // Mock (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory()) @@ -500,13 +501,12 @@ private Tuple mockIn when(shard.getThreadPool()).thenReturn(threadPool); // Mock indexShard.getReplicationTracker().isPrimaryMode() - doAnswer(invocation -> { if (Objects.nonNull(refreshCountLatch)) { refreshCountLatch.countDown(); } - return indexShard.getReplicationTracker(); - }).when(shard).getReplicationTracker(); + return true; + }).when(shard).isStartedPrimary(); AtomicLong counter = new AtomicLong(); // Mock indexShard.getSegmentInfosSnapshot() From 69d1ee70e9941f65529d256e26911faf955a5b1f Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Sat, 25 Nov 2023 16:54:20 +0530 Subject: [PATCH 03/11] Add new UTs and enhance existing UTs Signed-off-by: Ashish Singh --- .../CloseableRetryableRefreshListener.java | 23 +++- ...loseableRetryableRefreshListenerTests.java | 129 ++++++++++++++++++ 2 files changed, 151 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/index/shard/CloseableRetryableRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/CloseableRetryableRefreshListener.java index 6b9c619e3141a..23874603d3f52 100644 --- a/server/src/main/java/org/opensearch/index/shard/CloseableRetryableRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/CloseableRetryableRefreshListener.java @@ -35,6 +35,8 @@ public abstract class CloseableRetryableRefreshListener implements ReferenceMana */ private static final int TOTAL_PERMITS = 1; + private static final TimeValue DRAIN_TIMEOUT = TimeValue.timeValueMinutes(10); + private final AtomicBoolean closed = new AtomicBoolean(false); private final Semaphore semaphore = new Semaphore(TOTAL_PERMITS); @@ -187,7 +189,8 @@ private void scheduleRetry(boolean afterRefreshSuccessful, boolean didRefresh) { public final Releasable drainRefreshes() { try { - if (semaphore.tryAcquire(TOTAL_PERMITS, 10, TimeUnit.MINUTES)) { + TimeValue timeout = getDrainTimeout(); + if (semaphore.tryAcquire(TOTAL_PERMITS, timeout.seconds(), TimeUnit.SECONDS)) { boolean result = closed.compareAndSet(false, true); assert result && semaphore.availablePermits() == 0; getLogger().info("All permits are acquired and refresh listener is closed"); @@ -208,6 +211,14 @@ public final Releasable drainRefreshes() { protected abstract Logger getLogger(); + // Made available for unit testing purpose only + /** + * Returns the timeout which is used while draining refreshes. + */ + protected TimeValue getDrainTimeout() { + return DRAIN_TIMEOUT; + } + // Visible for testing /** * Returns if the retry is scheduled or not. @@ -217,4 +228,14 @@ public final Releasable drainRefreshes() { boolean getRetryScheduledStatus() { return retryScheduled.get(); } + + // Visible for testing + int availablePermits() { + return semaphore.availablePermits(); + } + + // Visible for testing + boolean isClosed() { + return closed.get(); + } } diff --git a/server/src/test/java/org/opensearch/index/shard/CloseableRetryableRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/CloseableRetryableRefreshListenerTests.java index cc192e4eefa0a..13694043c450a 100644 --- a/server/src/test/java/org/opensearch/index/shard/CloseableRetryableRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/CloseableRetryableRefreshListenerTests.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.opensearch.common.lease.Releasable; import org.opensearch.common.unit.TimeValue; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; @@ -274,6 +275,7 @@ protected Logger getLogger() { testRefreshListener.afterRefresh(randomBoolean()); testRefreshListener.drainRefreshes(); assertNotEquals(0, countDownLatch.getCount()); + assertRefreshListenerClosed(testRefreshListener); } public void testCloseWaitsForAcquiringAllPermits() throws Exception { @@ -308,6 +310,7 @@ protected Logger getLogger() { thread.start(); assertBusy(() -> assertEquals(0, countDownLatch.getCount())); testRefreshListener.drainRefreshes(); + assertRefreshListenerClosed(testRefreshListener); } public void testScheduleRetryAfterClose() throws Exception { @@ -368,6 +371,7 @@ protected TimeValue getNextRetryInterval() { thread1.join(); thread2.join(); assertBusy(() -> assertEquals(1, runCount.get())); + assertRefreshListenerClosed(testRefreshListener); } public void testConcurrentScheduleRetry() throws Exception { @@ -409,6 +413,7 @@ protected boolean isRetryEnabled() { testRefreshListener.afterRefresh(true); assertBusy(() -> assertEquals(3, runCount.get())); testRefreshListener.drainRefreshes(); + assertRefreshListenerClosed(testRefreshListener); } public void testExceptionDuringThreadPoolSchedule() throws Exception { @@ -451,6 +456,120 @@ protected boolean isRetryEnabled() { assertBusy(() -> assertFalse(testRefreshListener.getRetryScheduledStatus())); assertEquals(1, runCount.get()); testRefreshListener.drainRefreshes(); + assertRefreshListenerClosed(testRefreshListener); + } + + public void testTimeoutDuringClose() throws Exception { + // This test checks the expected behaviour when the drainRefreshes times out. + CloseableRetryableRefreshListener testRefreshListener = new CloseableRetryableRefreshListener(mock(ThreadPool.class)) { + @Override + protected boolean performAfterRefreshWithPermit(boolean didRefresh) { + try { + Thread.sleep(TimeValue.timeValueSeconds(2).millis()); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + return true; + } + + @Override + public void beforeRefresh() {} + + @Override + protected Logger getLogger() { + return logger; + } + + @Override + protected TimeValue getDrainTimeout() { + return TimeValue.timeValueSeconds(1); + } + }; + Thread thread1 = new Thread(() -> { + try { + testRefreshListener.afterRefresh(true); + } catch (IOException e) { + throw new AssertionError(e); + } + }); + thread1.start(); + assertBusy(() -> assertEquals(0, testRefreshListener.availablePermits())); + RuntimeException ex = assertThrows(RuntimeException.class, testRefreshListener::drainRefreshes); + assertEquals("Failed to acquire all permits", ex.getMessage()); + thread1.join(); + } + + public void testThreadInterruptDuringClose() throws Exception { + // This test checks the expected behaviour when the thread performing the drainRefresh is interrupted. + CountDownLatch latch = new CountDownLatch(2); + CloseableRetryableRefreshListener testRefreshListener = new CloseableRetryableRefreshListener(mock(ThreadPool.class)) { + @Override + protected boolean performAfterRefreshWithPermit(boolean didRefresh) { + try { + Thread.sleep(TimeValue.timeValueSeconds(2).millis()); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + return true; + } + + @Override + public void beforeRefresh() {} + + @Override + protected Logger getLogger() { + return logger; + } + + @Override + protected TimeValue getDrainTimeout() { + return TimeValue.timeValueSeconds(2); + } + }; + Thread thread1 = new Thread(() -> { + try { + testRefreshListener.afterRefresh(true); + latch.countDown(); + } catch (IOException e) { + throw new AssertionError(e); + } + }); + Thread thread2 = new Thread(() -> { + RuntimeException ex = assertThrows(RuntimeException.class, testRefreshListener::drainRefreshes); + assertEquals("Failed to acquire all permits", ex.getMessage()); + latch.countDown(); + }); + thread1.start(); + assertBusy(() -> assertEquals(0, testRefreshListener.availablePermits())); + thread2.start(); + thread2.interrupt(); + thread1.join(); + thread2.join(); + assertEquals(0, latch.getCount()); + } + + public void testResumeRefreshesAfterDrainRefreshes() { + // This test checks the expected behaviour when the refresh listener is drained, but then refreshes are resumed again + // by closing the releasables acquired by calling the drainRefreshes method. + CloseableRetryableRefreshListener testRefreshListener = new CloseableRetryableRefreshListener(mock(ThreadPool.class)) { + @Override + protected boolean performAfterRefreshWithPermit(boolean didRefresh) { + return true; + } + + @Override + public void beforeRefresh() {} + + @Override + protected Logger getLogger() { + return logger; + } + }; + assertRefreshListenerOpen(testRefreshListener); + Releasable releasable = testRefreshListener.drainRefreshes(); + assertRefreshListenerClosed(testRefreshListener); + releasable.close(); + assertRefreshListenerOpen(testRefreshListener); } @After @@ -458,4 +577,14 @@ public void tearDown() throws Exception { super.tearDown(); terminate(threadPool); } + + private void assertRefreshListenerClosed(CloseableRetryableRefreshListener testRefreshListener) { + assertTrue(testRefreshListener.isClosed()); + assertEquals(0, testRefreshListener.availablePermits()); + } + + private void assertRefreshListenerOpen(CloseableRetryableRefreshListener testRefreshListener) { + assertFalse(testRefreshListener.isClosed()); + assertEquals(1, testRefreshListener.availablePermits()); + } } From 808f162438d0cd50bfb22ba04c9adf570bfb241d Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Sat, 25 Nov 2023 19:41:27 +0530 Subject: [PATCH 04/11] Fix failing test and incorporate PR review feedback Signed-off-by: Ashish Singh --- .../opensearch/index/shard/IndexShard.java | 2 +- .../translog/InternalTranslogManager.java | 4 +- .../index/translog/NoOpTranslogManager.java | 2 +- .../index/translog/RemoteFsTranslog.java | 46 +++++++++---------- .../opensearch/index/translog/Translog.java | 4 +- .../index/translog/TranslogManager.java | 2 +- 6 files changed, 29 insertions(+), 31 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 24430999c55d6..1ee83a15306b7 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -874,7 +874,7 @@ public void relocated( } // Ensure all in-flight remote store translog upload drains, before we perform the performSegRep. - releasablesOnNoHandoff.add(getEngineOrNull().translogManager().drainSyncToStore()); + releasablesOnNoHandoff.add(getEngineOrNull().translogManager().drainSync()); // no shard operation permits are being held here, move state from started to relocated assert indexShardOperationPermits.getActiveOperationsCount() == OPERATIONS_BLOCKED diff --git a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java index eabf08d85fa7c..7f930eeb5ac4d 100644 --- a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java @@ -308,8 +308,8 @@ public void onDelete() { } @Override - public Releasable drainSyncToStore() { - return translog.drainSyncToStore(); + public Releasable drainSync() { + return translog.drainSync(); } @Override diff --git a/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java index 33b64a20e934b..b4aa7865570a6 100644 --- a/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java @@ -126,7 +126,7 @@ public Translog.Snapshot newChangesSnapshot(long fromSeqNo, long toSeqNo, boolea public void onDelete() {} @Override - public Releasable drainSyncToStore() { + public Releasable drainSync() { return () -> {}; } diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 0a4c4e5801cad..eb18cddc9e11c 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -9,6 +9,7 @@ package org.opensearch.index.translog; import org.apache.logging.log4j.Logger; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.SetOnce; import org.opensearch.common.lease.Releasable; import org.opensearch.common.lease.Releasables; @@ -78,8 +79,8 @@ public class RemoteFsTranslog extends Translog { private final Semaphore remoteGenerationDeletionPermits = new Semaphore(REMOTE_DELETION_PERMITS); // These permits exist to allow any inflight background triggered upload. - private static final int UPLOAD_PERMITS = 1; - private final Semaphore uploadPermits = new Semaphore(UPLOAD_PERMITS); + private static final int SYNC_PERMITS = 1; + private final Semaphore syncPermits = new Semaphore(SYNC_PERMITS); public RemoteFsTranslog( TranslogConfig config, @@ -321,18 +322,13 @@ private boolean prepareAndUpload(Long primaryTerm, Long generation) throws IOExc } private boolean upload(long primaryTerm, long generation, long maxSeqNo) throws IOException { - // During primary relocation (primary-primary peer recovery), both the old and the new primary have engine - // created with the RemoteFsTranslog. Both primaries are equipped to upload the translogs. The primary mode check - // below ensures that the real primary only is uploading. Before the primary mode is set as true for the new - // primary, the engine is reset to InternalEngine which also initialises the RemoteFsTranslog which in turns - // downloads all the translogs from remote store and does a flush before the relocation finishes. - if (primaryModeSupplier.getAsBoolean() == false || uploadPermits.tryAcquire(1) == false) { - logger.debug( - "skipped uploading translog for {} {} uploadPermits={}", - primaryTerm, - generation, - uploadPermits.availablePermits() - ); + // During primary relocation, both the old and new primary have engine created with RemoteFsTranslog and having + // ReplicationTracker.primaryMode() as true. However, before we perform the `internal:index/shard/replication/segments_sync` + // action which re-downloads the segments and translog on the new primary. We are ensuring 2 things here - + // 1. Using primaryModeSupplier, we prevent the new primary to do pre-emptive syncs + // 2. Using syncPermits, we prevent syncs at the desired time during primary relocation. + if (primaryModeSupplier.getAsBoolean() == false || syncPermits.tryAcquire(1) == false) { + logger.debug("skipped uploading translog for {} {} uploadPermits={}", primaryTerm, generation, syncPermits.availablePermits()); // NO-OP return true; } @@ -352,7 +348,7 @@ private boolean upload(long primaryTerm, long generation, long maxSeqNo) throws new RemoteFsTranslogTransferListener(generation, primaryTerm, maxSeqNo) ); } finally { - uploadPermits.release(1); + syncPermits.release(1); } } @@ -436,13 +432,13 @@ protected void setMinSeqNoToKeep(long seqNo) { } @Override - protected Releasable drainSyncToStore() { + protected Releasable drainSync() { try { - if (uploadPermits.tryAcquire(UPLOAD_PERMITS, 1, TimeUnit.MINUTES)) { + if (syncPermits.tryAcquire(SYNC_PERMITS, 1, TimeUnit.MINUTES)) { logger.info("All permits acquired"); return Releasables.releaseOnce(() -> { - uploadPermits.release(UPLOAD_PERMITS); - assert uploadPermits.availablePermits() == UPLOAD_PERMITS : "Available permits is " + uploadPermits.availablePermits(); + syncPermits.release(SYNC_PERMITS); + assert syncPermits.availablePermits() == SYNC_PERMITS : "Available permits is " + syncPermits.availablePermits(); logger.info("All permits released"); }); } else { @@ -458,6 +454,12 @@ public void trimUnreferencedReaders() throws IOException { // clean up local translog files and updates readers super.trimUnreferencedReaders(); + // This is to ensure that after the permits are acquired during primary relocation, there are no further modification on remote + // store. + if (syncPermits.availablePermits() == 0) { + return; + } + // Since remote generation deletion is async, this ensures that only one generation deletion happens at a time. // Remote generations involves 2 async operations - 1) Delete translog generation files 2) Delete metadata files // We try to acquire 2 permits and if we can not, we return from here itself. @@ -535,11 +537,7 @@ public static void cleanup(Repository repository, ShardId shardId, ThreadPool th } protected void onDelete() { - if (primaryModeSupplier.getAsBoolean() == false) { - logger.trace("skipped delete translog"); - // NO-OP - return; - } + ClusterService.assertClusterOrClusterManagerStateThread(); // clean up all remote translog files translogTransferManager.delete(); } diff --git a/server/src/main/java/org/opensearch/index/translog/Translog.java b/server/src/main/java/org/opensearch/index/translog/Translog.java index 6e08d7c4d7bdb..cb64db19c8337 100644 --- a/server/src/main/java/org/opensearch/index/translog/Translog.java +++ b/server/src/main/java/org/opensearch/index/translog/Translog.java @@ -1820,8 +1820,8 @@ protected void onDelete() {} /** * Drains ongoing syncs to the underlying store. It returns a releasable which can be closed to resume the syncs back. */ - protected Releasable drainSyncToStore() { - return () -> {}; + protected Releasable drainSync() { + return () -> {}; // noop } /** diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogManager.java b/server/src/main/java/org/opensearch/index/translog/TranslogManager.java index d66f12dc3da81..e1a0b7d1c1293 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogManager.java @@ -139,7 +139,7 @@ public interface TranslogManager { /** * Drains ongoing syncs to the underlying store. It returns a releasable which can be closed to resume the syncs back. */ - Releasable drainSyncToStore(); + Releasable drainSync(); Translog.TranslogGeneration getTranslogGeneration(); } From 675bbc82aa90024ed2e8cc89462c61685440f73b Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Sun, 26 Nov 2023 00:06:48 +0530 Subject: [PATCH 05/11] Add UTs in RemoteFsTranslogTests & incorporate PR review feedback Signed-off-by: Ashish Singh --- .../opensearch/index/engine/EngineConfig.java | 16 ++-- .../index/engine/EngineConfigFactory.java | 4 +- .../index/engine/InternalEngine.java | 2 +- .../index/engine/NRTReplicationEngine.java | 2 +- .../opensearch/index/engine/NoOpEngine.java | 2 +- .../index/engine/ReadOnlyEngine.java | 2 +- .../shard/CheckpointRefreshListener.java | 2 +- .../opensearch/index/shard/IndexShard.java | 16 ++-- ...> ReleasableRetryableRefreshListener.java} | 12 +-- .../shard/RemoteStoreRefreshListener.java | 2 +- .../translog/InternalTranslogFactory.java | 2 +- .../translog/InternalTranslogManager.java | 8 +- ...emoteBlobStoreInternalTranslogFactory.java | 4 +- .../index/translog/RemoteFsTranslog.java | 41 ++++++--- .../index/translog/TranslogFactory.java | 2 +- .../translog/WriteOnlyTranslogManager.java | 4 +- ...easableRetryableRefreshListenerTests.java} | 42 ++++----- .../index/translog/RemoteFsTranslogTests.java | 91 ++++++++++++++++++- 18 files changed, 178 insertions(+), 76 deletions(-) rename server/src/main/java/org/opensearch/index/shard/{CloseableRetryableRefreshListener.java => ReleasableRetryableRefreshListener.java} (95%) rename server/src/test/java/org/opensearch/index/shard/{CloseableRetryableRefreshListenerTests.java => ReleasableRetryableRefreshListenerTests.java} (89%) diff --git a/server/src/main/java/org/opensearch/index/engine/EngineConfig.java b/server/src/main/java/org/opensearch/index/engine/EngineConfig.java index af65e993fcf26..bf3e10d684c94 100644 --- a/server/src/main/java/org/opensearch/index/engine/EngineConfig.java +++ b/server/src/main/java/org/opensearch/index/engine/EngineConfig.java @@ -108,7 +108,7 @@ public final class EngineConfig { private final LongSupplier globalCheckpointSupplier; private final Supplier retentionLeasesSupplier; private final boolean isReadOnlyReplica; - private final BooleanSupplier primaryModeSupplier; + private final BooleanSupplier startedPrimarySupplier; private final Comparator leafSorter; /** @@ -287,7 +287,7 @@ private EngineConfig(Builder builder) { this.primaryTermSupplier = builder.primaryTermSupplier; this.tombstoneDocSupplier = builder.tombstoneDocSupplier; this.isReadOnlyReplica = builder.isReadOnlyReplica; - this.primaryModeSupplier = builder.primaryModeSupplier; + this.startedPrimarySupplier = builder.startedPrimarySupplier; this.translogFactory = builder.translogFactory; this.leafSorter = builder.leafSorter; } @@ -495,11 +495,11 @@ public boolean isReadOnlyReplica() { } /** - * Returns the underlying primaryModeSupplier. + * Returns the underlying startedPrimarySupplier. * @return the primary mode supplier. */ - public BooleanSupplier getPrimaryModeSupplier() { - return primaryModeSupplier; + public BooleanSupplier getStartedPrimarySupplier() { + return startedPrimarySupplier; } /** @@ -577,7 +577,7 @@ public static class Builder { private TombstoneDocSupplier tombstoneDocSupplier; private TranslogDeletionPolicyFactory translogDeletionPolicyFactory; private boolean isReadOnlyReplica; - private BooleanSupplier primaryModeSupplier; + private BooleanSupplier startedPrimarySupplier; private TranslogFactory translogFactory = new InternalTranslogFactory(); Comparator leafSorter; @@ -701,8 +701,8 @@ public Builder readOnlyReplica(boolean isReadOnlyReplica) { return this; } - public Builder primaryModeSupplier(BooleanSupplier primaryModeSupplier) { - this.primaryModeSupplier = primaryModeSupplier; + public Builder startedPrimarySupplier(BooleanSupplier startedPrimarySupplier) { + this.startedPrimarySupplier = startedPrimarySupplier; return this; } diff --git a/server/src/main/java/org/opensearch/index/engine/EngineConfigFactory.java b/server/src/main/java/org/opensearch/index/engine/EngineConfigFactory.java index 38eea92b6c757..77e2f1c55201d 100644 --- a/server/src/main/java/org/opensearch/index/engine/EngineConfigFactory.java +++ b/server/src/main/java/org/opensearch/index/engine/EngineConfigFactory.java @@ -152,7 +152,7 @@ public EngineConfig newEngineConfig( LongSupplier primaryTermSupplier, EngineConfig.TombstoneDocSupplier tombstoneDocSupplier, boolean isReadOnlyReplica, - BooleanSupplier primaryModeSupplier, + BooleanSupplier startedPrimarySupplier, TranslogFactory translogFactory, Comparator leafSorter ) { @@ -185,7 +185,7 @@ public EngineConfig newEngineConfig( .primaryTermSupplier(primaryTermSupplier) .tombstoneDocSupplier(tombstoneDocSupplier) .readOnlyReplica(isReadOnlyReplica) - .primaryModeSupplier(primaryModeSupplier) + .startedPrimarySupplier(startedPrimarySupplier) .translogFactory(translogFactory) .leafSorter(leafSorter) .build(); diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index 8e1627af274c5..650cb4688f9bf 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -292,7 +292,7 @@ public void onFailure(String reason, Exception ex) { new CompositeTranslogEventListener(Arrays.asList(internalTranslogEventListener, translogEventListener), shardId), this::ensureOpen, engineConfig.getTranslogFactory(), - engineConfig.getPrimaryModeSupplier() + engineConfig.getStartedPrimarySupplier() ); this.translogManager = translogManagerRef; this.softDeletesPolicy = newSoftDeletesPolicy(); diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index 3b55c4e648f1c..ed8dba2f8902d 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -125,7 +125,7 @@ public void onAfterTranslogSync() { }, this, engineConfig.getTranslogFactory(), - engineConfig.getPrimaryModeSupplier() + engineConfig.getStartedPrimarySupplier() ); this.translogManager = translogManagerRef; success = true; diff --git a/server/src/main/java/org/opensearch/index/engine/NoOpEngine.java b/server/src/main/java/org/opensearch/index/engine/NoOpEngine.java index ca31d5518df47..9071b0e7a1eb3 100644 --- a/server/src/main/java/org/opensearch/index/engine/NoOpEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NoOpEngine.java @@ -203,7 +203,7 @@ public void trimUnreferencedTranslogFiles() throws TranslogException { engineConfig.getGlobalCheckpointSupplier(), engineConfig.getPrimaryTermSupplier(), seqNo -> {}, - engineConfig.getPrimaryModeSupplier() + engineConfig.getStartedPrimarySupplier() ) ) { translog.trimUnreferencedReaders(); diff --git a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java index 80cef214a08cd..7ff3145055df8 100644 --- a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java @@ -278,7 +278,7 @@ private static TranslogStats translogStats(final EngineConfig config, final Segm config.getGlobalCheckpointSupplier(), config.getPrimaryTermSupplier(), seqNo -> {}, - config.getPrimaryModeSupplier() + config.getStartedPrimarySupplier() ) ) { return translog.stats(); diff --git a/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java index 85d744e58265f..675d60ec2b63d 100644 --- a/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java @@ -21,7 +21,7 @@ * * @opensearch.internal */ -public class CheckpointRefreshListener extends CloseableRetryableRefreshListener { +public class CheckpointRefreshListener extends ReleasableRetryableRefreshListener { protected static Logger logger = LogManager.getLogger(CheckpointRefreshListener.class); diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 1ee83a15306b7..5bc99fb96ee7b 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -853,7 +853,7 @@ public void relocated( assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting; // The below list of releasable ensures that if the relocation does not happen, we undo the activity of close and // acquire all permits. This will ensure that the remote store uploads can still be done by the existing primary shard. - List releasablesOnNoHandoff = new ArrayList<>(); + List releasablesOnHandoffFailures = new ArrayList<>(2); try (Releasable forceRefreshes = refreshListeners.forceRefreshes()) { indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> { forceRefreshes.close(); @@ -868,13 +868,13 @@ public void relocated( // Ensures all in-flight remote store refreshes drain, before we perform the performSegRep. for (ReferenceManager.RefreshListener refreshListener : internalRefreshListener) { - if (refreshListener instanceof CloseableRetryableRefreshListener) { - releasablesOnNoHandoff.add(((CloseableRetryableRefreshListener) refreshListener).drainRefreshes()); + if (refreshListener instanceof ReleasableRetryableRefreshListener) { + releasablesOnHandoffFailures.add(((ReleasableRetryableRefreshListener) refreshListener).drainRefreshes()); } } // Ensure all in-flight remote store translog upload drains, before we perform the performSegRep. - releasablesOnNoHandoff.add(getEngineOrNull().translogManager().drainSync()); + releasablesOnHandoffFailures.add(getEngine().translogManager().drainSync()); // no shard operation permits are being held here, move state from started to relocated assert indexShardOperationPermits.getActiveOperationsCount() == OPERATIONS_BLOCKED @@ -909,14 +909,16 @@ public void relocated( // Fail primary relocation source and target shards. failShard("timed out waiting for relocation hand-off to complete", null); throw new IndexShardClosedException(shardId(), "timed out waiting for relocation hand-off to complete"); + } catch (Exception ex) { + logger.warn("exception occurred during relocation hand-off to complete errorMsg={}", ex.getMessage()); + assert replicationTracker.isPrimaryMode(); + throw ex; } finally { // If the primary mode is still true after the end of handoff attempt, it basically means that the relocation // failed. The existing primary will continue to be the primary, so we need to allow the segments and translog // upload to resume. if (replicationTracker.isPrimaryMode()) { - for (Releasable releasable : releasablesOnNoHandoff) { - releasable.close(); - } + Releasables.close(releasablesOnHandoffFailures); } } } diff --git a/server/src/main/java/org/opensearch/index/shard/CloseableRetryableRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/ReleasableRetryableRefreshListener.java similarity index 95% rename from server/src/main/java/org/opensearch/index/shard/CloseableRetryableRefreshListener.java rename to server/src/main/java/org/opensearch/index/shard/ReleasableRetryableRefreshListener.java index 23874603d3f52..757275932c5f1 100644 --- a/server/src/main/java/org/opensearch/index/shard/CloseableRetryableRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/ReleasableRetryableRefreshListener.java @@ -23,11 +23,11 @@ import java.util.concurrent.atomic.AtomicBoolean; /** - * RefreshListener that runs afterRefresh method if and only if there is a permit available. Once the listener - * is closed, all the permits are acquired and there are no available permits to afterRefresh. This abstract class provides + * RefreshListener that runs afterRefresh method if and only if there is a permit available. Once the {@code drainRefreshes()} + * is called, all the permits are acquired and there are no available permits to afterRefresh. This abstract class provides * necessary abstract methods to schedule retry. */ -public abstract class CloseableRetryableRefreshListener implements ReferenceManager.RefreshListener { +public abstract class ReleasableRetryableRefreshListener implements ReferenceManager.RefreshListener { /** * Total permits = 1 ensures that there is only single instance of runAfterRefreshWithPermit that is running at a time. @@ -48,11 +48,11 @@ public abstract class CloseableRetryableRefreshListener implements ReferenceMana */ private final AtomicBoolean retryScheduled = new AtomicBoolean(false); - public CloseableRetryableRefreshListener() { + public ReleasableRetryableRefreshListener() { this.threadPool = null; } - public CloseableRetryableRefreshListener(ThreadPool threadPool) { + public ReleasableRetryableRefreshListener(ThreadPool threadPool) { assert Objects.nonNull(threadPool); this.threadPool = threadPool; } @@ -215,7 +215,7 @@ public final Releasable drainRefreshes() { /** * Returns the timeout which is used while draining refreshes. */ - protected TimeValue getDrainTimeout() { + TimeValue getDrainTimeout() { return DRAIN_TIMEOUT; } diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index 17d1d3a01b196..a6f46fb666860 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -54,7 +54,7 @@ * * @opensearch.internal */ -public final class RemoteStoreRefreshListener extends CloseableRetryableRefreshListener { +public final class RemoteStoreRefreshListener extends ReleasableRetryableRefreshListener { private final Logger logger; diff --git a/server/src/main/java/org/opensearch/index/translog/InternalTranslogFactory.java b/server/src/main/java/org/opensearch/index/translog/InternalTranslogFactory.java index d7be1250c0b5b..415d7dc4d1a9d 100644 --- a/server/src/main/java/org/opensearch/index/translog/InternalTranslogFactory.java +++ b/server/src/main/java/org/opensearch/index/translog/InternalTranslogFactory.java @@ -28,7 +28,7 @@ public Translog newTranslog( LongSupplier globalCheckpointSupplier, LongSupplier primaryTermSupplier, LongConsumer persistedSequenceNumberConsumer, - BooleanSupplier primaryModeSupplier + BooleanSupplier startedPrimarySupplier ) throws IOException { return new LocalTranslog( diff --git a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java index 7f930eeb5ac4d..a22c538286a88 100644 --- a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java @@ -59,7 +59,7 @@ public InternalTranslogManager( TranslogEventListener translogEventListener, LifecycleAware engineLifeCycleAware, TranslogFactory translogFactory, - BooleanSupplier primaryModeSupplier + BooleanSupplier startedPrimarySupplier ) throws IOException { this.shardId = shardId; this.readLock = readLock; @@ -72,7 +72,7 @@ public InternalTranslogManager( if (tracker != null) { tracker.markSeqNoAsPersisted(seqNo); } - }, translogUUID, translogFactory, primaryModeSupplier); + }, translogUUID, translogFactory, startedPrimarySupplier); assert translog.getGeneration() != null; this.translog = translog; assert pendingTranslogRecovery.get() == false : "translog recovery can't be pending before we set it"; @@ -369,7 +369,7 @@ protected Translog openTranslog( LongConsumer persistedSequenceNumberConsumer, String translogUUID, TranslogFactory translogFactory, - BooleanSupplier primaryModeSupplier + BooleanSupplier startedPrimarySupplier ) throws IOException { return translogFactory.newTranslog( translogConfig, @@ -378,7 +378,7 @@ protected Translog openTranslog( globalCheckpointSupplier, primaryTermSupplier, persistedSequenceNumberConsumer, - primaryModeSupplier + startedPrimarySupplier ); } diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java b/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java index 1e2cb388e690e..e100ffaabf13d 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java @@ -59,7 +59,7 @@ public Translog newTranslog( LongSupplier globalCheckpointSupplier, LongSupplier primaryTermSupplier, LongConsumer persistedSequenceNumberConsumer, - BooleanSupplier primaryModeSupplier + BooleanSupplier startedPrimarySupplier ) throws IOException { assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository"; @@ -73,7 +73,7 @@ public Translog newTranslog( persistedSequenceNumberConsumer, blobStoreRepository, threadPool, - primaryModeSupplier, + startedPrimarySupplier, remoteTranslogTransferTracker ); } diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index eb18cddc9e11c..5aeb70d8a4c91 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -59,7 +59,7 @@ public class RemoteFsTranslog extends Translog { private final Logger logger; private final TranslogTransferManager translogTransferManager; private final FileTransferTracker fileTransferTracker; - private final BooleanSupplier primaryModeSupplier; + private final BooleanSupplier startedPrimarySupplier; private final RemoteTranslogTransferTracker remoteTranslogTransferTracker; private volatile long maxRemoteTranslogGenerationUploaded; @@ -79,8 +79,8 @@ public class RemoteFsTranslog extends Translog { private final Semaphore remoteGenerationDeletionPermits = new Semaphore(REMOTE_DELETION_PERMITS); // These permits exist to allow any inflight background triggered upload. - private static final int SYNC_PERMITS = 1; - private final Semaphore syncPermits = new Semaphore(SYNC_PERMITS); + private static final int SYNC_PERMIT = 1; + private final Semaphore syncPermit = new Semaphore(SYNC_PERMIT); public RemoteFsTranslog( TranslogConfig config, @@ -91,12 +91,12 @@ public RemoteFsTranslog( LongConsumer persistedSequenceNumberConsumer, BlobStoreRepository blobStoreRepository, ThreadPool threadPool, - BooleanSupplier primaryModeSupplier, + BooleanSupplier startedPrimarySupplier, RemoteTranslogTransferTracker remoteTranslogTransferTracker ) throws IOException { super(config, translogUUID, deletionPolicy, globalCheckpointSupplier, primaryTermSupplier, persistedSequenceNumberConsumer); logger = Loggers.getLogger(getClass(), shardId); - this.primaryModeSupplier = primaryModeSupplier; + this.startedPrimarySupplier = startedPrimarySupplier; this.remoteTranslogTransferTracker = remoteTranslogTransferTracker; fileTransferTracker = new FileTransferTracker(shardId, remoteTranslogTransferTracker); this.translogTransferManager = buildTranslogTransferManager( @@ -325,14 +325,15 @@ private boolean upload(long primaryTerm, long generation, long maxSeqNo) throws // During primary relocation, both the old and new primary have engine created with RemoteFsTranslog and having // ReplicationTracker.primaryMode() as true. However, before we perform the `internal:index/shard/replication/segments_sync` // action which re-downloads the segments and translog on the new primary. We are ensuring 2 things here - - // 1. Using primaryModeSupplier, we prevent the new primary to do pre-emptive syncs + // 1. Using startedPrimarySupplier, we prevent the new primary to do pre-emptive syncs // 2. Using syncPermits, we prevent syncs at the desired time during primary relocation. - if (primaryModeSupplier.getAsBoolean() == false || syncPermits.tryAcquire(1) == false) { - logger.debug("skipped uploading translog for {} {} uploadPermits={}", primaryTerm, generation, syncPermits.availablePermits()); + if (startedPrimarySupplier.getAsBoolean() == false || syncPermit.tryAcquire(SYNC_PERMIT) == false) { + logger.debug("skipped uploading translog for {} {} syncPermits={}", primaryTerm, generation, syncPermit.availablePermits()); // NO-OP return true; } logger.trace("uploading translog for {} {}", primaryTerm, generation); + induceDelayForTest(); try ( TranslogCheckpointTransferSnapshot transferSnapshotProvider = new TranslogCheckpointTransferSnapshot.Builder( primaryTerm, @@ -348,11 +349,16 @@ private boolean upload(long primaryTerm, long generation, long maxSeqNo) throws new RemoteFsTranslogTransferListener(generation, primaryTerm, maxSeqNo) ); } finally { - syncPermits.release(1); + syncPermit.release(SYNC_PERMIT); } } + // Made available for testing only + void induceDelayForTest() { + + } + // Visible for testing public Set allUploaded() { return fileTransferTracker.allUploaded(); @@ -434,12 +440,12 @@ protected void setMinSeqNoToKeep(long seqNo) { @Override protected Releasable drainSync() { try { - if (syncPermits.tryAcquire(SYNC_PERMITS, 1, TimeUnit.MINUTES)) { - logger.info("All permits acquired"); + if (syncPermit.tryAcquire(SYNC_PERMIT, 1, TimeUnit.MINUTES)) { + logger.info("All inflight remote translog syncs finished and further syncs paused"); return Releasables.releaseOnce(() -> { - syncPermits.release(SYNC_PERMITS); - assert syncPermits.availablePermits() == SYNC_PERMITS : "Available permits is " + syncPermits.availablePermits(); - logger.info("All permits released"); + syncPermit.release(SYNC_PERMIT); + assert syncPermit.availablePermits() == SYNC_PERMIT : "Available permits is " + syncPermit.availablePermits(); + logger.info("Resumed remote translog sync back on relocation failure"); }); } else { throw new TimeoutException("Timeout while acquiring all permits"); @@ -456,7 +462,7 @@ public void trimUnreferencedReaders() throws IOException { // This is to ensure that after the permits are acquired during primary relocation, there are no further modification on remote // store. - if (syncPermits.availablePermits() == 0) { + if (syncPermit.availablePermits() != SYNC_PERMIT) { return; } @@ -598,4 +604,9 @@ public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) thro public long getMinUnreferencedSeqNoInSegments(long minUnrefCheckpointInLastCommit) { return minSeqNoToKeep; } + + // Visible for testing + int availablePermits() { + return syncPermit.availablePermits(); + } } diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogFactory.java b/server/src/main/java/org/opensearch/index/translog/TranslogFactory.java index a139b10f563b2..4300435093b5d 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogFactory.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogFactory.java @@ -32,6 +32,6 @@ Translog newTranslog( final LongSupplier globalCheckpointSupplier, final LongSupplier primaryTermSupplier, final LongConsumer persistedSequenceNumberConsumer, - final BooleanSupplier primaryModeSupplier + final BooleanSupplier startedPrimarySupplier ) throws IOException; } diff --git a/server/src/main/java/org/opensearch/index/translog/WriteOnlyTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/WriteOnlyTranslogManager.java index adeeb213b2913..a7a524ad78e95 100644 --- a/server/src/main/java/org/opensearch/index/translog/WriteOnlyTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/WriteOnlyTranslogManager.java @@ -38,7 +38,7 @@ public WriteOnlyTranslogManager( TranslogEventListener translogEventListener, LifecycleAware engineLifecycleAware, TranslogFactory translogFactory, - BooleanSupplier primaryModeSupplier + BooleanSupplier startedPrimarySupplier ) throws IOException { super( translogConfig, @@ -52,7 +52,7 @@ public WriteOnlyTranslogManager( translogEventListener, engineLifecycleAware, translogFactory, - primaryModeSupplier + startedPrimarySupplier ); } diff --git a/server/src/test/java/org/opensearch/index/shard/CloseableRetryableRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/ReleasableRetryableRefreshListenerTests.java similarity index 89% rename from server/src/test/java/org/opensearch/index/shard/CloseableRetryableRefreshListenerTests.java rename to server/src/test/java/org/opensearch/index/shard/ReleasableRetryableRefreshListenerTests.java index 13694043c450a..a0641c365a2a1 100644 --- a/server/src/test/java/org/opensearch/index/shard/CloseableRetryableRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/ReleasableRetryableRefreshListenerTests.java @@ -27,9 +27,9 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -public class CloseableRetryableRefreshListenerTests extends OpenSearchTestCase { +public class ReleasableRetryableRefreshListenerTests extends OpenSearchTestCase { - private static final Logger logger = LogManager.getLogger(CloseableRetryableRefreshListenerTests.class); + private static final Logger logger = LogManager.getLogger(ReleasableRetryableRefreshListenerTests.class); private ThreadPool threadPool; @@ -44,7 +44,7 @@ public void init() { public void testPerformAfterRefresh() throws IOException { CountDownLatch countDownLatch = new CountDownLatch(2); - CloseableRetryableRefreshListener testRefreshListener = new CloseableRetryableRefreshListener(mock(ThreadPool.class)) { + ReleasableRetryableRefreshListener testRefreshListener = new ReleasableRetryableRefreshListener(mock(ThreadPool.class)) { @Override protected boolean performAfterRefreshWithPermit(boolean didRefresh) { countDownLatch.countDown(); @@ -76,7 +76,7 @@ protected Logger getLogger() { public void testCloseAfterRefresh() throws IOException { final int initialCount = randomIntBetween(10, 100); final CountDownLatch countDownLatch = new CountDownLatch(initialCount); - CloseableRetryableRefreshListener testRefreshListener = new CloseableRetryableRefreshListener(mock(ThreadPool.class)) { + ReleasableRetryableRefreshListener testRefreshListener = new ReleasableRetryableRefreshListener(mock(ThreadPool.class)) { @Override protected boolean performAfterRefreshWithPermit(boolean didRefresh) { countDownLatch.countDown(); @@ -113,7 +113,7 @@ protected Logger getLogger() { public void testNoRetry() throws IOException { int initialCount = randomIntBetween(10, 100); final CountDownLatch countDownLatch = new CountDownLatch(initialCount); - CloseableRetryableRefreshListener testRefreshListener = new CloseableRetryableRefreshListener(mock(ThreadPool.class)) { + ReleasableRetryableRefreshListener testRefreshListener = new ReleasableRetryableRefreshListener(mock(ThreadPool.class)) { @Override protected boolean performAfterRefreshWithPermit(boolean didRefresh) { countDownLatch.countDown(); @@ -132,7 +132,7 @@ protected Logger getLogger() { assertEquals(initialCount - 1, countDownLatch.getCount()); testRefreshListener.drainRefreshes(); - testRefreshListener = new CloseableRetryableRefreshListener(threadPool) { + testRefreshListener = new ReleasableRetryableRefreshListener(threadPool) { @Override protected boolean performAfterRefreshWithPermit(boolean didRefresh) { countDownLatch.countDown(); @@ -151,7 +151,7 @@ protected Logger getLogger() { assertEquals(initialCount - 2, countDownLatch.getCount()); testRefreshListener.drainRefreshes(); - testRefreshListener = new CloseableRetryableRefreshListener(threadPool) { + testRefreshListener = new ReleasableRetryableRefreshListener(threadPool) { @Override protected boolean performAfterRefreshWithPermit(boolean didRefresh) { countDownLatch.countDown(); @@ -175,7 +175,7 @@ protected Logger getLogger() { assertEquals(initialCount - 3, countDownLatch.getCount()); testRefreshListener.drainRefreshes(); - testRefreshListener = new CloseableRetryableRefreshListener(threadPool) { + testRefreshListener = new ReleasableRetryableRefreshListener(threadPool) { @Override protected boolean performAfterRefreshWithPermit(boolean didRefresh) { countDownLatch.countDown(); @@ -206,7 +206,7 @@ protected Logger getLogger() { public void testRetry() throws Exception { int initialCount = randomIntBetween(10, 20); final CountDownLatch countDownLatch = new CountDownLatch(initialCount); - CloseableRetryableRefreshListener testRefreshListener = new CloseableRetryableRefreshListener(threadPool) { + ReleasableRetryableRefreshListener testRefreshListener = new ReleasableRetryableRefreshListener(threadPool) { @Override protected boolean performAfterRefreshWithPermit(boolean didRefresh) { countDownLatch.countDown(); @@ -247,7 +247,7 @@ protected boolean isRetryEnabled() { public void testCloseWithRetryPending() throws IOException { int initialCount = randomIntBetween(10, 20); final CountDownLatch countDownLatch = new CountDownLatch(initialCount); - CloseableRetryableRefreshListener testRefreshListener = new CloseableRetryableRefreshListener(threadPool) { + ReleasableRetryableRefreshListener testRefreshListener = new ReleasableRetryableRefreshListener(threadPool) { @Override protected boolean performAfterRefreshWithPermit(boolean didRefresh) { countDownLatch.countDown(); @@ -280,7 +280,7 @@ protected Logger getLogger() { public void testCloseWaitsForAcquiringAllPermits() throws Exception { final CountDownLatch countDownLatch = new CountDownLatch(1); - CloseableRetryableRefreshListener testRefreshListener = new CloseableRetryableRefreshListener(threadPool) { + ReleasableRetryableRefreshListener testRefreshListener = new ReleasableRetryableRefreshListener(threadPool) { @Override protected boolean performAfterRefreshWithPermit(boolean didRefresh) { try { @@ -316,7 +316,7 @@ protected Logger getLogger() { public void testScheduleRetryAfterClose() throws Exception { // This tests that once the listener has been closed, even the retries would not be scheduled. final AtomicLong runCount = new AtomicLong(); - CloseableRetryableRefreshListener testRefreshListener = new CloseableRetryableRefreshListener(threadPool) { + ReleasableRetryableRefreshListener testRefreshListener = new ReleasableRetryableRefreshListener(threadPool) { @Override protected boolean performAfterRefreshWithPermit(boolean didRefresh) { try { @@ -378,7 +378,7 @@ public void testConcurrentScheduleRetry() throws Exception { // This tests that there can be only 1 retry that can be scheduled at a time. final AtomicLong runCount = new AtomicLong(); final AtomicInteger retryCount = new AtomicInteger(0); - CloseableRetryableRefreshListener testRefreshListener = new CloseableRetryableRefreshListener(threadPool) { + ReleasableRetryableRefreshListener testRefreshListener = new ReleasableRetryableRefreshListener(threadPool) { @Override protected boolean performAfterRefreshWithPermit(boolean didRefresh) { retryCount.incrementAndGet(); @@ -422,7 +422,7 @@ public void testExceptionDuringThreadPoolSchedule() throws Exception { AtomicInteger runCount = new AtomicInteger(); ThreadPool mockThreadPool = mock(ThreadPool.class); when(mockThreadPool.schedule(any(), any(), any())).thenThrow(new RuntimeException()); - CloseableRetryableRefreshListener testRefreshListener = new CloseableRetryableRefreshListener(mockThreadPool) { + ReleasableRetryableRefreshListener testRefreshListener = new ReleasableRetryableRefreshListener(mockThreadPool) { @Override protected boolean performAfterRefreshWithPermit(boolean didRefresh) { runCount.incrementAndGet(); @@ -461,7 +461,7 @@ protected boolean isRetryEnabled() { public void testTimeoutDuringClose() throws Exception { // This test checks the expected behaviour when the drainRefreshes times out. - CloseableRetryableRefreshListener testRefreshListener = new CloseableRetryableRefreshListener(mock(ThreadPool.class)) { + ReleasableRetryableRefreshListener testRefreshListener = new ReleasableRetryableRefreshListener(mock(ThreadPool.class)) { @Override protected boolean performAfterRefreshWithPermit(boolean didRefresh) { try { @@ -481,7 +481,7 @@ protected Logger getLogger() { } @Override - protected TimeValue getDrainTimeout() { + TimeValue getDrainTimeout() { return TimeValue.timeValueSeconds(1); } }; @@ -502,7 +502,7 @@ protected TimeValue getDrainTimeout() { public void testThreadInterruptDuringClose() throws Exception { // This test checks the expected behaviour when the thread performing the drainRefresh is interrupted. CountDownLatch latch = new CountDownLatch(2); - CloseableRetryableRefreshListener testRefreshListener = new CloseableRetryableRefreshListener(mock(ThreadPool.class)) { + ReleasableRetryableRefreshListener testRefreshListener = new ReleasableRetryableRefreshListener(mock(ThreadPool.class)) { @Override protected boolean performAfterRefreshWithPermit(boolean didRefresh) { try { @@ -522,7 +522,7 @@ protected Logger getLogger() { } @Override - protected TimeValue getDrainTimeout() { + TimeValue getDrainTimeout() { return TimeValue.timeValueSeconds(2); } }; @@ -551,7 +551,7 @@ protected TimeValue getDrainTimeout() { public void testResumeRefreshesAfterDrainRefreshes() { // This test checks the expected behaviour when the refresh listener is drained, but then refreshes are resumed again // by closing the releasables acquired by calling the drainRefreshes method. - CloseableRetryableRefreshListener testRefreshListener = new CloseableRetryableRefreshListener(mock(ThreadPool.class)) { + ReleasableRetryableRefreshListener testRefreshListener = new ReleasableRetryableRefreshListener(mock(ThreadPool.class)) { @Override protected boolean performAfterRefreshWithPermit(boolean didRefresh) { return true; @@ -578,12 +578,12 @@ public void tearDown() throws Exception { terminate(threadPool); } - private void assertRefreshListenerClosed(CloseableRetryableRefreshListener testRefreshListener) { + private void assertRefreshListenerClosed(ReleasableRetryableRefreshListener testRefreshListener) { assertTrue(testRefreshListener.isClosed()); assertEquals(0, testRefreshListener.availablePermits()); } - private void assertRefreshListenerOpen(CloseableRetryableRefreshListener testRefreshListener) { + private void assertRefreshListenerOpen(ReleasableRetryableRefreshListener testRefreshListener) { assertFalse(testRefreshListener.isClosed()); assertEquals(1, testRefreshListener.availablePermits()); } diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java index 3cb65610fab58..3a5bf7f581e41 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java @@ -25,8 +25,11 @@ import org.opensearch.common.blobstore.fs.FsBlobContainer; import org.opensearch.common.blobstore.fs.FsBlobStore; import org.opensearch.common.bytes.ReleasableBytesReference; +import org.opensearch.common.lease.Releasable; +import org.opensearch.common.lease.Releasables; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.AbstractRunnable; import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.common.util.io.IOUtils; @@ -125,6 +128,7 @@ public class RemoteFsTranslogTests extends OpenSearchTestCase { private ThreadPool threadPool; private final static String METADATA_DIR = "metadata"; private final static String DATA_DIR = "data"; + private volatile boolean slowDownEnsureOpen = false; AtomicInteger writeCalls = new AtomicInteger(); BlobStoreRepository repository; @@ -187,7 +191,19 @@ private RemoteFsTranslog create(Path path, BlobStoreRepository repository, Strin threadPool, primaryMode::get, new RemoteTranslogTransferTracker(shardId, 10) - ); + ) { + @Override + void induceDelayForTest() { + // We are inducing 2 seconds delay for testing out available permits + if (slowDownEnsureOpen) { + try { + Thread.sleep(TimeValue.timeValueSeconds(2).millis()); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + } + } + }; } private RemoteFsTranslog create(Path path, BlobStoreRepository repository, String translogUUID) throws IOException { @@ -819,6 +835,79 @@ public void testMetadataFileDeletion() throws Exception { } } + public void testDrainSync() throws Exception { + // This test checks following scenarios - + // 1. During ongoing uploads, the available permits are 0. + // 2. During an upload, if drainSync is called, it will wait for it to acquire and available permits are 0. + // 3. After drainSync, if trimUnreferencedReaders is attempted, we do not delete from remote store. + // 4. After drainSync, if an upload is an attempted, we do not upload to remote store. + ArrayList ops = new ArrayList<>(); + assertEquals(0, translog.allUploaded().size()); + assertEquals(1, translog.readers.size()); + + addToTranslogAndListAndUpload(translog, ops, new Translog.Index(String.valueOf(0), 0, primaryTerm.get(), new byte[] { 1 })); + assertEquals(4, translog.allUploaded().size()); + assertEquals(2, translog.readers.size()); + assertBusy(() -> assertEquals(1, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size())); + + translog.setMinSeqNoToKeep(0); + translog.trimUnreferencedReaders(); + assertEquals(1, translog.readers.size()); + + // Case 1 - During ongoing uploads, the available permits are 0. + slowDownEnsureOpen = true; + CountDownLatch latch = new CountDownLatch(1); + Thread thread1 = new Thread(() -> { + try { + addToTranslogAndListAndUpload(translog, ops, new Translog.Index(String.valueOf(1), 1, primaryTerm.get(), new byte[] { 1 })); + assertEquals(2, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()); + latch.countDown(); + } catch (IOException e) { + throw new AssertionError(e); + } + }); + thread1.start(); + assertBusy(() -> assertEquals(0, translog.availablePermits())); + // Case 2 - During an upload, if drainSync is called, it will wait for it to acquire and available permits are 0. + Releasable releasable = translog.drainSync(); + assertBusy(() -> assertEquals(0, latch.getCount())); + assertEquals(0, translog.availablePermits()); + slowDownEnsureOpen = false; + assertEquals(6, translog.allUploaded().size()); + assertEquals(2, translog.readers.size()); + Set mdFiles = blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)); + + // Case 3 - After drainSync, if trimUnreferencedReaders is attempted, we do not delete from remote store. + translog.setMinSeqNoToKeep(1); + translog.trimUnreferencedReaders(); + assertEquals(1, translog.readers.size()); + assertEquals(6, translog.allUploaded().size()); + assertEquals(mdFiles, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR))); + + // Case 4 - After drainSync, if an upload is an attempted, we do not upload to remote store. + Translog.Location loc = addToTranslogAndListAndUpload( + translog, + ops, + new Translog.Index(String.valueOf(2), 2, primaryTerm.get(), new byte[] { 1 }) + ); + assertEquals(2, translog.readers.size()); + assertEquals(6, translog.allUploaded().size()); + assertEquals(mdFiles, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR))); + + // Refill the permits back + Releasables.close(releasable); + addToTranslogAndListAndUpload(translog, ops, new Translog.Index(String.valueOf(3), 3, primaryTerm.get(), new byte[] { 1 })); + assertEquals(3, translog.readers.size()); + assertEquals(10, translog.allUploaded().size()); + assertEquals(3, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()); + + translog.setMinSeqNoToKeep(3); + translog.trimUnreferencedReaders(); + assertEquals(1, translog.readers.size()); + assertBusy(() -> assertEquals(6, translog.allUploaded().size())); + assertBusy(() -> assertEquals(1, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size())); + } + private BlobPath getTranslogDirectory() { return repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add(TRANSLOG); } From 8b961ac8222d892c543183b7c624bd230dcef9cc Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Sun, 26 Nov 2023 15:52:43 +0530 Subject: [PATCH 06/11] Add IT for multiple writer validation Signed-off-by: Ashish Singh --- .../RemoteStoreBaseIntegTestCase.java | 16 +++ .../opensearch/remotestore/RemoteStoreIT.java | 121 +++++++++++++++--- 2 files changed, 117 insertions(+), 20 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java index 8b4981a15433a..0c68b11ebff56 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java @@ -8,6 +8,8 @@ package org.opensearch.remotestore; +import org.opensearch.action.admin.indices.get.GetIndexRequest; +import org.opensearch.action.admin.indices.get.GetIndexResponse; import org.opensearch.action.bulk.BulkItemResponse; import org.opensearch.action.bulk.BulkRequest; import org.opensearch.action.bulk.BulkResponse; @@ -23,9 +25,13 @@ import org.opensearch.common.UUIDs; import org.opensearch.common.settings.Settings; import org.opensearch.core.common.unit.ByteSizeUnit; +import org.opensearch.core.index.Index; import org.opensearch.index.IndexModule; +import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; import org.opensearch.index.mapper.MapperService; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.indices.IndicesService; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.blobstore.BlobStoreRepository; @@ -43,6 +49,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -380,4 +387,13 @@ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) { return filesExisting.get(); } + + protected IndexShard getIndexShard(String dataNode, String indexName) throws ExecutionException, InterruptedException { + String clusterManagerName = internalCluster().getClusterManagerName(); + IndicesService indicesService = internalCluster().getInstance(IndicesService.class, dataNode); + GetIndexResponse getIndexResponse = client(clusterManagerName).admin().indices().getIndex(new GetIndexRequest()).get(); + String uuid = getIndexResponse.getSettings().get(indexName).get(IndexMetadata.SETTING_INDEX_UUID); + IndexService indexService = indicesService.indexService(new Index(indexName, uuid)); + return indexService.getShard(0); + } } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java index 28294686d4370..b8c5fe0351a4a 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java @@ -8,30 +8,34 @@ package org.opensearch.remotestore; +import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; -import org.opensearch.action.admin.indices.get.GetIndexRequest; -import org.opensearch.action.admin.indices.get.GetIndexResponse; +import org.opensearch.action.admin.indices.flush.FlushRequest; import org.opensearch.action.admin.indices.recovery.RecoveryResponse; import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.opensearch.action.index.IndexResponse; +import org.opensearch.cluster.health.ClusterHealthStatus; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.routing.RecoverySource; +import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand; +import org.opensearch.common.Priority; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.BufferedAsyncIOProcessor; -import org.opensearch.core.index.Index; -import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.IndexShardClosedException; import org.opensearch.index.translog.Translog.Durability; import org.opensearch.indices.IndicesService; +import org.opensearch.indices.recovery.PeerRecoveryTargetService; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.plugins.Plugin; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.transport.MockTransportService; +import org.opensearch.transport.TransportService; import org.hamcrest.MatcherAssert; import java.io.IOException; @@ -42,6 +46,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; @@ -180,7 +185,7 @@ public void testStaleCommitDeletionWithInvokeFlush() throws Exception { .getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID); Path indexPath = Path.of(String.valueOf(segmentRepoPath), indexUUID, "/0/segments/metadata"); - IndexShard indexShard = getIndexShard(dataNode); + IndexShard indexShard = getIndexShard(dataNode, INDEX_NAME); int lastNMetadataFilesToKeep = indexShard.getRecoverySettings().getMinRemoteSegmentMetadataFiles(); // Delete is async. assertBusy(() -> { @@ -265,7 +270,7 @@ public void testDefaultBufferInterval() throws ExecutionException, InterruptedEx ensureGreen(INDEX_NAME); assertClusterRemoteBufferInterval(IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL, dataNode); - IndexShard indexShard = getIndexShard(dataNode); + IndexShard indexShard = getIndexShard(dataNode, INDEX_NAME); assertTrue(indexShard.getTranslogSyncProcessor() instanceof BufferedAsyncIOProcessor); assertBufferInterval(IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL, indexShard); @@ -298,7 +303,7 @@ public void testOverriddenBufferInterval() throws ExecutionException, Interrupte ensureYellowAndNoInitializingShards(INDEX_NAME); ensureGreen(INDEX_NAME); - IndexShard indexShard = getIndexShard(dataNode); + IndexShard indexShard = getIndexShard(dataNode, INDEX_NAME); assertTrue(indexShard.getTranslogSyncProcessor() instanceof BufferedAsyncIOProcessor); assertBufferInterval(bufferInterval, indexShard); @@ -414,7 +419,7 @@ private void testRestrictSettingFalse(boolean setRestrictFalse, Durability durab .put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), durability) .build(); createIndex(INDEX_NAME, indexSettings); - IndexShard indexShard = getIndexShard(dataNode); + IndexShard indexShard = getIndexShard(dataNode, INDEX_NAME); assertEquals(durability, indexShard.indexSettings().getTranslogDurability()); durability = randomFrom(Durability.values()); @@ -447,7 +452,7 @@ public void testAsyncDurabilityThrowsExceptionWhenRestrictSettingTrue() throws E // Case 2 - Test update index fails createIndex(INDEX_NAME); - IndexShard indexShard = getIndexShard(dataNode); + IndexShard indexShard = getIndexShard(dataNode, INDEX_NAME); assertEquals(Durability.REQUEST, indexShard.indexSettings().getTranslogDurability()); exception = assertThrows( IllegalArgumentException.class, @@ -459,15 +464,6 @@ public void testAsyncDurabilityThrowsExceptionWhenRestrictSettingTrue() throws E assertEquals(expectedExceptionMsg, exception.getMessage()); } - private IndexShard getIndexShard(String dataNode) throws ExecutionException, InterruptedException { - String clusterManagerName = internalCluster().getClusterManagerName(); - IndicesService indicesService = internalCluster().getInstance(IndicesService.class, dataNode); - GetIndexResponse getIndexResponse = client(clusterManagerName).admin().indices().getIndex(new GetIndexRequest()).get(); - String uuid = getIndexResponse.getSettings().get(INDEX_NAME).get(IndexMetadata.SETTING_INDEX_UUID); - IndexService indexService = indicesService.indexService(new Index(INDEX_NAME, uuid)); - return indexService.getShard(0); - } - private void assertClusterRemoteBufferInterval(TimeValue expectedBufferInterval, String dataNode) { IndicesService indicesService = internalCluster().getInstance(IndicesService.class, dataNode); assertEquals(expectedBufferInterval, indicesService.getClusterRemoteTranslogBufferInterval()); @@ -559,7 +555,7 @@ public void testNoSearchIdleForAnyReplicaCount() throws ExecutionException, Inte createIndex(INDEX_NAME, remoteStoreIndexSettings(0)); ensureGreen(INDEX_NAME); - IndexShard indexShard = getIndexShard(primaryShardNode); + IndexShard indexShard = getIndexShard(primaryShardNode, INDEX_NAME); assertFalse(indexShard.isSearchIdleSupported()); String replicaShardNode = internalCluster().startDataOnlyNodes(1).get(0); @@ -572,7 +568,7 @@ public void testNoSearchIdleForAnyReplicaCount() throws ExecutionException, Inte ensureGreen(INDEX_NAME); assertFalse(indexShard.isSearchIdleSupported()); - indexShard = getIndexShard(replicaShardNode); + indexShard = getIndexShard(replicaShardNode, INDEX_NAME); assertFalse(indexShard.isSearchIdleSupported()); } @@ -621,4 +617,89 @@ public void testFallbackToNodeToNodeSegmentCopy() throws Exception { assertHitCount(client(dataNodes.get(1)).prepareSearch(INDEX_NAME).setSize(0).get(), 50); }); } + + public void testNoMultipleWriterDuringPrimaryRelocation() throws ExecutionException, InterruptedException { + // In this test, we trigger a force flush on existing primary while the primary mode on new primary has been + // activated. There was a bug in primary relocation of remote store enabled indexes where the new primary + // starts uploading translog and segments even before the cluster manager has started this shard. With this test, + // we check that we do not overwrite any file on remote store. Here we will also increase the replica count to + // check that there are no duplicate metadata files for translog or upload. + + internalCluster().startClusterManagerOnlyNode(); + String oldPrimary = internalCluster().startDataOnlyNodes(1).get(0); + createIndex(INDEX_NAME, remoteStoreIndexSettings(0)); + ensureGreen(INDEX_NAME); + indexBulk(INDEX_NAME, randomIntBetween(5, 10)); + String newPrimary = internalCluster().startDataOnlyNodes(1).get(0); + ensureStableCluster(3); + + IndexShard oldPrimaryIndexShard = getIndexShard(oldPrimary, INDEX_NAME); + CountDownLatch flushLatch = new CountDownLatch(1); + + MockTransportService mockTargetTransportService = ((MockTransportService) internalCluster().getInstance( + TransportService.class, + oldPrimary + )); + mockTargetTransportService.addSendBehavior((connection, requestId, action, request, options) -> { + if (PeerRecoveryTargetService.Actions.HANDOFF_PRIMARY_CONTEXT.equals(action)) { + flushLatch.countDown(); + } + connection.sendRequest(requestId, action, request, options); + }); + + logger.info("--> relocate the shard"); + client().admin() + .cluster() + .prepareReroute() + .add(new MoveAllocationCommand(INDEX_NAME, 0, oldPrimary, newPrimary)) + .execute() + .actionGet(); + + CountDownLatch flushDone = new CountDownLatch(1); + Thread flushThread = new Thread(() -> { + try { + flushLatch.await(2, TimeUnit.SECONDS); + oldPrimaryIndexShard.flush(new FlushRequest().waitIfOngoing(true).force(true)); + // newPrimaryTranslogRepo.setSleepSeconds(0); + } catch (IndexShardClosedException e) { + // this is fine + } catch (InterruptedException e) { + throw new AssertionError(e); + } finally { + flushDone.countDown(); + } + }); + flushThread.start(); + flushDone.await(5, TimeUnit.SECONDS); + flushThread.join(); + + ClusterHealthResponse clusterHealthResponse = client().admin() + .cluster() + .prepareHealth() + .setWaitForStatus(ClusterHealthStatus.GREEN) + .setWaitForEvents(Priority.LANGUID) + .setWaitForNoRelocatingShards(true) + .setTimeout(TimeValue.timeValueSeconds(5)) + .execute() + .actionGet(); + assertFalse(clusterHealthResponse.isTimedOut()); + + client().admin() + .indices() + .updateSettings( + new UpdateSettingsRequest(INDEX_NAME).settings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)) + ) + .get(); + + clusterHealthResponse = client().admin() + .cluster() + .prepareHealth() + .setWaitForStatus(ClusterHealthStatus.GREEN) + .setWaitForEvents(Priority.LANGUID) + .setWaitForNoRelocatingShards(true) + .setTimeout(TimeValue.timeValueSeconds(5)) + .execute() + .actionGet(); + assertFalse(clusterHealthResponse.isTimedOut()); + } } From f357d57e7290c94ad4070536236e25b97c23a33c Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Sun, 26 Nov 2023 22:35:23 +0530 Subject: [PATCH 07/11] Incorporate PR review feedback Signed-off-by: Ashish Singh --- .../opensearch/index/shard/IndexShard.java | 7 +- .../shard/RemoteStoreRefreshListener.java | 2 +- .../index/translog/LocalTranslog.java | 6 ++ .../index/translog/RemoteFsTranslog.java | 6 -- .../opensearch/index/translog/Translog.java | 4 +- .../index/translog/RemoteFsTranslogTests.java | 72 +++++++++++-------- .../index/translog/TestTranslog.java | 12 ++++ 7 files changed, 66 insertions(+), 43 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 5bc99fb96ee7b..7704a76d85a48 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -912,14 +912,11 @@ public void relocated( } catch (Exception ex) { logger.warn("exception occurred during relocation hand-off to complete errorMsg={}", ex.getMessage()); assert replicationTracker.isPrimaryMode(); - throw ex; - } finally { // If the primary mode is still true after the end of handoff attempt, it basically means that the relocation // failed. The existing primary will continue to be the primary, so we need to allow the segments and translog // upload to resume. - if (replicationTracker.isPrimaryMode()) { - Releasables.close(releasablesOnHandoffFailures); - } + Releasables.close(releasablesOnHandoffFailures); + throw ex; } } diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index a6f46fb666860..d96a7e7c95ecf 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -484,7 +484,7 @@ private void initializeRemoteDirectoryOnTermUpdate() throws IOException { /** * This checks for readiness of the index shard and primary mode. This has separated from shouldSync since we use the * returned value of this method for scheduling retries in syncSegments method. - * @return true iff primaryMode is true and index shard is not in closed state. + * @return true iff the shard is a started with primary mode true or it is local or snapshot recovery. */ private boolean isReadyForUpload() { boolean isReady = indexShard.isStartedPrimary() || isLocalOrSnapshotRecovery(); diff --git a/server/src/main/java/org/opensearch/index/translog/LocalTranslog.java b/server/src/main/java/org/opensearch/index/translog/LocalTranslog.java index 22dba3973cfc1..7664631e0ed07 100644 --- a/server/src/main/java/org/opensearch/index/translog/LocalTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/LocalTranslog.java @@ -8,6 +8,7 @@ package org.opensearch.index.translog; +import org.opensearch.common.lease.Releasable; import org.opensearch.common.util.concurrent.ReleasableLock; import org.opensearch.common.util.io.IOUtils; @@ -140,6 +141,11 @@ public TranslogStats stats() { } } + @Override + Releasable drainSync() { + return () -> {}; // noop + } + @Override public void close() throws IOException { assert Translog.calledFromOutsideOrViaTragedyClose() diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 5aeb70d8a4c91..594713ee10ca2 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -333,7 +333,6 @@ private boolean upload(long primaryTerm, long generation, long maxSeqNo) throws return true; } logger.trace("uploading translog for {} {}", primaryTerm, generation); - induceDelayForTest(); try ( TranslogCheckpointTransferSnapshot transferSnapshotProvider = new TranslogCheckpointTransferSnapshot.Builder( primaryTerm, @@ -354,11 +353,6 @@ private boolean upload(long primaryTerm, long generation, long maxSeqNo) throws } - // Made available for testing only - void induceDelayForTest() { - - } - // Visible for testing public Set allUploaded() { return fileTransferTracker.allUploaded(); diff --git a/server/src/main/java/org/opensearch/index/translog/Translog.java b/server/src/main/java/org/opensearch/index/translog/Translog.java index cb64db19c8337..9f877e87415dd 100644 --- a/server/src/main/java/org/opensearch/index/translog/Translog.java +++ b/server/src/main/java/org/opensearch/index/translog/Translog.java @@ -1820,9 +1820,7 @@ protected void onDelete() {} /** * Drains ongoing syncs to the underlying store. It returns a releasable which can be closed to resume the syncs back. */ - protected Releasable drainSync() { - return () -> {}; // noop - } + abstract Releasable drainSync(); /** * deletes all files associated with a reader. package-private to be able to simulate node failures at this point diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java index 3a5bf7f581e41..026749e7903dc 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java @@ -29,7 +29,6 @@ import org.opensearch.common.lease.Releasables; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; -import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.AbstractRunnable; import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.common.util.io.IOUtils; @@ -128,8 +127,6 @@ public class RemoteFsTranslogTests extends OpenSearchTestCase { private ThreadPool threadPool; private final static String METADATA_DIR = "metadata"; private final static String DATA_DIR = "data"; - private volatile boolean slowDownEnsureOpen = false; - AtomicInteger writeCalls = new AtomicInteger(); BlobStoreRepository repository; @@ -137,6 +134,8 @@ public class RemoteFsTranslogTests extends OpenSearchTestCase { TestTranslog.FailSwitch fail; + TestTranslog.SlowDownWriteSwitch slowDown; + private LongConsumer getPersistedSeqNoConsumer() { return seqNo -> { final LongConsumer consumer = persistedSeqNoConsumer.get(); @@ -191,19 +190,7 @@ private RemoteFsTranslog create(Path path, BlobStoreRepository repository, Strin threadPool, primaryMode::get, new RemoteTranslogTransferTracker(shardId, 10) - ) { - @Override - void induceDelayForTest() { - // We are inducing 2 seconds delay for testing out available permits - if (slowDownEnsureOpen) { - try { - Thread.sleep(TimeValue.timeValueSeconds(2).millis()); - } catch (InterruptedException e) { - throw new AssertionError(e); - } - } - } - }; + ); } private RemoteFsTranslog create(Path path, BlobStoreRepository repository, String translogUUID) throws IOException { @@ -244,13 +231,15 @@ private BlobStoreRepository createRepository() { final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(repositoryMetadata); fail = new TestTranslog.FailSwitch(); fail.failNever(); + slowDown = new TestTranslog.SlowDownWriteSwitch(); final FsRepository repository = new ThrowingBlobRepository( repositoryMetadata, createEnvironment(), xContentRegistry(), clusterService, new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)), - fail + fail, + slowDown ) { @Override protected void assertSnapshotOrGenericThread() { @@ -855,7 +844,7 @@ public void testDrainSync() throws Exception { assertEquals(1, translog.readers.size()); // Case 1 - During ongoing uploads, the available permits are 0. - slowDownEnsureOpen = true; + slowDown.setSleepSeconds(2); CountDownLatch latch = new CountDownLatch(1); Thread thread1 = new Thread(() -> { try { @@ -872,7 +861,7 @@ public void testDrainSync() throws Exception { Releasable releasable = translog.drainSync(); assertBusy(() -> assertEquals(0, latch.getCount())); assertEquals(0, translog.availablePermits()); - slowDownEnsureOpen = false; + slowDown.setSleepSeconds(0); assertEquals(6, translog.allUploaded().size()); assertEquals(2, translog.readers.size()); Set mdFiles = blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)); @@ -1713,9 +1702,10 @@ public void testDownloadWithRetries() throws IOException { } public class ThrowingBlobRepository extends FsRepository { - private final Environment environment; - private TestTranslog.FailSwitch fail; + private final Environment environment; + private final TestTranslog.FailSwitch fail; + private final TestTranslog.SlowDownWriteSwitch slowDown; public ThrowingBlobRepository( RepositoryMetadata metadata, @@ -1723,33 +1713,43 @@ public ThrowingBlobRepository( NamedXContentRegistry namedXContentRegistry, ClusterService clusterService, RecoverySettings recoverySettings, - TestTranslog.FailSwitch fail + TestTranslog.FailSwitch fail, + TestTranslog.SlowDownWriteSwitch slowDown ) { super(metadata, environment, namedXContentRegistry, clusterService, recoverySettings); this.environment = environment; this.fail = fail; + this.slowDown = slowDown; } protected BlobStore createBlobStore() throws Exception { final String location = REPOSITORIES_LOCATION_SETTING.get(getMetadata().settings()); final Path locationFile = environment.resolveRepoFile(location); - return new ThrowingBlobStore(bufferSize, locationFile, isReadOnly(), fail); + return new ThrowingBlobStore(bufferSize, locationFile, isReadOnly(), fail, slowDown); } } private class ThrowingBlobStore extends FsBlobStore { - private TestTranslog.FailSwitch fail; + private final TestTranslog.FailSwitch fail; + private final TestTranslog.SlowDownWriteSwitch slowDown; - public ThrowingBlobStore(int bufferSizeInBytes, Path path, boolean readonly, TestTranslog.FailSwitch fail) throws IOException { + public ThrowingBlobStore( + int bufferSizeInBytes, + Path path, + boolean readonly, + TestTranslog.FailSwitch fail, + TestTranslog.SlowDownWriteSwitch slowDown + ) throws IOException { super(bufferSizeInBytes, path, readonly); this.fail = fail; + this.slowDown = slowDown; } @Override public BlobContainer blobContainer(BlobPath path) { try { - return new ThrowingBlobContainer(this, path, buildAndCreate(path), fail); + return new ThrowingBlobContainer(this, path, buildAndCreate(path), fail, slowDown); } catch (IOException ex) { throw new OpenSearchException("failed to create blob container", ex); } @@ -1759,17 +1759,33 @@ public BlobContainer blobContainer(BlobPath path) { private class ThrowingBlobContainer extends FsBlobContainer { private TestTranslog.FailSwitch fail; - - public ThrowingBlobContainer(FsBlobStore blobStore, BlobPath blobPath, Path path, TestTranslog.FailSwitch fail) { + private final TestTranslog.SlowDownWriteSwitch slowDown; + + public ThrowingBlobContainer( + FsBlobStore blobStore, + BlobPath blobPath, + Path path, + TestTranslog.FailSwitch fail, + TestTranslog.SlowDownWriteSwitch slowDown + ) { super(blobStore, blobPath, path); this.fail = fail; + this.slowDown = slowDown; } + @Override public void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize, boolean failIfAlreadyExists) throws IOException { if (fail.fail()) { throw new IOException("blob container throwing error"); } + if (slowDown.getSleepSeconds() > 0) { + try { + Thread.sleep(slowDown.getSleepSeconds() * 1000L); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } super.writeBlobAtomic(blobName, inputStream, blobSize, failIfAlreadyExists); } } diff --git a/server/src/test/java/org/opensearch/index/translog/TestTranslog.java b/server/src/test/java/org/opensearch/index/translog/TestTranslog.java index fd4be1d7a8635..01c8844b51b02 100644 --- a/server/src/test/java/org/opensearch/index/translog/TestTranslog.java +++ b/server/src/test/java/org/opensearch/index/translog/TestTranslog.java @@ -335,6 +335,18 @@ public void onceFailedFailAlways() { } } + static class SlowDownWriteSwitch { + private volatile int sleepSeconds; + + public void setSleepSeconds(int sleepSeconds) { + this.sleepSeconds = sleepSeconds; + } + + public int getSleepSeconds() { + return sleepSeconds; + } + } + static class SortedSnapshot implements Translog.Snapshot { private final Translog.Snapshot snapshot; private List operations = null; From 06f034ad53beacba1cd616609725dd91bd206c82 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Sun, 26 Nov 2023 23:27:44 +0530 Subject: [PATCH 08/11] Add integ test for resuming indexing on relocation failure after drain step Signed-off-by: Ashish Singh --- .../opensearch/remotestore/RemoteStoreIT.java | 59 +++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java index b8c5fe0351a4a..25146d53b5654 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java @@ -8,6 +8,7 @@ package org.opensearch.remotestore; +import org.opensearch.OpenSearchException; import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; @@ -16,6 +17,7 @@ import org.opensearch.action.admin.indices.recovery.RecoveryResponse; import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.opensearch.action.index.IndexResponse; +import org.opensearch.action.search.SearchPhaseExecutionException; import org.opensearch.cluster.health.ClusterHealthStatus; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.routing.RecoverySource; @@ -702,4 +704,61 @@ public void testNoMultipleWriterDuringPrimaryRelocation() throws ExecutionExcept .actionGet(); assertFalse(clusterHealthResponse.isTimedOut()); } + + public void testResumeUploadAfterFailedPrimaryRelocation() throws ExecutionException, InterruptedException { + // In this test, we fail the hand off during the primary relocation. This will undo the drainRefreshes and + // drainSync performed as part of relocation handoff (before performing the handoff transport action). + // We validate the same here by failing the peer recovery and ensuring we can index afterward as well. + + internalCluster().startClusterManagerOnlyNode(); + String oldPrimary = internalCluster().startDataOnlyNodes(1).get(0); + createIndex(INDEX_NAME, remoteStoreIndexSettings(0)); + ensureGreen(INDEX_NAME); + int docs = randomIntBetween(5, 10); + indexBulk(INDEX_NAME, docs); + flushAndRefresh(INDEX_NAME); + assertHitCount(client(oldPrimary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), docs); + String newPrimary = internalCluster().startDataOnlyNodes(1).get(0); + ensureStableCluster(3); + + IndexShard oldPrimaryIndexShard = getIndexShard(oldPrimary, INDEX_NAME); + CountDownLatch handOffLatch = new CountDownLatch(1); + + MockTransportService mockTargetTransportService = ((MockTransportService) internalCluster().getInstance( + TransportService.class, + oldPrimary + )); + mockTargetTransportService.addSendBehavior((connection, requestId, action, request, options) -> { + if (PeerRecoveryTargetService.Actions.HANDOFF_PRIMARY_CONTEXT.equals(action)) { + handOffLatch.countDown(); + throw new OpenSearchException("failing recovery for test purposes"); + } + connection.sendRequest(requestId, action, request, options); + }); + + logger.info("--> relocate the shard"); + client().admin() + .cluster() + .prepareReroute() + .add(new MoveAllocationCommand(INDEX_NAME, 0, oldPrimary, newPrimary)) + .execute() + .actionGet(); + + handOffLatch.await(30, TimeUnit.SECONDS); + + assertTrue(oldPrimaryIndexShard.isStartedPrimary()); + assertEquals(oldPrimary, primaryNodeName(INDEX_NAME)); + assertHitCount(client(oldPrimary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), docs); + + SearchPhaseExecutionException ex = assertThrows( + SearchPhaseExecutionException.class, + () -> client(newPrimary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get() + ); + assertEquals("all shards failed", ex.getMessage()); + + int moreDocs = randomIntBetween(5, 10); + indexBulk(INDEX_NAME, moreDocs); + flushAndRefresh(INDEX_NAME); + assertHitCount(client(oldPrimary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), docs + moreDocs); + } } From 112bff6c0b9699ad2465bbe68add95ef9bab4bcf Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Sun, 26 Nov 2023 23:42:14 +0530 Subject: [PATCH 09/11] Incorporate PR review feedback Signed-off-by: Ashish Singh --- .../org/opensearch/index/translog/RemoteFsTranslog.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 594713ee10ca2..effbc4b9d7fa4 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -42,6 +42,7 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BooleanSupplier; import java.util.function.LongConsumer; import java.util.function.LongSupplier; @@ -81,6 +82,7 @@ public class RemoteFsTranslog extends Translog { // These permits exist to allow any inflight background triggered upload. private static final int SYNC_PERMIT = 1; private final Semaphore syncPermit = new Semaphore(SYNC_PERMIT); + private final AtomicBoolean pauseSync = new AtomicBoolean(false); public RemoteFsTranslog( TranslogConfig config, @@ -435,10 +437,14 @@ protected void setMinSeqNoToKeep(long seqNo) { protected Releasable drainSync() { try { if (syncPermit.tryAcquire(SYNC_PERMIT, 1, TimeUnit.MINUTES)) { + boolean result = pauseSync.compareAndSet(false, true); + assert result && syncPermit.availablePermits() == 0; logger.info("All inflight remote translog syncs finished and further syncs paused"); return Releasables.releaseOnce(() -> { syncPermit.release(SYNC_PERMIT); + boolean wasSyncPaused = pauseSync.getAndSet(false); assert syncPermit.availablePermits() == SYNC_PERMIT : "Available permits is " + syncPermit.availablePermits(); + assert wasSyncPaused : "RemoteFsTranslog sync was not paused before re-enabling it"; logger.info("Resumed remote translog sync back on relocation failure"); }); } else { @@ -456,7 +462,7 @@ public void trimUnreferencedReaders() throws IOException { // This is to ensure that after the permits are acquired during primary relocation, there are no further modification on remote // store. - if (syncPermit.availablePermits() != SYNC_PERMIT) { + if (pauseSync.get()) { return; } From a27e03076eec2e38ac208ae455c8ccf7f143ec5d Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Mon, 27 Nov 2023 12:38:09 +0530 Subject: [PATCH 10/11] Address comment - Move upload check in prepareAndUpload Signed-off-by: Ashish Singh --- .../opensearch/index/shard/IndexShard.java | 1 - .../index/translog/RemoteFsTranslog.java | 22 +++++++++---------- .../index/translog/RemoteFsTranslogTests.java | 8 +++---- 3 files changed, 15 insertions(+), 16 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 7704a76d85a48..7f9e5f31d1976 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -910,7 +910,6 @@ public void relocated( failShard("timed out waiting for relocation hand-off to complete", null); throw new IndexShardClosedException(shardId(), "timed out waiting for relocation hand-off to complete"); } catch (Exception ex) { - logger.warn("exception occurred during relocation hand-off to complete errorMsg={}", ex.getMessage()); assert replicationTracker.isPrimaryMode(); // If the primary mode is still true after the end of handoff attempt, it basically means that the relocation // failed. The existing primary will continue to be the primary, so we need to allow the segments and translog diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index effbc4b9d7fa4..7b969a37e4aa6 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -275,6 +275,16 @@ public void rollGeneration() throws IOException { } private boolean prepareAndUpload(Long primaryTerm, Long generation) throws IOException { + // During primary relocation, both the old and new primary have engine created with RemoteFsTranslog and having + // ReplicationTracker.primaryMode() as true. However, before we perform the `internal:index/shard/replication/segments_sync` + // action which re-downloads the segments and translog on the new primary. We are ensuring 2 things here - + // 1. Using startedPrimarySupplier, we prevent the new primary to do pre-emptive syncs + // 2. Using syncPermits, we prevent syncs at the desired time during primary relocation. + if (startedPrimarySupplier.getAsBoolean() == false || syncPermit.tryAcquire(SYNC_PERMIT) == false) { + logger.debug("skipped uploading translog for {} {} syncPermits={}", primaryTerm, generation, syncPermit.availablePermits()); + // NO-OP + return false; + } long maxSeqNo = -1; try (Releasable ignored = writeLock.acquire()) { if (generation == null || generation == current.getGeneration()) { @@ -324,16 +334,6 @@ private boolean prepareAndUpload(Long primaryTerm, Long generation) throws IOExc } private boolean upload(long primaryTerm, long generation, long maxSeqNo) throws IOException { - // During primary relocation, both the old and new primary have engine created with RemoteFsTranslog and having - // ReplicationTracker.primaryMode() as true. However, before we perform the `internal:index/shard/replication/segments_sync` - // action which re-downloads the segments and translog on the new primary. We are ensuring 2 things here - - // 1. Using startedPrimarySupplier, we prevent the new primary to do pre-emptive syncs - // 2. Using syncPermits, we prevent syncs at the desired time during primary relocation. - if (startedPrimarySupplier.getAsBoolean() == false || syncPermit.tryAcquire(SYNC_PERMIT) == false) { - logger.debug("skipped uploading translog for {} {} syncPermits={}", primaryTerm, generation, syncPermit.availablePermits()); - // NO-OP - return true; - } logger.trace("uploading translog for {} {}", primaryTerm, generation); try ( TranslogCheckpointTransferSnapshot transferSnapshotProvider = new TranslogCheckpointTransferSnapshot.Builder( @@ -462,7 +462,7 @@ public void trimUnreferencedReaders() throws IOException { // This is to ensure that after the permits are acquired during primary relocation, there are no further modification on remote // store. - if (pauseSync.get()) { + if (startedPrimarySupplier.getAsBoolean() == false || pauseSync.get()) { return; } diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java index 026749e7903dc..6bfab278993ed 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java @@ -879,21 +879,21 @@ public void testDrainSync() throws Exception { ops, new Translog.Index(String.valueOf(2), 2, primaryTerm.get(), new byte[] { 1 }) ); - assertEquals(2, translog.readers.size()); + assertEquals(1, translog.readers.size()); assertEquals(6, translog.allUploaded().size()); assertEquals(mdFiles, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR))); // Refill the permits back Releasables.close(releasable); addToTranslogAndListAndUpload(translog, ops, new Translog.Index(String.valueOf(3), 3, primaryTerm.get(), new byte[] { 1 })); - assertEquals(3, translog.readers.size()); - assertEquals(10, translog.allUploaded().size()); + assertEquals(2, translog.readers.size()); + assertEquals(8, translog.allUploaded().size()); assertEquals(3, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()); translog.setMinSeqNoToKeep(3); translog.trimUnreferencedReaders(); assertEquals(1, translog.readers.size()); - assertBusy(() -> assertEquals(6, translog.allUploaded().size())); + assertBusy(() -> assertEquals(4, translog.allUploaded().size())); assertBusy(() -> assertEquals(1, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size())); } From ea629f41c25e0bf5015a23594cd24b169b4690e6 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Tue, 28 Nov 2023 10:26:07 +0530 Subject: [PATCH 11/11] Add more checks in testResumeUploadAfterFailedPrimaryRelocation IT Signed-off-by: Ashish Singh --- .../remotestore/BaseRemoteStoreRestoreIT.java | 15 ------- .../RemoteStoreBaseIntegTestCase.java | 15 +++++++ .../opensearch/remotestore/RemoteStoreIT.java | 43 ++++++++++++++++++- 3 files changed, 57 insertions(+), 16 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/BaseRemoteStoreRestoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/BaseRemoteStoreRestoreIT.java index 99c5d7fb2bae7..d29dacb001434 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/BaseRemoteStoreRestoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/BaseRemoteStoreRestoreIT.java @@ -8,9 +8,7 @@ package org.opensearch.remotestore; -import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest; import org.opensearch.action.index.IndexResponse; -import org.opensearch.action.support.PlainActionFuture; import org.opensearch.common.settings.Settings; import org.opensearch.plugins.Plugin; import org.opensearch.test.transport.MockTransportService; @@ -20,7 +18,6 @@ import java.util.Map; import java.util.concurrent.TimeUnit; -import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; public class BaseRemoteStoreRestoreIT extends RemoteStoreBaseIntegTestCase { @@ -49,18 +46,6 @@ protected void restore(String... indices) { restore(randomBoolean(), indices); } - protected void restore(boolean restoreAllShards, String... indices) { - if (restoreAllShards) { - assertAcked(client().admin().indices().prepareClose(indices)); - } - client().admin() - .cluster() - .restoreRemoteStore( - new RestoreRemoteStoreRequest().indices(indices).restoreAllShards(restoreAllShards), - PlainActionFuture.newFuture() - ); - } - protected void verifyRestoredData(Map indexStats, String indexName, boolean indexMoreData) throws Exception { ensureYellowAndNoInitializingShards(indexName); ensureGreen(indexName); diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java index 0c68b11ebff56..8c15ebd0505d9 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java @@ -8,6 +8,7 @@ package org.opensearch.remotestore; +import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest; import org.opensearch.action.admin.indices.get.GetIndexRequest; import org.opensearch.action.admin.indices.get.GetIndexResponse; import org.opensearch.action.bulk.BulkItemResponse; @@ -16,6 +17,7 @@ import org.opensearch.action.index.IndexRequest; import org.opensearch.action.index.IndexRequestBuilder; import org.opensearch.action.index.IndexResponse; +import org.opensearch.action.support.PlainActionFuture; import org.opensearch.action.support.WriteRequest; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.RepositoriesMetadata; @@ -58,6 +60,7 @@ import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; public class RemoteStoreBaseIntegTestCase extends OpenSearchIntegTestCase { protected static final String REPOSITORY_NAME = "test-remote-store-repo"; @@ -396,4 +399,16 @@ protected IndexShard getIndexShard(String dataNode, String indexName) throws Exe IndexService indexService = indicesService.indexService(new Index(indexName, uuid)); return indexService.getShard(0); } + + protected void restore(boolean restoreAllShards, String... indices) { + if (restoreAllShards) { + assertAcked(client().admin().indices().prepareClose(indices)); + } + client().admin() + .cluster() + .restoreRemoteStore( + new RestoreRemoteStoreRequest().indices(indices).restoreAllShards(restoreAllShards), + PlainActionFuture.newFuture() + ); + } } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java index 25146d53b5654..e1997fea3433a 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java @@ -35,6 +35,7 @@ import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.plugins.Plugin; +import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.transport.MockTransportService; import org.opensearch.transport.TransportService; @@ -705,7 +706,7 @@ public void testNoMultipleWriterDuringPrimaryRelocation() throws ExecutionExcept assertFalse(clusterHealthResponse.isTimedOut()); } - public void testResumeUploadAfterFailedPrimaryRelocation() throws ExecutionException, InterruptedException { + public void testResumeUploadAfterFailedPrimaryRelocation() throws ExecutionException, InterruptedException, IOException { // In this test, we fail the hand off during the primary relocation. This will undo the drainRefreshes and // drainSync performed as part of relocation handoff (before performing the handoff transport action). // We validate the same here by failing the peer recovery and ensuring we can index afterward as well. @@ -759,6 +760,46 @@ public void testResumeUploadAfterFailedPrimaryRelocation() throws ExecutionExcep int moreDocs = randomIntBetween(5, 10); indexBulk(INDEX_NAME, moreDocs); flushAndRefresh(INDEX_NAME); + int uncommittedOps = randomIntBetween(5, 10); + indexBulk(INDEX_NAME, uncommittedOps); assertHitCount(client(oldPrimary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), docs + moreDocs); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(INDEX_NAME))); + + restore(true, INDEX_NAME); + ensureGreen(INDEX_NAME); + assertHitCount( + client(newPrimary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), + docs + moreDocs + uncommittedOps + ); + + String newNode = internalCluster().startDataOnlyNodes(1).get(0); + ensureStableCluster(3); + client().admin() + .cluster() + .prepareReroute() + .add(new MoveAllocationCommand(INDEX_NAME, 0, newPrimary, newNode)) + .execute() + .actionGet(); + + ClusterHealthResponse clusterHealthResponse = client().admin() + .cluster() + .prepareHealth() + .setWaitForStatus(ClusterHealthStatus.GREEN) + .setWaitForEvents(Priority.LANGUID) + .setWaitForNoRelocatingShards(true) + .setTimeout(TimeValue.timeValueSeconds(10)) + .execute() + .actionGet(); + assertFalse(clusterHealthResponse.isTimedOut()); + + ex = assertThrows( + SearchPhaseExecutionException.class, + () -> client(newPrimary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get() + ); + assertEquals("all shards failed", ex.getMessage()); + assertHitCount( + client(newNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), + docs + moreDocs + uncommittedOps + ); } }