From da29e2da6a21d1ab1e6d23883f7ac424f2200149 Mon Sep 17 00:00:00 2001 From: Johannes Edmeier Date: Fri, 3 May 2024 14:55:47 +0200 Subject: [PATCH] PostgresChannelMessageTableSubscriber: Renew connection only if invalid Fixes: #9111 An evolution of the #9061: renew the connection only when we need to. **Auto-cherry-pick to `6.2.x` & `6.1.x`** --- ...PostgresChannelMessageTableSubscriber.java | 5 +++-- ...resChannelMessageTableSubscriberTests.java | 20 ++++++++++++++++--- 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriber.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriber.java index 4b72c138ee..7a7028f940 100644 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriber.java +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriber.java @@ -216,9 +216,10 @@ private void doStart(CountDownLatch startingLatch) { if (!isActive()) { return; } - if (notifications == null || notifications.length == 0) { + if ((notifications == null || notifications.length == 0) && !conn.isValid(1)) { //We did not receive any notifications within the timeout period. - //We will close the connection and re-establish it. + //If the connection is still valid, we will continue polling + //Otherwise, we will close the connection and re-establish it. break; } for (PGNotification notification : notifications) { diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriberTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriberTests.java index 99120ca127..e595af5ec9 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriberTests.java +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriberTests.java @@ -23,8 +23,10 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import javax.sql.DataSource; @@ -268,7 +270,18 @@ public void testRenewConnection() throws Exception { CountDownLatch latch = new CountDownLatch(2); List payloads = new ArrayList<>(); CountDownLatch connectionLatch = new CountDownLatch(2); - connectionSupplier.onGetConnection = connectionLatch::countDown; + AtomicBoolean connectionCloseState = new AtomicBoolean(); + connectionSupplier.onGetConnection = conn -> { + connectionLatch.countDown(); + if (connectionCloseState.compareAndSet(false, true)) { + try { + conn.close(); + } + catch (Exception e) { + //nop + } + } + }; postgresChannelMessageTableSubscriber.start(); postgresSubscribableChannel.subscribe(message -> { payloads.add(message.getPayload()); @@ -324,7 +337,7 @@ public JdbcChannelMessageStore jdbcChannelMessageStore(DataSource dataSource) { private static class ConnectionSupplier implements PgConnectionSupplier { - Runnable onGetConnection; + Consumer onGetConnection; @Override public PgConnection get() throws SQLException { @@ -333,10 +346,11 @@ public PgConnection get() throws SQLException { POSTGRES_CONTAINER.getPassword()) .unwrap(PgConnection.class); if (this.onGetConnection != null) { - this.onGetConnection.run(); + this.onGetConnection.accept(conn); } return conn; } } + }