Skip to content

Commit

Permalink
Fixed the fetchers
Browse files Browse the repository at this point in the history
  • Loading branch information
rsumbaly committed Aug 2, 2011
1 parent 5be2a6e commit 6c2b0cd
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 61 deletions.
Expand Up @@ -19,6 +19,7 @@
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.text.NumberFormat;
import java.util.Arrays;
Expand Down Expand Up @@ -53,6 +54,10 @@
*/
public class HdfsFetcher extends FileFetcher {

public HdfsFetcher() {
this(new Props());
}

public HdfsFetcher(Props props) {
super(props);
logger.info("Created hdfs fetcher with throttle rate " + maxBytesPerSecond
Expand Down Expand Up @@ -218,6 +223,44 @@ private void copyFileWithCheckSum(FileSystem fs,
}
}

private void copyFileWithCheckSum(InputStream inputStream,
OutputStream outputStream,
CopyStats stats,
CheckSum fileCheckSumGenerator) throws IOException {
byte[] buffer = new byte[bufferSize];
while(true) {
int read = inputStream.read(buffer);
if(read < 0) {
break;
} else if(read < bufferSize) {
buffer = ByteUtils.copy(buffer, 0, read);
}
outputStream.write(buffer);
if(fileCheckSumGenerator != null)
fileCheckSumGenerator.update(buffer);
if(throttler != null)
throttler.maybeThrottle(read);
stats.recordBytes(read);
if(stats.getBytesSinceLastReport() > reportingIntervalBytes) {
NumberFormat format = NumberFormat.getNumberInstance();
format.setMaximumFractionDigits(2);
logger.info(stats.getTotalBytesCopied() / (1024 * 1024) + " MB copied at "
+ format.format(stats.getBytesPerSecond() / (1024 * 1024))
+ " MB/sec - " + format.format(stats.getPercentCopied())
+ " % complete");
if(this.status != null) {
this.status.setStatus(stats.getTotalBytesCopied()
/ (1024 * 1024)
+ " MB copied at "
+ format.format(stats.getBytesPerSecond() / (1024 * 1024))
+ " MB/sec - " + format.format(stats.getPercentCopied())
+ " % complete");
}
stats.reset();
}
}
}

private long sizeOfPath(FileSystem fs, Path path) throws IOException {
long size = 0;
FileStatus[] statuses = fs.listStatus(path);
Expand Down
Expand Up @@ -61,11 +61,6 @@ public File fetch(String sourceFileUrl, String destinationFile) throws IOExcepti
for(Entry<String, InputStream> entry: map.entrySet()) {
System.out.println("BLAH - " + entry.getKey());
}
// for(StorageMetadata metadata: store.list("rsumbaly",
// ListContainerOptions.Builder.inDirectory("voldemort/node-0"))) {
// System.out.println("Metadata - " + metadata.getName());
//
// }

} finally {
if(context != null) {
Expand All @@ -80,18 +75,15 @@ public File fetch(String sourceFileUrl, String destinationFile) throws IOExcepti
* Main method for testing fetching
*/
public static void main(String[] args) throws Exception {
if(args.length != 2) {
System.err.println("java [classname] [identity] [credential]");
System.exit(1);
}
Props props = new Props();
props.put("fetcher.identity", "AKIAIH2ZVHQDVYZUWGNA");
props.put("fetcher.credential", "5ADjaxBgofFKZIfGXwnvpZR4q65AHrEHG2/e1dOF");
props.put("fetcher.identity", args[0]);
props.put("fetcher.credential", args[1]);
S3Fetcher fetcher = new S3Fetcher(props);
long start = System.currentTimeMillis();
File location = fetcher.fetch("", System.getProperty("java.io.tmpdir") + File.separator
+ start);
// double rate = size * Time.MS_PER_SECOND / (double)
// (System.currentTimeMillis() - start);
// NumberFormat nf = NumberFormat.getInstance();
// nf.setMaximumFractionDigits(2);
// System.out.println("Fetch to " + location + " completed: "
// + nf.format(rate / (1024.0 * 1024.0)) + " MB/sec.");
fetcher.fetch("", System.getProperty("java.io.tmpdir") + File.separator + start);
}
}
47 changes: 1 addition & 46 deletions src/java/voldemort/store/readonly/FileFetcher.java
Expand Up @@ -2,18 +2,11 @@

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.text.NumberFormat;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.log4j.Logger;

import voldemort.server.protocol.admin.AsyncOperationStatus;
import voldemort.store.readonly.checksum.CheckSum;
import voldemort.store.readonly.fetcher.CopyStats;
import voldemort.store.readonly.fetcher.HdfsFetcher;
import voldemort.utils.ByteUtils;
import voldemort.utils.EventThrottler;
import voldemort.utils.Props;
import voldemort.utils.Utils;
Expand All @@ -30,7 +23,7 @@
*/
public abstract class FileFetcher {

protected static final Logger logger = Logger.getLogger(HdfsFetcher.class);
protected static final Logger logger = Logger.getLogger(FileFetcher.class);

/**
* Strings for parameters
Expand Down Expand Up @@ -66,44 +59,6 @@ public FileFetcher(Props props) {

}

protected void copyFileWithCheckSum(InputStream inputStream,
OutputStream outputStream,
CopyStats stats,
CheckSum fileCheckSumGenerator) throws IOException {
byte[] buffer = new byte[bufferSize];
while(true) {
int read = inputStream.read(buffer);
if(read < 0) {
break;
} else if(read < bufferSize) {
buffer = ByteUtils.copy(buffer, 0, read);
}
outputStream.write(buffer);
if(fileCheckSumGenerator != null)
fileCheckSumGenerator.update(buffer);
if(throttler != null)
throttler.maybeThrottle(read);
stats.recordBytes(read);
if(stats.getBytesSinceLastReport() > reportingIntervalBytes) {
NumberFormat format = NumberFormat.getNumberInstance();
format.setMaximumFractionDigits(2);
logger.info(stats.getTotalBytesCopied() / (1024 * 1024) + " MB copied at "
+ format.format(stats.getBytesPerSecond() / (1024 * 1024))
+ " MB/sec - " + format.format(stats.getPercentCopied())
+ " % complete");
if(this.status != null) {
this.status.setStatus(stats.getTotalBytesCopied()
/ (1024 * 1024)
+ " MB copied at "
+ format.format(stats.getBytesPerSecond() / (1024 * 1024))
+ " MB/sec - " + format.format(stats.getPercentCopied())
+ " % complete");
}
stats.reset();
}
}
}

public abstract File fetch(String source, String dest) throws IOException;

public void setAsyncOperationStatus(AsyncOperationStatus status) {
Expand Down

0 comments on commit 6c2b0cd

Please sign in to comment.