Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

INT-1807 Add Mechanism For Headers with TCP #763

Merged
merged 1 commit into from Aug 16, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -0,0 +1,102 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.support.converter;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import org.springframework.integration.Message;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.util.Assert;

/**
* Converts to/from a Map with 2 keys ('headers' and 'payload').
* @author Gary Russell
* @since 3.0
*
*/
public class MapMessageConverter implements MessageConverter {

private volatile String[] headerNames;

private volatile boolean filterHeadersInToMessage;

/**
* Headers to be converted in {@link #fromMessage(Message)}.
* {@link #toMessage(Object)} will populate all headers found in
* the map, unless {@link #filterHeadersInToMessage} is true.
* @param headerNames
*/
public void setHeaderNames(String... headerNames) {
this.headerNames = headerNames;
}

/**
* By default all headers on Map passed to {@link #toMessage(Object)}
* will be mapped. Set this property
* to 'true' if you wish to limit the inbound headers to those in
* the #headerNames.
* @param filterHeadersInToMessage
*/
public void setFilterHeadersInToMessage(boolean filterHeadersInToMessage) {
this.filterHeadersInToMessage = filterHeadersInToMessage;
}

public <P> Message<P> toMessage(Object object) {
Assert.isInstanceOf(Map.class, object, "This converter expects a Map");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add generic type to the MessageConverter? Or it can take one type on toMessage, but produce another one from fromMessage?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right - we can take any payload type in fromMessage; the resulting payload type here depends on what the deserializer puts in the Map's 'payload' element. So we have to leave it unspecified.

@SuppressWarnings("unchecked")
Map<String, ?> map = (Map<String, ?>) object;
Object payload = map.get("payload");
Assert.notNull(payload, "'payload' entry cannot be null");
MessageBuilder<?> messageBuilder = MessageBuilder.withPayload(payload);
@SuppressWarnings("unchecked")
Map<String, ?> headers = (Map<String, ?>) map.get("headers");
if (headers != null) {
if (this.filterHeadersInToMessage) {
headers.keySet().retainAll(Arrays.asList(this.headerNames));
}
messageBuilder.copyHeaders(headers);
/*for (Entry<String, ?> entry : headers.entrySet()) {
if (this.filterHeadersInToMessage ? this.headerNames.contains(entry.getKey()) : true) {
messageBuilder.setHeader(entry.getKey(), entry.getValue());
}
}*/
}
@SuppressWarnings("unchecked")
Message<P> convertedMessage = (Message<P>) messageBuilder.build();
return convertedMessage;
}

public <P> Object fromMessage(Message<P> message) {
Map<String,Object> map = new HashMap<String, Object>();
map.put("payload", message.getPayload());
Map<String, Object> headers = new HashMap<String, Object>();
for (String headerName : headerNames) {
Object header = message.getHeaders().get(headerName);
if (header != null) {
headers.put(headerName, header);
}
}
map.put("headers", headers);
return map;
}

}
@@ -0,0 +1,130 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.support.converter;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;

import java.util.Map;

import org.junit.Test;
import org.springframework.integration.Message;
import org.springframework.integration.support.MessageBuilder;

/**
* @author Gary Russell
* @author Artem Bilan
* @since 3.0
*
*/
public class MapMessageConverterTests {

@Test
public void testFromMessageToMessage() throws Exception {
Message<String> message = MessageBuilder.withPayload("foo")
.setHeader("bar", "baz")
.setHeader("baz", "qux")
.build();
MapMessageConverter converter = new MapMessageConverter();
converter.setHeaderNames("bar");
@SuppressWarnings("unchecked")
Map<String, Object> map = (Map<String, Object>) converter.fromMessage(message);
@SuppressWarnings("unchecked")
Map<String, Object> headers = (Map<String, Object>) map.get("headers");

assertNotNull(headers);
assertNotNull(map.get("payload"));
assertEquals("foo", map.get("payload"));
assertNotNull(headers.get("bar"));
assertEquals("baz", headers.get("bar"));
assertNull(headers.get("baz"));

headers.put("baz", "qux");
message = converter.toMessage(map);
assertEquals("foo", message.getPayload());
assertEquals("baz", message.getHeaders().get("bar"));
assertEquals("qux", message.getHeaders().get("baz"));

converter.setFilterHeadersInToMessage(true);

message = converter.toMessage(map);
assertEquals("foo", message.getPayload());
assertEquals("baz", message.getHeaders().get("bar"));
assertNull(message.getHeaders().get("baz"));
}

@Test
public void testInvalid() throws Exception {
Message<String> message = MessageBuilder.withPayload("foo")
.setHeader("bar", "baz")
.setHeader("baz", "qux")
.build();
MapMessageConverter converter = new MapMessageConverter();
converter.setHeaderNames("bar");
@SuppressWarnings("unchecked")
Map<String, Object> map = (Map<String, Object>) converter.fromMessage(message);

map.remove("payload");

try {
converter.toMessage(map);
fail("Expected exception");
}
catch (IllegalArgumentException e) {
assertEquals("'payload' entry cannot be null", e.getMessage());
}
}

@Test
public void testNoHeaders() throws Exception {
Message<String> message = MessageBuilder.withPayload("foo")
.build();
MapMessageConverter converter = new MapMessageConverter();
converter.setHeaderNames("bar");
@SuppressWarnings("unchecked")
Map<String, Object> map = (Map<String, Object>) converter.fromMessage(message);
@SuppressWarnings("unchecked")
Map<String, Object> headers = (Map<String, Object>) map.get("headers");

assertNotNull(headers);
assertEquals(0, headers.size());
map.remove("headers");
message = converter.toMessage(map);
assertEquals("foo", message.getPayload());
}

@Test
public void testNotIncludedIfNull() throws Exception {
Message<String> message = MessageBuilder.withPayload("foo")
.setHeader("bar", null)
.build();
MapMessageConverter converter = new MapMessageConverter();
converter.setHeaderNames("bar");
@SuppressWarnings("unchecked")
Map<String, Object> map = (Map<String, Object>) converter.fromMessage(message);
@SuppressWarnings("unchecked")
Map<String, Object> headers = (Map<String, Object>) map.get("headers");

assertNotNull(headers);
assertNotNull(map.get("payload"));
assertEquals("foo", map.get("payload"));
assertFalse(headers.keySet().contains("bar"));
assertEquals(0, headers.size());
}
}
@@ -0,0 +1,60 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.ip.tcp.connection;

