NXP-24208: use S3 copy when writing a blob from another S3 binary manager
Luís Duarte authored and Florent Guillaume committed Mar 8, 2018
1 parent fcd823b commit a25ed36
Showing 4 changed files with 323 additions and 1 deletion.
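In short: when writeBlob receives a ManagedBlob whose provider is another S3BinaryManager, the destination binary manager now asks S3 to copy the object server-side (using multipart copy for objects over 5 GB) instead of downloading and re-uploading its content.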
4 changes: 4 additions & 0 deletions nuxeo-core-binarymanager-s3/pom.xml
@@ -49,6 +49,10 @@
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-cloudfront</artifactId>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sts</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
87 changes: 86 additions & 1 deletion nuxeo-core-binarymanager-s3/src/main/java/org/nuxeo/ecm/core/storage/sql/S3BinaryManager.java
@@ -16,11 +16,13 @@
* Contributors:
* Mathieu Guillaume
* Florent Guillaume
* Luís Duarte
*/
package org.nuxeo.ecm.core.storage.sql;

import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.nuxeo.ecm.core.storage.sql.S3Utils.NON_MULTIPART_COPY_MAX_SIZE;

import java.io.File;
import java.io.FileInputStream;
@@ -49,6 +51,10 @@
import org.nuxeo.common.Environment;
import org.nuxeo.ecm.blob.AbstractBinaryGarbageCollector;
import org.nuxeo.ecm.blob.AbstractCloudBinaryManager;
import org.nuxeo.ecm.core.api.Blob;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.blob.BlobManager;
import org.nuxeo.ecm.core.blob.BlobProvider;
import org.nuxeo.ecm.core.blob.ManagedBlob;
import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector;
import org.nuxeo.ecm.core.blob.binary.FileStorage;
@@ -428,6 +434,85 @@ protected FileStorage getFileStorage() {
return new S3FileStorage();
}

@Override
public String writeBlob(Blob blob) throws IOException {
// attempt a direct S3-level copy if the source blob provider is also S3
if (blob instanceof ManagedBlob) {
ManagedBlob managedBlob = (ManagedBlob) blob;
BlobProvider blobProvider = Framework.getService(BlobManager.class)
.getBlobProvider(managedBlob.getProviderId());
if (blobProvider instanceof S3BinaryManager && blobProvider != this) {
// use S3 direct copy as the source blob provider is also S3
String key = copyBlob((S3BinaryManager) blobProvider, managedBlob.getKey());
if (key != null) {
return key;
}
}
}
return super.writeBlob(blob);
}

/**
* Copies a blob. Returns {@code null} if the copy was not possible.
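* <p>
* If the direct S3 copy fails (for example because this provider's credentials cannot read the source bucket),
* {@code null} is returned and {@code writeBlob} falls back to the default write path.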
*
* @param sourceBlobProvider the source blob provider
* @param blobKey the source blob key
* @return the copied blob key, or {@code null} if the copy was not possible
* @throws IOException if an I/O error occurs
* @since 10.1
*/
protected String copyBlob(S3BinaryManager sourceBlobProvider, String blobKey) throws IOException {
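// strip the provider id prefix from the key ("providerId:digest") to get the bare digest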
String digest = blobKey;
int colon = digest.indexOf(':');
if (colon >= 0) {
digest = digest.substring(colon + 1);
}
String sourceBucketName = sourceBlobProvider.bucketName;
String sourceKey = sourceBlobProvider.bucketNamePrefix + digest;
String key = bucketNamePrefix + digest;
long t0 = 0;
if (log.isDebugEnabled()) {
t0 = System.currentTimeMillis();
log.debug("copying blob " + sourceKey + " to " + key);
}

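// if the object already exists in the target bucket, the copy can be skipped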
try {
amazonS3.getObjectMetadata(bucketName, key);
if (log.isDebugEnabled()) {
log.debug("blob " + key + " is already in S3");
}
return digest;
} catch (AmazonServiceException e) {
if (!isMissingKey(e)) {
throw new IOException(e);
}
// object does not exist, just continue
}

// not already present -> copy the blob
ObjectMetadata sourceMetadata;
try {
sourceMetadata = amazonS3.getObjectMetadata(sourceBucketName, sourceKey);
} catch (AmazonServiceException e) {
throw new NuxeoException("Source blob does not exist: s3://" + sourceBucketName + "/" + sourceKey, e);
}
try {
if (sourceMetadata.getContentLength() > NON_MULTIPART_COPY_MAX_SIZE) {
S3Utils.copyFileMultipart(amazonS3, sourceMetadata, sourceBucketName, sourceKey, bucketName, key, true);
} else {
S3Utils.copyFile(amazonS3, sourceMetadata, sourceBucketName, sourceKey, bucketName, key, true);
}
if (log.isDebugEnabled()) {
long dtms = System.currentTimeMillis() - t0;
log.debug("copied blob " + sourceKey + " to " + key + " in " + dtms + "ms");
}
return digest;
} catch (AmazonServiceException e) {
log.warn("direct S3 copy not supported, please check your keys and policies", e);
return null;
}
}

public class S3FileStorage implements FileStorage {

@Override
@@ -454,7 +539,7 @@ public void storeFile(String digest, File file) throws IOException {
if (useServerSideEncryption) {
ObjectMetadata objectMetadata = new ObjectMetadata();
if (isNotBlank(serverSideKMSKeyID)) {
SSEAwsKeyManagementParams keyManagementParams =
new SSEAwsKeyManagementParams(serverSideKMSKeyID);
request = request.withSSEAwsKeyManagementParams(keyManagementParams);
} else {
167 changes: 167 additions & 0 deletions nuxeo-core-binarymanager-s3/src/main/java/org/nuxeo/ecm/core/storage/sql/S3Utils.java
@@ -0,0 +1,167 @@
/*
* (C) Copyright 2011-2018 Nuxeo (http://nuxeo.com/) and others.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Contributors:
* Luís Duarte
* Florent Guillaume
*/
package org.nuxeo.ecm.core.storage.sql;

import static java.lang.Math.min;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.CopyObjectRequest;
import com.amazonaws.services.s3.model.CopyPartRequest;
import com.amazonaws.services.s3.model.CopyPartResult;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PartETag;

/**
* AWS S3 utilities.
*
* @since 10.1
*/
public class S3Utils {

/** The maximum size of a file that can be copied without using multipart: 5 GB */
public static final long NON_MULTIPART_COPY_MAX_SIZE = 5L * 1024 * 1024 * 1024;

/** The size of the parts used for multipart copy: 5 MB, the minimum part size allowed by S3. */
public static final long PART_SIZE = 5L * 1024 * 1024;

private S3Utils() {
// utility class
}

/**
* Represents an operation that accepts a slice number and a slice begin and end position.
*/
@FunctionalInterface
public interface SliceConsumer {
/**
* Performs this operation on the arguments.
*
* @param num the slice number, starting at 0
* @param begin the begin position
* @param end the end position + 1
*/
void accept(int num, long begin, long end);
}

/**
* Calls the consumer on all slices.
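* <p>
* For example, {@code processSlices(10, 25, consumer)} produces the slices (0, 0, 10), (1, 10, 20) and (2, 20, 25).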
*
* @param slice the slice size
* @param length the total length
* @param consumer the slice consumer
*/
public static void processSlices(long slice, long length, SliceConsumer consumer) {
if (slice <= 0) {
throw new IllegalArgumentException("Invalid slice length: " + slice);
}
long begin = 0;
for (int num = 0; begin < length; num++) {
long end = min(begin + slice, length);
consumer.accept(num, begin, end);
begin += slice;
}
}

/**
* Copies a file using multipart upload.
*
* @param amazonS3 the S3 client
* @param objectMetadata the metadata of the object being copied
* @param sourceBucket the source bucket
* @param sourceKey the source key
* @param targetBucket the target bucket
* @param targetKey the target key
* @param deleteSource whether to delete the source object if the copy is successful
* @return the metadata of the copied object
*/
public static ObjectMetadata copyFileMultipart(AmazonS3 amazonS3, ObjectMetadata objectMetadata,
String sourceBucket, String sourceKey, String targetBucket, String targetKey, boolean deleteSource) {
// initiate the multipart upload on the target bucket, where the copied object will be assembled
InitiateMultipartUploadRequest initiateMultipartUploadRequest = new InitiateMultipartUploadRequest(targetBucket,
targetKey);
InitiateMultipartUploadResult initiateMultipartUploadResult = amazonS3.initiateMultipartUpload(
initiateMultipartUploadRequest);

String uploadId = initiateMultipartUploadResult.getUploadId();
long objectSize = objectMetadata.getContentLength();
List<CopyPartResult> copyResponses = new ArrayList<>();

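// copy each PART_SIZE slice of the source object server-side (one UploadPartCopy request per part)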
SliceConsumer partCopy = (num, begin, end) -> {
CopyPartRequest copyRequest = new CopyPartRequest().withSourceBucketName(sourceBucket)
.withSourceKey(sourceKey)
.withDestinationBucketName(targetBucket)
.withDestinationKey(targetKey)
.withFirstByte(begin)
.withLastByte(end - 1)
.withUploadId(uploadId)
.withPartNumber(num + 1);
copyResponses.add(amazonS3.copyPart(copyRequest));
};
processSlices(PART_SIZE, objectSize, partCopy);

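// stitch the copied parts into the target object using the collected part ETags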
CompleteMultipartUploadRequest completeRequest = new CompleteMultipartUploadRequest(targetBucket, targetKey,
uploadId, responsesToETags(copyResponses));
amazonS3.completeMultipartUpload(completeRequest);
if (deleteSource) {
amazonS3.deleteObject(sourceBucket, sourceKey);
}
return amazonS3.getObjectMetadata(targetBucket, targetKey);
}

protected static List<PartETag> responsesToETags(List<CopyPartResult> responses) {
return responses.stream().map(response -> new PartETag(response.getPartNumber(), response.getETag())).collect(
Collectors.toList());
}

/**
* Copies a file without using multipart upload.
*
* @param amazonS3 the S3 client
* @param objectMetadata the metadata of the object being copied
* @param sourceBucket the source bucket
* @param sourceKey the source key
* @param targetBucket the target bucket
* @param targetKey the target key
* @param deleteSource whether to delete the source object if the copy is successful
* @return the metadata of the copied object
*/
public static ObjectMetadata copyFile(AmazonS3 amazonS3, ObjectMetadata objectMetadata, String sourceBucket,
String sourceKey, String targetBucket, String targetKey, boolean deleteSource) {
CopyObjectRequest copyObjectRequest = new CopyObjectRequest(sourceBucket, sourceKey, targetBucket, targetKey);
amazonS3.copyObject(copyObjectRequest);
if (deleteSource) {
amazonS3.deleteObject(sourceBucket, sourceKey);
}
return amazonS3.getObjectMetadata(targetBucket, targetKey);
}

}
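
For illustration, here is a minimal standalone sketch (not part of this commit) of how these helpers choose between the single-request and multipart copy paths, mirroring the logic of S3BinaryManager.copyBlob above. The bucket and key names are hypothetical, and credentials are assumed to come from the default AWS provider chain.

import org.nuxeo.ecm.core.storage.sql.S3Utils;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.ObjectMetadata;

public class S3CopyExample {

    public static void main(String[] args) {
        String sourceBucket = "source-bucket"; // hypothetical
        String targetBucket = "target-bucket"; // hypothetical
        String key = "some-digest"; // hypothetical
        AmazonS3 amazonS3 = AmazonS3ClientBuilder.defaultClient();
        ObjectMetadata metadata = amazonS3.getObjectMetadata(sourceBucket, key);
        if (metadata.getContentLength() > S3Utils.NON_MULTIPART_COPY_MAX_SIZE) {
            // a single CopyObject request is capped at 5 GB, so larger objects go through multipart copy
            S3Utils.copyFileMultipart(amazonS3, metadata, sourceBucket, key, targetBucket, key, false);
        } else {
            S3Utils.copyFile(amazonS3, metadata, sourceBucket, key, targetBucket, key, false);
        }
    }
}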
66 changes: 66 additions & 0 deletions nuxeo-core-binarymanager-s3/src/test/java/org/nuxeo/ecm/core/storage/sql/TestS3Utils.java
@@ -0,0 +1,66 @@
/*
* (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Contributors:
* Florent Guillaume
*/
package org.nuxeo.ecm.core.storage.sql;

import static org.junit.Assert.assertEquals;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.junit.Test;
import org.nuxeo.ecm.core.storage.sql.S3Utils.SliceConsumer;

public class TestS3Utils {

@Test
public void testProcessSlices() {
List<String> list = new ArrayList<>();
SliceConsumer recorder = (num, begin, end) -> {
list.add(num + ":" + begin + "-" + end);
};

// typical case
list.clear();
S3Utils.processSlices(10, 25, recorder);
assertEquals(Arrays.asList("0:0-10", "1:10-20", "2:20-25"), list);

// exactly at the end
list.clear();
S3Utils.processSlices(10, 30, recorder);
assertEquals(Arrays.asList("0:0-10", "1:10-20", "2:20-30"), list);

// exactly one slice
list.clear();
S3Utils.processSlices(10, 10, recorder);
assertEquals(Arrays.asList("0:0-10"), list);

// slice larger than total length
list.clear();
S3Utils.processSlices(10, 5, recorder);
assertEquals(Arrays.asList("0:0-5"), list);

// degenerate case
list.clear();
S3Utils.processSlices(10, 0, recorder);
assertEquals(Collections.emptyList(), list);
}

}
