Refactoring mapper logic out
abh1nay committed Oct 4, 2012
1 parent d5f9c9c commit a63be3c
Showing 3 changed files with 250 additions and 0 deletions.
voldemort/store/readonly/mr/JobState.java
@@ -0,0 +1,73 @@
package voldemort.store.readonly.mr;

import java.io.IOException;
import java.io.StringReader;
import java.util.List;

import org.apache.hadoop.mapred.JobConf;

import voldemort.VoldemortException;
import voldemort.cluster.Cluster;
import voldemort.store.StoreDefinition;
import voldemort.xml.ClusterMapper;
import voldemort.xml.StoreDefinitionsMapper;

public class JobState {

    private int numChunks;
    private Cluster cluster;
    private StoreDefinition storeDef;
    private boolean saveKeys;
    private boolean reducerPerBucket;

    public void configure(JobConf conf) {
        this.cluster = new ClusterMapper().readCluster(new StringReader(conf.get("cluster.xml")));
        List<StoreDefinition> storeDefs = new StoreDefinitionsMapper().readStoreList(new StringReader(conf.get("stores.xml")));
        if(storeDefs.size() != 1)
            throw new IllegalStateException("Expected exactly one store definition, but found "
                                            + storeDefs.size());
        this.storeDef = storeDefs.get(0);

        this.numChunks = conf.getInt("num.chunks", -1);
        if(this.numChunks < 1)
            throw new VoldemortException("num.chunks not specified in the job conf.");

        this.saveKeys = conf.getBoolean("save.keys", false);
        this.reducerPerBucket = conf.getBoolean("reducer.per.bucket", false);
    }

    @SuppressWarnings("unused")
    public void close() throws IOException {}

    public Cluster getCluster() {
        checkNotNull(cluster);
        return cluster;
    }

    public boolean getSaveKeys() {
        return this.saveKeys;
    }

    public boolean getReducerPerBucket() {
        return this.reducerPerBucket;
    }

    public StoreDefinition getStoreDef() {
        checkNotNull(storeDef);
        return storeDef;
    }

    public String getStoreName() {
        checkNotNull(storeDef);
        return storeDef.getName();
    }

    private final void checkNotNull(Object o) {
        if(o == null)
            throw new VoldemortException("Not configured yet!");
    }

    public int getNumChunks() {
        return this.numChunks;
    }

}
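
For context, here is a minimal sketch (not part of this commit) of how a mapper can delegate its configure() to the extracted JobState; the ExampleConfigurableMapper name and its use of MapReduceBase are illustrative assumptions:

package voldemort.store.readonly.mr;

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;

// Illustrative only: delegates all job-conf parsing to the shared JobState
public class ExampleConfigurableMapper extends MapReduceBase {

    private final JobState state = new JobState();

    @Override
    public void configure(JobConf conf) {
        super.configure(conf);
        // cluster.xml, stores.xml, num.chunks, save.keys and
        // reducer.per.bucket parsing now lives in one place
        state.configure(conf);
    }
}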
voldemort/store/readonly/mr/utils/KeyValuePartitioner.java
@@ -0,0 +1,40 @@
package voldemort.store.readonly.mr.utils;

import voldemort.store.StoreDefinition;
import voldemort.store.readonly.ReadOnlyUtils;
import voldemort.utils.ByteUtils;

public class KeyValuePartitioner {

    public int getPartition(byte[] keyBytes,
                            byte[] valueBytes,
                            boolean saveKeys,
                            boolean reducerPerBucket,
                            StoreDefinition storeDef,
                            int numChunks,
                            int numReduceTasks) {
        // The mapper writes the value as: node id (4 bytes), partition id
        // (4 bytes), then - if saveKeys is set - a replica type byte
        int partitionId = ByteUtils.readInt(valueBytes, ByteUtils.SIZE_OF_INT);
        int chunkId = ReadOnlyUtils.chunk(keyBytes, numChunks);
        if(saveKeys) {
            int replicaType = (int) ByteUtils.readBytes(valueBytes,
                                                        2 * ByteUtils.SIZE_OF_INT,
                                                        ByteUtils.SIZE_OF_BYTE);
            if(reducerPerBucket) {
                return (partitionId * storeDef.getReplicationFactor() + replicaType)
                       % numReduceTasks;
            } else {
                return ((partitionId * storeDef.getReplicationFactor() * numChunks)
                        + (replicaType * numChunks) + chunkId) % numReduceTasks;
            }
        } else {
            if(reducerPerBucket) {
                return partitionId % numReduceTasks;
            } else {
                return (partitionId * numChunks + chunkId) % numReduceTasks;
            }
        }
    }

}
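
For context, a hedged sketch (not part of this commit) of a Hadoop Partitioner that delegates to this helper; the ExampleBucketingPartitioner name and its wiring through JobState are illustrative assumptions:

package voldemort.store.readonly.mr.utils;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;

import voldemort.store.readonly.mr.JobState;

// Illustrative only: adapts the plain-Java KeyValuePartitioner to
// Hadoop's Partitioner contract, pulling its flags from JobState
public class ExampleBucketingPartitioner implements Partitioner<BytesWritable, BytesWritable> {

    private final JobState state = new JobState();
    private final KeyValuePartitioner partitioner = new KeyValuePartitioner();

    public void configure(JobConf conf) {
        state.configure(conf);
    }

    public int getPartition(BytesWritable key, BytesWritable value, int numReduceTasks) {
        return partitioner.getPartition(key.getBytes(),
                                        value.getBytes(),
                                        state.getSaveKeys(),
                                        state.getReducerPerBucket(),
                                        state.getStoreDef(),
                                        state.getNumChunks(),
                                        numReduceTasks);
    }
}

Keeping the bucketing arithmetic in a plain helper like this makes it unit-testable without standing up a Hadoop job.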
voldemort/store/readonly/mr/utils/MapperKeyValueWriter.java
@@ -0,0 +1,137 @@
package voldemort.store.readonly.mr.utils;

import java.io.IOException;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.BytesWritable;

import voldemort.cluster.Node;
import voldemort.routing.ConsistentRoutingStrategy;
import voldemort.serialization.Serializer;
import voldemort.serialization.SerializerDefinition;
import voldemort.store.compress.CompressionStrategy;
import voldemort.utils.ByteUtils;
import voldemort.utils.Pair;

public class MapperKeyValueWriter {

    private BytesWritable outputKey;
    private BytesWritable outputVal;

    public BytesWritable getOutputKey() {
        return outputKey;
    }

    public BytesWritable getOutputValue() {
        return outputVal;
    }

    public List<Pair<BytesWritable, BytesWritable>> map(ConsistentRoutingStrategy routingStrategy,
                                                        Serializer keySerializer,
                                                        Serializer valueSerializer,
                                                        CompressionStrategy valueCompressor,
                                                        CompressionStrategy keyCompressor,
                                                        SerializerDefinition keySerializerDefinition,
                                                        SerializerDefinition valueSerializerDefinition,
                                                        byte[] keyBytes,
                                                        byte[] valBytes,
                                                        boolean getSaveKeys,
                                                        MessageDigest md5er) throws IOException {

        List<Pair<BytesWritable, BytesWritable>> outputList = new ArrayList<Pair<BytesWritable, BytesWritable>>();

        // Compress key and values if required
        if(keySerializerDefinition.hasCompression()) {
            keyBytes = keyCompressor.deflate(keyBytes);
        }

        if(valueSerializerDefinition.hasCompression()) {
            valBytes = valueCompressor.deflate(valBytes);
        }

        // Get the output byte array ready to populate
        byte[] outputValue;

        // Leave an initial offset for (a) node id and (b) partition id,
        // since they are written later
        int offsetTillNow = 2 * ByteUtils.SIZE_OF_INT;

        if(getSaveKeys) {

            // In order - 4 (node id) + 4 (partition id) + 1 (replica type:
            // primary | secondary | tertiary...) + 4 (key size)
            // + 4 (value size) + key + value
            outputValue = new byte[valBytes.length + keyBytes.length + ByteUtils.SIZE_OF_BYTE
                                   + 4 * ByteUtils.SIZE_OF_INT];

            // Write key length - leave a byte for the replica type
            offsetTillNow += ByteUtils.SIZE_OF_BYTE;
            ByteUtils.writeInt(outputValue, keyBytes.length, offsetTillNow);

            // Write value length
            offsetTillNow += ByteUtils.SIZE_OF_INT;
            ByteUtils.writeInt(outputValue, valBytes.length, offsetTillNow);

            // Write key
            offsetTillNow += ByteUtils.SIZE_OF_INT;
            System.arraycopy(keyBytes, 0, outputValue, offsetTillNow, keyBytes.length);

            // Write value
            offsetTillNow += keyBytes.length;
            System.arraycopy(valBytes, 0, outputValue, offsetTillNow, valBytes.length);

            // Generate the MR key - the upper 8 bytes of the 16-byte md5
            outputKey = new BytesWritable(ByteUtils.copy(md5er.digest(keyBytes),
                                                         0,
                                                         2 * ByteUtils.SIZE_OF_INT));

        } else {

            // In order - 4 (node id) + 4 (partition id) + value
            outputValue = new byte[valBytes.length + 2 * ByteUtils.SIZE_OF_INT];

            // Write value
            System.arraycopy(valBytes, 0, outputValue, offsetTillNow, valBytes.length);

            // Generate the MR key - the full 16-byte md5
            outputKey = new BytesWritable(md5er.digest(keyBytes));

        }

        // Generate the partition and node list this key is destined for
        List<Integer> partitionList = routingStrategy.getPartitionList(keyBytes);
        Node[] partitionToNode = routingStrategy.getPartitionToNode();

        for(int replicaType = 0; replicaType < partitionList.size(); replicaType++) {

            // Node id
            ByteUtils.writeInt(outputValue,
                               partitionToNode[partitionList.get(replicaType)].getId(),
                               0);

            if(getSaveKeys) {
                // Primary partition id
                ByteUtils.writeInt(outputValue, partitionList.get(0), ByteUtils.SIZE_OF_INT);

                // Replica type
                ByteUtils.writeBytes(outputValue,
                                     replicaType,
                                     2 * ByteUtils.SIZE_OF_INT,
                                     ByteUtils.SIZE_OF_BYTE);
            } else {
                // Partition id
                ByteUtils.writeInt(outputValue,
                                   partitionList.get(replicaType),
                                   ByteUtils.SIZE_OF_INT);
            }
            // BytesWritable copies the array, so each replica gets its own value
            outputVal = new BytesWritable(outputValue);
            outputList.add(Pair.create(outputKey, outputVal));
        }

        return outputList;
    }
}
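
For context, a hedged sketch (not part of this commit) of how calling code can use the extracted writer; the ExampleMapperEmit class is an illustrative assumption, and its arguments are assumed to come from the mapper's configure() step:

package voldemort.store.readonly.mr.utils;

import java.io.IOException;
import java.security.MessageDigest;
import java.util.List;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.OutputCollector;

import voldemort.routing.ConsistentRoutingStrategy;
import voldemort.serialization.Serializer;
import voldemort.serialization.SerializerDefinition;
import voldemort.store.compress.CompressionStrategy;
import voldemort.utils.Pair;

// Illustrative only: builds all (key, value) pairs for one input record
// via MapperKeyValueWriter and emits them to the collector
public class ExampleMapperEmit {

    public static void emitAll(MapperKeyValueWriter writer,
                               ConsistentRoutingStrategy routingStrategy,
                               Serializer keySerializer,
                               Serializer valueSerializer,
                               CompressionStrategy valueCompressor,
                               CompressionStrategy keyCompressor,
                               SerializerDefinition keySerializerDef,
                               SerializerDefinition valueSerializerDef,
                               byte[] keyBytes,
                               byte[] valBytes,
                               boolean saveKeys,
                               MessageDigest md5er,
                               OutputCollector<BytesWritable, BytesWritable> output)
            throws IOException {
        List<Pair<BytesWritable, BytesWritable>> pairs = writer.map(routingStrategy,
                                                                    keySerializer,
                                                                    valueSerializer,
                                                                    valueCompressor,
                                                                    keyCompressor,
                                                                    keySerializerDef,
                                                                    valueSerializerDef,
                                                                    keyBytes,
                                                                    valBytes,
                                                                    saveKeys,
                                                                    md5er);
        for(Pair<BytesWritable, BytesWritable> pair: pairs)
            output.collect(pair.getFirst(), pair.getSecond());
    }
}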
