Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow replica to fetch segments from remote store instead of leader node #7028

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -678,7 +678,8 @@ public static final IndexShard newIndexShard(
cbs,
(indexSettings, shardRouting) -> new InternalTranslogFactory(),
SegmentReplicationCheckpointPublisher.EMPTY,
null
null,
RemoteStoreSegmentUploadNotificationPublisher.EMPTY
);
}

Expand Down
7 changes: 5 additions & 2 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardClosedException;
import org.opensearch.index.shard.IndexingOperationListener;
import org.opensearch.index.shard.RemoteStoreSegmentUploadNotificationPublisher;
import org.opensearch.index.shard.SearchOperationListener;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.shard.ShardNotFoundException;
Expand Down Expand Up @@ -438,7 +439,8 @@ public synchronized IndexShard createShard(
final ShardRouting routing,
final Consumer<ShardId> globalCheckpointSyncer,
final RetentionLeaseSyncer retentionLeaseSyncer,
final SegmentReplicationCheckpointPublisher checkpointPublisher
final SegmentReplicationCheckpointPublisher checkpointPublisher,
final RemoteStoreSegmentUploadNotificationPublisher remoteSegmentNotificationPublisher
) throws IOException {
Objects.requireNonNull(retentionLeaseSyncer);
/*
Expand Down Expand Up @@ -506,7 +508,8 @@ public synchronized IndexShard createShard(
circuitBreakerService,
translogFactorySupplier,
this.indexSettings.isSegRepEnabled() ? checkpointPublisher : null,
remoteStore
remoteStore,
remoteSegmentNotificationPublisher
);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ public int fillSeqNoGaps(long primaryTerm) throws IOException {

@Override
public Engine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException {
throw new UnsupportedOperationException("Read only replicas do not have an IndexWriter and cannot recover from a translog.");
return this;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ public void beforeRefresh() throws IOException {

@Override
public void afterRefresh(boolean didRefresh) throws IOException {
if (didRefresh && shard.state() == IndexShardState.STARTED && shard.getReplicationTracker().isPrimaryMode()) {
if (didRefresh && shard.state() == IndexShardState.STARTED && shard.getReplicationTracker().isPrimaryMode()
&& !shard.indexSettings.isRemoteStoreEnabled()) {
publisher.publish(shard, shard.getLatestReplicationCheckpoint());
}
}
Expand Down
39 changes: 21 additions & 18 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ Runnable getGlobalCheckpointSyncer() {

private final Store remoteStore;
private final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier;
private final RemoteStoreSegmentUploadNotificationPublisher remoteSegmentNotificationPublisher;

public IndexShard(
final ShardRouting shardRouting,
Expand All @@ -351,7 +352,8 @@ public IndexShard(
final CircuitBreakerService circuitBreakerService,
final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
@Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher,
@Nullable final Store remoteStore
@Nullable final Store remoteStore,
RemoteStoreSegmentUploadNotificationPublisher remoteSegmentNotificationPublisher
) throws IOException {
super(shardRouting.shardId(), indexSettings);
assert shardRouting.initializing();
Expand Down Expand Up @@ -403,6 +405,7 @@ public IndexShard(
this.pendingPrimaryTerm = primaryTerm;
this.globalCheckpointListeners = new GlobalCheckpointListeners(shardId, threadPool.scheduler(), logger);
this.pendingReplicationActions = new PendingReplicationActions(shardId, threadPool);
this.remoteSegmentNotificationPublisher = remoteSegmentNotificationPublisher;
this.replicationTracker = new ReplicationTracker(
shardId,
aId,
Expand Down Expand Up @@ -2182,7 +2185,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t
assert currentEngineReference.get() == null : "engine is running";
verifyNotClosed();
if (indexSettings.isRemoteStoreEnabled()) {
syncSegmentsFromRemoteSegmentStore(false);
syncSegmentsFromRemoteSegmentStore(false, true, false);
}
// we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata).
final Engine newEngine = engineFactory.newReadWriteEngine(config);
Expand Down Expand Up @@ -3090,6 +3093,7 @@ public void updateGlobalCheckpointOnReplica(final long globalCheckpoint, final S
* When remote translog is enabled for an index, replication operation is limited to primary term validation and does not
* update local checkpoint at replica, so the local checkpoint at replica can be less than globalCheckpoint.
*/

assert (state() != IndexShardState.POST_RECOVERY && state() != IndexShardState.STARTED)
|| indexSettings.isRemoteTranslogStoreEnabled() : "supposedly in-sync shard copy received a global checkpoint ["
+ globalCheckpoint
Expand Down Expand Up @@ -3499,11 +3503,12 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
final List<ReferenceManager.RefreshListener> internalRefreshListener = new ArrayList<>();
internalRefreshListener.add(new RefreshMetricUpdater(refreshMetric));
if (isRemoteStoreEnabled()) {
internalRefreshListener.add(new RemoteStoreRefreshListener(this));
internalRefreshListener.add(new RemoteStoreRefreshListener(this, remoteSegmentNotificationPublisher));
}
if (this.checkpointPublisher != null && indexSettings.isSegRepEnabled() && shardRouting.primary()) {
if (this.checkpointPublisher != null && indexSettings.isSegRepEnabled() && shardRouting.primary() && !indexSettings.isRemoteStoreEnabled()) {
internalRefreshListener.add(new CheckpointRefreshListener(this, this.checkpointPublisher));
}

/**
* With segment replication enabled for primary relocation, recover replica shard initially as read only and
* change to a writeable engine during relocation handoff after a round of segment replication.
Expand Down Expand Up @@ -4078,7 +4083,7 @@ EngineConfigFactory getEngineConfigFactory() {
}

// for tests
ReplicationTracker getReplicationTracker() {
public ReplicationTracker getReplicationTracker() {
return replicationTracker;
}

Expand Down Expand Up @@ -4347,7 +4352,7 @@ public void close() throws IOException {
};
IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine));
if (indexSettings.isRemoteStoreEnabled()) {
syncSegmentsFromRemoteSegmentStore(false);
syncSegmentsFromRemoteSegmentStore(false, true, false);
}
newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker)));
onNewEngine(newEngineReference.get());
Expand Down Expand Up @@ -4380,23 +4385,14 @@ public void close() throws IOException {
onSettingsChanged();
}

/**
* Downloads segments from remote segment store. This method will download segments till
* last refresh checkpoint.
* @param overrideLocal flag to override local segment files with those in remote store
* @throws IOException if exception occurs while reading segments from remote store
*/
public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal) throws IOException {
syncSegmentsFromRemoteSegmentStore(overrideLocal, true);
}

/**
* Downloads segments from remote segment store.
* @param overrideLocal flag to override local segment files with those in remote store
* @param refreshLevelSegmentSync last refresh checkpoint is used if true, commit checkpoint otherwise
* @param shouldCommit if the shard requires committing the changes after sync from remote.
* @throws IOException if exception occurs while reading segments from remote store
*/
public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean refreshLevelSegmentSync) throws IOException {
public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean refreshLevelSegmentSync, boolean shouldCommit) throws IOException {
assert indexSettings.isRemoteStoreEnabled();
logger.info("Downloading segments from remote segment store");
assert remoteStore.directory() instanceof FilterDirectory : "Store.directory is not an instance of FilterDirectory";
Expand Down Expand Up @@ -4448,6 +4444,7 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re
skippedSegments.add(file);
}
}

