Skip to content

Commit

Permalink
Using a per file checksum generator in the file copy in HdfsFetcher. …
Browse files Browse the repository at this point in the history
…This is used to handle the case where we might retry the copy in case of a Filesystem (hdfs) error.
  • Loading branch information
Chinmay Soman authored and abh1nay committed Mar 22, 2013
1 parent 8f4e8b0 commit b8f3a8d
Showing 1 changed file with 45 additions and 15 deletions.
Expand Up @@ -373,25 +373,28 @@ private boolean fetch(FileSystem fs, Path source, File dest, CopyStats stats)

logger.debug("Checksum from .metadata "
+ new String(Hex.encodeHex(origCheckSum)));

// Define the Global checksum generator
checkSumType = CheckSum.fromString(checkSumTypeString);
checkSumGenerator = CheckSum.getInstance(checkSumType);
fileCheckSumGenerator = CheckSum.getInstance(checkSumType);
}

} else if(!status.getPath().getName().startsWith(".")) {

// Read other (.data , .index files)
File copyLocation = new File(dest, status.getPath().getName());
copyFileWithCheckSum(fs,
status.getPath(),
copyLocation,
stats,
fileCheckSumGenerator);
fileCheckSumGenerator = copyFileWithCheckSum(fs,
status.getPath(),
copyLocation,
stats,
checkSumType);

if(fileCheckSumGenerator != null && checkSumGenerator != null) {
byte[] checkSum = fileCheckSumGenerator.getCheckSum();
logger.debug("Checksum for " + status.getPath() + " - "
+ new String(Hex.encodeHex(checkSum)));
if(logger.isDebugEnabled()) {
logger.debug("Checksum for " + status.getPath() + " - "
+ new String(Hex.encodeHex(checkSum)));
}
checkSumGenerator.update(checkSum);
}
}
Expand Down Expand Up @@ -422,18 +425,38 @@ private boolean fetch(FileSystem fs, Path source, File dest, CopyStats stats)

}

private void copyFileWithCheckSum(FileSystem fs,
Path source,
File dest,
CopyStats stats,
CheckSum fileCheckSumGenerator) throws IOException {
/**
* Function to copy a file from the given filesystem with a checksum of type
* 'checkSumType' computed and returned. In case an error occurs during such
* a copy, we do a retry for a maximum of NUM_RETRIES
*
* @param fs Filesystem used to copy the file
* @param source Source path of the file to copy
* @param dest Destination path of the file on the local machine
* @param stats Stats for measuring the transfer progress
* @param checkSumType Type of the Checksum to be computed for this file
* @return A Checksum (generator) of type checkSumType which contains the
* computed checksum of the copied file
* @throws IOException
*/
private CheckSum copyFileWithCheckSum(FileSystem fs,
Path source,
File dest,
CopyStats stats,
CheckSumType checkSumType) throws IOException {
CheckSum fileCheckSumGenerator = null;
logger.debug("Starting copy of " + source + " to " + dest);
FSDataInputStream input = null;
OutputStream output = null;
for(int attempt = 0; attempt < NUM_RETRIES; attempt++) {
boolean success = true;
try {

// Create a per file checksum generator
if(checkSumType != null) {
fileCheckSumGenerator = CheckSum.getInstance(checkSumType);
}

input = fs.open(source);
output = new BufferedOutputStream(new FileOutputStream(dest));
byte[] buffer = new byte[bufferSize];
Expand All @@ -445,10 +468,16 @@ private void copyFileWithCheckSum(FileSystem fs,
output.write(buffer, 0, read);
}

if(fileCheckSumGenerator != null)
// Update the per file checksum
if(fileCheckSumGenerator != null) {
fileCheckSumGenerator.update(buffer, 0, read);
if(throttler != null)
}

// Check if we need to throttle the fetch
if(throttler != null) {
throttler.maybeThrottle(read);
}

stats.recordBytes(read);
if(stats.getBytesSinceLastReport() > reportingIntervalBytes) {
NumberFormat format = NumberFormat.getNumberInstance();
Expand Down Expand Up @@ -491,6 +520,7 @@ private void copyFileWithCheckSum(FileSystem fs,
}
logger.debug("Completed copy of " + source + " to " + dest);
}
return fileCheckSumGenerator;
}

private long sizeOfPath(FileSystem fs, Path path) throws IOException {
Expand Down

0 comments on commit b8f3a8d

Please sign in to comment.