Skip to content

Commit

Permalink
Initial commit for api versioning
Browse files Browse the repository at this point in the history
Signed-off-by: Anshu Agarwal <anshukag@amazon.com>
  • Loading branch information
Anshu Agarwal committed Nov 15, 2022
1 parent 650039c commit 6ab7e70
Show file tree
Hide file tree
Showing 13 changed files with 118 additions and 32 deletions.
Expand Up @@ -11,6 +11,7 @@
import org.opensearch.OpenSearchParseException;
import org.opensearch.action.ActionResponse;

import org.opensearch.cluster.metadata.WeightedRoutingMetadata;
import org.opensearch.cluster.routing.WeightedRouting;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
Expand All @@ -34,6 +35,7 @@ public class ClusterGetWeightedRoutingResponse extends ActionResponse implements
private WeightedRouting weightedRouting;
private String localNodeWeight;
private static final String NODE_WEIGHT = "node_weight";
private long version;

public String getLocalNodeWeight() {
return localNodeWeight;
Expand All @@ -43,14 +45,16 @@ public String getLocalNodeWeight() {
this.weightedRouting = null;
}

public ClusterGetWeightedRoutingResponse(String localNodeWeight, WeightedRouting weightedRouting) {
public ClusterGetWeightedRoutingResponse(String localNodeWeight, WeightedRouting weightedRouting, long version) {
this.localNodeWeight = localNodeWeight;
this.weightedRouting = weightedRouting;
this.version = version;
}

ClusterGetWeightedRoutingResponse(StreamInput in) throws IOException {
if (in.available() != 0) {
this.weightedRouting = new WeightedRouting(in);
this.version = in.readLong();
}
}

Expand All @@ -67,6 +71,7 @@ public WeightedRouting weights() {
public void writeTo(StreamOutput out) throws IOException {
if (weightedRouting != null) {
weightedRouting.writeTo(out);
out.writeLong(version);
}
}

Expand All @@ -80,6 +85,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (localNodeWeight != null) {
builder.field(NODE_WEIGHT, localNodeWeight);
}
builder.field(WeightedRoutingMetadata.VERSION, version);
}
builder.endObject();
return builder;
Expand All @@ -91,6 +97,7 @@ public static ClusterGetWeightedRoutingResponse fromXContent(XContentParser pars
String attrKey = null, attrValue = null;
String localNodeWeight = null;
Map<String, Double> weights = new HashMap<>();
long version = -1;

while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
Expand All @@ -99,6 +106,8 @@ public static ClusterGetWeightedRoutingResponse fromXContent(XContentParser pars
attrValue = parser.text();
if (attrKey != null && attrKey.equals(NODE_WEIGHT)) {
localNodeWeight = attrValue;
} else if (attrKey != null && attrKey.equals(WeightedRoutingMetadata.VERSION)) {
version = Long.parseLong(attrValue);
} else if (attrKey != null) {
weights.put(attrKey, Double.parseDouble(attrValue));
}
Expand All @@ -107,7 +116,7 @@ public static ClusterGetWeightedRoutingResponse fromXContent(XContentParser pars
}
}
WeightedRouting weightedRouting = new WeightedRouting("", weights);
return new ClusterGetWeightedRoutingResponse(localNodeWeight, weightedRouting);
return new ClusterGetWeightedRoutingResponse(localNodeWeight, weightedRouting, version);
}

@Override
Expand Down
Expand Up @@ -100,8 +100,13 @@ protected void clusterManagerOperation(
weight = weightedRouting.weights().get(attrVal).toString();
}
}

}
clusterGetWeightedRoutingResponse = new ClusterGetWeightedRoutingResponse(weight, weightedRouting);
clusterGetWeightedRoutingResponse = new ClusterGetWeightedRoutingResponse(
weight,
weightedRouting,
weightedRoutingMetadata.getVersion()
);
}
listener.onResponse(clusterGetWeightedRoutingResponse);
} catch (Exception ex) {
Expand Down
Expand Up @@ -43,6 +43,15 @@ public class ClusterPutWeightedRoutingRequest extends ClusterManagerNodeRequest<

private WeightedRouting weightedRouting;
private String attributeName;
private long version;

public void version(long version) {
this.version = version;
}

public long getVersion() {
return this.version;
}

public ClusterPutWeightedRoutingRequest() {}

Expand All @@ -62,6 +71,7 @@ public void attributeName(String attributeName) {
public ClusterPutWeightedRoutingRequest(StreamInput in) throws IOException {
super(in);
weightedRouting = new WeightedRouting(in);
version = in.readLong();
}

public ClusterPutWeightedRoutingRequest(String attributeName) {
Expand Down Expand Up @@ -163,6 +173,7 @@ public ClusterPutWeightedRoutingRequest source(Map<String, String> source) {
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
weightedRouting.writeTo(out);
out.writeLong(version);
}

@Override
Expand Down
Expand Up @@ -30,4 +30,8 @@ public ClusterPutWeightedRoutingRequestBuilder setWeightedRouting(WeightedRoutin
return this;
}

public ClusterPutWeightedRoutingRequestBuilder setVersion(long version) {
request.version(version);
return this;
}
}
Expand Up @@ -36,6 +36,14 @@ public class WeightedRoutingMetadata extends AbstractNamedDiffable<Metadata.Cust
private static final Logger logger = LogManager.getLogger(WeightedRoutingMetadata.class);
public static final String TYPE = "weighted_shard_routing";
public static final String AWARENESS = "awareness";
public static final String VERSION = "_version";
public static final long INITIAL_VERSION = 0;

public long getVersion() {
return version;
}

private long version;
private WeightedRouting weightedRouting;

public WeightedRouting getWeightedRouting() {
Expand All @@ -50,11 +58,13 @@ public WeightedRoutingMetadata setWeightedRouting(WeightedRouting weightedRoutin
public WeightedRoutingMetadata(StreamInput in) throws IOException {
if (in.available() != 0) {
this.weightedRouting = new WeightedRouting(in);
this.version = in.readLong();
}
}

public WeightedRoutingMetadata(WeightedRouting weightedRouting) {
public WeightedRoutingMetadata(WeightedRouting weightedRouting, long version) {
this.weightedRouting = weightedRouting;
this.version = version;
}

@Override
Expand All @@ -76,6 +86,7 @@ public Version getMinimalSupportedVersion() {
public void writeTo(StreamOutput out) throws IOException {
if (weightedRouting != null) {
weightedRouting.writeTo(out);
out.writeLong(version);
}
}

Expand All @@ -91,10 +102,18 @@ public static WeightedRoutingMetadata fromXContent(XContentParser parser) throws
WeightedRouting weightedRouting = null;
XContentParser.Token token;
String awarenessField = null;
String versionAttr = null;
long version = 0;

while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
awarenessField = parser.currentName();
String attr = parser.currentName();
if (attr != null && attr.equals(VERSION)) {
versionAttr = parser.currentName();
continue;
} else {
awarenessField = parser.currentName();
}
if (parser.nextToken() != XContentParser.Token.START_OBJECT) {
throw new OpenSearchParseException(
"failed to parse weighted routing metadata [{}], expected " + "object",
Expand All @@ -112,9 +131,15 @@ public static WeightedRoutingMetadata fromXContent(XContentParser parser) throws
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
attrKey = parser.currentName();

} else if (token == XContentParser.Token.VALUE_NUMBER) {
attrValue = Double.parseDouble(parser.text());
weights.put(attrKey, attrValue);
if (attrKey != null && attrKey.equals("_version")) {
version = Long.parseLong(parser.text());
} else {
attrValue = Double.parseDouble(parser.text());
weights.put(attrKey, attrValue);
}

} else {
throw new OpenSearchParseException(
"failed to parse weighted routing metadata attribute " + "[{}], unknown type",
Expand All @@ -123,10 +148,14 @@ public static WeightedRoutingMetadata fromXContent(XContentParser parser) throws
}
}
}
} else if (token == XContentParser.Token.VALUE_STRING) {
if (versionAttr.equals(VERSION)) {
version = Long.parseLong(parser.text());
}
}
}
weightedRouting = new WeightedRouting(attributeName, weights);
return new WeightedRoutingMetadata(weightedRouting);
return new WeightedRoutingMetadata(weightedRouting, version);
}

@Override
Expand All @@ -144,18 +173,19 @@ public int hashCode() {

@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
toXContent(weightedRouting, builder);
toXContent(weightedRouting, builder, version);
return builder;
}

public static void toXContent(WeightedRouting weightedRouting, XContentBuilder builder) throws IOException {
public static void toXContent(WeightedRouting weightedRouting, XContentBuilder builder, long version) throws IOException {
builder.startObject(AWARENESS);
builder.startObject(weightedRouting.attributeName());
for (Map.Entry<String, Double> entry : weightedRouting.weights().entrySet()) {
builder.field(entry.getKey(), entry.getValue());
}
builder.endObject();
builder.endObject();
builder.field(VERSION, version);
}

@Override
Expand Down
Expand Up @@ -12,6 +12,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.action.ActionListener;
import org.opensearch.action.admin.cluster.shards.routing.weighted.WeightedRoutingVersionMismatchException;
import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingResponse;
Expand Down Expand Up @@ -66,28 +67,56 @@ public void registerWeightedRoutingMetadata(
final ClusterPutWeightedRoutingRequest request,
final ActionListener<ClusterStateUpdateResponse> listener
) {
final WeightedRoutingMetadata newWeightedRoutingMetadata = new WeightedRoutingMetadata(request.getWeightedRouting());
final WeightedRouting newWeightedRouting = new WeightedRouting(request.getWeightedRouting());

final long requestVersion = request.getVersion();
clusterService.submitStateUpdateTask("update_weighted_routing", new ClusterStateUpdateTask(Priority.URGENT) {
@Override
public ClusterState execute(ClusterState currentState) {
public ClusterState execute(ClusterState currentState) throws Exception {
// verify currently no decommission action is ongoing
ensureNoOngoingDecommissionAction(currentState);
Metadata metadata = currentState.metadata();
Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata());
Metadata metadata;
Metadata.Builder mdBuilder;

metadata = currentState.metadata();
mdBuilder = Metadata.builder(currentState.metadata());
WeightedRoutingMetadata weightedRoutingMetadata = metadata.custom(WeightedRoutingMetadata.TYPE);

if (weightedRoutingMetadata == null) {
logger.info("put weighted routing weights in metadata [{}]", request.getWeightedRouting());
weightedRoutingMetadata = new WeightedRoutingMetadata(request.getWeightedRouting());
logger.info("put weighted routing weights in metadata [{}]", newWeightedRouting);
weightedRoutingMetadata = new WeightedRoutingMetadata(
newWeightedRouting,
WeightedRoutingMetadata.INITIAL_VERSION
);
} else if (weightedRoutingMetadata.getVersion() != requestVersion) {
throw new WeightedRoutingVersionMismatchException(
String.format(
Locale.ROOT,
"weighted routing "
+ "version in request is %s but cluster weighted routing metadata is at a different version %s ",
requestVersion,
weightedRoutingMetadata.getVersion()
)
);

} else {
if (!checkIfSameWeightsInMetadata(newWeightedRoutingMetadata, weightedRoutingMetadata)) {
logger.info("updated weighted routing weights [{}] in metadata", request.getWeightedRouting());
weightedRoutingMetadata = new WeightedRoutingMetadata(newWeightedRoutingMetadata.getWeightedRouting());
if (!checkIfSameWeightsInMetadata(request.getWeightedRouting(), weightedRoutingMetadata.getWeightedRouting())) {
logger.info("updated weighted routing weights [{}] in metadata", newWeightedRouting);
weightedRoutingMetadata = new WeightedRoutingMetadata(
newWeightedRouting,
weightedRoutingMetadata.getVersion() + 1
);
} else {
logger.info(
"weights are same, not updating weighted routing weights [{}] in metadata",
newWeightedRouting
);
return currentState;
}
}

mdBuilder.putCustom(WeightedRoutingMetadata.TYPE, weightedRoutingMetadata);
logger.info("building cluster state with weighted routing weights [{}]", request.getWeightedRouting());
logger.info("building cluster state with weighted routing weights [{}]", newWeightedRouting);
return ClusterState.builder(currentState).metadata(mdBuilder).build();
}

Expand All @@ -105,11 +134,8 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
});
}

private boolean checkIfSameWeightsInMetadata(
WeightedRoutingMetadata newWeightedRoutingMetadata,
WeightedRoutingMetadata oldWeightedRoutingMetadata
) {
return newWeightedRoutingMetadata.getWeightedRouting().equals(oldWeightedRoutingMetadata.getWeightedRouting());
private boolean checkIfSameWeightsInMetadata(WeightedRouting newWeights, WeightedRouting oldWeights) {
return newWeights.equals(oldWeights);
}

public void deleteWeightedRoutingMetadata(
Expand Down
Expand Up @@ -51,6 +51,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli

public static ClusterPutWeightedRoutingRequest createRequest(RestRequest request) throws IOException {
ClusterPutWeightedRoutingRequest putWeightedRoutingRequest = Requests.putWeightedRoutingRequest(request.param("attribute"));
putWeightedRoutingRequest.version(request.paramAsLong("_version", -1));
request.applyContentParser(p -> putWeightedRoutingRequest.source(p.mapStrings()));
return putWeightedRoutingRequest;
}
Expand Down
Expand Up @@ -21,7 +21,7 @@ public class ClusterGetWeightedRoutingResponseTests extends AbstractXContentTest
protected ClusterGetWeightedRoutingResponse createTestInstance() {
Map<String, Double> weights = Map.of("zone_A", 1.0, "zone_B", 0.0, "zone_C", 1.0);
WeightedRouting weightedRouting = new WeightedRouting("", weights);
ClusterGetWeightedRoutingResponse response = new ClusterGetWeightedRoutingResponse("1", weightedRouting);
ClusterGetWeightedRoutingResponse response = new ClusterGetWeightedRoutingResponse("1", weightedRouting, 1);
return response;
}

Expand Down
Expand Up @@ -174,7 +174,7 @@ private ClusterState setLocalNode(ClusterState clusterState, String nodeId) {

private ClusterState setWeightedRoutingWeights(ClusterState clusterState, Map<String, Double> weights) {
WeightedRouting weightedRouting = new WeightedRouting("zone", weights);
WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting);
WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting, 0);
Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata());
metadataBuilder.putCustom(WeightedRoutingMetadata.TYPE, weightedRoutingMetadata);
clusterState = ClusterState.builder(clusterState).metadata(metadataBuilder).build();
Expand Down
Expand Up @@ -396,7 +396,7 @@ public void onFailure(Exception e) {
private void setWeightedRoutingWeights(Map<String, Double> weights) {
ClusterState clusterState = clusterService.state();
WeightedRouting weightedRouting = new WeightedRouting("zone", weights);
WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting);
WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting, 0);
Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata());
metadataBuilder.putCustom(WeightedRoutingMetadata.TYPE, weightedRoutingMetadata);
clusterState = ClusterState.builder(clusterState).metadata(metadataBuilder).build();
Expand Down
Expand Up @@ -20,7 +20,7 @@ public class WeightedRoutingMetadataTests extends AbstractXContentTestCase<Weigh
protected WeightedRoutingMetadata createTestInstance() {
Map<String, Double> weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0);
WeightedRouting weightedRouting = new WeightedRouting("zone", weights);
WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting);
WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting, 0);
return weightedRoutingMetadata;
}

Expand Down

0 comments on commit 6ab7e70

Please sign in to comment.