From 72233b3ff983b35628bbbab76dc269b6a7e2a7a2 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Mon, 7 Apr 2014 16:02:33 +0300 Subject: [PATCH 1/5] INT-3339: Add `priority` to the `JdbcChannelMS` JIRA: https://jira.spring.io/browse/INT-3339 --- .../jdbc/store/JdbcChannelMessageStore.java | 54 +++++++++++++----- ...tractChannelMessageStoreQueryProvider.java | 10 ++-- .../jdbc/store/channel/schema-derby.sql | 3 +- .../jdbc/store/channel/schema-hsql.sql | 1 + .../jdbc/store/channel/schema-mysql.sql | 3 +- .../jdbc/store/channel/schema-oracle.sql | 1 + .../jdbc/store/channel/schema-postgresql.sql | 3 +- ...qlJdbcChannelMessageStoreTests-context.xml | 5 +- ...HsqlTxTimeoutMessageStoreTests-context.xml | 7 +++ .../HsqlTxTimeoutMessageStoreTests.java | 57 +++++++++++++++++++ .../JdbcChannelMessageStoreTests-context.xml | 8 --- ...qlJdbcChannelMessageStoreTests-context.xml | 5 +- 12 files changed, 117 insertions(+), 40 deletions(-) delete mode 100644 spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/JdbcChannelMessageStoreTests-context.xml diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcChannelMessageStore.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcChannelMessageStore.java index 744805c651e..f631dddf158 100644 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcChannelMessageStore.java +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcChannelMessageStore.java @@ -38,6 +38,7 @@ import org.springframework.core.serializer.Serializer; import org.springframework.core.serializer.support.DeserializingConverter; import org.springframework.core.serializer.support.SerializingConverter; +import org.springframework.integration.IntegrationMessageHeaderAccessor; import org.springframework.integration.jdbc.JdbcMessageStore; import org.springframework.integration.jdbc.store.channel.ChannelMessageStoreQueryProvider; import org.springframework.integration.jdbc.store.channel.DerbyChannelMessageStoreQueryProvider; @@ -50,6 +51,7 @@ import org.springframework.integration.store.MessageGroup; import org.springframework.integration.store.MessageGroupStore; import org.springframework.integration.store.MessageStore; +import org.springframework.integration.store.PriorityCapableChannelMessageStore; import org.springframework.integration.store.SimpleMessageGroup; import org.springframework.integration.transaction.TransactionSynchronizationFactory; import org.springframework.integration.util.UUIDConverter; @@ -91,7 +93,7 @@ */ @ManagedResource public class JdbcChannelMessageStore extends AbstractMessageGroupStore - implements InitializingBean, ChannelMessageStore { + implements InitializingBean, ChannelMessageStore, PriorityCapableChannelMessageStore { private static final Log logger = LogFactory.getLog(JdbcChannelMessageStore.class); @@ -117,8 +119,6 @@ public class JdbcChannelMessageStore extends AbstractMessageGroupStore private ChannelMessageStoreQueryProvider channelMessageStoreQueryProvider; - public static final int DEFAULT_LONG_STRING_LENGTH = 2500; - /** * The name of the message header that stores a flag to indicate that the message has been saved. This is an * optimization for the put method. @@ -148,6 +148,8 @@ public class JdbcChannelMessageStore extends AbstractMessageGroupStore private boolean usingIdCache = false; + private boolean priorityEnabled; + /** * Convenient constructor for configuration use. */ @@ -171,8 +173,6 @@ public JdbcChannelMessageStore(DataSource dataSource) { this.jdbcTemplate.setFetchSize(1); this.jdbcTemplate.setMaxRows(1); - - this.jdbcTemplate.afterPropertiesSet(); } /** @@ -188,8 +188,6 @@ public void setDataSource(DataSource dataSource) { this.jdbcTemplate.setFetchSize(1); this.jdbcTemplate.setMaxRows(1); - - this.jdbcTemplate.afterPropertiesSet(); } /** @@ -355,6 +353,15 @@ public void setUsingIdCache(boolean usingIdCache) { this.usingIdCache = usingIdCache; } + public void setPriorityEnabled(boolean priorityEnabled) { + this.priorityEnabled = priorityEnabled; + } + + @Override + public boolean isPriorityEnabled() { + return this.priorityEnabled; + } + /** * Check mandatory properties ({@link DataSource} and * {@link #setChannelMessageStoreQueryProvider(ChannelMessageStoreQueryProvider)}). If no {@link MessageRowMapper} was @@ -379,16 +386,17 @@ public void afterPropertiesSet() throws Exception { } if (this.jdbcTemplate.getFetchSize() != 1 && logger.isWarnEnabled()) { - logger.warn("The jdbcTemplate's fetchsize is not 1 but %s. This may cause FIFO issues with Oracle databases."); + logger.warn("The jdbcTemplate's fetchsize is not 1. This may cause FIFO issues with Oracle databases."); } + this.jdbcTemplate.afterPropertiesSet(); } /** * Store a message in the database. The groupId identifies the channel for which * the message is to be stored. * - * Keep in mind that the actual groupdId (Channel + * Keep in mind that the actual groupId (Channel * Identifier) is converted to a String-based UUID identifier. * * @param groupId the group id to store the message under @@ -396,13 +404,13 @@ public void afterPropertiesSet() throws Exception { */ @Override @SuppressWarnings({ "rawtypes", "unchecked" }) - public MessageGroup addMessageToGroup(Object groupId, Message message) { + public MessageGroup addMessageToGroup(Object groupId, final Message message) { final String groupKey = getKey(groupId); final long createdDate = System.currentTimeMillis(); final Message result = this.getMessageBuilderFactory().fromMessage(message).setHeader(SAVED_KEY, Boolean.TRUE) - .setHeader(CREATED_DATE_KEY, new Long(createdDate)).build(); + .setHeader(CREATED_DATE_KEY, createdDate).build(); final Map innerMap = (Map) new DirectFieldAccessor(result.getHeaders()).getPropertyValue("headers"); // using reflection to set ID since it is immutable through MessageHeaders @@ -421,7 +429,19 @@ public void setValues(PreparedStatement ps) throws SQLException { ps.setString(2, groupKey); ps.setString(3, region); ps.setLong(4, createdDate); - lobHandler.getLobCreator().setBlobAsBytes(ps, 5, messageBytes); + + Integer priority = new IntegrationMessageHeaderAccessor(message).getPriority(); + if (priority != null) { + ps.setInt(5, priority); + } + else { + /* + Since not all RDBMS implement 'NULLS LAST' order option, usage 'Integer.MIN_VALUE' + is a good compromise to present the 'null' priorities. + */ + ps.setInt(5, Integer.MIN_VALUE); + } + lobHandler.getLobCreator().setBlobAsBytes(ps, 6, messageBytes); } }); @@ -454,7 +474,7 @@ protected Message doPollForMessage(String groupIdKey) { parameters.addValue("region", region); parameters.addValue("group_key", groupIdKey); - final String query; + String query; final List> messages; @@ -463,9 +483,13 @@ protected Message doPollForMessage(String groupIdKey) { if (this.usingIdCache && !this.idCache.isEmpty()) { query = getQuery(this.channelMessageStoreQueryProvider.getPollFromGroupExcludeIdsQuery()); parameters.addValue("message_ids", idCache); - } else { + } + else { query = getQuery(this.channelMessageStoreQueryProvider.getPollFromGroupQuery()); } + if (this.priorityEnabled) { + query = query.replaceFirst("CREATED_DATE ASC", "MESSAGE_PRIORITY DESC, CREATED_DATE ASC"); + } messages = namedParameterJdbcTemplate.query(query, parameters, messageRowMapper); } finally { @@ -476,7 +500,7 @@ protected Message doPollForMessage(String groupIdKey) { Assert.isTrue(messages.size() == 0 || messages.size() == 1); if (messages.size() > 0){ - final Messagemessage = messages.get(0); + final Message message = messages.get(0); final String messageId = message.getHeaders().getId().toString(); if (this.usingIdCache) { diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/AbstractChannelMessageStoreQueryProvider.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/AbstractChannelMessageStoreQueryProvider.java index 336a22b514c..7e5d8db6b87 100644 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/AbstractChannelMessageStoreQueryProvider.java +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/AbstractChannelMessageStoreQueryProvider.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2012 the original author or authors. + * Copyright 2002-2014 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at @@ -15,6 +15,7 @@ /** * @author Gunnar Hillert + * @author Artem Bilan * @since 2.2 */ public abstract class AbstractChannelMessageStoreQueryProvider implements ChannelMessageStoreQueryProvider { @@ -23,9 +24,6 @@ public String getCountAllMessagesInGroupQuery() { return "SELECT COUNT(MESSAGE_ID) from %PREFIX%CHANNEL_MESSAGE where GROUP_KEY=? and REGION=?"; } - public abstract String getPollFromGroupExcludeIdsQuery(); - public abstract String getPollFromGroupQuery(); - public String getMessageQuery() { return "SELECT MESSAGE_ID, CREATED_DATE, MESSAGE_BYTES from %PREFIX%CHANNEL_MESSAGE where MESSAGE_ID=? and GROUP_KEY=? and REGION=?"; } @@ -39,8 +37,8 @@ public String getDeleteMessageQuery() { } public String getCreateMessageQuery() { - return "INSERT into %PREFIX%CHANNEL_MESSAGE(MESSAGE_ID, GROUP_KEY, REGION, CREATED_DATE, MESSAGE_BYTES)" - + " values (?, ?, ?, ?, ?)"; + return "INSERT into %PREFIX%CHANNEL_MESSAGE(MESSAGE_ID, GROUP_KEY, REGION, CREATED_DATE, MESSAGE_PRIORITY, MESSAGE_BYTES)" + + " values (?, ?, ?, ?, ?, ?)"; } public String getDeleteMessageGroupQuery() { diff --git a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/store/channel/schema-derby.sql b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/store/channel/schema-derby.sql index 3e46c287c8b..a86eff8159a 100644 --- a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/store/channel/schema-derby.sql +++ b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/store/channel/schema-derby.sql @@ -2,9 +2,10 @@ CREATE TABLE INT_CHANNEL_MESSAGE ( MESSAGE_ID CHAR(36) NOT NULL, GROUP_KEY CHAR(36) NOT NULL, CREATED_DATE BIGINT NOT NULL, + MESSAGE_PRIORITY INT, MESSAGE_BYTES BLOB, REGION VARCHAR(100) NOT NULL, constraint INT_CHANNEL_MESSAGE_PK primary key (GROUP_KEY, MESSAGE_ID, REGION) ); -CREATE INDEX INT_CHANNEL_MSG_DATE_IDX ON INT_CHANNEL_MESSAGE (CREATED_DATE); \ No newline at end of file +CREATE INDEX INT_CHANNEL_MSG_DATE_IDX ON INT_CHANNEL_MESSAGE (CREATED_DATE); diff --git a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/store/channel/schema-hsql.sql b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/store/channel/schema-hsql.sql index 339f25869a4..64a846e6c43 100644 --- a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/store/channel/schema-hsql.sql +++ b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/store/channel/schema-hsql.sql @@ -2,6 +2,7 @@ CREATE TABLE INT_CHANNEL_MESSAGE ( MESSAGE_ID CHAR(36) NOT NULL, GROUP_KEY CHAR(36) NOT NULL, CREATED_DATE BIGINT NOT NULL, + MESSAGE_PRIORITY INT, MESSAGE_BYTES LONGVARBINARY, REGION VARCHAR(100) NOT NULL, constraint INT_CHANNEL_MESSAGE_PK primary key (GROUP_KEY, MESSAGE_ID, REGION) diff --git a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/store/channel/schema-mysql.sql b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/store/channel/schema-mysql.sql index 8ae9fb85b5f..fca20c658cf 100644 --- a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/store/channel/schema-mysql.sql +++ b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/store/channel/schema-mysql.sql @@ -2,10 +2,11 @@ CREATE TABLE INT_CHANNEL_MESSAGE ( MESSAGE_ID CHAR(36) NOT NULL, GROUP_KEY CHAR(36) NOT NULL, CREATED_DATE BIGINT NOT NULL, + MESSAGE_PRIORITY INT, MESSAGE_BYTES BLOB, REGION VARCHAR(100) NOT NULL, constraint INT_CHANNEL_MESSAGE_PK primary key (GROUP_KEY, MESSAGE_ID, REGION) ) ENGINE=InnoDB; ALTER TABLE INT_CHANNEL_MESSAGE -ADD INDEX MSG_INDEX_DATE_IDX USING BTREE (CREATED_DATE ASC) ; \ No newline at end of file +ADD INDEX MSG_INDEX_DATE_IDX USING BTREE (CREATED_DATE ASC) ; diff --git a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/store/channel/schema-oracle.sql b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/store/channel/schema-oracle.sql index ad884b801d9..313b6e23645 100644 --- a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/store/channel/schema-oracle.sql +++ b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/store/channel/schema-oracle.sql @@ -2,6 +2,7 @@ CREATE TABLE INT_CHANNEL_MESSAGE ( MESSAGE_ID CHAR(36) NOT NULL, GROUP_KEY CHAR(36) NOT NULL, CREATED_DATE NUMBER(19,0) NOT NULL, + MESSAGE_PRIORITY NUMBER, MESSAGE_BYTES BLOB, REGION VARCHAR2(100) NOT NULL, constraint INT_CHANNEL_MESSAGE_PK primary key (GROUP_KEY, MESSAGE_ID, REGION) diff --git a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/store/channel/schema-postgresql.sql b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/store/channel/schema-postgresql.sql index d39aa200b70..ac4b10e76c0 100644 --- a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/store/channel/schema-postgresql.sql +++ b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/store/channel/schema-postgresql.sql @@ -2,6 +2,7 @@ CREATE TABLE INT_CHANNEL_MESSAGE ( MESSAGE_ID character(36) NOT NULL, GROUP_KEY character(36) NOT NULL, CREATED_DATE BIGINT NOT NULL, + MESSAGE_PRIORITY INT, MESSAGE_BYTES bytea, REGION character varying(100) NOT NULL, constraint INT_CHANNEL_MESSAGE_PK primary key (GROUP_KEY, MESSAGE_ID, REGION) @@ -9,4 +10,4 @@ CREATE TABLE INT_CHANNEL_MESSAGE ( CREATE INDEX MSG_INDEX_DATE_IDX ON INT_CHANNEL_MESSAGE - USING btree (created_date); \ No newline at end of file + USING btree (created_date); diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/HsqlJdbcChannelMessageStoreTests-context.xml b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/HsqlJdbcChannelMessageStoreTests-context.xml index 32fd55b6b14..05f95e156fb 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/HsqlJdbcChannelMessageStoreTests-context.xml +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/HsqlJdbcChannelMessageStoreTests-context.xml @@ -1,11 +1,8 @@ + xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> - diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/HsqlTxTimeoutMessageStoreTests-context.xml b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/HsqlTxTimeoutMessageStoreTests-context.xml index 8090b96870c..2fbf9fa5115 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/HsqlTxTimeoutMessageStoreTests-context.xml +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/HsqlTxTimeoutMessageStoreTests-context.xml @@ -16,5 +16,12 @@ + + + + + + + diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/HsqlTxTimeoutMessageStoreTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/HsqlTxTimeoutMessageStoreTests.java index 44cca78bf4f..f71c413119c 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/HsqlTxTimeoutMessageStoreTests.java +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/HsqlTxTimeoutMessageStoreTests.java @@ -12,11 +12,19 @@ */ package org.springframework.integration.jdbc.store.channel; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + import java.util.concurrent.ExecutionException; import org.junit.Test; import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.integration.IntegrationMessageHeaderAccessor; +import org.springframework.integration.support.MessageBuilder; +import org.springframework.messaging.Message; +import org.springframework.messaging.PollableChannel; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; @@ -31,6 +39,9 @@ @DirtiesContext // close at the end after class public class HsqlTxTimeoutMessageStoreTests extends AbstractTxTimeoutMessageStoreTests { + @Autowired + private PollableChannel priorityChannel; + @Test @Override public void test() throws InterruptedException { @@ -49,4 +60,50 @@ public void testInt3181ConcurrentPolling() throws InterruptedException { super.testInt3181ConcurrentPolling(); } + @Test + public void testPriorityChannel() throws Exception { + Message message = MessageBuilder.withPayload("1").setHeader(IntegrationMessageHeaderAccessor.PRIORITY, 1).build(); + priorityChannel.send(message); + message = MessageBuilder.withPayload("-1").setHeader(IntegrationMessageHeaderAccessor.PRIORITY, -1).build(); + priorityChannel.send(message); + message = MessageBuilder.withPayload("3").setHeader(IntegrationMessageHeaderAccessor.PRIORITY, 3).build(); + priorityChannel.send(message); + message = MessageBuilder.withPayload("0").setHeader(IntegrationMessageHeaderAccessor.PRIORITY, 0).build(); + priorityChannel.send(message); + message = MessageBuilder.withPayload("2").setHeader(IntegrationMessageHeaderAccessor.PRIORITY, 2).build(); + priorityChannel.send(message); + message = MessageBuilder.withPayload("none").build(); + priorityChannel.send(message); + message = MessageBuilder.withPayload("31").setHeader(IntegrationMessageHeaderAccessor.PRIORITY, 3).build(); + priorityChannel.send(message); + + Message receive = priorityChannel.receive(1000); + assertNotNull(receive); + assertEquals("3", receive.getPayload()); + + receive = priorityChannel.receive(1000); + assertNotNull(receive); + assertEquals("31", receive.getPayload()); + + receive = priorityChannel.receive(1000); + assertNotNull(receive); + assertEquals("2", receive.getPayload()); + + receive = priorityChannel.receive(1000); + assertNotNull(receive); + assertEquals("1", receive.getPayload()); + + receive = priorityChannel.receive(1000); + assertNotNull(receive); + assertEquals("0", receive.getPayload()); + + receive = priorityChannel.receive(1000); + assertNotNull(receive); + assertEquals("-1", receive.getPayload()); + + receive = priorityChannel.receive(1000); + assertNotNull(receive); + assertEquals("none", receive.getPayload()); + } + } diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/JdbcChannelMessageStoreTests-context.xml b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/JdbcChannelMessageStoreTests-context.xml deleted file mode 100644 index b400bfe7acf..00000000000 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/JdbcChannelMessageStoreTests-context.xml +++ /dev/null @@ -1,8 +0,0 @@ - - - - diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/MySqlJdbcChannelMessageStoreTests-context.xml b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/MySqlJdbcChannelMessageStoreTests-context.xml index cbd2a03018c..2f93a51c988 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/MySqlJdbcChannelMessageStoreTests-context.xml +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/MySqlJdbcChannelMessageStoreTests-context.xml @@ -1,11 +1,8 @@ + xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> - From 9db31220ed58395e8e96a0ce93206345183b6ab7 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Tue, 8 Apr 2014 19:22:42 +0300 Subject: [PATCH 2/5] INT-3339: Improve `priority` logic --- .../jdbc/store/JdbcChannelMessageStore.java | 24 +++++--- .../ChannelMessageStoreQueryProvider.java | 18 +++++- ...DerbyChannelMessageStoreQueryProvider.java | 18 +++++- .../HsqlChannelMessageStoreQueryProvider.java | 18 +++++- ...MySqlChannelMessageStoreQueryProvider.java | 18 +++++- ...racleChannelMessageStoreQueryProvider.java | 23 ++++++-- ...tgresChannelMessageStoreQueryProvider.java | 18 +++++- .../AbstractTxTimeoutMessageStoreTests.java | 52 ++++++++++++++++- .../DerbyTxTimeoutMessageStoreTests.java | 17 +++++- ...HsqlTxTimeoutMessageStoreTests-context.xml | 20 +------ .../HsqlTxTimeoutMessageStoreTests.java | 56 +------------------ ...ySqlTxTimeoutMessageStoreTests-context.xml | 13 +---- .../MySqlTxTimeoutMessageStoreTests.java | 17 +++++- ...acleTxTimeoutMessageStoreTests-context.xml | 13 +---- .../OracleTxTimeoutMessageStoreTests.java | 23 +++++++- ...gresTxTimeoutMessageStoreTests-context.xml | 13 +---- .../PostgresTxTimeoutMessageStoreTests.java | 23 +++++++- .../TxTimeoutMessageStoreTests-context.xml | 8 +++ .../src/test/resources/log4j.properties | 6 +- 19 files changed, 269 insertions(+), 129 deletions(-) diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcChannelMessageStore.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcChannelMessageStore.java index f631dddf158..3e6e0538ee8 100644 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcChannelMessageStore.java +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcChannelMessageStore.java @@ -431,16 +431,14 @@ public void setValues(PreparedStatement ps) throws SQLException { ps.setLong(4, createdDate); Integer priority = new IntegrationMessageHeaderAccessor(message).getPriority(); - if (priority != null) { + + if (JdbcChannelMessageStore.this.priorityEnabled && priority != null) { ps.setInt(5, priority); } else { - /* - Since not all RDBMS implement 'NULLS LAST' order option, usage 'Integer.MIN_VALUE' - is a good compromise to present the 'null' priorities. - */ - ps.setInt(5, Integer.MIN_VALUE); + ps.setNull(5, Types.NUMERIC); } + lobHandler.getLobCreator().setBlobAsBytes(ps, 6, messageBytes); } }); @@ -481,11 +479,21 @@ protected Message doPollForMessage(String groupIdKey) { this.idCacheReadLock.lock(); try { if (this.usingIdCache && !this.idCache.isEmpty()) { - query = getQuery(this.channelMessageStoreQueryProvider.getPollFromGroupExcludeIdsQuery()); + if (this.priorityEnabled) { + query = getQuery(this.channelMessageStoreQueryProvider.getPriorityPollFromGroupExcludeIdsQuery()); + } + else { + query = getQuery(this.channelMessageStoreQueryProvider.getPollFromGroupExcludeIdsQuery()); + } parameters.addValue("message_ids", idCache); } else { - query = getQuery(this.channelMessageStoreQueryProvider.getPollFromGroupQuery()); + if (this.priorityEnabled) { + query = getQuery(this.channelMessageStoreQueryProvider.getPriorityPollFromGroupQuery()); + } + else { + query = getQuery(this.channelMessageStoreQueryProvider.getPollFromGroupQuery()); + } } if (this.priorityEnabled) { query = query.replaceFirst("CREATED_DATE ASC", "MESSAGE_PRIORITY DESC, CREATED_DATE ASC"); diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/ChannelMessageStoreQueryProvider.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/ChannelMessageStoreQueryProvider.java index 11aea94f455..5b30160b02d 100644 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/ChannelMessageStoreQueryProvider.java +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/ChannelMessageStoreQueryProvider.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2012 the original author or authors. + * Copyright 2002-2014 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at @@ -20,6 +20,7 @@ * {@link JdbcChannelMessageStore} to provide database-specific queries. * * @author Gunnar Hillert + * @author Artem Bilan * @since 2.2 */ public interface ChannelMessageStoreQueryProvider { @@ -47,6 +48,21 @@ public interface ChannelMessageStoreQueryProvider { */ String getPollFromGroupQuery(); + /** + * Get the query used to retrieve the oldest message by priority for a channel excluding + * messages that match the provided message ids. + * + * @return Sql Query + */ + String getPriorityPollFromGroupExcludeIdsQuery(); + + /** + * Get the query used to retrieve the oldest message by priority for a channel. + * + * @return Sql Query + */ + String getPriorityPollFromGroupQuery(); + /** * Query that retrieves a message for the provided message id, channel and * region. diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/DerbyChannelMessageStoreQueryProvider.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/DerbyChannelMessageStoreQueryProvider.java index cd079b661c9..1221607babb 100644 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/DerbyChannelMessageStoreQueryProvider.java +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/DerbyChannelMessageStoreQueryProvider.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2012 the original author or authors. + * Copyright 2002-2014 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at @@ -14,6 +14,7 @@ /** * @author Gunnar Hillert + * @author Artem Bilan * @since 2.2 * * https://blogs.oracle.com/kah/entry/derby_10_5_preview_fetch @@ -34,4 +35,19 @@ public String getPollFromGroupQuery() { "order by CREATED_DATE ASC FETCH FIRST ROW ONLY"; } + @Override + public String getPriorityPollFromGroupExcludeIdsQuery() { + return "SELECT %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID, %PREFIX%CHANNEL_MESSAGE.MESSAGE_BYTES from %PREFIX%CHANNEL_MESSAGE " + + "where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key and %PREFIX%CHANNEL_MESSAGE.REGION = :region " + + "and %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID not in (:message_ids) " + + "order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE ASC FETCH FIRST ROW ONLY"; + } + + @Override + public String getPriorityPollFromGroupQuery() { + return "SELECT %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID, %PREFIX%CHANNEL_MESSAGE.MESSAGE_BYTES from %PREFIX%CHANNEL_MESSAGE " + + "where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key and %PREFIX%CHANNEL_MESSAGE.REGION = :region " + + "order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE ASC FETCH FIRST ROW ONLY"; + } + } diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/HsqlChannelMessageStoreQueryProvider.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/HsqlChannelMessageStoreQueryProvider.java index b507a6ab244..08febf4c036 100644 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/HsqlChannelMessageStoreQueryProvider.java +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/HsqlChannelMessageStoreQueryProvider.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2012 the original author or authors. + * Copyright 2002-2014 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at @@ -14,6 +14,7 @@ /** * @author Gunnar Hillert + * @author Artem Bilan * @since 2.2 * */ @@ -33,4 +34,19 @@ public String getPollFromGroupQuery() { "order by CREATED_DATE ASC LIMIT 1"; } + @Override + public String getPriorityPollFromGroupExcludeIdsQuery() { + return "SELECT %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID, %PREFIX%CHANNEL_MESSAGE.MESSAGE_BYTES from %PREFIX%CHANNEL_MESSAGE " + + "where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key and %PREFIX%CHANNEL_MESSAGE.REGION = :region " + + "and %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID not in (:message_ids) " + + "order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE ASC LIMIT 1"; + } + + @Override + public String getPriorityPollFromGroupQuery() { + return "SELECT %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID, %PREFIX%CHANNEL_MESSAGE.MESSAGE_BYTES from %PREFIX%CHANNEL_MESSAGE " + + "where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key and %PREFIX%CHANNEL_MESSAGE.REGION = :region " + + "order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE ASC LIMIT 1"; + } + } diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/MySqlChannelMessageStoreQueryProvider.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/MySqlChannelMessageStoreQueryProvider.java index 6a021bb35b1..185503c8ae0 100644 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/MySqlChannelMessageStoreQueryProvider.java +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/MySqlChannelMessageStoreQueryProvider.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2012 the original author or authors. + * Copyright 2002-2014 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at @@ -14,6 +14,7 @@ /** * @author Gunnar Hillert + * @author Artem Bilan * @since 2.2 */ public class MySqlChannelMessageStoreQueryProvider extends AbstractChannelMessageStoreQueryProvider { @@ -32,4 +33,19 @@ public String getPollFromGroupQuery() { "order by CREATED_DATE ASC LIMIT 1"; } + @Override + public String getPriorityPollFromGroupExcludeIdsQuery() { + return "SELECT %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID, %PREFIX%CHANNEL_MESSAGE.MESSAGE_BYTES from %PREFIX%CHANNEL_MESSAGE " + + "where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key and %PREFIX%CHANNEL_MESSAGE.REGION = :region " + + "and %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID not in (:message_ids) " + + "order by MESSAGE_PRIORITY DESC, CREATED_DATE ASC LIMIT 1"; + } + + @Override + public String getPriorityPollFromGroupQuery() { + return "SELECT %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID, %PREFIX%CHANNEL_MESSAGE.MESSAGE_BYTES from %PREFIX%CHANNEL_MESSAGE " + + "where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key and %PREFIX%CHANNEL_MESSAGE.REGION = :region " + + "order by MESSAGE_PRIORITY DESC, CREATED_DATE ASC LIMIT 1"; + } + } diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/OracleChannelMessageStoreQueryProvider.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/OracleChannelMessageStoreQueryProvider.java index 77add088313..988cadbbaac 100644 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/OracleChannelMessageStoreQueryProvider.java +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/OracleChannelMessageStoreQueryProvider.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2012 the original author or authors. + * Copyright 2002-2014 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at @@ -18,18 +18,18 @@ /** * Contains Oracle-specific queries for the {@link JdbcChannelMessageStore}. * Please ensure that the used {@link JdbcTemplate}'s fetchSize property is 1. - * + *

