diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/pom.xml b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/pom.xml index f238dc168cd3..5f4e47ab4ff0 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/pom.xml +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/pom.xml @@ -26,14 +26,14 @@ - [0.7.0-SNAPSHOT,) - 5.9.0 - 3.23.1 - 1.2.11 - 3.10.1 - 2.22.2 - 2.24.0 - 1.15.0 + [0.12.0-SNAPSHOT,) + 5.9.3 + 3.24.2 + 1.2.12 + 3.11.0 + 3.1.2 + 2.37.0 + 1.17.0 UTF-8 diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java index 4d28511400be..d4315a131180 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java @@ -16,9 +16,8 @@ package com.rabbitmq.stream; +import static com.rabbitmq.stream.TestUtils.*; import static com.rabbitmq.stream.TestUtils.ResponseConditions.ok; -import static com.rabbitmq.stream.TestUtils.waitAtMost; -import static com.rabbitmq.stream.TestUtils.waitUntil; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.fail; @@ -164,7 +163,7 @@ void leaderFailureWhenPublisherConnectedToReplica() throws Exception { new Client.ClientParameters() .port(TestUtils.streamPortNode1()) .messageListener( - (subscriptionId, offset, chunkTimestamp, committedChunkId, msg) -> { + (subscriptionId, offset, chunkTimestamp, committedChunkId, context, msg) -> { bodies.add(new String(msg.getBodyAsBinary(), StandardCharsets.UTF_8)); consumeLatch.countDown(); })); @@ -341,11 +340,14 @@ void noLostConfirmedMessagesWhenLeaderGoesAway() throws Exception { cf.get( new Client.ClientParameters() .port(m.getReplicas().get(0).getPort()) - .chunkListener( - (client1, subscriptionId, offset, messageCount, dataSize) -> - client1.credit(subscriptionId, 1)) + .chunkListener(credit()) .messageListener( - (subscriptionId, offset, chunkTimestamp, committedChunkId, message) -> { + (subscriptionId, + offset, + chunkTimestamp, + committedChunkId, + context, + message) -> { consumed.add(message); generations.add((Long) message.getApplicationProperties().get("generation")); if (consumed.size() == confirmed.size()) { @@ -447,7 +449,7 @@ void consumerReattachesToOtherReplicaWhenReplicaGoesAway() throws Exception { Set generations = ConcurrentHashMap.newKeySet(); Set consumedIds = ConcurrentHashMap.newKeySet(); Client.MessageListener messageListener = - (subscriptionId, offset, chunkTimestamp, committedChunkId, message) -> { + (subscriptionId, offset, chunkTimestamp, committedChunkId, context, message) -> { consumed.add(message); generations.add((Long) message.getApplicationProperties().get("generation")); consumedIds.add(message.getProperties().getMessageIdAsLong()); @@ -471,9 +473,7 @@ void consumerReattachesToOtherReplicaWhenReplicaGoesAway() throws Exception { new Client.ClientParameters() .port(newReplicaPort) .shutdownListener(shutdownListenerReference.get()) - .chunkListener( - (client1, subscriptionId, offset, messageCount, dataSize) -> - client1.credit(subscriptionId, 1)) + .chunkListener(credit()) .messageListener(messageListener)); newConsumer.subscribe( @@ -494,9 +494,7 @@ void consumerReattachesToOtherReplicaWhenReplicaGoesAway() throws Exception { new Client.ClientParameters() .port(replica.getPort()) .shutdownListener(shutdownListener) - .chunkListener( - (client1, subscriptionId, offset, messageCount, dataSize) -> - client1.credit(subscriptionId, 1)) + .chunkListener(credit()) .messageListener(messageListener)); Client.Response response = diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java index cc1464382b4a..7a4d214c3933 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java @@ -16,6 +16,7 @@ package com.rabbitmq.stream; +import static com.rabbitmq.stream.TestUtils.credit; import static org.assertj.core.api.Assertions.assertThat; import com.rabbitmq.stream.impl.Client; @@ -115,11 +116,14 @@ void shouldBePossibleToPublishFromAnyNodeAndConsumeFromAnyMember( cf.get( new Client.ClientParameters() .port(consumerBroker.apply(streamMetadata).getPort()) - .chunkListener( - (client1, subscriptionId, offset, messageCount1, dataSize) -> - client1.credit(subscriptionId, 10)) + .chunkListener(credit()) .messageListener( - (subscriptionId, offset, chunkTimestamp, committedChunkId, message) -> { + (subscriptionId, + offset, + chunkTimestamp, + committedChunkId, + context, + message) -> { bodies.add(new String(message.getBodyAsBinary(), StandardCharsets.UTF_8)); consumingLatch.countDown(); })); @@ -128,7 +132,8 @@ void shouldBePossibleToPublishFromAnyNodeAndConsumeFromAnyMember( assertThat(consumingLatch.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(bodies).hasSize(messageCount); - IntStream.range(0, messageCount).forEach(i -> assertThat(bodies.contains("hello " + i))); + IntStream.range(0, messageCount) + .forEach(i -> assertThat(bodies.contains("hello " + i)).isTrue()); } @Test diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java index 3ddb695f4b65..3b54147a0bdc 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java @@ -218,4 +218,11 @@ static Condition responseCode(short expectedResponse) { expectedResponse); } } + + static Client.ChunkListener credit() { + return (client, subscriptionId, offset, messageCount, dataSize) -> { + client.credit(subscriptionId, 1); + return null; + }; + } } diff --git a/deps/rabbitmq_stream_management/test/http_SUITE_data/pom.xml b/deps/rabbitmq_stream_management/test/http_SUITE_data/pom.xml index 4f96fcc8dfe2..304a74530f7d 100644 --- a/deps/rabbitmq_stream_management/test/http_SUITE_data/pom.xml +++ b/deps/rabbitmq_stream_management/test/http_SUITE_data/pom.xml @@ -26,14 +26,14 @@ - [0.7.0-SNAPSHOT,) - 5.9.0 - 3.23.1 - 1.2.11 - 3.10.1 - 2.22.2 - 2.24.0 - 1.15.0 + [0.12.0-SNAPSHOT,) + 5.9.3 + 3.24.2 + 1.2.12 + 3.11.0 + 3.1.2 + 2.37.0 + 1.17.0 4.9.3 2.8.9 UTF-8 diff --git a/deps/rabbitmq_stream_management/test/http_SUITE_data/src/test/java/com/rabbitmq/stream/HttpTest.java b/deps/rabbitmq_stream_management/test/http_SUITE_data/src/test/java/com/rabbitmq/stream/HttpTest.java index 495c831b6ae2..13f5c59f7a85 100644 --- a/deps/rabbitmq_stream_management/test/http_SUITE_data/src/test/java/com/rabbitmq/stream/HttpTest.java +++ b/deps/rabbitmq_stream_management/test/http_SUITE_data/src/test/java/com/rabbitmq/stream/HttpTest.java @@ -140,11 +140,6 @@ static List> entities(List> entities, Cl .collect(Collectors.toList()); } - static List> entities( - List> entities, Predicate> filter) { - return entities.stream().filter(filter).collect(Collectors.toList()); - } - static Map entity( List> entities, Predicate> filter) { return entities.stream().filter(filter).findFirst().orElse(Collections.emptyMap()); @@ -558,9 +553,7 @@ void consumers(Map subscriptionProperties) throws Exception { cf.get( new ClientParameters() .clientProperty("connection_name", connectionProvidedName) - .chunkListener( - (client1, subscriptionId, offset, messageCount, dataSize) -> - client1.credit(subscriptionId, 1)) + .chunkListener(TestUtils.credit()) .shutdownListener(shutdownContext -> closed.set(true))); client.subscribe((byte) 0, stream, OffsetSpecification.first(), 10, subscriptionProperties); diff --git a/deps/rabbitmq_stream_management/test/http_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java b/deps/rabbitmq_stream_management/test/http_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java index 0e3ab66a5de2..1f70845383e2 100644 --- a/deps/rabbitmq_stream_management/test/http_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java +++ b/deps/rabbitmq_stream_management/test/http_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java @@ -257,4 +257,11 @@ static Condition notNull() { static Condition isNull() { return new Condition<>(Objects::isNull, "null"); } + + static Client.ChunkListener credit() { + return (client, subscriptionId, offset, messageCount, dataSize) -> { + client.credit(subscriptionId, 1); + return null; + }; + } }