Skip to content

Commit

Permalink
GH-1621: Fix test for back-port
Browse files Browse the repository at this point in the history
  • Loading branch information
garyrussell committed Nov 12, 2020
1 parent c240737 commit 21f511b
Showing 1 changed file with 7 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.willAnswer;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.withSettings;

import java.time.Duration;
import java.util.ArrayList;
Expand Down Expand Up @@ -113,11 +113,11 @@ public void discardRemainingRecordsFromPollAndSeek() throws Exception {
HashMap<TopicPartition, OffsetAndMetadata> commit2 = new HashMap<>();
commit2.put(new TopicPartition("foo", 1), new OffsetAndMetadata(2L));
commit2.put(new TopicPartition("foo", 2), new OffsetAndMetadata(2L));
inOrder.verify(this.producer).sendOffsetsToTransaction(eq(commit1), any(ConsumerGroupMetadata.class));
inOrder.verify(this.producer).sendOffsetsToTransaction(eq(commit1), anyString());
inOrder.verify(this.consumer).seek(new TopicPartition("foo", 1), 1L);
inOrder.verify(this.consumer).seek(new TopicPartition("foo", 2), 0L);
inOrder.verify(this.consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
inOrder.verify(this.producer).sendOffsetsToTransaction(eq(commit2), any(ConsumerGroupMetadata.class));
inOrder.verify(this.producer).sendOffsetsToTransaction(eq(commit2), anyString());
assertThat(this.config.count).isEqualTo(2);
assertThat(this.config.contents.toString()).isEqualTo("[[foo, bar, baz, qux, fiz, buz], [qux, fiz, buz]]");
}
Expand Down Expand Up @@ -219,18 +219,18 @@ public Consumer consumer() {
@SuppressWarnings({ "rawtypes", "unchecked" })
@Bean
Producer producer() {
Producer producer = mock(Producer.class, withSettings().verboseLogging());
Producer producer = mock(Producer.class);
willAnswer(inv -> {
this.commitLatch.countDown();
return null;
}).given(producer).sendOffsetsToTransaction(any(), any(ConsumerGroupMetadata.class));
}).given(producer).sendOffsetsToTransaction(any(), anyString());
return producer;
}

@SuppressWarnings("rawtypes")
@Bean
ProducerFactory pf() {
ProducerFactory pf = mock(ProducerFactory.class, withSettings().verboseLogging());
ProducerFactory pf = mock(ProducerFactory.class);
given(pf.createProducer(isNull())).willReturn(producer());
given(pf.transactionCapable()).willReturn(true);
return pf;
Expand All @@ -251,6 +251,7 @@ public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
factory.getContainerProperties().setAckMode(AckMode.MANUAL);
factory.getContainerProperties().setMissingTopicsFatal(false);
factory.getContainerProperties().setTransactionManager(tm());
factory.getContainerProperties().setSubBatchPerPartition(false);
factory.setBatchListener(true);
return factory;
}
Expand Down

0 comments on commit 21f511b

Please sign in to comment.