Skip to content

Commit

Permalink
[Segment Replication] Handle failover in mixed cluster mode (#9536)
Browse files Browse the repository at this point in the history
* pick the replica on the oldest OpenSearch version to promote as primary

Signed-off-by: Poojita Raj <poojiraj@amazon.com>

* add test

Signed-off-by: Poojita Raj <poojiraj@amazon.com>

* refactor

Signed-off-by: Poojita Raj <poojiraj@amazon.com>

* refactor to avoid coupling

Signed-off-by: Poojita Raj <poojiraj@amazon.com>

* add comments

Signed-off-by: Poojita Raj <poojiraj@amazon.com>

---------

Signed-off-by: Poojita Raj <poojiraj@amazon.com>
  • Loading branch information
Poojita-Raj committed Aug 31, 2023
1 parent e563a0c commit ff65403
Show file tree
Hide file tree
Showing 11 changed files with 233 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.opensearch.action.support.single.shard.TransportSingleShardAction;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.routing.Preference;
import org.opensearch.cluster.routing.ShardIterator;
import org.opensearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -92,16 +93,21 @@ protected boolean resolveIndex(GetRequest request) {
/**
* Returns true if GET request should be routed to primary shards, else false.
*/
protected static boolean shouldForcePrimaryRouting(ClusterState state, boolean realtime, String preference, String indexName) {
return state.isSegmentReplicationEnabled(indexName) && realtime && preference == null;
protected static boolean shouldForcePrimaryRouting(Metadata metadata, boolean realtime, String preference, String indexName) {
    // Only indices that use segment replication are candidates for forced primary routing.
    if (!metadata.isSegmentReplicationEnabled(indexName)) {
        return false;
    }
    // Force the primary only for realtime reads where the caller gave no explicit preference.
    return realtime && preference == null;
}

@Override
protected ShardIterator shards(ClusterState state, InternalRequest request) {
final String preference;
// route realtime GET requests when segment replication is enabled to primary shards,
// iff there are no other preferences/routings enabled for routing to a specific shard
if (shouldForcePrimaryRouting(state, request.request().realtime, request.request().preference(), request.concreteIndex())) {
if (shouldForcePrimaryRouting(
state.getMetadata(),
request.request().realtime,
request.request().preference(),
request.concreteIndex()
)) {
preference = Preference.PRIMARY.type();
} else {
preference = request.request().preference();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.block.ClusterBlockLevel;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.routing.Preference;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
Expand All @@ -51,8 +52,6 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import static org.opensearch.action.get.TransportGetAction.shouldForcePrimaryRouting;

/**
* Perform the multi get action.
*
Expand All @@ -78,6 +77,10 @@ public TransportMultiGetAction(
this.indexNameExpressionResolver = resolver;
}

/**
 * Returns true if the multi-get request should be routed to primary shards, else false.
 * This holds when the index uses segment replication, the request is realtime,
 * and no preference was supplied.
 */
protected static boolean shouldForcePrimaryRouting(Metadata metadata, boolean realtime, String preference, String indexName) {
    final boolean segmentReplicated = metadata.isSegmentReplicationEnabled(indexName);
    final boolean noPreference = preference == null;
    return segmentReplicated && realtime && noPreference;
}

@Override
protected void doExecute(Task task, final MultiGetRequest request, final ActionListener<MultiGetResponse> listener) {
ClusterState clusterState = clusterService.state();
Expand Down Expand Up @@ -112,7 +115,7 @@ protected void doExecute(Task task, final MultiGetRequest request, final ActionL

MultiGetShardRequest shardRequest = shardRequests.get(shardId);
if (shardRequest == null) {
if (shouldForcePrimaryRouting(clusterState, request.realtime, request.preference, concreteSingleIndex)) {
if (shouldForcePrimaryRouting(clusterState.getMetadata(), request.realtime(), request.preference(), concreteSingleIndex)) {
request.preference(Preference.PRIMARY.type());
}
shardRequest = new MultiGetShardRequest(request, shardId.getIndexName(), shardId.getId());
Expand Down
16 changes: 0 additions & 16 deletions server/src/main/java/org/opensearch/cluster/ClusterState.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.discovery.Discovery;
import org.opensearch.indices.replication.common.ReplicationType;

import java.io.IOException;
import java.util.Collections;
Expand Down Expand Up @@ -410,21 +409,6 @@ public boolean supersedes(ClusterState other) {

}

/**
 * Reports whether the named index is configured for SEGMENT replication in this cluster state.
 *
 * @param indexName Index name
 * @return true if the index exists and its replication type setting resolves to SEGMENT, false otherwise
 */
public boolean isSegmentReplicationEnabled(String indexName) {
    final IndexMetadata indexMetadata = this.getMetadata().index(indexName);
    if (indexMetadata == null) {
        // Unknown index: reported as not using segment replication.
        return false;
    }
    final String configuredType = indexMetadata.getSettings().get(IndexMetadata.SETTING_REPLICATION_TYPE);
    return ReplicationType.parseString(configuredType).equals(ReplicationType.SEGMENT);
}

/**
* Metrics for cluster state.
*
Expand Down
17 changes: 17 additions & 0 deletions server/src/main/java/org/opensearch/cluster/metadata/Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.gateway.MetadataStateFormat;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.plugins.MapperPlugin;

import java.io.IOException;
Expand Down Expand Up @@ -107,6 +108,22 @@ public class Metadata implements Iterable<IndexMetadata>, Diffable<Metadata>, To
public static final String UNKNOWN_CLUSTER_UUID = Strings.UNKNOWN_UUID_VALUE;
public static final Pattern NUMBER_PATTERN = Pattern.compile("[0-9]+$");

/**
 * Tells whether the given index uses the SEGMENT replication strategy according to this metadata.
 * Note: Method intended for use by other plugins as well.
 *
 * @param indexName Index name
 * @return true if the index is present and its replication type setting is SEGMENT, false otherwise
 */
public boolean isSegmentReplicationEnabled(String indexName) {
    final IndexMetadata target = index(indexName);
    // A missing index short-circuits to false before any settings are read.
    return target != null
        && ReplicationType.parseString(target.getSettings().get(IndexMetadata.SETTING_REPLICATION_TYPE)).equals(ReplicationType.SEGMENT);
}

/**
* Context of the XContent.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
* @opensearch.internal
*/
public class RoutingNodes implements Iterable<RoutingNode> {
private final Metadata metadata;

private final Map<String, RoutingNode> nodesToShards = new HashMap<>();

Expand All @@ -107,6 +108,7 @@ public RoutingNodes(ClusterState clusterState) {
}

public RoutingNodes(ClusterState clusterState, boolean readOnly) {
this.metadata = clusterState.getMetadata();
this.readOnly = readOnly;
final RoutingTable routingTable = clusterState.routingTable();
this.nodesPerAttributeNames = Collections.synchronizedMap(new HashMap<>());
Expand Down Expand Up @@ -368,9 +370,9 @@ public ShardRouting activePrimary(ShardId shardId) {
* Returns one active replica shard for the given shard id or <code>null</code> if
* no active replica is found.
*
* Since replicas could possibly be on nodes with a older version of OpenSearch than
* the primary is, this will return replicas on the highest version of OpenSearch.
*
* Since replicas could possibly be on nodes with an older version of OpenSearch than
* the primary is, this will return replicas on the highest version of OpenSearch when document
* replication is enabled.
*/
public ShardRouting activeReplicaWithHighestVersion(ShardId shardId) {
// It's possible for replicaNodeVersion to be null, when disassociating dead nodes
Expand All @@ -390,6 +392,30 @@ public ShardRouting activeReplicaWithHighestVersion(ShardId shardId) {
.orElse(null);
}

/**
 * Returns one active replica shard for the given shard id or <code>null</code> if
 * no active replica is found.
 *
 * Since replicas could possibly be on nodes with a higher version of OpenSearch than
 * the primary is, this will return the replica on the oldest version of OpenSearch when segment
 * replication is enabled, to allow the remaining replicas to read segments from the promoted primary.
 *
 * Replicas whose node information is unavailable are only returned when no replica
 * on a known node exists.
 *
 * @param shardId the shard whose replicas are examined
 * @return an active replica on the oldest-versioned known node, or <code>null</code> if there is no active replica
 */
public ShardRouting activeReplicaWithOldestVersion(ShardId shardId) {
    // It's possible for the replica's DiscoveryNode to be null. Therefore, we need to protect against it being null
    // (meaning the node will be going away). Because min() picks the smallest element, nulls must sort LAST here;
    // nullsFirst would make a replica on a departing node the preferred promotion candidate.
    return assignedShards(shardId).stream()
        .filter(shr -> !shr.primary() && shr.active())
        .filter(shr -> node(shr.currentNodeId()) != null)
        .min(
            Comparator.comparing(
                shr -> node(shr.currentNodeId()).node(),
                Comparator.nullsLast(Comparator.comparing(DiscoveryNode::getVersion))
            )
        )
        .orElse(null);
}

/**
* Returns <code>true</code> iff all replicas are active for the given shard routing. Otherwise <code>false</code>
*/
Expand Down Expand Up @@ -724,7 +750,12 @@ private void unassignPrimaryAndPromoteActiveReplicaIfExists(
RoutingChangesObserver routingChangesObserver
) {
assert failedShard.primary();
ShardRouting activeReplica = activeReplicaWithHighestVersion(failedShard.shardId());
ShardRouting activeReplica;
if (metadata.isSegmentReplicationEnabled(failedShard.getIndexName())) {
activeReplica = activeReplicaWithOldestVersion(failedShard.shardId());
} else {
activeReplica = activeReplicaWithHighestVersion(failedShard.shardId());
}
if (activeReplica == null) {
moveToUnassigned(failedShard, unassignedInfo);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,24 +67,24 @@ private static ClusterState clusterState(ReplicationType replicationType) {

public void testShouldForcePrimaryRouting() {

ClusterState clusterState = clusterState(ReplicationType.SEGMENT);
Metadata metadata = clusterState(ReplicationType.SEGMENT).getMetadata();

// should return false since preference is set for request
assertFalse(TransportGetAction.shouldForcePrimaryRouting(clusterState, true, Preference.REPLICA.type(), "index1"));
assertFalse(TransportGetAction.shouldForcePrimaryRouting(metadata, true, Preference.REPLICA.type(), "index1"));

// should return false since request is not realtime
assertFalse(TransportGetAction.shouldForcePrimaryRouting(clusterState, false, null, "index1"));
assertFalse(TransportGetAction.shouldForcePrimaryRouting(metadata, false, null, "index1"));

// should return true since segment replication is enabled
assertTrue(TransportGetAction.shouldForcePrimaryRouting(clusterState, true, null, "index1"));
assertTrue(TransportGetAction.shouldForcePrimaryRouting(metadata, true, null, "index1"));

// should return false since index doesn't exist
assertFalse(TransportGetAction.shouldForcePrimaryRouting(clusterState, true, null, "index3"));
assertFalse(TransportGetAction.shouldForcePrimaryRouting(metadata, true, null, "index3"));

clusterState = clusterState(ReplicationType.DOCUMENT);
metadata = clusterState(ReplicationType.DOCUMENT).getMetadata();

// should return false since document replication is enabled
assertFalse(TransportGetAction.shouldForcePrimaryRouting(clusterState, true, null, "index1"));
assertFalse(TransportGetAction.shouldForcePrimaryRouting(metadata, true, null, "index1"));

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.OperationRouting;
import org.opensearch.cluster.routing.Preference;
import org.opensearch.cluster.routing.ShardIterator;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
Expand All @@ -58,6 +59,7 @@
import org.opensearch.core.tasks.TaskId;
import org.opensearch.core.xcontent.MediaTypeRegistry;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskManager;
import org.opensearch.test.OpenSearchTestCase;
Expand All @@ -68,6 +70,7 @@
import org.junit.AfterClass;
import org.junit.BeforeClass;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -91,39 +94,16 @@ public class TransportMultiGetActionTests extends OpenSearchTestCase {
private static TransportMultiGetAction transportAction;
private static TransportShardMultiGetAction shardAction;

@BeforeClass
public static void beforeClass() throws Exception {
threadPool = new TestThreadPool(TransportMultiGetActionTests.class.getSimpleName());

transportService = new TransportService(
Settings.EMPTY,
mock(Transport.class),
threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
boundAddress -> DiscoveryNode.createLocal(
Settings.builder().put("node.name", "node1").build(),
boundAddress.publishAddress(),
randomBase64UUID()
),
null,
emptySet()
) {
@Override
public TaskManager getTaskManager() {
return taskManager;
}
};

final Index index1 = new Index("index1", randomBase64UUID());
final Index index2 = new Index("index2", randomBase64UUID());
final ClusterState clusterState = ClusterState.builder(new ClusterName(TransportMultiGetActionTests.class.getSimpleName()))
private static ClusterState clusterState(ReplicationType replicationType, Index index1, Index index2) throws IOException {
return ClusterState.builder(new ClusterName(TransportMultiGetActionTests.class.getSimpleName()))
.metadata(
new Metadata.Builder().put(
new IndexMetadata.Builder(index1.getName()).settings(
Settings.builder()
.put("index.version.created", Version.CURRENT)
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 1)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, replicationType)
.put(IndexMetadata.SETTING_INDEX_UUID, index1.getUUID())
)
.putMapping(
Expand All @@ -149,6 +129,7 @@ public TaskManager getTaskManager() {
.put("index.version.created", Version.CURRENT)
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 1)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, replicationType)
.put(IndexMetadata.SETTING_INDEX_UUID, index1.getUUID())
)
.putMapping(
Expand All @@ -170,6 +151,34 @@ public TaskManager getTaskManager() {
)
)
.build();
}

@BeforeClass
public static void beforeClass() throws Exception {
threadPool = new TestThreadPool(TransportMultiGetActionTests.class.getSimpleName());

transportService = new TransportService(
Settings.EMPTY,
mock(Transport.class),
threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
boundAddress -> DiscoveryNode.createLocal(
Settings.builder().put("node.name", "node1").build(),
boundAddress.publishAddress(),
randomBase64UUID()
),
null,
emptySet()
) {
@Override
public TaskManager getTaskManager() {
return taskManager;
}
};

final Index index1 = new Index("index1", randomBase64UUID());
final Index index2 = new Index("index2", randomBase64UUID());
ClusterState clusterState = clusterState(randomBoolean() ? ReplicationType.SEGMENT : ReplicationType.DOCUMENT, index1, index2);

final ShardIterator index1ShardIterator = mock(ShardIterator.class);
when(index1ShardIterator.shardId()).thenReturn(new ShardId(index1, randomInt()));
Expand Down Expand Up @@ -285,6 +294,30 @@ protected void executeShardAction(

}

/**
 * Verifies {@link TransportMultiGetAction#shouldForcePrimaryRouting} for both replication types:
 * it must return true only for a realtime request without preference on an existing
 * segment-replicated index.
 */
public void testShouldForcePrimaryRouting() throws IOException {
    final Index index1 = new Index("index1", randomBase64UUID());
    final Index index2 = new Index("index2", randomBase64UUID());
    Metadata metadata = clusterState(ReplicationType.SEGMENT, index1, index2).getMetadata();

    // should return false since preference is set for request
    assertFalse(TransportMultiGetAction.shouldForcePrimaryRouting(metadata, true, Preference.REPLICA.type(), "index1"));

    // should return false since request is not realtime
    assertFalse(TransportMultiGetAction.shouldForcePrimaryRouting(metadata, false, null, "index2"));

    // should return true since segment replication is enabled
    assertTrue(TransportMultiGetAction.shouldForcePrimaryRouting(metadata, true, null, "index1"));

    // should return false since index doesn't exist
    assertFalse(TransportMultiGetAction.shouldForcePrimaryRouting(metadata, true, null, "index3"));

    metadata = clusterState(ReplicationType.DOCUMENT, index1, index2).getMetadata();

    // should return false since document replication is enabled; exercise the multi-get variant,
    // not TransportGetAction, so this class's own method is covered for DOCUMENT replication
    assertFalse(TransportMultiGetAction.shouldForcePrimaryRouting(metadata, true, null, "index1"));

}

private static Task createTask() {
return new Task(
randomLong(),
Expand Down

0 comments on commit ff65403

Please sign in to comment.