diff --git a/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.java b/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.java index a79d98441..6b3104a8c 100644 --- a/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.java +++ b/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.java @@ -323,10 +323,10 @@ public void testTrustedPackages() throws Exception { moduleOutputChannel.send(message); CountDownLatch latch = new CountDownLatch(1); - AtomicReference> inboundMessageRef = new AtomicReference<>(); + AtomicReference> inboundMessageRef = new AtomicReference<>(); moduleInputChannel.subscribe(message1 -> { try { - inboundMessageRef.set((Message) message1); + inboundMessageRef.set((Message) message1); } finally { latch.countDown(); @@ -336,7 +336,7 @@ public void testTrustedPackages() throws Exception { Assertions.assertThat(inboundMessageRef.get()).isNotNull(); - Assertions.assertThat(new String(inboundMessageRef.get().getPayload(), StandardCharsets.UTF_8)).isEqualTo("foo"); + Assertions.assertThat(inboundMessageRef.get().getPayload()).isEqualTo("foo"); Assertions.assertThat(inboundMessageRef.get().getHeaders().get(BinderHeaders.BINDER_ORIGINAL_CONTENT_TYPE)).isNull(); Assertions.assertThat(inboundMessageRef.get().getHeaders().get(MessageHeaders.CONTENT_TYPE)) .isEqualTo(MimeTypeUtils.TEXT_PLAIN); @@ -374,10 +374,10 @@ public void testSendAndReceiveNoOriginalContentType() throws Exception { .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN).build(); moduleOutputChannel.send(message); CountDownLatch latch = new CountDownLatch(1); - AtomicReference> inboundMessageRef = new AtomicReference<>(); + AtomicReference> inboundMessageRef = new AtomicReference<>(); moduleInputChannel.subscribe(message1 -> { try { - inboundMessageRef.set((Message) message1); + inboundMessageRef.set((Message) message1); } finally { latch.countDown(); @@ -386,7 +386,7 @@ public void testSendAndReceiveNoOriginalContentType() throws Exception { Assert.isTrue(latch.await(5, TimeUnit.SECONDS), "Failed to receive message"); assertThat(inboundMessageRef.get()).isNotNull(); - assertThat(new String(inboundMessageRef.get().getPayload(), StandardCharsets.UTF_8)).isEqualTo("foo"); + assertThat(inboundMessageRef.get().getPayload()).isEqualTo("foo"); assertThat(inboundMessageRef.get().getHeaders().get(MessageHeaders.CONTENT_TYPE)) .isEqualTo(MimeTypeUtils.TEXT_PLAIN); producerBinding.unbind(); @@ -413,7 +413,7 @@ public void testSendAndReceive() throws Exception { .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_OCTET_STREAM) .build(); - + // Let the consumer actually bind to the producer before sending a msg binderBindUnbindLatency(); moduleOutputChannel.send(message); @@ -433,13 +433,13 @@ public void testSendAndReceive() throws Exception { assertThat(new String(inboundMessageRef.get().getPayload(), StandardCharsets.UTF_8)).isEqualTo("foo"); assertThat(inboundMessageRef.get().getHeaders().get(MessageHeaders.CONTENT_TYPE)) .isEqualTo(MimeTypeUtils.APPLICATION_OCTET_STREAM); - + Map topicsInUse = ((KafkaTestBinder)binder).getCoreBinder().getTopicsInUse(); assertThat(topicsInUse.keySet()).contains("foo.bar"); TopicInformation topic = topicsInUse.get("foo.bar"); assertThat(topic.isConsumerTopic()).isTrue(); assertThat(topic.getConsumerGroup()).isEqualTo("testSendAndReceive"); - + producerBinding.unbind(); consumerBinding.unbind(); } @@ -1471,7 +1471,7 @@ public void testAnonymousGroup() throws Exception { TopicInformation topic = topicsInUse.get("defaultGroup.0"); assertThat(topic.isConsumerTopic()).isTrue(); assertThat(topic.getConsumerGroup()).startsWith("anonymous"); - + producerBinding.unbind(); binding1.unbind(); binding2.unbind(); @@ -2392,10 +2392,10 @@ public void testBuiltinSerialization() throws Exception { binderBindUnbindLatency(); moduleOutputChannel.send(message); CountDownLatch latch = new CountDownLatch(1); - AtomicReference> inboundMessageRef = new AtomicReference<>(); + AtomicReference> inboundMessageRef = new AtomicReference<>(); moduleInputChannel.subscribe(message1 -> { try { - inboundMessageRef.set((Message) message1); + inboundMessageRef.set((Message) message1); } finally { latch.countDown(); @@ -2404,7 +2404,7 @@ public void testBuiltinSerialization() throws Exception { Assert.isTrue(latch.await(5, TimeUnit.SECONDS), "Failed to receive message"); assertThat(inboundMessageRef.get()).isNotNull(); - assertThat(new String(inboundMessageRef.get().getPayload(), StandardCharsets.UTF_8)).isEqualTo("test"); + assertThat(inboundMessageRef.get().getPayload()).isEqualTo("test"); assertThat(inboundMessageRef.get().getHeaders()).containsEntry("contentType", MimeTypeUtils.TEXT_PLAIN); } finally {