Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Loading…

INT-2935: Improve `AELMP` performance #750

Closed
wants to merge 6 commits into from

2 participants

@artembilan
Collaborator

Before ApplicationEventListeningMessageProducer accepted all ApplicationEvent's and than filtered them.
It caused some ApplicationEventMulticaster.retrieverCache overhead.

  • Improve ApplicationEventListeningMessageProducer to implements SmartApplicationListener. It allows to make filtering earlier on first appropriate ApplicationEvent from ApplicationEventListeningMessageProducer#supportsEventType and caching the ApplicationListener only for that ApplicationEvent.
  • Re-register ApplicationEventListeningMessageProducer in the ApplicationEventMulticaster on ApplicationEventListeningMessageProducer#setEventTypes to clear ApplicationEventMulticaster.retrieverCache and make filtering on next the ApplicationEvent.
  • Move org.springframework.integration.gemfire.inbound.SpelMessageProducerSupport to core ExpressionMessageProducerSupport as useful component.
  • Add test about new logic of ApplicationEventListeningMessageProducer and its behavior with respect to the ApplicationEventMulticaster.retrieverCache.

JIRA: https://jira.springsource.org/browse/INT-2935

...ration/endpoint/ExpressionMessageProducerSupport.java
@@ -18,21 +18,22 @@
/**
* @author David Turanski
@garyrussell Owner

Needs Class JavaDoc (even if brief).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
...ration/endpoint/ExpressionMessageProducerSupport.java
@@ -41,7 +42,7 @@ public void setPayloadExpression(String payloadExpression) {
this.payloadExpression = this.parser.parseExpression(payloadExpression);
}
}
-
+
protected Object evaluationResult(Object payload){
@garyrussell Owner

This method name should be an adjective (not a noun); such as evaluate, evaluateResult.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
...inbound/ApplicationEventListeningMessageProducer.java
((105 lines not shown))
public void onApplicationEvent(ApplicationEvent event) {
- if (this.active || event instanceof ApplicationContextEvent) {
- if (CollectionUtils.isEmpty(this.eventTypes)) {
- this.sendEventAsMessage(event);
- return;
+ if (event instanceof ApplicationContextEvent || this.isRunning()) {
@garyrussell Owner

suggest if (this.isRunning() || event instanceof ApplicationContextEvent) { so we take the early exit most of the time.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
...inbound/ApplicationEventListeningMessageProducer.java
((62 lines not shown))
*/
- @SuppressWarnings("unchecked")
- public void setEventTypes(Class<? extends ApplicationEvent>[] eventTypes) {
+ public void setEventTypes(Class<? extends ApplicationEvent>... eventTypes) {
Assert.notEmpty(eventTypes, "at least one event type is required");
synchronized (this.eventTypes) {
this.eventTypes.clear();
@garyrussell Owner

This is not a new problem but I am concerned about losing events (or emitting unexpected events if an event appears between the clear and the first add in addAll()).

We either need to atomically replace this.eventTypes (change it to volatile), or add a barrier to onApplicationEvent to prevent it from proceeding while this.eventTypes is being rebuilt. However, we shouldn't always synchronize onApplicationEvent; only while the rebuild is happening.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@artembilan
Collaborator

Pusshed commit (for review) about ReadWriteLock in the ApplicationEventListeningMessageProducer

...inbound/ApplicationEventListeningMessageProducer.java
((119 lines not shown))
public void onApplicationEvent(ApplicationEvent event) {
- if (this.active || event instanceof ApplicationContextEvent) {
- if (CollectionUtils.isEmpty(this.eventTypes)) {
- this.sendEventAsMessage(event);
- return;
- }
- for (Class<? extends ApplicationEvent> eventType : this.eventTypes) {
- if (eventType.isAssignableFrom(event.getClass())) {
- this.sendEventAsMessage(event);
+ if (event instanceof ApplicationContextEvent || this.isRunning()) {
+
+ this.readEventTypesLock.lock();
+ try {
+ if(!this.supportsEventType(event.getClass())) {
@artembilan Collaborator

I don't like this code, but I can't invent how to do waiting for release of 'write task' ... :-(. ReadLock doesn't support Condition

@artembilan Collaborator

I've found more strange code:

if(this.readEventTypesLock.tryLock()) {
    this.readEventTypesLock.unlock();
}
else {
    this.writeEventTypesLock.lock();
    try {
        if(!this.supportsEventType(event.getClass())) {
            return;
        }
    }
    finally {
        this.writeEventTypesLock.unlock();
    }
}

but to me it seems less overhead to this.supportsEventType.
Sorry, I'm not still well with Java Concurrency :-)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@artembilan
Collaborator

Pushed new commit.

So, I've changed my mind and it works like this:
*eventTypes was changed to volatole array
*there is no reason to block setEventTypes when onApplicationEvent works
*setEventTypes switch on a volatile flag eventTypesChanging, who is checked on each event in the onApplicationEvent
*if it is true received event waits a changeEventTypesLatch, who is released on the changeEventTypesLock.unlock() in the finally of setEventTypes
*after that supportsEventType is called on expectant event, to check, if it is appropriate for changed eventTypes at runtime

Added concurrency test.

A question: how about to change AbstractEndpoint#lifecycleLock to the ReentrantReadWriteLock? There is a block for each Thread on the call of this.isRunning(). That's why I made use of it like this:

if (event instanceof ApplicationContextEvent || this.isRunning()) {

where we skip unnecessary lock.

P.S. Although AbstractApplicationEventMulticaster#getApplicationListeners has synchronized (this.defaultRetriever) { there should be an additinal double check this.retrieverCache.get(cacheKey) in that block, as it is recommended for Singleton Pattern. WDYT: is there any reason to disturb Chris or even Juergen?

@garyrussell
Owner

Well, once you switch to a volatile field, all the complexity goes away; I don't think you don't need any locks at all.

this.eventTypes = eventTypes

is an atomic operation; a thread will either see the old or new list and never a partially constructed one.

Regarding isRunning() - maybe it's easier to go back to using the local active flag like before - then you don't need a lock (reentrant or not).

@artembilan
Collaborator

Hi, Gary!
Sorry for late response

you don't need any locks at all

So, do you mean remove any locking from setEventTypes and even don't use synchronized in the method declaration? I'm not sure in the atomicity of logic in this method...

And yet: why don't use ReentrantReadWriteLock for SmartLifecycle operations?

@garyrussell
Owner

Right - with a volatile field, I don't believe we need any locks at all; setEventTypes does two things...

  1. Atomically replaces the field (this.eventTypes = newEventTypes).
  2. Updates the multicaster (clearing the cache).

if a new even arrives between 1 and 2...

  • if the event was previously handled by this adapter (but no longer is), it will be filtered out during the iteration. Result - new settings for this event
  • if the event was NOT previously handled by this adapter (but now is), we won't be called. Result - old settings for this event

While a thread is in setEventTypes a new event can arrive before 1, between 1 and 2, or after 2. In each case, it's simply a race condition as to whether this event will use the old settings, or the new settings, but there is never any inconsistency (such as that which can occur now, while changing the set in-place).

However, this comes at the "expense" of making the field volatile. However, I submit that if we were worried about the cost of volatile variables, we'd have more than just this one to worry about.

Finally, I have not looked at the performance characteristics of ReentrantLock compared with ReentrantReadWriteLock, but I submit that a local 'active' boolean (such as is used today), is much cheaper than either one.

@artembilan
Collaborator

Got it, thanks.
I worry more about changeEventTypesLatch: an event is waiting one instance, but another 'write'-Thread creates new instance of CountDownLatch... With Java 7 Phaser it will be OK, I think...

About active: here it's OK. There is no any additional 'lifecycle' logic, so, all lifecycle operations are fully atomic.
I worry more about other AbstractEndpoint implementations, where start/stop have a cost before they change 'running' flag. So, here we really should be inside 'lock' with isRunning(). But if my application 'eats' a lot of Messages, I'll get benefits from ReentrantReadWriteLock. Of course, this is not the topic of this PR...

@artembilan
Collaborator

HI!
Sorry for big delay.
So, I've change my mind again:

  • Revert active flag. However a question about ReentrantReadWriteLock is open. E.g. HTTP inbound has an ability of Smart Lifecycle, but it doesn't react to the change of running after stop()
  • Remove lock from AELMP
  • Add info on discarding event during eventTypes change.
  • Polishing concurrency test
@artembilan
Collaborator

Rabased, polished and added avoiding 'eventTypes' mutation afterwards

@garyrussell
Owner

Hi Artem, sorry it took so long to get back to this.

Looks good, except, I think we can remove the CountDownLatch and eventTypesChanging boolean altogether. They really add no value.

if (!this.supportsEventType(event.getClass())) then we can simply assume that the event was received while the supported events was being changed. See my last comment above that shows the race condition will never cause a problem, so no latch is needed. Worst case is we'll see an event that would not have been emitted after the change.

You could argue that the latch is needed to avoid the call to supportsEventType unless we know the event list is changing. However, my point is, "who cares whether the event that was recently supported is still supported?". In other words, if we receive an event because the cache was not yet updated, simply emit the event anyway (if it had arrived a few milliseconds earlier it would have been emitted anyway". So, I think, all we need is

public void setEventTypes(Class<? extends ApplicationEvent>... eventTypes) {
    this.eventTypes = new HashSet<Class<? extends ApplicationEvent>>(Arrays.asList(eventTypes));
    if (this.applicationEventMulticaster != null) {
        this.applicationEventMulticaster.addApplicationListener(this);
    }
}

public void onApplicationEvent(ApplicationEvent event) {
    if (this.active || event instanceof ApplicationContextEvent) {
        if (event.getSource() instanceof Message<?>) {
            this.sendMessage((Message<?>) event.getSource());
        }
        else {
            Object payload = this.evaluatePayloadExpression(event);
            this.sendMessage(MessageBuilder.withPayload(payload).build());
        }
    }
}
...ent/config/EventInboundChannelAdapterParserTests.java
@@ -87,11 +90,12 @@ public void validateEventParserWithEventTypes() {
Assert.assertTrue(adapter instanceof ApplicationEventListeningMessageProducer);
DirectFieldAccessor adapterAccessor = new DirectFieldAccessor(adapter);
Assert.assertEquals(context.getBean("inputFiltered"), adapterAccessor.getPropertyValue("outputChannel"));
- Set<Class<? extends ApplicationEvent>> eventTypes = (Set<Class<? extends ApplicationEvent>>) adapterAccessor.getPropertyValue("eventTypes");
+ Class<? extends ApplicationEvent>[] eventTypes = (Class<? extends ApplicationEvent>[]) adapterAccessor.getPropertyValue("eventTypes");
@garyrussell Owner

This no longer runs - eventTypes is back to being a Set.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@artembilan
Collaborator

Hi, Gary!
Sorry for delay on all open questions: I was out of computer.
Here: I agree and I'll answer soon by commit with your concerns.

@artembilan
Collaborator

Rebased and pushed:

the event that was recently supported is still supported

was an argument :-)
Thanks

...inbound/ApplicationEventListeningMessageProducer.java
((73 lines not shown))
-
- /**
- * Provide an expression to be evaluated against the received ApplicationEvent
- * instance (the "root object") in order to create the Message payload. If none
- * is provided, the ApplicationEvent itself will be used as the payload.
- */
- public void setPayloadExpression(String payloadExpression) {
- if (payloadExpression == null) {
- this.payloadExpression = null;
- }
- else {
- this.payloadExpression = this.parser.parseExpression(payloadExpression);
+ public void setEventTypes(Class<? extends ApplicationEvent>... eventTypes) {
+ this.eventTypes = new HashSet<Class<? extends ApplicationEvent>>(Arrays.asList(eventTypes));
+ if (this.applicationEventMulticaster != null) {
+ this.applicationEventMulticaster.addApplicationListener(this);
}
@garyrussell Owner

One last comment :smile: (I hope!)

I wonder if we should detect eventTypes.length == 1 && eventTypes[0] == null and reset the field to null in that case? (just like if we had never set an event type list).

Of course, they can always set it to ApplicationEvent to get all events.

Also, aside from the only element being null, maybe an Assert.noNullElements() ??

@artembilan Collaborator

Gery, it's very serious catch:

adapter.setEventTypes((Class<? extends ApplicationEvent>) null);

ends up with NPE now.
So, will be fixed soon.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
artembilan added some commits
@artembilan artembilan INT-2935: Improve `AELMP` performance
Before `ApplicationEventListeningMessageProducer` accepted all `ApplicationEvent`'s and than filtered them.
It caused some `ApplicationEventMulticaster.retrieverCache` overhead.

* Improve `ApplicationEventListeningMessageProducer` to `implements SmartApplicationListener`.
It allows to make filtering earlier on first appropriate `ApplicationEvent` from `ApplicationEventListeningMessageProducer#supportsEventType`
 and caching the `ApplicationListener` only for that `ApplicationEvent`.
* Re-register `ApplicationEventListeningMessageProducer` in the `ApplicationEventMulticaster` on `ApplicationEventListeningMessageProducer#setEventTypes`
to clear `ApplicationEventMulticaster.retrieverCache` and make filtering on next the `ApplicationEvent`.
* Move `org.springframework.integration.gemfire.inbound.SpelMessageProducerSupport` to core `ExpressionMessageProducerSupport` as useful component.
* Add test about new logic of `ApplicationEventListeningMessageProducer` and its behavior with respect to the `ApplicationEventMulticaster.retrieverCache`.

JIRA: https://jira.springsource.org/browse/INT-2935
5ca318e
@artembilan artembilan INT-2935: EMPS JavaDoc & AELMP ReadWriteLock 13008f8
@artembilan artembilan INT-2935: AELMP#eventTypes changing 'barrier' 91b3144
@artembilan artembilan INT-2935: avoid 'eventTypes' mutation afterwards ffc5ecd
@artembilan artembilan INT-2935: Polishing according PR comments 0344126
@artembilan artembilan INT-2935: Fix NPE in the `setEventTypes` 561d940
@artembilan
Collaborator

Pushed NPE fix

@garyrussell
Owner

LGTM; merging.

@garyrussell garyrussell closed this
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on May 8, 2013
  1. @artembilan

    INT-2935: Improve `AELMP` performance

    artembilan authored
    Before `ApplicationEventListeningMessageProducer` accepted all `ApplicationEvent`'s and than filtered them.
    It caused some `ApplicationEventMulticaster.retrieverCache` overhead.
    
    * Improve `ApplicationEventListeningMessageProducer` to `implements SmartApplicationListener`.
    It allows to make filtering earlier on first appropriate `ApplicationEvent` from `ApplicationEventListeningMessageProducer#supportsEventType`
     and caching the `ApplicationListener` only for that `ApplicationEvent`.
    * Re-register `ApplicationEventListeningMessageProducer` in the `ApplicationEventMulticaster` on `ApplicationEventListeningMessageProducer#setEventTypes`
    to clear `ApplicationEventMulticaster.retrieverCache` and make filtering on next the `ApplicationEvent`.
    * Move `org.springframework.integration.gemfire.inbound.SpelMessageProducerSupport` to core `ExpressionMessageProducerSupport` as useful component.
    * Add test about new logic of `ApplicationEventListeningMessageProducer` and its behavior with respect to the `ApplicationEventMulticaster.retrieverCache`.
    
    JIRA: https://jira.springsource.org/browse/INT-2935
  2. @artembilan
  3. @artembilan
  4. @artembilan
  5. @artembilan
  6. @artembilan
This page is out of date. Refresh to see the latest.
View
31 ...n/gemfire/inbound/SpelMessageProducerSupport.java → ...on/endpoint/ExpressionMessageProducerSupport.java
@@ -1,38 +1,36 @@
/*
- * Copyright 2002-2011 the original author or authors.
- *
+ * Copyright 2002-2013 the original author or authors.
+ *
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
-package org.springframework.integration.gemfire.inbound;
+package org.springframework.integration.endpoint;
import org.springframework.expression.Expression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.endpoint.MessageProducerSupport;
/**
+ * A {@link MessageProducerSupport} sub-class that provides {@linkplain #payloadExpression}
+ * evaluation with result as a payload for Message to send.
+ *
* @author David Turanski
+ * @author Artem Bilan
* @since 2.1
*
*/
-abstract class SpelMessageProducerSupport extends MessageProducerSupport {
-
- private volatile Expression payloadExpression;
+public abstract class ExpressionMessageProducerSupport extends MessageProducerSupport {
private final SpelExpressionParser parser = new SpelExpressionParser();
-
- @Override
- protected void onInit(){
- super.onInit();
- }
-
+ private volatile Expression payloadExpression;
+
public void setPayloadExpression(String payloadExpression) {
if (payloadExpression == null) {
this.payloadExpression = null;
@@ -41,14 +39,13 @@ public void setPayloadExpression(String payloadExpression) {
this.payloadExpression = this.parser.parseExpression(payloadExpression);
}
}
-
- protected Object evaluationResult(Object payload){
+
+ protected Object evaluatePayloadExpression(Object payload){
Object evaluationResult = payload;
if (payloadExpression != null) {
evaluationResult = payloadExpression.getValue(payload);
}
return evaluationResult;
}
-
}
View
117 ...rc/main/java/org/springframework/integration/event/inbound/ApplicationEventListeningMessageProducer.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2010 the original author or authors.
+ * Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,63 +16,63 @@
package org.springframework.integration.event.inbound;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
import org.springframework.context.ApplicationEvent;
-import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ApplicationContextEvent;
-import org.springframework.expression.Expression;
-import org.springframework.expression.spel.standard.SpelExpressionParser;
+import org.springframework.context.event.ApplicationEventMulticaster;
+import org.springframework.context.ApplicationListener;
+import org.springframework.context.event.SmartApplicationListener;
+import org.springframework.context.support.AbstractApplicationContext;
+import org.springframework.core.Ordered;
import org.springframework.integration.Message;
-import org.springframework.integration.endpoint.MessageProducerSupport;
+import org.springframework.integration.endpoint.ExpressionMessageProducerSupport;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
/**
- * An inbound Channel Adapter that passes Spring {@link ApplicationEvent ApplicationEvents} within messages.
+ * An inbound Channel Adapter that implements {@link ApplicationListener} and
+ * passes Spring {@link ApplicationEvent ApplicationEvents} within messages.
* If a {@link #setPayloadExpression(String) payloadExpression} is provided, it will be evaluated against
* the ApplicationEvent instance to create the Message payload. Otherwise, the event itself will be the payload.
- *
+ *
* @author Mark Fisher
+ * @author Artem Bilan
+ * @see ApplicationEventMulticaster
+ * @see ExpressionMessageProducerSupport
*/
-public class ApplicationEventListeningMessageProducer extends MessageProducerSupport implements ApplicationListener<ApplicationEvent> {
+public class ApplicationEventListeningMessageProducer extends ExpressionMessageProducerSupport implements SmartApplicationListener {
- private final Set<Class<? extends ApplicationEvent>> eventTypes = new CopyOnWriteArraySet<Class<? extends ApplicationEvent>>();
+ private volatile Set<Class<? extends ApplicationEvent>> eventTypes;
- private volatile Expression payloadExpression;
+ private ApplicationEventMulticaster applicationEventMulticaster;
private volatile boolean active;
- private final SpelExpressionParser parser = new SpelExpressionParser();
-
-
/**
* Set the list of event types (classes that extend ApplicationEvent) that
* this adapter should send to the message channel. By default, all event
* types will be sent.
+ * In additional this method re-register current instance as a {@link ApplicationListener}
+ * in the {@link ApplicationEventMulticaster} to clear listeners cache and get a fresh cache entry
+ * on next appropriate {@link ApplicationEvent}.
+ *
+ * @see ApplicationEventMulticaster#addApplicationListener
+ * @see #supportsEventType
*/
@SuppressWarnings("unchecked")
- public void setEventTypes(Class<? extends ApplicationEvent>[] eventTypes) {
- Assert.notEmpty(eventTypes, "at least one event type is required");
- synchronized (this.eventTypes) {
- this.eventTypes.clear();
- this.eventTypes.addAll(CollectionUtils.arrayToList(eventTypes));
- }
- }
+ public void setEventTypes(Class<? extends ApplicationEvent>... eventTypes) {
+ Set<Class<? extends ApplicationEvent>> eventSet = new HashSet<Class<? extends ApplicationEvent>>(CollectionUtils.arrayToList(eventTypes));
+ eventSet.remove(null);
+ this.eventTypes = (eventSet.size() > 0 ? eventSet : null);
- /**
- * Provide an expression to be evaluated against the received ApplicationEvent
- * instance (the "root object") in order to create the Message payload. If none
- * is provided, the ApplicationEvent itself will be used as the payload.
- */
- public void setPayloadExpression(String payloadExpression) {
- if (payloadExpression == null) {
- this.payloadExpression = null;
- }
- else {
- this.payloadExpression = this.parser.parseExpression(payloadExpression);
+ if (this.applicationEventMulticaster != null) {
+ this.applicationEventMulticaster.addApplicationListener(this);
}
}
@@ -80,21 +80,47 @@ public String getComponentType() {
return "event:inbound-channel-adapter";
}
+ @Override
+ protected void onInit() {
+ super.onInit();
+ this.applicationEventMulticaster = this.getBeanFactory()
+ .getBean(AbstractApplicationContext.APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class);
+ Assert.notNull(this.applicationEventMulticaster,
+ "To use ApplicationListeners the 'applicationEventMulticaster' bean must be supplied within ApplicationContext.");
+ }
+
public void onApplicationEvent(ApplicationEvent event) {
if (this.active || event instanceof ApplicationContextEvent) {
- if (CollectionUtils.isEmpty(this.eventTypes)) {
- this.sendEventAsMessage(event);
- return;
+ if (event.getSource() instanceof Message<?>) {
+ this.sendMessage((Message<?>) event.getSource());
}
- for (Class<? extends ApplicationEvent> eventType : this.eventTypes) {
- if (eventType.isAssignableFrom(event.getClass())) {
- this.sendEventAsMessage(event);
- return;
- }
+ else {
+ Object payload = this.evaluatePayloadExpression(event);
+ this.sendMessage(MessageBuilder.withPayload(payload).build());
}
}
}
+ public boolean supportsEventType(Class<? extends ApplicationEvent> eventType) {
+ if (this.eventTypes == null) {
+ return true;
+ }
+ for (Class<? extends ApplicationEvent> type : this.eventTypes) {
+ if (type.isAssignableFrom(eventType)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public boolean supportsSourceType(Class<?> sourceType) {
+ return true;
+ }
+
+ public int getOrder() {
+ return Ordered.LOWEST_PRECEDENCE;
+ }
+
@Override
protected void doStart() {
this.active = true;
@@ -105,14 +131,5 @@ protected void doStop() {
this.active = false;
}
- private void sendEventAsMessage(ApplicationEvent event) {
- if (event.getSource() instanceof Message<?>) {
- this.sendMessage((Message<?>) event.getSource());
- }
- else {
- Object payload = (this.payloadExpression != null) ? this.payloadExpression.getValue(event) : event;
- this.sendMessage(MessageBuilder.withPayload(payload).build());
- }
- }
-
}
+
View
4 spring-integration-event/src/test/java/log4j.properties
@@ -2,7 +2,7 @@ log4j.rootCategory=WARN, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%c{1}: %m%n
+log4j.appender.stdout.layout.ConversionPattern=%c{1}: (%t) %m%n
log4j.category.org.springframework.integration=WARN
-log4j.category.org.springframework.integration.file=WARN
+log4j.category.org.springframework.integration.event=INFO
View
116 ...st/java/org/springframework/integration/event/inbound/ApplicationEventListeningMessageProducerTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2010 the original author or authors.
+ * Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,16 +17,30 @@
package org.springframework.integration.event.inbound;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.hamcrest.Matchers;
import org.junit.Test;
+import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.context.ApplicationEvent;
+import org.springframework.context.ApplicationListener;
+import org.springframework.context.event.ApplicationEventMulticaster;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.ContextStartedEvent;
import org.springframework.context.event.ContextStoppedEvent;
+import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
+import org.springframework.context.support.GenericApplicationContext;
import org.springframework.integration.Message;
import org.springframework.integration.MessageHandlingException;
import org.springframework.integration.channel.DirectChannel;
@@ -35,10 +49,12 @@
import org.springframework.integration.event.core.MessagingEvent;
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
import org.springframework.integration.message.GenericMessage;
+import org.springframework.integration.test.util.TestUtils;
/**
* @author Mark Fisher
* @author Gary Russell
+ * @author Artem Bilan
*/
public class ApplicationEventListeningMessageProducerTests {
@@ -50,7 +66,9 @@ public void anyApplicationEventSentByDefault() {
adapter.start();
Message<?> message1 = channel.receive(0);
assertNull(message1);
+ assertTrue(adapter.supportsEventType(TestApplicationEvent1.class));
adapter.onApplicationEvent(new TestApplicationEvent1());
+ assertTrue(adapter.supportsEventType(TestApplicationEvent2.class));
adapter.onApplicationEvent(new TestApplicationEvent2());
Message<?> message2 = channel.receive(20);
assertNotNull(message2);
@@ -66,17 +84,29 @@ public void onlyConfiguredEventTypesAreSent() {
QueueChannel channel = new QueueChannel();
ApplicationEventListeningMessageProducer adapter = new ApplicationEventListeningMessageProducer();
adapter.setOutputChannel(channel);
- adapter.setEventTypes(new Class[]{TestApplicationEvent1.class});
+ adapter.setEventTypes(TestApplicationEvent1.class);
adapter.start();
Message<?> message1 = channel.receive(0);
assertNull(message1);
+ assertTrue(adapter.supportsEventType(TestApplicationEvent1.class));
adapter.onApplicationEvent(new TestApplicationEvent1());
- adapter.onApplicationEvent(new TestApplicationEvent2());
+ assertFalse(adapter.supportsEventType(TestApplicationEvent2.class));
Message<?> message2 = channel.receive(20);
assertNotNull(message2);
assertEquals("event1", ((ApplicationEvent) message2.getPayload()).getSource());
- Message<?> message3 = channel.receive(0);
- assertNull(message3);
+ assertNull(channel.receive(0));
+
+ adapter.setEventTypes((Class<? extends ApplicationEvent>) null);
+ assertTrue(adapter.supportsEventType(TestApplicationEvent1.class));
+ assertTrue(adapter.supportsEventType(TestApplicationEvent2.class));
+
+ adapter.setEventTypes(null, TestApplicationEvent2.class, null);
+ assertFalse(adapter.supportsEventType(TestApplicationEvent1.class));
+ assertTrue(adapter.supportsEventType(TestApplicationEvent2.class));
+
+ adapter.setEventTypes(null, null);
+ assertTrue(adapter.supportsEventType(TestApplicationEvent1.class));
+ assertTrue(adapter.supportsEventType(TestApplicationEvent2.class));
}
@Test
@@ -148,7 +178,7 @@ public void messageAsSourceOrCustomEventType() {
assertEquals("test", message2.getPayload());
}
- @Test(expected=MessageHandlingException.class)
+ @Test(expected = MessageHandlingException.class)
public void anyApplicationEventCausesExceptionWithErrorHandling() {
DirectChannel channel = new DirectChannel();
channel.subscribe(new AbstractReplyProducingMessageHandler() {
@@ -170,6 +200,65 @@ protected Object handleRequestMessage(Message<?> requestMessage) {
adapter.onApplicationEvent(new TestApplicationEvent1());
}
+ @Test
+ @SuppressWarnings({"unchecked", "serial"})
+ public void testInt2935CheckRetrieverCache() {
+ GenericApplicationContext ctx = TestUtils.createTestApplicationContext();
+ ConfigurableListableBeanFactory beanFactory = ctx.getBeanFactory();
+
+ QueueChannel channel = new QueueChannel();
+ ApplicationEventListeningMessageProducer listenerMessageProducer = new ApplicationEventListeningMessageProducer();
+ listenerMessageProducer.setOutputChannel(channel);
+ listenerMessageProducer.setEventTypes(TestApplicationEvent2.class);
+ beanFactory.registerSingleton("testListenerMessageProducer", listenerMessageProducer);
+
+ AtomicInteger listenerCounter = new AtomicInteger();
+ beanFactory.registerSingleton("testListener", new TestApplicationListener(listenerCounter));
+
+ ctx.refresh();
+
+ ApplicationEventMulticaster multicaster =
+ ctx.getBean(AbstractApplicationContext.APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class);
+ Map retrieverCache = TestUtils.getPropertyValue(multicaster, "retrieverCache", Map.class);
+
+ ctx.publishEvent(new TestApplicationEvent1());
+
+ //Before retrieverCache grew exponentially: the same ApplicationEventListeningMessageProducer for each event
+ assertEquals(2, retrieverCache.size());
+ for (Object key : retrieverCache.keySet()) {
+ Class<? extends ApplicationEvent> event = TestUtils.getPropertyValue(key, "eventType", Class.class);
+ assertThat(event, Matchers.is(Matchers.isOneOf(ContextRefreshedEvent.class, TestApplicationEvent1.class)));
+ Set listeners = TestUtils.getPropertyValue(retrieverCache.get(key), "applicationListenerBeans", Set.class);
+ assertEquals(1, listeners.size());
+ assertEquals("testListener", listeners.iterator().next());
+ }
+
+ TestApplicationEvent2 event2 = new TestApplicationEvent2();
+ ctx.publishEvent(event2);
+ assertEquals(3, retrieverCache.size());
+ for (Object key : retrieverCache.keySet()) {
+ Class event = TestUtils.getPropertyValue(key, "eventType", Class.class);
+ if (TestApplicationEvent2.class.isAssignableFrom(event)) {
+ Set listeners = TestUtils.getPropertyValue(retrieverCache.get(key), "applicationListenerBeans", Set.class);
+ assertEquals(2, listeners.size());
+ for (Object listener : listeners) {
+ assertThat((String) listener, Matchers.is(Matchers.isOneOf("testListenerMessageProducer", "testListener")));
+ }
+ break;
+ }
+ }
+
+ ctx.publishEvent(new ApplicationEvent("Some event") {
+ });
+
+ assertEquals(4, listenerCounter.get());
+
+ final Message<?> receive = channel.receive(10);
+ assertNotNull(receive);
+ assertSame(event2, receive.getPayload());
+ assertNull(channel.receive(1));
+ }
+
@SuppressWarnings("serial")
private static class TestApplicationEvent1 extends ApplicationEvent {
@@ -179,7 +268,6 @@ public TestApplicationEvent1() {
}
}
-
@SuppressWarnings("serial")
private static class TestApplicationEvent2 extends ApplicationEvent {
@@ -188,7 +276,6 @@ public TestApplicationEvent2() {
}
}
-
@SuppressWarnings("serial")
private static class TestMessagingEvent extends ApplicationEvent {
@@ -197,4 +284,17 @@ public TestMessagingEvent(Message<?> message) {
}
}
+ private static class TestApplicationListener implements ApplicationListener<ApplicationEvent> {
+
+ private final AtomicInteger counter;
+
+ private TestApplicationListener(AtomicInteger counter) {
+ this.counter = counter;
+ }
+
+ public void onApplicationEvent(ApplicationEvent event) {
+ this.counter.incrementAndGet();
+ }
+ }
+
}
View
23 ...gemfire/src/main/java/org/springframework/integration/gemfire/inbound/CacheListeningMessageProducer.java
@@ -22,6 +22,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.springframework.integration.endpoint.ExpressionMessageProducerSupport;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.util.Assert;
@@ -37,13 +38,13 @@
* enum for all options. A SpEL expression may be provided to generate a Message payload by
* evaluating that expression against the {@link EntryEvent} instance as the root object. If no
* payloadExpression is provided, the {@link EntryEvent} itself will be the payload.
- *
+ *
* @author Mark Fisher
* @author David Turanski
* @since 2.1
*/
@SuppressWarnings({"rawtypes", "unchecked"})
-public class CacheListeningMessageProducer extends SpelMessageProducerSupport {
+public class CacheListeningMessageProducer extends ExpressionMessageProducerSupport {
private final Log logger = LogFactory.getLog(this.getClass());
@@ -58,7 +59,7 @@
public CacheListeningMessageProducer(Region<?, ?> region) {
Assert.notNull(region, "region must not be null");
this.region = region;
- this.listener = new MessageProducingCacheListener();
+ this.listener = new MessageProducingCacheListener();
}
@@ -81,16 +82,16 @@ protected void doStop() {
if (logger.isInfoEnabled()) {
logger.info("removing MessageProducingCacheListener from GemFire Region '" + this.region.getName() + "'");
}
- try {
+ try {
this.region.getAttributesMutator().removeCacheListener(this.listener);
} catch (CacheClosedException e) {
if (logger.isDebugEnabled()){
logger.debug(e.getMessage(),e);
}
}
-
+
}
-
+
private class MessageProducingCacheListener extends CacheListenerAdapter {
@Override
@@ -121,16 +122,16 @@ public void afterDestroy(EntryEvent event) {
}
}
- private void processEvent(EntryEvent event) {
- this.publish(evaluationResult(event));
-
+ private void processEvent(EntryEvent event) {
+ this.publish(evaluatePayloadExpression(event));
+
}
private void publish(Object payload) {
sendMessage(MessageBuilder.withPayload(payload).build());
}
}
-
-
+
+
}
View
23 ...emfire/src/main/java/org/springframework/integration/gemfire/inbound/ContinuousQueryMessageProducer.java
@@ -26,6 +26,7 @@
import org.springframework.data.gemfire.listener.ContinuousQueryListener;
import org.springframework.data.gemfire.listener.ContinuousQueryListenerContainer;
import org.springframework.integration.Message;
+import org.springframework.integration.endpoint.ExpressionMessageProducerSupport;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.util.Assert;
@@ -36,13 +37,13 @@
* constantly evaluated against a cache
* {@link com.gemstone.gemfire.cache.Region}. This is much faster than
* re-querying the cache manually.
- *
+ *
* @author Josh Long
* @author David Turanski
* @since 2.1
- *
+ *
*/
-public class ContinuousQueryMessageProducer extends SpelMessageProducerSupport implements ContinuousQueryListener {
+public class ContinuousQueryMessageProducer extends ExpressionMessageProducerSupport implements ContinuousQueryListener {
private static Log logger = LogFactory.getLog(ContinuousQueryMessageProducer.class);
private final String query;
@@ -57,7 +58,7 @@
CqEventType.UPDATED));
/**
- *
+ *
* @param queryListenerContainer a {@link org.springframework.data.gemfire.listener.ContinuousQueryListenerContainer}
* @param query the query string
*/
@@ -69,7 +70,7 @@ public ContinuousQueryMessageProducer(ContinuousQueryListenerContainer queryList
}
/**
- *
+ *
* @param queryName optional query name
*/
public void setQueryName(String queryName) {
@@ -77,7 +78,7 @@ public void setQueryName(String queryName) {
}
/**
- *
+ *
* @param durable true if the query is a durable subscription
*/
public void setDurable(boolean durable) {
@@ -102,7 +103,7 @@ protected void onInit() {
/*
* (non-Javadoc)
- *
+ *
* @see
* org.springframework.data.gemfire.listener.QueryListener#onEvent(com.gemstone
* .gemfire.cache.query.CqEvent)
@@ -113,17 +114,17 @@ public void onEvent(CqEvent event) {
logger.debug(String.format("processing cq event key [%s] event [%s]", event.getQueryOperation()
.toString(), event.getKey()));
}
- Message<?> cqEventMessage = MessageBuilder.withPayload(evaluationResult(event)).build();
+ Message<?> cqEventMessage = MessageBuilder.withPayload(evaluatePayloadExpression(event)).build();
sendMessage(cqEventMessage);
}
}
private boolean isEventSupported(CqEvent event) {
-
- String eventName = event.getQueryOperation().toString() +
+
+ String eventName = event.getQueryOperation().toString() +
(event.getQueryOperation().toString().endsWith("Y")? "ED" : "D");
CqEventType eventType = CqEventType.valueOf(eventName);
return supportedEventTypes.contains(eventType);
}
-}
+}
Something went wrong with that request. Please try again.