Refactoring reducer logic

commit 9a78082336c2b687682c07333418916d192028a7 (parent c13fbeb)
authored by @abh1nay
.classpath (1 line changed)
@@ -56,5 +56,6 @@
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6"/>
<classpathentry kind="lib" path="lib/joda-time-1.6.jar"/>
<classpathentry kind="lib" path="lib/mail-1.4.1.jar"/>
+ <classpathentry kind="lib" path="lib/azkaban-common-0.05.jar"/>
<classpathentry kind="output" path="classes"/>
</classpath>
contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/HadoopStoreBuilder.java (239 lines changed)
@@ -17,10 +17,15 @@
package voldemort.store.readonly.mr;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
+import org.apache.avro.Schema;
+import org.apache.avro.mapred.AvroJob;
+import org.apache.avro.mapred.AvroOutputFormat;
+import org.apache.avro.mapred.Pair;
import org.apache.commons.codec.binary.Hex;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -36,6 +41,7 @@
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.log4j.Logger;
@@ -48,6 +54,7 @@
import voldemort.store.readonly.ReadOnlyStorageMetadata;
import voldemort.store.readonly.checksum.CheckSum;
import voldemort.store.readonly.checksum.CheckSum.CheckSumType;
+import voldemort.store.readonly.disk.KeyValueWriter;
import voldemort.utils.Utils;
import voldemort.xml.ClusterMapper;
import voldemort.xml.StoreDefinitionsMapper;
@@ -66,7 +73,7 @@
private static final Logger logger = Logger.getLogger(HadoopStoreBuilder.class);
private final Configuration config;
- private final Class<? extends AbstractHadoopStoreBuilderMapper<?, ?>> mapperClass;
+ private final Class mapperClass;
@SuppressWarnings("unchecked")
private final Class<? extends InputFormat> inputFormatClass;
private final Cluster cluster;
@@ -99,7 +106,7 @@
@SuppressWarnings("unchecked")
@Deprecated
public HadoopStoreBuilder(Configuration conf,
- Class<? extends AbstractHadoopStoreBuilderMapper<?, ?>> mapperClass,
+ Class mapperClass,
Class<? extends InputFormat> inputFormatClass,
Cluster cluster,
StoreDefinition storeDef,
@@ -135,7 +142,7 @@ public HadoopStoreBuilder(Configuration conf,
*/
@SuppressWarnings("unchecked")
public HadoopStoreBuilder(Configuration conf,
- Class<? extends AbstractHadoopStoreBuilderMapper<?, ?>> mapperClass,
+ Class mapperClass,
Class<? extends InputFormat> inputFormatClass,
Cluster cluster,
StoreDefinition storeDef,
@@ -175,7 +182,7 @@ public HadoopStoreBuilder(Configuration conf,
*/
@SuppressWarnings("unchecked")
public HadoopStoreBuilder(Configuration conf,
- Class<? extends AbstractHadoopStoreBuilderMapper<?, ?>> mapperClass,
+ Class mapperClass,
Class<? extends InputFormat> inputFormatClass,
Cluster cluster,
StoreDefinition storeDef,
@@ -218,7 +225,7 @@ public HadoopStoreBuilder(Configuration conf,
*/
@SuppressWarnings("unchecked")
public HadoopStoreBuilder(Configuration conf,
- Class<? extends AbstractHadoopStoreBuilderMapper<?, ?>> mapperClass,
+ Class mapperClass,
Class<? extends InputFormat> inputFormatClass,
Cluster cluster,
StoreDefinition storeDef,
@@ -265,7 +272,7 @@ public HadoopStoreBuilder(Configuration conf,
*/
@SuppressWarnings("unchecked")
public HadoopStoreBuilder(Configuration conf,
- Class<? extends AbstractHadoopStoreBuilderMapper<?, ?>> mapperClass,
+ Class mapperClass,
Class<? extends InputFormat> inputFormatClass,
Cluster cluster,
StoreDefinition storeDef,
@@ -391,14 +398,14 @@ public void build() {
if(saveKeys) {
if(reducerPerBucket) {
logger.info("Number of collisions in the job - "
- + counters.getCounter(HadoopStoreBuilderReducerPerBucket.CollisionCounter.NUM_COLLISIONS));
+ + counters.getCounter(KeyValueWriter.CollisionCounter.NUM_COLLISIONS));
logger.info("Maximum number of collisions for one entry - "
- + counters.getCounter(HadoopStoreBuilderReducerPerBucket.CollisionCounter.MAX_COLLISIONS));
+ + counters.getCounter(KeyValueWriter.CollisionCounter.MAX_COLLISIONS));
} else {
logger.info("Number of collisions in the job - "
- + counters.getCounter(HadoopStoreBuilderReducer.CollisionCounter.NUM_COLLISIONS));
+ + counters.getCounter(KeyValueWriter.CollisionCounter.NUM_COLLISIONS));
logger.info("Maximum number of collisions for one entry - "
- + counters.getCounter(HadoopStoreBuilderReducer.CollisionCounter.MAX_COLLISIONS));
+ + counters.getCounter(KeyValueWriter.CollisionCounter.MAX_COLLISIONS));
}
}
@@ -491,6 +498,218 @@ public boolean accept(Path arg0) {
}
/**
+ * Run the Avro store build job
+ */
+ public void buildAvro() {
+ try {
+ JobConf conf = new JobConf(config);
+ conf.setInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE);
+ conf.set("cluster.xml", new ClusterMapper().writeCluster(cluster));
+ conf.set("stores.xml",
+ new StoreDefinitionsMapper().writeStoreList(Collections.singletonList(storeDef)));
+ conf.setBoolean("save.keys", saveKeys);
+ conf.setBoolean("reducer.per.bucket", reducerPerBucket);
+ conf.setPartitionerClass(AvroStoreBuilderPartitioner.class);
+ // conf.setMapperClass(mapperClass);
+ conf.setMapOutputKeyClass(ByteBuffer.class);
+ conf.setMapOutputValueClass(ByteBuffer.class);
+
+ conf.setInputFormat(inputFormatClass);
+
+ conf.setOutputFormat((Class<? extends OutputFormat>) AvroOutputFormat.class);
+ conf.setOutputKeyClass(ByteBuffer.class);
+ conf.setOutputValueClass(ByteBuffer.class);
+ conf.setJarByClass(getClass());
+ conf.setReduceSpeculativeExecution(false);
+ FileInputFormat.setInputPaths(conf, inputPath);
+ conf.set("final.output.dir", outputDir.toString());
+ conf.set("checksum.type", CheckSum.toString(checkSumType));
+ FileOutputFormat.setOutputPath(conf, tempDir);
+
+ FileSystem outputFs = outputDir.getFileSystem(conf);
+ if(outputFs.exists(outputDir)) {
+ throw new IOException("Final output directory already exists.");
+ }
+
+ // Delete the temp output dir if it already exists
+ FileSystem tempFs = tempDir.getFileSystem(conf);
+ tempFs.delete(tempDir, true);
+
+ long size = sizeOfPath(tempFs, inputPath);
+ logger.info("Data size = " + size + ", replication factor = "
+ + storeDef.getReplicationFactor() + ", numNodes = "
+ + cluster.getNumberOfNodes() + ", chunk size = " + chunkSizeBytes);
+
+ // Derive "rough" number of chunks and reducers
+ int numReducers;
+ if(saveKeys) {
+
+ if(this.numChunks == -1) {
+ this.numChunks = Math.max((int) (storeDef.getReplicationFactor() * size
+ / cluster.getNumberOfPartitions()
+ / storeDef.getReplicationFactor() / chunkSizeBytes),
+ 1);
+ } else {
+ logger.info("Overriding chunk size byte and taking num chunks ("
+ + this.numChunks + ") directly");
+ }
+
+ if(reducerPerBucket) {
+ numReducers = cluster.getNumberOfPartitions() * storeDef.getReplicationFactor();
+ } else {
+ numReducers = cluster.getNumberOfPartitions() * storeDef.getReplicationFactor()
+ * numChunks;
+ }
+ } else {
+
+ if(this.numChunks == -1) {
+ this.numChunks = Math.max((int) (storeDef.getReplicationFactor() * size
+ / cluster.getNumberOfPartitions() / chunkSizeBytes),
+ 1);
+ } else {
+ logger.info("Overriding chunk size byte and taking num chunks ("
+ + this.numChunks + ") directly");
+ }
+
+ if(reducerPerBucket) {
+ numReducers = cluster.getNumberOfPartitions();
+ } else {
+ numReducers = cluster.getNumberOfPartitions() * numChunks;
+ }
+ }
+ conf.setInt("num.chunks", numChunks);
+ conf.setNumReduceTasks(numReducers);
+
+ conf.setSpeculativeExecution(false);
+
+ System.out.println(config.get("avro.rec.schema"));
+ AvroJob.setInputSchema(conf, Schema.parse(config.get("avro.rec.schema")));
+
+ AvroJob.setOutputSchema(conf,
+ Pair.getPairSchema(Schema.create(Schema.Type.BYTES),
+ Schema.create(Schema.Type.BYTES)));
+
+ AvroJob.setMapperClass(conf, mapperClass);
+
+ if(reducerPerBucket) {
+ conf.setReducerClass(AvroStoreBuilderReducerPerBucket.class);
+ } else {
+ conf.setReducerClass(AvroStoreBuilderReducer.class);
+ }
+
+ logger.info("Number of chunks: " + numChunks + ", number of reducers: " + numReducers
+ + ", save keys: " + saveKeys + ", reducerPerBucket: " + reducerPerBucket);
+ logger.info("Building store...");
+
+ RunningJob job = JobClient.runJob(conf);
+
+ // Once the job has completed, log the counters
+ Counters counters = job.getCounters();
+
+ if(saveKeys) {
+ if(reducerPerBucket) {
+ logger.info("Number of collisions in the job - "
+ + counters.getCounter(KeyValueWriter.CollisionCounter.NUM_COLLISIONS));
+ logger.info("Maximum number of collisions for one entry - "
+ + counters.getCounter(KeyValueWriter.CollisionCounter.MAX_COLLISIONS));
+ } else {
+ logger.info("Number of collisions in the job - "
+ + counters.getCounter(KeyValueWriter.CollisionCounter.NUM_COLLISIONS));
+ logger.info("Maximum number of collisions for one entry - "
+ + counters.getCounter(KeyValueWriter.CollisionCounter.MAX_COLLISIONS));
+ }
+ }
+
+ // Do a CheckSumOfCheckSum - Similar to HDFS
+ CheckSum checkSumGenerator = CheckSum.getInstance(this.checkSumType);
+ if(!this.checkSumType.equals(CheckSumType.NONE) && checkSumGenerator == null) {
+ throw new VoldemortException("Could not generate checksum digest for type "
+ + this.checkSumType);
+ }
+
+ // For each node, make sure the output folder exists and write the format and checksum metadata
+ for(Node node: cluster.getNodes()) {
+
+ ReadOnlyStorageMetadata metadata = new ReadOnlyStorageMetadata();
+
+ if(saveKeys) {
+ metadata.add(ReadOnlyStorageMetadata.FORMAT,
+ ReadOnlyStorageFormat.READONLY_V2.getCode());
+ } else {
+ metadata.add(ReadOnlyStorageMetadata.FORMAT,
+ ReadOnlyStorageFormat.READONLY_V1.getCode());
+ }
+
+ Path nodePath = new Path(outputDir.toString(), "node-" + node.getId());
+
+ if(!outputFs.exists(nodePath)) {
+ logger.info("No data generated for node " + node.getId()
+ + ". Generating empty folder");
+ outputFs.mkdirs(nodePath); // Create empty folder
+ }
+
+ if(checkSumType != CheckSumType.NONE) {
+
+ FileStatus[] storeFiles = outputFs.listStatus(nodePath, new PathFilter() {
+
+ @Override
+ public boolean accept(Path arg0) {
+ if(arg0.getName().endsWith("checksum")
+ && !arg0.getName().startsWith(".")) {
+ return true;
+ }
+ return false;
+ }
+ });
+
+ if(storeFiles != null && storeFiles.length > 0) {
+ Arrays.sort(storeFiles, new IndexFileLastComparator());
+ FSDataInputStream input = null;
+
+ for(FileStatus file: storeFiles) {
+ try {
+ input = outputFs.open(file.getPath());
+ byte fileCheckSum[] = new byte[CheckSum.checkSumLength(this.checkSumType)];
+ input.read(fileCheckSum);
+ logger.debug("Checksum for file " + file.toString() + " - "
+ + new String(Hex.encodeHex(fileCheckSum)));
+ checkSumGenerator.update(fileCheckSum);
+ } catch(Exception e) {
+ logger.error("Error while reading checksum file " + e.getMessage(),
+ e);
+ } finally {
+ if(input != null)
+ input.close();
+ }
+ outputFs.delete(file.getPath(), false);
+ }
+
+ metadata.add(ReadOnlyStorageMetadata.CHECKSUM_TYPE,
+ CheckSum.toString(checkSumType));
+
+ String checkSum = new String(Hex.encodeHex(checkSumGenerator.getCheckSum()));
+ logger.info("Checksum for node " + node.getId() + " - " + checkSum);
+
+ metadata.add(ReadOnlyStorageMetadata.CHECKSUM, checkSum);
+ }
+ }
+
+ // Write metadata
+ FSDataOutputStream metadataStream = outputFs.create(new Path(nodePath, ".metadata"));
+ metadataStream.write(metadata.toJsonString().getBytes());
+ metadataStream.flush();
+ metadataStream.close();
+
+ }
+
+ } catch(Exception e) {
+ logger.error("Error in Store builder", e);
+ throw new VoldemortException(e);
+ }
+
+ }
+
+ /**
* A comparator that sorts index files last. This is required to maintain
* the order while calculating checksum
*
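The reducer-count arithmetic in buildAvro() mirrors the existing build() path: buckets are (partition, replica) pairs when keys are saved and plain partitions otherwise, optionally multiplied by the number of chunks. A minimal standalone sketch of that selection, under the assumption that numChunks has already been derived from the input size and chunk size as in the method above:

// Sketch only: restates the numReducers selection from buildAvro() for clarity.
public final class ReducerCountSketch {

    static int deriveNumReducers(int numPartitions,
                                 int replicationFactor,
                                 int numChunks,
                                 boolean saveKeys,
                                 boolean reducerPerBucket) {
        if (saveKeys) {
            // One bucket per (partition, replica), optionally split further per chunk
            return reducerPerBucket
                    ? numPartitions * replicationFactor
                    : numPartitions * replicationFactor * numChunks;
        }
        // Without saved keys, buckets are per partition only
        return reducerPerBucket
                ? numPartitions
                : numPartitions * numChunks;
    }

    public static void main(String[] args) {
        // 16 partitions, replication factor 2, 4 chunks, save keys, one reducer per chunk
        System.out.println(deriveNumReducers(16, 2, 4, true, false)); // prints 128
    }
}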
.../hadoop-store-builder/src/java/voldemort/store/readonly/mr/HadoopStoreBuilderReducer.java (255 lines changed)
@@ -16,64 +16,29 @@
package voldemort.store.readonly.mr;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Iterator;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.log4j.Logger;
-import voldemort.VoldemortException;
-import voldemort.store.readonly.ReadOnlyUtils;
-import voldemort.store.readonly.checksum.CheckSum;
-import voldemort.store.readonly.checksum.CheckSum.CheckSumType;
-import voldemort.utils.ByteUtils;
+import voldemort.store.readonly.disk.HadoopStoreWriter;
+import voldemort.store.readonly.disk.KeyValueWriter;
+import azkaban.common.utils.Utils;
/**
* Take key md5s and value bytes and build a read-only store from these values
*/
@SuppressWarnings("deprecation")
-public class HadoopStoreBuilderReducer extends AbstractStoreBuilderConfigurable implements
- Reducer<BytesWritable, BytesWritable, Text, Text> {
+public class HadoopStoreBuilderReducer implements Reducer<BytesWritable, BytesWritable, Text, Text> {
- private static final Logger logger = Logger.getLogger(HadoopStoreBuilderReducer.class);
-
- private DataOutputStream indexFileStream = null;
- private DataOutputStream valueFileStream = null;
- private int position;
- private String taskId = null;
-
- private int nodeId = -1;
- private int partitionId = -1;
- private int chunkId = -1;
- private int replicaType = -1;
-
- private Path taskIndexFileName;
- private Path taskValueFileName;
-
- private JobConf conf;
- private CheckSumType checkSumType;
- private CheckSum checkSumDigestIndex;
- private CheckSum checkSumDigestValue;
-
- private String outputDir;
-
- private FileSystem fs;
-
- protected static enum CollisionCounter {
- NUM_COLLISIONS,
- MAX_COLLISIONS;
- }
+ String keyValueWriterClass;
+ @SuppressWarnings("rawtypes")
+ KeyValueWriter writer;
/**
* Reduce should get sorted MD5 of Voldemort key ( either 16 bytes if saving
@@ -82,220 +47,36 @@
* partition-id, replica-type, [key-size, value-size, key, value]* if saving
* keys is enabled
*/
+ @SuppressWarnings("unchecked")
public void reduce(BytesWritable key,
Iterator<BytesWritable> iterator,
OutputCollector<Text, Text> output,
Reporter reporter) throws IOException {
- // Write key and position
- this.indexFileStream.write(key.get(), 0, key.getSize());
- this.indexFileStream.writeInt(this.position);
-
- // Run key through checksum digest
- if(this.checkSumDigestIndex != null) {
- this.checkSumDigestIndex.update(key.get(), 0, key.getSize());
- this.checkSumDigestIndex.update(this.position);
- }
-
- short numTuples = 0;
- ByteArrayOutputStream stream = new ByteArrayOutputStream();
- DataOutputStream valueStream = new DataOutputStream(stream);
-
- while(iterator.hasNext()) {
- BytesWritable writable = iterator.next();
- byte[] valueBytes = writable.get();
- int offsetTillNow = 0;
-
- // Read node Id
- if(this.nodeId == -1)
- this.nodeId = ByteUtils.readInt(valueBytes, offsetTillNow);
- offsetTillNow += ByteUtils.SIZE_OF_INT;
-
- // Read partition id
- if(this.partitionId == -1)
- this.partitionId = ByteUtils.readInt(valueBytes, offsetTillNow);
- offsetTillNow += ByteUtils.SIZE_OF_INT;
-
- // Read chunk id
- if(this.chunkId == -1)
- this.chunkId = ReadOnlyUtils.chunk(key.get(), getNumChunks());
-
- // Read replica type
- if(getSaveKeys()) {
- if(this.replicaType == -1)
- this.replicaType = (int) ByteUtils.readBytes(valueBytes,
- offsetTillNow,
- ByteUtils.SIZE_OF_BYTE);
- offsetTillNow += ByteUtils.SIZE_OF_BYTE;
- }
-
- int valueLength = writable.getSize() - offsetTillNow;
- if(getSaveKeys()) {
- // Write ( key_length, value_length, key,
- // value )
- valueStream.write(valueBytes, offsetTillNow, valueLength);
- } else {
- // Write (value_length + value)
- valueStream.writeInt(valueLength);
- valueStream.write(valueBytes, offsetTillNow, valueLength);
- }
-
- numTuples++;
-
- // If we have multiple values for this md5 that is a collision,
- // throw an exception--either the data itself has duplicates, there
- // are trillions of keys, or someone is attempting something
- // malicious ( We obviously expect collisions when we save keys )
- if(!getSaveKeys() && numTuples > 1)
- throw new VoldemortException("Duplicate keys detected for md5 sum "
- + ByteUtils.toHexString(ByteUtils.copy(key.get(),
- 0,
- key.getSize())));
-
- }
-
- if(numTuples < 0) {
- // Overflow
- throw new VoldemortException("Found too many collisions: chunk " + chunkId
- + " has exceeded " + Short.MAX_VALUE + " collisions.");
- } else if(numTuples > 1) {
- // Update number of collisions + max keys per collision
- reporter.incrCounter(CollisionCounter.NUM_COLLISIONS, 1);
-
- long numCollisions = reporter.getCounter(CollisionCounter.MAX_COLLISIONS).getCounter();
- if(numTuples > numCollisions) {
- reporter.incrCounter(CollisionCounter.MAX_COLLISIONS, numTuples - numCollisions);
- }
- }
-
- // Flush the value
- valueStream.flush();
- byte[] value = stream.toByteArray();
-
- // Start writing to file now
- // First, if save keys flag set the number of keys
- if(getSaveKeys()) {
-
- this.valueFileStream.writeShort(numTuples);
- this.position += ByteUtils.SIZE_OF_SHORT;
-
- if(this.checkSumDigestValue != null) {
- this.checkSumDigestValue.update(numTuples);
- }
- }
-
- this.valueFileStream.write(value);
- this.position += value.length;
-
- if(this.checkSumDigestValue != null) {
- this.checkSumDigestValue.update(value);
- }
-
- if(this.position < 0)
- throw new VoldemortException("Chunk overflow exception: chunk " + chunkId
- + " has exceeded " + Integer.MAX_VALUE + " bytes.");
+ writer.write(key, iterator, reporter);
}
@Override
public void configure(JobConf job) {
- super.configure(job);
- try {
- this.conf = job;
- this.position = 0;
- this.outputDir = job.get("final.output.dir");
- this.taskId = job.get("mapred.task.id");
- this.checkSumType = CheckSum.fromString(job.get("checksum.type"));
- this.checkSumDigestIndex = CheckSum.getInstance(checkSumType);
- this.checkSumDigestValue = CheckSum.getInstance(checkSumType);
- this.taskIndexFileName = new Path(FileOutputFormat.getOutputPath(job), getStoreName()
- + "."
- + this.taskId
- + ".index");
- this.taskValueFileName = new Path(FileOutputFormat.getOutputPath(job), getStoreName()
- + "."
- + this.taskId
- + ".data");
-
- if(this.fs == null)
- this.fs = this.taskIndexFileName.getFileSystem(job);
+ try {
- this.indexFileStream = fs.create(this.taskIndexFileName);
- this.valueFileStream = fs.create(this.taskValueFileName);
+ keyValueWriterClass = job.get("writer.class");
+ if(keyValueWriterClass != null)
+ writer = (KeyValueWriter) Utils.callConstructor(keyValueWriterClass);
+ else
+ writer = new HadoopStoreWriter();
- logger.info("Opening " + this.taskIndexFileName + " and " + this.taskValueFileName
- + " for writing.");
+ writer.conf(job);
- } catch(IOException e) {
+ } catch(Exception e) {
throw new RuntimeException("Failed to open Input/OutputStream", e);
}
}
@Override
public void close() throws IOException {
-
- this.indexFileStream.close();
- this.valueFileStream.close();
-
- if(this.nodeId == -1 || this.chunkId == -1 || this.partitionId == -1) {
- // Issue 258 - No data was read in the reduce phase, do not create
- // any output
- return;
- }
-
- // If the replica type read was not valid, shout out
- if(getSaveKeys() && this.replicaType == -1) {
- throw new RuntimeException("Could not read the replica type correctly for node "
- + nodeId + " ( partition - " + this.partitionId + " )");
- }
-
- String fileNamePrefix = null;
- if(getSaveKeys()) {
- fileNamePrefix = new String(Integer.toString(this.partitionId) + "_"
- + Integer.toString(this.replicaType) + "_"
- + Integer.toString(this.chunkId));
- } else {
- fileNamePrefix = new String(Integer.toString(this.partitionId) + "_"
- + Integer.toString(this.chunkId));
- }
-
- // Initialize the node directory
- Path nodeDir = new Path(this.outputDir, "node-" + this.nodeId);
-
- // Create output directory, if it doesn't exist
- FileSystem outputFs = nodeDir.getFileSystem(this.conf);
- outputFs.mkdirs(nodeDir);
-
- // Write the checksum and output files
- if(this.checkSumType != CheckSumType.NONE) {
-
- if(this.checkSumDigestIndex != null && this.checkSumDigestValue != null) {
- Path checkSumIndexFile = new Path(nodeDir, fileNamePrefix + ".index.checksum");
- Path checkSumValueFile = new Path(nodeDir, fileNamePrefix + ".data.checksum");
-
- FSDataOutputStream output = outputFs.create(checkSumIndexFile);
- output.write(this.checkSumDigestIndex.getCheckSum());
- output.close();
-
- output = outputFs.create(checkSumValueFile);
- output.write(this.checkSumDigestValue.getCheckSum());
- output.close();
- } else {
- throw new RuntimeException("Failed to open checksum digest for node " + nodeId
- + " ( partition - " + this.partitionId + ", chunk - "
- + chunkId + " )");
- }
- }
-
- // Generate the final chunk files
- Path indexFile = new Path(nodeDir, fileNamePrefix + ".index");
- Path valueFile = new Path(nodeDir, fileNamePrefix + ".data");
-
- logger.info("Moving " + this.taskIndexFileName + " to " + indexFile);
- outputFs.rename(taskIndexFileName, indexFile);
- logger.info("Moving " + this.taskValueFileName + " to " + valueFile);
- outputFs.rename(this.taskValueFileName, valueFile);
-
+ writer.close();
}
}
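Both reducers now delegate their per-key work to a KeyValueWriter from voldemort.store.readonly.disk, and the CollisionCounter enum referenced by HadoopStoreBuilder lives on that type as well. The interface source is not part of this commit view, so the declaration below is only an inference from the call sites above (conf(job), write(key, iterator, reporter), close()); the real interface may differ in generics or carry additional members:

// Inferred shape of the KeyValueWriter contract, based solely on how the
// refactored reducers use it. A sketch, not the actual source.
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;

public interface KeyValueWriter<K, V> {

    // Pick up job settings (final output dir, checksum type, store name, ...)
    void conf(JobConf job);

    // Consume one sorted key group; replaces the index/data stream handling
    // that previously lived directly in the reducers
    void write(K key, Iterator<V> values, Reporter reporter) throws IOException;

    // Flush streams and move the finished chunk files into the node directory
    void close() throws IOException;
}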
...tore-builder/src/java/voldemort/store/readonly/mr/HadoopStoreBuilderReducerPerBucket.java (265 lines changed)
@@ -16,28 +16,20 @@
package voldemort.store.readonly.mr;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Iterator;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.log4j.Logger;
-import voldemort.VoldemortException;
-import voldemort.store.readonly.ReadOnlyUtils;
-import voldemort.store.readonly.checksum.CheckSum;
-import voldemort.store.readonly.checksum.CheckSum.CheckSumType;
-import voldemort.utils.ByteUtils;
+import voldemort.store.readonly.disk.HadoopStoreWriterPerBucket;
+import voldemort.store.readonly.disk.KeyValueWriter;
+import azkaban.common.utils.Utils;
/**
* Take key md5s and value bytes and build a read-only store from these values
@@ -48,31 +40,9 @@
private static final Logger logger = Logger.getLogger(HadoopStoreBuilderReducerPerBucket.class);
- private DataOutputStream[] indexFileStream = null;
- private DataOutputStream[] valueFileStream = null;
- private int[] position;
- private String taskId = null;
-
- private int nodeId = -1;
- private int partitionId = -1;
- private int replicaType = -1;
-
- private Path[] taskIndexFileName;
- private Path[] taskValueFileName;
-
- private JobConf conf;
- private CheckSumType checkSumType;
- private CheckSum[] checkSumDigestIndex;
- private CheckSum[] checkSumDigestValue;
-
- private String outputDir;
-
- private FileSystem fs;
-
- protected static enum CollisionCounter {
- NUM_COLLISIONS,
- MAX_COLLISIONS;
- }
+ String keyValueWriterClass;
+ @SuppressWarnings("rawtypes")
+ KeyValueWriter writer;
/**
* Reduce should get sorted MD5 of Voldemort key ( either 16 bytes if saving
@@ -81,237 +51,36 @@
* partition-id, replica-type, [key-size, value-size, key, value]* if saving
* keys is enabled
*/
+ @SuppressWarnings("unchecked")
public void reduce(BytesWritable key,
Iterator<BytesWritable> iterator,
OutputCollector<Text, Text> output,
Reporter reporter) throws IOException {
- // Read chunk id
- int chunkId = ReadOnlyUtils.chunk(key.get(), getNumChunks());
-
- // Write key and position
- this.indexFileStream[chunkId].write(key.get(), 0, key.getSize());
- this.indexFileStream[chunkId].writeInt(this.position[chunkId]);
-
- // Run key through checksum digest
- if(this.checkSumDigestIndex[chunkId] != null) {
- this.checkSumDigestIndex[chunkId].update(key.get(), 0, key.getSize());
- this.checkSumDigestIndex[chunkId].update(this.position[chunkId]);
- }
-
- short numTuples = 0;
- ByteArrayOutputStream stream = new ByteArrayOutputStream();
- DataOutputStream valueStream = new DataOutputStream(stream);
-
- while(iterator.hasNext()) {
- BytesWritable writable = iterator.next();
- byte[] valueBytes = writable.get();
- int offsetTillNow = 0;
-
- // Read node Id
- if(this.nodeId == -1)
- this.nodeId = ByteUtils.readInt(valueBytes, offsetTillNow);
- offsetTillNow += ByteUtils.SIZE_OF_INT;
-
- // Read partition id
- if(this.partitionId == -1)
- this.partitionId = ByteUtils.readInt(valueBytes, offsetTillNow);
- offsetTillNow += ByteUtils.SIZE_OF_INT;
-
- // Read replica type
- if(getSaveKeys()) {
- if(this.replicaType == -1)
- this.replicaType = (int) ByteUtils.readBytes(valueBytes,
- offsetTillNow,
- ByteUtils.SIZE_OF_BYTE);
- offsetTillNow += ByteUtils.SIZE_OF_BYTE;
- }
-
- int valueLength = writable.getSize() - offsetTillNow;
- if(getSaveKeys()) {
- // Write ( key_length, value_length, key,
- // value )
- valueStream.write(valueBytes, offsetTillNow, valueLength);
- } else {
- // Write (value_length + value)
- valueStream.writeInt(valueLength);
- valueStream.write(valueBytes, offsetTillNow, valueLength);
- }
-
- numTuples++;
-
- // If we have multiple values for this md5 that is a collision,
- // throw an exception--either the data itself has duplicates, there
- // are trillions of keys, or someone is attempting something
- // malicious ( We obviously expect collisions when we save keys )
- if(!getSaveKeys() && numTuples > 1)
- throw new VoldemortException("Duplicate keys detected for md5 sum "
- + ByteUtils.toHexString(ByteUtils.copy(key.get(),
- 0,
- key.getSize())));
-
- }
-
- if(numTuples < 0) {
- // Overflow
- throw new VoldemortException("Found too many collisions: chunk " + chunkId
- + " has exceeded " + Short.MAX_VALUE + " collisions.");
- } else if(numTuples > 1) {
- // Update number of collisions + max keys per collision
- reporter.incrCounter(CollisionCounter.NUM_COLLISIONS, 1);
-
- long numCollisions = reporter.getCounter(CollisionCounter.MAX_COLLISIONS).getCounter();
- if(numTuples > numCollisions) {
- reporter.incrCounter(CollisionCounter.MAX_COLLISIONS, numTuples - numCollisions);
- }
- }
-
- // Flush the value
- valueStream.flush();
- byte[] value = stream.toByteArray();
-
- // Start writing to file now
- // First, if save keys flag set the number of keys
- if(getSaveKeys()) {
-
- this.valueFileStream[chunkId].writeShort(numTuples);
- this.position[chunkId] += ByteUtils.SIZE_OF_SHORT;
-
- if(this.checkSumDigestValue[chunkId] != null) {
- this.checkSumDigestValue[chunkId].update(numTuples);
- }
- }
-
- this.valueFileStream[chunkId].write(value);
- this.position[chunkId] += value.length;
-
- if(this.checkSumDigestValue[chunkId] != null) {
- this.checkSumDigestValue[chunkId].update(value);
- }
-
- if(this.position[chunkId] < 0)
- throw new VoldemortException("Chunk overflow exception: chunk " + chunkId
- + " has exceeded " + Integer.MAX_VALUE + " bytes.");
+ writer.write(key, iterator, reporter);
}
@Override
public void configure(JobConf job) {
- super.configure(job);
- try {
- this.conf = job;
- this.outputDir = job.get("final.output.dir");
- this.taskId = job.get("mapred.task.id");
- this.checkSumType = CheckSum.fromString(job.get("checksum.type"));
-
- this.checkSumDigestIndex = new CheckSum[getNumChunks()];
- this.checkSumDigestValue = new CheckSum[getNumChunks()];
- this.position = new int[getNumChunks()];
- this.taskIndexFileName = new Path[getNumChunks()];
- this.taskValueFileName = new Path[getNumChunks()];
- this.indexFileStream = new DataOutputStream[getNumChunks()];
- this.valueFileStream = new DataOutputStream[getNumChunks()];
-
- for(int chunkId = 0; chunkId < getNumChunks(); chunkId++) {
-
- this.checkSumDigestIndex[chunkId] = CheckSum.getInstance(checkSumType);
- this.checkSumDigestValue[chunkId] = CheckSum.getInstance(checkSumType);
- this.position[chunkId] = 0;
-
- this.taskIndexFileName[chunkId] = new Path(FileOutputFormat.getOutputPath(job),
- getStoreName() + "."
- + Integer.toString(chunkId)
- + "_" + this.taskId + ".index");
- this.taskValueFileName[chunkId] = new Path(FileOutputFormat.getOutputPath(job),
- getStoreName() + "."
- + Integer.toString(chunkId)
- + "_" + this.taskId + ".data");
- if(this.fs == null)
- this.fs = this.taskIndexFileName[chunkId].getFileSystem(job);
+ try {
- this.indexFileStream[chunkId] = fs.create(this.taskIndexFileName[chunkId]);
- this.valueFileStream[chunkId] = fs.create(this.taskValueFileName[chunkId]);
+ keyValueWriterClass = job.get("writer.class");
+ if(keyValueWriterClass != null)
+ writer = (KeyValueWriter) Utils.callConstructor(keyValueWriterClass);
+ else
+ writer = new HadoopStoreWriterPerBucket();
- logger.info("Opening " + this.taskIndexFileName[chunkId] + " and "
- + this.taskValueFileName[chunkId] + " for writing.");
- }
+ writer.conf(job);
- } catch(IOException e) {
+ } catch(Exception e) {
throw new RuntimeException("Failed to open Input/OutputStream", e);
}
}
@Override
public void close() throws IOException {
-
- for(int chunkId = 0; chunkId < getNumChunks(); chunkId++) {
- this.indexFileStream[chunkId].close();
- this.valueFileStream[chunkId].close();
- }
-
- if(this.nodeId == -1 || this.partitionId == -1) {
- // Issue 258 - No data was read in the reduce phase, do not create
- // any output
- return;
- }
-
- // If the replica type read was not valid, shout out
- if(getSaveKeys() && this.replicaType == -1) {
- throw new RuntimeException("Could not read the replica type correctly for node "
- + nodeId + " ( partition - " + this.partitionId + " )");
- }
-
- String fileNamePrefix = null;
- if(getSaveKeys()) {
- fileNamePrefix = new String(Integer.toString(this.partitionId) + "_"
- + Integer.toString(this.replicaType) + "_");
- } else {
- fileNamePrefix = new String(Integer.toString(this.partitionId) + "_");
- }
-
- // Initialize the node directory
- Path nodeDir = new Path(this.outputDir, "node-" + this.nodeId);
-
- // Create output directory, if it doesn't exist
- FileSystem outputFs = nodeDir.getFileSystem(this.conf);
- outputFs.mkdirs(nodeDir);
-
- // Write the checksum and output files
- for(int chunkId = 0; chunkId < getNumChunks(); chunkId++) {
-
- String chunkFileName = fileNamePrefix + Integer.toString(chunkId);
- if(this.checkSumType != CheckSumType.NONE) {
-
- if(this.checkSumDigestIndex[chunkId] != null
- && this.checkSumDigestValue[chunkId] != null) {
- Path checkSumIndexFile = new Path(nodeDir, chunkFileName + ".index.checksum");
- Path checkSumValueFile = new Path(nodeDir, chunkFileName + ".data.checksum");
-
- FSDataOutputStream output = outputFs.create(checkSumIndexFile);
- output.write(this.checkSumDigestIndex[chunkId].getCheckSum());
- output.close();
-
- output = outputFs.create(checkSumValueFile);
- output.write(this.checkSumDigestValue[chunkId].getCheckSum());
- output.close();
- } else {
- throw new RuntimeException("Failed to open checksum digest for node " + nodeId
- + " ( partition - " + this.partitionId
- + ", chunk - " + chunkId + " )");
- }
- }
-
- // Generate the final chunk files
- Path indexFile = new Path(nodeDir, chunkFileName + ".index");
- Path valueFile = new Path(nodeDir, chunkFileName + ".data");
-
- logger.info("Moving " + this.taskIndexFileName[chunkId] + " to " + indexFile);
- fs.rename(taskIndexFileName[chunkId], indexFile);
- logger.info("Moving " + this.taskValueFileName[chunkId] + " to " + valueFile);
- fs.rename(this.taskValueFileName[chunkId], valueFile);
-
- }
-
+ writer.close();
}
}
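Writer selection in both reducers is driven by an optional "writer.class" job property, falling back to the bundled writers when it is unset. A hedged usage sketch follows; the property name comes from the diff above, while the custom class name is purely hypothetical:

import org.apache.hadoop.mapred.JobConf;

public class WriterSelectionExample {

    public static void main(String[] args) {
        JobConf conf = new JobConf();

        // Hypothetical class name, for illustration only. It would need to
        // implement KeyValueWriter and expose a no-arg constructor, since the
        // reducers instantiate it via azkaban.common.utils.Utils.callConstructor(...).
        conf.set("writer.class", "com.example.MyKeyValueWriter");

        // Leaving "writer.class" unset keeps the defaults introduced here:
        // HadoopStoreWriter in HadoopStoreBuilderReducer and
        // HadoopStoreWriterPerBucket in HadoopStoreBuilderReducerPerBucket.
    }
}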