Permalink
Browse files

Server side transforms changes to client interface

  • Loading branch information...
1 parent a8ba2f9 commit fccc987a79cc63f0f35318d7569f27450fe64cd7 @rsumbaly rsumbaly committed Oct 4, 2010
Showing with 308 additions and 313 deletions.
  1. +3 −3 contrib/ec2-testing/test/voldemort/utils/Ec2FailureDetectorTest.java
  2. +1 −1 example/java/voldemort/examples/ClientExample.java
  3. +2 −2 src/java/voldemort/VoldemortClientShell.java
  4. +9 −4 src/java/voldemort/client/AbstractStoreClientFactory.java
  5. +9 −79 src/java/voldemort/client/DefaultStoreClient.java
  6. +16 −6 src/java/voldemort/client/MockStoreClientFactory.java
  7. +3 −38 src/java/voldemort/client/StoreClient.java
  8. +3 −7 src/java/voldemort/client/StoreClientFactory.java
  9. +2 −2 src/java/voldemort/client/UpdateAction.java
  10. +1 −1 src/java/voldemort/server/http/gui/QueryServlet.java
  11. +24 −8 src/java/voldemort/store/StoreDefinition.java
  12. +15 −5 src/java/voldemort/store/StoreDefinitionBuilder.java
  13. +2 −2 src/java/voldemort/store/serialized/SerializingStore.java
  14. +36 −3 src/java/voldemort/store/views/ViewStorageConfiguration.java
  15. +64 −8 src/java/voldemort/store/views/ViewStorageEngine.java
  16. +15 −15 src/java/voldemort/xml/StoreDefinitionsMapper.java
  17. +1 −0 src/java/voldemort/xml/stores.xsd
  18. +3 −3 test/common/voldemort/StaticStoreClientFactory.java
  19. +2 −2 test/integration/voldemort/TestClientShutdown.java
  20. +1 −1 test/integration/voldemort/performance/RemoteDataGenerator.java
  21. +7 −9 test/integration/voldemort/performance/RemoteTest.java
  22. +19 −2 test/integration/voldemort/performance/benchmark/Benchmark.java
  23. +8 −72 test/integration/voldemort/performance/benchmark/VoldemortWrapper.java
  24. +22 −5 test/integration/voldemort/performance/benchmark/Workload.java
  25. +16 −16 test/integration/voldemort/performance/benchmark/WorkloadPlugin.java
  26. +2 −2 test/unit/voldemort/client/AdminServiceBasicTest.java
  27. +1 −1 test/unit/voldemort/client/DefaultStoreClientTest.java
  28. +8 −8 test/unit/voldemort/client/rebalance/AbstractRebalanceTest.java
  29. +1 −1 test/unit/voldemort/server/EndToEndTest.java
  30. +1 −1 test/unit/voldemort/store/compress/CompressingStoreTest.java
  31. +9 −4 test/unit/voldemort/store/views/ViewStorageEngineTest.java
  32. +2 −2 test/unit/voldemort/store/views/ViewTransformsTest.java