* Fore more details, please see: http://stackoverflow.com/questions/6117254/force-oracle-to-return-top-n-rows-with-skip-locked * * @author Gunnar Hillert + * @author Artem Bilan * @since 2.2 */ public class OracleChannelMessageStoreQueryProvider extends AbstractChannelMessageStoreQueryProvider { @Override public String getPollFromGroupExcludeIdsQuery() { - return - "SELECT %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID, %PREFIX%CHANNEL_MESSAGE.MESSAGE_BYTES from %PREFIX%CHANNEL_MESSAGE " + + return "SELECT %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID, %PREFIX%CHANNEL_MESSAGE.MESSAGE_BYTES from %PREFIX%CHANNEL_MESSAGE " + "where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key and %PREFIX%CHANNEL_MESSAGE.REGION = :region " + "and %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID not in (:message_ids) order by CREATED_DATE ASC FOR UPDATE SKIP LOCKED"; } @@ -41,4 +41,19 @@ public String getPollFromGroupQuery() { "order by CREATED_DATE ASC FOR UPDATE SKIP LOCKED"; } + @Override + public String getPriorityPollFromGroupExcludeIdsQuery() { + return "SELECT %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID, %PREFIX%CHANNEL_MESSAGE.MESSAGE_BYTES from %PREFIX%CHANNEL_MESSAGE " + + "where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key and %PREFIX%CHANNEL_MESSAGE.REGION = :region " + + "and %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID not in (:message_ids) " + + "order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE ASC FOR UPDATE SKIP LOCKED"; + } + + @Override + public String getPriorityPollFromGroupQuery() { + return "SELECT %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID, %PREFIX%CHANNEL_MESSAGE.MESSAGE_BYTES from %PREFIX%CHANNEL_MESSAGE " + + "where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key and %PREFIX%CHANNEL_MESSAGE.REGION = :region " + + "order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE ASC FOR UPDATE SKIP LOCKED"; + } + } diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/PostgresChannelMessageStoreQueryProvider.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/PostgresChannelMessageStoreQueryProvider.java index 87d7c9a91ad..4aaa60f1172 100644 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/PostgresChannelMessageStoreQueryProvider.java +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/PostgresChannelMessageStoreQueryProvider.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2012 the original author or authors. + * Copyright 2002-2014 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at @@ -14,6 +14,7 @@ /** * @author Gunnar Hillert + * @author Artem Bilan * @since 2.2 */ public class PostgresChannelMessageStoreQueryProvider extends AbstractChannelMessageStoreQueryProvider { @@ -32,4 +33,19 @@ public String getPollFromGroupQuery() { "order by CREATED_DATE ASC LIMIT 1 FOR UPDATE"; } + @Override + public String getPriorityPollFromGroupExcludeIdsQuery() { + return "SELECT %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID, %PREFIX%CHANNEL_MESSAGE.MESSAGE_BYTES from %PREFIX%CHANNEL_MESSAGE " + + "where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key and %PREFIX%CHANNEL_MESSAGE.REGION = :region " + + "and %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID not in (:message_ids) " + + "order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE ASC LIMIT 1 FOR UPDATE"; + } + + @Override + public String getPriorityPollFromGroupQuery() { + return "SELECT %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID, %PREFIX%CHANNEL_MESSAGE.MESSAGE_BYTES from %PREFIX%CHANNEL_MESSAGE " + + "where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key and %PREFIX%CHANNEL_MESSAGE.REGION = :region " + + "order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE ASC LIMIT 1 FOR UPDATE"; + } + } diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/AbstractTxTimeoutMessageStoreTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/AbstractTxTimeoutMessageStoreTests.java index 0201ce4656f..88e2ce1a83c 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/AbstractTxTimeoutMessageStoreTests.java +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/AbstractTxTimeoutMessageStoreTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2013 the original author or authors. + * Copyright 2002-2014 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at @@ -13,6 +13,7 @@ package org.springframework.integration.jdbc.store.channel; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import java.util.concurrent.Callable; @@ -33,10 +34,12 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.integration.IntegrationMessageHeaderAccessor; import org.springframework.integration.jdbc.store.JdbcChannelMessageStore; import org.springframework.integration.support.MessageBuilder; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.PollableChannel; import org.springframework.messaging.support.GenericMessage; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.TransactionDefinition; @@ -79,6 +82,8 @@ abstract class AbstractTxTimeoutMessageStoreTests { @Autowired private AtomicInteger errorAtomicInteger; + @Autowired + protected PollableChannel priorityChannel; public void test() throws InterruptedException { @@ -185,4 +190,49 @@ public void testInt3181ConcurrentPolling() throws InterruptedException { assertEquals(0, errorAtomicInteger.get()); } + public void testPriorityChannel() throws Exception { + Message message = MessageBuilder.withPayload("1").setHeader(IntegrationMessageHeaderAccessor.PRIORITY, 1).build(); + priorityChannel.send(message); + message = MessageBuilder.withPayload("-1").setHeader(IntegrationMessageHeaderAccessor.PRIORITY, -1).build(); + priorityChannel.send(message); + message = MessageBuilder.withPayload("3").setHeader(IntegrationMessageHeaderAccessor.PRIORITY, 3).build(); + priorityChannel.send(message); + message = MessageBuilder.withPayload("0").setHeader(IntegrationMessageHeaderAccessor.PRIORITY, 0).build(); + priorityChannel.send(message); + message = MessageBuilder.withPayload("2").setHeader(IntegrationMessageHeaderAccessor.PRIORITY, 2).build(); + priorityChannel.send(message); + message = MessageBuilder.withPayload("none").build(); + priorityChannel.send(message); + message = MessageBuilder.withPayload("31").setHeader(IntegrationMessageHeaderAccessor.PRIORITY, 3).build(); + priorityChannel.send(message); + + Message receive = priorityChannel.receive(1000); + assertNotNull(receive); + assertEquals("3", receive.getPayload()); + + receive = priorityChannel.receive(1000); + assertNotNull(receive); + assertEquals("31", receive.getPayload()); + + receive = priorityChannel.receive(1000); + assertNotNull(receive); + assertEquals("2", receive.getPayload()); + + receive = priorityChannel.receive(1000); + assertNotNull(receive); + assertEquals("1", receive.getPayload()); + + receive = priorityChannel.receive(1000); + assertNotNull(receive); + assertEquals("0", receive.getPayload()); + + receive = priorityChannel.receive(1000); + assertNotNull(receive); + assertEquals("-1", receive.getPayload()); + + receive = priorityChannel.receive(1000); + assertNotNull(receive); + assertEquals("none", receive.getPayload()); + } + } diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/DerbyTxTimeoutMessageStoreTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/DerbyTxTimeoutMessageStoreTests.java index b9543312421..cd27498fe76 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/DerbyTxTimeoutMessageStoreTests.java +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/DerbyTxTimeoutMessageStoreTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2012 the original author or authors. + * Copyright 2002-2014 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at @@ -12,6 +12,8 @@ */ package org.springframework.integration.jdbc.store.channel; +import java.util.concurrent.ExecutionException; + import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; @@ -24,6 +26,7 @@ /** * * @author Gunnar Hillert + * @author Artem Bilan * */ @Ignore @@ -43,4 +46,16 @@ public void testInt3181ConcurrentPolling() throws InterruptedException { super.testInt3181ConcurrentPolling(); } + @Test + @Override + public void testInt2993IdCacheConcurrency() throws InterruptedException, ExecutionException { + super.testInt2993IdCacheConcurrency(); + } + + @Test + @Override + public void testPriorityChannel() throws Exception { + super.testPriorityChannel(); + } + } diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/HsqlTxTimeoutMessageStoreTests-context.xml b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/HsqlTxTimeoutMessageStoreTests-context.xml index 2fbf9fa5115..f32aae8fd49 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/HsqlTxTimeoutMessageStoreTests-context.xml +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/HsqlTxTimeoutMessageStoreTests-context.xml @@ -1,27 +1,11 @@ + xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> - - - - + - - - diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/HsqlTxTimeoutMessageStoreTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/HsqlTxTimeoutMessageStoreTests.java index f71c413119c..6681ddc7331 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/HsqlTxTimeoutMessageStoreTests.java +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/HsqlTxTimeoutMessageStoreTests.java @@ -12,19 +12,11 @@ */ package org.springframework.integration.jdbc.store.channel; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; - import java.util.concurrent.ExecutionException; import org.junit.Test; import org.junit.runner.RunWith; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.integration.IntegrationMessageHeaderAccessor; -import org.springframework.integration.support.MessageBuilder; -import org.springframework.messaging.Message; -import org.springframework.messaging.PollableChannel; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; @@ -32,6 +24,7 @@ /** * * @author Gunnar Hillert + * @author Artem Bilan * */ @ContextConfiguration @@ -39,9 +32,6 @@ @DirtiesContext // close at the end after class public class HsqlTxTimeoutMessageStoreTests extends AbstractTxTimeoutMessageStoreTests { - @Autowired - private PollableChannel priorityChannel; - @Test @Override public void test() throws InterruptedException { @@ -61,49 +51,9 @@ public void testInt3181ConcurrentPolling() throws InterruptedException { } @Test + @Override public void testPriorityChannel() throws Exception { - Message message = MessageBuilder.withPayload("1").setHeader(IntegrationMessageHeaderAccessor.PRIORITY, 1).build(); - priorityChannel.send(message); - message = MessageBuilder.withPayload("-1").setHeader(IntegrationMessageHeaderAccessor.PRIORITY, -1).build(); - priorityChannel.send(message); - message = MessageBuilder.withPayload("3").setHeader(IntegrationMessageHeaderAccessor.PRIORITY, 3).build(); - priorityChannel.send(message); - message = MessageBuilder.withPayload("0").setHeader(IntegrationMessageHeaderAccessor.PRIORITY, 0).build(); - priorityChannel.send(message); - message = MessageBuilder.withPayload("2").setHeader(IntegrationMessageHeaderAccessor.PRIORITY, 2).build(); - priorityChannel.send(message); - message = MessageBuilder.withPayload("none").build(); - priorityChannel.send(message); - message = MessageBuilder.withPayload("31").setHeader(IntegrationMessageHeaderAccessor.PRIORITY, 3).build(); - priorityChannel.send(message); - - Message receive = priorityChannel.receive(1000); - assertNotNull(receive); - assertEquals("3", receive.getPayload()); - - receive = priorityChannel.receive(1000); - assertNotNull(receive); - assertEquals("31", receive.getPayload()); - - receive = priorityChannel.receive(1000); - assertNotNull(receive); - assertEquals("2", receive.getPayload()); - - receive = priorityChannel.receive(1000); - assertNotNull(receive); - assertEquals("1", receive.getPayload()); - - receive = priorityChannel.receive(1000); - assertNotNull(receive); - assertEquals("0", receive.getPayload()); - - receive = priorityChannel.receive(1000); - assertNotNull(receive); - assertEquals("-1", receive.getPayload()); - - receive = priorityChannel.receive(1000); - assertNotNull(receive); - assertEquals("none", receive.getPayload()); + super.testPriorityChannel(); } } diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/MySqlTxTimeoutMessageStoreTests-context.xml b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/MySqlTxTimeoutMessageStoreTests-context.xml index ef71626908d..8f40b358293 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/MySqlTxTimeoutMessageStoreTests-context.xml +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/MySqlTxTimeoutMessageStoreTests-context.xml @@ -1,19 +1,10 @@ + xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> + diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/MySqlTxTimeoutMessageStoreTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/MySqlTxTimeoutMessageStoreTests.java index 7eaa27a7f74..3dc05918c20 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/MySqlTxTimeoutMessageStoreTests.java +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/MySqlTxTimeoutMessageStoreTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2012 the original author or authors. + * Copyright 2002-2014 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at @@ -12,6 +12,8 @@ */ package org.springframework.integration.jdbc.store.channel; +import java.util.concurrent.ExecutionException; + import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; @@ -23,6 +25,7 @@ /** * * @author Gunnar Hillert + * @author Artem Bilan * */ @Ignore @@ -42,4 +45,16 @@ public void testInt3181ConcurrentPolling() throws InterruptedException { super.testInt3181ConcurrentPolling(); } + @Test + @Override + public void testInt2993IdCacheConcurrency() throws InterruptedException, ExecutionException { + super.testInt2993IdCacheConcurrency(); + } + + @Test + @Override + public void testPriorityChannel() throws Exception { + super.testPriorityChannel(); + } + } diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/OracleTxTimeoutMessageStoreTests-context.xml b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/OracleTxTimeoutMessageStoreTests-context.xml index c3b49275246..b32449fc30b 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/OracleTxTimeoutMessageStoreTests-context.xml +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/OracleTxTimeoutMessageStoreTests-context.xml @@ -1,19 +1,10 @@ + xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> + diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/OracleTxTimeoutMessageStoreTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/OracleTxTimeoutMessageStoreTests.java index 5b28021488e..fe7ecf43c6e 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/OracleTxTimeoutMessageStoreTests.java +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/OracleTxTimeoutMessageStoreTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2012 the original author or authors. + * Copyright 2002-2014 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at @@ -12,6 +12,8 @@ */ package org.springframework.integration.jdbc.store.channel; +import java.util.concurrent.ExecutionException; + import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; @@ -22,6 +24,7 @@ /** * * @author Gunnar Hillert + * @author Artem Bilan * */ @Ignore @@ -35,4 +38,22 @@ public void test() throws InterruptedException { super.test(); } + @Test + @Override + public void testInt2993IdCacheConcurrency() throws InterruptedException, ExecutionException { + super.testInt2993IdCacheConcurrency(); + } + + @Test + @Override + public void testInt3181ConcurrentPolling() throws InterruptedException { + super.testInt3181ConcurrentPolling(); + } + + @Test + @Override + public void testPriorityChannel() throws Exception { + super.testPriorityChannel(); + } + } diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/PostgresTxTimeoutMessageStoreTests-context.xml b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/PostgresTxTimeoutMessageStoreTests-context.xml index b9171cdcd4a..6cb4b3ae7b2 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/PostgresTxTimeoutMessageStoreTests-context.xml +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/PostgresTxTimeoutMessageStoreTests-context.xml @@ -1,19 +1,10 @@ + xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> + diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/PostgresTxTimeoutMessageStoreTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/PostgresTxTimeoutMessageStoreTests.java index 6bf7b7bf6d5..4acb4f5942b 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/PostgresTxTimeoutMessageStoreTests.java +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/PostgresTxTimeoutMessageStoreTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2012 the original author or authors. + * Copyright 2002-2014 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at @@ -12,6 +12,8 @@ */ package org.springframework.integration.jdbc.store.channel; +import java.util.concurrent.ExecutionException; + import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; @@ -22,6 +24,7 @@ /** * * @author Gunnar Hillert + * @author Artem Bilan * */ @Ignore @@ -35,4 +38,22 @@ public void test() throws InterruptedException { super.test(); } + @Test + @Override + public void testInt2993IdCacheConcurrency() throws InterruptedException, ExecutionException { + super.testInt2993IdCacheConcurrency(); + } + + @Test + @Override + public void testInt3181ConcurrentPolling() throws InterruptedException { + super.testInt3181ConcurrentPolling(); + } + + @Test + @Override + public void testPriorityChannel() throws Exception { + super.testPriorityChannel(); + } + } diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/TxTimeoutMessageStoreTests-context.xml b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/TxTimeoutMessageStoreTests-context.xml index 7276d88ed6a..7d82ee2ab29 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/TxTimeoutMessageStoreTests-context.xml +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/TxTimeoutMessageStoreTests-context.xml @@ -93,5 +93,13 @@ + + + + + + + + diff --git a/spring-integration-jdbc/src/test/resources/log4j.properties b/spring-integration-jdbc/src/test/resources/log4j.properties index 6cb45df1712..5c0370d306c 100644 --- a/spring-integration-jdbc/src/test/resources/log4j.properties +++ b/spring-integration-jdbc/src/test/resources/log4j.properties @@ -4,8 +4,8 @@ log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %t %c{2}:%L - %m%n -log4j.category.org.springframework.integration=INFO -log4j.category.org.apache.derby=INFO +log4j.category.org.springframework.integration=WARN +log4j.category.org.apache.derby=WARN log4j.category.org.springframework.jdbc=WARN -log4j.category.org.springframework.integration.jdbc.JdbcChannelMessageStore=INFO +log4j.category.org.springframework.integration.jdbc=WARN From db8bc7106517d16e561e9b397c1a81102129077f Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Wed, 9 Apr 2014 01:32:06 +0300 Subject: [PATCH 3/5] INT-3339: Add `MESSAGE_SEQUENCE` stuff --- .../store/BasicMessageGroupStore.java | 6 + .../integration/store/MessageGroupStore.java | 7 - .../jdbc/store/JdbcChannelMessageStore.java | 251 ++++++------------ ...DerbyChannelMessageStoreQueryProvider.java | 8 +- .../HsqlChannelMessageStoreQueryProvider.java | 14 +- ...MySqlChannelMessageStoreQueryProvider.java | 8 +- ...racleChannelMessageStoreQueryProvider.java | 14 +- ...tgresChannelMessageStoreQueryProvider.java | 8 +- .../jdbc/store/channel/schema-derby.sql | 4 +- .../jdbc/store/channel/schema-drop-derby.sql | 3 + .../jdbc/store/channel/schema-drop-hsql.sql | 3 + .../jdbc/store/channel/schema-drop-oracle.sql | 2 + .../store/channel/schema-drop-postgres.sql | 4 +- .../jdbc/store/channel/schema-hsql.sql | 6 +- .../jdbc/store/channel/schema-mysql.sql | 6 +- .../jdbc/store/channel/schema-oracle.sql | 7 +- .../jdbc/store/channel/schema-postgresql.sql | 8 +- .../HsqlTxTimeoutMessageStoreTests.java | 7 +- 18 files changed, 165 insertions(+), 201 deletions(-) diff --git a/spring-integration-core/src/main/java/org/springframework/integration/store/BasicMessageGroupStore.java b/spring-integration-core/src/main/java/org/springframework/integration/store/BasicMessageGroupStore.java index 4ec1c633cc4..776dfc5695a 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/store/BasicMessageGroupStore.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/store/BasicMessageGroupStore.java @@ -63,4 +63,10 @@ public interface BasicMessageGroupStore { */ Message pollMessageFromGroup(Object groupId); + /** + * Remove the message group with this id. + * + * @param groupId The id of the group to remove. + */ + void removeMessageGroup(Object groupId); } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/store/MessageGroupStore.java b/spring-integration-core/src/main/java/org/springframework/integration/store/MessageGroupStore.java index e5f6eb958cb..7b216ea82e8 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/store/MessageGroupStore.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/store/MessageGroupStore.java @@ -59,13 +59,6 @@ public interface MessageGroupStore extends BasicMessageGroupStore { */ MessageGroup removeMessageFromGroup(Object key, Message messageToRemove); - /** - * Remove the message group with this id. - * - * @param groupId The id of the group to remove. - */ - void removeMessageGroup(Object groupId); - /** * Register a callback for when a message group is expired through {@link #expireMessageGroups(long)}. * diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcChannelMessageStore.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcChannelMessageStore.java index 3e6e0538ee8..6d51d6c3aea 100644 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcChannelMessageStore.java +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcChannelMessageStore.java @@ -18,7 +18,6 @@ import java.sql.Types; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -32,13 +31,17 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.springframework.beans.BeansException; import org.springframework.beans.DirectFieldAccessor; +import org.springframework.beans.factory.BeanFactory; +import org.springframework.beans.factory.BeanFactoryAware; import org.springframework.beans.factory.InitializingBean; import org.springframework.core.serializer.Deserializer; import org.springframework.core.serializer.Serializer; import org.springframework.core.serializer.support.DeserializingConverter; import org.springframework.core.serializer.support.SerializingConverter; import org.springframework.integration.IntegrationMessageHeaderAccessor; +import org.springframework.integration.context.IntegrationContextUtils; import org.springframework.integration.jdbc.JdbcMessageStore; import org.springframework.integration.jdbc.store.channel.ChannelMessageStoreQueryProvider; import org.springframework.integration.jdbc.store.channel.DerbyChannelMessageStoreQueryProvider; @@ -46,13 +49,13 @@ import org.springframework.integration.jdbc.store.channel.MySqlChannelMessageStoreQueryProvider; import org.springframework.integration.jdbc.store.channel.OracleChannelMessageStoreQueryProvider; import org.springframework.integration.jdbc.store.channel.PostgresChannelMessageStoreQueryProvider; -import org.springframework.integration.store.AbstractMessageGroupStore; -import org.springframework.integration.store.ChannelMessageStore; import org.springframework.integration.store.MessageGroup; import org.springframework.integration.store.MessageGroupStore; import org.springframework.integration.store.MessageStore; import org.springframework.integration.store.PriorityCapableChannelMessageStore; import org.springframework.integration.store.SimpleMessageGroup; +import org.springframework.integration.support.DefaultMessageBuilderFactory; +import org.springframework.integration.support.MessageBuilderFactory; import org.springframework.integration.transaction.TransactionSynchronizationFactory; import org.springframework.integration.util.UUIDConverter; import org.springframework.jdbc.core.JdbcOperations; @@ -92,8 +95,7 @@ * @since 2.2 */ @ManagedResource -public class JdbcChannelMessageStore extends AbstractMessageGroupStore - implements InitializingBean, ChannelMessageStore, PriorityCapableChannelMessageStore { +public class JdbcChannelMessageStore implements PriorityCapableChannelMessageStore, InitializingBean, BeanFactoryAware { private static final Log logger = LogFactory.getLog(JdbcChannelMessageStore.class); @@ -130,6 +132,8 @@ public class JdbcChannelMessageStore extends AbstractMessageGroupStore */ public static final String CREATED_DATE_KEY = JdbcChannelMessageStore.class.getSimpleName() + ".CREATED_DATE"; + private volatile MessageBuilderFactory messageBuilderFactory = new DefaultMessageBuilderFactory(); + private volatile String region = DEFAULT_REGION; private volatile String tablePrefix = DEFAULT_TABLE_PREFIX; @@ -215,15 +219,6 @@ public void setJdbcTemplate(JdbcTemplate jdbcTemplate) { this.jdbcTemplate = jdbcTemplate; } - /** - * Method not implemented. - * @throws UnsupportedOperationException Method not supported. - */ - @Override - public void setLastReleasedSequenceNumberForGroup(Object groupId, final int sequenceNumber) { - throw new UnsupportedOperationException("Not implemented"); - } - /** * Override the {@link LobHandler} that is used to create and unpack large objects in SQL queries. The default is * fine for almost all platforms, but some Oracle drivers require a native implementation. @@ -362,6 +357,11 @@ public boolean isPriorityEnabled() { return this.priorityEnabled; } + @Override + public void setBeanFactory(BeanFactory beanFactory) throws BeansException { + this.messageBuilderFactory = IntegrationContextUtils.getMessageBuilderFactory(beanFactory); + } + /** * Check mandatory properties ({@link DataSource} and * {@link #setChannelMessageStoreQueryProvider(ChannelMessageStoreQueryProvider)}). If no {@link MessageRowMapper} was @@ -409,7 +409,7 @@ public MessageGroup addMessageToGroup(Object groupId, final Message message) final String groupKey = getKey(groupId); final long createdDate = System.currentTimeMillis(); - final Message result = this.getMessageBuilderFactory().fromMessage(message).setHeader(SAVED_KEY, Boolean.TRUE) + final Message result = this.messageBuilderFactory.fromMessage(message).setHeader(SAVED_KEY, Boolean.TRUE) .setHeader(CREATED_DATE_KEY, createdDate).build(); final Map innerMap = (Map) new DirectFieldAccessor(result.getHeaders()).getPropertyValue("headers"); @@ -446,90 +446,6 @@ public void setValues(PreparedStatement ps) throws SQLException { return getMessageGroup(groupId); } - /** - * Method not implemented. - * - * @throws UnsupportedOperationException Method not supported. - */ - @Override - public void completeGroup(Object groupId) { - throw new UnsupportedOperationException("Not implemented"); - } - - /** - * This method executes a call to the DB to get the oldest Message in the - * MessageGroup which in the context of the {@link JdbcChannelMessageStore} - * means the channel identifier. - * - * @param groupIdKey String representation of message group (Channel) ID - * @return a message; could be null if query produced no Messages - */ - protected Message doPollForMessage(String groupIdKey) { - - final NamedParameterJdbcTemplate namedParameterJdbcTemplate = new NamedParameterJdbcTemplate(jdbcTemplate); - final MapSqlParameterSource parameters = new MapSqlParameterSource(); - - parameters.addValue("region", region); - parameters.addValue("group_key", groupIdKey); - - String query; - - final List> messages; - - this.idCacheReadLock.lock(); - try { - if (this.usingIdCache && !this.idCache.isEmpty()) { - if (this.priorityEnabled) { - query = getQuery(this.channelMessageStoreQueryProvider.getPriorityPollFromGroupExcludeIdsQuery()); - } - else { - query = getQuery(this.channelMessageStoreQueryProvider.getPollFromGroupExcludeIdsQuery()); - } - parameters.addValue("message_ids", idCache); - } - else { - if (this.priorityEnabled) { - query = getQuery(this.channelMessageStoreQueryProvider.getPriorityPollFromGroupQuery()); - } - else { - query = getQuery(this.channelMessageStoreQueryProvider.getPollFromGroupQuery()); - } - } - if (this.priorityEnabled) { - query = query.replaceFirst("CREATED_DATE ASC", "MESSAGE_PRIORITY DESC, CREATED_DATE ASC"); - } - messages = namedParameterJdbcTemplate.query(query, parameters, messageRowMapper); - } - finally { - this.idCacheReadLock.unlock(); - } - - - Assert.isTrue(messages.size() == 0 || messages.size() == 1); - if (messages.size() > 0){ - - final Message message = messages.get(0); - final String messageId = message.getHeaders().getId().toString(); - - if (this.usingIdCache) { - this.idCacheWriteLock.lock(); - try { - boolean added = this.idCache.add(messageId); - - if (logger.isDebugEnabled()) { - logger.debug(String.format("Polled message with id '%s' added: '%s'.", messageId, added)); - } - } - finally { - this.idCacheWriteLock.unlock(); - } - } - - return message; - } - return null; - } - /** * Helper method that converts the channel id to a UUID using * {@link UUIDConverter#getUUID(Object)}. @@ -541,27 +457,6 @@ private String getKey(Object input) { return input == null ? null : UUIDConverter.getUUID(input).toString(); } - /** - * Method not implemented. - * @return The message count. - * @throws UnsupportedOperationException Method not supported. - */ - @ManagedAttribute - public long getMessageCount() { - throw new UnsupportedOperationException("Not implemented"); - } - - /** - * Method not implemented. - * @return The message count. - * @throws UnsupportedOperationException Method not supported. - */ - @Override - @ManagedAttribute - public int getMessageCountForAllMessageGroups() { - throw new UnsupportedOperationException("Not implemented"); - } - /** * Not fully used. Only wraps the provided group id. */ @@ -576,10 +471,10 @@ public MessageGroup getMessageGroup(Object groupId) { * @return The message group count. * @throws UnsupportedOperationException Method not supported. */ - @Override @ManagedAttribute public int getMessageGroupCount() { - throw new UnsupportedOperationException("Not implemented"); + return this.jdbcTemplate.queryForObject(this.getQuery("SELECT COUNT(DISTINCT GROUP_KEY) from %PREFIX%CHANNEL_MESSAGE where REGION = ?"), + Integer.class, this.region); } /** @@ -601,16 +496,6 @@ protected String getQuery(String sqlQuery) { return query; } - /** - * Method not implemented. - * - * @throws UnsupportedOperationException Method not supported. - */ - @Override - public Iterator iterator() { - throw new UnsupportedOperationException("Not implemented"); - } - /** * Returns the number of messages persisted for the specified channel id (groupId) * and the specified region ({@link #setRegion(String)}). @@ -625,6 +510,11 @@ public int messageGroupSize(Object groupId) { Integer.class, key, this.region); } + public void removeMessageGroup(Object groupId) { + this.jdbcTemplate.update(this.getQuery(this.channelMessageStoreQueryProvider.getDeleteMessageGroupQuery()), + this.getKey(groupId), this.region); + } + /** * Polls the database for a new message that is persisted for the given * group id which represents the channel identifier. @@ -645,25 +535,81 @@ public Message pollMessageFromGroup(Object groupId) { } /** - * Remove a single message from the database. - * - * @param groupId The channel id to remove the message from. - * @param messageToRemove The message to remove. + * This method executes a call to the DB to get the oldest Message in the + * MessageGroup which in the context of the {@link JdbcChannelMessageStore} + * means the channel identifier. * + * @param groupIdKey String representation of message group (Channel) ID + * @return a message; could be null if query produced no Messages */ - @Override - public MessageGroup removeMessageFromGroup(Object groupId, Message messageToRemove) { + protected Message doPollForMessage(String groupIdKey) { - this.doRemoveMessageFromGroup(groupId, messageToRemove); + final NamedParameterJdbcTemplate namedParameterJdbcTemplate = new NamedParameterJdbcTemplate(jdbcTemplate); + final MapSqlParameterSource parameters = new MapSqlParameterSource(); - return getMessageGroup(groupId); + parameters.addValue("region", region); + parameters.addValue("group_key", groupIdKey); + + String query; + + final List> messages; + + this.idCacheReadLock.lock(); + try { + if (this.usingIdCache && !this.idCache.isEmpty()) { + if (this.priorityEnabled) { + query = getQuery(this.channelMessageStoreQueryProvider.getPriorityPollFromGroupExcludeIdsQuery()); + } + else { + query = getQuery(this.channelMessageStoreQueryProvider.getPollFromGroupExcludeIdsQuery()); + } + parameters.addValue("message_ids", idCache); + } + else { + if (this.priorityEnabled) { + query = getQuery(this.channelMessageStoreQueryProvider.getPriorityPollFromGroupQuery()); + } + else { + query = getQuery(this.channelMessageStoreQueryProvider.getPollFromGroupQuery()); + } + } + messages = namedParameterJdbcTemplate.query(query, parameters, messageRowMapper); + } + finally { + this.idCacheReadLock.unlock(); + } + + + Assert.isTrue(messages.size() == 0 || messages.size() == 1); + if (messages.size() > 0){ + + final Message message = messages.get(0); + final String messageId = message.getHeaders().getId().toString(); + + if (this.usingIdCache) { + this.idCacheWriteLock.lock(); + try { + boolean added = this.idCache.add(messageId); + + if (logger.isDebugEnabled()) { + logger.debug(String.format("Polled message with id '%s' added: '%s'.", messageId, added)); + } + } + finally { + this.idCacheWriteLock.unlock(); + } + } + + return message; + } + return null; } private boolean doRemoveMessageFromGroup(Object groupId, Message messageToRemove) { final UUID id = messageToRemove.getHeaders().getId(); - int updated = jdbcTemplate.update(getQuery(channelMessageStoreQueryProvider.getDeleteMessageQuery()), new Object[] { getKey(id), getKey(groupId), region }, new int[] { - Types.VARCHAR, Types.VARCHAR, Types.VARCHAR }); + int updated = jdbcTemplate.update(getQuery(channelMessageStoreQueryProvider.getDeleteMessageQuery()), + new Object[] { getKey(id), getKey(groupId), region }, new int[] { Types.VARCHAR, Types.VARCHAR, Types.VARCHAR }); boolean result = updated != 0; if (result) { @@ -710,25 +656,4 @@ public int getSizeOfIdCache() { return this.idCache.size(); } - /** - * Will remove all messages from the message channel. - */ - @Override - public void removeMessageGroup(Object groupId) { - - final String groupKey = getKey(groupId); - - jdbcTemplate.update(getQuery(channelMessageStoreQueryProvider.getDeleteMessageGroupQuery()), new PreparedStatementSetter() { - @Override - public void setValues(PreparedStatement ps) throws SQLException { - if (logger.isDebugEnabled()){ - logger.debug("Marking messages with group key=" + groupKey); - } - ps.setString(1, groupKey); - ps.setString(2, region); - } - }); - - } - } diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/DerbyChannelMessageStoreQueryProvider.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/DerbyChannelMessageStoreQueryProvider.java index 1221607babb..86e38b1d194 100644 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/DerbyChannelMessageStoreQueryProvider.java +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/DerbyChannelMessageStoreQueryProvider.java @@ -25,14 +25,14 @@ public class DerbyChannelMessageStoreQueryProvider extends AbstractChannelMessag public String getPollFromGroupExcludeIdsQuery() { return "SELECT %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID, %PREFIX%CHANNEL_MESSAGE.MESSAGE_BYTES from %PREFIX%CHANNEL_MESSAGE " + "where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key and %PREFIX%CHANNEL_MESSAGE.REGION = :region " + - "and %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID not in (:message_ids) order by CREATED_DATE ASC FETCH FIRST ROW ONLY"; + "and %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID not in (:message_ids) order by CREATED_DATE, MESSAGE_SEQUENCE FETCH FIRST ROW ONLY"; } @Override public String getPollFromGroupQuery() { return "SELECT %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID, %PREFIX%CHANNEL_MESSAGE.MESSAGE_BYTES from %PREFIX%CHANNEL_MESSAGE " + "where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key and %PREFIX%CHANNEL_MESSAGE.REGION = :region " + - "order by CREATED_DATE ASC FETCH FIRST ROW ONLY"; + "order by CREATED_DATE, MESSAGE_SEQUENCE FETCH FIRST ROW ONLY"; } @Override @@ -40,14 +40,14 @@ public String getPriorityPollFromGroupExcludeIdsQuery() { return "SELECT %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID, %PREFIX%CHANNEL_MESSAGE.MESSAGE_BYTES from %PREFIX%CHANNEL_MESSAGE " + "where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key and %PREFIX%CHANNEL_MESSAGE.REGION = :region " + "and %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID not in (:message_ids) " + - "order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE ASC FETCH FIRST ROW ONLY"; + "order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE FETCH FIRST ROW ONLY"; } @Override public String getPriorityPollFromGroupQuery() { return "SELECT %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID, %PREFIX%CHANNEL_MESSAGE.MESSAGE_BYTES from %PREFIX%CHANNEL_MESSAGE " + "where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key and %PREFIX%CHANNEL_MESSAGE.REGION = :region " + - "order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE ASC FETCH FIRST ROW ONLY"; + "order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE FETCH FIRST ROW ONLY"; } } diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/HsqlChannelMessageStoreQueryProvider.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/HsqlChannelMessageStoreQueryProvider.java index 08febf4c036..09ca2ea4964 100644 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/HsqlChannelMessageStoreQueryProvider.java +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/HsqlChannelMessageStoreQueryProvider.java @@ -20,18 +20,24 @@ */ public class HsqlChannelMessageStoreQueryProvider extends AbstractChannelMessageStoreQueryProvider { + @Override + public String getCreateMessageQuery() { + return "INSERT into %PREFIX%CHANNEL_MESSAGE(MESSAGE_ID, GROUP_KEY, REGION, CREATED_DATE, MESSAGE_PRIORITY, MESSAGE_SEQUENCE, MESSAGE_BYTES)" + + " values (?, ?, ?, ?, ?, NEXT VALUE FOR %PREFIX%MESSAGE_SEQ, ?)"; + } + @Override public String getPollFromGroupExcludeIdsQuery() { return "SELECT %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID, %PREFIX%CHANNEL_MESSAGE.MESSAGE_BYTES from %PREFIX%CHANNEL_MESSAGE " + "where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key and %PREFIX%CHANNEL_MESSAGE.REGION = :region " + - "and %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID not in (:message_ids) order by CREATED_DATE ASC LIMIT 1"; + "and %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID not in (:message_ids) order by CREATED_DATE, MESSAGE_SEQUENCE LIMIT 1"; } @Override public String getPollFromGroupQuery() { return "SELECT %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID, %PREFIX%CHANNEL_MESSAGE.MESSAGE_BYTES from %PREFIX%CHANNEL_MESSAGE " + "where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key and %PREFIX%CHANNEL_MESSAGE.REGION = :region " + - "order by CREATED_DATE ASC LIMIT 1"; + "order by CREATED_DATE, MESSAGE_SEQUENCE LIMIT 1"; } @Override @@ -39,14 +45,14 @@ public String getPriorityPollFromGroupExcludeIdsQuery() { return "SELECT %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID, %PREFIX%CHANNEL_MESSAGE.MESSAGE_BYTES from %PREFIX%CHANNEL_MESSAGE " + "where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key and %PREFIX%CHANNEL_MESSAGE.REGION = :region " + "and %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID not in (:message_ids) " + - "order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE ASC LIMIT 1"; + "order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE LIMIT 1"; } @Override public String getPriorityPollFromGroupQuery() { return "SELECT %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID, %PREFIX%CHANNEL_MESSAGE.MESSAGE_BYTES from %PREFIX%CHANNEL_MESSAGE " + "where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key and %PREFIX%CHANNEL_MESSAGE.REGION = :region " + - "order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE ASC LIMIT 1"; + "order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE LIMIT 1"; } } diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/MySqlChannelMessageStoreQueryProvider.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/MySqlChannelMessageStoreQueryProvider.java index 185503c8ae0..8d8ac215a24 100644 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/MySqlChannelMessageStoreQueryProvider.java +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/MySqlChannelMessageStoreQueryProvider.java @@ -23,14 +23,14 @@ public class MySqlChannelMessageStoreQueryProvider extends AbstractChannelMessag public String getPollFromGroupExcludeIdsQuery() { return "SELECT %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID, %PREFIX%CHANNEL_MESSAGE.MESSAGE_BYTES from %PREFIX%CHANNEL_MESSAGE " + "where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key and %PREFIX%CHANNEL_MESSAGE.REGION = :region " + - "and %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID not in (:message_ids) order by CREATED_DATE ASC LIMIT 1"; + "and %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID not in (:message_ids) order by CREATED_DATE, MESSAGE_SEQUENCE LIMIT 1"; } @Override public String getPollFromGroupQuery() { return "SELECT %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID, %PREFIX%CHANNEL_MESSAGE.MESSAGE_BYTES from %PREFIX%CHANNEL_MESSAGE " + "where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key and %PREFIX%CHANNEL_MESSAGE.REGION = :region " + - "order by CREATED_DATE ASC LIMIT 1"; + "order by CREATED_DATE, MESSAGE_SEQUENCE LIMIT 1"; } @Override @@ -38,14 +38,14 @@ public String getPriorityPollFromGroupExcludeIdsQuery() { return "SELECT %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID, %PREFIX%CHANNEL_MESSAGE.MESSAGE_BYTES from %PREFIX%CHANNEL_MESSAGE " + "where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key and %PREFIX%CHANNEL_MESSAGE.REGION = :region " + "and %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID not in (:message_ids) " + - "order by MESSAGE_PRIORITY DESC, CREATED_DATE ASC LIMIT 1"; + "order by MESSAGE_PRIORITY DESC, CREATED_DATE, MESSAGE_SEQUENCE LIMIT 1"; } @Override public String getPriorityPollFromGroupQuery() { return "SELECT %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID, %PREFIX%CHANNEL_MESSAGE.MESSAGE_BYTES from %PREFIX%CHANNEL_MESSAGE " + "where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key and %PREFIX%CHANNEL_MESSAGE.REGION = :region " + - "order by MESSAGE_PRIORITY DESC, CREATED_DATE ASC LIMIT 1"; + "order by MESSAGE_PRIORITY DESC, CREATED_DATE, MESSAGE_SEQUENCE LIMIT 1"; } } diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/OracleChannelMessageStoreQueryProvider.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/OracleChannelMessageStoreQueryProvider.java index 988cadbbaac..b31eb41821f 100644 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/OracleChannelMessageStoreQueryProvider.java +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/OracleChannelMessageStoreQueryProvider.java @@ -27,18 +27,24 @@ */ public class OracleChannelMessageStoreQueryProvider extends AbstractChannelMessageStoreQueryProvider { + @Override + public String getCreateMessageQuery() { + return "INSERT into %PREFIX%CHANNEL_MESSAGE(MESSAGE_ID, GROUP_KEY, REGION, CREATED_DATE, MESSAGE_PRIORITY, MESSAGE_SEQUENCE, MESSAGE_BYTES)" + + " values (?, ?, ?, ?, ?, %PREFIX%MESSAGE_SEQ.NEXTVAL, ?)"; + } + @Override public String getPollFromGroupExcludeIdsQuery() { return "SELECT %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID, %PREFIX%CHANNEL_MESSAGE.MESSAGE_BYTES from %PREFIX%CHANNEL_MESSAGE " + "where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key and %PREFIX%CHANNEL_MESSAGE.REGION = :region " + - "and %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID not in (:message_ids) order by CREATED_DATE ASC FOR UPDATE SKIP LOCKED"; + "and %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID not in (:message_ids) order by CREATED_DATE, MESSAGE_SEQUENCE FOR UPDATE SKIP LOCKED"; } @Override public String getPollFromGroupQuery() { return "SELECT %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID, %PREFIX%CHANNEL_MESSAGE.MESSAGE_BYTES from %PREFIX%CHANNEL_MESSAGE " + "where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key and %PREFIX%CHANNEL_MESSAGE.REGION = :region " + - "order by CREATED_DATE ASC FOR UPDATE SKIP LOCKED"; + "order by CREATED_DATE, MESSAGE_SEQUENCE FOR UPDATE SKIP LOCKED"; } @Override @@ -46,14 +52,14 @@ public String getPriorityPollFromGroupExcludeIdsQuery() { return "SELECT %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID, %PREFIX%CHANNEL_MESSAGE.MESSAGE_BYTES from %PREFIX%CHANNEL_MESSAGE " + "where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key and %PREFIX%CHANNEL_MESSAGE.REGION = :region " + "and %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID not in (:message_ids) " + - "order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE ASC FOR UPDATE SKIP LOCKED"; + "order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE FOR UPDATE SKIP LOCKED"; } @Override public String getPriorityPollFromGroupQuery() { return "SELECT %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID, %PREFIX%CHANNEL_MESSAGE.MESSAGE_BYTES from %PREFIX%CHANNEL_MESSAGE " + "where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key and %PREFIX%CHANNEL_MESSAGE.REGION = :region " + - "order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE ASC FOR UPDATE SKIP LOCKED"; + "order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE FOR UPDATE SKIP LOCKED"; } } diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/PostgresChannelMessageStoreQueryProvider.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/PostgresChannelMessageStoreQueryProvider.java index 4aaa60f1172..70487cbaba2 100644 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/PostgresChannelMessageStoreQueryProvider.java +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/PostgresChannelMessageStoreQueryProvider.java @@ -23,14 +23,14 @@ public class PostgresChannelMessageStoreQueryProvider extends AbstractChannelMes public String getPollFromGroupExcludeIdsQuery() { return "SELECT %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID, %PREFIX%CHANNEL_MESSAGE.MESSAGE_BYTES from %PREFIX%CHANNEL_MESSAGE " + "where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key and %PREFIX%CHANNEL_MESSAGE.REGION = :region " + - "and %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID not in (:message_ids) order by CREATED_DATE ASC LIMIT 1 FOR UPDATE"; + "and %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID not in (:message_ids) order by CREATED_DATE, MESSAGE_SEQUENCE LIMIT 1 FOR UPDATE"; } @Override public String getPollFromGroupQuery() { return "SELECT %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID, %PREFIX%CHANNEL_MESSAGE.MESSAGE_BYTES from %PREFIX%CHANNEL_MESSAGE " + "where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key and %PREFIX%CHANNEL_MESSAGE.REGION = :region " + - "order by CREATED_DATE ASC LIMIT 1 FOR UPDATE"; + "order by CREATED_DATE, MESSAGE_SEQUENCE LIMIT 1 FOR UPDATE"; } @Override @@ -38,14 +38,14 @@ public String getPriorityPollFromGroupExcludeIdsQuery() { return "SELECT %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID, %PREFIX%CHANNEL_MESSAGE.MESSAGE_BYTES from %PREFIX%CHANNEL_MESSAGE " + "where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key and %PREFIX%CHANNEL_MESSAGE.REGION = :region " + "and %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID not in (:message_ids) " + - "order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE ASC LIMIT 1 FOR UPDATE"; + "order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE LIMIT 1 FOR UPDATE"; } @Override public String getPriorityPollFromGroupQuery() { return "SELECT %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID, %PREFIX%CHANNEL_MESSAGE.MESSAGE_BYTES from %PREFIX%CHANNEL_MESSAGE " + "where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key and %PREFIX%CHANNEL_MESSAGE.REGION = :region " + - "order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE ASC LIMIT 1 FOR UPDATE"; + "order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE LIMIT 1 FOR UPDATE"; } } diff --git a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/store/channel/schema-derby.sql b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/store/channel/schema-derby.sql index a86eff8159a..f62b27367c0 100644 --- a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/store/channel/schema-derby.sql +++ b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/store/channel/schema-derby.sql @@ -3,9 +3,11 @@ CREATE TABLE INT_CHANNEL_MESSAGE ( GROUP_KEY CHAR(36) NOT NULL, CREATED_DATE BIGINT NOT NULL, MESSAGE_PRIORITY INT, + MESSAGE_SEQUENCE BIGINT NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), MESSAGE_BYTES BLOB, REGION VARCHAR(100) NOT NULL, constraint INT_CHANNEL_MESSAGE_PK primary key (GROUP_KEY, MESSAGE_ID, REGION) ); -CREATE INDEX INT_CHANNEL_MSG_DATE_IDX ON INT_CHANNEL_MESSAGE (CREATED_DATE); +CREATE INDEX INT_CHANNEL_MSG_DATE_IDX ON INT_CHANNEL_MESSAGE (CREATED_DATE, MESSAGE_SEQUENCE); +CREATE INDEX INT_CHANNEL_MSG_PRIORITY_IDX ON INT_CHANNEL_MESSAGE (MESSAGE_PRIORITY DESC, CREATED_DATE, MESSAGE_SEQUENCE); diff --git a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/store/channel/schema-drop-derby.sql b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/store/channel/schema-drop-derby.sql index ce4d2c90b7e..9a37bb5670a 100644 --- a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/store/channel/schema-drop-derby.sql +++ b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/store/channel/schema-drop-derby.sql @@ -1 +1,4 @@ DROP TABLE INT_CHANNEL_MESSAGE; +DROP INDEX INT_CHANNEL_MSG_DATE_IDX; +DROP INDEX INT_CHANNEL_MSG_PRIORITY_IDX; + diff --git a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/store/channel/schema-drop-hsql.sql b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/store/channel/schema-drop-hsql.sql index ce4d2c90b7e..257045f2f4a 100644 --- a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/store/channel/schema-drop-hsql.sql +++ b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/store/channel/schema-drop-hsql.sql @@ -1 +1,4 @@ DROP TABLE INT_CHANNEL_MESSAGE; +DROP INDEX INT_CHANNEL_MSG_DATE_IDX; +DROP INDEX INT_CHANNEL_MSG_PRIORITY_IDX; +DROP SEQUENCE INT_MESSAGE_SEQ; diff --git a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/store/channel/schema-drop-oracle.sql b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/store/channel/schema-drop-oracle.sql index 3b9c9caf8f6..f907782103b 100644 --- a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/store/channel/schema-drop-oracle.sql +++ b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/store/channel/schema-drop-oracle.sql @@ -1,3 +1,5 @@ DROP INDEX INT_CHANNEL_MSG_DATE_IDX; DROP TABLE INT_CHANNEL_MESSAGE; +DROP INDEX INT_CHANNEL_MSG_PRIORITY_IDX; +DROP SEQUENCE INT_MESSAGE_SEQ; diff --git a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/store/channel/schema-drop-postgres.sql b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/store/channel/schema-drop-postgres.sql index b83c9dce6e1..257045f2f4a 100644 --- a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/store/channel/schema-drop-postgres.sql +++ b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/store/channel/schema-drop-postgres.sql @@ -1,2 +1,4 @@ DROP TABLE INT_CHANNEL_MESSAGE; - +DROP INDEX INT_CHANNEL_MSG_DATE_IDX; +DROP INDEX INT_CHANNEL_MSG_PRIORITY_IDX; +DROP SEQUENCE INT_MESSAGE_SEQ; diff --git a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/store/channel/schema-hsql.sql b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/store/channel/schema-hsql.sql index 64a846e6c43..2f475e9089a 100644 --- a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/store/channel/schema-hsql.sql +++ b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/store/channel/schema-hsql.sql @@ -3,9 +3,13 @@ CREATE TABLE INT_CHANNEL_MESSAGE ( GROUP_KEY CHAR(36) NOT NULL, CREATED_DATE BIGINT NOT NULL, MESSAGE_PRIORITY INT, + MESSAGE_SEQUENCE BIGINT NOT NULL, MESSAGE_BYTES LONGVARBINARY, REGION VARCHAR(100) NOT NULL, constraint INT_CHANNEL_MESSAGE_PK primary key (GROUP_KEY, MESSAGE_ID, REGION) ); -CREATE INDEX INT_CHANNEL_MSG_DATE_IDX ON INT_CHANNEL_MESSAGE (CREATED_DATE); +CREATE INDEX INT_CHANNEL_MSG_DATE_IDX ON INT_CHANNEL_MESSAGE (CREATED_DATE, MESSAGE_SEQUENCE); +CREATE INDEX INT_CHANNEL_MSG_PRIORITY_IDX ON INT_CHANNEL_MESSAGE (MESSAGE_PRIORITY DESC, CREATED_DATE, MESSAGE_SEQUENCE); + +CREATE SEQUENCE INT_MESSAGE_SEQ AS BIGINT START WITH 1 INCREMENT BY 1; diff --git a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/store/channel/schema-mysql.sql b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/store/channel/schema-mysql.sql index fca20c658cf..1989e4872c5 100644 --- a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/store/channel/schema-mysql.sql +++ b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/store/channel/schema-mysql.sql @@ -3,10 +3,14 @@ CREATE TABLE INT_CHANNEL_MESSAGE ( GROUP_KEY CHAR(36) NOT NULL, CREATED_DATE BIGINT NOT NULL, MESSAGE_PRIORITY INT, + MESSAGE_SEQUENCE BIGINT AUTO_INCREMENT UNIQUE, MESSAGE_BYTES BLOB, REGION VARCHAR(100) NOT NULL, constraint INT_CHANNEL_MESSAGE_PK primary key (GROUP_KEY, MESSAGE_ID, REGION) ) ENGINE=InnoDB; ALTER TABLE INT_CHANNEL_MESSAGE -ADD INDEX MSG_INDEX_DATE_IDX USING BTREE (CREATED_DATE ASC) ; +ADD INDEX MSG_INDEX_DATE_IDX USING BTREE (CREATED_DATE, MESSAGE_SEQUENCE); + +ALTER TABLE INT_CHANNEL_MESSAGE +ADD INDEX MSG_INDEX_PRIORITY_IDX USING BTREE (MESSAGE_PRIORITY DESC, CREATED_DATE, MESSAGE_SEQUENCE); diff --git a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/store/channel/schema-oracle.sql b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/store/channel/schema-oracle.sql index 313b6e23645..d384813e2f1 100644 --- a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/store/channel/schema-oracle.sql +++ b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/store/channel/schema-oracle.sql @@ -3,10 +3,13 @@ CREATE TABLE INT_CHANNEL_MESSAGE ( GROUP_KEY CHAR(36) NOT NULL, CREATED_DATE NUMBER(19,0) NOT NULL, MESSAGE_PRIORITY NUMBER, + MESSAGE_SEQUENCE NUMBER NOT NULL, MESSAGE_BYTES BLOB, REGION VARCHAR2(100) NOT NULL, constraint INT_CHANNEL_MESSAGE_PK primary key (GROUP_KEY, MESSAGE_ID, REGION) ); -CREATE INDEX INT_CHANNEL_MSG_DATE_IDX - ON INT_CHANNEL_MESSAGE (CREATED_DATE); +CREATE INDEX INT_CHANNEL_MSG_DATE_IDX ON INT_CHANNEL_MESSAGE (CREATED_DATE, MESSAGE_SEQUENCE); +CREATE INDEX INT_CHANNEL_MSG_PRIORITY_IDX ON INT_CHANNEL_MESSAGE (MESSAGE_PRIORITY DESC, CREATED_DATE, MESSAGE_SEQUENCE); + +CREATE SEQUENCE INT_MESSAGE_SEQ START WITH 1 INCREMENT BY 1 NOCACHE NOCYCLE; diff --git a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/store/channel/schema-postgresql.sql b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/store/channel/schema-postgresql.sql index ac4b10e76c0..f0edc5dc412 100644 --- a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/store/channel/schema-postgresql.sql +++ b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/store/channel/schema-postgresql.sql @@ -3,11 +3,13 @@ CREATE TABLE INT_CHANNEL_MESSAGE ( GROUP_KEY character(36) NOT NULL, CREATED_DATE BIGINT NOT NULL, MESSAGE_PRIORITY INT, + MESSAGE_SEQUENCE BIGINT NOT NULL DEFAULT nextval('INT_MESSAGE_SEQ'), MESSAGE_BYTES bytea, REGION character varying(100) NOT NULL, constraint INT_CHANNEL_MESSAGE_PK primary key (GROUP_KEY, MESSAGE_ID, REGION) ); -CREATE INDEX MSG_INDEX_DATE_IDX - ON INT_CHANNEL_MESSAGE - USING btree (created_date); +CREATE INDEX MSG_INDEX_DATE_IDX ON INT_CHANNEL_MESSAGE USING btree (CREATED_DATE, MESSAGE_SEQUENCE); +CREATE INDEX INT_CHANNEL_MSG_PRIORITY_IDX ON INT_CHANNEL_MESSAGE USING btree (MESSAGE_PRIORITY DESC, CREATED_DATE, MESSAGE_SEQUENCE); + +CREATE SEQUENCE INT_MESSAGE_SEQ START WITH 1 INCREMENT BY 1 NO CYCLE; diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/HsqlTxTimeoutMessageStoreTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/HsqlTxTimeoutMessageStoreTests.java index 6681ddc7331..b80798dbf06 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/HsqlTxTimeoutMessageStoreTests.java +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/HsqlTxTimeoutMessageStoreTests.java @@ -14,10 +14,11 @@ import java.util.concurrent.ExecutionException; +import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; -import org.springframework.test.annotation.DirtiesContext; +import org.springframework.integration.test.support.LongRunningIntegrationTest; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; @@ -29,9 +30,11 @@ */ @ContextConfiguration @RunWith(SpringJUnit4ClassRunner.class) -@DirtiesContext // close at the end after class public class HsqlTxTimeoutMessageStoreTests extends AbstractTxTimeoutMessageStoreTests { + @Rule + public LongRunningIntegrationTest longTests = new LongRunningIntegrationTest(); + @Test @Override public void test() throws InterruptedException { From a9725f9fe6152caba91e1bc7c96b1b912c8cb850 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Wed, 9 Apr 2014 17:06:50 +0300 Subject: [PATCH 4/5] INT-3339: Docs --- src/reference/docbook/jdbc.xml | 14 +++++++++++++- src/reference/docbook/whats-new.xml | 11 +++++++++++ 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/src/reference/docbook/jdbc.xml b/src/reference/docbook/jdbc.xml index 04ffb9f3fed..5de2fa3b5cd 100644 --- a/src/reference/docbook/jdbc.xml +++ b/src/reference/docbook/jdbc.xml @@ -520,7 +520,19 @@ …]]> - + + Priority Channel + + Starting with version 4.0 the JdbcChannelMessageStore + implements PriorityCapableChannelMessageStore and provides + priorityEnabled option allowing it to be used as a message-store + reference for priority-queues. For this purpose the INT_CHANNEL_MESSAGE + provides the MESSAGE_PRIORITY column to store the value of PRIORITY Message header. + In addition the new MESSAGE_SEQUENCE column is also provide to achieve robust 'the oldest' polling + mechanism. Having that the messages are polled (selected) from database with order + order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE. + +

