From e9f7780e842635473c92b064915b5e4da9fb45f7 Mon Sep 17 00:00:00 2001 From: abilan Date: Wed, 7 Jun 2023 13:55:11 -0400 Subject: [PATCH] Fix race condition in `DebeziumMessProducerTests` The `then(debeziumEngineMock).should().run()` cannot be just checked after `debeziumMessageProducer.start()`: the `DebeziumEngine` is really started on a separate thread. * Check for a `run()` interaction with the mock already after calling `debeziumMessageProducer.stop()`. The `stop()` waits for an internal `latch` which is fulfilled when `DebeziumEngine` exists from its `run()` cycle * Rename `DebeziumMessageProducer.latch` to `lifecycleLatch` to give it more sense. --- .../debezium/inbound/DebeziumMessageProducer.java | 10 +++++----- .../debezium/inbound/DebeziumMessageProducerTests.java | 8 ++++++-- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/spring-integration-debezium/src/main/java/org/springframework/integration/debezium/inbound/DebeziumMessageProducer.java b/spring-integration-debezium/src/main/java/org/springframework/integration/debezium/inbound/DebeziumMessageProducer.java index 1894bc7e48b..3f6877a1b4e 100644 --- a/spring-integration-debezium/src/main/java/org/springframework/integration/debezium/inbound/DebeziumMessageProducer.java +++ b/spring-integration-debezium/src/main/java/org/springframework/integration/debezium/inbound/DebeziumMessageProducer.java @@ -77,7 +77,7 @@ public class DebeziumMessageProducer extends MessageProducerSupport { private ThreadFactory threadFactory; - private volatile CountDownLatch latch = new CountDownLatch(0); + private volatile CountDownLatch lifecycleLatch = new CountDownLatch(0); /** * Create new Debezium message producer inbound channel adapter. @@ -174,10 +174,10 @@ protected void onInit() { @Override protected void doStart() { - if (this.latch.getCount() > 0) { + if (this.lifecycleLatch.getCount() > 0) { return; } - this.latch = new CountDownLatch(1); + this.lifecycleLatch = new CountDownLatch(1); this.executorService.execute(() -> { try { // Runs the debezium connector and deliver database changes to the registered consumer. This method @@ -191,7 +191,7 @@ protected void doStart() { this.debeziumEngine.run(); } finally { - this.latch.countDown(); + this.lifecycleLatch.countDown(); } }); } @@ -205,7 +205,7 @@ protected void doStop() { logger.warn(e, "Debezium failed to close!"); } try { - if (!this.latch.await(5, TimeUnit.SECONDS)) { + if (!this.lifecycleLatch.await(5, TimeUnit.SECONDS)) { throw new IllegalStateException("Failed to stop " + this); } } diff --git a/spring-integration-debezium/src/test/java/org/springframework/integration/debezium/inbound/DebeziumMessageProducerTests.java b/spring-integration-debezium/src/test/java/org/springframework/integration/debezium/inbound/DebeziumMessageProducerTests.java index 30205866435..27f6b079c90 100644 --- a/spring-integration-debezium/src/test/java/org/springframework/integration/debezium/inbound/DebeziumMessageProducerTests.java +++ b/spring-integration-debezium/src/test/java/org/springframework/integration/debezium/inbound/DebeziumMessageProducerTests.java @@ -38,6 +38,7 @@ /** * @author Christian Tzolov + * @author Artem Bilan * * @since 6.2 */ @@ -76,12 +77,15 @@ public void testDebeziumMessageProducerLifecycle() throws IOException { await().atMost(5, TimeUnit.SECONDS).until(() -> debeziumMessageProducer.isRunning()); assertThat(debeziumMessageProducer.isActive()).isEqualTo(true); - then(debeziumEngineMock).should().run(); debeziumMessageProducer.stop(); // STOP assertThat(debeziumMessageProducer.isActive()).isEqualTo(false); assertThat(debeziumMessageProducer.isRunning()).isEqualTo(false); + + // The DebeziumEngine is started on a different thread. + // Only the way to catch the run() mock is to stop DebeziumMessageProducer and wait for its internal latch + then(debeziumEngineMock).should().run(); then(debeziumEngineMock).should().close(); reset(debeziumEngineMock); @@ -90,10 +94,10 @@ public void testDebeziumMessageProducerLifecycle() throws IOException { await().atMost(5, TimeUnit.SECONDS).until(() -> debeziumMessageProducer.isRunning()); assertThat(debeziumMessageProducer.isActive()).isEqualTo(true); - then(debeziumEngineMock).should().run(); debeziumMessageProducer.destroy(); // DESTROY + then(debeziumEngineMock).should().run(); then(debeziumEngineMock).should().close(); }