Skip to content

Commit

Permalink
Add infrastructure for managing system indices
Browse files Browse the repository at this point in the history
Part of elastic#61656.

Add the necessary support for automatically creating and updating system
indices. This works by making it possible to create a system index
descriptor with all the information needed to manage the mappings,
settings and aliases.

Follow-up work will opt existing indices into this framework.
  • Loading branch information
pugnascotia committed Nov 30, 2020
1 parent be86dd5 commit d7b728c
Show file tree
Hide file tree
Showing 16 changed files with 1,279 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@
*/
package org.elasticsearch.action.admin.indices.create;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.ActiveShardsObserver;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
Expand All @@ -40,18 +43,24 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

/**
* Api that auto creates an index or data stream that originate from requests that write into an index that doesn't yet exist.
*/
public final class AutoCreateAction extends ActionType<CreateIndexResponse> {

private static final Logger logger = LogManager.getLogger(AutoCreateAction.class);

public static final AutoCreateAction INSTANCE = new AutoCreateAction();
public static final String NAME = "indices:admin/auto_create";

Expand All @@ -65,15 +74,17 @@ public static final class TransportAction extends TransportMasterNodeAction<Crea
private final MetadataCreateIndexService createIndexService;
private final MetadataCreateDataStreamService metadataCreateDataStreamService;
private final AutoCreateIndex autoCreateIndex;
private final SystemIndices systemIndices;

@Inject
public TransportAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
MetadataCreateIndexService createIndexService,
MetadataCreateDataStreamService metadataCreateDataStreamService,
AutoCreateIndex autoCreateIndex) {
AutoCreateIndex autoCreateIndex, SystemIndices systemIndices) {
super(NAME, transportService, clusterService, threadPool, actionFilters, CreateIndexRequest::new, indexNameExpressionResolver,
CreateIndexResponse::new, ThreadPool.Names.SAME);
this.systemIndices = systemIndices;
this.activeShardsObserver = new ActiveShardsObserver(clusterService, threadPool);
this.createIndexService = createIndexService;
this.metadataCreateDataStreamService = metadataCreateDataStreamService;
Expand Down Expand Up @@ -141,12 +152,56 @@ public ClusterState execute(ClusterState currentState) throws Exception {
return currentState;
}

CreateIndexClusterStateUpdateRequest updateRequest =
new CreateIndexClusterStateUpdateRequest(request.cause(), indexName, request.index())
.ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout());
CreateIndexClusterStateUpdateRequest updateRequest = buildUpdateRequest(indexName);

return createIndexService.applyCreateIndexRequest(currentState, updateRequest, false);
}
}

private CreateIndexClusterStateUpdateRequest buildUpdateRequest(String indexName) {
boolean isSystemIndex = false;
String mappings = null;
Settings settings = null;
String aliasName = null;
String concreteIndexName = indexName;

final SystemIndexDescriptor descriptor = systemIndices.findMatchingDescriptor(indexName);

if (descriptor != null && descriptor.isAutomaticallyManaged()) {
isSystemIndex = true;

mappings = descriptor.getMappings();
settings = descriptor.getSettings();
aliasName = descriptor.getAliasName();
concreteIndexName = descriptor.getPrimaryIndex();
}

CreateIndexClusterStateUpdateRequest updateRequest =
new CreateIndexClusterStateUpdateRequest(request.cause(), concreteIndexName, request.index())
.ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout());

if (isSystemIndex) {
updateRequest.waitForActiveShards(ActiveShardCount.ALL);
}

if (mappings != null) {
updateRequest.mappings(mappings);
}
if (settings != null) {
updateRequest.settings(settings);
}
if (aliasName != null) {
updateRequest.aliases(Set.of(new Alias(aliasName)));
}

if (isSystemIndex) {
logger.info("Auto-creating system index {}", concreteIndexName);
} else {
logger.debug("Auto-creating index {}", concreteIndexName);
}

return updateRequest;
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
package org.elasticsearch.action.admin.indices.create;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
Expand All @@ -29,24 +31,32 @@
import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.Set;

