diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriber.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriber.java index 9075ebedc3..68ae5110a1 100644 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriber.java +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriber.java @@ -236,9 +236,10 @@ private void doStart(CountDownLatch startingLatch) { if (!isActive()) { return; } - if (notifications == null || notifications.length == 0) { + if ((notifications == null || notifications.length == 0) && !conn.isValid(1)) { //We did not receive any notifications within the timeout period. - //We will close the connection and re-establish it. + //If the connection is still valid, we will continue polling + //Otherwise, we will close the connection and re-establish it. break; } for (PGNotification notification : notifications) { diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriberTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriberTests.java index 7a74cd7202..d028e7ee82 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriberTests.java +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriberTests.java @@ -23,8 +23,10 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import javax.sql.DataSource; @@ -270,7 +272,18 @@ public void testRenewConnection() throws Exception { CountDownLatch latch = new CountDownLatch(2); List payloads = new ArrayList<>(); CountDownLatch connectionLatch = new CountDownLatch(2); - connectionSupplier.onGetConnection = connectionLatch::countDown; + AtomicBoolean connectionCloseState = new AtomicBoolean(); + connectionSupplier.onGetConnection = conn -> { + connectionLatch.countDown(); + if (connectionCloseState.compareAndSet(false, true)) { + try { + conn.close(); + } + catch (Exception e) { + //nop + } + } + }; postgresChannelMessageTableSubscriber.start(); postgresSubscribableChannel.subscribe(message -> { payloads.add(message.getPayload()); @@ -326,7 +339,7 @@ public JdbcChannelMessageStore jdbcChannelMessageStore(DataSource dataSource) { private static class ConnectionSupplier implements PgConnectionSupplier { - Runnable onGetConnection; + Consumer onGetConnection; @Override public PgConnection get() throws SQLException { @@ -335,10 +348,11 @@ public PgConnection get() throws SQLException { POSTGRES_CONTAINER.getPassword()) .unwrap(PgConnection.class); if (this.onGetConnection != null) { - this.onGetConnection.run(); + this.onGetConnection.accept(conn); } return conn; } } + }