diff --git a/spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java b/spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java index bbf454a0c4..c75c0f1579 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java @@ -392,7 +392,9 @@ public void testCustomReplyTopicHeaderIsNotDuplicated() throws Exception { RequestReplyMessageFuture future = template.sendAndReceive(message, Duration.ofSeconds(30)); future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok Message resultingMessage = future.get(30, TimeUnit.SECONDS); + assertThat(resultingMessage.getPayload()).isEqualTo("OK"); + assertThat(resultingMessage.getHeaders()).containsEntry("originalPayload", "expected_message"); } finally { template.stop(); @@ -418,7 +420,9 @@ public void testCustomReplyHeadersAreNotDuplicated() throws Exception { RequestReplyMessageFuture future = template.sendAndReceive(message, Duration.ofSeconds(30)); future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok Message resultingMessage = future.get(30, TimeUnit.SECONDS); + assertThat(resultingMessage.getPayload()).isEqualTo("OK"); + assertThat(resultingMessage.getHeaders()).containsEntry("originalPayload", "expected_message"); } finally { template.stop(); @@ -932,14 +936,6 @@ void testMessageIterableReturn() throws Exception { } } - private static int length(Iterable iterable) { - int counter = 0; - for (Object o : iterable) { - counter++; - } - return counter; - } - @Configuration @EnableKafka public static class Config { @@ -1116,43 +1112,19 @@ public List> handleM(String in) throws InterruptedException { } @KafkaListener(id = CUSTOM_REPLY_HEADER_REQUEST, topics = CUSTOM_REPLY_HEADER_REQUEST) - @SendTo(CUSTOM_REPLY_HEADER_REPLY) // send to custom topic back - public String handleCustomReplyHeaderNoReplyPartition(ConsumerRecord inputMessage) { - Headers headers = inputMessage.headers(); - - if (length(headers.headers("X-Custom-Reply-Header")) != 1) { - return "The X-Custom-Reply-Header header that signify the custom reply topic header name is duplicated. It is supposed to present only once"; - } - - if (length(headers.headers(KafkaHeaders.REPLY_PARTITION)) != 0) { - return "It is expected that the user does NOT specify the reply partition in this test case"; - } - - if (!"expected_message".equals(inputMessage.value())) { - return "Expected message is 'expected_message', but got %s".formatted(inputMessage.value()); - } - - return "OK"; + @SendTo(CUSTOM_REPLY_HEADER_REPLY) + public Message handleCustomReplyHeaderNoReplyPartition(ConsumerRecord inputMessage) { + return MessageBuilder.withPayload("OK") + .setHeader("originalPayload", inputMessage.value()) + .build(); } @KafkaListener(id = CUSTOM_REPLY_HEADER_WITH_PARTITION_REQUEST, topics = CUSTOM_REPLY_HEADER_WITH_PARTITION_REQUEST) - @SendTo(CUSTOM_REPLY_HEADER_WITH_PARTITION_REPLY) // send to custom topic back - public String handleCustomReplyHeaderDefaultPartitionHeader(ConsumerRecord inputMessage) { - Headers headers = inputMessage.headers(); - - if (length(headers.headers("X-Custom-Reply-Header")) != 1) { - return "The X-Custom-Reply-Header header that signify the custom reply topic header name is duplicated. It is supposed to present only once"; - } - - if (length(headers.headers("X-Custom-Reply-Partition")) != 1) { - return "Executed a single reply partition header '%s' in the incoming message".formatted(KafkaHeaders.REPLY_PARTITION); - } - - if (!"expected_message".equals(inputMessage.value())) { - return "Expected message is 'expected_message', but got %s".formatted(inputMessage.value()); - } - - return "OK"; + @SendTo(CUSTOM_REPLY_HEADER_WITH_PARTITION_REPLY) + public Message handleCustomReplyHeaderDefaultPartitionHeader(ConsumerRecord inputMessage) { + return MessageBuilder.withPayload("OK") + .setHeader("originalPayload", inputMessage.value()) + .build(); } }