Skip to content
Browse files

Code refactor. StoreClient interface resumed to original. Added a Vie…

…wStoreClient and ViewUpdateAction interface for views
  • Loading branch information...
1 parent 03e0e44 commit 11fe6251ff619ee5bc2837117a5067bebd967f7d @nehanarkhede nehanarkhede committed with afeinberg Jun 14, 2010
Showing with 1,060 additions and 305 deletions.
  1. +3 −3 contrib/ec2-testing/test/voldemort/utils/Ec2FailureDetectorTest.java
  2. +4 −4 contrib/mongodb/example/MongoDBClient.java
  3. +1 −1 example/java/voldemort/examples/ClientExample.java
  4. +49 −14 src/java/voldemort/VoldemortClientShell.java
  5. +14 −4 src/java/voldemort/client/AbstractStoreClientFactory.java
  6. +9 −79 src/java/voldemort/client/DefaultStoreClient.java
  7. +330 −0 src/java/voldemort/client/DefaultViewStoreClient.java
  8. +12 −3 src/java/voldemort/client/MockStoreClientFactory.java
  9. +3 −38 src/java/voldemort/client/StoreClient.java
  10. +33 −5 src/java/voldemort/client/StoreClientFactory.java
  11. +2 −2 src/java/voldemort/client/UpdateAction.java
  12. +206 −0 src/java/voldemort/client/ViewStoreClient.java
  13. +44 −0 src/java/voldemort/client/ViewUpdateAction.java
  14. +1 −1 src/java/voldemort/server/http/gui/QueryServlet.java
  15. +16 −3 test/common/voldemort/StaticStoreClientFactory.java
  16. +2 −2 test/integration/voldemort/TestClientShutdown.java
  17. +2 −2 test/integration/voldemort/performance/AbstractLoadTestHarness.java
  18. +2 −3 test/integration/voldemort/performance/LocalDirectLoadTest.java
  19. +2 −3 test/integration/voldemort/performance/LocalRoutedStoreLoadTest.java
  20. +1 −1 test/integration/voldemort/performance/RemoteDataGenerator.java
  21. +1 −1 test/integration/voldemort/performance/RemoteHttpStoreLoadTest.java
  22. +26 −18 test/integration/voldemort/performance/RemoteTest.java
  23. +1 −1 test/integration/voldemort/performance/SemiLocalHttpStoreLoadTest.java
  24. +34 −12 test/integration/voldemort/performance/benchmark/Benchmark.java
  25. +22 −0 test/integration/voldemort/performance/benchmark/DbWrapper.java
  26. +192 −0 test/integration/voldemort/performance/benchmark/VoldemortViewWrapper.java
  27. +19 −73 test/integration/voldemort/performance/benchmark/VoldemortWrapper.java
  28. +8 −8 test/integration/voldemort/performance/benchmark/Workload.java
  29. +2 −2 test/unit/voldemort/client/AbstractStoreClientFactoryTest.java
  30. +1 −1 test/unit/voldemort/client/AdminServiceBasicTest.java
  31. +1 −1 test/unit/voldemort/client/DefaultStoreClientTest.java
  32. +8 −8 test/unit/voldemort/client/rebalance/AbstractRebalanceTest.java
  33. +1 −1 test/unit/voldemort/server/EndToEndTest.java
  34. +1 −1 test/unit/voldemort/store/compress/CompressingStoreTest.java
  35. +7 −10 test/unit/voldemort/store/views/ViewTransformsTest.java
