[Remote Store] Add segment transfer timeout dynamic setting (#13679)
* [Remote Store] Add segment transfer timeout dynamic setting

Signed-off-by: Varun Bansal <bansvaru@amazon.com>
linuxpi committed May 23, 2024
1 parent 5441d55 commit b3049fb
Showing 8 changed files with 154 additions and 12 deletions.
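
Since cluster.remote_store.segment.transfer_timeout is registered as a dynamic cluster setting, it can be adjusted on a running cluster without a restart. Below is a minimal sketch of doing that from Java, assuming the standard cluster-settings update API; the class and method names in the sketch are illustrative and not part of this change.

    import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
    import org.opensearch.client.Client;
    import org.opensearch.common.settings.Settings;

    public final class SegmentTransferTimeoutUpdater {
        /**
         * Raises the segment transfer timeout to 45 minutes on a live cluster.
         * The setting key matches RemoteStoreSettings.CLUSTER_REMOTE_SEGMENT_TRANSFER_TIMEOUT_SETTING below.
         */
        public static void raiseTimeout(Client client) {
            ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest().persistentSettings(
                Settings.builder().put("cluster.remote_store.segment.transfer_timeout", "45m")
            );
            client.admin().cluster().updateSettings(request).actionGet();
        }
    }
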
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add support for Azure Managed Identity in repository-azure ([#12423](https://github.com/opensearch-project/OpenSearch/issues/12423))
- Add useCompoundFile index setting ([#13478](https://github.com/opensearch-project/OpenSearch/pull/13478))
- Make outbound side of transport protocol dependent ([#13293](https://github.com/opensearch-project/OpenSearch/pull/13293))
- [Remote Store] Add dynamic cluster settings to set timeout for segments upload to Remote Store ([#13679](https://github.com/opensearch-project/OpenSearch/pull/13679))

### Dependencies
- Bump `com.github.spullara.mustache.java:compiler` from 0.9.10 to 0.9.13 ([#13329](https://github.com/opensearch-project/OpenSearch/pull/13329), [#13559](https://github.com/opensearch-project/OpenSearch/pull/13559))
@@ -26,7 +26,6 @@
import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.SEGMENTS;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA;
import static org.opensearch.index.remote.RemoteStorePressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED;
import static org.opensearch.test.OpenSearchTestCase.getShardLevelBlobPath;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteStoreRefreshListenerIT extends AbstractRemoteStoreMockRepositoryIntegTestCase {
@@ -737,6 +737,7 @@ public void apply(Settings value, Settings current, Settings previous) {
RemoteStoreSettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_TRANSFER_TIMEOUT_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_SEGMENT_TRANSFER_TIMEOUT_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS
@@ -3970,7 +3970,8 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
new RemoteStoreRefreshListener(
this,
this.checkpointPublisher,
remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker(shardId())
remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker(shardId()),
remoteStoreSettings
)
);
}
@@ -33,6 +33,7 @@
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.threadpool.ThreadPool;
@@ -45,6 +46,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

@@ -89,11 +91,13 @@ public final class RemoteStoreRefreshListener extends ReleasableRetryableRefresh
private volatile long primaryTerm;
private volatile Iterator<TimeValue> backoffDelayIterator;
private final SegmentReplicationCheckpointPublisher checkpointPublisher;
private final RemoteStoreSettings remoteStoreSettings;

public RemoteStoreRefreshListener(
IndexShard indexShard,
SegmentReplicationCheckpointPublisher checkpointPublisher,
RemoteSegmentTransferTracker segmentTracker
RemoteSegmentTransferTracker segmentTracker,
RemoteStoreSettings remoteStoreSettings
) {
super(indexShard.getThreadPool());
logger = Loggers.getLogger(getClass(), indexShard.shardId());
@@ -116,6 +120,7 @@ public RemoteStoreRefreshListener(
this.segmentTracker = segmentTracker;
resetBackOffDelayIterator();
this.checkpointPublisher = checkpointPublisher;
this.remoteStoreSettings = remoteStoreSettings;
}

@Override
@@ -286,7 +291,12 @@ public void onFailure(Exception e) {

// Start the segments files upload
uploadNewSegments(localSegmentsPostRefresh, localSegmentsSizeMap, segmentUploadsCompletedListener);
latch.await();
if (latch.await(
remoteStoreSettings.getClusterRemoteSegmentTransferTimeout().millis(),
TimeUnit.MILLISECONDS
) == false) {
throw new SegmentUploadFailedException("Timeout while waiting for remote segment transfer to complete");
}
} catch (EngineException e) {
logger.warn("Exception while reading SegmentInfosSnapshot", e);
}
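
For readers skimming the diff: the hunk above replaces an unbounded latch.await() with a wait bounded by the new setting, surfacing a failure when the segment upload does not finish in time. Here is a self-contained sketch of that bounded-wait pattern, using illustrative names rather than the production types.

    import java.io.IOException;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    public final class BoundedUploadWait {
        // Stand-in for RemoteStoreSettings#getClusterRemoteSegmentTransferTimeout().
        private static final long TRANSFER_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(30);

        /** Waits for the upload latch, turning a timeout into an explicit failure. */
        public static void awaitUpload(CountDownLatch uploadsCompleted) throws IOException, InterruptedException {
            if (uploadsCompleted.await(TRANSFER_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS) == false) {
                throw new IOException("Timeout while waiting for remote segment transfer to complete");
            }
        }
    }
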
@@ -0,0 +1,28 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.shard;

import java.io.IOException;

/**
* Exception to be thrown when a segment upload fails.
*
* @opensearch.internal
*/
public class SegmentUploadFailedException extends IOException {

/**
* Creates a new SegmentUploadFailedException.
*
* @param message error message
*/
public SegmentUploadFailedException(String message) {
super(message);
}
}
@@ -105,9 +105,21 @@ public class RemoteStoreSettings {
Property.NodeScope
);

/**
* Controls timeout value while uploading segment files to remote segment store
*/
public static final Setting<TimeValue> CLUSTER_REMOTE_SEGMENT_TRANSFER_TIMEOUT_SETTING = Setting.timeSetting(
"cluster.remote_store.segment.transfer_timeout",
TimeValue.timeValueMinutes(30),
TimeValue.timeValueMinutes(10),
Property.NodeScope,
Property.Dynamic
);

private volatile TimeValue clusterRemoteTranslogBufferInterval;
private volatile int minRemoteSegmentMetadataFiles;
private volatile TimeValue clusterRemoteTranslogTransferTimeout;
private volatile TimeValue clusterRemoteSegmentTransferTimeout;
private volatile RemoteStoreEnums.PathType pathType;
private volatile RemoteStoreEnums.PathHashAlgorithm pathHashAlgorithm;
private volatile int maxRemoteTranslogReaders;
@@ -139,6 +151,12 @@ public RemoteStoreSettings(Settings settings, ClusterSettings clusterSettings) {

maxRemoteTranslogReaders = CLUSTER_REMOTE_MAX_TRANSLOG_READERS.get(settings);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_REMOTE_MAX_TRANSLOG_READERS, this::setMaxRemoteTranslogReaders);

clusterRemoteSegmentTransferTimeout = CLUSTER_REMOTE_SEGMENT_TRANSFER_TIMEOUT_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(
CLUSTER_REMOTE_SEGMENT_TRANSFER_TIMEOUT_SETTING,
this::setClusterRemoteSegmentTransferTimeout
);
}

public TimeValue getClusterRemoteTranslogBufferInterval() {
Expand All @@ -161,10 +179,18 @@ public TimeValue getClusterRemoteTranslogTransferTimeout() {
return clusterRemoteTranslogTransferTimeout;
}

public TimeValue getClusterRemoteSegmentTransferTimeout() {
return clusterRemoteSegmentTransferTimeout;
}

private void setClusterRemoteTranslogTransferTimeout(TimeValue clusterRemoteTranslogTransferTimeout) {
this.clusterRemoteTranslogTransferTimeout = clusterRemoteTranslogTransferTimeout;
}

private void setClusterRemoteSegmentTransferTimeout(TimeValue clusterRemoteSegmentTransferTimeout) {
this.clusterRemoteSegmentTransferTimeout = clusterRemoteSegmentTransferTimeout;
}

@ExperimentalApi
public RemoteStoreEnums.PathType getPathType() {
return pathType;
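
As declared above, the new setting defaults to 30 minutes and enforces a 10-minute floor. A small sketch of how those bounds behave when the setting is read, assuming Setting#get applies the declared minimum (the usual Setting.timeSetting behavior):

    import org.opensearch.common.settings.Settings;
    import org.opensearch.common.unit.TimeValue;
    import org.opensearch.indices.RemoteStoreSettings;

    public final class SegmentTransferTimeoutBounds {
        public static void main(String[] args) {
            // Unset -> the 30-minute default.
            TimeValue byDefault = RemoteStoreSettings.CLUSTER_REMOTE_SEGMENT_TRANSFER_TIMEOUT_SETTING.get(Settings.EMPTY);
            System.out.println("default = " + byDefault);

            // Below the 10-minute floor -> rejected with an IllegalArgumentException.
            Settings tooLow = Settings.builder().put("cluster.remote_store.segment.transfer_timeout", "5m").build();
            try {
                RemoteStoreSettings.CLUSTER_REMOTE_SEGMENT_TRANSFER_TIMEOUT_SETTING.get(tooLow);
            } catch (IllegalArgumentException e) {
                System.out.println("rejected: " + e.getMessage());
            }
        }
    }
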
@@ -23,6 +23,7 @@
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.engine.InternalEngineFactory;
@@ -34,6 +35,7 @@
import org.opensearch.index.store.RemoteSegmentStoreDirectory.MetadataFilenameUtils;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.lockmanager.RemoteStoreLockManager;
import org.opensearch.indices.DefaultRemoteStoreSettings;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.replication.common.ReplicationType;
@@ -91,7 +93,12 @@ public void setup(boolean primary, int numberOfDocs) throws IOException {
remoteStoreStatsTrackerFactory = new RemoteStoreStatsTrackerFactory(clusterService, Settings.EMPTY);
remoteStoreStatsTrackerFactory.afterIndexShardCreated(indexShard);
RemoteSegmentTransferTracker tracker = remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker(indexShard.shardId());
remoteStoreRefreshListener = new RemoteStoreRefreshListener(indexShard, SegmentReplicationCheckpointPublisher.EMPTY, tracker);
remoteStoreRefreshListener = new RemoteStoreRefreshListener(
indexShard,
SegmentReplicationCheckpointPublisher.EMPTY,
tracker,
DefaultRemoteStoreSettings.INSTANCE
);
}

private void indexDocs(int startDocId, int numberOfDocs) throws IOException {
@@ -176,7 +183,12 @@ public void testRemoteDirectoryInitThrowsException() throws IOException {
when(remoteStore.directory()).thenReturn(remoteStoreFilterDirectory);

// Since the thrown IOException is caught in the constructor, ctor should be invoked successfully.
new RemoteStoreRefreshListener(shard, SegmentReplicationCheckpointPublisher.EMPTY, mock(RemoteSegmentTransferTracker.class));
new RemoteStoreRefreshListener(
shard,
SegmentReplicationCheckpointPublisher.EMPTY,
mock(RemoteSegmentTransferTracker.class),
DefaultRemoteStoreSettings.INSTANCE
);

// Validate that the stream of metadata file of remoteMetadataDirectory has been opened only once and the
// listFilesByPrefixInLexicographicOrder has been called twice.
@@ -371,6 +383,33 @@ public void testRefreshSuccessOnSecondAttempt() throws Exception {
assertNoLagAndTotalUploadsFailed(segmentTracker, 1);
}

public void testSegmentUploadTimeout() throws Exception {
// This covers the case where segment upload fails due to timeout
int succeedOnAttempt = 1;
// We spy on IndexShard.isPrimaryStarted() to validate that the remote upload was attempted the expected number of times.
CountDownLatch refreshCountLatch = new CountDownLatch(succeedOnAttempt);
CountDownLatch successLatch = new CountDownLatch(2);
Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> tuple = mockIndexShardWithRetryAndScheduleRefresh(
succeedOnAttempt,
refreshCountLatch,
successLatch,
1,
new CountDownLatch(0),
true,
true
);
assertBusy(() -> assertEquals(0, refreshCountLatch.getCount()));
assertBusy(() -> assertEquals(1, successLatch.getCount()));
RemoteStoreStatsTrackerFactory trackerFactory = tuple.v2();
RemoteSegmentTransferTracker segmentTracker = trackerFactory.getRemoteSegmentTransferTracker(indexShard.shardId());
assertBusy(() -> {
assertTrue(segmentTracker.getTotalUploadsFailed() > 1);
assertTrue(segmentTracker.getTotalUploadsSucceeded() < 2);
});
// shutdown threadpool to avoid leaking threads
indexShard.getThreadPool().shutdownNow();
}

/**
* Tests retry flow after snapshot and metadata files have been uploaded to remote store in the failed attempt.
* Snapshot and metadata files created in failed attempt should not break retry.
@@ -470,6 +509,7 @@ public void testRefreshFailedDueToPrimaryTermMisMatch() throws Exception {
successLatch,
checkpointPublishSucceedOnAttempt,
reachedCheckpointPublishLatch,
false,
false
);

@@ -521,7 +561,8 @@ private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIn
successLatch,
succeedCheckpointPublishOnAttempt,
reachedCheckpointPublishLatch,
true
true,
false
);
}

@@ -531,7 +572,8 @@ private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIn
CountDownLatch successLatch,
int succeedCheckpointPublishOnAttempt,
CountDownLatch reachedCheckpointPublishLatch,
boolean mockPrimaryTerm
boolean mockPrimaryTerm,
boolean testUploadTimeout
) throws IOException {
// Create index shard that we will be using to mock different methods in IndexShard for the unit test
indexShard = newStartedShard(
@@ -565,9 +607,22 @@ private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIn
// Mock (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory())
Store remoteStore = mock(Store.class);
when(shard.remoteStore()).thenReturn(remoteStore);
RemoteSegmentStoreDirectory remoteSegmentStoreDirectory =
(RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory()).getDelegate())
.getDelegate();
RemoteSegmentStoreDirectory remoteSegmentStoreDirectory;
RemoteDirectory remoteDirectory = mock(RemoteDirectory.class);

if (testUploadTimeout) {
remoteSegmentStoreDirectory = new RemoteSegmentStoreDirectory(
remoteDirectory,
mock(RemoteDirectory.class),
mock(RemoteStoreLockManager.class),
indexShard.getThreadPool(),
indexShard.shardId
);
} else {
remoteSegmentStoreDirectory = (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore()
.directory()).getDelegate()).getDelegate();
}

FilterDirectory remoteStoreFilterDirectory = new TestFilterDirectory(new TestFilterDirectory(remoteSegmentStoreDirectory));
when(remoteStore.directory()).thenReturn(remoteStoreFilterDirectory);

@@ -639,7 +694,28 @@ private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIn
RemoteStoreSettings remoteStoreSettings = mock(RemoteStoreSettings.class);
when(remoteStoreSettings.getMinRemoteSegmentMetadataFiles()).thenReturn(10);
when(shard.getRemoteStoreSettings()).thenReturn(remoteStoreSettings);
RemoteStoreRefreshListener refreshListener = new RemoteStoreRefreshListener(shard, emptyCheckpointPublisher, tracker);
if (testUploadTimeout) {
when(remoteStoreSettings.getClusterRemoteSegmentTransferTimeout()).thenReturn(TimeValue.timeValueMillis(10));
doAnswer(invocation -> {
ActionListener<Void> actionListener = invocation.getArgument(5);
indexShard.getThreadPool().executor(ThreadPool.Names.GENERIC).execute(() -> {
try {
Thread.sleep(30000);
} catch (InterruptedException e) {
logger.warn("copyFrom thread interrupted during sleep");
}
actionListener.onResponse(null);
});
return true;
}).when(remoteDirectory).copyFrom(any(), any(), any(), any(), any(), any(ActionListener.class), any(Boolean.class));
}

RemoteStoreRefreshListener refreshListener = new RemoteStoreRefreshListener(
shard,
emptyCheckpointPublisher,
tracker,
remoteStoreSettings
);
refreshListener.afterRefresh(true);
return Tuple.tuple(refreshListener, remoteStoreStatsTrackerFactory);
}
