Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Code review - partition scan

  • Loading branch information...
commit 98182b66c43c781f7e51d17ba4c739de699f5b22 1 parent fbe56d2
@vinothchandar vinothchandar authored
View
12 src/java/voldemort/routing/ConsistentRoutingStrategy.java
@@ -150,6 +150,16 @@ else if(a != Integer.MIN_VALUE)
return replicationPartitionsList;
}
+ /**
+ * Obtain the master partition for a given key
+ *
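+ * @param key The key being operated on
+ * @return The master partition for the key: the absolute value of the key's
+ *         hash, modulo the number of partitions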
+ */
+ public Integer getMasterPartition(byte[] key) {
+ return abs(hash.hash(key)) % (Math.max(1, this.partitionToNode.length));
+ }
+
public Set<Node> getNodes() {
Set<Node> s = Sets.newHashSetWithExpectedSize(partitionToNode.length);
for(Node n: this.partitionToNode)
@@ -172,7 +182,7 @@ Node getNodeByPartition(int partition) {
public List<Integer> getPartitionList(byte[] key) {
// hash the key and perform a modulo on the total number of partitions,
// to get the master partition
- int index = abs(hash.hash(key)) % (Math.max(1, this.partitionToNode.length));
+ int index = getMasterPartition(key);
if(logger.isDebugEnabled()) {
logger.debug("Key " + ByteUtils.toHexString(key) + " primary partition " + index);
}
View
10 src/java/voldemort/routing/RouteToAllStrategy.java
@@ -57,6 +57,16 @@ public int getNumReplicas() {
throw new UnsupportedOperationException("Not yet implemented.");
}
+ /**
+ * Obtain the master partition for a given key
+ *
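+ * @param key The key being operated on
+ * @return Never returns normally; this strategy always throws
+ *         UnsupportedOperationException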
+ */
+ public Integer getMasterPartition(byte[] key) {
+ throw new UnsupportedOperationException("Not yet implemented.");
+ }
+
public String getType() {
return RoutingStrategyType.TO_ALL_STRATEGY;
}
View
8 src/java/voldemort/routing/RoutingStrategy.java
@@ -55,6 +55,14 @@
public List<Integer> getPartitionList(byte[] key);
/**
+ * Obtain the master partition for a given key
+ *
+ * @param key The key being operated on
+ * @return The partition that owns the key
+ */
+ public Integer getMasterPartition(byte[] key);
+
+ /**
* Get the replication partitions list for the given partition.
*
* @param partitionId
View
21 src/java/voldemort/server/protocol/admin/FetchPartitionEntriesStreamRequestHandler.java
@@ -1,3 +1,19 @@
+/*
+ * Copyright 2008-2012 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
package voldemort.server.protocol.admin;
import java.io.DataInputStream;
@@ -22,6 +38,7 @@
import voldemort.utils.NetworkClassLoader;
import voldemort.utils.Pair;
import voldemort.utils.RebalanceUtils;
+import voldemort.utils.Time;
import voldemort.versioning.Versioned;
import com.google.protobuf.Message;
@@ -102,7 +119,7 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream,
found = true;
logger.info("Fetching [partition: " + currentPartition + ", replica type:"
+ currentReplicaType + "] for store " + storageEngine.getName());
- entriesPartitionIterator = storageEngine.entries(currentPartition.shortValue());
+ entriesPartitionIterator = storageEngine.entries(currentPartition);
}
currentIndex++;
}
@@ -145,7 +162,7 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream,
// log progress
if(0 == counter % 100000) {
- long totalTime = (System.currentTimeMillis() - startTime) / 1000;
+ long totalTime = (System.currentTimeMillis() - startTime) / Time.MS_PER_SECOND;
logger.info("Fetch entries scanned " + counter + " entries, fetched " + fetched
+ " entries for store '" + storageEngine.getName()
View
21 src/java/voldemort/server/protocol/admin/FetchPartitionKeysStreamRequestHandler.java
@@ -1,3 +1,19 @@
+/*
+ * Copyright 2008-2012 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
package voldemort.server.protocol.admin;
import java.io.DataInputStream;
@@ -21,6 +37,7 @@
import voldemort.utils.ClosableIterator;
import voldemort.utils.NetworkClassLoader;
import voldemort.utils.RebalanceUtils;
+import voldemort.utils.Time;
import com.google.protobuf.Message;
@@ -100,7 +117,7 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream,
found = true;
logger.info("Fetching [partition: " + currentPartition + ", replica type:"
+ currentReplicaType + "] for store " + storageEngine.getName());
- keysPartitionIterator = storageEngine.keys(currentPartition.shortValue());
+ keysPartitionIterator = storageEngine.keys(currentPartition);
}
currentIndex++;
}
@@ -135,7 +152,7 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream,
// log progress
if(0 == counter % 100000) {
- long totalTime = (System.currentTimeMillis() - startTime) / 1000;
+ long totalTime = (System.currentTimeMillis() - startTime) / Time.MS_PER_SECOND;
logger.info("Fetch entries scanned " + counter + " entries, fetched " + fetched
+ " entries for store '" + storageEngine.getName()
View
38 src/java/voldemort/store/StoreBinaryFormat.java
@@ -8,30 +8,32 @@
import voldemort.versioning.VectorClock;
import voldemort.versioning.Versioned;
-/**
- * Defines a generic on-disk data format for versioned voldemort data
- *
- * The format of the values stored on disk. The format is
- * VERSION - 1 byte
- * ------------repeating-------------------
- * CLOCK - variable length, self delimiting
- * NUM_CLOCK_ENTRIES - 2 bytes (short)
- * VERSION_SIZE - 1 byte
- * --------------- repeating ----------
- * NODE_ID - 2 bytes (short)
- * VERSION - VERSION_SIZE bytes
- * ------------------------------------
- * VALUE - variable length
- * VALUE_SIZE - 4 bytes
- * VALUE_BYTES - VALUE_SIZE bytes
- * ----------------------------------------
+/*
+ * Defines a generic on-disk binary data format for versioned voldemort data
+ * -----------------------------------------
+ * FORMAT_VERSION : 1 byte
+ * Versioned value (repeating) {
+ * Clock (variable length) {
+ * NUM_CLOCK_ENTRIES : 2 bytes (short)
+ * VERSION_SIZE : 1 byte
+ * Server clock (repeating) {
+ * NODE_ID : 2 bytes (short)
+ * VERSION : VERSION_SIZE bytes
+ * }
+ * }
+ * Value (variable length) {
+ * VALUE_SIZE : 4 bytes (int)
+ * VALUE_BYTES : VALUE_SIZE bytes
+ * }
+ * }
+ * -----------------------------------------
*/
public class StoreBinaryFormat {
/* In the future we can use this to handle format changes */
private static final byte VERSION = 0;
- private static final int PARTITIONID_PREFIX_SIZE = 2;
+ private static final int PARTITIONID_PREFIX_SIZE = ByteUtils.SIZE_OF_SHORT;
public static byte[] toByteArray(List<Versioned<byte[]>> values) {
int size = 1;
View
27 src/java/voldemort/store/bdb/PartitionPrefixedBdbStorageEngine.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2008-2009 LinkedIn, Inc
+ * Copyright 2008-2012 LinkedIn, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
@@ -92,7 +92,7 @@ public PartitionPrefixedBdbStorageEngine(String name,
public List<Versioned<byte[]>> get(ByteArray key, byte[] transforms)
throws PersistenceFailureException {
StoreUtils.assertValidKey(key);
- int partition = routingStrategy.getPartitionList(key.get()).get(0);
+ int partition = routingStrategy.getMasterPartition(key.get());
ByteArray prefixedKey = new ByteArray(StoreBinaryFormat.makePrefixedKey(key.get(),
partition));
return super.get(prefixedKey, transforms);
@@ -103,7 +103,7 @@ public void put(ByteArray key, Versioned<byte[]> value, byte[] transforms)
throws PersistenceFailureException {
StoreUtils.assertValidKey(key);
- int partition = routingStrategy.getPartitionList(key.get()).get(0);
+ int partition = routingStrategy.getMasterPartition(key.get());
ByteArray prefixedKey = new ByteArray(StoreBinaryFormat.makePrefixedKey(key.get(),
partition));
super.put(prefixedKey, value, transforms);
@@ -113,7 +113,7 @@ public void put(ByteArray key, Versioned<byte[]> value, byte[] transforms)
public boolean delete(ByteArray key, Version version) throws PersistenceFailureException {
StoreUtils.assertValidKey(key);
- int partition = routingStrategy.getPartitionList(key.get()).get(0);
+ int partition = routingStrategy.getMasterPartition(key.get());
ByteArray prefixedKey = new ByteArray(StoreBinaryFormat.makePrefixedKey(key.get(),
partition));
return super.delete(prefixedKey, version);
@@ -124,6 +124,10 @@ protected Logger getLogger() {
return logger;
}
+ /**
+ * Implements a range scan over the partition entries
+ *
+ */
private static class BdbPartitionEntriesIterator extends
BdbIterator<Pair<ByteArray, Versioned<byte[]>>> {
@@ -153,6 +157,11 @@ public boolean hasNext() {
return cache.remove(cache.size() - 1);
}
+ /**
+ * Fetches the next entry from the DB, for the partition
+ *
+ * @return true if some new data was fetched, false if end of data
+ */
private boolean makeMore() {
DatabaseEntry keyEntry = new DatabaseEntry();
DatabaseEntry valueEntry = new DatabaseEntry();
@@ -188,6 +197,10 @@ private boolean makeMore() {
}
}
+ /**
+ * Implements a range scan over the key entries belonging to the partition
+ *
+ */
private static class BdbPartitionKeysIterator extends BdbIterator<ByteArray> {
ByteArray current = null;
@@ -215,6 +228,12 @@ public ByteArray next() {
return result;
}
+ /**
+ * Fetches the next key for the partition from the DB
+ *
+ * @return true if successfully fetched one more key, false if end of
+ * keys
+ */
private boolean fetchNextKey() {
DatabaseEntry keyEntry = new DatabaseEntry();
DatabaseEntry valueEntry = new DatabaseEntry();
View
2  test/unit/voldemort/client/AdminFetchTest.java
@@ -203,7 +203,7 @@ public void testFetchNonExistentEntriesPrimary() {
}
@Test
- public void testFetchNonExistentEntriesSeccondary() {
+ public void testFetchNonExistentEntriesSecondary() {
HashMap<Integer, List<Integer>> replicaToPartitionList = new HashMap<Integer, List<Integer>>();
replicaToPartitionList.put(1, Arrays.asList(1, 2));
Iterator<Pair<ByteArray, Versioned<byte[]>>> entriesItr = adminClient.fetchEntries(0,
View
5 test/unit/voldemort/store/bdb/BdbStorageEngineTest.java
@@ -54,6 +54,11 @@
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.LockMode;
+/**
+ * Tests the BDB storage engine. Note that this class uses JUnit 4 style test
+ * methods, even though its base class extends the JUnit 3 style TestCase.
+ *
+ */
@RunWith(Parameterized.class)
public class BdbStorageEngineTest extends AbstractStorageEngineTest {
View
19 test/unit/voldemort/store/bdb/PartitionPrefixedBdbStorageEngineTest.java
@@ -1,3 +1,19 @@
+/*
+ * Copyright 2008-2012 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
package voldemort.store.bdb;
import static junit.framework.Assert.assertEquals;
@@ -108,13 +124,10 @@ public void testHashConsistencyAcrossRoutingStrategies() {
cluster);
RoutingStrategy zStrategy = new RoutingStrategyFactory().updateRoutingStrategy(zoneStore,
cluster);
-
BdbStorageEngine cPrefixedBdbStore = (BdbStorageEngine) bdbStorage.getStore(consistentStore,
cStrategy);
-
BdbStorageEngine zPrefixedBdbStore = (BdbStorageEngine) bdbStorage.getStore(zoneStore,
zStrategy);
-
HashMap<ByteArray, byte[]> kvpairs = ServerTestUtils.createRandomKeyValuePairs(10000);
for(ByteArray key: kvpairs.keySet()) {
assertEquals(cStrategy.getPartitionList(key.get()).get(0),
Please sign in to comment.
Something went wrong with that request. Please try again.