Skip to content

Commit

Permalink
Merge pull request #8884 from rabbitmq/mergify/bp/v3.10.x/pr-8883
Browse files Browse the repository at this point in the history
Adapt stream Java tests to client 0.12.0 snapshot (backport #8880) (backport #8882) (backport #8883)
  • Loading branch information
acogoluegnes committed Jul 17, 2023
2 parents 064f2ae + 3494bbf commit e15e560
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 43 deletions.
16 changes: 8 additions & 8 deletions deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@
</developers>

<properties>
<stream-client.version>[0.7.0-SNAPSHOT,)</stream-client.version>
<junit.jupiter.version>5.9.0</junit.jupiter.version>
<assertj.version>3.23.1</assertj.version>
<logback.version>1.2.11</logback.version>
<maven.compiler.plugin.version>3.10.1</maven.compiler.plugin.version>
<maven-surefire-plugin.version>2.22.2</maven-surefire-plugin.version>
<spotless.version>2.24.0</spotless.version>
<google-java-format.version>1.15.0</google-java-format.version>
<stream-client.version>[0.12.0-SNAPSHOT,)</stream-client.version>
<junit.jupiter.version>5.9.3</junit.jupiter.version>
<assertj.version>3.24.2</assertj.version>
<logback.version>1.2.12</logback.version>
<maven.compiler.plugin.version>3.11.0</maven.compiler.plugin.version>
<maven-surefire-plugin.version>3.1.2</maven-surefire-plugin.version>
<spotless.version>2.37.0</spotless.version>
<google-java-format.version>1.17.0</google-java-format.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@

package com.rabbitmq.stream;

import static com.rabbitmq.stream.TestUtils.*;
import static com.rabbitmq.stream.TestUtils.ResponseConditions.ok;
import static com.rabbitmq.stream.TestUtils.waitAtMost;
import static com.rabbitmq.stream.TestUtils.waitUntil;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;

Expand Down Expand Up @@ -164,7 +163,7 @@ void leaderFailureWhenPublisherConnectedToReplica() throws Exception {
new Client.ClientParameters()
.port(TestUtils.streamPortNode1())
.messageListener(
(subscriptionId, offset, chunkTimestamp, committedChunkId, msg) -> {
(subscriptionId, offset, chunkTimestamp, committedChunkId, context, msg) -> {
bodies.add(new String(msg.getBodyAsBinary(), StandardCharsets.UTF_8));
consumeLatch.countDown();
}));
Expand Down Expand Up @@ -341,11 +340,14 @@ void noLostConfirmedMessagesWhenLeaderGoesAway() throws Exception {
cf.get(
new Client.ClientParameters()
.port(m.getReplicas().get(0).getPort())
.chunkListener(
(client1, subscriptionId, offset, messageCount, dataSize) ->
client1.credit(subscriptionId, 1))
.chunkListener(credit())
.messageListener(
(subscriptionId, offset, chunkTimestamp, committedChunkId, message) -> {
(subscriptionId,
offset,
chunkTimestamp,
committedChunkId,
context,
message) -> {
consumed.add(message);
generations.add((Long) message.getApplicationProperties().get("generation"));
if (consumed.size() == confirmed.size()) {
Expand Down Expand Up @@ -447,7 +449,7 @@ void consumerReattachesToOtherReplicaWhenReplicaGoesAway() throws Exception {
Set<Long> generations = ConcurrentHashMap.newKeySet();
Set<Long> consumedIds = ConcurrentHashMap.newKeySet();
Client.MessageListener messageListener =
(subscriptionId, offset, chunkTimestamp, committedChunkId, message) -> {
(subscriptionId, offset, chunkTimestamp, committedChunkId, context, message) -> {
consumed.add(message);
generations.add((Long) message.getApplicationProperties().get("generation"));
consumedIds.add(message.getProperties().getMessageIdAsLong());
Expand All @@ -471,9 +473,7 @@ void consumerReattachesToOtherReplicaWhenReplicaGoesAway() throws Exception {
new Client.ClientParameters()
.port(newReplicaPort)
.shutdownListener(shutdownListenerReference.get())
.chunkListener(
(client1, subscriptionId, offset, messageCount, dataSize) ->
client1.credit(subscriptionId, 1))
.chunkListener(credit())
.messageListener(messageListener));

newConsumer.subscribe(
Expand All @@ -494,9 +494,7 @@ void consumerReattachesToOtherReplicaWhenReplicaGoesAway() throws Exception {
new Client.ClientParameters()
.port(replica.getPort())
.shutdownListener(shutdownListener)
.chunkListener(
(client1, subscriptionId, offset, messageCount, dataSize) ->
client1.credit(subscriptionId, 1))
.chunkListener(credit())
.messageListener(messageListener));

Client.Response response =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.rabbitmq.stream;

import static com.rabbitmq.stream.TestUtils.credit;
import static org.assertj.core.api.Assertions.assertThat;

import com.rabbitmq.stream.impl.Client;
Expand Down Expand Up @@ -115,11 +116,14 @@ void shouldBePossibleToPublishFromAnyNodeAndConsumeFromAnyMember(
cf.get(
new Client.ClientParameters()
.port(consumerBroker.apply(streamMetadata).getPort())
.chunkListener(
(client1, subscriptionId, offset, messageCount1, dataSize) ->
client1.credit(subscriptionId, 10))
.chunkListener(credit())
.messageListener(
(subscriptionId, offset, chunkTimestamp, committedChunkId, message) -> {
(subscriptionId,
offset,
chunkTimestamp,
committedChunkId,
context,
message) -> {
bodies.add(new String(message.getBodyAsBinary(), StandardCharsets.UTF_8));
consumingLatch.countDown();
}));
Expand All @@ -128,7 +132,8 @@ void shouldBePossibleToPublishFromAnyNodeAndConsumeFromAnyMember(

assertThat(consumingLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(bodies).hasSize(messageCount);
IntStream.range(0, messageCount).forEach(i -> assertThat(bodies.contains("hello " + i)));
IntStream.range(0, messageCount)
.forEach(i -> assertThat(bodies.contains("hello " + i)).isTrue());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,4 +218,11 @@ static Condition<Response> responseCode(short expectedResponse) {
expectedResponse);
}
}

static Client.ChunkListener credit() {
return (client, subscriptionId, offset, messageCount, dataSize) -> {
client.credit(subscriptionId, 1);
return null;
};
}
}
16 changes: 8 additions & 8 deletions deps/rabbitmq_stream_management/test/http_SUITE_data/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@
</developers>

<properties>
<stream-client.version>[0.7.0-SNAPSHOT,)</stream-client.version>
<junit.jupiter.version>5.9.0</junit.jupiter.version>
<assertj.version>3.23.1</assertj.version>
<logback.version>1.2.11</logback.version>
<maven.compiler.plugin.version>3.10.1</maven.compiler.plugin.version>
<maven-surefire-plugin.version>2.22.2</maven-surefire-plugin.version>
<spotless.version>2.24.0</spotless.version>
<google-java-format.version>1.15.0</google-java-format.version>
<stream-client.version>[0.12.0-SNAPSHOT,)</stream-client.version>
<junit.jupiter.version>5.9.3</junit.jupiter.version>
<assertj.version>3.24.2</assertj.version>
<logback.version>1.2.12</logback.version>
<maven.compiler.plugin.version>3.11.0</maven.compiler.plugin.version>
<maven-surefire-plugin.version>3.1.2</maven-surefire-plugin.version>
<spotless.version>2.37.0</spotless.version>
<google-java-format.version>1.17.0</google-java-format.version>
<okhttp.version>4.9.3</okhttp.version>
<gson.version>2.8.9</gson.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,6 @@ static List<Map<String, Object>> entities(List<Map<String, Object>> entities, Cl
.collect(Collectors.toList());
}

static List<Map<String, Object>> entities(
List<Map<String, Object>> entities, Predicate<Map<String, Object>> filter) {
return entities.stream().filter(filter).collect(Collectors.toList());
}

static Map<String, Object> entity(
List<Map<String, Object>> entities, Predicate<Map<String, Object>> filter) {
return entities.stream().filter(filter).findFirst().orElse(Collections.emptyMap());
Expand Down Expand Up @@ -558,9 +553,7 @@ void consumers(Map<String, String> subscriptionProperties) throws Exception {
cf.get(
new ClientParameters()
.clientProperty("connection_name", connectionProvidedName)
.chunkListener(
(client1, subscriptionId, offset, messageCount, dataSize) ->
client1.credit(subscriptionId, 1))
.chunkListener(TestUtils.credit())
.shutdownListener(shutdownContext -> closed.set(true)));

client.subscribe((byte) 0, stream, OffsetSpecification.first(), 10, subscriptionProperties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,4 +257,11 @@ static Condition<Object> notNull() {
static Condition<Object> isNull() {
return new Condition<>(Objects::isNull, "null");
}

static Client.ChunkListener credit() {
return (client, subscriptionId, offset, messageCount, dataSize) -> {
client.credit(subscriptionId, 1);
return null;
};
}
}

0 comments on commit e15e560

Please sign in to comment.