diff --git a/spring-integration-core/src/main/java/org/springframework/integration/support/converter/MapMessageConverter.java b/spring-integration-core/src/main/java/org/springframework/integration/support/converter/MapMessageConverter.java
new file mode 100644
index 00000000000..50cf28cd9ee
--- /dev/null
+++ b/spring-integration-core/src/main/java/org/springframework/integration/support/converter/MapMessageConverter.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2002-2013 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.integration.support.converter;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.springframework.integration.Message;
+import org.springframework.integration.support.MessageBuilder;
+import org.springframework.util.Assert;
+
+/**
+ * Converts to/from a Map with 2 keys ('headers' and 'payload').
+ * @author Gary Russell
+ * @since 3.0
+ *
+ */
+public class MapMessageConverter implements MessageConverter {
+
+ private volatile String[] headerNames;
+
+ private volatile boolean filterHeadersInToMessage;
+
+ /**
+ * Headers to be converted in {@link #fromMessage(Message)}.
+ * {@link #toMessage(Object)} will populate all headers found in
+ * the map, unless {@link #filterHeadersInToMessage} is true.
+ * @param headerNames
+ */
+ public void setHeaderNames(String... headerNames) {
+ this.headerNames = headerNames;
+ }
+
+ /**
+ * By default all headers on Map passed to {@link #toMessage(Object)}
+ * will be mapped. Set this property
+ * to 'true' if you wish to limit the inbound headers to those in
+ * the #headerNames.
+ * @param filterHeadersInToMessage
+ */
+ public void setFilterHeadersInToMessage(boolean filterHeadersInToMessage) {
+ this.filterHeadersInToMessage = filterHeadersInToMessage;
+ }
+
+ public
Message
toMessage(Object object) {
+ Assert.isInstanceOf(Map.class, object, "This converter expects a Map");
+ @SuppressWarnings("unchecked")
+ Map map = (Map) object;
+ Object payload = map.get("payload");
+ Assert.notNull(payload, "'payload' entry cannot be null");
+ MessageBuilder> messageBuilder = MessageBuilder.withPayload(payload);
+ @SuppressWarnings("unchecked")
+ Map headers = (Map) map.get("headers");
+ if (headers != null) {
+ if (this.filterHeadersInToMessage) {
+ headers.keySet().retainAll(Arrays.asList(this.headerNames));
+ }
+ messageBuilder.copyHeaders(headers);
+ /*for (Entry entry : headers.entrySet()) {
+ if (this.filterHeadersInToMessage ? this.headerNames.contains(entry.getKey()) : true) {
+ messageBuilder.setHeader(entry.getKey(), entry.getValue());
+ }
+ }*/
+ }
+ @SuppressWarnings("unchecked")
+ Message convertedMessage = (Message
) messageBuilder.build();
+ return convertedMessage;
+ }
+
+ public
Object fromMessage(Message
message) {
+ Map map = new HashMap();
+ map.put("payload", message.getPayload());
+ Map headers = new HashMap();
+ for (String headerName : headerNames) {
+ Object header = message.getHeaders().get(headerName);
+ if (header != null) {
+ headers.put(headerName, header);
+ }
+ }
+ map.put("headers", headers);
+ return map;
+ }
+
+}
diff --git a/spring-integration-core/src/test/java/org/springframework/integration/support/converter/MapMessageConverterTests.java b/spring-integration-core/src/test/java/org/springframework/integration/support/converter/MapMessageConverterTests.java
new file mode 100644
index 00000000000..3243057352d
--- /dev/null
+++ b/spring-integration-core/src/test/java/org/springframework/integration/support/converter/MapMessageConverterTests.java
@@ -0,0 +1,130 @@
+/*
+ * Copyright 2002-2013 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.integration.support.converter;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+import java.util.Map;
+
+import org.junit.Test;
+import org.springframework.integration.Message;
+import org.springframework.integration.support.MessageBuilder;
+
+/**
+ * @author Gary Russell
+ * @author Artem Bilan
+ * @since 3.0
+ *
+ */
+public class MapMessageConverterTests {
+
+ @Test
+ public void testFromMessageToMessage() throws Exception {
+ Message message = MessageBuilder.withPayload("foo")
+ .setHeader("bar", "baz")
+ .setHeader("baz", "qux")
+ .build();
+ MapMessageConverter converter = new MapMessageConverter();
+ converter.setHeaderNames("bar");
+ @SuppressWarnings("unchecked")
+ Map map = (Map) converter.fromMessage(message);
+ @SuppressWarnings("unchecked")
+ Map headers = (Map) map.get("headers");
+
+ assertNotNull(headers);
+ assertNotNull(map.get("payload"));
+ assertEquals("foo", map.get("payload"));
+ assertNotNull(headers.get("bar"));
+ assertEquals("baz", headers.get("bar"));
+ assertNull(headers.get("baz"));
+
+ headers.put("baz", "qux");
+ message = converter.toMessage(map);
+ assertEquals("foo", message.getPayload());
+ assertEquals("baz", message.getHeaders().get("bar"));
+ assertEquals("qux", message.getHeaders().get("baz"));
+
+ converter.setFilterHeadersInToMessage(true);
+
+ message = converter.toMessage(map);
+ assertEquals("foo", message.getPayload());
+ assertEquals("baz", message.getHeaders().get("bar"));
+ assertNull(message.getHeaders().get("baz"));
+ }
+
+ @Test
+ public void testInvalid() throws Exception {
+ Message message = MessageBuilder.withPayload("foo")
+ .setHeader("bar", "baz")
+ .setHeader("baz", "qux")
+ .build();
+ MapMessageConverter converter = new MapMessageConverter();
+ converter.setHeaderNames("bar");
+ @SuppressWarnings("unchecked")
+ Map map = (Map) converter.fromMessage(message);
+
+ map.remove("payload");
+
+ try {
+ converter.toMessage(map);
+ fail("Expected exception");
+ }
+ catch (IllegalArgumentException e) {
+ assertEquals("'payload' entry cannot be null", e.getMessage());
+ }
+ }
+
+ @Test
+ public void testNoHeaders() throws Exception {
+ Message message = MessageBuilder.withPayload("foo")
+ .build();
+ MapMessageConverter converter = new MapMessageConverter();
+ converter.setHeaderNames("bar");
+ @SuppressWarnings("unchecked")
+ Map map = (Map) converter.fromMessage(message);
+ @SuppressWarnings("unchecked")
+ Map headers = (Map) map.get("headers");
+
+ assertNotNull(headers);
+ assertEquals(0, headers.size());
+ map.remove("headers");
+ message = converter.toMessage(map);
+ assertEquals("foo", message.getPayload());
+ }
+
+ @Test
+ public void testNotIncludedIfNull() throws Exception {
+ Message message = MessageBuilder.withPayload("foo")
+ .setHeader("bar", null)
+ .build();
+ MapMessageConverter converter = new MapMessageConverter();
+ converter.setHeaderNames("bar");
+ @SuppressWarnings("unchecked")
+ Map map = (Map) converter.fromMessage(message);
+ @SuppressWarnings("unchecked")
+ Map headers = (Map) map.get("headers");
+
+ assertNotNull(headers);
+ assertNotNull(map.get("payload"));
+ assertEquals("foo", map.get("payload"));
+ assertFalse(headers.keySet().contains("bar"));
+ assertEquals(0, headers.size());
+ }
+}
diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/MessageConvertingTcpMessageMapper.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/MessageConvertingTcpMessageMapper.java
new file mode 100644
index 00000000000..50fd00380a9
--- /dev/null
+++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/MessageConvertingTcpMessageMapper.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2002-2013 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.integration.ip.tcp.connection;
+
+import org.springframework.integration.Message;
+import org.springframework.integration.support.MessageBuilder;
+import org.springframework.integration.support.converter.MessageConverter;
+import org.springframework.util.Assert;
+
+/**
+ * @author Gary Russell
+ * @since 3.0
+ *
+ */
+public class MessageConvertingTcpMessageMapper extends TcpMessageMapper {
+
+ private final MessageConverter messageConverter;
+
+ public MessageConvertingTcpMessageMapper(MessageConverter messageConverter) {
+ Assert.notNull(messageConverter, "'messasgeConverter' must not be null");
+ this.messageConverter = messageConverter;
+ }
+
+ @Override
+ public Message toMessage(TcpConnection connection) throws Exception {
+ Object data = connection.getPayload();
+ if (data != null) {
+ Message message = this.messageConverter.toMessage(data);
+ MessageBuilder messageBuilder = MessageBuilder.fromMessage(message);
+ this.addStandardHeaders(connection, messageBuilder);
+ this.addCustomHeaders(connection, messageBuilder);
+ return messageBuilder.build();
+ }
+ else {
+ if (logger.isWarnEnabled()) {
+ logger.warn("Null payload from connection " + connection.getConnectionId());
+ }
+ return null;
+ }
+ }
+
+ @Override
+ public Object fromMessage(Message> message) throws Exception {
+ return this.messageConverter.fromMessage(message);
+ }
+
+}
diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpMessageMapper.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpMessageMapper.java
index a0b9eb43747..083f158c2f4 100644
--- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpMessageMapper.java
+++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpMessageMapper.java
@@ -18,6 +18,8 @@
import java.io.UnsupportedEncodingException;
import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.springframework.integration.Message;
import org.springframework.integration.MessageHandlingException;
import org.springframework.integration.ip.IpHeaders;
@@ -43,6 +45,8 @@ public class TcpMessageMapper implements
InboundMessageMapper,
OutboundMessageMapper {
+ protected final Log logger = LogFactory.getLog(this.getClass());
+
private volatile String charset = "UTF-8";
private volatile boolean stringToBytes = true;
@@ -54,26 +58,39 @@ public Message toMessage(TcpConnection connection) throws Exception {
Object payload = connection.getPayload();
if (payload != null) {
MessageBuilder messageBuilder = MessageBuilder.withPayload(payload);
- String connectionId = connection.getConnectionId();
- messageBuilder
- .setHeader(IpHeaders.HOSTNAME, connection.getHostName())
- .setHeader(IpHeaders.IP_ADDRESS, connection.getHostAddress())
- .setHeader(IpHeaders.REMOTE_PORT, connection.getPort())
- .setHeader(IpHeaders.CONNECTION_ID, connectionId);
- if (this.applySequence) {
- messageBuilder
- .setCorrelationId(connectionId)
- .setSequenceNumber((int) connection.incrementAndGetConnectionSequence());
- }
- Map customHeaders = this.supplyCustomHeaders(connection);
- if (customHeaders != null) {
- messageBuilder.copyHeadersIfAbsent(customHeaders);
- }
+ this.addStandardHeaders(connection, messageBuilder);
+ this.addCustomHeaders(connection, messageBuilder);
message = messageBuilder.build();
}
+ else {
+ if (logger.isWarnEnabled()) {
+ logger.warn("Null payload from connection " + connection.getConnectionId());
+ }
+ }
return message;
}
+ protected final void addStandardHeaders(TcpConnection connection, MessageBuilder> messageBuilder) {
+ String connectionId = connection.getConnectionId();
+ messageBuilder
+ .setHeader(IpHeaders.HOSTNAME, connection.getHostName())
+ .setHeader(IpHeaders.IP_ADDRESS, connection.getHostAddress())
+ .setHeader(IpHeaders.REMOTE_PORT, connection.getPort())
+ .setHeader(IpHeaders.CONNECTION_ID, connectionId);
+ if (this.applySequence) {
+ messageBuilder
+ .setCorrelationId(connectionId)
+ .setSequenceNumber((int) connection.incrementAndGetConnectionSequence());
+ }
+ }
+
+ protected final void addCustomHeaders(TcpConnection connection, MessageBuilder> messageBuilder) {
+ Map customHeaders = this.supplyCustomHeaders(connection);
+ if (customHeaders != null) {
+ messageBuilder.copyHeadersIfAbsent(customHeaders);
+ }
+ }
+
/**
* Override to provide additional headers. The standard headers cannot be overridden
* and any such headers will be ignored if provided in the result.
diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/serializer/MapJsonSerializer.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/serializer/MapJsonSerializer.java
new file mode 100644
index 00000000000..aaa06a7eeb8
--- /dev/null
+++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/serializer/MapJsonSerializer.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2002-2013 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.integration.ip.tcp.serializer;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.util.Map;
+
+import org.springframework.core.serializer.Deserializer;
+import org.springframework.core.serializer.Serializer;
+import org.springframework.integration.support.json.JacksonJsonObjectMapperProvider;
+import org.springframework.integration.support.json.JsonObjectMapper;
+import org.springframework.util.Assert;
+
+/**
+ * Serializes a {@link Map} as JSON. Deserializes JSON to
+ * a {@link Map}. The default {@link JacksonJsonObjectMapperProvider#newInstance()} can be
+ * overridden using {@link #setJsonObjectMapper(JsonObjectMapper)}.
+ *
+ * The JSON deserializer can't delimit multiple JSON
+ * objects. Therefore another (de)serializer is used to
+ * apply structure to the stream. By default, this is a
+ * simple {@link ByteArrayLfSerializer}, which inserts/expects
+ * LF (0x0a) between messages.
+ *
+ * @author Gary Russell
+ * @author Artem Bilan
+ * @since 3.0
+ *
+ */
+public class MapJsonSerializer implements Serializer>, Deserializer> {
+
+ private volatile JsonObjectMapper> jsonObjectMapper = JacksonJsonObjectMapperProvider.newInstance();
+
+ private volatile Deserializer packetDeserializer = new ByteArrayLfSerializer();
+
+ private volatile Serializer packetSerializer = new ByteArrayLfSerializer();
+
+ /**
+ * An {@link JsonObjectMapper} to be used for the conversion to/from
+ * JSON. Use this if you wish to set additional {@link JsonObjectMapper} implementation features.
+ * @param jsonObjectMapper the jsonObjectMapper.
+ */
+ public void setJsonObjectMapper(JsonObjectMapper> jsonObjectMapper) {
+ Assert.notNull(jsonObjectMapper, "'jsonObjectMapper' cannot be null");
+ this.jsonObjectMapper = jsonObjectMapper;
+ }
+
+ /**
+ * A {@link Deserializer} that will construct the full JSON content from
+ * the stream which is then passed to the JsonObjectMapper. Default is
+ * {@link ByteArrayLfSerializer}.
+ * @param packetDeserializer the packetDeserializer
+ */
+ public void setPacketDeserializer(Deserializer packetDeserializer) {
+ Assert.notNull(packetDeserializer, "'packetDeserializer' cannot be null");
+ this.packetDeserializer = packetDeserializer;
+ }
+
+ /**
+ * A {@link Serializer} that will delimit the full JSON content in
+ * the stream. Default is
+ * {@link ByteArrayLfSerializer}.
+ * @param packetSerializer the packetSerializer
+ */
+ public void setPacketSerializer(Serializer packetSerializer) {
+ Assert.notNull(packetSerializer, "'packetSerializer' cannot be null");
+ this.packetSerializer = packetSerializer;
+ }
+
+ public Map, ?> deserialize(InputStream inputStream) throws IOException {
+ byte[] bytes = this.packetDeserializer.deserialize(inputStream);
+ try {
+ return this.jsonObjectMapper.fromJson(new InputStreamReader(new ByteArrayInputStream(bytes)), Map.class);
+ }
+ catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ public void serialize(Map, ?> object, OutputStream outputStream) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try {
+ this.jsonObjectMapper.toJson(object, new OutputStreamWriter(baos));
+ }
+ catch (Exception e) {
+ throw new IOException(e);
+ }
+ this.packetSerializer.serialize(baos.toByteArray(), outputStream);
+ outputStream.flush();
+ }
+
+}
diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpMessageMapperTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpMessageMapperTests.java
index ed88488195d..8bd39f705c3 100644
--- a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpMessageMapperTests.java
+++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpMessageMapperTests.java
@@ -20,6 +20,8 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.net.Socket;
import java.util.Collections;
import java.util.Map;
@@ -27,9 +29,13 @@
import javax.net.SocketFactory;
import org.junit.Test;
+import org.springframework.core.serializer.DefaultDeserializer;
+import org.springframework.core.serializer.DefaultSerializer;
import org.springframework.integration.Message;
import org.springframework.integration.ip.IpHeaders;
+import org.springframework.integration.ip.tcp.serializer.MapJsonSerializer;
import org.springframework.integration.support.MessageBuilder;
+import org.springframework.integration.support.converter.MapMessageConverter;
/**
* @author Gary Russell
@@ -207,4 +213,72 @@ public void testFromMessage() throws Exception {
}
-}
\ No newline at end of file
+ @Test
+ public void testMapMessageConvertingOutboundJson() throws Exception {
+ Message message = MessageBuilder.withPayload("foo")
+ .setHeader("bar", "baz")
+ .build();
+ MapMessageConverter converter = new MapMessageConverter();
+ converter.setHeaderNames("bar");
+ MessageConvertingTcpMessageMapper mapper = new MessageConvertingTcpMessageMapper(converter);
+ Map, ?> map = (Map, ?>) mapper.fromMessage(message);
+ MapJsonSerializer serializer = new MapJsonSerializer();
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ serializer.serialize(map, baos);
+ assertEquals("{\"headers\":{\"bar\":\"baz\"},\"payload\":\"foo\"}\n", new String(baos.toByteArray(), "UTF-8"));
+ }
+
+ @Test
+ public void testMapMessageConvertingInboundJson() throws Exception {
+ String json = "{\"headers\":{\"bar\":\"baz\"},\"payload\":\"foo\"}\n";
+ MapMessageConverter converter = new MapMessageConverter();
+ MessageConvertingTcpMessageMapper mapper = new MessageConvertingTcpMessageMapper(converter);
+ MapJsonSerializer deserializer = new MapJsonSerializer();
+ Map, ?> map = deserializer.deserialize(new ByteArrayInputStream(json.getBytes("UTF-8")));
+
+ TcpConnection connection = mock(TcpConnection.class);
+ when(connection.getPayload()).thenReturn(map);
+ when(connection.getHostName()).thenReturn("someHost");
+ when(connection.getHostAddress()).thenReturn("1.1.1.1");
+ when(connection.getPort()).thenReturn(1234);
+ when(connection.getConnectionId()).thenReturn("someId");
+ Message> message = mapper.toMessage(connection);
+ assertEquals("foo", message.getPayload());
+ assertEquals("baz", message.getHeaders().get("bar"));
+ assertEquals("someHost", message.getHeaders().get(IpHeaders.HOSTNAME));
+ assertEquals("1.1.1.1", message.getHeaders().get(IpHeaders.IP_ADDRESS));
+ assertEquals(1234, message.getHeaders().get(IpHeaders.REMOTE_PORT));
+ assertEquals("someId", message.getHeaders().get(IpHeaders.CONNECTION_ID));
+ }
+
+ @Test
+ public void testMapMessageConvertingBothWaysJava() throws Exception {
+ Message outMessage = MessageBuilder.withPayload("foo")
+ .setHeader("bar", "baz")
+ .build();
+ MapMessageConverter converter = new MapMessageConverter();
+ converter.setHeaderNames("bar");
+ MessageConvertingTcpMessageMapper mapper = new MessageConvertingTcpMessageMapper(converter);
+ Map, ?> map = (Map, ?>) mapper.fromMessage(outMessage);
+ DefaultSerializer serializer = new DefaultSerializer();
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ serializer.serialize(map, baos);
+
+ DefaultDeserializer deserializer = new DefaultDeserializer();
+ map = (Map, ?>) deserializer.deserialize(new ByteArrayInputStream(baos.toByteArray()));
+ TcpConnection connection = mock(TcpConnection.class);
+ when(connection.getPayload()).thenReturn(map);
+ when(connection.getHostName()).thenReturn("someHost");
+ when(connection.getHostAddress()).thenReturn("1.1.1.1");
+ when(connection.getPort()).thenReturn(1234);
+ when(connection.getConnectionId()).thenReturn("someId");
+ Message> message = mapper.toMessage(connection);
+ assertEquals("foo", message.getPayload());
+ assertEquals("baz", message.getHeaders().get("bar"));
+ assertEquals("someHost", message.getHeaders().get(IpHeaders.HOSTNAME));
+ assertEquals("1.1.1.1", message.getHeaders().get(IpHeaders.IP_ADDRESS));
+ assertEquals(1234, message.getHeaders().get(IpHeaders.REMOTE_PORT));
+ assertEquals("someId", message.getHeaders().get(IpHeaders.CONNECTION_ID));
+ }
+
+}
diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpNetConnectionTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpNetConnectionTests.java
index e768b2ed980..d786b7a0a56 100644
--- a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpNetConnectionTests.java
+++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpNetConnectionTests.java
@@ -21,19 +21,30 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import java.io.ByteArrayOutputStream;
import java.io.InputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
import java.net.Socket;
import java.nio.channels.SocketChannel;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
+
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+
import org.springframework.beans.DirectFieldAccessor;
+import org.springframework.context.ApplicationEvent;
+import org.springframework.context.ApplicationEventPublisher;
+import org.springframework.integration.Message;
import org.springframework.integration.ip.tcp.connection.TcpNioConnection.ChannelInputStream;
import org.springframework.integration.ip.tcp.serializer.ByteArrayStxEtxSerializer;
+import org.springframework.integration.ip.tcp.serializer.MapJsonSerializer;
+import org.springframework.integration.support.MessageBuilder;
+import org.springframework.integration.support.converter.MapMessageConverter;
import org.springframework.integration.test.util.TestUtils;
/**
@@ -43,13 +54,18 @@
*/
public class TcpNetConnectionTests {
+ private final ApplicationEventPublisher nullPublisher = new ApplicationEventPublisher() {
+ public void publishEvent(ApplicationEvent event) {
+ }
+ };
+
@Test
public void testErrorLog() throws Exception {
Socket socket = mock(Socket.class);
InputStream stream = mock(InputStream.class);
when(socket.getInputStream()).thenReturn(stream);
when(stream.read()).thenReturn((int) 'x');
- TcpNetConnection connection = new TcpNetConnection(socket, true, false, null, null);
+ TcpNetConnection connection = new TcpNetConnection(socket, true, false, nullPublisher, null);
connection.setDeserializer(new ByteArrayStxEtxSerializer());
final AtomicReference log = new AtomicReference();
Log logger = mock(Log.class);
@@ -76,10 +92,55 @@ public void testBinary() throws Exception {
SocketChannel socketChannel = mock(SocketChannel.class);
Socket socket = mock(Socket.class);
when(socketChannel.socket()).thenReturn(socket);
- TcpNioConnection connection = new TcpNioConnection(socketChannel, true, false, null, null);
+ TcpNioConnection connection = new TcpNioConnection(socketChannel, true, false, nullPublisher, null);
ChannelInputStream inputStream = TestUtils.getPropertyValue(connection, "channelInputStream", ChannelInputStream.class);
inputStream.write(new byte[] {(byte) 0x80}, 1);
assertEquals(0x80, inputStream.read());
}
+ @Test
+ public void transferHeaders() throws Exception {
+ Socket inSocket = mock(Socket.class);
+ PipedInputStream pipe = new PipedInputStream();
+ when(inSocket.getInputStream()).thenReturn(pipe);
+
+ TcpConnectionSupport inboundConnection = new TcpNetConnection(inSocket, true, false, nullPublisher, null);
+ inboundConnection.setDeserializer(new MapJsonSerializer());
+ MapMessageConverter inConverter = new MapMessageConverter();
+ MessageConvertingTcpMessageMapper inMapper = new MessageConvertingTcpMessageMapper(inConverter);
+ inboundConnection.setMapper(inMapper);
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ Socket outSocket = mock(Socket.class);
+ TcpNetConnection outboundConnection = new TcpNetConnection(outSocket, true, false, nullPublisher, null);
+ when(outSocket.getOutputStream()).thenReturn(baos);
+
+ MapMessageConverter outConverter = new MapMessageConverter();
+ outConverter.setHeaderNames("bar");
+ MessageConvertingTcpMessageMapper outMapper = new MessageConvertingTcpMessageMapper(outConverter);
+ outboundConnection.setMapper(outMapper);
+ outboundConnection.setSerializer(new MapJsonSerializer());
+
+ Message message = MessageBuilder.withPayload("foo")
+ .setHeader("bar", "baz")
+ .build();
+ outboundConnection.send(message);
+ PipedOutputStream out = new PipedOutputStream(pipe);
+ out.write(baos.toByteArray());
+ out.close();
+
+ final AtomicReference> inboundMessage = new AtomicReference>();
+ TcpListener listener = new TcpListener() {
+
+ public boolean onMessage(Message> message) {
+ inboundMessage.set(message);
+ return false;
+ }
+ };
+ inboundConnection.registerListener(listener);
+ inboundConnection.run();
+ assertNotNull(inboundMessage.get());
+ assertEquals("foo", inboundMessage.get().getPayload());
+ assertEquals("baz", inboundMessage.get().getHeaders().get("bar"));
+ }
}
diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpNioConnectionTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpNioConnectionTests.java
index 9da0c95fa7c..0d8457eeeff 100644
--- a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpNioConnectionTests.java
+++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpNioConnectionTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2012 the original author or authors.
+ * Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,12 +17,15 @@
package org.springframework.integration.ip.tcp.connection;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Field;
@@ -46,18 +49,23 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-
+import java.util.concurrent.atomic.AtomicReference;
import javax.net.ServerSocketFactory;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+
import org.springframework.beans.DirectFieldAccessor;
+import org.springframework.context.ApplicationEvent;
+import org.springframework.context.ApplicationEventPublisher;
import org.springframework.integration.Message;
import org.springframework.integration.ip.tcp.connection.TcpNioConnection.ChannelInputStream;
import org.springframework.integration.ip.tcp.serializer.ByteArrayCrLfSerializer;
+import org.springframework.integration.ip.tcp.serializer.MapJsonSerializer;
import org.springframework.integration.support.MessageBuilder;
+import org.springframework.integration.support.converter.MapMessageConverter;
import org.springframework.integration.test.util.SocketUtils;
import org.springframework.integration.test.util.TestUtils;
import org.springframework.util.ReflectionUtils;
@@ -72,6 +80,11 @@
*/
public class TcpNioConnectionTests {
+ private final ApplicationEventPublisher nullPublisher = new ApplicationEventPublisher() {
+ public void publishEvent(ApplicationEvent event) {
+ }
+ };
+
@Test
public void testWriteTimeout() throws Exception {
final int port = SocketUtils.findAvailableServerSocket();
@@ -442,6 +455,70 @@ public void run() {
assertEquals("foo\u0000", new String(out));
}
+ @Test
+ public void transferHeaders() throws Exception {
+ Socket inSocket = mock(Socket.class);
+ SocketChannel inChannel = mock(SocketChannel.class);
+ when(inChannel.socket()).thenReturn(inSocket);
+
+ TcpNioConnection inboundConnection = new TcpNioConnection(inChannel, true, false, nullPublisher, null);
+ inboundConnection.setDeserializer(new MapJsonSerializer());
+ MapMessageConverter inConverter = new MapMessageConverter();
+ MessageConvertingTcpMessageMapper inMapper = new MessageConvertingTcpMessageMapper(inConverter);
+ inboundConnection.setMapper(inMapper);
+ final ByteArrayOutputStream written = new ByteArrayOutputStream();
+ doAnswer(new Answer() {
+ public Integer answer(InvocationOnMock invocation) throws Throwable {
+ ByteBuffer buff = (ByteBuffer) invocation.getArguments()[0];
+ byte[] bytes = written.toByteArray();
+ buff.put(bytes);
+ return bytes.length;
+ }
+ }).when(inChannel).read(any(ByteBuffer.class));
+
+ Socket outSocket = mock(Socket.class);
+ SocketChannel outChannel = mock(SocketChannel.class);
+ when(outChannel.socket()).thenReturn(outSocket);
+ TcpNioConnection outboundConnection = new TcpNioConnection(outChannel, true, false, nullPublisher, null);
+ doAnswer(new Answer() {
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ ByteBuffer buff = (ByteBuffer) invocation.getArguments()[0];
+ byte[] bytes = new byte[buff.limit()];
+ buff.get(bytes);
+ written.write(bytes);
+ return null;
+ }
+ }).when(outChannel).write(any(ByteBuffer.class));
+
+ MapMessageConverter outConverter = new MapMessageConverter();
+ outConverter.setHeaderNames("bar");
+ MessageConvertingTcpMessageMapper outMapper = new MessageConvertingTcpMessageMapper(outConverter);
+ outboundConnection.setMapper(outMapper);
+ outboundConnection.setSerializer(new MapJsonSerializer());
+
+ Message message = MessageBuilder.withPayload("foo")
+ .setHeader("bar", "baz")
+ .build();
+ outboundConnection.send(message);
+
+ final AtomicReference> inboundMessage = new AtomicReference>();
+ final CountDownLatch latch = new CountDownLatch(1);
+ TcpListener listener = new TcpListener() {
+
+ public boolean onMessage(Message> message) {
+ inboundMessage.set(message);
+ latch.countDown();
+ return false;
+ }
+ };
+ inboundConnection.registerListener(listener);
+ inboundConnection.readPacket();
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+ assertNotNull(inboundMessage.get());
+ assertEquals("foo", inboundMessage.get().getPayload());
+ assertEquals("baz", inboundMessage.get().getHeaders().get("bar"));
+ }
+
private void readFully(InputStream is, byte[] buff) throws IOException {
for (int i = 0; i < buff.length; i++) {
buff[i] = (byte) is.read();
diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/serializer/MapJsonSerializerTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/serializer/MapJsonSerializerTests.java
new file mode 100644
index 00000000000..e266b5be263
--- /dev/null
+++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/serializer/MapJsonSerializerTests.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2002-2013 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.integration.ip.tcp.serializer;
+
+import static org.junit.Assert.assertNotNull;
+
+import java.io.ByteArrayInputStream;
+import java.util.Map;
+
+import org.junit.Test;
+
+/**
+ * @author Gary Russell
+ * @since 3.0
+ *
+ */
+public class MapJsonSerializerTests {
+
+ @Test
+ public void multi() throws Exception {
+ String json = "{\"headers\":{\"bar\":\"baz\"},\"payload\":\"foo\"}\n";
+ String twoJson = json + json;
+ MapJsonSerializer deserializer = new MapJsonSerializer();
+ ByteArrayInputStream bais = new ByteArrayInputStream(twoJson.getBytes("UTF-8"));
+ Map, ?> map = deserializer.deserialize(bais);
+ assertNotNull(map);
+ map = deserializer.deserialize(bais);
+ assertNotNull(map);
+ }
+}
diff --git a/src/reference/docbook/ip.xml b/src/reference/docbook/ip.xml
index 0555a205655..06717ad957e 100644
--- a/src/reference/docbook/ip.xml
+++ b/src/reference/docbook/ip.xml
@@ -159,9 +159,10 @@
This is the extent of message correlation performed when sharing connection
factories between inbound and outbound adapters. Such sharing allows for
- asynchronous two-way communication over TCP. Only payload information is
+ asynchronous two-way communication over TCP. By default, only payload information is
transferred using TCP; therefore any message correlation must be performed
- by downstream components such as aggregators or other endpoints.
+ by downstream components such as aggregators or other endpoints. Support for
+ transferring selected headers was introduced in version 3.0.
For more information refer to .
@@ -225,7 +226,17 @@
telnet as a client, for example.
- The ByteArrayStxEtxSerializer ,
+ The ByteArraySingleTerminatorSerializer ,
+ converts a byte array to a stream of bytes followed by a single termination
+ character (default 0x00).
+
+
+ The ByteArrayLfSerializer ,
+ converts a byte array to a stream of bytes followed by a single linefeed
+ character (0x0a).
+
+
+ The ByteArrayStxEtxSerializer ,
converts a byte array to a stream of bytes preceded by an STX (0x02) and
followed by an ETX (0x03).
@@ -276,6 +287,21 @@
and wish to increase the maximum message size, you must declare it as an explicit bean
with the property set and configure the connection factory to use that bean.
+
+ The MapJsonSerializer uses a Jackson
+ ObjectMapper to convert between a Map
+ and JSON. This can be used in conjunction with a MessageConvertingTcpMessageMapper
+ and a MapMessageConverter
+ to transfer selected headers and the payload in a JSON format.
+
+ The Jackson ObjectMapper cannot demarcate messages in the stream.
+ Therefore, the MapJsonSerializer needs to delegate to another
+ (de)serializer to handle message demarcation. By default, a
+ ByteArrayLfSerializer is used, resulting in messages with the
+ format <json><LF>
on the wire, but you can configure it to
+ use others instead.
+
+
The final standard serializer is
org.springframework.core.serializer.DefaultSerializer which can be
@@ -677,8 +703,12 @@
One goal of the IP Endpoints is to provide communication with systems other
than another Spring Integration application. For this reason, only
- message payloads are sent
- and received. No message correlation is provided by the framework,
+ message payloads are sent and received, by default. Since 3.0, headers can
+ be transferred, using JSON, Java serialization, or with custom
+ Serializer s and
+ Deserializer s; see
+ for more information.
+ No message correlation is provided by the framework,
except when using the gateways, or collaborating channel adapters on the
server side. In the paragraphs below we discuss the various
correlation techniques available to applications. In most cases, this
@@ -799,6 +829,98 @@
+
A Note About NIO
diff --git a/src/reference/docbook/whats-new.xml b/src/reference/docbook/whats-new.xml
index f3fee34d99a..39c32f73145 100644
--- a/src/reference/docbook/whats-new.xml
+++ b/src/reference/docbook/whats-new.xml
@@ -259,5 +259,18 @@
For more information see .
+