Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add test for super stream exchange #357

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/test-pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ on:
branches:
- main

env:
RABBITMQ_IMAGE: 'pivotalrabbitmq/rabbitmq:super-stream-exchange-type-otp-max-bazel'

jobs:
build:
runs-on: ubuntu-22.04
Expand Down
140 changes: 140 additions & 0 deletions src/test/java/com/rabbitmq/stream/impl/SuperStreamExchangeTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
// please see LICENSE-APACHE2.
//
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
// either express or implied. See the LICENSE file for specific language governing
// rights and limitations of this software.
//
// If you have any questions regarding licensing, please contact us at
// info@rabbitmq.com.
package com.rabbitmq.stream.impl;

import static com.rabbitmq.stream.impl.TestUtils.*;
import static com.rabbitmq.stream.impl.TestUtils.SuperStreamExchangeType.SUPER;
import static org.assertj.core.api.Assertions.assertThat;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.stream.*;
import io.netty.channel.EventLoopGroup;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.function.BiConsumer;import java.util.stream.IntStream;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class)
public class SuperStreamExchangeTest {

EventLoopGroup eventLoopGroup;

Environment environment;

Connection connection;
int partitions = 3;
int messageCount = 10_000;
String superStream;

@BeforeEach
void init(TestInfo info) throws Exception {
EnvironmentBuilder environmentBuilder =
Environment.builder().netty().eventLoopGroup(eventLoopGroup).environmentBuilder();
environmentBuilder.addressResolver(add -> localhost());
environment = environmentBuilder.build();
connection = new ConnectionFactory().newConnection();
superStream = TestUtils.streamName(info);
}

@AfterEach
void tearDown() throws Exception {
environment.close();
deleteSuperStreamTopology(connection, superStream, partitions);
connection.close();
}

@Test
void publish() throws Exception {
declareSuperStreamTopology(connection, superStream, SUPER, partitions);
List<String> routingKeys = new ArrayList<>(messageCount);
IntStream.range(0, messageCount)
.forEach(ignored -> routingKeys.add(UUID.randomUUID().toString()));

CountDownLatch publishLatch = new CountDownLatch(messageCount);
try (Producer producer =
environment
.producerBuilder()
.superStream(superStream)
.routing(msg -> msg.getProperties().getMessageIdAsString())
.producerBuilder()
.build()) {
ConfirmationHandler confirmationHandler = status -> publishLatch.countDown();
routingKeys.forEach(
rk ->
producer.send(
producer.messageBuilder().properties().messageId(rk).messageBuilder().build(),
confirmationHandler));
latchAssert(publishLatch).completes();
}

java.util.function.Consumer<Map<String, Set<String>>> consumeMessages =
receivedMessages -> {
CountDownLatch consumeLatch = new CountDownLatch(messageCount);
try (Consumer ignored =
environment
.consumerBuilder()
.superStream(superStream)
.offset(OffsetSpecification.first())
.messageHandler(
(ctx, msg) -> {
receivedMessages
.computeIfAbsent(ctx.stream(), k -> ConcurrentHashMap.newKeySet())
.add(msg.getProperties().getMessageIdAsString());
consumeLatch.countDown();
})
.build()) {

latchAssert(consumeLatch).completes();
assertThat(receivedMessages.values().stream().mapToInt(Set::size).sum())
.isEqualTo(messageCount);
}
};

Map<String, Set<String>> streamProducerMessages = new ConcurrentHashMap<>(partitions);
consumeMessages.accept(streamProducerMessages);

deleteSuperStreamTopology(connection, superStream, partitions);
declareSuperStreamTopology(connection, superStream, SUPER, partitions);

try (Channel channel = connection.createChannel()) {
channel.confirmSelect();
for (String rk : routingKeys) {
channel.basicPublish(
superStream, rk, new AMQP.BasicProperties.Builder().messageId(rk).build(), null);
}
channel.waitForConfirmsOrDie();
}

Map<String, Set<String>> amqpProducerMessages = new ConcurrentHashMap<>(partitions);
consumeMessages.accept(amqpProducerMessages);
assertThat(amqpProducerMessages).hasSameSizeAs(streamProducerMessages)
.containsKeys(streamProducerMessages.keySet().toArray(new String[]{}));

BiConsumer<Set<String>, Set<String>> compareSets = (s1, s2) -> {
assertThat(s1).hasSameSizeAs(s2);
s1.forEach(rk -> assertThat(s2).contains(rk));
};

amqpProducerMessages.forEach(
(key, value) -> compareSets.accept(value, streamProducerMessages.get(key)));
}
}
34 changes: 32 additions & 2 deletions src/test/java/com/rabbitmq/stream/impl/TestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@
// info@rabbitmq.com.
package com.rabbitmq.stream.impl;

import static com.rabbitmq.stream.impl.TestUtils.SuperStreamExchangeType.DIRECT;
import static java.lang.String.format;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.fail;

import ch.qos.logback.classic.Level;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.stream.Address;
Expand Down Expand Up @@ -272,18 +272,37 @@ static <T> void doIfNotNull(T obj, Consumer<T> action) {

static void declareSuperStreamTopology(Connection connection, String superStream, int partitions)
throws Exception {
declareSuperStreamTopology(connection, superStream, DIRECT, partitions);
}

static void declareSuperStreamTopology(
Connection connection,
String superStream,
SuperStreamExchangeType exchangeType,
int partitions)
throws Exception {
declareSuperStreamTopology(
connection,
superStream,
exchangeType,
IntStream.range(0, partitions).mapToObj(String::valueOf).toArray(String[]::new));
}

static void declareSuperStreamTopology(Connection connection, String superStream, String... rks)
throws Exception {
declareSuperStreamTopology(connection, superStream, DIRECT, rks);
}

static void declareSuperStreamTopology(
Connection connection,
String superStream,
SuperStreamExchangeType exchangeType,
String... rks)
throws Exception {
try (Channel ch = connection.createChannel()) {
ch.exchangeDeclare(
superStream,
BuiltinExchangeType.DIRECT,
exchangeType.value,
true,
false,
Collections.singletonMap("x-super-stream", true));
Expand All @@ -309,6 +328,17 @@ static void declareSuperStreamTopology(Connection connection, String superStream
}
}

public enum SuperStreamExchangeType {
DIRECT("direct"),
SUPER("x-super-stream");

final String value;

SuperStreamExchangeType(String value) {
this.value = value;
}
}

static void deleteSuperStreamTopology(Connection connection, String superStream, int partitions)
throws Exception {
deleteSuperStreamTopology(
Expand Down