diff --git a/spring-integration-core/src/main/java/org/springframework/integration/message/MutableMessage.java b/spring-integration-core/src/main/java/org/springframework/integration/message/MutableMessage.java index 314b08ee4bc..a9ec6fcc9d8 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/message/MutableMessage.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/message/MutableMessage.java @@ -23,6 +23,7 @@ import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.GenericMessage; +import org.springframework.util.Assert; import org.springframework.util.ObjectUtils; /** @@ -34,6 +35,7 @@ * a reference to the message and changes will be reflected there too. * * @author Gary Russell + * @author Artem Bilan * @since 4.0 * */ @@ -52,9 +54,10 @@ public MutableMessage(T payload) { } @SuppressWarnings("unchecked") - public MutableMessage(T payload, MessageHeaders headers) { - this.payload = payload; + public MutableMessage(T payload, Map headers) { + Assert.notNull(payload, "payload must not be null"); this.headers = new MessageHeaders(headers); + this.payload = payload; // Needs SPR-11468 to avoid DFA and header manipulation rawHeaders = (Map) new DirectFieldAccessor(this.headers) .getPropertyValue("headers"); @@ -64,6 +67,7 @@ public MutableMessage(T payload, MessageHeaders headers) { } } + @Override public MessageHeaders getHeaders() { return this.headers; @@ -75,6 +79,7 @@ public T getPayload() { } public void setPayload(T payload) { + Assert.notNull(payload, "'payload' must not be null"); this.payload = payload; } diff --git a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/ConfigurableMongoDbMessageStore.java b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/ConfigurableMongoDbMessageStore.java index e1fa8a50808..9aec43fec9f 100644 --- a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/ConfigurableMongoDbMessageStore.java +++ b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/ConfigurableMongoDbMessageStore.java @@ -362,6 +362,7 @@ private static class MessageDocument { * when the application context is configured with auditing. The document is not * currently Auditable. */ + @SuppressWarnings("unused") @Id private String _id; diff --git a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbMessageStore.java b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbMessageStore.java index 46cf9bed557..3a08501a1b3 100644 --- a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbMessageStore.java +++ b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbMessageStore.java @@ -29,10 +29,18 @@ import java.util.Properties; import java.util.UUID; +import org.springframework.beans.BeansException; import org.springframework.beans.DirectFieldAccessor; import org.springframework.beans.factory.BeanClassLoaderAware; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; import org.springframework.core.convert.converter.Converter; +import org.springframework.core.serializer.support.DeserializingConverter; +import org.springframework.core.serializer.support.SerializingConverter; +import org.springframework.data.annotation.Id; import org.springframework.data.annotation.Transient; +import org.springframework.data.convert.WritingConverter; import org.springframework.data.domain.Sort; import org.springframework.data.domain.Sort.Direction; import org.springframework.data.mapping.context.MappingContext; @@ -46,6 +54,8 @@ import org.springframework.data.mongodb.core.query.Query; import org.springframework.data.mongodb.core.query.Update; import org.springframework.integration.history.MessageHistory; +import org.springframework.integration.message.AdviceMessage; +import org.springframework.integration.message.MutableMessage; import org.springframework.integration.store.AbstractMessageGroupStore; import org.springframework.integration.store.MessageGroup; import org.springframework.integration.store.MessageGroupStore; @@ -54,6 +64,7 @@ import org.springframework.jmx.export.annotation.ManagedAttribute; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.support.ErrorMessage; import org.springframework.messaging.support.GenericMessage; import org.springframework.util.Assert; import org.springframework.util.ClassUtils; @@ -76,7 +87,8 @@ * @author Artem Bilan * @since 2.1 */ -public class MongoDbMessageStore extends AbstractMessageGroupStore implements MessageStore, BeanClassLoaderAware { +public class MongoDbMessageStore extends AbstractMessageGroupStore + implements MessageStore, BeanClassLoaderAware, ApplicationContextAware, InitializingBean { private final static String DEFAULT_COLLECTION_NAME = "messages"; @@ -90,17 +102,19 @@ public class MongoDbMessageStore extends AbstractMessageGroupStore implements Me private final static String GROUP_UPDATE_TIMESTAMP_KEY = "_group_update_timestamp"; - private final static String PAYLOAD_TYPE_KEY = "_payloadType"; - private final static String CREATED_DATE = "_createdDate"; private final MongoTemplate template; + private final MessageReadingMongoConverter converter; + private final String collectionName; private volatile ClassLoader classLoader = ClassUtils.getDefaultClassLoader(); + private ApplicationContext applicationContext; + /** * Create a MongoDbMessageStore using the provided {@link MongoDbFactory}.and the default collection name. @@ -119,9 +133,8 @@ public MongoDbMessageStore(MongoDbFactory mongoDbFactory) { */ public MongoDbMessageStore(MongoDbFactory mongoDbFactory, String collectionName) { Assert.notNull(mongoDbFactory, "mongoDbFactory must not be null"); - MessageReadingMongoConverter converter = new MessageReadingMongoConverter(mongoDbFactory, new MongoMappingContext()); - converter.afterPropertiesSet(); - this.template = new MongoTemplate(mongoDbFactory, converter); + this.converter = new MessageReadingMongoConverter(mongoDbFactory, new MongoMappingContext()); + this.template = new MongoTemplate(mongoDbFactory, this.converter); this.collectionName = (StringUtils.hasText(collectionName)) ? collectionName : DEFAULT_COLLECTION_NAME; } @@ -132,6 +145,18 @@ public void setBeanClassLoader(ClassLoader classLoader) { this.classLoader = classLoader; } + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + this.applicationContext = applicationContext; + } + + @Override + public void afterPropertiesSet() throws Exception { + this.template.setApplicationContext(this.applicationContext); + this.converter.setApplicationContext(this.applicationContext); + this.converter.afterPropertiesSet(); + } + @Override public Message addMessage(Message message) { Assert.notNull(message, "'message' must not be null"); @@ -335,6 +360,10 @@ public void afterPropertiesSet() { customConverters.add(new DBObjectToUUIDConverter()); customConverters.add(new MessageHistoryToDBObjectConverter()); customConverters.add(new DBObjectToGenericMessageConverter()); + customConverters.add(new DBObjectToMutableMessageConverter()); + customConverters.add(new DBObjectToErrorMessageConverter()); + customConverters.add(new DBObjectToAdviceMessageConverter()); + customConverters.add(new ThrowableToBytesConverter()); this.setCustomConversions(new CustomConversions(customConverters)); super.afterPropertiesSet(); } @@ -355,24 +384,18 @@ public S read(Class clazz, DBObject source) { return super.read(clazz, source); } if (source != null) { - Map headers = this.normalizeHeaders((Map) source.get("headers")); - - Object payload = source.get("payload"); - Object payloadType = source.get(PAYLOAD_TYPE_KEY); - if (payloadType != null && payload instanceof DBObject) { - try { - Class payloadClass = ClassUtils.forName(payloadType.toString(), classLoader); - payload = this.read(payloadClass, (DBObject) payload); - } - catch (Exception e) { - throw new IllegalStateException("failed to load class: " + payloadType, e); - } + Message message = null; + Object messageType = source.get("_messageType"); + if (messageType == null) { + messageType = GenericMessage.class.getName(); + } + try { + message = (Message) this.read(ClassUtils.forName(messageType.toString(), classLoader), source); + } + catch (ClassNotFoundException e) { + throw new IllegalStateException("failed to load class: " + messageType, e); } - GenericMessage message = new GenericMessage(payload, headers); - Map innerMap = (Map) new DirectFieldAccessor(message.getHeaders()).getPropertyValue("headers"); - // using reflection to set ID and TIMESTAMP since they are immutable through MessageHeaders - innerMap.put(MessageHeaders.ID, headers.get(MessageHeaders.ID)); - innerMap.put(MessageHeaders.TIMESTAMP, headers.get(MessageHeaders.TIMESTAMP)); + Long groupTimestamp = (Long)source.get(GROUP_TIMESTAMP_KEY); Long lastModified = (Long)source.get(GROUP_UPDATE_TIMESTAMP_KEY); Integer lastReleasedSequenceNumber = (Integer)source.get(LAST_RELEASED_SEQUENCE_NUMBER); @@ -432,8 +455,34 @@ else if (source instanceof BasicDBList) { } return normalizedHeaders; } + + private Object extractPayload(DBObject source) { + Object payload = source.get("payload"); + if (payload instanceof DBObject) { + DBObject payloadObject = (DBObject) payload; + Object payloadType = payloadObject.get("_class"); + try { + Class payloadClass = ClassUtils.forName(payloadType.toString(), classLoader); + payload = this.read(payloadClass, payloadObject); + } + catch (Exception e) { + throw new IllegalStateException("failed to load class: " + payloadType, e); + } + } + return payload; + } + } + @SuppressWarnings("unchecked") + private static void enhanceHeaders(MessageHeaders messageHeaders, Map headers) { + Map innerMap = (Map) new DirectFieldAccessor(messageHeaders).getPropertyValue("headers"); + // using reflection to set ID and TIMESTAMP since they are immutable through MessageHeaders + innerMap.put(MessageHeaders.ID, headers.get(MessageHeaders.ID)); + innerMap.put(MessageHeaders.TIMESTAMP, headers.get(MessageHeaders.TIMESTAMP)); + } + + private static class UuidToDBObjectConverter implements Converter { @Override public DBObject convert(UUID source) { @@ -474,53 +523,118 @@ public DBObject convert(MessageHistory source) { private class DBObjectToGenericMessageConverter implements Converter> { @Override - @SuppressWarnings("unchecked") + public GenericMessage convert(DBObject source) { - MessageReadingMongoConverter converter = (MessageReadingMongoConverter) MongoDbMessageStore.this.template - .getConverter(); - Map headers = converter.normalizeHeaders((Map) source.get("headers")); + @SuppressWarnings("unchecked") + Map headers = MongoDbMessageStore.this.converter.normalizeHeaders((Map) source.get("headers")); - Object payload = source.get("payload"); - Object payloadType = source.get(PAYLOAD_TYPE_KEY); - if (payloadType != null && payload instanceof DBObject) { + GenericMessage message = new GenericMessage(MongoDbMessageStore.this.converter.extractPayload(source), headers); + enhanceHeaders(message.getHeaders(), headers); + return message; + } + + } + + private class DBObjectToMutableMessageConverter implements Converter> { + + @Override + public MutableMessage convert(DBObject source) { + @SuppressWarnings("unchecked") + Map headers = MongoDbMessageStore.this.converter.normalizeHeaders((Map) source.get("headers")); + + return new MutableMessage(MongoDbMessageStore.this.converter.extractPayload(source), headers); + } + + } + + private class DBObjectToAdviceMessageConverter implements Converter { + + @Override + public AdviceMessage convert(DBObject source) { + @SuppressWarnings("unchecked") + Map headers = MongoDbMessageStore.this.converter.normalizeHeaders((Map) source.get("headers")); + + Message inputMessage = null; + + if (source.get("inputMessage") != null) { + DBObject inputMessageObject = (DBObject) source.get("inputMessage"); + Object inputMessageType = inputMessageObject.get("_class"); try { - Class payloadClass = ClassUtils.forName(payloadType.toString(), classLoader); - payload = converter.read(payloadClass, (DBObject) payload); + Class messageClass = ClassUtils.forName(inputMessageType.toString(), classLoader); + inputMessage = (Message) MongoDbMessageStore.this.converter.read(messageClass, inputMessageObject); } catch (Exception e) { - throw new IllegalStateException("failed to load class: " + payloadType, e); + throw new IllegalStateException("failed to load class: " + inputMessageType, e); } } - @SuppressWarnings("rawtypes") - GenericMessage message = new GenericMessage(payload, headers); - Map innerMap = (Map) new DirectFieldAccessor(message.getHeaders()).getPropertyValue("headers"); - // using reflection to set ID and TIMESTAMP since they are immutable through MessageHeaders - innerMap.put(MessageHeaders.ID, headers.get(MessageHeaders.ID)); - innerMap.put(MessageHeaders.TIMESTAMP, headers.get(MessageHeaders.TIMESTAMP)); + AdviceMessage message = new AdviceMessage(MongoDbMessageStore.this.converter.extractPayload(source), headers, inputMessage); + enhanceHeaders(message.getHeaders(), headers); + + return message; + } + + } + + private class DBObjectToErrorMessageConverter implements Converter { + + private final Converter deserializingConverter = new DeserializingConverter(); + + @Override + public ErrorMessage convert(DBObject source) { + @SuppressWarnings("unchecked") + Map headers = MongoDbMessageStore.this.converter.normalizeHeaders((Map) source.get("headers")); + + Object payload = this.deserializingConverter.convert((byte[]) source.get("payload")); + ErrorMessage message = new ErrorMessage((Throwable) payload, headers); + enhanceHeaders(message.getHeaders(), headers); return message; } } + @WritingConverter + private class ThrowableToBytesConverter implements Converter { + + private final Converter serializingConverter = new SerializingConverter(); + + @Override + public byte[] convert(Throwable source) { + return serializingConverter.convert(source); + } + + } + + /** * Wrapper class used for storing Messages in MongoDB along with their "group" metadata. */ private static final class MessageWrapper { + /* + * Needed as a persistence property to suppress 'Cannot determine IsNewStrategy' MappingException + * when the application context is configured with auditing. The document is not + * currently Auditable. + */ + @SuppressWarnings("unused") + @Id + private String _id; + private volatile Object _groupId; @Transient private final Message message; + @SuppressWarnings("unused") + private final String _messageType; + private final Object payload; @SuppressWarnings("unused") private final Map headers; - @SuppressWarnings("unused") - private final String _payloadType; + private final Message inputMessage; private volatile long _group_timestamp; @@ -533,9 +647,15 @@ private static final class MessageWrapper { public MessageWrapper(Message message) { Assert.notNull(message, "'message' must not be null"); this.message = message; + this._messageType = message.getClass().getName(); this.payload = message.getPayload(); this.headers = message.getHeaders(); - this._payloadType = this.payload.getClass().getName(); + if (message instanceof AdviceMessage) { + this.inputMessage = ((AdviceMessage) message).getInputMessage(); + } + else { + this.inputMessage = null; + } } public int get_LastReleasedSequenceNumber() { diff --git a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/AbstractMongoDbMessageGroupStoreTests.java b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/AbstractMongoDbMessageGroupStoreTests.java index 179c25fc584..821714313bd 100644 --- a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/AbstractMongoDbMessageGroupStoreTests.java +++ b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/AbstractMongoDbMessageGroupStoreTests.java @@ -131,6 +131,7 @@ public void testPollMessages() throws Exception{ Message messageA = new GenericMessage("A"); Message messageB = new GenericMessage("B"); store.addMessageToGroup(1, messageA); + Thread.sleep(10); store.addMessageToGroup(1, messageB); assertEquals(2, store.messageGroupSize(1)); Message out = store.pollMessageFromGroup(1); diff --git a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/AbstractMongoDbMessageStoreTests.java b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/AbstractMongoDbMessageStoreTests.java index 0c722b9e6d8..9a56ef68883 100644 --- a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/AbstractMongoDbMessageStoreTests.java +++ b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/AbstractMongoDbMessageStoreTests.java @@ -18,22 +18,29 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import java.io.Serializable; import java.util.Properties; import java.util.UUID; +import org.hamcrest.Matchers; import org.junit.Test; import org.springframework.data.mongodb.core.SimpleMongoDbFactory; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.history.MessageHistory; +import org.springframework.integration.message.AdviceMessage; +import org.springframework.integration.message.MutableMessage; import org.springframework.integration.mongodb.rules.MongoDbAvailable; import org.springframework.integration.mongodb.rules.MongoDbAvailableTests; import org.springframework.integration.store.MessageStore; import org.springframework.integration.support.MessageBuilder; import org.springframework.messaging.Message; +import org.springframework.messaging.MessagingException; +import org.springframework.messaging.support.ErrorMessage; +import org.springframework.messaging.support.GenericMessage; import com.mongodb.Mongo; @@ -151,6 +158,108 @@ public void testInt3153SequenceDetails() throws Exception{ assertEquals(messageToStore, retrievedMessage); } + @Test + @MongoDbAvailable + public void testInt3076MessageAsPayload() throws Exception{ + MessageStore store = this.getMessageStore(); + Person p = new Person(); + p.setFname("John"); + p.setLname("Doe"); + Message messageToStore = new GenericMessage>(MessageBuilder.withPayload(p).build()); + store.addMessage(messageToStore); + Message retrievedMessage = store.getMessage(messageToStore.getHeaders().getId()); + assertNotNull(retrievedMessage); + assertTrue(retrievedMessage.getPayload() instanceof GenericMessage); + assertEquals(messageToStore.getPayload(), retrievedMessage.getPayload()); + assertEquals(messageToStore.getHeaders(), retrievedMessage.getHeaders()); + assertEquals(((Message) messageToStore.getPayload()).getPayload(), p); + assertEquals(messageToStore, retrievedMessage); + } + + @Test + @MongoDbAvailable + public void testInt3076AdviceMessage() throws Exception{ + MessageStore store = this.getMessageStore(); + Person p = new Person(); + p.setFname("John"); + p.setLname("Doe"); + Message inputMessage = MessageBuilder.withPayload(p).build(); + Message messageToStore = new AdviceMessage("foo", inputMessage); + store.addMessage(messageToStore); + Message retrievedMessage = store.getMessage(messageToStore.getHeaders().getId()); + assertNotNull(retrievedMessage); + assertTrue(retrievedMessage instanceof AdviceMessage); + assertEquals(messageToStore.getPayload(), retrievedMessage.getPayload()); + assertEquals(messageToStore.getHeaders(), retrievedMessage.getHeaders()); + assertEquals(inputMessage, ((AdviceMessage) retrievedMessage).getInputMessage()); + assertEquals(messageToStore, retrievedMessage); + } + + @Test + @MongoDbAvailable + public void testAdviceMessageAsPayload() throws Exception{ + MessageStore store = this.getMessageStore(); + Person p = new Person(); + p.setFname("John"); + p.setLname("Doe"); + Message inputMessage = MessageBuilder.withPayload(p).build(); + Message messageToStore = new GenericMessage>(new AdviceMessage("foo", inputMessage)); + store.addMessage(messageToStore); + Message retrievedMessage = store.getMessage(messageToStore.getHeaders().getId()); + assertNotNull(retrievedMessage); + assertTrue(retrievedMessage.getPayload() instanceof AdviceMessage); + AdviceMessage adviceMessage = (AdviceMessage) retrievedMessage.getPayload(); + assertEquals("foo", adviceMessage.getPayload()); + assertEquals(messageToStore.getHeaders(), retrievedMessage.getHeaders()); + assertEquals(inputMessage, adviceMessage.getInputMessage()); + assertEquals(messageToStore, retrievedMessage); + } + + @Test + @MongoDbAvailable + public void testMutableMessageAsPayload() throws Exception{ + MessageStore store = this.getMessageStore(); + Person p = new Person(); + p.setFname("John"); + p.setLname("Doe"); + Message messageToStore = new GenericMessage>(new MutableMessage(p)); + store.addMessage(messageToStore); + Message retrievedMessage = store.getMessage(messageToStore.getHeaders().getId()); + assertNotNull(retrievedMessage); + assertTrue(retrievedMessage.getPayload() instanceof MutableMessage); + assertEquals(messageToStore.getPayload(), retrievedMessage.getPayload()); + assertEquals(messageToStore.getHeaders(), retrievedMessage.getHeaders()); + assertEquals(((Message) messageToStore.getPayload()).getPayload(), p); + assertEquals(messageToStore, retrievedMessage); + } + + @Test + @MongoDbAvailable + public void testInt3076ErrorMessage() throws Exception{ + MessageStore store = this.getMessageStore(); + Person p = new Person(); + p.setFname("John"); + p.setLname("Doe"); + Message failedMessage = MessageBuilder.withPayload(p).build(); + MessagingException messagingException; + try { + throw new RuntimeException("intentional"); + } + catch (Exception e) { + messagingException = new MessagingException(failedMessage, "intentional MessagingException", e); + } + Message messageToStore = new ErrorMessage(messagingException); + store.addMessage(messageToStore); + Message retrievedMessage = store.getMessage(messageToStore.getHeaders().getId()); + assertNotNull(retrievedMessage); + assertTrue(retrievedMessage instanceof ErrorMessage); + assertThat(retrievedMessage.getPayload(), Matchers.instanceOf(MessagingException.class)); + assertEquals("intentional MessagingException", ((MessagingException) retrievedMessage.getPayload()).getMessage()); + assertEquals(failedMessage, ((MessagingException) retrievedMessage.getPayload()).getFailedMessage()); + assertEquals(messageToStore.getHeaders(), retrievedMessage.getHeaders()); + } + + public static class Foo implements Serializable { /** * diff --git a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/ConfigurableMongoDbMessageStoreTests.java b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/ConfigurableMongoDbMessageStoreTests.java index 54a8572547f..bcefb9b2c37 100644 --- a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/ConfigurableMongoDbMessageStoreTests.java +++ b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/ConfigurableMongoDbMessageStoreTests.java @@ -16,26 +16,11 @@ package org.springframework.integration.mongodb.store; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; - -import org.hamcrest.Matchers; -import org.junit.Test; - import org.springframework.context.support.GenericApplicationContext; import org.springframework.data.mongodb.MongoDbFactory; import org.springframework.data.mongodb.core.SimpleMongoDbFactory; -import org.springframework.integration.message.AdviceMessage; -import org.springframework.integration.mongodb.rules.MongoDbAvailable; import org.springframework.integration.store.MessageStore; -import org.springframework.integration.support.MessageBuilder; import org.springframework.integration.test.util.TestUtils; -import org.springframework.messaging.Message; -import org.springframework.messaging.MessagingException; -import org.springframework.messaging.support.ErrorMessage; -import org.springframework.messaging.support.GenericMessage; import com.mongodb.Mongo; @@ -56,68 +41,4 @@ protected MessageStore getMessageStore() throws Exception { return mongoDbMessageStore; } - - @Test - @MongoDbAvailable - public void testInt3076MessageAsPayload() throws Exception{ - MessageStore store = this.getMessageStore(); - Person p = new Person(); - p.setFname("John"); - p.setLname("Doe"); - Message messageToStore = new GenericMessage>(MessageBuilder.withPayload(p).build()); - store.addMessage(messageToStore); - Message retrievedMessage = store.getMessage(messageToStore.getHeaders().getId()); - assertNotNull(retrievedMessage); - assertTrue(retrievedMessage.getPayload() instanceof GenericMessage); - assertEquals(messageToStore.getPayload(), retrievedMessage.getPayload()); - assertEquals(messageToStore.getHeaders(), retrievedMessage.getHeaders()); - assertEquals(((Message) messageToStore.getPayload()).getPayload(), p); - assertEquals(messageToStore, retrievedMessage); - } - - @Test - @MongoDbAvailable - public void testInt3076AdviceMessage() throws Exception{ - MessageStore store = this.getMessageStore(); - Person p = new Person(); - p.setFname("John"); - p.setLname("Doe"); - Message inputMessage = MessageBuilder.withPayload(p).build(); - Message messageToStore = new AdviceMessage("foo", inputMessage); - store.addMessage(messageToStore); - Message retrievedMessage = store.getMessage(messageToStore.getHeaders().getId()); - assertNotNull(retrievedMessage); - assertTrue(retrievedMessage instanceof AdviceMessage); - assertEquals(messageToStore.getPayload(), retrievedMessage.getPayload()); - assertEquals(messageToStore.getHeaders(), retrievedMessage.getHeaders()); - assertEquals(inputMessage, ((AdviceMessage) retrievedMessage).getInputMessage()); - assertEquals(messageToStore, retrievedMessage); - } - - @Test - @MongoDbAvailable - public void testInt3076ErrorMessage() throws Exception{ - MessageStore store = this.getMessageStore(); - Person p = new Person(); - p.setFname("John"); - p.setLname("Doe"); - Message failedMessage = MessageBuilder.withPayload(p).build(); - MessagingException messagingException; - try { - throw new RuntimeException("intentional"); - } - catch (Exception e) { - messagingException = new MessagingException(failedMessage, "intentional MessagingException", e); - } - Message messageToStore = new ErrorMessage(messagingException); - store.addMessage(messageToStore); - Message retrievedMessage = store.getMessage(messageToStore.getHeaders().getId()); - assertNotNull(retrievedMessage); - assertTrue(retrievedMessage instanceof ErrorMessage); - assertThat(retrievedMessage.getPayload(), Matchers.instanceOf(MessagingException.class)); - assertEquals("intentional MessagingException", ((MessagingException) retrievedMessage.getPayload()).getMessage()); - assertEquals(failedMessage, ((MessagingException) retrievedMessage.getPayload()).getFailedMessage()); - assertEquals(messageToStore.getHeaders(), retrievedMessage.getHeaders()); - } - } diff --git a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/MongoDbMessageStoreClaimCheckIntegrationTests.java b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/MongoDbMessageStoreClaimCheckIntegrationTests.java index dcbe587b69e..7a31826582b 100644 --- a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/MongoDbMessageStoreClaimCheckIntegrationTests.java +++ b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/MongoDbMessageStoreClaimCheckIntegrationTests.java @@ -24,12 +24,12 @@ import org.springframework.data.mongodb.MongoDbFactory; import org.springframework.data.mongodb.core.SimpleMongoDbFactory; -import org.springframework.messaging.Message; import org.springframework.integration.mongodb.rules.MongoDbAvailable; import org.springframework.integration.mongodb.rules.MongoDbAvailableTests; import org.springframework.integration.support.MessageBuilder; import org.springframework.integration.transformer.ClaimCheckInTransformer; import org.springframework.integration.transformer.ClaimCheckOutTransformer; +import org.springframework.messaging.Message; import com.mongodb.Mongo; diff --git a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/mongo-aggregator-config.xml b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/mongo-aggregator-config.xml index 532ab391301..dece0dbcbf2 100644 --- a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/mongo-aggregator-config.xml +++ b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/mongo-aggregator-config.xml @@ -6,15 +6,15 @@ http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd"> - + - + - + diff --git a/src/reference/docbook/mongodb.xml b/src/reference/docbook/mongodb.xml index d08bd035c20..10ecc1384d3 100644 --- a/src/reference/docbook/mongodb.xml +++ b/src/reference/docbook/mongodb.xml @@ -88,8 +88,8 @@ Spring Integration's MongoDB module provides the MongoDbMessageStore which is an implementation of both - the MessageStore strategy (mainly used by the QueueChannel and ClaimCheck - patterns) and the MessageGroupStore strategy (mainly used by the Aggregator and + the MessageStore strategy (mainly used by the ClaimCheckpattern) + and the MessageGroupStore strategy (mainly used by the Aggregator and Resequencer patterns). @@ -111,15 +111,18 @@ and an Aggregator. As you can see it is a simple bean configuration, and it expects a MongoDbFactory as a constructor argument. + + The MongoDbMessageStore expands the Message as a Mongo document + with all nested properties using Spring Data Mongo Mapping mechanism. It is useful when needed to have access to + payload or headers for auditing or analytics, for example, against stored messages. + - The MongoDbMessageStore uses a custom MappingMongoConverter implementation + As far as MongoDbMessageStore uses a custom MappingMongoConverter implementation to store Messages as MongoDB documents and there are some limitations for the properties (payload and header values) of the Message. - For example an ErrorMessage can't be converted to the MongoDB document, because it has an - Exception property, where the cause property is infintely recursed. Also, there is no ability to - configure - custom converters for complex domain payloads or header values. + For example, there is no ability to configure custom converters for complex domain payloads or header values. + Or provide some other MongoTemplate (or MappingMongoConverter) customizations. To achieve these capabilities, an alternative MongoDB MessageStore implementation has been introduced; see next paragraph. @@ -128,8 +131,7 @@ Spring Integration 3.0 introduced the ConfigurableMongoDbMessageStore - MessageStore and MessageGroupStore implementation. This class can receive, as a constructor argument, a MongoTemplate, with which you can - configure with a - custom WriteConcern, for example. Another constructor requires a + configure with a custom WriteConcern, for example. Another constructor requires a MappingMongoConverter, and a MongoDbFactory, which allows you to provide some custom conversions for Messages and their properties. Note, by default, the ConfigurableMongoDbMessageStore uses standard Java serialization @@ -138,7 +140,6 @@ MongoDbFactory and MappingMongoConverter. The default name for the collection stored by the ConfigurableMongoDbMessageStore is configurableStoreMessages. It is recommended to use this implementation for robust and flexible solutions. - The MongoDbMessageStore remains for backward compatibility and may be removed in future releases.