Skip to content

Commit

Permalink
ISPN-7420 Hot Rod enhancements for transcoding
Browse files Browse the repository at this point in the history
  • Loading branch information
Gustavo Fernandes authored and tristantarrant committed May 10, 2018
1 parent e6a4f99 commit c630752
Show file tree
Hide file tree
Showing 138 changed files with 1,938 additions and 635 deletions.
@@ -0,0 +1,157 @@
package org.infinispan.client.hotrod;

import static org.infinispan.client.hotrod.marshall.MarshallerUtil.bytes2obj;
import static org.infinispan.client.hotrod.marshall.MarshallerUtil.obj2bytes;

import java.util.List;

import org.infinispan.client.hotrod.impl.MarshallerRegistry;
import org.infinispan.client.hotrod.logging.Log;
import org.infinispan.client.hotrod.logging.LogFactory;
import org.infinispan.commons.dataconversion.MediaType;
import org.infinispan.commons.marshall.IdentityMarshaller;
import org.infinispan.commons.marshall.Marshaller;

/**
* Defines data format for keys and values during Hot Rod client requests.
*
* @since 9.3
*/
public final class DataFormat {

private static final Log log = LogFactory.getLog(DataFormat.class, Log.class);

private final MediaType keyType;
private final MediaType valueType;
private final Marshaller keyMarshaller;
private final Marshaller valueMarshaller;

private MarshallerRegistry marshallerRegistry;
private Marshaller defaultMarshaller;

private DataFormat(MediaType keyType, MediaType valueType, Marshaller keyMarshaller, Marshaller valueMarshaller) {
this.keyType = keyType;
this.valueType = valueType;
this.keyMarshaller = keyMarshaller;
this.valueMarshaller = valueMarshaller;
}

public DataFormat withoutValueType() {
return new DataFormat(keyType, null, keyMarshaller, null);
}

public MediaType getKeyType() {
return keyType;
}

public MediaType getValueType() {
return valueType;
}

public void initialize(RemoteCacheManager remoteCacheManager) {
this.marshallerRegistry = remoteCacheManager.getMarshallerRegistry();
this.defaultMarshaller = remoteCacheManager.getMarshaller();
}

private Marshaller resolveValueMarshaller() {
if (valueMarshaller != null) return valueMarshaller;
if (valueType == null) return defaultMarshaller;

Marshaller forValueType = marshallerRegistry.getMarshaller(valueType);
if (forValueType != null) return forValueType;
log.debugf("No marshaller registered for %s, using no-op marshaller", valueType);

return IdentityMarshaller.INSTANCE;
}

private Marshaller resolveKeyMarshaller() {
if (keyMarshaller != null) return keyMarshaller;
if (keyType == null) return defaultMarshaller;

Marshaller forKeyType = marshallerRegistry.getMarshaller(keyType);
if (forKeyType != null) return forKeyType;
log.debugf("No marshaller registered for %s, using no-op marshaller", keyType);

return IdentityMarshaller.INSTANCE;
}

public boolean hasCustomFormat() {
return keyType != null || valueType != null;
}

public byte[] keyToBytes(Object key, int estimateKeySize, int estimateValueSize) {
Marshaller keyMarshaller = resolveKeyMarshaller();
return obj2bytes(keyMarshaller, key, true, estimateKeySize, estimateValueSize);
}

public byte[] valueToBytes(Object value, int estimateKeySize, int estimateValueSize) {
Marshaller valueMarshaller = resolveValueMarshaller();
return obj2bytes(valueMarshaller, value, false, estimateKeySize, estimateValueSize);
}

public <T> T keyToObj(byte[] bytes, short status, List<String> whitelist) {
Marshaller keyMarshaller = resolveKeyMarshaller();
return bytes2obj(keyMarshaller, bytes, status, whitelist);
}

public <T> T valueToObj(byte[] bytes, short status, List<String> whitelist) {
Marshaller valueMarshaller = resolveValueMarshaller();
return bytes2obj(valueMarshaller, bytes, status, whitelist);
}

@Override
public String toString() {
return "DataFormat{" +
"keyType=" + keyType +
", valueType=" + valueType +
", keyMarshaller=" + keyMarshaller +
", valueMarshaller=" + valueMarshaller +
", marshallerRegistry=" + marshallerRegistry +
", defaultMarshaller=" + defaultMarshaller +
'}';
}

public static Builder builder() {
return new Builder();
}

public static class Builder {
private MediaType keyType;
private MediaType valueType;
private Marshaller valueMarshaller;
private Marshaller keyMarshaller;

public Builder from(DataFormat dataFormat) {
this.keyType = dataFormat.keyType;
this.valueType = dataFormat.valueType;
this.keyMarshaller = dataFormat.keyMarshaller;
this.valueMarshaller = dataFormat.valueMarshaller;
return this;
}

public Builder valueMarshaller(Marshaller valueMarshaller) {
this.valueMarshaller = valueMarshaller;
return this;
}

public Builder keyMarshaller(Marshaller keyMarshaller) {
this.keyMarshaller = keyMarshaller;
return this;
}

public Builder keyType(MediaType keyType) {
this.keyType = keyType;
return this;
}

public Builder valueType(MediaType valueType) {
this.valueType = valueType;
return this;
}

public DataFormat build() {
return new DataFormat(keyType, valueType, keyMarshaller, valueMarshaller);
}

}
}
Expand Up @@ -458,4 +458,14 @@ public interface RemoteCache<K, V> extends BasicCache<K, V> {
* Returns a cache where values are manipulated using {@link java.io.InputStream} and {@link java.io.OutputStream} * Returns a cache where values are manipulated using {@link java.io.InputStream} and {@link java.io.OutputStream}
*/ */
StreamingRemoteCache<K> streaming(); StreamingRemoteCache<K> streaming();

/**
* Return a new instance of {@link RemoteCache} using the supplied {@link DataFormat}.
*/
<T, U> RemoteCache<T, U> withDataFormat(DataFormat dataFormat);

/**
* Return the currently {@link DataFormat} being used.
*/
DataFormat getDataFormat();
} }
Expand Up @@ -25,6 +25,7 @@
import org.infinispan.client.hotrod.event.impl.ClientListenerNotifier; import org.infinispan.client.hotrod.event.impl.ClientListenerNotifier;
import org.infinispan.client.hotrod.exceptions.HotRodClientException; import org.infinispan.client.hotrod.exceptions.HotRodClientException;
import org.infinispan.client.hotrod.impl.InvalidatedNearRemoteCache; import org.infinispan.client.hotrod.impl.InvalidatedNearRemoteCache;
import org.infinispan.client.hotrod.impl.MarshallerRegistry;
import org.infinispan.client.hotrod.impl.RemoteCacheImpl; import org.infinispan.client.hotrod.impl.RemoteCacheImpl;
import org.infinispan.client.hotrod.impl.RemoteCacheManagerAdminImpl; import org.infinispan.client.hotrod.impl.RemoteCacheManagerAdminImpl;
import org.infinispan.client.hotrod.impl.operations.OperationsFactory; import org.infinispan.client.hotrod.impl.operations.OperationsFactory;
Expand All @@ -39,6 +40,7 @@
import org.infinispan.commons.api.CacheContainerAdmin; import org.infinispan.commons.api.CacheContainerAdmin;
import org.infinispan.commons.executors.ExecutorFactory; import org.infinispan.commons.executors.ExecutorFactory;
import org.infinispan.commons.marshall.Marshaller; import org.infinispan.commons.marshall.Marshaller;
import org.infinispan.commons.marshall.UTF8StringMarshaller;
import org.infinispan.commons.marshall.jboss.GenericJBossMarshaller; import org.infinispan.commons.marshall.jboss.GenericJBossMarshaller;
import org.infinispan.commons.util.FileLookupFactory; import org.infinispan.commons.util.FileLookupFactory;
import org.infinispan.commons.util.Util; import org.infinispan.commons.util.Util;
Expand Down Expand Up @@ -72,6 +74,7 @@ public class RemoteCacheManager implements RemoteCacheContainer, Closeable {


private volatile boolean started = false; private volatile boolean started = false;
private final Map<RemoteCacheKey, RemoteCacheHolder> cacheName2RemoteCache = new HashMap<>(); private final Map<RemoteCacheKey, RemoteCacheHolder> cacheName2RemoteCache = new HashMap<>();
private final MarshallerRegistry marshallerRegistry = new MarshallerRegistry();
private final AtomicInteger defaultCacheTopologyId = new AtomicInteger(HotRodConstants.DEFAULT_CACHE_TOPOLOGY); private final AtomicInteger defaultCacheTopologyId = new AtomicInteger(HotRodConstants.DEFAULT_CACHE_TOPOLOGY);
private Configuration configuration; private Configuration configuration;
private Codec codec; private Codec codec;
Expand Down Expand Up @@ -225,6 +228,8 @@ public void start() {
marshaller = Util.getInstance(clazz); marshaller = Util.getInstance(clazz);
} }
} }
marshallerRegistry.registerMarshaller(marshaller);
marshallerRegistry.registerMarshaller(new UTF8StringMarshaller());


