Skip to content
This repository has been archived by the owner on Jun 23, 2020. It is now read-only.

Commit

Permalink
Basic working prototype for Coordinator and Sample Thin client
Browse files Browse the repository at this point in the history
  • Loading branch information
Chinmay Soman committed Mar 26, 2013
1 parent 314e2e8 commit 4140295
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 23 deletions.
5 changes: 5 additions & 0 deletions .classpath
Expand Up @@ -55,6 +55,11 @@
<classpathentry kind="lib" path="lib/snappy-0.2.jar"/>
<classpathentry kind="lib" path="lib/httpclient-4.1.2.jar"/>
<classpathentry kind="lib" path="lib/httpcore-4.1.2.jar"/>
<classpathentry kind="lib" path="lib/netty-3.5.8.Final.jar"/>
<classpathentry kind="lib" path="lib/r2-1.5.10.jar"/>
<classpathentry kind="lib" path="lib/pegasus-common-1.5.10.jar"/>
<classpathentry kind="lib" path="lib/data-1.5.10.jar"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6"/>
<classpathentry kind="lib" path="lib/joda-time-1.6.jar"/>
<classpathentry kind="lib" path="lib/mail-1.4.1.jar"/>
<classpathentry kind="lib" path="lib/azkaban-common-0.05.jar"/>
Expand Down
Binary file added lib/data-1.5.10.jar
Binary file not shown.
Binary file added lib/netty-3.5.8.Final.jar
Binary file not shown.
Binary file added lib/pegasus-common-1.5.10.jar
Binary file not shown.
Binary file added lib/r2-1.5.10.jar
Binary file not shown.
52 changes: 31 additions & 21 deletions src/java/voldemort/client/AbstractStoreClientFactory.java
Expand Up @@ -294,8 +294,11 @@ public <K, V, T> Store<K, V, T> getRawStore(String storeName,
failureDetectorRef,
isJmxEnabled,
this.jmxId);

store = new LoggingStore(store);

Store<K, V, T> finalStore = (Store<K, V, T>) store;

if(isJmxEnabled) {
StatTrackingStore statStore = new StatTrackingStore(store, this.stats);
store = statStore;
Expand All @@ -305,35 +308,42 @@ public <K, V, T> Store<K, V, T> getRawStore(String storeName,
+ JmxUtils.getJmxId(jmxId)));
}

if(storeDef.getKeySerializer().hasCompression()
|| storeDef.getValueSerializer().hasCompression()) {
store = new CompressingStore(store,
getCompressionStrategy(storeDef.getKeySerializer()),
getCompressionStrategy(storeDef.getValueSerializer()));
if(this.config.isEnableCompressionLayer()) {
if(storeDef.getKeySerializer().hasCompression()
|| storeDef.getValueSerializer().hasCompression()) {
store = new CompressingStore(store,
getCompressionStrategy(storeDef.getKeySerializer()),
getCompressionStrategy(storeDef.getValueSerializer()));
}
}

Serializer<K> keySerializer = (Serializer<K>) serializerFactory.getSerializer(storeDef.getKeySerializer());
Serializer<V> valueSerializer = (Serializer<V>) serializerFactory.getSerializer(storeDef.getValueSerializer());
if(this.config.isEnableSerializationLayer()) {
Serializer<K> keySerializer = (Serializer<K>) serializerFactory.getSerializer(storeDef.getKeySerializer());
Serializer<V> valueSerializer = (Serializer<V>) serializerFactory.getSerializer(storeDef.getValueSerializer());

if(storeDef.isView() && (storeDef.getTransformsSerializer() == null))
throw new SerializationException("Transforms serializer must be specified with a view ");
if(storeDef.isView() && (storeDef.getTransformsSerializer() == null))
throw new SerializationException("Transforms serializer must be specified with a view ");

Serializer<T> transformsSerializer = (Serializer<T>) serializerFactory.getSerializer(storeDef.getTransformsSerializer() != null ? storeDef.getTransformsSerializer()
: new SerializerDefinition("identity"));
Serializer<T> transformsSerializer = (Serializer<T>) serializerFactory.getSerializer(storeDef.getTransformsSerializer() != null ? storeDef.getTransformsSerializer()
: new SerializerDefinition("identity"));

Store<K, V, T> serializedStore = SerializingStore.wrap(store,
keySerializer,
valueSerializer,
transformsSerializer);
finalStore = SerializingStore.wrap(store,
keySerializer,
valueSerializer,
transformsSerializer);
}

// Add inconsistency resolving decorator, using their inconsistency
// resolver (if they gave us one)
InconsistencyResolver<Versioned<V>> secondaryResolver = resolver == null ? new TimeBasedInconsistencyResolver()
: resolver;
serializedStore = new InconsistencyResolvingStore<K, V, T>(serializedStore,
new ChainedResolver<Versioned<V>>(new VectorClockInconsistencyResolver(),
secondaryResolver));
return serializedStore;
if(this.config.isEnableInconsistencyResolvingLayer()) {
InconsistencyResolver<Versioned<V>> secondaryResolver = resolver == null ? new TimeBasedInconsistencyResolver()
: resolver;
finalStore = new InconsistencyResolvingStore<K, V, T>(finalStore,
new ChainedResolver<Versioned<V>>(new VectorClockInconsistencyResolver(),
secondaryResolver));
}

return finalStore;
}

