From 6c2b0cd3248e0ea82526248f4c91d1d447e16bc1 Mon Sep 17 00:00:00 2001 From: Roshan Sumbaly Date: Tue, 2 Aug 2011 11:36:09 -0700 Subject: [PATCH] Fixed the fetchers --- .../store/readonly/fetcher/HdfsFetcher.java | 43 +++++++++++++++++ .../store/readonly/fetcher/S3Fetcher.java | 22 +++------ .../voldemort/store/readonly/FileFetcher.java | 47 +------------------ 3 files changed, 51 insertions(+), 61 deletions(-) diff --git a/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/fetcher/HdfsFetcher.java b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/fetcher/HdfsFetcher.java index ee63aa4464..f5e502bce6 100644 --- a/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/fetcher/HdfsFetcher.java +++ b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/fetcher/HdfsFetcher.java @@ -19,6 +19,7 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.text.NumberFormat; import java.util.Arrays; @@ -53,6 +54,10 @@ */ public class HdfsFetcher extends FileFetcher { + public HdfsFetcher() { + this(new Props()); + } + public HdfsFetcher(Props props) { super(props); logger.info("Created hdfs fetcher with throttle rate " + maxBytesPerSecond @@ -218,6 +223,44 @@ private void copyFileWithCheckSum(FileSystem fs, } } + private void copyFileWithCheckSum(InputStream inputStream, + OutputStream outputStream, + CopyStats stats, + CheckSum fileCheckSumGenerator) throws IOException { + byte[] buffer = new byte[bufferSize]; + while(true) { + int read = inputStream.read(buffer); + if(read < 0) { + break; + } else if(read < bufferSize) { + buffer = ByteUtils.copy(buffer, 0, read); + } + outputStream.write(buffer); + if(fileCheckSumGenerator != null) + fileCheckSumGenerator.update(buffer); + if(throttler != null) + throttler.maybeThrottle(read); + stats.recordBytes(read); + if(stats.getBytesSinceLastReport() > reportingIntervalBytes) { + NumberFormat format = NumberFormat.getNumberInstance(); + format.setMaximumFractionDigits(2); + logger.info(stats.getTotalBytesCopied() / (1024 * 1024) + " MB copied at " + + format.format(stats.getBytesPerSecond() / (1024 * 1024)) + + " MB/sec - " + format.format(stats.getPercentCopied()) + + " % complete"); + if(this.status != null) { + this.status.setStatus(stats.getTotalBytesCopied() + / (1024 * 1024) + + " MB copied at " + + format.format(stats.getBytesPerSecond() / (1024 * 1024)) + + " MB/sec - " + format.format(stats.getPercentCopied()) + + " % complete"); + } + stats.reset(); + } + } + } + private long sizeOfPath(FileSystem fs, Path path) throws IOException { long size = 0; FileStatus[] statuses = fs.listStatus(path); diff --git a/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/fetcher/S3Fetcher.java b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/fetcher/S3Fetcher.java index c38cce3fc0..1583f63e1c 100644 --- a/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/fetcher/S3Fetcher.java +++ b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/fetcher/S3Fetcher.java @@ -61,11 +61,6 @@ public File fetch(String sourceFileUrl, String destinationFile) throws IOExcepti for(Entry entry: map.entrySet()) { System.out.println("BLAH - " + entry.getKey()); } - // for(StorageMetadata metadata: store.list("rsumbaly", - // ListContainerOptions.Builder.inDirectory("voldemort/node-0"))) { - // System.out.println("Metadata - " + metadata.getName()); - // - // } } finally { if(context != null) { @@ -80,18 +75,15 @@ public File fetch(String sourceFileUrl, String destinationFile) throws IOExcepti * Main method for testing fetching */ public static void main(String[] args) throws Exception { + if(args.length != 2) { + System.err.println("java [classname] [identity] [credential]"); + System.exit(1); + } Props props = new Props(); - props.put("fetcher.identity", "AKIAIH2ZVHQDVYZUWGNA"); - props.put("fetcher.credential", "5ADjaxBgofFKZIfGXwnvpZR4q65AHrEHG2/e1dOF"); + props.put("fetcher.identity", args[0]); + props.put("fetcher.credential", args[1]); S3Fetcher fetcher = new S3Fetcher(props); long start = System.currentTimeMillis(); - File location = fetcher.fetch("", System.getProperty("java.io.tmpdir") + File.separator - + start); - // double rate = size * Time.MS_PER_SECOND / (double) - // (System.currentTimeMillis() - start); - // NumberFormat nf = NumberFormat.getInstance(); - // nf.setMaximumFractionDigits(2); - // System.out.println("Fetch to " + location + " completed: " - // + nf.format(rate / (1024.0 * 1024.0)) + " MB/sec."); + fetcher.fetch("", System.getProperty("java.io.tmpdir") + File.separator + start); } } diff --git a/src/java/voldemort/store/readonly/FileFetcher.java b/src/java/voldemort/store/readonly/FileFetcher.java index ec3abf8a39..7e5e087bd6 100644 --- a/src/java/voldemort/store/readonly/FileFetcher.java +++ b/src/java/voldemort/store/readonly/FileFetcher.java @@ -2,18 +2,11 @@ import java.io.File; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.text.NumberFormat; import java.util.concurrent.atomic.AtomicInteger; import org.apache.log4j.Logger; import voldemort.server.protocol.admin.AsyncOperationStatus; -import voldemort.store.readonly.checksum.CheckSum; -import voldemort.store.readonly.fetcher.CopyStats; -import voldemort.store.readonly.fetcher.HdfsFetcher; -import voldemort.utils.ByteUtils; import voldemort.utils.EventThrottler; import voldemort.utils.Props; import voldemort.utils.Utils; @@ -30,7 +23,7 @@ */ public abstract class FileFetcher { - protected static final Logger logger = Logger.getLogger(HdfsFetcher.class); + protected static final Logger logger = Logger.getLogger(FileFetcher.class); /** * Strings for parameters @@ -66,44 +59,6 @@ public FileFetcher(Props props) { } - protected void copyFileWithCheckSum(InputStream inputStream, - OutputStream outputStream, - CopyStats stats, - CheckSum fileCheckSumGenerator) throws IOException { - byte[] buffer = new byte[bufferSize]; - while(true) { - int read = inputStream.read(buffer); - if(read < 0) { - break; - } else if(read < bufferSize) { - buffer = ByteUtils.copy(buffer, 0, read); - } - outputStream.write(buffer); - if(fileCheckSumGenerator != null) - fileCheckSumGenerator.update(buffer); - if(throttler != null) - throttler.maybeThrottle(read); - stats.recordBytes(read); - if(stats.getBytesSinceLastReport() > reportingIntervalBytes) { - NumberFormat format = NumberFormat.getNumberInstance(); - format.setMaximumFractionDigits(2); - logger.info(stats.getTotalBytesCopied() / (1024 * 1024) + " MB copied at " - + format.format(stats.getBytesPerSecond() / (1024 * 1024)) - + " MB/sec - " + format.format(stats.getPercentCopied()) - + " % complete"); - if(this.status != null) { - this.status.setStatus(stats.getTotalBytesCopied() - / (1024 * 1024) - + " MB copied at " - + format.format(stats.getBytesPerSecond() / (1024 * 1024)) - + " MB/sec - " + format.format(stats.getPercentCopied()) - + " % complete"); - } - stats.reset(); - } - } - } - public abstract File fetch(String source, String dest) throws IOException; public void setAsyncOperationStatus(AsyncOperationStatus status) {