Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,10 @@ protected BeanDefinitionBuilder buildBeanDefinition(Element element, ParserConte
boolean isFixedSubscriber = "true".equals(fixedSubscriberChannel.trim().toLowerCase());

// configure a queue-based channel if any queue sub-element is defined
String channel = element.getAttribute(ID_ATTRIBUTE);
if ((queueElement = DomUtils.getChildElementByTagName(element, "queue")) != null) {
builder = BeanDefinitionBuilder.genericBeanDefinition(QueueChannel.class);
boolean hasStoreRef = this.parseStoreRef(builder, queueElement, element.getAttribute(ID_ATTRIBUTE));
boolean hasStoreRef = this.parseStoreRef(builder, queueElement, channel, false);
boolean hasQueueRef = this.parseQueueRef(builder, queueElement);
if (!hasStoreRef) {
boolean hasCapacity = this.parseQueueCapacity(builder, queueElement);
Expand All @@ -75,6 +76,15 @@ else if ((queueElement = DomUtils.getChildElementByTagName(element, "priority-qu
if (StringUtils.hasText(comparatorRef)) {
builder.addConstructorArgReference(comparatorRef);
}
if (parseStoreRef(builder, queueElement, channel, true)) {
if (StringUtils.hasText(comparatorRef)) {
parserContext.getReaderContext().error(
"The 'message-store' attribute is not allowed" + " when providing a 'comparator' to a priority queue.",
element);
}
builder.getRawBeanDefinition().setBeanClass(QueueChannel.class);
}

}
else if ((queueElement = DomUtils.getChildElementByTagName(element, "rendezvous-queue")) != null) {
builder = BeanDefinitionBuilder.genericBeanDefinition(RendezvousChannel.class);
Expand Down Expand Up @@ -158,13 +168,14 @@ private boolean parseQueueRef(BeanDefinitionBuilder builder, Element queueElemen
return false;
}

private boolean parseStoreRef(BeanDefinitionBuilder builder, Element queueElement, String channel) {
private boolean parseStoreRef(BeanDefinitionBuilder builder, Element queueElement, String channel, boolean priority) {
String storeRef = queueElement.getAttribute("message-store");
if (StringUtils.hasText(storeRef)) {
BeanDefinitionBuilder queueBuilder = BeanDefinitionBuilder
.genericBeanDefinition(MessageGroupQueue.class);
queueBuilder.addConstructorArgReference(storeRef);
queueBuilder.addConstructorArgValue(new TypedStringValue(storeRef).getValue() + ":" + channel);
queueBuilder.addPropertyValue("priority", priority);
parseQueueCapacity(queueBuilder, queueElement);
builder.addConstructorArgValue(queueBuilder.getBeanDefinition());
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package org.springframework.integration.message;

import java.io.Serializable;
import java.util.Map;

import org.springframework.beans.DirectFieldAccessor;
Expand All @@ -36,7 +37,9 @@
* @since 4.0
*
*/
public class MutableMessage<T> implements Message<T> {
public class MutableMessage<T> implements Message<T>, Serializable {

private static final long serialVersionUID = -636635024258737500L;

private T payload;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.store;

import org.springframework.jmx.export.annotation.ManagedAttribute;
import org.springframework.messaging.Message;

/**
* Defines a minimal message group store with basic capabilities.
*
* @author Gary Russell
* @since 4.0
*
*/
public interface BasicMessageGroupStore {

/**
* Returns the size of this MessageGroup.
*
* @param groupId The group identifier.
* @return The size.
*/
@ManagedAttribute
int messageGroupSize(Object groupId);

/**
* Return all Messages currently in the MessageStore that were stored using
* {@link #addMessageToGroup(Object, Message)} with this group id.
*
* @param groupId The group identifier.
* @return A group of messages, empty if none exists for this key.
*/
MessageGroup getMessageGroup(Object groupId);

/**
* Store a message with an association to a group id. This can be used to group messages together.
*
* @param groupId The group id to store the message under.
* @param message A message.
* @return The message group.
*/
MessageGroup addMessageToGroup(Object groupId, Message<?> message);

/**
* Polls Message from this {@link MessageGroup} (in FIFO style if supported by the implementation)
* while also removing the polled {@link Message}
*
* @param groupId The group identifier.
* @return The message.
*/
Message<?> pollMessageFromGroup(Object groupId);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.store;

import org.springframework.integration.channel.QueueChannel;

/**
* A marker interface that indicates this message store has optimizations for
* use in a {@link QueueChannel}.
*
* @author Gary Russell
* @since 4.0
*
*/
public interface ChannelMessageStore extends BasicMessageGroupStore {

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2012 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -40,6 +40,7 @@
* @author Dave Syer
* @author Oleg Zhurakousky
* @author Gunnar Hillert
* @author Gary Russell
*
* @since 2.0
*
Expand All @@ -50,7 +51,7 @@ public class MessageGroupQueue extends AbstractQueue<Message<?>> implements Bloc

private static final int DEFAULT_CAPACITY = Integer.MAX_VALUE;

private final MessageGroupStore messageGroupStore;
private final BasicMessageGroupStore messageGroupStore;

private final Object groupId;

Expand All @@ -63,19 +64,19 @@ public class MessageGroupQueue extends AbstractQueue<Message<?>> implements Bloc

private final Condition messageStoreNotEmpty;

public MessageGroupQueue(MessageGroupStore messageGroupStore, Object groupId) {
public MessageGroupQueue(BasicMessageGroupStore messageGroupStore, Object groupId) {
this(messageGroupStore, groupId, DEFAULT_CAPACITY, new ReentrantLock(true));
}

public MessageGroupQueue(MessageGroupStore messageGroupStore, Object groupId, int capacity) {
public MessageGroupQueue(BasicMessageGroupStore messageGroupStore, Object groupId, int capacity) {
this(messageGroupStore, groupId, capacity, new ReentrantLock(true));
}

public MessageGroupQueue(MessageGroupStore messageGroupStore, Object groupId, Lock storeLock) {
public MessageGroupQueue(BasicMessageGroupStore messageGroupStore, Object groupId, Lock storeLock) {
this(messageGroupStore, groupId, DEFAULT_CAPACITY, storeLock);
}

public MessageGroupQueue(MessageGroupStore messageGroupStore, Object groupId, int capacity, Lock storeLock) {
public MessageGroupQueue(BasicMessageGroupStore messageGroupStore, Object groupId, int capacity, Lock storeLock) {
Assert.isTrue(capacity > 0, "'capacity' must be greater than 0");
Assert.notNull(storeLock, "'storeLock' must not be null");
Assert.notNull(messageGroupStore, "'messageGroupStore' must not be null");
Expand All @@ -86,16 +87,45 @@ public MessageGroupQueue(MessageGroupStore messageGroupStore, Object groupId, in
this.messageGroupStore = messageGroupStore;
this.groupId = groupId;
this.capacity = capacity;
if (logger.isWarnEnabled() && !(messageGroupStore instanceof ChannelMessageStore)) {
logger.warn(messageGroupStore.getClass().getSimpleName() + " is not optimized for use "
+ "in a 'MessageGroupQueue'; consider using a `ChannelMessageStore'");
}
}

/**
* If true, ensures that the message store supports priority. If false WARNs if the
* message store uses priority to determine the message order when receiving.
* @param priority true if priority is expected to be used.
*/
public void setPriority(boolean priority) {
if (priority) {
Assert.isInstanceOf(PriorityCapableChannelMessageStore.class, this.messageGroupStore);
Assert.isTrue(((PriorityCapableChannelMessageStore) this.messageGroupStore).isPriorityEnabled(),
"When using priority, the 'PriorityCapableChannelMessageStore' must have priority enabled.");
}
else {
if (logger.isWarnEnabled() && this.messageGroupStore instanceof PriorityCapableChannelMessageStore
&& ((PriorityCapableChannelMessageStore) this.messageGroupStore).isPriorityEnabled()) {
logger.warn("It's not recommended to use a priority-based message store " +
"when declaring a non-priority 'MessageGroupQueue'; message retrieval may not be FIFO; " +
"set 'priority' to 'true' if that is your intent. If you are using the namespace to " +
"define a channel, use '<priority-queue message-store.../> instead.");
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

else {
if (logger.isWarnEnabled() && messageGroupStore instanceof ChannelPriorityMessageStore) {
     logger.warn("It's not recommended to use `ChannelPriorityMessageStore`" for simple `QueueChannel`: the desired FIFO logic may be broken.);
    }
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

}

@Override
public Iterator<Message<?>> iterator() {
return getMessages().iterator();
}

@Override
public int size() {
return messageGroupStore.messageGroupSize(groupId);
}

@Override
public Message<?> peek() {
Message<?> message = null;
final Lock storeLock = this.storeLock;
Expand All @@ -117,6 +147,7 @@ public Message<?> peek() {
return message;
}

@Override
public Message<?> poll(long timeout, TimeUnit unit) throws InterruptedException {
Message<?> message = null;
long timeoutInNanos = unit.toNanos(timeout);
Expand All @@ -136,6 +167,7 @@ public Message<?> poll(long timeout, TimeUnit unit) throws InterruptedException
return message;
}

@Override
public Message<?> poll() {
Message<?> message = null;
final Lock storeLock = this.storeLock;
Expand All @@ -154,10 +186,12 @@ public Message<?> poll() {
return message;
}

@Override
public int drainTo(Collection<? super Message<?>> c) {
return this.drainTo(c, Integer.MAX_VALUE);
}

@Override
public int drainTo(Collection<? super Message<?>> collection, int maxElements) {
Assert.notNull(collection, "'collection' must not be null");
int originalSize = collection.size();
Expand Down Expand Up @@ -185,6 +219,7 @@ public int drainTo(Collection<? super Message<?>> collection, int maxElements) {
return collection.size() - originalSize;
}

@Override
public boolean offer(Message<?> message) {
boolean offered = true;
final Lock storeLock = this.storeLock;
Expand All @@ -203,6 +238,7 @@ public boolean offer(Message<?> message) {
return offered;
}

@Override
public boolean offer(Message<?> message, long timeout, TimeUnit unit) throws InterruptedException {
long timeoutInNanos = unit.toNanos(timeout);
boolean offered = false;
Expand All @@ -225,6 +261,7 @@ public boolean offer(Message<?> message, long timeout, TimeUnit unit) throws Int
return offered;
}

@Override
public void put(Message<?> message) throws InterruptedException {
final Lock storeLock = this.storeLock;
storeLock.lockInterruptibly();
Expand All @@ -241,13 +278,15 @@ public void put(Message<?> message) throws InterruptedException {
}
}

@Override
public int remainingCapacity() {
if (capacity == Integer.MAX_VALUE) {
return Integer.MAX_VALUE;
}
return capacity - this.size();
}

@Override
public Message<?> take() throws InterruptedException {
Message<?> message = null;
final Lock storeLock = this.storeLock;
Expand Down
Loading