Skip to content

Commit

Permalink
Add append tests for multipart uploads.
Browse files Browse the repository at this point in the history
Former-commit-id: 1b4afcd17eb6122f2b4a67836aed60ec71a8a023
  • Loading branch information
dkocher committed Aug 25, 2013
1 parent 4bfd4b2 commit 4063ebc
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 16 deletions.
40 changes: 24 additions & 16 deletions source/ch/cyberduck/core/s3/S3MultipartUploadService.java
Expand Up @@ -87,9 +87,6 @@ public S3MultipartUploadService(final S3Session session, final Long partsize) {
public void upload(final Path file, final BandwidthThrottle throttle, final StreamListener listener,
final TransferStatus status) throws BackgroundException {
try {
final S3TouchFeature touch = new S3TouchFeature(session);
// Placeholder
touch.touch(file);
MultipartUpload multipart = null;
if(status.isAppend()) {
// This operation lists in-progress multipart uploads. An in-progress multipart upload is a
Expand All @@ -112,19 +109,23 @@ public void upload(final Path file, final BandwidthThrottle throttle, final Stre
}
}
if(null == multipart) {
log.info("No pending multipart upload found");
if(log.isInfoEnabled()) {
log.info("No pending multipart upload found");
}
final S3TouchFeature touch = new S3TouchFeature(session);
// Placeholder
touch.touch(file);
final StorageObject object = this.createObjectDetails(file);
// Initiate multipart upload with metadata
Map<String, Object> metadata = object.getModifiableMetadata();
if(StringUtils.isNotBlank(Preferences.instance().getProperty("s3.storage.class"))) {
metadata.put(session.getClient().getRestHeaderPrefix() + "storage-class",
metadata.put(String.format("%sstorage-class", session.getClient().getRestHeaderPrefix()),
Preferences.instance().getProperty("s3.storage.class"));
}
if(StringUtils.isNotBlank(Preferences.instance().getProperty("s3.encryption.algorithm"))) {
metadata.put(session.getClient().getRestHeaderPrefix() + "server-side-encryption",
metadata.put(String.format("%sserver-side-encryption", session.getClient().getRestHeaderPrefix()),
Preferences.instance().getProperty("s3.encryption.algorithm"));
}

multipart = session.getClient().multipartStartUpload(
containerService.getContainer(file).getName(), containerService.getKey(file), metadata);
}
Expand All @@ -145,10 +146,14 @@ public void upload(final Path file, final BandwidthThrottle throttle, final Stre
for(int partNumber = 1; remaining > 0; partNumber++) {
boolean skip = false;
if(status.isAppend()) {
log.info(String.format("Determine if part %d can be skipped", partNumber));
if(log.isInfoEnabled()) {
log.info(String.format("Determine if part number %d can be skipped", partNumber));
}
for(MultipartPart c : completed) {
if(c.getPartNumber().equals(partNumber)) {
log.info("Skip completed part number " + partNumber);
if(log.isInfoEnabled()) {
log.info(String.format("Skip completed part number %d", partNumber));
}
listener.bytesSent(c.getSize());
skip = true;
break;
Expand All @@ -171,11 +176,11 @@ public void upload(final Path file, final BandwidthThrottle throttle, final Stre
completed.add(future.get());
}
catch(InterruptedException e) {
log.error("Part upload failed:" + e.getMessage());
log.error("Part upload failed with interrupt failure");
throw new ConnectionCanceledException(e);
}
catch(ExecutionException e) {
log.warn("Part upload failed:" + e.getMessage());
log.warn(String.format("Part upload failed with execution failure %s", e.getMessage()));
if(e.getCause() instanceof ServiceException) {
throw (ServiceException) e.getCause();
}
Expand All @@ -185,10 +190,8 @@ public void upload(final Path file, final BandwidthThrottle throttle, final Stre
throw new ConnectionCanceledException(e);
}
}
if(status.isComplete()) {
// Combining all the given parts into the final object.
session.getClient().multipartCompleteUpload(multipart, completed);
}
// Combining all the given parts into the final object.
session.getClient().multipartCompleteUpload(multipart, completed);
}
finally {
// Cancel future tasks
Expand All @@ -207,7 +210,9 @@ private Future<MultipartPart> submitPart(final Path file,
final BandwidthThrottle throttle, final StreamListener listener,
final TransferStatus status, final MultipartUpload multipart,
final int partNumber, final long offset, final long length) throws BackgroundException {
log.info(String.format("Submit part %d to queue", partNumber));
if(log.isInfoEnabled()) {
log.info(String.format("Submit part %d of %s to queue with offset %d and length %d", partNumber, file, offset, length));
}
return pool.execute(new Callable<MultipartPart>() {
@Override
public MultipartPart call() throws BackgroundException {
Expand All @@ -230,6 +235,9 @@ public MultipartPart call() throws BackgroundException {
IOUtils.closeQuietly(out);
}
final StorageObject part = out.getResponse();
if(log.isInfoEnabled()) {
log.info(String.format("Received response for part number %d", partNumber));
}
// Populate part with response data that is accessible via the object's metadata
return new MultipartPart(partNumber, part.getLastModifiedDate(),
part.getETag(), part.getContentLength());
Expand Down
92 changes: 92 additions & 0 deletions test/ch/cyberduck/core/s3/S3MultipartUploadServiceTest.java
Expand Up @@ -8,6 +8,7 @@
import ch.cyberduck.core.DisabledPasswordStore;
import ch.cyberduck.core.Host;
import ch.cyberduck.core.Path;
import ch.cyberduck.core.exception.ConnectionCanceledException;
import ch.cyberduck.core.exception.NotfoundException;
import ch.cyberduck.core.io.AbstractStreamListener;
import ch.cyberduck.core.io.BandwidthThrottle;
Expand Down Expand Up @@ -97,4 +98,95 @@ public void testMultipleParts() throws Exception {
new S3DefaultDeleteFeature(session).delete(Collections.<Path>singletonList(test), new DisabledLoginController());
session.close();
}

@Test
public void testAppendSecondPart() throws Exception {
final S3Session session = new S3Session(
new Host(new S3Protocol(), new S3Protocol().getDefaultHostname(),
new Credentials(
properties.getProperty("s3.key"), properties.getProperty("s3.secret")
)));
session.open(new DefaultHostKeyController());
session.login(new DisabledPasswordStore(), new DisabledLoginController());
final Path container = new Path("test.cyberduck.ch", Path.VOLUME_TYPE);
final Path test = new Path(container, UUID.randomUUID().toString(), Path.FILE_TYPE);
test.setLocal(new FinderLocal(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString()));
final byte[] random = new byte[10485760];
new Random().nextBytes(random);
IOUtils.write(random, test.getLocal().getOutputStream(false));
final TransferStatus status = new TransferStatus();
status.setLength(random.length);
try {
new S3MultipartUploadService(session, 10485760L).upload(test, new BandwidthThrottle(BandwidthThrottle.UNLIMITED), new AbstractStreamListener() {
long count;

@Override
public void bytesSent(final long bytes) {
if(count >= 5242880) {
throw new RuntimeException();
}
count += bytes;
}
}, status);
}
catch(ConnectionCanceledException e) {
// Expected
}
status.setAppend(true);
assertTrue(session.exists(test));
assertEquals(0L, session.list(container,
new DisabledListProgressListener()).get(test.getReference()).attributes().getSize());
new S3MultipartUploadService(session, 10485760L).upload(test, new BandwidthThrottle(BandwidthThrottle.UNLIMITED), new AbstractStreamListener(), status);
assertTrue(session.exists(test));
assertEquals(random.length, session.list(container,
new DisabledListProgressListener()).get(test.getReference()).attributes().getSize());
new S3DefaultDeleteFeature(session).delete(Collections.<Path>singletonList(test), new DisabledLoginController());
session.close();
}


@Test
public void testAppendNoPartCompleted() throws Exception {
final S3Session session = new S3Session(
new Host(new S3Protocol(), new S3Protocol().getDefaultHostname(),
new Credentials(
properties.getProperty("s3.key"), properties.getProperty("s3.secret")
)));
session.open(new DefaultHostKeyController());
session.login(new DisabledPasswordStore(), new DisabledLoginController());
final Path container = new Path("test.cyberduck.ch", Path.VOLUME_TYPE);
final Path test = new Path(container, UUID.randomUUID().toString(), Path.FILE_TYPE);
test.setLocal(new FinderLocal(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString()));
final byte[] random = new byte[32769];
new Random().nextBytes(random);
IOUtils.write(random, test.getLocal().getOutputStream(false));
final TransferStatus status = new TransferStatus();
status.setLength(random.length);
try {
new S3MultipartUploadService(session, 10485760L).upload(test, new BandwidthThrottle(BandwidthThrottle.UNLIMITED), new AbstractStreamListener() {
long count;

@Override
public void bytesSent(final long bytes) {
if(count >= 32768) {
throw new RuntimeException();
}
count += bytes;
}
}, status);
}
catch(ConnectionCanceledException e) {
// Expected
}
status.setAppend(true);
assertTrue(session.exists(test));
assertEquals(0L, session.list(container,
new DisabledListProgressListener()).get(test.getReference()).attributes().getSize());
new S3MultipartUploadService(session, 10485760L).upload(test, new BandwidthThrottle(BandwidthThrottle.UNLIMITED), new AbstractStreamListener(), status);
assertTrue(session.exists(test));
assertEquals(random.length, session.list(container,
new DisabledListProgressListener()).get(test.getReference()).attributes().getSize());
new S3DefaultDeleteFeature(session).delete(Collections.<Path>singletonList(test), new DisabledLoginController());
session.close();
}
}

0 comments on commit 4063ebc

Please sign in to comment.