Fix math overflow when copying large AWS S3 files
Signed-off-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com>
pditommaso committed Dec 8, 2022
1 parent 66f4669 commit f32ea0b
Showing 7 changed files with 252 additions and 17 deletions.
@@ -63,7 +63,7 @@ class TraceHelper {
Files.newBufferedWriter(path, Charset.defaultCharset(), openOptions(overwrite))
}
catch (FileAlreadyExistsException e) {
-throw new AbortOperationException("$type file already exists: ${path.toUriString()} -- enable the relevant `overwrite` option in your config file to overwrite existing files", e)
+throw new AbortOperationException("$type file already exists: ${path.toUriString()} -- enable the '${type.toLowerCase()}.overwrite' option in your config file to overwrite existing files", e)
}
}
}
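
The new message names the concrete config setting derived from the report type, rather than a generic hint. For the trace file, for instance, the option named by the message would be enabled as in this minimal nextflow.config sketch (the `trace` scope is the standard Nextflow one; the value shown is the assumed fix):

    trace {
        overwrite = true
    }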
@@ -56,7 +56,7 @@ class TraceHelperTest extends Specification {
TraceHelper.newFileWriter(path, false, 'Test')
then:
def e = thrown(AbortOperationException)
-e.message == "Test file already exists: $path -- enable the relevant `overwrite` option in your config file to overwrite existing files"
+e.message == "Test file already exists: $path -- enable the 'test.overwrite' option in your config file to overwrite existing files"

cleanup:
folder?.deleteDir()
@@ -105,6 +105,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

+import static com.upplication.s3fs.util.S3UploadHelper.*;

/**
* Client Amazon S3
* @see com.amazonaws.services.s3.AmazonS3Client
@@ -359,10 +361,12 @@ public ObjectListing listNextBatchOfObjects(ObjectListing objectListing) {
return client.listNextBatchOfObjects(objectListing);
}


public void multipartCopyObject(S3Path s3Source, S3Path s3Target, Long objectSize, S3MultipartOptions opts, List<Tag> tags, String contentType ) {

final String sourceBucketName = s3Source.getBucket();
final String sourceObjectKey = s3Source.getKey();
+final String sourceS3Path = "s3://"+sourceBucketName+'/'+sourceObjectKey;
final String targetBucketName = s3Target.getBucket();
final String targetObjectKey = s3Target.getKey();
final ObjectMetadata meta = new ObjectMetadata();
@@ -394,15 +398,25 @@ public void multipartCopyObject(S3Path s3Source, S3Path s3Target, Long objectSize, S3MultipartOptions opts, List<Tag> tags, String contentType )
// Step 3: Save upload Id.
String uploadId = initResult.getUploadId();

-final int partSize = opts.getChunkSize(objectSize);
+// Multipart upload and copy allow at most 10,000 parts;
+// each part can be up to 5 GiB, and the max object size is 5 TiB.
+// See https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
+final int defChunkSize = opts.getChunkSize();
+final long partSize = computePartSize(objectSize, defChunkSize);
ExecutorService executor = S3OutputStream.getOrCreateExecutor(opts.getMaxThreads());
List<Callable<CopyPartResult>> copyPartRequests = new ArrayList<>();
+checkPartSize(partSize);

// Step 4. create copy part requests
long bytePosition = 0;
for (int i = 1; bytePosition < objectSize; i++)
{
-long lastPosition = bytePosition + partSize -1 >= objectSize ? objectSize - 1 : bytePosition + partSize - 1;
+checkPartIndex(i, sourceS3Path, objectSize, partSize);
+
+long lastPosition = bytePosition + partSize - 1;
+if( lastPosition >= objectSize )
+    lastPosition = objectSize - 1;

CopyPartRequest copyRequest = new CopyPartRequest()
.withDestinationBucketName(targetBucketName)
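
The hunk is collapsed past this point, but the shape of the new loop is clear: each pass validates the 1-based part index, then slices the next byte range, clamping the final part at the end of the object. A minimal Java sketch of that slicing (assuming, as the collapsed code presumably does, that bytePosition advances by partSize after each request):

    // Standalone demo: a 9-byte object cut into 4-byte parts
    // yields the ranges 0-3, 4-7, 8-8.
    public class PartSliceDemo {
        public static void main(String[] args) {
            long objectSize = 9, partSize = 4, bytePosition = 0;
            for (int i = 1; bytePosition < objectSize; i++) {
                long lastPosition = bytePosition + partSize - 1;
                if (lastPosition >= objectSize)
                    lastPosition = objectSize - 1;   // clamp the last part
                System.out.printf("part %d: bytes %d-%d%n", i, bytePosition, lastPosition);
                bytePosition += partSize;
            }
        }
    }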
@@ -30,7 +30,7 @@ public class S3MultipartOptions {

private static final Logger log = LoggerFactory.getLogger(S3MultipartOptions.class);

-public static final int DEFAULT_CHUNK_SIZE = 100 << 20; // 100 MB
+public static final int DEFAULT_CHUNK_SIZE = 100 << 20; // 100 MiB

public static final int DEFAULT_BUFFER_SIZE = 10485760;

@@ -71,7 +71,7 @@ public class S3MultipartOptions {
private long retrySleep;


-/**
+/*
* initialize default values
*/
{
@@ -100,16 +100,6 @@ public int getChunkSize() {
return chunkSize;
}

-public int getChunkSize( long objectSize ) {
-    final int MAX_PARTS = 10_000;
-    long numOfParts = objectSize / chunkSize;
-    if( numOfParts > MAX_PARTS ) {
-        chunkSize = (int) objectSize / MAX_PARTS;
-    }
-
-    return chunkSize;
-}
-
public int getMaxThreads() {
return maxThreads;
}
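
The method removed above is where the overflow named in the commit title lives: in `(int) objectSize / MAX_PARTS` the cast binds tighter than the division, so the object size is truncated to 32 bits before the division happens. Any object of 2 GiB or more yields a wrong value, and sizes that are a multiple of 4 GiB (such as 1 TiB) truncate to exactly zero, collapsing the chunk size to 0. A self-contained sketch of the failure mode (not part of the commit):

    public class OverflowDemo {
        static final int MAX_PARTS = 10_000;
        public static void main(String[] args) {
            long objectSize = 1024L * 1024 * 1024 * 1024;  // 1 TiB
            // Buggy: cast first, then divide -> (int) 1 TiB == 0
            int buggy = (int) objectSize / MAX_PARTS;
            // What was intended: divide in long arithmetic
            long intended = objectSize / MAX_PARTS;
            System.out.println(buggy);     // 0
            System.out.println(intended);  // 109951162
        }
    }

The replacement, computePartSize in the new S3UploadHelper below, keeps the arithmetic in long throughout.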
S3UploadHelper.java (new file)
@@ -0,0 +1,101 @@
/*
* Copyright 2020-2022, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package com.upplication.s3fs.util;

/**
*
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
*/
public class S3UploadHelper {

private static final long _1_KiB = 1024;
private static final long _1_MiB = _1_KiB * _1_KiB;
private static final long _1_GiB = _1_KiB * _1_KiB * _1_KiB;
private static final long _1_TiB = _1_KiB * _1_KiB * _1_KiB * _1_KiB;

/**
* AWS S3 min part size
* https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
*/
public static final long MIN_PART_SIZE = 5 * _1_MiB;

/**
* AWS S3 max part size
* https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
*/
public static final long MAX_PART_SIZE = 5 * _1_GiB;

/**
* AWS S3 max object size
* https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
*/
public static final long MAX_OBJECT_SIZE = 5 * _1_TiB;

/**
* AWS S3 max parts in multi-part upload and copy request
*/
public static final int MAX_PARTS_COUNT = 10_000;

static public long computePartSize( long objectSize, long chunkSize ) {
if( objectSize<0 ) throw new IllegalArgumentException("Argument 'objectSize' cannot be less than zero");
if( chunkSize<MIN_PART_SIZE ) throw new IllegalArgumentException("Argument 'chunkSize' cannot be less than " + MIN_PART_SIZE);
// Multipart upload and copy allow at most 10,000 parts;
// each part can be up to 5 GiB, and the max object size is 5 TiB.
// See https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
long numOfParts = objectSize / chunkSize;
if( numOfParts > MAX_PARTS_COUNT ) {
    // smallest part size that keeps the count within 10,000 parts,
    // rounded up to the nearest 10 MiB multiple
    final long x = ceilDiv(objectSize, MAX_PARTS_COUNT);
    return ceilDiv(x, 10 * _1_MiB) * 10 * _1_MiB;
}
return chunkSize;
}


private static long ceilDiv(long x, long y){
return -Math.floorDiv(-x,y);
}

private static long ceilDiv(long x, int y){
return -Math.floorDiv(-x,y);
}

static public void checkPartSize(long partSize) {
if( partSize<MIN_PART_SIZE ) {
String msg = String.format("The minimum part size for S3 multipart copy and upload operation cannot be less than 5 MiB -- offending value: %d", partSize);
throw new IllegalArgumentException(msg);
}

if( partSize>MAX_PART_SIZE ) {
String msg = String.format("The minimum part size for S3 multipart copy and upload operation cannot be less than 5 GiB -- offending value: %d", partSize);
throw new IllegalArgumentException(msg);
}
}

static public void checkPartIndex(int i, String path, long fileSize, long chunkSize) {
if( i < 1 ) {
String msg = String.format("S3 multipart copy request index cannot less than 1 -- offending value: %d; file: '%s'; size: %d; part-size: %d", i, path, fileSize, chunkSize);
throw new IllegalArgumentException(msg);
}
if( i > MAX_PARTS_COUNT) {
String msg = String.format("S3 multipart copy request exceed the number of max allowed parts -- offending value: %d; file: '%s'; size: %d; part-size: %d", i, path, fileSize, chunkSize);
throw new IllegalArgumentException(msg);
}
}

}
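
To make the sizing rule concrete, a small usage sketch (the figures match the test table in S3UploadHelperTest below): a 1 TiB object at the default 100 MiB chunk size would need 10,486 parts, so computePartSize raises the part size to ceil(1 TiB / 10,000) = 109,951,163 bytes and rounds that up to the next 10 MiB multiple, giving 110 MiB:

    long oneTiB = 1024L * 1024 * 1024 * 1024;
    long hundredMiB = 100L * 1024 * 1024;
    long partSize = S3UploadHelper.computePartSize(oneTiB, hundredMiB);
    assert partSize == 110L * 1024 * 1024;   // 110 MiB -> 9,533 parts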
@@ -17,6 +17,8 @@ import java.nio.file.attribute.BasicFileAttributes
import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.model.Tag
import groovy.util.logging.Slf4j
+import nextflow.Global
+import nextflow.Session
import nextflow.exception.AbortOperationException
import nextflow.file.CopyMoveHelper
import nextflow.file.FileHelper
@@ -51,6 +53,10 @@ class AwsS3NioTest extends Specification implements AwsS3BaseSpec {
s3Client0 = fs.client.getClient()
}

+def setup() {
+    Global.session = Mock(Session) { getConfig() >> [:] }
+}

def 'should create a blob' () {
given:
def bucket = createBucket()
@@ -1332,7 +1338,7 @@ class AwsS3NioTest extends Specification implements AwsS3BaseSpec {
TraceHelper.newFileWriter(path, false, 'Test')
then:
def e = thrown(AbortOperationException)
e.message == "Test file already exists: ${path.toUriString()}"
e.message == "Test file already exists: ${path.toUriString()} -- enable the 'test.overwrite' option in your config file to overwrite existing files"

cleanup:
deleteBucket(bucket1)
S3UploadHelperTest.groovy (new file)
@@ -0,0 +1,124 @@
/*
* Copyright 2020-2022, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package com.upplication.s3fs.util


import spock.lang.Shared
import spock.lang.Specification
import spock.lang.Unroll
/**
*
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
*/
class S3UploadHelperTest extends Specification {

@Shared final long _1_KiB = 1_024
@Shared final long _1_MiB = _1_KiB **2
@Shared final long _1_GiB = _1_KiB **3
@Shared final long _1_TiB = _1_KiB **4

@Shared final long _10_MiB = _1_MiB * 10
@Shared final long _100_MiB = _1_MiB * 100

@Unroll
def 'should compute s3 file chunk size' () {

expect:
S3UploadHelper.computePartSize(FILE_SIZE, CHUNK_SIZE) == EXPECTED_CHUNK_SIZE
and:
def parts = FILE_SIZE / EXPECTED_CHUNK_SIZE
parts <= S3UploadHelper.MAX_PARTS_COUNT
parts > 0

where:
FILE_SIZE | EXPECTED_CHUNK_SIZE | CHUNK_SIZE
_1_KiB | _10_MiB | _10_MiB
_1_MiB | _10_MiB | _10_MiB
_1_GiB | _10_MiB | _10_MiB
_1_TiB | 110 * _1_MiB | _10_MiB
5 * _1_TiB | 530 * _1_MiB | _10_MiB
10 * _1_TiB | 1050 * _1_MiB | _10_MiB
and:
_1_KiB | _100_MiB | _100_MiB
_1_MiB | _100_MiB | _100_MiB
_1_GiB | _100_MiB | _100_MiB
_1_TiB | 110 * _1_MiB | _100_MiB
5 * _1_TiB | 530 * _1_MiB | _100_MiB
10 * _1_TiB | 1050 * _1_MiB | _100_MiB

}


def 'should check s3 part size' () {
when:
S3UploadHelper.checkPartSize(S3UploadHelper.MIN_PART_SIZE)
then:
noExceptionThrown()

when:
S3UploadHelper.checkPartSize(S3UploadHelper.MIN_PART_SIZE+1)
then:
noExceptionThrown()

when:
S3UploadHelper.checkPartSize(S3UploadHelper.MAX_PART_SIZE-1)
then:
noExceptionThrown()

when:
S3UploadHelper.checkPartSize(S3UploadHelper.MAX_PART_SIZE)
then:
noExceptionThrown()

when:
S3UploadHelper.checkPartSize(S3UploadHelper.MAX_PART_SIZE+1)
then:
thrown(IllegalArgumentException)

when:
S3UploadHelper.checkPartSize(S3UploadHelper.MIN_PART_SIZE-1)
then:
thrown(IllegalArgumentException)
}

def 'should check part index' () {
when:
S3UploadHelper.checkPartIndex(1, 's3://foo', 1000, 100)
then:
noExceptionThrown()

when:
S3UploadHelper.checkPartIndex(S3UploadHelper.MAX_PARTS_COUNT, 's3://foo', 1000, 100)
then:
noExceptionThrown()

when:
S3UploadHelper.checkPartIndex(S3UploadHelper.MAX_PARTS_COUNT+1, 's3://foo', 1000, 100)
then:
def e1 = thrown(IllegalArgumentException)
e1.message == "S3 multipart copy request exceeds the max number of allowed parts -- offending value: 10001; file: 's3://foo'; size: 1000; part-size: 100"

when:
S3UploadHelper.checkPartIndex(0, 's3://foo', 1000, 100)
then:
def e2 = thrown(IllegalArgumentException)
e2.message == "S3 multipart copy request index cannot be less than 1 -- offending value: 0; file: 's3://foo'; size: 1000; part-size: 100"


}
}
