Adding Partition Scan support for rebalancing

1 parent 98182b6 commit 68b31b96088d5a8e2b986ec4733aa07a4420f6a6 @vinothchandar committed Oct 10, 2012
src/java/voldemort/server/VoldemortConfig.java (11 lines changed)
@@ -195,6 +195,7 @@
private long rebalancingTimeoutSec;
private int maxParallelStoresRebalancing;
private boolean rebalancingOptimization;
+ private boolean usePartitionScanForRebalance;
public VoldemortConfig(Properties props) {
this(new Props(props));
@@ -405,6 +406,8 @@ public VoldemortConfig(Props props) {
this.rebalancingTimeoutSec = props.getLong("rebalancing.timeout.seconds", 10 * 24 * 60 * 60);
this.maxParallelStoresRebalancing = props.getInt("max.parallel.stores.rebalancing", 3);
this.rebalancingOptimization = props.getBoolean("rebalancing.optimization", true);
+ this.usePartitionScanForRebalance = props.getBoolean("use.partition.scan.for.rebalance",
+ true);
this.failureDetectorImplementation = props.getString("failuredetector.implementation",
FailureDetectorConfig.DEFAULT_IMPLEMENTATION_CLASS_NAME);
@@ -1646,6 +1649,14 @@ public void setMaxParallelStoresRebalancing(boolean rebalancingOptimization) {
this.rebalancingOptimization = rebalancingOptimization;
}
+ public boolean usePartitionScanForRebalance() {
+ return usePartitionScanForRebalance;
+ }
+
+ public void setUsePartitionScanForRebalance(boolean usePartitionScanForRebalance) {
+ this.usePartitionScanForRebalance = usePartitionScanForRebalance;
+ }
+
public boolean isEnableJmxClusterName() {
return enableJmxClusterName;
}
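
The flag is read from the property use.partition.scan.for.rebalance and defaults to true; it can also be toggled through the new getter/setter pair. Below is a minimal sketch (not part of this commit) of both, using the same Props-based construction that the unit test further down uses; the paths and values are illustrative only.

import voldemort.server.VoldemortConfig;
import voldemort.utils.Props;

public class PartitionScanConfigSketch {

    public static void main(String[] args) {
        // sketch only: the two properties below are the same ones the test further
        // down uses to bootstrap a config; the values are illustrative
        Props props = new Props();
        props.put("node.id", 0);
        props.put("voldemort.home", "test/common/voldemort/config");

        VoldemortConfig config = new VoldemortConfig(props);
        // prints true: use.partition.scan.for.rebalance was not set, so the default applies
        System.out.println(config.usePartitionScanForRebalance());

        // fall back to the existing fetch path by turning the flag off programmatically
        config.setUsePartitionScanForRebalance(false);
        System.out.println(config.usePartitionScanForRebalance());
    }
}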
src/java/voldemort/server/rebalance/Rebalancer.java (7 lines changed)
@@ -298,8 +298,8 @@ private void changeCluster(final Cluster cluster) {
System.currentTimeMillis());
logger.info("Switching metadata from " + metadataStore.getCluster() + " to "
+ cluster + " [ " + updatedVectorClock + " ]");
- metadataStore.put(MetadataStore.CLUSTER_KEY, Versioned.value((Object) cluster,
- updatedVectorClock));
+ metadataStore.put(MetadataStore.CLUSTER_KEY,
+ Versioned.value((Object) cluster, updatedVectorClock));
} finally {
metadataStore.writeLock.unlock();
}
@@ -383,7 +383,8 @@ public int rebalanceNodeOnDonor(final List<RebalancePartitionsInfo> stealInfos)
voldemortConfig,
metadataStore,
requestId,
- stealInfos));
+ stealInfos,
+ voldemortConfig.usePartitionScanForRebalance()));
return requestId;
}
src/java/voldemort/server/rebalance/async/DonorBasedRebalanceAsyncOperation.java (104 lines changed)
@@ -16,13 +16,15 @@
package voldemort.server.rebalance.async;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.Map.Entry;
+import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
@@ -40,6 +42,7 @@
import voldemort.server.VoldemortConfig;
import voldemort.server.rebalance.Rebalancer;
import voldemort.server.rebalance.VoldemortRebalancingException;
+import voldemort.store.PartitionListIterator;
import voldemort.store.StorageEngine;
import voldemort.store.StoreDefinition;
import voldemort.store.metadata.MetadataStore;
@@ -78,6 +81,7 @@
private final AtomicBoolean running = new AtomicBoolean(true);
private final Cluster initialCluster;
private final Cluster targetCluster;
+ private final boolean usePartitionScan;
private final HashMultimap<String, Pair<Integer, HashMap<Integer, List<Integer>>>> storeToNodePartitionMapping;
@@ -103,13 +107,15 @@ public DonorBasedRebalanceAsyncOperation(Rebalancer rebalancer,
VoldemortConfig voldemortConfig,
MetadataStore metadataStore,
int requestId,
- List<RebalancePartitionsInfo> stealInfos) {
+ List<RebalancePartitionsInfo> stealInfos,
+ boolean usePartitionScan) {
super(rebalancer, voldemortConfig, metadataStore, requestId, "Donor based rebalance : "
+ stealInfos);
this.storeRepository = storeRepository;
this.stealInfos = stealInfos;
this.targetCluster = metadataStore.getCluster();
this.initialCluster = stealInfos.get(0).getInitialCluster();
+ this.usePartitionScan = usePartitionScan;
// Group the plans by the store names
this.storeToNodePartitionMapping = groupByStores(stealInfos);
@@ -293,11 +299,18 @@ public Thread newThread(Runnable r) {
logger.info("Started a thread for " + jobName);
}
- fetchEntriesForStealers(storageEngine,
- optimizedStealerNodeToMappingTuples,
- storeDef,
- nodeToQueue,
- storeName);
+ if(usePartitionScan && storageEngine.isPartitionScanSupported())
+ fetchEntriesForStealersPartitionScan(storageEngine,
+ optimizedStealerNodeToMappingTuples,
+ storeDef,
+ nodeToQueue,
+ storeName);
+ else
+ fetchEntriesForStealers(storageEngine,
+ optimizedStealerNodeToMappingTuples,
+ storeDef,
+ nodeToQueue,
+ storeName);
}
}
@@ -340,18 +353,79 @@ private void fetchEntriesForStealers(StorageEngine<ByteArray, byte[], byte[]> st
}
}
+ private void fetchEntriesForStealersPartitionScan(StorageEngine<ByteArray, byte[], byte[]> storageEngine,
+ Set<Pair<Integer, HashMap<Integer, List<Integer>>>> optimizedStealerNodeToMappingTuples,
+ StoreDefinition storeDef,
+ HashMap<Integer, SynchronousQueue<Pair<ByteArray, Versioned<byte[]>>>> nodeToQueue,
+ String storeName) {
+ int scanned = 0;
+ int[] fetched = new int[targetCluster.getNumberOfNodes()];
+ long startTime = System.currentTimeMillis();
+
+ // construct a set of all the partitions we will be fetching
+ Set<Integer> partitionsToDonate = new HashSet<Integer>();
+ for(Pair<Integer, HashMap<Integer, List<Integer>>> nodePartitionMapPair: optimizedStealerNodeToMappingTuples) {
+ // for each of the nodes, add all the partitions requested
+ HashMap<Integer, List<Integer>> replicaToPartitionMap = nodePartitionMapPair.getSecond();
+ if(replicaToPartitionMap != null && replicaToPartitionMap.values() != null) {
+ for(List<Integer> partitions: replicaToPartitionMap.values())
+ if(partitions != null)
+ partitionsToDonate.addAll(partitions);
+ }
+ }
+
+ PartitionListIterator entries = new PartitionListIterator(storageEngine,
+ new ArrayList<Integer>(partitionsToDonate));
+
+ try {
+ while(running.get() && entries.hasNext()) {
+ Pair<ByteArray, Versioned<byte[]>> entry = entries.next();
+ ByteArray key = entry.getFirst();
+ Versioned<byte[]> value = entry.getSecond();
+
+ scanned++;
+ List<Integer> nodeIds = RebalanceUtils.checkKeyBelongsToPartition(key.get(),
+ optimizedStealerNodeToMappingTuples,
+ targetCluster,
+ storeDef);
+
+ if(nodeIds.size() > 0) {
+ putValue(nodeIds, key, value, nodeToQueue, fetched);
+ }
+
+ // print progress for every 100k entries.
+ if(0 == scanned % SCAN_PROGRESS_COUNT) {
+ printProgress(scanned, fetched, startTime, storeName);
+ }
+ }
+ terminateAllSlaves(storeName);
+ } catch(InterruptedException e) {
+ logger.info("InterruptedException received while sending entries to remote nodes, the process is terminating...");
+ terminateAllSlavesAsync(storeName);
+ } finally {
+ close(entries, storeName, scanned, fetched, startTime);
+ }
+ }
+
private void putAll(List<Integer> dests,
ByteArray key,
List<Versioned<byte[]>> values,
HashMap<Integer, SynchronousQueue<Pair<ByteArray, Versioned<byte[]>>>> nodeToQueue,
int[] fetched) throws InterruptedException {
- for(Versioned<byte[]> value: values) {
- for(int nodeId: dests) {
- fetched[nodeId]++;
- nodeToQueue.get(nodeId).put(Pair.create(key, value));
- if(0 == fetched[nodeId] % FETCHUPDATE_BATCH_SIZE) {
- nodeToQueue.get(nodeId).put(BREAK);
- }
+ for(Versioned<byte[]> value: values)
+ putValue(dests, key, value, nodeToQueue, fetched);
+ }
+
+ private void putValue(List<Integer> dests,
+ ByteArray key,
+ Versioned<byte[]> value,
+ HashMap<Integer, SynchronousQueue<Pair<ByteArray, Versioned<byte[]>>>> nodeToQueue,
+ int[] fetched) throws InterruptedException {
+ for(int nodeId: dests) {
+ fetched[nodeId]++;
+ nodeToQueue.get(nodeId).put(Pair.create(key, value));
+ if(0 == fetched[nodeId] % FETCHUPDATE_BATCH_SIZE) {
+ nodeToQueue.get(nodeId).put(BREAK);
}
}
}
@@ -364,7 +438,7 @@ private void printProgress(int scanned, int[] fetched, long startTime, String st
}
}
- private void close(ClosableIterator<ByteArray> keys,
+ private void close(ClosableIterator<?> keys,
String storeName,
int scanned,
int[] fetched,
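
The new code path is only taken when the engine advertises partition scan support, so the dispatch above falls back to the existing key-based fetch otherwise. As a rough, hypothetical summary (not the actual StorageEngine declarations, which are not shown in this diff), these are the two capabilities that the dispatch and the new iterator rely on:

import voldemort.utils.ByteArray;
import voldemort.utils.ClosableIterator;
import voldemort.utils.Pair;
import voldemort.versioning.Versioned;

// Hypothetical interface, inferred from the call sites in this commit; in the real
// code these methods live on StorageEngine implementations such as the BDB engine.
interface PartitionScanCapable {

    // checked by DonorBasedRebalanceAsyncOperation before choosing the partition scan path
    boolean isPartitionScanSupported();

    // per-partition entry iteration, driven by PartitionListIterator below
    ClosableIterator<Pair<ByteArray, Versioned<byte[]>>> entries(int partition);
}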
src/java/voldemort/store/PartitionListIterator.java (67 lines changed, new file)
@@ -0,0 +1,67 @@
+package voldemort.store;
+
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import voldemort.utils.ByteArray;
+import voldemort.utils.ClosableIterator;
+import voldemort.utils.Pair;
+import voldemort.versioning.Versioned;
+
+/**
+ * Iterator that uses efficient partition scan to iterate across a list of
+ * supplied partitions
+ *
+ */
+public class PartitionListIterator implements ClosableIterator<Pair<ByteArray, Versioned<byte[]>>> {
+
+ StorageEngine<ByteArray, byte[], byte[]> storageEngine;
+ List<Integer> partitionsToFetch;
+ ClosableIterator<Pair<ByteArray, Versioned<byte[]>>> partitionIterator;
+ int currentIndex;
+
+ public PartitionListIterator(StorageEngine<ByteArray, byte[], byte[]> storageEngine,
+ List<Integer> partitionsToFetch) {
+ this.storageEngine = storageEngine;
+ this.partitionsToFetch = partitionsToFetch;
+ this.currentIndex = 0;
+ }
+
+ public boolean hasNext() {
+ // do we have more elements in the current partition we are serving?
+ if(this.partitionIterator != null && this.partitionIterator.hasNext())
+ return true;
+ // if not, find the next non empty partition
+ while((currentIndex < partitionsToFetch.size())) {
+ // close the previous iterator
+ if(this.partitionIterator != null)
+ this.partitionIterator.close();
+ // advance to the next partition
+ this.partitionIterator = storageEngine.entries(this.partitionsToFetch.get(currentIndex++));
+ if(this.partitionIterator.hasNext())
+ return true;
+ }
+ return false;
+ }
+
+ public Pair<ByteArray, Versioned<byte[]>> next() {
+ if(!hasNext())
+ throw new NoSuchElementException("End of partition entries stream");
+ return this.partitionIterator.next();
+ }
+
+ public void remove() {
+ throw new UnsupportedOperationException("Removal not supported");
+ }
+
+ @Override
+ protected final void finalize() {
+ close();
+ }
+
+ public void close() {
+ if(partitionIterator != null) {
+ partitionIterator.close();
+ }
+ }
+}
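
A short usage sketch (illustrative, not part of the commit): the iterator walks the supplied partitions in order, transparently skipping empty ones, and must be closed once the caller is done. The helper below assumes any StorageEngine that supports partition scans, such as the BDB engine exercised in the new test:

import java.util.List;

import voldemort.store.PartitionListIterator;
import voldemort.store.StorageEngine;
import voldemort.utils.ByteArray;
import voldemort.utils.Pair;
import voldemort.versioning.Versioned;

public class PartitionListIteratorSketch {

    // count every entry living in the given partitions, e.g. Arrays.asList(2, 4, 6)
    public static int countEntries(StorageEngine<ByteArray, byte[], byte[]> engine,
                                   List<Integer> partitions) {
        PartitionListIterator iterator = new PartitionListIterator(engine, partitions);
        int count = 0;
        try {
            while(iterator.hasNext()) {
                Pair<ByteArray, Versioned<byte[]>> entry = iterator.next();
                // entry.getFirst() is the key, entry.getSecond() the versioned value
                count++;
            }
        } finally {
            iterator.close();
        }
        return count;
    }
}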
test/unit/voldemort/store/bdb/BdbPartitionListIteratorTest.java (156 lines changed, new file)
@@ -0,0 +1,156 @@
+package voldemort.store.bdb;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.fail;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+import org.apache.commons.io.FileDeleteStrategy;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import voldemort.TestUtils;
+import voldemort.routing.RoutingStrategy;
+import voldemort.server.VoldemortConfig;
+import voldemort.store.PartitionListIterator;
+import voldemort.store.StoreDefinition;
+import voldemort.utils.ByteArray;
+import voldemort.utils.Props;
+import voldemort.versioning.Versioned;
+
+/**
+ * Tests the PartitionListIterator used in pidscan based rebalancing
+ *
+ */
+public class BdbPartitionListIteratorTest {
+
+ private File bdbMasterDir;
+ private BdbStorageConfiguration bdbStorage;
+ private BdbStorageEngine store;
+ private RoutingStrategy strategy;
+ private HashMap<Integer, Set<String>> partitionEntries;
+
+ @Before
+ public void setUp() throws Exception {
+ bdbMasterDir = TestUtils.createTempDir();
+ FileDeleteStrategy.FORCE.delete(bdbMasterDir);
+
+ // lets use all the default values.
+ Props props = new Props();
+ props.put("node.id", 1);
+ props.put("voldemort.home", "test/common/voldemort/config");
+ VoldemortConfig voldemortConfig = new VoldemortConfig(props);
+ voldemortConfig.setBdbCacheSize(10 * 1024 * 1024);
+ voldemortConfig.setBdbOneEnvPerStore(true);
+ voldemortConfig.setBdbDataDirectory(bdbMasterDir.toURI().getPath());
+ voldemortConfig.setBdbPrefixKeysWithPartitionId(true);
+ bdbStorage = new BdbStorageConfiguration(voldemortConfig);
+ StoreDefinition defA = TestUtils.makeStoreDefinition("storeA");
+ store = (BdbStorageEngine) bdbStorage.getStore(defA,
+ (strategy = TestUtils.makeSingleNodeRoutingStrategy()));
+
+ // load some data for non odd partitions, and note down how much data we
+ // have for each partition.
+ partitionEntries = new HashMap<Integer, Set<String>>();
+ int numEntries = 0;
+ while(numEntries++ < 10000) {
+ String key = "entry_" + numEntries;
+ int p = strategy.getMasterPartition(key.getBytes());
+ // omit odd partitions
+ if(p % 2 == 1)
+ continue;
+
+ if(!partitionEntries.containsKey(p))
+ partitionEntries.put(p, new HashSet<String>());
+
+ store.put(new ByteArray(key.getBytes()), new Versioned<byte[]>(key.getBytes()), null);
+ partitionEntries.get(p).add(key);
+ }
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ store.close();
+ bdbStorage.close();
+ FileDeleteStrategy.FORCE.delete(bdbMasterDir);
+ }
+
+ @Test
+ public void testEmptyPartitionList() {
+
+ PartitionListIterator plistItr = new PartitionListIterator(store, new ArrayList<Integer>());
+ assertEquals("Empty list cannot have a next element", false, plistItr.hasNext());
+ try {
+ plistItr.next();
+ fail("Should have thrown an exception for next()");
+ } catch(NoSuchElementException ne) {
+
+ } finally {
+ plistItr.close();
+ }
+ }
+
+ @Test
+ public void testEmptyPartition() {
+
+ PartitionListIterator plistItr = new PartitionListIterator(store, Arrays.asList(1));
+ assertEquals("No data loaded for odd partitions, so hasNext() should be false",
+ false,
+ plistItr.hasNext());
+ try {
+ plistItr.next();
+ fail("Should have thrown an exception for next()");
+ } catch(NoSuchElementException ne) {
+
+ } finally {
+ plistItr.close();
+ }
+ }
+
+ @Test
+ public void testSingletonPartitionList() {
+ PartitionListIterator plistItr = new PartitionListIterator(store, Arrays.asList(4));
+ Set<String> pentries = new HashSet<String>();
+ while(plistItr.hasNext()) {
+ pentries.add(new String(plistItr.next().getFirst().get()));
+ }
+ plistItr.close();
+ assertEquals(partitionEntries.get(4), pentries);
+ }
+
+ @Test
+ public void testPartitionListWithEmptyPartitions() {
+ PartitionListIterator plistItr = new PartitionListIterator(store, Arrays.asList(2,
+ 3,
+ 4,
+ 5,
+ 6));
+ HashMap<Integer, Set<String>> retrievedPartitionEntries = new HashMap<Integer, Set<String>>();
+ while(plistItr.hasNext()) {
+ String key = new String(plistItr.next().getFirst().get());
+ int p = strategy.getMasterPartition(key.getBytes());
+
+ if(!retrievedPartitionEntries.containsKey(p))
+ retrievedPartitionEntries.put(p, new HashSet<String>());
+ retrievedPartitionEntries.get(p).add(key);
+ }
+ plistItr.close();
+
+ // should only have retrieved entries for even partitions
+ assertEquals(3, retrievedPartitionEntries.size());
+ for(Integer p: Arrays.asList(2, 3, 4, 5, 6)) {
+ if(p % 2 == 0) {
+ assertEquals(partitionEntries.get(p), retrievedPartitionEntries.get(p));
+ } else {
+ assertEquals(false, retrievedPartitionEntries.containsKey(p));
+ }
+ }
+ }
+}
