Skip to content

Commit

Permalink
[CCR] Added history uuid validation
Browse files Browse the repository at this point in the history
For correctness we need to verify whether the history uuid of the leader
index shards never changes while that index is being followed.

* The history UUIDs are recorded as custom index metadata in the follow index.
* The follow api validates whether the current history UUIDs of the leader
  index shards are the same as the recorded history UUIDs.
  If not the follow api fails.
* While a follow index is following a leader index; shard follow tasks
  on each shard changes api call verify whether their current history uuid
  is the same as the recorded history uuid.

Relates to elastic#30086
  • Loading branch information
martijnvg committed Sep 8, 2018
1 parent f27c3dc commit 61c8ad9
Show file tree
Hide file tree
Showing 15 changed files with 394 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Strings;
Expand All @@ -26,6 +27,7 @@

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;

public class FollowIndexSecurityIT extends ESRestTestCase {
Expand Down Expand Up @@ -96,16 +98,13 @@ public void testFollowIndex() throws Exception {
assertThat(countCcrNodeTasks(), equalTo(0));
});

createAndFollowIndex("leader_cluster:" + unallowedIndex, unallowedIndex);
// Verify that nothing has been replicated and no node tasks are running
// These node tasks should have been failed due to the fact that the user
// has no sufficient priviledges.
assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0)));
verifyDocuments(adminClient(), unallowedIndex, 0);
Exception e = expectThrows(ResponseException.class,
() -> createAndFollowIndex("leader_cluster:" + unallowedIndex, unallowedIndex));
assertThat(e.getMessage(), containsString("action [indices:monitor/stats] is unauthorized for user [test_ccr]"));

