[Segment Replication] Prioritize replica shard movement during shard relocation #8875

Merged 6 commits on Aug 5, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -82,6 +82,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [distribution/archives] [Linux] [x64] Provide the variant of the distributions bundled with JRE ([#8195](https://github.com/opensearch-project/OpenSearch/pull/8195))
- Add configuration for file cache size to max remote data ratio to prevent oversubscription of file cache ([#8606](https://github.com/opensearch-project/OpenSearch/pull/8606))
- Disallow compression level to be set for default and best_compression index codecs ([#8737](https://github.com/opensearch-project/OpenSearch/pull/8737))
- Prioritize replica shard movement during shard relocation ([#8875](https://github.com/opensearch-project/OpenSearch/pull/8875))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.17.1 to 2.20.0 ([#8307](https://github.com/opensearch-project/OpenSearch/pull/8307))
@@ -359,7 +359,7 @@ public static Collection<AllocationDecider> createAllocationDeciders(
addAllocationDecider(deciders, new ConcurrentRebalanceAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new ConcurrentRecoveriesAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new EnableAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new NodeVersionAllocationDecider());
addAllocationDecider(deciders, new NodeVersionAllocationDecider(settings));
addAllocationDecider(deciders, new SnapshotInProgressAllocationDecider());
addAllocationDecider(deciders, new RestoreInProgressAllocationDecider());
addAllocationDecider(deciders, new FilterAllocationDecider(settings, clusterSettings));
server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java
@@ -1310,100 +1310,131 @@
}

/**
* Creates an iterator over shards interleaving between nodes: The iterator returns the first shard from
* the first node, then the first shard of the second node, etc. until one shard from each node has been returned.
* The iterator then resumes on the first node by returning the second shard and continues until all shards from
* all the nodes have been returned.
* @param movePrimaryFirst if true, all primary shards are iterated over before iterating replica for any node
* @return iterator of shard routings
* Returns iterator of shard routings used by {@link #nodeInterleavedShardIterator(ShardMovementStrategy)}
* @param primaryFirst true when ShardMovementStrategy = ShardMovementStrategy.PRIMARY_FIRST, false when it is ShardMovementStrategy.REPLICA_FIRST
*/
public Iterator<ShardRouting> nodeInterleavedShardIterator(boolean movePrimaryFirst) {
private Iterator<ShardRouting> buildIteratorForMovementStrategy(boolean primaryFirst) {
final Queue<Iterator<ShardRouting>> queue = new ArrayDeque<>();
for (Map.Entry<String, RoutingNode> entry : nodesToShards.entrySet()) {
queue.add(entry.getValue().copyShards().iterator());
}
if (movePrimaryFirst) {
return new Iterator<ShardRouting>() {
private Queue<ShardRouting> replicaShards = new ArrayDeque<>();
private Queue<Iterator<ShardRouting>> replicaIterators = new ArrayDeque<>();

public boolean hasNext() {
while (!queue.isEmpty()) {
if (queue.peek().hasNext()) {
return true;
}
queue.poll();
}
if (!replicaShards.isEmpty()) {
return new Iterator<ShardRouting>() {
private Queue<ShardRouting> shardRoutings = new ArrayDeque<>();
private Queue<Iterator<ShardRouting>> shardIterators = new ArrayDeque<>();

public boolean hasNext() {
while (queue.isEmpty() == false) {
if (queue.peek().hasNext()) {
return true;
}
while (!replicaIterators.isEmpty()) {
if (replicaIterators.peek().hasNext()) {
return true;
}
replicaIterators.poll();
queue.poll();
}
if (!shardRoutings.isEmpty()) {
return true;
}
while (!shardIterators.isEmpty()) {
if (shardIterators.peek().hasNext()) {
return true;
}
return false;
shardIterators.poll();
}
return false;
}

public ShardRouting next() {
if (hasNext() == false) {
throw new NoSuchElementException();
}
while (!queue.isEmpty()) {
Iterator<ShardRouting> iter = queue.poll();
public ShardRouting next() {
if (hasNext() == false) {
throw new NoSuchElementException();

}
while (!queue.isEmpty()) {
Iterator<ShardRouting> iter = queue.poll();
if (primaryFirst) {
if (iter.hasNext()) {
ShardRouting result = iter.next();
if (result.primary()) {
queue.offer(iter);
return result;
}
replicaShards.offer(result);
replicaIterators.offer(iter);
shardRoutings.offer(result);
shardIterators.offer(iter);
}
} else {
while (iter.hasNext()) {
ShardRouting result = iter.next();
if (result.primary() == false) {
queue.offer(iter);
return result;
}
shardRoutings.offer(result);
shardIterators.offer(iter);
}
}
if (!replicaShards.isEmpty()) {
return replicaShards.poll();
}
Iterator<ShardRouting> replicaIterator = replicaIterators.poll();
ShardRouting replicaShard = replicaIterator.next();
replicaIterators.offer(replicaIterator);

assert !replicaShard.primary();
return replicaShard;
}

public void remove() {
throw new UnsupportedOperationException();
if (!shardRoutings.isEmpty()) {
return shardRoutings.poll();
}
};
Iterator<ShardRouting> shardIterator = shardIterators.poll();
ShardRouting deferredShard = shardIterator.next();
shardIterators.offer(shardIterator);

assert deferredShard.primary() != primaryFirst;
return deferredShard;
}

public void remove() {
throw new UnsupportedOperationException();

}

};
}

/**
* Creates an iterator over shards interleaving between nodes: The iterator returns the first shard from
* the first node, then the first shard of the second node, etc. until one shard from each node has been returned.
* The iterator then resumes on the first node by returning the second shard and continues until all shards from
* all the nodes have been returned.
* @param shardMovementStrategy if ShardMovementStrategy.PRIMARY_FIRST, all primary shards are iterated over before iterating replica for any node
* if ShardMovementStrategy.REPLICA_FIRST, all replica shards are iterated over before iterating primary for any node
* if ShardMovementStrategy.NO_PREFERENCE, order of replica and primary shards doesn't matter in iteration
* @return iterator of shard routings
*/
public Iterator<ShardRouting> nodeInterleavedShardIterator(ShardMovementStrategy shardMovementStrategy) {
final Queue<Iterator<ShardRouting>> queue = new ArrayDeque<>();
for (Map.Entry<String, RoutingNode> entry : nodesToShards.entrySet()) {
queue.add(entry.getValue().copyShards().iterator());
}
if (shardMovementStrategy == ShardMovementStrategy.PRIMARY_FIRST) {
return buildIteratorForMovementStrategy(true);
} else {
return new Iterator<ShardRouting>() {
@Override
public boolean hasNext() {
while (!queue.isEmpty()) {
if (queue.peek().hasNext()) {
return true;
if (shardMovementStrategy == ShardMovementStrategy.REPLICA_FIRST) {
return buildIteratorForMovementStrategy(false);
} else {
return new Iterator<ShardRouting>() {
@Override
public boolean hasNext() {
while (!queue.isEmpty()) {
if (queue.peek().hasNext()) {
return true;
}
queue.poll();
}
queue.poll();
return false;
}
return false;
}

@Override
public ShardRouting next() {
if (hasNext() == false) {
throw new NoSuchElementException();
@Override
public ShardRouting next() {
if (hasNext() == false) {
throw new NoSuchElementException();

}
Iterator<ShardRouting> iter = queue.poll();
queue.offer(iter);
return iter.next();
}
Iterator<ShardRouting> iter = queue.poll();
queue.offer(iter);
return iter.next();
}

public void remove() {
throw new UnsupportedOperationException();
}
};
public void remove() {
throw new UnsupportedOperationException();

}
};
}
}
}
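
The ordering contract described in the Javadoc above can be illustrated with a small standalone sketch. This is not the lazy iterator from the diff: it is an eager, two-pass simplification, and `InterleaveSketch`, `Shard`, `Strategy`, `roundRobin`, `order` and `sample` are all hypothetical names introduced for illustration. It assumes only what the Javadoc states: shards are interleaved across nodes, and with `PRIMARY_FIRST` or `REPLICA_FIRST` every shard of the preferred role comes before any shard of the other role.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Standalone sketch; Shard, Strategy and every method here are hypothetical stand-ins,
// not OpenSearch classes.
final class InterleaveSketch {
    enum Strategy { NO_PREFERENCE, PRIMARY_FIRST, REPLICA_FIRST }

    static final class Shard {
        final String name;
        final boolean primary;

        Shard(String name, boolean primary) {
            this.name = name;
            this.primary = primary;
        }
    }

    // Takes one shard per node per round until every per-node list is exhausted.
    static List<String> roundRobin(List<List<Shard>> perNode) {
        Queue<Iterator<Shard>> queue = new ArrayDeque<>();
        for (List<Shard> shards : perNode) {
            queue.add(shards.iterator());
        }
        List<String> order = new ArrayList<>();
        while (!queue.isEmpty()) {
            Iterator<Shard> it = queue.poll();
            if (it.hasNext()) {
                order.add(it.next().name);
                queue.offer(it); // revisit this node in the next round
            }
        }
        return order;
    }

    // Eager simplification: interleave the preferred role first, then the remaining shards.
    static List<String> order(Map<String, List<Shard>> nodesToShards, Strategy strategy) {
        if (strategy == Strategy.NO_PREFERENCE) {
            return roundRobin(new ArrayList<>(nodesToShards.values()));
        }
        boolean primaryFirst = strategy == Strategy.PRIMARY_FIRST;
        List<List<Shard>> preferred = new ArrayList<>();
        List<List<Shard>> deferred = new ArrayList<>();
        for (List<Shard> shards : nodesToShards.values()) {
            List<Shard> wanted = new ArrayList<>();
            List<Shard> rest = new ArrayList<>();
            for (Shard shard : shards) {
                if (shard.primary == primaryFirst) {
                    wanted.add(shard);
                } else {
                    rest.add(shard);
                }
            }
            preferred.add(wanted);
            deferred.add(rest);
        }
        List<String> result = roundRobin(preferred);
        result.addAll(roundRobin(deferred));
        return result;
    }

    // Two nodes, each holding one primary and one replica (made-up names).
    static Map<String, List<Shard>> sample() {
        Map<String, List<Shard>> nodes = new LinkedHashMap<>();
        nodes.put("n1", Arrays.asList(new Shard("n1/p0", true), new Shard("n1/r1", false)));
        nodes.put("n2", Arrays.asList(new Shard("n2/r0", false), new Shard("n2/p1", true)));
        return nodes;
    }

    public static void main(String[] args) {
        for (Strategy strategy : Strategy.values()) {
            System.out.println(strategy + " -> " + order(sample(), strategy));
        }
    }
}
```

For the sample data, `REPLICA_FIRST` yields `n1/r1, n2/r0, n1/p0, n2/p1`: both replicas, interleaved by node, before either primary. The iterator in the diff reaches its ordering lazily by parking non-preferred shards in `shardRoutings` and their iterators in `shardIterators`, so its exact sequence may differ from this eager version.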

server/src/main/java/org/opensearch/cluster/routing/ShardMovementStrategy.java
@@ -0,0 +1,57 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.routing;

import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;

import java.util.Locale;

/**
* ShardMovementStrategy defines the order in which shard movement occurs.
*
* The string representation of each value is used with
* {@link BalancedShardsAllocator#SHARD_MOVEMENT_STRATEGY_SETTING} via cluster settings.
*
* @opensearch.internal
*/
public enum ShardMovementStrategy {
/**
* default behavior in which order of shard movement doesn't matter.
*/
NO_PREFERENCE,

/**
* primary shards are moved first
*/
PRIMARY_FIRST,

/**
* replica shards are moved first
*/
REPLICA_FIRST;

public static ShardMovementStrategy parse(String strValue) {
if (strValue == null) {
return null;

} else {
strValue = strValue.toUpperCase(Locale.ROOT);
try {
return ShardMovementStrategy.valueOf(strValue);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Illegal allocation.shard_movement_strategy value [" + strValue + "]");

}
}
}

@Override
public String toString() {
return name().toLowerCase(Locale.ROOT);
}

}
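
A short usage sketch of the parse and `toString` round trip may help when reading the setting definition that follows. The enum body below is a trimmed copy of the one in this diff, nested in a hypothetical `StrategyParseSketch` class so that it runs without OpenSearch on the classpath; `rejectionMessage` is a helper invented for illustration.

```java
import java.util.Locale;

// Standalone sketch: a trimmed copy of the enum from the diff, nested so it needs no OpenSearch jars.
final class StrategyParseSketch {
    enum ShardMovementStrategy {
        NO_PREFERENCE,
        PRIMARY_FIRST,
        REPLICA_FIRST;

        static ShardMovementStrategy parse(String strValue) {
            if (strValue == null) {
                return null;
            }
            // Input is upper-cased first, so parsing is case-insensitive.
            strValue = strValue.toUpperCase(Locale.ROOT);
            try {
                return ShardMovementStrategy.valueOf(strValue);
            } catch (IllegalArgumentException e) {
                throw new IllegalArgumentException("Illegal allocation.shard_movement_strategy value [" + strValue + "]");
            }
        }

        @Override
        public String toString() {
            return name().toLowerCase(Locale.ROOT);
        }
    }

    // Hypothetical helper: returns the error message for a rejected value, or null if it parses.
    static String rejectionMessage(String value) {
        try {
            ShardMovementStrategy.parse(value);
            return null;
        } catch (IllegalArgumentException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(ShardMovementStrategy.parse("replica_first"));
        System.out.println(ShardMovementStrategy.parse("Primary_First") == ShardMovementStrategy.PRIMARY_FIRST);
        System.out.println(rejectionMessage("replicas_first"));
    }
}
```

Because `strValue` is reassigned before the `try` block, the error message reports the upper-cased form of the rejected input rather than what the caller typed.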
server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java
@@ -37,6 +37,7 @@
import org.apache.lucene.util.IntroSorter;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.ShardMovementStrategy;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.routing.UnassignedInfo.AllocationStatus;
@@ -107,8 +108,22 @@
"cluster.routing.allocation.move.primary_first",
false,
Property.Dynamic,
Property.NodeScope,
Property.Deprecated
);

/**
* Decides the order in which shards are moved off a node when they can no longer stay on it. See {@link LocalShardsBalancer#moveShards()}.
* Encapsulates the behavior of SHARD_MOVE_PRIMARY_FIRST_SETTING above.
*/
public static final Setting<ShardMovementStrategy> SHARD_MOVEMENT_STRATEGY_SETTING = new Setting<ShardMovementStrategy>(
"cluster.routing.allocation.shard_movement_strategy",
ShardMovementStrategy.NO_PREFERENCE.toString(),
ShardMovementStrategy::parse,
Property.Dynamic,
Property.NodeScope
);

public static final Setting<Float> THRESHOLD_SETTING = Setting.floatSetting(
"cluster.routing.allocation.balance.threshold",
1.0f,
@@ -131,6 +146,7 @@
);

private volatile boolean movePrimaryFirst;
private volatile ShardMovementStrategy shardMovementStrategy;

private volatile boolean preferPrimaryShardBalance;
private volatile WeightFunction weightFunction;
@@ -145,8 +161,10 @@
setWeightFunction(INDEX_BALANCE_FACTOR_SETTING.get(settings), SHARD_BALANCE_FACTOR_SETTING.get(settings));
setThreshold(THRESHOLD_SETTING.get(settings));
setPreferPrimaryShardBalance(PREFER_PRIMARY_SHARD_BALANCE.get(settings));
setShardMovementStrategy(SHARD_MOVEMENT_STRATEGY_SETTING.get(settings));
clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_BALANCE, this::setPreferPrimaryShardBalance);
clusterSettings.addSettingsUpdateConsumer(SHARD_MOVE_PRIMARY_FIRST_SETTING, this::setMovePrimaryFirst);
clusterSettings.addSettingsUpdateConsumer(SHARD_MOVEMENT_STRATEGY_SETTING, this::setShardMovementStrategy);
clusterSettings.addSettingsUpdateConsumer(INDEX_BALANCE_FACTOR_SETTING, SHARD_BALANCE_FACTOR_SETTING, this::setWeightFunction);
clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTING, this::setThreshold);
}
@@ -155,6 +173,10 @@
this.movePrimaryFirst = movePrimaryFirst;
}

private void setShardMovementStrategy(ShardMovementStrategy shardMovementStrategy) {
this.shardMovementStrategy = shardMovementStrategy;
}

private void setWeightFunction(float indexBalance, float shardBalanceFactor) {
weightFunction = new WeightFunction(indexBalance, shardBalanceFactor);
}
@@ -184,6 +206,7 @@
logger,
allocation,
movePrimaryFirst,
shardMovementStrategy,
weightFunction,
threshold,
preferPrimaryShardBalance
@@ -205,6 +228,7 @@
logger,
allocation,
movePrimaryFirst,
shardMovementStrategy,
weightFunction,
threshold,
preferPrimaryShardBalance
@@ -456,11 +480,12 @@
Logger logger,
RoutingAllocation allocation,
boolean movePrimaryFirst,
ShardMovementStrategy shardMovementStrategy,
BalancedShardsAllocator.WeightFunction weight,
float threshold,
boolean preferPrimaryBalance
) {
super(logger, allocation, movePrimaryFirst, weight, threshold, preferPrimaryBalance);
super(logger, allocation, movePrimaryFirst, shardMovementStrategy, weight, threshold, preferPrimaryBalance);

}
}
