diff --git a/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/AbstractHadoopStoreBuilderMapper.java b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/AbstractHadoopStoreBuilderMapper.java index 95ef6582f3..ea18558da6 100644 --- a/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/AbstractHadoopStoreBuilderMapper.java +++ b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/AbstractHadoopStoreBuilderMapper.java @@ -26,7 +26,6 @@ import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; -import voldemort.cluster.Node; import voldemort.routing.ConsistentRoutingStrategy; import voldemort.serialization.DefaultSerializerFactory; import voldemort.serialization.Serializer; @@ -34,6 +33,7 @@ import voldemort.serialization.SerializerFactory; import voldemort.store.compress.CompressionStrategy; import voldemort.store.compress.CompressionStrategyFactory; +import voldemort.store.readonly.mr.utils.MapperKeyValueWriter; import voldemort.utils.ByteUtils; /** @@ -79,96 +79,28 @@ public void map(K key, byte[] keyBytes = keySerializer.toBytes(makeKey(key, value)); byte[] valBytes = valueSerializer.toBytes(makeValue(key, value)); - // Compress key and values if required - if(keySerializerDefinition.hasCompression()) { - keyBytes = keyCompressor.deflate(keyBytes); - } - - if(valueSerializerDefinition.hasCompression()) { - valBytes = valueCompressor.deflate(valBytes); - } - - // Get the output byte arrays ready to populate - byte[] outputValue; - BytesWritable outputKey; - - // Leave initial offset for (a) node id (b) partition id - // since they are written later - int offsetTillNow = 2 * ByteUtils.SIZE_OF_INT; - - if(getSaveKeys()) { - - // In order - 4 ( for node id ) + 4 ( partition id ) + 1 ( replica - // type - primary | secondary | tertiary... 
] + 4 ( key size ) - // size ) + 4 ( value size ) + key + value - outputValue = new byte[valBytes.length + keyBytes.length + ByteUtils.SIZE_OF_BYTE + 4 - * ByteUtils.SIZE_OF_INT]; - - // Write key length - leave byte for replica type - offsetTillNow += ByteUtils.SIZE_OF_BYTE; - ByteUtils.writeInt(outputValue, keyBytes.length, offsetTillNow); - - // Write value length - offsetTillNow += ByteUtils.SIZE_OF_INT; - ByteUtils.writeInt(outputValue, valBytes.length, offsetTillNow); - - // Write key - offsetTillNow += ByteUtils.SIZE_OF_INT; - System.arraycopy(keyBytes, 0, outputValue, offsetTillNow, keyBytes.length); - - // Write value - offsetTillNow += keyBytes.length; - System.arraycopy(valBytes, 0, outputValue, offsetTillNow, valBytes.length); - - // Generate MR key - upper 8 bytes of 16 byte md5 - outputKey = new BytesWritable(ByteUtils.copy(md5er.digest(keyBytes), - 0, - 2 * ByteUtils.SIZE_OF_INT)); - - } else { - - // In order - 4 ( for node id ) + 4 ( partition id ) + value - outputValue = new byte[valBytes.length + 2 * ByteUtils.SIZE_OF_INT]; - - // Write value - System.arraycopy(valBytes, 0, outputValue, offsetTillNow, valBytes.length); - - // Generate MR key - 16 byte md5 - outputKey = new BytesWritable(md5er.digest(keyBytes)); - - } - - // Generate partition and node list this key is destined for - List partitionList = routingStrategy.getPartitionList(keyBytes); - Node[] partitionToNode = routingStrategy.getPartitionToNode(); - - for(int replicaType = 0; replicaType < partitionList.size(); replicaType++) { - - // Node id - ByteUtils.writeInt(outputValue, - partitionToNode[partitionList.get(replicaType)].getId(), - 0); - - if(getSaveKeys()) { - // Primary partition id - ByteUtils.writeInt(outputValue, partitionList.get(0), ByteUtils.SIZE_OF_INT); - - // Replica type - ByteUtils.writeBytes(outputValue, - replicaType, - 2 * ByteUtils.SIZE_OF_INT, - ByteUtils.SIZE_OF_BYTE); - } else { - // Partition id - ByteUtils.writeInt(outputValue, - partitionList.get(replicaType), - ByteUtils.SIZE_OF_INT); - } - BytesWritable outputVal = new BytesWritable(outputValue); + MapperKeyValueWriter mapWriter = new MapperKeyValueWriter(); + + List mapperList = mapWriter.map(routingStrategy, + keySerializer, + valueSerializer, + valueCompressor, + keyCompressor, + keySerializerDefinition, + valueSerializerDefinition, + keyBytes, + valBytes, + getSaveKeys(), + md5er); + + for(int i = 0; i < mapperList.size(); i++) { + voldemort.utils.Pair pair = (voldemort.utils.Pair) mapperList.get(i); + BytesWritable outputKey = pair.getFirst(); + BytesWritable outputVal = pair.getSecond(); output.collect(outputKey, outputVal); - } + md5er.reset(); } diff --git a/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/AvroStoreBuilderMapper.java b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/AvroStoreBuilderMapper.java index b8fac151df..6dc468af91 100644 --- a/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/AvroStoreBuilderMapper.java +++ b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/AvroStoreBuilderMapper.java @@ -10,7 +10,6 @@ import org.apache.avro.mapred.AvroCollector; import org.apache.avro.mapred.AvroMapper; import org.apache.avro.mapred.Pair; -import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobConfigurable; @@ -18,7 +17,6 @@ import voldemort.VoldemortException; import voldemort.cluster.Cluster; -import voldemort.cluster.Node; import 
voldemort.routing.ConsistentRoutingStrategy; import voldemort.serialization.DefaultSerializerFactory; import voldemort.serialization.SerializerDefinition; @@ -27,17 +25,17 @@ import voldemort.store.StoreDefinition; import voldemort.store.compress.CompressionStrategy; import voldemort.store.compress.CompressionStrategyFactory; -import voldemort.store.readonly.mr.azkaban.StoreBuilderTransformation; import voldemort.store.readonly.mr.utils.HadoopUtils; +import voldemort.store.readonly.mr.utils.MapperKeyValueWriter; import voldemort.utils.ByteUtils; import voldemort.xml.ClusterMapper; import voldemort.xml.StoreDefinitionsMapper; import azkaban.common.utils.Props; -import azkaban.common.utils.Utils; -// Avro container files are not sequence input format files -// they contain records instead of k/v pairs -// to consume these files we use the AvroMapper +/* + * Avro container files are not sequence input format files they contain records + * instead of k/v pairs to consume these files we use the AvroMapper + */ public class AvroStoreBuilderMapper extends AvroMapper> implements JobConfigurable { @@ -52,28 +50,19 @@ public class AvroStoreBuilderMapper extends private String keyField; private String valField; - private String _keySelection; - private String _valSelection; - - private StoreBuilderTransformation _keyTrans; - private StoreBuilderTransformation _valTrans; - private CompressionStrategy valueCompressor; private CompressionStrategy keyCompressor; private SerializerDefinition keySerializerDefinition; private SerializerDefinition valueSerializerDefinition; - // Path path = new Path(fileName); - FSDataOutputStream outputStream; - /** - * Create the voldemort key and value from the input key and value and map - * it out for each of the responsible voldemort nodes + * Create the voldemort key and value from the input Avro record by + * extracting the key and value and map it out for each of the responsible + * voldemort nodes + * * - * The output key is the md5 of the serialized key returned by makeKey(). * The output value is the node_id & partition_id of the responsible node - * followed by serialized value returned by makeValue() OR if we have - * setKeys flag on the serialized key and serialized value + * followed by serialized value */ @Override public void map(GenericData.Record record, @@ -83,96 +72,25 @@ public void map(GenericData.Record record, byte[] keyBytes = keySerializer.toBytes(record.get(keyField)); byte[] valBytes = valueSerializer.toBytes(record.get(valField)); - // Compress key and values if required - if(keySerializerDefinition.hasCompression()) { - keyBytes = keyCompressor.deflate(keyBytes); - } - - if(valueSerializerDefinition.hasCompression()) { - valBytes = valueCompressor.deflate(valBytes); - } - - // Get the output byte arrays ready to populate - byte[] outputValue; - BytesWritable outputKey; - - // Leave initial offset for (a) node id (b) partition id - // since they are written later - int offsetTillNow = 2 * ByteUtils.SIZE_OF_INT; - - if(getSaveKeys()) { - - // In order - 4 ( for node id ) + 4 ( partition id ) + 1 ( - // replica - // type - primary | secondary | tertiary... 
] + 4 ( key size ) - // size ) + 4 ( value size ) + key + value - outputValue = new byte[valBytes.length + keyBytes.length + ByteUtils.SIZE_OF_BYTE + 4 - * ByteUtils.SIZE_OF_INT]; - - // Write key length - leave byte for replica type - offsetTillNow += ByteUtils.SIZE_OF_BYTE; - ByteUtils.writeInt(outputValue, keyBytes.length, offsetTillNow); - - // Write value length - offsetTillNow += ByteUtils.SIZE_OF_INT; - ByteUtils.writeInt(outputValue, valBytes.length, offsetTillNow); - - // Write key - offsetTillNow += ByteUtils.SIZE_OF_INT; - System.arraycopy(keyBytes, 0, outputValue, offsetTillNow, keyBytes.length); + MapperKeyValueWriter mapWriter = new MapperKeyValueWriter(); + + List mapperList = mapWriter.map(routingStrategy, + keySerializer, + valueSerializer, + valueCompressor, + keyCompressor, + keySerializerDefinition, + valueSerializerDefinition, + keyBytes, + valBytes, + getSaveKeys(), + md5er); + + for(int i = 0; i < mapperList.size(); i++) { + voldemort.utils.Pair pair = (voldemort.utils.Pair) mapperList.get(i); + BytesWritable outputKey = pair.getFirst(); + BytesWritable outputVal = pair.getSecond(); - // Write value - offsetTillNow += keyBytes.length; - System.arraycopy(valBytes, 0, outputValue, offsetTillNow, valBytes.length); - - // Generate MR key - upper 8 bytes of 16 byte md5 - outputKey = new BytesWritable(ByteUtils.copy(md5er.digest(keyBytes), - 0, - 2 * ByteUtils.SIZE_OF_INT)); - - } else { - - // In order - 4 ( for node id ) + 4 ( partition id ) + value - outputValue = new byte[valBytes.length + 2 * ByteUtils.SIZE_OF_INT]; - - // Write value - System.arraycopy(valBytes, 0, outputValue, offsetTillNow, valBytes.length); - - // Generate MR key - 16 byte md5 - outputKey = new BytesWritable(md5er.digest(keyBytes)); - - } - - // Generate partition and node list this key is destined for - List partitionList = routingStrategy.getPartitionList(keyBytes); - Node[] partitionToNode = routingStrategy.getPartitionToNode(); - - for(int replicaType = 0; replicaType < partitionList.size(); replicaType++) { - - // Node id - ByteUtils.writeInt(outputValue, - partitionToNode[partitionList.get(replicaType)].getId(), - 0); - - if(getSaveKeys()) { - // Primary partition id - ByteUtils.writeInt(outputValue, partitionList.get(0), ByteUtils.SIZE_OF_INT); - - // Replica type - ByteUtils.writeBytes(outputValue, - replicaType, - 2 * ByteUtils.SIZE_OF_INT, - ByteUtils.SIZE_OF_BYTE); - } else { - // Partition id - ByteUtils.writeInt(outputValue, - partitionList.get(replicaType), - ByteUtils.SIZE_OF_INT); - } - BytesWritable outputVal = new BytesWritable(outputValue); - - // System.out.println("collect length (K/V): "+ - // outputKey.getLength()+ " , " + outputVal.getLength()); ByteBuffer keyBuffer = null, valueBuffer = null; byte[] md5KeyBytes = outputKey.getBytes(); @@ -180,6 +98,7 @@ public void map(GenericData.Record record, keyBuffer.put(md5KeyBytes); keyBuffer.rewind(); + byte[] outputValue = outputVal.getBytes(); valueBuffer = ByteBuffer.allocate(outputValue.length); valueBuffer.put(outputValue); valueBuffer.rewind(); @@ -189,6 +108,7 @@ public void map(GenericData.Record record, collector.collect(p); } + md5er.reset(); } @@ -246,16 +166,6 @@ public void configure(JobConf conf) { // / Props props = HadoopUtils.getPropsFromJob(conf); - _keySelection = props.getString("key.selection", null); - _valSelection = props.getString("value.selection", null); - - String _keyTransClass = props.getString("key.transformation.class", null); - String _valueTransClass = props.getString("value.transformation.class", 
null); - - if(_keyTransClass != null) - _keyTrans = (StoreBuilderTransformation) Utils.callConstructor(_keyTransClass); - if(_valueTransClass != null) - _valTrans = (StoreBuilderTransformation) Utils.callConstructor(_valueTransClass); } private int numChunks; diff --git a/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/AvroStoreBuilderPartitioner.java b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/AvroStoreBuilderPartitioner.java index 6c1c5a9dc2..573de222c6 100644 --- a/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/AvroStoreBuilderPartitioner.java +++ b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/AvroStoreBuilderPartitioner.java @@ -23,15 +23,13 @@ import org.apache.avro.mapred.AvroKey; import org.apache.avro.mapred.AvroValue; -import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Partitioner; import voldemort.VoldemortException; import voldemort.cluster.Cluster; import voldemort.store.StoreDefinition; -import voldemort.store.readonly.ReadOnlyUtils; -import voldemort.utils.ByteUtils; +import voldemort.store.readonly.mr.utils.KeyValuePartitioner; import voldemort.xml.ClusterMapper; import voldemort.xml.StoreDefinitionsMapper; @@ -54,9 +52,6 @@ public int getPartition(AvroKey key, AvroValue value, in valueBytes = new byte[value.datum().remaining()]; value.datum().get(valueBytes); - BytesWritable outputKey = new BytesWritable(keyBytes); - BytesWritable outputVal = new BytesWritable(valueBytes); - ByteBuffer keyBuffer = null, valueBuffer = null; keyBuffer = ByteBuffer.allocate(keyBytes.length); @@ -70,28 +65,16 @@ public int getPartition(AvroKey key, AvroValue value, in key.datum(keyBuffer); value.datum(valueBuffer); - int partitionId = ByteUtils.readInt(valueBytes, ByteUtils.SIZE_OF_INT); - int chunkId = ReadOnlyUtils.chunk(keyBytes, getNumChunks()); - if(getSaveKeys()) { - int replicaType = (int) ByteUtils.readBytes(valueBytes, - 2 * ByteUtils.SIZE_OF_INT, - ByteUtils.SIZE_OF_BYTE); - if(getReducerPerBucket()) { - return (partitionId * getStoreDef().getReplicationFactor() + replicaType) - % numReduceTasks; - } else { - return ((partitionId * getStoreDef().getReplicationFactor() * getNumChunks()) - + (replicaType * getNumChunks()) + chunkId) - % numReduceTasks; - } - } else { - if(getReducerPerBucket()) { - return partitionId % numReduceTasks; - } else { - return (partitionId * getNumChunks() + chunkId) % numReduceTasks; - } - - } + KeyValuePartitioner partitioner = new KeyValuePartitioner(); + + return partitioner.getPartition(keyBytes, + valueBytes, + saveKeys, + reducerPerBucket, + storeDef, + numReduceTasks, + numReduceTasks); + } private int numChunks; diff --git a/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/HadoopStoreBuilder.java b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/HadoopStoreBuilder.java index 117b72ece0..c1e3c9c70f 100644 --- a/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/HadoopStoreBuilder.java +++ b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/HadoopStoreBuilder.java @@ -87,6 +87,8 @@ public class HadoopStoreBuilder { private boolean reducerPerBucket = false; private int numChunks = -1; + private boolean isAvro; + /** * Kept for backwards compatibility. 
We do not use replicationFactor any * more since it is derived from the store definition @@ -160,6 +162,7 @@ public HadoopStoreBuilder(Configuration conf, this.chunkSizeBytes = chunkSizeBytes; this.tempDir = tempDir; this.outputDir = Utils.notNull(outputDir); + isAvro = false; if(chunkSizeBytes > MAX_CHUNK_SIZE || chunkSizeBytes < MIN_CHUNK_SIZE) throw new VoldemortException("Invalid chunk size, chunk size must be in the range " + MIN_CHUNK_SIZE + "..." + MAX_CHUNK_SIZE); @@ -297,6 +300,7 @@ public HadoopStoreBuilder(Configuration conf, this.saveKeys = saveKeys; this.reducerPerBucket = reducerPerBucket; this.numChunks = numChunks; + isAvro = false; if(numChunks <= 0) throw new VoldemortException("Number of chunks should be greater than zero"); } @@ -313,14 +317,16 @@ public void build() { new StoreDefinitionsMapper().writeStoreList(Collections.singletonList(storeDef))); conf.setBoolean("save.keys", saveKeys); conf.setBoolean("reducer.per.bucket", reducerPerBucket); - conf.setPartitionerClass(HadoopStoreBuilderPartitioner.class); - conf.setMapperClass(mapperClass); - conf.setMapOutputKeyClass(BytesWritable.class); - conf.setMapOutputValueClass(BytesWritable.class); - if(reducerPerBucket) { - conf.setReducerClass(HadoopStoreBuilderReducerPerBucket.class); - } else { - conf.setReducerClass(HadoopStoreBuilderReducer.class); + if(!isAvro) { + conf.setPartitionerClass(HadoopStoreBuilderPartitioner.class); + conf.setMapperClass(mapperClass); + conf.setMapOutputKeyClass(BytesWritable.class); + conf.setMapOutputValueClass(BytesWritable.class); + if(reducerPerBucket) { + conf.setReducerClass(HadoopStoreBuilderReducerPerBucket.class); + } else { + conf.setReducerClass(HadoopStoreBuilderReducer.class); + } } conf.setInputFormat(inputFormatClass); conf.setOutputFormat(SequenceFileOutputFormat.class); @@ -387,220 +393,38 @@ public void build() { conf.setInt("num.chunks", numChunks); conf.setNumReduceTasks(numReducers); - logger.info("Number of chunks: " + numChunks + ", number of reducers: " + numReducers - + ", save keys: " + saveKeys + ", reducerPerBucket: " + reducerPerBucket); - logger.info("Building store..."); - RunningJob job = JobClient.runJob(conf); - - // Once the job has completed log the counter - Counters counters = job.getCounters(); - - if(saveKeys) { - if(reducerPerBucket) { - logger.info("Number of collisions in the job - " - + counters.getCounter(KeyValueWriter.CollisionCounter.NUM_COLLISIONS)); - logger.info("Maximum number of collisions for one entry - " - + counters.getCounter(KeyValueWriter.CollisionCounter.MAX_COLLISIONS)); - } else { - logger.info("Number of collisions in the job - " - + counters.getCounter(KeyValueWriter.CollisionCounter.NUM_COLLISIONS)); - logger.info("Maximum number of collisions for one entry - " - + counters.getCounter(KeyValueWriter.CollisionCounter.MAX_COLLISIONS)); - } - } - - // Do a CheckSumOfCheckSum - Similar to HDFS - CheckSum checkSumGenerator = CheckSum.getInstance(this.checkSumType); - if(!this.checkSumType.equals(CheckSumType.NONE) && checkSumGenerator == null) { - throw new VoldemortException("Could not generate checksum digest for type " - + this.checkSumType); - } - - // Check if all folder exists and with format file - for(Node node: cluster.getNodes()) { - - ReadOnlyStorageMetadata metadata = new ReadOnlyStorageMetadata(); - - if(saveKeys) { - metadata.add(ReadOnlyStorageMetadata.FORMAT, - ReadOnlyStorageFormat.READONLY_V2.getCode()); - } else { - metadata.add(ReadOnlyStorageMetadata.FORMAT, - ReadOnlyStorageFormat.READONLY_V1.getCode()); - 
} - - Path nodePath = new Path(outputDir.toString(), "node-" + node.getId()); - - if(!outputFs.exists(nodePath)) { - logger.info("No data generated for node " + node.getId() - + ". Generating empty folder"); - outputFs.mkdirs(nodePath); // Create empty folder - } + if(isAvro) { + conf.setPartitionerClass(AvroStoreBuilderPartitioner.class); + // conf.setMapperClass(mapperClass); + conf.setMapOutputKeyClass(ByteBuffer.class); + conf.setMapOutputValueClass(ByteBuffer.class); - if(checkSumType != CheckSumType.NONE) { + conf.setInputFormat(inputFormatClass); - FileStatus[] storeFiles = outputFs.listStatus(nodePath, new PathFilter() { + conf.setOutputFormat((Class) AvroOutputFormat.class); + conf.setOutputKeyClass(ByteBuffer.class); + conf.setOutputValueClass(ByteBuffer.class); - public boolean accept(Path arg0) { - if(arg0.getName().endsWith("checksum") - && !arg0.getName().startsWith(".")) { - return true; - } - return false; - } - }); + // AvroJob confs for the avro mapper + AvroJob.setInputSchema(conf, Schema.parse(config.get("avro.rec.schema"))); - if(storeFiles != null && storeFiles.length > 0) { - Arrays.sort(storeFiles, new IndexFileLastComparator()); - FSDataInputStream input = null; + AvroJob.setOutputSchema(conf, + Pair.getPairSchema(Schema.create(Schema.Type.BYTES), + Schema.create(Schema.Type.BYTES))); - for(FileStatus file: storeFiles) { - try { - input = outputFs.open(file.getPath()); - byte fileCheckSum[] = new byte[CheckSum.checkSumLength(this.checkSumType)]; - input.read(fileCheckSum); - logger.debug("Checksum for file " + file.toString() + " - " - + new String(Hex.encodeHex(fileCheckSum))); - checkSumGenerator.update(fileCheckSum); - } catch(Exception e) { - logger.error("Error while reading checksum file " + e.getMessage(), - e); - } finally { - if(input != null) - input.close(); - } - outputFs.delete(file.getPath(), false); - } - - metadata.add(ReadOnlyStorageMetadata.CHECKSUM_TYPE, - CheckSum.toString(checkSumType)); - - String checkSum = new String(Hex.encodeHex(checkSumGenerator.getCheckSum())); - logger.info("Checksum for node " + node.getId() + " - " + checkSum); - - metadata.add(ReadOnlyStorageMetadata.CHECKSUM, checkSum); - } - } - - // Write metadata - FSDataOutputStream metadataStream = outputFs.create(new Path(nodePath, ".metadata")); - metadataStream.write(metadata.toJsonString().getBytes()); - metadataStream.flush(); - metadataStream.close(); - - } - - } catch(Exception e) { - logger.error("Error in Store builder", e); - throw new VoldemortException(e); - } - - } - - /** - * Run the job - */ - public void buildAvro() { - try { - JobConf conf = new JobConf(config); - conf.setInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE); - conf.set("cluster.xml", new ClusterMapper().writeCluster(cluster)); - conf.set("stores.xml", - new StoreDefinitionsMapper().writeStoreList(Collections.singletonList(storeDef))); - conf.setBoolean("save.keys", saveKeys); - conf.setBoolean("reducer.per.bucket", reducerPerBucket); - conf.setPartitionerClass(AvroStoreBuilderPartitioner.class); - // conf.setMapperClass(mapperClass); - conf.setMapOutputKeyClass(ByteBuffer.class); - conf.setMapOutputValueClass(ByteBuffer.class); - - conf.setInputFormat(inputFormatClass); - - conf.setOutputFormat((Class) AvroOutputFormat.class); - conf.setOutputKeyClass(ByteBuffer.class); - conf.setOutputValueClass(ByteBuffer.class); - conf.setJarByClass(getClass()); - conf.setReduceSpeculativeExecution(false); - FileInputFormat.setInputPaths(conf, inputPath); - conf.set("final.output.dir", outputDir.toString()); - 
conf.set("checksum.type", CheckSum.toString(checkSumType)); - FileOutputFormat.setOutputPath(conf, tempDir); - - FileSystem outputFs = outputDir.getFileSystem(conf); - if(outputFs.exists(outputDir)) { - throw new IOException("Final output directory already exists."); - } - - // delete output dir if it already exists - FileSystem tempFs = tempDir.getFileSystem(conf); - tempFs.delete(tempDir, true); - - long size = sizeOfPath(tempFs, inputPath); - logger.info("Data size = " + size + ", replication factor = " - + storeDef.getReplicationFactor() + ", numNodes = " - + cluster.getNumberOfNodes() + ", chunk size = " + chunkSizeBytes); - - // Derive "rough" number of chunks and reducers - int numReducers; - if(saveKeys) { - - if(this.numChunks == -1) { - this.numChunks = Math.max((int) (storeDef.getReplicationFactor() * size - / cluster.getNumberOfPartitions() - / storeDef.getReplicationFactor() / chunkSizeBytes), - 1); - } else { - logger.info("Overriding chunk size byte and taking num chunks (" - + this.numChunks + ") directly"); - } - - if(reducerPerBucket) { - numReducers = cluster.getNumberOfPartitions() * storeDef.getReplicationFactor(); - } else { - numReducers = cluster.getNumberOfPartitions() * storeDef.getReplicationFactor() - * numChunks; - } - } else { - - if(this.numChunks == -1) { - this.numChunks = Math.max((int) (storeDef.getReplicationFactor() * size - / cluster.getNumberOfPartitions() / chunkSizeBytes), - 1); - } else { - logger.info("Overriding chunk size byte and taking num chunks (" - + this.numChunks + ") directly"); - } + AvroJob.setMapperClass(conf, mapperClass); if(reducerPerBucket) { - numReducers = cluster.getNumberOfPartitions(); + conf.setReducerClass(AvroStoreBuilderReducerPerBucket.class); } else { - numReducers = cluster.getNumberOfPartitions() * numChunks; + conf.setReducerClass(AvroStoreBuilderReducer.class); } - } - conf.setInt("num.chunks", numChunks); - conf.setNumReduceTasks(numReducers); - - conf.setSpeculativeExecution(false); - - // AvroJob confs for the avro mapper - AvroJob.setInputSchema(conf, Schema.parse(config.get("avro.rec.schema"))); - AvroJob.setOutputSchema(conf, - Pair.getPairSchema(Schema.create(Schema.Type.BYTES), - Schema.create(Schema.Type.BYTES))); - - AvroJob.setMapperClass(conf, mapperClass); - - if(reducerPerBucket) { - conf.setReducerClass(AvroStoreBuilderReducerPerBucket.class); - } else { - conf.setReducerClass(AvroStoreBuilderReducer.class); } logger.info("Number of chunks: " + numChunks + ", number of reducers: " + numReducers + ", save keys: " + saveKeys + ", reducerPerBucket: " + reducerPerBucket); logger.info("Building store..."); - RunningJob job = JobClient.runJob(conf); // Once the job has completed log the counter @@ -652,7 +476,6 @@ public void buildAvro() { FileStatus[] storeFiles = outputFs.listStatus(nodePath, new PathFilter() { - @Override public boolean accept(Path arg0) { if(arg0.getName().endsWith("checksum") && !arg0.getName().startsWith(".")) { @@ -709,6 +532,17 @@ public boolean accept(Path arg0) { } + /** + * Run the job + */ + public void buildAvro() { + + isAvro = true; + build(); + return; + + } + /** * A comparator that sorts index files last. 
This is required to maintain * the order while calculating checksum diff --git a/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/HadoopStoreBuilderPartitioner.java b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/HadoopStoreBuilderPartitioner.java index 6d40611280..2e7f31c1dc 100644 --- a/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/HadoopStoreBuilderPartitioner.java +++ b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/HadoopStoreBuilderPartitioner.java @@ -19,8 +19,7 @@ import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.mapred.Partitioner; -import voldemort.store.readonly.ReadOnlyUtils; -import voldemort.utils.ByteUtils; +import voldemort.store.readonly.mr.utils.KeyValuePartitioner; /** * A Partitioner that splits data so that all data for the same nodeId, chunkId @@ -31,27 +30,17 @@ public class HadoopStoreBuilderPartitioner extends AbstractStoreBuilderConfigura Partitioner { public int getPartition(BytesWritable key, BytesWritable value, int numReduceTasks) { - int partitionId = ByteUtils.readInt(value.get(), ByteUtils.SIZE_OF_INT); - int chunkId = ReadOnlyUtils.chunk(key.get(), getNumChunks()); - if(getSaveKeys()) { - int replicaType = (int) ByteUtils.readBytes(value.get(), - 2 * ByteUtils.SIZE_OF_INT, - ByteUtils.SIZE_OF_BYTE); - if(getReducerPerBucket()) { - return (partitionId * getStoreDef().getReplicationFactor() + replicaType) - % numReduceTasks; - } else { - return ((partitionId * getStoreDef().getReplicationFactor() * getNumChunks()) - + (replicaType * getNumChunks()) + chunkId) - % numReduceTasks; - } - } else { - if(getReducerPerBucket()) { - return partitionId % numReduceTasks; - } else { - return (partitionId * getNumChunks() + chunkId) % numReduceTasks; - } - } + byte[] keyBytes = key.get(); + byte[] valueBytes = value.get(); + KeyValuePartitioner partitioner = new KeyValuePartitioner(); + return partitioner.getPartition(keyBytes, + valueBytes, + getSaveKeys(), + getReducerPerBucket(), + getStoreDef(), + numReduceTasks, + numReduceTasks); + } } diff --git a/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/azkaban/VoldemortBuildAndPushJob.java b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/azkaban/VoldemortBuildAndPushJob.java index d6f584e083..f56e66039b 100644 --- a/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/azkaban/VoldemortBuildAndPushJob.java +++ b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/azkaban/VoldemortBuildAndPushJob.java @@ -109,9 +109,18 @@ public VoldemortBuildAndPushJob(String name, Props props) { isAvroJob = props.getBoolean("build.type.avro", false); - keyField = props.getString("avro.key.field", "key"); + keyField = props.getString("avro.key.field", null); - valueField = props.getString("avro.value.field", "value"); + valueField = props.getString("avro.value.field", null); + + if(isAvroJob) { + if(keyField == null) + throw new RuntimeException("The key field must be specified in the properties for the Avro build and push job!"); + + if(valueField == null) + throw new RuntimeException("The value field must be specified in the properties for the Avro build and push job!"); + + } }
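
For reference, a minimal sketch of the per-node key/value packing that both mappers now delegate to MapperKeyValueWriter (that class's source is not part of this diff). The byte layout, the ByteUtils/routing calls, and the md5-based map-output key below are taken directly from the code removed above; the trimmed-down class name, method name, and signature are illustrative assumptions, not the utility's actual API (the real helper also receives the serializers, compressors, and serializer definitions).

import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.BytesWritable;

import voldemort.cluster.Node;
import voldemort.routing.ConsistentRoutingStrategy;
import voldemort.utils.ByteUtils;
import voldemort.utils.Pair;

/*
 * Condensed, hypothetical sketch of the packing step only; the real
 * MapperKeyValueWriter also serializes and optionally compresses the key and
 * value before reaching this point.
 */
public class MapperKeyValueWriterSketch {

    public List<Pair<BytesWritable, BytesWritable>> pack(ConsistentRoutingStrategy routingStrategy,
                                                         byte[] keyBytes,
                                                         byte[] valBytes,
                                                         boolean saveKeys,
                                                         MessageDigest md5er) {
        // Leave room for node id + partition id, which are filled in per replica below
        int offset = 2 * ByteUtils.SIZE_OF_INT;
        byte[] outputValue;
        BytesWritable outputKey;

        if(saveKeys) {
            // Layout: node id | partition id | replica type | key size | value size | key | value
            outputValue = new byte[valBytes.length + keyBytes.length + ByteUtils.SIZE_OF_BYTE
                                   + 4 * ByteUtils.SIZE_OF_INT];
            offset += ByteUtils.SIZE_OF_BYTE; // skip the replica-type byte
            ByteUtils.writeInt(outputValue, keyBytes.length, offset);
            offset += ByteUtils.SIZE_OF_INT;
            ByteUtils.writeInt(outputValue, valBytes.length, offset);
            offset += ByteUtils.SIZE_OF_INT;
            System.arraycopy(keyBytes, 0, outputValue, offset, keyBytes.length);
            offset += keyBytes.length;
            System.arraycopy(valBytes, 0, outputValue, offset, valBytes.length);
            // Map-output key: upper 8 bytes of the 16-byte md5 of the serialized key
            outputKey = new BytesWritable(ByteUtils.copy(md5er.digest(keyBytes),
                                                         0,
                                                         2 * ByteUtils.SIZE_OF_INT));
        } else {
            // Layout: node id | partition id | value
            outputValue = new byte[valBytes.length + 2 * ByteUtils.SIZE_OF_INT];
            System.arraycopy(valBytes, 0, outputValue, offset, valBytes.length);
            // Map-output key: full 16-byte md5 of the serialized key
            outputKey = new BytesWritable(md5er.digest(keyBytes));
        }

        // Emit one record per replica this key is routed to
        List<Integer> partitionList = routingStrategy.getPartitionList(keyBytes);
        Node[] partitionToNode = routingStrategy.getPartitionToNode();
        List<Pair<BytesWritable, BytesWritable>> result = new ArrayList<Pair<BytesWritable, BytesWritable>>();

        for(int replicaType = 0; replicaType < partitionList.size(); replicaType++) {
            // Node id for this replica
            ByteUtils.writeInt(outputValue, partitionToNode[partitionList.get(replicaType)].getId(), 0);
            if(saveKeys) {
                // Primary partition id followed by the replica type byte
                ByteUtils.writeInt(outputValue, partitionList.get(0), ByteUtils.SIZE_OF_INT);
                ByteUtils.writeBytes(outputValue,
                                     replicaType,
                                     2 * ByteUtils.SIZE_OF_INT,
                                     ByteUtils.SIZE_OF_BYTE);
            } else {
                // Partition id of this replica
                ByteUtils.writeInt(outputValue, partitionList.get(replicaType), ByteUtils.SIZE_OF_INT);
            }
            // Copy the buffer, since it is mutated again on the next iteration
            result.add(Pair.create(outputKey,
                                   new BytesWritable(ByteUtils.copy(outputValue, 0, outputValue.length))));
        }
        return result;
    }
}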
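
Similarly, the partition-to-reducer math that HadoopStoreBuilderPartitioner and AvroStoreBuilderPartitioner previously duplicated is now expected to live in KeyValuePartitioner (also outside this diff). Below is a sketch reconstructed from the removed branches; the parameter order, with a separate chunk count and reduce-task count, is an assumption mirroring the old getNumChunks()/numReduceTasks usage. Note that both new call sites currently pass numReduceTasks in both of the last two positions, which is worth double-checking against the real class if one of them is meant to be the chunk count.

import voldemort.store.StoreDefinition;
import voldemort.store.readonly.ReadOnlyUtils;
import voldemort.utils.ByteUtils;

/*
 * Illustrative sketch of the partition computation the removed code performed
 * inline; the real KeyValuePartitioner's signature may differ.
 */
public class KeyValuePartitionerSketch {

    public int getPartition(byte[] keyBytes,
                            byte[] valueBytes,
                            boolean saveKeys,
                            boolean reducerPerBucket,
                            StoreDefinition storeDef,
                            int numChunks,
                            int numReduceTasks) {
        // The partition id sits right after the 4-byte node id in the packed value
        int partitionId = ByteUtils.readInt(valueBytes, ByteUtils.SIZE_OF_INT);
        // The chunk id is derived from the md5-based map-output key
        int chunkId = ReadOnlyUtils.chunk(keyBytes, numChunks);

        if(saveKeys) {
            int replicaType = (int) ByteUtils.readBytes(valueBytes,
                                                        2 * ByteUtils.SIZE_OF_INT,
                                                        ByteUtils.SIZE_OF_BYTE);
            if(reducerPerBucket) {
                // One reducer per (partition, replica) bucket
                return (partitionId * storeDef.getReplicationFactor() + replicaType)
                       % numReduceTasks;
            }
            // One reducer per (partition, replica, chunk)
            return ((partitionId * storeDef.getReplicationFactor() * numChunks)
                    + (replicaType * numChunks) + chunkId) % numReduceTasks;
        }

        if(reducerPerBucket) {
            // One reducer per partition
            return partitionId % numReduceTasks;
        }
        // One reducer per (partition, chunk)
        return (partitionId * numChunks + chunkId) % numReduceTasks;
    }
}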
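
The chunk and reducer counts used by build()/buildAvro() follow the formulas visible in the code above. A worked example under made-up inputs (the cluster and store numbers below are purely hypothetical) shows how they combine when saveKeys is on and reducerPerBucket is off:

public class ChunkMathExample {

    public static void main(String[] args) {
        // Hypothetical numbers purely for illustration
        long size = 1024L * 1024 * 1024 * 1024;    // ~1 TB of input data
        int replicationFactor = 2;                 // storeDef.getReplicationFactor()
        int numPartitions = 300;                   // cluster.getNumberOfPartitions()
        long chunkSizeBytes = 1024L * 1024 * 1024; // 1 GB target chunk size

        // saveKeys == true branch: the replication factor cancels out here
        int numChunks = Math.max((int) (replicationFactor * size / numPartitions
                                        / replicationFactor / chunkSizeBytes), 1);

        // saveKeys == true, reducerPerBucket == false
        int numReducers = numPartitions * replicationFactor * numChunks;

        System.out.println(numChunks + " chunks, " + numReducers + " reducers"); // 3 chunks, 1800 reducers
    }
}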