Merge branch 'master' into release-0802
afeinberg committed Apr 27, 2010
2 parents d31b3d4 + fc05a68, commit 460e33d
Showing 8 changed files with 89 additions and 45 deletions.
@@ -37,8 +37,8 @@ public byte[] getCheckSum() {
     }

     @Override
-    public void update(byte[] input) {
-        checkSumGenerator.update(input);
-    }
+    public void update(byte[] input, int startIndex, int length) {
+        checkSumGenerator.update(input, startIndex, length);
+    }

 }
@@ -37,8 +37,7 @@ public byte[] getCheckSum() {
     }

     @Override
-    public void update(byte[] input) {
-        checkSumGenerator.update(input);
+    public void update(byte[] input, int startIndex, int length) {
+        checkSumGenerator.update(input, startIndex, length);
     }

 }
@@ -22,12 +22,33 @@

 public abstract class CheckSum {

+    /**
+     * Update the checksum buffer to include the bytes of input starting at
+     * startIndex, for length bytes
+     *
+     * @param input bytes added to the buffer
+     * @param startIndex offset into input at which to start reading
+     * @param length number of bytes to include
+     */
+    public abstract void update(byte[] input, int startIndex, int length);
+
+    /**
+     * @param number number to be stored in the checksum buffer
+     */
+    public void update(int number) {
+        byte[] numberInBytes = new byte[ByteUtils.SIZE_OF_INT];
+        ByteUtils.writeInt(numberInBytes, number, 0);
+        update(numberInBytes);
+    }
+
     /**
      * Update the checksum buffer to include input
      *
      * @param input bytes added to the buffer
      */
-    public abstract void update(byte[] input);
+    public void update(byte[] input) {
+        update(input, 0, input.length);
+    }

     /**
      * Get the checkSum of the buffer accumulated so far, after which the
      * buffer is reset
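With this change CheckSum becomes a template: only the three-argument update is abstract, while the update(byte[]) and update(int) overloads delegate to it. A minimal sketch of what an implementation now needs, illustrative only and not part of this commit (it assumes ByteUtils also exposes SIZE_OF_LONG and writeLong, mirroring the SIZE_OF_INT/writeInt pair used above):

import java.util.zip.Adler32;

public class Adler32CheckSum extends CheckSum {

    private final Adler32 checkSumGenerator = new Adler32();

    @Override
    public void update(byte[] input, int startIndex, int length) {
        // The wrapped digest accepts the slice directly; no copying needed
        checkSumGenerator.update(input, startIndex, length);
    }

    @Override
    public byte[] getCheckSum() {
        // Assumption: ByteUtils provides SIZE_OF_LONG/writeLong like SIZE_OF_INT/writeInt
        byte[] checkSum = new byte[ByteUtils.SIZE_OF_LONG];
        ByteUtils.writeLong(checkSum, checkSumGenerator.getValue(), 0);
        checkSumGenerator.reset(); // per the contract, the buffer is reset
        return checkSum;
    }
}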
@@ -33,8 +33,8 @@ public byte[] getCheckSum() {
     }

     @Override
-    public void update(byte[] input) {
-        checkSumGenerator.update(input);
+    public void update(byte[] input, int startIndex, int length) {
+        checkSumGenerator.update(input, startIndex, length);
     }

 }
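Each concrete CheckSum now forwards the three-argument update straight to its wrapped digest, and the inherited overloads build on it. A hedged usage sketch of the resulting API (the MD5 member of CheckSumType is an assumption; only NONE appears in this diff):

CheckSum digest = CheckSum.getInstance(CheckSumType.MD5); // assumed enum member
digest.update(new byte[] { 1, 2, 3 });       // whole array, via update(input, 0, input.length)
digest.update(new byte[] { 1, 2, 3 }, 1, 2); // slice: the last two bytes only
digest.update(42);                           // the int is encoded with ByteUtils.writeInt first
byte[] sum = digest.getCheckSum();           // returns the digest and resets the buffer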
@@ -110,7 +110,6 @@ public File fetch(String fileUrl, String storeName) throws IOException {
             if(result) {
                 return destination;
             } else {
-                logger.error("Check sum failed for " + fileUrl);
                 return null;
             }
         } finally {
@@ -133,7 +132,7 @@ private boolean fetch(FileSystem fs,
         byte[] origCheckSum = null;
         CheckSumType checkSumType = CheckSumType.NONE;

-        // Do a MD5ofMD5s - Similar to HDFS
+        // Do a checksum of checksums - similar to HDFS
         CheckSum checkSumGenerator = null;
         CheckSum fileCheckSumGenerator = null;
@@ -165,7 +164,7 @@ private boolean fetch(FileSystem fs,

         }

-        // Check MD5
+        // Check the checksum
         if(checkSumType != CheckSumType.NONE) {
             byte[] newCheckSum = checkSumGenerator.getCheckSum();
             return (ByteUtils.compare(newCheckSum, origCheckSum) == 0);
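On the fetch side, fileCheckSumGenerator hashes each downloaded file and its digest is folded into checkSumGenerator, mirroring what the builder wrote. A hedged sketch of that per-file loop (the class and buffer size are assumptions, not this commit's code); note how the three-argument update makes the old defensive buffer copy on a short read unnecessary:

import java.io.IOException;
import java.io.InputStream;

import voldemort.store.readonly.checksum.CheckSum;

public final class CheckSumStreams {

    /** Folds the digest of one stream into an aggregate checksum-of-checksums. */
    public static void updateFromStream(CheckSum fileCheckSumGenerator,
                                        CheckSum checkSumGenerator,
                                        InputStream input) throws IOException {
        byte[] buffer = new byte[64 * 1024];
        while(true) {
            int read = input.read(buffer);
            if(read < 0)
                break;
            // Only the bytes actually read are hashed; no truncated copy required
            fileCheckSumGenerator.update(buffer, 0, read);
        }
        // getCheckSum() also resets fileCheckSumGenerator for the next file
        checkSumGenerator.update(fileCheckSumGenerator.getCheckSum());
    }
}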
@@ -27,6 +27,7 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileOutputFormat;
@@ -41,7 +42,6 @@
 import voldemort.store.StoreDefinition;
 import voldemort.store.readonly.checksum.CheckSum;
 import voldemort.store.readonly.checksum.CheckSum.CheckSumType;
-import voldemort.utils.ByteUtils;
 import voldemort.utils.Utils;
 import voldemort.xml.ClusterMapper;
 import voldemort.xml.StoreDefinitionsMapper;
@@ -162,6 +162,7 @@ public void build() {
         conf.setJarByClass(getClass());
         FileInputFormat.setInputPaths(conf, inputPath);
         conf.set("final.output.dir", outputDir.toString());
+        conf.set("checksum.type", CheckSum.toString(checkSumType));
         FileOutputFormat.setOutputPath(conf, tempDir);

         try {
@@ -197,47 +198,43 @@ public void build() {

             // Do a CheckSumOfCheckSum - Similar to HDFS
             CheckSum checkSumGenerator = CheckSum.getInstance(this.checkSumType);
-            CheckSum fileCheckSumGenerator = CheckSum.getInstance(this.checkSumType);

-            if(checkSumGenerator == null || fileCheckSumGenerator == null) {
+            if(checkSumGenerator == null) {
                 throw new VoldemortException("Could not generate checksum digests");
             }

             for(FileStatus node: nodes) {
                 if(node.isDir()) {
-                    FileStatus[] storeFiles = outputFs.listStatus(node.getPath());
+                    FileStatus[] storeFiles = outputFs.listStatus(node.getPath(),
+                                                                  new PathFilter() {
+
+                                                                      public boolean accept(Path arg0) {
+                                                                          return arg0.getName().endsWith("checksum")
+                                                                                 && !arg0.getName().startsWith(".");
+                                                                      }
+                                                                  });

                     if(storeFiles != null) {
                         Arrays.sort(storeFiles, new IndexFileLastComparator());

-                        byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
-
-                        for(FileStatus status: storeFiles) {
-                            if(!status.getPath().getName().startsWith(".")) {
-                                FSDataInputStream input = outputFs.open(status.getPath());
-
-                                while(true) {
-                                    int read = input.read(buffer);
-                                    if(read < 0)
-                                        break;
-                                    else if(read < DEFAULT_BUFFER_SIZE) {
-                                        buffer = ByteUtils.copy(buffer, 0, read);
-                                    }
-                                    fileCheckSumGenerator.update(buffer);
-                                }
-                                checkSumGenerator.update(fileCheckSumGenerator.getCheckSum());
-                            }
+                        for(FileStatus file: storeFiles) {
+                            FSDataInputStream input = outputFs.open(file.getPath());
+                            byte[] fileCheckSum = new byte[CheckSum.checkSumLength(this.checkSumType)];
+                            input.read(fileCheckSum);
+                            checkSumGenerator.update(fileCheckSum);
+                            outputFs.delete(file.getPath(), true);
                         }

-                        byte[] checkSumBytes = checkSumGenerator.getCheckSum();
                         FSDataOutputStream checkSumStream = outputFs.create(new Path(node.getPath(),
                                                                                      CheckSum.toString(checkSumType)
                                                                                              + "checkSum.txt"));
-                        checkSumStream.write(checkSumBytes);
+                        checkSumStream.write(checkSumGenerator.getCheckSum());
                         checkSumStream.flush();
                         checkSumStream.close();

                     }

                 }
             }
         }
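The builder therefore no longer re-reads entire store files: each reducer leaves a small per-chunk digest behind, and the builder hashes those digests in sorted order into a single <type>checkSum.txt, deleting them as it goes. A self-contained illustration of that aggregation step using plain JDK MD5 (the CheckSum wrapper in this commit abstracts the same operation; the class and method names here are invented):

import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;

public final class CheckSumOfCheckSums {

    public static byte[] aggregate(List<byte[]> perFileCheckSums) throws NoSuchAlgorithmException {
        MessageDigest md5 = MessageDigest.getInstance("MD5");
        for(byte[] fileCheckSum: perFileCheckSums)
            md5.update(fileCheckSum); // order matters, hence the deterministic sort above
        return md5.digest();
    }
}

One caveat in the hunk above: input.read(fileCheckSum) may legally return fewer bytes than the array length; a readFully-style call would be more robust (see the sketch after the reducer changes below).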
@@ -258,10 +255,6 @@ public int compare(FileStatus fs1, FileStatus fs2) {
             // directories before files
             if(fs1.isDir())
                 return fs2.isDir() ? 0 : -1;
-            else if(fs1.getPath().getName().endsWith("checkSum.txt"))
-                return -1;
-            else if(fs2.getPath().getName().endsWith("checkSum.txt"))
-                return 1;
             // index files after all other files
             else if(fs1.getPath().getName().endsWith(".index"))
                 return fs2.getPath().getName().endsWith(".index") ? 0 : 1;
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.Iterator;

+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
@@ -33,6 +34,8 @@

 import voldemort.VoldemortException;
 import voldemort.store.readonly.ReadOnlyUtils;
+import voldemort.store.readonly.checksum.CheckSum;
+import voldemort.store.readonly.checksum.CheckSum.CheckSumType;
 import voldemort.utils.ByteUtils;

 /**
@@ -56,6 +59,9 @@ public class HadoopStoreBuilderReducer extends AbstractStoreBuilderConfigurable
     private Path taskValueFileName;
     private String outputDir;
     private JobConf conf;
+    private CheckSumType checkSumType;
+    private CheckSum checkSumDigestIndex;
+    private CheckSum checkSumDigestValue;

     /**
      * Reduce should get sorted MD5 keys here with a single value (appended in
@@ -75,12 +81,16 @@ public void reduce(BytesWritable key,

         // Write key and position
         this.indexFileStream.write(key.get(), 0, key.getSize());
+        this.checkSumDigestIndex.update(key.get(), 0, key.getSize());
         this.indexFileStream.writeInt(this.position);
+        this.checkSumDigestIndex.update(this.position);

         // Write length and value
         int valueLength = writable.getSize() - 4;
         this.valueFileStream.writeInt(valueLength);
+        this.checkSumDigestValue.update(valueLength);
         this.valueFileStream.write(valueBytes, 4, valueLength);
+        this.checkSumDigestValue.update(valueBytes, 4, valueLength);

         this.position += 4 + valueLength;
         if(this.position < 0)
@@ -106,6 +116,9 @@ public void configure(JobConf job) {
         this.numChunks = job.getInt("num.chunks", -1);
         this.outputDir = job.get("final.output.dir");
         this.taskId = job.get("mapred.task.id");
+        this.checkSumType = CheckSum.fromString(job.get("checksum.type"));
+        this.checkSumDigestIndex = CheckSum.getInstance(checkSumType);
+        this.checkSumDigestValue = CheckSum.getInstance(checkSumType);

         this.taskIndexFileName = new Path(FileOutputFormat.getOutputPath(job), getStoreName()
                                           + "."
@@ -140,6 +153,23 @@ public void close() throws IOException {
         FileSystem fs = indexFile.getFileSystem(this.conf);
         fs.mkdirs(nodeDir);

+        if(this.checkSumType != CheckSumType.NONE) {
+            if(this.checkSumDigestIndex != null && this.checkSumDigestValue != null) {
+                Path checkSumIndexFile = new Path(nodeDir, this.chunkId + ".index.checksum");
+                Path checkSumValueFile = new Path(nodeDir, this.chunkId + ".data.checksum");
+
+                FSDataOutputStream output = fs.create(checkSumIndexFile);
+                output.write(this.checkSumDigestIndex.getCheckSum());
+                output.close();
+
+                output = fs.create(checkSumValueFile);
+                output.write(this.checkSumDigestValue.getCheckSum());
+                output.close();
+            } else {
+                throw new VoldemortException("Failed to open CheckSum digest");
+            }
+        }
+
         logger.info("Moving " + this.taskIndexFileName + " to " + indexFile + ".");
         fs.rename(taskIndexFileName, indexFile);
         logger.info("Moving " + this.taskValueFileName + " to " + valueFile + ".");
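Each chunk now ends up with sibling <chunkId>.index.checksum and <chunkId>.data.checksum files in the node directory, which is exactly what the builder's PathFilter above picks up. A hedged sketch (class name and wiring assumed, not this commit's code) of verifying a chunk against the digest the reducer wrote, using readFully to avoid the short-read risk noted earlier:

import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import voldemort.store.readonly.checksum.CheckSum;
import voldemort.store.readonly.checksum.CheckSum.CheckSumType;
import voldemort.utils.ByteUtils;

public final class ChunkCheckSumVerifier {

    public static boolean verify(FileSystem fs, Path chunkFile, CheckSumType type)
            throws IOException {
        // Recompute the digest over the chunk contents
        CheckSum digest = CheckSum.getInstance(type);
        FSDataInputStream data = fs.open(chunkFile);
        byte[] buffer = new byte[64 * 1024];
        int read;
        while((read = data.read(buffer)) >= 0)
            digest.update(buffer, 0, read);
        data.close();

        // Read back the stored digest; readFully fails loudly on a short file
        byte[] stored = new byte[CheckSum.checkSumLength(type)];
        FSDataInputStream in = fs.open(chunkFile.suffix(".checksum"));
        in.readFully(stored);
        in.close();

        return ByteUtils.compare(digest.getCheckSum(), stored) == 0;
    }
}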
@@ -170,9 +170,11 @@ private void doFetch(HttpServletRequest req, HttpServletResponse resp) throws IO
             logger.info("Executing fetch of " + fetchUrl);
             fetchDir = fileFetcher.fetch(fetchUrl, storeName);
             if(fetchDir == null) {
-                fetchDir = new File(fetchUrl);
+                throw new ServletException("Checksum failed for " + fetchUrl + " and store name = "
+                                           + storeName);
+            } else {
+                logger.info("Fetch complete.");
             }
-            logger.info("Fetch complete.");
         }
         resp.getWriter().write(fetchDir.getAbsolutePath());
     }