Skip to content

Commit

Permalink
Use RoleRetrievalResult for better caching
Browse files Browse the repository at this point in the history
Security caches the result of role lookups and negative lookups are
cached indefinitely. In the case of transient failures this leads to a
bad experience as the roles could truly exist. The CompositeRolesStore
needs to know if a failure occurred in one of the roles stores in order
to make the appropriate decision as it relates to caching. In order to
provide this information to the CompositeRolesStore, the return type of
methods to retrieve roles has changed to a new class,
RoleRetrievalResult. This class provides the ability to pass back an
exception to the roles store. This exception does not mean that a
request should be failed but instead serves as a signal to the roles
store that missing roles should not be cached and neither should the
combined role if there are missing roles.

As part of this, the negative lookup cache was also changed from an
unbounded cache to a cache with a configurable limit.

Relates elastic#33205
  • Loading branch information
jaymode committed Oct 1, 2018
1 parent a02deba commit daf7712
Show file tree
Hide file tree
Showing 14 changed files with 427 additions and 165 deletions.
Expand Up @@ -53,8 +53,7 @@ protected XPackUsageResponse newResponse() {
}

@Override
protected void masterOperation(XPackUsageRequest request, ClusterState state, ActionListener<XPackUsageResponse> listener)
throws Exception {
protected void masterOperation(XPackUsageRequest request, ClusterState state, ActionListener<XPackUsageResponse> listener) {
final ActionListener<List<XPackFeatureSet.Usage>> usageActionListener = new ActionListener<List<Usage>>() {
@Override
public void onResponse(List<Usage> usages) {
Expand All @@ -73,7 +72,8 @@ public void onFailure(Exception e) {
@Override
public void onResponse(Usage usage) {
featureSetUsages.set(position.getAndIncrement(), usage);
iteratingListener.onResponse(null); // just send null back and keep iterating
// the value sent back doesn't matter since our predicate keeps iterating
iteratingListener.onResponse(Collections.emptyList());
}

@Override
Expand All @@ -84,13 +84,13 @@ public void onFailure(Exception e) {
};
IteratingActionListener<List<XPackFeatureSet.Usage>, XPackFeatureSet> iteratingActionListener =
new IteratingActionListener<>(usageActionListener, consumer, featureSets,
threadPool.getThreadContext(), () -> {
threadPool.getThreadContext(), (ignore) -> {
final List<Usage> usageList = new ArrayList<>(featureSetUsages.length());
for (int i = 0; i < featureSetUsages.length(); i++) {
usageList.add(featureSetUsages.get(i));
}
return usageList;
});
}, (ignore) -> true);
iteratingActionListener.run();
}

Expand Down
Expand Up @@ -6,12 +6,14 @@
package org.elasticsearch.xpack.core.common;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.util.concurrent.ThreadContext;

import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;

/**
Expand All @@ -32,7 +34,8 @@ public final class IteratingActionListener<T, U> implements ActionListener<T>, R
private final ActionListener<T> delegate;
private final BiConsumer<U, ActionListener<T>> consumer;
private final ThreadContext threadContext;
private final Supplier<T> consumablesFinishedResponse;
private final Function<T, T> finalResultFunction;
private final Predicate<T> iterationPredicate;

private int position = 0;

Expand All @@ -46,7 +49,7 @@ public final class IteratingActionListener<T, U> implements ActionListener<T>, R
*/
public IteratingActionListener(ActionListener<T> delegate, BiConsumer<U, ActionListener<T>> consumer, List<U> consumables,
ThreadContext threadContext) {
this(delegate, consumer, consumables, threadContext, null);
this(delegate, consumer, consumables, threadContext, Function.identity());
}

/**
Expand All @@ -56,18 +59,36 @@ public IteratingActionListener(ActionListener<T> delegate, BiConsumer<U, ActionL
* @param consumer the consumer that is executed for each consumable instance
* @param consumables the instances that can be consumed to produce a response which is ultimately sent on the delegate listener
* @param threadContext the thread context for the thread pool that created the listener
* @param consumablesFinishedResponse a supplier that maps the last consumable's response to a response
* to be sent on the delegate listener, in case the last consumable returns a
* {@code null} value, but the delegate listener should respond with some other value
* (perhaps a concatenation of the results of all the consumables).
* @param finalResultFunction a function that maps the response which terminated iteration to a response that will be sent to the
* delegate listener. This is useful if the delegate listener should receive some other value (perhaps
* a concatenation of the results of all the called consumables).
*/
public IteratingActionListener(ActionListener<T> delegate, BiConsumer<U, ActionListener<T>> consumer, List<U> consumables,
ThreadContext threadContext, @Nullable Supplier<T> consumablesFinishedResponse) {
ThreadContext threadContext, Function<T, T> finalResultFunction) {
this(delegate, consumer, consumables, threadContext, finalResultFunction, Objects::isNull);
}

/**
* Constructs an {@link IteratingActionListener}.
*
* @param delegate the delegate listener to call when all consumables have finished executing
* @param consumer the consumer that is executed for each consumable instance
* @param consumables the instances that can be consumed to produce a response which is ultimately sent on the delegate listener
* @param threadContext the thread context for the thread pool that created the listener
* @param finalResultFunction a function that maps the response which terminated iteration to a response that will be sent to the
* delegate listener. This is useful if the delegate listener should receive some other value (perhaps
* a concatenation of the results of all the called consumables).
* @param iterationPredicate a {@link Predicate} that checks if iteration should continue based on the returned result
*/
public IteratingActionListener(ActionListener<T> delegate, BiConsumer<U, ActionListener<T>> consumer, List<U> consumables,
ThreadContext threadContext, Function<T, T> finalResultFunction,
Predicate<T> iterationPredicate) {
this.delegate = delegate;
this.consumer = consumer;
this.consumables = Collections.unmodifiableList(consumables);
this.threadContext = threadContext;
this.consumablesFinishedResponse = consumablesFinishedResponse;
this.finalResultFunction = finalResultFunction;
this.iterationPredicate = iterationPredicate;
}

@Override
Expand All @@ -88,18 +109,15 @@ public void onResponse(T response) {
// we need to store the context here as there is a chance that this method is called from a thread outside of the ThreadPool
// like a LDAP connection reader thread and we can pollute the context in certain cases
try (ThreadContext.StoredContext ignore = threadContext.newStoredContext(false)) {
if (response == null) {
final boolean continueIteration = iterationPredicate.test(response);
if (continueIteration) {
if (position == consumables.size()) {
if (consumablesFinishedResponse != null) {
delegate.onResponse(consumablesFinishedResponse.get());
} else {
delegate.onResponse(null);
}
delegate.onResponse(finalResultFunction.apply(response));
} else {
consumer.accept(consumables.get(position++), this);
}
} else {
delegate.onResponse(response);
delegate.onResponse(finalResultFunction.apply(response));
}
}
}
Expand Down
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.xpack.core.security.authc.Realm;
import org.elasticsearch.xpack.core.security.authc.RealmConfig;
import org.elasticsearch.xpack.core.security.authz.RoleDescriptor;
import org.elasticsearch.xpack.core.security.authz.store.RoleRetrievalResult;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -72,16 +73,20 @@ default AuthenticationFailureHandler getAuthenticationFailureHandler() {
* should be asynchronous if the computation is lengthy or any disk and/or network
* I/O is involved. The implementation is responsible for resolving whatever roles
* it can into a set of {@link RoleDescriptor} instances. If successful, the
* implementation must invoke {@link ActionListener#onResponse(Object)} to pass along
* the resolved set of role descriptors. If a failure was encountered, the
* implementation must invoke {@link ActionListener#onFailure(Exception)}.
* implementation must wrap the set of {@link RoleDescriptor} instances in a
* {@link RoleRetrievalResult} using {@link RoleRetrievalResult#success(Set)} and then invoke
* {@link ActionListener#onResponse(Object)}. If a failure was encountered, the
* implementation should wrap the failure in a {@link RoleRetrievalResult} using
* {@link RoleRetrievalResult#failure(Exception)} and then invoke
* {@link ActionListener#onResponse(Object)} unless the failure needs to terminate the request,
* in which case the implementation should invoke {@link ActionListener#onFailure(Exception)}.
*
* By default, an empty list is returned.
*
* @param settings The configured settings for the node
* @param resourceWatcherService Use to watch configuration files for changes
*/
default List<BiConsumer<Set<String>, ActionListener<Set<RoleDescriptor>>>>
default List<BiConsumer<Set<String>, ActionListener<RoleRetrievalResult>>>
getRolesProviders(Settings settings, ResourceWatcherService resourceWatcherService) {
return Collections.emptyList();
}
Expand Down
@@ -0,0 +1,71 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.core.security.authz.store;

import org.elasticsearch.xpack.core.security.authz.RoleDescriptor;

import java.util.Objects;
import java.util.Set;

/**
* The result of attempting to retrieve roles from a roles provider. The result can either be
* successful or a failure. A successful result indicates that no errors occurred while retrieving
* roles, even if none of the requested roles could be found. A failure indicates an error
* occurred while retrieving the results but the error is not fatal and the request may be able
* to continue.
*/
public final class RoleRetrievalResult {

private final Set<RoleDescriptor> descriptors;
private final Exception failure;

private RoleRetrievalResult(Set<RoleDescriptor> descriptors, Exception failure) {
if (descriptors != null && failure != null) {
throw new IllegalArgumentException("either descriptors or failure must be null");
}
this.descriptors = descriptors;
this.failure = failure;
}

/**
* @return the resolved descriptors or {@code null} if there was a failure
*/
public Set<RoleDescriptor> getDescriptors() {
return descriptors;
}

/**
* @return the failure or {@code null} if retrieval succeeded
*/
public Exception getFailure() {
return failure;
}

/**
* @return true if the retrieval succeeded
*/
public boolean isSuccess() {
return descriptors != null;
}

/**
* Creates a successful result with the provided {@link RoleDescriptor} set,
* which must be non-null
*/
public static RoleRetrievalResult success(Set<RoleDescriptor> descriptors) {
Objects.requireNonNull(descriptors, "descriptors must not be null if successful");
return new RoleRetrievalResult(descriptors, null);
}

/**
* Creates a failed result with the provided non-null exception
*/
public static RoleRetrievalResult failure(Exception e) {
Objects.requireNonNull(e, "Exception must be provided");
return new RoleRetrievalResult(null, e);
}
}
Expand Up @@ -7,7 +7,6 @@

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.collect.HppcMaps.Object;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.test.ESTestCase;
Expand All @@ -18,8 +17,12 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Predicate;

import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.sameInstance;

public class IteratingActionListenerTests extends ESTestCase {
Expand Down Expand Up @@ -136,4 +139,49 @@ public void testFailure() {
assertEquals(numberOfIterations, iterations.get());
assertTrue(onFailureCalled.get());
}

public void testFunctionApplied() {
final int numberOfItems = scaledRandomIntBetween(2, 32);
final int numberOfIterations = scaledRandomIntBetween(1, numberOfItems);
List<Object> items = new ArrayList<>(numberOfItems);
for (int i = 0; i < numberOfItems; i++) {
items.add(new Object());
}

final AtomicInteger iterations = new AtomicInteger(0);
final Predicate<Object> iterationPredicate = object -> {
final int current = iterations.incrementAndGet();
return current != numberOfIterations;
};
final BiConsumer<Object, ActionListener<Object>> consumer = (listValue, listener) -> {
listener.onResponse(items.get(iterations.get()));
};

final AtomicReference<Object> originalObject = new AtomicReference<>();
final AtomicReference<Object> result = new AtomicReference<>();
final Function<Object, Object> responseFunction = object -> {
originalObject.set(object);
Object randomResult;
do {
randomResult = randomFrom(items);
} while (randomResult == object);
result.set(randomResult);
return randomResult;
};

IteratingActionListener<Object, Object> iteratingListener = new IteratingActionListener<>(ActionListener.wrap((object) -> {
assertNotNull(object);
assertNotNull(originalObject.get());
assertThat(object, sameInstance(result.get()));
assertThat(object, not(sameInstance(originalObject.get())));
assertThat(originalObject.get(), sameInstance(items.get(iterations.get() - 1)));
}, (e) -> {
logger.error("unexpected exception", e);
fail("exception should not have been thrown");
}), consumer, items, new ThreadContext(Settings.EMPTY), responseFunction, iterationPredicate);
iteratingListener.run();

// we never really went async, its all chained together so verify this for sanity
assertEquals(numberOfIterations, iterations.get());
}
}
Expand Up @@ -116,7 +116,6 @@
import org.elasticsearch.xpack.core.security.authc.RealmSettings;
import org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken;
import org.elasticsearch.xpack.core.security.authz.AuthorizationServiceField;
import org.elasticsearch.xpack.core.security.authz.RoleDescriptor;
import org.elasticsearch.xpack.core.security.authz.accesscontrol.IndicesAccessControl;
import org.elasticsearch.xpack.core.security.authz.accesscontrol.SecurityIndexSearcherWrapper;
import org.elasticsearch.xpack.core.security.authz.permission.FieldPermissions;
Expand Down Expand Up @@ -184,6 +183,7 @@
import org.elasticsearch.xpack.security.authz.store.FileRolesStore;
import org.elasticsearch.xpack.security.authz.store.NativePrivilegeStore;
import org.elasticsearch.xpack.security.authz.store.NativeRolesStore;
import org.elasticsearch.xpack.core.security.authz.store.RoleRetrievalResult;
import org.elasticsearch.xpack.security.ingest.SetSecurityUserProcessor;
import org.elasticsearch.xpack.security.rest.SecurityRestFilter;
import org.elasticsearch.xpack.security.rest.action.RestAuthenticateAction;
Expand Down Expand Up @@ -458,7 +458,7 @@ Collection<Object> createComponents(Client client, ThreadPool threadPool, Cluste
final FileRolesStore fileRolesStore = new FileRolesStore(settings, env, resourceWatcherService, getLicenseState());
final NativeRolesStore nativeRolesStore = new NativeRolesStore(settings, client, getLicenseState(), securityIndex.get());
final ReservedRolesStore reservedRolesStore = new ReservedRolesStore();
List<BiConsumer<Set<String>, ActionListener<Set<RoleDescriptor>>>> rolesProviders = new ArrayList<>();
List<BiConsumer<Set<String>, ActionListener<RoleRetrievalResult>>> rolesProviders = new ArrayList<>();
for (SecurityExtension extension : securityExtensions) {
rolesProviders.addAll(extension.getRolesProviders(settings, resourceWatcherService));
}
Expand Down Expand Up @@ -610,7 +610,7 @@ public static List<Setting<?>> getSettings(boolean transportClientMode, List<Sec
AuthenticationService.addSettings(settingsList);
AuthorizationService.addSettings(settingsList);
settingsList.add(Automatons.MAX_DETERMINIZED_STATES_SETTING);
settingsList.add(CompositeRolesStore.CACHE_SIZE_SETTING);
settingsList.addAll(CompositeRolesStore.getSettings());
settingsList.add(FieldPermissionsCache.CACHE_SIZE_SETTING);
settingsList.add(TokenService.TOKEN_EXPIRATION);
settingsList.add(TokenService.DELETE_INTERVAL);
Expand Down
Expand Up @@ -67,10 +67,14 @@ protected void doExecute(Task task, final GetRolesRequest request, final ActionL
listener.onResponse(new GetRolesResponse(roles.toArray(new RoleDescriptor[roles.size()])));
} else {
String[] roleNames = rolesToSearchFor.toArray(new String[rolesToSearchFor.size()]);
nativeRolesStore.getRoleDescriptors(roleNames, ActionListener.wrap((foundRoles) -> {
roles.addAll(foundRoles);
listener.onResponse(new GetRolesResponse(roles.toArray(new RoleDescriptor[roles.size()])));
}, listener::onFailure));
nativeRolesStore.getRoleDescriptors(roleNames, ActionListener.wrap((retrievalResult) -> {
if (retrievalResult.isSuccess()) {
roles.addAll(retrievalResult.getDescriptors());
listener.onResponse(new GetRolesResponse(roles.toArray(new RoleDescriptor[roles.size()])));
} else {
listener.onFailure(retrievalResult.getFailure());
}
}, listener::onFailure));
}
}
}

0 comments on commit daf7712

Please sign in to comment.