Skip to content

Commit

Permalink
WFLY-8207 Upgrade JGroups to 4.0.10.Final
Browse files Browse the repository at this point in the history
WFLY-6973 Upgrade Infinispan to 9.1.5.Final
  • Loading branch information
pferraro committed Feb 9, 2018
1 parent 488f3bb commit c252b2c
Show file tree
Hide file tree
Showing 167 changed files with 1,475 additions and 1,296 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public void isMarshalling() {
for (CacheMode mode : EnumSet.allOf(CacheMode.class)) {
Configuration config = new ConfigurationBuilder().clustering().cacheMode(mode).build();
CacheProperties configuration = new InfinispanCacheProperties(config);
if (mode.isDistributed() || mode.isReplicated()) {
if (mode.isDistributed() || mode.isReplicated() || mode.isScattered()) {
Assert.assertTrue(mode.name(), configuration.isMarshalling());
} else {
Assert.assertFalse(mode.name(), configuration.isMarshalling());
Expand All @@ -92,7 +92,7 @@ public void isPersistent() {
for (CacheMode mode : EnumSet.allOf(CacheMode.class)) {
Configuration config = new ConfigurationBuilder().clustering().cacheMode(mode).build();
CacheProperties configuration = new InfinispanCacheProperties(config);
if (mode.isDistributed() || mode.isReplicated()) {
if (mode.isDistributed() || mode.isReplicated() || mode.isScattered()) {
Assert.assertTrue(mode.name(), configuration.isPersistent());
} else {
Assert.assertFalse(mode.name(), configuration.isPersistent());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,9 @@ private void executeOnPrimaryOwner(Bean<I, T> bean, final Command<Void, Schedule

Node locatePrimaryOwner(I id) {
DistributionManager dist = this.cache.getAdvancedCache().getDistributionManager();
Address address = (dist != null) ? dist.getPrimaryLocation(id) : null;
return (address != null) ? this.nodeFactory.createNode(address) : this.registry.getGroup().getLocalMember();
Address address = (dist != null) ? dist.getCacheTopology().getDistribution(id).primary() : null;
Node member = (address != null) ? this.nodeFactory.createNode(address) : null;
return (member != null) ? member : this.registry.getGroup().getLocalMember();
}

@Override
Expand Down Expand Up @@ -332,8 +333,7 @@ public void activated(CacheEntryActivatedEvent<BeanKey<I>, BeanEntry<I>> event)

@DataRehashed
public void dataRehashed(DataRehashedEvent<BeanKey<I>, BeanEntry<I>> event) {
Address localAddress = this.cache.getCacheManager().getAddress();
Locality newLocality = new ConsistentHashLocality(localAddress, event.getConsistentHashAtEnd());
Locality newLocality = new ConsistentHashLocality(event.getCache(), event.getConsistentHashAtEnd());
if (event.isPre()) {
Future<?> future = this.rehashFuture.getAndSet(null);
if (future != null) {
Expand All @@ -348,7 +348,7 @@ public void dataRehashed(DataRehashedEvent<BeanKey<I>, BeanEntry<I>> event) {
// Executor was shutdown
}
} else {
Locality oldLocality = new ConsistentHashLocality(localAddress, event.getConsistentHashAtStart());
Locality oldLocality = new ConsistentHashLocality(event.getCache(), event.getConsistentHashAtStart());
try {
this.rehashFuture.set(this.executor.submit(() -> this.schedule(oldLocality, newLocality)));
} catch (RejectedExecutionException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,8 @@

import org.infinispan.Cache;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.cache.EvictionConfiguration;
import org.infinispan.configuration.cache.ExpirationConfiguration;
import org.infinispan.eviction.EvictionStrategy;
import org.infinispan.configuration.cache.MemoryConfiguration;
import org.jboss.as.clustering.controller.BuilderAdapter;
import org.jboss.as.clustering.controller.CapabilityServiceBuilder;
import org.jboss.as.controller.capability.CapabilityServiceSupport;
Expand Down Expand Up @@ -109,9 +108,9 @@ public Collection<CapabilityServiceBuilder<?>> getDeploymentBuilders(final Servi
InfinispanEjbLogger.ROOT_LOGGER.expirationDisabled(InfinispanCacheRequirement.CONFIGURATION.resolve(containerName, templateCacheName));
}
// Ensure eviction is not enabled on cache
EvictionConfiguration eviction = builder.eviction().create();
if (eviction.strategy().isEnabled()) {
builder.eviction().size(-1L).strategy(EvictionStrategy.MANUAL);
MemoryConfiguration memory = builder.memory().create();
if (memory.size() >= 0) {
builder.memory().size(-1);
InfinispanEjbLogger.ROOT_LOGGER.evictionDisabled(InfinispanCacheRequirement.CONFIGURATION.resolve(containerName, templateCacheName));
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,9 @@ public BeanEntry<I> tryValue(I id) {

@Override
public BeanEntry<I> createValue(I id, I groupId) {
return this.cache.getAdvancedCache().withFlags(Flag.FORCE_SYNCHRONOUS).computeIfAbsent(this.createKey(id), key -> new InfinispanBeanEntry<>(this.beanName, groupId));
BeanEntry<I> entry = new InfinispanBeanEntry<>(this.beanName, groupId);
BeanEntry<I> existing = this.cache.getAdvancedCache().withFlags(Flag.FORCE_SYNCHRONOUS).putIfAbsent(this.createKey(id), entry);
return (existing == null) ? entry : existing;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ public BeanGroupKey<I> createKey(I id) {

@Override
public BeanGroupEntry<I, T> createValue(I id, Void context) {
return this.cache.getAdvancedCache().withFlags(Flag.FORCE_SYNCHRONOUS).computeIfAbsent(this.createKey(id), key -> new InfinispanBeanGroupEntry<>(this.factory.createMarshalledValue(new ConcurrentHashMap<>())));
BeanGroupEntry<I, T> entry = new InfinispanBeanGroupEntry<>(this.factory.createMarshalledValue(new ConcurrentHashMap<>()));
BeanGroupEntry<I, T> existing = this.cache.getAdvancedCache().withFlags(Flag.FORCE_SYNCHRONOUS).putIfAbsent(this.createKey(id), entry);
return (existing == null) ? entry : existing;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,22 @@

package org.jboss.as.clustering.infinispan;

import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.infinispan.commons.CacheException;
import org.infinispan.commons.marshall.StreamingMarshaller;
import org.infinispan.configuration.global.GlobalConfiguration;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.configuration.global.TransportConfiguration;
import org.infinispan.factories.KnownComponentNames;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.remoting.responses.CacheNotFoundResponse;
import org.infinispan.remoting.transport.jgroups.CommandAwareRpcDispatcher;
import org.infinispan.notifications.cachemanagerlistener.CacheManagerNotifier;
import org.infinispan.remoting.inboundhandler.InboundInvocationHandler;
import org.infinispan.remoting.transport.jgroups.JGroupsTransport;
import org.infinispan.remoting.transport.jgroups.MarshallerAdapter;
import org.infinispan.util.TimeService;
import org.wildfly.clustering.jgroups.spi.ChannelFactory;

/**
Expand All @@ -50,13 +54,19 @@ public ChannelFactoryTransport(ChannelFactory factory) {

@Inject
@Override
public void setConfiguration(GlobalConfiguration config) {
public void initialize(GlobalConfiguration configuration, StreamingMarshaller marshaller,
CacheManagerNotifier notifier, TimeService timeService, InboundInvocationHandler globalHandler,
@ComponentName(KnownComponentNames.TIMEOUT_SCHEDULE_EXECUTOR) ScheduledExecutorService timeoutExecutor,
@ComponentName(KnownComponentNames.REMOTE_COMMAND_EXECUTOR) ExecutorService remoteExecutor) {

super.initialize(configuration, marshaller, notifier, timeService, globalHandler, timeoutExecutor, remoteExecutor);

GlobalConfigurationBuilder builder = new GlobalConfigurationBuilder();
// WFLY-6685 Prevent Infinispan from registering channel mbeans
// The JGroups subsystem already does this
builder.globalJmxStatistics().read(config.globalJmxStatistics()).disable();
builder.globalJmxStatistics().read(configuration.globalJmxStatistics()).disable();
// ISPN-4755 workaround
TransportConfiguration transport = config.transport();
TransportConfiguration transport = configuration.transport();
builder.transport()
.clusterName(transport.clusterName())
.distributedSyncTimeout(transport.distributedSyncTimeout())
Expand All @@ -69,25 +79,7 @@ public void setConfiguration(GlobalConfiguration config) {
.transport(transport.transport())
.withProperties(transport.properties())
;
super.setConfiguration(builder.build());
}

@Override
protected void initRPCDispatcher() {
this.dispatcher = new CommandAwareRpcDispatcher(this.channel, this, this.globalHandler, this.getTimeoutExecutor(), this.timeService);
MarshallerAdapter adapter = new MarshallerAdapter(this.marshaller) {
@Override
public Object objectFromBuffer(byte[] buffer, int offset, int length) throws Exception {
return ChannelFactoryTransport.this.isUnknownForkResponse(ByteBuffer.wrap(buffer, offset, length)) ? CacheNotFoundResponse.INSTANCE : super.objectFromBuffer(buffer, offset, length);
}
};
this.dispatcher.setRequestMarshaller(adapter);
this.dispatcher.setResponseMarshaller(adapter);
this.dispatcher.start();
}

boolean isUnknownForkResponse(ByteBuffer response) {
return this.factory.isUnknownForkResponse(response);
this.configuration = builder.build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,8 @@

package org.jboss.as.clustering.infinispan;

import java.util.Arrays;
import java.util.EnumSet;
import java.util.Set;

import org.infinispan.AdvancedCache;
import org.infinispan.cache.impl.AbstractDelegatingAdvancedCache;
import org.infinispan.context.Flag;
import org.infinispan.manager.EmbeddedCacheManager;
import org.wildfly.clustering.ee.Batch;
import org.wildfly.clustering.ee.Batcher;
Expand All @@ -44,22 +39,20 @@ public class DefaultCache<K, V> extends AbstractDelegatingAdvancedCache<K, V> {

private final EmbeddedCacheManager manager;
private final Batcher<? extends Batch> batcher;
private final Set<Flag> flags;

DefaultCache(final EmbeddedCacheManager manager, final Batcher<? extends Batch> batcher, final AdvancedCache<K, V> cache, final Set<Flag> flags) {
DefaultCache(EmbeddedCacheManager manager, Batcher<? extends Batch> batcher, AdvancedCache<K, V> cache) {
super(cache, new AdvancedCacheWrapper<K, V>() {
@Override
public AdvancedCache<K, V> wrap(AdvancedCache<K, V> cache) {
return new DefaultCache<>(manager, batcher, cache, flags);
return new DefaultCache<>(manager, batcher, cache);
}
});
this.manager = manager;
this.batcher = batcher;
this.flags = flags;
}

public DefaultCache(EmbeddedCacheManager manager, BatcherFactory batcherFactory, AdvancedCache<K, V> cache) {
this(manager, batcherFactory.createBatcher(cache), cache, EnumSet.noneOf(Flag.class));
this(manager, batcherFactory.createBatcher(cache), cache);
}

@Override
Expand Down Expand Up @@ -93,13 +86,6 @@ public void endBatch(boolean successful) {
}
}

@Override
public AdvancedCache<K, V> withFlags(Flag... flags) {
Set<Flag> set = EnumSet.copyOf(this.flags);
set.addAll(Arrays.asList(flags));
return new DefaultCache<>(this.manager, this.batcher, this.cache.withFlags(flags), set);
}

@Override
public boolean equals(Object object) {
return (object == this) || (object == this.cache);
Expand Down
Loading

0 comments on commit c252b2c

Please sign in to comment.