Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.oracle.bmc.objectstorage.responses.ListMultipartUploadsResponse;
import com.oracle.bmc.objectstorage.transfer.internal.MultipartManifestImpl;
import com.oracle.bmc.objectstorage.transfer.internal.MultipartTransferManager;
import com.oracle.bmc.retrier.RetryConfiguration;
import com.oracle.bmc.util.StreamUtils;
import com.oracle.bmc.util.internal.Consumer;
import lombok.Builder;
Expand Down Expand Up @@ -64,6 +65,8 @@ public class MultipartObjectAssembler {
private MultipartManifestImpl manifest;
private boolean initialized = false;

private RetryConfiguration retryConfiguration;

/**
* The opcClientRequestId to send for all requests related to this multi-part upload.
*/
Expand Down Expand Up @@ -96,7 +99,8 @@ public MultipartObjectAssembler(
allowOverwrite,
executorService,
null /* opcClientRequestId */,
null /* invocationCallback */);
null /* invocationCallback */,
UploadManager.RETRY_CONFIGURATION); /* backwards compatibility */
}

@Builder
Expand All @@ -108,7 +112,8 @@ private MultipartObjectAssembler(
boolean allowOverwrite,
ExecutorService executorService,
String opcClientRequestId,
Consumer<Invocation.Builder> invocationCallback) {
Consumer<Invocation.Builder> invocationCallback,
RetryConfiguration retryConfiguration) {
this.service = service;
this.namespaceName = namespaceName;
this.bucketName = bucketName;
Expand All @@ -117,6 +122,7 @@ private MultipartObjectAssembler(
this.executorService = executorService;
this.opcClientRequestId = opcClientRequestId;
this.invocationCallback = invocationCallback;
this.retryConfiguration = retryConfiguration;
}

/**
Expand Down Expand Up @@ -299,7 +305,7 @@ private int doUploadPart(InputStream stream, long contentLength, String md5, int
.opcClientRequestId(createClientRequestId("-" + partNumber))
.build();

request.setRetryConfiguration(UploadManager.RETRY_CONFIGURATION);
request.setRetryConfiguration(this.retryConfiguration);

transferManager.startTransfer(request);
return partNumber;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ private UploadResponse singleUpload(UploadRequest uploadRequest, long contentLen
.build();
}

putObjectRequest.setRetryConfiguration(RETRY_CONFIGURATION);
/* RetryConfiguration used should either be the one set on this UploadRequest or a default */
putObjectRequest.setRetryConfiguration(getRetryToUse(putObjectRequest.getRetryConfiguration()));

PutObjectResponse response = objectStorage.putObject(putObjectRequest);
return new UploadResponse(
Expand Down Expand Up @@ -247,11 +248,33 @@ private UploadResponse multipartUpload(UploadRequest uploadRequest) {
}
}

/**
* Determines the first non-null RetryConfiguration
* 1 -> RetryConfiguration set on UploadConfiguration
* 2 -> Default static RetryConfiguration for UploadManager
*
* @return RetryConfiguration first non-null condition or UploadManager default
*/
private static RetryConfiguration getRetryToUse(RetryConfiguration ...configs) {
for (RetryConfiguration cfg : configs) {
if (cfg != null)
return cfg;
}

return UploadManager.RETRY_CONFIGURATION;
}

@VisibleForTesting
protected MultipartObjectAssembler createAssembler(
PutObjectRequest request,
UploadRequest uploadRequest,
ExecutorService executorService) {

// in case request != uploadRequest.putObjectRequest then choose the correct RetryConfiguration
RetryConfiguration retryToUse = getRetryToUse(
uploadRequest.putObjectRequest.getRetryConfiguration(),
request.getRetryConfiguration());

return MultipartObjectAssembler.builder()
.allowOverwrite(uploadRequest.allowOverwrite)
.bucketName(request.getBucketName())
Expand All @@ -261,6 +284,7 @@ protected MultipartObjectAssembler createAssembler(
.objectName(request.getObjectName())
.opcClientRequestId(request.getOpcClientRequestId())
.service(objectStorage)
.retryConfiguration(retryToUse)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.oracle.bmc.retrier.DefaultRetryCondition;
import com.oracle.bmc.retrier.RetryConfiguration;
import com.oracle.bmc.util.internal.Consumer;
import com.oracle.bmc.waiter.ExponentialBackoffDelayStrategy;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
Expand Down Expand Up @@ -65,8 +68,14 @@ public class MultipartObjectAssemblerTest {
private static final Map<String, String> OPC_META = new HashMap<>();
private static final boolean ALLOW_OVERWRITE = false;

private static final RetryConfiguration RETRY_CONFIGURATION = RetryConfiguration.builder()
.delayStrategy(new ExponentialBackoffDelayStrategy(120000))
.retryCondition(new DefaultRetryCondition())
.build();

private ExecutorService executorService;
private MultipartObjectAssembler assembler;
private MultipartObjectAssembler assemblerWithRetryConfiguration;

@Mock private Consumer<Invocation.Builder> mockInvocationCallback;

Expand All @@ -87,13 +96,52 @@ public void setUp() {
.objectName(OBJECT)
.service(service)
.build();
assemblerWithRetryConfiguration =
MultipartObjectAssembler.builder()
.allowOverwrite(ALLOW_OVERWRITE)
.bucketName(BUCKET)
.executorService(executorService)
.invocationCallback(mockInvocationCallback)
.namespaceName(NAMESPACE)
.objectName(OBJECT)
.service(service)
.retryConfiguration(RETRY_CONFIGURATION)
.build();
}

@After
public void tearDown() {
executorService.shutdownNow();
}

@Test
public void newRequest_andVerifyManifestWithRetryConfiguration() {
String uploadId = "uploadId";

initializeCreateMultipartUpload(uploadId);

MultipartManifest manifest =
assemblerWithRetryConfiguration.newRequest(CONTENT_TYPE, CONTENT_LANGUAGE, CONTENT_ENCODING, OPC_META);
assertNotNull(manifest);
assertEquals(uploadId, manifest.getUploadId());

ArgumentCaptor<CreateMultipartUploadRequest> captor =
ArgumentCaptor.forClass(CreateMultipartUploadRequest.class);
verify(service).createMultipartUpload(captor.capture());

CreateMultipartUploadRequest request = captor.getValue();
assertEquals(NAMESPACE, request.getNamespaceName());
assertEquals(BUCKET, request.getBucketName());
assertEquals(OBJECT, request.getCreateMultipartUploadDetails().getObject());
assertEquals(CONTENT_TYPE, request.getCreateMultipartUploadDetails().getContentType());
assertEquals(
CONTENT_LANGUAGE, request.getCreateMultipartUploadDetails().getContentLanguage());
assertEquals(
CONTENT_ENCODING, request.getCreateMultipartUploadDetails().getContentEncoding());
assertEquals(OPC_META, request.getCreateMultipartUploadDetails().getMetadata());
assertEquals(mockInvocationCallback, request.getInvocationCallback());
}

@Test
public void newRequest_andVerifyManifest() {
String uploadId = "uploadId";
Expand Down Expand Up @@ -302,6 +350,72 @@ public void addParts_allSuccessful_commit() throws Exception {
file.delete();
}

@Test
public void addParts_allSuccessful_withRetryConfiguration_commit() throws Exception {
String uploadId = "uploadId";
initializeCreateMultipartUpload(uploadId);
MultipartManifest manifest =
assemblerWithRetryConfiguration.newRequest(CONTENT_TYPE, CONTENT_LANGUAGE, CONTENT_ENCODING, OPC_META);

byte[] bytes = "abcd".getBytes();

File file = File.createTempFile("unitTest", ".txt");
file.deleteOnExit();
try (FileOutputStream fos = new FileOutputStream(file)) {
fos.write(bytes);
}

String etag1 = "etag1";
String etag2 = "etag2";
UploadPartResponse uploadPartResponse1 = UploadPartResponse.builder().eTag(etag1).build();
UploadPartResponse uploadPartResponse2 = UploadPartResponse.builder().eTag(etag2).build();
when(service.uploadPart(any(UploadPartRequest.class)))
.thenReturn(uploadPartResponse1)
.thenReturn(uploadPartResponse2);

CommitMultipartUploadResponse finalCommitResponse =
CommitMultipartUploadResponse.builder().build();
when(service.commitMultipartUpload(any(CommitMultipartUploadRequest.class)))
.thenReturn(finalCommitResponse);

String md5_1 = "md5_1";
String md5_2 = "md5_2";

assemblerWithRetryConfiguration.addPart(file, md5_1);
assemblerWithRetryConfiguration.addPart(StreamUtils.createByteArrayInputStream(bytes), bytes.length, md5_2);

CommitMultipartUploadResponse commitResponse = assemblerWithRetryConfiguration.commit();
assertSame(finalCommitResponse, commitResponse);

ArgumentCaptor<CommitMultipartUploadRequest> commitCaptor =
ArgumentCaptor.forClass(CommitMultipartUploadRequest.class);
verify(service).commitMultipartUpload(commitCaptor.capture());
CommitMultipartUploadRequest actualCommitRequest = commitCaptor.getValue();
assertEquals(NAMESPACE, actualCommitRequest.getNamespaceName());
assertEquals(BUCKET, actualCommitRequest.getBucketName());
assertEquals(OBJECT, actualCommitRequest.getObjectName());
assertEquals(uploadId, actualCommitRequest.getUploadId());
assertEquals(mockInvocationCallback, actualCommitRequest.getInvocationCallback());

assertTrue(manifest.isUploadComplete());
assertTrue(manifest.isUploadSuccessful());
assertEquals(2, manifest.listCompletedParts().size());
assertEquals(1, manifest.listCompletedParts().get(0).getPartNum().intValue());
assertEquals(etag1, manifest.listCompletedParts().get(0).getEtag());
assertEquals(2, manifest.listCompletedParts().get(1).getPartNum().intValue());
assertEquals(etag2, manifest.listCompletedParts().get(1).getEtag());

ArgumentCaptor<UploadPartRequest> uploadCaptor =
ArgumentCaptor.forClass(UploadPartRequest.class);
verify(service, times(2)).uploadPart(uploadCaptor.capture());
verifyUploadPart(uploadCaptor.getAllValues().get(0), uploadId, 1, md5_1);
verifyUploadPart(uploadCaptor.getAllValues().get(1), uploadId, 2, md5_2);

uploadCaptor.getAllValues().forEach(r -> assertSame(RETRY_CONFIGURATION, r.getRetryConfiguration()));

file.delete();
}

@Test
public void addParts_someFailed_commitFailure() throws Exception {
String uploadId = "uploadId";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@
import com.oracle.bmc.objectstorage.transfer.UploadManager.UploadResponse;
import com.oracle.bmc.objectstorage.transfer.internal.MultipartManifestImpl;
import com.oracle.bmc.objectstorage.transfer.internal.MultipartUtils;
import com.oracle.bmc.retrier.DefaultRetryCondition;
import com.oracle.bmc.retrier.RetryConfiguration;
import com.oracle.bmc.util.StreamUtils;
import com.oracle.bmc.waiter.ExponentialBackoffDelayStrategy;
import org.hamcrest.Matchers;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -144,6 +147,7 @@ public void upload_singleUpload() throws IOException {
byte[] buffer = new byte[(int) CONTENT_LENGTH];
putRequestCaptor.getValue().getPutObjectBody().read(buffer);
assertEquals(CONTENT, new String(buffer));
assertSame(UploadManager.RETRY_CONFIGURATION, putRequestCaptor.getValue().getRetryConfiguration());
assertEquals(CONTENT_LENGTH, putRequestCaptor.getValue().getContentLength().longValue());
assertEquals(CLIENT_REQ_ID, putRequestCaptor.getValue().getOpcClientRequestId());
assertSame(METADATA, putRequestCaptor.getValue().getOpcMeta());
Expand Down Expand Up @@ -173,6 +177,47 @@ public void upload_singleUpload_enforceMd5() throws Exception {
putRequestCaptor.getValue().getContentMD5()); // 'a' times content-length
}

@Test
public void upload_singleUpload_uploadRequestRetryConfiguration() throws Exception {
RetryConfiguration retryConfiguration = RetryConfiguration.builder()
.delayStrategy(new ExponentialBackoffDelayStrategy(120000))
.retryCondition(new DefaultRetryCondition())
.build();

UploadConfiguration uploadConfiguration =
UploadConfiguration.builder().allowMultipartUploads(false).build();
UploadManager uploadManager = new UploadManager(objectStorage, uploadConfiguration);

UploadRequest request = createUploadRequest(retryConfiguration);

ArgumentCaptor<PutObjectRequest> putRequestCaptor =
ArgumentCaptor.forClass(PutObjectRequest.class);
PutObjectResponse putResponse =
PutObjectResponse.builder()
.eTag("etag")
.opcContentMd5("md5")
.opcRequestId(REQ_ID)
.opcClientRequestId(CLIENT_REQ_ID)
.build();
when(objectStorage.putObject(putRequestCaptor.capture())).thenReturn(putResponse);

UploadResponse uploadResponse = uploadManager.upload(request);

assertNotNull(uploadResponse);
assertEquals("etag", uploadResponse.getETag());
assertEquals("md5", uploadResponse.getContentMd5());
assertNull(uploadResponse.getMultipartMd5());
assertEquals(REQ_ID, uploadResponse.getOpcRequestId());
assertEquals(CLIENT_REQ_ID, uploadResponse.getOpcClientRequestId());
byte[] buffer = new byte[(int) CONTENT_LENGTH];
putRequestCaptor.getValue().getPutObjectBody().read(buffer);
assertSame(retryConfiguration, putRequestCaptor.getValue().getRetryConfiguration());
assertEquals(CONTENT, new String(buffer));
assertEquals(CONTENT_LENGTH, putRequestCaptor.getValue().getContentLength().longValue());
assertEquals(CLIENT_REQ_ID, putRequestCaptor.getValue().getOpcClientRequestId());
assertSame(METADATA, putRequestCaptor.getValue().getOpcMeta());
}

@Test(expected = BmcException.class)
public void upload_singleUpload_enforceMd5_streamTooLargeToBuffer() {
UploadConfiguration uploadConfiguration =
Expand Down Expand Up @@ -655,7 +700,7 @@ private static UploadConfiguration getMultipartUploadConfiguration() {
.build();
}

private UploadRequest createUploadRequest(ProgressReporter progressReporter) {
private UploadRequest createUploadRequest(ProgressReporter progressReporter, RetryConfiguration retryConfiguration) {
PutObjectRequest request =
PutObjectRequest.builder()
.opcMeta(METADATA)
Expand All @@ -666,14 +711,23 @@ private UploadRequest createUploadRequest(ProgressReporter progressReporter) {
.namespaceName(NAMESPACE_NAME)
.bucketName(BUCKET_NAME)
.objectName(OBJECT_NAME)
.retryConfiguration(retryConfiguration)
.build();
return UploadRequest.builder(body, CONTENT_LENGTH)
.progressReporter(progressReporter)
.build(request);
}

private UploadRequest createUploadRequest(ProgressReporter progressReporter) {
return createUploadRequest(progressReporter, null);
}

private UploadRequest createUploadRequest() {
return createUploadRequest(null);
return createUploadRequest(null, null);
}

private UploadRequest createUploadRequest(RetryConfiguration retryConfiguration) {
return createUploadRequest(null, retryConfiguration);
}

private static void validateUploadResponseForMultipart(final UploadResponse response) {
Expand Down