Skip to content

Commit

Permalink
Remove non-state transforms
Browse files Browse the repository at this point in the history
Reserved cluster state is build from handlers that modify cluster state.
In the past there was one "non-state" handler that stored it's state
outside cluster state. However, with that use case now gone, non-state
transforms are no longer needed. This commit removes support for
non-state transforms.
  • Loading branch information
rjernst committed Jul 9, 2024
1 parent bfc32a2 commit a870b62
Show file tree
Hide file tree
Showing 132 changed files with 5 additions and 16,350 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,9 @@

package org.elasticsearch.reservedstate;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;

import java.util.Set;
import java.util.function.Consumer;

/**
* A {@link ClusterState} wrapper used by the ReservedClusterStateService to pass the
Expand All @@ -24,8 +22,4 @@
* other state outside the cluster state. The consumer, if provided, must return a {@link NonStateTransformResult} with
* the keys that will be saved as reserved in the cluster state.
*/
public record TransformState(ClusterState state, Set<String> keys, Consumer<ActionListener<NonStateTransformResult>> nonStateTransform) {
public TransformState(ClusterState state, Set<String> keys) {
this(state, keys, null);
}
}
public record TransformState(ClusterState state, Set<String> keys) {}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.RefCountingListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.ReservedStateErrorMetadata;
import org.elasticsearch.cluster.metadata.ReservedStateMetadata;
Expand All @@ -22,16 +21,13 @@
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
import org.elasticsearch.common.Priority;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.reservedstate.NonStateTransformResult;
import org.elasticsearch.reservedstate.ReservedClusterStateHandler;
import org.elasticsearch.reservedstate.TransformState;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.XContentParser;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
Expand All @@ -45,7 +41,6 @@
import static org.elasticsearch.cluster.metadata.ReservedStateMetadata.EMPTY_VERSION;
import static org.elasticsearch.core.Strings.format;
import static org.elasticsearch.reservedstate.service.ReservedStateErrorTask.checkErrorVersion;
import static org.elasticsearch.reservedstate.service.ReservedStateErrorTask.isNewError;
import static org.elasticsearch.reservedstate.service.ReservedStateUpdateTask.checkMetadataVersion;
import static org.elasticsearch.reservedstate.service.ReservedStateUpdateTask.keysForHandler;

Expand Down Expand Up @@ -218,57 +213,7 @@ public void process(String namespace, ReservedStateChunk reservedStateChunk, Con

if (error != null) {
errorListener.accept(error);
return;
}

// Since we have validated that the cluster state update can be correctly performed in the trial run, we now
// execute the non cluster state transforms. These are assumed to be async and we continue with the cluster state update
// after all have completed. This part of reserved cluster state update is non-atomic, some or all of the non-state
// transformations can succeed, and we can fail to eventually write the reserved cluster state.
executeNonStateTransformationSteps(trialRunResult.nonStateTransforms, new ActionListener<>() {
@Override
public void onResponse(Collection<NonStateTransformResult> nonStateTransformResults) {
// Once all of the non-state transformation results complete, we can proceed to
// do the final save of the cluster state. The non-state transformation reserved keys are applied
// to the reserved state after all other key handlers.
updateTaskQueue.submitTask(
"reserved cluster state [" + namespace + "]",
new ReservedStateUpdateTask(
namespace,
reservedStateChunk,
nonStateTransformResults,
handlers,
orderedHandlers,
ReservedClusterStateService.this::updateErrorState,
new ActionListener<>() {
@Override
public void onResponse(ActionResponse.Empty empty) {
logger.info("Successfully applied new reserved cluster state for namespace [{}]", namespace);
errorListener.accept(null);
}

@Override
public void onFailure(Exception e) {
// Don't spam the logs on repeated errors
if (isNewError(existingMetadata, reservedStateVersion.version())) {
logger.debug("Failed to apply reserved cluster state", e);
errorListener.accept(e);
} else {
errorListener.accept(null);
}
}
}
),
null
);
}

@Override
public void onFailure(Exception e) {
// If we encounter an error while runnin the non-state transforms, we avoid saving any cluster state.
errorListener.accept(checkAndReportError(namespace, List.of(stackTrace(e)), reservedStateVersion));
}
});
}

