Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

INT-1807 Add Mechanism For Headers with TCP #763

Merged
merged 1 commit into from 8 months ago

2 participants

Gary Russell Artem Bilan
Gary Russell
Collaborator

TCP streams have no standard message structure. Therefore, the
TCP implementation previously only transferred the message
payload.

If someone wanted to convey header information, they would have
to write their own wrapper and/or use Java serialization for
the entire message.

This change provides a strategy to allow users to determine
which headers are transferred, and how.

A MessageConvertingMessageMapper is now provided that invokes
any MessageConverter. A MapMessageConverter is provided that
converts the payload, and selected heades to a Map with two
entries ("payload") and ("headers").

A MapJsonSerializer is provided that converts a Map to/from
JSON. Jackson can't delimit multiple objects in a stream
so another serializer is required to encode/decode structure.
A ByteArrayLfSerializer is used by default, inserting a
linefeed between JSON objects.

The combination of these elements now allows header
information to be transferred over TCP. Of course, users
can implment their own (de)serializer to format the
bits on the wire exactly as needed by their application.

INT-1807 Polishing

Add a test that uses a Map MessageConverter with a
Java (de)serializer.

Artem Bilan
Collaborator

Hi, Gary!
Let me review a bit.
I like TcpMessageMapper with MessageConverter strategy, but as I see MapMessageConverter and MapJsonSerializer are some limited implementations, when we expect on the other side Spring Integration aplication too, with the same configuration, of course, or some JSON-aware reader. Am I right?
In my case for ISO-8583 I convert SI-Message to the ISOMsg as payload and then delegate serialization to the jpos packager for send over TCP and something similar versa.
I mean, that I'm not sure in MapMessageConverter and MapJsonSerializer implementations yet. Maybe the same result we can achieve with ObjectToMapTransformer or ObjectToJsonTransformer, but if you had provided them, you had had some reason... My point - DRY. However, I can live with them.
Further - on the lines.

Artem Bilan artembilan commented on the diff April 03, 2013
...ion-core/src/main/java/org/springframework/integration/support/converter/MapMessageConverter.java
((50 lines not shown))
  50
+		this.headerNames = newHeaderNames;
  51
+	}
  52
+
  53
+	/**
  54
+	 * By default all headers on Map passed to {@link #toMessage(Object)}
  55
+	 * will be mapped. Set this property
  56
+	 * to 'true' if you wish to limit the inbound headers to those in
  57
+	 * the #headerNames.
  58
+	 * @param filterHeadersInToMessage
  59
+	 */
  60
+	public void setFilterHeadersInToMessage(boolean filterHeadersInToMessage) {
  61
+		this.filterHeadersInToMessage = filterHeadersInToMessage;
  62
+	}
  63
+
  64
