Skip to content

Commit

Permalink
ISPN-4955 Fixes:
Browse files Browse the repository at this point in the history
* Resolved serialization issues
* Added integration test against clustered caches
* Added warn when events do not arrive
* Removed hardcoded listener wait time
  • Loading branch information
Gustavo Fernandes authored and wburns committed May 13, 2015
1 parent 86ff7b4 commit 332a63a
Show file tree
Hide file tree
Showing 12 changed files with 392 additions and 168 deletions.
104 changes: 0 additions & 104 deletions commons/src/main/java/org/infinispan/commons/util/EmptyQueue.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

import org.infinispan.commons.logging.LogFactory;
import org.infinispan.commons.util.CollectionFactory;
import org.infinispan.commons.util.EmptyQueue;
import org.infinispan.commons.util.InfinispanCollections;
import org.infinispan.jcache.logging.Log;

/**
Expand All @@ -42,8 +42,6 @@ public abstract class AbstractJCacheNotifier<K, V> {

private static final boolean isTrace = log.isTraceEnabled();

private static final Queue<CountDownLatch> EMPTY_QUEUE = new EmptyQueue<>();

// Traversals are a not more common than mutations when it comes to
// keeping track of registered listeners, so use copy-on-write lists.

Expand Down Expand Up @@ -88,12 +86,7 @@ public void removeListener(CacheEntryListenerConfiguration<K, V> reg,
public void addSyncNotificationLatch(Cache<K, V> cache, K key, V value, CountDownLatch latch) {
EventSource<K, V> eventSourceKey = new EventSource<K, V>(cache, key, value);

Queue<CountDownLatch> latches = new ConcurrentLinkedQueue<>();
Queue<CountDownLatch> prev = latchesByEventSource.putIfAbsent(eventSourceKey, latches);
if (prev != null) {
latches = prev;
}
latches.add(latch);
latchesByEventSource.computeIfAbsent(eventSourceKey, kvEventSource -> new ConcurrentLinkedQueue<>()).add(latch);
}

public void removeSyncNotificationLatch(Cache<K, V> cache, K key, V value, CountDownLatch latch) {
Expand All @@ -104,10 +97,11 @@ public void removeSyncNotificationLatch(Cache<K, V> cache, K key, V value, Count
if (latches == null) {
return;
}
latches.remove(latch);
if (latches.isEmpty()) {
latchesByEventSource.remove(eventSourceKey, EMPTY_QUEUE);
}

latchesByEventSource.compute(eventSourceKey, (kvEventSource, countDownLatches) -> {
countDownLatches.remove(latch);
return countDownLatches.isEmpty() ? null : countDownLatches;
});
}

private void notifySync(Cache<K, V> cache, K key, V value) {
Expand All @@ -120,11 +114,8 @@ private void notifySync(Queue<CountDownLatch> latches) {
if (latches == null) {
return;
}
while (true) {
CountDownLatch latch = latches.poll();
if (latch == null) {
break;
}
CountDownLatch latch = latches.poll();
if (latch != null) {
latch.countDown();
}
}
Expand Down Expand Up @@ -170,16 +161,12 @@ public void notifyEntryRemoved(Cache<K, V> cache, K key, V value) {
}

public void notifyEntryExpired(Cache<K, V> cache, K key, V value) {
try {
if (!expiredListeners.isEmpty()) {
List<CacheEntryEvent<? extends K, ? extends V>> events =
createEvent(cache, key, value, EventType.EXPIRED);
for (CacheEntryExpiredListener<K, V> listener : expiredListeners) {
listener.onExpired(getEntryIterable(events, listenerCfgs.get(listener)));
}
if (!expiredListeners.isEmpty()) {
List<CacheEntryEvent<? extends K, ? extends V>> events =
createEvent(cache, key, value, EventType.EXPIRED);
for (CacheEntryExpiredListener<K, V> listener : expiredListeners) {
listener.onExpired(getEntryIterable(events, listenerCfgs.get(listener)));
}
} finally {
notifySync(cache, key, value);
}
}

Expand Down
83 changes: 83 additions & 0 deletions jcache/remote/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,89 @@
<artifactId>wildfly-controller</artifactId>
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.jboss.arquillian.junit</groupId>
<artifactId>arquillian-junit-container</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.wildfly</groupId>
<artifactId>wildfly-arquillian-container-managed</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

<build>
<testResources>
<testResource>
<filtering>true</filtering>
<directory>src/test/resources</directory>
</testResource>
</testResources>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>integration-test</goal>
<goal>verify</goal>
</goals>
<configuration>
<redirectTestOutputToFile>true</redirectTestOutputToFile>
</configuration>
</execution>
</executions>
<dependencies>
<dependency>
<groupId>org.apache.maven.surefire</groupId>
<artifactId>surefire-junit4</artifactId>
<version>${version.maven.surefire}</version>
</dependency>
</dependencies>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<executions>
<execution>
<id>copy-infinispan-server-1</id>
<phase>generate-test-resources</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/node1/infinispan-server-${project.version}</outputDirectory>
<resources>
<resource>
<directory>../../server/integration/build/target/infinispan-server-${project.version}/</directory>
<filtering>false</filtering>
</resource>
</resources>
</configuration>
</execution>
<execution>
<id>copy-infinispan-server-2</id>
<phase>generate-test-resources</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/node2/infinispan-server-${project.version}</outputDirectory>
<resources>
<resource>
<directory>../../server/integration/build/target/infinispan-server-${project.version}/</directory>
<filtering>false</filtering>
</resource>
</resources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,24 @@
import javax.cache.event.CacheEntryListenerException;

import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.commons.logging.LogFactory;
import org.infinispan.jcache.AbstractJCacheNotifier;
import org.infinispan.jcache.remote.logging.Log;

public class RemoteCacheWithSyncListeners<K, V> extends RemoteCacheWrapper<K, V> {
private static final boolean DONT_EXPECT_EVENT_ON_NULL_RESULT = false;
private static final long TIMEOUT = 5000;

private static final Log log = LogFactory.getLog(RemoteCacheWithSyncListeners.class, Log.class);

private final AbstractJCacheNotifier<K, V> notifier;
private final Cache<K, V> cache;
private final int timeout;

public RemoteCacheWithSyncListeners(RemoteCache<K, V> delegate, AbstractJCacheNotifier<K, V> notifier, Cache<K, V> cache) {
super(delegate);
this.notifier = notifier;
this.cache = cache;
this.timeout = delegate.getRemoteCacheManager().getConfiguration().socketTimeout();
}

@Override
Expand Down Expand Up @@ -88,17 +93,18 @@ private <R> R withSyncListeners(boolean hasListeners, boolean expectEventOnNull,
if ((ret == null) && !expectEventOnNull) {
return ret;
}
latch.await(TIMEOUT, TimeUnit.MILLISECONDS);
boolean wasClosed = latch.await(timeout, TimeUnit.MILLISECONDS);
if (!wasClosed) {
log.timeoutWaitingEvent();
}
return ret;
} finally {
notifier.removeSyncNotificationLatch(cache, key, value, latch);
}
} catch (Exception ex) {
if (!(ex instanceof CacheEntryListenerException)) {
throw new CacheEntryListenerException(ex);
} else {
throw (CacheEntryListenerException) ex;
}
} catch (CacheEntryListenerException ex) {
throw ex;
} catch (Exception e) {
throw new CacheEntryListenerException(e);
}
}

Expand Down
Loading

0 comments on commit 332a63a

Please sign in to comment.