// package private for testing
Expand Down Expand Up @@ -341,7 +286,6 @@ TrialRunResult trialRun(
Map<String, Object> reservedState = stateChunk.state();

List<String> errors = new ArrayList<>();
List<Consumer<ActionListener<NonStateTransformResult>>> nonStateTransforms = new ArrayList<>();

ClusterState state = currentState;

Expand All @@ -351,39 +295,12 @@ TrialRunResult trialRun(
Set<String> existingKeys = keysForHandler(existingMetadata, handlerName);
TransformState transformState = handler.transform(reservedState.get(handlerName), new TransformState(state, existingKeys));
state = transformState.state();
if (transformState.nonStateTransform() != null) {
nonStateTransforms.add(transformState.nonStateTransform());
}
} catch (Exception e) {
errors.add(format("Error processing %s state change: %s", handler.name(), stackTrace(e)));
}
}

return new TrialRunResult(nonStateTransforms, errors);
}

/**
* Runs the non cluster state transformations asynchronously, collecting the {@link NonStateTransformResult} objects.
* <p>
* Once all non cluster state transformations have completed, we submit the cluster state update task, which
* updates all of the handler state, including the keys produced by the non cluster state transforms. The new reserved
* state version isn't written to the cluster state until the cluster state task runs.
*
* Package private for testing
*/
static void executeNonStateTransformationSteps(
List<Consumer<ActionListener<NonStateTransformResult>>> nonStateTransforms,
ActionListener<Collection<NonStateTransformResult>> listener
) {
final List<NonStateTransformResult> result = Collections.synchronizedList(new ArrayList<>(nonStateTransforms.size()));
try (var listeners = new RefCountingListener(listener.map(ignored -> result))) {
for (var transform : nonStateTransforms) {
// non cluster state transforms don't modify the cluster state, they however are given a chance to return a more
// up-to-date version of the modified keys we should save in the reserved state. These calls are
// async and report back when they are done through the postTasksListener.
transform.accept(listeners.acquire(result::add));
}
}
return new TrialRunResult(errors);
}

/**
Expand Down Expand Up @@ -453,5 +370,5 @@ public void installStateHandler(ReservedClusterStateHandler<?> handler) {
/**
* Helper record class to combine the result of a trial run, non cluster state actions and any errors
*/
record TrialRunResult(List<Consumer<ActionListener<NonStateTransformResult>>> nonStateTransforms, List<String> errors) {}
record TrialRunResult(List<String> errors) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.reservedstate.NonStateTransformResult;
import org.elasticsearch.reservedstate.ReservedClusterStateHandler;
import org.elasticsearch.reservedstate.TransformState;
import org.elasticsearch.reservedstate.action.ReservedClusterSettingsAction;
Expand All @@ -38,26 +37,21 @@
import org.mockito.ArgumentMatchers;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import static org.elasticsearch.reservedstate.service.ReservedStateUpdateTask.checkMetadataVersion;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
Expand Down Expand Up @@ -408,7 +402,6 @@ public Map<String, Object> fromXContent(XContentParser parser) throws IOExceptio
);

var trialRunResult = controller.trialRun("namespace_one", state, chunk, new LinkedHashSet<>(orderedHandlers));
assertThat(trialRunResult.nonStateTransforms(), empty());
assertThat(trialRunResult.errors(), contains(containsString("Error processing one state change:")));

// We exit on duplicate errors before we update the cluster state error metadata
Expand Down Expand Up @@ -579,11 +572,7 @@ public String name() {

@Override
public TransformState transform(Object source, TransformState prevState) {
return new TransformState(prevState.state(), prevState.keys(), this::internalKeys);
}

private void internalKeys(ActionListener<NonStateTransformResult> listener) {
listener.onResponse(new NonStateTransformResult(name(), Set.of("key non-state")));
return new TransformState(prevState.state(), prevState.keys());
}

@Override
Expand Down Expand Up @@ -617,104 +606,7 @@ public Map<String, Object> fromXContent(XContentParser parser) throws IOExceptio
);

