forked from voldemort/voldemort
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
3 changed files
with
250 additions
and
0 deletions.
There are no files selected for viewing
73 changes: 73 additions & 0 deletions
73
contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/JobState.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
package voldemort.store.readonly.mr; | ||
|
||
import java.io.IOException; | ||
import java.io.StringReader; | ||
import java.util.List; | ||
|
||
import org.apache.hadoop.mapred.JobConf; | ||
|
||
import voldemort.VoldemortException; | ||
import voldemort.cluster.Cluster; | ||
import voldemort.store.StoreDefinition; | ||
import voldemort.xml.ClusterMapper; | ||
import voldemort.xml.StoreDefinitionsMapper; | ||
|
||
public class JobState { | ||
|
||
private int numChunks; | ||
private Cluster cluster; | ||
private StoreDefinition storeDef; | ||
private boolean saveKeys; | ||
private boolean reducerPerBucket; | ||
|
||
public void configure(JobConf conf) { | ||
this.cluster = new ClusterMapper().readCluster(new StringReader(conf.get("cluster.xml"))); | ||
List<StoreDefinition> storeDefs = new StoreDefinitionsMapper().readStoreList(new StringReader(conf.get("stores.xml"))); | ||
if(storeDefs.size() != 1) | ||
throw new IllegalStateException("Expected to find only a single store, but found multiple!"); | ||
this.storeDef = storeDefs.get(0); | ||
|
||
this.numChunks = conf.getInt("num.chunks", -1); | ||
if(this.numChunks < 1) | ||
throw new VoldemortException("num.chunks not specified in the job conf."); | ||
|
||
this.saveKeys = conf.getBoolean("save.keys", false); | ||
this.reducerPerBucket = conf.getBoolean("reducer.per.bucket", false); | ||
} | ||
|
||
@SuppressWarnings("unused") | ||
public void close() throws IOException {} | ||
|
||
public Cluster getCluster() { | ||
checkNotNull(cluster); | ||
return cluster; | ||
} | ||
|
||
public boolean getSaveKeys() { | ||
return this.saveKeys; | ||
} | ||
|
||
public boolean getReducerPerBucket() { | ||
return this.reducerPerBucket; | ||
} | ||
|
||
public StoreDefinition getStoreDef() { | ||
checkNotNull(storeDef); | ||
return storeDef; | ||
} | ||
|
||
public String getStoreName() { | ||
checkNotNull(storeDef); | ||
return storeDef.getName(); | ||
} | ||
|
||
private final void checkNotNull(Object o) { | ||
if(o == null) | ||
throw new VoldemortException("Not configured yet!"); | ||
} | ||
|
||
public int getNumChunks() { | ||
return this.numChunks; | ||
} | ||
|
||
} |
40 changes: 40 additions & 0 deletions
40
.../hadoop-store-builder/src/java/voldemort/store/readonly/mr/utils/KeyValuePartitioner.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
package voldemort.store.readonly.mr.utils; | ||
|
||
import voldemort.store.StoreDefinition; | ||
import voldemort.store.readonly.ReadOnlyUtils; | ||
import voldemort.utils.ByteUtils; | ||
|
||
public class KeyValuePartitioner { | ||
|
||
public int getPartition(byte[] keyBytes, | ||
byte[] valueBytes, | ||
boolean saveKeys, | ||
boolean reducerPerBucket, | ||
StoreDefinition storeDef, | ||
int numChunks, | ||
int numReduceTasks) { | ||
int partitionId = ByteUtils.readInt(valueBytes, ByteUtils.SIZE_OF_INT); | ||
int chunkId = ReadOnlyUtils.chunk(keyBytes, numChunks); | ||
if(saveKeys) { | ||
int replicaType = (int) ByteUtils.readBytes(valueBytes, | ||
2 * ByteUtils.SIZE_OF_INT, | ||
ByteUtils.SIZE_OF_BYTE); | ||
if(reducerPerBucket) { | ||
return (partitionId * storeDef.getReplicationFactor() + replicaType) | ||
% numReduceTasks; | ||
} else { | ||
return ((partitionId * storeDef.getReplicationFactor() * numChunks) | ||
+ (replicaType * numChunks) + chunkId) | ||
% numReduceTasks; | ||
} | ||
} else { | ||
if(reducerPerBucket) { | ||
return partitionId % numReduceTasks; | ||
} else { | ||
return (partitionId * numChunks + chunkId) % numReduceTasks; | ||
} | ||
|
||
} | ||
} | ||
|
||
} |
137 changes: 137 additions & 0 deletions
137
...hadoop-store-builder/src/java/voldemort/store/readonly/mr/utils/MapperKeyValueWriter.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,137 @@ | ||
package voldemort.store.readonly.mr.utils; | ||
|
||
import java.io.IOException; | ||
import java.security.MessageDigest; | ||
import java.util.ArrayList; | ||
import java.util.List; | ||
|
||
import org.apache.hadoop.io.BytesWritable; | ||
|
||
import voldemort.cluster.Node; | ||
import voldemort.routing.ConsistentRoutingStrategy; | ||
import voldemort.serialization.Serializer; | ||
import voldemort.serialization.SerializerDefinition; | ||
import voldemort.store.compress.CompressionStrategy; | ||
import voldemort.utils.ByteUtils; | ||
import voldemort.utils.Pair; | ||
|
||
public class MapperKeyValueWriter { | ||
|
||
public BytesWritable getOutputKey() { | ||
return outputKey; | ||
} | ||
|
||
public BytesWritable getOutputValue() { | ||
return outputVal; | ||
} | ||
|
||
BytesWritable outputKey; | ||
BytesWritable outputVal; | ||
|
||
public List<Pair<BytesWritable, BytesWritable>> map(ConsistentRoutingStrategy routingStrategy, | ||
Serializer keySerializer, | ||
Serializer valueSerializer, | ||
CompressionStrategy valueCompressor, | ||
CompressionStrategy keyCompressor, | ||
SerializerDefinition keySerializerDefinition, | ||
SerializerDefinition valueSerializerDefinition, | ||
byte[] keyBytes, | ||
byte[] valBytes, | ||
boolean getSaveKeys, | ||
MessageDigest md5er) throws IOException { | ||
|
||
List outputList = new ArrayList(); | ||
// Compress key and values if required | ||
if(keySerializerDefinition.hasCompression()) { | ||
keyBytes = keyCompressor.deflate(keyBytes); | ||
} | ||
|
||
if(valueSerializerDefinition.hasCompression()) { | ||
valBytes = valueCompressor.deflate(valBytes); | ||
} | ||
|
||
// Get the output byte arrays ready to populate | ||
byte[] outputValue; | ||
|
||
// Leave initial offset for (a) node id (b) partition id | ||
// since they are written later | ||
int offsetTillNow = 2 * ByteUtils.SIZE_OF_INT; | ||
|
||
if(getSaveKeys) { | ||
|
||
// In order - 4 ( for node id ) + 4 ( partition id ) + 1 ( | ||
// replica | ||
// type - primary | secondary | tertiary... ] + 4 ( key size ) | ||
// size ) + 4 ( value size ) + key + value | ||
outputValue = new byte[valBytes.length + keyBytes.length + ByteUtils.SIZE_OF_BYTE + 4 | ||
* ByteUtils.SIZE_OF_INT]; | ||
|
||
// Write key length - leave byte for replica type | ||
offsetTillNow += ByteUtils.SIZE_OF_BYTE; | ||
ByteUtils.writeInt(outputValue, keyBytes.length, offsetTillNow); | ||
|
||
// Write value length | ||
offsetTillNow += ByteUtils.SIZE_OF_INT; | ||
ByteUtils.writeInt(outputValue, valBytes.length, offsetTillNow); | ||
|
||
// Write key | ||
offsetTillNow += ByteUtils.SIZE_OF_INT; | ||
System.arraycopy(keyBytes, 0, outputValue, offsetTillNow, keyBytes.length); | ||
|
||
// Write value | ||
offsetTillNow += keyBytes.length; | ||
System.arraycopy(valBytes, 0, outputValue, offsetTillNow, valBytes.length); | ||
|
||
// Generate MR key - upper 8 bytes of 16 byte md5 | ||
outputKey = new BytesWritable(ByteUtils.copy(md5er.digest(keyBytes), | ||
0, | ||
2 * ByteUtils.SIZE_OF_INT)); | ||
|
||
} else { | ||
|
||
// In order - 4 ( for node id ) + 4 ( partition id ) + value | ||
outputValue = new byte[valBytes.length + 2 * ByteUtils.SIZE_OF_INT]; | ||
|
||
// Write value | ||
System.arraycopy(valBytes, 0, outputValue, offsetTillNow, valBytes.length); | ||
|
||
// Generate MR key - 16 byte md5 | ||
outputKey = new BytesWritable(md5er.digest(keyBytes)); | ||
|
||
} | ||
|
||
// Generate partition and node list this key is destined for | ||
List<Integer> partitionList = routingStrategy.getPartitionList(keyBytes); | ||
Node[] partitionToNode = routingStrategy.getPartitionToNode(); | ||
|
||
for(int replicaType = 0; replicaType < partitionList.size(); replicaType++) { | ||
|
||
// Node id | ||
ByteUtils.writeInt(outputValue, | ||
partitionToNode[partitionList.get(replicaType)].getId(), | ||
0); | ||
|
||
if(getSaveKeys) { | ||
// Primary partition id | ||
ByteUtils.writeInt(outputValue, partitionList.get(0), ByteUtils.SIZE_OF_INT); | ||
|
||
// Replica type | ||
ByteUtils.writeBytes(outputValue, | ||
replicaType, | ||
2 * ByteUtils.SIZE_OF_INT, | ||
ByteUtils.SIZE_OF_BYTE); | ||
} else { | ||
// Partition id | ||
ByteUtils.writeInt(outputValue, | ||
partitionList.get(replicaType), | ||
ByteUtils.SIZE_OF_INT); | ||
} | ||
outputVal = new BytesWritable(outputValue); | ||
Pair<BytesWritable, BytesWritable> pair = Pair.create(outputKey, outputVal); | ||
outputList.add(pair); | ||
} | ||
|
||
return outputList; | ||
|
||
} | ||
} |