Skip to content

Commit

Permalink
Retry if token document not found
Browse files Browse the repository at this point in the history
Due to a bug in real-time-get on stateless ES, it is possible for a
document to temporarily disappear.

This change updates the Token Service to retry when a document is not
found, so that we can work around the bug until it is resolved.
  • Loading branch information
tvernum committed Sep 29, 2023
1 parent 44a2d68 commit d7139e3
Showing 1 changed file with 76 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -560,61 +560,90 @@ private void getTokenDocById(
}
final GetRequest getRequest = client.prepareGet(tokensIndex.aliasName(), getTokenDocumentId(tokenId)).request();
final Consumer<Exception> onFailure = ex -> listener.onFailure(traceLog("get token from id", tokenId, ex));
// This backoff policy adds 445ms [15+25+45+115+245] to the case when the document really doesn't exist
final BackoffPolicy backoffPolicy = BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(15), 5);
tokensIndex.checkIndexVersionThenExecute(
ex -> listener.onFailure(traceLog("prepare tokens index [" + tokensIndex.aliasName() + "]", tokenId, ex)),
() -> executeAsyncWithOrigin(
client.threadPool().getThreadContext(),
SECURITY_ORIGIN,
getRequest,
ActionListener.<GetResponse>wrap(response -> {
if (response.isExists() == false) {
// The chances of a random token string decoding to something that we can read is minimal, so
// we assume that this was a token we have created but is now expired/revoked and deleted
logger.trace("The token [{}] probably expired and has already been deleted", tokenId);
listener.onResponse(null);
return;
}
Map<String, Object> accessSource = (Map<String, Object>) response.getSource().get("access_token");
Map<String, Object> refreshSource = (Map<String, Object>) response.getSource().get("refresh_token");
boolean versionGetForRefresh = tokenVersion.onOrAfter(VERSION_GET_TOKEN_DOC_FOR_REFRESH);
if (accessSource == null) {
onFailure.accept(new IllegalStateException("token document is missing the access_token field"));
} else if (accessSource.containsKey("user_token") == false) {
onFailure.accept(new IllegalStateException("token document is missing the user_token field"));
} else if (versionGetForRefresh && accessSource.containsKey("token") == false) {
onFailure.accept(new IllegalStateException("token document is missing the user_token.token field"));
} else if (versionGetForRefresh && refreshSource != null && refreshSource.containsKey("token") == false) {
onFailure.accept(new IllegalStateException("token document is missing the refresh_token.token field"));
} else if (storedAccessToken != null && storedAccessToken.equals(accessSource.get("token")) == false) {
() -> getDocumentWithRetry(getRequest, backoffPolicy.iterator(), response -> {
if (response.isExists() == false) {
// The chances of a random token string decoding to something that we can read is minimal, so
// we assume that this was a token we have created but is now expired/revoked and deleted
logger.trace("The token [{}] probably expired and has already been deleted", tokenId);
listener.onResponse(null);
return;
}
Map<String, Object> accessSource = (Map<String, Object>) response.getSource().get("access_token");
Map<String, Object> refreshSource = (Map<String, Object>) response.getSource().get("refresh_token");
boolean versionGetForRefresh = tokenVersion.onOrAfter(VERSION_GET_TOKEN_DOC_FOR_REFRESH);
if (accessSource == null) {
onFailure.accept(new IllegalStateException("token document is missing the access_token field"));
} else if (accessSource.containsKey("user_token") == false) {
onFailure.accept(new IllegalStateException("token document is missing the user_token field"));
} else if (versionGetForRefresh && accessSource.containsKey("token") == false) {
onFailure.accept(new IllegalStateException("token document is missing the user_token.token field"));
} else if (versionGetForRefresh && refreshSource != null && refreshSource.containsKey("token") == false) {
onFailure.accept(new IllegalStateException("token document is missing the refresh_token.token field"));
} else if (storedAccessToken != null && storedAccessToken.equals(accessSource.get("token")) == false) {
logger.error("The stored access token [{}] for token doc id [{}] could not be verified", storedAccessToken, tokenId);
listener.onResponse(null);
} else if (storedRefreshToken != null
&& (refreshSource == null || storedRefreshToken.equals(refreshSource.get("token")) == false)) {
logger.error(
"The stored access token [{}] for token doc id [{}] could not be verified",
storedAccessToken,
"The stored refresh token [{}] for token doc id [{}] could not be verified",
storedRefreshToken,
tokenId
);
listener.onResponse(null);
} else if (storedRefreshToken != null
&& (refreshSource == null || storedRefreshToken.equals(refreshSource.get("token")) == false)) {
logger.error(
"The stored refresh token [{}] for token doc id [{}] could not be verified",
storedRefreshToken,
tokenId
);
listener.onResponse(null);
} else {
listener.onResponse(new Doc(response));
}
}, e -> {
// if the index or the shard is not there / available we assume that
// the token is not valid
if (isShardNotAvailableException(e)) {
logger.warn("failed to get token doc [{}] because index [{}] is not available", tokenId, tokensIndex.aliasName());
} else {
logger.error(() -> "failed to get token doc [" + tokenId + "]", e);
listener.onResponse(new Doc(response));
}
listener.onFailure(e);
}),
client::get
)
}, e -> {
// if the index or the shard is not there / available we assume that
// the token is not valid
if (isShardNotAvailableException(e)) {
logger.warn("failed to get token doc [{}] because index [{}] is not available", tokenId, tokensIndex.aliasName());
} else {
logger.error(() -> "failed to get token doc [" + tokenId + "]", e);
}
listener.onFailure(e);
})
);
}

/**
 * Issues the given get request under the security origin and, while the document is reported as
 * missing, re-issues it after each delay supplied by {@code backoff}.
 * <p>
 * A response is handed to {@code onResponse} as soon as the document exists, or once the backoff
 * iterator has no delays left (in which case the "not found" response is passed through unchanged).
 *
 * @param getRequest the request to execute; the same instance is reused for every attempt
 * @param backoff    remaining wait times between attempts; consumed one element per retry
 * @param onResponse receives the final response (found, or not found after retries are exhausted)
 * @param onFailure  failure handler supplied to the listener that wraps each attempt
 */
private void getDocumentWithRetry(
    GetRequest getRequest,
    Iterator<TimeValue> backoff,
    Consumer<GetResponse> onResponse,
    Consumer<Exception> onFailure
) {
    final ActionListener<GetResponse> attemptListener = ActionListener.<GetResponse>wrap(getResponse -> {
        logger.trace("Token document [{}] is [{}]", getRequest, getResponse);

        // Only consult the backoff iterator when the document was not found.
        final boolean retry = getResponse.isExists() == false && backoff.hasNext();
        if (retry == false) {
            onResponse.accept(getResponse);
            return;
        }

        // The retry exists because of a bug in real-time-get on stateless
        final TimeValue delay = backoff.next();
        logger.debug(
            "Token document [{}][{}] not found, waiting [{}] then trying again",
            getRequest.index(),
            getRequest.id(),
            delay
        );
        // Re-enter this method after the delay, on the generic executor, sharing the same iterator
        client.threadPool()
            .schedule(() -> getDocumentWithRetry(getRequest, backoff, onResponse, onFailure), delay, client.threadPool().generic());
    }, onFailure);

    executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, getRequest, attemptListener, client::get);
}

Expand Down

0 comments on commit d7139e3

Please sign in to comment.