Skip to content

Commit

Permalink
The reroute command allows to explcitiyly execute a cluster reroute a…
Browse files Browse the repository at this point in the history
…llocation command including specific commands. For example, a shard can be moved from one node to another explicitly, an allocation can be canceled, or an unassigned shard can be explicitly allocated on a specific node.

Here is a short example of how a simple reroute API call:

    curl -XPOST 'localhost:9200/_cluster/reroute' -d '{
        "commands" : [
            {"move" : {"index" : "test", "shard" : 0, "from_node" : "node1", "to_node" : "node2"}},
            {"allocate" : {"index" : "test", "shard" : 1, "node" : "node3"}}
        ]
    }'

An importnat aspect to remember is the fact that once when an allocation occurs, the cluster will aim at rebalancing its state back to an even state. For example, if the allocation includes moving a shard from `node1` to `node2`, in an "even" state, then another shard will be moved from `node2` to `node1` to even things out.

The cluster can be set to disable allocations, which means that only the explicitl allocations will be performed. Obviously, only once all commands has been applied, the cluster will aim to be rebalance its state.

Anohter option is to run the commands in "dry_run" (as a URI flag, or in the request body). This will cause the commands to apply to the current cluster state, and reutrn the resulting cluster after the comamnds (and rebalancing) has been applied.

The commands supporterd are:

* `move`: Move a started shard from one node to anotehr node. Accepts `index` and `shard` for index name and shard number, `from_node` for the node to move the shard "from", and `to_node` for the node to move the shard to.
* `cancel`: Cancel allocation of a shard (or recovery). Accepts `index` and `shard` for index name and shar number, and `node` for the node to cancel the shard allocation on.
* `allocate`: Allocate an unassigned shard to a node. Accepts the `index` and `shard` for index name and shard number, and `node` to allocate the shard to. It also accepts `allow_primary` flag to explciitly specify that it is allowed to explciitly allocate a primary shard (might result in data loss).

closes elastic#2256
  • Loading branch information
kimchy committed Sep 15, 2012
1 parent 8795d52 commit 72e9c32
Show file tree
Hide file tree
Showing 13 changed files with 424 additions and 37 deletions.
Expand Up @@ -19,20 +19,82 @@

package org.elasticsearch.action.admin.cluster.reroute;

import org.elasticsearch.ElasticSearchParseException;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;

import java.io.IOException;

/**
*/
public class ClusterRerouteRequest extends MasterNodeOperationRequest {

AllocationCommands commands = new AllocationCommands();
boolean dryRun;

public ClusterRerouteRequest() {
}

/**
* Adds allocation commands to be applied to the cluster. Note, can be empty, in which case
* will simply run a simple "reroute".
*/
public ClusterRerouteRequest add(AllocationCommand... commands) {
this.commands.add(commands);
return this;
}

/**
* Sets a dry run flag (defaults to <tt>false</tt>) allowing to run the commands without
* actually applying them to the cluster state, and getting the resulting cluster state back.
*/
public ClusterRerouteRequest dryRun(boolean dryRun) {
this.dryRun = dryRun;
return this;
}

public boolean dryRun() {
return this.dryRun;
}

/**
* Sets the source for the request.
*/
public ClusterRerouteRequest source(BytesReference source) throws Exception {
XContentParser parser = XContentHelper.createParser(source);
try {
XContentParser.Token token;
String currentFieldName = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.START_ARRAY) {
if ("commands".equals(currentFieldName)) {
this.commands = AllocationCommands.fromXContent(parser);
} else {
throw new ElasticSearchParseException("failed to parse reroute request, got start array with wrong field name [" + currentFieldName + "]");
}
} else if (token.isValue()) {
if ("dry_run".equals(currentFieldName) || "dryRun".equals(currentFieldName)) {
dryRun = parser.booleanValue();
} else {
throw new ElasticSearchParseException("failed to parse reroute request, got value with wrong field name [" + currentFieldName + "]");
}
}
}
} finally {
parser.close();
}
return this;
}

@Override
public ActionRequestValidationException validate() {
return null;
Expand All @@ -41,10 +103,14 @@ public ActionRequestValidationException validate() {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
commands = AllocationCommands.readFrom(in);
dryRun = in.readBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
AllocationCommands.writeTo(commands, out);
out.writeBoolean(dryRun);
}
}
Expand Up @@ -22,6 +22,8 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.support.BaseClusterRequestBuilder;
import org.elasticsearch.client.ClusterAdminClient;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommand;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.unit.TimeValue;

/**
Expand All @@ -32,6 +34,29 @@ public ClusterRerouteRequestBuilder(ClusterAdminClient clusterClient) {
super(clusterClient, new ClusterRerouteRequest());
}

/**
* Adds allocation commands to be applied to the cluster. Note, can be empty, in which case
* will simply run a simple "reroute".
*/
public ClusterRerouteRequestBuilder add(AllocationCommand... commands) {
request.add(commands);
return this;
}

