Skip to content
Browse files

Using a per file checksum generator in the file copy in HdfsFetcher. …

…This is used to handle the case where we might retry the copy in case of a Filesystem (hdfs) error.
  • Loading branch information...
1 parent 8f4e8b0 commit b8f3a8d6615ba41d9c4c93c72c1a4417d8b0d9b4 Chinmay Soman committed with abh1nay
View
60 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/fetcher/HdfsFetcher.java
@@ -373,25 +373,28 @@ private boolean fetch(FileSystem fs, Path source, File dest, CopyStats stats)
logger.debug("Checksum from .metadata "
+ new String(Hex.encodeHex(origCheckSum)));
+
+ // Define the Global checksum generator
checkSumType = CheckSum.fromString(checkSumTypeString);
checkSumGenerator = CheckSum.getInstance(checkSumType);
- fileCheckSumGenerator = CheckSum.getInstance(checkSumType);
}
} else if(!status.getPath().getName().startsWith(".")) {
// Read other (.data , .index files)
File copyLocation = new File(dest, status.getPath().getName());
- copyFileWithCheckSum(fs,
- status.getPath(),
- copyLocation,
- stats,
- fileCheckSumGenerator);
+ fileCheckSumGenerator = copyFileWithCheckSum(fs,
+ status.getPath(),
+ copyLocation,
+ stats,
+ checkSumType);
if(fileCheckSumGenerator != null && checkSumGenerator != null) {
byte[] checkSum = fileCheckSumGenerator.getCheckSum();
- logger.debug("Checksum for " + status.getPath() + " - "
- + new String(Hex.encodeHex(checkSum)));
+ if(logger.isDebugEnabled()) {
+ logger.debug("Checksum for " + status.getPath() + " - "
+ + new String(Hex.encodeHex(checkSum)));
+ }
checkSumGenerator.update(checkSum);
}
}
@@ -422,11 +425,26 @@ private boolean fetch(FileSystem fs, Path source, File dest, CopyStats stats)
}
- private void copyFileWithCheckSum(FileSystem fs,
- Path source,
- File dest,
- CopyStats stats,
- CheckSum fileCheckSumGenerator) throws IOException {
+ /**
+ * Function to copy a file from the given filesystem with a checksum of type
+ * 'checkSumType' computed and returned. In case an error occurs during such
+ * a copy, we do a retry for a maximum of NUM_RETRIES
+ *
+ * @param fs Filesystem used to copy the file
+ * @param source Source path of the file to copy
+ * @param dest Destination path of the file on the local machine
+ * @param stats Stats for measuring the transfer progress
+ * @param checkSumType Type of the Checksum to be computed for this file
+ * @return A Checksum (generator) of type checkSumType which contains the
+ * computed checksum of the copied file
+ * @throws IOException
+ */
+ private CheckSum copyFileWithCheckSum(FileSystem fs,
+ Path source,
+ File dest,
+ CopyStats stats,
+ CheckSumType checkSumType) throws IOException {
+ CheckSum fileCheckSumGenerator = null;
logger.debug("Starting copy of " + source + " to " + dest);
FSDataInputStream input = null;
OutputStream output = null;
@@ -434,6 +452,11 @@ private void copyFileWithCheckSum(FileSystem fs,
boolean success = true;
try {
+ // Create a per file checksum generator
+ if(checkSumType != null) {
+ fileCheckSumGenerator = CheckSum.getInstance(checkSumType);
+ }
+
input = fs.open(source);
output = new BufferedOutputStream(new FileOutputStream(dest));
byte[] buffer = new byte[bufferSize];
@@ -445,10 +468,16 @@ private void copyFileWithCheckSum(FileSystem fs,
output.write(buffer, 0, read);
}
- if(fileCheckSumGenerator != null)
+ // Update the per file checksum
+ if(fileCheckSumGenerator != null) {
fileCheckSumGenerator.update(buffer, 0, read);
- if(throttler != null)
+ }
+
+ // Check if we need to throttle the fetch
+ if(throttler != null) {
throttler.maybeThrottle(read);
+ }
+
stats.recordBytes(read);
if(stats.getBytesSinceLastReport() > reportingIntervalBytes) {
NumberFormat format = NumberFormat.getNumberInstance();
@@ -491,6 +520,7 @@ private void copyFileWithCheckSum(FileSystem fs,
}
logger.debug("Completed copy of " + source + " to " + dest);
}
+ return fileCheckSumGenerator;
}
private long sizeOfPath(FileSystem fs, Path path) throws IOException {

0 comments on commit b8f3a8d

Please sign in to comment.
Something went wrong with that request. Please try again.