Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

ISPN-1293 Implement Default Lifespan/MaxIdle over HotRod

  • Loading branch information...
commit 26a64f8539be29be7971d6843f404b29631525f4 1 parent 67f2027
@tristantarrant tristantarrant authored danberindei committed
Showing with 580 additions and 228 deletions.
  1. +1 −0  .gitignore
  2. +20 −1 client/hotrod-client/src/main/java/org/infinispan/client/hotrod/Flag.java
  3. +2 −1  client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/ConfigurationProperties.java
  4. +23 −11 client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/RemoteCacheImpl.java
  5. +26 −6 client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/OperationsFactory.java
  6. +5 −4 client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/Codec10.java
  7. +0 −1  client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/Codec11.java
  8. +70 −0 client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/Codec12.java
  9. +15 −5 client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/CodecFactory.java
  10. +2 −1  client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodConstants.java
  11. +115 −0 client/hotrod-client/src/test/java/org/infinispan/client/hotrod/DefaultExpirationTest.java
  12. +4 −4 client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ForceReturnValuesTest.java
  13. +3 −3 client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HotRodIntegrationTest.java
  14. +2 −2 core/src/main/java/org/infinispan/CacheImpl.java
  15. +30 −13 server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolDecoder.scala
  16. +30 −0 server/core/src/main/scala/org/infinispan/server/core/ServerConstants.scala
  17. +18 −19 server/core/src/main/scala/org/infinispan/server/core/transport/NettyTransport.scala
  18. +1 −1  server/hotrod/src/main/scala/org/infinispan/server/hotrod/AbstractEncoder1x.scala
  19. +166 −0 server/hotrod/src/main/scala/org/infinispan/server/hotrod/AbstractTopologyAwareEncoder1x.scala
  20. +2 −2 server/hotrod/src/main/scala/org/infinispan/server/hotrod/AbstractVersionedDecoder.scala
  21. +1 −1  server/hotrod/src/main/scala/org/infinispan/server/hotrod/AbstractVersionedEncoder.scala
  22. +1 −0  server/hotrod/src/main/scala/org/infinispan/server/hotrod/Constants.scala
  23. +1 −1  server/hotrod/src/main/scala/org/infinispan/server/hotrod/CrashedMemberDetectorListener.scala
  24. +23 −15 server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala
  25. +5 −123 server/hotrod/src/main/scala/org/infinispan/server/hotrod/Encoders.scala
  26. +2 −2 server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala
  27. +3 −2 server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala
  28. +1 −1  server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodOperation.scala
  29. +1 −1  server/hotrod/src/main/scala/org/infinispan/server/hotrod/LifecycleCallbacks.scala
  30. +1 −1  server/hotrod/src/main/scala/org/infinispan/server/hotrod/OperationStatus.scala
  31. +1 −2  server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala
  32. +1 −1  server/hotrod/src/main/scala/org/infinispan/server/hotrod/ServerAddress.scala
  33. +4 −4 server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodClient.scala