/**
* Sets a dry run flag (defaults to <tt>false</tt>) allowing to run the commands without
* actually applying them to the cluster state, and getting the resulting cluster state back.
*/
public ClusterRerouteRequestBuilder setDryRun(boolean dryRun) {
request.dryRun(dryRun);
return this;
}

public ClusterRerouteRequestBuilder setSource(BytesReference source) throws Exception {
request.source(source);
return this;
}

/**
* Sets the master node timeout in case the master has not yet been discovered.
*/
Expand Down
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.action.admin.cluster.reroute;

import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

Expand All @@ -29,14 +30,31 @@
*/
public class ClusterRerouteResponse implements ActionResponse {

private ClusterState state;

ClusterRerouteResponse() {

}

ClusterRerouteResponse(ClusterState state) {
this.state = state;
}

public ClusterState state() {
return this.state;
}

public ClusterState getState() {
return this.state;
}

@Override
public void readFrom(StreamInput in) throws IOException {
state = ClusterState.Builder.readFrom(in, null);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
ClusterState.Builder.writeTo(state, out);
}
}
Expand Up @@ -70,19 +70,26 @@ protected ClusterRerouteResponse newResponse() {
}

@Override
protected ClusterRerouteResponse masterOperation(ClusterRerouteRequest request, ClusterState state) throws ElasticSearchException {
protected ClusterRerouteResponse masterOperation(final ClusterRerouteRequest request, ClusterState state) throws ElasticSearchException {
final AtomicReference<Throwable> failureRef = new AtomicReference<Throwable>();
final AtomicReference<ClusterState> clusterStateResponse = new AtomicReference<ClusterState>();
final CountDownLatch latch = new CountDownLatch(1);

clusterService.submitStateUpdateTask("cluster_reroute (api)", new ProcessedClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
try {
RoutingAllocation.Result routingResult = allocationService.reroute(currentState);
return newClusterStateBuilder().state(currentState).routingResult(routingResult).build();
RoutingAllocation.Result routingResult = allocationService.reroute(currentState, request.commands);
ClusterState newState = newClusterStateBuilder().state(currentState).routingResult(routingResult).build();
clusterStateResponse.set(newState);
if (request.dryRun) {
return currentState;
}
return newState;
} catch (Exception e) {
logger.debug("failed to reroute", e);
failureRef.set(e);
latch.countDown();
logger.warn("failed to reroute", e);
return currentState;
} finally {
// we don't release the latch here, only after we rerouted
Expand All @@ -109,7 +116,7 @@ public void clusterStateProcessed(ClusterState clusterState) {
}
}

return new ClusterRerouteResponse();
return new ClusterRerouteResponse(clusterStateResponse.get());

}
}
Expand Up @@ -113,7 +113,11 @@ public RoutingAllocation.Result reroute(ClusterState clusterState, AllocationCom
// a consistent result of the effect the commands have on the routing
// this allows systems to dry run the commands, see the resulting cluster state, and act on it
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState.nodes());
// we ignore disable allocation, because commands are explicit
allocation.ignoreDisable(true);
commands.execute(allocation);
// we revert the ignore disable flag, since when rerouting, we want the original setting to take place
allocation.ignoreDisable(false);
// the assumption is that commands will move / act on shards (or fail through exceptions)
// so, there will always be shard "movements", so no need to check on reroute
reroute(allocation);
Expand Down
Expand Up @@ -71,7 +71,7 @@ public AllocationExplanation explanation() {

private Map<ShardId, String> ignoredShardToNodes = null;

private Map<ShardId, String> ignoreDisable = null;
private boolean ignoreDisable = false;

public RoutingAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, DiscoveryNodes nodes) {
this.deciders = deciders;
Expand Down Expand Up @@ -103,15 +103,12 @@ public AllocationExplanation explanation() {
return explanation;
}

public void addIgnoreDisable(ShardId shardId, String nodeId) {
if (ignoreDisable == null) {
ignoreDisable = new HashMap<ShardId, String>();
}
ignoreDisable.put(shardId, nodeId);
public void ignoreDisable(boolean ignoreDisable) {
this.ignoreDisable = ignoreDisable;
}

public boolean shouldIgnoreDisable(ShardId shardId, String nodeId) {
return ignoreDisable != null && nodeId.equals(ignoreDisable.get(shardId));
public boolean ignoreDisable() {
return this.ignoreDisable;
}

public void addIgnoreShardForNode(ShardId shardId, String nodeId) {
Expand Down
Expand Up @@ -160,7 +160,6 @@ public void execute(RoutingAllocation allocation) throws ElasticSearchException
}

RoutingNode routingNode = allocation.routingNodes().node(discoNode.id());
allocation.addIgnoreDisable(shardRouting.shardId(), routingNode.nodeId());
if (!allocation.deciders().canAllocate(shardRouting, routingNode, allocation).allowed()) {
throw new ElasticSearchIllegalArgumentException("[allocate] allocation of " + shardId + " on node " + discoNode + " is not allowed");
}
Expand Down
Expand Up @@ -126,25 +126,25 @@ public static void writeTo(AllocationCommands commands, StreamOutput out) throws
public static AllocationCommands fromXContent(XContentParser parser) throws IOException {
AllocationCommands commands = new AllocationCommands();

XContentParser.Token token = parser.nextToken();
XContentParser.Token token = parser.currentToken();
if (token == null) {
throw new ElasticSearchParseException("No commands");
}
if (token != XContentParser.Token.START_OBJECT) {
throw new ElasticSearchParseException("No start object, got " + token);
}

token = parser.nextToken();
if (token != XContentParser.Token.FIELD_NAME) {
throw new ElasticSearchParseException("expected the field name `commands` to exists, got " + token);
}
if (!parser.currentName().equals("commands")) {
throw new ElasticSearchParseException("expected field name to be named `commands`, got " + parser.currentName());
}

token = parser.nextToken();
if (token != XContentParser.Token.START_ARRAY) {
throw new ElasticSearchParseException("commands should follow with an array element");
if (token == XContentParser.Token.FIELD_NAME) {
if (!parser.currentName().equals("commands")) {
throw new ElasticSearchParseException("expected field name to be named `commands`, got " + parser.currentName());
}
if (!parser.currentName().equals("commands")) {
throw new ElasticSearchParseException("expected field name to be named `commands`, got " + parser.currentName());
}
token = parser.nextToken();
if (token != XContentParser.Token.START_ARRAY) {
throw new ElasticSearchParseException("commands should follow with an array element");
}
} else if (token == XContentParser.Token.START_ARRAY) {
// ok...
} else {
throw new ElasticSearchParseException("expected either field name commands, or start array, got " + token);
}
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
if (token == XContentParser.Token.START_OBJECT) {
Expand All @@ -165,7 +165,6 @@ public static AllocationCommands fromXContent(XContentParser parser) throws IOEx
}

public static void toXContent(AllocationCommands commands, XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject();
builder.startArray("commands");
for (AllocationCommand command : commands.commands) {
builder.startObject();
Expand All @@ -174,6 +173,5 @@ public static void toXContent(AllocationCommands commands, XContentBuilder build
builder.endObject();
}
builder.endArray();
builder.endObject();
}
}
Expand Up @@ -70,10 +70,10 @@ public DisableAllocationDecider(Settings settings, NodeSettingsService nodeSetti
@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
if (disableAllocation) {
return allocation.shouldIgnoreDisable(shardRouting.shardId(), node.nodeId()) ? Decision.YES : Decision.NO;
return allocation.ignoreDisable() ? Decision.YES : Decision.NO;
}
if (disableReplicaAllocation) {
return shardRouting.primary() ? Decision.YES : allocation.shouldIgnoreDisable(shardRouting.shardId(), node.nodeId()) ? Decision.YES : Decision.NO;
return shardRouting.primary() ? Decision.YES : allocation.ignoreDisable() ? Decision.YES : Decision.NO;
}
return Decision.YES;
}
Expand Down
Expand Up @@ -27,30 +27,61 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.*;
import org.elasticsearch.rest.action.support.RestXContentBuilder;

import java.io.IOException;

/**
*/
public class RestClusterRerouteAction extends BaseRestHandler {

private final SettingsFilter settingsFilter;

@Inject
public RestClusterRerouteAction(Settings settings, Client client, RestController controller,
SettingsFilter settingsFilter) {
super(settings, client);
this.settingsFilter = settingsFilter;
controller.registerHandler(RestRequest.Method.POST, "/_cluster/reroute", this);
}

@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {
final ClusterRerouteRequest clusterRerouteRequest = Requests.clusterRerouteRequest();
clusterRerouteRequest.listenerThreaded(false);
clusterRerouteRequest.dryRun(request.paramAsBoolean("dry_run", clusterRerouteRequest.dryRun()));
if (request.hasContent()) {
try {
clusterRerouteRequest.source(request.content());
} catch (Exception e) {
try {
channel.sendResponse(new XContentThrowableRestResponse(request, e));
} catch (IOException e1) {
logger.warn("Failed to send response", e1);
}
return;
}
}
client.admin().cluster().reroute(clusterRerouteRequest, new ActionListener<ClusterRerouteResponse>() {
@Override
public void onResponse(ClusterRerouteResponse response) {
try {
channel.sendResponse(new StringRestResponse(RestStatus.OK));
XContentBuilder builder = RestXContentBuilder.restContentBuilder(request);
builder.startObject();

builder.field("ok", true);
builder.startObject("state");
// by default, filter metadata
if (request.param("filter_metadata") == null) {
request.params().put("filter_metadata", "true");
}
response.state().settingsFilter(settingsFilter).toXContent(builder, request);
builder.endObject();

builder.endObject();
channel.sendResponse(new XContentRestResponse(request, RestStatus.OK, builder));
} catch (Exception e) {
onFailure(e);
}
Expand Down

0 comments on commit 72e9c32

Please sign in to comment.