
Test fixes
zack-shoylev committed Aug 3, 2016
1 parent 34cabe8 commit 405038a
Showing 2 changed files with 57 additions and 35 deletions.
@@ -38,10 +38,8 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.LinkedBlockingQueue;

import javax.inject.Inject;
import javax.inject.Named;
@@ -782,50 +780,64 @@ public InputStream streamBlob(final String container, final String name, final E
final long partSize = getMinimumMultipartPartSize();

// Used to communicate between the producer and consumer threads
final ConcurrentLinkedQueue<ListenableFuture<byte[]>> results = new ConcurrentLinkedQueue<ListenableFuture<byte[]>>();

// Reduce the memory footprint further by using a semaphore
final Semaphore permits = new Semaphore(5);
final LinkedBlockingQueue<ListenableFuture<byte[]>> results = new LinkedBlockingQueue<ListenableFuture<byte[]>>();

listeningExecutor.submit(new Runnable() {
@Override
public void run() {
ListenableFuture<byte[]> result;
long from;
long to;
// Loop through ranges within the file
for (from = 0; from < contentLength; from = from + partSize) {
System.out.println(Thread.currentThread() + " acquiring permit from available " + permits.availablePermits());
permits.acquireUninterruptibly();
to = (from + partSize >= contentLength) ? contentLength - 1 : from + partSize - 1;
BlobStreamDownloader b = new BlobStreamDownloader(container, name, from, to);
results.add(listeningExecutor.submit(b));
try {
System.out.println(Thread.currentThread() + " writing to output");
result = results.take();
if (result == null) {
System.out.println("Error downloading file part to stream");
output.close();
input.close();
throw new RuntimeException("Error downloading file part to stream");
}
output.write(result.get());
} catch (Exception e) {
System.out.println(e);
try {
// close pipe so client is notified of an exception
input.close();
} catch (IOException e1) {}
try {
output.close();
} catch (IOException e1) {}
throw new RuntimeException(e);
}
}

System.out.println(Thread.currentThread() + " done writing to output");
try {
output.close();
} catch (IOException e) {
}
}
});

listeningExecutor.submit(new Runnable() {
@Override
public void run() {
try {
output.write( results.poll().get() );
} catch (IOException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
} finally {
permits.release();
System.out.println(Thread.currentThread() + " acquiring permit from available " + permits.availablePermits());
long from;
long to;
// Loop through ranges within the file
for (from = 0; from < contentLength; from = from + partSize) {
to = (from + partSize >= contentLength) ? contentLength - 1 : from + partSize - 1;
BlobStreamDownloader b = new BlobStreamDownloader(container, name, from, to);
results.add(listeningExecutor.submit(b));
}
System.out.println("All threads queued");
}
});

return input;
}
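
The hunk above swaps the ConcurrentLinkedQueue/Semaphore handoff for a LinkedBlockingQueue shared by the two submitted Runnables: one queues a ranged-download future per part, in order, while the other blocks on the queue and writes each completed part to the piped output stream. Below is a minimal, self-contained sketch of that handoff only; the class name, sizes, and the placeholder Callable (standing in for the real ranged GET in BlobStreamDownloader) are illustrative assumptions, not jclouds code.

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class RangeHandoffSketch {
   public static void main(String[] args) throws Exception {
      final long contentLength = 100;
      final long partSize = 32;
      final OutputStream output = new ByteArrayOutputStream();
      final ListeningExecutorService executor =
            MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(4));
      // Futures are queued in range order, so the consumer writes parts in order.
      final LinkedBlockingQueue<ListenableFuture<byte[]>> results =
            new LinkedBlockingQueue<ListenableFuture<byte[]>>();

      // Producer: submit one task per range and queue its future.
      executor.submit(new Runnable() {
         @Override
         public void run() {
            for (long from = 0; from < contentLength; from += partSize) {
               final long begin = from;
               final long end = (from + partSize >= contentLength) ? contentLength - 1 : from + partSize - 1;
               results.add(executor.submit(new Callable<byte[]>() {
                  @Override
                  public byte[] call() {
                     // Placeholder for the ranged GET done by BlobStreamDownloader.
                     return new byte[(int) (end - begin + 1)];
                  }
               }));
            }
         }
      });

      // Consumer: block on the queue and write each completed part in order.
      for (long from = 0; from < contentLength; from += partSize) {
         output.write(results.take().get());
      }
      output.close();
      executor.shutdown();
   }
}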

private final class BlobStreamDownloader implements Callable<byte[]> {
String regionId;
String containerName;
String objectName;
private final long begin;
@@ -841,12 +853,17 @@ private final class BlobStreamDownloader implements Callable<byte[]> {
@Override
public byte[] call() {
try {
// System.out.println("Downloading " + begin + " to " + end );
System.out.println("Downloading " + begin + " to " + end );
long time = System.nanoTime();
SwiftObject object = api.getObjectApi(regionId, containerName)
.get(objectName, org.jclouds.http.options.GetOptions.Builder.range(begin, end));
return ByteStreams.toByteArray(object.getPayload().openStream());
byte[] downloadedBlock = ByteStreams.toByteArray(object.getPayload().openStream());
System.out.println("Downloading " + begin + " to " + end + " was completed in " + (System.nanoTime() - time) / 1000000000 + " seconds" );
return downloadedBlock;
} catch (IOException e) {
throw new RuntimeException(e);
System.out.println("Failed downloading: " + e);
System.out.println(e);
return null;
}
}
}
@@ -30,8 +30,10 @@
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.jclouds.blobstore.BlobStore;
@@ -50,14 +52,19 @@
import com.google.common.util.concurrent.MoreExecutors;

// TODO: Roll tests up to BaseBlobStoreIntegrationTest
@Test(groups = "live")
@Test(groups = "live", singleThreaded = true)
public class RegionScopedSwiftBlobStoreParallelLiveTest extends BaseBlobStoreIntegrationTest {

private final File BIG_FILE = new File("random.dat");
private final long SIZE = 10000005; //10 * 1000 * 1000;
private BlobStore blobStore;
private String ETAG;
private ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(20));
private ListeningExecutorService executor =
MoreExecutors.listeningDecorator(
MoreExecutors.getExitingExecutorService(
new ThreadPoolExecutor(20, 20,
5000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(10, true), new ThreadPoolExecutor.CallerRunsPolicy())));
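
Relative to the plain Executors.newFixedThreadPool(20) it replaces, the exiting executor above bounds its work queue at 10 tasks and falls back to ThreadPoolExecutor.CallerRunsPolicy, so once the pool and queue are full the submitting thread runs the overflow task itself rather than queuing work without limit. A small standalone sketch of that behaviour (class name, pool size, and queue size are arbitrary assumptions, not the test's values):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CallerRunsSketch {
   public static void main(String[] args) throws InterruptedException {
      ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 2,
            5000L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<Runnable>(2, true),
            new ThreadPoolExecutor.CallerRunsPolicy());
      for (int i = 0; i < 10; i++) {
         final int task = i;
         pool.execute(new Runnable() {
            @Override
            public void run() {
               // With 2 workers busy and 2 tasks queued, later submissions
               // run on the submitting (main) thread: built-in backpressure.
               System.out.println("task " + task + " on " + Thread.currentThread().getName());
               try {
                  Thread.sleep(100);
               } catch (InterruptedException e) {
                  Thread.currentThread().interrupt();
               }
            }
         });
      }
      pool.shutdown();
      pool.awaitTermination(10, TimeUnit.SECONDS);
   }
}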

private String CONTAINER = "jcloudsparalleltest" + UUID.randomUUID();

@@ -105,7 +112,6 @@ public void uploadMultipartBlob() {
.build();
// configure the blobstore to use multipart uploading of the file
String eTag = blobStore.putBlob(CONTAINER, blob, multipart(executor));
System.out.println("swift returns md5: " + eTag);
// assertEquals(eTag, ETAG);
// The etag returned by Swift is not the md5 of the Blob uploaded
// It is the md5 of the concatenated segment md5s
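
Per the comment above, Swift reports the large-object ETag as the md5 of the concatenated segment md5s rather than the md5 of the full payload. A hypothetical helper sketch of that composite computation (the class, method, and segments list are assumptions for illustration, not part of the test):

import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;

import java.nio.charset.StandardCharsets;
import java.util.List;

public class CompositeEtagSketch {
   // md5 over the concatenation of each segment's hex md5 string.
   static String compositeEtag(List<byte[]> segments) {
      Hasher hasher = Hashing.md5().newHasher();
      for (byte[] segment : segments) {
         hasher.putString(Hashing.md5().hashBytes(segment).toString(), StandardCharsets.UTF_8);
      }
      return hasher.hash().toString();
   }
}
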
@@ -115,9 +121,7 @@
public void downloadMultipartBlob() throws IOException {
final File downloadedFile = new File(BIG_FILE.getName() + ".downloaded");
blobStore.downloadBlob(CONTAINER, BIG_FILE.getName(), downloadedFile, executor);

String eTag = Files.hash(downloadedFile, Hashing.md5()).toString();
System.out.println("parallel file d5: " + eTag);
assertEquals(eTag, ETAG);
}

@@ -134,7 +138,8 @@ public void streamMultipartBlob() throws IOException {
hasher.putBytes(segment, 0, read);
}

assertEquals(hasher.hash(), ETAG);
is.close();
assertEquals(hasher.hash().toString(), ETAG);
}

private void createRandomFile(long size, File file) throws IOException, InterruptedException {