import org.springframework.integration.Message;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.support.converter.MessageConverter;
import org.springframework.util.Assert;

/**
* @author Gary Russell
* @since 3.0
*
*/
public class MessageConvertingTcpMessageMapper extends TcpMessageMapper {

private final MessageConverter messageConverter;

public MessageConvertingTcpMessageMapper(MessageConverter messageConverter) {
Assert.notNull(messageConverter, "'messasgeConverter' must not be null");
this.messageConverter = messageConverter;
}

@Override
public Message<Object> toMessage(TcpConnection connection) throws Exception {
Object data = connection.getPayload();
if (data != null) {
Message<Object> message = this.messageConverter.toMessage(data);
MessageBuilder<Object> messageBuilder = MessageBuilder.fromMessage(message);
this.addStandardHeaders(connection, messageBuilder);
this.addCustomHeaders(connection, messageBuilder);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In some recent PR we've already discussed about opening messageBuilder instance to the end-developer API...
WDYT now? Is there some other strategy to not show messageBuilder as parameter in these methods?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that I made super.addStandardHeaders() and super.addCustomHeaders() final so subclasses can't override them and get a handle to the framework message builder (I remembered our earlier conversation about not allowing subclasses to be able to make arbitrary changes to the message).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it! Thanks, So, nevermind.

return messageBuilder.build();
}
else {
if (logger.isWarnEnabled()) {
logger.warn("Null payload from connection " + connection.getConnectionId());
}
return null;
}
}

@Override
public Object fromMessage(Message<?> message) throws Exception {
return this.messageConverter.fromMessage(message);
}

}
Expand Up @@ -18,6 +18,8 @@
import java.io.UnsupportedEncodingException;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.integration.Message;
import org.springframework.integration.MessageHandlingException;
import org.springframework.integration.ip.IpHeaders;
Expand All @@ -43,6 +45,8 @@ public class TcpMessageMapper implements
InboundMessageMapper<TcpConnection>,
OutboundMessageMapper<Object> {

protected final Log logger = LogFactory.getLog(this.getClass());

private volatile String charset = "UTF-8";

private volatile boolean stringToBytes = true;
Expand All @@ -54,26 +58,39 @@ public Message<Object> toMessage(TcpConnection connection) throws Exception {
Object payload = connection.getPayload();
if (payload != null) {
MessageBuilder<Object> messageBuilder = MessageBuilder.withPayload(payload);
String connectionId = connection.getConnectionId();
messageBuilder
.setHeader(IpHeaders.HOSTNAME, connection.getHostName())
.setHeader(IpHeaders.IP_ADDRESS, connection.getHostAddress())
.setHeader(IpHeaders.REMOTE_PORT, connection.getPort())
.setHeader(IpHeaders.CONNECTION_ID, connectionId);
if (this.applySequence) {
messageBuilder
.setCorrelationId(connectionId)
.setSequenceNumber((int) connection.incrementAndGetConnectionSequence());
}
Map<String, ?> customHeaders = this.supplyCustomHeaders(connection);
if (customHeaders != null) {
messageBuilder.copyHeadersIfAbsent(customHeaders);
}
this.addStandardHeaders(connection, messageBuilder);
this.addCustomHeaders(connection, messageBuilder);
message = messageBuilder.build();
}
else {
if (logger.isWarnEnabled()) {
logger.warn("Null payload from connection " + connection.getConnectionId());
}
}
return message;
}

protected final void addStandardHeaders(TcpConnection connection, MessageBuilder<?> messageBuilder) {
String connectionId = connection.getConnectionId();
messageBuilder
.setHeader(IpHeaders.HOSTNAME, connection.getHostName())
.setHeader(IpHeaders.IP_ADDRESS, connection.getHostAddress())
.setHeader(IpHeaders.REMOTE_PORT, connection.getPort())
.setHeader(IpHeaders.CONNECTION_ID, connectionId);
if (this.applySequence) {
messageBuilder
.setCorrelationId(connectionId)
.setSequenceNumber((int) connection.incrementAndGetConnectionSequence());
}
}

protected final void addCustomHeaders(TcpConnection connection, MessageBuilder<?> messageBuilder) {
Map<String, ?> customHeaders = this.supplyCustomHeaders(connection);
if (customHeaders != null) {
messageBuilder.copyHeadersIfAbsent(customHeaders);
}
}

/**
* Override to provide additional headers. The standard headers cannot be overridden
* and any such headers will be ignored if provided in the result.
Expand Down