INT-2932/2941 TCP Connection Events - Doc and Polishing #752

Closed
wants to merge 1 commit into
from
Jump to file or symbol
Failed to load files and symbols.
+133 −62
Split
@@ -37,6 +37,8 @@ protected AbstractBeanDefinition doParse(Element element, ParserContext parserCo
adapterBuilder.addPropertyReference("outputChannel", channelName);
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(adapterBuilder, element, "error-channel", "errorChannel");
IntegrationNamespaceUtils.setValueIfAttributeDefined(adapterBuilder, element, "event-types");
+ IntegrationNamespaceUtils.setValueIfAttributeDefined(adapterBuilder, element, "auto-startup");
+ IntegrationNamespaceUtils.setValueIfAttributeDefined(adapterBuilder, element, "phase");
return adapterBuilder.getBeanDefinition();
}
@@ -61,15 +61,19 @@ public TcpConnectionEvent(TcpConnectionSupport connection, Throwable t,
}
public EventType getType() {
- return type;
+ return this.type;
}
public String getConnectionId() {
return ((TcpConnection) this.getSource()).getConnectionId();
}
public String getConnectionFactoryName() {
- return connectionFactoryName;
+ return this.connectionFactoryName;
+ }
+
+ public Throwable getThrowable() {
+ return this.throwable;
}
@Override
@@ -53,14 +53,16 @@ public void setEventTypes(Class<? extends TcpConnectionEvent>[] eventTypes) {
}
public void onApplicationEvent(TcpConnectionEvent event) {
- if (CollectionUtils.isEmpty(this.eventTypes)) {
- this.sendMessage(messageFromEvent(event));
- }
- else {
- for (Class<? extends TcpConnectionEvent> eventType : this.eventTypes) {
- if (eventType.isAssignableFrom(event.getClass())) {
- this.sendMessage(messageFromEvent(event));
- break;
+ if (this.isRunning()) {
+ if (CollectionUtils.isEmpty(this.eventTypes)) {
+ this.sendMessage(messageFromEvent(event));
+ }
+ else {
+ for (Class<? extends TcpConnectionEvent> eventType : this.eventTypes) {
+ if (eventType.isAssignableFrom(event.getClass())) {
+ this.sendMessage(messageFromEvent(event));
+ break;
+ }
}
}
}
@@ -620,48 +620,51 @@ setCustomHeaders(). Default is TcpMessageMapper.
</xsd:documentation>
</xsd:annotation>
<xsd:complexType>
- <xsd:attribute name="id" type="xsd:string" use="optional" />
- <xsd:attribute name="event-types" type="xsd:string" use="optional">
- <xsd:annotation>
- <xsd:documentation>
- Comma delimited list of event types (classes that extend TcpConnectionEvent) that this adapter
- should send to the message channel. By default, all event types will be sent [OPTIONAL].
- Note, it is NOT possible to filter by subtype, just class - for example, the standard TcpConnectionEvent
- class has 3 subtypes (OPEN, CLOSE, EXCEPTION). This feature is intended to allow the adapter to
- be used, say, to obtain just subclasses of TcpConnectionEvent (perhaps generated by a
- TcpConnectionInterceptor, perhaps to signal handshaking of some kind).
- </xsd:documentation>
- </xsd:annotation>
- </xsd:attribute>
- <xsd:attribute name="channel" type="xsd:string">
- <xsd:annotation>
- <xsd:appinfo>
- <tool:annotation kind="ref">
- <tool:expected-type
- type="org.springframework.integration.MessageChannel" />
- </tool:annotation>
- </xsd:appinfo>
- <xsd:documentation>
- The channel to which Messages generated from Application Context events will be sent.
- </xsd:documentation>
- </xsd:annotation>
- </xsd:attribute>
- <xsd:attribute name="error-channel" type="xsd:string">
- <xsd:annotation>
- <xsd:appinfo>
- <tool:annotation kind="ref">
- <tool:expected-type
- type="org.springframework.integration.MessageChannel" />
- </tool:annotation>
- </xsd:appinfo>
- <xsd:documentation>
- If a (synchronous) downstream exception is thrown and an error-channel is specified,
- a MessagingException will be sent to this channel. Otherwise, any such exception
- will be propagated to the caller.
- </xsd:documentation>
- </xsd:annotation>
- </xsd:attribute>
- <xsd:attribute name="auto-startup" type="xsd:string" default="true" />
+ <xsd:complexContent>
+ <xsd:extension base="smartLifeCycleType">
+ <xsd:attribute name="id" type="xsd:string" use="optional" />
+ <xsd:attribute name="event-types" type="xsd:string" use="optional">
+ <xsd:annotation>
+ <xsd:documentation>
+ Comma delimited list of event types (classes that extend TcpConnectionEvent) that this adapter
+ should send to the message channel. By default, all event types will be sent [OPTIONAL].
+ Note, it is NOT possible to filter by subtype, just class - for example, the standard TcpConnectionEvent
+ class has 3 subtypes (OPEN, CLOSE, EXCEPTION). This feature is intended to allow the adapter to
+ be used, say, to obtain just subclasses of TcpConnectionEvent (perhaps generated by a
+ TcpConnectionInterceptor, perhaps to signal handshaking of some kind).
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
+ <xsd:attribute name="channel" type="xsd:string">
+ <xsd:annotation>
+ <xsd:appinfo>
+ <tool:annotation kind="ref">
+ <tool:expected-type
+ type="org.springframework.integration.MessageChannel" />
+ </tool:annotation>
+ </xsd:appinfo>
+ <xsd:documentation>
+ The channel to which Messages generated from Application Context events will be sent.
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
+ <xsd:attribute name="error-channel" type="xsd:string">
+ <xsd:annotation>
+ <xsd:appinfo>
+ <tool:annotation kind="ref">
+ <tool:expected-type
+ type="org.springframework.integration.MessageChannel" />
+ </tool:annotation>
+ </xsd:appinfo>
+ <xsd:documentation>
+ If a (synchronous) downstream exception is thrown and an error-channel is specified,
+ a MessagingException will be sent to this channel. Otherwise, any such exception
+ will be propagated to the caller.
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
+ </xsd:extension>
+ </xsd:complexContent>
</xsd:complexType>
</xsd:element>
@@ -419,7 +419,13 @@
</constructor-arg>
</bean>
- <ip:tcp-connection-event-inbound-channel-adapter id="eventAdapter" channel="nullChannel"
+ <ip:tcp-connection-event-inbound-channel-adapter id="eventAdapter" channel="eventChannel"
+ auto-startup="false" phase="23" error-channel="eventErrors"
event-types="org.springframework.integration.ip.config.ParserUnitTests$EventSubclass1, org.springframework.integration.ip.config.ParserUnitTests$EventSubclass2"/>
+ <int:channel id="eventChannel">
+ <int:queue />
+ </int:channel>
+
+ <int:channel id="eventErrors" />
</beans>
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2012 the original author or authors.
+ * Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -23,6 +23,7 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
import java.util.Iterator;
import java.util.Set;
@@ -41,6 +42,7 @@
import org.springframework.integration.Message;
import org.springframework.integration.MessageChannel;
import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.core.MessageHandler;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.endpoint.EventDrivenConsumer;
@@ -54,6 +56,7 @@
import org.springframework.integration.ip.tcp.connection.DefaultTcpNioSSLConnectionSupport;
import org.springframework.integration.ip.tcp.connection.DefaultTcpSSLContextSupport;
import org.springframework.integration.ip.tcp.connection.TcpConnectionEvent;
+import org.springframework.integration.ip.tcp.connection.TcpConnectionEvent.TcpConnectionEventType;
import org.springframework.integration.ip.tcp.connection.TcpConnectionEventListeningMessageProducer;
import org.springframework.integration.ip.tcp.connection.TcpConnectionSupport;
import org.springframework.integration.ip.tcp.connection.TcpMessageMapper;
@@ -262,6 +265,9 @@
@Autowired
TcpConnectionEventListeningMessageProducer eventAdapter;
+ @Autowired
+ QueueChannel eventChannel;
+
private static volatile int adviceCalled;
@Test
@@ -649,12 +655,28 @@ public void testSecureServer() {
assertSame(socketSupport, dfa.getPropertyValue("tcpSocketSupport"));
}
+ @SuppressWarnings("unchecked")
@Test
public void testEventAdapter() {
Set<?> eventTypes = TestUtils.getPropertyValue(this.eventAdapter, "eventTypes", Set.class);
assertEquals(2, eventTypes.size());
assertTrue(eventTypes.contains(EventSubclass1.class));
assertTrue(eventTypes.contains(EventSubclass2.class));
+ assertFalse(TestUtils.getPropertyValue(this.eventAdapter, "autoStartup", Boolean.class));
+ assertEquals(23, TestUtils.getPropertyValue(this.eventAdapter, "phase"));
+ assertEquals("eventErrors", TestUtils.getPropertyValue(this.eventAdapter, "errorChannel",
+ DirectChannel.class).getComponentName());
+
+ TcpConnectionSupport connection = mock(TcpConnectionSupport.class);
+ TcpConnectionEvent event = new TcpConnectionEvent(connection, TcpConnectionEventType.OPEN, "foo");
+ this.eventAdapter.setEventTypes(new Class[] {TcpConnectionEvent.class});
+ this.eventAdapter.onApplicationEvent(event);
+ assertNull(this.eventChannel.receive(0));
+ this.eventAdapter.start();
+ this.eventAdapter.onApplicationEvent(event);
+ Message<?> eventMessage = this.eventChannel.receive(0);
+ assertNotNull(eventMessage);
+ assertSame(event, eventMessage.getPayload());
}
public static class FooAdvice extends AbstractRequestHandlerAdvice {
@@ -17,6 +17,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
@@ -42,18 +43,19 @@
@Test
public void test() throws Exception {
Socket socket = mock(Socket.class);
- final AtomicReference<ApplicationEvent> theEvent = new AtomicReference<ApplicationEvent>();
+ final AtomicReference<TcpConnectionEvent> theEvent = new AtomicReference<TcpConnectionEvent>();
TcpNetConnection conn = new TcpNetConnection(socket, false, false, new ApplicationEventPublisher() {
public void publishEvent(ApplicationEvent event) {
- theEvent.set(event);
+ theEvent.set((TcpConnectionEvent) event);
}
}, "foo");
assertNotNull(theEvent.get());
assertEquals("TcpConnectionEvent [type=OPEN, factory=foo, connectionId=" + conn.getConnectionId() + "]", theEvent.get().toString());
@SuppressWarnings("unchecked")
Serializer<Object> serializer = mock(Serializer.class);
- doThrow(new RuntimeException("foo")).when(serializer).serialize(Mockito.any(Object.class), Mockito.any(OutputStream.class));
+ RuntimeException toBeThrown = new RuntimeException("foo");
+ doThrow(toBeThrown).when(serializer).serialize(Mockito.any(Object.class), Mockito.any(OutputStream.class));
conn.setMapper(new TcpMessageMapper());
conn.setSerializer(serializer);
try {
@@ -64,6 +66,8 @@ public void publishEvent(ApplicationEvent event) {
assertNotNull(theEvent.get());
assertEquals("TcpConnectionEvent [type=EXCEPTION, factory=foo, connectionId=" + conn.getConnectionId() +
", Exception=java.lang.RuntimeException: foo]", theEvent.get().toString());
+ assertNotNull(theEvent.get().getThrowable());
+ assertSame(toBeThrown, theEvent.get().getThrowable());
conn.close();
assertNotNull(theEvent.get());
assertEquals("TcpConnectionEvent [type=CLOSE, factory=foo, connectionId=" + conn.getConnectionId() + "]", theEvent.get().toString());
@@ -38,6 +38,7 @@ public void testNoFilter() {
QueueChannel outputChannel = new QueueChannel();
eventProducer.setOutputChannel(outputChannel);
eventProducer.afterPropertiesSet();
+ eventProducer.start();
TcpConnectionSupport connection = Mockito.mock(TcpConnectionSupport.class);
TcpConnectionEvent event1 = new TcpConnectionEvent(connection, TcpConnectionEventType.OPEN, "foo");
eventProducer.onApplicationEvent(event1);
@@ -66,6 +67,7 @@ public void testFilter() {
eventProducer.setOutputChannel(outputChannel);
eventProducer.setEventTypes(new Class[] {FooEvent.class, BarEvent.class});
eventProducer.afterPropertiesSet();
+ eventProducer.start();
TcpConnectionSupport connection = Mockito.mock(TcpConnectionSupport.class);
TcpConnectionEvent event1 = new TcpConnectionEvent(connection, TcpConnectionEventType.OPEN, "foo");
eventProducer.onApplicationEvent(event1);
@@ -454,6 +454,34 @@
Configuring a connection interceptor factory chain.
</para>
</section>
+ <section id="tcp-events">
+ <title>TCP Connection Events</title>
+ <para>
+ Beginning with version 3.0, changes to <interfacename>TcpConnection</interfacename>s
+ are reported by <classname>TcpConnectionEvent</classname>s. <classname>TcpConnectionEvent</classname>
+ is a subclass of <classname>ApplicationEvent</classname> and thus can
+ be received by any <interfacename>ApplicationListener</interfacename> defined in
+ the <interfacename>ApplicationContext</interfacename>.
+ </para>
+ <para>
+ For convenience, a <code>&lt;int-ip:tcp-connection-event-inbound-channel-adapter/&gt;</code>
+ is provided. This adapter will receive all <classname>TcpConnectionEvent</classname>s (by
+ default), and send them to its <code>channel</code>. The adapter accepts an <code>event-type</code>
+ attribute, which is a list of class names for events that should be sent. This can be used
+ if an application subclasses <classname>TcpConnectionEvent</classname> for some reason, and wishes
+ to only receive those events. Omitting this attribute will mean that all
+ <classname>TcpConnectionEvent</classname>s will be sent.
+ </para>
+ <para>
+ <classname>TcpConnectionEvents</classname> contain:
+ <itemizedlist>
+ <listitem>Event type (OPEN, CLOSE, EXCEPTION)</listitem>.
+ <listitem>Connection Id (which can be used in a message header to send data to the connection)</listitem>
+ <listitem>Connection Factory Name (the bean name of the connection factory the connection belongs to)</listitem>
+ <listitem>The <classname>Throwable</classname> (for EXCEPTION event types only)</listitem>
+ </itemizedlist>
+ </para>
+ </section>
<section id="tcp-adapters">
<title>TCP Adapters</title>
<para>
@@ -31,8 +31,8 @@
been renamed to <classname>TcpConnectionInterceptorSupport</classname>.
</para>
<para>
- In addition, a new <code>&lt;int-ip:tcp-connection-event-channel-adapter/&gt;</code>
- is provided; by default, this adapter sends all <classname>TcpConnectionEvents</classname>
+ In addition, a new <code>&lt;int-ip:tcp-connection-event-inbound-channel-adapter/&gt;</code>
+ is provided; by default, this adapter sends all <classname>TcpConnectionEvent</classname>s
to a <interfacename>Channel</interfacename>.
</para>
<para>
@@ -47,9 +47,7 @@
to explicitly close a connection using its ID.
</para>
<para>
- Further documentation for these features will follow; for now, refer to the
- schema documentation for information about the new adapter, and JavaDocs for
- it as well as the other features.
+ For more information see <xref linkend="tcp-events"/>.
</para>
</section>
</section>