Skip to content

Commit

Permalink
address feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
jkakavas committed Mar 1, 2019
1 parent 35eb8ef commit cbc626d
Showing 1 changed file with 92 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -864,104 +864,106 @@ public void onFailure(Exception e) {
final Integer version = (Integer) userTokenSource.get("version");
final Map<String, Object> metadata = (Map<String, Object>) userTokenSource.get("metadata");
Version authVersion = Version.fromId(version);
Authentication authentication;
try (StreamInput in = StreamInput.wrap(Base64.getDecoder().decode(authString))) {
in.setVersion(authVersion);
Authentication authentication = new Authentication(in);
final String newUserTokenId = UUIDs.randomBase64UUID();
final Instant refreshTime = clock.instant();
Map<String, Object> updateMap = new HashMap<>();
updateMap.put("refreshed", true);
updateMap.put("refresh_time", refreshTime.toEpochMilli());
updateMap.put("superseded_by", getTokenDocumentId(newUserTokenId));
UpdateRequestBuilder updateRequest =
client.prepareUpdate(SecurityIndexManager.SECURITY_INDEX_NAME, TYPE, tokenDocId)
.setDoc("refresh_token", updateMap)
.setFetchSource(true)
.setRefreshPolicy(RefreshPolicy.IMMEDIATE);
assert seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO : "expected an assigned sequence number";
updateRequest.setIfSeqNo(seqNo);
assert primaryTerm != SequenceNumbers.UNASSIGNED_PRIMARY_TERM : "expected an assigned primary term";
updateRequest.setIfPrimaryTerm(primaryTerm);
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, updateRequest.request(),
ActionListener.<UpdateResponse>wrap(
updateResponse -> {
if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
logger.debug("updated the original token document to {}", updateResponse.getGetResult().sourceAsMap());
createUserToken(newUserTokenId, authentication, clientAuth, listener, metadata, true);
} else if (backoff.hasNext()) {
logger.info("failed to update the original token document [{}], the update result was [{}]. Retrying",
tokenDocId, updateResponse.getResult());
client.threadPool().schedule(
() -> innerRefresh(tokenDocId, source, seqNo, primaryTerm, clientAuth, listener, backoff,
refreshRequested),
backoff.next(), GENERIC);
} else {
logger.info("failed to update the original token document [{}] after all retries, " +
"the update result was [{}]. ", tokenDocId, updateResponse.getResult());
listener.onFailure(invalidGrantException("could not refresh the requested token"));
}
}, e -> {
Throwable cause = ExceptionsHelper.unwrapCause(e);
if (cause instanceof VersionConflictEngineException) {
//The document has been updated by another thread, get it again.
if (backoff.hasNext()) {
logger.debug("version conflict while updating document [{}], attempting to get it again",
tokenDocId);
final ActionListener<GetResponse> getListener = new ActionListener<GetResponse>() {
@Override
public void onResponse(GetResponse response) {
if (response.isExists()) {
innerRefresh(tokenDocId, response.getSource(), response.getSeqNo(),
response.getPrimaryTerm(), clientAuth, listener, backoff, refreshRequested);
} else {
logger.warn("could not find token document [{}] for refresh", tokenDocId);
onFailure.accept(invalidGrantException("could not refresh the requested token"));
}
authentication = new Authentication(in);
} catch (IOException e) {
logger.error("failed to decode the authentication stored with token document [{}]", tokenDocId);
onFailure.accept(invalidGrantException("could not refresh the requested token"));
return;
}
final String newUserTokenId = UUIDs.randomBase64UUID();
final Instant refreshTime = clock.instant();
Map<String, Object> updateMap = new HashMap<>();
updateMap.put("refreshed", true);
updateMap.put("refresh_time", refreshTime.toEpochMilli());
updateMap.put("superseded_by", getTokenDocumentId(newUserTokenId));
UpdateRequestBuilder updateRequest =
client.prepareUpdate(SecurityIndexManager.SECURITY_INDEX_NAME, TYPE, tokenDocId)
.setDoc("refresh_token", updateMap)
.setFetchSource(true)
.setRefreshPolicy(RefreshPolicy.IMMEDIATE);
assert seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO : "expected an assigned sequence number";
updateRequest.setIfSeqNo(seqNo);
assert primaryTerm != SequenceNumbers.UNASSIGNED_PRIMARY_TERM : "expected an assigned primary term";
updateRequest.setIfPrimaryTerm(primaryTerm);
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, updateRequest.request(),
ActionListener.<UpdateResponse>wrap(
updateResponse -> {
if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
logger.debug("updated the original token document to {}", updateResponse.getGetResult().sourceAsMap());
createUserToken(newUserTokenId, authentication, clientAuth, listener, metadata, true);
} else if (backoff.hasNext()) {
logger.info("failed to update the original token document [{}], the update result was [{}]. Retrying",
tokenDocId, updateResponse.getResult());
client.threadPool().schedule(
() -> innerRefresh(tokenDocId, source, seqNo, primaryTerm, clientAuth, listener, backoff,
refreshRequested),
backoff.next(), GENERIC);
} else {
logger.info("failed to update the original token document [{}] after all retries, " +
"the update result was [{}]. ", tokenDocId, updateResponse.getResult());
listener.onFailure(invalidGrantException("could not refresh the requested token"));
}
}, e -> {
Throwable cause = ExceptionsHelper.unwrapCause(e);
if (cause instanceof VersionConflictEngineException) {
//The document has been updated by another thread, get it again.
if (backoff.hasNext()) {
logger.debug("version conflict while updating document [{}], attempting to get it again",
tokenDocId);
final ActionListener<GetResponse> getListener = new ActionListener<GetResponse>() {
@Override
public void onResponse(GetResponse response) {
if (response.isExists()) {
innerRefresh(tokenDocId, response.getSource(), response.getSeqNo(),
response.getPrimaryTerm(), clientAuth, listener, backoff, refreshRequested);
} else {
logger.warn("could not find token document [{}] for refresh", tokenDocId);
onFailure.accept(invalidGrantException("could not refresh the requested token"));
}

@Override
public void onFailure(Exception e) {
if (isShardNotAvailableException(e)) {
if (backoff.hasNext()) {
logger.info("could not get token document [{}] for refresh, " +
"retrying", tokenDocId);
client.threadPool().schedule(
() -> getTokenDocAsync(tokenDocId, this), backoff.next(), GENERIC);
} else {
logger.warn("could not get token document [{}] for refresh after all retries",
tokenDocId);
onFailure.accept(invalidGrantException("could not refresh the requested token"));
}
}

@Override
public void onFailure(Exception e) {
if (isShardNotAvailableException(e)) {
if (backoff.hasNext()) {
logger.info("could not get token document [{}] for refresh, " +
"retrying", tokenDocId);
client.threadPool().schedule(
() -> getTokenDocAsync(tokenDocId, this), backoff.next(), GENERIC);
} else {
onFailure.accept(e);
logger.warn("could not get token document [{}] for refresh after all retries",
tokenDocId);
onFailure.accept(invalidGrantException("could not refresh the requested token"));
}
} else {
onFailure.accept(e);
}
};
getTokenDocAsync(tokenDocId, getListener);
} else {
logger.warn("version conflict while updating document [{}], no retries left", tokenDocId);
onFailure.accept(invalidGrantException("could not refresh the requested token"));
}
} else if (isShardNotAvailableException(e)) {
if (backoff.hasNext()) {
logger.debug("failed to update the original token document [{}], retrying", tokenDocId);
client.threadPool().schedule(
() -> innerRefresh(tokenDocId, source, seqNo, primaryTerm, clientAuth, listener, backoff,
refreshRequested),
backoff.next(), GENERIC);
} else {
logger.warn("failed to update the original token document [{}], after all retries", tokenDocId);
onFailure.accept(invalidGrantException("could not refresh the requested token"));
}
}
};
getTokenDocAsync(tokenDocId, getListener);
} else {
onFailure.accept(e);
logger.warn("version conflict while updating document [{}], no retries left", tokenDocId);
onFailure.accept(invalidGrantException("could not refresh the requested token"));
}
}),
client::update);
} catch (IOException e) {
logger.error("failed to decode the authentication stored with token document [{}]", tokenDocId);
onFailure.accept(invalidGrantException("could not refresh the requested token"));
}
} else if (isShardNotAvailableException(e)) {
if (backoff.hasNext()) {
logger.debug("failed to update the original token document [{}], retrying", tokenDocId);
client.threadPool().schedule(
() -> innerRefresh(tokenDocId, source, seqNo, primaryTerm, clientAuth, listener, backoff,
refreshRequested),
backoff.next(), GENERIC);
} else {
logger.warn("failed to update the original token document [{}], after all retries", tokenDocId);
onFailure.accept(invalidGrantException("could not refresh the requested token"));
}
} else {
onFailure.accept(e);
}
}),
client::update);
}
}
}
Expand Down

0 comments on commit cbc626d

Please sign in to comment.