Skip to content

Commit

Permalink
HADOOP-18647. x-ms-client-request-id to identify the retry of an API. (
Browse files Browse the repository at this point in the history
…apache#5437)

The x-ms-client-request-id now includes a field to indicate a call is a retry of a previous
operation

Contributed by Pranav Saxena

(cherry picked from commit 759ddeb)
  • Loading branch information
saxenapranav committed Jun 1, 2023
1 parent 4ad9424 commit 89a7096
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 4 deletions.
Expand Up @@ -64,6 +64,16 @@ public class TracingContext {
private String fallbackDFSAppend = "B";
private String header = EMPTY_STRING;

/**
* If {@link #primaryRequestId} is null, this field shall be set equal
* to the last part of the {@link #clientRequestId}'s UUID
* in {@link #constructHeader(AbfsHttpOperation, String)} only on the
* first API call for an operation. Subsequent retries for that operation
* will not change this field. In case {@link #primaryRequestId} is non-null,
* this field shall not be set.
*/
private String primaryRequestIdForRetry;

private static final Logger LOG = LoggerFactory.getLogger(AbfsClient.class);
public static final int MAX_CLIENT_CORRELATION_ID_LENGTH = 72;
public static final String CLIENT_CORRELATION_ID_PATTERN = "[a-zA-Z0-9-]*";
Expand Down Expand Up @@ -165,17 +175,17 @@ public String getFallbackDFSAppend() {
* X_MS_CLIENT_REQUEST_ID header of the http operation
* @param httpOperation AbfsHttpOperation instance to set header into
* connection
* @param previousFailure List of failures seen before this API trigger on
* same operation from AbfsClient.
* @param previousFailure Failure seen before this API trigger on same operation
* from AbfsClient.
*/
public void constructHeader(AbfsHttpOperation httpOperation, String previousFailure) {
clientRequestId = UUID.randomUUID().toString();
switch (format) {
case ALL_ID_FORMAT: // Optional IDs (e.g. streamId) may be empty
header =
clientCorrelationID + ":" + clientRequestId + ":" + fileSystemID + ":"
+ primaryRequestId + ":" + streamID + ":" + opType + ":"
+ retryCount;
+ getPrimaryRequestIdForHeader(retryCount > 0) + ":" + streamID
+ ":" + opType + ":" + retryCount;
header = addFailureReasons(header, previousFailure) + ":" + fallbackDFSAppend;
break;
case TWO_ID_FORMAT:
Expand All @@ -188,6 +198,31 @@ public void constructHeader(AbfsHttpOperation httpOperation, String previousFail
listener.callTracingHeaderValidator(header, format);
}
httpOperation.setRequestProperty(HttpHeaderConfigurations.X_MS_CLIENT_REQUEST_ID, header);
/*
* In case the primaryRequestId is an empty-string and if it is the first try to
* API call (previousFailure shall be null), maintain the last part of clientRequestId's
* UUID in primaryRequestIdForRetry. This field shall be used as primaryRequestId part
* of the x-ms-client-request-id header in case of retry of the same API-request.
*/
if (primaryRequestId.isEmpty() && previousFailure == null) {
String[] clientRequestIdParts = clientRequestId.split("-");
primaryRequestIdForRetry = clientRequestIdParts[
clientRequestIdParts.length - 1];
}
}

/**
* Provide value to be used as primaryRequestId part of x-ms-client-request-id header.
* @param isRetry define if it's for a retry case.
* @return {@link #primaryRequestIdForRetry}:If the {@link #primaryRequestId}
* is an empty-string, and it's a retry iteration.
* {@link #primaryRequestId} for other cases.
*/
private String getPrimaryRequestIdForHeader(final Boolean isRetry) {
if (!primaryRequestId.isEmpty() || !isRetry) {
return primaryRequestId;
}
return primaryRequestIdForRetry;
}

private String addFailureReasons(final String header,
Expand Down
Expand Up @@ -32,12 +32,14 @@
import org.junit.AssumptionViolatedException;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.Mockito;

import org.apache.hadoop.fs.CommonPathCapabilities;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
import org.apache.hadoop.fs.azurebfs.enums.Trilean;
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
import org.apache.hadoop.fs.azurebfs.services.AuthType;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
Expand Down Expand Up @@ -206,4 +208,74 @@ public void testExternalOps() throws Exception {
fs.getAbfsStore().setNamespaceEnabled(Trilean.TRUE);
fs.access(new Path("/"), FsAction.READ);
}

@Test
public void testRetryPrimaryRequestIdWhenInitiallySuppliedEmpty() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();
final String fileSystemId = fs.getFileSystemId();
final String clientCorrelationId = fs.getClientCorrelationId();
final TracingHeaderFormat tracingHeaderFormat = TracingHeaderFormat.ALL_ID_FORMAT;
TracingContext tracingContext = new TracingContext(clientCorrelationId,
fileSystemId, FSOperationType.CREATE_FILESYSTEM, tracingHeaderFormat, new TracingHeaderValidator(
fs.getAbfsStore().getAbfsConfiguration().getClientCorrelationId(),
fs.getFileSystemId(), FSOperationType.CREATE_FILESYSTEM, false,
0));
AbfsHttpOperation abfsHttpOperation = Mockito.mock(AbfsHttpOperation.class);
Mockito.doNothing().when(abfsHttpOperation).setRequestProperty(Mockito.anyString(), Mockito.anyString());
tracingContext.constructHeader(abfsHttpOperation, null);
String header = tracingContext.getHeader();
String clientRequestIdUsed = header.split(":")[1];
String[] clientRequestIdUsedParts = clientRequestIdUsed.split("-");
String assertionPrimaryId = clientRequestIdUsedParts[clientRequestIdUsedParts.length - 1];

tracingContext.setRetryCount(1);
tracingContext.setListener(new TracingHeaderValidator(
fs.getAbfsStore().getAbfsConfiguration().getClientCorrelationId(),
fs.getFileSystemId(), FSOperationType.CREATE_FILESYSTEM, false,
1));

tracingContext.constructHeader(abfsHttpOperation, "RT");
header = tracingContext.getHeader();
String primaryRequestId = header.split(":")[3];

Assertions.assertThat(primaryRequestId)
.describedAs("PrimaryRequestId in a retried request's "
+ "tracingContext should be equal to last part of original "
+ "request's clientRequestId UUID")
.isEqualTo(assertionPrimaryId);
}

@Test
public void testRetryPrimaryRequestIdWhenInitiallySuppliedNonEmpty() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();
final String fileSystemId = fs.getFileSystemId();
final String clientCorrelationId = fs.getClientCorrelationId();
final TracingHeaderFormat tracingHeaderFormat = TracingHeaderFormat.ALL_ID_FORMAT;
TracingContext tracingContext = new TracingContext(clientCorrelationId,
fileSystemId, FSOperationType.CREATE_FILESYSTEM, tracingHeaderFormat, new TracingHeaderValidator(
fs.getAbfsStore().getAbfsConfiguration().getClientCorrelationId(),
fs.getFileSystemId(), FSOperationType.CREATE_FILESYSTEM, false,
0));
tracingContext.setPrimaryRequestID();
AbfsHttpOperation abfsHttpOperation = Mockito.mock(AbfsHttpOperation.class);
Mockito.doNothing().when(abfsHttpOperation).setRequestProperty(Mockito.anyString(), Mockito.anyString());
tracingContext.constructHeader(abfsHttpOperation, null);
String header = tracingContext.getHeader();
String assertionPrimaryId = header.split(":")[3];

tracingContext.setRetryCount(1);
tracingContext.setListener(new TracingHeaderValidator(
fs.getAbfsStore().getAbfsConfiguration().getClientCorrelationId(),
fs.getFileSystemId(), FSOperationType.CREATE_FILESYSTEM, false,
1));

tracingContext.constructHeader(abfsHttpOperation, "RT");
header = tracingContext.getHeader();
String primaryRequestId = header.split(":")[3];

Assertions.assertThat(primaryRequestId)
.describedAs("PrimaryRequestId in a retried request's tracingContext "
+ "should be equal to PrimaryRequestId in the original request.")
.isEqualTo(assertionPrimaryId);
}
}
Expand Up @@ -130,6 +130,9 @@ private void validateBasicFormat(String[] idList) {
}
Assertions.assertThat(idList[5]).describedAs("Operation name incorrect")
.isEqualTo(operation.toString());
if (idList[6].contains("_")) {
idList[6] = idList[6].split("_")[0];
}
int retryCount = Integer.parseInt(idList[6]);
Assertions.assertThat(retryCount)
.describedAs("Retry was required due to issue on server side")
Expand Down

0 comments on commit 89a7096

Please sign in to comment.