Skip to content

Commit

Permalink
HDDS-2980. Delete replayed entry from OpenKeyTable during commit (apa…
Browse files Browse the repository at this point in the history
  • Loading branch information
hanishakoneru committed Mar 30, 2020
1 parent 386baf1 commit 79ce00e
Show file tree
Hide file tree
Showing 11 changed files with 294 additions and 147 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,24 @@
*/
public class OMReplayException extends IOException {

private final boolean needsDBOperations;

public OMReplayException() {
this(false);
}

/**
* When the transaction is a replay but still needs some DB operations to
* be performed (such as cleanup of old keys).
* @param needsDBOperations
*/
public OMReplayException(boolean needsDBOperations) {
// Dummy message. This exception is not thrown to client.
super("Replayed transaction");
this.needsDBOperations = needsDBOperations;
}

public OMReplayException(String message) {
super(message);
public boolean isDBOperationNeeded() {
return needsDBOperations;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,6 @@ public OMFileCreateRequest(OMRequest omRequest) {
super(omRequest);
}

private enum Result {
SUCCESS,
REPLAY,
FAILURE
}

@Override
public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
CreateFileRequest createFileRequest = getOmRequest().getCreateFileRequest();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public class OMKeyCommitRequest extends OMKeyRequest {
private enum Result {
SUCCESS,
REPLAY,
DELETE_OPEN_KEY_ONLY,
FAILURE
}

Expand Down Expand Up @@ -113,7 +114,8 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
OzoneManagerProtocolProtos.OMResponse.Builder omResponse =
OzoneManagerProtocolProtos.OMResponse.newBuilder().setCmdType(
OzoneManagerProtocolProtos.Type.CommitKey).setStatus(
OzoneManagerProtocolProtos.Status.OK).setSuccess(true);
OzoneManagerProtocolProtos.Status.OK).setSuccess(true)
.setCommitKeyResponse(CommitKeyResponse.newBuilder());

IOException exception = null;
OmKeyInfo omKeyInfo = null;
Expand All @@ -122,6 +124,11 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
Result result = null;

OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
String dbOzoneKey = omMetadataManager.getOzoneKey(volumeName, bucketName,
keyName);
String dbOpenKey = omMetadataManager.getOpenKey(volumeName, bucketName,
keyName, commitKeyRequest.getClientID());

try {
// check Acl
checkKeyAclsInOpenKeyTable(ozoneManager, volumeName, bucketName,
Expand All @@ -133,11 +140,6 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
.map(OmKeyLocationInfo::getFromProtobuf)
.collect(Collectors.toList());

String dbOzoneKey = omMetadataManager.getOzoneKey(volumeName, bucketName,
keyName);
String dbOpenKey = omMetadataManager.getOpenKey(volumeName, bucketName,
keyName, commitKeyRequest.getClientID());

bucketLockAcquired = omMetadataManager.getLock().acquireLock(BUCKET_LOCK,
volumeName, bucketName);

Expand All @@ -152,9 +154,20 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
if (dbKeyInfo != null) {
// Check if this transaction is a replay of ratis logs
if (isReplay(ozoneManager, dbKeyInfo, trxnLogIndex)) {
// Replay implies the response has already been returned to
// the client. So take no further action and return a dummy
// OMClientResponse.
// During KeyCreate, we do not check the OpenKey Table for replay.
// This is so as to avoid an extra DB read during KeyCreate.
// If KeyCommit is a replay, the KeyCreate request could also have
// been replayed. And since we do not check for replay in KeyCreate,
// we should scrub the key from OpenKey table now, is it exists.

omKeyInfo = omMetadataManager.getOpenKeyTable().get(dbOpenKey);
if (omKeyInfo != null) {
omMetadataManager.getOpenKeyTable().addCacheEntry(
new CacheKey<>(dbOpenKey),
new CacheValue<>(Optional.absent(), trxnLogIndex));

throw new OMReplayException(true);
}
throw new OMReplayException();
}
}
Expand All @@ -163,7 +176,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
omKeyInfo = omMetadataManager.getOpenKeyTable().get(dbOpenKey);
if (omKeyInfo == null) {
throw new OMException("Failed to commit key, as " + dbOpenKey +
"entry is not found in the openKey table", KEY_NOT_FOUND);
"entry is not found in the OpenKey table", KEY_NOT_FOUND);
}
omKeyInfo.setDataSize(commitKeyArgs.getDataSize());

Expand All @@ -184,21 +197,26 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
new CacheKey<>(dbOzoneKey),
new CacheValue<>(Optional.of(omKeyInfo), trxnLogIndex));

omResponse.setCommitKeyResponse(CommitKeyResponse.newBuilder().build());
omClientResponse = new OMKeyCommitResponse(omResponse.build(),
omKeyInfo, commitKeyRequest.getClientID());
omKeyInfo, dbOzoneKey, dbOpenKey);

result = Result.SUCCESS;
} catch (IOException ex) {
if (ex instanceof OMReplayException) {
result = Result.REPLAY;
omClientResponse = new OMKeyCommitResponse(createReplayOMResponse(
omResponse));
if (((OMReplayException) ex).isDBOperationNeeded()) {
result = Result.DELETE_OPEN_KEY_ONLY;
omClientResponse = new OMKeyCommitResponse(omResponse.build(),
dbOpenKey);
} else {
result = Result.REPLAY;
omClientResponse = new OMKeyCommitResponse(createReplayOMResponse(
omResponse));
}
} else {
result = Result.FAILURE;
exception = ex;
omClientResponse = new OMKeyCommitResponse(
createErrorOMResponse(omResponse, exception));
omClientResponse = new OMKeyCommitResponse(createErrorOMResponse(
omResponse, exception));
}
} finally {
addResponseToDoubleBuffer(trxnLogIndex, omClientResponse,
Expand All @@ -211,15 +229,13 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
}

// Performing audit logging outside of the lock.
if (result != Result.REPLAY) {
if (result != Result.REPLAY && result != Result.DELETE_OPEN_KEY_ONLY) {
auditLog(auditLogger, buildAuditMessage(OMAction.COMMIT_KEY, auditMap,
exception, getOmRequest().getUserInfo()));
}

switch (result) {
case SUCCESS:
omResponse.setCommitKeyResponse(CommitKeyResponse.newBuilder().build());

// As when we commit the key, then it is visible in ozone, so we should
// increment here.
// As key also can have multiple versions, we need to increment keys
Expand All @@ -236,6 +252,10 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
LOG.debug("Replayed Transaction {} ignored. Request: {}", trxnLogIndex,
commitKeyRequest);
break;
case DELETE_OPEN_KEY_ONLY:
LOG.debug("Replayed Transaction {}. Deleting old key {} from OpenKey " +
"table. Request: {}", trxnLogIndex, dbOpenKey, commitKeyRequest);
break;
case FAILURE:
LOG.error("Key commit failed. Volume:{}, Bucket:{}, Key:{}. Exception:{}",
volumeName, bucketName, keyName, exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,6 @@ public class S3InitiateMultipartUploadRequest extends OMKeyRequest {
private static final Logger LOG =
LoggerFactory.getLogger(S3InitiateMultipartUploadRequest.class);

private enum Result {
SUCCESS,
REPLAY,
FAILURE
}

public S3InitiateMultipartUploadRequest(OMRequest omRequest) {
super(omRequest);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
*/
public class S3MultipartUploadCommitPartRequest extends OMKeyRequest {


private static final Logger LOG =
LoggerFactory.getLogger(S3MultipartUploadCommitPartRequest.class);

Expand Down Expand Up @@ -147,11 +146,6 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
throw new OMException("Failed to commit Multipart Upload key, as " +
openKey + "entry is not found in the openKey table",
KEY_NOT_FOUND);
} else {
// Check the OpenKeyTable if this transaction is a replay of ratis logs.
if (isReplay(ozoneManager, omKeyInfo, trxnLogIndex)) {
throw new OMReplayException();
}
}

// set the data size and location info list
Expand All @@ -177,48 +171,48 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
throw new OMException("No such Multipart upload is with specified " +
"uploadId " + uploadID,
OMException.ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR);
} else {
int partNumber = keyArgs.getMultipartNumber();
oldPartKeyInfo = multipartKeyInfo.getPartKeyInfo(partNumber);
}

// Build this multipart upload part info.
OzoneManagerProtocolProtos.PartKeyInfo.Builder partKeyInfo =
OzoneManagerProtocolProtos.PartKeyInfo.newBuilder();
partKeyInfo.setPartName(partName);
partKeyInfo.setPartNumber(partNumber);
partKeyInfo.setPartKeyInfo(omKeyInfo.getProtobuf());
int partNumber = keyArgs.getMultipartNumber();
oldPartKeyInfo = multipartKeyInfo.getPartKeyInfo(partNumber);

// Add this part information in to multipartKeyInfo.
multipartKeyInfo.addPartKeyInfo(partNumber, partKeyInfo.build());
// Build this multipart upload part info.
OzoneManagerProtocolProtos.PartKeyInfo.Builder partKeyInfo =
OzoneManagerProtocolProtos.PartKeyInfo.newBuilder();
partKeyInfo.setPartName(partName);
partKeyInfo.setPartNumber(partNumber);
partKeyInfo.setPartKeyInfo(omKeyInfo.getProtobuf());

// Set the UpdateID to current transactionLogIndex
multipartKeyInfo.setUpdateID(trxnLogIndex,
ozoneManager.isRatisEnabled());
// Add this part information in to multipartKeyInfo.
multipartKeyInfo.addPartKeyInfo(partNumber, partKeyInfo.build());

// Set the UpdateID to current transactionLogIndex
multipartKeyInfo.setUpdateID(trxnLogIndex,
ozoneManager.isRatisEnabled());

// OldPartKeyInfo will be deleted. Its updateID will be set in
// S3MultipartUplodaCommitPartResponse before being added to
// DeletedKeyTable.
// OldPartKeyInfo will be deleted. Its updateID will be set in
// S3MultipartUplodaCommitPartResponse before being added to
// DeletedKeyTable.

// Add to cache.
// Add to cache.

// Delete from open key table and add it to multipart info table.
// No need to add cache entries to delete table, as no
// read/write requests that info for validation.
omMetadataManager.getMultipartInfoTable().addCacheEntry(
new CacheKey<>(multipartKey),
new CacheValue<>(Optional.of(multipartKeyInfo),
trxnLogIndex));
// Delete from open key table and add it to multipart info table.
// No need to add cache entries to delete table, as no
// read/write requests that info for validation.
omMetadataManager.getMultipartInfoTable().addCacheEntry(
new CacheKey<>(multipartKey),
new CacheValue<>(Optional.of(multipartKeyInfo),
trxnLogIndex));

omMetadataManager.getOpenKeyTable().addCacheEntry(
new CacheKey<>(openKey),
new CacheValue<>(Optional.absent(), trxnLogIndex));
}
omMetadataManager.getOpenKeyTable().addCacheEntry(
new CacheKey<>(openKey),
new CacheValue<>(Optional.absent(), trxnLogIndex));

omResponse.setCommitMultiPartUploadResponse(
MultipartCommitUploadPartResponse.newBuilder()
.setPartName(partName));
omClientResponse = new S3MultipartUploadCommitPartResponse(
omResponse.build(), multipartKey, openKey, omKeyInfo,
omResponse.build(), multipartKey, openKey,
multipartKeyInfo, oldPartKeyInfo, ozoneManager.isRatisEnabled());

result = Result.SUCCESS;
Expand All @@ -231,8 +225,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
result = Result.FAILURE;
exception = ex;
omClientResponse = new S3MultipartUploadCommitPartResponse(
createErrorOMResponse(omResponse, exception), multipartKey,
openKey, omKeyInfo, multipartKeyInfo, oldPartKeyInfo,
createErrorOMResponse(omResponse, exception), openKey, omKeyInfo,
ozoneManager.isRatisEnabled());
}
} finally {
Expand Down
Loading

0 comments on commit 79ce00e

Please sign in to comment.