followIndex("leader_cluster:" + unallowedIndex, unallowedIndex);
assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0)));
verifyDocuments(adminClient(), unallowedIndex, 0);
e = expectThrows(ResponseException.class,
() -> followIndex("leader_cluster:" + unallowedIndex, unallowedIndex));
assertThat(e.getMessage(), containsString("action [indices:monitor/stats] is unauthorized for user [test_ccr]"));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@
public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, EnginePlugin {

public static final String CCR_THREAD_POOL_NAME = "ccr";
public static final String CCR_CUSTOM_METADATA_KEY = "ccr";
public static final String CCR_CUSTOM_METADATA_LEADER_INDEX_HISTORY_UUID_KEY = "leader_index_history_uuid";

private final boolean enabled;
private final Settings settings;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,30 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.license.RemoteClusterLicenseChecker;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.core.XPackPlugin;

import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

/**
* Encapsulates licensing checking for CCR.
Expand Down Expand Up @@ -62,19 +73,19 @@ public boolean isCcrAllowed() {
* of the specified listener is invoked. Otherwise, the specified consumer is invoked with the leader index metadata fetched from the
* remote cluster.
*
* @param client the client
* @param clusterAlias the remote cluster alias
* @param leaderIndex the name of the leader index
* @param listener the listener
* @param leaderIndexMetadataConsumer the leader index metadata consumer
* @param <T> the type of response the listener is waiting for
* @param client the client
* @param clusterAlias the remote cluster alias
* @param leaderIndex the name of the leader index
* @param listener the listener
* @param historyUUIDAndLeaderIndexMetadataConsumer the leader index history uuid and the leader index metadata consumer
* @param <T> the type of response the listener is waiting for
*/
public <T> void checkRemoteClusterLicenseAndFetchLeaderIndexMetadata(
final Client client,
final String clusterAlias,
final String leaderIndex,
final ActionListener<T> listener,
final Consumer<IndexMetaData> leaderIndexMetadataConsumer) {
final BiConsumer<Map<Integer, String>, IndexMetaData> historyUUIDAndLeaderIndexMetadataConsumer) {
// we have to check the license on the remote cluster
new RemoteClusterLicenseChecker(client, XPackLicenseState::isCcrAllowedForOperationMode).checkRemoteClusterLicenses(
Collections.singletonList(clusterAlias),
Expand All @@ -93,7 +104,23 @@ public void onResponse(final RemoteClusterLicenseChecker.LicenseCheck licenseChe
final ClusterState remoteClusterState = r.getState();
final IndexMetaData leaderIndexMetadata =
remoteClusterState.getMetaData().index(leaderIndex);
leaderIndexMetadataConsumer.accept(leaderIndexMetadata);
CheckedConsumer<IndicesStatsResponse, Exception> indicesStatsHandler = indicesStatsResponse -> {
IndexStats indexStats = indicesStatsResponse.getIndices().get(leaderIndex);
Map<Integer, String> historyUUIDs = new HashMap<>();
for (IndexShardStats indexShardStats : indexStats) {
for (ShardStats shardStats : indexShardStats) {
CommitStats commitStats = shardStats.getCommitStats();
String historyUUID = commitStats.getUserData().get(Engine.HISTORY_UUID_KEY);
ShardId shardId = shardStats.getShardRouting().shardId();
historyUUIDs.put(shardId.id(), historyUUID);
}
}
historyUUIDAndLeaderIndexMetadataConsumer.accept(historyUUIDs, leaderIndexMetadata);
};
IndicesStatsRequest request = new IndicesStatsRequest();
request.indices(leaderIndex);
remoteClient.admin().indices().stats(request,
ActionListener.wrap(indicesStatsHandler, listener::onFailure));
},
listener::onFailure);
// following an index in remote cluster, so use remote client to fetch leader index metadata
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.ActiveShardsObserver;
Expand All @@ -29,25 +34,31 @@
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
import org.elasticsearch.xpack.ccr.CcrSettings;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;

public class CreateAndFollowIndexAction extends Action<CreateAndFollowIndexAction.Response> {

Expand Down Expand Up @@ -242,8 +253,12 @@ protected void masterOperation(
private void createFollowerIndexAndFollowLocalIndex(
final Request request, final ClusterState state, final ActionListener<Response> listener) {
// following an index in local cluster, so use local cluster state to fetch leader index metadata
final IndexMetaData leaderIndexMetadata = state.getMetaData().index(request.getFollowRequest().getLeaderIndex());
createFollowerIndex(leaderIndexMetadata, request, listener);
final String leaderIndex = request.getFollowRequest().getLeaderIndex();
final IndexMetaData leaderIndexMetadata = state.getMetaData().index(leaderIndex);
Consumer<Map<Integer, String>> handler = historyUUID -> {
createFollowerIndex(leaderIndexMetadata, historyUUID, request, listener);
};
fetchHistoryUUID(client, leaderIndex, handler, listener::onFailure);
}

private void createFollowerIndexAndFollowRemoteIndex(
Expand All @@ -256,11 +271,14 @@ private void createFollowerIndexAndFollowRemoteIndex(
clusterAlias,
leaderIndex,
listener,
leaderIndexMetaData -> createFollowerIndex(leaderIndexMetaData, request, listener));
(historyUUID, leaderIndexMetaData) -> createFollowerIndex(leaderIndexMetaData, historyUUID, request, listener));
}

private void createFollowerIndex(
final IndexMetaData leaderIndexMetaData, final Request request, final ActionListener<Response> listener) {
final IndexMetaData leaderIndexMetaData,
final Map<Integer, String> historyUUIDs,
final Request request,
final ActionListener<Response> listener) {
if (leaderIndexMetaData == null) {
listener.onFailure(new IllegalArgumentException("leader index [" + request.getFollowRequest().getLeaderIndex() +
"] does not exist"));
Expand Down Expand Up @@ -296,6 +314,13 @@ public ClusterState execute(ClusterState currentState) throws Exception {
MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData());
IndexMetaData.Builder imdBuilder = IndexMetaData.builder(followIndex);

// Adding the leader index uuid for each shard as custom metadata:
Map<String, String> metadata = new HashMap<>();
for (Map.Entry<Integer, String> entry : historyUUIDs.entrySet()) {
metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_HISTORY_UUID_KEY + "_" + entry.getKey(), entry.getValue());
}
imdBuilder.putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, metadata);

// Copy all settings, but overwrite a few settings.
Settings.Builder settingsBuilder = Settings.builder();
settingsBuilder.put(leaderIndexMetaData.getSettings());
Expand Down Expand Up @@ -350,6 +375,29 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state)
return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA_WRITE, request.getFollowRequest().getFollowerIndex());
}

// would be great if can reuse some of the logic in CcrLicenseChecker to do remote calls for
// fetching leader index metadata and leader index uuid
static void fetchHistoryUUID(final Client client,
final String leaderIndex,
final Consumer<Map<Integer, String>> handler,
final Consumer<Exception> errorHandler) {
IndicesStatsRequest request = new IndicesStatsRequest();
request.indices(leaderIndex);
CheckedConsumer<IndicesStatsResponse, Exception> onResponseHandler = indicesStatsResponse -> {
IndexStats indexStats = indicesStatsResponse.getIndices().get(leaderIndex);
Map<Integer, String> historyUUIDs = new HashMap<>();
for (IndexShardStats indexShardStats : indexStats) {
for (ShardStats shardStats : indexShardStats) {
String historyUUID = shardStats.getCommitStats().getUserData().get(Engine.HISTORY_UUID_KEY);
ShardId shardId = shardStats.getShardRouting().shardId();
historyUUIDs.put(shardId.id(), historyUUID);
}
}
handler.accept(historyUUIDs);
};
client.admin().indices().stats(request, ActionListener.wrap(onResponseHandler, errorHandler));
}

}

}
Loading

0 comments on commit 61c8ad9

Please sign in to comment.