diff --git a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedClusterStateService.java b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedClusterStateService.java index 8f86a41d44709..20a115a484ab5 100644 --- a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedClusterStateService.java +++ b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedClusterStateService.java @@ -41,6 +41,7 @@ import static org.elasticsearch.cluster.metadata.ReservedStateMetadata.EMPTY_VERSION; import static org.elasticsearch.core.Strings.format; import static org.elasticsearch.reservedstate.service.ReservedStateErrorTask.checkErrorVersion; +import static org.elasticsearch.reservedstate.service.ReservedStateErrorTask.isNewError; import static org.elasticsearch.reservedstate.service.ReservedStateUpdateTask.checkMetadataVersion; import static org.elasticsearch.reservedstate.service.ReservedStateUpdateTask.keysForHandler; @@ -151,7 +152,6 @@ public void initEmpty(String namespace, ActionListener lis new ReservedStateUpdateTask( namespace, emptyState, - List.of(), Map.of(), List.of(), // error state should not be possible since there is no metadata being parsed or processed @@ -212,7 +212,37 @@ public void process(String namespace, ReservedStateChunk reservedStateChunk, Con if (error != null) { errorListener.accept(error); + return; } + updateTaskQueue.submitTask( + "reserved cluster state [" + namespace + "]", + new ReservedStateUpdateTask( + namespace, + reservedStateChunk, + handlers, + orderedHandlers, + ReservedClusterStateService.this::updateErrorState, + new ActionListener<>() { + @Override + public void onResponse(ActionResponse.Empty empty) { + logger.info("Successfully applied new reserved cluster state for namespace [{}]", namespace); + errorListener.accept(null); + } + + @Override + public void onFailure(Exception e) { + // Don't spam the logs on repeated errors + if (isNewError(existingMetadata, reservedStateVersion.version())) { + logger.debug("Failed to apply reserved cluster state", e); + errorListener.accept(e); + } else { + errorListener.accept(null); + } + } + } + ), + null + ); } // package private for testing diff --git a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTask.java b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTask.java index 1ac42a91736c3..93d3619889a48 100644 --- a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTask.java +++ b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTask.java @@ -20,7 +20,6 @@ import org.elasticsearch.cluster.metadata.ReservedStateHandlerMetadata; import org.elasticsearch.cluster.metadata.ReservedStateMetadata; import org.elasticsearch.gateway.GatewayService; -import org.elasticsearch.reservedstate.NonStateTransformResult; import org.elasticsearch.reservedstate.ReservedClusterStateHandler; import org.elasticsearch.reservedstate.TransformState; @@ -51,12 +50,10 @@ public class ReservedStateUpdateTask implements ClusterStateTaskListener { private final Collection orderedHandlers; private final Consumer errorReporter; private final ActionListener listener; - private final Collection nonStateTransformResults; public ReservedStateUpdateTask( String namespace, ReservedStateChunk stateChunk, - Collection nonStateTransformResults, Map> handlers, Collection orderedHandlers, Consumer errorReporter, @@ -64,7 +61,6 @@ public ReservedStateUpdateTask( ) { this.namespace = namespace; this.stateChunk = stateChunk; - this.nonStateTransformResults = nonStateTransformResults; this.handlers = handlers; this.orderedHandlers = orderedHandlers; this.errorReporter = errorReporter; @@ -115,12 +111,6 @@ protected ClusterState execute(final ClusterState currentState) { checkAndThrowOnError(errors, reservedStateVersion); - // Once we have set all of the handler state from the cluster state update tasks, we add the reserved keys - // from the non cluster state transforms. - for (var transform : nonStateTransformResults) { - reservedMetadataBuilder.putHandler(new ReservedStateHandlerMetadata(transform.handlerName(), transform.updatedKeys())); - } - // Remove the last error if we had previously encountered any in prior processing of reserved state reservedMetadataBuilder.errorMetadata(null); diff --git a/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java b/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java index a46b85505a262..0c0fd4183022f 100644 --- a/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java @@ -176,7 +176,7 @@ public void testUpdateStateTasks() throws Exception { AtomicBoolean successCalled = new AtomicBoolean(false); ReservedStateUpdateTask task = spy( - new ReservedStateUpdateTask("test", null, List.of(), Map.of(), Set.of(), errorState -> {}, ActionListener.noop()) + new ReservedStateUpdateTask("test", null, Map.of(), Set.of(), errorState -> {}, ActionListener.noop()) ); doReturn(state).when(task).execute(any()); @@ -389,7 +389,6 @@ public Map fromXContent(XContentParser parser) throws IOExceptio ReservedStateUpdateTask task = new ReservedStateUpdateTask( "namespace_one", chunk, - List.of(), Map.of(exceptionThrower.name(), exceptionThrower, newStateMaker.name(), newStateMaker), orderedHandlers, errorState -> assertFalse(ReservedStateErrorTask.isNewError(operatorMetadata, errorState.version())), diff --git a/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTaskTests.java b/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTaskTests.java index d887d7edb19f2..72d2310a098cf 100644 --- a/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTaskTests.java +++ b/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTaskTests.java @@ -22,7 +22,7 @@ public class ReservedStateUpdateTaskTests extends ESTestCase { public void testBlockedClusterState() { - var task = new ReservedStateUpdateTask("dummy", null, List.of(), Map.of(), List.of(), e -> {}, ActionListener.noop()); + var task = new ReservedStateUpdateTask("dummy", null, Map.of(), List.of(), e -> {}, ActionListener.noop()); ClusterState notRecoveredClusterState = ClusterState.builder(ClusterName.DEFAULT) .blocks(ClusterBlocks.builder().addGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) .build();