Skip to content

Commit

Permalink
Added retry logic to fetcher
Browse files Browse the repository at this point in the history
  • Loading branch information
abh1nay committed Oct 4, 2012
1 parent 67627cb commit 4732f8d
Showing 1 changed file with 55 additions and 37 deletions.
Expand Up @@ -68,6 +68,7 @@ public class HdfsFetcher implements FileFetcher {
private EventThrottler throttler = null;
private long minBytesPerSecond = 0;
private DynamicThrottleLimit globalThrottleLimit = null;
private static final int NUM_RETRIES = 3;

public HdfsFetcher(VoldemortConfig config) {
this(config.getMaxBytesPerSecond(),
Expand Down Expand Up @@ -282,46 +283,63 @@ private void copyFileWithCheckSum(FileSystem fs,
logger.info("Starting copy of " + source + " to " + dest);
FSDataInputStream input = null;
OutputStream output = null;
try {
input = fs.open(source);
output = new BufferedOutputStream(new FileOutputStream(dest));
byte[] buffer = new byte[bufferSize];
while(true) {
int read = input.read(buffer);
if(read < 0) {
break;
} else {
output.write(buffer, 0, read);
}

if(fileCheckSumGenerator != null)
fileCheckSumGenerator.update(buffer, 0, read);
if(throttler != null)
throttler.maybeThrottle(read);
stats.recordBytes(read);
if(stats.getBytesSinceLastReport() > reportingIntervalBytes) {
NumberFormat format = NumberFormat.getNumberInstance();
format.setMaximumFractionDigits(2);
logger.info(stats.getTotalBytesCopied() / (1024 * 1024) + " MB copied at "
+ format.format(stats.getBytesPerSecond() / (1024 * 1024))
+ " MB/sec - " + format.format(stats.getPercentCopied())
+ " % complete");
if(this.status != null) {
this.status.setStatus(stats.getTotalBytesCopied()
/ (1024 * 1024)
+ " MB copied at "
+ format.format(stats.getBytesPerSecond()
/ (1024 * 1024)) + " MB/sec - "
+ format.format(stats.getPercentCopied())
+ " % complete");
for(int attempt = 0; attempt < NUM_RETRIES; attempt++) {
boolean success = true;
try {

input = fs.open(source);
output = new BufferedOutputStream(new FileOutputStream(dest));
byte[] buffer = new byte[bufferSize];
while(true) {
int read = input.read(buffer);
if(read < 0) {
break;
} else {
output.write(buffer, 0, read);
}

if(fileCheckSumGenerator != null)
fileCheckSumGenerator.update(buffer, 0, read);
if(throttler != null)
throttler.maybeThrottle(read);
stats.recordBytes(read);
if(stats.getBytesSinceLastReport() > reportingIntervalBytes) {
NumberFormat format = NumberFormat.getNumberInstance();
format.setMaximumFractionDigits(2);
logger.info(stats.getTotalBytesCopied() / (1024 * 1024) + " MB copied at "
+ format.format(stats.getBytesPerSecond() / (1024 * 1024))
+ " MB/sec - " + format.format(stats.getPercentCopied())
+ " % complete");
if(this.status != null) {
this.status.setStatus(stats.getTotalBytesCopied()
/ (1024 * 1024)
+ " MB copied at "
+ format.format(stats.getBytesPerSecond()
/ (1024 * 1024)) + " MB/sec - "
+ format.format(stats.getPercentCopied())
+ " % complete");
}
stats.reset();
}
stats.reset();
}
logger.info("Completed copy of " + source + " to " + dest);

} catch(IOException ioe) {
success = false;
logger.error("Error during copying file ", ioe);
if(attempt < NUM_RETRIES - 1)
logger.info("retrying copying");
else
throw ioe;

} finally {
IOUtils.closeQuietly(output);
IOUtils.closeQuietly(input);
if(success)
break;

}
logger.info("Completed copy of " + source + " to " + dest);
} finally {
IOUtils.closeQuietly(output);
IOUtils.closeQuietly(input);

}
}

Expand Down

0 comments on commit 4732f8d

Please sign in to comment.