From b8f3a8d6615ba41d9c4c93c72c1a4417d8b0d9b4 Mon Sep 17 00:00:00 2001 From: Chinmay Soman Date: Thu, 21 Mar 2013 10:34:12 -0700 Subject: [PATCH] Using a per file checksum generator in the file copy in HdfsFetcher. This is used to handle the case where we might retry the copy in case of a Filesystem (hdfs) error. --- .../store/readonly/fetcher/HdfsFetcher.java | 60 ++++++++++++++----- 1 file changed, 45 insertions(+), 15 deletions(-) diff --git a/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/fetcher/HdfsFetcher.java b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/fetcher/HdfsFetcher.java index 8c67ea8c1c..b9c7b86783 100644 --- a/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/fetcher/HdfsFetcher.java +++ b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/fetcher/HdfsFetcher.java @@ -373,25 +373,28 @@ private boolean fetch(FileSystem fs, Path source, File dest, CopyStats stats) logger.debug("Checksum from .metadata " + new String(Hex.encodeHex(origCheckSum))); + + // Define the Global checksum generator checkSumType = CheckSum.fromString(checkSumTypeString); checkSumGenerator = CheckSum.getInstance(checkSumType); - fileCheckSumGenerator = CheckSum.getInstance(checkSumType); } } else if(!status.getPath().getName().startsWith(".")) { // Read other (.data , .index files) File copyLocation = new File(dest, status.getPath().getName()); - copyFileWithCheckSum(fs, - status.getPath(), - copyLocation, - stats, - fileCheckSumGenerator); + fileCheckSumGenerator = copyFileWithCheckSum(fs, + status.getPath(), + copyLocation, + stats, + checkSumType); if(fileCheckSumGenerator != null && checkSumGenerator != null) { byte[] checkSum = fileCheckSumGenerator.getCheckSum(); - logger.debug("Checksum for " + status.getPath() + " - " - + new String(Hex.encodeHex(checkSum))); + if(logger.isDebugEnabled()) { + logger.debug("Checksum for " + status.getPath() + " - " + + new String(Hex.encodeHex(checkSum))); + } checkSumGenerator.update(checkSum); } } @@ -422,11 +425,26 @@ private boolean fetch(FileSystem fs, Path source, File dest, CopyStats stats) } - private void copyFileWithCheckSum(FileSystem fs, - Path source, - File dest, - CopyStats stats, - CheckSum fileCheckSumGenerator) throws IOException { + /** + * Function to copy a file from the given filesystem with a checksum of type + * 'checkSumType' computed and returned. In case an error occurs during such + * a copy, we do a retry for a maximum of NUM_RETRIES + * + * @param fs Filesystem used to copy the file + * @param source Source path of the file to copy + * @param dest Destination path of the file on the local machine + * @param stats Stats for measuring the transfer progress + * @param checkSumType Type of the Checksum to be computed for this file + * @return A Checksum (generator) of type checkSumType which contains the + * computed checksum of the copied file + * @throws IOException + */ + private CheckSum copyFileWithCheckSum(FileSystem fs, + Path source, + File dest, + CopyStats stats, + CheckSumType checkSumType) throws IOException { + CheckSum fileCheckSumGenerator = null; logger.debug("Starting copy of " + source + " to " + dest); FSDataInputStream input = null; OutputStream output = null; @@ -434,6 +452,11 @@ private void copyFileWithCheckSum(FileSystem fs, boolean success = true; try { + // Create a per file checksum generator + if(checkSumType != null) { + fileCheckSumGenerator = CheckSum.getInstance(checkSumType); + } + input = fs.open(source); output = new BufferedOutputStream(new FileOutputStream(dest)); byte[] buffer = new byte[bufferSize]; @@ -445,10 +468,16 @@ private void copyFileWithCheckSum(FileSystem fs, output.write(buffer, 0, read); } - if(fileCheckSumGenerator != null) + // Update the per file checksum + if(fileCheckSumGenerator != null) { fileCheckSumGenerator.update(buffer, 0, read); - if(throttler != null) + } + + // Check if we need to throttle the fetch + if(throttler != null) { throttler.maybeThrottle(read); + } + stats.recordBytes(read); if(stats.getBytesSinceLastReport() > reportingIntervalBytes) { NumberFormat format = NumberFormat.getNumberInstance(); @@ -491,6 +520,7 @@ private void copyFileWithCheckSum(FileSystem fs, } logger.debug("Completed copy of " + source + " to " + dest); } + return fileCheckSumGenerator; } private long sizeOfPath(FileSystem fs, Path path) throws IOException {