View
6 contrib/ec2-testing/test/voldemort/utils/Ec2FailureDetectorTest.java
@@ -70,7 +70,7 @@
private static Map<String, Integer> nodeIds;
private FailureDetector failureDetector;
- private StoreClient<String, String, String> store;
+ private StoreClient<String, String> store;
private static final Logger logger = Logger.getLogger(Ec2FailureDetectorTest.class);
@@ -233,11 +233,11 @@ public void run() {
assertEquals(hostNamePairs.size(), failureDetector.getAvailableNodeCount());
}
- private void test(StoreClient<String, String, String> store) {
+ private void test(StoreClient<String, String> store) {
test(store, 1000);
}
- private void test(StoreClient<String, String, String> store, int tests) {
+ private void test(StoreClient<String, String> store, int tests) {
for(int i = 0; i < tests; i++) {
try {
store.get("test_" + i);
View
8 contrib/mongodb/example/MongoDBClient.java
@@ -38,7 +38,7 @@ public MongoDBClient(String bootstrapUrl) {
public long multiWriteLarge(int count, String keyRoot) {
- StoreClient<String, Doc, String> client = factory.getStoreClient("test");
+ StoreClient<String, Doc> client = factory.getStoreClient("test");
long start = System.currentTimeMillis();
@@ -58,7 +58,7 @@ public long multiWriteLarge(int count, String keyRoot) {
public long multiWrite(int count, String keyRoot) {
- StoreClient<String, Doc, String> client = factory.getStoreClient("test");
+ StoreClient<String, Doc> client = factory.getStoreClient("test");
long start = System.currentTimeMillis();
@@ -78,7 +78,7 @@ public long multiWrite(int count, String keyRoot) {
public long multiRead(int count, String keyRoot) {
- StoreClient<String, Doc, String> client = factory.getStoreClient("test");
+ StoreClient<String, Doc> client = factory.getStoreClient("test");
long start = System.currentTimeMillis();
@@ -116,7 +116,7 @@ public String getRandomKey(int n) {
public void simple() {
- StoreClient<String, Doc, String> client = factory.getStoreClient("test");
+ StoreClient<String, Doc> client = factory.getStoreClient("test");
Versioned<Doc> v = client.get("key");
View
2 example/java/voldemort/examples/ClientExample.java
@@ -30,7 +30,7 @@ public static void main(String[] args) {
String bootstrapUrl = "tcp://localhost:6666";
StoreClientFactory factory = new SocketStoreClientFactory(new ClientConfig().setBootstrapUrls(bootstrapUrl));
- StoreClient<String, String, String> client = factory.getStoreClient("my_store_name");
+ StoreClient<String, String> client = factory.getStoreClient("my_store_name");
// get the value
Versioned<String> version = client.get("some_key");
View
63 src/java/voldemort/VoldemortClientShell.java
@@ -30,6 +30,7 @@
import voldemort.client.ClientConfig;
import voldemort.client.DefaultStoreClient;
+import voldemort.client.DefaultViewStoreClient;
import voldemort.client.SocketStoreClientFactory;
import voldemort.client.StoreClientFactory;
import voldemort.client.protocol.RequestFormatType;
@@ -50,20 +51,31 @@
private static final String PROMPT = "> ";
- private static DefaultStoreClient<Object, Object, Object> client;
+ private static DefaultStoreClient<Object, Object> client;
+ private static DefaultViewStoreClient<Object, Object, Object> viewClient;
+ private static boolean isView = false;
public static void main(String[] args) throws Exception {
if(args.length < 2 || args.length > 3)
- Utils.croak("USAGE: java VoldemortClientShell store_name bootstrap_url [command_file]");
+ Utils.croak("USAGE: java VoldemortClientShell store_name bootstrap_url is_view [command_file]");
String storeName = args[0];
String bootstrapUrl = args[1];
+
+ if(args.length > 2) {
+ String viewString = args[2];
+ if(!(viewString.equalsIgnoreCase("true") || viewString.equalsIgnoreCase("false")))
+ Utils.croak("is_view should be either true or false");
+ if(viewString.equalsIgnoreCase("true"))
+ isView = true;
+ }
+
String commandsFileName = "";
BufferedReader fileReader = null;
BufferedReader inputReader = null;
try {
- if(args.length == 3) {
- commandsFileName = args[2];
+ if(args.length == 4) {
+ commandsFileName = args[3];
fileReader = new BufferedReader(new FileReader(commandsFileName));
}
@@ -77,7 +89,10 @@ public static void main(String[] args) throws Exception {
StoreClientFactory factory = new SocketStoreClientFactory(clientConfig);
try {
- client = (DefaultStoreClient<Object, Object, Object>) factory.getStoreClient(storeName);
+ if(!isView)
+ client = (DefaultStoreClient<Object, Object>) factory.getStoreClient(storeName);
+ else
+ viewClient = (DefaultViewStoreClient<Object, Object, Object>) factory.getViewStoreClient(storeName);
} catch(Exception e) {
Utils.croak("Could not connect to server: " + e.getMessage());
}
@@ -104,10 +119,14 @@ private static void processCommands(StoreClientFactory factory,
JsonReader jsonReader = new JsonReader(new StringReader(line.substring("put".length())));
Object key = tightenNumericTypes(jsonReader.read());
Object value = tightenNumericTypes(jsonReader.read());
- if(jsonReader.hasMore())
- client.put(key, value, tightenNumericTypes(jsonReader.read()));
- else
+ if(isView) {
+ if(jsonReader.hasMore())
+ viewClient.put(key, value, tightenNumericTypes(jsonReader.read()));
+ else
+ viewClient.put(key, value);
+ } else {
client.put(key, value);
+ }
} else if(line.toLowerCase().startsWith("getall")) {
JsonReader jsonReader = new JsonReader(new StringReader(line.substring("getall".length())));
List<Object> keys = new ArrayList<Object>();
@@ -117,7 +136,11 @@ private static void processCommands(StoreClientFactory factory,
} catch(EndOfFileException e) {
// this is okay, just means we are done reading
}
- Map<Object, Versioned<Object>> vals = client.getAll(keys);
+ Map<Object, Versioned<Object>> vals;
+ if(isView)
+ vals = viewClient.getAll(keys);
+ else
+ vals = client.getAll(keys);
if(vals.size() > 0) {
for(Map.Entry<Object, Versioned<Object>> entry: vals.entrySet()) {
System.out.print(entry.getKey());
@@ -130,17 +153,29 @@ private static void processCommands(StoreClientFactory factory,
} else if(line.toLowerCase().startsWith("get")) {
JsonReader jsonReader = new JsonReader(new StringReader(line.substring("get".length())));
Object key = tightenNumericTypes(jsonReader.read());
- if(jsonReader.hasMore())
- printVersioned(client.get(key, tightenNumericTypes(jsonReader.read())));
- else
+ if(isView) {
+ if(jsonReader.hasMore())
+ printVersioned(viewClient.get(key,
+ tightenNumericTypes(jsonReader.read())));
+ else
+ printVersioned(viewClient.get(key));
+ } else {
printVersioned(client.get(key));
+ }
} else if(line.toLowerCase().startsWith("delete")) {
JsonReader jsonReader = new JsonReader(new StringReader(line.substring("delete".length())));
- client.delete(tightenNumericTypes(jsonReader.read()));
+ if(isView)
+ viewClient.delete(tightenNumericTypes(jsonReader.read()));
+ else
+ client.delete(tightenNumericTypes(jsonReader.read()));
} else if(line.startsWith("preflist")) {
JsonReader jsonReader = new JsonReader(new StringReader(line.substring("preflist".length())));
Object key = tightenNumericTypes(jsonReader.read());
- printNodeList(client.getResponsibleNodes(key), factory.getFailureDetector());
+ if(isView)
+ printNodeList(viewClient.getResponsibleNodes(key),
+ factory.getFailureDetector());
+ else
+ printNodeList(client.getResponsibleNodes(key), factory.getFailureDetector());
} else if(line.startsWith("help")) {
System.out.println("Commands:");
System.out.println("put key value -- Associate the given value with the key.");
View
18 src/java/voldemort/client/AbstractStoreClientFactory.java
@@ -116,14 +116,24 @@ public AbstractStoreClientFactory(ClientConfig config) {
}
}
- public <K, V, T> StoreClient<K, V, T> getStoreClient(String storeName) {
+ public <K, V> StoreClient<K, V> getStoreClient(String storeName) {
return getStoreClient(storeName, null);
}
- public <K, V, T> StoreClient<K, V, T> getStoreClient(String storeName,
- InconsistencyResolver<Versioned<V>> resolver) {
+ public <K, V, T> ViewStoreClient<K, V, T> getViewStoreClient(String storeName) {
+ return getViewStoreClient(storeName, null);
+ }
+
+ public <K, V> StoreClient<K, V> getStoreClient(String storeName,
+ InconsistencyResolver<Versioned<V>> resolver) {
+
+ return new DefaultStoreClient<K, V>(storeName, resolver, this, 3);
+ }
+
+ public <K, V, T> ViewStoreClient<K, V, T> getViewStoreClient(String storeName,
+ InconsistencyResolver<Versioned<V>> resolver) {
- return new DefaultStoreClient<K, V, T>(storeName, resolver, this, 3);
+ return new DefaultViewStoreClient<K, V, T>(storeName, resolver, this, 3);
}
@SuppressWarnings("unchecked")
View
88 src/java/voldemort/client/DefaultStoreClient.java
@@ -53,15 +53,15 @@
*/
@Threadsafe
@JmxManaged(description = "A voldemort client")
-public class DefaultStoreClient<K, V, T> implements StoreClient<K, V, T> {
+public class DefaultStoreClient<K, V> implements StoreClient<K, V> {
private final Logger logger = Logger.getLogger(DefaultStoreClient.class);
private final StoreClientFactory storeFactory;
private final int metadataRefreshAttempts;
private final String storeName;
private final InconsistencyResolver<Versioned<V>> resolver;
- private volatile Store<K, V, T> store;
+ private volatile Store<K, V, Object> store;
public DefaultStoreClient(String storeName,
InconsistencyResolver<Versioned<V>> resolver,
@@ -100,15 +100,15 @@ public boolean delete(K key, Version version) {
}
public V getValue(K key, V defaultValue) {
- Versioned<V> versioned = getWithDefaultValue(key, null, null);
+ Versioned<V> versioned = get(key);
if(versioned == null)
return defaultValue;
else
return versioned.getValue();
}
public V getValue(K key) {
- Versioned<V> returned = getWithDefaultValue(key, null, null);
+ Versioned<V> returned = get(key, null);
if(returned == null)
return null;
else
@@ -128,19 +128,6 @@ public V getValue(K key) {
+ " metadata refresh attempts failed.");
}
- private Versioned<V> getWithDefaultValue(K key, Versioned<V> defaultValue, T transforms) {
- for(int attempts = 0; attempts < this.metadataRefreshAttempts; attempts++) {
- try {
- List<Versioned<V>> items = store.get(key, transforms);
- return getItemOrThrow(key, defaultValue, items);
- } catch(InvalidMetadataException e) {
- bootStrap();
- }
- }
- throw new VoldemortException(this.metadataRefreshAttempts
- + " metadata refresh attempts failed.");
- }
-
private List<Version> getVersions(K key) {
for(int attempts = 0; attempts < this.metadataRefreshAttempts; attempts++) {
try {
@@ -163,34 +150,8 @@ else if(items.size() == 1)
+ ") = " + items, items);
}
- public Versioned<V> get(K key, T transforms) {
- return getWithDefaultValue(key, null, transforms);
- }
-
public Versioned<V> get(K key) {
- return getWithDefaultValue(key, null, null);
- }
-
- public Map<K, Versioned<V>> getAll(Iterable<K> keys, Map<K, T> transforms) {
- Map<K, List<Versioned<V>>> items = null;
- for(int attempts = 0;; attempts++) {
- if(attempts >= this.metadataRefreshAttempts)
- throw new VoldemortException(this.metadataRefreshAttempts
- + " metadata refresh attempts failed.");
- try {
- items = store.getAll(keys, transforms);
- break;
- } catch(InvalidMetadataException e) {
- bootStrap();
- }
- }
- Map<K, Versioned<V>> result = Maps.newHashMapWithExpectedSize(items.size());
-
- for(Entry<K, List<Versioned<V>>> mapEntry: items.entrySet()) {
- Versioned<V> value = getItemOrThrow(mapEntry.getKey(), null, mapEntry.getValue());
- result.put(mapEntry.getKey(), value);
- }
- return result;
+ return get(key, null);
}
public Map<K, Versioned<V>> getAll(Iterable<K> keys) {
@@ -215,23 +176,6 @@ else if(items.size() == 1)
return result;
}
- public void put(K key, V value, T transforms) {
- List<Version> versions = getVersions(key);
- Versioned<V> versioned;
- if(versions.isEmpty())
- versioned = Versioned.value(value, new VectorClock());
- else if(versions.size() == 1)
- versioned = Versioned.value(value, versions.get(0));
- else {
- versioned = get(key, (Versioned<V>) null);
- if(versioned == null)
- versioned = Versioned.value(value, new VectorClock());
- else
- versioned.setObject(value);
- }
- put(key, versioned, transforms);
- }
-
public void put(K key, V value) {
List<Version> versions = getVersions(key);
Versioned<V> versioned;
@@ -240,13 +184,13 @@ public void put(K key, V value) {
else if(versions.size() == 1)
versioned = Versioned.value(value, versions.get(0));
else {
- versioned = get(key, (T) null);
+ versioned = get(key, null);
if(versioned == null)
versioned = Versioned.value(value, new VectorClock());
else
versioned.setObject(value);
}
- put(key, versioned, null);
+ put(key, versioned);
}
public boolean putIfNotObsolete(K key, Versioned<V> versioned) {
@@ -258,21 +202,7 @@ public boolean putIfNotObsolete(K key, Versioned<V> versioned) {
}
}
- public void put(K key, Versioned<V> versioned, T transforms) throws ObsoleteVersionException {
- for(int attempts = 0; attempts < this.metadataRefreshAttempts; attempts++) {
- try {
- store.put(key, versioned, transforms);
- return;
- } catch(InvalidMetadataException e) {
- bootStrap();
- }
- }
- throw new VoldemortException(this.metadataRefreshAttempts
- + " metadata refresh attempts failed.");
- }
-
public void put(K key, Versioned<V> versioned) throws ObsoleteVersionException {
-
for(int attempts = 0; attempts < this.metadataRefreshAttempts; attempts++) {
try {
store.put(key, versioned, null);
@@ -285,11 +215,11 @@ public void put(K key, Versioned<V> versioned) throws ObsoleteVersionException {
+ " metadata refresh attempts failed.");
}
- public boolean applyUpdate(UpdateAction<K, V, T> action) {
+ public boolean applyUpdate(UpdateAction<K, V> action) {
return applyUpdate(action, 3);
}
- public boolean applyUpdate(UpdateAction<K, V, T> action, int maxTries) {
+ public boolean applyUpdate(UpdateAction<K, V> action, int maxTries) {
boolean success = false;
try {
for(int i = 0; i < maxTries; i++) {
View
330 src/java/voldemort/client/DefaultViewStoreClient.java
@@ -0,0 +1,330 @@
+/*
+ * Copyright 2008-2009 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package voldemort.client;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.log4j.Logger;
+
+import voldemort.VoldemortException;
+import voldemort.annotations.concurrency.Threadsafe;
+import voldemort.annotations.jmx.JmxOperation;
+import voldemort.cluster.Node;
+import voldemort.routing.RoutingStrategy;
+import voldemort.serialization.Serializer;
+import voldemort.store.InvalidMetadataException;
+import voldemort.store.Store;
+import voldemort.store.StoreCapabilityType;
+import voldemort.utils.Utils;
+import voldemort.versioning.InconsistencyResolver;
+import voldemort.versioning.InconsistentDataException;
+import voldemort.versioning.ObsoleteVersionException;
+import voldemort.versioning.VectorClock;
+import voldemort.versioning.Version;
+import voldemort.versioning.Versioned;
+
+import com.google.common.collect.Maps;
+
+/**
+ * The default {@link voldemort.client.StoreClient StoreClient} implementation
+ * you get back from a {@link voldemort.client.StoreClientFactory
+ * StoreClientFactory}
+ *
+ *
+ * @param <K> The key type
+ * @param <V> The value type
+ */
+@Threadsafe
+public class DefaultViewStoreClient<K, V, T> implements ViewStoreClient<K, V, T> {
+
+ private final Logger logger = Logger.getLogger(DefaultStoreClient.class);
+ private final StoreClientFactory storeFactory;
+
+ private final int metadataRefreshAttempts;
+ private final String storeName;
+ private final InconsistencyResolver<Versioned<V>> resolver;
+ private volatile Store<K, V, T> store;
+
+ public DefaultViewStoreClient(String storeName,
+ InconsistencyResolver<Versioned<V>> resolver,
+ StoreClientFactory storeFactory,
+ int maxMetadataRefreshAttempts) {
+ this.storeName = Utils.notNull(storeName);
+ this.resolver = resolver;
+ this.storeFactory = Utils.notNull(storeFactory);
+ this.metadataRefreshAttempts = maxMetadataRefreshAttempts;
+ bootStrap();
+ }
+
+ @JmxOperation(description = "bootstrap metadata from the cluster.")
+ public void bootStrap() {
+ logger.info("bootstrapping metadata.");
+ this.store = storeFactory.getRawStore(storeName, resolver);
+ }
+
+ public boolean delete(K key) {
+ Versioned<V> versioned = get(key);
+ if(versioned == null)
+ return false;
+ return delete(key, versioned.getVersion());
+ }
+
+ public boolean delete(K key, Version version) {
+ for(int attempts = 0; attempts < this.metadataRefreshAttempts; attempts++) {
+ try {
+ return store.delete(key, version);
+ } catch(InvalidMetadataException e) {
+ bootStrap();
+ }
+ }
+ throw new VoldemortException(this.metadataRefreshAttempts
+ + " metadata refresh attempts failed.");
+ }
+
+ public V getValue(K key, V defaultValue) {
+ Versioned<V> versioned = getWithDefaultValue(key, null, null);
+ if(versioned == null)
+ return defaultValue;
+ else
+ return versioned.getValue();
+ }
+
+ public V getValue(K key) {
+ Versioned<V> returned = getWithDefaultValue(key, null, null);
+ if(returned == null)
+ return null;
+ else
+ return returned.getValue();
+ }
+
+ public Versioned<V> get(K key, Versioned<V> defaultValue) {
+ for(int attempts = 0; attempts < this.metadataRefreshAttempts; attempts++) {
+ try {
+ List<Versioned<V>> items = store.get(key, null);
+ return getItemOrThrow(key, defaultValue, items);
+ } catch(InvalidMetadataException e) {
+ bootStrap();
+ }
+ }
+ throw new VoldemortException(this.metadataRefreshAttempts
+ + " metadata refresh attempts failed.");
+ }
+
+ private Versioned<V> getWithDefaultValue(K key, Versioned<V> defaultValue, T transforms) {
+ for(int attempts = 0; attempts < this.metadataRefreshAttempts; attempts++) {
+ try {
+ List<Versioned<V>> items = store.get(key, transforms);
+ return getItemOrThrow(key, defaultValue, items);
+ } catch(InvalidMetadataException e) {
+ bootStrap();
+ }
+ }
+ throw new VoldemortException(this.metadataRefreshAttempts
+ + " metadata refresh attempts failed.");
+ }
+
+ private List<Version> getVersions(K key) {
+ for(int attempts = 0; attempts < this.metadataRefreshAttempts; attempts++) {
+ try {
+ return store.getVersions(key);
+ } catch(InvalidMetadataException e) {
+ bootStrap();
+ }
+ }
+ throw new VoldemortException(this.metadataRefreshAttempts
+ + " metadata refresh attempts failed.");
+ }
+
+ private Versioned<V> getItemOrThrow(K key, Versioned<V> defaultValue, List<Versioned<V>> items) {
+ if(items.size() == 0)
+ return defaultValue;
+ else if(items.size() == 1)
+ return items.get(0);
+ else
+ throw new InconsistentDataException("Unresolved versions returned from get(" + key
+ + ") = " + items, items);
+ }
+
+ public Versioned<V> get(K key, T transforms) {
+ return getWithDefaultValue(key, null, transforms);
+ }
+
+ public Versioned<V> get(K key) {
+ return getWithDefaultValue(key, null, null);
+ }
+
+ public Map<K, Versioned<V>> getAll(Iterable<K> keys, Map<K, T> transforms) {
+ Map<K, List<Versioned<V>>> items = null;
+ for(int attempts = 0;; attempts++) {
+ if(attempts >= this.metadataRefreshAttempts)
+ throw new VoldemortException(this.metadataRefreshAttempts
+ + " metadata refresh attempts failed.");
+ try {
+ items = store.getAll(keys, transforms);
+ break;
+ } catch(InvalidMetadataException e) {
+ bootStrap();
+ }
+ }
+ Map<K, Versioned<V>> result = Maps.newHashMapWithExpectedSize(items.size());
+
+ for(Entry<K, List<Versioned<V>>> mapEntry: items.entrySet()) {
+ Versioned<V> value = getItemOrThrow(mapEntry.getKey(), null, mapEntry.getValue());
+ result.put(mapEntry.getKey(), value);
+ }
+ return result;
+ }
+
+ public Map<K, Versioned<V>> getAll(Iterable<K> keys) {
+ Map<K, List<Versioned<V>>> items = null;
+ for(int attempts = 0;; attempts++) {
+ if(attempts >= this.metadataRefreshAttempts)
+ throw new VoldemortException(this.metadataRefreshAttempts
+ + " metadata refresh attempts failed.");
+ try {
+ items = store.getAll(keys, null);
+ break;
+ } catch(InvalidMetadataException e) {
+ bootStrap();
+ }
+ }
+ Map<K, Versioned<V>> result = Maps.newHashMapWithExpectedSize(items.size());
+
+ for(Entry<K, List<Versioned<V>>> mapEntry: items.entrySet()) {
+ Versioned<V> value = getItemOrThrow(mapEntry.getKey(), null, mapEntry.getValue());
+ result.put(mapEntry.getKey(), value);
+ }
+ return result;
+ }
+
+ public void put(K key, V value, T transforms) {
+ List<Version> versions = getVersions(key);
+ Versioned<V> versioned;
+ if(versions.isEmpty())
+ versioned = Versioned.value(value, new VectorClock());
+ else if(versions.size() == 1)
+ versioned = Versioned.value(value, versions.get(0));
+ else {
+ versioned = get(key, (Versioned<V>) null);
+ if(versioned == null)
+ versioned = Versioned.value(value, new VectorClock());
+ else
+ versioned.setObject(value);
+ }
+ put(key, versioned, transforms);
+ }
+
+ public void put(K key, V value) {
+ List<Version> versions = getVersions(key);
+ Versioned<V> versioned;
+ if(versions.isEmpty())
+ versioned = Versioned.value(value, new VectorClock());
+ else if(versions.size() == 1)
+ versioned = Versioned.value(value, versions.get(0));
+ else {
+ versioned = get(key, (T) null);
+ if(versioned == null)
+ versioned = Versioned.value(value, new VectorClock());
+ else
+ versioned.setObject(value);
+ }
+ put(key, versioned, null);
+ }
+
+ public boolean putIfNotObsolete(K key, Versioned<V> versioned) {
+ try {
+ put(key, versioned);
+ return true;
+ } catch(ObsoleteVersionException e) {
+ return false;
+ }
+ }
+
+ public void put(K key, Versioned<V> versioned, T transforms) throws ObsoleteVersionException {
+ for(int attempts = 0; attempts < this.metadataRefreshAttempts; attempts++) {
+ try {
+ store.put(key, versioned, transforms);
+ return;
+ } catch(InvalidMetadataException e) {
+ bootStrap();
+ }
+ }
+ throw new VoldemortException(this.metadataRefreshAttempts
+ + " metadata refresh attempts failed.");
+ }
+
+ public void put(K key, Versioned<V> versioned) throws ObsoleteVersionException {
+
+ for(int attempts = 0; attempts < this.metadataRefreshAttempts; attempts++) {
+ try {
+ store.put(key, versioned, null);
+ return;
+ } catch(InvalidMetadataException e) {
+ bootStrap();
+ }
+ }
+ throw new VoldemortException(this.metadataRefreshAttempts
+ + " metadata refresh attempts failed.");
+ }
+
+ public boolean applyUpdate(ViewUpdateAction<K, V, T> action) {
+ return applyUpdate(action, 3);
+ }
+
+ public boolean applyUpdate(ViewUpdateAction<K, V, T> action, int maxTries) {
+ boolean success = false;
+ try {
+ for(int i = 0; i < maxTries; i++) {
+ try {
+ action.update(this);
+ success = true;
+ return success;
+ } catch(ObsoleteVersionException e) {
+ // ignore for now
+ }
+ }
+ } finally {
+ if(!success)
+ action.rollback();
+ }
+
+ // if we got here we have seen too many ObsoleteVersionExceptions
+ // and have rolled back the updates
+ return false;
+ }
+
+ public List<Node> getResponsibleNodes(K key) {
+ RoutingStrategy strategy = (RoutingStrategy) store.getCapability(StoreCapabilityType.ROUTING_STRATEGY);
+ @SuppressWarnings("unchecked")
+ Serializer<K> keySerializer = (Serializer<K>) store.getCapability(StoreCapabilityType.KEY_SERIALIZER);
+ return strategy.routeRequest(keySerializer.toBytes(key));
+ }
+
+ @SuppressWarnings("unused")
+ private Version getVersion(K key) {
+ List<Version> versions = getVersions(key);
+ if(versions.size() == 0)
+ return null;
+ else if(versions.size() == 1)
+ return versions.get(0);
+ else
+ throw new InconsistentDataException("Unresolved versions returned from get(" + key
+ + ") = " + versions, versions);
+ }
+}
View
15 src/java/voldemort/client/MockStoreClientFactory.java
@@ -95,15 +95,24 @@ public MockStoreClientFactory(Serializer<?> keySerializer,
failureDetector = new NoopFailureDetector();
}
- public <K, V, T> StoreClient<K, V, T> getStoreClient(String storeName) {
+ public <K, V> StoreClient<K, V> getStoreClient(String storeName) {
return getStoreClient(storeName, new TimeBasedInconsistencyResolver<V>());
}
- public <K, V, T> StoreClient<K, V, T> getStoreClient(String storeName,
- InconsistencyResolver<Versioned<V>> resolver) {
+ public <K, V> StoreClient<K, V> getStoreClient(String storeName,
+ InconsistencyResolver<Versioned<V>> resolver) {
return new DefaultStoreClient(storeName, resolver, this, 3);
}
+ public <K, V, T> ViewStoreClient<K, V, T> getViewStoreClient(String storeName) {
+ return getViewStoreClient(storeName, new TimeBasedInconsistencyResolver<V>());
+ }
+
+ public <K, V, T> ViewStoreClient<K, V, T> getViewStoreClient(String storeName,
+ InconsistencyResolver<Versioned<V>> resolver) {
+ return new DefaultViewStoreClient(storeName, resolver, this, 3);
+ }
+
public <K1, V1, T1> Store<K1, V1, T1> getRawStore(String storeName,
InconsistencyResolver<Versioned<V1>> resolver) {
if(this.storesXml != null)
View
41 src/java/voldemort/client/StoreClient.java
@@ -34,7 +34,7 @@
* @param <V> The type of the value being stored
*/
@Threadsafe
-public interface StoreClient<K, V, T> {
+public interface StoreClient<K, V> {
/**
* Get the value associated with the given key or null if there is no value
@@ -69,19 +69,6 @@
public Versioned<V> get(K key);
/**
- * Get the versioned value associated with the given key and apply the given
- * transforms to it before returning the value. Returns null if no value is
- * associated with the key
- *
- * @param key the key for which the value is fetched
- * @param transforms the transforms to be applied on the value fetched from
- * the store
- * @return the transformed versioned value, or null if no value is stored
- * for this key
- */
- public Versioned<V> get(K key, T transforms);
-
- /**
* Gets the versioned values associated with the given keys and returns them
* in a Map of keys to versioned values. Note that the returned map will
* only contain entries for the keys which have a value associated with
@@ -93,18 +80,6 @@
public Map<K, Versioned<V>> getAll(Iterable<K> keys);
/**
- * Like {@link voldemort.client.StoreClient#getAll(Iterable) getAll}, except
- * that the transforms are applied on the value associated with each key
- * before returning the results
- *
- * @param keys the keys for which the values are fetched
- * @param transforms the map of transforms, describing the transform to be
- * applied to the value for each key
- * @return A map of keys to transformed versioned values
- */
- public Map<K, Versioned<V>> getAll(Iterable<K> keys, Map<K, T> transforms);
-
- /**
* Get the versioned value associated with the given key or the defaultValue
* if no value is associated with the key.
*
@@ -124,16 +99,6 @@
public void put(K key, V value);
/**
- * Like {@link voldemort.store.StoreClient#put(K, V) put}, except that the
- * given transforms are applied on the value before writing it to the store
- *
- * @param key the key
- * @param value the value
- * @param transforms the transforms to be applied on the value
- */
- public void put(K key, V value, T transforms);
-
- /**
* Put the given Versioned value into the store for the given key if the
* version is greater to or concurrent with existing values. Throw an
* ObsoleteVersionException otherwise.
@@ -165,7 +130,7 @@
* @return true if the action is successfully applied, false if the 3
* attempts all result in ObsoleteVersionException
*/
- public boolean applyUpdate(UpdateAction<K, V, T> action);
+ public boolean applyUpdate(UpdateAction<K, V> action);
/**
* Apply the given action repeatedly until no ObsoleteVersionException is
@@ -176,7 +141,7 @@
* @return true if the action is successfully applied, false if maxTries
* failed attempts have been made
*/
- public boolean applyUpdate(UpdateAction<K, V, T> action, int maxTries);
+ public boolean applyUpdate(UpdateAction<K, V> action, int maxTries);
/**
* Delete any version of the given key which equal to or less than the
View
38 src/java/voldemort/client/StoreClientFactory.java
@@ -41,12 +41,24 @@
* {@link voldemort.client.StoreClient}
* @param <V> The type of the value of the
* {@link voldemort.client.StoreClient}
+ * @param storeName The name of the store
+ * @return A fully constructed {@link voldemort.client.StoreClient}
+ */
+ public <K, V> StoreClient<K, V> getStoreClient(String storeName);
+
+ /**
+ * Get a {@link voldemort.client.ViewStoreClient} for the given store.
+ *
+ * @param <K> The type of the key of the
+ * {@link voldemort.client.StoreClient}
+ * @param <V> The type of the value of the
+ * {@link voldemort.client.StoreClient}
* @param <T> The type of the transform of the
* {@link voldemort.client.StoreClient}
* @param storeName The name of the store
* @return A fully constructed {@link voldemort.client.StoreClient}
*/
- public <K, V, T> StoreClient<K, V, T> getStoreClient(String storeName);
+ public <K, V, T> ViewStoreClient<K, V, T> getViewStoreClient(String storeName);
/**
* Get a {@link voldemort.client.StoreClient} for the given store.
@@ -55,16 +67,32 @@
* {@link voldemort.client.StoreClient}
* @param <V> The type of the value of the
* {@link voldemort.client.StoreClient}
- * @param <T> The type of the transform of the
- * {@link voldemort.client.StoreClient}
* @param storeName The name of the store
* @param inconsistencyResolver The
* {@link voldemort.versioning.InconsistencyResolver} that should be
* used to resolve inconsistencies.
* @return A fully constructed {@link voldemort.client.StoreClient}
*/
- public <K, V, T> StoreClient<K, V, T> getStoreClient(String storeName,
- InconsistencyResolver<Versioned<V>> inconsistencyResolver);
+ public <K, V> StoreClient<K, V> getStoreClient(String storeName,
+ InconsistencyResolver<Versioned<V>> inconsistencyResolver);
+
+ /**
+ * Get a {@link voldemort.client.ViewStoreClient} for the given store.
+ *
+ * @param <K> The type of the key of the
+ * {@link voldemort.client.ViewStoreClient}
+ * @param <V> The type of the value of the
+ * {@link voldemort.client.ViewStoreClient}
+ * @param <T> The type of the transform of the
+ * {@link voldemort.client.ViewStoreClient}
+ * @param storeName The name of the store
+ * @param inconsistencyResolver The
+ * {@link voldemort.versioning.InconsistencyResolver} that should be
+ * used to resolve inconsistencies.
+ * @return A fully constructed {@link voldemort.client.ViewStoreClient}
+ */
+ public <K, V, T> ViewStoreClient<K, V, T> getViewStoreClient(String storeName,
+ InconsistencyResolver<Versioned<V>> inconsistencyResolver);
/**
* Get the underlying store, not the public StoreClient interface
View
4 src/java/voldemort/client/UpdateAction.java
@@ -25,14 +25,14 @@
*
*
*/
-public abstract class UpdateAction<K, V, T> {
+public abstract class UpdateAction<K, V> {
/**
* Apply the update operation to the given store client.
*
* @param storeClient The store client to use
*/
- public abstract void update(StoreClient<K, V, T> storeClient);
+ public abstract void update(StoreClient<K, V> storeClient);
/**
* A hook for the user to override with any rollback actions they want
View
206 src/java/voldemort/client/ViewStoreClient.java
@@ -0,0 +1,206 @@
+/*
+ * Copyright 2008-2009 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package voldemort.client;
+
+import java.util.List;
+import java.util.Map;
+
+import voldemort.annotations.concurrency.Threadsafe;
+import voldemort.cluster.Node;
+import voldemort.versioning.ObsoleteVersionException;
+import voldemort.versioning.Version;
+import voldemort.versioning.Versioned;
+
+/**
+ * The user-facing interface to a Voldemort store. Gives basic put/get/delete
+ * plus helper functions.
+ *
+ *
+ * @param <K> The type of the key being stored
+ * @param <V> The type of the value being stored
+ */
+@Threadsafe
+public interface ViewStoreClient<K, V, T> {
+
+ /**
+ * Get the value associated with the given key or null if there is no value
+ * associated with this key. This method strips off all version information
+ * and is only useful when no further storage operations will be done on
+ * this key.
+ *
+ * @param key The key
+ */
+ public V getValue(K key);
+
+ /**
+ * Get the value associated with the given key or defaultValue if there is
+ * no value associated with the key. This method strips off all version
+ * information and is only useful when no further storage operations will be
+ * done on this key.
+ *
+ * @param key The key for which to fetch the associated value
+ * @param defaultValue A value to return if there is no value associated
+ * with this key
+ * @return Either the value stored for the key or the default value.
+ */
+ public V getValue(K key, V defaultValue);
+
+ /**
+ * Get the versioned value associated with the given key or null if no value
+ * is associated with the key.
+ *
+ * @param key The key for which to fetch the value.
+ * @return The versioned value, or null if no value is stored for this key.
+ */
+ public Versioned<V> get(K key);
+
+ /**
+ * Get the versioned value associated with the given key and apply the given
+ * transforms to it before returning the value. Returns null if no value is
+ * associated with the key
+ *
+ * @param key the key for which the value is fetched
+ * @param transforms the transforms to be applied on the value fetched from
+ * the store
+ * @return the transformed versioned value, or null if no value is stored
+ * for this key
+ */
+ public Versioned<V> get(K key, T transforms);
+
+ /**
+ * Gets the versioned values associated with the given keys and returns them
+ * in a Map of keys to versioned values. Note that the returned map will
+ * only contain entries for the keys which have a value associated with
+ * them.
+ *
+ * @param keys The keys for which to fetch the values.
+ * @return A Map of keys to versioned values.
+ */
+ public Map<K, Versioned<V>> getAll(Iterable<K> keys);
+
+ /**
+ * Like {@link voldemort.client.StoreClient#getAll(Iterable) getAll}, except
+ * that the transforms are applied on the value associated with each key
+ * before returning the results
+ *
+ * @param keys the keys for which the values are fetched
+ * @param transforms the map of transforms, describing the transform to be
+ * applied to the value for each key
+ * @return A map of keys to transformed versioned values
+ */
+ public Map<K, Versioned<V>> getAll(Iterable<K> keys, Map<K, T> transforms);
+
+ /**
+ * Get the versioned value associated with the given key or the defaultValue
+ * if no value is associated with the key.
+ *
+ * @param key The key for which to fetch the value.
+ * @return The versioned value, or the defaultValue if no value is stored
+ * for this key.
+ */
+ public Versioned<V> get(K key, Versioned<V> defaultValue);
+
+ /**
+ * Associated the given value to the key, clobbering any existing values
+ * stored for the key.
+ *
+ * @param key The key
+ * @param value The value
+ */
+ public void put(K key, V value);
+
+ /**
+ * Like {@link voldemort.store.StoreClient#put(K, V) put}, except that the
+ * given transforms are applied on the value before writing it to the store
+ *
+ * @param key the key
+ * @param value the value
+ * @param transforms the transforms to be applied on the value
+ */
+ public void put(K key, V value, T transforms);
+
+ /**
+ * Put the given Versioned value into the store for the given key if the
+ * version is greater to or concurrent with existing values. Throw an
+ * ObsoleteVersionException otherwise.
+ *
+ * @param key The key
+ * @param versioned The value and its versioned
+ * @throws ObsoleteVersionException
+ */
+ public void put(K key, Versioned<V> versioned) throws ObsoleteVersionException;
+
+ /**
+ * Put the versioned value to the key, ignoring any ObsoleteVersionException
+ * that may be thrown
+ *
+ * @param key The key
+ * @param versioned The versioned value
+ * @return true if the put succeeded
+ */
+ public boolean putIfNotObsolete(K key, Versioned<V> versioned);
+
+ /**
+ * Apply the given action repeatedly until no ObsoleteVersionException is
+ * thrown. This is useful for implementing a read-modify-store loop that
+ * could be pre-empted by another concurrent update, and should be repeated
+ * until it succeeds.
+ *
+ * @param action The action to apply. This is meant as a callback for the
+ * user to extend to provide their own logic.
+ * @return true if the action is successfully applied, false if the 3
+ * attempts all result in ObsoleteVersionException
+ */
+ public boolean applyUpdate(ViewUpdateAction<K, V, T> action);
+
+ /**
+ * Apply the given action repeatedly until no ObsoleteVersionException is
+ * thrown or maxTries unsuccessful attempts have been made. This is useful
+ * for implementing a read-modify-store loop.
+ *
+ * @param action The action to apply
+ * @return true if the action is successfully applied, false if maxTries
+ * failed attempts have been made
+ */
+ public boolean applyUpdate(ViewUpdateAction<K, V, T> action, int maxTries);
+
+ /**
+ * Delete any version of the given key which equal to or less than the
+ * current versions
+ *
+ * @param key The key
+ * @return true if anything is deleted
+ */
+ public boolean delete(K key);
+
+ /**
+ * Delete the specified version and any prior versions of the given key
+ *
+ * @param key The key to delete
+ * @param version The version of the key
+ * @return true if anything is deleted
+ */
+ public boolean delete(K key, Version version);
+
+ /**
+ * Returns the list of nodes which should have this key.
+ *
+ * @param key
+ * @return a list of Nodes which should hold this key
+ */
+ public List<Node> getResponsibleNodes(K key);
+}
View
44 src/java/voldemort/client/ViewUpdateAction.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2008-2009 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package voldemort.client;
+
+/**
+ * An update action such as a read-modify-store cycle. This is meant to be used
+ * as a callback interface to perform a data modification that might involve
+ * obsolete data. The operation will be repeated until it succeeds.
+ *
+ * The update action must be idempotent since it may be called multiple times.
+ *
+ *
+ */
+public abstract class ViewUpdateAction<K, V, T> {
+
+ /**
+ * Apply the update operation to the given view store client.
+ *
+ * @param storeClient The view store client to use
+ */
+ public abstract void update(ViewStoreClient<K, V, T> storeClient);
+
+ /**
+ * A hook for the user to override with any rollback actions they want
+ * performed when the update fails (say due to an exception or due to too
+ * many ObsoleteVersionExceptions).
+ */
+ public void rollback() {}
+
+}
View
2 src/java/voldemort/server/http/gui/QueryServlet.java
@@ -84,7 +84,7 @@ protected void doPost(HttpServletRequest req, HttpServletResponse resp)
Object keyObj = parseKey(keyType, key);
Map<String, Object> params = new HashMap<String, Object>();
- StoreClient<?, ?, ?> client = clientFactory.getStoreClient(storeName);
+ StoreClient<?, ?> client = clientFactory.getStoreClient(storeName);
engine.render("query.vm", params, resp.getOutputStream());
}
View
19 test/common/voldemort/StaticStoreClientFactory.java
@@ -5,8 +5,10 @@
import java.util.concurrent.atomic.AtomicInteger;
import voldemort.client.DefaultStoreClient;
+import voldemort.client.DefaultViewStoreClient;
import voldemort.client.StoreClient;
import voldemort.client.StoreClientFactory;
+import voldemort.client.ViewStoreClient;
import voldemort.cluster.failuredetector.FailureDetector;
import voldemort.cluster.failuredetector.NoopFailureDetector;
import voldemort.store.Store;
@@ -47,16 +49,27 @@ public StaticStoreClientFactory(Store<?, ?, ?>... stores) {
}
@SuppressWarnings("unchecked")
- public <K, V, T> StoreClient<K, V, T> getStoreClient(String storeName) {
+ public <K, V> StoreClient<K, V> getStoreClient(String storeName) {
return new DefaultStoreClient(storeName, null, this, 3);
}
@SuppressWarnings("unchecked")
- public <K, V, T> StoreClient<K, V, T> getStoreClient(String storeName,
- InconsistencyResolver<Versioned<V>> resolver) {
+ public <K, V> StoreClient<K, V> getStoreClient(String storeName,
+ InconsistencyResolver<Versioned<V>> resolver) {
return new DefaultStoreClient(storeName, resolver, this, 3);
}
+ @SuppressWarnings("unchecked")
+ public <K, V, T> ViewStoreClient<K, V, T> getViewStoreClient(String storeName) {
+ return new DefaultViewStoreClient(storeName, null, this, 3);
+ }
+
+ @SuppressWarnings("unchecked")
+ public <K, V, T> ViewStoreClient<K, V, T> getViewStoreClient(String storeName,
+ InconsistencyResolver<Versioned<V>> resolver) {
+ return new DefaultViewStoreClient(storeName, resolver, this, 3);
+ }
+
public void close() {
for(Store<?, ?, ?> store: stores)
store.close();
View
4 test/integration/voldemort/TestClientShutdown.java
@@ -38,9 +38,9 @@ public static void main(String[] args) throws Exception {
String bootstrapUrl = args[1];
StoreClientFactory factory = new SocketStoreClientFactory(new ClientConfig().setBootstrapUrls(bootstrapUrl));
- DefaultStoreClient<Object, Object, Object> client = null;
+ DefaultStoreClient<Object, Object> client = null;
try {
- client = (DefaultStoreClient<Object, Object, Object>) factory.getStoreClient(storeName);
+ client = (DefaultStoreClient<Object, Object>) factory.getStoreClient(storeName);
} catch(Exception e) {
Utils.croak("Could not connect to server: " + e.getMessage());
}
View
4 test/integration/voldemort/performance/AbstractLoadTestHarness.java
@@ -26,7 +26,7 @@
public abstract class AbstractLoadTestHarness {
- public abstract StoreClient<String, String, String> getStore(Props propsA, Props propsB)
+ public abstract StoreClient<String, String> getStore(Props propsA, Props propsB)
throws Exception;
public void run(String[] args) throws Exception {
@@ -40,7 +40,7 @@ public void run(String[] args) throws Exception {
Props propertiesB = new Props(new File(args[3]));
System.out.println("Creating client: ");
- final StoreClient<String, String, String> client = getStore(propertiesA, propertiesB);
+ final StoreClient<String, String> client = getStore(propertiesA, propertiesB);
PerformanceTest writeTest = new PerformanceTest() {
View
5 test/integration/voldemort/performance/LocalDirectLoadTest.java
@@ -35,8 +35,7 @@
public class LocalDirectLoadTest extends AbstractLoadTestHarness {
@Override
- public StoreClient<String, String, String> getStore(Props propsA, Props propsB)
- throws Exception {
+ public StoreClient<String, String> getStore(Props propsA, Props propsB) throws Exception {
StorageConfiguration conf = new BdbStorageConfiguration(new VoldemortConfig(propsA));
Store<String, String, String> store = SerializingStore.wrap(conf.getStore("test" + 0),
new StringSerializer(),
@@ -46,7 +45,7 @@
Store<String, String, String> resolvingStore = new InconsistencyResolvingStore<String, String, String>(store,
resolver);
StoreClientFactory factory = new StaticStoreClientFactory(resolvingStore);
- return new DefaultStoreClient<String, String, String>(store.getName(), resolver, factory, 1);
+ return new DefaultStoreClient<String, String>(store.getName(), resolver, factory, 1);
}
public static void main(String[] args) throws Exception {
View
5 test/integration/voldemort/performance/LocalRoutedStoreLoadTest.java
@@ -53,8 +53,7 @@
public class LocalRoutedStoreLoadTest extends AbstractLoadTestHarness {
@Override
- public StoreClient<String, String, String> getStore(Props propsA, Props propsB)
- throws Exception {
+ public StoreClient<String, String> getStore(Props propsA, Props propsB) throws Exception {
Cluster cluster = new ClusterMapper().readCluster(new FileReader(propsA.getString("metadata.directory")
+ File.separator
+ "/cluster.xml"));
@@ -97,7 +96,7 @@
Store<String, String, String> resolvingStore = new InconsistencyResolvingStore<String, String, String>(serializingStore,
resolver);
StoreClientFactory factory = new StaticStoreClientFactory(resolvingStore);
- return new DefaultStoreClient<String, String, String>(store.getName(), resolver, factory, 1);
+ return new DefaultStoreClient<String, String>(store.getName(), resolver, factory, 1);
}
public static void main(String[] args) throws Exception {
View
2 test/integration/voldemort/performance/RemoteDataGenerator.java
@@ -63,7 +63,7 @@ public static void printUsage(PrintStream out, OptionParser parser) throws IOExc
public void generateData(int requests, int keySize, int valueSize, String postfix) {
StoreClientFactory storeClientFactory = new SocketStoreClientFactory(new ClientConfig().setBootstrapUrls(url)
.setMaxThreads(workers));
- StoreClient<String, String, String> client = storeClientFactory.getStoreClient(storeName);
+ StoreClient<String, String> client = storeClientFactory.getStoreClient(storeName);
for(int i = 0; i < requests; i++) {
StringBuilder keyBuilder = new StringBuilder(makeString(keySize)).append(i);
View
2 test/integration/voldemort/performance/RemoteHttpStoreLoadTest.java
@@ -26,7 +26,7 @@
public class RemoteHttpStoreLoadTest extends AbstractLoadTestHarness {
@Override
- public StoreClient<String, String, String> getStore(Props propsA, Props propsB)
+ public StoreClient<String, String> getStore(Props propsA, Props propsB)
throws java.lang.Exception {
System.out.println("Initializing master server.");
VoldemortServer serverA = new VoldemortServer(new VoldemortConfig(propsA));
View
44 test/integration/voldemort/performance/RemoteTest.java
@@ -16,7 +16,15 @@
package voldemort.performance;
-import java.io.*;
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
@@ -205,8 +213,8 @@ public static void main(String[] args) throws Exception {
parser.accepts("v", "verbose");
parser.accepts("ignore-nulls", "ignore null values");
parser.accepts("save-nulls", "save keys which had null to a file")
- .withRequiredArg()
- .ofType(String.class);
+ .withRequiredArg()
+ .ofType(String.class);
parser.accepts("node", "go to this node id").withRequiredArg().ofType(Integer.class);
parser.accepts("interval", "print requests on this interval, -1 to disable")
.withRequiredArg()
@@ -270,7 +278,7 @@ public static void main(String[] args) throws Exception {
}
final BufferedWriter nullWriter;
- if (options.has("save-nulls")) {
+ if(options.has("save-nulls")) {
nullWriter = new BufferedWriter(new FileWriter((String) options.valueOf("save-nulls")));
} else {
nullWriter = null;
@@ -309,7 +317,7 @@ public static void main(String[] args) throws Exception {
.setFailureDetectorRequestLengthThreshold(TimeUnit.SECONDS.toMillis(60))
.setSocketBufferSize(4 * 1024);
SocketStoreClientFactory factory = new SocketStoreClientFactory(clientConfig);
- final StoreClient<Object, Object, Object> store = factory.getStoreClient(storeName);
+ final StoreClient<Object, Object> store = factory.getStoreClient(storeName);
StoreDefinition storeDef = getStoreDefinition(factory, storeName);
Class<?> keyType = findKeyType(storeDef);
@@ -392,17 +400,17 @@ public void run() {
public void run() {
try {
final Object key = keyProvider1.next();
- store.applyUpdate(new UpdateAction<Object, Object, Object>() {
+ store.applyUpdate(new UpdateAction<Object, Object>() {
- public void update(StoreClient<Object, Object, Object> storeClient) {
+ @Override
+ public void update(StoreClient<Object, Object> storeClient) {
long startNs = System.nanoTime();
storeClient.put(key, value);
requestTimes[j] = (System.nanoTime() - startNs)
/ Time.NS_PER_MS;
numWrites.incrementAndGet();
}
- },
- 64);
+ }, 64);
} catch(Exception e) {
if(verbose) {
e.printStackTrace();
@@ -453,7 +461,7 @@ public void run() {
if(!ignoreNulls) {
throw new Exception("value returned is null for key " + key);
}
- if (nullWriter != null) {
+ if(nullWriter != null) {
nullWriter.write(key.toString() + "\n");
}
}
@@ -504,27 +512,27 @@ public void run() {
try {
final Object key = keyProvider.next();
- store.applyUpdate(new UpdateAction<Object, Object, Object>() {
+ store.applyUpdate(new UpdateAction<Object, Object>() {
- public void update(StoreClient<Object, Object, Object> storeClient) {
+ @Override
+ public void update(StoreClient<Object, Object> storeClient) {
Versioned<Object> v = store.get(key);
numReads.incrementAndGet();
if(v != null) {
storeClient.put(key, v);
} else {
numNulls.incrementAndGet();
- if (nullWriter != null) {
+ if(nullWriter != null) {
try {
nullWriter.write(key.toString() + "\n");
- } catch (IOException e) {
+ } catch(IOException e) {
e.printStackTrace();
}
}
}
numWrites.incrementAndGet();
}
- },
- 64);
+ }, 64);
} catch(Exception e) {
if(verbose) {
e.printStackTrace();
@@ -547,10 +555,10 @@ public void update(StoreClient<Object, Object, Object> storeClient) {
printStatistics("writes", numWrites.get(), start);
}
- if (nullWriter != null) {
+ if(nullWriter != null) {
nullWriter.close();
}
-
+
System.exit(0);
}
View
2 test/integration/voldemort/performance/SemiLocalHttpStoreLoadTest.java
@@ -24,7 +24,7 @@
public class SemiLocalHttpStoreLoadTest extends AbstractLoadTestHarness {
@Override
- public StoreClient<String, String, String> getStore(Props propsA, Props propsB)
+ public StoreClient<String, String> getStore(Props propsA, Props propsB)
throws java.lang.Exception {
System.out.println("Initializing master server.");
VoldemortServer serverA = new VoldemortServer(new VoldemortConfig(propsA));
View
46 test/integration/voldemort/performance/benchmark/Benchmark.java
@@ -34,6 +34,7 @@
import voldemort.client.SocketStoreClientFactory;
import voldemort.client.StoreClient;
import voldemort.client.StoreClientFactory;
+import voldemort.client.ViewStoreClient;
import voldemort.serialization.IdentitySerializer;
import voldemort.serialization.Serializer;
import voldemort.serialization.SerializerDefinition;
@@ -104,7 +105,8 @@
public static final String VIEW_CLASS = "voldemort.store.views.UpperCaseView";
public static final String HAS_TRANSFORMS = "true";
- private StoreClient<Object, Object, Object> storeClient;
+ private StoreClient<Object, Object> storeClient;
+ private ViewStoreClient<Object, Object, Object> viewStoreClient;
private StoreClientFactory factory;
private int numThreads;
@@ -119,6 +121,7 @@
private boolean verifyRead = false;
private boolean ignoreNulls = false;
private String keyType;
+ private boolean isView = false;
class StatusThread extends Thread {
@@ -162,15 +165,15 @@ public void run() {
class ClientThread extends Thread {
- private VoldemortWrapper db;
+ private DbWrapper db;
private boolean runBenchmark;
private boolean verbose;
private Workload workLoad;
private int opsCount;
private double targetThroughputPerMs;
private int opsDone;
- public ClientThread(VoldemortWrapper db,
+ public ClientThread(DbWrapper db,
boolean runBenchmark,
Workload workLoad,
int opsCount,
@@ -325,6 +328,10 @@ public void initializeStore(Props benchmarkProps) throws Exception {
String socketUrl = benchmarkProps.getString(URL);
String storeName = benchmarkProps.getString(STORE_NAME);
+ String storeType = benchmarkProps.getString(STORE_TYPE);
+ boolean isView = false;
+ if(storeType.compareTo("view") == 0)
+ isView = true;
ClientConfig clientConfig = new ClientConfig().setMaxThreads(numThreads)
.setMaxTotalConnections(numThreads)
@@ -337,7 +344,10 @@ public void initializeStore(Props benchmarkProps) throws Exception {
.setSocketBufferSize(4 * 1024);
SocketStoreClientFactory socketFactory = new SocketStoreClientFactory(clientConfig);
- this.storeClient = socketFactory.getStoreClient(storeName);
+ if(isView)
+ this.viewStoreClient = socketFactory.getViewStoreClient(storeName);
+ else
+ this.storeClient = socketFactory.getStoreClient(storeName);
StoreDefinition storeDef = getStoreDefinition(socketFactory, storeName);
this.keyType = findKeyType(storeDef);
benchmarkProps.put(Benchmark.KEY_TYPE, this.keyType);
@@ -346,9 +356,15 @@ public void initializeStore(Props benchmarkProps) throws Exception {
// Send the store a value and then delete it
if(benchmarkProps.getBoolean(HANDSHAKE, false)) {
final Object key = getTempKey(this.keyType);
- this.storeClient.delete(key);
- this.storeClient.put(key, "123");
- this.storeClient.delete(key);
+ if(isView) {
+ this.viewStoreClient.delete(key);
+ this.viewStoreClient.put(key, "123");
+ this.viewStoreClient.delete(key);
+ } else {
+ this.storeClient.delete(key);
+ this.storeClient.put(key, "123");
+ this.storeClient.delete(key);
+ }
}
} else {
@@ -381,6 +397,7 @@ public void initializeStore(Props benchmarkProps) throws Exception {
null,
BenchmarkViews.loadTransformation(benchmarkProps.getString(VIEW_CLASS)
.trim()));
+ isView = true;
}
store = SerializingStore.wrap(engine,
@@ -389,7 +406,10 @@ public void initializeStore(Props benchmarkProps) throws Exception {
new IdentitySerializer());
this.factory = new StaticStoreClientFactory(store);
- this.storeClient = factory.getStoreClient(store.getName());
+ if(storeType.compareTo("view") == 0)
+ this.viewStoreClient = factory.getViewStoreClient(store.getName());
+ else
+ this.storeClient = factory.getStoreClient(store.getName());
}
this.storeInitialized = true;
@@ -430,11 +450,13 @@ public long runTests(boolean runBenchmark) throws Exception {
label = new String("warmup");
}
Vector<Thread> threads = new Vector<Thread>();
-
+ DbWrapper db;
for(int index = 0; index < this.numThreads; index++) {
- VoldemortWrapper db = new VoldemortWrapper(storeClient,
- this.verifyRead,
- this.ignoreNulls);
+ if(isView) {
+ db = new VoldemortViewWrapper(viewStoreClient, this.verifyRead, this.ignoreNulls);
+ } else {
+ db = new VoldemortWrapper(storeClient, this.verifyRead, this.ignoreNulls);
+ }
Thread clientThread = new ClientThread(db,
runBenchmark,
this.workLoad,
View
22 test/integration/voldemort/performance/benchmark/DbWrapper.java
@@ -0,0 +1,22 @@
+package voldemort.performance.benchmark;
+
+/**
+ * @author nnarkhed
+ *
+ */
+public interface DbWrapper {
+
+ public int read(Object key, Object expectedValue);
+
+ public int read(Object key, Object expectedValue, Object transforms);
+
+ public int mixed(final Object key, final Object newValue);
+
+ public int mixed(final Object key, final Object newValue, final Object transforms);
+
+ public int write(final Object key, final Object value);
+
+ public int write(final Object key, final Object value, final Object transforms);
+
+ public int delete(Object key);
+}
View
192 test/integration/voldemort/performance/benchmark/VoldemortViewWrapper.java
@@ -0,0 +1,192 @@
+package voldemort.performance.benchmark;
+
+import voldemort.client.ViewStoreClient;
+import voldemort.client.ViewUpdateAction;
+import voldemort.utils.Time;
+import voldemort.versioning.Versioned;
+
+/**
+ * @author nnarkhed
+ *
+ */
+public class VoldemortViewWrapper implements DbWrapper {
+
+ public final int Ok = 0;
+ public final int Error = -1;
+
+ private ViewStoreClient<Object, Object, Object> voldemortStore;
+ private Metrics measurement;
+ private boolean verifyReads;
+ private boolean ignoreNulls;
+
+ public static final String READS_STRING = "reads";
+ public static final String DELETES_STRING = "deletes";
+ public static final String WRITES_STRING = "writes";
+ public static final String MIXED_STRING = "transactions";
+
+ public VoldemortViewWrapper(ViewStoreClient<Object, Object, Object> storeClient,
+ boolean verifyReads,
+ boolean ignoreNulls) {
+ this.voldemortStore = storeClient;
+ this.measurement = Metrics.getInstance();
+ this.verifyReads = verifyReads;
+ this.ignoreNulls = ignoreNulls;
+ }
+
+ public int read(Object key, Object expectedValue) {
+ long startNs = System.nanoTime();
+ Versioned<Object> returnedValue = voldemortStore.get(key);
+ long endNs = System.nanoTime();
+ measurement.measure(READS_STRING, (int) ((endNs - startNs) / Time.NS_PER_MS));
+
+ int res = this.Ok;
+ if(returnedValue == null && !this.ignoreNulls) {
+ res = this.Error;
+ }
+
+ if(verifyReads && !expectedValue.equals(returnedValue.getValue())) {
+ res = this.Error;
+ }
+
+ measurement.reportReturnCode(READS_STRING, res);
+ return res;
+ }
+
+ public int read(Object key, Object expectedValue, Object transforms) {
+ long startNs = System.nanoTime();
+ Versioned<Object> returnedValue = voldemortStore.get(key, transforms);
+ long endNs = System.nanoTime();
+ measurement.measure(READS_STRING, (int) ((endNs - startNs) / Time.NS_PER_MS));
+
+ int res = this.Ok;
+ if(returnedValue == null && !this.ignoreNulls) {
+ res = this.Error;
+ }
+
+ if(verifyReads && !expectedValue.equals(returnedValue.getValue())) {
+ res = this.Error;
+ }
+
+ measurement.reportReturnCode(READS_STRING, res);
+ return res;
+ }
+
+ public int mixed(final Object key, final Object newValue) {
+
+ boolean updated = voldemortStore.applyUpdate(new ViewUpdateAction<Object, Object, Object>() {
+
+ @Override
+ public void update(ViewStoreClient<Object, Object, Object> storeClient) {
+ long startNs = System.nanoTime();
+ Versioned<Object> v = storeClient.get(key);
+ if(v != null) {
+ voldemortStore.put(key, newValue);
+ }
+ long endNs = System.nanoTime();
+ measurement.measure(MIXED_STRING,
+ (int) ((endNs - startNs) / Time.NS_PER_MS));
+ }
+ },
+ 3);
+
+ int res = this.Error;
+ if(updated) {
+ res = this.Ok;
+ }
+
+ measurement.reportReturnCode(MIXED_STRING, res);
+ return res;
+ }
+
+ public int mixed(final Object key, final Object newValue, final Object transforms) {
+
+ boolean updated = voldemortStore.applyUpdate(new ViewUpdateAction<Object, Object, Object>() {
+
+ @Override
+ public void update(ViewStoreClient<Object, Object, Object> storeClient) {
+ long startNs = System.nanoTime();
+ Versioned<Object> v = storeClient.get(key,
+ transforms);
+ if(v != null) {
+ voldemortStore.put(key, newValue);
+ }
+ long endNs = System.nanoTime();
+ measurement.measure(MIXED_STRING,
+ (int) ((endNs - startNs) / Time.NS_PER_MS));
+ }
+ },
+ 3);
+
+ int res = this.Error;
+ if(updated) {
+ res = this.Ok;
+ }
+
+ measurement.reportReturnCode(MIXED_STRING, res);
+ return res;
+ }
+
+ public int write(final Object key, final Object value) {
+
+ boolean written = voldemortStore.applyUpdate(new ViewUpdateAction<Object, Object, Object>() {
+
+ @Override
+ public void update(ViewStoreClient<Object, Object, Object> storeClient) {
+ long startNs = System.nanoTime();
+ storeClient.put(key, value);
+ long endNs = System.nanoTime();
+ measurement.measure(WRITES_STRING,
+ (int) ((endNs - startNs) / Time.NS_PER_MS));
+ }
+ },
+ 3);
+
+ int res = this.Error;
+ if(written) {
+ res = this.Ok;
+ }
+
+ measurement.reportReturnCode(WRITES_STRING, this.Ok);
+ return res;
+ }
+
+ public int write(final Object key, final Object value, final Object transforms) {
+
+ boolean written = voldemortStore.applyUpdate(new ViewUpdateAction<Object, Object, Object>() {
+
+ @Override
+ public void update(ViewStoreClient<Object, Object, Object> storeClient) {
+ long startNs = System.nanoTime();
+ storeClient.put(key, value, transforms);
+ long endNs = System.nanoTime();
+ measurement.measure(WRITES_STRING,
+ (int) ((endNs - startNs) / Time.NS_PER_MS));
+ }
+ },
+ 3);
+
+ int res = this.Error;
+ if(written) {
+ res = this.Ok;
+ }
+
+ measurement.reportReturnCode(WRITES_STRING, this.Ok);
+ return res;
+ }
+
+ public int delete(Object key) {
+ long startNs = System.nanoTime();
+ boolean deleted = voldemortStore.delete(key);
+ long endNs = System.nanoTime();
+
+ int res = this.Error;
+ if(deleted) {
+ res = this.Ok;
+ }
+
+ measurement.measure(DELETES_STRING, (int) ((endNs - startNs) / Time.NS_PER_MS));
+ measurement.reportReturnCode(DELETES_STRING, res);
+ return res;
+ }
+
+}
View
92 test/integration/voldemort/performance/benchmark/VoldemortWrapper.java
@@ -21,12 +21,12 @@
import voldemort.utils.Time;
import voldemort.versioning.Versioned;
-public class VoldemortWrapper {
+public class VoldemortWrapper implements DbWrapper {
public final int Ok = 0;
public final int Error = -1;
- private StoreClient<Object, Object, Object> voldemortStore;
+ private StoreClient<Object, Object> voldemortStore;
private Metrics measurement;
private boolean verifyReads;
private boolean ignoreNulls;
@@ -36,7 +36,7 @@
public static final String WRITES_STRING = "writes";
public static final String MIXED_STRING = "transactions";
- public VoldemortWrapper(StoreClient<Object, Object, Object> storeClient,
+ public VoldemortWrapper(StoreClient<Object, Object> storeClient,
boolean verifyReads,
boolean ignoreNulls) {
this.voldemortStore = storeClient;
@@ -64,31 +64,12 @@ public int read(Object key, Object expectedValue) {
return res;
}
- public int read(Object key, Object expectedValue, Object transforms) {
- long startNs = System.nanoTime();
- Versioned<Object> returnedValue = voldemortStore.get(key, transforms);
- long endNs = System.nanoTime();
- measurement.measure(READS_STRING, (int) ((endNs - startNs) / Time.NS_PER_MS));
-
- int res = this.Ok;
- if(returnedValue == null && !this.ignoreNulls) {
- res = this.Error;
- }
-
- if(verifyReads && !expectedValue.equals(returnedValue.getValue())) {
- res = this.Error;
- }
-
- measurement.reportReturnCode(READS_STRING, res);
- return res;
- }
-
public int mixed(final Object key, final Object newValue) {
- boolean updated = voldemortStore.applyUpdate(new UpdateAction<Object, Object, Object>() {
+ boolean updated = voldemortStore.applyUpdate(new UpdateAction<Object, Object>() {
@Override
- public void update(StoreClient<Object, Object, Object> storeClient) {
+ public void update(StoreClient<Object, Object> storeClient) {
long startNs = System.nanoTime();
Versioned<Object> v = storeClient.get(key);
if(v != null) {
@@ -108,37 +89,12 @@ public void update(StoreClient<Object, Object, Object> storeClient) {
return res;
}
- public int mixed(final Object key, final Object newValue, final Object transforms) {
-
- boolean updated = voldemortStore.applyUpdate(new UpdateAction<Object, Object, Object>() {
-
- @Override
- public void update(StoreClient<Object, Object, Object> storeClient) {
- long startNs = System.nanoTime();
- Versioned<Object> v = storeClient.get(key);
- if(v != null) {
- voldemortStore.put(key, newValue, transforms);
- }
- long endNs = System.nanoTime();
- measurement.measure(MIXED_STRING, (int) ((endNs - startNs) / Time.NS_PER_MS));
- }
- }, 3);
-
- int res = this.Error;
- if(updated) {
- res = this.Ok;
- }
-
- measurement.reportReturnCode(MIXED_STRING, res);
- return res;
- }
-
public int write(final Object key, final Object value) {
- boolean written = voldemortStore.applyUpdate(new UpdateAction<Object, Object, Object>() {
+ boolean written = voldemortStore.applyUpdate(new UpdateAction<Object, Object>() {
@Override
- public void update(StoreClient<Object, Object, Object> storeClient) {
+ public void update(StoreClient<Object, Object> storeClient) {
long startNs = System.nanoTime();
storeClient.put(key, value);
long endNs = System.nanoTime();
@@ -155,28 +111,6 @@ public void update(StoreClient<Object, Object, Object> storeClient) {
return res;
}
- public int write(final Object key, final Object value, final Object transforms) {
-
- boolean written = voldemortStore.applyUpdate(new UpdateAction<Object, Object, Object>() {
-
- @Override
- public void update(StoreClient<Object, Object, Object> storeClient) {