Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.
Download ZIP

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
base fork: voldemort/voldemort
base: master
...
head fork: voldemort/voldemort
compare: release-096li8
Checking mergeability… Don’t worry, you can still create the pull request.
  • 9 commits
  • 73 files changed
  • 0 commit comments
  • 1 contributor
Showing with 2,780 additions and 568 deletions.
  1. +1 −1  .classpath
  2. +1 −1  build.properties
  3. +1 −1  contrib/hadoop-store-builder/perf/voldemort/contrib/batchindexer/performance/BdbBuildPerformanceTest.java
  4. +1 −1  contrib/hadoop-store-builder/perf/voldemort/contrib/batchindexer/performance/MysqlBuildPerformanceTest.java
  5. +3 −1 contrib/krati/src/java/voldemort/store/krati/KratiStorageConfiguration.java
  6. +12 −0 contrib/krati/src/java/voldemort/store/krati/KratiStorageEngine.java
  7. BIN  lib/google-collect-1.0-rc2.jar
  8. BIN  lib/google-collect-1.0.jar
  9. BIN  lib/je-4.0.92.jar
  10. BIN  lib/je-4.1.17.jar
  11. +69 −0 release_notes.txt
  12. +36 −1 src/java/voldemort/VoldemortAdminTool.java
  13. +102 −0 src/java/voldemort/client/protocol/admin/AdminClient.java
  14. +11 −1 src/java/voldemort/routing/ConsistentRoutingStrategy.java
  15. +10 −0 src/java/voldemort/routing/RouteToAllStrategy.java
  16. +8 −0 src/java/voldemort/routing/RoutingStrategy.java
  17. +133 −59 src/java/voldemort/server/VoldemortConfig.java
  18. +44 −23 src/java/voldemort/server/protocol/admin/AdminServiceRequestHandler.java
  19. +11 −1 src/java/voldemort/server/protocol/admin/FetchEntriesStreamRequestHandler.java
  20. +13 −1 src/java/voldemort/server/protocol/admin/FetchKeysStreamRequestHandler.java
  21. +189 −0 src/java/voldemort/server/protocol/admin/FetchPartitionEntriesStreamRequestHandler.java
  22. +179 −0 src/java/voldemort/server/protocol/admin/FetchPartitionKeysStreamRequestHandler.java
  23. +1 −8 src/java/voldemort/server/protocol/admin/FetchStreamRequestHandler.java
  24. +2 −1  src/java/voldemort/server/scheduler/slop/StreamingSlopPusherJob.java
  25. +9 −8 src/java/voldemort/server/storage/StorageService.java
  26. +4 −1 src/java/voldemort/store/StorageConfiguration.java
  27. +31 −0 src/java/voldemort/store/StorageEngine.java
  28. +107 −0 src/java/voldemort/store/StoreBinaryFormat.java
  29. +16 −0 src/java/voldemort/store/StoreUtils.java
  30. +43 −0 src/java/voldemort/store/bdb/BdbIterator.java
  31. +11 −0 src/java/voldemort/store/bdb/BdbRuntimeConfig.java
  32. +21 −6 src/java/voldemort/store/bdb/BdbStorageConfiguration.java
  33. +235 −274 src/java/voldemort/store/bdb/BdbStorageEngine.java
  34. +275 −0 src/java/voldemort/store/bdb/PartitionPrefixedBdbStorageEngine.java
  35. +69 −0 src/java/voldemort/store/bdb/stats/BdbEnvironmentStats.java
  36. +12 −0 src/java/voldemort/store/configuration/ConfigurationStorageEngine.java
  37. +3 −1 src/java/voldemort/store/memory/CacheStorageConfiguration.java
  38. +3 −1 src/java/voldemort/store/memory/InMemoryStorageConfiguration.java
  39. +13 −1 src/java/voldemort/store/memory/InMemoryStorageEngine.java
  40. +15 −2 src/java/voldemort/store/metadata/MetadataStore.java
  41. +3 −1 src/java/voldemort/store/mysql/MysqlStorageConfiguration.java
  42. +14 −1 src/java/voldemort/store/mysql/MysqlStorageEngine.java
  43. +3 −1 src/java/voldemort/store/readonly/ReadOnlyStorageConfiguration.java
  44. +13 −0 src/java/voldemort/store/readonly/ReadOnlyStorageEngine.java
  45. +12 −0 src/java/voldemort/store/serialized/SerializingStorageEngine.java
  46. +12 −0 src/java/voldemort/store/slop/SlopStorageEngine.java
  47. +3 −1 src/java/voldemort/store/views/ViewStorageConfiguration.java
  48. +14 −2 src/java/voldemort/store/views/ViewStorageEngine.java
  49. +23 −0 src/java/voldemort/utils/ByteUtils.java
  50. +28 −3 src/java/voldemort/utils/RebalanceUtils.java
  51. +15 −11 src/java/voldemort/versioning/VectorClock.java
  52. +1 −1  src/java/voldemort/versioning/Versioned.java
  53. +10 −3 src/java/voldemort/xml/ClusterMapper.java
  54. +16 −0 test/common/voldemort/TestUtils.java
  55. +4 −0 test/common/voldemort/VoldemortTestConstants.java
  56. +1 −1  test/common/voldemort/config/one-node-cluster.xml
  57. +50 −0 test/common/voldemort/config/two-stores-with-zones.xml
  58. +52 −0 test/common/voldemort/store/RandomlyFailingDelegatingStore.java
  59. +13 −5 test/integration/voldemort/CatBdbStore.java
  60. +2 −1  test/integration/voldemort/performance/CacheStorageEnginePerformanceTest.java
  61. +2 −1  test/integration/voldemort/performance/StorageEnginePerformanceTest.java
  62. +2 −1  test/integration/voldemort/performance/benchmark/Benchmark.java
  63. +3 −1 test/integration/voldemort/store/noop/NoopStorageConfiguration.java
  64. +12 −0 test/integration/voldemort/store/noop/NoopStorageEngine.java
  65. +3 −1 test/integration/voldemort/store/pausable/PausableStorageConfiguration.java
  66. +14 −0 test/integration/voldemort/store/pausable/PausableStorageEngine.java
  67. +222 −0 test/unit/voldemort/client/AdminFetchTest.java
  68. +32 −8 test/unit/voldemort/scheduled/DataCleanupJobTest.java
  69. +137 −97 test/unit/voldemort/store/bdb/BdbCachePartitioningTest.java
  70. +54 −22 test/unit/voldemort/store/bdb/BdbSplitStorageEngineTest.java
  71. +74 −11 test/unit/voldemort/store/bdb/BdbStorageEngineTest.java
  72. +240 −0 test/unit/voldemort/store/bdb/PartitionPrefixedBdbStorageEngineTest.java
  73. +2 −1  test/unit/voldemort/store/memory/CacheStorageEngineTest.java
