Skip to content

Commit

Permalink
Fixed tests related to GH-1527 change in core
Browse files Browse the repository at this point in the history
  • Loading branch information
olegz committed Nov 15, 2018
1 parent b589f32 commit 47443f8
Showing 1 changed file with 13 additions and 13 deletions.
Expand Up @@ -323,10 +323,10 @@ public void testTrustedPackages() throws Exception {

moduleOutputChannel.send(message);
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Message<byte[]>> inboundMessageRef = new AtomicReference<>();
AtomicReference<Message<String>> inboundMessageRef = new AtomicReference<>();
moduleInputChannel.subscribe(message1 -> {
try {
inboundMessageRef.set((Message<byte[]>) message1);
inboundMessageRef.set((Message<String>) message1);
}
finally {
latch.countDown();
Expand All @@ -336,7 +336,7 @@ public void testTrustedPackages() throws Exception {


Assertions.assertThat(inboundMessageRef.get()).isNotNull();
Assertions.assertThat(new String(inboundMessageRef.get().getPayload(), StandardCharsets.UTF_8)).isEqualTo("foo");
Assertions.assertThat(inboundMessageRef.get().getPayload()).isEqualTo("foo");
Assertions.assertThat(inboundMessageRef.get().getHeaders().get(BinderHeaders.BINDER_ORIGINAL_CONTENT_TYPE)).isNull();
Assertions.assertThat(inboundMessageRef.get().getHeaders().get(MessageHeaders.CONTENT_TYPE))
.isEqualTo(MimeTypeUtils.TEXT_PLAIN);
Expand Down Expand Up @@ -374,10 +374,10 @@ public void testSendAndReceiveNoOriginalContentType() throws Exception {
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN).build();
moduleOutputChannel.send(message);
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Message<byte[]>> inboundMessageRef = new AtomicReference<>();
AtomicReference<Message<String>> inboundMessageRef = new AtomicReference<>();
moduleInputChannel.subscribe(message1 -> {
try {
inboundMessageRef.set((Message<byte[]>) message1);
inboundMessageRef.set((Message<String>) message1);
}
finally {
latch.countDown();
Expand All @@ -386,7 +386,7 @@ public void testSendAndReceiveNoOriginalContentType() throws Exception {
Assert.isTrue(latch.await(5, TimeUnit.SECONDS), "Failed to receive message");

assertThat(inboundMessageRef.get()).isNotNull();
assertThat(new String(inboundMessageRef.get().getPayload(), StandardCharsets.UTF_8)).isEqualTo("foo");
assertThat(inboundMessageRef.get().getPayload()).isEqualTo("foo");
assertThat(inboundMessageRef.get().getHeaders().get(MessageHeaders.CONTENT_TYPE))
.isEqualTo(MimeTypeUtils.TEXT_PLAIN);
producerBinding.unbind();
Expand All @@ -413,7 +413,7 @@ public void testSendAndReceive() throws Exception {
.setHeader(MessageHeaders.CONTENT_TYPE,
MimeTypeUtils.APPLICATION_OCTET_STREAM)
.build();

// Let the consumer actually bind to the producer before sending a msg
binderBindUnbindLatency();
moduleOutputChannel.send(message);
Expand All @@ -433,13 +433,13 @@ public void testSendAndReceive() throws Exception {
assertThat(new String(inboundMessageRef.get().getPayload(), StandardCharsets.UTF_8)).isEqualTo("foo");
assertThat(inboundMessageRef.get().getHeaders().get(MessageHeaders.CONTENT_TYPE))
.isEqualTo(MimeTypeUtils.APPLICATION_OCTET_STREAM);

Map<String, TopicInformation> topicsInUse = ((KafkaTestBinder)binder).getCoreBinder().getTopicsInUse();
assertThat(topicsInUse.keySet()).contains("foo.bar");
TopicInformation topic = topicsInUse.get("foo.bar");
assertThat(topic.isConsumerTopic()).isTrue();
assertThat(topic.getConsumerGroup()).isEqualTo("testSendAndReceive");

producerBinding.unbind();
consumerBinding.unbind();
}
Expand Down Expand Up @@ -1471,7 +1471,7 @@ public void testAnonymousGroup() throws Exception {
TopicInformation topic = topicsInUse.get("defaultGroup.0");
assertThat(topic.isConsumerTopic()).isTrue();
assertThat(topic.getConsumerGroup()).startsWith("anonymous");

producerBinding.unbind();
binding1.unbind();
binding2.unbind();
Expand Down Expand Up @@ -2392,10 +2392,10 @@ public void testBuiltinSerialization() throws Exception {
binderBindUnbindLatency();
moduleOutputChannel.send(message);
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Message<byte[]>> inboundMessageRef = new AtomicReference<>();
AtomicReference<Message<String>> inboundMessageRef = new AtomicReference<>();
moduleInputChannel.subscribe(message1 -> {
try {
inboundMessageRef.set((Message<byte[]>) message1);
inboundMessageRef.set((Message<String>) message1);
}
finally {
latch.countDown();
Expand All @@ -2404,7 +2404,7 @@ public void testBuiltinSerialization() throws Exception {
Assert.isTrue(latch.await(5, TimeUnit.SECONDS), "Failed to receive message");

assertThat(inboundMessageRef.get()).isNotNull();
assertThat(new String(inboundMessageRef.get().getPayload(), StandardCharsets.UTF_8)).isEqualTo("test");
assertThat(inboundMessageRef.get().getPayload()).isEqualTo("test");
assertThat(inboundMessageRef.get().getHeaders()).containsEntry("contentType", MimeTypeUtils.TEXT_PLAIN);
}
finally {
Expand Down

0 comments on commit 47443f8

Please sign in to comment.