From fb88eca138684be2534f98841b9b412e77187b3b Mon Sep 17 00:00:00 2001 From: Poojita Raj Date: Wed, 23 Aug 2023 23:15:16 -0700 Subject: [PATCH 1/5] pick oldest OS version replica to promote as primary Signed-off-by: Poojita Raj --- .../cluster/routing/RoutingNodes.java | 30 ++++++++--- .../allocation/FailedShardsRoutingTests.java | 50 ++++++++++++++----- 2 files changed, 60 insertions(+), 20 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java index 981e21537c078..f78977492a168 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java +++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java @@ -66,6 +66,8 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.opensearch.action.get.TransportGetAction.isSegmentReplicationEnabled; + /** * {@link RoutingNodes} represents a copy the routing information contained in the {@link ClusterState cluster state}. * It can be either initialized as mutable or immutable (see {@link #RoutingNodes(ClusterState, boolean)}), allowing @@ -82,6 +84,7 @@ * @opensearch.internal */ public class RoutingNodes implements Iterable { + private final ClusterState clusterState; private final Map nodesToShards = new HashMap<>(); @@ -107,6 +110,7 @@ public RoutingNodes(ClusterState clusterState) { } public RoutingNodes(ClusterState clusterState, boolean readOnly) { + this.clusterState = clusterState; this.readOnly = readOnly; final RoutingTable routingTable = clusterState.routingTable(); this.nodesPerAttributeNames = Collections.synchronizedMap(new HashMap<>()); @@ -368,26 +372,36 @@ public ShardRouting activePrimary(ShardId shardId) { * Returns one active replica shard for the given shard id or null if * no active replica is found. * - * Since replicas could possibly be on nodes with a older version of OpenSearch than - * the primary is, this will return replicas on the highest version of OpenSearch. + * Since replicas could possibly be on nodes with an older version of OpenSearch than + * the primary is, this will return replicas on the highest version of OpenSearch when + * document replication strategy is in use, and will return replicas on oldest version + * of OpenSearch when segment replication is enabled. * */ - public ShardRouting activeReplicaWithHighestVersion(ShardId shardId) { + public ShardRouting activeReplicaBasedOnReplicationStrategy(ShardId shardId) { // It's possible for replicaNodeVersion to be null, when disassociating dead nodes // that have been removed, the shards are failed, and part of the shard failing // calls this method with an out-of-date RoutingNodes, where the version might not // be accessible. Therefore, we need to protect against the version being null // (meaning the node will be going away). - return assignedShards(shardId).stream() + Stream candidateShards = assignedShards(shardId).stream() .filter(shr -> !shr.primary() && shr.active()) - .filter(shr -> node(shr.currentNodeId()) != null) - .max( + .filter(shr -> node(shr.currentNodeId()) != null); + if (isSegmentReplicationEnabled(clusterState, shardId.getIndexName())) { + return candidateShards.min( Comparator.comparing( shr -> node(shr.currentNodeId()).node(), Comparator.nullsFirst(Comparator.comparing(DiscoveryNode::getVersion)) ) + ).orElse(null); + + } + return candidateShards.max( + Comparator.comparing( + shr -> node(shr.currentNodeId()).node(), + Comparator.nullsFirst(Comparator.comparing(DiscoveryNode::getVersion)) ) - .orElse(null); + ).orElse(null); } /** @@ -724,7 +738,7 @@ private void unassignPrimaryAndPromoteActiveReplicaIfExists( RoutingChangesObserver routingChangesObserver ) { assert failedShard.primary(); - ShardRouting activeReplica = activeReplicaWithHighestVersion(failedShard.shardId()); + ShardRouting activeReplica = activeReplicaBasedOnReplicationStrategy(failedShard.shardId()); if (activeReplica == null) { moveToUnassigned(failedShard, unassignedInfo); } else { diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/FailedShardsRoutingTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/FailedShardsRoutingTests.java index f2dc745ad33bf..2970a27d73414 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/FailedShardsRoutingTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/FailedShardsRoutingTests.java @@ -49,6 +49,7 @@ import org.opensearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; import org.opensearch.common.settings.Settings; import org.opensearch.core.index.shard.ShardId; +import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.test.VersionUtils; import java.util.ArrayList; @@ -587,7 +588,7 @@ public void testFailAllReplicasInitializingOnPrimaryFail() { clusterState = startShardsAndReroute(allocation, clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(0)); assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2)); assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1)); - ShardRouting startedReplica = clusterState.getRoutingNodes().activeReplicaWithHighestVersion(shardId); + ShardRouting startedReplica = clusterState.getRoutingNodes().activeReplicaBasedOnReplicationStrategy(shardId); // fail the primary shard, check replicas get removed as well... ShardRouting primaryShardToFail = clusterState.routingTable().index("test").shard(0).primaryShard(); @@ -647,10 +648,21 @@ public void testFailAllReplicasInitializingOnPrimaryFailWhileHavingAReplicaToEle } public void testReplicaOnNewestVersionIsPromoted() { + testReplicaIsPromoted(false); + } + + public void testReplicaOnOldestVersionIsPromoted() { + testReplicaIsPromoted(true); + } + + private void testReplicaIsPromoted(boolean isSegmentReplicationEnabled) { AllocationService allocation = createAllocationService(Settings.builder().build()); + Settings.Builder settingsBuilder = isSegmentReplicationEnabled + ? settings(Version.CURRENT).put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + : settings(Version.CURRENT); Metadata metadata = Metadata.builder() - .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(3)) + .put(IndexMetadata.builder("test").settings(settingsBuilder).numberOfShards(1).numberOfReplicas(3)) .build(); RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build(); @@ -714,7 +726,7 @@ public void testReplicaOnNewestVersionIsPromoted() { assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(4)); assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(0)); - ShardRouting startedReplica = clusterState.getRoutingNodes().activeReplicaWithHighestVersion(shardId); + ShardRouting startedReplica = clusterState.getRoutingNodes().activeReplicaBasedOnReplicationStrategy(shardId); logger.info("--> all shards allocated, replica that should be promoted: {}", startedReplica); // fail the primary shard again and make sure the correct replica is promoted @@ -739,13 +751,20 @@ public void testReplicaOnNewestVersionIsPromoted() { continue; } Version nodeVer = cursor.getVersion(); - assertTrue( - "expected node [" + cursor.getId() + "] with version " + nodeVer + " to be before " + replicaNodeVersion, - replicaNodeVersion.onOrAfter(nodeVer) - ); + if (isSegmentReplicationEnabled) { + assertTrue( + "expected node [" + cursor.getId() + "] with version " + nodeVer + " to be after " + replicaNodeVersion, + replicaNodeVersion.onOrBefore(nodeVer) + ); + } else { + assertTrue( + "expected node [" + cursor.getId() + "] with version " + nodeVer + " to be before " + replicaNodeVersion, + replicaNodeVersion.onOrAfter(nodeVer) + ); + } } - startedReplica = clusterState.getRoutingNodes().activeReplicaWithHighestVersion(shardId); + startedReplica = clusterState.getRoutingNodes().activeReplicaBasedOnReplicationStrategy(shardId); logger.info("--> failing primary shard a second time, should select: {}", startedReplica); // fail the primary shard again, and ensure the same thing happens @@ -771,10 +790,17 @@ public void testReplicaOnNewestVersionIsPromoted() { continue; } Version nodeVer = cursor.getVersion(); - assertTrue( - "expected node [" + cursor.getId() + "] with version " + nodeVer + " to be before " + replicaNodeVersion, - replicaNodeVersion.onOrAfter(nodeVer) - ); + if (isSegmentReplicationEnabled) { + assertTrue( + "expected node [" + cursor.getId() + "] with version " + nodeVer + " to be after " + replicaNodeVersion, + replicaNodeVersion.onOrBefore(nodeVer) + ); + } else { + assertTrue( + "expected node [" + cursor.getId() + "] with version " + nodeVer + " to be before " + replicaNodeVersion, + replicaNodeVersion.onOrAfter(nodeVer) + ); + } } } } From b71276e483ba1b6e50c870794e2b4b91ad9a24fc Mon Sep 17 00:00:00 2001 From: Poojita Raj Date: Thu, 24 Aug 2023 09:29:08 -0700 Subject: [PATCH 2/5] add test Signed-off-by: Poojita Raj --- .../allocation/FailedNodeRoutingTests.java | 37 +++++++++++++++---- 1 file changed, 30 insertions(+), 7 deletions(-) diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/FailedNodeRoutingTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/FailedNodeRoutingTests.java index 80afc1d9b0b0f..c245e608edbec 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/FailedNodeRoutingTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/FailedNodeRoutingTests.java @@ -53,6 +53,7 @@ import org.opensearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; import org.opensearch.common.settings.Settings; import org.opensearch.indices.cluster.ClusterStateChanges; +import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.test.VersionUtils; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; @@ -69,6 +70,7 @@ import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE; import static org.opensearch.cluster.routing.ShardRoutingState.INITIALIZING; import static org.opensearch.cluster.routing.ShardRoutingState.STARTED; import static org.hamcrest.Matchers.equalTo; @@ -137,7 +139,15 @@ public void testSimpleFailedNodeTest() { } } + public void testRandomClusterPromotesOldestReplica() throws InterruptedException { + testRandomClusterPromotesReplica(true); + } + public void testRandomClusterPromotesNewestReplica() throws InterruptedException { + testRandomClusterPromotesReplica(false); + } + + void testRandomClusterPromotesReplica(boolean isSegmentReplicationEnabled) throws InterruptedException { ThreadPool threadPool = new TestThreadPool(getClass().getName()); ClusterStateChanges cluster = new ClusterStateChanges(xContentRegistry(), threadPool); @@ -164,6 +174,9 @@ public void testRandomClusterPromotesNewestReplica() throws InterruptedException Settings.Builder settingsBuilder = Settings.builder() .put(SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 4)) .put(SETTING_NUMBER_OF_REPLICAS, randomIntBetween(2, 4)); + if (isSegmentReplicationEnabled) { + settingsBuilder.put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT); + } CreateIndexRequest request = new CreateIndexRequest(name, settingsBuilder.build()).waitForActiveShards(ActiveShardCount.NONE); state = cluster.createIndex(state, request); assertTrue(state.metadata().hasIndex(name)); @@ -206,13 +219,23 @@ public void testRandomClusterPromotesNewestReplica() throws InterruptedException Version candidateVer = getNodeVersion(sr, compareState); if (candidateVer != null) { logger.info("--> candidate on {} node; shard routing: {}", candidateVer, sr); - assertTrue( - "candidate was not on the newest version, new primary is on " - + newPrimaryVersion - + " and there is a candidate on " - + candidateVer, - candidateVer.onOrBefore(newPrimaryVersion) - ); + if (isSegmentReplicationEnabled) { + assertTrue( + "candidate was not on the oldest version, new primary is on " + + newPrimaryVersion + + " and there is a candidate on " + + candidateVer, + candidateVer.onOrAfter(newPrimaryVersion) + ); + } else { + assertTrue( + "candidate was not on the newest version, new primary is on " + + newPrimaryVersion + + " and there is a candidate on " + + candidateVer, + candidateVer.onOrBefore(newPrimaryVersion) + ); + } } }); }); From dc410022819966687273c0c1691531bf3a7165a9 Mon Sep 17 00:00:00 2001 From: Poojita Raj Date: Tue, 29 Aug 2023 10:22:36 -0700 Subject: [PATCH 3/5] refactor Signed-off-by: Poojita Raj --- .../action/get/TransportGetAction.java | 12 +++++-- .../action/get/TransportMultiGetAction.java | 2 +- .../org/opensearch/cluster/ClusterState.java | 16 --------- .../opensearch/cluster/metadata/Metadata.java | 16 +++++++++ .../cluster/routing/RoutingNodes.java | 8 ++--- .../action/get/TransportGetActionTests.java | 14 ++++---- .../opensearch/cluster/ClusterStateTests.java | 34 ------------------- .../cluster/metadata/MetadataTests.java | 24 +++++++++++++ 8 files changed, 60 insertions(+), 66 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/get/TransportGetAction.java b/server/src/main/java/org/opensearch/action/get/TransportGetAction.java index 0c444732fb12b..00a795c86356f 100644 --- a/server/src/main/java/org/opensearch/action/get/TransportGetAction.java +++ b/server/src/main/java/org/opensearch/action/get/TransportGetAction.java @@ -37,6 +37,7 @@ import org.opensearch.action.support.single.shard.TransportSingleShardAction; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.routing.Preference; import org.opensearch.cluster.routing.ShardIterator; import org.opensearch.cluster.service.ClusterService; @@ -92,8 +93,8 @@ protected boolean resolveIndex(GetRequest request) { /** * Returns true if GET request should be routed to primary shards, else false. */ - protected static boolean shouldForcePrimaryRouting(ClusterState state, boolean realtime, String preference, String indexName) { - return state.isSegmentReplicationEnabled(indexName) && realtime && preference == null; + protected static boolean shouldForcePrimaryRouting(Metadata metadata, boolean realtime, String preference, String indexName) { + return metadata.isSegmentReplicationEnabled(indexName) && realtime && preference == null; } @Override @@ -101,7 +102,12 @@ protected ShardIterator shards(ClusterState state, InternalRequest request) { final String preference; // route realtime GET requests when segment replication is enabled to primary shards, // iff there are no other preferences/routings enabled for routing to a specific shard - if (shouldForcePrimaryRouting(state, request.request().realtime, request.request().preference(), request.concreteIndex())) { + if (shouldForcePrimaryRouting( + state.getMetadata(), + request.request().realtime, + request.request().preference(), + request.concreteIndex() + )) { preference = Preference.PRIMARY.type(); } else { preference = request.request().preference(); diff --git a/server/src/main/java/org/opensearch/action/get/TransportMultiGetAction.java b/server/src/main/java/org/opensearch/action/get/TransportMultiGetAction.java index a1a74208dc725..5d58d09d7a320 100644 --- a/server/src/main/java/org/opensearch/action/get/TransportMultiGetAction.java +++ b/server/src/main/java/org/opensearch/action/get/TransportMultiGetAction.java @@ -112,7 +112,7 @@ protected void doExecute(Task task, final MultiGetRequest request, final ActionL MultiGetShardRequest shardRequest = shardRequests.get(shardId); if (shardRequest == null) { - if (shouldForcePrimaryRouting(clusterState, request.realtime, request.preference, concreteSingleIndex)) { + if (shouldForcePrimaryRouting(clusterState.getMetadata(), request.realtime, request.preference, concreteSingleIndex)) { request.preference(Preference.PRIMARY.type()); } shardRequest = new MultiGetShardRequest(request, shardId.getIndexName(), shardId.getId()); diff --git a/server/src/main/java/org/opensearch/cluster/ClusterState.java b/server/src/main/java/org/opensearch/cluster/ClusterState.java index 2fd58d3db4975..1b87a60c2ccf5 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterState.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterState.java @@ -61,7 +61,6 @@ import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.discovery.Discovery; -import org.opensearch.indices.replication.common.ReplicationType; import java.io.IOException; import java.util.Collections; @@ -410,21 +409,6 @@ public boolean supersedes(ClusterState other) { } - /** - * Utility to identify whether input index belongs to SEGMENT replication in established cluster state. - * - * @param indexName Index name - * @return true if index belong SEGMENT replication, false otherwise - */ - public boolean isSegmentReplicationEnabled(String indexName) { - return Optional.ofNullable(this.getMetadata().index(indexName)) - .map( - indexMetadata -> ReplicationType.parseString(indexMetadata.getSettings().get(IndexMetadata.SETTING_REPLICATION_TYPE)) - .equals(ReplicationType.SEGMENT) - ) - .orElse(false); - } - /** * Metrics for cluster state. * diff --git a/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java b/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java index 13a27f76a181c..bd6fec9c90d5c 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java @@ -66,6 +66,7 @@ import org.opensearch.core.xcontent.XContentParser; import org.opensearch.gateway.MetadataStateFormat; import org.opensearch.index.IndexNotFoundException; +import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.plugins.MapperPlugin; import java.io.IOException; @@ -107,6 +108,21 @@ public class Metadata implements Iterable, Diffable, To public static final String UNKNOWN_CLUSTER_UUID = Strings.UNKNOWN_UUID_VALUE; public static final Pattern NUMBER_PATTERN = Pattern.compile("[0-9]+$"); + /** + * Utility to identify whether input index uses SEGMENT replication strategy in established cluster state metadata. + * + * @param indexName Index name + * @return true if index uses SEGMENT replication, false otherwise + */ + public boolean isSegmentReplicationEnabled(String indexName) { + return Optional.ofNullable(index(indexName)) + .map( + indexMetadata -> ReplicationType.parseString(indexMetadata.getSettings().get(IndexMetadata.SETTING_REPLICATION_TYPE)) + .equals(ReplicationType.SEGMENT) + ) + .orElse(false); + } + /** * Context of the XContent. * diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java index f78977492a168..320e3ddf7a59c 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java +++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java @@ -66,8 +66,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.opensearch.action.get.TransportGetAction.isSegmentReplicationEnabled; - /** * {@link RoutingNodes} represents a copy the routing information contained in the {@link ClusterState cluster state}. * It can be either initialized as mutable or immutable (see {@link #RoutingNodes(ClusterState, boolean)}), allowing @@ -84,7 +82,7 @@ * @opensearch.internal */ public class RoutingNodes implements Iterable { - private final ClusterState clusterState; + private final Metadata metadata; private final Map nodesToShards = new HashMap<>(); @@ -110,7 +108,7 @@ public RoutingNodes(ClusterState clusterState) { } public RoutingNodes(ClusterState clusterState, boolean readOnly) { - this.clusterState = clusterState; + this.metadata = clusterState.getMetadata(); this.readOnly = readOnly; final RoutingTable routingTable = clusterState.routingTable(); this.nodesPerAttributeNames = Collections.synchronizedMap(new HashMap<>()); @@ -387,7 +385,7 @@ public ShardRouting activeReplicaBasedOnReplicationStrategy(ShardId shardId) { Stream candidateShards = assignedShards(shardId).stream() .filter(shr -> !shr.primary() && shr.active()) .filter(shr -> node(shr.currentNodeId()) != null); - if (isSegmentReplicationEnabled(clusterState, shardId.getIndexName())) { + if (metadata.isSegmentReplicationEnabled(shardId.getIndexName())) { return candidateShards.min( Comparator.comparing( shr -> node(shr.currentNodeId()).node(), diff --git a/server/src/test/java/org/opensearch/action/get/TransportGetActionTests.java b/server/src/test/java/org/opensearch/action/get/TransportGetActionTests.java index 2eca49fb3032f..9565e219d1a78 100644 --- a/server/src/test/java/org/opensearch/action/get/TransportGetActionTests.java +++ b/server/src/test/java/org/opensearch/action/get/TransportGetActionTests.java @@ -67,24 +67,24 @@ private static ClusterState clusterState(ReplicationType replicationType) { public void testShouldForcePrimaryRouting() { - ClusterState clusterState = clusterState(ReplicationType.SEGMENT); + Metadata metadata = clusterState(ReplicationType.SEGMENT).getMetadata(); // should return false since preference is set for request - assertFalse(TransportGetAction.shouldForcePrimaryRouting(clusterState, true, Preference.REPLICA.type(), "index1")); + assertFalse(TransportGetAction.shouldForcePrimaryRouting(metadata, true, Preference.REPLICA.type(), "index1")); // should return false since request is not realtime - assertFalse(TransportGetAction.shouldForcePrimaryRouting(clusterState, false, null, "index1")); + assertFalse(TransportGetAction.shouldForcePrimaryRouting(metadata, false, null, "index1")); // should return true since segment replication is enabled - assertTrue(TransportGetAction.shouldForcePrimaryRouting(clusterState, true, null, "index1")); + assertTrue(TransportGetAction.shouldForcePrimaryRouting(metadata, true, null, "index1")); // should return false since index doesn't exist - assertFalse(TransportGetAction.shouldForcePrimaryRouting(clusterState, true, null, "index3")); + assertFalse(TransportGetAction.shouldForcePrimaryRouting(metadata, true, null, "index3")); - clusterState = clusterState(ReplicationType.DOCUMENT); + metadata = clusterState(ReplicationType.DOCUMENT).getMetadata(); // should fail since document replication enabled - assertFalse(TransportGetAction.shouldForcePrimaryRouting(clusterState, true, null, "index1")); + assertFalse(TransportGetAction.shouldForcePrimaryRouting(metadata, true, null, "index1")); } diff --git a/server/src/test/java/org/opensearch/cluster/ClusterStateTests.java b/server/src/test/java/org/opensearch/cluster/ClusterStateTests.java index c4fb3271ae3ce..457bdac1809ef 100644 --- a/server/src/test/java/org/opensearch/cluster/ClusterStateTests.java +++ b/server/src/test/java/org/opensearch/cluster/ClusterStateTests.java @@ -57,7 +57,6 @@ import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.XContentBuilder; -import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.TestCustomMetadata; @@ -117,39 +116,6 @@ public void testSupersedes() { ); } - public void testIsSegmentReplicationEnabled() { - final String indexName = "test"; - ClusterState clusterState = ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).build(); - Settings.Builder builder = settings(Version.CURRENT).put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT); - IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexName) - .settings(builder) - .numberOfShards(1) - .numberOfReplicas(1); - Metadata.Builder metadataBuilder = Metadata.builder().put(indexMetadataBuilder); - RoutingTable.Builder routingTableBuilder = RoutingTable.builder().addAsNew(indexMetadataBuilder.build()); - clusterState = ClusterState.builder(clusterState) - .metadata(metadataBuilder.build()) - .routingTable(routingTableBuilder.build()) - .build(); - assertTrue(clusterState.isSegmentReplicationEnabled(indexName)); - } - - public void testIsSegmentReplicationDisabled() { - final String indexName = "test"; - ClusterState clusterState = ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).build(); - IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexName) - .settings(settings(Version.CURRENT)) - .numberOfShards(1) - .numberOfReplicas(1); - Metadata.Builder metadataBuilder = Metadata.builder().put(indexMetadataBuilder); - RoutingTable.Builder routingTableBuilder = RoutingTable.builder().addAsNew(indexMetadataBuilder.build()); - clusterState = ClusterState.builder(clusterState) - .metadata(metadataBuilder.build()) - .routingTable(routingTableBuilder.build()) - .build(); - assertFalse(clusterState.isSegmentReplicationEnabled(indexName)); - } - public void testBuilderRejectsNullCustom() { final ClusterState.Builder builder = ClusterState.builder(ClusterName.DEFAULT); final String key = randomAlphaOfLength(10); diff --git a/server/src/test/java/org/opensearch/cluster/metadata/MetadataTests.java b/server/src/test/java/org/opensearch/cluster/metadata/MetadataTests.java index fff39d14e9702..40eefa6cdbf03 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/MetadataTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/MetadataTests.java @@ -52,6 +52,7 @@ import org.opensearch.core.index.Index; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.plugins.MapperPlugin; import org.opensearch.test.OpenSearchTestCase; @@ -1425,6 +1426,29 @@ public void testMetadataBuildInvocations() { compareMetadata(previousMetadata, builtMetadata, false, true, true); } + public void testIsSegmentReplicationEnabled() { + final String indexName = "test"; + Settings.Builder builder = settings(Version.CURRENT).put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT); + IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexName) + .settings(builder) + .numberOfShards(1) + .numberOfReplicas(1); + Metadata.Builder metadataBuilder = Metadata.builder().put(indexMetadataBuilder); + Metadata metadata = metadataBuilder.build(); + assertTrue(metadata.isSegmentReplicationEnabled(indexName)); + } + + public void testIsSegmentReplicationDisabled() { + final String indexName = "test"; + IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexName) + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(1); + Metadata.Builder metadataBuilder = Metadata.builder().put(indexMetadataBuilder); + Metadata metadata = metadataBuilder.build(); + assertFalse(metadata.isSegmentReplicationEnabled(indexName)); + } + public static Metadata randomMetadata() { Metadata.Builder md = Metadata.builder() .put(buildIndexMetadata("index", "alias", randomBoolean() ? null : randomBoolean()).build(), randomBoolean()) From 16c4751ea64abb36755d281cd11a7655215d9f5f Mon Sep 17 00:00:00 2001 From: Poojita Raj Date: Tue, 29 Aug 2023 14:06:50 -0700 Subject: [PATCH 4/5] refactor to avoid coupling Signed-off-by: Poojita Raj --- .../action/get/TransportMultiGetAction.java | 9 +- .../cluster/routing/RoutingNodes.java | 48 ++++++----- .../get/TransportMultiGetActionTests.java | 85 +++++++++++++------ .../allocation/FailedShardsRoutingTests.java | 15 +++- 4 files changed, 102 insertions(+), 55 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/get/TransportMultiGetAction.java b/server/src/main/java/org/opensearch/action/get/TransportMultiGetAction.java index 5d58d09d7a320..8bbfef381aea8 100644 --- a/server/src/main/java/org/opensearch/action/get/TransportMultiGetAction.java +++ b/server/src/main/java/org/opensearch/action/get/TransportMultiGetAction.java @@ -38,6 +38,7 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.block.ClusterBlockLevel; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.routing.Preference; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; @@ -51,8 +52,6 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; -import static org.opensearch.action.get.TransportGetAction.shouldForcePrimaryRouting; - /** * Perform the multi get action. * @@ -78,6 +77,10 @@ public TransportMultiGetAction( this.indexNameExpressionResolver = resolver; } + protected static boolean shouldForcePrimaryRouting(Metadata metadata, boolean realtime, String preference, String indexName) { + return metadata.isSegmentReplicationEnabled(indexName) && realtime && preference == null; + } + @Override protected void doExecute(Task task, final MultiGetRequest request, final ActionListener listener) { ClusterState clusterState = clusterService.state(); @@ -112,7 +115,7 @@ protected void doExecute(Task task, final MultiGetRequest request, final ActionL MultiGetShardRequest shardRequest = shardRequests.get(shardId); if (shardRequest == null) { - if (shouldForcePrimaryRouting(clusterState.getMetadata(), request.realtime, request.preference, concreteSingleIndex)) { + if (shouldForcePrimaryRouting(clusterState.getMetadata(), request.realtime(), request.preference(), concreteSingleIndex)) { request.preference(Preference.PRIMARY.type()); } shardRequest = new MultiGetShardRequest(request, shardId.getIndexName(), shardId.getId()); diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java index 320e3ddf7a59c..772e81fb9d52c 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java +++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java @@ -366,40 +366,37 @@ public ShardRouting activePrimary(ShardId shardId) { return null; } - /** - * Returns one active replica shard for the given shard id or null if - * no active replica is found. - * - * Since replicas could possibly be on nodes with an older version of OpenSearch than - * the primary is, this will return replicas on the highest version of OpenSearch when - * document replication strategy is in use, and will return replicas on oldest version - * of OpenSearch when segment replication is enabled. - * - */ - public ShardRouting activeReplicaBasedOnReplicationStrategy(ShardId shardId) { + public ShardRouting activeReplicaWithHighestVersion(ShardId shardId) { // It's possible for replicaNodeVersion to be null, when disassociating dead nodes // that have been removed, the shards are failed, and part of the shard failing // calls this method with an out-of-date RoutingNodes, where the version might not // be accessible. Therefore, we need to protect against the version being null // (meaning the node will be going away). - Stream candidateShards = assignedShards(shardId).stream() + return assignedShards(shardId).stream() .filter(shr -> !shr.primary() && shr.active()) - .filter(shr -> node(shr.currentNodeId()) != null); - if (metadata.isSegmentReplicationEnabled(shardId.getIndexName())) { - return candidateShards.min( + .filter(shr -> node(shr.currentNodeId()) != null) + .max( Comparator.comparing( shr -> node(shr.currentNodeId()).node(), Comparator.nullsFirst(Comparator.comparing(DiscoveryNode::getVersion)) ) - ).orElse(null); + ) + .orElse(null); + } - } - return candidateShards.max( - Comparator.comparing( - shr -> node(shr.currentNodeId()).node(), - Comparator.nullsFirst(Comparator.comparing(DiscoveryNode::getVersion)) + public ShardRouting activeReplicaWithOldestVersion(ShardId shardId) { + // It's possible for replicaNodeVersion to be null. Therefore, we need to protect against the version being null + // (meaning the node will be going away). + return assignedShards(shardId).stream() + .filter(shr -> !shr.primary() && shr.active()) + .filter(shr -> node(shr.currentNodeId()) != null) + .min( + Comparator.comparing( + shr -> node(shr.currentNodeId()).node(), + Comparator.nullsFirst(Comparator.comparing(DiscoveryNode::getVersion)) + ) ) - ).orElse(null); + .orElse(null); } /** @@ -736,7 +733,12 @@ private void unassignPrimaryAndPromoteActiveReplicaIfExists( RoutingChangesObserver routingChangesObserver ) { assert failedShard.primary(); - ShardRouting activeReplica = activeReplicaBasedOnReplicationStrategy(failedShard.shardId()); + ShardRouting activeReplica; + if (metadata.isSegmentReplicationEnabled(failedShard.getIndexName())) { + activeReplica = activeReplicaWithOldestVersion(failedShard.shardId()); + } else { + activeReplica = activeReplicaWithHighestVersion(failedShard.shardId()); + } if (activeReplica == null) { moveToUnassigned(failedShard, unassignedInfo); } else { diff --git a/server/src/test/java/org/opensearch/action/get/TransportMultiGetActionTests.java b/server/src/test/java/org/opensearch/action/get/TransportMultiGetActionTests.java index c9f40e0acc56c..0503bb39427a1 100644 --- a/server/src/test/java/org/opensearch/action/get/TransportMultiGetActionTests.java +++ b/server/src/test/java/org/opensearch/action/get/TransportMultiGetActionTests.java @@ -44,6 +44,7 @@ import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.OperationRouting; +import org.opensearch.cluster.routing.Preference; import org.opensearch.cluster.routing.ShardIterator; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.Settings; @@ -58,6 +59,7 @@ import org.opensearch.core.tasks.TaskId; import org.opensearch.core.xcontent.MediaTypeRegistry; import org.opensearch.indices.IndicesService; +import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskManager; import org.opensearch.test.OpenSearchTestCase; @@ -68,6 +70,7 @@ import org.junit.AfterClass; import org.junit.BeforeClass; +import java.io.IOException; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -91,32 +94,8 @@ public class TransportMultiGetActionTests extends OpenSearchTestCase { private static TransportMultiGetAction transportAction; private static TransportShardMultiGetAction shardAction; - @BeforeClass - public static void beforeClass() throws Exception { - threadPool = new TestThreadPool(TransportMultiGetActionTests.class.getSimpleName()); - - transportService = new TransportService( - Settings.EMPTY, - mock(Transport.class), - threadPool, - TransportService.NOOP_TRANSPORT_INTERCEPTOR, - boundAddress -> DiscoveryNode.createLocal( - Settings.builder().put("node.name", "node1").build(), - boundAddress.publishAddress(), - randomBase64UUID() - ), - null, - emptySet() - ) { - @Override - public TaskManager getTaskManager() { - return taskManager; - } - }; - - final Index index1 = new Index("index1", randomBase64UUID()); - final Index index2 = new Index("index2", randomBase64UUID()); - final ClusterState clusterState = ClusterState.builder(new ClusterName(TransportMultiGetActionTests.class.getSimpleName())) + private static ClusterState clusterState(ReplicationType replicationType, Index index1, Index index2) throws IOException { + return ClusterState.builder(new ClusterName(TransportMultiGetActionTests.class.getSimpleName())) .metadata( new Metadata.Builder().put( new IndexMetadata.Builder(index1.getName()).settings( @@ -124,6 +103,7 @@ public TaskManager getTaskManager() { .put("index.version.created", Version.CURRENT) .put("index.number_of_shards", 1) .put("index.number_of_replicas", 1) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, replicationType) .put(IndexMetadata.SETTING_INDEX_UUID, index1.getUUID()) ) .putMapping( @@ -149,6 +129,7 @@ public TaskManager getTaskManager() { .put("index.version.created", Version.CURRENT) .put("index.number_of_shards", 1) .put("index.number_of_replicas", 1) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, replicationType) .put(IndexMetadata.SETTING_INDEX_UUID, index1.getUUID()) ) .putMapping( @@ -170,6 +151,34 @@ public TaskManager getTaskManager() { ) ) .build(); + } + + @BeforeClass + public static void beforeClass() throws Exception { + threadPool = new TestThreadPool(TransportMultiGetActionTests.class.getSimpleName()); + + transportService = new TransportService( + Settings.EMPTY, + mock(Transport.class), + threadPool, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, + boundAddress -> DiscoveryNode.createLocal( + Settings.builder().put("node.name", "node1").build(), + boundAddress.publishAddress(), + randomBase64UUID() + ), + null, + emptySet() + ) { + @Override + public TaskManager getTaskManager() { + return taskManager; + } + }; + + final Index index1 = new Index("index1", randomBase64UUID()); + final Index index2 = new Index("index2", randomBase64UUID()); + ClusterState clusterState = clusterState(randomBoolean() ? ReplicationType.SEGMENT : ReplicationType.DOCUMENT, index1, index2); final ShardIterator index1ShardIterator = mock(ShardIterator.class); when(index1ShardIterator.shardId()).thenReturn(new ShardId(index1, randomInt())); @@ -285,6 +294,30 @@ protected void executeShardAction( } + public void testShouldForcePrimaryRouting() throws IOException { + final Index index1 = new Index("index1", randomBase64UUID()); + final Index index2 = new Index("index2", randomBase64UUID()); + Metadata metadata = clusterState(ReplicationType.SEGMENT, index1, index2).getMetadata(); + + // should return false since preference is set for request + assertFalse(TransportMultiGetAction.shouldForcePrimaryRouting(metadata, true, Preference.REPLICA.type(), "index1")); + + // should return false since request is not realtime + assertFalse(TransportMultiGetAction.shouldForcePrimaryRouting(metadata, false, null, "index2")); + + // should return true since segment replication is enabled + assertTrue(TransportMultiGetAction.shouldForcePrimaryRouting(metadata, true, null, "index1")); + + // should return false since index doesn't exist + assertFalse(TransportMultiGetAction.shouldForcePrimaryRouting(metadata, true, null, "index3")); + + metadata = clusterState(ReplicationType.DOCUMENT, index1, index2).getMetadata(); + + // should fail since document replication enabled + assertFalse(TransportGetAction.shouldForcePrimaryRouting(metadata, true, null, "index1")); + + } + private static Task createTask() { return new Task( randomLong(), diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/FailedShardsRoutingTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/FailedShardsRoutingTests.java index 2970a27d73414..db4cedbbbe7b5 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/FailedShardsRoutingTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/FailedShardsRoutingTests.java @@ -588,7 +588,7 @@ public void testFailAllReplicasInitializingOnPrimaryFail() { clusterState = startShardsAndReroute(allocation, clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(0)); assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2)); assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1)); - ShardRouting startedReplica = clusterState.getRoutingNodes().activeReplicaBasedOnReplicationStrategy(shardId); + ShardRouting startedReplica = clusterState.getRoutingNodes().activeReplicaWithHighestVersion(shardId); // fail the primary shard, check replicas get removed as well... ShardRouting primaryShardToFail = clusterState.routingTable().index("test").shard(0).primaryShard(); @@ -726,7 +726,12 @@ private void testReplicaIsPromoted(boolean isSegmentReplicationEnabled) { assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(4)); assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(0)); - ShardRouting startedReplica = clusterState.getRoutingNodes().activeReplicaBasedOnReplicationStrategy(shardId); + ShardRouting startedReplica; + if (isSegmentReplicationEnabled) { + startedReplica = clusterState.getRoutingNodes().activeReplicaWithOldestVersion(shardId); + } else { + startedReplica = clusterState.getRoutingNodes().activeReplicaWithHighestVersion(shardId); + } logger.info("--> all shards allocated, replica that should be promoted: {}", startedReplica); // fail the primary shard again and make sure the correct replica is promoted @@ -764,7 +769,11 @@ private void testReplicaIsPromoted(boolean isSegmentReplicationEnabled) { } } - startedReplica = clusterState.getRoutingNodes().activeReplicaBasedOnReplicationStrategy(shardId); + if (isSegmentReplicationEnabled) { + startedReplica = clusterState.getRoutingNodes().activeReplicaWithOldestVersion(shardId); + } else { + startedReplica = clusterState.getRoutingNodes().activeReplicaWithHighestVersion(shardId); + } logger.info("--> failing primary shard a second time, should select: {}", startedReplica); // fail the primary shard again, and ensure the same thing happens From 6f31434f858ed0ca2e0c2526e7ced374a3452ade Mon Sep 17 00:00:00 2001 From: Poojita Raj Date: Wed, 30 Aug 2023 10:23:25 -0700 Subject: [PATCH 5/5] add comments Signed-off-by: Poojita Raj --- .../opensearch/cluster/metadata/Metadata.java | 1 + .../cluster/routing/RoutingNodes.java | 17 +++++++++++++++++ 2 files changed, 18 insertions(+) diff --git a/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java b/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java index bd6fec9c90d5c..146193b8d22c4 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java @@ -110,6 +110,7 @@ public class Metadata implements Iterable, Diffable, To /** * Utility to identify whether input index uses SEGMENT replication strategy in established cluster state metadata. + * Note: Method intended for use by other plugins as well. * * @param indexName Index name * @return true if index uses SEGMENT replication, false otherwise diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java index 772e81fb9d52c..4f7b935f15f93 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java +++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java @@ -366,6 +366,14 @@ public ShardRouting activePrimary(ShardId shardId) { return null; } + /** + * Returns one active replica shard for the given shard id or null if + * no active replica is found. + * + * Since replicas could possibly be on nodes with an older version of OpenSearch than + * the primary is, this will return replicas on the highest version of OpenSearch when document + * replication is enabled. + */ public ShardRouting activeReplicaWithHighestVersion(ShardId shardId) { // It's possible for replicaNodeVersion to be null, when disassociating dead nodes // that have been removed, the shards are failed, and part of the shard failing @@ -384,6 +392,15 @@ public ShardRouting activeReplicaWithHighestVersion(ShardId shardId) { .orElse(null); } + /** + * Returns one active replica shard for the given shard id or null if + * no active replica is found. + * + * Since replicas could possibly be on nodes with a higher version of OpenSearch than + * the primary is, this will return replicas on the oldest version of OpenSearch when segment + * replication is enabled to allow for replica to read segments from primary. + * + */ public ShardRouting activeReplicaWithOldestVersion(ShardId shardId) { // It's possible for replicaNodeVersion to be null. Therefore, we need to protect against the version being null // (meaning the node will be going away).