var trialRunResult = controller.trialRun("namespace_one", state, chunk, new LinkedHashSet<>(orderedHandlers));

assertThat(trialRunResult.nonStateTransforms(), hasSize(1));
assertThat(trialRunResult.errors(), empty());
trialRunResult.nonStateTransforms().get(0).accept(new ActionListener<>() {
@Override
public void onResponse(NonStateTransformResult nonStateTransformResult) {
assertThat(nonStateTransformResult.updatedKeys(), containsInAnyOrder("key non-state"));
assertThat(nonStateTransformResult.handlerName(), is("non-state"));
}

@Override
public void onFailure(Exception e) {
fail("Should not reach here");
}
});
}

public void testExecuteNonStateTransformationSteps() {
int count = randomInt(10);
var handlers = new ArrayList<ReservedClusterStateHandler<?>>();
var i = 0;
var builder = ReservedStateMetadata.builder("namespace_one").version(1L);
var chunkMap = new HashMap<String, Object>();

while (i < count) {
final var key = i++;
var handler = new ReservedClusterStateHandler<>() {
@Override
public String name() {
return "non-state:" + key;
}

@Override
public TransformState transform(Object source, TransformState prevState) {
return new TransformState(prevState.state(), prevState.keys(), this::internalKeys);
}

private void internalKeys(ActionListener<NonStateTransformResult> listener) {
listener.onResponse(new NonStateTransformResult(name(), Set.of("key non-state:" + key)));
}

@Override
public Map<String, Object> fromXContent(XContentParser parser) throws IOException {
return parser.map();
}
};

builder.putHandler(new ReservedStateHandlerMetadata(handler.name(), Set.of("a", "b")));
handlers.add(handler);
chunkMap.put(handler.name(), i);
}

final ReservedStateMetadata operatorMetadata = ReservedStateMetadata.builder("namespace_one").version(1L).build();

Metadata metadata = Metadata.builder().put(operatorMetadata).build();
ClusterState state = ClusterState.builder(new ClusterName("test")).metadata(metadata).build();

var chunk = new ReservedStateChunk(chunkMap, new ReservedStateVersion(2L, Version.CURRENT));

ClusterService clusterService = mock(ClusterService.class);
final var controller = spy(new ReservedClusterStateService(clusterService, mock(RerouteService.class), handlers));

var trialRunResult = controller.trialRun(
"namespace_one",
state,
chunk,
handlers.stream().map(ReservedClusterStateHandler::name).collect(Collectors.toCollection(LinkedHashSet::new))
);

assertThat(trialRunResult.nonStateTransforms(), hasSize(count));
ReservedClusterStateService.executeNonStateTransformationSteps(trialRunResult.nonStateTransforms(), new ActionListener<>() {
@Override
public void onResponse(Collection<NonStateTransformResult> nonStateTransformResults) {
assertEquals(count, nonStateTransformResults.size());
var expectedHandlers = new ArrayList<String>();
var expectedValues = new ArrayList<String>();
for (int i = 0; i < count; i++) {
expectedHandlers.add("non-state:" + i);
expectedValues.add("key non-state:" + i);
}
assertThat(
nonStateTransformResults.stream().map(NonStateTransformResult::handlerName).collect(Collectors.toSet()),
containsInAnyOrder(expectedHandlers.toArray())
);
assertThat(
nonStateTransformResults.stream()
.map(NonStateTransformResult::updatedKeys)
.flatMap(Set::stream)
.collect(Collectors.toSet()),
containsInAnyOrder(expectedValues.toArray())
);
}

@Override
public void onFailure(Exception e) {
fail("Shouldn't reach here");
}
});
}

static class TestHandler implements ReservedClusterStateHandler<Map<String, Object>> {
Expand Down
Loading

0 comments on commit a870b62

Please sign in to comment.