Skip to content

Commit

Permalink
Fixed mapper issue
Browse files Browse the repository at this point in the history
  • Loading branch information
abh1nay committed Nov 21, 2012
1 parent 84eda3a commit 60a987c
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 22 deletions.
4 changes: 2 additions & 2 deletions META-INF/MANIFEST.MF
@@ -1,7 +1,7 @@
Manifest-Version: 1.0
Ant-Version: Apache Ant 1.7.1
Created-By: 20.2-b06 (Sun Microsystems Inc.)
Created-By: 20.1-b02 (Sun Microsystems Inc.)
Implementation-Title: Voldemort
Implementation-Version: 1.1.1
Implementation-Version: 1.1.2
Implementation-Vendor: LinkedIn

Expand Up @@ -26,14 +26,14 @@
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

import voldemort.cluster.Node;
import voldemort.routing.ConsistentRoutingStrategy;
import voldemort.serialization.DefaultSerializerFactory;
import voldemort.serialization.Serializer;
import voldemort.serialization.SerializerDefinition;
import voldemort.serialization.SerializerFactory;
import voldemort.store.compress.CompressionStrategy;
import voldemort.store.compress.CompressionStrategyFactory;
import voldemort.store.readonly.mr.utils.MapperKeyValueWriter;
import voldemort.utils.ByteUtils;

/**
Expand Down Expand Up @@ -76,31 +76,100 @@ public void map(K key,
V value,
OutputCollector<BytesWritable, BytesWritable> output,
Reporter reporter) throws IOException {

byte[] keyBytes = keySerializer.toBytes(makeKey(key, value));
byte[] valBytes = valueSerializer.toBytes(makeValue(key, value));

MapperKeyValueWriter mapWriter = new MapperKeyValueWriter();

List mapperList = mapWriter.map(routingStrategy,
keySerializer,
valueSerializer,
valueCompressor,
keyCompressor,
keySerializerDefinition,
valueSerializerDefinition,
keyBytes,
valBytes,
getSaveKeys(),
md5er);

for(int i = 0; i < mapperList.size(); i++) {
voldemort.utils.Pair<BytesWritable, BytesWritable> pair = (voldemort.utils.Pair<BytesWritable, BytesWritable>) mapperList.get(i);
BytesWritable outputKey = pair.getFirst();
BytesWritable outputVal = pair.getSecond();
// Compress key and values if required
if(keySerializerDefinition.hasCompression()) {
keyBytes = keyCompressor.deflate(keyBytes);
}

output.collect(outputKey, outputVal);
if(valueSerializerDefinition.hasCompression()) {
valBytes = valueCompressor.deflate(valBytes);
}

// Build the record(s) emitted for this key/value pair.
// outputValue is the serialized record body; outputKey is the MR key
// derived from the md5 digest of keyBytes.
byte[] outputValue;
BytesWritable outputKey;

// Leave initial offset for (a) node id (b) partition id
// since they are written later (inside the per-replica loop below)
int offsetTillNow = 2 * ByteUtils.SIZE_OF_INT;

if(getSaveKeys()) {

// Record layout when keys are saved, in order:
// 4 ( node id ) + 4 ( partition id )
// + 1 ( replica type - primary | secondary | tertiary... )
// + 4 ( key size ) + 4 ( value size ) + key + value
// (sizes assume SIZE_OF_INT is 4 and SIZE_OF_BYTE is 1)
outputValue = new byte[valBytes.length + keyBytes.length + ByteUtils.SIZE_OF_BYTE + 4
* ByteUtils.SIZE_OF_INT];

// Write key length - leave byte for replica type, which is filled in
// per replica in the loop below
offsetTillNow += ByteUtils.SIZE_OF_BYTE;
ByteUtils.writeInt(outputValue, keyBytes.length, offsetTillNow);

// Write value length, directly after the key length
offsetTillNow += ByteUtils.SIZE_OF_INT;
ByteUtils.writeInt(outputValue, valBytes.length, offsetTillNow);

// Write key, directly after the value length
offsetTillNow += ByteUtils.SIZE_OF_INT;
System.arraycopy(keyBytes, 0, outputValue, offsetTillNow, keyBytes.length);

// Write value, directly after the key bytes
offsetTillNow += keyBytes.length;
System.arraycopy(valBytes, 0, outputValue, offsetTillNow, valBytes.length);

// Generate MR key - upper 8 bytes of 16 byte md5
// NOTE(review): presumably ByteUtils.copy takes (array, from, to) and so
// yields the first 2 * SIZE_OF_INT bytes of the digest - confirm.
outputKey = new BytesWritable(ByteUtils.copy(md5er.digest(keyBytes),
0,
2 * ByteUtils.SIZE_OF_INT));

} else {

// In order - 4 ( for node id ) + 4 ( partition id ) + value
outputValue = new byte[valBytes.length + 2 * ByteUtils.SIZE_OF_INT];

// Write value right after the space reserved for node id and partition id
System.arraycopy(valBytes, 0, outputValue, offsetTillNow, valBytes.length);

// Generate MR key - 16 byte md5
outputKey = new BytesWritable(md5er.digest(keyBytes));

}

// Generate partition and node list this key is destined for.
// Index in partitionList doubles as the replica type (0 is the primary).
List<Integer> partitionList = routingStrategy.getPartitionList(keyBytes);
Node[] partitionToNode = routingStrategy.getPartitionToNode();

// Emit one record per replica. Only the header fields (node id, partition
// id and, when keys are saved, replica type) differ between iterations;
// the rest of outputValue was filled in above.
for(int replicaType = 0; replicaType < partitionList.size(); replicaType++) {

// Node id - owner of this replica's partition, written at offset 0
ByteUtils.writeInt(outputValue,
partitionToNode[partitionList.get(replicaType)].getId(),
0);

if(getSaveKeys()) {
// Primary partition id - always the first entry of the partition
// list, regardless of which replica is being written
ByteUtils.writeInt(outputValue, partitionList.get(0), ByteUtils.SIZE_OF_INT);

// Replica type - stored in the single byte reserved after the two ints
ByteUtils.writeBytes(outputValue,
replicaType,
2 * ByteUtils.SIZE_OF_INT,
ByteUtils.SIZE_OF_BYTE);
} else {
// Partition id - the partition of this particular replica
ByteUtils.writeInt(outputValue,
partitionList.get(replicaType),
ByteUtils.SIZE_OF_INT);
}
// NOTE(review): the same outputValue array is wrapped and then overwritten
// on the next iteration; this presumably relies on output.collect consuming
// the bytes before it returns - TODO confirm.
BytesWritable outputVal = new BytesWritable(outputValue);

output.collect(outputKey, outputVal);

}
// Reset the digest so it is clean for the next call to map
md5er.reset();
}

Expand Down
Expand Up @@ -120,6 +120,12 @@ public List<Pair<BytesWritable, BytesWritable>> map(ConsistentRoutingStrategy ro
List<Integer> partitionList = routingStrategy.getPartitionList(keyBytes);
Node[] partitionToNode = routingStrategy.getPartitionToNode();

System.out.println("num partitions the key maps to is :" + partitionList.size());

for(int replicaType = 0; replicaType < partitionList.size(); replicaType++) {
System.out.println("key is hashed to node:"
+ partitionToNode[partitionList.get(replicaType)]);
}
for(int replicaType = 0; replicaType < partitionList.size(); replicaType++) {

// Node id
Expand Down

0 comments on commit 60a987c

Please sign in to comment.