Skip to content

Commit

Permalink
Security: don't call prepare index for reads
Browse files Browse the repository at this point in the history
The security native stores follow a pattern where
`SecurityIndexManager#prepareIndexIfNeededThenExecute` wraps most calls
made for the security index. The reasoning behind this was to check if
the security index had been upgraded to the latest version in a
consistent manner. However, this has the potential side effect that a
read will trigger the creation of the security index or an updating of
its mappings, which can lead to issues such as failures due to put
mapping requests timing out even though we might have been able to read
from the index and get the data necessary.

This change introduces a new method, `checkIndexVersionThenExecute`,
that provides the consistent checking of the security index to make
sure it has been upgraded. That is the only check that this method
performs prior to running the passed in operation, which removes the
possible triggering of index creation and mapping updates for reads.

Additionally, areas where we do reads now check the availability of the
security index and can short circuit requests. Availability in this
context means that the index exists and all primaries are active.

Relates elastic#33205
  • Loading branch information
jaymode committed Oct 2, 2018
1 parent a1c441f commit 97d5ac3
Show file tree
Hide file tree
Showing 12 changed files with 317 additions and 183 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -361,30 +361,34 @@ void decodeToken(String token, ActionListener<UserToken> listener) throws IOExce
final Cipher cipher = getDecryptionCipher(iv, decodeKey, version, decodedSalt);
if (version.onOrAfter(Version.V_6_2_0)) {
// we only have the id and need to get the token from the doc!
decryptTokenId(in, cipher, version, ActionListener.wrap(tokenId ->
securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () -> {
final GetRequest getRequest =
decryptTokenId(in, cipher, version, ActionListener.wrap(tokenId -> {
if (securityIndex.isAvailable() == false) {
logger.warn("failed to get token [{}] since index is not available", tokenId);
listener.onResponse(null);
} else {
securityIndex.checkIndexVersionThenExecute(listener::onFailure, () -> {
final GetRequest getRequest =
client.prepareGet(SecurityIndexManager.SECURITY_INDEX_NAME, TYPE,
getTokenDocumentId(tokenId)).request();
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, getRequest,
getTokenDocumentId(tokenId)).request();
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, getRequest,
ActionListener.<GetResponse>wrap(response -> {
if (response.isExists()) {
Map<String, Object> accessTokenSource =
(Map<String, Object>) response.getSource().get("access_token");
(Map<String, Object>) response.getSource().get("access_token");
if (accessTokenSource == null) {
listener.onFailure(new IllegalStateException("token document is missing " +
"the access_token field"));
"the access_token field"));
} else if (accessTokenSource.containsKey("user_token") == false) {
listener.onFailure(new IllegalStateException("token document is missing " +
"the user_token field"));
"the user_token field"));
} else {
Map<String, Object> userTokenSource =
(Map<String, Object>) accessTokenSource.get("user_token");
(Map<String, Object>) accessTokenSource.get("user_token");
listener.onResponse(UserToken.fromSourceMap(userTokenSource));
}
} else {
listener.onFailure(
new IllegalStateException("token document is missing and must be present"));
new IllegalStateException("token document is missing and must be present"));
}
}, e -> {
// if the index or the shard is not there / available we assume that
Expand All @@ -397,7 +401,8 @@ void decodeToken(String token, ActionListener<UserToken> listener) throws IOExce
listener.onFailure(e);
}
}), client::get);
}), listener::onFailure));
});
}}, listener::onFailure));
} else {
decryptToken(in, cipher, version, listener);
}
Expand Down Expand Up @@ -673,30 +678,36 @@ private void findTokenFromRefreshToken(String refreshToken, ActionListener<Tuple
.setVersion(true)
.request();

securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () ->
if (securityIndex.isAvailable() == false) {
logger.debug("security index is not available to find token from refresh token, retrying");
attemptCount.incrementAndGet();
findTokenFromRefreshToken(refreshToken, listener, attemptCount);
} else {
securityIndex.checkIndexVersionThenExecute(listener::onFailure, () ->
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, request,
ActionListener.<SearchResponse>wrap(searchResponse -> {
if (searchResponse.isTimedOut()) {
attemptCount.incrementAndGet();
findTokenFromRefreshToken(refreshToken, listener, attemptCount);
} else if (searchResponse.getHits().getHits().length < 1) {
logger.info("could not find token document with refresh_token [{}]", refreshToken);
listener.onFailure(invalidGrantException("could not refresh the requested token"));
} else if (searchResponse.getHits().getHits().length > 1) {
listener.onFailure(new IllegalStateException("multiple tokens share the same refresh token"));
} else {
listener.onResponse(new Tuple<>(searchResponse, attemptCount));
}
}, e -> {
if (isShardNotAvailableException(e)) {
logger.debug("failed to search for token document, retrying", e);
attemptCount.incrementAndGet();
findTokenFromRefreshToken(refreshToken, listener, attemptCount);
} else {
listener.onFailure(e);
}
}),
client::search));
ActionListener.<SearchResponse>wrap(searchResponse -> {
if (searchResponse.isTimedOut()) {
attemptCount.incrementAndGet();
findTokenFromRefreshToken(refreshToken, listener, attemptCount);
} else if (searchResponse.getHits().getHits().length < 1) {
logger.info("could not find token document with refresh_token [{}]", refreshToken);
listener.onFailure(invalidGrantException("could not refresh the requested token"));
} else if (searchResponse.getHits().getHits().length > 1) {
listener.onFailure(new IllegalStateException("multiple tokens share the same refresh token"));
} else {
listener.onResponse(new Tuple<>(searchResponse, attemptCount));
}
}, e -> {
if (isShardNotAvailableException(e)) {
logger.debug("failed to search for token document, retrying", e);
attemptCount.incrementAndGet();
findTokenFromRefreshToken(refreshToken, listener, attemptCount);
} else {
listener.onFailure(e);
}
}),
client::search));
}
}
}