View
2  .classpath
@@ -40,7 +40,7 @@
<classpathentry kind="lib" path="lib/protobuf-java-2.3.0.jar"/>
<classpathentry kind="lib" path="contrib/ec2-testing/lib/typica.jar"/>
<classpathentry kind="lib" path="lib/google-collect-1.0.jar"/>
- <classpathentry kind="lib" path="lib/je-4.0.92.jar"/>
+ <classpathentry kind="lib" path="lib/je-4.1.17.jar"/>
<classpathentry kind="lib" path="lib/paranamer-2.1.jar"/>
<classpathentry kind="lib" path="lib/jackson-mapper-asl-1.4.0.jar"/>
<classpathentry kind="lib" path="lib/jackson-core-asl-1.4.0.jar"/>
View
2  build.properties
@@ -37,4 +37,4 @@ tomcat.manager.password=tomcat
tomcat.context=/voldemort
## Release
-curr.release=0.90.1
+curr.release=0.96.li8
View
2  ...tore-builder/perf/voldemort/contrib/batchindexer/performance/BdbBuildPerformanceTest.java
@@ -52,7 +52,7 @@ public static void main(String[] args) throws FileNotFoundException, IOException
String storeName = args[1];
String jsonDataFile = args[2];
- final Store<ByteArray, byte[], byte[]> store = new BdbStorageConfiguration(new VoldemortConfig(new Props(new File(serverPropsFile)))).getStore(TestUtils.makeStoreDefinition(storeName));
+ final Store<ByteArray, byte[], byte[]> store = new BdbStorageConfiguration(new VoldemortConfig(new Props(new File(serverPropsFile)))).getStore(TestUtils.makeStoreDefinition(storeName), TestUtils.makeSingleNodeRoutingStrategy());
final AtomicInteger obsoletes = new AtomicInteger(0);
View
2  ...re-builder/perf/voldemort/contrib/batchindexer/performance/MysqlBuildPerformanceTest.java
@@ -52,7 +52,7 @@ public static void main(String[] args) throws FileNotFoundException, IOException
String storeName = args[1];
String jsonDataFile = args[2];
- final Store<ByteArray, byte[], byte[]> store = new MysqlStorageConfiguration(new VoldemortConfig(new Props(new File(serverPropsFile)))).getStore(TestUtils.makeStoreDefinition(storeName));
+ final Store<ByteArray, byte[], byte[]> store = new MysqlStorageConfiguration(new VoldemortConfig(new Props(new File(serverPropsFile)))).getStore(TestUtils.makeStoreDefinition(storeName), TestUtils.makeSingleNodeRoutingStrategy());
final AtomicInteger obsoletes = new AtomicInteger(0);
View
4 contrib/krati/src/java/voldemort/store/krati/KratiStorageConfiguration.java
@@ -8,6 +8,7 @@
import org.apache.log4j.Logger;
import voldemort.VoldemortException;
+import voldemort.routing.RoutingStrategy;
import voldemort.server.VoldemortConfig;
import voldemort.store.StorageConfiguration;
import voldemort.store.StorageEngine;
@@ -45,7 +46,8 @@ public KratiStorageConfiguration(VoldemortConfig config) {
public void close() {}
- public StorageEngine<ByteArray, byte[], byte[]> getStore(StoreDefinition storeDef) {
+ public StorageEngine<ByteArray, byte[], byte[]> getStore(StoreDefinition storeDef,
+ RoutingStrategy strategy) {
synchronized(lock) {
File storeDir = new File(dataDirectory, storeDef.getName());
if(!storeDir.exists()) {
View
12 contrib/krati/src/java/voldemort/store/krati/KratiStorageEngine.java
@@ -147,6 +147,14 @@ public void truncate() {
return StoreUtils.keys(entries());
}
+ public ClosableIterator<Pair<ByteArray, Versioned<byte[]>>> entries(int partition) {
+ throw new UnsupportedOperationException("Partition based entries scan not supported for this storage type");
+ }
+
+ public ClosableIterator<ByteArray> keys(int partition) {
+ throw new UnsupportedOperationException("Partition based key scan not supported for this storage type");
+ }
+
public boolean delete(ByteArray key, Version maxVersion) throws VoldemortException {
StoreUtils.assertValidKey(key);
@@ -312,4 +320,8 @@ public void remove() {
public boolean isPartitionAware() {
return false;
}
+
+ public boolean isPartitionScanSupported() {
+ return false;
+ }
}
View
BIN  lib/google-collect-1.0-rc2.jar
Binary file not shown
View
BIN  lib/google-collect-1.0.jar
Binary file not shown
View
BIN  lib/je-4.0.92.jar
Binary file not shown
View
BIN  lib/je-4.1.17.jar
Binary file not shown
View
69 release_notes.txt
@@ -1,3 +1,72 @@
+Release 0.96.li8 (LinkedIn release) on 09/24/2012
+* Efficient partition scan support
+Note: This is experimental again
+
+Release 0.96.li4 (LinkedIn release) on 08/02/2012
+* Upgrading BDB JE version to 4.1.17 on top of 0.96.li3
+Note : This is experimental. Once verified, will merge to master
+
+Release 0.96.li3 (LinkedIn release) on 08/02/2012
+
+* Server storage conflict handling done in Voldemort code
+* Turning off duplicates in BDB which has several side effects and performance
+  penalty
+* --mirror-from-url option added to admin tool to copy data from another
+  voldemort server
+
+Note : This is an experimental release, and will be merged back to master
+after prod verification.
+
+Release 0.96.li2 (LinkedIn release) on 08/01/2012
+
+Changes made since 0.96.li1
+* HFTP performance issue bug fix (fix in byte buffer and copy process)
+* Added mechanism to isolate BDB cache usage among stores
+* Enhanced debug logging (for traffic analysis).
+* Python client bug fixes (from pull request)
+* Tooling improvement (generate_cluster_xml.py)
+
+Release 0.96.li1 (LinkedIn release) on 07/12/2012
+
+Changes made since 0.95.li9
+* Reuploaded the jar
+
+Changes made since 0.95.li4
+* Added configurable option to interrupt service being unscheduled
+* Added rollback capability to the Admin tool
+* Added tracking functionality for scan permit owner
+* Added zoned option for restore from replicas
+* Enabled NIO connector by default
+* Added client side MBeans for multiple socket pool metrics
+* Shortened some long tests and created new target called "junit-long" that
+  includes them
+* Finer timeouts and partial getalls
+* Refactored to share VoldemortOpCode between classes
+* Fixed a donor-based rebalancing bug and added a unit test
+* Fixed a bug that will return null during concurrent read and writes
+* Added jmxId to Mbean name for failureDetector and storefactory
+* Prevent preferred reads from crossing zones
+* Added additional debug messages
+
+Release 0.95.li4 (LinkedIn release) on 04/13/2012
+
+Changes made since 0.95.li3
+* Added error count Mbeans
+* Concurrency bug fix in Donor based rebalancing
+
+Release 0.95.li3 (LinkedIn release) on 03/22/2012
+
+Changes made since 0.95.li2
+ * HOTFIX - DDS-2536: Fixed FailureDetector to handle host swap
+
+Release 0.95.li2 (LinkedIn release) on 03/21/2012
+
+Changes made since 0.95.li1
+ * Hot fix for SNA-10109. Replaced exception by log message in Histogram
+ * Replaced int by long to avoid overflow in Histogram and RequestCounter
+
+Release 0.95.li1 (LinkedIn release) on 03/07/2012
+
Release 0.90.1 on 10/10/2011
Changes made since 0.90
View
37 src/java/voldemort/VoldemortAdminTool.java
@@ -224,6 +224,14 @@ public static void main(String[] args) throws Exception {
.withRequiredArg()
.describedAs("size-in-mb")
.ofType(Long.class);
+ parser.accepts("mirror-from-url", "Cluster url to mirror data from")
+ .withRequiredArg()
+ .describedAs("mirror-cluster-bootstrap-url")
+ .ofType(String.class);
+ parser.accepts("mirror-node", "Node id in the mirror cluster to mirror from")
+ .withRequiredArg()
+ .describedAs("id-of-mirror-node")
+ .ofType(Integer.class);
OptionSet options = parser.parse(args);
@@ -235,6 +243,8 @@ public static void main(String[] args) throws Exception {
Set<String> missing = CmdUtils.missing(options, "url", "node");
if(missing.size() > 0) {
// Not the most elegant way to do this
+ // basically, check whether only "node" is missing for this set of
+ // options; all of these can run without an explicit node id
if(!(missing.equals(ImmutableSet.of("node"))
&& (options.has("add-stores") || options.has("delete-store")
|| options.has("ro-metadata") || options.has("set-metadata")
@@ -321,11 +331,17 @@ public static void main(String[] args) throws Exception {
}
ops += "f";
}
+ if(options.has("mirror-from-url")) {
+ if(!options.has("mirror-node")) {
+ Utils.croak("Specify the mirror node to fetch from");
+ }
+ ops += "h";
+ }
if(ops.length() < 1) {
Utils.croak("At least one of (delete-partitions, restore, add-node, fetch-entries, "
+ "fetch-keys, add-stores, delete-store, update-entries, get-metadata, ro-metadata, "
+ "set-metadata, check-metadata, key-distribution, clear-rebalancing-metadata, async, "
- + "repair-job, native-backup, rollback, reserve-memory) must be specified");
+ + "repair-job, native-backup, rollback, reserve-memory, mirror-from-url) must be specified");
}
List<String> storeNames = null;
@@ -494,6 +510,21 @@ public static void main(String[] args) throws Exception {
long reserveMB = (Long) options.valueOf("reserve-memory");
adminClient.reserveMemory(nodeId, storeNames, reserveMB);
}
+ if(ops.contains("h")) {
+ if(nodeId == -1) {
+ System.err.println("Cannot run mirroring without node id");
+ System.exit(1);
+ }
+ Integer mirrorNodeId = CmdUtils.valueOf(options, "mirror-node", -1);
+ if(mirrorNodeId == -1) {
+ System.err.println("Cannot run mirroring without mirror node id");
+ System.exit(1);
+ }
+ adminClient.mirrorData(nodeId,
+ mirrorNodeId,
+ (String) options.valueOf("mirror-from-url"),
+ storeNames);
+ }
} catch(Exception e) {
e.printStackTrace();
Utils.croak(e.getMessage());
@@ -597,6 +628,10 @@ public static void printHelp(PrintStream stream, OptionParser parser) throws IOE
stream.println("\t\t./bin/voldemort-admin-tool.sh --fetch-entries --url [url] --node [node-id]");
stream.println("\t9) Update entries for a set of stores using the output from a binary dump fetch entries");
stream.println("\t\t./bin/voldemort-admin-tool.sh --update-entries [folder path from output of --fetch-entries --outdir] --url [url] --node [node-id] --stores [comma-separated list of store names]");
+ stream.println("\t10) Mirror data from another voldemort server (possibly in another cluster) for specified stores");
+ stream.println("\t\t./bin/voldemort-admin-tool.sh --mirror-from-url [bootstrap url to mirror from] --mirror-node [node to mirror from] --url [url] --node [node-id] --stores [comma-separated-list-of-store-names]");
+ stream.println("\t11) Mirror data from another voldemort server (possibly in another cluster) for all stores in current cluster");
+ stream.println("\t\t./bin/voldemort-admin-tool.sh --mirror-from-url [bootstrap url to mirror from] --mirror-node [node to mirror from] --url [url] --node [node-id]");
stream.println();
stream.println("READ-ONLY OPERATIONS");
stream.println("\t1) Retrieve metadata information of read-only data for a particular node and all stores");
View
102 src/java/voldemort/client/protocol/admin/AdminClient.java
@@ -34,6 +34,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
@@ -63,6 +64,7 @@
import voldemort.server.rebalance.VoldemortRebalancingException;
import voldemort.store.ErrorCodeMapper;
import voldemort.store.StoreDefinition;
+import voldemort.store.StoreUtils;
import voldemort.store.metadata.MetadataStore;
import voldemort.store.metadata.MetadataStore.VoldemortState;
import voldemort.store.mysql.MysqlStorageConfiguration;
@@ -2487,4 +2489,104 @@ public void reserveMemory(int nodeId, List<String> stores, long sizeInMB) {
logger.info("Finished reserving memory for store : " + storeName);
}
}
+
+ /**
+ * Mirror data from another voldemort server.
+ * <p>
+ * Opens a second AdminClient against urlToMirrorFrom and, for each store,
+ * streams the entries fetched from the mirror node (for the mirror node's
+ * partition ids) into the given node of the current cluster.
+ * <p>
+ * Before copying, the method checks that both nodes exist in their
+ * clusters, that every requested store is present on both sides, and
+ * that both nodes report equal partition id lists. If any check fails,
+ * an error is logged and the method returns without copying anything.
+ * <p>
+ * One task per store is submitted to a fixed thread pool sized to the
+ * number of stores, and the call blocks on a latch until every task has
+ * finished. An exception inside a store's copy task is caught and logged
+ * and does not stop the other stores. "Finished mirroring data." is
+ * logged from the finally block, i.e. also when a copy failed.
+ * <p>
+ * NOTE(review): the AdminClient created for the mirror cluster is not
+ * stopped or closed anywhere in this method; confirm whether it should
+ * be released once mirroring ends.
+ * <p>
+ * NOTE(review): the pool is created with stores.size() threads;
+ * Executors.newFixedThreadPool rejects a size of 0, so an empty store
+ * list presumably fails here; verify against callers.
+ *
+ * @param nodeId node in the current cluster to mirror to
+ * @param nodeIdToMirrorFrom node from which to mirror data
+ * @param urlToMirrorFrom cluster bootstrap url to mirror from
+ * @param stores set of stores to be mirrored; when null, the store list
+ * computed for the current node is used instead
+ *
+ */
+ public void mirrorData(final int nodeId,
+ final int nodeIdToMirrorFrom,
+ final String urlToMirrorFrom,
+ List<String> stores) {
+ final AdminClient mirrorAdminClient = new AdminClient(urlToMirrorFrom,
+ new AdminClientConfig());
+ final AdminClient currentAdminClient = this;
+
+ // determine the partitions residing on the mirror node
+ Node mirrorNode = mirrorAdminClient.getAdminClientCluster().getNodeById(nodeIdToMirrorFrom);
+ Node currentNode = currentAdminClient.getAdminClientCluster().getNodeById(nodeId);
+
+ if(mirrorNode == null) {
+ logger.error("Mirror node specified does not exist in the mirror cluster");
+ return;
+ }
+
+ if(currentNode == null) {
+ logger.error("node specified does not exist in the current cluster");
+ return;
+ }
+
+ // compare the mirror-from and mirrored-to nodes have same set of stores
+ List<String> currentStoreList = StoreUtils.getStoreNames(currentAdminClient.getRemoteStoreDefList(nodeId)
+ .getValue(),
+ true);
+ List<String> mirrorStoreList = StoreUtils.getStoreNames(mirrorAdminClient.getRemoteStoreDefList(nodeIdToMirrorFrom)
+ .getValue(),
+ true);
+ if(stores == null)
+ stores = currentStoreList;
+
+ if(!currentStoreList.containsAll(stores) || !mirrorStoreList.containsAll(stores)) {
+ logger.error("Make sure the set of stores match on both sides");
+ return;
+ }
+
+ // check if the partitions are same on both the nodes
+ if(!currentNode.getPartitionIds().equals(mirrorNode.getPartitionIds())) {
+ logger.error("Make sure the same set of partitions exist on both sides");
+ return;
+ }
+
+ ExecutorService executors = Executors.newFixedThreadPool(stores.size(),
+ new ThreadFactory() {
+
+ public Thread newThread(Runnable r) {
+ Thread thread = new Thread(r);
+ thread.setName("mirror-data-thread");
+ return thread;
+ }
+ });
+
+ final List<Integer> partitionIdList = mirrorNode.getPartitionIds();
+ final CountDownLatch waitLatch = new CountDownLatch(stores.size());
+ try {
+ for(final String storeName: stores)
+ executors.submit(new Runnable() {
+
+ public void run() {
+ try {
+ logger.info("Mirroring data for store " + storeName + " from node "
+ + nodeIdToMirrorFrom + "(" + urlToMirrorFrom + ") to node "
+ + nodeId + " partitions:" + partitionIdList);
+
+ Iterator<Pair<ByteArray, Versioned<byte[]>>> iterator = mirrorAdminClient.fetchEntries(nodeIdToMirrorFrom,
+ storeName,
+ partitionIdList,
+ null,
+ false);
+ currentAdminClient.updateEntries(nodeId, storeName, iterator, null);
+
+ logger.info("Mirroring data for store:" + storeName + " from node "
+ + nodeIdToMirrorFrom + " completed.");
+ } catch(Exception e) {
+ logger.error("Mirroring operation for store " + storeName
+ + "from node " + nodeIdToMirrorFrom + " failed.", e);
+ } finally {
+ waitLatch.countDown();
+ }
+ }
+ });
+ waitLatch.await();
+ } catch(Exception e) {
+ logger.error("Mirroring operation failed.", e);
+ } finally {
+ executors.shutdown();
+ logger.info("Finished mirroring data.");
+ }
+ }
}
View
12 src/java/voldemort/routing/ConsistentRoutingStrategy.java
@@ -150,6 +150,16 @@ else if(a != Integer.MIN_VALUE)
return replicationPartitionsList;
}
+ /**
+ * Obtain the master partition for a given key: the absolute value of the
+ * key's hash, modulo the length of the partition-to-node array (treated
+ * as at least 1 so the modulo never divides by zero).
+ *
+ * @param key The key being operated on
+ * @return The partition that owns the key
+ */
+ public Integer getMasterPartition(byte[] key) {
+ return abs(hash.hash(key)) % (Math.max(1, this.partitionToNode.length));
+ }
+
public Set<Node> getNodes() {
Set<Node> s = Sets.newHashSetWithExpectedSize(partitionToNode.length);
for(Node n: this.partitionToNode)
@@ -172,7 +182,7 @@ Node getNodeByPartition(int partition) {
public List<Integer> getPartitionList(byte[] key) {
// hash the key and perform a modulo on the total number of partitions,
// to get the master partition
- int index = abs(hash.hash(key)) % (Math.max(1, this.partitionToNode.length));
+ int index = getMasterPartition(key);
if(logger.isDebugEnabled()) {
logger.debug("Key " + ByteUtils.toHexString(key) + " primary partition " + index);
}
View
10 src/java/voldemort/routing/RouteToAllStrategy.java
@@ -57,6 +57,16 @@ public int getNumReplicas() {
throw new UnsupportedOperationException("Not yet implemented.");
}
+ /**
+ * Obtain the master partition for a given key. Not implemented for this
+ * strategy: the method unconditionally throws.
+ *
+ * @param key The key being operated on (unused)
+ * @return never returns normally
+ * @throws UnsupportedOperationException always
+ */
+ public Integer getMasterPartition(byte[] key) {
+ throw new UnsupportedOperationException("Not yet implemented.");
+ }
+
public String getType() {
return RoutingStrategyType.TO_ALL_STRATEGY;
}
View
8 src/java/voldemort/routing/RoutingStrategy.java
@@ -55,6 +55,14 @@
public List<Integer> getPartitionList(byte[] key);
/**
+ * Obtain the master partition for a given key
+ *
+ * @param key The key being operated on
+ * @return The partition that owns the key
+ */
+ public Integer getMasterPartition(byte[] key);
+
+ /**
* Get the replication partitions list for the given partition.
*
* @param partitionId
View
192 src/java/voldemort/server/VoldemortConfig.java
@@ -65,7 +65,6 @@
private long bdbCacheSize;
private boolean bdbWriteTransactions;
private boolean bdbFlushTransactions;
- private boolean bdbSortedDuplicates;
private String bdbDataDirectory;
private long bdbMaxLogFileSize;
private int bdbBtreeFanout;
@@ -87,6 +86,10 @@
private long bdbStatsCacheTtlMs;
private boolean bdbExposeSpaceUtilization;
private long bdbMinimumSharedCache;
+ private boolean bdbCleanerLazyMigration;
+ private boolean bdbCacheModeEvictLN;
+ private boolean bdbMinimizeScanImpact;
+ private boolean bdbPrefixKeysWithPartitionId;
private String mysqlUsername;
private String mysqlPassword;
@@ -214,7 +217,6 @@ public VoldemortConfig(Props props) {
this.bdbBtreeFanout = props.getInt("bdb.btree.fanout", 512);
this.bdbCheckpointBytes = props.getLong("bdb.checkpoint.interval.bytes", 20 * 1024 * 1024);
this.bdbCheckpointMs = props.getLong("bdb.checkpoint.interval.ms", 30 * Time.MS_PER_SECOND);
- this.bdbSortedDuplicates = props.getBoolean("bdb.enable.sorted.duplicates", true);
this.bdbOneEnvPerStore = props.getBoolean("bdb.one.env.per.store", false);
this.bdbCleanerMinFileUtilization = props.getInt("bdb.cleaner.min.file.utilization", 5);
this.bdbCleanerMinUtilization = props.getInt("bdb.cleaner.minUtilization", 50);
@@ -231,6 +233,11 @@ public VoldemortConfig(Props props) {
this.bdbStatsCacheTtlMs = props.getLong("bdb.stats.cache.ttl.ms", 5 * Time.MS_PER_SECOND);
this.bdbExposeSpaceUtilization = props.getBoolean("bdb.expose.space.utilization", true);
this.bdbMinimumSharedCache = props.getLong("bdb.minimum.shared.cache", 0);
+ this.bdbCleanerLazyMigration = props.getBoolean("bdb.cleaner.lazy.migration", true);
+ this.bdbCacheModeEvictLN = props.getBoolean("bdb.cache.evictln", false);
+ this.bdbMinimizeScanImpact = props.getBoolean("bdb.minimize.scan.impact", false);
+ this.bdbPrefixKeysWithPartitionId = props.getBoolean("bdb.prefix.keys.with.partitionid",
+ true);
this.readOnlyBackups = props.getInt("readonly.backups", 1);
this.readOnlySearchStrategy = props.getString("readonly.search.strategy",
@@ -792,6 +799,130 @@ public void setBdbBtreeFanout(int bdbBtreeFanout) {
}
/**
+ * If true, Cleaner offloads some work to application threads, to keep up
+ * with the write rate.
+ *
+ * <ul>
+ * <li>Property : "bdb.cleaner.lazy.migration"</li>
+ * <li>Default : true</li>
+ * </ul>
+ *
+ * @return the current value of the cleaner lazy migration flag
+ */
+ public boolean getBdbCleanerLazyMigration() {
+ return bdbCleanerLazyMigration;
+ }
+
+ public final void setBdbCleanerLazyMigration(boolean bdbCleanerLazyMigration) {
+ this.bdbCleanerLazyMigration = bdbCleanerLazyMigration;
+ }
+
+ /**
+ * If true, BDB will not cache data in the JVM.
+ *
+ * <ul>
+ * <li>Property : "bdb.cache.evictln"</li>
+ * <li>Default : false</li>
+ * </ul>
+ *
+ * @return the current value of the evict-LN cache mode flag
+ */
+ public boolean getBdbCacheModeEvictLN() {
+ return bdbCacheModeEvictLN;
+ }
+
+ public void setBdbCacheModeEvictLN(boolean bdbCacheModeEvictLN) {
+ this.bdbCacheModeEvictLN = bdbCacheModeEvictLN;
+ }
+
+ /**
+ * If true, attempts are made to minimize impact to BDB cache during scan
+ * jobs
+ *
+ * <ul>
+ * <li>Property : "bdb.minimize.scan.impact"</li>
+ * <li>Default : false</li>
+ * </ul>
+ *
+ * @return the current value of the minimize-scan-impact flag
+ */
+ public boolean getBdbMinimizeScanImpact() {
+ return bdbMinimizeScanImpact;
+ }
+
+ public void setBdbMinimizeScanImpact(boolean bdbMinimizeScanImpact) {
+ this.bdbMinimizeScanImpact = bdbMinimizeScanImpact;
+ }
+
+ public boolean isBdbWriteTransactionsEnabled() {
+ return bdbWriteTransactions;
+ }
+
+ public void setBdbWriteTransactions(boolean bdbWriteTransactions) {
+ this.bdbWriteTransactions = bdbWriteTransactions;
+ }
+
+ public void setBdbOneEnvPerStore(boolean bdbOneEnvPerStore) {
+ this.bdbOneEnvPerStore = bdbOneEnvPerStore;
+ }
+
+ public boolean isBdbOneEnvPerStore() {
+ return bdbOneEnvPerStore;
+ }
+
+ /**
+ * If true, keys will be prefixed by the partition Id on disk. This can
+ * dramatically speed up rebalancing, restore operations, at the cost of 2
+ * bytes of extra storage per key
+ *
+ * <ul>
+ * <li>Property : "bdb.prefix.keys.with.partitionid"</li>
+ * <li>Default : true</li>
+ * </ul>
+ *
+ * @return the current value of the partition-id key prefixing flag
+ */
+ public boolean getBdbPrefixKeysWithPartitionId() {
+ return bdbPrefixKeysWithPartitionId;
+ }
+
+ public void setBdbPrefixKeysWithPartitionId(boolean bdbPrefixKeysWithPartitionId) {
+ this.bdbPrefixKeysWithPartitionId = bdbPrefixKeysWithPartitionId;
+ }
+
+ public long getBdbCheckpointBytes() {
+ return this.bdbCheckpointBytes;
+ }
+
+ public void setBdbCheckpointBytes(long bdbCheckpointBytes) {
+ this.bdbCheckpointBytes = bdbCheckpointBytes;
+ }
+
+ public long getBdbCheckpointMs() {
+ return this.bdbCheckpointMs;
+ }
+
+ public void setBdbCheckpointMs(long bdbCheckpointMs) {
+ this.bdbCheckpointMs = bdbCheckpointMs;
+ }
+
+ public long getBdbStatsCacheTtlMs() {
+ return this.bdbStatsCacheTtlMs;
+ }
+
+ public void setBdbStatsCacheTtlMs(long statsCacheTtlMs) {
+ this.bdbStatsCacheTtlMs = statsCacheTtlMs;
+ }
+
+ public long getBdbMinimumSharedCache() {
+ return this.bdbMinimumSharedCache;
+ }
+
+ public void setBdbMinimumSharedCache(long minimumSharedCache) {
+ this.bdbMinimumSharedCache = minimumSharedCache;
+ }
+
+ /**
* The comfortable number of threads the threadpool will attempt to
* maintain. Specified by "core.threads" default: max(1, floor(0.5 *
* max.threads))
@@ -1148,38 +1279,6 @@ public void setEnableMetadataChecking(boolean enableMetadataChecking) {
this.enableMetadataChecking = enableMetadataChecking;
}
- public long getBdbCheckpointBytes() {
- return this.bdbCheckpointBytes;
- }
-
- public void setBdbCheckpointBytes(long bdbCheckpointBytes) {
- this.bdbCheckpointBytes = bdbCheckpointBytes;
- }
-
- public long getBdbCheckpointMs() {
- return this.bdbCheckpointMs;
- }
-
- public void setBdbCheckpointMs(long bdbCheckpointMs) {
- this.bdbCheckpointMs = bdbCheckpointMs;
- }
-
- public long getBdbStatsCacheTtlMs() {
- return this.bdbStatsCacheTtlMs;
- }
-
- public void setBdbStatsCacheTtlMs(long statsCacheTtlMs) {
- this.bdbStatsCacheTtlMs = statsCacheTtlMs;
- }
-
- public long getBdbMinimumSharedCache() {
- return this.bdbMinimumSharedCache;
- }
-
- public void setBdbMinimumSharedCache(long minimumSharedCache) {
- this.bdbMinimumSharedCache = minimumSharedCache;
- }
-
public int getSchedulerThreads() {
return schedulerThreads;
}
@@ -1227,30 +1326,6 @@ public void setReadOnlyDeleteBackupMs(int readOnlyDeleteBackupTimeMs) {
this.readOnlyDeleteBackupTimeMs = readOnlyDeleteBackupTimeMs;
}
- public boolean isBdbWriteTransactionsEnabled() {
- return bdbWriteTransactions;
- }
-
- public void setBdbWriteTransactions(boolean bdbWriteTransactions) {
- this.bdbWriteTransactions = bdbWriteTransactions;
- }
-
- public boolean isBdbSortedDuplicatesEnabled() {
- return this.bdbSortedDuplicates;
- }
-
- public void setBdbSortedDuplicates(boolean enable) {
- this.bdbSortedDuplicates = enable;
- }
-
- public void setBdbOneEnvPerStore(boolean bdbOneEnvPerStore) {
- this.bdbOneEnvPerStore = bdbOneEnvPerStore;
- }
-
- public boolean isBdbOneEnvPerStore() {
- return bdbOneEnvPerStore;
- }
-
public int getSocketBufferSize() {
return socketBufferSize;
}
@@ -1546,5 +1621,4 @@ public boolean isEnableJmxClusterName() {
public void setEnableJmxClusterName(boolean enableJmxClusterName) {
this.enableJmxClusterName = enableJmxClusterName;
}
-
}
View
67 src/java/voldemort/server/protocol/admin/AdminServiceRequestHandler.java
@@ -217,10 +217,10 @@ public StreamRequestHandler handleRequest(final DataInputStream inputStream,
ProtoUtils.writeMessage(outputStream, handleDeleteStore(request.getDeleteStore()));
break;
case FETCH_STORE:
- ProtoUtils.writeMessage(outputStream, handleFetchStore(request.getFetchStore()));
+ ProtoUtils.writeMessage(outputStream, handleFetchROStore(request.getFetchStore()));
break;
case SWAP_STORE:
- ProtoUtils.writeMessage(outputStream, handleSwapStore(request.getSwapStore()));
+ ProtoUtils.writeMessage(outputStream, handleSwapROStore(request.getSwapStore()));
break;
case ROLLBACK_STORE:
ProtoUtils.writeMessage(outputStream,
@@ -239,12 +239,12 @@ public StreamRequestHandler handleRequest(final DataInputStream inputStream,
handleGetROStorageFormat(request.getGetRoStorageFormat()));
break;
case FETCH_PARTITION_FILES:
- return handleFetchPartitionFiles(request.getFetchPartitionFiles());
+ return handleFetchROPartitionFiles(request.getFetchPartitionFiles());
case UPDATE_SLOP_ENTRIES:
return handleUpdateSlopEntries(request.getUpdateSlopEntries());
case FAILED_FETCH_STORE:
ProtoUtils.writeMessage(outputStream,
- handleFailedFetch(request.getFailedFetchStore()));
+ handleFailedROFetch(request.getFailedFetchStore()));
break;
case REBALANCE_STATE_CHANGE:
ProtoUtils.writeMessage(outputStream,
@@ -493,7 +493,7 @@ public StreamRequestHandler handleRequest(final DataInputStream inputStream,
return response.build();
}
- public VAdminProto.FailedFetchStoreResponse handleFailedFetch(VAdminProto.FailedFetchStoreRequest request) {
+ public VAdminProto.FailedFetchStoreResponse handleFailedROFetch(VAdminProto.FailedFetchStoreRequest request) {
final String storeDir = request.getStoreDir();
final String storeName = request.getStoreName();
VAdminProto.FailedFetchStoreResponse.Builder response = VAdminProto.FailedFetchStoreResponse.newBuilder();
@@ -528,7 +528,7 @@ public StreamRequestHandler handleRequest(final DataInputStream inputStream,
return response.build();
}
- public StreamRequestHandler handleFetchPartitionFiles(VAdminProto.FetchPartitionFilesRequest request) {
+ public StreamRequestHandler handleFetchROPartitionFiles(VAdminProto.FetchPartitionFilesRequest request) {
return new FetchPartitionFileStreamRequestHandler(request,
metadataStore,
voldemortConfig,
@@ -542,23 +542,44 @@ public StreamRequestHandler handleUpdateSlopEntries(VAdminProto.UpdateSlopEntrie
public StreamRequestHandler handleFetchPartitionEntries(VAdminProto.FetchPartitionEntriesRequest request) {
boolean fetchValues = request.hasFetchValues() && request.getFetchValues();
+ StorageEngine<ByteArray, byte[], byte[]> storageEngine = AdminServiceRequestHandler.getStorageEngine(storeRepository,
+ request.getStore());
if(fetchValues) {
- return new FetchEntriesStreamRequestHandler(request,
- metadataStore,
- errorCodeMapper,
- voldemortConfig,
- storeRepository,
- networkClassLoader,
- stats);
- } else
- return new FetchKeysStreamRequestHandler(request,
- metadataStore,
- errorCodeMapper,
- voldemortConfig,
- storeRepository,
- networkClassLoader,
- stats);
+ if(storageEngine.isPartitionScanSupported())
+ return new FetchPartitionEntriesStreamRequestHandler(request,
+ metadataStore,
+ errorCodeMapper,
+ voldemortConfig,
+ storeRepository,
+ networkClassLoader,
+ stats);
+ else
+ return new FetchEntriesStreamRequestHandler(request,
+ metadataStore,
+ errorCodeMapper,
+ voldemortConfig,
+ storeRepository,
+ networkClassLoader,
+ stats);
+ } else {
+ if(storageEngine.isPartitionScanSupported())
+ return new FetchPartitionKeysStreamRequestHandler(request,
+ metadataStore,
+ errorCodeMapper,
+ voldemortConfig,
+ storeRepository,
+ networkClassLoader,
+ stats);
+ else
+ return new FetchKeysStreamRequestHandler(request,
+ metadataStore,
+ errorCodeMapper,
+ voldemortConfig,
+ storeRepository,
+ networkClassLoader,
+ stats);
+ }
}
public StreamRequestHandler handleUpdatePartitionEntries(VAdminProto.UpdatePartitionEntriesRequest request) {
@@ -687,7 +708,7 @@ private String swapStore(String storeName, String directory) throws VoldemortExc
return currentDirPath;
}
- public VAdminProto.SwapStoreResponse handleSwapStore(VAdminProto.SwapStoreRequest request) {
+ public VAdminProto.SwapStoreResponse handleSwapROStore(VAdminProto.SwapStoreRequest request) {
final String dir = request.getStoreDir();
final String storeName = request.getStoreName();
VAdminProto.SwapStoreResponse.Builder response = VAdminProto.SwapStoreResponse.newBuilder();
@@ -713,7 +734,7 @@ private String swapStore(String storeName, String directory) throws VoldemortExc
}
}
- public VAdminProto.AsyncOperationStatusResponse handleFetchStore(VAdminProto.FetchStoreRequest request) {
+ public VAdminProto.AsyncOperationStatusResponse handleFetchROStore(VAdminProto.FetchStoreRequest request) {
final String fetchUrl = request.getStoreDir();
final String storeName = request.getStoreName();
View
12 src/java/voldemort/server/protocol/admin/FetchEntriesStreamRequestHandler.java
@@ -15,6 +15,7 @@
import voldemort.store.stats.StreamStats;
import voldemort.store.stats.StreamStats.Operation;
import voldemort.utils.ByteArray;
+import voldemort.utils.ClosableIterator;
import voldemort.utils.NetworkClassLoader;
import voldemort.utils.RebalanceUtils;
import voldemort.versioning.Versioned;
@@ -31,6 +32,8 @@
public class FetchEntriesStreamRequestHandler extends FetchStreamRequestHandler {
+ protected final ClosableIterator<ByteArray> keyIterator;
+
public FetchEntriesStreamRequestHandler(FetchPartitionEntriesRequest request,
MetadataStore metadataStore,
ErrorCodeMapper errorCodeMapper,
@@ -46,6 +49,7 @@ public FetchEntriesStreamRequestHandler(FetchPartitionEntriesRequest request,
networkClassLoader,
stats,
Operation.FETCH_ENTRIES);
+ this.keyIterator = storageEngine.keys();
logger.info("Starting fetch entries for store '" + storageEngine.getName()
+ "' with replica to partition mapping " + replicaToPartitionList);
}
@@ -65,7 +69,7 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream,
initialCluster,
storeDef)
- && counter % skipRecords == 0) {
+ && counter % skipRecords == 0) {
List<Versioned<byte[]>> values = storageEngine.get(key, null);
stats.recordDiskTime(handle, System.nanoTime() - startNs);
for(Versioned<byte[]> value: values) {
@@ -114,4 +118,10 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream,
}
}
+ /**
+ * Closes the key iterator obtained from the storage engine in the
+ * constructor (when it is non-null) and then delegates to the superclass
+ * close.
+ */
+ @Override
+ public final void close(DataOutputStream outputStream) throws IOException {
+ if(null != keyIterator)
+ keyIterator.close();
+ super.close(outputStream);
+ }
}
View
14 src/java/voldemort/server/protocol/admin/FetchKeysStreamRequestHandler.java
@@ -14,6 +14,7 @@
import voldemort.store.stats.StreamStats;
import voldemort.store.stats.StreamStats.Operation;
import voldemort.utils.ByteArray;
+import voldemort.utils.ClosableIterator;
import voldemort.utils.NetworkClassLoader;
import voldemort.utils.RebalanceUtils;
@@ -21,6 +22,8 @@
public class FetchKeysStreamRequestHandler extends FetchStreamRequestHandler {
+ protected final ClosableIterator<ByteArray> keyIterator;
+
public FetchKeysStreamRequestHandler(FetchPartitionEntriesRequest request,
MetadataStore metadataStore,
ErrorCodeMapper errorCodeMapper,
@@ -36,6 +39,7 @@ public FetchKeysStreamRequestHandler(FetchPartitionEntriesRequest request,
networkClassLoader,
stats,
Operation.FETCH_KEYS);
+ this.keyIterator = storageEngine.keys();
logger.info("Starting fetch keys for store '" + storageEngine.getName()
+ "' with replica to partition mapping " + replicaToPartitionList);
}
@@ -56,7 +60,8 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream,
replicaToPartitionList,
initialCluster,
storeDef)
- && filter.accept(key, null) && counter % skipRecords == 0) {
+ && filter.accept(key, null)
+ && counter % skipRecords == 0) {
VAdminProto.FetchPartitionEntriesResponse.Builder response = VAdminProto.FetchPartitionEntriesResponse.newBuilder();
response.setKey(ProtoUtils.encodeBytes(key));
@@ -88,4 +93,11 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream,
return StreamRequestHandlerState.COMPLETE;
}
}
+
+ /**
+ * Closes the key iterator obtained from the storage engine in the
+ * constructor (when it is non-null) and then delegates to the superclass
+ * close.
+ */
+ @Override
+ public final void close(DataOutputStream outputStream) throws IOException {
+ if(null != keyIterator)
+ keyIterator.close();
+ super.close(outputStream);
+ }
}
View
189 src/java/voldemort/server/protocol/admin/FetchPartitionEntriesStreamRequestHandler.java
@@ -0,0 +1,189 @@
+/*
+ * Copyright 2008-2012 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package voldemort.server.protocol.admin;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import voldemort.client.protocol.pb.ProtoUtils;
+import voldemort.client.protocol.pb.VAdminProto;
+import voldemort.client.protocol.pb.VAdminProto.FetchPartitionEntriesRequest;
+import voldemort.server.StoreRepository;
+import voldemort.server.VoldemortConfig;
+import voldemort.store.ErrorCodeMapper;
+import voldemort.store.metadata.MetadataStore;
+import voldemort.store.stats.StreamStats;
+import voldemort.store.stats.StreamStats.Operation;
+import voldemort.utils.ByteArray;
+import voldemort.utils.ClosableIterator;
+import voldemort.utils.NetworkClassLoader;
+import voldemort.utils.Pair;
+import voldemort.utils.RebalanceUtils;
+import voldemort.utils.Time;
+import voldemort.versioning.Versioned;
+
+import com.google.protobuf.Message;
+
+/**
+ * Fetches the entries of the requested partitions using the storage engine's
+ * per-partition scan ({@code storageEngine.entries(partition)}).
+ * <p>
+ * Every call to {@code handleRequest} performs one small step: it either
+ * selects the next partition to scan, or consumes exactly one entry from the
+ * partition that is currently being scanned.
+ */
+public class FetchPartitionEntriesStreamRequestHandler extends FetchStreamRequestHandler {
+
+    // partitions for which a scan has already been started
+    protected Set<Integer> fetchedPartitions;
+    // iterator over the partition being scanned; null between partitions
+    protected ClosableIterator<Pair<ByteArray, Versioned<byte[]>>> entriesPartitionIterator;
+    // replicaTypeList.get(i) is the replica type requested for
+    // partitionList.get(i)
+    protected List<Integer> replicaTypeList;
+    protected List<Integer> partitionList;
+    // position of the next (partition, replica type) pair to examine
+    protected Integer currentIndex;
+
+    public FetchPartitionEntriesStreamRequestHandler(FetchPartitionEntriesRequest request,
+                                                     MetadataStore metadataStore,
+                                                     ErrorCodeMapper errorCodeMapper,
+                                                     VoldemortConfig voldemortConfig,
+                                                     StoreRepository storeRepository,
+                                                     NetworkClassLoader networkClassLoader,
+                                                     StreamStats stats) {
+        super(request,
+              metadataStore,
+              errorCodeMapper,
+              voldemortConfig,
+              storeRepository,
+              networkClassLoader,
+              stats,
+              Operation.FETCH_ENTRIES);
+        logger.info("Starting fetch entries for store '" + storageEngine.getName()
+                    + "' with replica to partition mapping " + replicaToPartitionList);
+        fetchedPartitions = new HashSet<Integer>();
+        replicaTypeList = new ArrayList<Integer>();
+        partitionList = new ArrayList<Integer>();
+        currentIndex = 0;
+        entriesPartitionIterator = null;
+
+        // flatten the replicatype to partition map
+        for(Integer replicaType: replicaToPartitionList.keySet()) {
+            if(replicaToPartitionList.get(replicaType) != null) {
+                for(Integer partitionId: replicaToPartitionList.get(replicaType)) {
+                    partitionList.add(partitionId);
+                    replicaTypeList.add(replicaType);
+                }
+            }
+        }
+    }
+
+    /**
+     * Performs one step of the fetch.
+     *
+     * @param inputStream not read by this handler
+     * @param outputStream stream the matching entries are written to
+     * @return COMPLETE once every requested partition has been examined,
+     *         WRITING otherwise
+     * @throws IOException if writing a response message fails
+     */
+    public StreamRequestHandlerState handleRequest(DataInputStream inputStream,
+                                                   DataOutputStream outputStream)
+            throws IOException {
+
+        // process the next partition
+        if(entriesPartitionIterator == null) {
+            // we are finally done
+            if(currentIndex == partitionList.size()) {
+                stats.closeHandle(handle);
+                return StreamRequestHandlerState.COMPLETE;
+            }
+
+            boolean found = false;
+            // find the next partition to scan
+            while(!found && (currentIndex < partitionList.size())) {
+                Integer currentPartition = partitionList.get(currentIndex);
+                Integer currentReplicaType = replicaTypeList.get(currentIndex);
+
+                // Check the current node contains the partition as the
+                // requested replicatype
+                if(!fetchedPartitions.contains(currentPartition)
+                   && RebalanceUtils.checkPartitionBelongsToNode(currentPartition,
+                                                                 currentReplicaType,
+                                                                 nodeId,
+                                                                 initialCluster,
+                                                                 storeDef)) {
+                    fetchedPartitions.add(currentPartition);
+                    found = true;
+                    logger.info("Fetching [partition: " + currentPartition + ", replica type:"
+                                + currentReplicaType + "] for store " + storageEngine.getName());
+                    entriesPartitionIterator = storageEngine.entries(currentPartition);
+                }
+                currentIndex++;
+            }
+
+        } else {
+            long startNs = System.nanoTime();
+            // do a check before reading in case partition has 0 elements
+            if(entriesPartitionIterator.hasNext()) {
+                counter++;
+
+                // Always consume the entry, even when it is about to be
+                // skipped. Reading it only inside the skipRecords branch
+                // leaves the iterator in place on a skipped turn, so every
+                // entry would still be streamed and counter would over-count.
+                Pair<ByteArray, Versioned<byte[]>> entry = entriesPartitionIterator.next();
+                stats.recordDiskTime(handle, System.nanoTime() - startNs);
+
+                // honor skipRecords
+                if(counter % skipRecords == 0) {
+                    // do the filtering
+                    ByteArray key = entry.getFirst();
+                    Versioned<byte[]> value = entry.getSecond();
+
+                    throttler.maybeThrottle(key.length());
+                    if(filter.accept(key, value)) {
+                        fetched++;
+                        handle.incrementEntriesScanned();
+                        VAdminProto.FetchPartitionEntriesResponse.Builder response = VAdminProto.FetchPartitionEntriesResponse.newBuilder();
+
+                        VAdminProto.PartitionEntry partitionEntry = VAdminProto.PartitionEntry.newBuilder()
+                                                                                              .setKey(ProtoUtils.encodeBytes(key))
+                                                                                              .setVersioned(ProtoUtils.encodeVersioned(value))
+                                                                                              .build();
+                        response.setPartitionEntry(partitionEntry);
+                        Message message = response.build();
+
+                        startNs = System.nanoTime();
+                        ProtoUtils.writeMessage(outputStream, message);
+                        stats.recordNetworkTime(handle, System.nanoTime() - startNs);
+                        throttler.maybeThrottle(AdminServiceRequestHandler.valueSize(value));
+                    }
+                }
+
+                // log progress
+                if(0 == counter % 100000) {
+                    long totalTime = (System.currentTimeMillis() - startTime) / Time.MS_PER_SECOND;
+
+                    logger.info("Fetch entries scanned " + counter + " entries, fetched " + fetched
+                                + " entries for store '" + storageEngine.getName()
+                                + "' replicaToPartitionList:" + replicaToPartitionList + " in "
+                                + totalTime + " s");
+                }
+            }
+
+            // reset the iterator if done with this partition
+            if(!entriesPartitionIterator.hasNext()) {
+                entriesPartitionIterator.close();
+                entriesPartitionIterator = null;
+            }
+        }
+        return StreamRequestHandlerState.WRITING;
+    }
+
+    /**
+     * Closes the partition iterator if a scan is still open, then delegates
+     * to the superclass close.
+     */
+    @Override
+    public final void close(DataOutputStream outputStream) throws IOException {
+        if(null != entriesPartitionIterator)
+            entriesPartitionIterator.close();
+        super.close(outputStream);
+    }
+}
View
179 src/java/voldemort/server/protocol/admin/FetchPartitionKeysStreamRequestHandler.java
@@ -0,0 +1,179 @@
+/*
+ * Copyright 2008-2012 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package voldemort.server.protocol.admin;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import voldemort.client.protocol.pb.ProtoUtils;
+import voldemort.client.protocol.pb.VAdminProto;
+import voldemort.client.protocol.pb.VAdminProto.FetchPartitionEntriesRequest;
+import voldemort.server.StoreRepository;
+import voldemort.server.VoldemortConfig;
+import voldemort.store.ErrorCodeMapper;
+import voldemort.store.metadata.MetadataStore;
+import voldemort.store.stats.StreamStats;
+import voldemort.store.stats.StreamStats.Operation;
+import voldemort.utils.ByteArray;
+import voldemort.utils.ClosableIterator;
+import voldemort.utils.NetworkClassLoader;
+import voldemort.utils.RebalanceUtils;
+import voldemort.utils.Time;
+
+import com.google.protobuf.Message;
+
+/**
+ * Fetches the keys of the requested partitions using the storage engine's
+ * per-partition scan ({@code storageEngine.keys(partition)}).
+ * <p>
+ * Every call to {@code handleRequest} performs one small step: it either
+ * selects the next partition to scan, or consumes exactly one key from the
+ * partition that is currently being scanned.
+ */
+public class FetchPartitionKeysStreamRequestHandler extends FetchStreamRequestHandler {
+
+    // iterator over the partition being scanned; null between partitions
+    protected ClosableIterator<ByteArray> keysPartitionIterator;
+    // partitions for which a scan has already been started
+    protected Set<Integer> fetchedPartitions;
+    // replicaTypeList.get(i) is the replica type requested for
+    // partitionList.get(i)
+    protected List<Integer> replicaTypeList;
+    protected List<Integer> partitionList;
+    // position of the next (partition, replica type) pair to examine
+    protected Integer currentIndex;
+
+    public FetchPartitionKeysStreamRequestHandler(FetchPartitionEntriesRequest request,
+                                                  MetadataStore metadataStore,
+                                                  ErrorCodeMapper errorCodeMapper,
+                                                  VoldemortConfig voldemortConfig,
+                                                  StoreRepository storeRepository,
+                                                  NetworkClassLoader networkClassLoader,
+                                                  StreamStats stats) {
+        super(request,
+              metadataStore,
+              errorCodeMapper,
+              voldemortConfig,
+              storeRepository,
+              networkClassLoader,
+              stats,
+              Operation.FETCH_KEYS);
+        logger.info("Starting fetch keys for store '" + storageEngine.getName()
+                    + "' with replica to partition mapping " + replicaToPartitionList);
+        fetchedPartitions = new HashSet<Integer>();
+        replicaTypeList = new ArrayList<Integer>();
+        partitionList = new ArrayList<Integer>();
+        currentIndex = 0;
+        keysPartitionIterator = null;
+
+        // flatten the replicatype to partition map
+        for(Integer replicaType: replicaToPartitionList.keySet()) {
+            if(replicaToPartitionList.get(replicaType) != null) {
+                for(Integer partitionId: replicaToPartitionList.get(replicaType)) {
+                    partitionList.add(partitionId);
+                    replicaTypeList.add(replicaType);
+                }
+            }
+        }
+    }
+
+    /**
+     * Performs one step of the fetch.
+     *
+     * @param inputStream not read by this handler
+     * @param outputStream stream the matching keys are written to
+     * @return COMPLETE once every requested partition has been examined,
+     *         WRITING otherwise
+     * @throws IOException if writing a response message fails
+     */
+    public StreamRequestHandlerState handleRequest(DataInputStream inputStream,
+                                                   DataOutputStream outputStream)
+            throws IOException {
+
+        // process the next partition
+        if(keysPartitionIterator == null) {
+            if(currentIndex == partitionList.size()) {
+                // we are finally done
+                stats.closeHandle(handle);
+                return StreamRequestHandlerState.COMPLETE;
+            }
+
+            boolean found = false;
+            // find the next partition to scan
+            while(!found && (currentIndex < partitionList.size())) {
+                Integer currentPartition = partitionList.get(currentIndex);
+                Integer currentReplicaType = replicaTypeList.get(currentIndex);
+
+                // Check the current node contains the partition as the
+                // requested replicatype
+                if(!fetchedPartitions.contains(currentPartition)
+                   && RebalanceUtils.checkPartitionBelongsToNode(currentPartition,
+                                                                 currentReplicaType,
+                                                                 nodeId,
+                                                                 initialCluster,
+                                                                 storeDef)) {
+                    fetchedPartitions.add(currentPartition);
+                    found = true;
+                    logger.info("Fetching [partition: " + currentPartition + ", replica type:"
+                                + currentReplicaType + "] for store " + storageEngine.getName());
+                    keysPartitionIterator = storageEngine.keys(currentPartition);
+                }
+                currentIndex++;
+            }
+        } else {
+            long startNs = System.nanoTime();
+            // do a check before reading in case partition has 0 elements
+            if(keysPartitionIterator.hasNext()) {
+                counter++;
+
+                // Always consume the key, even when it is about to be
+                // skipped. Reading it only inside the skipRecords branch
+                // leaves the iterator in place on a skipped turn, so every
+                // key would still be streamed and counter would over-count.
+                ByteArray key = keysPartitionIterator.next();
+                stats.recordDiskTime(handle, System.nanoTime() - startNs);
+
+                // honor skipRecords
+                if(counter % skipRecords == 0) {
+                    // do the filtering
+                    throttler.maybeThrottle(key.length());
+                    if(filter.accept(key, null)) {
+
+                        VAdminProto.FetchPartitionEntriesResponse.Builder response = VAdminProto.FetchPartitionEntriesResponse.newBuilder();
+                        response.setKey(ProtoUtils.encodeBytes(key));
+
+                        fetched++;
+                        handle.incrementEntriesScanned();
+                        Message message = response.build();
+
+                        startNs = System.nanoTime();
+                        ProtoUtils.writeMessage(outputStream, message);
+                        stats.recordNetworkTime(handle, System.nanoTime() - startNs);
+                    }
+                }
+
+                // log progress
+                if(0 == counter % 100000) {
+                    long totalTime = (System.currentTimeMillis() - startTime) / Time.MS_PER_SECOND;
+
+                    logger.info("Fetch entries scanned " + counter + " entries, fetched " + fetched
+                                + " entries for store '" + storageEngine.getName()
+                                + "' replicaToPartitionList:" + replicaToPartitionList + " in "
+                                + totalTime + " s");
+                }
+            }
+
+            // reset the iterator if done with this partition
+            if(!keysPartitionIterator.hasNext()) {
+                keysPartitionIterator.close();
+                keysPartitionIterator = null;
+            }
+        }
+        return StreamRequestHandlerState.WRITING;
+    }
+
+    /**
+     * Closes the partition iterator if a scan is still open, then delegates
+     * to the superclass close.
+     */
+    @Override
+    public final void close(DataOutputStream outputStream) throws IOException {
+        if(null != keysPartitionIterator)
+            keysPartitionIterator.close();
+        super.close(outputStream);
+    }
+}
View
9 src/java/voldemort/server/protocol/admin/FetchStreamRequestHandler.java
@@ -24,7 +24,6 @@
import voldemort.store.stats.StreamStats;
import voldemort.store.stats.StreamStats.Handle;
import voldemort.utils.ByteArray;
-import voldemort.utils.ClosableIterator;
import voldemort.utils.EventThrottler;
import voldemort.utils.NetworkClassLoader;
import voldemort.xml.ClusterMapper;
@@ -45,8 +44,6 @@
protected final StorageEngine<ByteArray, byte[], byte[]> storageEngine;
- protected final ClosableIterator<ByteArray> keyIterator;
-
protected long counter;
protected long skipRecords;
@@ -95,7 +92,6 @@ protected FetchStreamRequestHandler(VAdminProto.FetchPartitionEntriesRequest req
} else {
this.filter = new DefaultVoldemortFilter();
}
- this.keyIterator = storageEngine.keys();
this.startTime = System.currentTimeMillis();
this.counter = 0;
@@ -109,14 +105,11 @@ public final StreamRequestDirection getDirection() {
return StreamRequestDirection.WRITING;
}
- public final void close(DataOutputStream outputStream) throws IOException {
+ public void close(DataOutputStream outputStream) throws IOException {
logger.info("Successfully scanned " + counter + " tuples, fetched " + fetched
+ " tuples for store '" + storageEngine.getName() + "' in "
+ ((System.currentTimeMillis() - startTime) / 1000) + " s");
- if(null != keyIterator)
- keyIterator.close();
-
ProtoUtils.writeEndOfStream(outputStream);
}
View
3  src/java/voldemort/server/scheduler/slop/StreamingSlopPusherJob.java
@@ -356,7 +356,8 @@ public boolean isComplete() {
writeThrottler.maybeThrottle(writtenLast);
writtenLast = slopSize(head);
- deleteBatch.add(Pair.create(head.getValue().makeKey(), head.getVersion()));
+ deleteBatch.add(Pair.create(head.getValue().makeKey(),
+ (Version) head.getVersion()));
return head;
}
}
View
17 src/java/voldemort/server/storage/StorageService.java
@@ -50,6 +50,7 @@
import voldemort.cluster.failuredetector.ServerStoreVerifier;
import voldemort.routing.RoutingStrategy;
import voldemort.routing.RoutingStrategyFactory;
+import voldemort.routing.RoutingStrategyType;
import voldemort.server.AbstractService;
import voldemort.server.RequestRoutingType;
import voldemort.server.ServiceType;
@@ -222,7 +223,7 @@ protected void startInner() {
null,
null,
null,
- null,
+ RoutingStrategyType.CONSISTENT_STRATEGY,
0,
null,
0,
@@ -241,7 +242,9 @@ protected void startInner() {
null,
null,
0);
- SlopStorageEngine slopEngine = new SlopStorageEngine(config.getStore(slopStoreDefinition),
+ SlopStorageEngine slopEngine = new SlopStorageEngine(config.getStore(slopStoreDefinition,
+ new RoutingStrategyFactory().updateRoutingStrategy(slopStoreDefinition,
+ metadata.getCluster())),
metadata.getCluster());
registerEngine(slopEngine, false, "slop");
storeRepository.setSlopStore(slopEngine);
@@ -330,13 +333,11 @@ public void openStore(StoreDefinition storeDef) {
+ " storage engine has not been enabled.");
boolean isReadOnly = storeDef.getType().compareTo(ReadOnlyStorageConfiguration.TYPE_NAME) == 0;
- if(isReadOnly) {
- final RoutingStrategy routingStrategy = new RoutingStrategyFactory().updateRoutingStrategy(storeDef,
- metadata.getCluster());
- ((ReadOnlyStorageConfiguration) config).setRoutingStrategy(routingStrategy);
- }
+ final RoutingStrategy routingStrategy = new RoutingStrategyFactory().updateRoutingStrategy(storeDef,
+ metadata.getCluster());