Skip to content

Commit

Permalink
restore state update
Browse files Browse the repository at this point in the history
  • Loading branch information
rjernst committed Jul 9, 2024
1 parent d2b8749 commit e97e563
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import static org.elasticsearch.cluster.metadata.ReservedStateMetadata.EMPTY_VERSION;
import static org.elasticsearch.core.Strings.format;
import static org.elasticsearch.reservedstate.service.ReservedStateErrorTask.checkErrorVersion;
import static org.elasticsearch.reservedstate.service.ReservedStateErrorTask.isNewError;
import static org.elasticsearch.reservedstate.service.ReservedStateUpdateTask.checkMetadataVersion;
import static org.elasticsearch.reservedstate.service.ReservedStateUpdateTask.keysForHandler;

Expand Down Expand Up @@ -151,7 +152,6 @@ public void initEmpty(String namespace, ActionListener<ActionResponse.Empty> lis
new ReservedStateUpdateTask(
namespace,
emptyState,
List.of(),
Map.of(),
List.of(),
// error state should not be possible since there is no metadata being parsed or processed
Expand Down Expand Up @@ -212,7 +212,37 @@ public void process(String namespace, ReservedStateChunk reservedStateChunk, Con

if (error != null) {
errorListener.accept(error);
return;
}
updateTaskQueue.submitTask(
"reserved cluster state [" + namespace + "]",
new ReservedStateUpdateTask(
namespace,
reservedStateChunk,
handlers,
orderedHandlers,
ReservedClusterStateService.this::updateErrorState,
new ActionListener<>() {
@Override
public void onResponse(ActionResponse.Empty empty) {
logger.info("Successfully applied new reserved cluster state for namespace [{}]", namespace);
errorListener.accept(null);
}

@Override
public void onFailure(Exception e) {
// Don't spam the logs on repeated errors
if (isNewError(existingMetadata, reservedStateVersion.version())) {
logger.debug("Failed to apply reserved cluster state", e);
errorListener.accept(e);
} else {
errorListener.accept(null);
}
}
}
),
null
);
}

// package private for testing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.elasticsearch.cluster.metadata.ReservedStateHandlerMetadata;
import org.elasticsearch.cluster.metadata.ReservedStateMetadata;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.reservedstate.NonStateTransformResult;
import org.elasticsearch.reservedstate.ReservedClusterStateHandler;
import org.elasticsearch.reservedstate.TransformState;

Expand Down Expand Up @@ -51,20 +50,17 @@ public class ReservedStateUpdateTask implements ClusterStateTaskListener {
private final Collection<String> orderedHandlers;
private final Consumer<ErrorState> errorReporter;
private final ActionListener<ActionResponse.Empty> listener;
private final Collection<NonStateTransformResult> nonStateTransformResults;

public ReservedStateUpdateTask(
String namespace,
ReservedStateChunk stateChunk,
Collection<NonStateTransformResult> nonStateTransformResults,
Map<String, ReservedClusterStateHandler<?>> handlers,
Collection<String> orderedHandlers,
Consumer<ErrorState> errorReporter,
ActionListener<ActionResponse.Empty> listener
) {
this.namespace = namespace;
this.stateChunk = stateChunk;
this.nonStateTransformResults = nonStateTransformResults;
this.handlers = handlers;
this.orderedHandlers = orderedHandlers;
this.errorReporter = errorReporter;
Expand Down Expand Up @@ -115,12 +111,6 @@ protected ClusterState execute(final ClusterState currentState) {

checkAndThrowOnError(errors, reservedStateVersion);

// Once we have set all of the handler state from the cluster state update tasks, we add the reserved keys
// from the non cluster state transforms.
for (var transform : nonStateTransformResults) {
reservedMetadataBuilder.putHandler(new ReservedStateHandlerMetadata(transform.handlerName(), transform.updatedKeys()));
}

// Remove the last error if we had previously encountered any in prior processing of reserved state
reservedMetadataBuilder.errorMetadata(null);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public void testUpdateStateTasks() throws Exception {
AtomicBoolean successCalled = new AtomicBoolean(false);

ReservedStateUpdateTask task = spy(
new ReservedStateUpdateTask("test", null, List.of(), Map.of(), Set.of(), errorState -> {}, ActionListener.noop())
new ReservedStateUpdateTask("test", null, Map.of(), Set.of(), errorState -> {}, ActionListener.noop())
);

doReturn(state).when(task).execute(any());
Expand Down Expand Up @@ -389,7 +389,6 @@ public Map<String, Object> fromXContent(XContentParser parser) throws IOExceptio
ReservedStateUpdateTask task = new ReservedStateUpdateTask(
"namespace_one",
chunk,
List.of(),
Map.of(exceptionThrower.name(), exceptionThrower, newStateMaker.name(), newStateMaker),
orderedHandlers,
errorState -> assertFalse(ReservedStateErrorTask.isNewError(operatorMetadata, errorState.version())),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

public class ReservedStateUpdateTaskTests extends ESTestCase {
public void testBlockedClusterState() {
var task = new ReservedStateUpdateTask("dummy", null, List.of(), Map.of(), List.of(), e -> {}, ActionListener.noop());
var task = new ReservedStateUpdateTask("dummy", null, Map.of(), List.of(), e -> {}, ActionListener.noop());
ClusterState notRecoveredClusterState = ClusterState.builder(ClusterName.DEFAULT)
.blocks(ClusterBlocks.builder().addGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK))
.build();
Expand Down

0 comments on commit e97e563

Please sign in to comment.