/**
* Create index action.
*/
public class TransportCreateIndexAction extends TransportMasterNodeAction<CreateIndexRequest, CreateIndexResponse> {

private final MetadataCreateIndexService createIndexService;
private final SystemIndices systemIndices;

@Inject
public TransportCreateIndexAction(TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, MetadataCreateIndexService createIndexService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
SystemIndices systemIndices) {
super(CreateIndexAction.NAME, transportService, clusterService, threadPool, actionFilters, CreateIndexRequest::new,
indexNameExpressionResolver, CreateIndexResponse::new, ThreadPool.Names.SAME);
this.createIndexService = createIndexService;
this.systemIndices = systemIndices;
}

@Override
Expand All @@ -63,13 +73,48 @@ protected void masterOperation(Task task, final CreateIndexRequest request, fina
}

final String indexName = indexNameExpressionResolver.resolveDateMathExpression(request.index());

String mappings = request.mappings();
Settings settings = request.settings();
Set<Alias> aliases = request.aliases();

String concreteIndexName = indexName;
boolean isSystemIndex = false;

SystemIndexDescriptor descriptor = systemIndices.findMatchingDescriptor(indexName);

if (descriptor != null && descriptor.isAutomaticallyManaged()) {
isSystemIndex = true;
// System indices define their own settings and mappings, which cannot be overridden.
mappings = descriptor.getMappings();
settings = descriptor.getSettings();
concreteIndexName = descriptor.getPrimaryIndex();

if (descriptor.getAliasName() == null) {
aliases = Set.of();
} else {
aliases = Set.of(new Alias(descriptor.getAliasName()));
}
}

final CreateIndexClusterStateUpdateRequest updateRequest =
new CreateIndexClusterStateUpdateRequest(cause, indexName, request.index())
new CreateIndexClusterStateUpdateRequest(cause, concreteIndexName, request.index())
.ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout())
.settings(request.settings()).mappings(request.mappings())
.aliases(request.aliases())
.aliases(aliases)
.waitForActiveShards(request.waitForActiveShards());

if (isSystemIndex) {
updateRequest.waitForActiveShards(ActiveShardCount.ALL);
}

if (mappings != null) {
updateRequest.mappings(mappings);
}

if (settings != null) {
updateRequest.settings(settings);
}

