From b5db27ef366917f6d9780fc58fa96de0cb2a2792 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Mon, 24 Mar 2014 13:06:54 +0200 Subject: [PATCH 1/6] INT-3337: `MongoDbMessageStore` refactoring JIRA: https://jira.spring.io/browse/INT-3337 * Introduce `MessageReadingMongoConverter` (similar to `MongoDbMessageStore.MessageReadingMongoConverter`) * Change tests to use `ConfigurableMongoDbMessageStore` with `MessageReadingMongoConverter` --- .../mongodb/store/MongoDbMessageStore.java | 5 + .../store/support/MessageDocument.java | 97 +++++++ .../support/MessageReadingMongoConverter.java | 253 ++++++++++++++++++ .../mongodb/store/support/package-info.java | 4 + ...dlerRescheduleIntegrationTests-context.xml | 8 +- .../store/MongoDbMessageGroupStoreTests.java | 11 +- ...essageStoreClaimCheckIntegrationTests.java | 13 +- .../store/MongoDbMessageStoreTests.java | 35 --- .../mongodb/store/mongo-aggregator-config.xml | 14 +- src/reference/docbook/mongodb.xml | 15 ++ src/reference/docbook/whats-new.xml | 9 + 11 files changed, 419 insertions(+), 45 deletions(-) create mode 100644 spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/support/MessageDocument.java create mode 100644 spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/support/MessageReadingMongoConverter.java create mode 100644 spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/support/package-info.java delete mode 100644 spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/MongoDbMessageStoreTests.java diff --git a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbMessageStore.java b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbMessageStore.java index 46cf9bed557..8dc6d2a6f48 100644 --- a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbMessageStore.java +++ b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbMessageStore.java @@ -68,6 +68,10 @@ * An implementation of both the {@link MessageStore} and {@link MessageGroupStore} * strategies that relies upon MongoDB for persistence. * + * @deprecated in favor of {@link org.springframework.integration.mongodb.store.ConfigurableMongoDbMessageStore} + * together with {@code org.springframework.integration.mongodb.store.support.MessageReadingMongoConverter}. + * Will be removed in future releases and {@code ConfigurableMongoDbMessageStore} will be renamed to {@code MongoDbMessageStore}. + * * @author Mark Fisher * @author Oleg Zhurakousky * @author Sean Brandt @@ -76,6 +80,7 @@ * @author Artem Bilan * @since 2.1 */ +@Deprecated public class MongoDbMessageStore extends AbstractMessageGroupStore implements MessageStore, BeanClassLoaderAware { private final static String DEFAULT_COLLECTION_NAME = "messages"; diff --git a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/support/MessageDocument.java b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/support/MessageDocument.java new file mode 100644 index 00000000000..7a57cdce585 --- /dev/null +++ b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/support/MessageDocument.java @@ -0,0 +1,97 @@ +/* + * Copyright 2014 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.mongodb.store.support; + +import java.util.UUID; + +import org.springframework.messaging.Message; +import org.springframework.util.Assert; + +/** + * The entity class to wrap {@link org.springframework.messaging.Message} to the MongoDB document. + * + * @author Artem Bilan + * @since 4.0 + */ +public class MessageDocument { + + private final Message message; + + @SuppressWarnings("unused") + private final UUID messageId; + + private volatile Long createdTime = 0L; + + private volatile Object groupId; + + private volatile Long lastModifiedTime = 0L; + + private volatile Boolean complete = false; + + private volatile Integer lastReleasedSequence = 0; + + public MessageDocument(Message message) { + Assert.notNull(message, "'message' must not be null"); + this.message = message; + this.messageId = message.getHeaders().getId(); + } + + public Message getMessage() { + return message; + } + + public void setGroupId(Object groupId) { + this.groupId = groupId; + } + + public Object getGroupId() { + return groupId; + } + + public Long getLastModifiedTime() { + return lastModifiedTime; + } + + public void setLastModifiedTime(long lastModifiedTime) { + this.lastModifiedTime = lastModifiedTime; + } + + public Long getCreatedTime() { + return createdTime; + } + + public void setCreatedTime(long createdTime) { + this.createdTime = createdTime; + } + + public Boolean isComplete() { + return complete; + } + + public void setComplete(boolean complete) { + this.complete = complete; + } + + public Integer getLastReleasedSequence() { + return lastReleasedSequence; + } + + public void setLastReleasedSequence(int lastReleasedSequence) { + this.lastReleasedSequence = lastReleasedSequence; + } + +} diff --git a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/support/MessageReadingMongoConverter.java b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/support/MessageReadingMongoConverter.java new file mode 100644 index 00000000000..acd08708322 --- /dev/null +++ b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/support/MessageReadingMongoConverter.java @@ -0,0 +1,253 @@ +/* + * Copyright 2014 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.mongodb.store.support; + +import static org.springframework.integration.history.MessageHistory.NAME_PROPERTY; +import static org.springframework.integration.history.MessageHistory.TIMESTAMP_PROPERTY; +import static org.springframework.integration.history.MessageHistory.TYPE_PROPERTY; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.springframework.beans.DirectFieldAccessor; +import org.springframework.beans.factory.BeanClassLoaderAware; +import org.springframework.core.convert.converter.Converter; +import org.springframework.data.mongodb.MongoDbFactory; +import org.springframework.data.mongodb.core.convert.CustomConversions; +import org.springframework.data.mongodb.core.convert.MappingMongoConverter; +import org.springframework.data.mongodb.core.mapping.MongoMappingContext; +import org.springframework.integration.history.MessageHistory; +import org.springframework.integration.mongodb.store.ConfigurableMongoDbMessageStore; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.support.GenericMessage; +import org.springframework.util.Assert; +import org.springframework.util.ClassUtils; + +import com.mongodb.BasicDBList; +import com.mongodb.BasicDBObject; +import com.mongodb.DBObject; + +/** + * @author Mark Fisher + * @author Oleg Zhurakousky + * @author Artem Bilan + * @since 4.0 + */ +public class MessageReadingMongoConverter extends MappingMongoConverter implements BeanClassLoaderAware { + + private static final Log logger = LogFactory.getLog(MessageReadingMongoConverter.class); + + private final static String CREATED_TIME_KEY = "createdTime"; + + private final static String PAYLOAD_TYPE_KEY = "_payloadType"; + + private final static String HEADERS_KEY = "headers"; + + private final static String PAYLOAD_KEY = "payload"; + + private ClassLoader classLoader; + + public MessageReadingMongoConverter(MongoDbFactory mongoDbFactory) { + super(mongoDbFactory, new MongoMappingContext()); + } + + @Override + public void setBeanClassLoader(ClassLoader classLoader) { + this.classLoader = classLoader; + } + + @Override + public void afterPropertiesSet() { + List> customConverters = new ArrayList>(); + customConverters.add(new MessageHistoryToDBObjectConverter()); + customConverters.add(new DBObjectToGenericMessageConverter()); + this.setCustomConversions(new CustomConversions(customConverters)); + super.afterPropertiesSet(); + } + + @Override + public void write(Object source, DBObject target) { + Assert.isInstanceOf(MessageDocument.class, source); + + MessageDocument document = (MessageDocument) source; + Message message = document.getMessage(); + + target.put(CREATED_TIME_KEY, document.getCreatedTime()); + target.put(ConfigurableMongoDbMessageStore.GROUP_ID, document.getGroupId()); + target.put(ConfigurableMongoDbMessageStore.LAST_MODIFIED_TIME, document.getLastModifiedTime()); + target.put(ConfigurableMongoDbMessageStore.LAST_RELEASED_SEQUENCE, document.getLastReleasedSequence()); + target.put(ConfigurableMongoDbMessageStore.COMPLETE, document.isComplete()); + target.put(ConfigurableMongoDbMessageStore.MESSAGE_ID, message.getHeaders().getId()); + + target.put(HEADERS_KEY, this.convertToMongoType(message.getHeaders())); + + target.put(PAYLOAD_TYPE_KEY, message.getPayload().getClass().getName()); + Object payload = message.getPayload(); + if (!this.conversions.isSimpleType(payload.getClass())) { + DBObject dbo = new BasicDBObject(); + super.write(payload, dbo); + payload = dbo; + } + target.put(PAYLOAD_KEY, payload); + target.put("_class", MessageDocument.class.getName()); + } + + + @Override + @SuppressWarnings({"unchecked", "rawtypes"}) + public S read(Class clazz, DBObject source) { + if (!MessageDocument.class.equals(clazz)) { + return super.read(clazz, source); + } + if (source != null) { + Map headers = this.normalizeHeaders((Map) source.get(HEADERS_KEY)); + + Object payload = source.get(PAYLOAD_KEY); + Object payloadType = source.get(PAYLOAD_TYPE_KEY); + if (payloadType != null && payload instanceof DBObject) { + try { + Class payloadClass = ClassUtils.forName(payloadType.toString(), classLoader); + payload = this.read(payloadClass, (DBObject) payload); + } + catch (Exception e) { + throw new IllegalStateException("failed to load class: " + payloadType, e); + } + } + GenericMessage message = new GenericMessage(payload, headers); + Map innerMap = (Map) new DirectFieldAccessor(message.getHeaders()).getPropertyValue("headers"); + // using reflection to set ID and TIMESTAMP since they are immutable through MessageHeaders + innerMap.put(MessageHeaders.ID, headers.get(MessageHeaders.ID)); + innerMap.put(MessageHeaders.TIMESTAMP, headers.get(MessageHeaders.TIMESTAMP)); + Long groupTimestamp = (Long)source.get(CREATED_TIME_KEY); + Long lastModified = (Long)source.get(ConfigurableMongoDbMessageStore.LAST_MODIFIED_TIME); + Integer lastReleasedSequenceNumber = (Integer)source.get(ConfigurableMongoDbMessageStore.LAST_RELEASED_SEQUENCE); + Boolean completeGroup = (Boolean)source.get(ConfigurableMongoDbMessageStore.COMPLETE); + + MessageDocument document = new MessageDocument(message); + + if (source.containsField(ConfigurableMongoDbMessageStore.GROUP_ID)){ + document.setGroupId(source.get(ConfigurableMongoDbMessageStore.GROUP_ID)); + } + if (groupTimestamp != null){ + document.setCreatedTime(groupTimestamp); + } + if (lastModified != null){ + document.setLastModifiedTime(lastModified); + } + if (lastReleasedSequenceNumber != null){ + document.setLastReleasedSequence(lastReleasedSequenceNumber); + } + + if (completeGroup != null){ + document.setComplete(completeGroup); + } + + return (S) document; + } + return null; + } + + private Map normalizeHeaders(Map headers) { + Map normalizedHeaders = new HashMap(); + for (String headerName : headers.keySet()) { + Object headerValue = headers.get(headerName); + if (headerValue instanceof DBObject) { + DBObject source = (DBObject) headerValue; + try { + Class typeClass = null; + if (source.containsField("_class")) { + Object type = source.get("_class"); + typeClass = ClassUtils.forName(type.toString(), classLoader); + } + else if (source instanceof BasicDBList) { + typeClass = List.class; + } + else { + throw new IllegalStateException("Unsupported 'DBObject' type: " + source.getClass()); + } + normalizedHeaders.put(headerName, super.read(typeClass, source)); + } + catch (Exception e) { + logger.warn("Header '" + headerName + "' could not be deserialized.", e); + } + } + else { + normalizedHeaders.put(headerName, headerValue); + } + } + return normalizedHeaders; + } + + + private static class MessageHistoryToDBObjectConverter implements Converter { + + @Override + public DBObject convert(MessageHistory source) { + BasicDBObject obj = new BasicDBObject(); + obj.put("_class", MessageHistory.class.getName()); + BasicDBList dbList = new BasicDBList(); + for (Properties properties : source) { + BasicDBObject dbo = new BasicDBObject(); + dbo.put(NAME_PROPERTY, properties.getProperty(NAME_PROPERTY)); + dbo.put(TYPE_PROPERTY, properties.getProperty(TYPE_PROPERTY)); + dbo.put(TIMESTAMP_PROPERTY, properties.getProperty(TIMESTAMP_PROPERTY)); + dbList.add(dbo); + } + obj.put("components", dbList); + return obj; + } + } + + private class DBObjectToGenericMessageConverter implements Converter> { + + @Override + @SuppressWarnings("unchecked") + public GenericMessage convert(DBObject source) { + Map headers = MessageReadingMongoConverter.this.normalizeHeaders((Map) source.get("headers")); + + Object payload = source.get(PAYLOAD_KEY); + Object payloadType = source.get(PAYLOAD_TYPE_KEY); + if (payloadType != null && payload instanceof DBObject) { + try { + Class payloadClass = ClassUtils.forName(payloadType.toString(), classLoader); + payload = MessageReadingMongoConverter.this.read(payloadClass, (DBObject) payload); + } + catch (Exception e) { + throw new IllegalStateException("failed to load class: " + payloadType, e); + } + } + + @SuppressWarnings("rawtypes") + GenericMessage message = new GenericMessage(payload, headers); + Map innerMap = (Map) new DirectFieldAccessor(message.getHeaders()).getPropertyValue("headers"); + // using reflection to set ID and TIMESTAMP since they are immutable through MessageHeaders + innerMap.put(MessageHeaders.ID, headers.get(MessageHeaders.ID)); + innerMap.put(MessageHeaders.TIMESTAMP, headers.get(MessageHeaders.TIMESTAMP)); + + return message; + } + + } + +} diff --git a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/support/package-info.java b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/support/package-info.java new file mode 100644 index 00000000000..60dc8b71c03 --- /dev/null +++ b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/support/package-info.java @@ -0,0 +1,4 @@ +/** + * Provides support classes related to the MongoDB message store. + */ +package org.springframework.integration.mongodb.store.support; diff --git a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/DelayerHandlerRescheduleIntegrationTests-context.xml b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/DelayerHandlerRescheduleIntegrationTests-context.xml index a6ce73f3ad0..7360b0c62ec 100644 --- a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/DelayerHandlerRescheduleIntegrationTests-context.xml +++ b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/DelayerHandlerRescheduleIntegrationTests-context.xml @@ -12,8 +12,10 @@ - + + + @@ -26,4 +28,8 @@ default-delay="10000" message-store="messageStore"/> + + + + diff --git a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/MongoDbMessageGroupStoreTests.java b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/MongoDbMessageGroupStoreTests.java index 110ae46663f..82066d23a06 100644 --- a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/MongoDbMessageGroupStoreTests.java +++ b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/MongoDbMessageGroupStoreTests.java @@ -17,8 +17,10 @@ import org.junit.Test; +import org.springframework.data.mongodb.MongoDbFactory; import org.springframework.data.mongodb.core.SimpleMongoDbFactory; import org.springframework.integration.mongodb.rules.MongoDbAvailable; +import org.springframework.integration.mongodb.store.support.MessageReadingMongoConverter; import org.springframework.integration.store.MessageStore; import com.mongodb.Mongo; @@ -32,8 +34,13 @@ public class MongoDbMessageGroupStoreTests extends AbstractMongoDbMessageGroupStoreTests { @Override - protected MongoDbMessageStore getMessageGroupStore() throws Exception { - return new MongoDbMessageStore( new SimpleMongoDbFactory(new Mongo(), "test")); + protected ConfigurableMongoDbMessageStore getMessageGroupStore() throws Exception { + MongoDbFactory mongoDbFactory = new SimpleMongoDbFactory(new Mongo(), "test"); + MessageReadingMongoConverter converter = new MessageReadingMongoConverter(mongoDbFactory); + converter.afterPropertiesSet(); + ConfigurableMongoDbMessageStore messageStore = new ConfigurableMongoDbMessageStore(mongoDbFactory, converter, "messages"); + messageStore.afterPropertiesSet(); + return messageStore; } @Override diff --git a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/MongoDbMessageStoreClaimCheckIntegrationTests.java b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/MongoDbMessageStoreClaimCheckIntegrationTests.java index dcbe587b69e..a4abb7dba8a 100644 --- a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/MongoDbMessageStoreClaimCheckIntegrationTests.java +++ b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/MongoDbMessageStoreClaimCheckIntegrationTests.java @@ -24,12 +24,13 @@ import org.springframework.data.mongodb.MongoDbFactory; import org.springframework.data.mongodb.core.SimpleMongoDbFactory; -import org.springframework.messaging.Message; import org.springframework.integration.mongodb.rules.MongoDbAvailable; import org.springframework.integration.mongodb.rules.MongoDbAvailableTests; +import org.springframework.integration.mongodb.store.support.MessageReadingMongoConverter; import org.springframework.integration.support.MessageBuilder; import org.springframework.integration.transformer.ClaimCheckInTransformer; import org.springframework.integration.transformer.ClaimCheckOutTransformer; +import org.springframework.messaging.Message; import com.mongodb.Mongo; @@ -43,7 +44,10 @@ public class MongoDbMessageStoreClaimCheckIntegrationTests extends MongoDbAvaila @MongoDbAvailable public void stringPayload() throws Exception { MongoDbFactory mongoDbFactory = new SimpleMongoDbFactory(new Mongo(), "test"); - MongoDbMessageStore messageStore = new MongoDbMessageStore(mongoDbFactory); + MessageReadingMongoConverter converter = new MessageReadingMongoConverter(mongoDbFactory); + converter.afterPropertiesSet(); + ConfigurableMongoDbMessageStore messageStore = new ConfigurableMongoDbMessageStore(mongoDbFactory, converter, "messages"); + messageStore.afterPropertiesSet(); ClaimCheckInTransformer checkin = new ClaimCheckInTransformer(messageStore); ClaimCheckOutTransformer checkout = new ClaimCheckOutTransformer(messageStore); Message originalMessage = MessageBuilder.withPayload("test1").build(); @@ -59,7 +63,10 @@ public void stringPayload() throws Exception { @MongoDbAvailable public void objectPayload() throws Exception { MongoDbFactory mongoDbFactory = new SimpleMongoDbFactory(new Mongo(), "test"); - MongoDbMessageStore messageStore = new MongoDbMessageStore(mongoDbFactory); + MessageReadingMongoConverter converter = new MessageReadingMongoConverter(mongoDbFactory); + converter.afterPropertiesSet(); + ConfigurableMongoDbMessageStore messageStore = new ConfigurableMongoDbMessageStore(mongoDbFactory, converter, "messages"); + messageStore.afterPropertiesSet(); ClaimCheckInTransformer checkin = new ClaimCheckInTransformer(messageStore); ClaimCheckOutTransformer checkout = new ClaimCheckOutTransformer(messageStore); Beverage payload = new Beverage(); diff --git a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/MongoDbMessageStoreTests.java b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/MongoDbMessageStoreTests.java deleted file mode 100644 index 12641284110..00000000000 --- a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/MongoDbMessageStoreTests.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright 2002-2013 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.springframework.integration.mongodb.store; - -import org.springframework.data.mongodb.core.SimpleMongoDbFactory; -import org.springframework.integration.store.MessageStore; - -import com.mongodb.Mongo; -/** - * @author Mark Fisher - * @author Oleg Zhurakousky - * @author Artem Bilan - * - */ -public class MongoDbMessageStoreTests extends AbstractMongoDbMessageStoreTests { - - @Override - protected MessageStore getMessageStore() throws Exception { - return new MongoDbMessageStore(new SimpleMongoDbFactory(new Mongo(), "test")); - } - -} diff --git a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/mongo-aggregator-config.xml b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/mongo-aggregator-config.xml index 532ab391301..7ecdbe63561 100644 --- a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/mongo-aggregator-config.xml +++ b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/mongo-aggregator-config.xml @@ -6,15 +6,17 @@ http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd"> - + - - + + + + - + @@ -22,4 +24,8 @@ + + + + diff --git a/src/reference/docbook/mongodb.xml b/src/reference/docbook/mongodb.xml index d08bd035c20..ccc7757f6dd 100644 --- a/src/reference/docbook/mongodb.xml +++ b/src/reference/docbook/mongodb.xml @@ -140,6 +140,21 @@ configurableStoreMessages. It is recommended to use this implementation for robust and flexible solutions. The MongoDbMessageStore remains for backward compatibility and may be removed in future releases. + + Starting with Spring Integration 4.0 the MongoDbMessageStore is + deprecated and will be removed in future release and the ConfigurableMongoDbMessageStore + will be renamed. To achieve the similar MongoDB documents representation as it is with MongoDbMessageStore + (to have an access to the headers and payload properties) it is just necessary to configure + ConfigurableMongoDbMessageStore with MessageReadingMongoConverter: + + + + + + + +]]> +
diff --git a/src/reference/docbook/whats-new.xml b/src/reference/docbook/whats-new.xml index a68cd54b590..31a30e16552 100644 --- a/src/reference/docbook/whats-new.xml +++ b/src/reference/docbook/whats-new.xml @@ -185,5 +185,14 @@ For more information see .
+
+ MongoDB MessageStore: MessageReadingMongoConverter + + The MongoDbMessageStore has been deprecated in favor of + ConfigurableMongoDbMessageStore. The new MessageReadingMongoConverter + has been introduced to provide MongoDB document representation similar to MongoDbMessageStore results. + For more information see . + +
From 9ecea33efabed2749858c2b1902a703f76e68734 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Thu, 27 Mar 2014 17:06:22 +0200 Subject: [PATCH 2/6] INT-3337: Addressing PR comments --- .../mongodb/store/MongoDbMessageStore.java | 4 +- ...ava => MessageDocumentMongoConverter.java} | 69 ++++++++++--------- ...AbstractMongoDbMessageGroupStoreTests.java | 7 +- ...dlerRescheduleIntegrationTests-context.xml | 2 +- ...ayerHandlerRescheduleIntegrationTests.java | 5 +- .../store/MongoDbMessageGroupStoreTests.java | 4 +- ...essageStoreClaimCheckIntegrationTests.java | 8 +-- .../mongodb/store/mongo-aggregator-config.xml | 2 +- src/reference/docbook/mongodb.xml | 10 +-- src/reference/docbook/whats-new.xml | 4 +- 10 files changed, 57 insertions(+), 58 deletions(-) rename spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/support/{MessageReadingMongoConverter.java => MessageDocumentMongoConverter.java} (71%) diff --git a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbMessageStore.java b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbMessageStore.java index 8dc6d2a6f48..ff6da4cfd32 100644 --- a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbMessageStore.java +++ b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbMessageStore.java @@ -69,8 +69,8 @@ * strategies that relies upon MongoDB for persistence. * * @deprecated in favor of {@link org.springframework.integration.mongodb.store.ConfigurableMongoDbMessageStore} - * together with {@code org.springframework.integration.mongodb.store.support.MessageReadingMongoConverter}. - * Will be removed in future releases and {@code ConfigurableMongoDbMessageStore} will be renamed to {@code MongoDbMessageStore}. + * together with {@code org.springframework.integration.mongodb.store.support.MessageDocumentMongoConverter}. + * Will be removed in future releases. * * @author Mark Fisher * @author Oleg Zhurakousky diff --git a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/support/MessageReadingMongoConverter.java b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/support/MessageDocumentMongoConverter.java similarity index 71% rename from spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/support/MessageReadingMongoConverter.java rename to spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/support/MessageDocumentMongoConverter.java index acd08708322..7ceb68521bd 100644 --- a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/support/MessageReadingMongoConverter.java +++ b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/support/MessageDocumentMongoConverter.java @@ -29,7 +29,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.springframework.beans.DirectFieldAccessor; import org.springframework.beans.factory.BeanClassLoaderAware; import org.springframework.core.convert.converter.Converter; import org.springframework.data.mongodb.MongoDbFactory; @@ -37,10 +36,8 @@ import org.springframework.data.mongodb.core.convert.MappingMongoConverter; import org.springframework.data.mongodb.core.mapping.MongoMappingContext; import org.springframework.integration.history.MessageHistory; -import org.springframework.integration.mongodb.store.ConfigurableMongoDbMessageStore; +import org.springframework.integration.message.MutableMessage; import org.springframework.messaging.Message; -import org.springframework.messaging.MessageHeaders; -import org.springframework.messaging.support.GenericMessage; import org.springframework.util.Assert; import org.springframework.util.ClassUtils; @@ -54,9 +51,9 @@ * @author Artem Bilan * @since 4.0 */ -public class MessageReadingMongoConverter extends MappingMongoConverter implements BeanClassLoaderAware { +public class MessageDocumentMongoConverter extends MappingMongoConverter implements BeanClassLoaderAware { - private static final Log logger = LogFactory.getLog(MessageReadingMongoConverter.class); + private static final Log logger = LogFactory.getLog(MessageDocumentMongoConverter.class); private final static String CREATED_TIME_KEY = "createdTime"; @@ -66,9 +63,19 @@ public class MessageReadingMongoConverter extends MappingMongoConverter implemen private final static String PAYLOAD_KEY = "payload"; + private static final String MESSAGE_ID = "messageId"; + + private static final String GROUP_ID = "groupId"; + + private static final String LAST_MODIFIED_TIME = "lastModifiedTime"; + + private static final String LAST_RELEASED_SEQUENCE = "lastReleasedSequence"; + + private static final String COMPLETE = "complete"; + private ClassLoader classLoader; - public MessageReadingMongoConverter(MongoDbFactory mongoDbFactory) { + public MessageDocumentMongoConverter(MongoDbFactory mongoDbFactory) { super(mongoDbFactory, new MongoMappingContext()); } @@ -81,7 +88,7 @@ public void setBeanClassLoader(ClassLoader classLoader) { public void afterPropertiesSet() { List> customConverters = new ArrayList>(); customConverters.add(new MessageHistoryToDBObjectConverter()); - customConverters.add(new DBObjectToGenericMessageConverter()); + customConverters.add(new DBObjectToMessageConverter()); this.setCustomConversions(new CustomConversions(customConverters)); super.afterPropertiesSet(); } @@ -94,11 +101,11 @@ public void write(Object source, DBObject target) { Message message = document.getMessage(); target.put(CREATED_TIME_KEY, document.getCreatedTime()); - target.put(ConfigurableMongoDbMessageStore.GROUP_ID, document.getGroupId()); - target.put(ConfigurableMongoDbMessageStore.LAST_MODIFIED_TIME, document.getLastModifiedTime()); - target.put(ConfigurableMongoDbMessageStore.LAST_RELEASED_SEQUENCE, document.getLastReleasedSequence()); - target.put(ConfigurableMongoDbMessageStore.COMPLETE, document.isComplete()); - target.put(ConfigurableMongoDbMessageStore.MESSAGE_ID, message.getHeaders().getId()); + target.put(GROUP_ID, document.getGroupId()); + target.put(LAST_MODIFIED_TIME, document.getLastModifiedTime()); + target.put(LAST_RELEASED_SEQUENCE, document.getLastReleasedSequence()); + target.put(COMPLETE, document.isComplete()); + target.put(MESSAGE_ID, message.getHeaders().getId()); target.put(HEADERS_KEY, this.convertToMongoType(message.getHeaders())); @@ -134,20 +141,18 @@ public S read(Class clazz, DBObject source) { throw new IllegalStateException("failed to load class: " + payloadType, e); } } - GenericMessage message = new GenericMessage(payload, headers); - Map innerMap = (Map) new DirectFieldAccessor(message.getHeaders()).getPropertyValue("headers"); - // using reflection to set ID and TIMESTAMP since they are immutable through MessageHeaders - innerMap.put(MessageHeaders.ID, headers.get(MessageHeaders.ID)); - innerMap.put(MessageHeaders.TIMESTAMP, headers.get(MessageHeaders.TIMESTAMP)); + MutableMessage message = new MutableMessage(payload); + message.getRawHeaders().putAll(headers); + Long groupTimestamp = (Long)source.get(CREATED_TIME_KEY); - Long lastModified = (Long)source.get(ConfigurableMongoDbMessageStore.LAST_MODIFIED_TIME); - Integer lastReleasedSequenceNumber = (Integer)source.get(ConfigurableMongoDbMessageStore.LAST_RELEASED_SEQUENCE); - Boolean completeGroup = (Boolean)source.get(ConfigurableMongoDbMessageStore.COMPLETE); + Long lastModified = (Long)source.get(LAST_MODIFIED_TIME); + Integer lastReleasedSequenceNumber = (Integer)source.get(LAST_RELEASED_SEQUENCE); + Boolean completeGroup = (Boolean)source.get(COMPLETE); MessageDocument document = new MessageDocument(message); - if (source.containsField(ConfigurableMongoDbMessageStore.GROUP_ID)){ - document.setGroupId(source.get(ConfigurableMongoDbMessageStore.GROUP_ID)); + if (source.containsField(GROUP_ID)){ + document.setGroupId(source.get(GROUP_ID)); } if (groupTimestamp != null){ document.setCreatedTime(groupTimestamp); @@ -219,31 +224,27 @@ public DBObject convert(MessageHistory source) { } } - private class DBObjectToGenericMessageConverter implements Converter> { + private class DBObjectToMessageConverter implements Converter> { @Override - @SuppressWarnings("unchecked") - public GenericMessage convert(DBObject source) { - Map headers = MessageReadingMongoConverter.this.normalizeHeaders((Map) source.get("headers")); + public Message convert(DBObject source) { + @SuppressWarnings("unchecked") + Map headers = MessageDocumentMongoConverter.this.normalizeHeaders((Map) source.get("headers")); Object payload = source.get(PAYLOAD_KEY); Object payloadType = source.get(PAYLOAD_TYPE_KEY); if (payloadType != null && payload instanceof DBObject) { try { Class payloadClass = ClassUtils.forName(payloadType.toString(), classLoader); - payload = MessageReadingMongoConverter.this.read(payloadClass, (DBObject) payload); + payload = MessageDocumentMongoConverter.this.read(payloadClass, (DBObject) payload); } catch (Exception e) { throw new IllegalStateException("failed to load class: " + payloadType, e); } } - @SuppressWarnings("rawtypes") - GenericMessage message = new GenericMessage(payload, headers); - Map innerMap = (Map) new DirectFieldAccessor(message.getHeaders()).getPropertyValue("headers"); - // using reflection to set ID and TIMESTAMP since they are immutable through MessageHeaders - innerMap.put(MessageHeaders.ID, headers.get(MessageHeaders.ID)); - innerMap.put(MessageHeaders.TIMESTAMP, headers.get(MessageHeaders.TIMESTAMP)); + MutableMessage message = new MutableMessage(payload); + message.getRawHeaders().putAll(headers); return message; } diff --git a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/AbstractMongoDbMessageGroupStoreTests.java b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/AbstractMongoDbMessageGroupStoreTests.java index 179c25fc584..a7864f83f75 100644 --- a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/AbstractMongoDbMessageGroupStoreTests.java +++ b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/AbstractMongoDbMessageGroupStoreTests.java @@ -113,7 +113,7 @@ public void testMessageGroupWithAddedMessageUUIDGroupIdAndUUIDHeader() throws Ex @Test @MongoDbAvailable - public void testCountMessagesInGroup() throws Exception{ + public void testCountMessagesInGroup() throws Exception { this.cleanupCollections(new SimpleMongoDbFactory(new Mongo(), "test")); MessageGroupStore store = this.getMessageGroupStore(); Message messageA = new GenericMessage("A"); @@ -125,12 +125,13 @@ public void testCountMessagesInGroup() throws Exception{ @Test @MongoDbAvailable - public void testPollMessages() throws Exception{ + public void testPollMessages() throws Exception { this.cleanupCollections(new SimpleMongoDbFactory(new Mongo(), "test")); MessageGroupStore store = this.getMessageGroupStore(); Message messageA = new GenericMessage("A"); Message messageB = new GenericMessage("B"); store.addMessageToGroup(1, messageA); + Thread.sleep(10); store.addMessageToGroup(1, messageB); assertEquals(2, store.messageGroupSize(1)); Message out = store.pollMessageFromGroup(1); @@ -144,7 +145,7 @@ public void testPollMessages() throws Exception{ @Test @MongoDbAvailable - public void testSameMessageMultipleGroupsPoll() throws Exception{ + public void testSameMessageMultipleGroupsPoll() throws Exception { this.cleanupCollections(new SimpleMongoDbFactory(new Mongo(), "test")); MessageGroupStore store = this.getMessageGroupStore(); Message messageA = new GenericMessage("A"); diff --git a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/DelayerHandlerRescheduleIntegrationTests-context.xml b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/DelayerHandlerRescheduleIntegrationTests-context.xml index 7360b0c62ec..5c63b8cfce8 100644 --- a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/DelayerHandlerRescheduleIntegrationTests-context.xml +++ b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/DelayerHandlerRescheduleIntegrationTests-context.xml @@ -28,7 +28,7 @@ default-delay="10000" message-store="messageStore"/> - + diff --git a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/DelayerHandlerRescheduleIntegrationTests.java b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/DelayerHandlerRescheduleIntegrationTests.java index cc1282d52c6..d314a461009 100644 --- a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/DelayerHandlerRescheduleIntegrationTests.java +++ b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/DelayerHandlerRescheduleIntegrationTests.java @@ -24,7 +24,6 @@ import java.util.concurrent.TimeUnit; import org.hamcrest.Matchers; -import org.junit.Rule; import org.junit.Test; import org.springframework.context.support.AbstractApplicationContext; @@ -50,7 +49,7 @@ public class DelayerHandlerRescheduleIntegrationTests extends MongoDbAvailableTe public static final String DELAYER_ID = "delayerWithMongoMS"; - @Rule +// @Rule public LongRunningIntegrationTest longTests = new LongRunningIntegrationTest(); @Test @@ -108,7 +107,7 @@ private void testDelayerHandlerRescheduleWithMongoDbMessageStore(String config) Message original1 = (Message) ((DelayHandler.DelayedMessageWrapper) payload).getOriginal(); messageInStore = iterator.next(); Message original2 = (Message) ((DelayHandler.DelayedMessageWrapper) messageInStore.getPayload()).getOriginal(); - assertThat(message1, Matchers.anyOf(Matchers.is(original1), Matchers.is(original2))); + assertThat(message1.getPayload(), Matchers.anyOf(Matchers.is(original1.getPayload()), Matchers.is(original2.getPayload()))); context.refresh(); diff --git a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/MongoDbMessageGroupStoreTests.java b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/MongoDbMessageGroupStoreTests.java index 82066d23a06..a61ff5621be 100644 --- a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/MongoDbMessageGroupStoreTests.java +++ b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/MongoDbMessageGroupStoreTests.java @@ -20,7 +20,7 @@ import org.springframework.data.mongodb.MongoDbFactory; import org.springframework.data.mongodb.core.SimpleMongoDbFactory; import org.springframework.integration.mongodb.rules.MongoDbAvailable; -import org.springframework.integration.mongodb.store.support.MessageReadingMongoConverter; +import org.springframework.integration.mongodb.store.support.MessageDocumentMongoConverter; import org.springframework.integration.store.MessageStore; import com.mongodb.Mongo; @@ -36,7 +36,7 @@ public class MongoDbMessageGroupStoreTests extends AbstractMongoDbMessageGroupSt @Override protected ConfigurableMongoDbMessageStore getMessageGroupStore() throws Exception { MongoDbFactory mongoDbFactory = new SimpleMongoDbFactory(new Mongo(), "test"); - MessageReadingMongoConverter converter = new MessageReadingMongoConverter(mongoDbFactory); + MessageDocumentMongoConverter converter = new MessageDocumentMongoConverter(mongoDbFactory); converter.afterPropertiesSet(); ConfigurableMongoDbMessageStore messageStore = new ConfigurableMongoDbMessageStore(mongoDbFactory, converter, "messages"); messageStore.afterPropertiesSet(); diff --git a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/MongoDbMessageStoreClaimCheckIntegrationTests.java b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/MongoDbMessageStoreClaimCheckIntegrationTests.java index a4abb7dba8a..5d497357544 100644 --- a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/MongoDbMessageStoreClaimCheckIntegrationTests.java +++ b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/MongoDbMessageStoreClaimCheckIntegrationTests.java @@ -26,7 +26,7 @@ import org.springframework.data.mongodb.core.SimpleMongoDbFactory; import org.springframework.integration.mongodb.rules.MongoDbAvailable; import org.springframework.integration.mongodb.rules.MongoDbAvailableTests; -import org.springframework.integration.mongodb.store.support.MessageReadingMongoConverter; +import org.springframework.integration.mongodb.store.support.MessageDocumentMongoConverter; import org.springframework.integration.support.MessageBuilder; import org.springframework.integration.transformer.ClaimCheckInTransformer; import org.springframework.integration.transformer.ClaimCheckOutTransformer; @@ -44,7 +44,7 @@ public class MongoDbMessageStoreClaimCheckIntegrationTests extends MongoDbAvaila @MongoDbAvailable public void stringPayload() throws Exception { MongoDbFactory mongoDbFactory = new SimpleMongoDbFactory(new Mongo(), "test"); - MessageReadingMongoConverter converter = new MessageReadingMongoConverter(mongoDbFactory); + MessageDocumentMongoConverter converter = new MessageDocumentMongoConverter(mongoDbFactory); converter.afterPropertiesSet(); ConfigurableMongoDbMessageStore messageStore = new ConfigurableMongoDbMessageStore(mongoDbFactory, converter, "messages"); messageStore.afterPropertiesSet(); @@ -56,14 +56,13 @@ public void stringPayload() throws Exception { Message checkedOutMessage = checkout.transform(claimCheckMessage); assertEquals(claimCheckMessage.getPayload(), checkedOutMessage.getHeaders().getId()); assertEquals(originalMessage.getPayload(), checkedOutMessage.getPayload()); - assertEquals(originalMessage, checkedOutMessage); } @Test @MongoDbAvailable public void objectPayload() throws Exception { MongoDbFactory mongoDbFactory = new SimpleMongoDbFactory(new Mongo(), "test"); - MessageReadingMongoConverter converter = new MessageReadingMongoConverter(mongoDbFactory); + MessageDocumentMongoConverter converter = new MessageDocumentMongoConverter(mongoDbFactory); converter.afterPropertiesSet(); ConfigurableMongoDbMessageStore messageStore = new ConfigurableMongoDbMessageStore(mongoDbFactory, converter, "messages"); messageStore.afterPropertiesSet(); @@ -79,7 +78,6 @@ public void objectPayload() throws Exception { Message checkedOutMessage = checkout.transform(claimCheckMessage); assertEquals(originalMessage.getPayload(), checkedOutMessage.getPayload()); assertEquals(claimCheckMessage.getPayload(), checkedOutMessage.getHeaders().getId()); - assertEquals(originalMessage, checkedOutMessage); } @Test diff --git a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/mongo-aggregator-config.xml b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/mongo-aggregator-config.xml index 7ecdbe63561..dfea5c79f48 100644 --- a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/mongo-aggregator-config.xml +++ b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/mongo-aggregator-config.xml @@ -24,7 +24,7 @@ - + diff --git a/src/reference/docbook/mongodb.xml b/src/reference/docbook/mongodb.xml index ccc7757f6dd..8b8ceb8f5e4 100644 --- a/src/reference/docbook/mongodb.xml +++ b/src/reference/docbook/mongodb.xml @@ -142,14 +142,14 @@ Starting with Spring Integration 4.0 the MongoDbMessageStore is - deprecated and will be removed in future release and the ConfigurableMongoDbMessageStore - will be renamed. To achieve the similar MongoDB documents representation as it is with MongoDbMessageStore - (to have an access to the headers and payload properties) it is just necessary to configure - ConfigurableMongoDbMessageStore with MessageReadingMongoConverter: + deprecated and will be removed in future release. To achieve the similar MongoDB documents representation + as it is with MongoDbMessageStore (to have an access to the headers + and payload properties) it is just necessary to configure + ConfigurableMongoDbMessageStore with MessageDocumentMongoConverter: - + diff --git a/src/reference/docbook/whats-new.xml b/src/reference/docbook/whats-new.xml index 31a30e16552..f21b244dbf9 100644 --- a/src/reference/docbook/whats-new.xml +++ b/src/reference/docbook/whats-new.xml @@ -186,10 +186,10 @@
- MongoDB MessageStore: MessageReadingMongoConverter + MongoDB MessageStore: MessageDocumentMongoConverter The MongoDbMessageStore has been deprecated in favor of - ConfigurableMongoDbMessageStore. The new MessageReadingMongoConverter + ConfigurableMongoDbMessageStore. The new MessageDocumentMongoConverter has been introduced to provide MongoDB document representation similar to MongoDbMessageStore results. For more information see . From 2e69e5ca0bd1500e3402e3e2b0a732724d6a73fa Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Thu, 27 Mar 2014 20:54:14 +0200 Subject: [PATCH 3/6] INT-3337: Add support to store/read Messages * Add for `MessageDocumentMongoConverter` custom `converters` for all known `Message` type * Add `MessageDocumentFields` * Add support to store/read `ErrorMessage`. As a trick for `Throwable` is selected (de)serializing converter --- .../integration/message/MutableMessage.java | 9 +- .../store/support/MessageDocumentFields.java | 37 ++++ .../MessageDocumentMongoConverter.java | 206 +++++++++++++----- .../AbstractMongoDbMessageStoreTests.java | 150 +++++++++---- .../ConfigurableMongoDbMessageStoreTests.java | 79 ------- ...ayerHandlerRescheduleIntegrationTests.java | 5 +- .../store/MongoDbMessageStoreTests.java | 42 ++++ 7 files changed, 348 insertions(+), 180 deletions(-) create mode 100644 spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/support/MessageDocumentFields.java create mode 100644 spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/MongoDbMessageStoreTests.java diff --git a/spring-integration-core/src/main/java/org/springframework/integration/message/MutableMessage.java b/spring-integration-core/src/main/java/org/springframework/integration/message/MutableMessage.java index 314b08ee4bc..a9ec6fcc9d8 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/message/MutableMessage.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/message/MutableMessage.java @@ -23,6 +23,7 @@ import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.GenericMessage; +import org.springframework.util.Assert; import org.springframework.util.ObjectUtils; /** @@ -34,6 +35,7 @@ * a reference to the message and changes will be reflected there too. * * @author Gary Russell + * @author Artem Bilan * @since 4.0 * */ @@ -52,9 +54,10 @@ public MutableMessage(T payload) { } @SuppressWarnings("unchecked") - public MutableMessage(T payload, MessageHeaders headers) { - this.payload = payload; + public MutableMessage(T payload, Map headers) { + Assert.notNull(payload, "payload must not be null"); this.headers = new MessageHeaders(headers); + this.payload = payload; // Needs SPR-11468 to avoid DFA and header manipulation rawHeaders = (Map) new DirectFieldAccessor(this.headers) .getPropertyValue("headers"); @@ -64,6 +67,7 @@ public MutableMessage(T payload, MessageHeaders headers) { } } + @Override public MessageHeaders getHeaders() { return this.headers; @@ -75,6 +79,7 @@ public T getPayload() { } public void setPayload(T payload) { + Assert.notNull(payload, "'payload' must not be null"); this.payload = payload; } diff --git a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/support/MessageDocumentFields.java b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/support/MessageDocumentFields.java new file mode 100644 index 00000000000..854c710e1bb --- /dev/null +++ b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/support/MessageDocumentFields.java @@ -0,0 +1,37 @@ +/* + * Copyright 2014 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.mongodb.store.support; + +/** + * @author Artem Bilan + * @since 4.0 + */ +public final class MessageDocumentFields { + + public static final String MESSAGE_ID = "messageId"; + + public static final String GROUP_ID = "groupId"; + + public final static String CREATED_TIME = "createdTime"; + + public static final String LAST_MODIFIED_TIME = "lastModifiedTime"; + + public static final String LAST_RELEASED_SEQUENCE = "lastReleasedSequence"; + + public static final String COMPLETE = "complete"; + +} diff --git a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/support/MessageDocumentMongoConverter.java b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/support/MessageDocumentMongoConverter.java index 7ceb68521bd..8eac6e7e38a 100644 --- a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/support/MessageDocumentMongoConverter.java +++ b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/support/MessageDocumentMongoConverter.java @@ -29,15 +29,23 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.springframework.beans.DirectFieldAccessor; import org.springframework.beans.factory.BeanClassLoaderAware; import org.springframework.core.convert.converter.Converter; +import org.springframework.core.serializer.support.DeserializingConverter; +import org.springframework.core.serializer.support.SerializingConverter; +import org.springframework.data.convert.WritingConverter; import org.springframework.data.mongodb.MongoDbFactory; import org.springframework.data.mongodb.core.convert.CustomConversions; import org.springframework.data.mongodb.core.convert.MappingMongoConverter; import org.springframework.data.mongodb.core.mapping.MongoMappingContext; import org.springframework.integration.history.MessageHistory; +import org.springframework.integration.message.AdviceMessage; import org.springframework.integration.message.MutableMessage; import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.support.ErrorMessage; +import org.springframework.messaging.support.GenericMessage; import org.springframework.util.Assert; import org.springframework.util.ClassUtils; @@ -46,6 +54,8 @@ import com.mongodb.DBObject; /** + * The custom {@link MappingMongoConverter} to decompose {@link Message} as a MongoDB field set. + * * @author Mark Fisher * @author Oleg Zhurakousky * @author Artem Bilan @@ -55,23 +65,13 @@ public class MessageDocumentMongoConverter extends MappingMongoConverter impleme private static final Log logger = LogFactory.getLog(MessageDocumentMongoConverter.class); - private final static String CREATED_TIME_KEY = "createdTime"; - - private final static String PAYLOAD_TYPE_KEY = "_payloadType"; - - private final static String HEADERS_KEY = "headers"; - - private final static String PAYLOAD_KEY = "payload"; + private final static String PAYLOAD_TYPE = "_payloadType"; - private static final String MESSAGE_ID = "messageId"; + private final static String MESSAGE_TYPE = "_messageType"; - private static final String GROUP_ID = "groupId"; + private final static String HEADERS = "headers"; - private static final String LAST_MODIFIED_TIME = "lastModifiedTime"; - - private static final String LAST_RELEASED_SEQUENCE = "lastReleasedSequence"; - - private static final String COMPLETE = "complete"; + private final static String PAYLOAD = "payload"; private ClassLoader classLoader; @@ -86,9 +86,13 @@ public void setBeanClassLoader(ClassLoader classLoader) { @Override public void afterPropertiesSet() { - List> customConverters = new ArrayList>(); + List customConverters = new ArrayList(); customConverters.add(new MessageHistoryToDBObjectConverter()); - customConverters.add(new DBObjectToMessageConverter()); + customConverters.add(new DBObjectToGenericMessageConverter()); + customConverters.add(new DBObjectToMutableMessageConverter()); + customConverters.add(new DBObjectToErrorMessageConverter()); + customConverters.add(new DBObjectToAdviceMessageConverter()); + customConverters.add(new ThrowableToBytesConverter()); this.setCustomConversions(new CustomConversions(customConverters)); super.afterPropertiesSet(); } @@ -100,59 +104,63 @@ public void write(Object source, DBObject target) { MessageDocument document = (MessageDocument) source; Message message = document.getMessage(); - target.put(CREATED_TIME_KEY, document.getCreatedTime()); - target.put(GROUP_ID, document.getGroupId()); - target.put(LAST_MODIFIED_TIME, document.getLastModifiedTime()); - target.put(LAST_RELEASED_SEQUENCE, document.getLastReleasedSequence()); - target.put(COMPLETE, document.isComplete()); - target.put(MESSAGE_ID, message.getHeaders().getId()); + target.put(MessageDocumentFields.CREATED_TIME, document.getCreatedTime()); + target.put(MessageDocumentFields.GROUP_ID, document.getGroupId()); + target.put(MessageDocumentFields.LAST_MODIFIED_TIME, document.getLastModifiedTime()); + target.put(MessageDocumentFields.LAST_RELEASED_SEQUENCE, document.getLastReleasedSequence()); + target.put(MessageDocumentFields.COMPLETE, document.isComplete()); + target.put(MessageDocumentFields.MESSAGE_ID, message.getHeaders().getId()); - target.put(HEADERS_KEY, this.convertToMongoType(message.getHeaders())); + target.put(HEADERS, this.convertToMongoType(message.getHeaders())); - target.put(PAYLOAD_TYPE_KEY, message.getPayload().getClass().getName()); + target.put(MESSAGE_TYPE, message.getClass().getName()); + target.put(PAYLOAD_TYPE, message.getPayload().getClass().getName()); Object payload = message.getPayload(); if (!this.conversions.isSimpleType(payload.getClass())) { DBObject dbo = new BasicDBObject(); super.write(payload, dbo); payload = dbo; } - target.put(PAYLOAD_KEY, payload); + else if (payload instanceof Throwable) { + payload = this.convertToMongoType(payload); + } + + target.put(PAYLOAD, payload); + + if (message instanceof AdviceMessage) { + DBObject dbo = new BasicDBObject(); + super.write(((AdviceMessage) message).getInputMessage(), dbo); + target.put("inputMessage", dbo); + } target.put("_class", MessageDocument.class.getName()); } @Override - @SuppressWarnings({"unchecked", "rawtypes"}) + @SuppressWarnings("unchecked") public S read(Class clazz, DBObject source) { if (!MessageDocument.class.equals(clazz)) { return super.read(clazz, source); } if (source != null) { - Map headers = this.normalizeHeaders((Map) source.get(HEADERS_KEY)); - - Object payload = source.get(PAYLOAD_KEY); - Object payloadType = source.get(PAYLOAD_TYPE_KEY); - if (payloadType != null && payload instanceof DBObject) { - try { - Class payloadClass = ClassUtils.forName(payloadType.toString(), classLoader); - payload = this.read(payloadClass, (DBObject) payload); - } - catch (Exception e) { - throw new IllegalStateException("failed to load class: " + payloadType, e); - } + Message message = null; + Object messageType = source.get(MESSAGE_TYPE); + try { + message = (Message) this.read(ClassUtils.forName(messageType.toString(), classLoader), source); + } + catch (ClassNotFoundException e) { + throw new IllegalStateException("failed to load class: " + messageType, e); } - MutableMessage message = new MutableMessage(payload); - message.getRawHeaders().putAll(headers); - Long groupTimestamp = (Long)source.get(CREATED_TIME_KEY); - Long lastModified = (Long)source.get(LAST_MODIFIED_TIME); - Integer lastReleasedSequenceNumber = (Integer)source.get(LAST_RELEASED_SEQUENCE); - Boolean completeGroup = (Boolean)source.get(COMPLETE); + Long groupTimestamp = (Long)source.get(MessageDocumentFields.CREATED_TIME); + Long lastModified = (Long)source.get(MessageDocumentFields.LAST_MODIFIED_TIME); + Integer lastReleasedSequenceNumber = (Integer)source.get(MessageDocumentFields.LAST_RELEASED_SEQUENCE); + Boolean completeGroup = (Boolean)source.get(MessageDocumentFields.COMPLETE); MessageDocument document = new MessageDocument(message); - if (source.containsField(GROUP_ID)){ - document.setGroupId(source.get(GROUP_ID)); + if (source.containsField(MessageDocumentFields.GROUP_ID)){ + document.setGroupId(source.get(MessageDocumentFields.GROUP_ID)); } if (groupTimestamp != null){ document.setCreatedTime(groupTimestamp); @@ -204,8 +212,24 @@ else if (source instanceof BasicDBList) { return normalizedHeaders; } + private Object extractPayload(DBObject source) { + Object payload = source.get(PAYLOAD); + if (payload instanceof DBObject) { + DBObject payloadObject = (DBObject) payload; + Object payloadType = payloadObject.get("_class"); + try { + Class payloadClass = ClassUtils.forName(payloadType.toString(), classLoader); + payload = MessageDocumentMongoConverter.this.read(payloadClass, payloadObject); + } + catch (Exception e) { + throw new IllegalStateException("failed to load class: " + payloadType, e); + } + } + return payload; + } + - private static class MessageHistoryToDBObjectConverter implements Converter { + private static class MessageHistoryToDBObjectConverter implements Converter { @Override public DBObject convert(MessageHistory source) { @@ -224,31 +248,99 @@ public DBObject convert(MessageHistory source) { } } - private class DBObjectToMessageConverter implements Converter> { + private class DBObjectToGenericMessageConverter implements Converter> { @Override - public Message convert(DBObject source) { + @SuppressWarnings("unchecked") + public GenericMessage convert(DBObject source) { + Map headers = MessageDocumentMongoConverter.this.normalizeHeaders((Map) source.get("headers")); + + GenericMessage message = new GenericMessage(MessageDocumentMongoConverter.this.extractPayload(source), headers); + Map innerMap = (Map) new DirectFieldAccessor(message.getHeaders()).getPropertyValue("headers"); + // using reflection to set ID and TIMESTAMP since they are immutable through MessageHeaders + innerMap.put(MessageHeaders.ID, headers.get(MessageHeaders.ID)); + innerMap.put(MessageHeaders.TIMESTAMP, headers.get(MessageHeaders.TIMESTAMP)); + + return message; + } + + } + + private class DBObjectToMutableMessageConverter implements Converter> { + + @Override + public MutableMessage convert(DBObject source) { @SuppressWarnings("unchecked") Map headers = MessageDocumentMongoConverter.this.normalizeHeaders((Map) source.get("headers")); - Object payload = source.get(PAYLOAD_KEY); - Object payloadType = source.get(PAYLOAD_TYPE_KEY); - if (payloadType != null && payload instanceof DBObject) { + return new MutableMessage(MessageDocumentMongoConverter.this.extractPayload(source), headers); + } + + } + + private class DBObjectToAdviceMessageConverter implements Converter { + + @Override + @SuppressWarnings("unchecked") + public AdviceMessage convert(DBObject source) { + Map headers = MessageDocumentMongoConverter.this.normalizeHeaders((Map) source.get("headers")); + + Message inputMessage = null; + + if (source.get("inputMessage") != null) { + DBObject inputMessageObject = (DBObject) source.get("inputMessage"); + Object inputMessageType = inputMessageObject.get("_class"); try { - Class payloadClass = ClassUtils.forName(payloadType.toString(), classLoader); - payload = MessageDocumentMongoConverter.this.read(payloadClass, (DBObject) payload); + Class messageClass = ClassUtils.forName(inputMessageType.toString(), classLoader); + inputMessage = (Message) MessageDocumentMongoConverter.this.read(messageClass, inputMessageObject); } catch (Exception e) { - throw new IllegalStateException("failed to load class: " + payloadType, e); + throw new IllegalStateException("failed to load class: " + inputMessageType, e); } } - MutableMessage message = new MutableMessage(payload); - message.getRawHeaders().putAll(headers); + AdviceMessage message = new AdviceMessage(MessageDocumentMongoConverter.this.extractPayload(source), headers, inputMessage); + Map innerMap = (Map) new DirectFieldAccessor(message.getHeaders()).getPropertyValue("headers"); + // using reflection to set ID and TIMESTAMP since they are immutable through MessageHeaders + innerMap.put(MessageHeaders.ID, headers.get(MessageHeaders.ID)); + innerMap.put(MessageHeaders.TIMESTAMP, headers.get(MessageHeaders.TIMESTAMP)); return message; } } + private class DBObjectToErrorMessageConverter implements Converter { + + private final Converter deserializingConverter = new DeserializingConverter(); + + @Override + @SuppressWarnings("unchecked") + public ErrorMessage convert(DBObject source) { + Map headers = MessageDocumentMongoConverter.this.normalizeHeaders((Map) source.get("headers")); + + Object payload = this.deserializingConverter.convert((byte[]) source.get(PAYLOAD)); + ErrorMessage message = new ErrorMessage((Throwable) payload, headers); + Map innerMap = (Map) new DirectFieldAccessor(message.getHeaders()).getPropertyValue("headers"); + // using reflection to set ID and TIMESTAMP since they are immutable through MessageHeaders + innerMap.put(MessageHeaders.ID, headers.get(MessageHeaders.ID)); + innerMap.put(MessageHeaders.TIMESTAMP, headers.get(MessageHeaders.TIMESTAMP)); + + return message; + } + + } + + @WritingConverter + private class ThrowableToBytesConverter implements Converter { + + private final Converter serializingConverter = new SerializingConverter(); + + @Override + public byte[] convert(Throwable source) { + return serializingConverter.convert(source); + } + + } + } diff --git a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/AbstractMongoDbMessageStoreTests.java b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/AbstractMongoDbMessageStoreTests.java index 0c722b9e6d8..f668778a11f 100644 --- a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/AbstractMongoDbMessageStoreTests.java +++ b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/AbstractMongoDbMessageStoreTests.java @@ -18,22 +18,29 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import java.io.Serializable; import java.util.Properties; import java.util.UUID; +import org.hamcrest.Matchers; import org.junit.Test; import org.springframework.data.mongodb.core.SimpleMongoDbFactory; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.history.MessageHistory; +import org.springframework.integration.message.AdviceMessage; +import org.springframework.integration.message.MutableMessage; import org.springframework.integration.mongodb.rules.MongoDbAvailable; import org.springframework.integration.mongodb.rules.MongoDbAvailableTests; import org.springframework.integration.store.MessageStore; import org.springframework.integration.support.MessageBuilder; import org.springframework.messaging.Message; +import org.springframework.messaging.MessagingException; +import org.springframework.messaging.support.ErrorMessage; +import org.springframework.messaging.support.GenericMessage; import com.mongodb.Mongo; @@ -103,13 +110,7 @@ public void testWithMessageHistory() throws Exception{ MessageStore store = getMessageStore(); Foo foo = new Foo(); foo.setName("foo"); - Message message = MessageBuilder.withPayload(foo). - setHeader("foo", foo). - setHeader("bar", new Bar("bar")). - setHeader("baz", new Baz()). - setHeader("abc", new Abc()). - setHeader("xyz", new Xyz()). - build(); + Message message = MessageBuilder.withPayload(foo).build(); DirectChannel fooChannel = new DirectChannel(); fooChannel.setBeanName("fooChannel"); DirectChannel barChannel = new DirectChannel(); @@ -120,11 +121,6 @@ public void testWithMessageHistory() throws Exception{ store.addMessage(message); message = store.getMessage(message.getHeaders().getId()); assertNotNull(message); - assertTrue(message.getHeaders().get("foo") instanceof Foo); - assertTrue(message.getHeaders().get("bar") instanceof Bar); - assertTrue(message.getHeaders().get("baz") instanceof Baz); - assertTrue(message.getHeaders().get("abc") instanceof Abc); - assertTrue(message.getHeaders().get("xyz") instanceof Xyz); MessageHistory messageHistory = MessageHistory.read(message); assertNotNull(messageHistory); assertEquals(2, messageHistory.size()); @@ -151,6 +147,108 @@ public void testInt3153SequenceDetails() throws Exception{ assertEquals(messageToStore, retrievedMessage); } + @Test + @MongoDbAvailable + public void testInt3076MessageAsPayload() throws Exception{ + MessageStore store = this.getMessageStore(); + Person p = new Person(); + p.setFname("John"); + p.setLname("Doe"); + Message messageToStore = new GenericMessage>(MessageBuilder.withPayload(p).build()); + store.addMessage(messageToStore); + Message retrievedMessage = store.getMessage(messageToStore.getHeaders().getId()); + assertNotNull(retrievedMessage); + assertTrue(retrievedMessage.getPayload() instanceof GenericMessage); + assertEquals(messageToStore.getPayload(), retrievedMessage.getPayload()); + assertEquals(messageToStore.getHeaders(), retrievedMessage.getHeaders()); + assertEquals(((Message) messageToStore.getPayload()).getPayload(), p); + assertEquals(messageToStore, retrievedMessage); + } + + @Test + @MongoDbAvailable + public void testInt3076AdviceMessage() throws Exception{ + MessageStore store = this.getMessageStore(); + Person p = new Person(); + p.setFname("John"); + p.setLname("Doe"); + Message inputMessage = MessageBuilder.withPayload(p).build(); + Message messageToStore = new AdviceMessage("foo", inputMessage); + store.addMessage(messageToStore); + Message retrievedMessage = store.getMessage(messageToStore.getHeaders().getId()); + assertNotNull(retrievedMessage); + assertTrue(retrievedMessage instanceof AdviceMessage); + assertEquals(messageToStore.getPayload(), retrievedMessage.getPayload()); + assertEquals(messageToStore.getHeaders(), retrievedMessage.getHeaders()); + assertEquals(inputMessage, ((AdviceMessage) retrievedMessage).getInputMessage()); + assertEquals(messageToStore, retrievedMessage); + } + + @Test + @MongoDbAvailable + public void testAdviceMessageAsPayload() throws Exception{ + MessageStore store = this.getMessageStore(); + Person p = new Person(); + p.setFname("John"); + p.setLname("Doe"); + Message inputMessage = MessageBuilder.withPayload(p).build(); + Message messageToStore = new GenericMessage>(new AdviceMessage("foo", inputMessage)); + store.addMessage(messageToStore); + Message retrievedMessage = store.getMessage(messageToStore.getHeaders().getId()); + assertNotNull(retrievedMessage); + assertTrue(retrievedMessage.getPayload() instanceof AdviceMessage); + AdviceMessage adviceMessage = (AdviceMessage) retrievedMessage.getPayload(); + assertEquals("foo", adviceMessage.getPayload()); + assertEquals(messageToStore.getHeaders(), retrievedMessage.getHeaders()); + assertEquals(inputMessage, adviceMessage.getInputMessage()); + assertEquals(messageToStore, retrievedMessage); + } + + @Test + @MongoDbAvailable + public void testMutableMessageAsPayload() throws Exception{ + MessageStore store = this.getMessageStore(); + Person p = new Person(); + p.setFname("John"); + p.setLname("Doe"); + Message messageToStore = new GenericMessage>(new MutableMessage(p)); + store.addMessage(messageToStore); + Message retrievedMessage = store.getMessage(messageToStore.getHeaders().getId()); + assertNotNull(retrievedMessage); + assertTrue(retrievedMessage.getPayload() instanceof MutableMessage); + assertEquals(messageToStore.getPayload(), retrievedMessage.getPayload()); + assertEquals(messageToStore.getHeaders(), retrievedMessage.getHeaders()); + assertEquals(((Message) messageToStore.getPayload()).getPayload(), p); + assertEquals(messageToStore, retrievedMessage); + } + + @Test + @MongoDbAvailable + public void testInt3076ErrorMessage() throws Exception{ + MessageStore store = this.getMessageStore(); + Person p = new Person(); + p.setFname("John"); + p.setLname("Doe"); + Message failedMessage = MessageBuilder.withPayload(p).build(); + MessagingException messagingException; + try { + throw new RuntimeException("intentional"); + } + catch (Exception e) { + messagingException = new MessagingException(failedMessage, "intentional MessagingException", e); + } + Message messageToStore = new ErrorMessage(messagingException); + store.addMessage(messageToStore); + Message retrievedMessage = store.getMessage(messageToStore.getHeaders().getId()); + assertNotNull(retrievedMessage); + assertTrue(retrievedMessage instanceof ErrorMessage); + assertThat(retrievedMessage.getPayload(), Matchers.instanceOf(MessagingException.class)); + assertEquals("intentional MessagingException", ((MessagingException) retrievedMessage.getPayload()).getMessage()); + assertEquals(failedMessage, ((MessagingException) retrievedMessage.getPayload()).getFailedMessage()); + assertEquals(messageToStore.getHeaders(), retrievedMessage.getHeaders()); + } + + public static class Foo implements Serializable { /** * @@ -195,34 +293,6 @@ public String getName() { } } - public static class Abc implements Serializable { - /** - * - */ - private static final long serialVersionUID = 1L; - - private final String name = "abx"; - - private Abc(){} - - public String getName() { - return name; - } - } - - public static class Xyz implements Serializable { - /** - * - */ - private static final long serialVersionUID = 1L; - - @SuppressWarnings("unused") - private final String name = "xyz"; - - private Xyz(){} - } - - public static class Person implements Serializable{ /** diff --git a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/ConfigurableMongoDbMessageStoreTests.java b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/ConfigurableMongoDbMessageStoreTests.java index 54a8572547f..bcefb9b2c37 100644 --- a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/ConfigurableMongoDbMessageStoreTests.java +++ b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/ConfigurableMongoDbMessageStoreTests.java @@ -16,26 +16,11 @@ package org.springframework.integration.mongodb.store; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; - -import org.hamcrest.Matchers; -import org.junit.Test; - import org.springframework.context.support.GenericApplicationContext; import org.springframework.data.mongodb.MongoDbFactory; import org.springframework.data.mongodb.core.SimpleMongoDbFactory; -import org.springframework.integration.message.AdviceMessage; -import org.springframework.integration.mongodb.rules.MongoDbAvailable; import org.springframework.integration.store.MessageStore; -import org.springframework.integration.support.MessageBuilder; import org.springframework.integration.test.util.TestUtils; -import org.springframework.messaging.Message; -import org.springframework.messaging.MessagingException; -import org.springframework.messaging.support.ErrorMessage; -import org.springframework.messaging.support.GenericMessage; import com.mongodb.Mongo; @@ -56,68 +41,4 @@ protected MessageStore getMessageStore() throws Exception { return mongoDbMessageStore; } - - @Test - @MongoDbAvailable - public void testInt3076MessageAsPayload() throws Exception{ - MessageStore store = this.getMessageStore(); - Person p = new Person(); - p.setFname("John"); - p.setLname("Doe"); - Message messageToStore = new GenericMessage>(MessageBuilder.withPayload(p).build()); - store.addMessage(messageToStore); - Message retrievedMessage = store.getMessage(messageToStore.getHeaders().getId()); - assertNotNull(retrievedMessage); - assertTrue(retrievedMessage.getPayload() instanceof GenericMessage); - assertEquals(messageToStore.getPayload(), retrievedMessage.getPayload()); - assertEquals(messageToStore.getHeaders(), retrievedMessage.getHeaders()); - assertEquals(((Message) messageToStore.getPayload()).getPayload(), p); - assertEquals(messageToStore, retrievedMessage); - } - - @Test - @MongoDbAvailable - public void testInt3076AdviceMessage() throws Exception{ - MessageStore store = this.getMessageStore(); - Person p = new Person(); - p.setFname("John"); - p.setLname("Doe"); - Message inputMessage = MessageBuilder.withPayload(p).build(); - Message messageToStore = new AdviceMessage("foo", inputMessage); - store.addMessage(messageToStore); - Message retrievedMessage = store.getMessage(messageToStore.getHeaders().getId()); - assertNotNull(retrievedMessage); - assertTrue(retrievedMessage instanceof AdviceMessage); - assertEquals(messageToStore.getPayload(), retrievedMessage.getPayload()); - assertEquals(messageToStore.getHeaders(), retrievedMessage.getHeaders()); - assertEquals(inputMessage, ((AdviceMessage) retrievedMessage).getInputMessage()); - assertEquals(messageToStore, retrievedMessage); - } - - @Test - @MongoDbAvailable - public void testInt3076ErrorMessage() throws Exception{ - MessageStore store = this.getMessageStore(); - Person p = new Person(); - p.setFname("John"); - p.setLname("Doe"); - Message failedMessage = MessageBuilder.withPayload(p).build(); - MessagingException messagingException; - try { - throw new RuntimeException("intentional"); - } - catch (Exception e) { - messagingException = new MessagingException(failedMessage, "intentional MessagingException", e); - } - Message messageToStore = new ErrorMessage(messagingException); - store.addMessage(messageToStore); - Message retrievedMessage = store.getMessage(messageToStore.getHeaders().getId()); - assertNotNull(retrievedMessage); - assertTrue(retrievedMessage instanceof ErrorMessage); - assertThat(retrievedMessage.getPayload(), Matchers.instanceOf(MessagingException.class)); - assertEquals("intentional MessagingException", ((MessagingException) retrievedMessage.getPayload()).getMessage()); - assertEquals(failedMessage, ((MessagingException) retrievedMessage.getPayload()).getFailedMessage()); - assertEquals(messageToStore.getHeaders(), retrievedMessage.getHeaders()); - } - } diff --git a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/DelayerHandlerRescheduleIntegrationTests.java b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/DelayerHandlerRescheduleIntegrationTests.java index d314a461009..cc1282d52c6 100644 --- a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/DelayerHandlerRescheduleIntegrationTests.java +++ b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/DelayerHandlerRescheduleIntegrationTests.java @@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit; import org.hamcrest.Matchers; +import org.junit.Rule; import org.junit.Test; import org.springframework.context.support.AbstractApplicationContext; @@ -49,7 +50,7 @@ public class DelayerHandlerRescheduleIntegrationTests extends MongoDbAvailableTe public static final String DELAYER_ID = "delayerWithMongoMS"; -// @Rule + @Rule public LongRunningIntegrationTest longTests = new LongRunningIntegrationTest(); @Test @@ -107,7 +108,7 @@ private void testDelayerHandlerRescheduleWithMongoDbMessageStore(String config) Message original1 = (Message) ((DelayHandler.DelayedMessageWrapper) payload).getOriginal(); messageInStore = iterator.next(); Message original2 = (Message) ((DelayHandler.DelayedMessageWrapper) messageInStore.getPayload()).getOriginal(); - assertThat(message1.getPayload(), Matchers.anyOf(Matchers.is(original1.getPayload()), Matchers.is(original2.getPayload()))); + assertThat(message1, Matchers.anyOf(Matchers.is(original1), Matchers.is(original2))); context.refresh(); diff --git a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/MongoDbMessageStoreTests.java b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/MongoDbMessageStoreTests.java new file mode 100644 index 00000000000..45dfb942b24 --- /dev/null +++ b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/MongoDbMessageStoreTests.java @@ -0,0 +1,42 @@ +/* + * Copyright 2014 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.mongodb.store; + +import org.springframework.data.mongodb.MongoDbFactory; +import org.springframework.data.mongodb.core.SimpleMongoDbFactory; +import org.springframework.integration.mongodb.store.support.MessageDocumentMongoConverter; +import org.springframework.integration.store.MessageStore; + +import com.mongodb.Mongo; + +/** + * @author Artem Bilan + * @since 4.0 + */ +public class MongoDbMessageStoreTests extends AbstractMongoDbMessageStoreTests { + + @Override + protected MessageStore getMessageStore() throws Exception { + MongoDbFactory mongoDbFactory = new SimpleMongoDbFactory(new Mongo(), "test"); + MessageDocumentMongoConverter converter = new MessageDocumentMongoConverter(mongoDbFactory); + converter.afterPropertiesSet(); + ConfigurableMongoDbMessageStore messageStore = new ConfigurableMongoDbMessageStore(mongoDbFactory, converter, "messages"); + messageStore.afterPropertiesSet(); + return messageStore; + } + +} From 4b4cb8fba5f4febbaaf78d3a5c15043ababdd26a Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Fri, 28 Mar 2014 08:28:45 +0200 Subject: [PATCH 4/6] INT-3337: Add converters for `Message` impls --- .../mongodb/store/MongoDbMessageStore.java | 169 ++++++--- .../store/support/MessageDocument.java | 97 ----- .../store/support/MessageDocumentFields.java | 37 -- .../MessageDocumentMongoConverter.java | 346 ------------------ .../mongodb/store/support/package-info.java | 4 - ...AbstractMongoDbMessageGroupStoreTests.java | 6 +- .../AbstractMongoDbMessageStoreTests.java | 41 ++- ...dlerRescheduleIntegrationTests-context.xml | 8 +- .../store/MongoDbMessageGroupStoreTests.java | 11 +- ...essageStoreClaimCheckIntegrationTests.java | 13 +- .../store/MongoDbMessageStoreTests.java | 17 +- .../mongodb/store/mongo-aggregator-config.xml | 8 +- src/reference/docbook/mongodb.xml | 15 - src/reference/docbook/whats-new.xml | 9 - 14 files changed, 184 insertions(+), 597 deletions(-) delete mode 100644 spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/support/MessageDocument.java delete mode 100644 spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/support/MessageDocumentFields.java delete mode 100644 spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/support/MessageDocumentMongoConverter.java delete mode 100644 spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/support/package-info.java diff --git a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbMessageStore.java b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbMessageStore.java index ff6da4cfd32..53104035917 100644 --- a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbMessageStore.java +++ b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbMessageStore.java @@ -32,7 +32,10 @@ import org.springframework.beans.DirectFieldAccessor; import org.springframework.beans.factory.BeanClassLoaderAware; import org.springframework.core.convert.converter.Converter; +import org.springframework.core.serializer.support.DeserializingConverter; +import org.springframework.core.serializer.support.SerializingConverter; import org.springframework.data.annotation.Transient; +import org.springframework.data.convert.WritingConverter; import org.springframework.data.domain.Sort; import org.springframework.data.domain.Sort.Direction; import org.springframework.data.mapping.context.MappingContext; @@ -46,6 +49,8 @@ import org.springframework.data.mongodb.core.query.Query; import org.springframework.data.mongodb.core.query.Update; import org.springframework.integration.history.MessageHistory; +import org.springframework.integration.message.AdviceMessage; +import org.springframework.integration.message.MutableMessage; import org.springframework.integration.store.AbstractMessageGroupStore; import org.springframework.integration.store.MessageGroup; import org.springframework.integration.store.MessageGroupStore; @@ -54,6 +59,7 @@ import org.springframework.jmx.export.annotation.ManagedAttribute; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.support.ErrorMessage; import org.springframework.messaging.support.GenericMessage; import org.springframework.util.Assert; import org.springframework.util.ClassUtils; @@ -68,10 +74,6 @@ * An implementation of both the {@link MessageStore} and {@link MessageGroupStore} * strategies that relies upon MongoDB for persistence. * - * @deprecated in favor of {@link org.springframework.integration.mongodb.store.ConfigurableMongoDbMessageStore} - * together with {@code org.springframework.integration.mongodb.store.support.MessageDocumentMongoConverter}. - * Will be removed in future releases. - * * @author Mark Fisher * @author Oleg Zhurakousky * @author Sean Brandt @@ -80,7 +82,6 @@ * @author Artem Bilan * @since 2.1 */ -@Deprecated public class MongoDbMessageStore extends AbstractMessageGroupStore implements MessageStore, BeanClassLoaderAware { private final static String DEFAULT_COLLECTION_NAME = "messages"; @@ -95,13 +96,13 @@ public class MongoDbMessageStore extends AbstractMessageGroupStore implements Me private final static String GROUP_UPDATE_TIMESTAMP_KEY = "_group_update_timestamp"; - private final static String PAYLOAD_TYPE_KEY = "_payloadType"; - private final static String CREATED_DATE = "_createdDate"; private final MongoTemplate template; + private final MessageReadingMongoConverter converter; + private final String collectionName; private volatile ClassLoader classLoader = ClassUtils.getDefaultClassLoader(); @@ -124,9 +125,9 @@ public MongoDbMessageStore(MongoDbFactory mongoDbFactory) { */ public MongoDbMessageStore(MongoDbFactory mongoDbFactory, String collectionName) { Assert.notNull(mongoDbFactory, "mongoDbFactory must not be null"); - MessageReadingMongoConverter converter = new MessageReadingMongoConverter(mongoDbFactory, new MongoMappingContext()); - converter.afterPropertiesSet(); - this.template = new MongoTemplate(mongoDbFactory, converter); + this.converter = new MessageReadingMongoConverter(mongoDbFactory, new MongoMappingContext()); + this.converter.afterPropertiesSet(); + this.template = new MongoTemplate(mongoDbFactory, this.converter); this.collectionName = (StringUtils.hasText(collectionName)) ? collectionName : DEFAULT_COLLECTION_NAME; } @@ -340,6 +341,10 @@ public void afterPropertiesSet() { customConverters.add(new DBObjectToUUIDConverter()); customConverters.add(new MessageHistoryToDBObjectConverter()); customConverters.add(new DBObjectToGenericMessageConverter()); + customConverters.add(new DBObjectToMutableMessageConverter()); + customConverters.add(new DBObjectToErrorMessageConverter()); + customConverters.add(new DBObjectToAdviceMessageConverter()); + customConverters.add(new ThrowableToBytesConverter()); this.setCustomConversions(new CustomConversions(customConverters)); super.afterPropertiesSet(); } @@ -360,24 +365,18 @@ public S read(Class clazz, DBObject source) { return super.read(clazz, source); } if (source != null) { - Map headers = this.normalizeHeaders((Map) source.get("headers")); - - Object payload = source.get("payload"); - Object payloadType = source.get(PAYLOAD_TYPE_KEY); - if (payloadType != null && payload instanceof DBObject) { - try { - Class payloadClass = ClassUtils.forName(payloadType.toString(), classLoader); - payload = this.read(payloadClass, (DBObject) payload); - } - catch (Exception e) { - throw new IllegalStateException("failed to load class: " + payloadType, e); - } + Message message = null; + Object messageType = source.get("_messageType"); + if (messageType == null) { + messageType = GenericMessage.class.getName(); + } + try { + message = (Message) this.read(ClassUtils.forName(messageType.toString(), classLoader), source); } - GenericMessage message = new GenericMessage(payload, headers); - Map innerMap = (Map) new DirectFieldAccessor(message.getHeaders()).getPropertyValue("headers"); - // using reflection to set ID and TIMESTAMP since they are immutable through MessageHeaders - innerMap.put(MessageHeaders.ID, headers.get(MessageHeaders.ID)); - innerMap.put(MessageHeaders.TIMESTAMP, headers.get(MessageHeaders.TIMESTAMP)); + catch (ClassNotFoundException e) { + throw new IllegalStateException("failed to load class: " + messageType, e); + } + Long groupTimestamp = (Long)source.get(GROUP_TIMESTAMP_KEY); Long lastModified = (Long)source.get(GROUP_UPDATE_TIMESTAMP_KEY); Integer lastReleasedSequenceNumber = (Integer)source.get(LAST_RELEASED_SEQUENCE_NUMBER); @@ -437,6 +436,23 @@ else if (source instanceof BasicDBList) { } return normalizedHeaders; } + + private Object extractPayload(DBObject source) { + Object payload = source.get("payload"); + if (payload instanceof DBObject) { + DBObject payloadObject = (DBObject) payload; + Object payloadType = payloadObject.get("_class"); + try { + Class payloadClass = ClassUtils.forName(payloadType.toString(), classLoader); + payload = this.read(payloadClass, payloadObject); + } + catch (Exception e) { + throw new IllegalStateException("failed to load class: " + payloadType, e); + } + } + return payload; + } + } private static class UuidToDBObjectConverter implements Converter { @@ -481,24 +497,74 @@ private class DBObjectToGenericMessageConverter implements Converter convert(DBObject source) { - MessageReadingMongoConverter converter = (MessageReadingMongoConverter) MongoDbMessageStore.this.template - .getConverter(); - Map headers = converter.normalizeHeaders((Map) source.get("headers")); + Map headers = MongoDbMessageStore.this.converter.normalizeHeaders((Map) source.get("headers")); - Object payload = source.get("payload"); - Object payloadType = source.get(PAYLOAD_TYPE_KEY); - if (payloadType != null && payload instanceof DBObject) { + GenericMessage message = new GenericMessage(MongoDbMessageStore.this.converter.extractPayload(source), headers); + Map innerMap = (Map) new DirectFieldAccessor(message.getHeaders()).getPropertyValue("headers"); + // using reflection to set ID and TIMESTAMP since they are immutable through MessageHeaders + innerMap.put(MessageHeaders.ID, headers.get(MessageHeaders.ID)); + innerMap.put(MessageHeaders.TIMESTAMP, headers.get(MessageHeaders.TIMESTAMP)); + + return message; + } + + } + + private class DBObjectToMutableMessageConverter implements Converter> { + + @Override + public MutableMessage convert(DBObject source) { + @SuppressWarnings("unchecked") + Map headers = MongoDbMessageStore.this.converter.normalizeHeaders((Map) source.get("headers")); + + return new MutableMessage(MongoDbMessageStore.this.converter.extractPayload(source), headers); + } + + } + + private class DBObjectToAdviceMessageConverter implements Converter { + + @Override + @SuppressWarnings("unchecked") + public AdviceMessage convert(DBObject source) { + Map headers = MongoDbMessageStore.this.converter.normalizeHeaders((Map) source.get("headers")); + + Message inputMessage = null; + + if (source.get("inputMessage") != null) { + DBObject inputMessageObject = (DBObject) source.get("inputMessage"); + Object inputMessageType = inputMessageObject.get("_class"); try { - Class payloadClass = ClassUtils.forName(payloadType.toString(), classLoader); - payload = converter.read(payloadClass, (DBObject) payload); + Class messageClass = ClassUtils.forName(inputMessageType.toString(), classLoader); + inputMessage = (Message) MongoDbMessageStore.this.converter.read(messageClass, inputMessageObject); } catch (Exception e) { - throw new IllegalStateException("failed to load class: " + payloadType, e); + throw new IllegalStateException("failed to load class: " + inputMessageType, e); } } - @SuppressWarnings("rawtypes") - GenericMessage message = new GenericMessage(payload, headers); + AdviceMessage message = new AdviceMessage(MongoDbMessageStore.this.converter.extractPayload(source), headers, inputMessage); + Map innerMap = (Map) new DirectFieldAccessor(message.getHeaders()).getPropertyValue("headers"); + // using reflection to set ID and TIMESTAMP since they are immutable through MessageHeaders + innerMap.put(MessageHeaders.ID, headers.get(MessageHeaders.ID)); + innerMap.put(MessageHeaders.TIMESTAMP, headers.get(MessageHeaders.TIMESTAMP)); + + return message; + } + + } + + private class DBObjectToErrorMessageConverter implements Converter { + + private final Converter deserializingConverter = new DeserializingConverter(); + + @Override + @SuppressWarnings("unchecked") + public ErrorMessage convert(DBObject source) { + Map headers = MongoDbMessageStore.this.converter.normalizeHeaders((Map) source.get("headers")); + + Object payload = this.deserializingConverter.convert((byte[]) source.get("payload")); + ErrorMessage message = new ErrorMessage((Throwable) payload, headers); Map innerMap = (Map) new DirectFieldAccessor(message.getHeaders()).getPropertyValue("headers"); // using reflection to set ID and TIMESTAMP since they are immutable through MessageHeaders innerMap.put(MessageHeaders.ID, headers.get(MessageHeaders.ID)); @@ -509,6 +575,19 @@ public GenericMessage convert(DBObject source) { } + @WritingConverter + private class ThrowableToBytesConverter implements Converter { + + private final Converter serializingConverter = new SerializingConverter(); + + @Override + public byte[] convert(Throwable source) { + return serializingConverter.convert(source); + } + + } + + /** * Wrapper class used for storing Messages in MongoDB along with their "group" metadata. */ @@ -519,13 +598,15 @@ private static final class MessageWrapper { @Transient private final Message message; + @SuppressWarnings("unused") + private final String _messageType; + private final Object payload; @SuppressWarnings("unused") private final Map headers; - @SuppressWarnings("unused") - private final String _payloadType; + private final Message inputMessage; private volatile long _group_timestamp; @@ -538,9 +619,15 @@ private static final class MessageWrapper { public MessageWrapper(Message message) { Assert.notNull(message, "'message' must not be null"); this.message = message; + this._messageType = message.getClass().getName(); this.payload = message.getPayload(); this.headers = message.getHeaders(); - this._payloadType = this.payload.getClass().getName(); + if (message instanceof AdviceMessage) { + this.inputMessage = ((AdviceMessage) message).getInputMessage(); + } + else { + this.inputMessage = null; + } } public int get_LastReleasedSequenceNumber() { diff --git a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/support/MessageDocument.java b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/support/MessageDocument.java deleted file mode 100644 index 7a57cdce585..00000000000 --- a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/support/MessageDocument.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Copyright 2014 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.integration.mongodb.store.support; - -import java.util.UUID; - -import org.springframework.messaging.Message; -import org.springframework.util.Assert; - -/** - * The entity class to wrap {@link org.springframework.messaging.Message} to the MongoDB document. - * - * @author Artem Bilan - * @since 4.0 - */ -public class MessageDocument { - - private final Message message; - - @SuppressWarnings("unused") - private final UUID messageId; - - private volatile Long createdTime = 0L; - - private volatile Object groupId; - - private volatile Long lastModifiedTime = 0L; - - private volatile Boolean complete = false; - - private volatile Integer lastReleasedSequence = 0; - - public MessageDocument(Message message) { - Assert.notNull(message, "'message' must not be null"); - this.message = message; - this.messageId = message.getHeaders().getId(); - } - - public Message getMessage() { - return message; - } - - public void setGroupId(Object groupId) { - this.groupId = groupId; - } - - public Object getGroupId() { - return groupId; - } - - public Long getLastModifiedTime() { - return lastModifiedTime; - } - - public void setLastModifiedTime(long lastModifiedTime) { - this.lastModifiedTime = lastModifiedTime; - } - - public Long getCreatedTime() { - return createdTime; - } - - public void setCreatedTime(long createdTime) { - this.createdTime = createdTime; - } - - public Boolean isComplete() { - return complete; - } - - public void setComplete(boolean complete) { - this.complete = complete; - } - - public Integer getLastReleasedSequence() { - return lastReleasedSequence; - } - - public void setLastReleasedSequence(int lastReleasedSequence) { - this.lastReleasedSequence = lastReleasedSequence; - } - -} diff --git a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/support/MessageDocumentFields.java b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/support/MessageDocumentFields.java deleted file mode 100644 index 854c710e1bb..00000000000 --- a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/support/MessageDocumentFields.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright 2014 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.integration.mongodb.store.support; - -/** - * @author Artem Bilan - * @since 4.0 - */ -public final class MessageDocumentFields { - - public static final String MESSAGE_ID = "messageId"; - - public static final String GROUP_ID = "groupId"; - - public final static String CREATED_TIME = "createdTime"; - - public static final String LAST_MODIFIED_TIME = "lastModifiedTime"; - - public static final String LAST_RELEASED_SEQUENCE = "lastReleasedSequence"; - - public static final String COMPLETE = "complete"; - -} diff --git a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/support/MessageDocumentMongoConverter.java b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/support/MessageDocumentMongoConverter.java deleted file mode 100644 index 8eac6e7e38a..00000000000 --- a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/support/MessageDocumentMongoConverter.java +++ /dev/null @@ -1,346 +0,0 @@ -/* - * Copyright 2014 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.integration.mongodb.store.support; - -import static org.springframework.integration.history.MessageHistory.NAME_PROPERTY; -import static org.springframework.integration.history.MessageHistory.TIMESTAMP_PROPERTY; -import static org.springframework.integration.history.MessageHistory.TYPE_PROPERTY; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import org.springframework.beans.DirectFieldAccessor; -import org.springframework.beans.factory.BeanClassLoaderAware; -import org.springframework.core.convert.converter.Converter; -import org.springframework.core.serializer.support.DeserializingConverter; -import org.springframework.core.serializer.support.SerializingConverter; -import org.springframework.data.convert.WritingConverter; -import org.springframework.data.mongodb.MongoDbFactory; -import org.springframework.data.mongodb.core.convert.CustomConversions; -import org.springframework.data.mongodb.core.convert.MappingMongoConverter; -import org.springframework.data.mongodb.core.mapping.MongoMappingContext; -import org.springframework.integration.history.MessageHistory; -import org.springframework.integration.message.AdviceMessage; -import org.springframework.integration.message.MutableMessage; -import org.springframework.messaging.Message; -import org.springframework.messaging.MessageHeaders; -import org.springframework.messaging.support.ErrorMessage; -import org.springframework.messaging.support.GenericMessage; -import org.springframework.util.Assert; -import org.springframework.util.ClassUtils; - -import com.mongodb.BasicDBList; -import com.mongodb.BasicDBObject; -import com.mongodb.DBObject; - -/** - * The custom {@link MappingMongoConverter} to decompose {@link Message} as a MongoDB field set. - * - * @author Mark Fisher - * @author Oleg Zhurakousky - * @author Artem Bilan - * @since 4.0 - */ -public class MessageDocumentMongoConverter extends MappingMongoConverter implements BeanClassLoaderAware { - - private static final Log logger = LogFactory.getLog(MessageDocumentMongoConverter.class); - - private final static String PAYLOAD_TYPE = "_payloadType"; - - private final static String MESSAGE_TYPE = "_messageType"; - - private final static String HEADERS = "headers"; - - private final static String PAYLOAD = "payload"; - - private ClassLoader classLoader; - - public MessageDocumentMongoConverter(MongoDbFactory mongoDbFactory) { - super(mongoDbFactory, new MongoMappingContext()); - } - - @Override - public void setBeanClassLoader(ClassLoader classLoader) { - this.classLoader = classLoader; - } - - @Override - public void afterPropertiesSet() { - List customConverters = new ArrayList(); - customConverters.add(new MessageHistoryToDBObjectConverter()); - customConverters.add(new DBObjectToGenericMessageConverter()); - customConverters.add(new DBObjectToMutableMessageConverter()); - customConverters.add(new DBObjectToErrorMessageConverter()); - customConverters.add(new DBObjectToAdviceMessageConverter()); - customConverters.add(new ThrowableToBytesConverter()); - this.setCustomConversions(new CustomConversions(customConverters)); - super.afterPropertiesSet(); - } - - @Override - public void write(Object source, DBObject target) { - Assert.isInstanceOf(MessageDocument.class, source); - - MessageDocument document = (MessageDocument) source; - Message message = document.getMessage(); - - target.put(MessageDocumentFields.CREATED_TIME, document.getCreatedTime()); - target.put(MessageDocumentFields.GROUP_ID, document.getGroupId()); - target.put(MessageDocumentFields.LAST_MODIFIED_TIME, document.getLastModifiedTime()); - target.put(MessageDocumentFields.LAST_RELEASED_SEQUENCE, document.getLastReleasedSequence()); - target.put(MessageDocumentFields.COMPLETE, document.isComplete()); - target.put(MessageDocumentFields.MESSAGE_ID, message.getHeaders().getId()); - - target.put(HEADERS, this.convertToMongoType(message.getHeaders())); - - target.put(MESSAGE_TYPE, message.getClass().getName()); - target.put(PAYLOAD_TYPE, message.getPayload().getClass().getName()); - Object payload = message.getPayload(); - if (!this.conversions.isSimpleType(payload.getClass())) { - DBObject dbo = new BasicDBObject(); - super.write(payload, dbo); - payload = dbo; - } - else if (payload instanceof Throwable) { - payload = this.convertToMongoType(payload); - } - - target.put(PAYLOAD, payload); - - if (message instanceof AdviceMessage) { - DBObject dbo = new BasicDBObject(); - super.write(((AdviceMessage) message).getInputMessage(), dbo); - target.put("inputMessage", dbo); - } - target.put("_class", MessageDocument.class.getName()); - } - - - @Override - @SuppressWarnings("unchecked") - public S read(Class clazz, DBObject source) { - if (!MessageDocument.class.equals(clazz)) { - return super.read(clazz, source); - } - if (source != null) { - Message message = null; - Object messageType = source.get(MESSAGE_TYPE); - try { - message = (Message) this.read(ClassUtils.forName(messageType.toString(), classLoader), source); - } - catch (ClassNotFoundException e) { - throw new IllegalStateException("failed to load class: " + messageType, e); - } - - Long groupTimestamp = (Long)source.get(MessageDocumentFields.CREATED_TIME); - Long lastModified = (Long)source.get(MessageDocumentFields.LAST_MODIFIED_TIME); - Integer lastReleasedSequenceNumber = (Integer)source.get(MessageDocumentFields.LAST_RELEASED_SEQUENCE); - Boolean completeGroup = (Boolean)source.get(MessageDocumentFields.COMPLETE); - - MessageDocument document = new MessageDocument(message); - - if (source.containsField(MessageDocumentFields.GROUP_ID)){ - document.setGroupId(source.get(MessageDocumentFields.GROUP_ID)); - } - if (groupTimestamp != null){ - document.setCreatedTime(groupTimestamp); - } - if (lastModified != null){ - document.setLastModifiedTime(lastModified); - } - if (lastReleasedSequenceNumber != null){ - document.setLastReleasedSequence(lastReleasedSequenceNumber); - } - - if (completeGroup != null){ - document.setComplete(completeGroup); - } - - return (S) document; - } - return null; - } - - private Map normalizeHeaders(Map headers) { - Map normalizedHeaders = new HashMap(); - for (String headerName : headers.keySet()) { - Object headerValue = headers.get(headerName); - if (headerValue instanceof DBObject) { - DBObject source = (DBObject) headerValue; - try { - Class typeClass = null; - if (source.containsField("_class")) { - Object type = source.get("_class"); - typeClass = ClassUtils.forName(type.toString(), classLoader); - } - else if (source instanceof BasicDBList) { - typeClass = List.class; - } - else { - throw new IllegalStateException("Unsupported 'DBObject' type: " + source.getClass()); - } - normalizedHeaders.put(headerName, super.read(typeClass, source)); - } - catch (Exception e) { - logger.warn("Header '" + headerName + "' could not be deserialized.", e); - } - } - else { - normalizedHeaders.put(headerName, headerValue); - } - } - return normalizedHeaders; - } - - private Object extractPayload(DBObject source) { - Object payload = source.get(PAYLOAD); - if (payload instanceof DBObject) { - DBObject payloadObject = (DBObject) payload; - Object payloadType = payloadObject.get("_class"); - try { - Class payloadClass = ClassUtils.forName(payloadType.toString(), classLoader); - payload = MessageDocumentMongoConverter.this.read(payloadClass, payloadObject); - } - catch (Exception e) { - throw new IllegalStateException("failed to load class: " + payloadType, e); - } - } - return payload; - } - - - private static class MessageHistoryToDBObjectConverter implements Converter { - - @Override - public DBObject convert(MessageHistory source) { - BasicDBObject obj = new BasicDBObject(); - obj.put("_class", MessageHistory.class.getName()); - BasicDBList dbList = new BasicDBList(); - for (Properties properties : source) { - BasicDBObject dbo = new BasicDBObject(); - dbo.put(NAME_PROPERTY, properties.getProperty(NAME_PROPERTY)); - dbo.put(TYPE_PROPERTY, properties.getProperty(TYPE_PROPERTY)); - dbo.put(TIMESTAMP_PROPERTY, properties.getProperty(TIMESTAMP_PROPERTY)); - dbList.add(dbo); - } - obj.put("components", dbList); - return obj; - } - } - - private class DBObjectToGenericMessageConverter implements Converter> { - - @Override - @SuppressWarnings("unchecked") - public GenericMessage convert(DBObject source) { - Map headers = MessageDocumentMongoConverter.this.normalizeHeaders((Map) source.get("headers")); - - GenericMessage message = new GenericMessage(MessageDocumentMongoConverter.this.extractPayload(source), headers); - Map innerMap = (Map) new DirectFieldAccessor(message.getHeaders()).getPropertyValue("headers"); - // using reflection to set ID and TIMESTAMP since they are immutable through MessageHeaders - innerMap.put(MessageHeaders.ID, headers.get(MessageHeaders.ID)); - innerMap.put(MessageHeaders.TIMESTAMP, headers.get(MessageHeaders.TIMESTAMP)); - - return message; - } - - } - - private class DBObjectToMutableMessageConverter implements Converter> { - - @Override - public MutableMessage convert(DBObject source) { - @SuppressWarnings("unchecked") - Map headers = MessageDocumentMongoConverter.this.normalizeHeaders((Map) source.get("headers")); - - return new MutableMessage(MessageDocumentMongoConverter.this.extractPayload(source), headers); - } - - } - - private class DBObjectToAdviceMessageConverter implements Converter { - - @Override - @SuppressWarnings("unchecked") - public AdviceMessage convert(DBObject source) { - Map headers = MessageDocumentMongoConverter.this.normalizeHeaders((Map) source.get("headers")); - - Message inputMessage = null; - - if (source.get("inputMessage") != null) { - DBObject inputMessageObject = (DBObject) source.get("inputMessage"); - Object inputMessageType = inputMessageObject.get("_class"); - try { - Class messageClass = ClassUtils.forName(inputMessageType.toString(), classLoader); - inputMessage = (Message) MessageDocumentMongoConverter.this.read(messageClass, inputMessageObject); - } - catch (Exception e) { - throw new IllegalStateException("failed to load class: " + inputMessageType, e); - } - } - - AdviceMessage message = new AdviceMessage(MessageDocumentMongoConverter.this.extractPayload(source), headers, inputMessage); - Map innerMap = (Map) new DirectFieldAccessor(message.getHeaders()).getPropertyValue("headers"); - // using reflection to set ID and TIMESTAMP since they are immutable through MessageHeaders - innerMap.put(MessageHeaders.ID, headers.get(MessageHeaders.ID)); - innerMap.put(MessageHeaders.TIMESTAMP, headers.get(MessageHeaders.TIMESTAMP)); - - return message; - } - - } - - private class DBObjectToErrorMessageConverter implements Converter { - - private final Converter deserializingConverter = new DeserializingConverter(); - - @Override - @SuppressWarnings("unchecked") - public ErrorMessage convert(DBObject source) { - Map headers = MessageDocumentMongoConverter.this.normalizeHeaders((Map) source.get("headers")); - - Object payload = this.deserializingConverter.convert((byte[]) source.get(PAYLOAD)); - ErrorMessage message = new ErrorMessage((Throwable) payload, headers); - Map innerMap = (Map) new DirectFieldAccessor(message.getHeaders()).getPropertyValue("headers"); - // using reflection to set ID and TIMESTAMP since they are immutable through MessageHeaders - innerMap.put(MessageHeaders.ID, headers.get(MessageHeaders.ID)); - innerMap.put(MessageHeaders.TIMESTAMP, headers.get(MessageHeaders.TIMESTAMP)); - - return message; - } - - } - - @WritingConverter - private class ThrowableToBytesConverter implements Converter { - - private final Converter serializingConverter = new SerializingConverter(); - - @Override - public byte[] convert(Throwable source) { - return serializingConverter.convert(source); - } - - } - -} diff --git a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/support/package-info.java b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/support/package-info.java deleted file mode 100644 index 60dc8b71c03..00000000000 --- a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/support/package-info.java +++ /dev/null @@ -1,4 +0,0 @@ -/** - * Provides support classes related to the MongoDB message store. - */ -package org.springframework.integration.mongodb.store.support; diff --git a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/AbstractMongoDbMessageGroupStoreTests.java b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/AbstractMongoDbMessageGroupStoreTests.java index a7864f83f75..821714313bd 100644 --- a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/AbstractMongoDbMessageGroupStoreTests.java +++ b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/AbstractMongoDbMessageGroupStoreTests.java @@ -113,7 +113,7 @@ public void testMessageGroupWithAddedMessageUUIDGroupIdAndUUIDHeader() throws Ex @Test @MongoDbAvailable - public void testCountMessagesInGroup() throws Exception { + public void testCountMessagesInGroup() throws Exception{ this.cleanupCollections(new SimpleMongoDbFactory(new Mongo(), "test")); MessageGroupStore store = this.getMessageGroupStore(); Message messageA = new GenericMessage("A"); @@ -125,7 +125,7 @@ public void testCountMessagesInGroup() throws Exception { @Test @MongoDbAvailable - public void testPollMessages() throws Exception { + public void testPollMessages() throws Exception{ this.cleanupCollections(new SimpleMongoDbFactory(new Mongo(), "test")); MessageGroupStore store = this.getMessageGroupStore(); Message messageA = new GenericMessage("A"); @@ -145,7 +145,7 @@ public void testPollMessages() throws Exception { @Test @MongoDbAvailable - public void testSameMessageMultipleGroupsPoll() throws Exception { + public void testSameMessageMultipleGroupsPoll() throws Exception{ this.cleanupCollections(new SimpleMongoDbFactory(new Mongo(), "test")); MessageGroupStore store = this.getMessageGroupStore(); Message messageA = new GenericMessage("A"); diff --git a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/AbstractMongoDbMessageStoreTests.java b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/AbstractMongoDbMessageStoreTests.java index f668778a11f..9a56ef68883 100644 --- a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/AbstractMongoDbMessageStoreTests.java +++ b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/AbstractMongoDbMessageStoreTests.java @@ -110,7 +110,13 @@ public void testWithMessageHistory() throws Exception{ MessageStore store = getMessageStore(); Foo foo = new Foo(); foo.setName("foo"); - Message message = MessageBuilder.withPayload(foo).build(); + Message message = MessageBuilder.withPayload(foo). + setHeader("foo", foo). + setHeader("bar", new Bar("bar")). + setHeader("baz", new Baz()). + setHeader("abc", new Abc()). + setHeader("xyz", new Xyz()). + build(); DirectChannel fooChannel = new DirectChannel(); fooChannel.setBeanName("fooChannel"); DirectChannel barChannel = new DirectChannel(); @@ -121,6 +127,11 @@ public void testWithMessageHistory() throws Exception{ store.addMessage(message); message = store.getMessage(message.getHeaders().getId()); assertNotNull(message); + assertTrue(message.getHeaders().get("foo") instanceof Foo); + assertTrue(message.getHeaders().get("bar") instanceof Bar); + assertTrue(message.getHeaders().get("baz") instanceof Baz); + assertTrue(message.getHeaders().get("abc") instanceof Abc); + assertTrue(message.getHeaders().get("xyz") instanceof Xyz); MessageHistory messageHistory = MessageHistory.read(message); assertNotNull(messageHistory); assertEquals(2, messageHistory.size()); @@ -293,6 +304,34 @@ public String getName() { } } + public static class Abc implements Serializable { + /** + * + */ + private static final long serialVersionUID = 1L; + + private final String name = "abx"; + + private Abc(){} + + public String getName() { + return name; + } + } + + public static class Xyz implements Serializable { + /** + * + */ + private static final long serialVersionUID = 1L; + + @SuppressWarnings("unused") + private final String name = "xyz"; + + private Xyz(){} + } + + public static class Person implements Serializable{ /** diff --git a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/DelayerHandlerRescheduleIntegrationTests-context.xml b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/DelayerHandlerRescheduleIntegrationTests-context.xml index 5c63b8cfce8..a6ce73f3ad0 100644 --- a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/DelayerHandlerRescheduleIntegrationTests-context.xml +++ b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/DelayerHandlerRescheduleIntegrationTests-context.xml @@ -12,10 +12,8 @@ - + - - @@ -28,8 +26,4 @@ default-delay="10000" message-store="messageStore"/> - - - - diff --git a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/MongoDbMessageGroupStoreTests.java b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/MongoDbMessageGroupStoreTests.java index a61ff5621be..110ae46663f 100644 --- a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/MongoDbMessageGroupStoreTests.java +++ b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/MongoDbMessageGroupStoreTests.java @@ -17,10 +17,8 @@ import org.junit.Test; -import org.springframework.data.mongodb.MongoDbFactory; import org.springframework.data.mongodb.core.SimpleMongoDbFactory; import org.springframework.integration.mongodb.rules.MongoDbAvailable; -import org.springframework.integration.mongodb.store.support.MessageDocumentMongoConverter; import org.springframework.integration.store.MessageStore; import com.mongodb.Mongo; @@ -34,13 +32,8 @@ public class MongoDbMessageGroupStoreTests extends AbstractMongoDbMessageGroupStoreTests { @Override - protected ConfigurableMongoDbMessageStore getMessageGroupStore() throws Exception { - MongoDbFactory mongoDbFactory = new SimpleMongoDbFactory(new Mongo(), "test"); - MessageDocumentMongoConverter converter = new MessageDocumentMongoConverter(mongoDbFactory); - converter.afterPropertiesSet(); - ConfigurableMongoDbMessageStore messageStore = new ConfigurableMongoDbMessageStore(mongoDbFactory, converter, "messages"); - messageStore.afterPropertiesSet(); - return messageStore; + protected MongoDbMessageStore getMessageGroupStore() throws Exception { + return new MongoDbMessageStore( new SimpleMongoDbFactory(new Mongo(), "test")); } @Override diff --git a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/MongoDbMessageStoreClaimCheckIntegrationTests.java b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/MongoDbMessageStoreClaimCheckIntegrationTests.java index 5d497357544..7a31826582b 100644 --- a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/MongoDbMessageStoreClaimCheckIntegrationTests.java +++ b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/MongoDbMessageStoreClaimCheckIntegrationTests.java @@ -26,7 +26,6 @@ import org.springframework.data.mongodb.core.SimpleMongoDbFactory; import org.springframework.integration.mongodb.rules.MongoDbAvailable; import org.springframework.integration.mongodb.rules.MongoDbAvailableTests; -import org.springframework.integration.mongodb.store.support.MessageDocumentMongoConverter; import org.springframework.integration.support.MessageBuilder; import org.springframework.integration.transformer.ClaimCheckInTransformer; import org.springframework.integration.transformer.ClaimCheckOutTransformer; @@ -44,10 +43,7 @@ public class MongoDbMessageStoreClaimCheckIntegrationTests extends MongoDbAvaila @MongoDbAvailable public void stringPayload() throws Exception { MongoDbFactory mongoDbFactory = new SimpleMongoDbFactory(new Mongo(), "test"); - MessageDocumentMongoConverter converter = new MessageDocumentMongoConverter(mongoDbFactory); - converter.afterPropertiesSet(); - ConfigurableMongoDbMessageStore messageStore = new ConfigurableMongoDbMessageStore(mongoDbFactory, converter, "messages"); - messageStore.afterPropertiesSet(); + MongoDbMessageStore messageStore = new MongoDbMessageStore(mongoDbFactory); ClaimCheckInTransformer checkin = new ClaimCheckInTransformer(messageStore); ClaimCheckOutTransformer checkout = new ClaimCheckOutTransformer(messageStore); Message originalMessage = MessageBuilder.withPayload("test1").build(); @@ -56,16 +52,14 @@ public void stringPayload() throws Exception { Message checkedOutMessage = checkout.transform(claimCheckMessage); assertEquals(claimCheckMessage.getPayload(), checkedOutMessage.getHeaders().getId()); assertEquals(originalMessage.getPayload(), checkedOutMessage.getPayload()); + assertEquals(originalMessage, checkedOutMessage); } @Test @MongoDbAvailable public void objectPayload() throws Exception { MongoDbFactory mongoDbFactory = new SimpleMongoDbFactory(new Mongo(), "test"); - MessageDocumentMongoConverter converter = new MessageDocumentMongoConverter(mongoDbFactory); - converter.afterPropertiesSet(); - ConfigurableMongoDbMessageStore messageStore = new ConfigurableMongoDbMessageStore(mongoDbFactory, converter, "messages"); - messageStore.afterPropertiesSet(); + MongoDbMessageStore messageStore = new MongoDbMessageStore(mongoDbFactory); ClaimCheckInTransformer checkin = new ClaimCheckInTransformer(messageStore); ClaimCheckOutTransformer checkout = new ClaimCheckOutTransformer(messageStore); Beverage payload = new Beverage(); @@ -78,6 +72,7 @@ public void objectPayload() throws Exception { Message checkedOutMessage = checkout.transform(claimCheckMessage); assertEquals(originalMessage.getPayload(), checkedOutMessage.getPayload()); assertEquals(claimCheckMessage.getPayload(), checkedOutMessage.getHeaders().getId()); + assertEquals(originalMessage, checkedOutMessage); } @Test diff --git a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/MongoDbMessageStoreTests.java b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/MongoDbMessageStoreTests.java index 45dfb942b24..12641284110 100644 --- a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/MongoDbMessageStoreTests.java +++ b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/MongoDbMessageStoreTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2014 the original author or authors. + * Copyright 2002-2013 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,30 +13,23 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.springframework.integration.mongodb.store; -import org.springframework.data.mongodb.MongoDbFactory; import org.springframework.data.mongodb.core.SimpleMongoDbFactory; -import org.springframework.integration.mongodb.store.support.MessageDocumentMongoConverter; import org.springframework.integration.store.MessageStore; import com.mongodb.Mongo; - /** + * @author Mark Fisher + * @author Oleg Zhurakousky * @author Artem Bilan - * @since 4.0 + * */ public class MongoDbMessageStoreTests extends AbstractMongoDbMessageStoreTests { @Override protected MessageStore getMessageStore() throws Exception { - MongoDbFactory mongoDbFactory = new SimpleMongoDbFactory(new Mongo(), "test"); - MessageDocumentMongoConverter converter = new MessageDocumentMongoConverter(mongoDbFactory); - converter.afterPropertiesSet(); - ConfigurableMongoDbMessageStore messageStore = new ConfigurableMongoDbMessageStore(mongoDbFactory, converter, "messages"); - messageStore.afterPropertiesSet(); - return messageStore; + return new MongoDbMessageStore(new SimpleMongoDbFactory(new Mongo(), "test")); } } diff --git a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/mongo-aggregator-config.xml b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/mongo-aggregator-config.xml index dfea5c79f48..dece0dbcbf2 100644 --- a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/mongo-aggregator-config.xml +++ b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/mongo-aggregator-config.xml @@ -11,10 +11,8 @@ - + - - @@ -24,8 +22,4 @@ - - - - diff --git a/src/reference/docbook/mongodb.xml b/src/reference/docbook/mongodb.xml index 8b8ceb8f5e4..d08bd035c20 100644 --- a/src/reference/docbook/mongodb.xml +++ b/src/reference/docbook/mongodb.xml @@ -140,21 +140,6 @@ configurableStoreMessages. It is recommended to use this implementation for robust and flexible solutions. The MongoDbMessageStore remains for backward compatibility and may be removed in future releases. - - Starting with Spring Integration 4.0 the MongoDbMessageStore is - deprecated and will be removed in future release. To achieve the similar MongoDB documents representation - as it is with MongoDbMessageStore (to have an access to the headers - and payload properties) it is just necessary to configure - ConfigurableMongoDbMessageStore with MessageDocumentMongoConverter: - - - - - - - -]]> -
diff --git a/src/reference/docbook/whats-new.xml b/src/reference/docbook/whats-new.xml index f21b244dbf9..a68cd54b590 100644 --- a/src/reference/docbook/whats-new.xml +++ b/src/reference/docbook/whats-new.xml @@ -185,14 +185,5 @@ For more information see .
-
- MongoDB MessageStore: MessageDocumentMongoConverter - - The MongoDbMessageStore has been deprecated in favor of - ConfigurableMongoDbMessageStore. The new MessageDocumentMongoConverter - has been introduced to provide MongoDB document representation similar to MongoDbMessageStore results. - For more information see . - -
From 11084c518239663da56e82a93df14bac22834561 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Fri, 28 Mar 2014 18:09:54 +0200 Subject: [PATCH 5/6] INT-3337: Make `MDbMS.MessageWrapper` AuditAware Add `_id` persistence field --- .../mongodb/store/MongoDbMessageStore.java | 31 +++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) diff --git a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbMessageStore.java b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbMessageStore.java index 53104035917..cbfaa19f153 100644 --- a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbMessageStore.java +++ b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbMessageStore.java @@ -29,11 +29,16 @@ import java.util.Properties; import java.util.UUID; +import org.springframework.beans.BeansException; import org.springframework.beans.DirectFieldAccessor; import org.springframework.beans.factory.BeanClassLoaderAware; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; import org.springframework.core.convert.converter.Converter; import org.springframework.core.serializer.support.DeserializingConverter; import org.springframework.core.serializer.support.SerializingConverter; +import org.springframework.data.annotation.Id; import org.springframework.data.annotation.Transient; import org.springframework.data.convert.WritingConverter; import org.springframework.data.domain.Sort; @@ -82,7 +87,8 @@ * @author Artem Bilan * @since 2.1 */ -public class MongoDbMessageStore extends AbstractMessageGroupStore implements MessageStore, BeanClassLoaderAware { +public class MongoDbMessageStore extends AbstractMessageGroupStore + implements MessageStore, BeanClassLoaderAware, ApplicationContextAware, InitializingBean { private final static String DEFAULT_COLLECTION_NAME = "messages"; @@ -107,6 +113,8 @@ public class MongoDbMessageStore extends AbstractMessageGroupStore implements Me private volatile ClassLoader classLoader = ClassUtils.getDefaultClassLoader(); + private ApplicationContext applicationContext; + /** * Create a MongoDbMessageStore using the provided {@link MongoDbFactory}.and the default collection name. @@ -126,7 +134,6 @@ public MongoDbMessageStore(MongoDbFactory mongoDbFactory) { public MongoDbMessageStore(MongoDbFactory mongoDbFactory, String collectionName) { Assert.notNull(mongoDbFactory, "mongoDbFactory must not be null"); this.converter = new MessageReadingMongoConverter(mongoDbFactory, new MongoMappingContext()); - this.converter.afterPropertiesSet(); this.template = new MongoTemplate(mongoDbFactory, this.converter); this.collectionName = (StringUtils.hasText(collectionName)) ? collectionName : DEFAULT_COLLECTION_NAME; } @@ -138,6 +145,18 @@ public void setBeanClassLoader(ClassLoader classLoader) { this.classLoader = classLoader; } + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + this.applicationContext = applicationContext; + } + + @Override + public void afterPropertiesSet() throws Exception { + this.template.setApplicationContext(this.applicationContext); + this.converter.setApplicationContext(this.applicationContext); + this.converter.afterPropertiesSet(); + } + @Override public Message addMessage(Message message) { Assert.notNull(message, "'message' must not be null"); @@ -593,6 +612,14 @@ public byte[] convert(Throwable source) { */ private static final class MessageWrapper { + /* + * Needed as a persistence property to suppress 'Cannot determine IsNewStrategy' MappingException + * when the application context is configured with auditing. The document is not + * currently Auditable. + */ + @Id + private String _id; + private volatile Object _groupId; @Transient From 5a0552fb337dd7b8d132bd8d2227b57beed2f27c Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Fri, 28 Mar 2014 19:31:30 +0200 Subject: [PATCH 6/6] Addressing PR comments --- .../ConfigurableMongoDbMessageStore.java | 1 + .../mongodb/store/MongoDbMessageStore.java | 33 ++++++++++--------- src/reference/docbook/mongodb.xml | 21 ++++++------ 3 files changed, 29 insertions(+), 26 deletions(-) diff --git a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/ConfigurableMongoDbMessageStore.java b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/ConfigurableMongoDbMessageStore.java index e1fa8a50808..9aec43fec9f 100644 --- a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/ConfigurableMongoDbMessageStore.java +++ b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/ConfigurableMongoDbMessageStore.java @@ -362,6 +362,7 @@ private static class MessageDocument { * when the application context is configured with auditing. The document is not * currently Auditable. */ + @SuppressWarnings("unused") @Id private String _id; diff --git a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbMessageStore.java b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbMessageStore.java index cbfaa19f153..3a08501a1b3 100644 --- a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbMessageStore.java +++ b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbMessageStore.java @@ -474,6 +474,15 @@ private Object extractPayload(DBObject source) { } + @SuppressWarnings("unchecked") + private static void enhanceHeaders(MessageHeaders messageHeaders, Map headers) { + Map innerMap = (Map) new DirectFieldAccessor(messageHeaders).getPropertyValue("headers"); + // using reflection to set ID and TIMESTAMP since they are immutable through MessageHeaders + innerMap.put(MessageHeaders.ID, headers.get(MessageHeaders.ID)); + innerMap.put(MessageHeaders.TIMESTAMP, headers.get(MessageHeaders.TIMESTAMP)); + } + + private static class UuidToDBObjectConverter implements Converter { @Override public DBObject convert(UUID source) { @@ -514,16 +523,13 @@ public DBObject convert(MessageHistory source) { private class DBObjectToGenericMessageConverter implements Converter> { @Override - @SuppressWarnings("unchecked") + public GenericMessage convert(DBObject source) { + @SuppressWarnings("unchecked") Map headers = MongoDbMessageStore.this.converter.normalizeHeaders((Map) source.get("headers")); GenericMessage message = new GenericMessage(MongoDbMessageStore.this.converter.extractPayload(source), headers); - Map innerMap = (Map) new DirectFieldAccessor(message.getHeaders()).getPropertyValue("headers"); - // using reflection to set ID and TIMESTAMP since they are immutable through MessageHeaders - innerMap.put(MessageHeaders.ID, headers.get(MessageHeaders.ID)); - innerMap.put(MessageHeaders.TIMESTAMP, headers.get(MessageHeaders.TIMESTAMP)); - + enhanceHeaders(message.getHeaders(), headers); return message; } @@ -544,8 +550,8 @@ public MutableMessage convert(DBObject source) { private class DBObjectToAdviceMessageConverter implements Converter { @Override - @SuppressWarnings("unchecked") public AdviceMessage convert(DBObject source) { + @SuppressWarnings("unchecked") Map headers = MongoDbMessageStore.this.converter.normalizeHeaders((Map) source.get("headers")); Message inputMessage = null; @@ -563,10 +569,7 @@ public AdviceMessage convert(DBObject source) { } AdviceMessage message = new AdviceMessage(MongoDbMessageStore.this.converter.extractPayload(source), headers, inputMessage); - Map innerMap = (Map) new DirectFieldAccessor(message.getHeaders()).getPropertyValue("headers"); - // using reflection to set ID and TIMESTAMP since they are immutable through MessageHeaders - innerMap.put(MessageHeaders.ID, headers.get(MessageHeaders.ID)); - innerMap.put(MessageHeaders.TIMESTAMP, headers.get(MessageHeaders.TIMESTAMP)); + enhanceHeaders(message.getHeaders(), headers); return message; } @@ -578,16 +581,13 @@ private class DBObjectToErrorMessageConverter implements Converter deserializingConverter = new DeserializingConverter(); @Override - @SuppressWarnings("unchecked") public ErrorMessage convert(DBObject source) { + @SuppressWarnings("unchecked") Map headers = MongoDbMessageStore.this.converter.normalizeHeaders((Map) source.get("headers")); Object payload = this.deserializingConverter.convert((byte[]) source.get("payload")); ErrorMessage message = new ErrorMessage((Throwable) payload, headers); - Map innerMap = (Map) new DirectFieldAccessor(message.getHeaders()).getPropertyValue("headers"); - // using reflection to set ID and TIMESTAMP since they are immutable through MessageHeaders - innerMap.put(MessageHeaders.ID, headers.get(MessageHeaders.ID)); - innerMap.put(MessageHeaders.TIMESTAMP, headers.get(MessageHeaders.TIMESTAMP)); + enhanceHeaders(message.getHeaders(), headers); return message; } @@ -617,6 +617,7 @@ private static final class MessageWrapper { * when the application context is configured with auditing. The document is not * currently Auditable. */ + @SuppressWarnings("unused") @Id private String _id; diff --git a/src/reference/docbook/mongodb.xml b/src/reference/docbook/mongodb.xml index d08bd035c20..10ecc1384d3 100644 --- a/src/reference/docbook/mongodb.xml +++ b/src/reference/docbook/mongodb.xml @@ -88,8 +88,8 @@ Spring Integration's MongoDB module provides the MongoDbMessageStore which is an implementation of both - the MessageStore strategy (mainly used by the QueueChannel and ClaimCheck - patterns) and the MessageGroupStore strategy (mainly used by the Aggregator and + the MessageStore strategy (mainly used by the ClaimCheckpattern) + and the MessageGroupStore strategy (mainly used by the Aggregator and Resequencer patterns). @@ -111,15 +111,18 @@ and an Aggregator. As you can see it is a simple bean configuration, and it expects a MongoDbFactory as a constructor argument. + + The MongoDbMessageStore expands the Message as a Mongo document + with all nested properties using Spring Data Mongo Mapping mechanism. It is useful when needed to have access to + payload or headers for auditing or analytics, for example, against stored messages. + - The MongoDbMessageStore uses a custom MappingMongoConverter implementation + As far as MongoDbMessageStore uses a custom MappingMongoConverter implementation to store Messages as MongoDB documents and there are some limitations for the properties (payload and header values) of the Message. - For example an ErrorMessage can't be converted to the MongoDB document, because it has an - Exception property, where the cause property is infintely recursed. Also, there is no ability to - configure - custom converters for complex domain payloads or header values. + For example, there is no ability to configure custom converters for complex domain payloads or header values. + Or provide some other MongoTemplate (or MappingMongoConverter) customizations. To achieve these capabilities, an alternative MongoDB MessageStore implementation has been introduced; see next paragraph. @@ -128,8 +131,7 @@ Spring Integration 3.0 introduced the ConfigurableMongoDbMessageStore - MessageStore and MessageGroupStore implementation. This class can receive, as a constructor argument, a MongoTemplate, with which you can - configure with a - custom WriteConcern, for example. Another constructor requires a + configure with a custom WriteConcern, for example. Another constructor requires a MappingMongoConverter, and a MongoDbFactory, which allows you to provide some custom conversions for Messages and their properties. Note, by default, the ConfigurableMongoDbMessageStore uses standard Java serialization @@ -138,7 +140,6 @@ MongoDbFactory and MappingMongoConverter. The default name for the collection stored by the ConfigurableMongoDbMessageStore is configurableStoreMessages. It is recommended to use this implementation for robust and flexible solutions. - The MongoDbMessageStore remains for backward compatibility and may be removed in future releases.