Skip to content
This repository has been archived by the owner on Jul 25, 2020. It is now read-only.

Commit

Permalink
Allows users to download large files efficiently and directly to disk.
Browse files Browse the repository at this point in the history
  • Loading branch information
zack-shoylev committed Aug 5, 2016
1 parent 42079e1 commit 1fbc551
Show file tree
Hide file tree
Showing 12 changed files with 656 additions and 51 deletions.
Expand Up @@ -19,6 +19,7 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static org.jclouds.atmos.options.PutOptions.Builder.publicRead;

import java.io.InputStream;
import java.util.List;
import java.util.Set;

Expand Down Expand Up @@ -115,7 +116,7 @@ protected boolean deleteAndVerifyContainerGone(final String container) {

/**
* This implementation invokes {@link AtmosClient#createDirectory}
*
*
* @param location
* currently ignored
* @param container
Expand Down Expand Up @@ -148,7 +149,7 @@ public void setContainerAccess(String container, ContainerAccess access) {

/**
* This implementation invokes {@link AtmosClient#createDirectory}
*
*
* @param container
* directory name
*/
Expand Down Expand Up @@ -183,7 +184,7 @@ public boolean directoryExists(String container, String directory) {

/**
* This implementation invokes {@link AtmosClient#pathExists}
*
*
* @param container
* container
* @param key
Expand Down Expand Up @@ -341,6 +342,11 @@ public int getMaximumNumberOfParts() {
throw new UnsupportedOperationException("Atmos does not support multipart uploads");
}

/**
 * Streaming a blob is not implemented for Atmos.
 *
 * @param container
 *           container holding the blob (unused)
 * @param name
 *           name of the blob (unused)
 * @throws UnsupportedOperationException
 *            always
 */
@Override
public InputStream streamBlob(String container, String name) {
   // The previous message was copied from the multipart-upload methods and misreported the failing operation.
   throw new UnsupportedOperationException("Atmos does not support streaming blobs");
}

@Override
public String copyBlob(String fromContainer, String fromName, String toContainer, String toName,
CopyOptions options) {
Expand Down
Expand Up @@ -25,15 +25,26 @@
import static org.jclouds.location.predicates.LocationPredicates.idEquals;
import static org.jclouds.openstack.swift.v1.options.PutOptions.Builder.metadata;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;

import javax.inject.Inject;
import javax.inject.Named;

import org.jclouds.Constants;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.blobstore.KeyNotFoundException;
Expand Down Expand Up @@ -80,7 +91,6 @@
import org.jclouds.openstack.swift.v1.reference.SwiftHeaders;

import com.google.common.annotations.Beta;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Supplier;
Expand All @@ -96,10 +106,12 @@
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.io.ByteSource;
import com.google.common.io.ByteStreams;
import com.google.common.net.HttpHeaders;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.AbstractModule;
import com.google.inject.Injector;
import com.google.inject.assistedinject.Assisted;
Expand All @@ -109,7 +121,7 @@ public class RegionScopedSwiftBlobStore implements BlobStore {
@Inject
protected RegionScopedSwiftBlobStore(Injector baseGraph, BlobStoreContext context, SwiftApi api,
@Memoized Supplier<Set<? extends Location>> locations, @Assisted String regionId,
PayloadSlicer slicer) {
PayloadSlicer slicer, @Named(PROPERTY_USER_THREADS) ListeningExecutorService userExecutor) {
checkNotNull(regionId, "regionId");
Optional<? extends Location> found = tryFind(locations.get(), idEquals(regionId));
checkArgument(found.isPresent(), "region %s not in %s", regionId, locations.get());
Expand All @@ -119,6 +131,7 @@ protected RegionScopedSwiftBlobStore(Injector baseGraph, BlobStoreContext contex
this.toResourceMetadata = new ToResourceMetadata(found.get());
this.context = context;
this.api = api;
this.userExecutor = userExecutor;
// until we parameterize ClearListStrategy with a factory
this.clearList = baseGraph.createChildInjector(new AbstractModule() {
@Override
Expand All @@ -137,6 +150,7 @@ protected void configure() {
private final ToListContainerOptions toListContainerOptions = new ToListContainerOptions();
private final ToResourceMetadata toResourceMetadata;
protected final PayloadSlicer slicer;
protected final ListeningExecutorService userExecutor;

@Override
public Set<? extends Location> listAssignableLocations() {
Expand Down Expand Up @@ -586,10 +600,9 @@ public long countBlobs(String containerName, ListContainerOptions options) {
throw new UnsupportedOperationException();
}

@com.google.inject.Inject
@Named(PROPERTY_USER_THREADS)
@VisibleForTesting
ListeningExecutorService userExecutor;
/**
 * Upper bound on the number of attempts each ranged download task makes before giving up.
 * Injection is optional and keyed by {@link Constants#PROPERTY_MAX_RETRIES}; when nothing is
 * bound the field keeps its initializer value of 5.
 */
@com.google.inject.Inject(optional = true)
@Named(Constants.PROPERTY_MAX_RETRIES)
protected int retryCountLimit = 5;

/**
* Upload using a user-provided executor, or the jclouds userExecutor
Expand Down Expand Up @@ -618,7 +631,7 @@ protected String putMultipartBlob(String container, Blob blob, PutOptions overri
getMinimumMultipartPartSize(), getMaximumMultipartPartSize(), getMaximumNumberOfParts());
long partSize = algorithm.calculateChunkSize(contentLength);
MultipartUpload mpu = initiateMultipartUpload(container, blob.getMetadata(), partSize, overrides);
int partNumber = 1;
int partNumber = 0;

for (Payload payload : slicer.slice(blob.getPayload(), partSize)) {
BlobUploader b =
Expand All @@ -645,4 +658,220 @@ public MultipartPart call() {
return uploadMultipartPart(mpu, partNumber, payload);
}
}

/**
 * Downloads a blob to a local file, running the work on this blob store's {@code userExecutor}.
 *
 * @param container
 *           container holding the blob
 * @param name
 *           name of the blob
 * @param destination
 *           file the blob content is written to
 */
@Override
@Beta
public void downloadBlob(String container, String name, File destination) {
   // Delegate to the executor-aware overload with the injected executor.
   this.downloadBlob(container, name, destination, this.userExecutor);
}

/**
 * Downloads a blob to a local file by fetching byte ranges in parallel on the given executor.
 *
 * <p>The blob length is read with a body-less request, the destination file is sized to that
 * length, and one {@code BlobDownloader} task is submitted per range of
 * {@link #getMinimumMultipartPartSize()} bytes. The call blocks until every range has completed.
 *
 * <p>The file handle is always closed before returning. If the download fails after the
 * destination has been opened, outstanding range tasks are cancelled and the destination file is
 * deleted on a best-effort basis.
 *
 * @param container
 *           container holding the blob
 * @param name
 *           name of the blob
 * @param destination
 *           file the blob content is written to; it is resized to exactly the blob length
 * @param executor
 *           executor used to run the ranged downloads
 * @throws RuntimeException
 *            wrapping an {@link IOException} raised while preparing the destination file; failures
 *            of individual range tasks propagate as thrown by {@code Futures.getUnchecked}
 */
@Override
@Beta
public void downloadBlob(String container, String name, File destination, ExecutorService executor) {

   ListeningExecutorService listeningExecutor = MoreExecutors.listeningDecorator(executor);
   List<ListenableFuture<Void>> results = new ArrayList<ListenableFuture<Void>>();
   RandomAccessFile raf = null;
   boolean succeeded = false;
   try {
      long contentLength = api
            .getObjectApi(regionId, container)
            .getWithoutBody(name)
            .getPayload()
            .getContentMetadata()
            .getContentLength();

      // Reserve space up front. setLength also copes with empty blobs (seek(-1) would throw) and
      // truncates a larger pre-existing file so no stale bytes trail the downloaded content.
      raf = new RandomAccessFile(destination.getAbsoluteFile(), "rw");
      raf.setLength(contentLength);

      // Determine download buffer size, smaller means less memory usage; larger is faster as long as threads are saturated
      long partSize = getMinimumMultipartPartSize();

      // Loop through ranges within the file; "to" is inclusive and clamped to the last byte
      for (long from = 0; from < contentLength; from = from + partSize) {
         long to = Math.min(from + partSize, contentLength) - 1;
         BlobDownloader b = new BlobDownloader(regionId, container, name, raf, from, to);
         results.add(listeningExecutor.submit(b));
      }

      Futures.getUnchecked(Futures.allAsList(results));
      succeeded = true;

   } catch (IOException e) {
      throw new RuntimeException(e);
   } finally {
      if (!succeeded) {
         // Stop ranges that are still pending or running; their output is about to be discarded.
         for (ListenableFuture<Void> result : results) {
            result.cancel(true);
         }
      }
      if (raf != null) {
         try {
            raf.close();
         } catch (IOException ignored) {
            // best effort: nothing useful can be done if closing the handle fails
         }
         if (!succeeded) {
            // Only remove the file once this call has actually opened it, so an early failure
            // (for example while reading the blob length) never deletes an untouched file.
            destination.delete();
         }
      }
   }
}

/**
 * Task that downloads the inclusive byte range {@code [begin, end]} of one object and writes it
 * to the same offsets of a shared {@link RandomAccessFile} through a memory-mapped region.
 * The download is attempted up to {@code retryCountLimit} times when an {@link IOException} occurs.
 */
private final class BlobDownloader implements Callable<Void> {
   private final String regionId;
   private final String containerName;
   private final String objectName;
   private final RandomAccessFile raf;
   private final long begin;
   private final long end;

   /**
    * @param regionId
    *           region passed to {@code api.getObjectApi}
    * @param containerName
    *           container holding the object
    * @param objectName
    *           name of the object to download
    * @param raf
    *           destination file; only the region {@code [begin, end]} is written by this task
    * @param begin
    *           first byte of the range, inclusive
    * @param end
    *           last byte of the range, inclusive
    */
   BlobDownloader(String regionId, String containerName, String objectName, RandomAccessFile raf, long begin, long end) {
      this.regionId = regionId;
      this.containerName = containerName;
      this.objectName = objectName;
      this.raf = raf;
      this.begin = begin;
      this.end = end;
   }

   /**
    * Fetches the range and writes it to the file.
    *
    * @return always {@code null}
    * @throws RuntimeException
    *            when every attempt failed; the last {@link IOException} is attached as the cause
    */
   @Override
   public Void call() {
      IOException lastException = null;
      for (int retry = 0; retry < retryCountLimit; retry++) {
         try {
            SwiftObject object = api.getObjectApi(regionId, containerName)
                  .get(objectName, org.jclouds.http.options.GetOptions.Builder.range(begin, end));
            // Download first, this is the part that usually fails
            byte[] targetArray;
            InputStream in = object.getPayload().openStream();
            try {
               targetArray = ByteStreams.toByteArray(in);
            } finally {
               try {
                  in.close();
               } catch (IOException ignored) {
                  // the bytes are already read (or the read failure is propagating); a close failure adds nothing
               }
            }
            // Map file region
            MappedByteBuffer out = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, begin, end - begin + 1);
            out.put(targetArray);
            out.force();
            // Success!
            return null;
         } catch (IOException e) {
            lastException = e;
         }
      }
      // Keep the original message text but also chain the cause so the stack trace is not lost.
      throw new RuntimeException("After " + retryCountLimit + " retries: " + lastException, lastException);
   }
}

/**
 * Opens a stream over a blob's content, running the download on this blob store's
 * {@code userExecutor}.
 *
 * @param container
 *           container holding the blob
 * @param name
 *           name of the blob
 * @return the stream produced by the executor-aware overload
 */
@Beta
@Override
public InputStream streamBlob(final String container, final String name) {
   final InputStream blobStream = this.streamBlob(container, name, this.userExecutor);
   return blobStream;
}

/**
 * Streams a blob by downloading byte ranges in parallel and feeding them, in order, into a pipe
 * whose read end is returned to the caller.
 *
 * <p>Two tasks are submitted to the executor: a producer that submits one
 * {@code BlobStreamDownloader} per range of {@link #getMinimumMultipartPartSize()} bytes and queues
 * the resulting futures in range order, and a consumer that takes those futures in the same order
 * and writes their bytes to the pipe. When a range fails, both ends of the pipe are closed so
 * that the reader is notified instead of blocking.
 *
 * @param container
 *           container holding the blob
 * @param name
 *           name of the blob
 * @param executor
 *           executor that runs the producer, the consumer and the ranged downloads
 * @return the read end of the pipe carrying the blob content
 * @throws RuntimeException
 *            wrapping an {@link IOException} raised while creating the pipe
 */
@Beta
@Override
public InputStream streamBlob(final String container, final String name, final ExecutorService executor) {

   final ListeningExecutorService listeningExecutor = MoreExecutors.listeningDecorator(executor);
   // User will receive the Input end of the piped stream
   final PipedOutputStream output;
   final PipedInputStream input;
   try {
      output = new PipedOutputStream();
      // Pipe buffer holds five parts, capped at the largest int
      input = new PipedInputStream(output,
            (int) Math.min(getMinimumMultipartPartSize() * 5, (long) Integer.MAX_VALUE));
   } catch (IOException e) {
      throw new RuntimeException(e);
   }

   // The total length of the file to download is needed to determine ranges
   // It has to be obtainable without downloading the whole file
   final long contentLength = api
         .getObjectApi(regionId, container)
         .getWithoutBody(name)
         .getPayload()
         .getContentMetadata()
         .getContentLength();

   // Determine download buffer size, smaller means less memory usage; larger is faster as long as threads are saturated
   final long partSize = getMinimumMultipartPartSize();

   // Used to communicate between the producer and consumer threads
   final LinkedBlockingQueue<ListenableFuture<byte[]>> results = new LinkedBlockingQueue<ListenableFuture<byte[]>>();

   // Consumer: writes the downloaded parts to the pipe in range order
   listeningExecutor.submit(new Runnable() {
      @Override
      public void run() {
         try {
            for (long from = 0; from < contentLength; from = from + partSize) {
               // take() blocks until the next part has been queued; it never returns null
               output.write(results.take().get());
            }
         } catch (Exception e) {
            if (e instanceof InterruptedException) {
               // keep the interrupt visible to the executor that owns this thread
               Thread.currentThread().interrupt();
            }
            try {
               // close pipe so client is notified of an exception
               input.close();
            } catch (IOException ignored) {
               // best effort
            }
            throw new RuntimeException(e);
         } finally {
            // Finished (or failed) writing results to stream
            try {
               output.close();
            } catch (IOException ignored) {
               // best effort
            }
         }
      }
   });

   // Producer: submits one download per range and queues the futures in range order
   listeningExecutor.submit(new Runnable() {
      @Override
      public void run() {
         try {
            // Loop through ranges within the file; "to" is inclusive and clamped to the last byte
            for (long from = 0; from < contentLength; from = from + partSize) {
               long to = Math.min(from + partSize, contentLength) - 1;
               BlobStreamDownloader b = new BlobStreamDownloader(container, name, from, to);
               results.add(listeningExecutor.submit(b));
            }
         } catch (RuntimeException e) {
            // Without this the consumer would wait forever on a part that will never be queued.
            results.add(Futures.<byte[]> immediateFailedFuture(e));
            throw e;
         }
      }
   });
   return input;
}

/**
 * Task that downloads the inclusive byte range {@code [begin, end]} of one object in this blob
 * store's region and returns it as a byte array. The download is attempted up to
 * {@code retryCountLimit} times when an {@link IOException} occurs.
 */
private final class BlobStreamDownloader implements Callable<byte[]> {
   private final String containerName;
   private final String objectName;
   private final long begin;
   private final long end;

   /**
    * @param containerName
    *           container holding the object
    * @param objectName
    *           name of the object to download
    * @param begin
    *           first byte of the range, inclusive
    * @param end
    *           last byte of the range, inclusive
    */
   BlobStreamDownloader(String containerName, String objectName, long begin, long end) {
      this.containerName = containerName;
      this.objectName = objectName;
      this.begin = begin;
      this.end = end;
   }

   /**
    * Fetches the range.
    *
    * @return the bytes of the requested range
    * @throws RuntimeException
    *            when every attempt failed; the last {@link IOException} is attached as the cause
    */
   @Override
   public byte[] call() {
      IOException lastException = null;
      for (int retry = 0; retry < retryCountLimit; retry++) {
         try {
            SwiftObject object = api.getObjectApi(regionId, containerName)
                  .get(objectName, org.jclouds.http.options.GetOptions.Builder.range(begin, end));
            InputStream in = object.getPayload().openStream();
            try {
               return ByteStreams.toByteArray(in);
            } finally {
               try {
                  in.close();
               } catch (IOException ignored) {
                  // the bytes are already read (or the read failure is propagating); a close failure adds nothing
               }
            }
         } catch (IOException e) {
            lastException = e;
         }
      }
      // Keep the original message text but also chain the cause so the stack trace is not lost.
      throw new RuntimeException("After " + retryCountLimit + " retries: " + lastException, lastException);
   }
}
}

0 comments on commit 1fbc551

Please sign in to comment.