Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,9 @@ public void testCustomReplyTopicHeaderIsNotDuplicated() throws Exception {
RequestReplyMessageFuture<Integer, String> future = template.sendAndReceive(message, Duration.ofSeconds(30));
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
Message<?> resultingMessage = future.get(30, TimeUnit.SECONDS);

assertThat(resultingMessage.getPayload()).isEqualTo("OK");
assertThat(resultingMessage.getHeaders()).containsEntry("originalPayload", "expected_message");
}
finally {
template.stop();
Expand All @@ -418,7 +420,9 @@ public void testCustomReplyHeadersAreNotDuplicated() throws Exception {
RequestReplyMessageFuture<Integer, String> future = template.sendAndReceive(message, Duration.ofSeconds(30));
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
Message<?> resultingMessage = future.get(30, TimeUnit.SECONDS);

assertThat(resultingMessage.getPayload()).isEqualTo("OK");
assertThat(resultingMessage.getHeaders()).containsEntry("originalPayload", "expected_message");
}
finally {
template.stop();
Expand Down Expand Up @@ -932,14 +936,6 @@ void testMessageIterableReturn() throws Exception {
}
}

private static int length(Iterable<?> iterable) {
int counter = 0;
for (Object o : iterable) {
counter++;
}
return counter;
}

@Configuration
@EnableKafka
public static class Config {
Expand Down Expand Up @@ -1116,43 +1112,19 @@ public List<Message<String>> handleM(String in) throws InterruptedException {
}

@KafkaListener(id = CUSTOM_REPLY_HEADER_REQUEST, topics = CUSTOM_REPLY_HEADER_REQUEST)
@SendTo(CUSTOM_REPLY_HEADER_REPLY) // send to custom topic back
public String handleCustomReplyHeaderNoReplyPartition(ConsumerRecord<?, String> inputMessage) {
Headers headers = inputMessage.headers();

if (length(headers.headers("X-Custom-Reply-Header")) != 1) {
return "The X-Custom-Reply-Header header that signify the custom reply topic header name is duplicated. It is supposed to present only once";
}

if (length(headers.headers(KafkaHeaders.REPLY_PARTITION)) != 0) {
return "It is expected that the user does NOT specify the reply partition in this test case";
}

if (!"expected_message".equals(inputMessage.value())) {
return "Expected message is 'expected_message', but got %s".formatted(inputMessage.value());
}

return "OK";
@SendTo(CUSTOM_REPLY_HEADER_REPLY)
public Message<String> handleCustomReplyHeaderNoReplyPartition(ConsumerRecord<?, String> inputMessage) {
return MessageBuilder.withPayload("OK")
.setHeader("originalPayload", inputMessage.value())
.build();
}

@KafkaListener(id = CUSTOM_REPLY_HEADER_WITH_PARTITION_REQUEST, topics = CUSTOM_REPLY_HEADER_WITH_PARTITION_REQUEST)
@SendTo(CUSTOM_REPLY_HEADER_WITH_PARTITION_REPLY) // send to custom topic back
public String handleCustomReplyHeaderDefaultPartitionHeader(ConsumerRecord<?, String> inputMessage) {
Headers headers = inputMessage.headers();

if (length(headers.headers("X-Custom-Reply-Header")) != 1) {
return "The X-Custom-Reply-Header header that signify the custom reply topic header name is duplicated. It is supposed to present only once";
}

if (length(headers.headers("X-Custom-Reply-Partition")) != 1) {
return "Executed a single reply partition header '%s' in the incoming message".formatted(KafkaHeaders.REPLY_PARTITION);
}

if (!"expected_message".equals(inputMessage.value())) {
return "Expected message is 'expected_message', but got %s".formatted(inputMessage.value());
}

return "OK";
@SendTo(CUSTOM_REPLY_HEADER_WITH_PARTITION_REPLY)
public Message<String> handleCustomReplyHeaderDefaultPartitionHeader(ConsumerRecord<?, String> inputMessage) {
return MessageBuilder.withPayload("OK")
.setHeader("originalPayload", inputMessage.value())
.build();
}
}

Expand Down