Adding Partition Scan support for rebalancing
vinothchandar committed Oct 19, 2012
1 parent 98182b6 commit 68b31b9
Showing 5 changed files with 327 additions and 18 deletions.
11 changes: 11 additions & 0 deletions src/java/voldemort/server/VoldemortConfig.java
@@ -195,6 +195,7 @@ public class VoldemortConfig implements Serializable {
private long rebalancingTimeoutSec;
private int maxParallelStoresRebalancing;
private boolean rebalancingOptimization;
private boolean usePartitionScanForRebalance;

public VoldemortConfig(Properties props) {
this(new Props(props));
@@ -405,6 +406,8 @@ public VoldemortConfig(Props props) {
this.rebalancingTimeoutSec = props.getLong("rebalancing.timeout.seconds", 10 * 24 * 60 * 60);
this.maxParallelStoresRebalancing = props.getInt("max.parallel.stores.rebalancing", 3);
this.rebalancingOptimization = props.getBoolean("rebalancing.optimization", true);
this.usePartitionScanForRebalance = props.getBoolean("use.partition.scan.for.rebalance",
true);

this.failureDetectorImplementation = props.getString("failuredetector.implementation",
FailureDetectorConfig.DEFAULT_IMPLEMENTATION_CLASS_NAME);
@@ -1646,6 +1649,14 @@ public void setMaxParallelStoresRebalancing(boolean rebalancingOptimization) {
this.rebalancingOptimization = rebalancingOptimization;
}

public boolean usePartitionScanForRebalance() {
return usePartitionScanForRebalance;
}

public void setUsePartitionScanForRebalance(boolean usePartitionScanForRebalance) {
this.usePartitionScanForRebalance = usePartitionScanForRebalance;
}

public boolean isEnableJmxClusterName() {
return enableJmxClusterName;
}
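For reference, a minimal sketch (not part of this commit) of toggling the new use.partition.scan.for.rebalance flag through server properties. The key name and its default of true come from the hunks above; node.id and voldemort.home are assumed here to be the only other properties a bare VoldemortConfig needs, and a real server.properties carries many more keys.

import java.util.Properties;

import voldemort.server.VoldemortConfig;

public class PartitionScanFlagSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        // Assumed baseline properties for constructing a config in isolation.
        props.setProperty("node.id", "0");
        props.setProperty("voldemort.home", "/tmp/voldemort");
        // New flag from this commit; when omitted it defaults to true.
        props.setProperty("use.partition.scan.for.rebalance", "false");

        VoldemortConfig config = new VoldemortConfig(props);
        System.out.println(config.usePartitionScanForRebalance()); // prints false

        // The setter added in the same hunk allows flipping it programmatically, e.g. in tests.
        config.setUsePartitionScanForRebalance(true);
    }
}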
7 changes: 4 additions & 3 deletions src/java/voldemort/server/rebalance/Rebalancer.java
@@ -298,8 +298,8 @@ private void changeCluster(final Cluster cluster) {
System.currentTimeMillis());
logger.info("Switching metadata from " + metadataStore.getCluster() + " to "
+ cluster + " [ " + updatedVectorClock + " ]");
metadataStore.put(MetadataStore.CLUSTER_KEY, Versioned.value((Object) cluster,
updatedVectorClock));
metadataStore.put(MetadataStore.CLUSTER_KEY,
Versioned.value((Object) cluster, updatedVectorClock));
} finally {
metadataStore.writeLock.unlock();
}
@@ -383,7 +383,8 @@ public int rebalanceNodeOnDonor(final List<RebalancePartitionsInfo> stealInfos)
voldemortConfig,
metadataStore,
requestId,
stealInfos));
stealInfos,
voldemortConfig.usePartitionScanForRebalance()));

return requestId;
}
src/java/voldemort/server/rebalance/async/DonorBasedRebalanceAsyncOperation.java
@@ -16,13 +16,15 @@

package voldemort.server.rebalance.async;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
@@ -40,6 +42,7 @@
import voldemort.server.VoldemortConfig;
import voldemort.server.rebalance.Rebalancer;
import voldemort.server.rebalance.VoldemortRebalancingException;
import voldemort.store.PartitionListIterator;
import voldemort.store.StorageEngine;
import voldemort.store.StoreDefinition;
import voldemort.store.metadata.MetadataStore;
@@ -78,6 +81,7 @@ public class DonorBasedRebalanceAsyncOperation extends RebalanceAsyncOperation {
private final AtomicBoolean running = new AtomicBoolean(true);
private final Cluster initialCluster;
private final Cluster targetCluster;
private final boolean usePartitionScan;

private final HashMultimap<String, Pair<Integer, HashMap<Integer, List<Integer>>>> storeToNodePartitionMapping;

@@ -103,13 +107,15 @@ public DonorBasedRebalanceAsyncOperation(Rebalancer rebalancer,
VoldemortConfig voldemortConfig,
MetadataStore metadataStore,
int requestId,
List<RebalancePartitionsInfo> stealInfos) {
List<RebalancePartitionsInfo> stealInfos,
boolean usePartitionScan) {
super(rebalancer, voldemortConfig, metadataStore, requestId, "Donor based rebalance : "
+ stealInfos);
this.storeRepository = storeRepository;
this.stealInfos = stealInfos;
this.targetCluster = metadataStore.getCluster();
this.initialCluster = stealInfos.get(0).getInitialCluster();
this.usePartitionScan = usePartitionScan;

// Group the plans by the store names
this.storeToNodePartitionMapping = groupByStores(stealInfos);
@@ -293,11 +299,18 @@ public Thread newThread(Runnable r) {
logger.info("Started a thread for " + jobName);
}

fetchEntriesForStealers(storageEngine,
optimizedStealerNodeToMappingTuples,
storeDef,
nodeToQueue,
storeName);
if(usePartitionScan && storageEngine.isPartitionScanSupported())
fetchEntriesForStealersPartitionScan(storageEngine,
optimizedStealerNodeToMappingTuples,
storeDef,
nodeToQueue,
storeName);
else
fetchEntriesForStealers(storageEngine,
optimizedStealerNodeToMappingTuples,
storeDef,
nodeToQueue,
storeName);
}
}
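The hunk above carries the core dispatch: the partition-scan path is taken only when the server flag and the storage engine's own capability check both agree; otherwise the existing full-scan fetch is kept. A rough standalone illustration of that guard follows (openScan and its fallback to a plain entries() full scan are illustrative, not code from this commit):

import java.util.List;

import voldemort.server.VoldemortConfig;
import voldemort.store.PartitionListIterator;
import voldemort.store.StorageEngine;
import voldemort.utils.ByteArray;
import voldemort.utils.ClosableIterator;
import voldemort.utils.Pair;
import voldemort.versioning.Versioned;

public class RebalanceScanChoiceSketch {

    // Pick the scan strategy the same way the hunk above does.
    static ClosableIterator<Pair<ByteArray, Versioned<byte[]>>> openScan(VoldemortConfig config,
                                                                         StorageEngine<ByteArray, byte[], byte[]> engine,
                                                                         List<Integer> partitionsToDonate) {
        if(config.usePartitionScanForRebalance() && engine.isPartitionScanSupported())
            // Efficient path: walk only the partitions being donated.
            return new PartitionListIterator(engine, partitionsToDonate);
        // Fallback: scan every entry in the store and filter keys afterwards.
        return engine.entries();
    }
}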

@@ -340,18 +353,79 @@ private void fetchEntriesForStealers(StorageEngine<ByteArray, byte[], byte[]> st
}
}

private void fetchEntriesForStealersPartitionScan(StorageEngine<ByteArray, byte[], byte[]> storageEngine,
Set<Pair<Integer, HashMap<Integer, List<Integer>>>> optimizedStealerNodeToMappingTuples,
StoreDefinition storeDef,
HashMap<Integer, SynchronousQueue<Pair<ByteArray, Versioned<byte[]>>>> nodeToQueue,
String storeName) {
int scanned = 0;
int[] fetched = new int[targetCluster.getNumberOfNodes()];
long startTime = System.currentTimeMillis();

// construct a set of all the partitions we will be fetching
Set<Integer> partitionsToDonate = new HashSet<Integer>();
for(Pair<Integer, HashMap<Integer, List<Integer>>> nodePartitionMapPair: optimizedStealerNodeToMappingTuples) {
// for each of the nodes, add all the partitions requested
HashMap<Integer, List<Integer>> replicaToPartitionMap = nodePartitionMapPair.getSecond();
if(replicaToPartitionMap != null && replicaToPartitionMap.values() != null) {
for(List<Integer> partitions: replicaToPartitionMap.values())
if(partitions != null)
partitionsToDonate.addAll(partitions);
}
}

PartitionListIterator entries = new PartitionListIterator(storageEngine,
new ArrayList<Integer>(partitionsToDonate));

try {
while(running.get() && entries.hasNext()) {
Pair<ByteArray, Versioned<byte[]>> entry = entries.next();
ByteArray key = entry.getFirst();
Versioned<byte[]> value = entry.getSecond();

scanned++;
List<Integer> nodeIds = RebalanceUtils.checkKeyBelongsToPartition(key.get(),
optimizedStealerNodeToMappingTuples,
targetCluster,
storeDef);

if(nodeIds.size() > 0) {
putValue(nodeIds, key, value, nodeToQueue, fetched);
}

// print progress for every 100k entries.
if(0 == scanned % SCAN_PROGRESS_COUNT) {
printProgress(scanned, fetched, startTime, storeName);
}
}
terminateAllSlaves(storeName);
} catch(InterruptedException e) {
logger.info("InterruptedException received while sending entries to remote nodes, the process is terminating...");
terminateAllSlavesAsync(storeName);
} finally {
close(entries, storeName, scanned, fetched, startTime);
}
}

private void putAll(List<Integer> dests,
ByteArray key,
List<Versioned<byte[]>> values,
HashMap<Integer, SynchronousQueue<Pair<ByteArray, Versioned<byte[]>>>> nodeToQueue,
int[] fetched) throws InterruptedException {
for(Versioned<byte[]> value: values) {
for(int nodeId: dests) {
fetched[nodeId]++;
nodeToQueue.get(nodeId).put(Pair.create(key, value));
if(0 == fetched[nodeId] % FETCHUPDATE_BATCH_SIZE) {
nodeToQueue.get(nodeId).put(BREAK);
}
for(Versioned<byte[]> value: values)
putValue(dests, key, value, nodeToQueue, fetched);
}

private void putValue(List<Integer> dests,
ByteArray key,
Versioned<byte[]> value,
HashMap<Integer, SynchronousQueue<Pair<ByteArray, Versioned<byte[]>>>> nodeToQueue,
int[] fetched) throws InterruptedException {
for(int nodeId: dests) {
fetched[nodeId]++;
nodeToQueue.get(nodeId).put(Pair.create(key, value));
if(0 == fetched[nodeId] % FETCHUPDATE_BATCH_SIZE) {
nodeToQueue.get(nodeId).put(BREAK);
}
}
}
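putValue() interleaves a BREAK marker into each stealer's queue after every FETCHUPDATE_BATCH_SIZE entries. The consumer side is not part of this diff, but the marker presumably lets the pusher thread on the other end cut off and flush the batch it has accumulated. A self-contained sketch of that producer/consumer shape, using stand-in names only (nothing below is Voldemort code):

import java.util.concurrent.SynchronousQueue;

public class BreakMarkerSketch {

    // Stand-ins for the class's BREAK sentinel and FETCHUPDATE_BATCH_SIZE.
    private static final String BREAK = "__break__";
    private static final int BATCH_SIZE = 3;

    public static void main(String[] args) throws InterruptedException {
        final SynchronousQueue<String> queue = new SynchronousQueue<String>();

        // Consumer: accumulates entries and "flushes" whenever it sees a BREAK marker.
        Thread consumer = new Thread(new Runnable() {
            public void run() {
                int buffered = 0;
                try {
                    while(true) {
                        String item = queue.take();
                        if(BREAK.equals(item)) {
                            System.out.println("flushing " + buffered + " entries");
                            buffered = 0;
                        } else {
                            buffered++;
                        }
                    }
                } catch(InterruptedException e) {
                    // producer is done; the last partial batch is left unflushed in this sketch
                }
            }
        });
        consumer.start();

        // Producer: same shape as putValue() above, inserting a BREAK after every full batch.
        int fetched = 0;
        for(int i = 0; i < 7; i++) {
            queue.put("entry-" + i);
            fetched++;
            if(fetched % BATCH_SIZE == 0)
                queue.put(BREAK);
        }
        consumer.interrupt();
    }
}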
@@ -364,7 +438,7 @@ private void printProgress(int scanned, int[] fetched, long startTime, String st
}
}

private void close(ClosableIterator<ByteArray> keys,
private void close(ClosableIterator<?> keys,
String storeName,
int scanned,
int[] fetched,
67 changes: 67 additions & 0 deletions src/java/voldemort/store/PartitionListIterator.java
@@ -0,0 +1,67 @@
package voldemort.store;

import java.util.List;
import java.util.NoSuchElementException;

import voldemort.utils.ByteArray;
import voldemort.utils.ClosableIterator;
import voldemort.utils.Pair;
import voldemort.versioning.Versioned;

/**
* Iterator that uses efficient partition scan to iterate across a list of
* supplied partitions
*
*/
public class PartitionListIterator implements ClosableIterator<Pair<ByteArray, Versioned<byte[]>>> {

StorageEngine<ByteArray, byte[], byte[]> storageEngine;
List<Integer> partitionsToFetch;
ClosableIterator<Pair<ByteArray, Versioned<byte[]>>> partitionIterator;
int currentIndex;

public PartitionListIterator(StorageEngine<ByteArray, byte[], byte[]> storageEngine,
List<Integer> partitionsToFetch) {
this.storageEngine = storageEngine;
this.partitionsToFetch = partitionsToFetch;
this.currentIndex = 0;
}

public boolean hasNext() {
// do we have more elements in the current partition we are serving?
if(this.partitionIterator != null && this.partitionIterator.hasNext())
return true;
// if not, find the next non empty partition
while((currentIndex < partitionsToFetch.size())) {
// close the previous iterator
if(this.partitionIterator != null)
this.partitionIterator.close();
// advance to the next partition
this.partitionIterator = storageEngine.entries(this.partitionsToFetch.get(currentIndex++));
if(this.partitionIterator.hasNext())
return true;
}
return false;
}

public Pair<ByteArray, Versioned<byte[]>> next() {
if(!hasNext())
throw new NoSuchElementException("End of partition entries stream");
return this.partitionIterator.next();
}

public void remove() {
throw new UnsupportedOperationException("Removal not supported");
}

@Override
protected final void finalize() {
close();
}

public void close() {
if(partitionIterator != null) {
partitionIterator.close();
}
}
}
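A short usage sketch for the new iterator (not part of the commit). The engine passed in is assumed to be a partition-scan-capable StorageEngine<ByteArray, byte[], byte[]>, and the partition ids are arbitrary examples:

import java.util.Arrays;

import voldemort.store.PartitionListIterator;
import voldemort.store.StorageEngine;
import voldemort.utils.ByteArray;

public class PartitionListIteratorSketch {

    // Count every entry stored in partitions 0, 2 and 5 of the given engine.
    static long countEntries(StorageEngine<ByteArray, byte[], byte[]> engine) {
        PartitionListIterator entries = new PartitionListIterator(engine, Arrays.asList(0, 2, 5));
        long count = 0;
        try {
            while(entries.hasNext()) {
                entries.next();
                count++;
            }
        } finally {
            // Close explicitly rather than relying on finalize().
            entries.close();
        }
        return count;
    }
}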
