Skip to content

Commit

Permalink
ISPN-9108 Hot Rod Lightweight Transaction
Browse files Browse the repository at this point in the history
  • Loading branch information
pruivo authored and tristantarrant committed May 17, 2018
1 parent cafa808 commit 3207a9a
Show file tree
Hide file tree
Showing 71 changed files with 4,009 additions and 155 deletions.
5 changes: 5 additions & 0 deletions all/remote/pom.xml
Expand Up @@ -70,6 +70,11 @@
<scope>provided</scope>
</dependency>

<!-- it is excluded below -->
<dependency>
<groupId>org.jboss.spec.javax.transaction</groupId>
<artifactId>jboss-transaction-api_1.1_spec</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
18 changes: 18 additions & 0 deletions client/hotrod-client/pom.xml
Expand Up @@ -120,6 +120,12 @@
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.jboss.spec.javax.transaction</groupId>
<artifactId>jboss-transaction-api_1.1_spec</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.infinispan.protostream</groupId>
<artifactId>sample-domain-definition</artifactId>
Expand All @@ -137,6 +143,18 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.geronimo.components</groupId>
<artifactId>geronimo-transaction</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.objectweb.howl</groupId>
<artifactId>howl</artifactId>
<version>${version.howl}</version>
<scope>test</scope>
</dependency>


<dependency>
<groupId>org.apache.karaf.features</groupId>
Expand Down
Expand Up @@ -6,6 +6,7 @@
import java.util.concurrent.TimeUnit;

