Skip to content

Commit

Permalink
fetching term information from master using light-weight call before …
Browse files Browse the repository at this point in the history
…fetch of cluster-state
  • Loading branch information
rajiv-kv committed Feb 8, 2024
1 parent 4dcad6d commit accb5e8
Show file tree
Hide file tree
Showing 8 changed files with 439 additions and 40 deletions.
3 changes: 3 additions & 0 deletions server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@
import org.opensearch.action.admin.cluster.snapshots.status.TransportSnapshotsStatusAction;
import org.opensearch.action.admin.cluster.state.ClusterStateAction;
import org.opensearch.action.admin.cluster.state.TransportClusterStateAction;
import org.opensearch.action.admin.cluster.state.term.ClusterTermVersionAction;
import org.opensearch.action.admin.cluster.state.term.TransportClusterTermVersionAction;
import org.opensearch.action.admin.cluster.stats.ClusterStatsAction;
import org.opensearch.action.admin.cluster.stats.TransportClusterStatsAction;
import org.opensearch.action.admin.cluster.storedscripts.DeleteStoredScriptAction;
Expand Down Expand Up @@ -607,6 +609,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(ClusterAllocationExplainAction.INSTANCE, TransportClusterAllocationExplainAction.class);
actions.register(ClusterStatsAction.INSTANCE, TransportClusterStatsAction.class);
actions.register(ClusterStateAction.INSTANCE, TransportClusterStateAction.class);
actions.register(ClusterTermVersionAction.INSTANCE, TransportClusterTermVersionAction.class);
actions.register(ClusterHealthAction.INSTANCE, TransportClusterHealthAction.class);
actions.register(ClusterUpdateSettingsAction.INSTANCE, TransportClusterUpdateSettingsAction.class);
actions.register(ClusterRerouteAction.INSTANCE, TransportClusterRerouteAction.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,4 +231,8 @@ private ClusterStateResponse buildResponse(final ClusterStateRequest request, fi
return new ClusterStateResponse(currentState.getClusterName(), builder.build(), false);
}

@Override
protected boolean checkTermVersion() {
return true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.action.admin.cluster.state.term;

import org.opensearch.action.ActionType;

/**
* Transport action for fetching cluster term
*
* @opensearch.internal
*/
public class ClusterTermVersionAction extends ActionType<ClusterTermVersionResponse> {

public static final ClusterTermVersionAction INSTANCE = new ClusterTermVersionAction();
public static final String NAME = "cluster:monitor/term";

private ClusterTermVersionAction() {
super(NAME, ClusterTermVersionResponse::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.action.admin.cluster.state.term;

import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.support.clustermanager.ClusterManagerNodeReadRequest;
import org.opensearch.core.common.io.stream.StreamInput;

import java.io.IOException;

/**
* Request object to get cluster term
*
* @opensearch.internal
*/
public class ClusterTermVersionRequest extends ClusterManagerNodeReadRequest<ClusterTermVersionRequest> {

public ClusterTermVersionRequest() {}

public ClusterTermVersionRequest(StreamInput in) throws IOException {
super(in);
}

@Override
public ActionRequestValidationException validate() {
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.action.admin.cluster.state.term;

import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;

import java.io.IOException;

/**
* Response object of cluster term
*
* @opensearch.internal
*/
public class ClusterTermVersionResponse extends ActionResponse {

protected DiscoveryNode sourceNode;

protected long term;
protected long version;

public ClusterTermVersionResponse(DiscoveryNode sourceNode, long term, long version) {
this.sourceNode = sourceNode;
this.term = term;
this.version = version;
}

public ClusterTermVersionResponse(StreamInput in) throws IOException {
super(in);
this.sourceNode = new DiscoveryNode(in);
this.term = in.readLong();
this.version = in.readLong();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
sourceNode.writeTo(out);
out.writeLong(term);
out.writeLong(version);
}

public long getTerm() {
return term;
}

public long getVersion() {
return version;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.action.admin.cluster.state.term;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeReadAction;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.io.IOException;

/**
* Transport action for obtaining cluster term and version from cluster-manager
*
* @opensearch.internal
*/
public class TransportClusterTermVersionAction extends TransportClusterManagerNodeReadAction<
ClusterTermVersionRequest,
ClusterTermVersionResponse> {

private final Logger logger = LogManager.getLogger(getClass());

@Inject
public TransportClusterTermVersionAction(
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver
) {
super(
ClusterTermVersionAction.NAME,
false,
transportService,
clusterService,
threadPool,
actionFilters,
ClusterTermVersionRequest::new,
indexNameExpressionResolver
);
}

@Override
protected String executor() {
return ThreadPool.Names.SAME;
}

@Override
public ClusterTermVersionResponse read(StreamInput in) throws IOException {
return new ClusterTermVersionResponse(in);
}

@Override
protected ClusterBlockException checkBlock(ClusterTermVersionRequest request, ClusterState state) {
return null;
}

@Override
protected void clusterManagerOperation(
ClusterTermVersionRequest request,
ClusterState state,
ActionListener<ClusterTermVersionResponse> listener
) throws Exception {
ActionListener.completeWith(listener, () -> buildResponse(request, state));
}

private ClusterTermVersionResponse buildResponse(ClusterTermVersionRequest request, ClusterState state) {
logger.trace("Serving cluster term version request using term {} and version {}", state.term(), state.version());
return new ClusterTermVersionResponse(state.getNodes().getClusterManagerNode(), state.term(), state.getVersion());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/** Cluster Term transport handler. */
package org.opensearch.action.admin.cluster.state.term;

0 comments on commit accb5e8

Please sign in to comment.