codec = CodecFactory.getCodec(configuration.version()); codec = CodecFactory.getCodec(configuration.version());


Expand All @@ -235,7 +240,7 @@ public void start() {
} }
ExecutorService asyncExecutorService = executorFactory.getExecutor(configuration.asyncExecutorFactory().properties()); ExecutorService asyncExecutorService = executorFactory.getExecutor(configuration.asyncExecutorFactory().properties());
channelFactory.start(codec, configuration, defaultCacheTopologyId, marshaller, asyncExecutorService, channelFactory.start(codec, configuration, defaultCacheTopologyId, marshaller, asyncExecutorService,
listenerNotifier, Collections.singletonList(listenerNotifier::failoverListeners)); listenerNotifier, Collections.singletonList(listenerNotifier::failoverListeners), marshallerRegistry);
counterManager.start(channelFactory, codec, configuration, listenerNotifier); counterManager.start(channelFactory, codec, configuration, listenerNotifier);


synchronized (cacheName2RemoteCache) { synchronized (cacheName2RemoteCache) {
Expand All @@ -261,6 +266,10 @@ private final void warnAboutUberJarDuplicates() {
}); });
} }


public MarshallerRegistry getMarshallerRegistry() {
return marshallerRegistry;
}

/** /**
* Stop the remote cache manager, disconnecting all existing connections. * Stop the remote cache manager, disconnecting all existing connections.
* As part of the disconnection, all registered client cache listeners will * As part of the disconnection, all registered client cache listeners will
Expand Down
Expand Up @@ -35,7 +35,7 @@ abstract class BaseCounterOperation<T> extends RetryOnFailureOperation<T> {


BaseCounterOperation(short requestCode, short responseCode, Codec codec, ChannelFactory channelFactory, AtomicInteger topologyId, Configuration cfg, BaseCounterOperation(short requestCode, short responseCode, Codec codec, ChannelFactory channelFactory, AtomicInteger topologyId, Configuration cfg,
String counterName) { String counterName) {
super(requestCode, responseCode, codec, channelFactory, EMPTY_CACHE_NAME, topologyId, 0, cfg); super(requestCode, responseCode, codec, channelFactory, EMPTY_CACHE_NAME, topologyId, 0, cfg, null);
this.counterName = counterName; this.counterName = counterName;
} }


Expand Down
Expand Up @@ -10,6 +10,7 @@
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;


import org.infinispan.client.hotrod.DataFormat;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryCreated; import org.infinispan.client.hotrod.annotation.ClientCacheEntryCreated;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryExpired; import org.infinispan.client.hotrod.annotation.ClientCacheEntryExpired;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryModified; import org.infinispan.client.hotrod.annotation.ClientCacheEntryModified;
Expand Down Expand Up @@ -131,6 +132,10 @@ protected void invokeFailoverEvent() {
} }
} }


protected DataFormat getDataFormat() {
return op.getDataFormat();
}

static final class ClientListenerInvocation { static final class ClientListenerInvocation {
private static final Log log = LogFactory.getLog(ClientListenerInvocation.class, Log.class); private static final Log log = LogFactory.getLog(ClientListenerInvocation.class, Log.class);


Expand Down
Expand Up @@ -13,6 +13,7 @@
import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;


import org.infinispan.client.hotrod.DataFormat;
import org.infinispan.client.hotrod.impl.protocol.Codec; import org.infinispan.client.hotrod.impl.protocol.Codec;
import org.infinispan.client.hotrod.impl.transport.netty.ChannelFactory; import org.infinispan.client.hotrod.impl.transport.netty.ChannelFactory;
import org.infinispan.client.hotrod.logging.Log; import org.infinispan.client.hotrod.logging.Log;
Expand Down Expand Up @@ -176,6 +177,14 @@ public <T> void invokeEvent(byte[] listenerId, T event) {
eventDispatcher.invokeEvent(event); eventDispatcher.invokeEvent(event);
} }


public DataFormat getCacheDataFormat(byte[] listenerId) {
ClientEventDispatcher clientEventDispatcher = (ClientEventDispatcher) dispatchers.get(new WrappedByteArray(listenerId));
if (clientEventDispatcher == null) {
throw log.unexpectedListenerId(Util.printArray(listenerId));
}
return clientEventDispatcher.getDataFormat();
}

public Codec codec() { public Codec codec() {
return codec; return codec;
} }
Expand Down
@@ -0,0 +1,30 @@
package org.infinispan.client.hotrod.impl;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.infinispan.client.hotrod.logging.Log;
import org.infinispan.client.hotrod.logging.LogFactory;
import org.infinispan.commons.dataconversion.MediaType;
import org.infinispan.commons.marshall.Marshaller;

/**
* A registry of {@link Marshaller} along with its {@link MediaType}.
*
* @since 9.3
*/
public final class MarshallerRegistry {

public static final Log log = LogFactory.getLog(MarshallerRegistry.class, Log.class);

private final Map<MediaType, Marshaller> marshallerByMediaType = new ConcurrentHashMap<>();

public void registerMarshaller(Marshaller marshaller) {
marshallerByMediaType.put(marshaller.mediaType().withoutParameters(), marshaller);
}

public Marshaller getMarshaller(MediaType mediaType) {
return marshallerByMediaType.get(mediaType);
}

}

0 comments on commit c630752

Please sign in to comment.