[Remote Store Migration] Handling translog metadata update during remote store migration #13810

Closed
---- File 1 ----
@@ -22,6 +22,7 @@
import org.opensearch.common.UUIDs;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.remote.RemoteStoreEnums;
import org.opensearch.repositories.fs.ReloadableFsRepository;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.Before;
@@ -36,6 +37,9 @@

import static org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING;
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING;
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_METADATA;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
import static org.opensearch.repositories.fs.ReloadableFsRepository.REPOSITORIES_FAILRATE_SETTING;
@@ -120,6 +124,36 @@ public void initDocRepToRemoteMigration() {
);
}

public void enableEnhancedPrefixPath() {
assertAcked(
internalCluster().client()
.admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(
Settings.builder()
.put(
CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING.getKey(),
RemoteStoreEnums.PathHashAlgorithm.FNV_1A_COMPOSITE_1.name()
)
.put(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), RemoteStoreEnums.PathType.HASHED_PREFIX.name())
.build()
)
.get()
);
}

public void enableRemoteTranslogMetadata() {
assertAcked(
internalCluster().client()
.admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put(CLUSTER_REMOTE_STORE_TRANSLOG_METADATA.getKey(), "true"))
.get()
);
}

public BulkResponse indexBulk(String indexName, int numDocs) {
BulkRequest bulkRequest = new BulkRequest();
for (int i = 0; i < numDocs; i++) {
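Reviewer note: a minimal sketch of how a migration test is expected to combine the new helpers with the existing ones. The test name and node count are illustrative, not part of this PR; the real call sites appear in the next file.

```java
public void testTranslogMetadataOnMigration() throws Exception {
    initDocRepToRemoteMigration();   // existing helper: sets compatibility mode and migration direction
    enableEnhancedPrefixPath();      // new helper: HASHED_PREFIX path type + FNV_1A_COMPOSITE_1 hash algorithm
    enableRemoteTranslogMetadata();  // new helper: enables the remote translog metadata cluster setting
    addRemote = true;                // nodes started after this join with remote store attributes
    internalCluster().startDataOnlyNodes(3);
    // ... index docs via indexBulk(...) and assert the index custom metadata once a primary moves to remote
}
```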
---- File 2 ----
@@ -414,6 +414,8 @@ public void testRemotePathMetadataAddedWithFirstPrimaryMovingToRemote() throws E

logger.info("---> Adding 3 remote enabled nodes");
initDocRepToRemoteMigration();
enableEnhancedPrefixPath();
enableRemoteTranslogMetadata();
addRemote = true;
List<String> remoteEnabledNodes = internalCluster().startDataOnlyNodes(
3,
@@ -598,6 +600,15 @@ private void assertCustomIndexMetadata(String index) {
logger.info("---> Asserting custom index metadata");
IndexMetadata iMd = internalCluster().client().admin().cluster().prepareState().get().getState().metadata().index(index);
assertNotNull(iMd.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY));
assertEquals(
iMd.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY).get(RemoteStoreEnums.PathHashAlgorithm.NAME),
RemoteStoreEnums.PathHashAlgorithm.FNV_1A_COMPOSITE_1.name()
);
assertEquals(
iMd.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY).get(RemoteStoreEnums.PathType.NAME),
RemoteStoreEnums.PathType.HASHED_PREFIX.name()
);
assertNotNull(iMd.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY).get(IndexMetadata.TRANSLOG_METADATA_KEY));
assertEquals(iMd.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY).get(IndexMetadata.TRANSLOG_METADATA_KEY), "false");
}
}
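Reviewer note: the custom metadata the updated assertions expect on a migrated index boils down to the map sketched below. The literal values mirror the assertions above; translog metadata stays "false" here because these tests run against the local FS repository, which does not support blob metadata (see the removed TODO in RemoteStoreUtils further down).

```java
Map<String, String> expectedRemoteStoreCustomData = Map.of(
    RemoteStoreEnums.PathType.NAME, RemoteStoreEnums.PathType.HASHED_PREFIX.name(),
    RemoteStoreEnums.PathHashAlgorithm.NAME, RemoteStoreEnums.PathHashAlgorithm.FNV_1A_COMPOSITE_1.name(),
    IndexMetadata.TRANSLOG_METADATA_KEY, "false"  // FS test repository cannot store blob metadata
);
```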
---- File 3 ----
@@ -133,6 +133,7 @@ protected void clusterManagerOperation(
routingNodes,
state,
clusterInfo,
snapshotsInfoService,
snapshotsInfoService.snapshotShardSizes(),
System.nanoTime()
);
---- File 4 ----
@@ -173,6 +173,7 @@ public ClusterState applyStartedShards(ClusterState clusterState, List<ShardRout
routingNodes,
clusterState,
clusterInfoService.getClusterInfo(),
snapshotsInfoService,
snapshotsInfoService.snapshotShardSizes(),
currentNanoTime()
);
@@ -258,6 +259,7 @@ public ClusterState applyFailedShards(
routingNodes,
tmpState,
clusterInfoService.getClusterInfo(),
snapshotsInfoService,
snapshotsInfoService.snapshotShardSizes(),
currentNanoTime
);
@@ -333,6 +335,7 @@ public ClusterState disassociateDeadNodes(ClusterState clusterState, boolean rer
routingNodes,
clusterState,
clusterInfoService.getClusterInfo(),
snapshotsInfoService,
snapshotsInfoService.snapshotShardSizes(),
currentNanoTime()
);
@@ -360,6 +363,7 @@ public ClusterState adaptAutoExpandReplicas(ClusterState clusterState) {
clusterState.getRoutingNodes(),
clusterState,
clusterInfoService.getClusterInfo(),
snapshotsInfoService,
snapshotsInfoService.snapshotShardSizes(),
currentNanoTime()
);
@@ -493,6 +497,7 @@ public CommandsResult reroute(final ClusterState clusterState, AllocationCommand
routingNodes,
clusterState,
clusterInfoService.getClusterInfo(),
snapshotsInfoService,
snapshotsInfoService.snapshotShardSizes(),
currentNanoTime()
);
@@ -530,6 +535,7 @@ public ClusterState reroute(ClusterState clusterState, String reason) {
routingNodes,
fixedClusterState,
clusterInfoService.getClusterInfo(),
snapshotsInfoService,
snapshotsInfoService.snapshotShardSizes(),
currentNanoTime()
);
---- File 5 ----
@@ -48,6 +48,7 @@
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.remote.RemoteMigrationIndexMetadataUpdater;
import org.opensearch.repositories.RepositoriesService;

import java.util.Collections;
import java.util.Comparator;
@@ -73,6 +74,7 @@ public class IndexMetadataUpdater extends RoutingChangesObserver.AbstractRouting
private final Logger logger = LogManager.getLogger(IndexMetadataUpdater.class);
private final Map<ShardId, Updates> shardChanges = new HashMap<>();
private boolean ongoingRemoteStoreMigration = false;
private RepositoriesService repositoriesService;

@Override
public void shardInitialized(ShardRouting unassignedShard, ShardRouting initializedShard) {
@@ -176,7 +178,7 @@ public Metadata applyChanges(Metadata oldMetadata, RoutingTable newRoutingTable,
oldMetadata.settings(),
logger
);
- migrationImdUpdater.maybeUpdateRemoteStoreCustomMetadata(indexMetadataBuilder, index.getName());
+ migrationImdUpdater.maybeUpdateRemoteStoreCustomMetadata(indexMetadataBuilder, index.getName(), repositoriesService);
migrationImdUpdater.maybeAddRemoteIndexSettings(indexMetadataBuilder, index.getName());
}
}
@@ -408,6 +410,10 @@ public void setOngoingRemoteStoreMigration(boolean ongoingRemoteStoreMigration)
this.ongoingRemoteStoreMigration = ongoingRemoteStoreMigration;
}

public void setRepositoriesService(RepositoriesService repositoriesService) {
this.repositoriesService = repositoriesService;
}

private static class Updates {
private boolean increaseTerm; // whether primary term should be increased
private Set<String> addedAllocationIds = new HashSet<>(); // allocation ids that should be added to the in-sync set
---- File 6 ----
@@ -45,8 +45,11 @@
import org.opensearch.cluster.routing.allocation.decider.Decision;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.snapshots.InternalSnapshotsInfoService;
import org.opensearch.snapshots.RestoreService.RestoreInProgressUpdater;
import org.opensearch.snapshots.SnapshotShardSizeInfo;
import org.opensearch.snapshots.SnapshotsInfoService;

import java.util.HashMap;
import java.util.HashSet;
@@ -55,6 +58,7 @@

import static java.util.Collections.emptySet;
import static java.util.Collections.unmodifiableSet;
import static org.opensearch.index.remote.RemoteStoreUtils.hasAtleastOneRemoteNode;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.isMigratingToRemoteStore;

/**
@@ -102,6 +106,18 @@ public class RoutingAllocation {
restoreInProgressUpdater
);

// Used for tests
public RoutingAllocation(
AllocationDeciders deciders,
RoutingNodes routingNodes,
ClusterState clusterState,
ClusterInfo clusterInfo,
SnapshotShardSizeInfo shardSizeInfo,
long currentNanoTime
) {
this(deciders, routingNodes, clusterState, clusterInfo, null, shardSizeInfo, currentNanoTime);
}

/**
* Creates a new {@link RoutingAllocation}
* @param deciders {@link AllocationDeciders} to used to make decisions for routing allocations
@@ -114,6 +130,7 @@ public RoutingAllocation(
RoutingNodes routingNodes,
ClusterState clusterState,
ClusterInfo clusterInfo,
SnapshotsInfoService snapshotsInfoService,
SnapshotShardSizeInfo shardSizeInfo,
long currentNanoTime
) {
@@ -126,8 +143,9 @@
this.clusterInfo = clusterInfo;
this.shardSizeInfo = shardSizeInfo;
this.currentNanoTime = currentNanoTime;
- if (isMigratingToRemoteStore(metadata)) {
+ if (isMigratingToRemoteStore(metadata) && hasAtleastOneRemoteNode(nodes)) {
indexMetadataUpdater.setOngoingRemoteStoreMigration(true);
indexMetadataUpdater.setRepositoriesService(determineRepositoriesService(snapshotsInfoService));
}
}

@@ -136,6 +154,21 @@ public long getCurrentNanoTime() {
return currentNanoTime;
}

/**
* Extracts instance of {@link RepositoriesService} to be used during the remote store migration flow
* Uses the {@link InternalSnapshotsInfoService} implementation of {@link SnapshotsInfoService}
* which is wired directly from {@link org.opensearch.cluster.ClusterModule} and the reference is passed through {@link AllocationService}
*
* @param snapshotsInfoService SnapshotsInfoService passed from ClusterModule through AllocationService
* @return RepositoriesService reference to be used in the remote migration flow
*/
private RepositoriesService determineRepositoriesService(SnapshotsInfoService snapshotsInfoService) {
assert snapshotsInfoService != null : "Cannot have null snapshotsInfo during remote store migration";
RepositoriesService repositoriesService = ((InternalSnapshotsInfoService) snapshotsInfoService).getRepositoriesService().get();
assert repositoriesService != null : "Cannot have null repositoriesService during remote store migration";
return repositoriesService;
}

/**
* Get {@link AllocationDeciders} used for allocation
* @return {@link AllocationDeciders} used for allocation
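Reviewer note: a condensed sketch of how the new snapshotsInfoService parameter threads the RepositoriesService down to the IndexMetadataUpdater during migration. Variable names are illustrative; the real call sites are the AllocationService changes earlier in this diff.

```java
RoutingAllocation allocation = new RoutingAllocation(
    allocationDeciders,
    routingNodes,
    clusterState,
    clusterInfoService.getClusterInfo(),
    snapshotsInfoService,                       // new: InternalSnapshotsInfoService wired from ClusterModule
    snapshotsInfoService.snapshotShardSizes(),
    System.nanoTime()
);
// Inside the constructor: when a docrep -> remote migration is in progress and at least one
// remote-store-enabled data node has joined, the updater is handed the RepositoriesService:
//   indexMetadataUpdater.setRepositoriesService(determineRepositoriesService(snapshotsInfoService));
```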
---- File 7 ----
@@ -18,6 +18,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.index.remote.RemoteStoreEnums.PathType;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.repositories.RepositoriesService;

import java.util.List;
import java.util.Map;
@@ -127,12 +128,16 @@ private boolean needsRemoteIndexSettingsUpdate(
* @param indexMetadataBuilder Mutated {@link IndexMetadata.Builder} having the previous state updates
* @param index index name
*/
- public void maybeUpdateRemoteStoreCustomMetadata(IndexMetadata.Builder indexMetadataBuilder, String index) {
+ public void maybeUpdateRemoteStoreCustomMetadata(
+ IndexMetadata.Builder indexMetadataBuilder,
+ String index,
+ RepositoriesService repositoriesService
+ ) {
if (indexHasRemoteCustomMetadata(indexMetadata) == false) {
logger.info("Adding remote store custom data for index [{}] during migration", index);
indexMetadataBuilder.putCustom(
REMOTE_STORE_CUSTOM_KEY,
- determineRemoteStoreCustomMetadataDuringMigration(clusterSettings, discoveryNodes)
+ determineRemoteStoreCustomMetadataDuringMigration(clusterSettings, discoveryNodes, repositoriesService)
);
} else {
logger.debug("Index {} already has remote store custom data", index);
---- File 8: server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java ----
@@ -18,6 +18,8 @@
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.Settings;
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;

import java.nio.ByteBuffer;
import java.util.Arrays;
@@ -33,6 +35,7 @@
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING;
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING;
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_METADATA;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY;

/**
* Utils for remote store
@@ -203,22 +206,25 @@
*/
public static Map<String, String> determineRemoteStoreCustomMetadataDuringMigration(
Settings clusterSettings,
- DiscoveryNodes discoveryNodes
+ DiscoveryNodes discoveryNodes,
+ RepositoriesService repositoriesService
) {
Map<String, String> remoteCustomData = new HashMap<>();
Version minNodeVersion = discoveryNodes.getMinNodeVersion();
+ boolean blobStoreMetadataClusterSettingsEnabled = Version.V_2_15_0.compareTo(minNodeVersion) <= 0
+ && CLUSTER_REMOTE_STORE_TRANSLOG_METADATA.get(clusterSettings);

- // TODO: During the document replication to a remote store migration, there should be a check to determine if the registered
- // translog blobstore supports custom metadata or not.
- // Currently, the blobStoreMetadataEnabled flag is set to false because the integration tests run on the local file system, which
- // does not support custom metadata.
- // https://github.com/opensearch-project/OpenSearch/issues/13745
- boolean blobStoreMetadataEnabled = false;
- boolean translogMetadata = Version.CURRENT.compareTo(minNodeVersion) <= 0
- && CLUSTER_REMOTE_STORE_TRANSLOG_METADATA.get(clusterSettings)
- && blobStoreMetadataEnabled;

- remoteCustomData.put(IndexMetadata.TRANSLOG_METADATA_KEY, Boolean.toString(translogMetadata));
+ if (blobStoreMetadataClusterSettingsEnabled) {
+ String translogRepositoryName = RemoteStoreUtils.getRemoteStoreRepoName(discoveryNodes)
+ .get(REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY);
+ if (((BlobStoreRepository) repositoriesService.repository(translogRepositoryName)).blobStore().isBlobMetadataEnabled()) {
+ logger.debug("Repository {} supports object metadata. Setting translog_metadata to true", translogRepositoryName);
+ remoteCustomData.put(IndexMetadata.TRANSLOG_METADATA_KEY, Boolean.toString(true));
+ } else {
+ logger.debug("Repository {} does not support object metadata. Setting translog_metadata to false", translogRepositoryName);
+ remoteCustomData.put(IndexMetadata.TRANSLOG_METADATA_KEY, Boolean.toString(false));
+ }
+ }
[Codecov / codecov/patch: added lines server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java#L224-L225 were not covered by tests]

RemoteStoreEnums.PathType pathType = Version.CURRENT.compareTo(minNodeVersion) <= 0
? CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.get(clusterSettings)
@@ -250,4 +256,10 @@
.findFirst();
return remoteNode.map(RemoteStoreNodeAttribute::getDataRepoNames).orElseGet(HashMap::new);
}

public static boolean hasAtleastOneRemoteNode(DiscoveryNodes discoveryNodes) {
Map<String, DiscoveryNode> dataNodes = discoveryNodes.getDataNodes();
return dataNodes.isEmpty() == false
&& dataNodes.keySet().stream().anyMatch(nodeId -> discoveryNodes.get(nodeId).isRemoteStoreNode());
}
}
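Reviewer note: the translog metadata decision after this change, condensed for review. This is a restatement of the logic in determineRemoteStoreCustomMetadataDuringMigration above, not a separate API; local variable names are illustrative.

```java
boolean clusterOptedIn = Version.V_2_15_0.compareTo(discoveryNodes.getMinNodeVersion()) <= 0  // all nodes on 2.15+
    && CLUSTER_REMOTE_STORE_TRANSLOG_METADATA.get(clusterSettings);                           // cluster setting enabled
if (clusterOptedIn) {
    String translogRepo = RemoteStoreUtils.getRemoteStoreRepoName(discoveryNodes)
        .get(REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY);
    // capability check against the registered translog repository replaces the old hard-coded false
    boolean repoSupportsMetadata = ((BlobStoreRepository) repositoriesService.repository(translogRepo))
        .blobStore()
        .isBlobMetadataEnabled();
    remoteCustomData.put(IndexMetadata.TRANSLOG_METADATA_KEY, Boolean.toString(repoSupportsMetadata));
}
// If the cluster has not opted in, the translog metadata key is simply not added to the custom metadata.
```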
---- File 9 ----
@@ -91,7 +91,9 @@ public class InternalSnapshotsInfoService implements ClusterStateListener, Snaps
);

private final ThreadPool threadPool;
- private final Supplier<RepositoriesService> repositoriesService;
+ public final Supplier<RepositoriesService> repositoriesService;

private final Supplier<RerouteService> rerouteService;

/** contains the snapshot shards for which the size is known **/
@@ -362,6 +364,10 @@ private static Set<SnapshotShard> listOfSnapshotShards(final ClusterState state)
return Collections.unmodifiableSet(snapshotShards);
}

public Supplier<RepositoriesService> getRepositoriesService() {
return repositoriesService;
}

/**
* A snapshot of a shard
*