Skip to content

Commit

Permalink
WFLY-11423 Drop custom invocation batching logic
Browse files Browse the repository at this point in the history
  • Loading branch information
pferraro committed Nov 27, 2018
1 parent 3f32f19 commit fdfdbad
Show file tree
Hide file tree
Showing 10 changed files with 22 additions and 326 deletions.

This file was deleted.

Expand Up @@ -25,67 +25,29 @@
import org.infinispan.AdvancedCache;
import org.infinispan.cache.impl.AbstractDelegatingAdvancedCache;
import org.infinispan.manager.EmbeddedCacheManager;
import org.wildfly.clustering.ee.Batch;
import org.wildfly.clustering.ee.Batcher;

/**
* {@link AdvancedCache} decorator associated with a {@link DefaultCacheContainer}.
* Overrides {@link #startBatch()} and {@link #endBatch(boolean)} methods to use WildFly's {@link Batcher} mechanism, instead of Infinispan's {@link org.infinispan.batch.BatchContainer}.
* @author Paul Ferraro
*/
public class DefaultCache<K, V> extends AbstractDelegatingAdvancedCache<K, V> {
// Holds reference to active batch across start/end batch methods
private static final ThreadLocal<Batch> CURRENT_BATCH = new ThreadLocal<>();

private final EmbeddedCacheManager manager;
private final Batcher<? extends Batch> batcher;

DefaultCache(EmbeddedCacheManager manager, Batcher<? extends Batch> batcher, AdvancedCache<K, V> cache) {
DefaultCache(EmbeddedCacheManager manager, AdvancedCache<K, V> cache) {
super(cache, new AdvancedCacheWrapper<K, V>() {
@Override
public AdvancedCache<K, V> wrap(AdvancedCache<K, V> cache) {
return new DefaultCache<>(manager, batcher, cache);
return new DefaultCache<>(manager, cache);
}
});
this.manager = manager;
this.batcher = batcher;
}

public DefaultCache(EmbeddedCacheManager manager, BatcherFactory batcherFactory, AdvancedCache<K, V> cache) {
this(manager, batcherFactory.createBatcher(cache), cache);
}

@Override
public EmbeddedCacheManager getCacheManager() {
return this.manager;
}

@Override
public boolean startBatch() {
// If cache was not configured with a Batcher then this is a no-op
if (this.batcher == null) return false;
// If a batch is already associated with the current thread then don't create a new one
if (CURRENT_BATCH.get() != null) return false;
// Associate a new catch with the current thread
CURRENT_BATCH.set(this.batcher.createBatch());
return true;
}

@Override
public void endBatch(boolean successful) {
try (Batch batch = CURRENT_BATCH.get()) {
// If no batch is associated with the current thread then this is a no-op
if (batch != null) {
if (!successful) {
batch.discard();
}
}
} finally {
// Disassociate the batch with the current thread no matter what
CURRENT_BATCH.remove();
}
}

@Override
public boolean equals(Object object) {
return (object == this) || (object == this.cache);
Expand Down
Expand Up @@ -41,11 +41,8 @@
*/
public class DefaultCacheContainer extends AbstractDelegatingEmbeddedCacheManager implements CacheContainer, EmbeddedCacheManagerAdmin {

private final BatcherFactory batcherFactory;

public DefaultCacheContainer(EmbeddedCacheManager container, BatcherFactory batcherFactory) {
public DefaultCacheContainer(EmbeddedCacheManager container) {
super(container);
this.batcherFactory = batcherFactory;
}

@Override
Expand Down Expand Up @@ -104,7 +101,7 @@ public <K, V> Cache<K, V> getCache(String cacheName, String configurationTemplat
}

private <K, V> Cache<K, V> wrap(Cache<K, V> cache) {
return new DefaultCache<>(this, this.batcherFactory, cache.getAdvancedCache());
return new DefaultCache<>(this, cache.getAdvancedCache());
}

@Override
Expand Down

This file was deleted.

Expand Up @@ -27,10 +27,10 @@

import java.util.function.Consumer;

import org.infinispan.commons.CacheException;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.cache.ExpirationConfiguration;
import org.infinispan.configuration.cache.GroupsConfigurationBuilder;
import org.infinispan.configuration.cache.JMXStatisticsConfiguration;
import org.infinispan.configuration.cache.LockingConfiguration;
import org.infinispan.configuration.cache.MemoryConfiguration;
import org.infinispan.configuration.cache.PersistenceConfiguration;
Expand All @@ -51,6 +51,7 @@
import org.wildfly.clustering.service.ServiceConfigurator;
import org.wildfly.clustering.service.ServiceSupplierDependency;
import org.wildfly.clustering.service.SupplierDependency;
import org.wildfly.transaction.client.ContextTransactionManager;

/**
* Builds a cache configuration from its components.
Expand All @@ -66,7 +67,7 @@ public class CacheConfigurationServiceConfigurator extends CapabilityServiceName
private final SupplierDependency<TransactionConfiguration> transaction;
private final SupplierDependency<Module> module;

private volatile JMXStatisticsConfiguration statistics;
private volatile boolean statisticsEnabled;

CacheConfigurationServiceConfigurator(PathAddress address) {
super(CONFIGURATION, address);
Expand Down Expand Up @@ -99,21 +100,29 @@ public <T> ServiceBuilder<T> register(ServiceBuilder<T> builder) {

@Override
public ServiceConfigurator configure(OperationContext context, ModelNode model) throws OperationFailedException {
boolean enabled = STATISTICS_ENABLED.resolveModelAttribute(context, model).asBoolean();
this.statistics = new ConfigurationBuilder().jmxStatistics().enabled(enabled).available(enabled).create();
this.statisticsEnabled = STATISTICS_ENABLED.resolveModelAttribute(context, model).asBoolean();

this.configurator.configure(context);
return this;
}

@Override
public void accept(ConfigurationBuilder builder) {
TransactionConfiguration tx = this.transaction.get();

builder.memory().read(this.memory.get());
builder.expiration().read(this.expiration.get());
builder.locking().read(this.locking.get());
builder.persistence().read(this.persistence.get());
builder.transaction().read(this.transaction.get());
builder.jmxStatistics().read(this.statistics);
builder.transaction().read(tx);
builder.jmxStatistics().enabled(this.statisticsEnabled).available(this.statisticsEnabled);

try {
// Configure invocation batching based on transaction configuration
builder.invocationBatching().enable(tx.transactionMode().isTransactional() && (tx.transactionManagerLookup().getTransactionManager() != ContextTransactionManager.getInstance()));
} catch (Exception e) {
throw new CacheException(e);
}
}

MemoryConfiguration memory() {
Expand Down
Expand Up @@ -46,9 +46,7 @@
import org.jboss.as.clustering.controller.CapabilityServiceNameProvider;
import org.jboss.as.clustering.controller.ResourceServiceConfigurator;
import org.jboss.as.clustering.dmr.ModelNodes;
import org.jboss.as.clustering.infinispan.BatcherFactory;
import org.jboss.as.clustering.infinispan.DefaultCacheContainer;
import org.jboss.as.clustering.infinispan.InfinispanBatcherFactory;
import org.jboss.as.clustering.infinispan.InfinispanLogger;
import org.jboss.as.clustering.infinispan.LocalGlobalConfigurationManager;
import org.jboss.as.controller.OperationContext;
Expand All @@ -75,7 +73,6 @@ public class CacheContainerServiceConfigurator extends CapabilityServiceNameProv
private final List<ServiceName> aliases = new LinkedList<>();
private final String name;
private final SupplierDependency<GlobalConfiguration> configuration;
private final BatcherFactory batcherFactory = new InfinispanBatcherFactory();

public CacheContainerServiceConfigurator(PathAddress address) {
super(CONTAINER, address);
Expand All @@ -85,7 +82,7 @@ public CacheContainerServiceConfigurator(PathAddress address) {

@Override
public CacheContainer apply(EmbeddedCacheManager manager) {
return new DefaultCacheContainer(manager, this.batcherFactory);
return new DefaultCacheContainer(manager);
}

@Override
Expand Down
Expand Up @@ -90,7 +90,7 @@ public TransactionConfiguration get() {
.lockingMode(this.locking)
.cacheStopTimeout(this.timeout)
.transactionMode((this.mode == TransactionMode.NONE) ? org.infinispan.transaction.TransactionMode.NON_TRANSACTIONAL : org.infinispan.transaction.TransactionMode.TRANSACTIONAL)
.useSynchronization(EnumSet.of(TransactionMode.BATCH, TransactionMode.NON_XA).contains(this.mode))
.useSynchronization(this.mode == TransactionMode.NON_XA)
.recovery().enabled(this.mode == TransactionMode.FULL_XA).transaction()
;

Expand Down
Expand Up @@ -55,9 +55,8 @@
* @author Paul Ferraro
*/
public class DefaultCacheContainerTestCase {
private final BatcherFactory batcherFactory = mock(BatcherFactory.class);
private final EmbeddedCacheManager manager = mock(EmbeddedCacheManager.class);
private final CacheContainer subject = new DefaultCacheContainer(this.manager, this.batcherFactory);
private final CacheContainer subject = new DefaultCacheContainer(this.manager);

@After
public void cleanup() {
Expand Down

0 comments on commit fdfdbad

Please sign in to comment.