createIndexService.createIndex(updateRequest, ActionListener.map(listener, response ->
new CreateIndexResponse(response.isAcknowledged(), response.isShardsAcknowledged(), indexName)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
Expand All @@ -55,6 +57,7 @@ public class TransportPutMappingAction extends AcknowledgedTransportMasterNodeAc

private final MetadataMappingService metadataMappingService;
private final RequestValidators<PutMappingRequest> requestValidators;
private final SystemIndices systemIndices;

@Inject
public TransportPutMappingAction(
Expand All @@ -64,11 +67,13 @@ public TransportPutMappingAction(
final MetadataMappingService metadataMappingService,
final ActionFilters actionFilters,
final IndexNameExpressionResolver indexNameExpressionResolver,
final RequestValidators<PutMappingRequest> requestValidators) {
final RequestValidators<PutMappingRequest> requestValidators,
final SystemIndices systemIndices) {
super(PutMappingAction.NAME, transportService, clusterService, threadPool, actionFilters, PutMappingRequest::new,
indexNameExpressionResolver, ThreadPool.Names.SAME);
this.metadataMappingService = metadataMappingService;
this.requestValidators = Objects.requireNonNull(requestValidators);
this.systemIndices = systemIndices;
}

@Override
Expand All @@ -87,12 +92,25 @@ protected void masterOperation(Task task, final PutMappingRequest request, final
final ActionListener<AcknowledgedResponse> listener) {
try {
final Index[] concreteIndices = resolveIndices(state, request, indexNameExpressionResolver);
final String mappingSource = request.source();

final Optional<Exception> maybeValidationException = requestValidators.validateRequest(request, state, concreteIndices);
if (maybeValidationException.isPresent()) {
listener.onFailure(maybeValidationException.get());
return;
}

final List<String> violations = checkForSystemIndexViolations(concreteIndices, mappingSource);
if (violations.isEmpty() == false) {
final String message = "Cannot update mappings in "
+ violations
+ ": system indices can only use mappings from their descriptors, "
+ "but the mappings in the request did not match those in the descriptors(s)";
logger.warn(message);
listener.onFailure(new IllegalArgumentException(message));
return;
}

performMappingUpdate(concreteIndices, request, listener, metadataMappingService);
} catch (IndexNotFoundException ex) {
logger.debug(() -> new ParameterizedMessage("failed to put mappings on indices [{}]",
Expand Down Expand Up @@ -142,4 +160,21 @@ public void onFailure(Exception t) {
});
}

private List<String> checkForSystemIndexViolations(Index[] concreteIndices, String requestMappings) {
List<String> violations = new ArrayList<>();

for (Index index : concreteIndices) {
final SystemIndexDescriptor descriptor = systemIndices.findMatchingDescriptor(index.getName());
if (descriptor != null && descriptor.isAutomaticallyManaged()) {
final String descriptorMappings = descriptor.getMappings();

// Technically we could trip over a difference in whitespace here, but then again nobody should be trying to manually
// update a descriptor's mappings.
if (descriptorMappings.equals(requestMappings) == false) {
violations.add(index.getName());
}
}
}
return violations;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,24 +34,36 @@
import org.elasticsearch.cluster.metadata.MetadataUpdateSettingsService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class TransportUpdateSettingsAction extends AcknowledgedTransportMasterNodeAction<UpdateSettingsRequest> {

private static final Logger logger = LogManager.getLogger(TransportUpdateSettingsAction.class);

private final MetadataUpdateSettingsService updateSettingsService;
private final SystemIndices systemIndices;

@Inject
public TransportUpdateSettingsAction(TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, MetadataUpdateSettingsService updateSettingsService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
SystemIndices systemIndices) {
super(UpdateSettingsAction.NAME, transportService, clusterService, threadPool, actionFilters, UpdateSettingsRequest::new,
indexNameExpressionResolver, ThreadPool.Names.SAME);
this.updateSettingsService = updateSettingsService;
this.systemIndices = systemIndices;
}

@Override
Expand All @@ -75,9 +87,24 @@ protected ClusterBlockException checkBlock(UpdateSettingsRequest request, Cluste
protected void masterOperation(Task task, final UpdateSettingsRequest request, final ClusterState state,
final ActionListener<AcknowledgedResponse> listener) {
final Index[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, request);
final Settings requestSettings = request.settings();


final Map<String, List<String>> systemIndexViolations = checkForSystemIndexViolations(concreteIndices, requestSettings);
if (systemIndexViolations.isEmpty() == false) {
final String message = "Cannot override settings on system indices: "
+ systemIndexViolations.entrySet()
.stream()
.map(entry -> "[" + entry.getKey() + "] -> " + entry.getValue())
.collect(Collectors.joining(", "));
logger.warn(message);
listener.onFailure(new IllegalArgumentException(message));
return;
}

UpdateSettingsClusterStateUpdateRequest clusterStateUpdateRequest = new UpdateSettingsClusterStateUpdateRequest()
.indices(concreteIndices)
.settings(request.settings())
.settings(requestSettings)
.setPreserveExisting(request.isPreserveExisting())
.ackTimeout(request.timeout())
.masterNodeTimeout(request.masterNodeTimeout());
Expand All @@ -95,4 +122,37 @@ public void onFailure(Exception t) {
}
});
}

/**
* Checks that if the request is trying to apply settings changes to any system indices, then the settings' values match those
* that the system index's descriptor expects.
*
* @param concreteIndices the indices being updated
* @param requestSettings the settings to be applied
* @return a mapping from system index pattern to the settings whose values would be overridden. Empty if there are no violations.
*/
private Map<String, List<String>> checkForSystemIndexViolations(Index[] concreteIndices, Settings requestSettings) {
final Map<String, List<String>> violations = new HashMap<>();

for (Index index : concreteIndices) {
final SystemIndexDescriptor descriptor = systemIndices.findMatchingDescriptor(index.getName());
if (descriptor != null && descriptor.isAutomaticallyManaged()) {
final Settings descriptorSettings = descriptor.getSettings();
List<String> failedKeys = new ArrayList<>();
for (String key : requestSettings.keySet()) {
final String expectedValue = descriptorSettings.get(key);
final String actualValue = requestSettings.get(key);

if (expectedValue.equals(actualValue) == false) {
failedKeys.add(key);
}
}

if (failedKeys.isEmpty() == false) {
violations.put(descriptor.getIndexPattern(), failedKeys);
}
}
}
return violations;
}
}
Loading

0 comments on commit d7b728c

Please sign in to comment.