Initializing the Database diff --git a/src/reference/docbook/whats-new.xml b/src/reference/docbook/whats-new.xml index c043b1eb93c..2a02d858521 100644 --- a/src/reference/docbook/whats-new.xml +++ b/src/reference/docbook/whats-new.xml @@ -253,5 +253,16 @@ .
+
+ JdbcChannelMessageStore and PriorityChannel + + The JdbcChannelMessageStore now implements + PriorityCapableChannelMessageStore, allowing it to be used as + a message-store reference for priority-queues. + For more information, see , + , and + . + +
From f021e10a83390cb21fe876f82992cf433ad49ef7 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Wed, 9 Apr 2014 21:39:52 +0300 Subject: [PATCH 5/5] INT-3339: Polishing and PR comments --- .../jdbc/store/JdbcChannelMessageStore.java | 12 ++--- .../AbstractJdbcChannelMessageStoreTests.java | 20 ++++++-- .../AbstractTxTimeoutMessageStoreTests.java | 48 +++++++++++++++++-- .../DerbyTxTimeoutMessageStoreTests.java | 31 ------------ .../HsqlJdbcChannelMessageStoreTests.java | 26 ---------- .../HsqlTxTimeoutMessageStoreTests.java | 30 ------------ .../MySqlJdbcChannelMessageStoreTests.java | 27 +---------- .../MySqlTxTimeoutMessageStoreTests.java | 35 +------------- .../OracleTxTimeoutMessageStoreTests.java | 31 +----------- .../PostgresTxTimeoutMessageStoreTests.java | 31 +----------- src/reference/docbook/jdbc.xml | 27 ++++++++++- 11 files changed, 94 insertions(+), 224 deletions(-) diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcChannelMessageStore.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcChannelMessageStore.java index 6d51d6c3aea..1f0722e1b17 100644 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcChannelMessageStore.java +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcChannelMessageStore.java @@ -199,7 +199,7 @@ public void setDataSource(DataSource dataSource) { * * @param deserializer the deserializer to set */ - @SuppressWarnings({ "unchecked", "rawtypes" }) + @SuppressWarnings({"unchecked", "rawtypes"}) public void setDeserializer(Deserializer> deserializer) { this.deserializer = new DeserializingConverter((Deserializer) deserializer); } @@ -403,7 +403,7 @@ public void afterPropertiesSet() throws Exception { * @param message a message */ @Override - @SuppressWarnings({ "rawtypes", "unchecked" }) + @SuppressWarnings({"rawtypes", "unchecked"}) public MessageGroup addMessageToGroup(Object groupId, final Message message) { final String groupKey = getKey(groupId); @@ -422,7 +422,7 @@ public MessageGroup addMessageToGroup(Object groupId, final Message message) jdbcTemplate.update(getQuery(channelMessageStoreQueryProvider.getCreateMessageQuery()), new PreparedStatementSetter() { @Override public void setValues(PreparedStatement ps) throws SQLException { - if (logger.isDebugEnabled()){ + if (logger.isDebugEnabled()) { logger.debug("Inserting message with id key=" + messageId); } ps.setString(1, messageId); @@ -525,7 +525,7 @@ public Message pollMessageFromGroup(Object groupId) { final String key = getKey(groupId); final Message polledMessage = this.doPollForMessage(key); - if (polledMessage != null){ + if (polledMessage != null) { if (!this.doRemoveMessageFromGroup(groupId, polledMessage)) { return null; } @@ -581,7 +581,7 @@ protected Message doPollForMessage(String groupIdKey) { Assert.isTrue(messages.size() == 0 || messages.size() == 1); - if (messages.size() > 0){ + if (messages.size() > 0) { final Message message = messages.get(0); final String messageId = message.getHeaders().getId().toString(); @@ -609,7 +609,7 @@ private boolean doRemoveMessageFromGroup(Object groupId, Message messageToRem final UUID id = messageToRemove.getHeaders().getId(); int updated = jdbcTemplate.update(getQuery(channelMessageStoreQueryProvider.getDeleteMessageQuery()), - new Object[] { getKey(id), getKey(groupId), region }, new int[] { Types.VARCHAR, Types.VARCHAR, Types.VARCHAR }); + new Object[] {getKey(id), getKey(groupId), region}, new int[] {Types.VARCHAR, Types.VARCHAR, Types.VARCHAR}); boolean result = updated != 0; if (result) { diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/AbstractJdbcChannelMessageStoreTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/AbstractJdbcChannelMessageStoreTests.java index ead567e806b..742da64c863 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/AbstractJdbcChannelMessageStoreTests.java +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/AbstractJdbcChannelMessageStoreTests.java @@ -16,16 +16,20 @@ package org.springframework.integration.jdbc.store.channel; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; +import static org.junit.Assert.*; import javax.sql.DataSource; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; + import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.messaging.Message; import org.springframework.integration.jdbc.store.JdbcChannelMessageStore; import org.springframework.integration.support.MessageBuilder; +import org.springframework.messaging.Message; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionStatus; @@ -36,7 +40,10 @@ /** * @author Gunnar Hillert */ -public class AbstractJdbcChannelMessageStoreTests { + +@RunWith(SpringJUnit4ClassRunner.class) +@DirtiesContext // close at the end after class +public abstract class AbstractJdbcChannelMessageStoreTests { protected static final String TEST_MESSAGE_GROUP = "AbstractJdbcChannelMessageStoreTests"; @@ -51,6 +58,7 @@ public class AbstractJdbcChannelMessageStoreTests { @Autowired protected ChannelMessageStoreQueryProvider queryProvider; + @Before public void init() throws Exception { messageStore = new JdbcChannelMessageStore(dataSource); messageStore.setRegion("AbstractJdbcChannelMessageStoreTests"); @@ -59,11 +67,13 @@ public void init() throws Exception { messageStore.removeMessageGroup("AbstractJdbcChannelMessageStoreTests"); } + @Test public void testGetNonExistentMessageFromGroup() throws Exception { Message result = messageStore.pollMessageFromGroup(TEST_MESSAGE_GROUP); assertNull(result); } + @Test public void testAddAndGet() throws Exception { final Message message = MessageBuilder.withPayload("Cartman and Kenny") .setHeader("homeTown", "Southpark") diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/AbstractTxTimeoutMessageStoreTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/AbstractTxTimeoutMessageStoreTests.java index 88e2ce1a83c..ddffcad8158 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/AbstractTxTimeoutMessageStoreTests.java +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/AbstractTxTimeoutMessageStoreTests.java @@ -12,10 +12,10 @@ */ package org.springframework.integration.jdbc.store.channel; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; +import java.util.List; +import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.CountDownLatch; @@ -30,17 +30,24 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.hamcrest.Matchers; import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.integration.IntegrationMessageHeaderAccessor; import org.springframework.integration.jdbc.store.JdbcChannelMessageStore; import org.springframework.integration.support.MessageBuilder; +import org.springframework.integration.util.UUIDConverter; +import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.PollableChannel; import org.springframework.messaging.support.GenericMessage; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionStatus; @@ -53,7 +60,9 @@ * @author Gunnar Hillert * @author Artem Bilan */ -abstract class AbstractTxTimeoutMessageStoreTests { +@RunWith(SpringJUnit4ClassRunner.class) +@DirtiesContext // close at the end after class +public abstract class AbstractTxTimeoutMessageStoreTests { private static final Log log = LogFactory.getLog(AbstractTxTimeoutMessageStoreTests.class); @@ -85,6 +94,7 @@ abstract class AbstractTxTimeoutMessageStoreTests { @Autowired protected PollableChannel priorityChannel; + @Test public void test() throws InterruptedException { int maxMessages = 10; @@ -111,7 +121,7 @@ protected void doInTransactionWithoutResult(TransactionStatus status) { log.info("Done sending " + maxMessages + " messages."); - Assert.assertTrue(String.format("Contdown latch did not count down from " + + Assert.assertTrue(String.format("Countdown latch did not count down from " + "%s to 0 in %sms.", maxMessages, maxWaitTime), testService.await(maxWaitTime)); Thread.sleep(2000); @@ -121,6 +131,7 @@ protected void doInTransactionWithoutResult(TransactionStatus status) { Assert.assertEquals(Integer.valueOf(0), Integer.valueOf(testService.getDuplicateMessagesCount())); } + @Test public void testInt2993IdCacheConcurrency() throws InterruptedException, ExecutionException { final String groupId = "testInt2993Group"; for (int i = 0; i < 100; i++) { @@ -180,6 +191,7 @@ public Boolean doInTransaction(TransactionStatus status) { assertTrue(executorService.awaitTermination(5, TimeUnit.SECONDS)); } + @Test public void testInt3181ConcurrentPolling() throws InterruptedException { for (int i = 0; i < 10; i++) { this.first.send(new GenericMessage("test")); @@ -190,6 +202,32 @@ public void testInt3181ConcurrentPolling() throws InterruptedException { assertEquals(0, errorAtomicInteger.get()); } + @Test + public void testMessageSequenceColumn() throws InterruptedException { + JdbcTemplate jdbcTemplate = new JdbcTemplate(this.dataSource); + String messageGroup = "TEST_MESSAGE_GROUP"; + this.jdbcChannelMessageStore.addMessageToGroup(messageGroup, new GenericMessage("foo")); + // The simple sleep to to be sure that messages are stored with different 'CREATED_DATE' + Thread.sleep(10); + this.jdbcChannelMessageStore.addMessageToGroup(messageGroup, new GenericMessage("bar")); + + List> result = + jdbcTemplate.queryForList("SELECT MESSAGE_SEQUENCE FROM INT_CHANNEL_MESSAGE " + + "WHERE GROUP_KEY = ? ORDER BY CREATED_DATE", UUIDConverter.getUUID(messageGroup).toString()); + assertEquals(2, result.size()); + Object messageSequence1 = result.get(0).get("MESSAGE_SEQUENCE"); + Object messageSequence2 = result.get(1).get("MESSAGE_SEQUENCE"); + assertNotNull(messageSequence1); + assertThat(messageSequence1, Matchers.instanceOf(Long.class)); + assertNotNull(messageSequence2); + assertThat(messageSequence2, Matchers.instanceOf(Long.class)); + + assertThat((Long) messageSequence1, Matchers.lessThan((Long) messageSequence2)); + + this.jdbcChannelMessageStore.removeMessageGroup(messageGroup); + } + + @Test public void testPriorityChannel() throws Exception { Message message = MessageBuilder.withPayload("1").setHeader(IntegrationMessageHeaderAccessor.PRIORITY, 1).build(); priorityChannel.send(message); diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/DerbyTxTimeoutMessageStoreTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/DerbyTxTimeoutMessageStoreTests.java index cd27498fe76..5af85f0e6a3 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/DerbyTxTimeoutMessageStoreTests.java +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/DerbyTxTimeoutMessageStoreTests.java @@ -12,15 +12,9 @@ */ package org.springframework.integration.jdbc.store.channel; -import java.util.concurrent.ExecutionException; - import org.junit.Ignore; -import org.junit.Test; -import org.junit.runner.RunWith; import org.springframework.test.context.ContextConfiguration; -import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; - /** @@ -31,31 +25,6 @@ */ @Ignore @ContextConfiguration -@RunWith(SpringJUnit4ClassRunner.class) public class DerbyTxTimeoutMessageStoreTests extends AbstractTxTimeoutMessageStoreTests { - @Test - @Override - public void test() throws InterruptedException { - super.test(); - } - - @Test - @Override - public void testInt3181ConcurrentPolling() throws InterruptedException { - super.testInt3181ConcurrentPolling(); - } - - @Test - @Override - public void testInt2993IdCacheConcurrency() throws InterruptedException, ExecutionException { - super.testInt2993IdCacheConcurrency(); - } - - @Test - @Override - public void testPriorityChannel() throws Exception { - super.testPriorityChannel(); - } - } diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/HsqlJdbcChannelMessageStoreTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/HsqlJdbcChannelMessageStoreTests.java index 448fb607dad..a123a2264f0 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/HsqlJdbcChannelMessageStoreTests.java +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/HsqlJdbcChannelMessageStoreTests.java @@ -16,38 +16,12 @@ package org.springframework.integration.jdbc.store.channel; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; - -import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.ContextConfiguration; -import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; /** * @author Gunnar Hillert */ @ContextConfiguration -@RunWith(SpringJUnit4ClassRunner.class) -@DirtiesContext // close at the end after class public class HsqlJdbcChannelMessageStoreTests extends AbstractJdbcChannelMessageStoreTests { - @Before - @Override - public void init() throws Exception { - super.init(); - } - - @Test - @Override - public void testGetNonExistentMessageFromGroup() throws Exception { - super.testGetNonExistentMessageFromGroup(); - } - - @Test - @Override - public void testAddAndGet() throws Exception { - super.testAddAndGet(); - } - } diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/HsqlTxTimeoutMessageStoreTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/HsqlTxTimeoutMessageStoreTests.java index b80798dbf06..35ff4a634fa 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/HsqlTxTimeoutMessageStoreTests.java +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/HsqlTxTimeoutMessageStoreTests.java @@ -12,15 +12,10 @@ */ package org.springframework.integration.jdbc.store.channel; -import java.util.concurrent.ExecutionException; - import org.junit.Rule; -import org.junit.Test; -import org.junit.runner.RunWith; import org.springframework.integration.test.support.LongRunningIntegrationTest; import org.springframework.test.context.ContextConfiguration; -import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; /** * @@ -29,34 +24,9 @@ * */ @ContextConfiguration -@RunWith(SpringJUnit4ClassRunner.class) public class HsqlTxTimeoutMessageStoreTests extends AbstractTxTimeoutMessageStoreTests { @Rule public LongRunningIntegrationTest longTests = new LongRunningIntegrationTest(); - @Test - @Override - public void test() throws InterruptedException { - super.test(); - } - - @Test - @Override - public void testInt2993IdCacheConcurrency() throws InterruptedException, ExecutionException { - super.testInt2993IdCacheConcurrency(); - } - - @Test - @Override - public void testInt3181ConcurrentPolling() throws InterruptedException { - super.testInt3181ConcurrentPolling(); - } - - @Test - @Override - public void testPriorityChannel() throws Exception { - super.testPriorityChannel(); - } - } diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/MySqlJdbcChannelMessageStoreTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/MySqlJdbcChannelMessageStoreTests.java index a328780151a..fe9f47ef8ed 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/MySqlJdbcChannelMessageStoreTests.java +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/MySqlJdbcChannelMessageStoreTests.java @@ -16,38 +16,13 @@ package org.springframework.integration.jdbc.store.channel; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; -import org.junit.runner.RunWith; import org.springframework.test.context.ContextConfiguration; -import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; - /** * @author Gunnar Hillert */ -@Ignore +//@Ignore @ContextConfiguration -@RunWith(SpringJUnit4ClassRunner.class) public class MySqlJdbcChannelMessageStoreTests extends AbstractJdbcChannelMessageStoreTests { - @Before - @Override - public void init() throws Exception { - super.init(); - } - - @Test - @Override - public void testGetNonExistentMessageFromGroup() throws Exception { - super.testGetNonExistentMessageFromGroup(); - } - - @Test - @Override - public void testAddAndGet() throws Exception { - super.testAddAndGet(); - } - } diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/MySqlTxTimeoutMessageStoreTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/MySqlTxTimeoutMessageStoreTests.java index 3dc05918c20..f8833d5e0da 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/MySqlTxTimeoutMessageStoreTests.java +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/MySqlTxTimeoutMessageStoreTests.java @@ -12,15 +12,7 @@ */ package org.springframework.integration.jdbc.store.channel; -import java.util.concurrent.ExecutionException; - -import org.junit.Ignore; -import org.junit.Test; -import org.junit.runner.RunWith; - import org.springframework.test.context.ContextConfiguration; -import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; - /** * @@ -28,33 +20,8 @@ * @author Artem Bilan * */ -@Ignore +//@Ignore @ContextConfiguration -@RunWith(SpringJUnit4ClassRunner.class) public class MySqlTxTimeoutMessageStoreTests extends AbstractTxTimeoutMessageStoreTests { - @Test - @Override - public void test() throws InterruptedException { - super.test(); - } - - @Test - @Override - public void testInt3181ConcurrentPolling() throws InterruptedException { - super.testInt3181ConcurrentPolling(); - } - - @Test - @Override - public void testInt2993IdCacheConcurrency() throws InterruptedException, ExecutionException { - super.testInt2993IdCacheConcurrency(); - } - - @Test - @Override - public void testPriorityChannel() throws Exception { - super.testPriorityChannel(); - } - } diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/OracleTxTimeoutMessageStoreTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/OracleTxTimeoutMessageStoreTests.java index fe7ecf43c6e..0abd810397c 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/OracleTxTimeoutMessageStoreTests.java +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/OracleTxTimeoutMessageStoreTests.java @@ -12,13 +12,9 @@ */ package org.springframework.integration.jdbc.store.channel; -import java.util.concurrent.ExecutionException; - import org.junit.Ignore; -import org.junit.Test; -import org.junit.runner.RunWith; + import org.springframework.test.context.ContextConfiguration; -import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; /** @@ -29,31 +25,6 @@ */ @Ignore @ContextConfiguration -@RunWith(SpringJUnit4ClassRunner.class) public class OracleTxTimeoutMessageStoreTests extends AbstractTxTimeoutMessageStoreTests { - @Test - @Override - public void test() throws InterruptedException { - super.test(); - } - - @Test - @Override - public void testInt2993IdCacheConcurrency() throws InterruptedException, ExecutionException { - super.testInt2993IdCacheConcurrency(); - } - - @Test - @Override - public void testInt3181ConcurrentPolling() throws InterruptedException { - super.testInt3181ConcurrentPolling(); - } - - @Test - @Override - public void testPriorityChannel() throws Exception { - super.testPriorityChannel(); - } - } diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/PostgresTxTimeoutMessageStoreTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/PostgresTxTimeoutMessageStoreTests.java index 4acb4f5942b..d36ada8a2fb 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/PostgresTxTimeoutMessageStoreTests.java +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/PostgresTxTimeoutMessageStoreTests.java @@ -12,13 +12,9 @@ */ package org.springframework.integration.jdbc.store.channel; -import java.util.concurrent.ExecutionException; - import org.junit.Ignore; -import org.junit.Test; -import org.junit.runner.RunWith; + import org.springframework.test.context.ContextConfiguration; -import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; /** @@ -29,31 +25,6 @@ */ @Ignore @ContextConfiguration -@RunWith(SpringJUnit4ClassRunner.class) public class PostgresTxTimeoutMessageStoreTests extends AbstractTxTimeoutMessageStoreTests { - @Test - @Override - public void test() throws InterruptedException { - super.test(); - } - - @Test - @Override - public void testInt2993IdCacheConcurrency() throws InterruptedException, ExecutionException { - super.testInt2993IdCacheConcurrency(); - } - - @Test - @Override - public void testInt3181ConcurrentPolling() throws InterruptedException { - super.testInt3181ConcurrentPolling(); - } - - @Test - @Override - public void testPriorityChannel() throws Exception { - super.testPriorityChannel(); - } - } diff --git a/src/reference/docbook/jdbc.xml b/src/reference/docbook/jdbc.xml index 5de2fa3b5cd..11648343ac1 100644 --- a/src/reference/docbook/jdbc.xml +++ b/src/reference/docbook/jdbc.xml @@ -526,12 +526,37 @@ Starting with version 4.0 the JdbcChannelMessageStore implements PriorityCapableChannelMessageStore and provides priorityEnabled option allowing it to be used as a message-store - reference for priority-queues. For this purpose the INT_CHANNEL_MESSAGE + reference for priority-queues. For this purpose the INT_CHANNEL_MESSAGE provides the MESSAGE_PRIORITY column to store the value of PRIORITY Message header. In addition the new MESSAGE_SEQUENCE column is also provide to achieve robust 'the oldest' polling mechanism. Having that the messages are polled (selected) from database with order order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE. + + + It's not recommended to use the same JdbcChannelMessageStore bean + for priority and non-priority, because priorityEnabled option applies to the entire store. + However the same INT_CHANNEL_MESSAGE and even region can be used for both + JdbcChannelMessageStore types. From configuration perspective it's just enough + to extend one bean from another: + + + + + + + + + + + + + + + +]]> + +