+	public <P> Message<P> toMessage(Object object) {
  65
+		Assert.isInstanceOf(Map.class, object, "This converter expects a Map");
2
Artem Bilan Collaborator
artembilan added a note April 03, 2013

Maybe add generic type to the MessageConverter? Or it can take one type on toMessage, but produce another one from fromMessage?

Gary Russell Collaborator

Right - we can take any payload type in fromMessage; the resulting payload type here depends on what the deserializer puts in the Map's 'payload' element. So we have to leave it unspecified.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
...ion-core/src/main/java/org/springframework/integration/support/converter/MapMessageConverter.java
((32 lines not shown))
  32
+ * @since 3.0
  33
+ *
  34
+ */
  35
+public class MapMessageConverter implements MessageConverter {
  36
+
  37
+	private volatile Set<String> headerNames = new HashSet<String>();
  38
+
  39
+	private volatile boolean filterHeadersInToMessage;
  40
+
  41
+	/**
  42
+	 * Headers to be converted in {@link #fromMessage(Message)}.
  43
+	 * {@link #toMessage(Object)} will populate all headers found in
  44
+	 * the map, unless {@link #filterHeadersInToMessage} is true.
  45
+	 * @param headerNames
  46
+	 */
  47
+	public void setHeaderNames(Collection<String> headerNames) {
2
Artem Bilan Collaborator
artembilan added a note April 03, 2013

How about to make just it like this:

public void setHeaderNames(String... headerNames) {
        this.headerNames = headerNames;
    }

In most cases it will be configured as <bean> and comma-delimited value for this property will be converted perfectly.
And further in the toMessage use retainAll:

if (headers != null) {
    if (this.filterHeadersInToMessage) {
        headers.keySet().retainAll(Arrays.asList(this.headerNames));
    }
    messageBuilder.copyHeaders(headers);
}
Gary Russell Collaborator

Nice! However, I will still create a Set in setHeaderNames() because retainAll() uses contains() which is inefficient for a List.

Also this.headerNames = headerNames would allow the caller to mutate the list afterwards.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Artem Bilan artembilan commented on the diff April 03, 2013
...ain/java/org/springframework/integration/ip/tcp/connection/MessageConvertingTcpMessageMapper.java
((29 lines not shown))
  29
+
  30
+	private final MessageConverter messageConverter;
  31
+
  32
+	public MessageConvertingTcpMessageMapper(MessageConverter messageConverter) {
  33
+		Assert.notNull(messageConverter, "'messasgeConverter' must not be null");
  34
+		this.messageConverter = messageConverter;
  35
+	}
  36
+
  37
+	@Override
  38
+	public Message<Object> toMessage(TcpConnection connection) throws Exception {
  39
+		Object data = connection.getPayload();
  40
+		if (data != null) {
  41
+			Message<Object> message = this.messageConverter.toMessage(data);
  42
+			MessageBuilder<Object> messageBuilder = MessageBuilder.fromMessage(message);
  43
+			this.addStandardHeaders(connection, messageBuilder);
  44
+			this.addCustomHeaders(connection, messageBuilder);
3
Artem Bilan Collaborator
artembilan added a note April 03, 2013

In some recent PR we've already discussed about opening messageBuilder instance to the end-developer API...
WDYT now? Is there some other strategy to not show messageBuilder as parameter in these methods?

Gary Russell Collaborator

Note that I made super.addStandardHeaders() and super.addCustomHeaders() final so subclasses can't override them and get a handle to the framework message builder (I remembered our earlier conversation about not allowing subclasses to be able to make arbitrary changes to the message).

Artem Bilan Collaborator
artembilan added a note April 03, 2013

Got it! Thanks, So, nevermind.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Gary Russell
Collaborator

Regarding your general comment - yes there is some overlap between converters and transformers, but this wouldn't be the first place for that. The MapJsonSerializer is really only provided as an example (of course, someone could use it) of how to take the map and put it in some format on the wire.

Also, a MessageConverter converts between an SI Message<?> and some external representation whereas a <transformer/> transforms one SI message to another.

Gary Russell
Collaborator

Rebased, pushed.

...gration-ip/src/main/java/org/springframework/integration/ip/tcp/serializer/MapJsonSerializer.java
((37 lines not shown))
  37
+ * Note that Feature.AUTO_CLOSE_SOURCE and
  38
+ * JsonGenerator.Feature.AUTO_CLOSE_TARGET
  39
+ * and will be disabled to avoid closing the connection's
  40
+ * InputStream and OutputStream.
  41
+ * <p/>
  42
+ * The jackson deserializer can't delimit multiple JSON
  43
+ * objects. Therefore another (de)serializer is used to
  44
+ * apply structure to the stream. By default, this is a
  45
+ * simple {@link ByteArrayLfSerializer}, which inserts/expects
  46
+ * LF (0x0a) between messages.
  47
+ *
  48
+ * @author Gary Russell
  49
+ * @since 3.0
  50
+ *
  51
+ */
  52
+public class MapJsonSerializer implements Serializer<Map<?, ?>>, Deserializer<Map<?, ?>>,
2
Artem Bilan Collaborator
artembilan added a note April 05, 2013

Gary, regarding this implementation.
It fully depends from Jackson 1.
However we are planning to support both #774
So, maybe we push MapJsonSerializer feature to future after merging my PR?
My point - to make dependency as less as posible, otherwise we will should provide MapJsonSerializer for Jackson 2.
WDYT?

Gary Russell Collaborator

Yes; I agree - I will try to get your PR reviewed/merged today or Monday.

And, I think I can just make this serializer J2 (no need for both, I think - if someone really, really wants a 1.x version, they can make their own).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
...gration-ip/src/main/java/org/springframework/integration/ip/tcp/serializer/MapJsonSerializer.java
((80 lines not shown))
  80
+	}
  81
+
  82
+	/**
  83
+	 * A {@link Serializer} that will delimit the full JSON content in
  84
+	 * the stream. Default is
  85
+	 * {@link ByteArrayLfSerializer}.
  86
+	 * @param packetSerializer the packetSerializer
  87
+	 */
  88
+	public void setPacketSerializer(Serializer<byte[]> packetSerializer) {
  89
+		Assert.notNull(packetSerializer, "'packetSerializer' cannot be null");
  90
+		this.packetSerializer = packetSerializer;
  91
+	}
  92
+
  93
+	public void afterPropertiesSet() throws Exception {
  94
+		this.objectMapper.configure(Feature.AUTO_CLOSE_SOURCE, false);
  95
+		this.objectMapper.configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, false);
6
Artem Bilan Collaborator
artembilan added a note May 08, 2013

Gery, let's get back to this class after merging JsonObjectMapper abstraction.
I dwell on these two lines because I didn't introduce any configuration hook and I didn't know how to abstract it in case auto-detecation of Jackson lib at CLASSPATH.
Do you have any idea?

Gary Russell Collaborator
garyrussell added a note May 08, 2013

Ugh; you are right; let me think a while.

Artem Bilan Collaborator
artembilan added a note June 10, 2013

this.objectMapper.configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, false);
Does not make sense: it applies for local ByteArrayOutputStream in the serialize below.
From other side, Gary, comes to mind a solution with some generic JsonObjectMapperConfiguration, which will be mapped to concrate configuration based on JsonObjectMapper implementation. E.g. GSON doesn't support similar featuers, but it has GsonBuilder. In the end it may look like thankless work to try provide hooks for all existing configuration features in different JSON mapping engines...
WDYT?

Gary Russell Collaborator

When we get the package tangle code merged, I'll re-visit this PR.

Gary Russell Collaborator

Artem; since this is brand new, I am inclined to support only Jackson2 out of the box (wire up a JsonObjectMapper with a properly configured Jackson2 mapper), but allow the user to inject his own JsonObjectMapper if he wishes to use Jackson1 or some other implementation.

WDYT?

Artem Bilan Collaborator

What you say is right. But my general concern here that you are using feature
this.objectMapper.configure(Feature.AUTO_CLOSE_SOURCE, false);
I think I'll play today with JsonObjectMapper here, before you wake up. :smile:

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
...ore/src/test/java/org/springframework/integration/support/converter/MapMessageConverterTests.java
((77 lines not shown))
  77
+		Message<String> message = MessageBuilder.withPayload("foo")
  78
+				.setHeader("bar", "baz")
  79
+				.setHeader("baz", "qux")
  80
+				.build();
  81
+		MapMessageConverter converter = new MapMessageConverter();
  82
+		converter.setHeaderNames("bar");
  83
+		@SuppressWarnings("unchecked")
  84
+		Map<String, Object> map =  (Map<String, Object>) converter.fromMessage(message);
  85
+
  86
+		map.remove("payload");
  87
+
  88
+		try {
  89
+			converter.toMessage(map);
  90
+			fail("Expected exception");
  91
+		}
  92
+		catch (IllegalArgumentException e) {}
1
Artem Bilan Collaborator
artembilan added a note June 06, 2013

Hi!
This test looks not full.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Artem Bilan
Collaborator

Gary, take a look, please, into my polishing:
artembilan@06259ef
And if it is OK, there is need to change a bit IP doc.

Gary Russell INT-1807 Add Mechanism For Headers with TCP
TCP streams have no standard message structure. Therefore, the
TCP implementation previously only transferred the message
payload.

If someone wanted to convey header information, they would have
to write their own wrapper and/or use Java serialization for
the entire message.

This change provides a strategy to allow users to determine
which headers are transferred, and how.

A MessageConvertingMessageMapper is now provided that invokes
any MessageConverter. A MapMessageConverter is provided that
converts the payload, and selected heades to a Map with two
entries ("payload") and ("headers").

A MapJsonSerializer is provided that converts a Map to/from
JSON. Jackson can't delimit multiple objects in a stream
so another serializer is required to encode/decode structure.
A ByteArrayLfSerializer is used by default, inserting a
linefeed between JSON objects.

The combination of these elements now allows header
information to be transferred over TCP. Of course, users
can implment their own (de)serializer to format the
bits on the wire exactly as needed by their application.

INT-1807 Polishing

Add a test that uses a Map MessageConverter with a
Java (de)serializer.

INT-1807: Polishing

INT-1807: Rebased and polished

Change `MapJsonSerializer` to use `JsonObjectMapper` abstraction

Doc Polishing
0699fdc
Gary Russell
Collaborator

Thanks Artem; I applied your commit on top and polished the docs; I realize now that those Jackson settings were no longer required after I added the second serializer (LF) to delineate between json objects. Now that the objectmapper is no longer acting on the socket's inputstream it doesn't matter that it closes the stream (it is simply closing a byte array stream that is already at the end.

Merging...

Gary Russell garyrussell merged commit 0699fdc into from August 16, 2013
Gary Russell garyrussell closed this August 16, 2013
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Showing 1 unique commit by 2 authors.

Aug 16, 2013
Gary Russell INT-1807 Add Mechanism For Headers with TCP
TCP streams have no standard message structure. Therefore, the
TCP implementation previously only transferred the message
payload.

If someone wanted to convey header information, they would have
to write their own wrapper and/or use Java serialization for
the entire message.

This change provides a strategy to allow users to determine
which headers are transferred, and how.

A MessageConvertingMessageMapper is now provided that invokes
any MessageConverter. A MapMessageConverter is provided that
converts the payload, and selected heades to a Map with two
entries ("payload") and ("headers").

A MapJsonSerializer is provided that converts a Map to/from
JSON. Jackson can't delimit multiple objects in a stream
so another serializer is required to encode/decode structure.
A ByteArrayLfSerializer is used by default, inserting a
linefeed between JSON objects.

The combination of these elements now allows header
information to be transferred over TCP. Of course, users
can implment their own (de)serializer to format the
bits on the wire exactly as needed by their application.

INT-1807 Polishing

Add a test that uses a Map MessageConverter with a
Java (de)serializer.

INT-1807: Polishing

INT-1807: Rebased and polished

Change `MapJsonSerializer` to use `JsonObjectMapper` abstraction

Doc Polishing
0699fdc
This page is out of date. Refresh to see the latest.
102  ...ntegration-core/src/main/java/org/springframework/integration/support/converter/MapMessageConverter.java
... ...
@@ -0,0 +1,102 @@
  1
+/*
  2
+ * Copyright 2002-2013 the original author or authors.
  3
+ *
  4
+ * Licensed under the Apache License, Version 2.0 (the "License");
  5
+ * you may not use this file except in compliance with the License.
  6
+ * You may obtain a copy of the License at
  7
+ *
  8
+ *      http://www.apache.org/licenses/LICENSE-2.0
  9
+ *
  10
+ * Unless required by applicable law or agreed to in writing, software
  11
+ * distributed under the License is distributed on an "AS IS" BASIS,
  12
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13
+ * See the License for the specific language governing permissions and
  14
+ * limitations under the License.
  15
+ */
  16
+package org.springframework.integration.support.converter;
  17
+
  18
+import java.util.Arrays;
  19
+import java.util.Collection;
  20
+import java.util.HashMap;
  21
+import java.util.HashSet;
  22
+import java.util.Map;
  23
+import java.util.Map.Entry;
  24
+import java.util.Set;
  25
+
  26
+import org.springframework.integration.Message;
  27
+import org.springframework.integration.support.MessageBuilder;
  28
+import org.springframework.util.Assert;
  29
+
  30
+/**
  31
+ * Converts to/from a Map with 2 keys ('headers' and 'payload').
  32
+ * @author Gary Russell
  33
+ * @since 3.0
  34
+ *
  35
+ */
  36
+public class MapMessageConverter implements MessageConverter {
  37
+
  38
+	private volatile String[] headerNames;
  39
+
  40
+	private volatile boolean filterHeadersInToMessage;
  41
+
  42
+	/**
  43
+	 * Headers to be converted in {@link #fromMessage(Message)}.
  44
+	 * {@link #toMessage(Object)} will populate all headers found in
  45
+	 * the map, unless {@link #filterHeadersInToMessage} is true.
  46
+	 * @param headerNames
  47
+	 */
  48
+	public void setHeaderNames(String... headerNames) {
  49
+		this.headerNames = headerNames;
  50
+	}
  51
+
  52
+	/**
  53
+	 * By default all headers on Map passed to {@link #toMessage(Object)}
  54
+	 * will be mapped. Set this property
  55
+	 * to 'true' if you wish to limit the inbound headers to those in
  56
+	 * the #headerNames.
  57
+	 * @param filterHeadersInToMessage
  58
+	 */
  59
+	public void setFilterHeadersInToMessage(boolean filterHeadersInToMessage) {
  60
+		this.filterHeadersInToMessage = filterHeadersInToMessage;
  61
+	}
  62
+
  63
+	public <P> Message<P> toMessage(Object object) {
  64
+		Assert.isInstanceOf(Map.class, object, "This converter expects a Map");
  65
+		@SuppressWarnings("unchecked")
  66
+		Map<String, ?> map = (Map<String, ?>) object;
  67
+		Object payload = map.get("payload");
  68
+		Assert.notNull(payload, "'payload' entry cannot be null");
  69
+		MessageBuilder<?> messageBuilder = MessageBuilder.withPayload(payload);
  70
+		@SuppressWarnings("unchecked")
  71
+		Map<String, ?> headers = (Map<String, ?>) map.get("headers");
  72
+		if (headers != null) {
  73
+			if (this.filterHeadersInToMessage) {
  74
+				headers.keySet().retainAll(Arrays.asList(this.headerNames));
  75
+			}
  76
+			messageBuilder.copyHeaders(headers);
  77
+			/*for (Entry<String, ?> entry : headers.entrySet()) {
  78
+				if (this.filterHeadersInToMessage ? this.headerNames.contains(entry.getKey()) : true) {
  79
+					messageBuilder.setHeader(entry.getKey(), entry.getValue());
  80
+				}
  81
+			}*/
  82
+		}
  83
+		@SuppressWarnings("unchecked")
  84
+		Message<P> convertedMessage = (Message<P>) messageBuilder.build();
  85
+		return convertedMessage;
  86
+	}
  87
+
  88
+	public <P> Object fromMessage(Message<P> message) {
  89
+		Map<String,Object> map = new HashMap<String, Object>();
  90
+		map.put("payload", message.getPayload());
  91
+		Map<String, Object> headers = new HashMap<String, Object>();
  92
+		for (String headerName : headerNames) {
  93
+			Object header = message.getHeaders().get(headerName);
  94
+			if (header != null) {
  95
+				headers.put(headerName, header);
  96
+			}
  97
+		}
  98
+		map.put("headers", headers);
  99
+		return map;
  100
+	}
  101
+
  102
+}
130  ...ation-core/src/test/java/org/springframework/integration/support/converter/MapMessageConverterTests.java
... ...
@@ -0,0 +1,130 @@
  1
+/*
  2
+ * Copyright 2002-2013 the original author or authors.
  3
+ *
  4
+ * Licensed under the Apache License, Version 2.0 (the "License");
  5
+ * you may not use this file except in compliance with the License.
  6
+ * You may obtain a copy of the License at
  7
+ *
  8
+ *      http://www.apache.org/licenses/LICENSE-2.0
  9
+ *
  10
+ * Unless required by applicable law or agreed to in writing, software
  11
+ * distributed under the License is distributed on an "AS IS" BASIS,
  12
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13
+ * See the License for the specific language governing permissions and
  14
+ * limitations under the License.
  15
+ */
  16
+package org.springframework.integration.support.converter;
  17
+
  18
+import static org.junit.Assert.assertEquals;
  19
+import static org.junit.Assert.assertFalse;
  20
+import static org.junit.Assert.assertNotNull;
  21
+import static org.junit.Assert.assertNull;
  22
+import static org.junit.Assert.fail;
  23
+
  24
+import java.util.Map;
  25
+
  26
+import org.junit.Test;
  27
+import org.springframework.integration.Message;
  28
+import org.springframework.integration.support.MessageBuilder;
  29
+
  30
+/**
  31
+ * @author Gary Russell
  32
+ * @author Artem Bilan
  33
+ * @since 3.0
  34
+ *
  35
+ */
  36
+public class MapMessageConverterTests {
  37
+
  38
+	@Test
  39
+	public void testFromMessageToMessage() throws Exception {
  40
+		Message<String> message = MessageBuilder.withPayload("foo")
  41
+				.setHeader("bar", "baz")
  42
+				.setHeader("baz", "qux")
  43
+				.build();
  44
+		MapMessageConverter converter = new MapMessageConverter();
  45
+		converter.setHeaderNames("bar");
  46
+		@SuppressWarnings("unchecked")
  47
+		Map<String, Object> map =  (Map<String, Object>) converter.fromMessage(message);
  48
+		@SuppressWarnings("unchecked")
  49
+		Map<String, Object> headers = (Map<String, Object>) map.get("headers");
  50
+
  51
+		assertNotNull(headers);
  52
+		assertNotNull(map.get("payload"));
  53
+		assertEquals("foo", map.get("payload"));
  54
+		assertNotNull(headers.get("bar"));
  55
+		assertEquals("baz", headers.get("bar"));
  56
+		assertNull(headers.get("baz"));
  57
+
  58
+		headers.put("baz", "qux");
  59
+		message = converter.toMessage(map);
  60
+		assertEquals("foo", message.getPayload());
  61
+		assertEquals("baz", message.getHeaders().get("bar"));
  62
+		assertEquals("qux", message.getHeaders().get("baz"));
  63
+
  64
+		converter.setFilterHeadersInToMessage(true);
  65
+
  66
+		message = converter.toMessage(map);
  67
+		assertEquals("foo", message.getPayload());
  68
+		assertEquals("baz", message.getHeaders().get("bar"));
  69
+		assertNull(message.getHeaders().get("baz"));
  70
+	}
  71
+
  72
+	@Test
  73
+	public void testInvalid() throws Exception {
  74
+		Message<String> message = MessageBuilder.withPayload("foo")
  75
+				.setHeader("bar", "baz")
  76
+				.setHeader("baz", "qux")
  77
+				.build();
  78
+		MapMessageConverter converter = new MapMessageConverter();
  79
+		converter.setHeaderNames("bar");
  80
+		@SuppressWarnings("unchecked")
  81
+		Map<String, Object> map =  (Map<String, Object>) converter.fromMessage(message);
  82
+
  83
+		map.remove("payload");
  84
+
  85
+		try {
  86
+			converter.toMessage(map);
  87
+			fail("Expected exception");
  88
+		}
  89
+		catch (IllegalArgumentException e) {
  90
+			assertEquals("'payload' entry cannot be null", e.getMessage());
  91
+		}
  92
+	}
  93
+
  94
+	@Test
  95
+	public void testNoHeaders() throws Exception {
  96
+		Message<String> message = MessageBuilder.withPayload("foo")
  97
+				.build();
  98
+		MapMessageConverter converter = new MapMessageConverter();
  99
+		converter.setHeaderNames("bar");
  100
+		@SuppressWarnings("unchecked")
  101
+		Map<String, Object> map =  (Map<String, Object>) converter.fromMessage(message);
  102
+		@SuppressWarnings("unchecked")
  103
+		Map<String, Object> headers = (Map<String, Object>) map.get("headers");
  104
+
  105
+		assertNotNull(headers);
  106
+		assertEquals(0, headers.size());
  107
+		map.remove("headers");
  108
+		message = converter.toMessage(map);
  109
+		assertEquals("foo", message.getPayload());
  110
+	}
  111
+
  112
+	@Test
  113
+	public void testNotIncludedIfNull() throws Exception {
  114
+		Message<String> message = MessageBuilder.withPayload("foo")
  115
+				.setHeader("bar", null)
  116
+				.build();
  117
+		MapMessageConverter converter = new MapMessageConverter();
  118
+		converter.setHeaderNames("bar");
  119
+		@SuppressWarnings("unchecked")
  120
+		Map<String, Object> map =  (Map<String, Object>) converter.fromMessage(message);
  121
+		@SuppressWarnings("unchecked")
  122
+		Map<String, Object> headers = (Map<String, Object>) map.get("headers");
  123
+
  124
+		assertNotNull(headers);
  125
+		assertNotNull(map.get("payload"));
  126
+		assertEquals("foo", map.get("payload"));
  127
+		assertFalse(headers.keySet().contains("bar"));
  128
+		assertEquals(0, headers.size());
  129
+	}
  130
+}
60  ...p/src/main/java/org/springframework/integration/ip/tcp/connection/MessageConvertingTcpMessageMapper.java
... ...
@@ -0,0 +1,60 @@
  1
+/*
  2
+ * Copyright 2002-2013 the original author or authors.
  3
+ *
  4
+ * Licensed under the Apache License, Version 2.0 (the "License");
  5
+ * you may not use this file except in compliance with the License.
  6
+ * You may obtain a copy of the License at
  7
+ *
  8
+ *      http://www.apache.org/licenses/LICENSE-2.0
  9
+ *
  10
+ * Unless required by applicable law or agreed to in writing, software
  11
+ * distributed under the License is distributed on an "AS IS" BASIS,
  12
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13
+ * See the License for the specific language governing permissions and
  14
+ * limitations under the License.
  15
+ */
  16
+package org.springframework.integration.ip.tcp.connection;
  17
+
  18
+import org.springframework.integration.Message;
  19
+import org.springframework.integration.support.MessageBuilder;
  20
+import org.springframework.integration.support.converter.MessageConverter;
  21
+import org.springframework.util.Assert;
  22
+
  23
+/**
  24
+ * @author Gary Russell
  25
+ * @since 3.0
  26
+ *
  27
+ */
  28
+public class MessageConvertingTcpMessageMapper extends TcpMessageMapper {
  29
+
  30
+	private final MessageConverter messageConverter;
  31
+
  32
+	public MessageConvertingTcpMessageMapper(MessageConverter messageConverter) {
  33
+		Assert.notNull(messageConverter, "'messasgeConverter' must not be null");
  34
+		this.messageConverter = messageConverter;
  35
+	}
  36
+
  37
+	@Override
  38
+	public Message<Object> toMessage(TcpConnection connection) throws Exception {
  39
+		Object data = connection.getPayload();
  40
+		if (data != null) {
  41
+			Message<Object> message = this.messageConverter.toMessage(data);
  42
+			MessageBuilder<Object> messageBuilder = MessageBuilder.fromMessage(message);
  43
+			this.addStandardHeaders(connection, messageBuilder);
  44
+			this.addCustomHeaders(connection, messageBuilder);
  45
+			return messageBuilder.build();
  46
+		}
  47
+		else {
  48
+			if (logger.isWarnEnabled()) {
  49
+				logger.warn("Null payload from connection " + connection.getConnectionId());
  50
+			}
  51
+			return null;
  52
+		}
  53
+	}
  54
+
  55
+	@Override
  56
+	public Object fromMessage(Message<?> message) throws Exception {
  57
+		return this.messageConverter.fromMessage(message);
  58
+	}
  59
+
  60
+}
47  spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpMessageMapper.java
@@ -18,6 +18,8 @@
18 18
 import java.io.UnsupportedEncodingException;
19 19
 import java.util.Map;
20 20
 
  21
+import org.apache.commons.logging.Log;
  22
+import org.apache.commons.logging.LogFactory;
21 23
 import org.springframework.integration.Message;
22 24
 import org.springframework.integration.MessageHandlingException;
23 25
 import org.springframework.integration.ip.IpHeaders;
@@ -43,6 +45,8 @@
43 45
 		InboundMessageMapper<TcpConnection>,
44 46
 		OutboundMessageMapper<Object> {
45 47
 
  48
+	protected final Log logger = LogFactory.getLog(this.getClass());
  49
+
46 50
 	private volatile String charset = "UTF-8";
47 51
 
48 52
 	private volatile boolean stringToBytes = true;
@@ -54,26 +58,39 @@
54 58
 		Object payload = connection.getPayload();
55 59
 		if (payload != null) {
56 60
 			MessageBuilder<Object> messageBuilder = MessageBuilder.withPayload(payload);
57  
-			String connectionId = connection.getConnectionId();
58  
-			messageBuilder
59  
-				.setHeader(IpHeaders.HOSTNAME, connection.getHostName())
60  
-				.setHeader(IpHeaders.IP_ADDRESS, connection.getHostAddress())
61  
-				.setHeader(IpHeaders.REMOTE_PORT, connection.getPort())
62  
-				.setHeader(IpHeaders.CONNECTION_ID, connectionId);
63  
-			if (this.applySequence) {
64  
-				messageBuilder
65  
-					.setCorrelationId(connectionId)
66  
-					.setSequenceNumber((int) connection.incrementAndGetConnectionSequence());
67  
-			}
68  
-			Map<String, ?> customHeaders = this.supplyCustomHeaders(connection);
69  
-			if (customHeaders != null) {
70  
-				messageBuilder.copyHeadersIfAbsent(customHeaders);
71  
-			}
  61
+			this.addStandardHeaders(connection, messageBuilder);
  62
+			this.addCustomHeaders(connection, messageBuilder);
72 63
 			message = messageBuilder.build();
73 64
 		}
  65
+		else {
  66
+			if (logger.isWarnEnabled()) {
  67
+				logger.warn("Null payload from connection " + connection.getConnectionId());
  68
+			}
  69
+		}
74 70
 		return message;
75 71
 	}
76 72
 
  73
+	protected final void addStandardHeaders(TcpConnection connection, MessageBuilder<?> messageBuilder) {
  74
+		String connectionId = connection.getConnectionId();
  75
+		messageBuilder
  76
+			.setHeader(IpHeaders.HOSTNAME, connection.getHostName())
  77
+			.setHeader(IpHeaders.IP_ADDRESS, connection.getHostAddress())
  78
+			.setHeader(IpHeaders.REMOTE_PORT, connection.getPort())
  79
+			.setHeader(IpHeaders.CONNECTION_ID, connectionId);
  80
+		if (this.applySequence) {
  81
+			messageBuilder
  82
+				.setCorrelationId(connectionId)
  83
+				.setSequenceNumber((int) connection.incrementAndGetConnectionSequence());
  84
+		}
  85
+	}
  86
+
  87
+	protected final void addCustomHeaders(TcpConnection connection, MessageBuilder<?> messageBuilder) {
  88
+		Map<String, ?> customHeaders = this.supplyCustomHeaders(connection);
  89
+		if (customHeaders != null) {
  90
+			messageBuilder.copyHeadersIfAbsent(customHeaders);
  91
+		}
  92
+	}
  93
+
77 94
 	/**
78 95
 	 * Override to provide additional headers. The standard headers cannot be overridden
79 96
 	 * and any such headers will be ignored if provided in the result.
111  ...ng-integration-ip/src/main/java/org/springframework/integration/ip/tcp/serializer/MapJsonSerializer.java
... ...
@@ -0,0 +1,111 @@
  1
+/*
  2
+ * Copyright 2002-2013 the original author or authors.
  3
+ *
  4
+ * Licensed under the Apache License, Version 2.0 (the "License");
  5
+ * you may not use this file except in compliance with the License.
  6
+ * You may obtain a copy of the License at
  7
+ *
  8
+ *      http://www.apache.org/licenses/LICENSE-2.0
  9
+ *
  10
+ * Unless required by applicable law or agreed to in writing, software
  11
+ * distributed under the License is distributed on an "AS IS" BASIS,
  12
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13
+ * See the License for the specific language governing permissions and
  14
+ * limitations under the License.
  15
+ */
  16
+package org.springframework.integration.ip.tcp.serializer;
  17
+
  18
+import java.io.ByteArrayInputStream;
  19
+import java.io.ByteArrayOutputStream;
  20
+import java.io.IOException;
  21
+import java.io.InputStream;
  22
+import java.io.InputStreamReader;
  23
+import java.io.OutputStream;
  24
+import java.io.OutputStreamWriter;
  25
+import java.util.Map;
  26
+
  27
+import org.springframework.core.serializer.Deserializer;
  28
+import org.springframework.core.serializer.Serializer;
  29
+import org.springframework.integration.support.json.JacksonJsonObjectMapperProvider;
  30
+import org.springframework.integration.support.json.JsonObjectMapper;
  31
+import org.springframework.util.Assert;
  32
+
  33
+/**
  34
+ * Serializes a {@link Map} as JSON. Deserializes JSON to
  35
+ * a {@link Map}. The default {@link JacksonJsonObjectMapperProvider#newInstance()} can be
  36
+ * overridden using {@link #setJsonObjectMapper(JsonObjectMapper)}.
  37
+ * <p/>
  38
+ * The JSON deserializer can't delimit multiple JSON
  39
+ * objects. Therefore another (de)serializer is used to
  40
+ * apply structure to the stream. By default, this is a
  41
+ * simple {@link ByteArrayLfSerializer}, which inserts/expects
  42
+ * LF (0x0a) between messages.
  43
+ *
  44
+ * @author Gary Russell
  45
+ * @author Artem Bilan
  46
+ * @since 3.0
  47
+ *
  48
+ */
  49
+public class MapJsonSerializer implements Serializer<Map<?, ?>>, Deserializer<Map<?, ?>> {
  50
+
  51
+	private volatile JsonObjectMapper<?> jsonObjectMapper = JacksonJsonObjectMapperProvider.newInstance();
  52
+
  53
+	private volatile Deserializer<byte[]> packetDeserializer = new ByteArrayLfSerializer();
  54
+
  55
+	private volatile Serializer<byte[]> packetSerializer = new ByteArrayLfSerializer();
  56
+
  57
+	/**
  58
+	 * An {@link JsonObjectMapper} to be used for the conversion to/from
  59
+	 * JSON. Use this if you wish to set additional {@link JsonObjectMapper} implementation features.
  60
+	 * @param jsonObjectMapper the jsonObjectMapper.
  61
+	 */
  62
+	public void setJsonObjectMapper(JsonObjectMapper<?> jsonObjectMapper) {
  63
+		Assert.notNull(jsonObjectMapper, "'jsonObjectMapper' cannot be null");
  64
+		this.jsonObjectMapper = jsonObjectMapper;
  65
+	}
  66
+
  67
+	/**
  68
+	 * A {@link Deserializer} that will construct the full JSON content from
  69
+	 * the stream which is then passed to the JsonObjectMapper. Default is
  70
+	 * {@link ByteArrayLfSerializer}.
  71
+	 * @param packetDeserializer the packetDeserializer
  72
+	 */
  73
+	public void setPacketDeserializer(Deserializer<byte[]> packetDeserializer) {
  74
+		Assert.notNull(packetDeserializer, "'packetDeserializer' cannot be null");
  75
+		this.packetDeserializer = packetDeserializer;
  76
+	}
  77
+
  78
+	/**
  79
+	 * A {@link Serializer} that will delimit the full JSON content in
  80
+	 * the stream. Default is
  81
+	 * {@link ByteArrayLfSerializer}.
  82
+	 * @param packetSerializer the packetSerializer
  83
+	 */
  84
+	public void setPacketSerializer(Serializer<byte[]> packetSerializer) {
  85
+		Assert.notNull(packetSerializer, "'packetSerializer' cannot be null");
  86
+		this.packetSerializer = packetSerializer;
  87
+	}
  88
+
  89
+	public Map<?, ?> deserialize(InputStream inputStream) throws IOException {
  90
+		byte[] bytes = this.packetDeserializer.deserialize(inputStream);
  91
+		try {
  92
+			return this.jsonObjectMapper.fromJson(new InputStreamReader(new ByteArrayInputStream(bytes)), Map.class);
  93
+		}
  94
+		catch (Exception e) {
  95
+			throw new IOException(e);
  96
+		}
  97
+	}
  98
+
  99
+	public void serialize(Map<?, ?> object, OutputStream outputStream) throws IOException {
  100
+		ByteArrayOutputStream baos = new ByteArrayOutputStream();
  101
+		try {
  102
+			this.jsonObjectMapper.toJson(object, new OutputStreamWriter(baos));
  103
+		}
  104
+		catch (Exception e) {
  105
+			throw new IOException(e);
  106
+		}
  107
+		this.packetSerializer.serialize(baos.toByteArray(), outputStream);
  108
+		outputStream.flush();
  109
+	}
  110
+
  111
+}
76  ...ntegration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpMessageMapperTests.java
@@ -20,6 +20,8 @@
20 20
 import static org.mockito.Mockito.mock;
21 21
 import static org.mockito.Mockito.when;
22 22
 
  23
+import java.io.ByteArrayInputStream;
  24
+import java.io.ByteArrayOutputStream;
23 25
 import java.net.Socket;
24 26
 import java.util.Collections;
25 27
 import java.util.Map;
@@ -27,9 +29,13 @@
27 29
 import javax.net.SocketFactory;
28 30
 
29 31
 import org.junit.Test;
  32
+import org.springframework.core.serializer.DefaultDeserializer;
  33
+import org.springframework.core.serializer.DefaultSerializer;
30 34
 import org.springframework.integration.Message;
31 35
 import org.springframework.integration.ip.IpHeaders;
  36
+import org.springframework.integration.ip.tcp.serializer.MapJsonSerializer;
32 37
 import org.springframework.integration.support.MessageBuilder;
  38
+import org.springframework.integration.support.converter.MapMessageConverter;
33 39
 
34 40
 /**
35 41
  * @author Gary Russell
@@ -207,4 +213,72 @@ public void testFromMessage() throws Exception {
207 213
 
208 214
 	}
209 215
 
210  
-}
  216
+	@Test
  217
+	public void testMapMessageConvertingOutboundJson() throws Exception {
  218
+		Message<String> message = MessageBuilder.withPayload("foo")
  219
+				.setHeader("bar", "baz")
  220
+				.build();
  221
+		MapMessageConverter converter = new MapMessageConverter();
  222
+		converter.setHeaderNames("bar");
  223
+		MessageConvertingTcpMessageMapper mapper = new MessageConvertingTcpMessageMapper(converter);
  224
+		Map<?, ?> map = (Map<?, ?>) mapper.fromMessage(message);
  225
+		MapJsonSerializer serializer = new MapJsonSerializer();
  226
+		ByteArrayOutputStream baos = new ByteArrayOutputStream();
  227
+		serializer.serialize(map, baos);
  228
+		assertEquals("{\"headers\":{\"bar\":\"baz\"},\"payload\":\"foo\"}\n", new String(baos.toByteArray(), "UTF-8"));
  229
+	}
  230
+
  231
+	@Test
  232
+	public void testMapMessageConvertingInboundJson() throws Exception {
  233
+		String json = "{\"headers\":{\"bar\":\"baz\"},\"payload\":\"foo\"}\n";
  234
+		MapMessageConverter converter = new MapMessageConverter();
  235
+		MessageConvertingTcpMessageMapper mapper = new MessageConvertingTcpMessageMapper(converter);
  236
+		MapJsonSerializer deserializer = new MapJsonSerializer();
  237
+		Map<?, ?> map = deserializer.deserialize(new ByteArrayInputStream(json.getBytes("UTF-8")));
  238
+
  239
+		TcpConnection connection = mock(TcpConnection.class);
  240
+		when(connection.getPayload()).thenReturn(map);
  241
+		when(connection.getHostName()).thenReturn("someHost");
  242
+		when(connection.getHostAddress()).thenReturn("1.1.1.1");
  243
+		when(connection.getPort()).thenReturn(1234);
  244
+		when(connection.getConnectionId()).thenReturn("someId");
  245
+		Message<?> message = mapper.toMessage(connection);
  246
+		assertEquals("foo", message.getPayload());
  247
+		assertEquals("baz", message.getHeaders().get("bar"));
  248
+		assertEquals("someHost", message.getHeaders().get(IpHeaders.HOSTNAME));
  249
+		assertEquals("1.1.1.1", message.getHeaders().get(IpHeaders.IP_ADDRESS));
  250
+		assertEquals(1234, message.getHeaders().get(IpHeaders.REMOTE_PORT));
  251
+		assertEquals("someId", message.getHeaders().get(IpHeaders.CONNECTION_ID));
  252
+	}
  253
+
  254
+	@Test
  255
+	public void testMapMessageConvertingBothWaysJava() throws Exception {
  256
+		Message<String> outMessage = MessageBuilder.withPayload("foo")
  257
+				.setHeader("bar", "baz")
  258
+				.build();
  259
+		MapMessageConverter converter = new MapMessageConverter();
  260
+		converter.setHeaderNames("bar");
  261
+		MessageConvertingTcpMessageMapper mapper = new MessageConvertingTcpMessageMapper(converter);
  262
+		Map<?, ?> map = (Map<?, ?>) mapper.fromMessage(outMessage);
  263
+		DefaultSerializer serializer = new DefaultSerializer();
  264
+		ByteArrayOutputStream baos = new ByteArrayOutputStream();
  265
+		serializer.serialize(map, baos);
  266
+
  267
+		DefaultDeserializer deserializer = new DefaultDeserializer();
  268
+		map = (Map<?, ?>) deserializer.deserialize(new ByteArrayInputStream(baos.toByteArray()));
  269
+		TcpConnection connection = mock(TcpConnection.class);
  270
+		when(connection.getPayload()).thenReturn(map);
  271
+		when(connection.getHostName()).thenReturn("someHost");
  272
+		when(connection.getHostAddress()).thenReturn("1.1.1.1");
  273
+		when(connection.getPort()).thenReturn(1234);
  274
+		when(connection.getConnectionId()).thenReturn("someId");
  275
+		Message<?> message = mapper.toMessage(connection);
  276
+		assertEquals("foo", message.getPayload());
  277
+		assertEquals("baz", message.getHeaders().get("bar"));
  278
+		assertEquals("someHost", message.getHeaders().get(IpHeaders.HOSTNAME));
  279
+		assertEquals("1.1.1.1", message.getHeaders().get(IpHeaders.IP_ADDRESS));
  280
+		assertEquals(1234, message.getHeaders().get(IpHeaders.REMOTE_PORT));
  281
+		assertEquals("someId", message.getHeaders().get(IpHeaders.CONNECTION_ID));
  282
+	}
  283
+
  284
+}
65  ...ntegration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpNetConnectionTests.java
@@ -21,19 +21,30 @@
21 21
 import static org.mockito.Mockito.mock;
22 22
 import static org.mockito.Mockito.when;
23 23
 
  24
+import java.io.ByteArrayOutputStream;
24 25
 import java.io.InputStream;
  26
+import java.io.PipedInputStream;
  27
+import java.io.PipedOutputStream;
25 28
 import java.net.Socket;
26 29
 import java.nio.channels.SocketChannel;
27 30
 import java.util.concurrent.atomic.AtomicReference;
28 31
 
29 32
 import org.apache.commons.logging.Log;
  33
+
30 34
 import org.junit.Test;
31 35
 import org.mockito.Mockito;
32 36
 import org.mockito.invocation.InvocationOnMock;
33 37
 import org.mockito.stubbing.Answer;
  38
+
34 39
 import org.springframework.beans.DirectFieldAccessor;
  40
+import org.springframework.context.ApplicationEvent;
  41
+import org.springframework.context.ApplicationEventPublisher;
  42
+import org.springframework.integration.Message;
35 43
 import org.springframework.integration.ip.tcp.connection.TcpNioConnection.ChannelInputStream;
36 44
 import org.springframework.integration.ip.tcp.serializer.ByteArrayStxEtxSerializer;
  45
+import org.springframework.integration.ip.tcp.serializer.MapJsonSerializer;
  46
+import org.springframework.integration.support.MessageBuilder;
  47
+import org.springframework.integration.support.converter.MapMessageConverter;
37 48
 import org.springframework.integration.test.util.TestUtils;
38 49
 
39 50
 /**
@@ -43,13 +54,18 @@
43 54
  */
44 55
 public class TcpNetConnectionTests {
45 56
 
  57
+	private final ApplicationEventPublisher nullPublisher = new ApplicationEventPublisher() {
  58
+		public void publishEvent(ApplicationEvent event) {
  59
+		}
  60
+	};
  61
+
46 62
 	@Test
47 63
 	public void testErrorLog() throws Exception {
48 64
 		Socket socket = mock(Socket.class);
49 65
 		InputStream stream = mock(InputStream.class);
50 66
 		when(socket.getInputStream()).thenReturn(stream);
51 67
 		when(stream.read()).thenReturn((int) 'x');
52  
-		TcpNetConnection connection = new TcpNetConnection(socket, true, false, null, null);
  68
+		TcpNetConnection connection = new TcpNetConnection(socket, true, false, nullPublisher, null);
53 69
 		connection.setDeserializer(new ByteArrayStxEtxSerializer());
54 70
 		final AtomicReference<Object> log = new AtomicReference<Object>();
55 71
 		Log logger = mock(Log.class);
@@ -76,10 +92,55 @@ public void testBinary() throws Exception {
76 92
 		SocketChannel socketChannel = mock(SocketChannel.class);
77 93
 		Socket socket = mock(Socket.class);
78 94
 		when(socketChannel.socket()).thenReturn(socket);
79  
-		TcpNioConnection connection = new TcpNioConnection(socketChannel, true, false, null, null);
  95
+		TcpNioConnection connection = new TcpNioConnection(socketChannel, true, false, nullPublisher, null);
80 96
 		ChannelInputStream inputStream = TestUtils.getPropertyValue(connection, "channelInputStream", ChannelInputStream.class);
81 97
 		inputStream.write(new byte[] {(byte) 0x80}, 1);
82 98
 		assertEquals(0x80, inputStream.read());
83 99
 	}
84 100
 
  101
+	@Test
  102
+	public void transferHeaders() throws Exception {
  103
+		Socket inSocket = mock(Socket.class);
  104
+		PipedInputStream pipe = new PipedInputStream();
  105
+		when(inSocket.getInputStream()).thenReturn(pipe);
  106
+
  107
+		TcpConnectionSupport inboundConnection = new TcpNetConnection(inSocket, true, false, nullPublisher, null);
  108
+		inboundConnection.setDeserializer(new MapJsonSerializer());
  109
+		MapMessageConverter inConverter = new MapMessageConverter();
  110
+		MessageConvertingTcpMessageMapper inMapper = new MessageConvertingTcpMessageMapper(inConverter);
  111
+		inboundConnection.setMapper(inMapper);
  112
+
  113
+		ByteArrayOutputStream baos = new ByteArrayOutputStream();
  114
+		Socket outSocket = mock(Socket.class);
  115
+		TcpNetConnection outboundConnection = new TcpNetConnection(outSocket, true, false, nullPublisher, null);
  116
+		when(outSocket.getOutputStream()).thenReturn(baos);
  117
+
  118
+		MapMessageConverter outConverter = new MapMessageConverter();
  119
+		outConverter.setHeaderNames("bar");
  120
+		MessageConvertingTcpMessageMapper outMapper = new MessageConvertingTcpMessageMapper(outConverter);
  121
+		outboundConnection.setMapper(outMapper);
  122
+		outboundConnection.setSerializer(new MapJsonSerializer());
  123
+
  124
+		Message<String> message = MessageBuilder.withPayload("foo")
  125
+				.setHeader("bar", "baz")
  126
+				.build();
  127
+		outboundConnection.send(message);
  128
+		PipedOutputStream out = new PipedOutputStream(pipe);
  129
+		out.write(baos.toByteArray());
  130
+		out.close();
  131
+
  132
+		final AtomicReference<Message<?>> inboundMessage = new AtomicReference<Message<?>>();
  133
+		TcpListener listener = new TcpListener() {
  134
+
  135
+			public boolean onMessage(Message<?> message) {
  136
+				inboundMessage.set(message);
  137
+				return false;
  138
+			}
  139
+		};
  140
+		inboundConnection.registerListener(listener);
  141
+		inboundConnection.run();
  142
+		assertNotNull(inboundMessage.get());
  143
+		assertEquals("foo", inboundMessage.get().getPayload());
  144
+		assertEquals("baz", inboundMessage.get().getHeaders().get("bar"));
  145
+	}
85 146
 }
81  ...ntegration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpNioConnectionTests.java
... ...
@@ -1,5 +1,5 @@
1 1
 /*
2  
- * Copyright 2002-2012 the original author or authors.
  2
+ * Copyright 2002-2013 the original author or authors.
3 3
  *
4 4
  * Licensed under the Apache License, Version 2.0 (the "License");
5 5
  * you may not use this file except in compliance with the License.
@@ -17,12 +17,15 @@
17 17
 package org.springframework.integration.ip.tcp.connection;
18 18
 
19 19
 import static org.junit.Assert.assertEquals;
  20
+import static org.junit.Assert.assertNotNull;
20 21
 import static org.junit.Assert.assertTrue;
21 22
 import static org.junit.Assert.fail;
  23
+import static org.mockito.Matchers.any;
22 24
 import static org.mockito.Mockito.doAnswer;
23 25
 import static org.mockito.Mockito.mock;
24 26
 import static org.mockito.Mockito.when;
25 27
 
  28
+import java.io.ByteArrayOutputStream;
26 29
 import java.io.IOException;
27 30
 import java.io.InputStream;
28 31
 import java.lang.reflect.Field;
@@ -46,18 +49,23 @@
46 49
 import java.util.concurrent.Executors;
47 50
 import java.util.concurrent.Future;
48 51
 import java.util.concurrent.TimeUnit;
49  
-
  52
+import java.util.concurrent.atomic.AtomicReference;
50 53
 import javax.net.ServerSocketFactory;
51 54
 
52 55
 import org.junit.Test;
53 56
 import org.mockito.Mockito;
54 57
 import org.mockito.invocation.InvocationOnMock;
55 58
 import org.mockito.stubbing.Answer;
  59
+
56 60
 import org.springframework.beans.DirectFieldAccessor;
  61
+import org.springframework.context.ApplicationEvent;
  62
+import org.springframework.context.ApplicationEventPublisher;
57 63
 import org.springframework.integration.Message;
58 64
 import org.springframework.integration.ip.tcp.connection.TcpNioConnection.ChannelInputStream;
59 65
 import org.springframework.integration.ip.tcp.serializer.ByteArrayCrLfSerializer;
  66
+import org.springframework.integration.ip.tcp.serializer.MapJsonSerializer;
60 67
 import org.springframework.integration.support.MessageBuilder;
  68
+import org.springframework.integration.support.converter.MapMessageConverter;
61 69
 import org.springframework.integration.test.util.SocketUtils;
62 70
 import org.springframework.integration.test.util.TestUtils;
63 71
 import org.springframework.util.ReflectionUtils;
@@ -72,6 +80,11 @@
72 80
  */
73 81
 public class TcpNioConnectionTests {
74 82