Skip to content
This repository has been archived by the owner on Jul 25, 2020. It is now read-only.

Allows users to download large files efficiently and directly to disk. #983

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -19,6 +19,7 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static org.jclouds.atmos.options.PutOptions.Builder.publicRead;

import java.io.InputStream;
import java.util.List;
import java.util.Set;

Expand Down Expand Up @@ -115,7 +116,7 @@ protected boolean deleteAndVerifyContainerGone(final String container) {

/**
* This implementation invokes {@link AtmosClient#createDirectory}
*
*
* @param location
* currently ignored
* @param container
Expand Down Expand Up @@ -148,7 +149,7 @@ public void setContainerAccess(String container, ContainerAccess access) {

/**
* This implementation invokes {@link AtmosClient#createDirectory}
*
*
* @param container
* directory name
*/
Expand Down Expand Up @@ -183,7 +184,7 @@ public boolean directoryExists(String container, String directory) {

/**
* This implementation invokes {@link AtmosClient#pathExists}
*
*
* @param container
* container
* @param key
Expand Down Expand Up @@ -341,6 +342,11 @@ public int getMaximumNumberOfParts() {
throw new UnsupportedOperationException("Atmos does not support multipart uploads");
}

/**
 * Streaming downloads are not implemented for this provider.
 *
 * @param container
 *           container holding the blob
 * @param name
 *           name of the blob
 * @throws UnsupportedOperationException
 *            always
 */
@Override
public InputStream streamBlob(String container, String name) {
   // The previous message was copied from the multipart-upload methods and misreported the operation.
   throw new UnsupportedOperationException("Atmos does not support streaming blobs");
}

@Override
public String copyBlob(String fromContainer, String fromName, String toContainer, String toName,
CopyOptions options) {
Expand Down
Expand Up @@ -25,15 +25,26 @@
import static org.jclouds.location.predicates.LocationPredicates.idEquals;
import static org.jclouds.openstack.swift.v1.options.PutOptions.Builder.metadata;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;

import javax.inject.Inject;
import javax.inject.Named;

import org.jclouds.Constants;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.blobstore.KeyNotFoundException;
Expand Down Expand Up @@ -80,7 +91,6 @@
import org.jclouds.openstack.swift.v1.reference.SwiftHeaders;

import com.google.common.annotations.Beta;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Supplier;
Expand All @@ -96,10 +106,12 @@
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.io.ByteSource;
import com.google.common.io.ByteStreams;
import com.google.common.net.HttpHeaders;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.AbstractModule;
import com.google.inject.Injector;
import com.google.inject.assistedinject.Assisted;
Expand All @@ -109,7 +121,7 @@ public class RegionScopedSwiftBlobStore implements BlobStore {
@Inject
protected RegionScopedSwiftBlobStore(Injector baseGraph, BlobStoreContext context, SwiftApi api,
@Memoized Supplier<Set<? extends Location>> locations, @Assisted String regionId,
PayloadSlicer slicer) {
PayloadSlicer slicer, @Named(PROPERTY_USER_THREADS) ListeningExecutorService userExecutor) {
checkNotNull(regionId, "regionId");
Optional<? extends Location> found = tryFind(locations.get(), idEquals(regionId));
checkArgument(found.isPresent(), "region %s not in %s", regionId, locations.get());
Expand All @@ -119,6 +131,7 @@ protected RegionScopedSwiftBlobStore(Injector baseGraph, BlobStoreContext contex
this.toResourceMetadata = new ToResourceMetadata(found.get());
this.context = context;
this.api = api;
this.userExecutor = userExecutor;
// until we parameterize ClearListStrategy with a factory
this.clearList = baseGraph.createChildInjector(new AbstractModule() {
@Override
Expand All @@ -137,6 +150,7 @@ protected void configure() {
private final ToListContainerOptions toListContainerOptions = new ToListContainerOptions();
private final ToResourceMetadata toResourceMetadata;
protected final PayloadSlicer slicer;
protected final ListeningExecutorService userExecutor;

@Override
public Set<? extends Location> listAssignableLocations() {
Expand Down Expand Up @@ -586,10 +600,9 @@ public long countBlobs(String containerName, ListContainerOptions options) {
throw new UnsupportedOperationException();
}

@com.google.inject.Inject
@Named(PROPERTY_USER_THREADS)
@VisibleForTesting
ListeningExecutorService userExecutor;
@com.google.inject.Inject(optional = true)
@Named(Constants.PROPERTY_MAX_RETRIES)
protected int retryCountLimit = 5;

/**
* Upload using a user-provided executor, or the jclouds userExecutor
Expand Down Expand Up @@ -618,7 +631,7 @@ protected String putMultipartBlob(String container, Blob blob, PutOptions overri
getMinimumMultipartPartSize(), getMaximumMultipartPartSize(), getMaximumNumberOfParts());
long partSize = algorithm.calculateChunkSize(contentLength);
MultipartUpload mpu = initiateMultipartUpload(container, blob.getMetadata(), partSize, overrides);
int partNumber = 1;
int partNumber = 0;

for (Payload payload : slicer.slice(blob.getPayload(), partSize)) {
BlobUploader b =
Expand All @@ -645,4 +658,220 @@ public MultipartPart call() {
return uploadMultipartPart(mpu, partNumber, payload);
}
}

/**
 * Downloads a blob to {@code destination} by delegating to
 * {@link #downloadBlob(String, String, File, ExecutorService)} with this blob store's
 * {@code userExecutor}.
 *
 * @param container
 *           container holding the blob
 * @param name
 *           name of the blob
 * @param destination
 *           file the blob content is written to
 */
@Override
@Beta
public void downloadBlob(String container, String name, File destination) {
downloadBlob(container, name, destination, userExecutor);
}

/**
 * Downloads a blob directly to {@code destination}, fetching byte ranges of
 * {@link #getMinimumMultipartPartSize()} bytes concurrently on {@code executor} and writing each
 * range at its own offset in the file.
 * <p>
 * The destination file is always closed before this method returns. If anything fails after the
 * file has been opened, outstanding range downloads are cancelled and the (partial) file is
 * deleted.
 *
 * @param container
 *           container holding the blob
 * @param name
 *           name of the blob
 * @param destination
 *           file to create or overwrite with the blob content
 * @param executor
 *           executor that runs the individual range downloads
 * @throws RuntimeException
 *            wrapping the {@link IOException} if the destination file cannot be opened or sized;
 *            failures of individual range downloads propagate as unchecked exceptions from
 *            {@link Futures#getUnchecked}
 */
@Override
@Beta
public void downloadBlob(String container, String name, File destination, ExecutorService executor) {
   ListeningExecutorService listeningExecutor = MoreExecutors.listeningDecorator(executor);
   List<ListenableFuture<Void>> results = new ArrayList<ListenableFuture<Void>>();
   RandomAccessFile raf = null;
   boolean succeeded = false;
   try {
      long contentLength = api
            .getObjectApi(regionId, container)
            .getWithoutBody(name)
            .getPayload()
            .getContentMetadata()
            .getContentLength();

      // Size the file up front. Unlike seek(contentLength - 1) + write(0) this also works for an
      // empty blob (no negative seek) and truncates a longer pre-existing file to the blob size.
      raf = new RandomAccessFile(destination.getAbsoluteFile(), "rw");
      raf.setLength(contentLength);

      // Download buffer size: smaller means less memory usage; larger is faster as long as
      // threads are saturated.
      long partSize = getMinimumMultipartPartSize();

      // One task per inclusive byte range [from, to].
      for (long from = 0; from < contentLength; from += partSize) {
         long to = Math.min(from + partSize, contentLength) - 1;
         results.add(listeningExecutor.submit(new BlobDownloader(regionId, container, name, raf, from, to)));
      }

      Futures.getUnchecked(Futures.allAsList(results));
      succeeded = true;
   } catch (IOException e) {
      throw new RuntimeException(e);
   } finally {
      if (!succeeded) {
         // Stop range downloads that would otherwise keep writing to a file we are about to close.
         for (ListenableFuture<Void> pending : results) {
            pending.cancel(true);
         }
      }
      if (raf != null) {
         try {
            raf.close();
         } catch (IOException ignored) {
            // Best effort: nothing useful can be done if close fails.
         }
         if (!succeeded) {
            // Only remove the file once we have opened (and therefore possibly modified) it.
            destination.delete();
         }
      }
   }
}

/**
 * Task that downloads the inclusive byte range {@code [begin, end]} of one object and writes it
 * at the same offset of a shared {@link RandomAccessFile}. A failed attempt is retried up to
 * {@code retryCountLimit} times when it fails with an {@link IOException}.
 */
private final class BlobDownloader implements Callable<Void> {
   private final String regionId;
   private final String containerName;
   private final String objectName;
   private final RandomAccessFile raf;
   private final long begin;
   private final long end;

   /**
    * @param regionId
    *           region the object lives in
    * @param containerName
    *           container holding the object
    * @param objectName
    *           name of the object
    * @param raf
    *           destination file, shared between all range tasks
    * @param begin
    *           first byte of the range (inclusive)
    * @param end
    *           last byte of the range (inclusive)
    */
   BlobDownloader(String regionId, String containerName, String objectName, RandomAccessFile raf, long begin, long end) {
      this.regionId = regionId;
      this.containerName = containerName;
      this.objectName = objectName;
      this.raf = raf;
      this.begin = begin;
      this.end = end;
   }

   /**
    * @return always {@code null}
    * @throws RuntimeException
    *            if every attempt failed; the last {@link IOException} is attached as the cause
    */
   @Override
   public Void call() {
      final long expectedLength = end - begin + 1;
      IOException lastException = null;
      for (int retry = 0; retry < retryCountLimit; retry++) {
         try {
            SwiftObject object = api.getObjectApi(regionId, containerName)
                  .get(objectName, org.jclouds.http.options.GetOptions.Builder.range(begin, end));

            // Download first, this is the part that usually fails.
            byte[] targetArray;
            InputStream in = object.getPayload().openStream();
            try {
               targetArray = ByteStreams.toByteArray(in);
            } finally {
               try {
                  in.close();
               } catch (IOException ignored) {
                  // The data has been read (or a read error is already propagating).
               }
            }

            // A response of the wrong size would either overflow the mapped region or silently
            // leave part of it unwritten, so treat it as a retryable failure.
            if (targetArray.length != expectedLength) {
               throw new IOException("Expected " + expectedLength + " bytes for range " + begin + "-" + end
                     + " but received " + targetArray.length);
            }

            // Map exactly this range of the file and flush it to disk.
            MappedByteBuffer out = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, begin, expectedLength);
            out.put(targetArray);
            out.force();
            return null;
         } catch (IOException e) {
            lastException = e;
         }
      }
      throw new RuntimeException("After " + retryCountLimit + " retries: " + lastException, lastException);
   }
}

/**
 * Streams a blob by delegating to {@link #streamBlob(String, String, ExecutorService)} with this
 * blob store's {@code userExecutor}.
 *
 * @param container
 *           container holding the blob
 * @param name
 *           name of the blob
 * @return the stream returned by the delegated call
 */
@Beta
@Override
public InputStream streamBlob(final String container, final String name) {
return streamBlob(container, name, userExecutor);
}

/**
 * Streams a blob by downloading byte ranges of {@link #getMinimumMultipartPartSize()} bytes on
 * {@code executor} and piping them, in order, into the returned stream.
 * <p>
 * Two coordinating tasks are submitted to {@code executor} in addition to the range downloads:
 * one submits a download per range and queues the resulting futures, the other takes the futures
 * in submission order and writes their bytes to the pipe. If writing fails, both ends of the pipe
 * are closed so the reader is notified.
 * <p>
 * NOTE(review): the writer task blocks on {@code executor}, so the executor presumably needs
 * enough threads to run it alongside the downloads; confirm against the executors used by callers.
 *
 * @param container
 *           container holding the blob
 * @param name
 *           name of the blob
 * @param executor
 *           executor that runs the coordinating tasks and the range downloads
 * @return the reading end of the pipe carrying the blob content
 * @throws RuntimeException
 *            wrapping the {@link IOException} if the pipe cannot be created
 */
@Beta
@Override
public InputStream streamBlob(final String container, final String name, final ExecutorService executor) {

   final ListeningExecutorService listeningExecutor = MoreExecutors.listeningDecorator(executor);

   // The total length is needed to compute the ranges; it is obtained without downloading the
   // body. Fetch it before creating the pipe so a failure here does not leave streams open.
   final long contentLength = api
         .getObjectApi(regionId, container)
         .getWithoutBody(name)
         .getPayload()
         .getContentMetadata()
         .getContentLength();

   // Download buffer size: smaller means less memory usage; larger is faster as long as threads
   // are saturated.
   final long partSize = getMinimumMultipartPartSize();

   // The caller receives the input end of the piped stream; buffer up to five parts, capped at
   // the largest size an int can express.
   final PipedOutputStream output;
   final PipedInputStream input;
   try {
      output = new PipedOutputStream();
      input = new PipedInputStream(output, (int) Math.min(partSize * 5, (long) Integer.MAX_VALUE));
   } catch (IOException e) {
      throw new RuntimeException(e);
   }

   // Hands the download futures from the producer task to the writer task, in range order.
   final LinkedBlockingQueue<ListenableFuture<byte[]>> results = new LinkedBlockingQueue<ListenableFuture<byte[]>>();

   // Writer: one queue entry is consumed per range, in the order the ranges were submitted.
   listeningExecutor.submit(new Runnable() {
      @Override
      public void run() {
         try {
            for (long from = 0; from < contentLength; from += partSize) {
               // take() blocks until the next future is queued; get() blocks until it completes.
               output.write(results.take().get());
            }
         } catch (Exception e) {
            if (e instanceof InterruptedException) {
               // Preserve the interrupt for whoever owns this thread.
               Thread.currentThread().interrupt();
            }
            try {
               // Close the reading end so the client is notified of the failure.
               input.close();
            } catch (IOException ignored) {
               // Best effort.
            }
            throw new RuntimeException(e);
         } finally {
            // Closing the writing end signals end-of-stream on success and releases the pipe on failure.
            try {
               output.close();
            } catch (IOException ignored) {
               // Best effort.
            }
         }
      }
   });

   // Producer: submit one download per inclusive byte range [from, to].
   listeningExecutor.submit(new Runnable() {
      @Override
      public void run() {
         for (long from = 0; from < contentLength; from += partSize) {
            long to = Math.min(from + partSize, contentLength) - 1;
            results.add(listeningExecutor.submit(new BlobStreamDownloader(container, name, from, to)));
         }
      }
   });
   return input;
}

/**
 * Task that downloads the inclusive byte range {@code [begin, end]} of one object into memory.
 * A failed attempt is retried up to {@code retryCountLimit} times when it fails with an
 * {@link IOException}.
 */
private final class BlobStreamDownloader implements Callable<byte[]> {
   private final String containerName;
   private final String objectName;
   private final long begin;
   private final long end;

   /**
    * @param containerName
    *           container holding the object
    * @param objectName
    *           name of the object
    * @param begin
    *           first byte of the range (inclusive)
    * @param end
    *           last byte of the range (inclusive)
    */
   BlobStreamDownloader(String containerName, String objectName, long begin, long end) {
      this.containerName = containerName;
      this.objectName = objectName;
      this.begin = begin;
      this.end = end;
   }

   /**
    * @return the bytes of the requested range
    * @throws RuntimeException
    *            if every attempt failed; the last {@link IOException} is attached as the cause
    */
   @Override
   public byte[] call() {
      IOException lastException = null;
      for (int retry = 0; retry < retryCountLimit; retry++) {
         try {
            SwiftObject object = api.getObjectApi(regionId, containerName)
                  .get(objectName, org.jclouds.http.options.GetOptions.Builder.range(begin, end));
            InputStream in = object.getPayload().openStream();
            try {
               return ByteStreams.toByteArray(in);
            } finally {
               try {
                  in.close();
               } catch (IOException ignored) {
                  // The data has been read (or a read error is already propagating).
               }
            }
         } catch (IOException e) {
            lastException = e;
         }
      }
      throw new RuntimeException("After " + retryCountLimit + " retries: " + lastException, lastException);
   }
}
}