if (refreshLevelSegmentSync && segmentInfosSnapshotFilename != null) {
try (
ChecksumIndexInput indexInput = new BufferedChecksumIndexInput(
Expand All @@ -4460,7 +4457,13 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re
Long.parseLong(segmentInfosSnapshotFilename.split("__")[1])
);
long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
if (shouldCommit) {
finalizeReplication(infosSnapshot);
store.cleanupAndPreserveLatestCommitPoint("finalize - clean with in memory infos", infosSnapshot);
}
else {
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
}
}
}
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.opensearch.index.engine.InternalEngine;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;

import java.io.IOException;
import java.util.Collection;
Expand Down Expand Up @@ -57,14 +59,16 @@ public final class RemoteStoreRefreshListener implements ReferenceManager.Refres
private final RemoteSegmentStoreDirectory remoteDirectory;
private final Map<String, String> localSegmentChecksumMap;
private long primaryTerm;
private final RemoteStoreSegmentUploadNotificationPublisher notificationPublisher;
private static final Logger logger = LogManager.getLogger(RemoteStoreRefreshListener.class);

public RemoteStoreRefreshListener(IndexShard indexShard) {
public RemoteStoreRefreshListener(IndexShard indexShard, RemoteStoreSegmentUploadNotificationPublisher notificationPublisher) {
this.indexShard = indexShard;
this.storeDirectory = indexShard.store().directory();
this.remoteDirectory = (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory())
.getDelegate()).getDelegate();
this.primaryTerm = indexShard.getOperationPrimaryTerm();
this.notificationPublisher = notificationPublisher;
localSegmentChecksumMap = new HashMap<>();
if (indexShard.shardRouting.primary()) {
try {
Expand Down Expand Up @@ -103,6 +107,9 @@ public void afterRefresh(boolean didRefresh) {
deleteStaleCommits();
}

// Capture replication checkpoint before uploading the segments as upload can take some time and checkpoint can move.
ReplicationCheckpoint checkpoint = indexShard.getLatestReplicationCheckpoint();

String segmentInfoSnapshotFilename = null;
try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) {
SegmentInfos segmentInfos = segmentInfosGatedCloseable.get();
Expand Down Expand Up @@ -148,6 +155,10 @@ public void afterRefresh(boolean didRefresh) {
.lastRefreshedCheckpoint();
((InternalEngine) indexShard.getEngine()).translogManager()
.setMinSeqNoToKeep(lastRefreshedCheckpoint + 1);

if (!RemoteStoreSegmentUploadNotificationPublisher.EMPTY.equals(notificationPublisher)) {
notificationPublisher.notifySegmentUpload(indexShard, checkpoint);
}
}
}
} catch (EngineException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.shard;

import org.opensearch.common.inject.Inject;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;


/**
* Hook to publish notification after primary uploads segments to the remote store.
*
* @opensearch.internal
*/
public class RemoteStoreSegmentUploadNotificationPublisher {
private final SegmentReplicationCheckpointPublisher segRepPublisher;

@Inject
public RemoteStoreSegmentUploadNotificationPublisher(SegmentReplicationCheckpointPublisher segRepPublisher) {
this.segRepPublisher = segRepPublisher;
}

public void notifySegmentUpload(IndexShard indexShard, ReplicationCheckpoint checkpoint) {
// TODO: Add separate publisher for CCR.
// we don't call indexShard.getLatestReplicationCheckpoint() as it might have a newer refreshed checkpoint.
// Instead we send the one which has been uploaded to remote store.
if (segRepPublisher != null) segRepPublisher.publish(indexShard, checkpoint);
}

public static final RemoteStoreSegmentUploadNotificationPublisher EMPTY = new RemoteStoreSegmentUploadNotificationPublisher(null);
}
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ private void recoverFromRemoteStore(IndexShard indexShard, Repository repository
remoteStore.incRef();
try {
// Download segments from remote segment store
indexShard.syncSegmentsFromRemoteSegmentStore(true);
indexShard.syncSegmentsFromRemoteSegmentStore(true, true, false);

if (store.directory().listAll().length == 0) {
store.createEmpty(indexShard.indexSettings().getIndexVersionCreated().luceneVersion);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public void init() throws IOException {
* @return Map of segment filename to uploaded filename with checksum
* @throws IOException if there were any failures in reading the metadata file
*/
private Map<String, UploadedSegmentMetadata> readLatestMetadataFile() throws IOException {
public Map<String, UploadedSegmentMetadata> readLatestMetadataFile() throws IOException {
Map<String, UploadedSegmentMetadata> segmentMetadataMap = new HashMap<>();

Collection<String> metadataFiles = remoteMetadataDirectory.listFilesByPrefix(MetadataFilenameUtils.METADATA_PREFIX);
Expand Down Expand Up @@ -149,7 +149,6 @@ private Map<String, UploadedSegmentMetadata> readMetadataFile(String metadataFil
public static class UploadedSegmentMetadata {
// Visible for testing
static final String SEPARATOR = "::";

private final String originalFilename;
private final String uploadedFilename;
private final String checksum;
Expand Down Expand Up @@ -179,6 +178,10 @@ public static UploadedSegmentMetadata fromString(String uploadedFilename) {
String[] values = uploadedFilename.split(SEPARATOR);
return new UploadedSegmentMetadata(values[0], values[1], values[2], Long.parseLong(values[3]));
}

public String getOriginalFilename() {
return originalFilename;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,8 @@ public static RecoveryDiff segmentReplicationDiff(Map<String, StoreFileMetadata>
missing.add(value);
} else {
final StoreFileMetadata fileMetadata = target.get(value.name());
if (fileMetadata.isSame(value)) {
// match segments using checksum
if (fileMetadata.checksum().equals(value.checksum())) {
identical.add(value);
} else {
different.add(value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import org.opensearch.index.seqno.RetentionLeaseSyncer;
import org.opensearch.index.seqno.GlobalCheckpointSyncAction;
import org.opensearch.index.shard.PrimaryReplicaSyncer;
import org.opensearch.index.shard.RemoteStoreSegmentUploadNotificationPublisher;
import org.opensearch.indices.cluster.IndicesClusterStateService;
import org.opensearch.indices.mapper.MapperRegistry;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
Expand Down Expand Up @@ -287,6 +288,11 @@ protected void configure() {
} else {
bind(SegmentReplicationCheckpointPublisher.class).toInstance(SegmentReplicationCheckpointPublisher.EMPTY);
}
if (FeatureFlags.isEnabled(FeatureFlags.REMOTE_STORE)) {
bind(RemoteStoreSegmentUploadNotificationPublisher.class).asEagerSingleton();
} else {
bind(RemoteStoreSegmentUploadNotificationPublisher.class).toInstance(RemoteStoreSegmentUploadNotificationPublisher.EMPTY);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@
import org.opensearch.index.shard.IndexingStats;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.remote.filecache.FileCacheCleaner;
import org.opensearch.index.shard.RemoteStoreSegmentUploadNotificationPublisher;
import org.opensearch.index.translog.InternalTranslogFactory;
import org.opensearch.index.translog.RemoteBlobStoreInternalTranslogFactory;
import org.opensearch.index.translog.TranslogFactory;
Expand All @@ -161,7 +162,6 @@
import org.opensearch.search.query.QueryPhase;
import org.opensearch.search.query.QuerySearchResult;
import org.opensearch.threadpool.ThreadPool;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -1023,14 +1023,15 @@ public IndexShard createShard(
final Consumer<ShardId> globalCheckpointSyncer,
final RetentionLeaseSyncer retentionLeaseSyncer,
final DiscoveryNode targetNode,
final DiscoveryNode sourceNode
final DiscoveryNode sourceNode,
final RemoteStoreSegmentUploadNotificationPublisher remoteSegmentNotificationPublisher
) throws IOException {
Objects.requireNonNull(retentionLeaseSyncer);
ensureChangesAllowed();
IndexService indexService = indexService(shardRouting.index());
assert indexService != null;
RecoveryState recoveryState = indexService.createRecoveryState(shardRouting, targetNode, sourceNode);
IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer, checkpointPublisher);
IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer, checkpointPublisher, remoteSegmentNotificationPublisher);
indexShard.addShardFailureCallback(onShardFailure);
indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService, mapping -> {
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS
Expand Down
Loading