Skip to content

Commit

Permalink
GH-8760 Postgres: using DELETE ... RETURNING
Browse files Browse the repository at this point in the history
Fixes #8760

* Make `PostgresChannelMessageStoreQueryProvider` to use single `DELETE ... RETURNING` for polling statements
* Add `isUsingSingleStatementForPoll` and use it from `JdbcChannelMessageStore`
* Execute Postgres init scripts to `PostgresContainerTest`
* Code clean up
* Document the new feature
  • Loading branch information
joshiste authored and artembilan committed Oct 17, 2023
1 parent 29186e2 commit 87a2ac5
Show file tree
Hide file tree
Showing 9 changed files with 156 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
* Channel-specific implementation of
* {@link org.springframework.integration.store.BasicMessageGroupStore} using a relational
* database via JDBC.
*
* <p>
* This message store shall be used for message channels only.
* <p>
* As such, the {@link JdbcChannelMessageStore} uses database specific SQL queries.
Expand All @@ -86,6 +86,7 @@
* @author Gary Russell
* @author Meherzad Lahewala
* @author Trung Pham
* @author Johannes Edmeier
*
* @since 2.2
*/
Expand Down Expand Up @@ -554,12 +555,16 @@ public void removeMessageGroup(Object groupId) {
public Message<?> pollMessageFromGroup(Object groupId) {
String key = getKey(groupId);
Message<?> polledMessage = doPollForMessage(key);
if (polledMessage != null && !doRemoveMessageFromGroup(groupId, polledMessage)) {
if (polledMessage != null && !isSingleStatementForPoll() && !doRemoveMessageFromGroup(groupId, polledMessage)) {
return null;
}
return polledMessage;
}

private boolean isSingleStatementForPoll() {
return this.channelMessageStoreQueryProvider.isSingleStatementForPoll();
}

/**
* This method executes a call to the DB to get the oldest Message in the
* MessageGroup which in the context of the {@link JdbcChannelMessageStore}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
* @author Artem Bilan
* @author Gary Russell
* @author Adama Sorho
* @author Johannes Edmeier
*
* @since 2.2
*/
Expand All @@ -33,7 +34,7 @@ public interface ChannelMessageStoreQueryProvider {
String SELECT_COMMON = """
SELECT %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID, %PREFIX%CHANNEL_MESSAGE.MESSAGE_BYTES
from %PREFIX%CHANNEL_MESSAGE
where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key and %PREFIX%CHANNEL_MESSAGE.REGION = :region\s
where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key and %PREFIX%CHANNEL_MESSAGE.REGION = :region
""";

/**
Expand Down Expand Up @@ -125,4 +126,14 @@ default String getDeleteMessageGroupQuery() {
*/
String getPriorityPollFromGroupQuery();

/**
* Indicate if the queries for polling are using a single statement (e.g. DELETE ... RETURNING) to
* retrieve and delete the message from the channel store.
* @return true if a single statement is used, false if a select and delete is required.
* @since 6.2
*/
default boolean isSingleStatementForPoll() {
return false;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,37 +20,77 @@
* @author Gunnar Hillert
* @author Artem Bilan
* @author Adama Sorho
* @author Johannes Edmeier
*
* @since 2.2
*/
public class PostgresChannelMessageStoreQueryProvider implements ChannelMessageStoreQueryProvider {

@Override
public String getPollFromGroupExcludeIdsQuery() {
return SELECT_COMMON
+ "and %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID not in (:message_ids) "
+ "order by CREATED_DATE, MESSAGE_SEQUENCE LIMIT 1 FOR UPDATE SKIP LOCKED";
return """
delete
from %PREFIX%CHANNEL_MESSAGE
where CTID = (select CTID
from %PREFIX%CHANNEL_MESSAGE
where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key
and %PREFIX%CHANNEL_MESSAGE.REGION = :region
and %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID not in (:message_ids)
order by CREATED_DATE, MESSAGE_SEQUENCE
limit 1 for update skip locked)
returning MESSAGE_ID, MESSAGE_BYTES;
""";
}

@Override
public String getPollFromGroupQuery() {
return SELECT_COMMON +
"order by CREATED_DATE, MESSAGE_SEQUENCE LIMIT 1 FOR UPDATE SKIP LOCKED";
return """
delete
from %PREFIX%CHANNEL_MESSAGE
where CTID = (select CTID
from %PREFIX%CHANNEL_MESSAGE
where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key
and %PREFIX%CHANNEL_MESSAGE.REGION = :region
order by CREATED_DATE, MESSAGE_SEQUENCE
limit 1 for update skip locked)
returning MESSAGE_ID, MESSAGE_BYTES;
""";
}

@Override
public String getPriorityPollFromGroupExcludeIdsQuery() {
return SELECT_COMMON +
"and %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID not in (:message_ids) " +
"order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE " +
"LIMIT 1 FOR UPDATE SKIP LOCKED";
return """
delete
from %PREFIX%CHANNEL_MESSAGE
where CTID = (select CTID
from %PREFIX%CHANNEL_MESSAGE
where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key
and %PREFIX%CHANNEL_MESSAGE.REGION = :region
and %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID not in (:message_ids)
order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE
limit 1 for update skip locked)
returning MESSAGE_ID, MESSAGE_BYTES;
""";
}

@Override
public String getPriorityPollFromGroupQuery() {
return SELECT_COMMON +
"order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE " +
"LIMIT 1 FOR UPDATE SKIP LOCKED";
return """
delete
from %PREFIX%CHANNEL_MESSAGE
where CTID = (select CTID
from %PREFIX%CHANNEL_MESSAGE
where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key
and %PREFIX%CHANNEL_MESSAGE.REGION = :region
order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE
limit 1 for update skip locked)
returning MESSAGE_ID, MESSAGE_BYTES;
""";
}

@Override
public boolean isSingleStatementForPoll() {
return true;
}

}
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
-- Autogenerated: do not edit this file

DROP INDEX INT_MESSAGE_IX1 ;
DROP INDEX INT_CHANNEL_MSG_DATE_IDX ;
DROP INDEX INT_CHANNEL_MSG_PRIORITY_IDX ;
DROP TABLE INT_MESSAGE ;
DROP TABLE INT_MESSAGE_GROUP ;
DROP TABLE INT_GROUP_TO_MESSAGE ;
DROP TABLE INT_LOCK ;
DROP TABLE INT_CHANNEL_MESSAGE ;
DROP TABLE INT_METADATA_STORE ;
DROP SEQUENCE INT_MESSAGE_SEQ ;
DROP INDEX IF EXISTS INT_MESSAGE_IX1 ;
DROP INDEX IF EXISTS INT_CHANNEL_MSG_DATE_IDX ;
DROP INDEX IF EXISTS INT_CHANNEL_MSG_PRIORITY_IDX ;
DROP TABLE IF EXISTS INT_MESSAGE ;
DROP TABLE IF EXISTS INT_MESSAGE_GROUP ;
DROP TABLE IF EXISTS INT_GROUP_TO_MESSAGE ;
DROP TABLE IF EXISTS INT_LOCK ;
DROP TABLE IF EXISTS INT_CHANNEL_MESSAGE ;
DROP TABLE IF EXISTS INT_METADATA_STORE ;
DROP SEQUENCE IF EXISTS INT_MESSAGE_SEQ ;
Original file line number Diff line number Diff line change
Expand Up @@ -67,21 +67,6 @@
public class PostgresChannelMessageTableSubscriberTests implements PostgresContainerTest {

private static final String INTEGRATION_DB_SCRIPTS = """
CREATE SEQUENCE INT_MESSAGE_SEQ START WITH 1 INCREMENT BY 1 NO CYCLE;
^^^ END OF SCRIPT ^^^
CREATE TABLE INT_CHANNEL_MESSAGE (
MESSAGE_ID CHAR(36) NOT NULL,
GROUP_KEY CHAR(36) NOT NULL,
CREATED_DATE BIGINT NOT NULL,
MESSAGE_PRIORITY BIGINT,
MESSAGE_SEQUENCE BIGINT NOT NULL DEFAULT nextval('INT_MESSAGE_SEQ'),
MESSAGE_BYTES BYTEA,
REGION VARCHAR(100) NOT NULL,
constraint INT_CHANNEL_MESSAGE_PK primary key (REGION, GROUP_KEY, CREATED_DATE, MESSAGE_SEQUENCE)
);
^^^ END OF SCRIPT ^^^
CREATE FUNCTION INT_CHANNEL_MESSAGE_NOTIFY_FCT()
RETURNS TRIGGER AS
$BODY$
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022 the original author or authors.
* Copyright 2022-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -27,14 +27,17 @@
* Since the Postgres container instance is shared via static property, it is going to be
* started only once per JVM, therefore the target Docker container is reused automatically.
*
* @author Artem Bilan
* @author Rafael Winterhalter
* @author Johannes Edmeier
*
* @since 6.0
*/
@Testcontainers(disabledWithoutDocker = true)
public interface PostgresContainerTest {

PostgreSQLContainer<?> POSTGRES_CONTAINER = new PostgreSQLContainer<>("postgres:11");
PostgreSQLContainer<?> POSTGRES_CONTAINER = new PostgreSQLContainer<>("postgres:11")
.withInitScript("org/springframework/integration/jdbc/schema-postgresql.sql");

@BeforeAll
static void startContainer() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.jdbc.store.channel;

import javax.sql.DataSource;

import org.apache.commons.dbcp2.BasicDataSource;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.jdbc.channel.PostgresContainerTest;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.transaction.PlatformTransactionManager;

/**
* @author Johannes Edmeier
* @author Artem Bilan
*
* @since 6.2
*/
@ContextConfiguration
public class PostgresJdbcChannelMessageStoreTests extends AbstractJdbcChannelMessageStoreTests
implements PostgresContainerTest {

@Configuration
public static class Config {

@Bean
public DataSource dataSource() {
BasicDataSource dataSource = new BasicDataSource();
dataSource.setUrl(PostgresContainerTest.getJdbcUrl());
dataSource.setUsername(PostgresContainerTest.getUsername());
dataSource.setPassword(PostgresContainerTest.getPassword());
return dataSource;
}

@Bean
PlatformTransactionManager transactionManager(DataSource dataSource) {
return new DataSourceTransactionManager(dataSource);
}

@Bean
PostgresChannelMessageStoreQueryProvider queryProvider() {
return new PostgresChannelMessageStoreQueryProvider();
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ If your database is not listed, you can implement the `ChannelMessageStoreQueryP

Version 4.0 added the `MESSAGE_SEQUENCE` column to the table to ensure first-in-first-out (FIFO) queueing even when messages are stored in the same millisecond.

Starting with version 6.2, `ChannelMessageStoreQueryProvider` exposes a `isSingleStatementForPoll` flag, where the `PostgresChannelMessageStoreQueryProvider` returns `true` and its queries for polls are now based on a single `DELETE...RETURNING` statement.
The `JdbcChannelMessageStore` consults with the `isSingleStatementForPoll` option and skips a separate `DELETE` statement if only single poll statement is supported.

[[custom-message-insertion]]
=== Custom Message Insertion

Expand Down
4 changes: 3 additions & 1 deletion src/reference/antora/modules/ROOT/pages/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ See xref:kafka.adoc#kafka-inbound-pollable[Kafka Inbound Channel Adapter] for mo
=== JDBC Support Changes

The `JdbcMessageStore`, `JdbcChannelMessageStore`, `JdbcMetadataStore`, and `DefaultLockRepository` implement `SmartLifecycle` and perform a `SELECT COUNT` query, on their respective tables, in the `start()` method to ensure that the required table (according to the provided prefix) is present in the target database.
See xref:jdbc/message-store.adoc#jdbc-db-init[Initializing the Database] for more information.
The `PostgresChannelMessageStoreQueryProvider` now provides single `DELETE...RETURNING` statement for polling queries.
For this purpose the `ChannelMessageStoreQueryProvider` exposes `isSingleStatementForPoll` option which is consulted from the `JdbcChannelMessageStore`.
See xref:jdbc/message-store.adoc[JDBC Message Store] for more information.

[[x6.2-mongodb]]
=== MongoDB Support Changes
Expand Down

0 comments on commit 87a2ac5

Please sign in to comment.