From 40eb0bd75a14089fcf4496081b6d91fe7b004f33 Mon Sep 17 00:00:00 2001 From: Ankit Kala Date: Wed, 5 Apr 2023 15:12:14 +0530 Subject: [PATCH] Allow replica to fetch segments from remote store instead of leader node Signed-off-by: Ankit Kala --- .../opensearch/index/shard/IndexShardIT.java | 3 +- .../org/opensearch/index/IndexService.java | 7 +- .../index/engine/NRTReplicationEngine.java | 2 +- .../shard/CheckpointRefreshListener.java | 3 +- .../opensearch/index/shard/IndexShard.java | 39 +++--- .../shard/RemoteStoreRefreshListener.java | 13 +- ...oreSegmentUploadNotificationPublisher.java | 37 ++++++ .../opensearch/index/shard/StoreRecovery.java | 2 +- .../store/RemoteSegmentStoreDirectory.java | 7 +- .../org/opensearch/index/store/Store.java | 3 +- .../org/opensearch/indices/IndicesModule.java | 6 + .../opensearch/indices/IndicesService.java | 7 +- .../cluster/IndicesClusterStateService.java | 18 ++- .../recovery/PeerRecoveryTargetService.java | 2 +- .../PrimaryShardReplicationSource.java | 5 +- .../RemoteStoreReplicationSource.java | 92 ++++++++++++++ .../replication/SegmentReplicationSource.java | 6 +- .../SegmentReplicationSourceFactory.java | 19 +-- .../replication/SegmentReplicationTarget.java | 119 ++++++++++-------- .../RemoteStoreRefreshListenerTests.java | 2 +- .../SegmentReplicationIndexShardTests.java | 8 +- ...dicesLifecycleListenerSingleNodeTests.java | 4 +- ...actIndicesClusterStateServiceTestCase.java | 25 ++-- ...ClusterStateServiceRandomUpdatesTests.java | 4 +- .../PrimaryShardReplicationSourceTests.java | 6 +- .../SegmentReplicationTargetServiceTests.java | 2 +- .../SegmentReplicationTargetTests.java | 14 +-- .../snapshots/SnapshotResiliencyTests.java | 3 +- .../replication/TestReplicationSource.java | 3 +- .../index/shard/IndexShardTestCase.java | 5 +- 30 files changed, 329 insertions(+), 137 deletions(-) create mode 100644 server/src/main/java/org/opensearch/index/shard/RemoteStoreSegmentUploadNotificationPublisher.java create mode 100644 server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java diff --git a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java index 020985cc9668c..ab4b772641ba9 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java @@ -678,7 +678,8 @@ public static final IndexShard newIndexShard( cbs, (indexSettings, shardRouting) -> new InternalTranslogFactory(), SegmentReplicationCheckpointPublisher.EMPTY, - null + null, + RemoteStoreSegmentUploadNotificationPublisher.EMPTY ); } diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index 7d791ace44682..86d3ea746a158 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -81,6 +81,7 @@ import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardClosedException; import org.opensearch.index.shard.IndexingOperationListener; +import org.opensearch.index.shard.RemoteStoreSegmentUploadNotificationPublisher; import org.opensearch.index.shard.SearchOperationListener; import org.opensearch.index.shard.ShardId; import org.opensearch.index.shard.ShardNotFoundException; @@ -438,7 +439,8 @@ public synchronized IndexShard createShard( final ShardRouting routing, final Consumer globalCheckpointSyncer, final RetentionLeaseSyncer retentionLeaseSyncer, - final SegmentReplicationCheckpointPublisher checkpointPublisher + final SegmentReplicationCheckpointPublisher checkpointPublisher, + final RemoteStoreSegmentUploadNotificationPublisher remoteSegmentNotificationPublisher ) throws IOException { Objects.requireNonNull(retentionLeaseSyncer); /* @@ -506,7 +508,8 @@ public synchronized IndexShard createShard( circuitBreakerService, translogFactorySupplier, this.indexSettings.isSegRepEnabled() ? checkpointPublisher : null, - remoteStore + remoteStore, + remoteSegmentNotificationPublisher ); eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); eventListener.afterIndexShardCreated(indexShard); diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index da3f914d8bd7e..67bc95d0db0f9 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -485,7 +485,7 @@ public int fillSeqNoGaps(long primaryTerm) throws IOException { @Override public Engine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException { - throw new UnsupportedOperationException("Read only replicas do not have an IndexWriter and cannot recover from a translog."); + return this; } @Override diff --git a/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java index 66d095878d123..8ee56b68f8ef8 100644 --- a/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java @@ -40,7 +40,8 @@ public void beforeRefresh() throws IOException { @Override public void afterRefresh(boolean didRefresh) throws IOException { - if (didRefresh && shard.state() == IndexShardState.STARTED && shard.getReplicationTracker().isPrimaryMode()) { + if (didRefresh && shard.state() == IndexShardState.STARTED && shard.getReplicationTracker().isPrimaryMode() + && !shard.indexSettings.isRemoteStoreEnabled()) { publisher.publish(shard, shard.getLatestReplicationCheckpoint()); } } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 9d06ce7c6a391..1b1fdd72e9f46 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -327,6 +327,7 @@ Runnable getGlobalCheckpointSyncer() { private final Store remoteStore; private final BiFunction translogFactorySupplier; + private final RemoteStoreSegmentUploadNotificationPublisher remoteSegmentNotificationPublisher; public IndexShard( final ShardRouting shardRouting, @@ -351,7 +352,8 @@ public IndexShard( final CircuitBreakerService circuitBreakerService, final BiFunction translogFactorySupplier, @Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher, - @Nullable final Store remoteStore + @Nullable final Store remoteStore, + RemoteStoreSegmentUploadNotificationPublisher remoteSegmentNotificationPublisher ) throws IOException { super(shardRouting.shardId(), indexSettings); assert shardRouting.initializing(); @@ -403,6 +405,7 @@ public IndexShard( this.pendingPrimaryTerm = primaryTerm; this.globalCheckpointListeners = new GlobalCheckpointListeners(shardId, threadPool.scheduler(), logger); this.pendingReplicationActions = new PendingReplicationActions(shardId, threadPool); + this.remoteSegmentNotificationPublisher = remoteSegmentNotificationPublisher; this.replicationTracker = new ReplicationTracker( shardId, aId, @@ -2182,7 +2185,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t assert currentEngineReference.get() == null : "engine is running"; verifyNotClosed(); if (indexSettings.isRemoteStoreEnabled()) { - syncSegmentsFromRemoteSegmentStore(false); + syncSegmentsFromRemoteSegmentStore(false, true, false); } // we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata). final Engine newEngine = engineFactory.newReadWriteEngine(config); @@ -3090,6 +3093,7 @@ public void updateGlobalCheckpointOnReplica(final long globalCheckpoint, final S * When remote translog is enabled for an index, replication operation is limited to primary term validation and does not * update local checkpoint at replica, so the local checkpoint at replica can be less than globalCheckpoint. */ + assert (state() != IndexShardState.POST_RECOVERY && state() != IndexShardState.STARTED) || indexSettings.isRemoteTranslogStoreEnabled() : "supposedly in-sync shard copy received a global checkpoint [" + globalCheckpoint @@ -3499,11 +3503,12 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro final List internalRefreshListener = new ArrayList<>(); internalRefreshListener.add(new RefreshMetricUpdater(refreshMetric)); if (isRemoteStoreEnabled()) { - internalRefreshListener.add(new RemoteStoreRefreshListener(this)); + internalRefreshListener.add(new RemoteStoreRefreshListener(this, remoteSegmentNotificationPublisher)); } - if (this.checkpointPublisher != null && indexSettings.isSegRepEnabled() && shardRouting.primary()) { + if (this.checkpointPublisher != null && indexSettings.isSegRepEnabled() && shardRouting.primary() && !indexSettings.isRemoteStoreEnabled()) { internalRefreshListener.add(new CheckpointRefreshListener(this, this.checkpointPublisher)); } + /** * With segment replication enabled for primary relocation, recover replica shard initially as read only and * change to a writeable engine during relocation handoff after a round of segment replication. @@ -4078,7 +4083,7 @@ EngineConfigFactory getEngineConfigFactory() { } // for tests - ReplicationTracker getReplicationTracker() { + public ReplicationTracker getReplicationTracker() { return replicationTracker; } @@ -4347,7 +4352,7 @@ public void close() throws IOException { }; IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine)); if (indexSettings.isRemoteStoreEnabled()) { - syncSegmentsFromRemoteSegmentStore(false); + syncSegmentsFromRemoteSegmentStore(false, true, false); } newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker))); onNewEngine(newEngineReference.get()); @@ -4380,23 +4385,14 @@ public void close() throws IOException { onSettingsChanged(); } - /** - * Downloads segments from remote segment store. This method will download segments till - * last refresh checkpoint. - * @param overrideLocal flag to override local segment files with those in remote store - * @throws IOException if exception occurs while reading segments from remote store - */ - public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal) throws IOException { - syncSegmentsFromRemoteSegmentStore(overrideLocal, true); - } - /** * Downloads segments from remote segment store. * @param overrideLocal flag to override local segment files with those in remote store * @param refreshLevelSegmentSync last refresh checkpoint is used if true, commit checkpoint otherwise + * @param shouldCommit if the shard requires committing the changes after sync from remote. * @throws IOException if exception occurs while reading segments from remote store */ - public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean refreshLevelSegmentSync) throws IOException { + public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean refreshLevelSegmentSync, boolean shouldCommit) throws IOException { assert indexSettings.isRemoteStoreEnabled(); logger.info("Downloading segments from remote segment store"); assert remoteStore.directory() instanceof FilterDirectory : "Store.directory is not an instance of FilterDirectory"; @@ -4448,6 +4444,7 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re skippedSegments.add(file); } } + if (refreshLevelSegmentSync && segmentInfosSnapshotFilename != null) { try ( ChecksumIndexInput indexInput = new BufferedChecksumIndexInput( @@ -4460,7 +4457,13 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re Long.parseLong(segmentInfosSnapshotFilename.split("__")[1]) ); long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY)); - store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint); + if (shouldCommit) { + finalizeReplication(infosSnapshot); + store.cleanupAndPreserveLatestCommitPoint("finalize - clean with in memory infos", infosSnapshot); + } + else { + store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint); + } } } } catch (IOException e) { diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index ee3b392472fa0..e653cba2cf4c8 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -25,6 +25,8 @@ import org.opensearch.index.engine.InternalEngine; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.store.RemoteSegmentStoreDirectory; +import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import java.io.IOException; import java.util.Collection; @@ -57,14 +59,16 @@ public final class RemoteStoreRefreshListener implements ReferenceManager.Refres private final RemoteSegmentStoreDirectory remoteDirectory; private final Map localSegmentChecksumMap; private long primaryTerm; + private final RemoteStoreSegmentUploadNotificationPublisher notificationPublisher; private static final Logger logger = LogManager.getLogger(RemoteStoreRefreshListener.class); - public RemoteStoreRefreshListener(IndexShard indexShard) { + public RemoteStoreRefreshListener(IndexShard indexShard, RemoteStoreSegmentUploadNotificationPublisher notificationPublisher) { this.indexShard = indexShard; this.storeDirectory = indexShard.store().directory(); this.remoteDirectory = (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory()) .getDelegate()).getDelegate(); this.primaryTerm = indexShard.getOperationPrimaryTerm(); + this.notificationPublisher = notificationPublisher; localSegmentChecksumMap = new HashMap<>(); if (indexShard.shardRouting.primary()) { try { @@ -103,6 +107,9 @@ public void afterRefresh(boolean didRefresh) { deleteStaleCommits(); } + // Capture replication checkpoint before uploading the segments as upload can take some time and checkpoint can move. + ReplicationCheckpoint checkpoint = indexShard.getLatestReplicationCheckpoint(); + String segmentInfoSnapshotFilename = null; try (GatedCloseable segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) { SegmentInfos segmentInfos = segmentInfosGatedCloseable.get(); @@ -148,6 +155,10 @@ public void afterRefresh(boolean didRefresh) { .lastRefreshedCheckpoint(); ((InternalEngine) indexShard.getEngine()).translogManager() .setMinSeqNoToKeep(lastRefreshedCheckpoint + 1); + + if (!RemoteStoreSegmentUploadNotificationPublisher.EMPTY.equals(notificationPublisher)) { + notificationPublisher.notifySegmentUpload(indexShard, checkpoint); + } } } } catch (EngineException e) { diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreSegmentUploadNotificationPublisher.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreSegmentUploadNotificationPublisher.java new file mode 100644 index 0000000000000..81ad3abae8354 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreSegmentUploadNotificationPublisher.java @@ -0,0 +1,37 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.shard; + +import org.opensearch.common.inject.Inject; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; + + +/** + * Hook to publish notification after primary uploads segments to the remote store. + * + * @opensearch.internal + */ +public class RemoteStoreSegmentUploadNotificationPublisher { + private final SegmentReplicationCheckpointPublisher segRepPublisher; + + @Inject + public RemoteStoreSegmentUploadNotificationPublisher(SegmentReplicationCheckpointPublisher segRepPublisher) { + this.segRepPublisher = segRepPublisher; + } + + public void notifySegmentUpload(IndexShard indexShard, ReplicationCheckpoint checkpoint) { + // TODO: Add separate publisher for CCR. + // we don't call indexShard.getLatestReplicationCheckpoint() as it might have a newer refreshed checkpoint. + // Instead we send the one which has been uploaded to remote store. + if (segRepPublisher != null) segRepPublisher.publish(indexShard, checkpoint); + } + + public static final RemoteStoreSegmentUploadNotificationPublisher EMPTY = new RemoteStoreSegmentUploadNotificationPublisher(null); +} diff --git a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java index 31a863129cc8c..6c2336501e64f 100644 --- a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java @@ -457,7 +457,7 @@ private void recoverFromRemoteStore(IndexShard indexShard, Repository repository remoteStore.incRef(); try { // Download segments from remote segment store - indexShard.syncSegmentsFromRemoteSegmentStore(true); + indexShard.syncSegmentsFromRemoteSegmentStore(true, true, false); if (store.directory().listAll().length == 0) { store.createEmpty(indexShard.indexSettings().getIndexVersionCreated().luceneVersion); diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index c385303813844..ffb83f9febe28 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -118,7 +118,7 @@ public void init() throws IOException { * @return Map of segment filename to uploaded filename with checksum * @throws IOException if there were any failures in reading the metadata file */ - private Map readLatestMetadataFile() throws IOException { + public Map readLatestMetadataFile() throws IOException { Map segmentMetadataMap = new HashMap<>(); Collection metadataFiles = remoteMetadataDirectory.listFilesByPrefix(MetadataFilenameUtils.METADATA_PREFIX); @@ -149,7 +149,6 @@ private Map readMetadataFile(String metadataFil public static class UploadedSegmentMetadata { // Visible for testing static final String SEPARATOR = "::"; - private final String originalFilename; private final String uploadedFilename; private final String checksum; @@ -179,6 +178,10 @@ public static UploadedSegmentMetadata fromString(String uploadedFilename) { String[] values = uploadedFilename.split(SEPARATOR); return new UploadedSegmentMetadata(values[0], values[1], values[2], Long.parseLong(values[3])); } + + public String getOriginalFilename() { + return originalFilename; + } } /** diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java index f923532b3d9ad..0c9437682c07d 100644 --- a/server/src/main/java/org/opensearch/index/store/Store.java +++ b/server/src/main/java/org/opensearch/index/store/Store.java @@ -399,7 +399,8 @@ public static RecoveryDiff segmentReplicationDiff(Map missing.add(value); } else { final StoreFileMetadata fileMetadata = target.get(value.name()); - if (fileMetadata.isSame(value)) { + // match segments using checksum + if (fileMetadata.checksum().equals(value.checksum())) { identical.add(value); } else { different.add(value); diff --git a/server/src/main/java/org/opensearch/indices/IndicesModule.java b/server/src/main/java/org/opensearch/indices/IndicesModule.java index 5310e1b1e8397..9da05c10408ca 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesModule.java +++ b/server/src/main/java/org/opensearch/indices/IndicesModule.java @@ -73,6 +73,7 @@ import org.opensearch.index.seqno.RetentionLeaseSyncer; import org.opensearch.index.seqno.GlobalCheckpointSyncAction; import org.opensearch.index.shard.PrimaryReplicaSyncer; +import org.opensearch.index.shard.RemoteStoreSegmentUploadNotificationPublisher; import org.opensearch.indices.cluster.IndicesClusterStateService; import org.opensearch.indices.mapper.MapperRegistry; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; @@ -287,6 +288,11 @@ protected void configure() { } else { bind(SegmentReplicationCheckpointPublisher.class).toInstance(SegmentReplicationCheckpointPublisher.EMPTY); } + if (FeatureFlags.isEnabled(FeatureFlags.REMOTE_STORE)) { + bind(RemoteStoreSegmentUploadNotificationPublisher.class).asEagerSingleton(); + } else { + bind(RemoteStoreSegmentUploadNotificationPublisher.class).toInstance(RemoteStoreSegmentUploadNotificationPublisher.EMPTY); + } } /** diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index b0d488a5b2cf7..7a249c35e0e74 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -135,6 +135,7 @@ import org.opensearch.index.shard.IndexingStats; import org.opensearch.index.shard.ShardId; import org.opensearch.index.store.remote.filecache.FileCacheCleaner; +import org.opensearch.index.shard.RemoteStoreSegmentUploadNotificationPublisher; import org.opensearch.index.translog.InternalTranslogFactory; import org.opensearch.index.translog.RemoteBlobStoreInternalTranslogFactory; import org.opensearch.index.translog.TranslogFactory; @@ -161,7 +162,6 @@ import org.opensearch.search.query.QueryPhase; import org.opensearch.search.query.QuerySearchResult; import org.opensearch.threadpool.ThreadPool; - import java.io.Closeable; import java.io.IOException; import java.io.InputStream; @@ -1023,14 +1023,15 @@ public IndexShard createShard( final Consumer globalCheckpointSyncer, final RetentionLeaseSyncer retentionLeaseSyncer, final DiscoveryNode targetNode, - final DiscoveryNode sourceNode + final DiscoveryNode sourceNode, + final RemoteStoreSegmentUploadNotificationPublisher remoteSegmentNotificationPublisher ) throws IOException { Objects.requireNonNull(retentionLeaseSyncer); ensureChangesAllowed(); IndexService indexService = indexService(shardRouting.index()); assert indexService != null; RecoveryState recoveryState = indexService.createRecoveryState(shardRouting, targetNode, sourceNode); - IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer, checkpointPublisher); + IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer, checkpointPublisher, remoteSegmentNotificationPublisher); indexShard.addShardFailureCallback(onShardFailure); indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService, mapping -> { assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS diff --git a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java index 439191f7635d9..299a4ff03df87 100644 --- a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java @@ -74,6 +74,7 @@ import org.opensearch.index.shard.IndexShardState; import org.opensearch.index.shard.PrimaryReplicaSyncer; import org.opensearch.index.shard.PrimaryReplicaSyncer.ResyncTask; +import org.opensearch.index.shard.RemoteStoreSegmentUploadNotificationPublisher; import org.opensearch.index.shard.ShardId; import org.opensearch.index.shard.ShardNotFoundException; import org.opensearch.indices.IndicesService; @@ -144,6 +145,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple private final RetentionLeaseSyncer retentionLeaseSyncer; private final SegmentReplicationTargetService segmentReplicationTargetService; + private final RemoteStoreSegmentUploadNotificationPublisher remoteSegmentNotificationPublisher; private final SegmentReplicationCheckpointPublisher checkpointPublisher; @@ -165,7 +167,8 @@ public IndicesClusterStateService( final PrimaryReplicaSyncer primaryReplicaSyncer, final GlobalCheckpointSyncAction globalCheckpointSyncAction, final RetentionLeaseSyncer retentionLeaseSyncer, - final SegmentReplicationCheckpointPublisher checkpointPublisher + final SegmentReplicationCheckpointPublisher checkpointPublisher, + final RemoteStoreSegmentUploadNotificationPublisher remoteSegmentNotificationPublisher ) { this( settings, @@ -184,7 +187,8 @@ public IndicesClusterStateService( snapshotShardsService, primaryReplicaSyncer, globalCheckpointSyncAction::updateGlobalCheckpointForShard, - retentionLeaseSyncer + retentionLeaseSyncer, + remoteSegmentNotificationPublisher ); } @@ -206,7 +210,8 @@ public IndicesClusterStateService( final SnapshotShardsService snapshotShardsService, final PrimaryReplicaSyncer primaryReplicaSyncer, final Consumer globalCheckpointSyncer, - final RetentionLeaseSyncer retentionLeaseSyncer + final RetentionLeaseSyncer retentionLeaseSyncer, + final RemoteStoreSegmentUploadNotificationPublisher remoteSegmentNotificationPublisher ) { this.settings = settings; this.checkpointPublisher = checkpointPublisher; @@ -232,6 +237,7 @@ public IndicesClusterStateService( this.globalCheckpointSyncer = globalCheckpointSyncer; this.retentionLeaseSyncer = Objects.requireNonNull(retentionLeaseSyncer); this.sendRefreshMapping = settings.getAsBoolean("indices.cluster.send_refresh_mapping", true); + this.remoteSegmentNotificationPublisher = remoteSegmentNotificationPublisher; } @Override @@ -661,7 +667,8 @@ private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardR globalCheckpointSyncer, retentionLeaseSyncer, nodes.getLocalNode(), - sourceNode + sourceNode, + remoteSegmentNotificationPublisher ); } catch (Exception e) { failAndRemoveShard(shardRouting, true, "failed to create shard", e, state); @@ -1019,7 +1026,8 @@ T createShard( Consumer globalCheckpointSyncer, RetentionLeaseSyncer retentionLeaseSyncer, DiscoveryNode targetNode, - @Nullable DiscoveryNode sourceNode + @Nullable DiscoveryNode sourceNode, + @Nullable RemoteStoreSegmentUploadNotificationPublisher remoteSegmentNotificationPublisher ) throws IOException; /** diff --git a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java index 22614ee7ca063..6693f1f66e8c6 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java @@ -246,7 +246,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi indexShard.prepareForIndexRecovery(); final boolean hasRemoteSegmentStore = indexShard.indexSettings().isRemoteStoreEnabled(); if (hasRemoteSegmentStore) { - indexShard.syncSegmentsFromRemoteSegmentStore(false, false); + indexShard.syncSegmentsFromRemoteSegmentStore(false, false, false); } final boolean hasRemoteTranslog = recoveryTarget.state().getPrimary() == false && indexShard.isRemoteTranslogEnabled(); final boolean hasNoTranslog = indexShard.indexSettings().isRemoteSnapshot(); diff --git a/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java index b211d81c1c76a..5e793d0d0f214 100644 --- a/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java +++ b/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java @@ -13,6 +13,7 @@ import org.opensearch.action.ActionListener; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.io.stream.Writeable; +import org.opensearch.index.shard.IndexShard; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.recovery.RecoverySettings; @@ -79,7 +80,7 @@ public void getSegmentFiles( long replicationId, ReplicationCheckpoint checkpoint, List filesToFetch, - Store store, + IndexShard indexShard, ActionListener listener ) { final Writeable.Reader reader = GetSegmentFilesResponse::new; @@ -99,7 +100,7 @@ public void getSegmentFiles( @Override public String getDescription() { - return sourceNode.getName(); + return "remote store"; } @Override diff --git a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java new file mode 100644 index 0000000000000..8d3ffa9461c08 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java @@ -0,0 +1,92 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.index.SegmentCommitInfo; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FilterDirectory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.util.Version; +import org.opensearch.action.ActionListener; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.io.stream.Writeable; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.store.RemoteSegmentStoreDirectory; +import org.opensearch.index.store.Store; +import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.indices.recovery.RetryableTransportClient; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.transport.TransportResponse; +import org.opensearch.transport.TransportService; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.opensearch.indices.replication.SegmentReplicationSourceService.Actions.GET_CHECKPOINT_INFO; + +/** + * Implementation of a {@link SegmentReplicationSource} where the source is remote store. + * + * @opensearch.internal + */ +public class RemoteStoreReplicationSource implements SegmentReplicationSource { + + private static final Logger logger = LogManager.getLogger(PrimaryShardReplicationSource.class); + + private final IndexShard indexShard; + + public RemoteStoreReplicationSource(IndexShard indexShard) { + this.indexShard = indexShard; + } + + @Override + public void getCheckpointMetadata(long replicationId, ReplicationCheckpoint checkpoint, ActionListener listener) { + FilterDirectory remoteStoreDirectory = (FilterDirectory) indexShard.remoteStore().directory(); + FilterDirectory byteSizeCachingStoreDirectory = (FilterDirectory) remoteStoreDirectory.getDelegate(); + RemoteSegmentStoreDirectory remoteDirectory = (RemoteSegmentStoreDirectory) byteSizeCachingStoreDirectory.getDelegate(); + + Map metadataMap = null; + // TODO: Need to figure out a way to pass this information for segment metadata via remote store. + final Version version = indexShard.getSegmentInfosSnapshot().get().getCommitLuceneVersion(); + try { + metadataMap = remoteDirectory.readLatestMetadataFile().entrySet().stream().collect(Collectors.toMap( + e -> e.getKey(), + e -> new StoreFileMetadata(e.getValue().getOriginalFilename(), e.getValue().getLength(), + Store.digestToString(Long.valueOf(e.getValue().getChecksum())), version, null))); + } catch (IOException e) { + logger.error("Error fetching checkpoint metadata from remote store {}", e); + e.printStackTrace(); + } + // TODO: GET current checkpoint from remote store. + listener.onResponse(new CheckpointInfoResponse(checkpoint, metadataMap, null)); + } + + @Override + public void getSegmentFiles(long replicationId, ReplicationCheckpoint checkpoint, List filesToFetch, IndexShard indexShard, ActionListener listener) { + try { + indexShard.syncSegmentsFromRemoteSegmentStore(false, true, true); + } catch (IOException e) { + logger.error("Failed to sync segments {}", e); + listener.onFailure(e); + return; + } + listener.onResponse(new GetSegmentFilesResponse(Collections.emptyList())); + } + + @Override + public String getDescription() { + return null; + } +} diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSource.java index 2fa74819fe4de..fbdba81f31837 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSource.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSource.java @@ -10,10 +10,12 @@ import org.opensearch.action.ActionListener; import org.opensearch.common.util.CancellableThreads.ExecutionCancelledException; +import org.opensearch.index.shard.IndexShard; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import java.io.IOException; import java.util.List; /** @@ -38,14 +40,14 @@ public interface SegmentReplicationSource { * @param replicationId {@link long} - ID of the replication event. * @param checkpoint {@link ReplicationCheckpoint} Checkpoint to fetch metadata for. * @param filesToFetch {@link List} List of files to fetch. - * @param store {@link Store} Reference to the local store. + * @param indexShard {@link IndexShard} Reference to the IndexShard. * @param listener {@link ActionListener} Listener that completes with the list of files copied. */ void getSegmentFiles( long replicationId, ReplicationCheckpoint checkpoint, List filesToFetch, - Store store, + IndexShard indexShard, ActionListener listener ); diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java index 1867fc59c5a56..af447d3df3a48 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java @@ -38,13 +38,18 @@ public SegmentReplicationSourceFactory( } public SegmentReplicationSource get(IndexShard shard) { - return new PrimaryShardReplicationSource( - shard.recoveryState().getTargetNode(), - shard.routingEntry().allocationId().getId(), - transportService, - recoverySettings, - getPrimaryNode(shard.shardId()) - ); + if(shard.indexSettings().isRemoteStoreEnabled()) { + return new RemoteStoreReplicationSource(shard); + } + else { + return new PrimaryShardReplicationSource( + shard.recoveryState().getTargetNode(), + shard.routingEntry().allocationId().getId(), + transportService, + recoverySettings, + getPrimaryNode(shard.shardId()) + ); + } } private DiscoveryNode getPrimaryNode(ShardId shardId) { diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index 995ec58d8768f..8a1229bd9831a 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -38,6 +38,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.Map; /** * Represents the target of a replication event. @@ -209,73 +210,81 @@ private void getFiles(CheckpointInfoResponse checkpointInfo, StepListener listener) { - ActionListener.completeWith(listener, () -> { - cancellableThreads.checkForCancel(); - state.setStage(SegmentReplicationState.Stage.FINALIZE_REPLICATION); - Store store = null; - try { - multiFileWriter.renameAllTempFiles(); - store = store(); - store.incRef(); - // Deserialize the new SegmentInfos object sent from the primary. - final ReplicationCheckpoint responseCheckpoint = checkpointInfoResponse.getCheckpoint(); - SegmentInfos infos = SegmentInfos.readCommit( - store.directory(), - toIndexInput(checkpointInfoResponse.getInfosBytes()), - responseCheckpoint.getSegmentsGen() - ); + if (source instanceof RemoteStoreReplicationSource) { + ActionListener.completeWith(listener, () -> { + state.setStage(SegmentReplicationState.Stage.FINALIZE_REPLICATION); + return null; + }); + } else { + ActionListener.completeWith(listener, () -> { cancellableThreads.checkForCancel(); - indexShard.finalizeReplication(infos); - store.cleanupAndPreserveLatestCommitPoint("finalize - clean with in memory infos", infos); - } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) { - // this is a fatal exception at this stage. - // this means we transferred files from the remote that have not be checksummed and they are - // broken. We have to clean up this shard entirely, remove all files and bubble it up to the - // source shard since this index might be broken there as well? The Source can handle this and checks - // its content on disk if possible. + state.setStage(SegmentReplicationState.Stage.FINALIZE_REPLICATION); + Store store = null; try { + multiFileWriter.renameAllTempFiles(); + store = store(); + store.incRef(); + // Deserialize the new SegmentInfos object sent from the primary. + final ReplicationCheckpoint responseCheckpoint = checkpointInfoResponse.getCheckpoint(); + SegmentInfos infos = SegmentInfos.readCommit( + store.directory(), + toIndexInput(checkpointInfoResponse.getInfosBytes()), + responseCheckpoint.getSegmentsGen() + ); + cancellableThreads.checkForCancel(); + indexShard.finalizeReplication(infos); + store.cleanupAndPreserveLatestCommitPoint("finalize - clean with in memory infos", infos); + } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) { + // this is a fatal exception at this stage. + // this means we transferred files from the remote that have not be checksummed and they are + // broken. We have to clean up this shard entirely, remove all files and bubble it up to the + // source shard since this index might be broken there as well? The Source can handle this and checks + // its content on disk if possible. try { - store.removeCorruptionMarker(); - } finally { - Lucene.cleanLuceneIndex(store.directory()); // clean up and delete all files + try { + store.removeCorruptionMarker(); + } finally { + Lucene.cleanLuceneIndex(store.directory()); // clean up and delete all files + } + } catch (Exception e) { + logger.debug("Failed to clean lucene index", e); + ex.addSuppressed(e); } - } catch (Exception e) { - logger.debug("Failed to clean lucene index", e); - ex.addSuppressed(e); - } - ReplicationFailedException rfe = new ReplicationFailedException( - indexShard.shardId(), - "failed to clean after replication", - ex - ); - fail(rfe, true); - throw rfe; - } catch (OpenSearchException ex) { + ReplicationFailedException rfe = new ReplicationFailedException( + indexShard.shardId(), + "failed to clean after replication", + ex + ); + fail(rfe, true); + throw rfe; + } catch (OpenSearchException ex) { /* Ignore closed replication target as it can happen due to index shard closed event in a separate thread. In such scenario, ignore the exception */ - assert cancellableThreads.isCancelled() : "Replication target closed but segment replication not cancelled"; - logger.info("Replication target closed", ex); - } catch (Exception ex) { - ReplicationFailedException rfe = new ReplicationFailedException( - indexShard.shardId(), - "failed to clean after replication", - ex - ); - fail(rfe, true); - throw rfe; - } finally { - if (store != null) { - store.decRef(); + assert cancellableThreads.isCancelled() : "Replication target closed but segment replication not cancelled"; + logger.info("Replication target closed", ex); + } catch (Exception ex) { + ReplicationFailedException rfe = new ReplicationFailedException( + indexShard.shardId(), + "failed to clean after replication", + ex + ); + fail(rfe, true); + throw rfe; + } finally { + if (store != null) { + store.decRef(); + } } - } - return null; - }); + return null; + }); + } } /** diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index c9b8c023e26aa..818a8f4d61856 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -47,7 +47,7 @@ public void setup(boolean primary, int numberOfDocs) throws IOException { indexDocs(1, numberOfDocs); indexShard.refresh("test"); - remoteStoreRefreshListener = new RemoteStoreRefreshListener(indexShard); + remoteStoreRefreshListener = new RemoteStoreRefreshListener(indexShard, RemoteStoreSegmentUploadNotificationPublisher.EMPTY); } private void indexDocs(int startDocId, int numberOfDocs) throws IOException { diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 014a37249612b..9b899b69abda4 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -833,7 +833,7 @@ public void getSegmentFiles( long replicationId, ReplicationCheckpoint checkpoint, List filesToFetch, - Store store, + IndexShard indexShard, ActionListener listener ) { listener.onResponse(new GetSegmentFilesResponse(Collections.emptyList())); @@ -903,7 +903,7 @@ public void getSegmentFiles( long replicationId, ReplicationCheckpoint checkpoint, List filesToFetch, - Store store, + IndexShard indexShard, ActionListener listener ) { Assert.fail("Should not be reached"); @@ -943,7 +943,7 @@ public void getSegmentFiles( long replicationId, ReplicationCheckpoint checkpoint, List filesToFetch, - Store store, + IndexShard indexShard, ActionListener listener ) { // randomly resolve the listener, indicating the source has resolved. @@ -985,7 +985,7 @@ public void getSegmentFiles( long replicationId, ReplicationCheckpoint checkpoint, List filesToFetch, - Store store, + IndexShard indexShard, ActionListener listener ) {} }; diff --git a/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java b/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java index 0989bf869f18e..f25ee8c0beb04 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java @@ -46,6 +46,7 @@ import org.opensearch.index.shard.IndexEventListener; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardTestCase; +import org.opensearch.index.shard.RemoteStoreSegmentUploadNotificationPublisher; import org.opensearch.index.shard.ShardId; import org.opensearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason; import org.opensearch.indices.recovery.RecoveryState; @@ -153,7 +154,8 @@ public void afterIndexRemoved(Index index, IndexSettings indexSettings, IndexRem newRouting, s -> {}, RetentionLeaseSyncer.EMPTY, - SegmentReplicationCheckpointPublisher.EMPTY + SegmentReplicationCheckpointPublisher.EMPTY, + RemoteStoreSegmentUploadNotificationPublisher.EMPTY ); IndexShardTestCase.updateRoutingEntry(shard, newRouting); assertEquals(5, counter.get()); diff --git a/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java b/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java index 0619e3e3f62a2..132bf8c2dffdf 100644 --- a/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java +++ b/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java @@ -51,6 +51,7 @@ import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardState; import org.opensearch.index.shard.PrimaryReplicaSyncer.ResyncTask; +import org.opensearch.index.shard.RemoteStoreSegmentUploadNotificationPublisher; import org.opensearch.index.shard.ShardId; import org.opensearch.indices.IndicesService; import org.opensearch.indices.cluster.IndicesClusterStateService.AllocatedIndex; @@ -252,18 +253,18 @@ public MockIndexService indexService(Index index) { } @Override - public MockIndexShard createShard( - final ShardRouting shardRouting, - final SegmentReplicationCheckpointPublisher checkpointPublisher, - final PeerRecoveryTargetService recoveryTargetService, - final RecoveryListener recoveryListener, - final RepositoriesService repositoriesService, - final Consumer onShardFailure, - final Consumer globalCheckpointSyncer, - final RetentionLeaseSyncer retentionLeaseSyncer, - final DiscoveryNode targetNode, - final DiscoveryNode sourceNode - ) throws IOException { + public MockIndexShard createShard(ShardRouting shardRouting, + SegmentReplicationCheckpointPublisher checkpointPublisher, + PeerRecoveryTargetService recoveryTargetService, + RecoveryListener recoveryListener, + RepositoriesService repositoriesService, + Consumer onShardFailure, + Consumer globalCheckpointSyncer, + RetentionLeaseSyncer retentionLeaseSyncer, + DiscoveryNode targetNode, + DiscoveryNode sourceNode, + RemoteStoreSegmentUploadNotificationPublisher remoteSegmentNotificationPublisher) + throws IOException { failRandomly(); RecoveryState recoveryState = new RecoveryState(shardRouting, targetNode, sourceNode); MockIndexService indexService = indexService(recoveryState.getShardId().getIndex()); diff --git a/server/src/test/java/org/opensearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java b/server/src/test/java/org/opensearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java index 22481b5a7b99f..e43ad291c2d76 100644 --- a/server/src/test/java/org/opensearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java +++ b/server/src/test/java/org/opensearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java @@ -64,6 +64,7 @@ import org.opensearch.index.Index; import org.opensearch.index.seqno.RetentionLeaseSyncer; import org.opensearch.index.shard.PrimaryReplicaSyncer; +import org.opensearch.index.shard.RemoteStoreSegmentUploadNotificationPublisher; import org.opensearch.index.shard.ShardId; import org.opensearch.indices.recovery.PeerRecoveryTargetService; import org.opensearch.indices.replication.SegmentReplicationSourceService; @@ -583,7 +584,8 @@ private IndicesClusterStateService createIndicesClusterStateService( null, primaryReplicaSyncer, s -> {}, - RetentionLeaseSyncer.EMPTY + RetentionLeaseSyncer.EMPTY, + RemoteStoreSegmentUploadNotificationPublisher.EMPTY ); } diff --git a/server/src/test/java/org/opensearch/indices/replication/PrimaryShardReplicationSourceTests.java b/server/src/test/java/org/opensearch/indices/replication/PrimaryShardReplicationSourceTests.java index d925956bd95ef..3b06a4ffa0f69 100644 --- a/server/src/test/java/org/opensearch/indices/replication/PrimaryShardReplicationSourceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/PrimaryShardReplicationSourceTests.java @@ -110,7 +110,7 @@ public void testGetSegmentFiles() { REPLICATION_ID, checkpoint, Arrays.asList(testMetadata), - mock(Store.class), + mock(IndexShard.class), mock(ActionListener.class) ); CapturingTransport.CapturedRequest[] requestList = transport.getCapturedRequestsAndClear(); @@ -132,7 +132,7 @@ public void testTransportTimeoutForGetSegmentFilesAction() { REPLICATION_ID, checkpoint, Arrays.asList(testMetadata), - mock(Store.class), + mock(IndexShard.class), mock(ActionListener.class) ); CapturingTransport.CapturedRequest[] requestList = transport.getCapturedRequestsAndClear(); @@ -151,7 +151,7 @@ public void testGetSegmentFiles_CancelWhileRequestOpen() throws InterruptedExcep REPLICATION_ID, checkpoint, Arrays.asList(testMetadata), - mock(Store.class), + mock(IndexShard.class), new ActionListener<>() { @Override public void onResponse(GetSegmentFilesResponse getSegmentFilesResponse) { diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index bae0afb5bcc3b..671e067aa32b1 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -130,7 +130,7 @@ public void getSegmentFiles( long replicationId, ReplicationCheckpoint checkpoint, List filesToFetch, - Store store, + IndexShard indexShard, ActionListener listener ) { Assert.fail("Should not be called"); diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java index 599e73b548ddb..ffb29cbb5d4c3 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java @@ -127,7 +127,7 @@ public void getSegmentFiles( long replicationId, ReplicationCheckpoint checkpoint, List filesToFetch, - Store store, + IndexShard indexShard, ActionListener listener ) { assertEquals(1, filesToFetch.size()); @@ -178,7 +178,7 @@ public void getSegmentFiles( long replicationId, ReplicationCheckpoint checkpoint, List filesToFetch, - Store store, + IndexShard indexShard, ActionListener listener ) { listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); @@ -221,7 +221,7 @@ public void getSegmentFiles( long replicationId, ReplicationCheckpoint checkpoint, List filesToFetch, - Store store, + IndexShard indexShard, ActionListener listener ) { listener.onFailure(exception); @@ -264,7 +264,7 @@ public void getSegmentFiles( long replicationId, ReplicationCheckpoint checkpoint, List filesToFetch, - Store store, + IndexShard indexShard, ActionListener listener ) { listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); @@ -309,7 +309,7 @@ public void getSegmentFiles( long replicationId, ReplicationCheckpoint checkpoint, List filesToFetch, - Store store, + IndexShard indexShard, ActionListener listener ) { listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); @@ -353,7 +353,7 @@ public void getSegmentFiles( long replicationId, ReplicationCheckpoint checkpoint, List filesToFetch, - Store store, + IndexShard indexShard, ActionListener listener ) { listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); @@ -404,7 +404,7 @@ public void getSegmentFiles( long replicationId, ReplicationCheckpoint checkpoint, List filesToFetch, - Store store, + IndexShard indexShard, ActionListener listener ) { listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 6c4b636e3c002..ab9a957b2d722 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -175,6 +175,7 @@ import org.opensearch.index.seqno.GlobalCheckpointSyncAction; import org.opensearch.index.seqno.RetentionLeaseSyncer; import org.opensearch.index.shard.PrimaryReplicaSyncer; +import org.opensearch.index.shard.RemoteStoreSegmentUploadNotificationPublisher; import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory; import org.opensearch.index.store.remote.filecache.FileCacheCleaner; import org.opensearch.indices.IndicesModule; @@ -1940,7 +1941,7 @@ public void onFailure(final Exception e) { actionFilters ), RetentionLeaseSyncer.EMPTY, - SegmentReplicationCheckpointPublisher.EMPTY + RemoteStoreSegmentUploadNotificationPublisher.EMPTY ); Map actions = new HashMap<>(); final SystemIndices systemIndices = new SystemIndices(emptyMap()); diff --git a/test/framework/src/main/java/org/opensearch/index/replication/TestReplicationSource.java b/test/framework/src/main/java/org/opensearch/index/replication/TestReplicationSource.java index a3adedcbdef86..f6c0331d2056a 100644 --- a/test/framework/src/main/java/org/opensearch/index/replication/TestReplicationSource.java +++ b/test/framework/src/main/java/org/opensearch/index/replication/TestReplicationSource.java @@ -9,6 +9,7 @@ package org.opensearch.index.replication; import org.opensearch.action.ActionListener; +import org.opensearch.index.shard.IndexShard; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.replication.CheckpointInfoResponse; @@ -35,7 +36,7 @@ public abstract void getSegmentFiles( long replicationId, ReplicationCheckpoint checkpoint, List filesToFetch, - Store store, + IndexShard indexShard, ActionListener listener ); diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index ab0cf38f77c7d..c8c5ff1f58f5e 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -600,7 +600,8 @@ protected IndexShard newShard( breakerService, translogFactorySupplier, checkpointPublisher, - remoteStore + remoteStore, + new RemoteStoreSegmentUploadNotificationPublisher(null) ); indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER); success = true; @@ -1315,7 +1316,7 @@ public void getSegmentFiles( long replicationId, ReplicationCheckpoint checkpoint, List filesToFetch, - Store store, + IndexShard indexShard, ActionListener listener ) { try (