@@ -70,7 +70,7 @@
private static Map<String, Integer> nodeIds;
private FailureDetector failureDetector;
- private StoreClient<String, String, String> store;
+ private StoreClient<String, String> store;
private static final Logger logger = Logger.getLogger(Ec2FailureDetectorTest.class);
@@ -233,11 +233,11 @@ public void run() {
assertEquals(hostNamePairs.size(), failureDetector.getAvailableNodeCount());
}
- private void test(StoreClient<String, String, String> store) {
+ private void test(StoreClient<String, String> store) {
test(store, 1000);
}
- private void test(StoreClient<String, String, String> store, int tests) {
+ private void test(StoreClient<String, String> store, int tests) {
for(int i = 0; i < tests; i++) {
try {
store.get("test_" + i);
@@ -30,7 +30,7 @@ public static void main(String[] args) {
String bootstrapUrl = "tcp://localhost:6666";
StoreClientFactory factory = new SocketStoreClientFactory(new ClientConfig().setBootstrapUrls(bootstrapUrl));
- StoreClient<String, String, String> client = factory.getStoreClient("my_store_name");
+ StoreClient<String, String> client = factory.getStoreClient("my_store_name");
// get the value
Versioned<String> version = client.get("some_key");
@@ -52,7 +52,7 @@
private static final String PROMPT = "> ";
- private static DefaultStoreClient<Object, Object, Object> client;
+ private static DefaultStoreClient<Object, Object> client;
public static void main(String[] args) throws Exception {
@@ -98,7 +98,7 @@ public static void main(String[] args) throws Exception {
StoreClientFactory factory = new SocketStoreClientFactory(clientConfig);
try {
- client = (DefaultStoreClient<Object, Object, Object>) factory.getStoreClient(storeName);
+ client = (DefaultStoreClient<Object, Object>) factory.getStoreClient(storeName);
} catch(Exception e) {
Utils.croak("Could not connect to server: " + e.getMessage());
}
@@ -33,6 +33,7 @@
import voldemort.cluster.Node;
import voldemort.cluster.failuredetector.FailureDetector;
import voldemort.serialization.IdentitySerializer;
+import voldemort.serialization.SerializationException;
import voldemort.serialization.Serializer;
import voldemort.serialization.SerializerDefinition;
import voldemort.serialization.SerializerFactory;
@@ -120,13 +121,13 @@ public AbstractStoreClientFactory(ClientConfig config) {
}
}
- public <K, V, T> StoreClient<K, V, T> getStoreClient(String storeName) {
+ public <K, V> StoreClient<K, V> getStoreClient(String storeName) {
return getStoreClient(storeName, null);
}
- public <K, V, T> StoreClient<K, V, T> getStoreClient(String storeName,
- InconsistencyResolver<Versioned<V>> resolver) {
- return new DefaultStoreClient<K, V, T>(storeName, resolver, this, 3);
+ public <K, V> StoreClient<K, V> getStoreClient(String storeName,
+ InconsistencyResolver<Versioned<V>> resolver) {
+ return new DefaultStoreClient<K, V>(storeName, resolver, this, 3);
}
@SuppressWarnings("unchecked")
@@ -188,6 +189,10 @@ public AbstractStoreClientFactory(ClientConfig config) {
Serializer<K> keySerializer = (Serializer<K>) serializerFactory.getSerializer(storeDef.getKeySerializer());
Serializer<V> valueSerializer = (Serializer<V>) serializerFactory.getSerializer(storeDef.getValueSerializer());
+
+ if(storeDef.isView() && (storeDef.getTransformsSerializer() == null))
+ throw new SerializationException("Transforms serializer must be specified with a view ");
+
Serializer<T> transformsSerializer = (Serializer<T>) serializerFactory.getSerializer(storeDef.getTransformsSerializer() != null ? storeDef.getTransformsSerializer()
: new SerializerDefinition("identity"));
@@ -53,15 +53,15 @@
*/
@Threadsafe
@JmxManaged(description = "A voldemort client")
-public class DefaultStoreClient<K, V, T> implements StoreClient<K, V, T> {
+public class DefaultStoreClient<K, V> implements StoreClient<K, V> {
private final Logger logger = Logger.getLogger(DefaultStoreClient.class);
private final StoreClientFactory storeFactory;
private final int metadataRefreshAttempts;
private final String storeName;
private final InconsistencyResolver<Versioned<V>> resolver;
- private volatile Store<K, V, T> store;
+ private volatile Store<K, V, Object> store;
public DefaultStoreClient(String storeName,
InconsistencyResolver<Versioned<V>> resolver,
@@ -100,29 +100,15 @@ public boolean delete(K key, Version version) {
}
public V getValue(K key, V defaultValue) {
- Versioned<V> versioned = getWithDefaultValue(key, null, null);
+ Versioned<V> versioned = get(key);
if(versioned == null)
return defaultValue;
else
return versioned.getValue();
}
- private Versioned<V> getWithDefaultValue(K key, Versioned<V> defaultValue, T transforms) {
- for(int attempts = 0; attempts < this.metadataRefreshAttempts; attempts++) {
- try {
- List<Versioned<V>> items = store.get(key, transforms);
- return getItemOrThrow(key, defaultValue, items);
- } catch(InvalidMetadataException e) {
- bootStrap();
- }
- }
- throw new VoldemortException(this.metadataRefreshAttempts
- + " metadata refresh attempts failed.");
-
- }
-
public V getValue(K key) {
- Versioned<V> returned = getWithDefaultValue(key, null, null);
+ Versioned<V> returned = get(key, null);
if(returned == null)
return null;
else
@@ -164,34 +150,8 @@ else if(items.size() == 1)
+ ") = " + items, items);
}
- public Versioned<V> get(K key, T transforms) {
- return getWithDefaultValue(key, null, transforms);
- }
-
public Versioned<V> get(K key) {
- return getWithDefaultValue(key, null, null);
- }
-
- public Map<K, Versioned<V>> getAll(Iterable<K> keys, Map<K, T> transforms) {
- Map<K, List<Versioned<V>>> items = null;
- for(int attempts = 0;; attempts++) {
- if(attempts >= this.metadataRefreshAttempts)
- throw new VoldemortException(this.metadataRefreshAttempts
- + " metadata refresh attempts failed.");
- try {
- items = store.getAll(keys, transforms);
- break;
- } catch(InvalidMetadataException e) {
- bootStrap();
- }
- }
- Map<K, Versioned<V>> result = Maps.newHashMapWithExpectedSize(items.size());
-
- for(Entry<K, List<Versioned<V>>> mapEntry: items.entrySet()) {
- Versioned<V> value = getItemOrThrow(mapEntry.getKey(), null, mapEntry.getValue());
- result.put(mapEntry.getKey(), value);
- }
- return result;
+ return get(key, null);
}
public Map<K, Versioned<V>> getAll(Iterable<K> keys) {
@@ -216,23 +176,6 @@ else if(items.size() == 1)
return result;
}
- public void put(K key, V value, T transforms) {
- List<Version> versions = getVersions(key);
- Versioned<V> versioned;
- if(versions.isEmpty())
- versioned = Versioned.value(value, new VectorClock());
- else if(versions.size() == 1)
- versioned = Versioned.value(value, versions.get(0));
- else {
- versioned = get(key, (Versioned<V>) null);
- if(versioned == null)
- versioned = Versioned.value(value, new VectorClock());
- else
- versioned.setObject(value);
- }
- put(key, versioned, transforms);
- }
-
public void put(K key, V value) {
List<Version> versions = getVersions(key);
Versioned<V> versioned;
@@ -241,13 +184,13 @@ public void put(K key, V value) {
else if(versions.size() == 1)
versioned = Versioned.value(value, versions.get(0));
else {
- versioned = get(key, (T) null);
+ versioned = get(key, null);
if(versioned == null)
versioned = Versioned.value(value, new VectorClock());
else
versioned.setObject(value);
}
- put(key, versioned, null);
+ put(key, versioned);
}
public boolean putIfNotObsolete(K key, Versioned<V> versioned) {
@@ -259,19 +202,6 @@ public boolean putIfNotObsolete(K key, Versioned<V> versioned) {
}
}
- public void put(K key, Versioned<V> versioned, T transforms) throws ObsoleteVersionException {
- for(int attempts = 0; attempts < this.metadataRefreshAttempts; attempts++) {
- try {
- store.put(key, versioned, transforms);
- return;
- } catch(InvalidMetadataException e) {
- bootStrap();
- }
- }
- throw new VoldemortException(this.metadataRefreshAttempts
- + " metadata refresh attempts failed.");
- }
-
public void put(K key, Versioned<V> versioned) throws ObsoleteVersionException {
for(int attempts = 0; attempts < this.metadataRefreshAttempts; attempts++) {
@@ -286,11 +216,11 @@ public void put(K key, Versioned<V> versioned) throws ObsoleteVersionException {
+ " metadata refresh attempts failed.");
}
- public boolean applyUpdate(UpdateAction<K, V, T> action) {
+ public boolean applyUpdate(UpdateAction<K, V> action) {
return applyUpdate(action, 3);
}
- public boolean applyUpdate(UpdateAction<K, V, T> action, int maxTries) {
+ public boolean applyUpdate(UpdateAction<K, V> action, int maxTries) {
boolean success = false;
try {
for(int i = 0; i < maxTries; i++) {
@@ -31,6 +31,7 @@
import voldemort.store.serialized.SerializingStore;
import voldemort.store.versioned.InconsistencyResolvingStore;
import voldemort.store.versioned.VersionIncrementingStore;
+import voldemort.store.views.ViewStorageConfiguration;
import voldemort.store.views.ViewStorageEngine;
import voldemort.utils.SystemTime;
import voldemort.utils.Time;
@@ -95,12 +96,12 @@ public MockStoreClientFactory(Serializer<?> keySerializer,
failureDetector = new NoopFailureDetector();
}
- public <K, V, T> StoreClient<K, V, T> getStoreClient(String storeName) {
+ public <K, V> StoreClient<K, V> getStoreClient(String storeName) {
return getStoreClient(storeName, new TimeBasedInconsistencyResolver<V>());
}
- public <K, V, T> StoreClient<K, V, T> getStoreClient(String storeName,
- InconsistencyResolver<Versioned<V>> resolver) {
+ public <K, V> StoreClient<K, V> getStoreClient(String storeName,
+ InconsistencyResolver<Versioned<V>> resolver) {
return new DefaultStoreClient(storeName, resolver, this, 3);
}
@@ -143,13 +144,21 @@ public MockStoreClientFactory(Serializer<?> keySerializer,
Serializer<K1> keySerializer = (Serializer<K1>) serializerFactory.getSerializer(storeDef.getKeySerializer());
Serializer<V1> valueSerializer = (Serializer<V1>) serializerFactory.getSerializer(storeDef.getValueSerializer());
- Serializer<T1> transformsSerializer = (Serializer<T1>) serializerFactory.getSerializer(storeDef.getTransformsSerializer());
+ Serializer<T1> transformsSerializer = null;
+
+ if(storeDef.isView())
+ transformsSerializer = (Serializer<T1>) serializerFactory.getSerializer(storeDef.getTransformsSerializer());
// Add inconsistency resolving decorator, using their inconsistency
// resolver (if they gave us one)
InconsistencyResolver<Versioned<V1>> secondaryResolver = new TimeBasedInconsistencyResolver();
- StorageEngine engine = new InMemoryStorageEngine(storeDef.getViewTargetStoreName());
+ StorageEngine engine;
+ if(storeDef.isView())
+ engine = new InMemoryStorageEngine(storeDef.getViewTargetStoreName());
+ else
+ engine = new InMemoryStorageEngine(storeDef.getName());
+
if(storeDef.isView()) {
// instantiate view
String targetName = storeDef.getViewTargetStoreName();
@@ -165,7 +174,8 @@ public MockStoreClientFactory(Serializer<?> keySerializer,
: serializerFactory.getSerializer(targetDef.getKeySerializer()),
this.valueSerializer != null ? this.valueSerializer
: serializerFactory.getSerializer(targetDef.getValueSerializer()),
- storeDef.getValueTransformation());
+ null,
+ ViewStorageConfiguration.loadTransformation(storeDef.getValueTransformation()));
}
Store store = new VersionIncrementingStore(engine, nodeId, time);
@@ -34,7 +34,7 @@
* @param <V> The type of the value being stored
*/
@Threadsafe
-public interface StoreClient<K, V, T> {
+public interface StoreClient<K, V> {
/**
* Get the value associated with the given key or null if there is no value
@@ -68,19 +68,6 @@
*/
public Versioned<V> get(K key);
- /**
- * Get the versioned value associated with the given key and apply the given
- * transforms to it before returning the value. Returns null if no value is
- * associated with the key
- *
- * @param key the key for which the value is fetched
- * @param transforms the transforms to be applied on the value fetched from
- * the store
- * @return the transformed versioned value, or null if no value is stored
- * for this key
- */
- public Versioned<V> get(K key, T transforms);
-
/**
* Gets the versioned values associated with the given keys and returns them
* in a Map of keys to versioned values. Note that the returned map will
@@ -92,18 +79,6 @@
*/
public Map<K, Versioned<V>> getAll(Iterable<K> keys);
- /**
- * Like {@link voldemort.client.StoreClient#getAll(Iterable) getAll}, except
- * that the transforms are applied on the value associated with each key
- * before returning the results
- *
- * @param keys the keys for which the values are fetched
- * @param transforms the map of transforms, describing the transform to be
- * applied to the value for each key
- * @return A map of keys to transformed versioned values
- */
- public Map<K, Versioned<V>> getAll(Iterable<K> keys, Map<K, T> transforms);
-
/**
* Get the versioned value associated with the given key or the defaultValue
* if no value is associated with the key.
@@ -123,16 +98,6 @@
*/
public void put(K key, V value);
- /**
- * Like {@link voldemort.store.StoreClient#put(K, V) put}, except that the
- * given transforms are applied on the value before writing it to the store
- *
- * @param key the key
- * @param value the value
- * @param transforms the transforms to be applied on the value
- */
- public void put(K key, V value, T transforms);
-
/**
* Put the given Versioned value into the store for the given key if the
* version is greater to or concurrent with existing values. Throw an
@@ -165,7 +130,7 @@
* @return true if the action is successfully applied, false if the 3
* attempts all result in ObsoleteVersionException
*/
- public boolean applyUpdate(UpdateAction<K, V, T> action);
+ public boolean applyUpdate(UpdateAction<K, V> action);
/**
* Apply the given action repeatedly until no ObsoleteVersionException is
@@ -176,7 +141,7 @@
* @return true if the action is successfully applied, false if maxTries
* failed attempts have been made
*/
- public boolean applyUpdate(UpdateAction<K, V, T> action, int maxTries);
+ public boolean applyUpdate(UpdateAction<K, V> action, int maxTries);
/**
* Delete any version of the given key which equal to or less than the
Oops, something went wrong.

0 comments on commit fccc987

Please sign in to comment.