import org.infinispan.commons.api.BasicCache;
import org.infinispan.commons.api.TransactionalCache;
import org.infinispan.commons.util.CloseableIterator;
import org.infinispan.commons.util.CloseableIteratorCollection;
import org.infinispan.commons.util.CloseableIteratorSet;
Expand Down Expand Up @@ -63,7 +64,7 @@
* @author Mircea.Markus@jboss.com
* @since 4.1
*/
public interface RemoteCache<K, V> extends BasicCache<K, V> {
public interface RemoteCache<K, V> extends BasicCache<K, V>, TransactionalCache {
/**
* Removes the given entry only if its version matches the supplied version. A typical use case looks like this:
* <pre>
Expand Down
@@ -1,6 +1,9 @@
package org.infinispan.client.hotrod;

import javax.transaction.TransactionManager;

import org.infinispan.client.hotrod.configuration.Configuration;
import org.infinispan.client.hotrod.configuration.TransactionMode;
import org.infinispan.commons.api.BasicCacheContainer;
import org.infinispan.commons.marshall.Marshaller;

Expand All @@ -23,9 +26,82 @@ public interface RemoteCacheContainer extends BasicCacheContainer {
*/
Configuration getConfiguration();

<K, V> RemoteCache<K, V> getCache(String cacheName, boolean forceReturnValue);
/**
* Same as {@code getCache(cacheName, forceReturnValue, null, null)}
*
* @see #getCache(String, boolean, TransactionMode, TransactionManager)
*/
default <K, V> RemoteCache<K, V> getCache(String cacheName, boolean forceReturnValue) {
return getCache(cacheName, forceReturnValue, null, null);
}

/**
* Same as {@code getCache("", forceReturnValue, null, null)}
*
* @see #getCache(String, boolean, TransactionMode, TransactionManager)
*/
default <K, V> RemoteCache<K, V> getCache(boolean forceReturnValue) {
return getCache("", forceReturnValue, null, null);
}

/**
* Same as {@code getCache(cacheName, transactionMode, null)}
*
* @see #getCache(String, TransactionMode, TransactionManager)
*/
default <K, V> RemoteCache<K, V> getCache(String cacheName, TransactionMode transactionMode) {
return getCache(cacheName, transactionMode, null);
}

/**
* Same as {@code getCache(cacheName, forceReturnValue, transactionMode, null)}
*
* @see #getCache(String, boolean, TransactionMode, TransactionManager)
*/
default <K, V> RemoteCache<K, V> getCache(String cacheName, boolean forceReturnValue,
TransactionMode transactionMode) {
return getCache(cacheName, forceReturnValue, transactionMode, null);
}

/**
* Same as {@code getCache(cacheName, null, transactionManager)}
*
* @see #getCache(String, TransactionMode, TransactionManager)
*/
default <K, V> RemoteCache<K, V> getCache(String cacheName, TransactionManager transactionManager) {
return getCache(cacheName, null, transactionManager);
}

<K, V> RemoteCache<K, V> getCache(boolean forceReturnValue);
/**
* Same as {@code getCache(cacheName, forceReturnValue, null, transactionManager)}
*
* @see #getCache(String, boolean, TransactionMode, TransactionManager)
*/
default <K, V> RemoteCache<K, V> getCache(String cacheName, boolean forceReturnValue,
TransactionManager transactionManager) {
return getCache(cacheName, forceReturnValue, null, transactionManager);
}

/**
*
* @param cacheName The cache's name.
* @param transactionMode The {@link TransactionMode} to override. If {@code null}, it uses the configured value.
* @param transactionManager The {@link TransactionManager} to override. If {@code null}, it uses the configured value.
* @return the {@link RemoteCache} implementation.
*/
<K, V> RemoteCache<K, V> getCache(String cacheName, TransactionMode transactionMode,
TransactionManager transactionManager);

/**
* @param cacheName The cache's name.
* @param forceReturnValue {@code true} to force a return value when it is not needed.
* @param transactionMode The {@link TransactionMode} to override. If {@code null}, it uses the configured value.
* @param transactionManager The {@link TransactionManager} to override. If {@code null}, it uses the configured
* value.
* @return the {@link RemoteCache} implementation.
*/
<K, V> RemoteCache<K, V> getCache(String cacheName, boolean forceReturnValue, TransactionMode transactionMode,
TransactionManager transactionManager);

boolean isStarted();

Expand Down
Expand Up @@ -18,9 +18,12 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import javax.transaction.TransactionManager;

import org.infinispan.client.hotrod.configuration.Configuration;
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
import org.infinispan.client.hotrod.configuration.NearCacheConfiguration;
import org.infinispan.client.hotrod.configuration.TransactionMode;
import org.infinispan.client.hotrod.counter.impl.RemoteCounterManager;
import org.infinispan.client.hotrod.event.impl.ClientListenerNotifier;
import org.infinispan.client.hotrod.exceptions.HotRodClientException;
Expand All @@ -33,6 +36,9 @@
import org.infinispan.client.hotrod.impl.protocol.Codec;
import org.infinispan.client.hotrod.impl.protocol.CodecFactory;
import org.infinispan.client.hotrod.impl.protocol.HotRodConstants;
import org.infinispan.client.hotrod.impl.transaction.SyncModeTransactionTable;
import org.infinispan.client.hotrod.impl.transaction.TransactionTable;
import org.infinispan.client.hotrod.impl.transaction.TransactionalRemoteCacheImpl;
import org.infinispan.client.hotrod.impl.transport.netty.ChannelFactory;
import org.infinispan.client.hotrod.logging.Log;
import org.infinispan.client.hotrod.logging.LogFactory;
Expand Down Expand Up @@ -85,6 +91,7 @@ public class RemoteCacheManager implements RemoteCacheContainer, Closeable {
private final Runnable start = this::start;
private final Runnable stop = this::stop;
private final RemoteCounterManager counterManager;
private final TransactionTable syncTransactionTable = new SyncModeTransactionTable();

/**
* Create a new RemoteCacheManager using the supplied {@link Configuration}.
Expand Down Expand Up @@ -166,7 +173,7 @@ public RemoteCacheManager() {
*/
@Override
public <K, V> RemoteCache<K, V> getCache(String cacheName) {
return getCache(cacheName, configuration.forceReturnValues());
return getCache(cacheName, configuration.forceReturnValues(), null, null);
}

@Override
Expand All @@ -183,11 +190,6 @@ public Set<String> getCacheNames() {
return cacheNames;
}

@Override
public <K, V> RemoteCache<K, V> getCache(String cacheName, boolean forceReturnValue) {
return createRemoteCache(cacheName, forceReturnValue);
}

/**
* Retrieves the default cache from the remote server.
*
Expand All @@ -200,9 +202,15 @@ public <K, V> RemoteCache<K, V> getCache() {
}

@Override
public <K, V> RemoteCache<K, V> getCache(boolean forceReturnValue) {
//As per the HotRod protocol specification, the default cache is identified by an empty string
return createRemoteCache("", forceReturnValue);
public <K, V> RemoteCache<K, V> getCache(String cacheName, TransactionMode transactionMode,
TransactionManager transactionManager) {
return createRemoteCache(cacheName, configuration.forceReturnValues(), transactionMode, transactionManager);
}

@Override
public <K, V> RemoteCache<K, V> getCache(String cacheName, boolean forceReturnValue, TransactionMode transactionMode,
TransactionManager transactionManager) {
return createRemoteCache(cacheName, forceReturnValue, transactionMode, transactionManager);
}

public CompletableFuture<Void> startAsync() {
Expand Down Expand Up @@ -310,12 +318,19 @@ private Properties loadFromStream(InputStream stream) {
return properties;
}

@SuppressWarnings("unchecked")
private <K, V> RemoteCache<K, V> createRemoteCache(String cacheName, Boolean forceReturnValueOverride) {
private <K, V> RemoteCache<K, V> createRemoteCache(String cacheName, boolean forceReturnValueOverride,
TransactionMode transactionModeOverride, TransactionManager transactionManagerOverride) {
synchronized (cacheName2RemoteCache) {
RemoteCacheKey key = new RemoteCacheKey(cacheName, forceReturnValueOverride);
if (!cacheName2RemoteCache.containsKey(key)) {
RemoteCacheImpl<K, V> result = createRemoteCache(cacheName);
TransactionMode transactionMode = getTransactionMode(transactionModeOverride);
RemoteCacheImpl<K, V> result;
if (transactionMode == TransactionMode.NONE) {
result = createRemoteCache(cacheName);
} else {
TransactionManager transactionManager = getTransactionManager(transactionManagerOverride);
result = createRemoteTransactionalCache(cacheName, forceReturnValueOverride, transactionMode, transactionManager);
}
RemoteCacheHolder rcc = new RemoteCacheHolder(result, forceReturnValueOverride);
startRemoteCache(rcc);

Expand All @@ -326,13 +341,16 @@ private <K, V> RemoteCache<K, V> createRemoteCache(String cacheName, Boolean for
pingResult == PingResult.CACHE_DOES_NOT_EXIST) {
return null;
}
if (transactionMode != TransactionMode.NONE) {
((TransactionalRemoteCacheImpl<K, V>) result).checkTransactionSupport();
}

result.start();
// If ping on startup is disabled, or cache is defined in server
cacheName2RemoteCache.put(key, rcc);
return result;
} else {
return (RemoteCache<K, V>) cacheName2RemoteCache.get(key).remoteCache;
return cacheName2RemoteCache.get(key).remoteCache();
}
}
}
Expand All @@ -353,7 +371,7 @@ protected <K, V> NearCacheService<K, V> createNearCacheService(NearCacheConfigur
}

private void startRemoteCache(RemoteCacheHolder remoteCacheHolder) {
RemoteCacheImpl<?, ?> remoteCache = remoteCacheHolder.remoteCache;
RemoteCacheImpl<?, ?> remoteCache = remoteCacheHolder.remoteCache();
OperationsFactory operationsFactory = new OperationsFactory(
channelFactory, remoteCache.getName(), remoteCacheHolder.forceReturnValue, codec, listenerNotifier,
configuration);
Expand Down Expand Up @@ -411,6 +429,36 @@ public ChannelFactory getChannelFactory() {
return channelFactory;
}

private TransactionManager getTransactionManager(TransactionManager override) {
try {
return override == null ? configuration.transaction().transactionManagerLookup()
.getTransactionManager() : override;
} catch (Exception e) {
throw new HotRodClientException(e);
}
}

private TransactionMode getTransactionMode(TransactionMode override) {
return override == null ? configuration.transaction().transactionMode() : override;
}

private TransactionTable getTransactionTable(TransactionMode transactionMode) {
switch (transactionMode) {
case NON_XA:
return syncTransactionTable;
case FULL_XA:
case NON_DURABLE_XA:
default:
throw new IllegalArgumentException("XA isn't supported yet!");
}
}

private <K, V> TransactionalRemoteCacheImpl<K, V> createRemoteTransactionalCache(String cacheName,
boolean forceReturnValues, TransactionMode transactionMode, TransactionManager transactionManager) {
return new TransactionalRemoteCacheImpl<>(this, cacheName, forceReturnValues, transactionManager,
getTransactionTable(transactionMode));
}

private static class RemoteCacheKey {

final String cacheName;
Expand Down Expand Up @@ -448,5 +496,10 @@ private static class RemoteCacheHolder {
this.remoteCache = remoteCache;
this.forceReturnValue = forceReturnValue;
}

<K, V> RemoteCacheImpl<K, V> remoteCache() {
//noinspection unchecked
return (RemoteCacheImpl<K, V>) remoteCache;
}
}
}
Expand Up @@ -181,6 +181,11 @@ public ConfigurationBuilder batchSize(int batchSize) {
return builder.batchSize(batchSize);
}

@Override
public TransactionConfigurationBuilder transaction() {
return builder.transaction();
}

@Override
public ConfigurationBuilder withProperties(Properties properties) {
return builder.withProperties(properties);
Expand Down
Expand Up @@ -49,13 +49,15 @@ public class Configuration {
private final List<ClusterConfiguration> clusters;
private final List<String> serialWhitelist;
private final int batchSize;
private final TransactionConfiguration transaction;

Configuration(ExecutorFactoryConfiguration asyncExecutorFactory, Supplier<FailoverRequestBalancingStrategy> balancingStrategyFactory, ClassLoader classLoader,
ClientIntelligence clientIntelligence, ConnectionPoolConfiguration connectionPool, int connectionTimeout, Class<? extends ConsistentHash>[] consistentHashImpl, boolean forceReturnValues, int keySizeEstimate,
Marshaller marshaller, Class<? extends Marshaller> marshallerClass,
ProtocolVersion protocolVersion, List<ServerConfiguration> servers, int socketTimeout, SecurityConfiguration security, boolean tcpNoDelay, boolean tcpKeepAlive,
int valueSizeEstimate, int maxRetries, NearCacheConfiguration nearCache,
List<ClusterConfiguration> clusters, List<String> serialWhitelist, int batchSize) {
List<ClusterConfiguration> clusters, List<String> serialWhitelist, int batchSize,
TransactionConfiguration transaction) {
this.asyncExecutorFactory = asyncExecutorFactory;
this.balancingStrategyFactory = balancingStrategyFactory;
this.maxRetries = maxRetries;
Expand All @@ -79,6 +81,7 @@ public class Configuration {
this.clusters = clusters;
this.serialWhitelist = serialWhitelist;
this.batchSize = batchSize;
this.transaction = transaction;
}

public ExecutorFactoryConfiguration asyncExecutorFactory() {
Expand Down Expand Up @@ -221,6 +224,10 @@ public int batchSize() {
return batchSize;
}

public TransactionConfiguration transaction() {
return transaction;
}

@Override
public String toString() {
return "Configuration [asyncExecutorFactory=" + asyncExecutorFactory + ", balancingStrategyFactory=()->" + balancingStrategyFactory.get()
Expand All @@ -231,7 +238,8 @@ public String toString() {
+ ", valueSizeEstimate=" + valueSizeEstimate + ", maxRetries=" + maxRetries
+ ", serialWhiteList=" + serialWhitelist
+ ", batchSize=" + batchSize
+ "nearCache=" + nearCache + "]";
+ ", nearCache=" + nearCache
+ ", transaction=" + transaction + "]";
}

public Properties properties() {
Expand Down Expand Up @@ -336,6 +344,7 @@ public Properties properties() {

properties.setProperty(ConfigurationProperties.BATCH_SIZE, Integer.toString(batchSize));

transaction.toProperties(properties);
return properties;
}
}

0 comments on commit 3207a9a

Please sign in to comment.