Skip to content

Commit

Permalink
ISPN-5758 Clients should find out if compatibility is enabled
Browse files Browse the repository at this point in the history
* Hot Rod servers now provide different status identifiers when
  compatibility is enabled.
* A new version of the Hot Rod protocol, version 2.4, has been created
  to support the new status ids.
* Clients need to know whether compatibility is enabled or not to be
  able to apply correct unmarshalling when keys and/or values are
  returned back as part of the response.
* Version handling in both the client and server has been centralised in
  order to make it easier to do a version upgrade.
  • Loading branch information
galderz committed Oct 8, 2015
1 parent 42ae266 commit e9c34c7
Show file tree
Hide file tree
Showing 46 changed files with 398 additions and 235 deletions.
Expand Up @@ -555,8 +555,6 @@ public void start() {
// Workaround for JDK6 NPE: http://bugs.sun.com/view_bug.do?bug_id=6427854
SecurityActions.setProperty("sun.nio.ch.bugLevel", "\"\"");

codec = CodecFactory.getCodec(configuration.protocolVersion());

transportFactory = Util.getInstance(configuration.transportFactory());

if (marshaller == null) {
Expand All @@ -566,6 +564,8 @@ public void start() {
}
}

codec = CodecFactory.getCodec(configuration.protocolVersion());

if (asyncExecutorService == null) {
ExecutorFactory executorFactory = configuration.asyncExecutorFactory().factory();
if (executorFactory == null) {
Expand Down
Expand Up @@ -51,6 +51,7 @@ public class ConfigurationProperties {
public static final int DEFAULT_SO_TIMEOUT = 60000;
public static final int DEFAULT_CONNECT_TIMEOUT = 60000;
public static final int DEFAULT_MAX_RETRIES = 10;
public static final String PROTOCOL_VERSION_24 = "2.4";
public static final String PROTOCOL_VERSION_23 = "2.3";
public static final String PROTOCOL_VERSION_22 = "2.2";
public static final String PROTOCOL_VERSION_21 = "2.1";
Expand All @@ -59,7 +60,7 @@ public class ConfigurationProperties {
public static final String PROTOCOL_VERSION_12 = "1.2";
public static final String PROTOCOL_VERSION_11 = "1.1";
public static final String PROTOCOL_VERSION_10 = "1.0";
public static final String DEFAULT_PROTOCOL_VERSION = PROTOCOL_VERSION_23;
public static final String DEFAULT_PROTOCOL_VERSION = PROTOCOL_VERSION_24;

private final TypedProperties props;

Expand Down
Expand Up @@ -29,7 +29,6 @@
import org.infinispan.client.hotrod.impl.iteration.RemoteCloseableIterator;
import org.infinispan.client.hotrod.logging.Log;
import org.infinispan.client.hotrod.logging.LogFactory;
import org.infinispan.client.hotrod.marshall.MarshallerUtil;
import org.infinispan.commons.marshall.Marshaller;
import org.infinispan.commons.util.CloseableIterator;
import org.infinispan.commons.util.concurrent.NotifyingFuture;
Expand Down Expand Up @@ -79,8 +78,8 @@ public RemoteCacheManager getRemoteCacheManager() {
@Override
public boolean removeWithVersion(K key, long version) {
assertRemoteCacheManagerIsStarted();
RemoveIfUnmodifiedOperation op = operationsFactory.newRemoveIfUnmodifiedOperation(obj2bytes(key, true), version);
VersionedOperationResponse response = op.execute();
RemoveIfUnmodifiedOperation<V> op = operationsFactory.newRemoveIfUnmodifiedOperation(obj2bytes(key, true), version);
VersionedOperationResponse<V> response = op.execute();
return response.getCode().isUpdated();
}

Expand Down Expand Up @@ -188,17 +187,15 @@ public CloseableIterator<Entry<Object, Object>> retrieveEntries(String filterCon
@Override
public VersionedValue<V> getVersioned(K key) {
assertRemoteCacheManagerIsStarted();
GetWithVersionOperation op = operationsFactory.newGetWithVersionOperation(obj2bytes(key, true));
VersionedValue<byte[]> value = op.execute();
return binary2VersionedValue(value);
GetWithVersionOperation<V> op = operationsFactory.newGetWithVersionOperation(obj2bytes(key, true));
return op.execute();
}

@Override
public MetadataValue<V> getWithMetadata(K key) {
assertRemoteCacheManagerIsStarted();
GetWithMetadataOperation op = operationsFactory.newGetWithMetadataOperation(obj2bytes(key, true));
MetadataValue<byte[]> value = op.execute();
return binary2MetadataValue(value);
GetWithMetadataOperation<V> op = operationsFactory.newGetWithMetadataOperation(obj2bytes(key, true));
return op.execute();
}

@Override
Expand Down Expand Up @@ -277,28 +274,25 @@ public V put(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdleT
if (log.isTraceEnabled()) {
log.tracef("About to add (K,V): (%s, %s) lifespan:%d, maxIdle:%d", key, value, lifespan, maxIdleTime);
}
PutOperation op = operationsFactory.newPutKeyValueOperation(obj2bytes(key, true), obj2bytes(value, false), lifespan, lifespanUnit, maxIdleTime, maxIdleTimeUnit);
byte[] result = op.execute();
return MarshallerUtil.bytes2obj(marshaller, result);
PutOperation<V> op = operationsFactory.newPutKeyValueOperation(obj2bytes(key, true), obj2bytes(value, false), lifespan, lifespanUnit, maxIdleTime, maxIdleTimeUnit);
return op.execute();
}


@Override
public V putIfAbsent(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit) {
assertRemoteCacheManagerIsStarted();
applyDefaultExpirationFlags(lifespan, maxIdleTime);
PutIfAbsentOperation op = operationsFactory.newPutIfAbsentOperation(obj2bytes(key, true), obj2bytes(value, false), lifespan, lifespanUnit, maxIdleTime, maxIdleTimeUnit);
byte[] bytes = op.execute();
return MarshallerUtil.bytes2obj(marshaller, bytes);
PutIfAbsentOperation<V> op = operationsFactory.newPutIfAbsentOperation(obj2bytes(key, true), obj2bytes(value, false), lifespan, lifespanUnit, maxIdleTime, maxIdleTimeUnit);
return op.execute();
}

@Override
public V replace(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit) {
assertRemoteCacheManagerIsStarted();
applyDefaultExpirationFlags(lifespan, maxIdleTime);
ReplaceOperation op = operationsFactory.newReplaceOperation(obj2bytes(key, true), obj2bytes(value, false), lifespan, lifespanUnit, maxIdleTime, maxIdleTimeUnit);
byte[] bytes = op.execute();
return MarshallerUtil.bytes2obj(marshaller, bytes);
ReplaceOperation<V> op = operationsFactory.newReplaceOperation(obj2bytes(key, true), obj2bytes(value, false), lifespan, lifespanUnit, maxIdleTime, maxIdleTimeUnit);
return op.execute();
}

@Override
Expand Down Expand Up @@ -462,9 +456,8 @@ public boolean containsKey(Object key) {
public V get(Object key) {
assertRemoteCacheManagerIsStarted();
byte[] keyBytes = obj2bytes(key, true);
GetOperation gco = operationsFactory.newGetKeyOperation(keyBytes);
byte[] bytes = gco.execute();
V result = MarshallerUtil.bytes2obj(marshaller, bytes);
GetOperation<V> gco = operationsFactory.newGetKeyOperation(keyBytes);
V result = gco.execute();
if (log.isTraceEnabled()) {
log.tracef("For key(%s) returning %s", key, result);
}
Expand All @@ -481,15 +474,9 @@ public Map<K, V> getAll(Set<? extends K> keys) {
for (K key : keys) {
byteKeys.add(obj2bytes(key, true));
}
GetAllOperation op = operationsFactory.newGetAllOperation(byteKeys);
Map<byte[], byte[]> result = op.execute();
Map<K,V> toReturn = new HashMap<K,V>();
for (Map.Entry<byte[], byte[]> entry : result.entrySet()) {
V value = MarshallerUtil.bytes2obj(marshaller, entry.getValue());
K key = MarshallerUtil.bytes2obj(marshaller, entry.getKey());
toReturn.put(key, value);
}
return Collections.unmodifiableMap(toReturn);
GetAllOperation<K, V> op = operationsFactory.newGetAllOperation(byteKeys);
Map<K, V> result = op.execute();
return Collections.unmodifiableMap(result);
}

@Override
Expand All @@ -500,25 +487,18 @@ public Map<K, V> getBulk() {
@Override
public Map<K, V> getBulk(int size) {
assertRemoteCacheManagerIsStarted();
BulkGetOperation op = operationsFactory.newBulkGetOperation(size);
Map<byte[], byte[]> result = op.execute();
Map<K,V> toReturn = new HashMap<K,V>();
for (Map.Entry<byte[], byte[]> entry : result.entrySet()) {
V value = MarshallerUtil.bytes2obj(marshaller, entry.getValue());
K key = MarshallerUtil.bytes2obj(marshaller, entry.getKey());
toReturn.put(key, value);
}
return Collections.unmodifiableMap(toReturn);
BulkGetOperation<K, V> op = operationsFactory.newBulkGetOperation(size);
Map<K, V> result = op.execute();
return Collections.unmodifiableMap(result);
}

@Override
public V remove(Object key) {
assertRemoteCacheManagerIsStarted();
RemoveOperation removeOperation = operationsFactory.newRemoveOperation(obj2bytes(key, true));
byte[] existingValue = removeOperation.execute();
RemoveOperation<V> removeOperation = operationsFactory.newRemoveOperation(obj2bytes(key, true));
// TODO: It sucks that you need the prev value to see if it works...
// We need to find a better API for RemoteCache...
return MarshallerUtil.bytes2obj(marshaller, existingValue);
return removeOperation.execute();
}

@Override
Expand Down Expand Up @@ -651,20 +631,6 @@ private byte[] obj2bytes(Object o, boolean isKey) {
}
}

private VersionedValue<V> binary2VersionedValue(VersionedValue<byte[]> value) {
if (value == null)
return null;
V valueObj = MarshallerUtil.bytes2obj(marshaller, value.getValue());
return new VersionedValueImpl<V>(value.getVersion(), valueObj);
}

private MetadataValue<V> binary2MetadataValue(MetadataValue<byte[]> value) {
if (value == null)
return null;
V valueObj = MarshallerUtil.bytes2obj(marshaller, value.getValue());
return new MetadataValueImpl<V>(value.getCreated(), value.getLifespan(), value.getLastUsed(), value.getMaxIdle(), value.getVersion(), valueObj);
}

private void assertRemoteCacheManagerIsStarted() {
if (!remoteCacheManager.isStarted()) {
String message = "Cannot perform operations on a cache associated with an unstarted RemoteCacheManager. Use RemoteCacheManager.start before using the remote cache.";
Expand Down Expand Up @@ -696,14 +662,8 @@ private void applyDefaultExpirationFlags(long lifespan, long maxIdle) {
public Set<K> keySet() {
assertRemoteCacheManagerIsStarted();
// Use default scope
BulkGetKeysOperation op = operationsFactory.newBulkGetKeysOperation(0);
Set<byte[]> result = op.execute();
Set<K> toReturn = new HashSet<K>();
for (byte[] keyBytes : result) {
K key = MarshallerUtil.bytes2obj(marshaller, keyBytes);
toReturn.add(key);
}
return Collections.unmodifiableSet(toReturn);
BulkGetKeysOperation<K> op = operationsFactory.newBulkGetKeysOperation(0);
return Collections.unmodifiableSet(op.execute());
}

@Override
Expand All @@ -715,8 +675,8 @@ public <T> T execute(String taskName, Map<String, ?> params) {
marshalledParams.put(entry.getKey(), obj2bytes(entry.getValue(), false));
}
}
ExecuteOperation op = operationsFactory.newExecuteOperation(taskName, marshalledParams);
return MarshallerUtil.bytes2obj(marshaller, op.execute());
ExecuteOperation<T> op = operationsFactory.newExecuteOperation(taskName, marshalledParams);
return op.execute();
}

@Override
Expand Down
Expand Up @@ -4,7 +4,7 @@
* @author Mircea.Markus@jboss.com
* @since 4.1
*/
public class VersionedOperationResponse {
public class VersionedOperationResponse<V> {

public enum RspCode {
SUCCESS(true), NO_SUCH_KEY(false), MODIFIED_KEY(false);
Expand All @@ -19,17 +19,17 @@ public boolean isUpdated() {
}
}

private byte[] value;
private V value;

private RspCode code;


public VersionedOperationResponse(byte[] value, RspCode code) {
public VersionedOperationResponse(V value, RspCode code) {
this.value = value;
this.code = code;
}

public byte[] getValue() {
public V getValue() {
return value;
}

Expand Down
Expand Up @@ -8,6 +8,7 @@
import org.infinispan.client.hotrod.impl.operations.IterationStartOperation;
import org.infinispan.client.hotrod.impl.operations.IterationStartResponse;
import org.infinispan.client.hotrod.impl.operations.OperationsFactory;
import org.infinispan.client.hotrod.impl.protocol.HotRodConstants;
import org.infinispan.client.hotrod.impl.transport.Transport;
import org.infinispan.client.hotrod.logging.Log;
import org.infinispan.client.hotrod.logging.LogFactory;
Expand All @@ -21,8 +22,6 @@
import java.util.Queue;
import java.util.Set;

import static org.infinispan.client.hotrod.impl.protocol.HotRodConstants.INVALID_ITERATION;
import static org.infinispan.client.hotrod.impl.protocol.HotRodConstants.NO_ERROR_STATUS;
import static org.infinispan.client.hotrod.marshall.MarshallerUtil.bytes2obj;

/**
Expand Down Expand Up @@ -59,10 +58,10 @@ public void close() {
IterationEndResponse endResponse = operationsFactory.newIterationEndOperation(iterationId, transport).execute();
short status = endResponse.getStatus();

if (status == NO_ERROR_STATUS) {
if (HotRodConstants.isSuccess(status)) {
log.iterationClosed(iterationId);
}
if (endResponse.getStatus() == INVALID_ITERATION) {
if (HotRodConstants.isInvalidIteration(status)) {
throw log.errorClosingIteration(iterationId);
}
}
Expand All @@ -88,7 +87,7 @@ private void fetch() {
while (nextElements.isEmpty() && !endOfIteration) {
IterationNextResponse iterationNextResponse = iterationNextOperation.execute();
short status = iterationNextResponse.getStatus();
if (status == INVALID_ITERATION) {
if (HotRodConstants.isInvalidIteration(status)) {
throw log.errorRetrievingNext(iterationId);
}
Entry<byte[], byte[]>[] entries = iterationNextResponse.getEntries();
Expand All @@ -97,9 +96,10 @@ private void fetch() {
endOfIteration = true;
break;
}

for (Entry<byte[], byte[]> entry : entries) {
if (segmentKeyTracker.track(entry.getKey())) {
nextElements.add(new SimpleEntry<>(unmarshall(entry.getKey()), unmarshall(entry.getValue())));
nextElements.add(new SimpleEntry<>(unmarshall(entry.getKey(), status), unmarshall(entry.getValue(), status)));
}
}
segmentKeyTracker.segmentsFinished(iterationNextResponse.getFinishedSegments());
Expand All @@ -112,9 +112,8 @@ private void fetch() {
}
}


private Object unmarshall(byte[] bytes) {
return bytes2obj(marshaller, bytes);
private Object unmarshall(byte[] bytes, short status) {
return bytes2obj(marshaller, bytes, status);
}

private void restartIteration(Set<Integer> missedSegments) {
Expand Down
Expand Up @@ -9,6 +9,7 @@
import org.infinispan.client.hotrod.impl.VersionedOperationResponse;
import org.infinispan.client.hotrod.impl.protocol.Codec;
import org.infinispan.client.hotrod.impl.protocol.HeaderParams;
import org.infinispan.client.hotrod.impl.protocol.HotRodConstants;
import org.infinispan.client.hotrod.impl.transport.Transport;
import org.infinispan.client.hotrod.impl.transport.TransportFactory;
import org.infinispan.commons.logging.BasicLogFactory;
Expand Down Expand Up @@ -53,8 +54,8 @@ protected short sendKeyOperation(byte[] key, Transport transport, byte opCode, b
return readHeaderAndValidate(transport, params);
}

protected byte[] returnPossiblePrevValue(Transport transport, short status) {
return codec.returnPossiblePrevValue(transport, status, flags);
protected T returnPossiblePrevValue(Transport transport, short status) {
return (T) codec.returnPossiblePrevValue(transport, status, flags);
}

protected VersionedOperationResponse returnVersionedOperationResponse(Transport transport, HeaderParams params) {
Expand All @@ -63,16 +64,16 @@ protected VersionedOperationResponse returnVersionedOperationResponse(Transport

//4 ...
VersionedOperationResponse.RspCode code;
if (respStatus == NO_ERROR_STATUS || respStatus == SUCCESS_WITH_PREVIOUS) {
if (HotRodConstants.isSuccess(respStatus)) {
code = VersionedOperationResponse.RspCode.SUCCESS;
} else if (respStatus == NOT_PUT_REMOVED_REPLACED_STATUS || respStatus == NOT_EXECUTED_WITH_PREVIOUS) {
} else if (HotRodConstants.isNotExecuted(respStatus)) {
code = VersionedOperationResponse.RspCode.MODIFIED_KEY;
} else if (respStatus == KEY_DOES_NOT_EXIST_STATUS) {
} else if (HotRodConstants.isNotExist(respStatus)) {
code = VersionedOperationResponse.RspCode.NO_SUCH_KEY;
} else {
throw new IllegalStateException("Unknown response status: " + Integer.toHexString(respStatus));
}
byte[] prevValue = returnPossiblePrevValue(transport, respStatus);
Object prevValue = returnPossiblePrevValue(transport, respStatus);
return new VersionedOperationResponse(prevValue, code);
}
}
Expand Up @@ -6,6 +6,7 @@
import org.infinispan.client.hotrod.event.ClientEvent;
import org.infinispan.client.hotrod.event.ClientListenerNotifier;
import org.infinispan.client.hotrod.impl.protocol.Codec;
import org.infinispan.client.hotrod.impl.protocol.HotRodConstants;
import org.infinispan.commons.util.Either;
import org.infinispan.client.hotrod.impl.protocol.HeaderParams;
import org.infinispan.client.hotrod.impl.transport.Transport;
Expand Down Expand Up @@ -95,7 +96,7 @@ protected Short executeOperation(Transport transport) {
either = codec.readHeaderOrEvent(dedicatedTransport, params, listenerId, listenerNotifier.getMarshaller());
switch(either.type()) {
case LEFT:
if (either.left() == NO_ERROR_STATUS)
if (HotRodConstants.isSuccess(either.left()))
listenerNotifier.startClientListener(listenerId);
else // If error, remove it
listenerNotifier.removeClientListener(listenerId);
Expand Down
Expand Up @@ -17,7 +17,7 @@
* @author <a href="mailto:rtsang@redhat.com">Ray Tsang</a>
* @since 5.2
*/
public class BulkGetKeysOperation extends RetryOnFailureOperation<Set<byte[]>> {
public class BulkGetKeysOperation<K> extends RetryOnFailureOperation<Set<K>> {
private final int scope;

public BulkGetKeysOperation(Codec codec, TransportFactory transportFactory, byte[] cacheName, AtomicInteger topologyId, Flag[] flags, int scope) {
Expand All @@ -31,14 +31,14 @@ protected Transport getTransport(int retryCount, Set<SocketAddress> failedServer
}

@Override
protected Set<byte[]> executeOperation(Transport transport) {
protected Set<K> executeOperation(Transport transport) {
HeaderParams params = writeHeader(transport, BULK_GET_KEYS_REQUEST);
transport.writeVInt(scope);
transport.flush();
readHeaderAndValidate(transport, params);
Set<byte[]> result = new HashSet<byte[]>();
short status = readHeaderAndValidate(transport, params);
Set<K> result = new HashSet<K>();
while ( transport.readByte() == 1) { //there's more!
result.add(transport.readArray());
result.add(codec.readUnmarshallByteArray(transport, status));
}
return result;
}
Expand Down

0 comments on commit e9c34c7

Please sign in to comment.