Skip to content

Commit

Permalink
Light weight Transport action to verify local term before fetching cl…
Browse files Browse the repository at this point in the history
…uster-state from remote (#12252) (#12825)

Signed-off-by: Rajiv Kumar Vaidyanathan <rajivkv@amazon.com>
  • Loading branch information
rajiv-kv committed Mar 21, 2024
1 parent e2ca2f1 commit 7c80c5f
Show file tree
Hide file tree
Showing 16 changed files with 1,111 additions and 62 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Admission Control] Integrated IO Based AdmissionController to AdmissionControl Framework ([#12583](https://github.com/opensearch-project/OpenSearch/pull/12583))
- Add Remote Store Migration Experimental flag and allow mixed mode clusters under same ([#11986](https://github.com/opensearch-project/OpenSearch/pull/11986))
- Built-in secure transports support ([#12435](https://github.com/opensearch-project/OpenSearch/pull/12435))
- Lightweight Transport action to verify local term before fetching cluster-state from remote ([#12252](https://github.com/opensearch-project/OpenSearch/pull/12252/))

### Dependencies
- Bump `com.squareup.okio:okio` from 3.7.0 to 3.8.0 ([#12290](https://github.com/opensearch-project/OpenSearch/pull/12290))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.state;

import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.action.admin.cluster.state.term.GetTermVersionAction;
import org.opensearch.action.admin.cluster.state.term.GetTermVersionResponse;
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.coordination.ClusterStateTermVersion;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.TransportService;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.IntStream;

import static org.hamcrest.Matchers.is;

@SuppressWarnings("unchecked")
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class FetchByTermVersionIT extends OpenSearchIntegTestCase {

AtomicBoolean isTermVersionCheckEnabled = new AtomicBoolean();

protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(MockTransportService.TestPlugin.class);
}

AtomicBoolean forceFetchFromCM = new AtomicBoolean();

public void testClusterStateResponseFromDataNode() throws Exception {
String cm = internalCluster().startClusterManagerOnlyNode();
List<String> dns = internalCluster().startDataOnlyNodes(5);
int numberOfShards = dns.size();
stubClusterTermResponse(cm);

ensureClusterSizeConsistency();
ensureGreen();

List<String> indices = new ArrayList<>();

// Create a large sized cluster-state by creating field mappings
IntStream.range(0, 20).forEachOrdered(n -> {
String index = "index_" + n;
createIndex(
index,
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey(), Long.MAX_VALUE)
.build()
);
indices.add(index);
});
IntStream.range(0, 5).forEachOrdered(n -> {
List<String> mappings = new ArrayList<>();
for (int i = 0; i < 2000; i++) {
mappings.add("t-123456789-123456789-" + n + "-" + i);
mappings.add("type=keyword");
}
PutMappingRequest request = new PutMappingRequest().source(mappings.toArray(new String[0]))
.indices(indices.toArray(new String[0]));
internalCluster().dataNodeClient().admin().indices().putMapping(request).actionGet();
});
ensureGreen();

ClusterStateResponse stateResponseM = internalCluster().clusterManagerClient()
.admin()
.cluster()
.state(new ClusterStateRequest())
.actionGet();

waitUntil(() -> {
ClusterStateResponse stateResponseD = internalCluster().dataNodeClient()
.admin()
.cluster()
.state(new ClusterStateRequest())
.actionGet();
return stateResponseD.getState().stateUUID().equals(stateResponseM.getState().stateUUID());
});
// cluster state response time with term check enabled on datanode
isTermVersionCheckEnabled.set(true);
{
List<Long> latencies = new ArrayList<>();
IntStream.range(0, 50).forEachOrdered(n1 -> {
ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
long start = System.currentTimeMillis();
ClusterStateResponse stateResponse = dataNodeClient().admin().cluster().state(clusterStateRequest).actionGet();
latencies.add(System.currentTimeMillis() - start);
assertThat(stateResponse.getClusterName().value(), is(internalCluster().getClusterName()));
assertThat(stateResponse.getState().nodes().getSize(), is(internalCluster().getNodeNames().length));
assertThat(stateResponse.getState().metadata().indices().size(), is(indices.size()));
Map<String, Object> fieldMappings = (Map<String, Object>) stateResponse.getState()
.metadata()
.index(indices.get(0))
.mapping()
.sourceAsMap()
.get("properties");

assertThat(fieldMappings.size(), is(10000));
});
Collections.sort(latencies);

logger.info("cluster().state() fetch with Term Version enabled took {} milliseconds", (latencies.get(latencies.size() / 2)));
}
// cluster state response time with term check disabled on datanode
isTermVersionCheckEnabled.set(false);
{
List<Long> latencies = new ArrayList<>();
IntStream.range(0, 50).forEachOrdered(n1 -> {
ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
long start = System.currentTimeMillis();
ClusterStateResponse stateResponse = dataNodeClient().admin().cluster().state(clusterStateRequest).actionGet();
latencies.add(System.currentTimeMillis() - start);
assertThat(stateResponse.getClusterName().value(), is(internalCluster().getClusterName()));
assertThat(stateResponse.getState().nodes().getSize(), is(internalCluster().getNodeNames().length));
assertThat(stateResponse.getState().metadata().indices().size(), is(indices.size()));
Map<String, Object> typeProperties = (Map<String, Object>) stateResponse.getState()
.metadata()
.index(indices.get(0))
.mapping()
.sourceAsMap()
.get("properties");
assertThat(typeProperties.size(), is(10000));

});
Collections.sort(latencies);
logger.info("cluster().state() fetch with Term Version disabled took {} milliseconds", (latencies.get(latencies.size() / 2)));
}

}

private void stubClusterTermResponse(String master) {
MockTransportService primaryService = (MockTransportService) internalCluster().getInstance(TransportService.class, master);
primaryService.addRequestHandlingBehavior(GetTermVersionAction.NAME, (handler, request, channel, task) -> {
if (isTermVersionCheckEnabled.get()) {
handler.messageReceived(request, channel, task);
} else {
// always return response that does not match
channel.sendResponse(new GetTermVersionResponse(new ClusterStateTermVersion(new ClusterName("test"), "1", -1, -1)));
}
});
}
}
3 changes: 3 additions & 0 deletions server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@
import org.opensearch.action.admin.cluster.snapshots.status.TransportSnapshotsStatusAction;
import org.opensearch.action.admin.cluster.state.ClusterStateAction;
import org.opensearch.action.admin.cluster.state.TransportClusterStateAction;
import org.opensearch.action.admin.cluster.state.term.GetTermVersionAction;
import org.opensearch.action.admin.cluster.state.term.TransportGetTermVersionAction;
import org.opensearch.action.admin.cluster.stats.ClusterStatsAction;
import org.opensearch.action.admin.cluster.stats.TransportClusterStatsAction;
import org.opensearch.action.admin.cluster.storedscripts.DeleteStoredScriptAction;
Expand Down Expand Up @@ -607,6 +609,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(ClusterAllocationExplainAction.INSTANCE, TransportClusterAllocationExplainAction.class);
actions.register(ClusterStatsAction.INSTANCE, TransportClusterStatsAction.class);
actions.register(ClusterStateAction.INSTANCE, TransportClusterStateAction.class);
actions.register(GetTermVersionAction.INSTANCE, TransportGetTermVersionAction.class);
actions.register(ClusterHealthAction.INSTANCE, TransportClusterHealthAction.class);
actions.register(ClusterUpdateSettingsAction.INSTANCE, TransportClusterUpdateSettingsAction.class);
actions.register(ClusterRerouteAction.INSTANCE, TransportClusterRerouteAction.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,12 @@ protected void clusterManagerOperation(
? clusterState -> true
: clusterState -> clusterState.metadata().version() >= request.waitForMetadataVersion();

// action will be executed on local node, if either the request is local only (or) the local node has the same cluster-state as
// ClusterManager
final Predicate<ClusterState> acceptableClusterStateOrNotMasterPredicate = request.local()
? acceptableClusterStatePredicate
: acceptableClusterStatePredicate.or(clusterState -> clusterState.nodes().isLocalNodeElectedClusterManager() == false);
|| !state.nodes().isLocalNodeElectedClusterManager()
? acceptableClusterStatePredicate
: acceptableClusterStatePredicate.or(clusterState -> clusterState.nodes().isLocalNodeElectedClusterManager() == false);

if (acceptableClusterStatePredicate.test(state)) {
ActionListener.completeWith(listener, () -> buildResponse(request, state));
Expand Down Expand Up @@ -231,4 +234,8 @@ private ClusterStateResponse buildResponse(final ClusterStateRequest request, fi
return new ClusterStateResponse(currentState.getClusterName(), builder.build(), false);
}

@Override
protected boolean localExecuteSupportedByAction() {
return true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.state.term;

import org.opensearch.action.ActionType;

/**
* Transport action for fetching cluster term and version
*
* @opensearch.internal
*/
public class GetTermVersionAction extends ActionType<GetTermVersionResponse> {

public static final GetTermVersionAction INSTANCE = new GetTermVersionAction();
public static final String NAME = "cluster:monitor/term";

private GetTermVersionAction() {
super(NAME, GetTermVersionResponse::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.state.term;

import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.support.clustermanager.ClusterManagerNodeReadRequest;
import org.opensearch.core.common.io.stream.StreamInput;

import java.io.IOException;

/**
* Request object to get cluster term and version
*
* @opensearch.internal
*/
public class GetTermVersionRequest extends ClusterManagerNodeReadRequest<GetTermVersionRequest> {

public GetTermVersionRequest() {}

public GetTermVersionRequest(StreamInput in) throws IOException {
super(in);
}

@Override
public ActionRequestValidationException validate() {
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.state.term;

import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.coordination.ClusterStateTermVersion;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;

import java.io.IOException;

/**
* Response object of cluster term
*
* @opensearch.internal
*/
public class GetTermVersionResponse extends ActionResponse {

private final ClusterStateTermVersion clusterStateTermVersion;

public GetTermVersionResponse(ClusterStateTermVersion clusterStateTermVersion) {
this.clusterStateTermVersion = clusterStateTermVersion;
}

public GetTermVersionResponse(StreamInput in) throws IOException {
super(in);
this.clusterStateTermVersion = new ClusterStateTermVersion(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
clusterStateTermVersion.writeTo(out);
}

public ClusterStateTermVersion getClusterStateTermVersion() {
return clusterStateTermVersion;
}

public boolean matches(ClusterState clusterState) {
return clusterStateTermVersion != null && clusterStateTermVersion.equals(new ClusterStateTermVersion(clusterState));
}

}

0 comments on commit 7c80c5f

Please sign in to comment.