Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;

/**
Expand All @@ -34,6 +35,7 @@
* a reference to the message and changes will be reflected there too.
*
* @author Gary Russell
* @author Artem Bilan
* @since 4.0
*
*/
Expand All @@ -52,9 +54,10 @@ public MutableMessage(T payload) {
}

@SuppressWarnings("unchecked")
public MutableMessage(T payload, MessageHeaders headers) {
this.payload = payload;
public MutableMessage(T payload, Map<String, Object> headers) {
Assert.notNull(payload, "payload must not be null");
this.headers = new MessageHeaders(headers);
this.payload = payload;
// Needs SPR-11468 to avoid DFA and header manipulation
rawHeaders = (Map<String, Object>) new DirectFieldAccessor(this.headers)
.getPropertyValue("headers");
Expand All @@ -64,6 +67,7 @@ public MutableMessage(T payload, MessageHeaders headers) {
}
}


@Override
public MessageHeaders getHeaders() {
return this.headers;
Expand All @@ -75,6 +79,7 @@ public T getPayload() {
}

public void setPayload(T payload) {
Assert.notNull(payload, "'payload' must not be null");
this.payload = payload;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,7 @@ private static class MessageDocument {
* when the application context is configured with auditing. The document is not
* currently Auditable.
*/
@SuppressWarnings("unused")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you get a warning for this? Eclipse doesn't complain when the @Id annotation is present.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, IDEA complains

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK; LGTM; merging.

@Id
private String _id;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,18 @@
import java.util.Properties;
import java.util.UUID;

import org.springframework.beans.BeansException;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.beans.factory.BeanClassLoaderAware;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.convert.converter.Converter;
import org.springframework.core.serializer.support.DeserializingConverter;
import org.springframework.core.serializer.support.SerializingConverter;
import org.springframework.data.annotation.Id;
import org.springframework.data.annotation.Transient;
import org.springframework.data.convert.WritingConverter;
import org.springframework.data.domain.Sort;
import org.springframework.data.domain.Sort.Direction;
import org.springframework.data.mapping.context.MappingContext;
Expand All @@ -46,6 +54,8 @@
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.integration.history.MessageHistory;
import org.springframework.integration.message.AdviceMessage;
import org.springframework.integration.message.MutableMessage;
import org.springframework.integration.store.AbstractMessageGroupStore;
import org.springframework.integration.store.MessageGroup;
import org.springframework.integration.store.MessageGroupStore;
Expand All @@ -54,6 +64,7 @@
import org.springframework.jmx.export.annotation.ManagedAttribute;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
Expand All @@ -76,7 +87,8 @@
* @author Artem Bilan
* @since 2.1
*/
public class MongoDbMessageStore extends AbstractMessageGroupStore implements MessageStore, BeanClassLoaderAware {
public class MongoDbMessageStore extends AbstractMessageGroupStore
implements MessageStore, BeanClassLoaderAware, ApplicationContextAware, InitializingBean {

private final static String DEFAULT_COLLECTION_NAME = "messages";

Expand All @@ -90,17 +102,19 @@ public class MongoDbMessageStore extends AbstractMessageGroupStore implements Me

private final static String GROUP_UPDATE_TIMESTAMP_KEY = "_group_update_timestamp";

private final static String PAYLOAD_TYPE_KEY = "_payloadType";

private final static String CREATED_DATE = "_createdDate";


private final MongoTemplate template;

private final MessageReadingMongoConverter converter;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should go ahead and rename as we discussed before MessageDocumentMongoConverter.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keep in mind here it has name MessageWrapper and we can't change the name for backward compatibility.
From other side MessageReadingMongoConverter is an inner class, so its name doesn't have value too.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

np;


private final String collectionName;

private volatile ClassLoader classLoader = ClassUtils.getDefaultClassLoader();

private ApplicationContext applicationContext;


/**
* Create a MongoDbMessageStore using the provided {@link MongoDbFactory}.and the default collection name.
Expand All @@ -119,9 +133,8 @@ public MongoDbMessageStore(MongoDbFactory mongoDbFactory) {
*/
public MongoDbMessageStore(MongoDbFactory mongoDbFactory, String collectionName) {
Assert.notNull(mongoDbFactory, "mongoDbFactory must not be null");
MessageReadingMongoConverter converter = new MessageReadingMongoConverter(mongoDbFactory, new MongoMappingContext());
converter.afterPropertiesSet();
this.template = new MongoTemplate(mongoDbFactory, converter);
this.converter = new MessageReadingMongoConverter(mongoDbFactory, new MongoMappingContext());
this.template = new MongoTemplate(mongoDbFactory, this.converter);
this.collectionName = (StringUtils.hasText(collectionName)) ? collectionName : DEFAULT_COLLECTION_NAME;
}

Expand All @@ -132,6 +145,18 @@ public void setBeanClassLoader(ClassLoader classLoader) {
this.classLoader = classLoader;
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}

@Override
public void afterPropertiesSet() throws Exception {
this.template.setApplicationContext(this.applicationContext);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need if not null (NPE in template) - will fix during merge

this.converter.setApplicationContext(this.applicationContext);
this.converter.afterPropertiesSet();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing this from the constructor causes the tests to fail - added afterPropertiesSet() to the tests.

}

@Override
public <T> Message<T> addMessage(Message<T> message) {
Assert.notNull(message, "'message' must not be null");
Expand Down Expand Up @@ -335,6 +360,10 @@ public void afterPropertiesSet() {
customConverters.add(new DBObjectToUUIDConverter());
customConverters.add(new MessageHistoryToDBObjectConverter());
customConverters.add(new DBObjectToGenericMessageConverter());
customConverters.add(new DBObjectToMutableMessageConverter());
customConverters.add(new DBObjectToErrorMessageConverter());
customConverters.add(new DBObjectToAdviceMessageConverter());
customConverters.add(new ThrowableToBytesConverter());
this.setCustomConversions(new CustomConversions(customConverters));
super.afterPropertiesSet();
}
Expand All @@ -355,24 +384,18 @@ public <S> S read(Class<S> clazz, DBObject source) {
return super.read(clazz, source);
}
if (source != null) {
Map<String, Object> headers = this.normalizeHeaders((Map<String, Object>) source.get("headers"));

Object payload = source.get("payload");
Object payloadType = source.get(PAYLOAD_TYPE_KEY);
if (payloadType != null && payload instanceof DBObject) {
try {
Class<?> payloadClass = ClassUtils.forName(payloadType.toString(), classLoader);
payload = this.read(payloadClass, (DBObject) payload);
}
catch (Exception e) {
throw new IllegalStateException("failed to load class: " + payloadType, e);
}
Message<?> message = null;
Object messageType = source.get("_messageType");
if (messageType == null) {
messageType = GenericMessage.class.getName();
}
try {
message = (Message<?>) this.read(ClassUtils.forName(messageType.toString(), classLoader), source);
}
catch (ClassNotFoundException e) {
throw new IllegalStateException("failed to load class: " + messageType, e);
}
GenericMessage message = new GenericMessage(payload, headers);
Map innerMap = (Map) new DirectFieldAccessor(message.getHeaders()).getPropertyValue("headers");
// using reflection to set ID and TIMESTAMP since they are immutable through MessageHeaders
innerMap.put(MessageHeaders.ID, headers.get(MessageHeaders.ID));
innerMap.put(MessageHeaders.TIMESTAMP, headers.get(MessageHeaders.TIMESTAMP));

Long groupTimestamp = (Long)source.get(GROUP_TIMESTAMP_KEY);
Long lastModified = (Long)source.get(GROUP_UPDATE_TIMESTAMP_KEY);
Integer lastReleasedSequenceNumber = (Integer)source.get(LAST_RELEASED_SEQUENCE_NUMBER);
Expand Down Expand Up @@ -432,8 +455,34 @@ else if (source instanceof BasicDBList) {
}
return normalizedHeaders;
}

private Object extractPayload(DBObject source) {
Object payload = source.get("payload");
if (payload instanceof DBObject) {
DBObject payloadObject = (DBObject) payload;
Object payloadType = payloadObject.get("_class");
try {
Class<?> payloadClass = ClassUtils.forName(payloadType.toString(), classLoader);
payload = this.read(payloadClass, payloadObject);
}
catch (Exception e) {
throw new IllegalStateException("failed to load class: " + payloadType, e);
}
}
return payload;
}

}

@SuppressWarnings("unchecked")
private static void enhanceHeaders(MessageHeaders messageHeaders, Map<String, Object> headers) {
Map<String, Object> innerMap = (Map<String, Object>) new DirectFieldAccessor(messageHeaders).getPropertyValue("headers");
// using reflection to set ID and TIMESTAMP since they are immutable through MessageHeaders
innerMap.put(MessageHeaders.ID, headers.get(MessageHeaders.ID));
innerMap.put(MessageHeaders.TIMESTAMP, headers.get(MessageHeaders.TIMESTAMP));
}


private static class UuidToDBObjectConverter implements Converter<UUID, DBObject> {
@Override
public DBObject convert(UUID source) {
Expand Down Expand Up @@ -474,53 +523,118 @@ public DBObject convert(MessageHistory source) {
private class DBObjectToGenericMessageConverter implements Converter<DBObject, GenericMessage<?>> {

@Override
@SuppressWarnings("unchecked")

public GenericMessage<?> convert(DBObject source) {
MessageReadingMongoConverter converter = (MessageReadingMongoConverter) MongoDbMessageStore.this.template
.getConverter();
Map<String, Object> headers = converter.normalizeHeaders((Map<String, Object>) source.get("headers"));
@SuppressWarnings("unchecked")
Map<String, Object> headers = MongoDbMessageStore.this.converter.normalizeHeaders((Map<String, Object>) source.get("headers"));

Object payload = source.get("payload");
Object payloadType = source.get(PAYLOAD_TYPE_KEY);
if (payloadType != null && payload instanceof DBObject) {
GenericMessage<?> message = new GenericMessage<Object>(MongoDbMessageStore.this.converter.extractPayload(source), headers);
enhanceHeaders(message.getHeaders(), headers);
return message;
}

}

private class DBObjectToMutableMessageConverter implements Converter<DBObject, MutableMessage<?>> {

@Override
public MutableMessage<?> convert(DBObject source) {
@SuppressWarnings("unchecked")
Map<String, Object> headers = MongoDbMessageStore.this.converter.normalizeHeaders((Map<String, Object>) source.get("headers"));

return new MutableMessage<Object>(MongoDbMessageStore.this.converter.extractPayload(source), headers);
}

}

private class DBObjectToAdviceMessageConverter implements Converter<DBObject, AdviceMessage> {

@Override
public AdviceMessage convert(DBObject source) {
@SuppressWarnings("unchecked")
Map<String, Object> headers = MongoDbMessageStore.this.converter.normalizeHeaders((Map<String, Object>) source.get("headers"));

Message<?> inputMessage = null;

if (source.get("inputMessage") != null) {
DBObject inputMessageObject = (DBObject) source.get("inputMessage");
Object inputMessageType = inputMessageObject.get("_class");
try {
Class<?> payloadClass = ClassUtils.forName(payloadType.toString(), classLoader);
payload = converter.read(payloadClass, (DBObject) payload);
Class<?> messageClass = ClassUtils.forName(inputMessageType.toString(), classLoader);
inputMessage = (Message<?>) MongoDbMessageStore.this.converter.read(messageClass, inputMessageObject);
}
catch (Exception e) {
throw new IllegalStateException("failed to load class: " + payloadType, e);
throw new IllegalStateException("failed to load class: " + inputMessageType, e);
}
}

@SuppressWarnings("rawtypes")
GenericMessage<Object> message = new GenericMessage(payload, headers);
Map<String, Object> innerMap = (Map<String, Object>) new DirectFieldAccessor(message.getHeaders()).getPropertyValue("headers");
// using reflection to set ID and TIMESTAMP since they are immutable through MessageHeaders
innerMap.put(MessageHeaders.ID, headers.get(MessageHeaders.ID));
innerMap.put(MessageHeaders.TIMESTAMP, headers.get(MessageHeaders.TIMESTAMP));
AdviceMessage message = new AdviceMessage(MongoDbMessageStore.this.converter.extractPayload(source), headers, inputMessage);
enhanceHeaders(message.getHeaders(), headers);

return message;
}

}

private class DBObjectToErrorMessageConverter implements Converter<DBObject, ErrorMessage> {

private final Converter<byte[], Object> deserializingConverter = new DeserializingConverter();

@Override
public ErrorMessage convert(DBObject source) {
@SuppressWarnings("unchecked")
Map<String, Object> headers = MongoDbMessageStore.this.converter.normalizeHeaders((Map<String, Object>) source.get("headers"));

Object payload = this.deserializingConverter.convert((byte[]) source.get("payload"));
ErrorMessage message = new ErrorMessage((Throwable) payload, headers);
enhanceHeaders(message.getHeaders(), headers);

return message;
}

}

@WritingConverter
private class ThrowableToBytesConverter implements Converter<Throwable, byte[]> {

private final Converter<Object, byte[]> serializingConverter = new SerializingConverter();

@Override
public byte[] convert(Throwable source) {
return serializingConverter.convert(source);
}

}


/**
* Wrapper class used for storing Messages in MongoDB along with their "group" metadata.
*/
private static final class MessageWrapper {

/*
* Needed as a persistence property to suppress 'Cannot determine IsNewStrategy' MappingException
* when the application context is configured with auditing. The document is not
* currently Auditable.
*/
@SuppressWarnings("unused")
@Id
private String _id;

private volatile Object _groupId;

@Transient
private final Message<?> message;

@SuppressWarnings("unused")
private final String _messageType;

private final Object payload;

@SuppressWarnings("unused")
private final Map<String, ?> headers;

@SuppressWarnings("unused")
private final String _payloadType;
private final Message<?> inputMessage;

private volatile long _group_timestamp;

Expand All @@ -533,9 +647,15 @@ private static final class MessageWrapper {
public MessageWrapper(Message<?> message) {
Assert.notNull(message, "'message' must not be null");
this.message = message;
this._messageType = message.getClass().getName();
this.payload = message.getPayload();
this.headers = message.getHeaders();
this._payloadType = this.payload.getClass().getName();
if (message instanceof AdviceMessage) {
this.inputMessage = ((AdviceMessage) message).getInputMessage();
}
else {
this.inputMessage = null;
}
}

public int get_LastReleasedSequenceNumber() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ public void testPollMessages() throws Exception{
Message<?> messageA = new GenericMessage<String>("A");
Message<?> messageB = new GenericMessage<String>("B");
store.addMessageToGroup(1, messageA);
Thread.sleep(10);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Timing issue: both messages may get the same time and there is no guaranty that messageA will be polled first.
When we will introduce messageSequence we'll remove this sleep

store.addMessageToGroup(1, messageB);
assertEquals(2, store.messageGroupSize(1));
Message<?> out = store.pollMessageFromGroup(1);
Expand Down
Loading