View
1  .gitignore
@@ -26,6 +26,7 @@ classes
PutObjectStoreDirHere
/query/person
ObjectStore
+.cache
# generated rhq plugin xml
rhq-plugin.xml
# Compiled python files
View
21 client/hotrod-client/src/main/java/org/infinispan/client/hotrod/Flag.java
@@ -34,6 +34,12 @@
* the previous value associated with the key. By applying this flag, this default
* behavior is overridden for the scope of a single invocation, and the previous
* existing value is returned.</li>
+ * <li>{@link #DEFAULT_LIFESPAN} This flag can either be used as a request flag during a put operation to mean
+ * that the default server lifespan should be applied or as a response flag meaning that
+ * the return entry has a default lifespan value</li>
+ * <li>{@link #DEFAULT_MAXIDLE} This flag can either be used as a request flag during a put operation to mean
+ * that the default server maxIdle should be applied or as a response flag meaning that
+ * the return entry has a default maxIdle value</li>
* </ul>
*
* @author Mircea.Markus@jboss.com
@@ -48,7 +54,20 @@
* By applying this flag, this default behavior is overridden for the scope of a single invocation, and the previous
* existing value is returned.
*/
- FORCE_RETURN_VALUE(0x0001);
+ FORCE_RETURN_VALUE(0x0001),
+ /**
+ * This flag can either be used as a request flag during a put operation to mean that the default
+ * server lifespan should be applied or as a response flag meaning that the return entry has a
+ * default lifespan value
+ */
+ DEFAULT_LIFESPAN(0x0002),
+ /**
+ * This flag can either be used as a request flag during a put operation to mean that the default
+ * server maxIdle should be applied or as a response flag meaning that the return entry has a
+ * default maxIdle value
+ */
+ DEFAULT_MAXIDLE(0x0004)
+ ;
private int flagInt;
View
3  client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/ConfigurationProperties.java
@@ -65,9 +65,10 @@
public static final int DEFAULT_HOTROD_PORT = 11222;
public static final int DEFAULT_SO_TIMEOUT = 60000;
public static final int DEFAULT_CONNECT_TIMEOUT = 60000;
+ public static final String PROTOCOL_VERSION_12 = "1.2";
public static final String PROTOCOL_VERSION_11 = "1.1";
public static final String PROTOCOL_VERSION_10 = "1.0";
- public static final String DEFAULT_PROTOCOL_VERSION = PROTOCOL_VERSION_11;
+ public static final String DEFAULT_PROTOCOL_VERSION = PROTOCOL_VERSION_12;
private final TypedProperties props;
View
34 client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/RemoteCacheImpl.java
@@ -103,7 +103,7 @@ public RemoteCacheManager getRemoteCacheManager() {
public boolean removeWithVersion(K key, long version) {
assertRemoteCacheManagerIsStarted();
RemoveIfUnmodifiedOperation op = operationsFactory.newRemoveIfUnmodifiedOperation(obj2bytes(key, true), version);
- VersionedOperationResponse response = (VersionedOperationResponse) op.execute();
+ VersionedOperationResponse response = op.execute();
return response.getCode().isUpdated();
}
@@ -127,7 +127,7 @@ public Boolean call() throws Exception {
public boolean replaceWithVersion(K key, V newValue, long version, int lifespanSeconds, int maxIdleTimeSeconds) {
assertRemoteCacheManagerIsStarted();
ReplaceIfUnmodifiedOperation op = operationsFactory.newReplaceIfUnmodifiedOperation(obj2bytes(key, true), obj2bytes(newValue, false), lifespanSeconds, maxIdleTimeSeconds, version);
- VersionedOperationResponse response = (VersionedOperationResponse) op.execute();
+ VersionedOperationResponse response = op.execute();
return response.getCode().isUpdated();
}
@@ -151,7 +151,7 @@ public Boolean call() throws Exception {
public VersionedValue<V> getVersioned(K key) {
assertRemoteCacheManagerIsStarted();
GetWithVersionOperation op = operationsFactory.newGetWithVersionOperation(obj2bytes(key, true));
- BinaryVersionedValue value = (BinaryVersionedValue) op.execute();
+ BinaryVersionedValue value = op.execute();
return binary2VersionedValue(value);
}
@@ -185,7 +185,7 @@ public Void call() throws Exception {
public int size() {
assertRemoteCacheManagerIsStarted();
StatsOperation op = operationsFactory.newStatsOperation();
- return Integer.parseInt(((Map<String, String>) op.execute()).get(ServerStatistics.CURRENT_NR_OF_ENTRIES));
+ return Integer.parseInt(op.execute().get(ServerStatistics.CURRENT_NR_OF_ENTRIES));
}
@Override
@@ -198,7 +198,7 @@ public boolean isEmpty() {
public ServerStatistics stats() {
assertRemoteCacheManagerIsStarted();
StatsOperation op = operationsFactory.newStatsOperation();
- Map<String, String> statsMap = (Map<String, String>) op.execute();
+ Map<String, String> statsMap = op.execute();
ServerStatisticsImpl stats = new ServerStatisticsImpl();
for (Map.Entry<String, String> entry : statsMap.entrySet()) {
stats.addStats(entry.getKey(), entry.getValue());
@@ -212,11 +212,12 @@ public V put(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdleT
assertRemoteCacheManagerIsStarted();
int lifespanSecs = toSeconds(lifespan, lifespanUnit);
int maxIdleSecs = toSeconds(maxIdleTime, maxIdleTimeUnit);
+ applyDefaultExpirationFlags(lifespan, maxIdleTime);
if (log.isTraceEnabled()) {
log.tracef("About to add (K,V): (%s, %s) lifespanSecs:%d, maxIdleSecs:%d", key, value, lifespanSecs, maxIdleSecs);
}
PutOperation op = operationsFactory.newPutKeyValueOperation(obj2bytes(key, true), obj2bytes(value, false), lifespanSecs, maxIdleSecs);
- byte[] result = (byte[]) op.execute();
+ byte[] result = op.execute();
return (V) bytes2obj(result);
}
@@ -227,8 +228,9 @@ public V putIfAbsent(K key, V value, long lifespan, TimeUnit lifespanUnit, long
assertRemoteCacheManagerIsStarted();
int lifespanSecs = toSeconds(lifespan, lifespanUnit);
int maxIdleSecs = toSeconds(maxIdleTime, maxIdleTimeUnit);
+ applyDefaultExpirationFlags(lifespan, maxIdleTime);
PutIfAbsentOperation op = operationsFactory.newPutIfAbsentOperation(obj2bytes(key, true), obj2bytes(value, false), lifespanSecs, maxIdleSecs);
- byte[] bytes = (byte[]) op.execute();
+ byte[] bytes = op.execute();
return (V) bytes2obj(bytes);
}
@@ -238,8 +240,9 @@ public V replace(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxI
assertRemoteCacheManagerIsStarted();
int lifespanSecs = toSeconds(lifespan, lifespanUnit);
int maxIdleSecs = toSeconds(maxIdleTime, maxIdleTimeUnit);
+ applyDefaultExpirationFlags(lifespan, maxIdleTime);
ReplaceOperation op = operationsFactory.newReplaceOperation(obj2bytes(key, true), obj2bytes(value, false), lifespanSecs, maxIdleSecs);
- byte[] bytes = (byte[]) op.execute();
+ byte[] bytes = op.execute();
return (V) bytes2obj(bytes);
}
@@ -327,7 +330,7 @@ public V call() throws Exception {
public boolean containsKey(Object key) {
assertRemoteCacheManagerIsStarted();
ContainsKeyOperation op = operationsFactory.newContainsKeyOperation(obj2bytes(key, true));
- return (Boolean)op.execute();
+ return op.execute();
}
@Override
@@ -336,7 +339,7 @@ public V get(Object key) {
assertRemoteCacheManagerIsStarted();
byte[] keyBytes = obj2bytes(key, true);
GetOperation gco = operationsFactory.newGetKeyOperation(keyBytes);
- byte[] bytes = (byte[]) gco.execute();
+ byte[] bytes = gco.execute();
V result = (V) bytes2obj(bytes);
if (log.isTraceEnabled()) {
log.tracef("For key(%s) returning %s", key, result);
@@ -369,7 +372,7 @@ public V get(Object key) {
public V remove(Object key) {
assertRemoteCacheManagerIsStarted();
RemoveOperation removeOperation = operationsFactory.newRemoveOperation(obj2bytes(key, true));
- byte[] existingValue = (byte[]) removeOperation.execute();
+ byte[] existingValue = removeOperation.execute();
// TODO: It sucks that you need the prev value to see if it works...
// We need to find a better API for RemoteCache...
return (V) bytes2obj(existingValue);
@@ -481,4 +484,13 @@ protected void set(K key, V value) {
// Warning: never invoke put(K,V) in this scope or we'll get a stackoverflow.
put(key, value, defaultLifespan, MILLISECONDS, defaultMaxIdleTime, MILLISECONDS);
}
+
+ private void applyDefaultExpirationFlags(long lifespan, long maxIdle) {
+ if (lifespan == 0) {
+ operationsFactory.addFlags(Flag.DEFAULT_LIFESPAN);
+ }
+ if (maxIdle == 0) {
+ operationsFactory.addFlags(Flag.DEFAULT_MAXIDLE);
+ }
+ }
}
View
32 client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/OperationsFactory.java
@@ -31,6 +31,8 @@
import org.infinispan.client.hotrod.impl.transport.Transport;
import org.infinispan.client.hotrod.impl.transport.TransportFactory;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
/**
@@ -44,7 +46,7 @@
private static final Flag[] FORCE_RETURN_VALUE = {Flag.FORCE_RETURN_VALUE};
- private final ThreadLocal<Flag[]> flagsMap = new ThreadLocal<Flag[]>();
+ private final ThreadLocal<List<Flag>> flagsMap = new ThreadLocal<List<Flag>>();
private final TransportFactory transportFactory;
@@ -157,15 +159,33 @@ public FaultTolerantPingOperation newFaultTolerantPingOperation() {
}
private Flag[] flags() {
- Flag[] flags = this.flagsMap.get();
+ List<Flag> flags = this.flagsMap.get();
this.flagsMap.remove();
- if (flags == null && forceReturnValue) {
- return FORCE_RETURN_VALUE;
+ if (forceReturnValue) {
+ if (flags == null) {
+ return FORCE_RETURN_VALUE;
+ } else {
+ flags.add(Flag.FORCE_RETURN_VALUE);
+ }
}
- return flags;
+ return flags != null ? flags.toArray(new Flag[0]) : null;
}
public void setFlags(Flag[] flags) {
- this.flagsMap.set(flags);
+ List<Flag> list = new ArrayList<Flag>();
+ for(Flag flag : flags)
+ list.add(flag);
+ this.flagsMap.set(list);
+ }
+
+ public void addFlags(Flag... flags) {
+ List<Flag> list = this.flagsMap.get();
+ if (list == null) {
+ list = new ArrayList<Flag>();
+ this.flagsMap.set(list);
+ }
+ for(Flag flag : flags)
+ list.add(flag);
+
}
}
View
9 client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/Codec10.java
@@ -66,7 +66,8 @@ protected HeaderParams writeHeader(
int flagInt = 0;
if (params.flags != null) {
for (Flag flag : params.flags) {
- flagInt = flag.getFlagInt() | flagInt;
+ if (flag.equals(Flag.FORCE_RETURN_VALUE)) // 1.0 / 1.1 servers only understand this flag
+ flagInt = flag.getFlagInt();
}
}
transport.writeVInt(flagInt);
@@ -126,12 +127,12 @@ public Log getLog() {
return log;
}
- private void checkForErrorsInResponseStatus(Transport transport, HeaderParams params, short status) {
+ protected void checkForErrorsInResponseStatus(Transport transport, HeaderParams params, short status) {
final Log localLog = getLog();
boolean isTrace = localLog.isTraceEnabled();
if (isTrace) localLog.tracef("Received operation status: %#x", status);
- switch ((int) status) {
+ switch (status) {
case HotRodConstants.INVALID_MAGIC_OR_MESSAGE_ID_STATUS:
case HotRodConstants.REQUEST_PARSING_ERROR_STATUS:
case HotRodConstants.UNKNOWN_COMMAND_STATUS:
@@ -161,7 +162,7 @@ private void checkForErrorsInResponseStatus(Transport transport, HeaderParams pa
}
}
- private void readNewTopologyIfPresent(Transport transport, HeaderParams params) {
+ protected void readNewTopologyIfPresent(Transport transport, HeaderParams params) {
short topologyChangeByte = transport.readByte();
if (topologyChangeByte == 1)
readNewTopologyAndHash(transport, params.topologyId);
View
1  client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/Codec11.java
@@ -25,7 +25,6 @@
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
import org.infinispan.client.hotrod.impl.consistenthash.ConsistentHash;
import org.infinispan.client.hotrod.impl.transport.Transport;
View
70 client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/Codec12.java
@@ -0,0 +1,70 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2012 Red Hat Inc. and/or its affiliates and other contributors
+ * as indicated by the @author tags. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ */
+package org.infinispan.client.hotrod.impl.protocol;
+
+import org.infinispan.client.hotrod.Flag;
+import org.infinispan.client.hotrod.impl.transport.Transport;
+import org.infinispan.client.hotrod.logging.Log;
+import org.infinispan.client.hotrod.logging.LogFactory;
+
+/**
+ * A Hot Rod encoder/decoder for version 1.2 of the protocol.
+ *
+ * @author Tristan Tarrant
+ * @author Galder Zamarreño
+ * @since 5.2
+ */
+public class Codec12 extends Codec11 {
+
+ private static final Log log = LogFactory.getLog(Codec12.class, Log.class);
+
+ @Override
+ public HeaderParams writeHeader(Transport transport, HeaderParams params) {
+ return writeHeader(transport, params, HotRodConstants.VERSION_12);
+ }
+
+ @Override
+ protected HeaderParams writeHeader(Transport transport, HeaderParams params, byte version) {
+ transport.writeByte(HotRodConstants.REQUEST_MAGIC);
+ transport.writeVLong(params.messageId(MSG_ID.incrementAndGet()).messageId);
+ transport.writeByte(version);
+ transport.writeByte(params.opCode);
+ transport.writeArray(params.cacheName);
+
+ int flagInt = 0;
+ if (params.flags != null) {
+ for (Flag flag : params.flags) {
+ flagInt = flag.getFlagInt() | flagInt;
+ }
+ }
+ transport.writeVInt(flagInt);
+ transport.writeByte(params.clientIntel);
+ transport.writeVInt(params.topologyId.get());
+ //todo change once TX support is added
+ transport.writeByte(params.txMarker);
+ getLog().tracef("Wrote header for message %d. Operation code: %#04x. Flags: %#x", params.messageId, params.opCode, flagInt);
+ return params;
+ }
+
+ @Override
+ public Log getLog() {
+ return log;
+ }
+
+}
View
20 client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/CodecFactory.java
@@ -19,7 +19,10 @@
package org.infinispan.client.hotrod.impl.protocol;
-import org.infinispan.client.hotrod.impl.ConfigurationProperties;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.infinispan.client.hotrod.impl.ConfigurationProperties.*;
/**
* Code factory.
@@ -28,15 +31,22 @@
* @since 5.1
*/
public class CodecFactory {
+ private static final Map<String, Codec> codecMap;
private static final Codec CODEC_10 = new Codec10();
private static final Codec CODEC_11 = new Codec11();
+ private static final Codec CODEC_12 = new Codec12();
+
+ static {
+ codecMap = new HashMap<String, Codec>();
+ codecMap.put(PROTOCOL_VERSION_10, CODEC_10);
+ codecMap.put(PROTOCOL_VERSION_11, CODEC_11);
+ codecMap.put(PROTOCOL_VERSION_12, CODEC_12);
+ }
public static Codec getCodec(String version) {
- if (version.equals(ConfigurationProperties.PROTOCOL_VERSION_10))
- return CODEC_10;
- else if (version.equals(ConfigurationProperties.PROTOCOL_VERSION_11))
- return CODEC_11;
+ if (codecMap.containsKey(version))
+ return codecMap.get(version);
else
throw new IllegalArgumentException("Invalid Hot Rod protocol version");
}
View
3  client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodConstants.java
@@ -37,6 +37,7 @@
static final byte VERSION_10 = 10;
static final byte VERSION_11 = 11;
+ static final byte VERSION_12 = 12;
//requests
static final byte PUT_REQUEST = 0x01;
@@ -86,6 +87,6 @@
static final byte CLIENT_INTELLIGENCE_TOPOLOGY_AWARE = 0x02;
static final byte CLIENT_INTELLIGENCE_HASH_DISTRIBUTION_AWARE = 0x03;
Charset HOTROD_STRING_CHARSET = Charset.forName("UTF-8");
-
+
static final byte[] DEFAULT_CACHE_NAME_BYTES = new byte[]{};
}
View
115 client/hotrod-client/src/test/java/org/infinispan/client/hotrod/DefaultExpirationTest.java
@@ -0,0 +1,115 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2012 Red Hat Inc. and/or its affiliates and other contributors
+ * as indicated by the @author tags. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ */
+package org.infinispan.client.hotrod;
+
+import org.testng.annotations.Test;
+import org.testng.AssertJUnit;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.infinispan.Cache;
+import org.infinispan.client.hotrod.test.HotRodClientTestingUtil;
+import org.infinispan.configuration.cache.ConfigurationBuilder;
+import org.infinispan.container.entries.InternalCacheEntry;
+import org.infinispan.container.entries.MortalCacheEntry;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.marshall.Marshaller;
+import org.infinispan.marshall.jboss.JBossMarshaller;
+import org.infinispan.server.core.CacheValue;
+import org.infinispan.server.hotrod.HotRodServer;
+import org.infinispan.test.SingleCacheManagerTest;
+import org.infinispan.test.fwk.TestCacheManagerFactory;
+import org.infinispan.util.ByteArrayKey;
+import org.testng.annotations.AfterClass;
+import static org.testng.AssertJUnit.*;
+
+/**
+ * @author Tristan Tarrant
+ * @since 5.2
+ */
+@Test (testName = "client.hotrod.DefaultExpirationTest", groups = "functional" )
+public class DefaultExpirationTest extends SingleCacheManagerTest {
+ private Marshaller marshaller = new JBossMarshaller();
+
+ private RemoteCache<String, String> remoteCache;
+ private RemoteCacheManager remoteCacheManager;
+ private Cache<ByteArrayKey, CacheValue> cache;
+ protected HotRodServer hotrodServer;
+
+ @Override
+ protected EmbeddedCacheManager createCacheManager() throws Exception {
+ ConfigurationBuilder builder = getDefaultStandaloneCacheConfig(false);
+ builder.expiration().lifespan(3, TimeUnit.SECONDS).maxIdle(2, TimeUnit.SECONDS);
+ cacheManager = TestCacheManagerFactory.createCacheManager(builder);
+ cache = cacheManager.getCache();
+
+ //pass the config file to the cache
+ hotrodServer = TestHelper.startHotRodServer(cacheManager);
+ log.info("Started server on port: " + hotrodServer.getPort());
+
+ remoteCacheManager = getRemoteCacheManager();
+ remoteCache = remoteCacheManager.getCache();
+ return cacheManager;
+ }
+
+ protected RemoteCacheManager getRemoteCacheManager() {
+ Properties config = new Properties();
+ config.put("infinispan.client.hotrod.server_list", "127.0.0.1:" + hotrodServer.getPort());
+ return new RemoteCacheManager(config);
+ }
+
+
+ @AfterClass(alwaysRun = true)
+ public void testDestroyRemoteCacheFactory() {
+ HotRodClientTestingUtil.killRemoteCacheManager(remoteCacheManager);
+ HotRodClientTestingUtil.killServers(hotrodServer);
+ }
+
+ @Test
+ public void testDefaultExpiration() throws Exception {
+ remoteCache.put("Key", "Value");
+ InternalCacheEntry entry = getInternalCacheEntry(cache, "Key", "Value");
+ assertTrue(entry.canExpire());
+ assertEquals(3000, entry.getLifespan());
+ assertEquals(2000, entry.getMaxIdle());
+ Thread.sleep(5000);
+ assertFalse(remoteCache.containsKey("Key"));
+ }
+
+ private InternalCacheEntry getInternalCacheEntry(Cache<ByteArrayKey, CacheValue> cache, String key, String value) throws Exception {
+ InternalCacheEntry entry = cache.getAdvancedCache().getDataContainer().get(toBinaryKey(key));
+ if (value != null) {
+ CacheValue v = (CacheValue) entry.getValue();
+ AssertJUnit.assertEquals(toBinaryValue(value), v.data());
+ }
+ return entry;
+ }
+
+ private ByteArrayKey toBinaryKey(String key) throws Exception {
+ byte[] keyBytes = marshaller.objectToByteBuffer(key, 64);
+ return new ByteArrayKey(keyBytes);
+ }
+
+ private byte[] toBinaryValue(String value) throws Exception {
+ return marshaller.objectToByteBuffer(value, 64);
+ }
+
+}
View
8 client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ForceReturnValuesTest.java
@@ -19,6 +19,7 @@
package org.infinispan.client.hotrod;
+import org.infinispan.client.hotrod.test.HotRodClientTestingUtil;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.server.hotrod.HotRodServer;
import org.infinispan.test.SingleCacheManagerTest;
@@ -26,7 +27,6 @@
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;
-import scala.remote;
import java.util.Properties;
@@ -51,10 +51,10 @@ protected EmbeddedCacheManager createCacheManager() throws Exception {
@AfterMethod(alwaysRun = true)
void shutdown() {
- remoteCacheManager.stop();
- remoteCacheManager = null;
+ HotRodClientTestingUtil.killRemoteCacheManager(remoteCacheManager);
+ HotRodClientTestingUtil.killServers(hotRodServer);
}
-
+
public void testDontForceReturnValues() {
RemoteCache<String, String> rc = remoteCacheManager.getCache();
String rv = rc.put("Key", "Value");
View
6 client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HotRodIntegrationTest.java
@@ -23,6 +23,7 @@
package org.infinispan.client.hotrod;
import org.infinispan.Cache;
+import org.infinispan.client.hotrod.test.HotRodClientTestingUtil;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.marshall.Marshaller;
@@ -41,7 +42,6 @@
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Properties;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import static org.infinispan.test.TestingUtil.k;
@@ -97,8 +97,8 @@ protected RemoteCacheManager getRemoteCacheManager() {
@AfterClass(alwaysRun = true)
public void testDestroyRemoteCacheFactory() {
- remoteCacheManager.stop();
- hotrodServer.stop();
+ HotRodClientTestingUtil.killRemoteCacheManager(remoteCacheManager);
+ HotRodClientTestingUtil.killServers(hotrodServer);
}
public void testPut() throws Exception {
View
4 core/src/main/java/org/infinispan/CacheImpl.java
@@ -392,7 +392,7 @@ private InvocationContext createSingleKeyNonTxInvocationContext(ClassLoader expl
public org.infinispan.config.Configuration getConfiguration() {
return LegacyConfigurationAdaptor.adapt(config);
}
-
+
@Override
public Configuration getCacheConfiguration() {
return config;
@@ -502,7 +502,7 @@ boolean lock(Collection<? extends K> keys, EnumSet<Flag> explicitFlags, ClassLoa
LockControlCommand command = commandsFactory.buildLockControlCommand((Collection<Object>) keys, explicitFlags);
return (Boolean) invoker.invoke(ctx, command);
}
-
+
@Override
public void applyDelta(K deltaAwareValueKey, Delta delta, Object... locksToAcquire) {
if (locksToAcquire == null || locksToAcquire.length == 0) {
View
43 server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolDecoder.scala
@@ -45,7 +45,7 @@ import org.jboss.netty.util.CharsetUtil
* @since 4.1
*/
abstract class AbstractProtocolDecoder[K, V <: CacheValue](transport: NettyTransport)
- extends ReplayingDecoder[DecoderState](DECODE_HEADER, true) with Log {
+ extends ReplayingDecoder[DecoderState](DECODE_HEADER, true) with ServerConstants with Log {
import AbstractProtocolDecoder._
type SuitableParameters <: RequestParameters
@@ -197,9 +197,14 @@ abstract class AbstractProtocolDecoder[K, V <: CacheValue](transport: NettyTrans
private def put: AnyRef = {
val v = createValue(generateVersion(cache))
// Get an optimised cache in case we can make the operation more efficient
- val prev = getOptimizedCache(cache).put(key, v,
- toMillis(params.lifespan), DefaultTimeUnit,
- toMillis(params.maxIdle), DefaultTimeUnit)
+ val prev = (params.lifespan, params.maxIdle) match {
+ case (EXPIRATION_DEFAULT, EXPIRATION_DEFAULT) => getOptimizedCache(cache).put(key, v)
+ case (_, EXPIRATION_DEFAULT) => getOptimizedCache(cache).put(key, v, toMillis(params.lifespan), DefaultTimeUnit)
+ case (_, _) => getOptimizedCache(cache).put(key, v,
+ toMillis(params.lifespan), DefaultTimeUnit,
+ toMillis(params.maxIdle), DefaultTimeUnit)
+ }
+
createSuccessResponse(prev)
}
@@ -209,9 +214,13 @@ abstract class AbstractProtocolDecoder[K, V <: CacheValue](transport: NettyTrans
var prev = cache.get(key)
if (prev == null) { // Generate new version only if key not present
val v = createValue(generateVersion(cache))
- prev = cache.putIfAbsent(key, v,
- toMillis(params.lifespan), DefaultTimeUnit,
- toMillis(params.maxIdle), DefaultTimeUnit)
+ prev = (params.lifespan, params.maxIdle) match {
+ case (EXPIRATION_DEFAULT, EXPIRATION_DEFAULT) => getOptimizedCache(cache).putIfAbsent(key, v)
+ case (_, EXPIRATION_DEFAULT) => getOptimizedCache(cache).putIfAbsent(key, v, toMillis(params.lifespan), DefaultTimeUnit)
+ case (_, _) => getOptimizedCache(cache).putIfAbsent(key, v,
+ toMillis(params.lifespan), DefaultTimeUnit,
+ toMillis(params.maxIdle), DefaultTimeUnit)
+ }
}
if (prev == null)
createSuccessResponse(prev)
@@ -223,9 +232,13 @@ abstract class AbstractProtocolDecoder[K, V <: CacheValue](transport: NettyTrans
var prev = cache.get(key)
if (prev != null) { // Generate new version only if key present
val v = createValue(generateVersion(cache))
- prev = cache.replace(key, v,
- toMillis(params.lifespan), DefaultTimeUnit,
- toMillis(params.maxIdle), DefaultTimeUnit)
+ prev = (params.lifespan, params.maxIdle) match {
+ case (EXPIRATION_DEFAULT, EXPIRATION_DEFAULT) => cache.replace(key, v)
+ case (_, EXPIRATION_DEFAULT) => cache.replace(key, v, toMillis(params.lifespan), DefaultTimeUnit)
+ case (_, _) => cache.replace(key, v,
+ toMillis(params.lifespan), DefaultTimeUnit,
+ toMillis(params.maxIdle), DefaultTimeUnit)
+ }
}
if (prev != null)
createSuccessResponse(prev)
@@ -239,9 +252,13 @@ abstract class AbstractProtocolDecoder[K, V <: CacheValue](transport: NettyTrans
if (prev.version == params.streamVersion) {
// Generate new version only if key present and version has not changed, otherwise it's wasteful
val v = createValue(generateVersion(cache))
- val replaced = cache.replace(key, prev, v,
- toMillis(params.lifespan), DefaultTimeUnit,
- toMillis(params.maxIdle), DefaultTimeUnit)
+ val replaced = (params.lifespan, params.maxIdle) match {
+ case (EXPIRATION_DEFAULT, EXPIRATION_DEFAULT) => cache.replace(key, prev, v)
+ case (_, EXPIRATION_DEFAULT) => cache.replace(key, prev, v, toMillis(params.lifespan), DefaultTimeUnit)
+ case (_, _) => cache.replace(key, prev, v,
+ toMillis(params.lifespan), DefaultTimeUnit,
+ toMillis(params.maxIdle), DefaultTimeUnit)
+ }
if (replaced)
createSuccessResponse(prev)
else
View
30 server/core/src/main/scala/org/infinispan/server/core/ServerConstants.scala
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2011 Red Hat, Inc. and/or its affiliates.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA
+ */
+package org.infinispan.server.core;
+
+/**
+ * Server Constant values
+ *
+ * @author Tristan Tarrant
+ * @since 5.2
+ */
+trait ServerConstants {
+ val EXPIRATION_NONE = -1
+ val EXPIRATION_DEFAULT = -2
+}
View
37 server/core/src/main/scala/org/infinispan/server/core/transport/NettyTransport.scala
@@ -45,7 +45,7 @@ import util.concurrent.{TimeUnit, Executors}
/**
* A Netty based transport.
- *
+ *
* @author Galder Zamarreño
* @since 4.1
*/
@@ -54,7 +54,22 @@ class NettyTransport(server: ProtocolServer, encoder: ChannelDownstreamHandler,
idleTimeout: Int, threadNamePrefix: String, tcpNoDelay: Boolean,
sendBufSize: Int, recvBufSize: Int, cacheManager: EmbeddedCacheManager)
extends Transport with Log {
-
+ ThreadRenamingRunnable.setThreadNameDeterminer(new ThreadNameDeterminer {
+ override def determineThreadName(currentThreadName: String, proposedThreadName: String): String = {
+ val index = proposedThreadName.indexWhere(_ == '#')
+ val typeInFix =
+ if (proposedThreadName contains "server worker") "ServerWorker-"
+ else if (proposedThreadName contains "server boss") "ServerMaster-"
+ else if (proposedThreadName contains "client worker") "ClientWorker-"
+ else "ClientMaster-"
+ // Set thread name to be: <prefix><ServerWorker-|ServerMaster-|ClientWorker-|ClientMaster-><number>
+ val name = threadNamePrefix + typeInFix + proposedThreadName.substring(index + 1, proposedThreadName.length)
+ if (isTrace)
+ trace("Thread name will be %s, with current thread name being %s and proposed name being '%s'",
+ name, Thread.currentThread, proposedThreadName)
+ name
+ }
+ })
private val serverChannels = new DefaultChannelGroup(threadNamePrefix + "-Channels")
val acceptedChannels = new DefaultChannelGroup(threadNamePrefix + "-Accepted")
private val pipeline =
@@ -73,22 +88,6 @@ class NettyTransport(server: ProtocolServer, encoder: ChannelDownstreamHandler,
cacheManager.getCacheManagerConfiguration.globalJmxStatistics().enabled()
override def start() {
- ThreadRenamingRunnable.setThreadNameDeterminer(new ThreadNameDeterminer {
- override def determineThreadName(currentThreadName: String, proposedThreadName: String): String = {
- val index = proposedThreadName.indexWhere(_ == '#')
- val typeInFix =
- if (proposedThreadName contains "server worker") "ServerWorker-"
- else if (proposedThreadName contains "server boss") "ServerMaster-"
- else if (proposedThreadName contains "client worker") "ClientWorker-"
- else "ClientMaster-"
- // Set thread name to be: <prefix><ServerWorker-|ServerMaster-|ClientWorker-|ClientMaster-><number>
- val name = threadNamePrefix + typeInFix + proposedThreadName.substring(index + 1, proposedThreadName.length)
- if (isTrace)
- trace("Thread name will be %s, with current thread name being %s and proposed name being '%s'",
- name, Thread.currentThread, proposedThreadName)
- name
- }
- })
// Make netty use log4j, otherwise it goes to JDK logging.
if (isLog4jAvailable)
InternalLoggerFactory.setDefaultFactory(new Log4JLoggerFactory)
@@ -104,7 +103,7 @@ class NettyTransport(server: ProtocolServer, encoder: ChannelDownstreamHandler,
val ch = bootstrap.bind(address)
serverChannels.add(ch)
}
-
+
private def isLog4jAvailable: Boolean = {
try {
Util.loadClassStrict("org.apache.log4j.Logger",
View
2  server/hotrod/src/main/scala/org/infinispan/server/hotrod/AbstractEncoder1x.scala
@@ -228,4 +228,4 @@ abstract class AbstractEncoder1x extends AbstractVersionedEncoder with Constants
trace("Topology will contain %d addresses", numServers)
}
-}
+}
View
166 server/hotrod/src/main/scala/org/infinispan/server/hotrod/AbstractTopologyAwareEncoder1x.scala
@@ -0,0 +1,166 @@
+/*
+ * Copyright 2011 Red Hat, Inc. and/or its affiliates.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA
+ */
+
+package org.infinispan.server.hotrod
+
+import logging.Log
+import org.jboss.netty.buffer.ChannelBuffer
+import org.infinispan.Cache
+import org.infinispan.manager.EmbeddedCacheManager
+import org.infinispan.remoting.transport.Address
+import org.infinispan.server.core.transport.ExtendedChannelBuffer._
+import collection.JavaConversions._
+import OperationStatus._
+import org.infinispan.util.ByteArrayKey
+import org.infinispan.server.core.CacheValue
+import org.infinispan.configuration.cache.Configuration
+import org.infinispan.distribution.ch.DefaultConsistentHash
+import collection.mutable.ArrayBuffer
+import org.infinispan.distribution.ch.ConsistentHash
+
+/**
+ * Hot Rod encoder for protocol version 1.1
+ *
+ * @author Galder Zamarreño
+ * @since 5.2
+ */
+abstract class AbstractTopologyAwareEncoder1x extends AbstractEncoder1x with Constants with Log {
+
+ override protected def createHashDistAwareResp(lastViewId: Int,
+ cfg: Configuration): AbstractHashDistAwareResponse = {
+ HashDistAware11Response(lastViewId, cfg.clustering().hash().numOwners(),
+ DEFAULT_HASH_FUNCTION_VERSION, Integer.MAX_VALUE,
+ cfg.clustering().hash().numVirtualNodes())
+ }
+
+ override protected def writeHashTopologyHeader(
+ topoResp: AbstractTopologyResponse, buf: ChannelBuffer, r: Response,
+ members: Cache[Address, ServerAddress], server: HotRodServer) {
+ topoResp match {
+ case h: HashDistAware11Response => {
+ trace("Write hash distribution change response header %s", h)
+ if (h.hashFunction == 0) {
+ // When the cache is replicated, we just send the addresses without any hash ids
+ writeCommonHashTopologyHeader(buf, h.viewId, h.numOwners,
+ h.hashFunction, h.hashSpace, members.size)
+ writeUnsignedInt(1, buf) // Num virtual nodes
+
+ mapAsScalaMap(members).foreach { case (addr, serverAddr) =>
+ writeString(serverAddr.host, buf)
+ writeUnsignedShort(serverAddr.port, buf)
+ // Send the address' hash code as is
+ // With virtual nodes off, clients will have to normalize it
+ // With virtual nodes on, it's used as root to calculate
+ // hash code and then normalize it
+ buf.writeInt(0)
+ }
+ return
+ }
+
+ val cache = server.getCacheInstance(r.cacheName, members.getCacheManager, false)
+
+ // This is not quite correct, as the ownership of segments on the 1.0/1.1 clients is not exactly
+ // the same as on the server. But the difference appears only for (numSegment*numOwners/MAX_INT)
+ // of the keys (at the "segment borders"), so it's still much better than having no hash information.
+ // The idea here is to be able to be compatible with clients running version 1.0 of the protocol.
+ // With time, users should migrate to version 1.2 capable clients.
+ val distManager = cache.getAdvancedCache.getDistributionManager
+ val ch = distManager.getConsistentHash
+
+ val numSegments = ch.getNumSegments
+ val totalNumServers = (0 until numSegments).map(i => ch.locateOwnersForSegment(i).size).sum
+ writeCommonHashTopologyHeader(buf, h.viewId, h.numOwners,
+ h.hashFunction, h.hashSpace, totalNumServers)
+ writeUnsignedInt(1, buf) // Num virtual nodes
+
+ val allDenormalizedHashIds = denormalizeSegmentHashIds(ch)
+ for (segmentIdx <- 0 until numSegments) {
+ val denormalizedSegmentHashIds = allDenormalizedHashIds(segmentIdx)
+ val segmentOwners = ch.locateOwnersForSegment(segmentIdx)
+ for (ownerIdx <- 0 until segmentOwners.length) {
+ val address = segmentOwners(ownerIdx % segmentOwners.size)
+ val serverAddress = members(address)
+ val hashId = denormalizedSegmentHashIds(ownerIdx)
+ log.tracef("Writing hash id %d for %s:%s", hashId, serverAddress.host, serverAddress.port)
+ writeString(serverAddress.host, buf)
+ writeUnsignedShort(serverAddress.port, buf)
+ buf.writeInt(hashId)
+ }
+ }
+ }
+ case t: TopologyAwareResponse => {
+ trace("Return limited hash distribution aware header in spite of having a hash aware client %s", t)
+ val serverAddresses = members.values()
+ writeCommonHashTopologyHeader(buf, t.viewId, 0, 0, 0, serverAddresses.size)
+ writeUnsignedInt(0, buf) // Num virtual nodes
+ serverAddresses.foreach { address =>
+ writeString(address.host, buf)
+ writeUnsignedShort(address.port, buf)
+ buf.writeInt(0) // Address' hash id
+ }
+ }
+ case _ => throw new IllegalStateException(
+ "Expected version 1.1 specific response: " + topoResp)
+ }
+ }
+
+ // "Denormalize" the segments - for each hash segment, find numOwners integer values that map on the hash wheel
+ // to the interval [segmentIdx*segmentSize, segmentIdx*segmentSize+leeway], leeway being hardcoded
+ // on the first line of the function
+ // TODO This relies on implementation details (segment layout) of DefaultConsistentHash, and won't work with any other CH
+ def denormalizeSegmentHashIds(ch: ConsistentHash): Array[Seq[Int]] = {
+ // This is the fraction of keys we allow to have "wrong" owners. The algorithm below takes longer
+ // as this value decreases, and at some point it starts hanging (checked with an assert below)
+ val leewayFraction = 0.0002
+ val numOwners = ch.getNumOwners
+ val numSegments = ch.getNumSegments
+
+ val segmentSize = math.ceil(Integer.MAX_VALUE.toDouble / numSegments).toInt
+ val leeway = (leewayFraction * segmentSize).toInt
+ assert(leeway > 2 * numOwners, "numOwners is too big")
+ val ownerHashes = new Array[collection.mutable.Map[Int, Int]](numSegments)
+ for (i <- 0 until numSegments) {
+ ownerHashes(i) = collection.mutable.Map[Int, Int]()
+ }
+ var segmentsLeft : Int = numSegments
+
+ var i = 0
+ while (segmentsLeft != 0) {
+ val normalizedHash = ch.getHashFunction.hash(i) & Integer.MAX_VALUE
+ if (normalizedHash % segmentSize < leeway) {
+ val nextSegmentIdx = normalizedHash / segmentSize
+ val segmentIdx = (nextSegmentIdx - 1 + numSegments) % numSegments
+ val segmentHashes = ownerHashes(segmentIdx)
+ if (segmentHashes.size < numOwners) {
+ segmentHashes += (normalizedHash -> i)
+ if (segmentHashes.size == numOwners) {
+ segmentsLeft -= 1
+ }
+ }
+ }
+ // Allows overflow, if we didn't find all segments in the 0..MAX_VALUE range
+ i += 1
+ }
+ log.tracef("Found denormalized hashes: %s", ownerHashes)
+
+ // Sort each list of hashes by the normalized hash and then return a list with only the denormalized hash
+ val denormalizedHashes = ownerHashes.map(segmentHashes => segmentHashes.toSeq.sortBy(_._1).map(_._2))
+ return denormalizedHashes
+ }
+}
View
4 server/hotrod/src/main/scala/org/infinispan/server/hotrod/AbstractVersionedDecoder.scala
@@ -34,7 +34,7 @@ import org.infinispan.server.core.transport.NettyTransport
*
* @author Galder Zamarreño
* @since 4.1
- */
+ */
abstract class AbstractVersionedDecoder {
/**
@@ -106,4 +106,4 @@ abstract class AbstractVersionedDecoder {
* Get an optimized cache instance depending on the operation parameters.
*/
def getOptimizedCache(h: HotRodHeader, c: Cache[ByteArrayKey, CacheValue]): Cache[ByteArrayKey, CacheValue]
-}
+}
View
2  server/hotrod/src/main/scala/org/infinispan/server/hotrod/AbstractVersionedEncoder.scala
@@ -44,4 +44,4 @@ abstract class AbstractVersionedEncoder {
*/
def writeResponse(r: Response, buf: ChannelBuffer, cacheManager: EmbeddedCacheManager, server: HotRodServer)
-}
+}
View
1  server/hotrod/src/main/scala/org/infinispan/server/hotrod/Constants.scala
@@ -31,6 +31,7 @@ trait Constants {
val MAGIC_RES = 0xA1
val VERSION_10: Byte = 10
val VERSION_11: Byte = 11
+ val VERSION_12: Byte = 12
val DEFAULT_HASH_FUNCTION_VERSION: Byte = 2
}
View
2  server/hotrod/src/main/scala/org/infinispan/server/hotrod/CrashedMemberDetectorListener.scala
@@ -76,4 +76,4 @@ class CrashedMemberDetectorListener(cache: Cache[Address, ServerAddress], server
server.setViewId(e.getViewId)
}
-}
+}
View
38 server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala
@@ -45,7 +45,7 @@ import transport.NettyTransport
* @author Galder Zamarreño
* @since 4.1
*/
-object Decoder10 extends AbstractVersionedDecoder with Log {
+object Decoder10 extends AbstractVersionedDecoder with ServerConstants with Log {
import OperationResponse._
import ProtocolFlag._
type SuitableHeader = HotRodHeader
@@ -71,12 +71,9 @@ object Decoder10 extends AbstractVersionedDecoder with Log {
"Unknown operation: " + streamOp, version, messageId)
}
if (isTrace) trace("Operation code: %d has been matched to %s", streamOp, op)
-
+
val cacheName = readString(buffer)
- val flag = readUnsignedInt(buffer) match {
- case 0 => NoFlag
- case 1 => ForceReturnPreviousValue
- }
+ val flag = readUnsignedInt(buffer)
val clientIntelligence = buffer.readUnsignedByte
val topologyId = readUnsignedInt(buffer)
// TODO: Use these once transaction support is added
@@ -109,24 +106,33 @@ object Decoder10 extends AbstractVersionedDecoder with Log {
case RemoveRequest => (null, true)
case RemoveIfUnmodifiedRequest => (new RequestParameters(-1, -1, -1, buffer.readLong), true)
case ReplaceIfUnmodifiedRequest => {
- val lifespan = readLifespanOrMaxIdle(buffer)
- val maxIdle = readLifespanOrMaxIdle(buffer)
+ val lifespan = readLifespanOrMaxIdle(buffer, hasFlag(header, ProtocolFlag.DefaultLifespan))
+ val maxIdle = readLifespanOrMaxIdle(buffer, hasFlag(header, ProtocolFlag.DefaultMaxIdle))
val version = buffer.readLong
val valueLength = readUnsignedInt(buffer)
(new RequestParameters(valueLength, lifespan, maxIdle, version), false)
}
case _ => {
- val lifespan = readLifespanOrMaxIdle(buffer)
- val maxIdle = readLifespanOrMaxIdle(buffer)
+ val lifespan = readLifespanOrMaxIdle(buffer, hasFlag(header, ProtocolFlag.DefaultLifespan))
+ val maxIdle = readLifespanOrMaxIdle(buffer, hasFlag(header, ProtocolFlag.DefaultMaxIdle))
val valueLength = readUnsignedInt(buffer)
(new RequestParameters(valueLength, lifespan, maxIdle, -1), false)
}
}
}
- private def readLifespanOrMaxIdle(buffer: ChannelBuffer): Int = {
+ private def hasFlag(h: HotRodHeader, f: ProtocolFlag): Boolean = {
+ (h.flag & f.id) == f.id
+ }
+
+ private def readLifespanOrMaxIdle(buffer: ChannelBuffer, useDefault: Boolean): Int = {
val stream = readUnsignedInt(buffer)
- if (stream <= 0) -1 else stream
+ if (stream <= 0) {
+ if (useDefault)
+ EXPIRATION_DEFAULT
+ else
+ EXPIRATION_NONE
+ } else stream
}
override def createValue(params: RequestParameters, nextVersion: Long, rawValue: Array[Byte]): CacheValue =
@@ -142,7 +148,7 @@ object Decoder10 extends AbstractVersionedDecoder with Log {
createResponse(header, toResponse(header.op), KeyDoesNotExist, null)
private def createResponse(h: HotRodHeader, op: OperationResponse, st: OperationStatus, prev: CacheValue): AnyRef = {
- if (h.flag == ForceReturnPreviousValue)
+ if (hasFlag(h, ForceReturnPreviousValue))
new ResponseWithPrevious(h.version, h.messageId, h.cacheName,
h.clientIntel, op, st, h.topologyId, if (prev == null) None else Some(prev.data))
else
@@ -252,7 +258,7 @@ object Decoder10 extends AbstractVersionedDecoder with Log {
}
override def getOptimizedCache(h: HotRodHeader, c: Cache[ByteArrayKey, CacheValue]): Cache[ByteArrayKey, CacheValue] = {
- if (h.flag != ForceReturnPreviousValue) {
+ if (!hasFlag(h, ForceReturnPreviousValue)) {
c.getAdvancedCache.withFlags(IGNORE_RETURN_VALUES)
} else {
c
@@ -300,5 +306,7 @@ object OperationResponse extends Enumeration {
object ProtocolFlag extends Enumeration {
type ProtocolFlag = Enumeration#Value
val NoFlag = Value
- val ForceReturnPreviousValue = Value
+ val ForceReturnPreviousValue = Value(0x01)
+ val DefaultLifespan = Value(0x02)
+ val DefaultMaxIdle = Value(0x04)
}
View
128 server/hotrod/src/main/scala/org/infinispan/server/hotrod/Encoders.scala
@@ -48,128 +48,10 @@ object Encoders {
/**
* Encoder for version 1.1 of the Hot Rod protocol.
*/
- object Encoder11 extends AbstractEncoder1x with Log {
+ object Encoder11 extends AbstractTopologyAwareEncoder1x with Log
- override protected def createHashDistAwareResp(lastViewId: Int,
- cfg: Configuration): AbstractHashDistAwareResponse = {
- HashDistAware11Response(lastViewId, cfg.clustering().hash().numOwners(),
- DEFAULT_HASH_FUNCTION_VERSION, Integer.MAX_VALUE,
- cfg.clustering().hash().numVirtualNodes())
- }
-
- override protected def writeHashTopologyHeader(
- topoResp: AbstractTopologyResponse, buf: ChannelBuffer, r: Response,
- members: Cache[Address, ServerAddress], server: HotRodServer) {
- topoResp match {
- case h: HashDistAware11Response => {
- trace("Write hash distribution change response header %s", h)
- if (h.hashFunction == 0) {
- // When the cache is replicated, we just send the addresses without any hash ids
- writeCommonHashTopologyHeader(buf, h.viewId, h.numOwners,
- h.hashFunction, h.hashSpace, members.size)
- writeUnsignedInt(1, buf) // Num virtual nodes
-
- mapAsScalaMap(members).foreach { case (addr, serverAddr) =>
- writeString(serverAddr.host, buf)
- writeUnsignedShort(serverAddr.port, buf)
- // Send the address' hash code as is
- // With virtual nodes off, clients will have to normalize it
- // With virtual nodes on, it's used as root to calculate
- // hash code and then normalize it
- buf.writeInt(0)
- }
- return
- }
-
- val cache = server.getCacheInstance(r.cacheName, members.getCacheManager, false)
-
- // This is not quite correct, as the ownership of segments on the 1.0/1.1 clients is not exactly
- // the same as on the server. But the difference appears only for (numSegment*numOwners/MAX_INT)
- // of the keys (at the "segment borders"), so it's still much better than having no hash information.
- // The idea here is to be able to be compatible with clients running version 1.0 of the protocol.
- // With time, users should migrate to version 1.2 capable clients.
- val distManager = cache.getAdvancedCache.getDistributionManager
- val ch = distManager.getConsistentHash
-
- val numSegments = ch.getNumSegments
- val totalNumServers = (0 until numSegments).map(i => ch.locateOwnersForSegment(i).size).sum
- writeCommonHashTopologyHeader(buf, h.viewId, h.numOwners,
- h.hashFunction, h.hashSpace, totalNumServers)
- writeUnsignedInt(1, buf) // Num virtual nodes
-
- val allDenormalizedHashIds = denormalizeSegmentHashIds(ch)
- for (segmentIdx <- 0 until numSegments) {
- val denormalizedSegmentHashIds = allDenormalizedHashIds(segmentIdx)
- val segmentOwners = ch.locateOwnersForSegment(segmentIdx)
- for (ownerIdx <- 0 until segmentOwners.length) {
- val address = segmentOwners(ownerIdx % segmentOwners.size)
- val serverAddress = members(address)
- val hashId = denormalizedSegmentHashIds(ownerIdx)
- log.tracef("Writing hash id %d for %s:%s", hashId, serverAddress.host, serverAddress.port)
- writeString(serverAddress.host, buf)
- writeUnsignedShort(serverAddress.port, buf)
- buf.writeInt(hashId)
- }
- }
- }
- case t: TopologyAwareResponse => {
- trace("Return limited hash distribution aware header in spite of having a hash aware client %s", t)
- val serverAddresses = members.values()
- writeCommonHashTopologyHeader(buf, t.viewId, 0, 0, 0, serverAddresses.size)
- writeUnsignedInt(0, buf) // Num virtual nodes
- serverAddresses.foreach { address =>
- writeString(address.host, buf)
- writeUnsignedShort(address.port, buf)
- buf.writeInt(0) // Address' hash id
- }
- }
- case _ => throw new IllegalStateException(
- "Expected version 1.1 specific response: " + topoResp)
- }
- }
-
- // "Denormalize" the segments - for each hash segment, find numOwners integer values that map on the hash wheel
- // to the interval [segmentIdx*segmentSize, segmentIdx*segmentSize+leeway], leeway being hardcoded
- // on the first line of the function
- // TODO This relies on implementation details (segment layout) of DefaultConsistentHash, and won't work with any other CH
- def denormalizeSegmentHashIds(ch: ConsistentHash): Array[Seq[Int]] = {
- // This is the fraction of keys we allow to have "wrong" owners. The algorithm below takes longer
- // as this value decreases, and at some point it starts hanging (checked with an assert below)
- val leewayFraction = 0.0002
- val numOwners = ch.getNumOwners
- val numSegments = ch.getNumSegments
-
- val segmentSize = math.ceil(Integer.MAX_VALUE.toDouble / numSegments).toInt
- val leeway = (leewayFraction * segmentSize).toInt
- assert(leeway > 2 * numOwners, "numOwners is too big")
- val ownerHashes = new Array[collection.mutable.Map[Int, Int]](numSegments)
- for (i <- 0 until numSegments) {
- ownerHashes(i) = collection.mutable.Map[Int, Int]()
- }
- var segmentsLeft : Int = numSegments
-
- var i = 0
- while (segmentsLeft != 0) {
- val normalizedHash = ch.getHashFunction.hash(i) & Integer.MAX_VALUE
- if (normalizedHash % segmentSize < leeway) {
- val nextSegmentIdx = normalizedHash / segmentSize
- val segmentIdx = (nextSegmentIdx - 1 + numSegments) % numSegments
- val segmentHashes = ownerHashes(segmentIdx)
- if (segmentHashes.size < numOwners) {
- segmentHashes += (normalizedHash -> i)
- if (segmentHashes.size == numOwners) {
- segmentsLeft -= 1
- }
- }
- }
- // Allows overflow, if we didn't find all segments in the 0..MAX_VALUE range
- i += 1
- }
- log.tracef("Found denormalized hashes: %s", ownerHashes)
-
- // Sort each list of hashes by the normalized hash and then return a list with only the denormalized hash
- val denormalizedHashes = ownerHashes.map(segmentHashes => segmentHashes.toSeq.sortBy(_._1).map(_._2))
- return denormalizedHashes
- }
- }
+ /**
+ * Encoder for version 1.2 of the Hot Rod protocol.
+ */
+ object Encoder12 extends AbstractTopologyAwareEncoder1x with Log
}
View
4 server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala
@@ -77,7 +77,7 @@ class HotRodDecoder(cacheManager: EmbeddedCacheManager, transport: NettyTranspor
try {
val decoder = version match {
- case VERSION_10 | VERSION_11 => Decoder10
+ case VERSION_10 | VERSION_11 | VERSION_12 => Decoder10
case _ => throw new UnknownVersionException(
"Unknown version:" + version, version, messageId)
}
@@ -234,7 +234,7 @@ class HotRodHeader extends RequestHeader {
var version: Byte = _
var messageId: Long = _
var cacheName: String = _
- var flag: ProtocolFlag = _
+ var flag: Int = _
var clientIntel: Short = _
var topologyId: Int = _
var decoder: AbstractVersionedDecoder = _
View
5 server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala
@@ -55,11 +55,12 @@ class HotRodEncoder(cacheManager: EmbeddedCacheManager, server: HotRodServer)
val encoder = r.version match {
case VERSION_10 => Encoders.Encoder10
case VERSION_11 => Encoders.Encoder11
- case 0 => Encoders.Encoder11
+ case VERSION_12 => Encoders.Encoder12
+ case 0 => Encoders.Encoder12
}
r.version match {
- case VERSION_10 | VERSION_11 => encoder.writeHeader(r, buf, addressCache, server)
+ case VERSION_10 | VERSION_11 | VERSION_12 => encoder.writeHeader(r, buf, addressCache, server)
// if error before reading version, don't send any topology changes
// cos the encoding might vary from one version to the other
case 0 => encoder.writeHeader(r, buf, null, null)
View
2  server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodOperation.scala
@@ -38,4 +38,4 @@ object HotRodOperation extends Enumeration(20) {
val PingRequest = Value
val BulkGetRequest = Value
-}
+}
View
2  server/hotrod/src/main/scala/org/infinispan/server/hotrod/LifecycleCallbacks.scala
@@ -41,4 +41,4 @@ class LifecycleCallbacks extends AbstractModuleLifecycle {
globalCfg.serialization().advancedExternalizers().put(
SERVER_ADDRESS, new ServerAddress.Externalizer)
-}
+}
View
2  server/hotrod/src/main/scala/org/infinispan/server/hotrod/OperationStatus.scala
@@ -42,4 +42,4 @@ object OperationStatus extends Enumeration {
val ServerError = Value(0x85) // todo: test
val OperationTimedOut = Value(0x86) // todo: test
-}
+}
View
3  server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala
@@ -67,8 +67,7 @@ class ResponseWithPrevious(override val version: Byte, override val messageId: L
class GetResponse(override val version: Byte, override val messageId: Long, override val cacheName: String, override val clientIntel: Short,
override val operation: OperationResponse, override val status: OperationStatus,
- override val topologyId: Int,
- val data: Option[Array[Byte]])
+ override val topologyId: Int, val data: Option[Array[Byte]])
extends Response(version, messageId, cacheName, clientIntel, operation, status, topologyId) {
override def toString = {
new StringBuilder().append("GetResponse").append("{")
View
2  server/hotrod/src/main/scala/org/infinispan/server/hotrod/ServerAddress.scala
@@ -108,4 +108,4 @@ object ServerAddress {
}
-}
+}
View
8 server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodClient.scala
@@ -59,7 +59,7 @@ import java.lang.StringBuilder
* @since 4.1
*/
class HotRodClient(host: String, port: Int, defaultCacheName: String, rspTimeoutSeconds: Int, protocolVersion: Byte) extends Log {
- val idToOp = new ConcurrentHashMap[Long, Op]
+ val idToOp = new ConcurrentHashMap[Long, Op]
private lazy val ch: Channel = {
val factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool)
@@ -74,7 +74,7 @@ class HotRodClient(host: String, port: Int, defaultCacheName: String, rspTimeout
assertTrue(connectFuture.isSuccess)
ch
}
-
+
def stop = ch.disconnect
def put(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte]): TestResponse =
@@ -111,12 +111,12 @@ class HotRodClient(host: String, port: Int, defaultCacheName: String, rspTimeout
def putIfAbsent(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], flags: Int): TestResponse =
execute(0xA0, 0x05, defaultCacheName, k, lifespan, maxIdle, v, 0, flags)
-
+
def replace(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte]): TestResponse =
execute(0xA0, 0x07, defaultCacheName, k, lifespan, maxIdle, v, 0, 1 ,0)
def replace(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], flags: Int): TestResponse =
- execute(0xA0, 0x07, defaultCacheName, k, lifespan, maxIdle, v, 0, flags)
+ execute(0xA0, 0x07, defaultCacheName, k, lifespan, maxIdle, v, 0, flags)
def replaceIfUnmodified(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte],
dataVersion: Long): TestResponse =
Please sign in to comment.
Something went wrong with that request. Please try again.