Skip to content

Commit

Permalink
Close volatile resource in some Kafka tests
Browse files Browse the repository at this point in the history
  • Loading branch information
artembilan committed Sep 7, 2022
1 parent 96d3b00 commit 47cfae9
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@ public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowle
assertThat(gateway.isPaused()).isFalse();

gateway.stop();
consumer.close();
pf.reset();
}

@Test
Expand Down Expand Up @@ -268,6 +270,8 @@ public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowle
assertThat(record).has(value("ERROR"));

gateway.stop();
consumer.close();
pf.reset();
}

@Test
Expand Down Expand Up @@ -352,6 +356,8 @@ public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowle
assertThat(record).has(value("ERROR"));

gateway.stop();
consumer.close();
pf.reset();
}

@Test
Expand Down Expand Up @@ -409,6 +415,7 @@ public <T, E extends Throwable> void onError(RetryContext context, RetryCallback

gateway.stop();
consumer.close();
pf.reset();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowle
assertThat(((ConversionException) error.getPayload()).getRecord()).isNotNull();

adapter.stop();
pf.reset();
}

@Test
Expand Down Expand Up @@ -277,6 +278,7 @@ protected boolean doSend(Message<?> message, long timeout) {
assertThat(receivedMessageHistory.get().toString()).isEqualTo("myNullChannel");

adapter.stop();
pf.reset();
}


Expand Down Expand Up @@ -387,6 +389,7 @@ protected boolean doSend(Message<?> message, long timeout) {
assertThat(StaticMessageHeaderAccessor.getDeliveryAttempt(originalMessage).get()).isEqualTo(1);

adapter.stop();
pf.reset();
}

@Test
Expand Down Expand Up @@ -474,7 +477,9 @@ public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, Acknowledgment a
assertThat(((ConversionException) error.getPayload()).getMessage())
.contains("Failed to convert to message");
assertThat(((ConversionException) error.getPayload()).getRecords()).hasSize(2);

adapter.stop();
pf.reset();
}

@Test
Expand Down Expand Up @@ -517,6 +522,7 @@ void testInboundJson() {
assertThat(received.getPayload()).isInstanceOf(Map.class);

adapter.stop();
pf.reset();
}

@Test
Expand Down Expand Up @@ -564,6 +570,7 @@ void testInboundJsonWithPayload() {
assertThat(received.getPayload()).isEqualTo(new Foo("baz"));

adapter.stop();
pf.reset();
}

@SuppressWarnings({ "unchecked", "rawtypes" })
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
assertThat(received).isNull();
assertThat(KafkaTestUtils.getPropertyValue(source, "consumer.fetcher.minBytes")).isEqualTo(2);
source.destroy();
producerFactory.destroy();
template.destroy();
}

}

0 comments on commit 47cfae9

Please sign in to comment.