protected ClientConfig getConfig() {
Expand Down
49 changes: 48 additions & 1 deletion src/java/voldemort/client/ClientConfig.java
Expand Up @@ -60,7 +60,6 @@ public class ClientConfig {
private volatile List<String> bootstrapUrls = null;
private volatile RequestFormatType requestFormatType = RequestFormatType.VOLDEMORT_V1;
private volatile RoutingTier routingTier = RoutingTier.CLIENT;
private volatile boolean enableJmx = true;
private volatile boolean enableLazy = true;

private volatile boolean enablePipelineRoutedStore = true;
Expand Down Expand Up @@ -107,6 +106,12 @@ public class ClientConfig {
private volatile boolean sysEnableJmx = false;
private volatile boolean sysEnablePipelineRoutedStore = true;

/* Voldemort client component config */
private volatile boolean enableJmx = true;
private volatile boolean enableCompressionLayer = true;
private volatile boolean enableSerializationLayer = true;
private volatile boolean enableInconsistencyResolvingLayer = true;

public ClientConfig() {}

/* Propery names for propery-based configuration */
Expand Down Expand Up @@ -157,6 +162,9 @@ public ClientConfig() {}
public static final String SYS_SOCKET_TIMEOUT_MS = "sys_socket_timeout_ms";
public static final String SYS_ENABLE_JMX = "sys_enable_jmx";
public static final String SYS_ENABLE_PIPELINE_ROUTED_STORE = "sys_enable_pipeline_routed_store";
public static final String ENABLE_COMPRESSION_LAYER = "enable_compression_layer";
public static final String ENABLE_SERIALIZATION_LAYER = "enable_serialization_layer";
public static final String ENABLE_INCONSISTENCY_RESOLVING_LAYER = "enable_inconsistency_resolving_layer";

/**
* Instantiate the client config using a properties file
Expand Down Expand Up @@ -360,6 +368,18 @@ private void setProperties(Properties properties) {
this.setSysEnablePipelineRoutedStore(props.getBoolean(SYS_ENABLE_PIPELINE_ROUTED_STORE));
}

if(props.containsKey(ENABLE_COMPRESSION_LAYER)) {
this.setEnableCompressionLayer(props.getBoolean(ENABLE_COMPRESSION_LAYER));
}

if(props.containsKey(ENABLE_SERIALIZATION_LAYER)) {
this.setEnableSerializationLayer(props.getBoolean(ENABLE_SERIALIZATION_LAYER));
}

if(props.containsKey(ENABLE_INCONSISTENCY_RESOLVING_LAYER)) {
this.setEnableInconsistencyResolvingLayer(props.getBoolean(ENABLE_INCONSISTENCY_RESOLVING_LAYER));
}

}

/**
Expand Down Expand Up @@ -1057,4 +1077,31 @@ public ClientConfig setAsyncJobThreadPoolSize(int asyncJobThreadPoolSize) {
this.asyncJobThreadPoolSize = asyncJobThreadPoolSize;
return this;
}

public boolean isEnableCompressionLayer() {
return enableCompressionLayer;
}

public ClientConfig setEnableCompressionLayer(boolean enableCompressionLayer) {
this.enableCompressionLayer = enableCompressionLayer;
return this;
}

public boolean isEnableSerializationLayer() {
return enableSerializationLayer;
}

public ClientConfig setEnableSerializationLayer(boolean enableSerializationLayer) {
this.enableSerializationLayer = enableSerializationLayer;
return this;
}

public boolean isEnableInconsistencyResolvingLayer() {
return enableInconsistencyResolvingLayer;
}

public ClientConfig setEnableInconsistencyResolvingLayer(boolean enableInconsistencyResolvingLayer) {
this.enableInconsistencyResolvingLayer = enableInconsistencyResolvingLayer;
return this;
}
}
Expand Up @@ -36,6 +36,7 @@
import voldemort.client.StoreClient;
import voldemort.client.StoreClientFactory;
import voldemort.client.protocol.RequestFormatType;
import voldemort.coordinator.RESTClient;
import voldemort.serialization.IdentitySerializer;
import voldemort.serialization.Serializer;
import voldemort.serialization.SerializerDefinition;
Expand Down Expand Up @@ -350,12 +351,14 @@ public void initializeStore(Props benchmarkProps) throws Exception {
TimeUnit.MILLISECONDS)
.setRequestFormatType(RequestFormatType.VOLDEMORT_V3)
.setBootstrapUrls(socketUrl);
// .enableDefaultClient(true);

if(clientZoneId >= 0) {
clientConfig.setClientZoneId(clientZoneId);
}
SocketStoreClientFactory socketFactory = new SocketStoreClientFactory(clientConfig);
this.storeClient = socketFactory.getStoreClient(storeName);
// this.storeClient = socketFactory.getStoreClient(storeName);
this.storeClient = new RESTClient<Object, Object>();
StoreDefinition storeDef = getStoreDefinition(socketFactory, storeName);
this.keyType = findKeyType(storeDef);
benchmarkProps.put(Benchmark.KEY_TYPE, this.keyType);
Expand Down

0 comments on commit 4140295

Please sign in to comment.