Expand Down Expand Up @@ -831,32 +842,33 @@ public void findActiveTokensForRealm(String realmName, ActionListener<Collection

if (Strings.isNullOrEmpty(realmName)) {
listener.onFailure(new IllegalArgumentException("Realm name is required"));
return;
}

final Instant now = clock.instant();
final BoolQueryBuilder boolQuery = QueryBuilders.boolQuery()
} else if (securityIndex.isAvailable() == false) {
listener.onResponse(Collections.emptyList());
} else {
final Instant now = clock.instant();
final BoolQueryBuilder boolQuery = QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery("doc_type", "token"))
.filter(QueryBuilders.termQuery("access_token.realm", realmName))
.filter(QueryBuilders.boolQuery()
.should(QueryBuilders.boolQuery()
.must(QueryBuilders.termQuery("access_token.invalidated", false))
.must(QueryBuilders.rangeQuery("access_token.user_token.expiration_time").gte(now.toEpochMilli()))
)
.should(QueryBuilders.termQuery("refresh_token.invalidated", false))
.should(QueryBuilders.boolQuery()
.must(QueryBuilders.termQuery("access_token.invalidated", false))
.must(QueryBuilders.rangeQuery("access_token.user_token.expiration_time").gte(now.toEpochMilli()))
)
.should(QueryBuilders.termQuery("refresh_token.invalidated", false))
);

final SearchRequest request = client.prepareSearch(SecurityIndexManager.SECURITY_INDEX_NAME)
final SearchRequest request = client.prepareSearch(SecurityIndexManager.SECURITY_INDEX_NAME)
.setScroll(DEFAULT_KEEPALIVE_SETTING.get(settings))
.setQuery(boolQuery)
.setVersion(false)
.setSize(1000)
.setFetchSource(true)
.request();

final Supplier<ThreadContext.StoredContext> supplier = client.threadPool().getThreadContext().newRestorableContext(false);
securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () ->
ScrollHelper.fetchAllByEntity(client, request, new ContextPreservingActionListener<>(supplier, listener), this::parseHit));
final Supplier<ThreadContext.StoredContext> supplier = client.threadPool().getThreadContext().newRestorableContext(false);
securityIndex.checkIndexVersionThenExecute(listener::onFailure, () ->
ScrollHelper.fetchAllByEntity(client, request, new ContextPreservingActionListener<>(supplier, listener), this::parseHit));
}
}

private Tuple<UserToken, String> parseHit(SearchHit hit) {
Expand Down Expand Up @@ -926,7 +938,7 @@ private void checkIfTokenIsRevoked(UserToken userToken, ActionListener<UserToken
// index doesn't exist so the token is considered valid.
listener.onResponse(userToken);
} else {
securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () -> {
securityIndex.checkIndexVersionThenExecute(listener::onFailure, () -> {
MultiGetRequest mGetRequest = client.prepareMultiGet()
.add(SecurityIndexManager.SECURITY_INDEX_NAME, TYPE, getInvalidatedTokenDocumentId(userToken))
.add(SecurityIndexManager.SECURITY_INDEX_NAME, TYPE, getTokenDocumentId(userToken))
Expand Down
Loading

0 comments on commit 97d5ac3

Please sign in to comment.