Skip to content

Commit

Permalink
ISPN-5298 HotRod millisecond precision for lifespan/maxidle
Browse files Browse the repository at this point in the history
* Propagated user defined TimeUnit via HotRod for lifespan and maxIdle
* Added Test
* Updated doc
  • Loading branch information
Gustavo Fernandes authored and danberindei committed May 13, 2015
1 parent cbaf14a commit c2d8c09
Show file tree
Hide file tree
Showing 33 changed files with 421 additions and 259 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,23 @@ public interface RemoteCache<K, V> extends BasicCache<K, V> {
*/
boolean replaceWithVersion(K key, V newValue, long version, int lifespanSeconds, int maxIdleTimeSeconds);

/**
* A overloaded form of {@link #replaceWithVersion(Object, Object, long)}
* which takes in lifespan and maximum idle time parameters.
*
* @param key key to use
* @param newValue new value to be associated with the key
* @param version numeric version that should match the one in the server
* for the operation to succeed
* @param lifespan lifespan of the entry
* @param lifespanTimeUnit {@link java.util.concurrent.TimeUnit} for lifespan
* @param maxIdle the maximum amount of time this key is allowed
* to be idle for before it is considered as expired
* @param maxIdleTimeUnit {@link java.util.concurrent.TimeUnit} for maxIdle
* @return true if the value was replaced
*/
boolean replaceWithVersion(K key, V newValue, long version, long lifespan, TimeUnit lifespanTimeUnit, long maxIdle, TimeUnit maxIdleTimeUnit);

/**
* @see #replaceWithVersion(Object, Object, long)
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,13 @@ public Boolean call() throws Exception {

@Override
public boolean replaceWithVersion(K key, V newValue, long version, int lifespanSeconds, int maxIdleTimeSeconds) {
assertRemoteCacheManagerIsStarted();
long maxIdleNanos = toNanoseconds(maxIdleTimeSeconds, TimeUnit.SECONDS);
long lifespanNanos = toNanoseconds(lifespanSeconds, TimeUnit.SECONDS);
return replaceWithVersion(key, newValue, version, lifespanSeconds, TimeUnit.SECONDS, maxIdleTimeSeconds, TimeUnit.SECONDS);
}

ReplaceIfUnmodifiedOperation op = operationsFactory.newReplaceIfUnmodifiedOperation(obj2bytes(key, true), obj2bytes(newValue, false), lifespanNanos, maxIdleNanos, version);
@Override
public boolean replaceWithVersion(K key, V newValue, long version, long lifespan, TimeUnit lifespanTimeUnit, long maxIdle, TimeUnit maxIdleTimeUnit) {
assertRemoteCacheManagerIsStarted();
ReplaceIfUnmodifiedOperation op = operationsFactory.newReplaceIfUnmodifiedOperation(obj2bytes(key, true), obj2bytes(newValue, false), lifespan, lifespanTimeUnit, maxIdle, maxIdleTimeUnit, version);
VersionedOperationResponse response = op.execute();
return response.getCode().isUpdated();
}
Expand Down Expand Up @@ -168,17 +170,15 @@ public MetadataValue<V> getWithMetadata(K key) {
@Override
public void putAll(Map<? extends K, ? extends V> map, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit) {
assertRemoteCacheManagerIsStarted();
int lifespanSecs = toSeconds(lifespan, lifespanUnit);
int maxIdleSecs = toSeconds(maxIdleTime, maxIdleTimeUnit);
applyDefaultExpirationFlags(lifespan, maxIdleTime);
if (log.isTraceEnabled()) {
log.tracef("About to putAll entries (%s) lifespanSecs:%d, maxIdleSecs:%d", map, lifespanSecs, maxIdleSecs);
log.tracef("About to putAll entries (%s) lifespan:%d (%s), maxIdle:%d (%s)", map, lifespan, lifespanUnit, maxIdleTime, maxIdleTimeUnit);
}
Map<byte[], byte[]> byteMap = new HashMap<>();
for (Entry<? extends K, ? extends V> entry : map.entrySet()) {
byteMap.put(obj2bytes(entry.getKey(), true), obj2bytes(entry.getValue(), false));
}
PutAllOperation op = operationsFactory.newPutAllOperation(byteMap, lifespanSecs, maxIdleSecs);
PutAllOperation op = operationsFactory.newPutAllOperation(byteMap, lifespan, lifespanUnit, maxIdleTime, maxIdleTimeUnit);
op.execute();
}

Expand Down Expand Up @@ -239,14 +239,11 @@ public ServerStatistics stats() {
@Override
public V put(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit) {
assertRemoteCacheManagerIsStarted();
long lifespanNanos = toNanoseconds(lifespan, lifespanUnit);
long maxIdleNanos = toNanoseconds(maxIdleTime, maxIdleTimeUnit);

applyDefaultExpirationFlags(lifespan, maxIdleTime);
if (log.isTraceEnabled()) {
log.tracef("About to add (K,V): (%s, %s) lifespanSecs:%d, maxIdleSecs:%d", key, value, toSeconds(lifespan, lifespanUnit), toSeconds(maxIdleTime, maxIdleTimeUnit));
log.tracef("About to add (K,V): (%s, %s) lifespan:%d, maxIdle:%d", key, value, lifespan, maxIdleTime);
}
PutOperation op = operationsFactory.newPutKeyValueOperation(obj2bytes(key, true), obj2bytes(value, false), lifespanNanos, maxIdleNanos);
PutOperation op = operationsFactory.newPutKeyValueOperation(obj2bytes(key, true), obj2bytes(value, false), lifespan, lifespanUnit, maxIdleTime, maxIdleTimeUnit);
byte[] result = op.execute();
return MarshallerUtil.bytes2obj(marshaller, result);
}
Expand All @@ -255,21 +252,17 @@ public V put(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdleT
@Override
public V putIfAbsent(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit) {
assertRemoteCacheManagerIsStarted();
long lifespanNanos = toNanoseconds(lifespan, lifespanUnit);
long maxIdleNanos = toNanoseconds(maxIdleTime, maxIdleTimeUnit);
applyDefaultExpirationFlags(lifespan, maxIdleTime);
PutIfAbsentOperation op = operationsFactory.newPutIfAbsentOperation(obj2bytes(key, true), obj2bytes(value, false), lifespanNanos, maxIdleNanos);
PutIfAbsentOperation op = operationsFactory.newPutIfAbsentOperation(obj2bytes(key, true), obj2bytes(value, false), lifespan, lifespanUnit, maxIdleTime, maxIdleTimeUnit);
byte[] bytes = op.execute();
return MarshallerUtil.bytes2obj(marshaller, bytes);
}

@Override
public V replace(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit) {
assertRemoteCacheManagerIsStarted();
long lifespanNanos = toNanoseconds(lifespan, lifespanUnit);
long maxIdleNanos = toNanoseconds(maxIdleTime, maxIdleTimeUnit);
applyDefaultExpirationFlags(lifespan, maxIdleTime);
ReplaceOperation op = operationsFactory.newReplaceOperation(obj2bytes(key, true), obj2bytes(value, false), lifespanNanos, maxIdleNanos);
ReplaceOperation op = operationsFactory.newReplaceOperation(obj2bytes(key, true), obj2bytes(value, false), lifespan, lifespanUnit, maxIdleTime, maxIdleTimeUnit);
byte[] bytes = op.execute();
return MarshallerUtil.bytes2obj(marshaller, bytes);
}
Expand Down Expand Up @@ -633,21 +626,6 @@ private MetadataValue<V> binary2MetadataValue(MetadataValue<byte[]> value) {
return new MetadataValueImpl<V>(value.getCreated(), value.getLifespan(), value.getLastUsed(), value.getMaxIdle(), value.getVersion(), valueObj);
}

protected static int toSeconds(long duration, TimeUnit timeUnit) {
int seconds = (int) timeUnit.toSeconds(duration);
long inverseDuration = timeUnit.convert(seconds, TimeUnit.SECONDS);

if (duration > inverseDuration) {
//Round up.
seconds++;
}
return seconds;
}

private static long toNanoseconds(long duration, TimeUnit timeUnit) {
return (int) timeUnit.toNanos(duration);
}

private void assertRemoteCacheManagerIsStarted() {
if (!remoteCacheManager.isStarted()) {
String message = "Cannot perform operations on a cache associated with an unstarted RemoteCacheManager. Use RemoteCacheManager.start before using the remote cache.";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package org.infinispan.client.hotrod.impl;

import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
* Time unit representation for HotRod
*
* @author gustavonalle
* @since 8.0
*/
public class TimeUnitParam {

private static final Map<TimeUnit, Byte> timeUnitToByte = new EnumMap<>(TimeUnit.class);

static {
timeUnitToByte.put(TimeUnit.SECONDS, (byte) 0);
timeUnitToByte.put(TimeUnit.MILLISECONDS, (byte) 1);
timeUnitToByte.put(TimeUnit.NANOSECONDS, (byte) 2);
timeUnitToByte.put(TimeUnit.MICROSECONDS, (byte) 3);
timeUnitToByte.put(TimeUnit.MINUTES, (byte) 4);
timeUnitToByte.put(TimeUnit.HOURS, (byte) 5);
timeUnitToByte.put(TimeUnit.DAYS, (byte) 6);
}

private static final byte TIME_UNIT_DEFAULT = (byte) 7;
private static final byte TIME_UNIT_INFINITE = (byte) 8;

private static byte encodeDuration(long duration, TimeUnit timeUnit) {
return duration == 0 ? TIME_UNIT_DEFAULT : duration < 0 ? TIME_UNIT_INFINITE : timeUnitToByte.get(timeUnit);
}

public static byte encodeTimeUnits(long lifespan, TimeUnit lifespanTimeUnit, long maxIdle, TimeUnit maxIdleTimeUnit) {
byte encodedLifespan = encodeDuration(lifespan, lifespanTimeUnit);
byte encodedMaxIdle = encodeDuration(maxIdle, maxIdleTimeUnit);
return (byte) (encodedLifespan << 4 | encodedMaxIdle);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import org.infinispan.client.hotrod.impl.VersionedOperationResponse;
import org.infinispan.client.hotrod.impl.protocol.Codec;
import org.infinispan.client.hotrod.impl.protocol.HeaderParams;
import org.infinispan.client.hotrod.impl.protocol.InternalFlag;
import org.infinispan.client.hotrod.impl.transport.Transport;
import org.infinispan.client.hotrod.impl.transport.TransportFactory;
import org.infinispan.commons.logging.BasicLogFactory;
Expand All @@ -30,13 +29,8 @@ public abstract class AbstractKeyOperation<T> extends RetryOnFailureOperation<T>
protected final byte[] key;

protected AbstractKeyOperation(Codec codec, TransportFactory transportFactory,
byte[] key, byte[] cacheName, AtomicInteger topologyId, Flag[] flags) {
this(codec, transportFactory, key, cacheName, topologyId, flags, null);
}

protected AbstractKeyOperation(Codec codec, TransportFactory transportFactory,
byte[] key, byte[] cacheName, AtomicInteger topologyId, Flag[] flags, InternalFlag[] internalFlags) {
super(codec, transportFactory, cacheName, topologyId, flags, internalFlags);
byte[] key, byte[] cacheName, AtomicInteger topologyId, Flag[] flags) {
super(codec, transportFactory, cacheName, topologyId, flags);
this.key = key;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import org.infinispan.client.hotrod.Flag;
import org.infinispan.client.hotrod.impl.protocol.Codec;
import org.infinispan.client.hotrod.impl.protocol.HeaderParams;
import org.infinispan.client.hotrod.impl.protocol.InternalFlag;
import org.infinispan.client.hotrod.impl.transport.Transport;
import org.infinispan.client.hotrod.impl.transport.TransportFactory;

Expand All @@ -19,21 +18,26 @@
*/
@Immutable
public abstract class AbstractKeyValueOperation<T> extends AbstractKeyOperation<T> {
private static final long NANOS_IN_SEC = TimeUnit.NANOSECONDS.convert(1, TimeUnit.SECONDS);

protected final byte[] value;

protected final long lifespan;

protected final long maxIdle;

protected final TimeUnit lifespanTimeUnit;

protected final TimeUnit maxIdleTimeUnit;

protected AbstractKeyValueOperation(Codec codec, TransportFactory transportFactory, byte[] key, byte[] cacheName,
AtomicInteger topologyId, Flag[] flags, byte[] value,
long lifespan, long maxIdle) {
super(codec, transportFactory, key, cacheName, topologyId, flags, internalFlags(lifespan, maxIdle));
long lifespan, TimeUnit lifespanTimeUnit, long maxIdle, TimeUnit maxIdleTimeUnit) {
super(codec, transportFactory, key, cacheName, topologyId, flags);
this.value = value;
this.lifespan = lifespan;
this.maxIdle = maxIdle;
this.lifespanTimeUnit = lifespanTimeUnit;
this.maxIdleTimeUnit = maxIdleTimeUnit;
}

//[header][key length][key][lifespan][max idle][value length][value]
Expand All @@ -43,7 +47,7 @@ protected short sendPutOperation(Transport transport, short opCode, byte opRespC

// 2) write key and value
transport.writeArray(key);
codec.writeExpirationParams(transport, lifespan, maxIdle, internalFlags);
codec.writeExpirationParams(transport, lifespan, lifespanTimeUnit, maxIdle, maxIdleTimeUnit);
transport.writeArray(value);
transport.flush();

Expand All @@ -52,12 +56,4 @@ protected short sendPutOperation(Transport transport, short opCode, byte opRespC
//return status (not error status for sure)
return readHeaderAndValidate(transport, params);
}

private static InternalFlag[] internalFlags(long lifespan, long maxIdle) {
if ((lifespan % NANOS_IN_SEC != 0) || (maxIdle % NANOS_IN_SEC != 0)) {
return new InternalFlag[] {InternalFlag.NANO_DURATIONS};
} else {
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import org.infinispan.client.hotrod.impl.protocol.Codec;
import org.infinispan.client.hotrod.impl.protocol.HeaderParams;
import org.infinispan.client.hotrod.impl.protocol.HotRodConstants;
import org.infinispan.client.hotrod.impl.protocol.InternalFlag;
import org.infinispan.client.hotrod.impl.transport.Transport;

/**
Expand All @@ -23,7 +22,6 @@
public abstract class HotRodOperation implements HotRodConstants {

protected final Flag[] flags;
protected final InternalFlag[] internalFlags;

public final byte[] cacheName;

Expand All @@ -35,12 +33,7 @@ public abstract class HotRodOperation implements HotRodConstants {
private static final byte XA_TX = 1;

protected HotRodOperation(Codec codec, Flag[] flags, byte[] cacheName, AtomicInteger topologyId) {
this(codec, flags, null, cacheName, topologyId);
}

protected HotRodOperation(Codec codec, Flag[] flags, InternalFlag[] internalFlags, byte[] cacheName, AtomicInteger topologyId) {
this.flags = flags;
this.internalFlags = internalFlags;
this.cacheName = cacheName;
this.topologyId = topologyId;
this.codec = codec;
Expand All @@ -50,8 +43,7 @@ protected HotRodOperation(Codec codec, Flag[] flags, InternalFlag[] internalFlag

protected final HeaderParams writeHeader(Transport transport, short operationCode) {
HeaderParams params = new HeaderParams()
.opCode(operationCode).cacheName(cacheName)
.flags(flags).internalFlags(internalFlags)
.opCode(operationCode).cacheName(cacheName).flags(flags)
.clientIntel(CLIENT_INTELLIGENCE_HASH_DISTRIBUTION_AWARE)
.topologyId(topologyId).txMarker(NO_TX);
return codec.writeHeader(transport, params);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
Expand Down Expand Up @@ -87,10 +88,10 @@ public RemoveIfUnmodifiedOperation newRemoveIfUnmodifiedOperation(byte[] key, lo
}

public ReplaceIfUnmodifiedOperation newReplaceIfUnmodifiedOperation(byte[] key,
byte[] value, long lifespan, long maxIdle, long version) {
byte[] value, long lifespan, TimeUnit lifespanTimeUnit, long maxIdle, TimeUnit maxIdleTimeUnit, long version) {
return new ReplaceIfUnmodifiedOperation(
codec, transportFactory, key, cacheNameBytes, topologyId, flags(),
value, lifespan, maxIdle, version);
value, lifespan, lifespanTimeUnit, maxIdle, maxIdleTimeUnit, version);
}

public GetWithVersionOperation newGetWithVersionOperation(byte[] key) {
Expand All @@ -109,31 +110,31 @@ public StatsOperation newStatsOperation() {
}

public PutOperation newPutKeyValueOperation(byte[] key, byte[] value,
long lifespan, long maxIdle) {
long lifespan, TimeUnit lifespanTimeUnit, long maxIdle, TimeUnit maxIdleTimeUnit) {
return new PutOperation(
codec, transportFactory, key, cacheNameBytes, topologyId, flags(),
value, lifespan, maxIdle);
value, lifespan, lifespanTimeUnit, maxIdle, maxIdleTimeUnit);
}

public PutAllOperation newPutAllOperation(Map<byte[], byte[]> map,
int lifespan, int maxIdle) {
long lifespan, TimeUnit lifespanTimeUnit, long maxIdle, TimeUnit maxIdleTimeUnit) {
return new PutAllOperation(
codec, transportFactory, map, cacheNameBytes, topologyId, flags(),
lifespan, maxIdle);
lifespan, lifespanTimeUnit, maxIdle, maxIdleTimeUnit);
}

public PutIfAbsentOperation newPutIfAbsentOperation(byte[] key, byte[] value,
long lifespan, long maxIdle) {
long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit) {
return new PutIfAbsentOperation(
codec, transportFactory, key, cacheNameBytes, topologyId, flags(),
value, lifespan, maxIdle);
value, lifespan, lifespanUnit, maxIdleTime, maxIdleTimeUnit);
}

public ReplaceOperation newReplaceOperation(byte[] key, byte[] values,
long lifespan, long maxIdle) {
long lifespan, TimeUnit lifespanTimeUnit, long maxIdle, TimeUnit maxIdleTimeUnit) {
return new ReplaceOperation(
codec, transportFactory, key, cacheNameBytes, topologyId, flags(),
values, lifespan, maxIdle);
values, lifespan, lifespanTimeUnit, maxIdle, maxIdleTimeUnit);
}

public ContainsKeyOperation newContainsKeyOperation(byte[] key) {
Expand Down Expand Up @@ -204,7 +205,7 @@ public QueryOperation newQueryOperation(RemoteQuery remoteQuery) {
public SizeOperation newSizeOperation() {
return new SizeOperation(codec, transportFactory, cacheNameBytes, topologyId, flags());
}

public ExecuteOperation newExecuteOperation(String taskName, Map<String, byte[]> marshalledParams) {
return new ExecuteOperation(codec, transportFactory, cacheNameBytes, topologyId, flags(), taskName, marshalledParams);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import net.jcip.annotations.Immutable;
Expand All @@ -26,22 +27,25 @@ public class PutAllOperation extends RetryOnFailureOperation<Void> {

public PutAllOperation(Codec codec, TransportFactory transportFactory,
Map<byte[], byte[]> map, byte[] cacheName, AtomicInteger topologyId,
Flag[] flags, int lifespan, int maxIdle) {
Flag[] flags, long lifespan, TimeUnit lifespanTimeUnit, long maxIdle, TimeUnit maxIdleTimeUnit) {
super(codec, transportFactory, cacheName, topologyId, flags);
this.map = map;
this.lifespan = lifespan;
this.lifespanTimeUnit = lifespanTimeUnit;
this.maxIdle = maxIdle;
this.maxIdleTimeUnit = maxIdleTimeUnit;
}

protected final Map<byte[], byte[]> map;
protected final int lifespan;
protected final int maxIdle;
protected final long lifespan;
private final TimeUnit lifespanTimeUnit;
protected final long maxIdle;
private final TimeUnit maxIdleTimeUnit;

@Override
protected Void executeOperation(Transport transport) {
HeaderParams params = writeHeader(transport, PUT_ALL_REQUEST);
transport.writeVInt(lifespan);
transport.writeVInt(maxIdle);
codec.writeExpirationParams(transport, lifespan, lifespanTimeUnit, maxIdle, maxIdleTimeUnit);
transport.writeVInt(map.size());
for (Entry<byte[], byte[]> entry : map.entrySet()) {
transport.writeArray(entry.getKey());
Expand Down
Loading

0 comments on commit c2d8c09

Please sign in to comment.