diff --git a/spring-integration-core/src/main/java/org/springframework/integration/config/xml/PointToPointChannelParser.java b/spring-integration-core/src/main/java/org/springframework/integration/config/xml/PointToPointChannelParser.java index 4d06581eb4a..2811f413766 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/config/xml/PointToPointChannelParser.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/config/xml/PointToPointChannelParser.java @@ -50,9 +50,10 @@ protected BeanDefinitionBuilder buildBeanDefinition(Element element, ParserConte boolean isFixedSubscriber = "true".equals(fixedSubscriberChannel.trim().toLowerCase()); // configure a queue-based channel if any queue sub-element is defined + String channel = element.getAttribute(ID_ATTRIBUTE); if ((queueElement = DomUtils.getChildElementByTagName(element, "queue")) != null) { builder = BeanDefinitionBuilder.genericBeanDefinition(QueueChannel.class); - boolean hasStoreRef = this.parseStoreRef(builder, queueElement, element.getAttribute(ID_ATTRIBUTE)); + boolean hasStoreRef = this.parseStoreRef(builder, queueElement, channel, false); boolean hasQueueRef = this.parseQueueRef(builder, queueElement); if (!hasStoreRef) { boolean hasCapacity = this.parseQueueCapacity(builder, queueElement); @@ -75,6 +76,15 @@ else if ((queueElement = DomUtils.getChildElementByTagName(element, "priority-qu if (StringUtils.hasText(comparatorRef)) { builder.addConstructorArgReference(comparatorRef); } + if (parseStoreRef(builder, queueElement, channel, true)) { + if (StringUtils.hasText(comparatorRef)) { + parserContext.getReaderContext().error( + "The 'message-store' attribute is not allowed" + " when providing a 'comparator' to a priority queue.", + element); + } + builder.getRawBeanDefinition().setBeanClass(QueueChannel.class); + } + } else if ((queueElement = DomUtils.getChildElementByTagName(element, "rendezvous-queue")) != null) { builder = BeanDefinitionBuilder.genericBeanDefinition(RendezvousChannel.class); @@ -158,13 +168,14 @@ private boolean parseQueueRef(BeanDefinitionBuilder builder, Element queueElemen return false; } - private boolean parseStoreRef(BeanDefinitionBuilder builder, Element queueElement, String channel) { + private boolean parseStoreRef(BeanDefinitionBuilder builder, Element queueElement, String channel, boolean priority) { String storeRef = queueElement.getAttribute("message-store"); if (StringUtils.hasText(storeRef)) { BeanDefinitionBuilder queueBuilder = BeanDefinitionBuilder .genericBeanDefinition(MessageGroupQueue.class); queueBuilder.addConstructorArgReference(storeRef); queueBuilder.addConstructorArgValue(new TypedStringValue(storeRef).getValue() + ":" + channel); + queueBuilder.addPropertyValue("priority", priority); parseQueueCapacity(queueBuilder, queueElement); builder.addConstructorArgValue(queueBuilder.getBeanDefinition()); return true; diff --git a/spring-integration-core/src/main/java/org/springframework/integration/message/MutableMessage.java b/spring-integration-core/src/main/java/org/springframework/integration/message/MutableMessage.java index cebde30e1e4..314b08ee4bc 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/message/MutableMessage.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/message/MutableMessage.java @@ -15,6 +15,7 @@ */ package org.springframework.integration.message; +import java.io.Serializable; import java.util.Map; import org.springframework.beans.DirectFieldAccessor; @@ -36,7 +37,9 @@ * @since 4.0 * */ -public class MutableMessage implements Message { +public class MutableMessage implements Message, Serializable { + + private static final long serialVersionUID = -636635024258737500L; private T payload; diff --git a/spring-integration-core/src/main/java/org/springframework/integration/store/BasicMessageGroupStore.java b/spring-integration-core/src/main/java/org/springframework/integration/store/BasicMessageGroupStore.java new file mode 100644 index 00000000000..4ec1c633cc4 --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/store/BasicMessageGroupStore.java @@ -0,0 +1,66 @@ +/* + * Copyright 2014 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.integration.store; + +import org.springframework.jmx.export.annotation.ManagedAttribute; +import org.springframework.messaging.Message; + +/** + * Defines a minimal message group store with basic capabilities. + * + * @author Gary Russell + * @since 4.0 + * + */ +public interface BasicMessageGroupStore { + + /** + * Returns the size of this MessageGroup. + * + * @param groupId The group identifier. + * @return The size. + */ + @ManagedAttribute + int messageGroupSize(Object groupId); + + /** + * Return all Messages currently in the MessageStore that were stored using + * {@link #addMessageToGroup(Object, Message)} with this group id. + * + * @param groupId The group identifier. + * @return A group of messages, empty if none exists for this key. + */ + MessageGroup getMessageGroup(Object groupId); + + /** + * Store a message with an association to a group id. This can be used to group messages together. + * + * @param groupId The group id to store the message under. + * @param message A message. + * @return The message group. + */ + MessageGroup addMessageToGroup(Object groupId, Message message); + + /** + * Polls Message from this {@link MessageGroup} (in FIFO style if supported by the implementation) + * while also removing the polled {@link Message} + * + * @param groupId The group identifier. + * @return The message. + */ + Message pollMessageFromGroup(Object groupId); + +} diff --git a/spring-integration-core/src/main/java/org/springframework/integration/store/ChannelMessageStore.java b/spring-integration-core/src/main/java/org/springframework/integration/store/ChannelMessageStore.java new file mode 100644 index 00000000000..6b574e0b5e4 --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/store/ChannelMessageStore.java @@ -0,0 +1,30 @@ +/* + * Copyright 2014 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.integration.store; + +import org.springframework.integration.channel.QueueChannel; + +/** + * A marker interface that indicates this message store has optimizations for + * use in a {@link QueueChannel}. + * + * @author Gary Russell + * @since 4.0 + * + */ +public interface ChannelMessageStore extends BasicMessageGroupStore { + +} diff --git a/spring-integration-core/src/main/java/org/springframework/integration/store/MessageGroupQueue.java b/spring-integration-core/src/main/java/org/springframework/integration/store/MessageGroupQueue.java index eb25db2e841..f4786aa3a54 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/store/MessageGroupQueue.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/store/MessageGroupQueue.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2012 the original author or authors. + * Copyright 2002-2014 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -40,6 +40,7 @@ * @author Dave Syer * @author Oleg Zhurakousky * @author Gunnar Hillert + * @author Gary Russell * * @since 2.0 * @@ -50,7 +51,7 @@ public class MessageGroupQueue extends AbstractQueue> implements Bloc private static final int DEFAULT_CAPACITY = Integer.MAX_VALUE; - private final MessageGroupStore messageGroupStore; + private final BasicMessageGroupStore messageGroupStore; private final Object groupId; @@ -63,19 +64,19 @@ public class MessageGroupQueue extends AbstractQueue> implements Bloc private final Condition messageStoreNotEmpty; - public MessageGroupQueue(MessageGroupStore messageGroupStore, Object groupId) { + public MessageGroupQueue(BasicMessageGroupStore messageGroupStore, Object groupId) { this(messageGroupStore, groupId, DEFAULT_CAPACITY, new ReentrantLock(true)); } - public MessageGroupQueue(MessageGroupStore messageGroupStore, Object groupId, int capacity) { + public MessageGroupQueue(BasicMessageGroupStore messageGroupStore, Object groupId, int capacity) { this(messageGroupStore, groupId, capacity, new ReentrantLock(true)); } - public MessageGroupQueue(MessageGroupStore messageGroupStore, Object groupId, Lock storeLock) { + public MessageGroupQueue(BasicMessageGroupStore messageGroupStore, Object groupId, Lock storeLock) { this(messageGroupStore, groupId, DEFAULT_CAPACITY, storeLock); } - public MessageGroupQueue(MessageGroupStore messageGroupStore, Object groupId, int capacity, Lock storeLock) { + public MessageGroupQueue(BasicMessageGroupStore messageGroupStore, Object groupId, int capacity, Lock storeLock) { Assert.isTrue(capacity > 0, "'capacity' must be greater than 0"); Assert.notNull(storeLock, "'storeLock' must not be null"); Assert.notNull(messageGroupStore, "'messageGroupStore' must not be null"); @@ -86,16 +87,45 @@ public MessageGroupQueue(MessageGroupStore messageGroupStore, Object groupId, in this.messageGroupStore = messageGroupStore; this.groupId = groupId; this.capacity = capacity; + if (logger.isWarnEnabled() && !(messageGroupStore instanceof ChannelMessageStore)) { + logger.warn(messageGroupStore.getClass().getSimpleName() + " is not optimized for use " + + "in a 'MessageGroupQueue'; consider using a `ChannelMessageStore'"); + } + } + + /** + * If true, ensures that the message store supports priority. If false WARNs if the + * message store uses priority to determine the message order when receiving. + * @param priority true if priority is expected to be used. + */ + public void setPriority(boolean priority) { + if (priority) { + Assert.isInstanceOf(PriorityCapableChannelMessageStore.class, this.messageGroupStore); + Assert.isTrue(((PriorityCapableChannelMessageStore) this.messageGroupStore).isPriorityEnabled(), + "When using priority, the 'PriorityCapableChannelMessageStore' must have priority enabled."); + } + else { + if (logger.isWarnEnabled() && this.messageGroupStore instanceof PriorityCapableChannelMessageStore + && ((PriorityCapableChannelMessageStore) this.messageGroupStore).isPriorityEnabled()) { + logger.warn("It's not recommended to use a priority-based message store " + + "when declaring a non-priority 'MessageGroupQueue'; message retrieval may not be FIFO; " + + "set 'priority' to 'true' if that is your intent. If you are using the namespace to " + + "define a channel, use ' instead."); + } + } } + @Override public Iterator> iterator() { return getMessages().iterator(); } + @Override public int size() { return messageGroupStore.messageGroupSize(groupId); } + @Override public Message peek() { Message message = null; final Lock storeLock = this.storeLock; @@ -117,6 +147,7 @@ public Message peek() { return message; } + @Override public Message poll(long timeout, TimeUnit unit) throws InterruptedException { Message message = null; long timeoutInNanos = unit.toNanos(timeout); @@ -136,6 +167,7 @@ public Message poll(long timeout, TimeUnit unit) throws InterruptedException return message; } + @Override public Message poll() { Message message = null; final Lock storeLock = this.storeLock; @@ -154,10 +186,12 @@ public Message poll() { return message; } + @Override public int drainTo(Collection> c) { return this.drainTo(c, Integer.MAX_VALUE); } + @Override public int drainTo(Collection> collection, int maxElements) { Assert.notNull(collection, "'collection' must not be null"); int originalSize = collection.size(); @@ -185,6 +219,7 @@ public int drainTo(Collection> collection, int maxElements) { return collection.size() - originalSize; } + @Override public boolean offer(Message message) { boolean offered = true; final Lock storeLock = this.storeLock; @@ -203,6 +238,7 @@ public boolean offer(Message message) { return offered; } + @Override public boolean offer(Message message, long timeout, TimeUnit unit) throws InterruptedException { long timeoutInNanos = unit.toNanos(timeout); boolean offered = false; @@ -225,6 +261,7 @@ public boolean offer(Message message, long timeout, TimeUnit unit) throws Int return offered; } + @Override public void put(Message message) throws InterruptedException { final Lock storeLock = this.storeLock; storeLock.lockInterruptibly(); @@ -241,6 +278,7 @@ public void put(Message message) throws InterruptedException { } } + @Override public int remainingCapacity() { if (capacity == Integer.MAX_VALUE) { return Integer.MAX_VALUE; @@ -248,6 +286,7 @@ public int remainingCapacity() { return capacity - this.size(); } + @Override public Message take() throws InterruptedException { Message message = null; final Lock storeLock = this.storeLock; diff --git a/spring-integration-core/src/main/java/org/springframework/integration/store/MessageGroupStore.java b/spring-integration-core/src/main/java/org/springframework/integration/store/MessageGroupStore.java index 416a9749c14..e5f6eb958cb 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/store/MessageGroupStore.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/store/MessageGroupStore.java @@ -18,7 +18,7 @@ import org.springframework.messaging.Message; /** - * Interface for storage operations on groups of messages linked by a group id. + * Defines additional storage operations on groups of messages linked by a group id. * * @author Dave Syer * @author Oleg Zhurakousky @@ -27,7 +27,7 @@ * @since 2.0 * */ -public interface MessageGroupStore { +public interface MessageGroupStore extends BasicMessageGroupStore { /** * Optional attribute giving the number of messages in the store over all groups. Implementations may decline to @@ -38,6 +38,7 @@ public interface MessageGroupStore { */ @ManagedAttribute int getMessageCountForAllMessageGroups(); + /** * Optional attribute giving the number of message groups. Implementations may decline * to respond by throwing an exception. @@ -48,33 +49,6 @@ public interface MessageGroupStore { @ManagedAttribute int getMessageGroupCount(); - /** - * Returns the size of this MessageGroup. - * - * @param groupId The group identifier. - * @return The size. - */ - @ManagedAttribute - int messageGroupSize(Object groupId); - - /** - * Return all Messages currently in the MessageStore that were stored using - * {@link #addMessageToGroup(Object, Message)} with this group id. - * - * @param groupId The group identifier. - * @return A group of messages, empty if none exists for this key. - */ - MessageGroup getMessageGroup(Object groupId); - - /** - * Store a message with an association to a group id. This can be used to group messages together. - * - * @param groupId The group id to store the message under. - * @param message A message. - * @return The message group. - */ - MessageGroup addMessageToGroup(Object groupId, Message message); - /** * Persist a deletion on a single message from the group. The group is modified to reflect that 'messageToRemove' is * no longer present in the group. @@ -125,16 +99,6 @@ public interface MessageGroupStore { */ Iterator iterator(); - - /** - * Polls Message from this {@link MessageGroup} (in FIFO style if supported by the implementation) - * while also removing the polled {@link Message} - * - * @param groupId The group identifier. - * @return The message. - */ - Message pollMessageFromGroup(Object groupId); - /** * Completes this MessageGroup. Completion of the MessageGroup generally means * that this group should not be allowing any more mutating operation to be performed on it. diff --git a/spring-integration-core/src/main/java/org/springframework/integration/store/PriorityCapableChannelMessageStore.java b/spring-integration-core/src/main/java/org/springframework/integration/store/PriorityCapableChannelMessageStore.java new file mode 100644 index 00000000000..380bbfa2ea1 --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/store/PriorityCapableChannelMessageStore.java @@ -0,0 +1,34 @@ +/* + * Copyright 2014 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.integration.store; + +/** + * A {@link ChannelMessageStore} that supports the + * notion of message priority. It is left to implementations to determine what + * that means and whether all or a subset of priorities are supported. + * + * @author Gary Russell + * @since 4.0 + * + */ +public interface PriorityCapableChannelMessageStore extends ChannelMessageStore { + + /** + * @return true if message priority is enabled in this channel message store. + */ + boolean isPriorityEnabled(); + +} diff --git a/spring-integration-core/src/main/java/org/springframework/integration/store/SimpleMessageStore.java b/spring-integration-core/src/main/java/org/springframework/integration/store/SimpleMessageStore.java index 935e977e043..75ffc7fd7c4 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/store/SimpleMessageStore.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/store/SimpleMessageStore.java @@ -44,7 +44,8 @@ * @since 2.0 */ @ManagedResource -public class SimpleMessageStore extends AbstractMessageGroupStore implements MessageStore, MessageGroupStore { +public class SimpleMessageStore extends AbstractMessageGroupStore + implements MessageStore, ChannelMessageStore { private volatile LockRegistry lockRegistry; diff --git a/spring-integration-core/src/main/resources/org/springframework/integration/config/xml/spring-integration-4.0.xsd b/spring-integration-core/src/main/resources/org/springframework/integration/config/xml/spring-integration-4.0.xsd index 9c177ebff17..20683d81d24 100644 --- a/spring-integration-core/src/main/resources/org/springframework/integration/config/xml/spring-integration-4.0.xsd +++ b/spring-integration-core/src/main/resources/org/springframework/integration/config/xml/spring-integration-4.0.xsd @@ -274,11 +274,26 @@ + + + + + + + + + A reference to a bean that implements 'org.springframework.integration.store.PriorityCapableChannelMessageStore'. + A message store that supports priority in a manner defined by the store. When set, the underlying + channel will be a 'QueueChannel` that delegates to a `MessageGroupQueue' backed by the store. + Not allowed if 'comparator' is set. + + + diff --git a/spring-integration-core/src/test/java/org/springframework/integration/config/ChannelWithMessageStoreParserTests-context.xml b/spring-integration-core/src/test/java/org/springframework/integration/config/ChannelWithMessageStoreParserTests-context.xml index b0727f98948..6abffbf0248 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/config/ChannelWithMessageStoreParserTests-context.xml +++ b/spring-integration-core/src/test/java/org/springframework/integration/config/ChannelWithMessageStoreParserTests-context.xml @@ -23,4 +23,10 @@ + + + + + + diff --git a/spring-integration-core/src/test/java/org/springframework/integration/config/ChannelWithMessageStoreParserTests.java b/spring-integration-core/src/test/java/org/springframework/integration/config/ChannelWithMessageStoreParserTests.java index 85caeca9a5d..88f713f896f 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/config/ChannelWithMessageStoreParserTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/config/ChannelWithMessageStoreParserTests.java @@ -17,18 +17,23 @@ package org.springframework.integration.config; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; import java.util.concurrent.TimeUnit; import org.junit.Test; import org.junit.runner.RunWith; + import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.integration.store.MessageGroupStore; +import org.springframework.integration.store.PriorityCapableChannelMessageStore; +import org.springframework.integration.store.SimpleMessageStore; +import org.springframework.integration.support.MessageBuilder; +import org.springframework.integration.test.util.TestUtils; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.PollableChannel; -import org.springframework.integration.store.MessageGroupStore; -import org.springframework.integration.support.MessageBuilder; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; @@ -39,7 +44,7 @@ @ContextConfiguration @RunWith(SpringJUnit4ClassRunner.class) public class ChannelWithMessageStoreParserTests { - + private static final String BASE_PACKAGE = "org.springframework.integration"; @Autowired @@ -53,9 +58,15 @@ public class ChannelWithMessageStoreParserTests { @Autowired private TestHandler handler; - @Autowired + @Autowired @Qualifier("messageStore") private MessageGroupStore messageGroupStore; + @Autowired @Qualifier("priority") + private PollableChannel priorityChannel; + + @Autowired @Qualifier("priorityMessageStore") + private MessageGroupStore priorityMessageStore; + @Test @DirtiesContext public void testActivatorSendsToPersistentQueue() throws Exception { @@ -65,17 +76,32 @@ public void testActivatorSendsToPersistentQueue() throws Exception { assertEquals("The message payload is not correct", "123", handler.getMessageString()); // The group id for buffered messages is the channel name assertEquals(1, messageGroupStore.getMessageGroup("messageStore:output").size()); - + Message result = output.receive(100); assertEquals("hello", result.getPayload()); assertEquals(0, messageGroupStore.getMessageGroup(BASE_PACKAGE+".store:output").size()); } + @Test + @DirtiesContext + public void testPriorityMessageStore() { + assertSame(this.priorityMessageStore, TestUtils.getPropertyValue(this.priorityChannel, "queue.messageGroupStore")); + } + private static Message createMessage(T payload, Object correlationId, int sequenceSize, int sequenceNumber, MessageChannel outputChannel) { return MessageBuilder.withPayload(payload).setCorrelationId(correlationId).setSequenceSize(sequenceSize) .setSequenceNumber(sequenceNumber).setReplyChannel(outputChannel).build(); } + public static class DummyPriorityMS extends SimpleMessageStore implements PriorityCapableChannelMessageStore { + + @Override + public boolean isPriorityEnabled() { + return true; + } + + } + } diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcChannelMessageStore.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcChannelMessageStore.java index 42376506670..744805c651e 100644 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcChannelMessageStore.java +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcChannelMessageStore.java @@ -46,6 +46,7 @@ import org.springframework.integration.jdbc.store.channel.OracleChannelMessageStoreQueryProvider; import org.springframework.integration.jdbc.store.channel.PostgresChannelMessageStoreQueryProvider; import org.springframework.integration.store.AbstractMessageGroupStore; +import org.springframework.integration.store.ChannelMessageStore; import org.springframework.integration.store.MessageGroup; import org.springframework.integration.store.MessageGroupStore; import org.springframework.integration.store.MessageStore; @@ -89,7 +90,8 @@ * @since 2.2 */ @ManagedResource -public class JdbcChannelMessageStore extends AbstractMessageGroupStore implements InitializingBean { +public class JdbcChannelMessageStore extends AbstractMessageGroupStore + implements InitializingBean, ChannelMessageStore { private static final Log logger = LogFactory.getLog(JdbcChannelMessageStore.class); diff --git a/spring-integration-redis/src/main/java/org/springframework/integration/redis/store/RedisChannelMessageStore.java b/spring-integration-redis/src/main/java/org/springframework/integration/redis/store/RedisChannelMessageStore.java new file mode 100644 index 00000000000..f77f18a182f --- /dev/null +++ b/spring-integration-redis/src/main/java/org/springframework/integration/redis/store/RedisChannelMessageStore.java @@ -0,0 +1,135 @@ +/* + * Copyright 2014 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.integration.redis.store; + +import java.util.List; +import java.util.Set; + +import org.springframework.beans.factory.BeanNameAware; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.serializer.JdkSerializationRedisSerializer; +import org.springframework.data.redis.serializer.RedisSerializer; +import org.springframework.data.redis.serializer.StringRedisSerializer; +import org.springframework.integration.store.ChannelMessageStore; +import org.springframework.integration.store.MessageGroup; +import org.springframework.integration.store.SimpleMessageGroup; +import org.springframework.jmx.export.annotation.ManagedAttribute; +import org.springframework.messaging.Message; +import org.springframework.util.Assert; + +/** + * Specialized Redis {@link ChannelMessageGroupStore} that uses a list to back a QueueChannel. + *

+ * Requires {@link #setBeanName(String)} which is used as part of the key. + * + * @author Gary Russell + * @since 4.0 + * + */ +public class RedisChannelMessageStore implements ChannelMessageStore, BeanNameAware, InitializingBean { + + private final RedisTemplate> redisTemplate; + + private String beanName; + + /** + * Construct a message store that uses Java Serialization for messages. + * + * @param connectionFactory The redis connection factory. + */ + public RedisChannelMessageStore(RedisConnectionFactory connectionFactory) { + this.redisTemplate = new RedisTemplate>(); + this.redisTemplate.setConnectionFactory(connectionFactory); + this.redisTemplate.setKeySerializer(new StringRedisSerializer()); + this.redisTemplate.setValueSerializer(new JdkSerializationRedisSerializer()); + this.redisTemplate.afterPropertiesSet(); + } + + /** + * Use a different serializer (default {@link JdkSerializationRedisSerializer} for + * the {@link Message}. + * + * @param valueSerializer The value serializer. + */ + public void setValueSerializer(RedisSerializer valueSerializer) { + Assert.notNull(valueSerializer, "'valueSerializer' must not be null"); + this.redisTemplate.setValueSerializer(valueSerializer); + } + + @Override + public void setBeanName(String name) { + Assert.notNull(name, "'beanName' must not be null"); + this.beanName = name; + } + + protected String getBeanName() { + return beanName; + } + + protected RedisTemplate> getRedisTemplate() { + return redisTemplate; + } + + @Override + public void afterPropertiesSet() throws Exception { + Assert.notNull(this.beanName, "'beanName' must not be null"); + } + + @Override + @ManagedAttribute + public int messageGroupSize(Object groupId) { + return (int) this.redisTemplate.boundListOps(groupId).size().longValue(); + } + + @Override + public MessageGroup getMessageGroup(Object groupId) { + List> messages = this.redisTemplate.boundListOps(groupId).range(0, -1); + return new SimpleMessageGroup(messages, groupId); + } + + @Override + public MessageGroup addMessageToGroup(Object groupId, Message message) { + this.redisTemplate.boundListOps(groupId).leftPush(message); + return null; + } + + public void removeMessageGroup(Object groupId) { + this.redisTemplate.boundListOps(groupId).trim(1, 0); + } + + @Override + public Message pollMessageFromGroup(Object groupId) { + return this.redisTemplate.boundListOps(groupId).rightPop(); + } + + @ManagedAttribute + public int getMessageCountForAllMessageGroups() { + Set keys = this.redisTemplate.keys(this.beanName + ":*"); + int count = 0; + for (Object key : keys) { + count += this.messageGroupSize(key); + } + return count; + } + + @ManagedAttribute + public int getMessageGroupCount() { + return this.redisTemplate.keys(this.beanName + ":*").size(); + } + +} diff --git a/spring-integration-redis/src/main/java/org/springframework/integration/redis/store/RedisChannelPriorityMessageStore.java b/spring-integration-redis/src/main/java/org/springframework/integration/redis/store/RedisChannelPriorityMessageStore.java new file mode 100644 index 00000000000..904c65f2305 --- /dev/null +++ b/spring-integration-redis/src/main/java/org/springframework/integration/redis/store/RedisChannelPriorityMessageStore.java @@ -0,0 +1,170 @@ +/* + * Copyright 2014 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.integration.redis.store; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; + +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.integration.IntegrationMessageHeaderAccessor; +import org.springframework.integration.store.ChannelMessageStore; +import org.springframework.integration.store.MessageGroup; +import org.springframework.integration.store.PriorityCapableChannelMessageStore; +import org.springframework.integration.store.SimpleMessageGroup; +import org.springframework.jmx.export.annotation.ManagedAttribute; +import org.springframework.messaging.Message; +import org.springframework.util.Assert; + +/** + * Specialized Redis {@link ChannelMessageStore} that uses lists to back a QueueChannel. + * Messages are removed in priority order ({@link IntegrationMessageHeaderAccessor#PRIORITY}. + * Priorities 0-9 are supported; higher values are treated with the same priority (none) + * as messages with no priority header (retrieved after any messages that have a priority) + *

+ * Requires that groupId is a String. + * + * @author Gary Russell + * @since 4.0 + * + */ +public class RedisChannelPriorityMessageStore extends RedisChannelMessageStore implements PriorityCapableChannelMessageStore { + + public RedisChannelPriorityMessageStore(RedisConnectionFactory connectionFactory) { + super(connectionFactory); + } + + @Override + public boolean isPriorityEnabled() { + return true; + } + + @Override + @ManagedAttribute + public int messageGroupSize(Object groupId) { + Assert.isInstanceOf(String.class, groupId); + List list = sortedKeys((String) groupId); + int count = 0; + for (String key : list) { + count += this.getRedisTemplate().boundListOps(key).size(); + } + return count; + } + + @Override + public MessageGroup getMessageGroup(Object groupId) { + Assert.isInstanceOf(String.class, groupId); + List> allMessages = new LinkedList>(); + List list = sortedKeys((String) groupId); + for (String key : list) { + List> messages = this.getRedisTemplate().boundListOps(key).range(0, -1); + allMessages.addAll(messages); + } + return new SimpleMessageGroup(allMessages, groupId); + } + + + @Override + public MessageGroup addMessageToGroup(Object groupId, Message message) { + Assert.isInstanceOf(String.class, groupId); + String key = (String) groupId; + Integer priority = new IntegrationMessageHeaderAccessor(message).getPriority(); + if (priority != null && priority < 10 && priority >=0) { + key = key + ":" + priority; + } + else { + key = key + ":z"; + } + return super.addMessageToGroup(key, message); + } + + @Override + public Message pollMessageFromGroup(Object groupId) { + Assert.isInstanceOf(String.class, groupId); + List list = sortedKeys((String) groupId); + Message message; + for (String key : list) { + message = super.pollMessageFromGroup(key); + if (message != null) { + return message; + } + } + return null; + } + + private List sortedKeys(String groupId) { + Set keys = this.getRedisTemplate().keys(groupId == null ? (this.getBeanName() + ":*") : (groupId + "*")); + List list = new LinkedList(); + for (Object key : keys) { + Assert.isInstanceOf(String.class, key); + list.add((String) key); + } + Collections.sort(list); + return list; + } + + @Override + @ManagedAttribute + public int getMessageGroupCount() { + Set narrowedKeys = narrowedKeys(); + return narrowedKeys.size(); + } + + + private Set narrowedKeys() { + Set keys = this.getRedisTemplate().keys(this.getBeanName() + ":*"); + Set narrowedKeys = new HashSet(); + Iterator iterator = keys.iterator(); + while (iterator.hasNext()) { + Object key = iterator.next(); + Assert.isInstanceOf(String.class, key); + String keyString = (String) key; + int lastIndexOfColon = keyString.lastIndexOf(":"); + if (keyString.indexOf(":") != lastIndexOfColon) { + narrowedKeys.add(keyString.substring(0, lastIndexOfColon)); + } + else { + narrowedKeys.add(key); + } + } + return narrowedKeys; + } + + @Override + public void removeMessageGroup(Object groupId) { + Assert.isInstanceOf(String.class, groupId); + List list = sortedKeys((String) groupId); + for (String key : list) { + super.removeMessageGroup(key); + } + } + + @Override + @ManagedAttribute + public int getMessageCountForAllMessageGroups() { + Set narrowedKeys = narrowedKeys(); + int count = 0; + for (Object key : narrowedKeys) { + count += this.messageGroupSize(key); + } + return count; + } + + +} diff --git a/spring-integration-redis/src/test/java/org/springframework/integration/redis/store/RedisChannelMessageStoreTests-context.xml b/spring-integration-redis/src/test/java/org/springframework/integration/redis/store/RedisChannelMessageStoreTests-context.xml new file mode 100644 index 00000000000..8c493a3662e --- /dev/null +++ b/spring-integration-redis/src/test/java/org/springframework/integration/redis/store/RedisChannelMessageStoreTests-context.xml @@ -0,0 +1,40 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/spring-integration-redis/src/test/java/org/springframework/integration/redis/store/RedisChannelMessageStoreTests.java b/spring-integration-redis/src/test/java/org/springframework/integration/redis/store/RedisChannelMessageStoreTests.java new file mode 100644 index 00000000000..9dd2f291373 --- /dev/null +++ b/spring-integration-redis/src/test/java/org/springframework/integration/redis/store/RedisChannelMessageStoreTests.java @@ -0,0 +1,175 @@ +/* + * Copyright 2014 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.integration.redis.store; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; + +import org.hamcrest.Matchers; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.integration.IntegrationMessageHeaderAccessor; +import org.springframework.integration.message.MutableMessage; +import org.springframework.integration.redis.rules.RedisAvailable; +import org.springframework.integration.redis.rules.RedisAvailableTests; +import org.springframework.integration.support.MessageBuilder; +import org.springframework.messaging.Message; +import org.springframework.messaging.PollableChannel; +import org.springframework.messaging.support.GenericMessage; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +/** + * @author Gary Russell + * @since 4.0 + * + */ +@ContextConfiguration +@RunWith(SpringJUnit4ClassRunner.class) +public class RedisChannelMessageStoreTests extends RedisAvailableTests { + + @Autowired + private PollableChannel testChannel1; + + @Autowired + private PollableChannel testChannel2; + + @Autowired + private PollableChannel testChannel3; + + @Autowired + private PollableChannel testChannel4; + + @Autowired + private RedisChannelMessageStore cms; + + @Autowired + private RedisChannelMessageStore priorityCms; + + @Before + public void setup() { + this.cms.removeMessageGroup("cms:testChannel1"); + this.cms.removeMessageGroup("cms:testChannel2"); + this.priorityCms.removeMessageGroup("priorityCms:testChannel3"); + this.priorityCms.removeMessageGroup("priorityCms:testChannel4"); + } + + @Test + @RedisAvailable + public void testChannel() { + for (int i = 0; i < 10; i++) { + this.testChannel1.send(new GenericMessage(i)); + } + assertEquals(1, this.cms.getMessageGroupCount()); + assertEquals(10, this.cms.messageGroupSize("cms:testChannel1")); + assertEquals(10, this.cms.getMessageGroup("cms:testChannel1").size()); + for (int i = 0; i < 10; i++) { + this.testChannel2.send(new MutableMessage(i)); + } + assertEquals(2, this.cms.getMessageGroupCount()); + assertEquals(10, this.cms.messageGroupSize("cms:testChannel2")); + assertEquals(10, this.cms.getMessageGroup("cms:testChannel2").size()); + assertEquals(20, this.cms.getMessageCountForAllMessageGroups()); + for (int i = 0; i < 10; i++) { + Message out = this.testChannel1.receive(0); + assertThat(out, Matchers.instanceOf(GenericMessage.class)); + assertEquals(Integer.valueOf(i), out.getPayload()); + } + assertNull(this.testChannel1.receive(0)); + for (int i = 0; i < 10; i++) { + Message out = this.testChannel2.receive(0); + assertThat(out, Matchers.instanceOf(MutableMessage.class)); + assertEquals(Integer.valueOf(i), out.getPayload()); + } + assertNull(this.testChannel2.receive(0)); + assertEquals(0, this.cms.getMessageGroupCount()); + + for (int i = 0; i < 10; i++) { + this.testChannel1.send(new GenericMessage(i)); + } + assertEquals(1, this.cms.getMessageGroupCount()); + assertEquals(10, this.cms.messageGroupSize("cms:testChannel1")); + this.cms.removeMessageGroup("cms:testChannel1"); + assertEquals(0, this.cms.getMessageGroupCount()); + assertEquals(0, this.cms.messageGroupSize("cms:testChannel1")); + } + + @Test + @RedisAvailable + public void testPriority() { + for (int i = 0; i < 10; i++) { + Message message = MessageBuilder.withPayload(i).setPriority(9-i).build(); + this.testChannel3.send(message); + this.testChannel3.send(message); + } + this.testChannel3.send(MessageBuilder.withPayload(99).setPriority(199).build()); + this.testChannel3.send(MessageBuilder.withPayload(98).build()); + assertEquals(1, this.priorityCms.getMessageGroupCount()); + assertEquals(22, this.priorityCms.messageGroupSize("priorityCms:testChannel3")); + assertEquals(22, this.priorityCms.getMessageCountForAllMessageGroups()); + assertEquals(22, this.priorityCms.getMessageGroup("priorityCms:testChannel3").size()); + this.testChannel4.send(MessageBuilder.withPayload(98).build()); + this.testChannel4.send(MessageBuilder.withPayload(99).setPriority(5).build()); + assertEquals(2, this.priorityCms.getMessageGroupCount()); + assertEquals(2, this.priorityCms.getMessageGroup("priorityCms:testChannel4").size()); + assertEquals(2, this.priorityCms.messageGroupSize("priorityCms:testChannel4")); + assertEquals(24, this.priorityCms.getMessageCountForAllMessageGroups()); + for (int i = 0; i < 10; i++) { + Message m = this.testChannel3.receive(0); + assertNotNull(m); + assertEquals(Integer.valueOf(i), new IntegrationMessageHeaderAccessor(m).getPriority()); + assertEquals(Integer.valueOf(9-i), m.getPayload()); + m = this.testChannel3.receive(0); + assertNotNull(m); + assertEquals(Integer.valueOf(i), new IntegrationMessageHeaderAccessor(m).getPriority()); + } + Message m = this.testChannel3.receive(0); + assertNotNull(m); + assertEquals(Integer.valueOf(199), new IntegrationMessageHeaderAccessor(m).getPriority()); + assertEquals(Integer.valueOf(99), m.getPayload()); + m = this.testChannel3.receive(0); + assertNotNull(m); + assertNull(new IntegrationMessageHeaderAccessor(m).getPriority()); + assertEquals(Integer.valueOf(98), m.getPayload()); + assertEquals(0, this.priorityCms.messageGroupSize("priorityCms:testChannel3")); + + m = this.testChannel4.receive(0); + assertNotNull(m); + assertEquals(Integer.valueOf(5), new IntegrationMessageHeaderAccessor(m).getPriority()); + m = this.testChannel4.receive(0); + assertNotNull(m); + assertNull(new IntegrationMessageHeaderAccessor(m).getPriority()); + assertEquals(0, this.priorityCms.getMessageGroupCount()); + assertEquals(0, this.priorityCms.getMessageCountForAllMessageGroups()); + assertNull(this.testChannel3.receive(0)); + assertNull(this.testChannel4.receive(0)); + + for (int i = 0; i < 10; i++) { + this.testChannel3.send(new GenericMessage(i)); + } + assertEquals(1, this.priorityCms.getMessageGroupCount()); + assertEquals(10, this.priorityCms.messageGroupSize("priorityCms:testChannel3")); + this.priorityCms.removeMessageGroup("priorityCms:testChannel3"); + assertEquals(0, this.priorityCms.getMessageGroupCount()); + assertEquals(0, this.priorityCms.messageGroupSize("priorityCms:testChannel3")); + } + +} diff --git a/spring-integration-redis/src/test/java/org/springframework/integration/redis/store/RedisMessageGroupStoreTests.java b/spring-integration-redis/src/test/java/org/springframework/integration/redis/store/RedisMessageGroupStoreTests.java index 907a23e330c..53e542a5753 100644 --- a/spring-integration-redis/src/test/java/org/springframework/integration/redis/store/RedisMessageGroupStoreTests.java +++ b/spring-integration-redis/src/test/java/org/springframework/integration/redis/store/RedisMessageGroupStoreTests.java @@ -29,6 +29,8 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import junit.framework.AssertionFailedError; + import org.junit.Ignore; import org.junit.Test; @@ -46,8 +48,6 @@ import org.springframework.messaging.MessageChannel; import org.springframework.messaging.support.GenericMessage; -import junit.framework.AssertionFailedError; - /** * @author Oleg Zhurakousky * @author Artem Bilan @@ -318,6 +318,7 @@ public void testConcurrentModifications() throws Exception{ executor = Executors.newCachedThreadPool(); executor.execute(new Runnable() { + @Override public void run() { MessageGroup group = store1.addMessageToGroup(1, message); if (group.getMessages().size() != 1){ @@ -327,6 +328,7 @@ public void run() { } }); executor.execute(new Runnable() { + @Override public void run() { MessageGroup group = store2.removeMessageFromGroup(1, message); if (group.getMessages().size() != 0){ @@ -367,6 +369,7 @@ public void testWithAggregatorWithShutdown(){ Message m3 = MessageBuilder.withPayload("3").setSequenceNumber(3).setSequenceSize(3).setCorrelationId(1).build(); input.send(m3); assertNotNull(output.receive(1000)); + context.close(); } } diff --git a/src/reference/docbook/channel.xml b/src/reference/docbook/channel.xml index 3e1b08a8283..ecf74a480da 100644 --- a/src/reference/docbook/channel.xml +++ b/src/reference/docbook/channel.xml @@ -577,7 +577,7 @@ payload to an Integer. the org.springframework.integration.jdbc package of that module (spring-integration-jdbc). - One important feature is that with any transactional persistent store (e.g., JdbcMessageStore), as long as the poller has a transaction configured, + One important feature is that with any transactional persistent store (e.g., JdbcChannelMessageStore), as long as the poller has a transaction configured, a Message removed from the store will only be permanently removed if the transaction completes successfully, otherwise the transaction will roll back and the Message will not be lost. @@ -585,6 +585,13 @@ payload to an Integer. related to "NoSQL" data stores provide the underlying support. Of course, you can always provide your own implementation of the MessageGroupStore interface if you cannot find one that meets your particular needs. + + Since version 4.0, it is recommended that QueueChannels are + configured to use a ChannelMessageStore if possible. These are generally optimized + for this use, when compared with a general message store. If the ChannelMessageStore + is a ChannelPriorityMessageStore the messages will be received in + FIFO within priority order. The notion of priority is determined by the message store implementation. +
@@ -652,6 +659,13 @@ payload to an Integer. ]]> + + Since version 4.0, the priority-channel child element supports + the message-store option (comparator is not allowed in that case). + The message store must be a ChannelPriorityMessageStore and, in this + case, the namespace parser will declare a QueueChannel instead of + a PriorityChannel. See . +
RendezvousChannel Configuration diff --git a/src/reference/docbook/redis.xml b/src/reference/docbook/redis.xml index 4fa5ea345ca..e5b6e206f79 100644 --- a/src/reference/docbook/redis.xml +++ b/src/reference/docbook/redis.xml @@ -384,16 +384,13 @@ rt.setConnectionFactory(redisConnectionFactory);]]> As described in EIP, a Message Store allows you to persist Messages. This can be very useful when dealing with components that have a capability to buffer messages - (QueueChannel, Aggregator, Resequencer, etc.) if reliability is a concern. + (Aggregator, Resequencer, etc.) if reliability is a concern. In Spring Integration, the MessageStore strategy also provides the foundation for the ClaimCheck pattern, which is described in EIP as well. - Spring Integration's Redis module provides the RedisMessageStore which is an implementation of both the - the MessageStore strategy (mainly used by the QueueChannel and ClaimCheck - patterns) and the MessageGroupStore strategy (mainly used by the Aggregator and - Resequencer patterns). + Spring Integration's Redis module provides the RedisMessageStore. @@ -401,17 +398,13 @@ rt.setConnectionFactory(redisConnectionFactory);]]> - - - - ]]> - Above is a sample RedisMessageStore configuration that shows its usage by a QueueChannel - and an Aggregator. As you can see it is a simple bean configuration, and it expects a + Above is a sample RedisMessageStore configuration that shows its usage by + an Aggregator. As you can see it is a simple bean configuration, and it expects a RedisConnectionFactory as a constructor argument. @@ -419,6 +412,45 @@ rt.setConnectionFactory(redisConnectionFactory);]]> However if you want to use a different serialization technique (e.g., JSON), you can provide your own serializer via the valueSerializer property of the RedisMessageStore. +
+ Redis Channel Message Stores + + The RedisMessageStore above maintains each group as a value under a single key + (the group id). While this can be used to back a QueueChannel for persistence, + a specialized RedisChannelMessageStore is provided for that purpose (since + version 4.0). This store uses a LIST for each channel and + LPUSH when sending and RPOP when receiving messages. This store also + uses JDK serialization by default, but the value serializer can be modified as described above. + + + It is recommended that this store is used for backing channels, instead of the general + RedisMessageStore. + + + + + + + +]]> + + The keys used to store the data has the form <storeBeanName>:<channelId> + (in the above example, redisMessageStore:somePersistentQueueChannel). + + + In addition, a subclass RedisChannelPriorityMessageStore is also provided. + When this is used with a QueueChannel, the messages are received in + (FIFO within) priority order. It use the standard IntegrationMessageHeaderAccessor.PRIORITY header and supports priority values + 0 - 9; messages with other priorities (and messages with no priority) are retrieved + in FIFO order after any messages with priority. + + + These stores implement only BasicMessageGroupStore and + do not implement MessageGroupStore; they + can only be used for situations such as backing a QueueChannel. + +
Redis Metadata Store diff --git a/src/reference/docbook/whats-new.xml b/src/reference/docbook/whats-new.xml index 819dabd3aa8..41bc362a7dd 100644 --- a/src/reference/docbook/whats-new.xml +++ b/src/reference/docbook/whats-new.xml @@ -65,6 +65,20 @@ >Spring Boot - AutoConfigure.
+
+ Redis Channel Message Stores + + A new Redis MessageGroupStore, that is optimized for + use when backing a QueueChannel for persistence, is now + provided. + For more information, see . + + + A new Redis ChannelPriorityMessageStore is now + provided. This can be used to retrieve messages by priority. + For more information, see . + +