Skip to content
This repository has been archived by the owner on Mar 30, 2023. It is now read-only.

Commit

Permalink
Remove pause/resumer; add Lifecycle
Browse files Browse the repository at this point in the history
Pause/resume was too chatty - user can avoid a rebalance with `max.poll.interval.ms`.

Implement `Lifecyle` and `close()` the consumer on `stop()`.
  • Loading branch information
garyrussell authored and artembilan committed Jan 23, 2018
1 parent 0e65032 commit 623f9de
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.kafka.common.errors.WakeupException;

import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.Lifecycle;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.endpoint.AbstractMessageSource;
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
Expand Down Expand Up @@ -75,10 +76,12 @@
*
*/
public class KafkaMessageSource<K, V> extends AbstractMessageSource<Object>
implements DisposableBean {
implements DisposableBean, Lifecycle {

private static final long DEFAULT_POLL_TIMEOUT = 50L;

private final Log logger = LogFactory.getLog(getClass());

private final ConsumerFactory<K, V> consumerFactory;

private final KafkaAckCallbackFactory<K, V> ackCallbackFactory;
Expand All @@ -105,7 +108,7 @@ public class KafkaMessageSource<K, V> extends AbstractMessageSource<Object>

private volatile Consumer<K, V> consumer;

private volatile Collection<TopicPartition> partitions;
private volatile boolean running;

public KafkaMessageSource(ConsumerFactory<K, V> consumerFactory, String... topics) {
this(consumerFactory, new KafkaAckCallbackFactory<>(), topics);
Expand Down Expand Up @@ -244,6 +247,26 @@ private ConsumerFactory<K, V> fixOrRejectConsumerFactory(ConsumerFactory<K, V> s
}
}

@Override
public synchronized boolean isRunning() {
return this.running;
}

@Override
public synchronized void start() {
this.running = true;
}

@Override
public synchronized void stop() {
synchronized (this.consumerMonitor) {
if (this.consumer != null) {
this.consumer.close(30, TimeUnit.SECONDS);
}
}
this.running = false;
}

@Override
protected synchronized Object doReceive() {
if (this.consumer == null) {
Expand All @@ -252,12 +275,7 @@ protected synchronized Object doReceive() {
ConsumerRecord<K, V> record;
TopicPartition topicPartition;
synchronized (this.consumerMonitor) {
Set<TopicPartition> paused = this.consumer.paused();
if (paused.size() > 0) {
this.consumer.resume(paused);
}
ConsumerRecords<K, V> records = this.consumer.poll(this.pollTimeout);
this.consumer.pause(this.partitions);
if (records == null || records.count() == 0) {
return null;
}
Expand Down Expand Up @@ -295,15 +313,19 @@ protected void createConsumer() {

@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
KafkaMessageSource.this.partitions = Collections.emptyList();
if (KafkaMessageSource.this.logger.isInfoEnabled()) {
KafkaMessageSource.this.logger.info("Partitions revoked: " + partitions);
}
if (KafkaMessageSource.this.rebalanceListener != null) {
KafkaMessageSource.this.rebalanceListener.onPartitionsRevoked(partitions);
}
}

@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
KafkaMessageSource.this.partitions = new ArrayList<>(partitions);
if (KafkaMessageSource.this.logger.isInfoEnabled()) {
KafkaMessageSource.this.logger.info("Partitions assigned: " + partitions);
}
if (KafkaMessageSource.this.rebalanceListener != null) {
KafkaMessageSource.this.rebalanceListener.onPartitionsAssigned(partitions);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,29 +136,15 @@ public void testAck() {
source.destroy();
InOrder inOrder = inOrder(consumer);
inOrder.verify(consumer).subscribe(anyCollection(), any(ConsumerRebalanceListener.class));
inOrder.verify(consumer).paused();
inOrder.verify(consumer).poll(anyLong());
inOrder.verify(consumer).pause(anyCollection());
inOrder.verify(consumer).commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(1L)));
inOrder.verify(consumer).paused();
inOrder.verify(consumer).resume(anyCollection());
inOrder.verify(consumer).poll(anyLong());
inOrder.verify(consumer).pause(anyCollection());
inOrder.verify(consumer).commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(2L)));
inOrder.verify(consumer).paused();
inOrder.verify(consumer).resume(anyCollection());
inOrder.verify(consumer).poll(anyLong());
inOrder.verify(consumer).pause(anyCollection());
inOrder.verify(consumer).commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(3L)));
inOrder.verify(consumer).paused();
inOrder.verify(consumer).resume(anyCollection());
inOrder.verify(consumer).poll(anyLong());
inOrder.verify(consumer).pause(anyCollection());
inOrder.verify(consumer).commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(4L)));
inOrder.verify(consumer).paused();
inOrder.verify(consumer).resume(anyCollection());
inOrder.verify(consumer).poll(anyLong());
inOrder.verify(consumer).pause(anyCollection());
inOrder.verify(consumer).close(30, TimeUnit.SECONDS);
inOrder.verifyNoMoreInteractions();
}
Expand Down Expand Up @@ -212,10 +198,15 @@ public void testAckOutOfOrder() {
KafkaMessageSource source = new KafkaMessageSource(consumerFactory, "foo");

Message<?> received1 = source.receive();
consumer.paused(); // need some other interaction with mock between polls for InOrder
Message<?> received2 = source.receive();
consumer.paused(); // need some other interaction with mock between polls for InOrder
Message<?> received3 = source.receive();
consumer.paused(); // need some other interaction with mock between polls for InOrder
Message<?> received4 = source.receive();
consumer.paused(); // need some other interaction with mock between polls for InOrder
Message<?> received5 = source.receive();
consumer.paused(); // need some other interaction with mock between polls for InOrder
Message<?> received6 = source.receive();
StaticMessageHeaderAccessor.getAcknowledgmentCallback(received3)
.acknowledge(Status.ACCEPT);
Expand All @@ -233,27 +224,17 @@ public void testAckOutOfOrder() {
source.destroy();
InOrder inOrder = inOrder(consumer);
inOrder.verify(consumer).subscribe(anyCollection(), any(ConsumerRebalanceListener.class));
inOrder.verify(consumer).paused();
inOrder.verify(consumer).poll(anyLong());
inOrder.verify(consumer).pause(anyCollection());
inOrder.verify(consumer).paused();
inOrder.verify(consumer).resume(anyCollection());
inOrder.verify(consumer).poll(anyLong());
inOrder.verify(consumer).pause(anyCollection());
inOrder.verify(consumer).paused();
inOrder.verify(consumer).resume(anyCollection());
inOrder.verify(consumer).poll(anyLong());
inOrder.verify(consumer).pause(anyCollection());
inOrder.verify(consumer).paused();
inOrder.verify(consumer).resume(anyCollection());
inOrder.verify(consumer).poll(anyLong());
inOrder.verify(consumer).pause(anyCollection());
inOrder.verify(consumer).paused();
inOrder.verify(consumer).commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(3L)));
inOrder.verify(consumer).commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(6L)));
inOrder.verify(consumer).paused();
inOrder.verify(consumer).resume(anyCollection());
inOrder.verify(consumer).poll(anyLong());
inOrder.verify(consumer).pause(anyCollection());
inOrder.verify(consumer).close(30, TimeUnit.SECONDS);
inOrder.verifyNoMoreInteractions();
}
Expand Down Expand Up @@ -310,28 +291,15 @@ public void testNack() {
source.destroy();
assertThat(received).isNull();
InOrder inOrder = inOrder(consumer);
inOrder.verify(consumer).paused();
inOrder.verify(consumer).poll(anyLong());
inOrder.verify(consumer).pause(anyCollection());
inOrder.verify(consumer).seek(topicPartition, 0L); // rollback
inOrder.verify(consumer).paused();
inOrder.verify(consumer).resume(anyCollection());
inOrder.verify(consumer).poll(anyLong());
inOrder.verify(consumer).pause(anyCollection());
inOrder.verify(consumer).commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(1L)));
inOrder.verify(consumer).paused();
inOrder.verify(consumer).resume(anyCollection());
inOrder.verify(consumer).poll(anyLong());
inOrder.verify(consumer).seek(topicPartition, 1L); // rollback
inOrder.verify(consumer).paused();
inOrder.verify(consumer).resume(anyCollection());
inOrder.verify(consumer).poll(anyLong());
inOrder.verify(consumer).pause(anyCollection());
inOrder.verify(consumer).commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(2L)));
inOrder.verify(consumer).paused();
inOrder.verify(consumer).resume(anyCollection());
inOrder.verify(consumer).poll(anyLong());
inOrder.verify(consumer).pause(anyCollection());
inOrder.verify(consumer).close(30, TimeUnit.SECONDS);
inOrder.verifyNoMoreInteractions();
}
Expand All @@ -354,8 +322,7 @@ public void testNackWithLaterInflight() {
willAnswer(i -> paused.get()).given(consumer).paused();
Map<TopicPartition, List<ConsumerRecord>> records1 = new LinkedHashMap<>();
records1.put(topicPartition, Arrays.asList(
new ConsumerRecord("foo", 0, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, 0, null, "foo"),
new ConsumerRecord("foo", 0, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, 0, null, "bar")));
new ConsumerRecord("foo", 0, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, 0, null, "foo")));
ConsumerRecords cr1 = new ConsumerRecords(records1);
Map<TopicPartition, List<ConsumerRecord>> records2 = new LinkedHashMap<>();
records2.put(topicPartition, Collections.singletonList(
Expand All @@ -370,6 +337,7 @@ public void testNackWithLaterInflight() {
KafkaMessageSource source = new KafkaMessageSource(consumerFactory, "foo");

Message<?> received1 = source.receive();
consumer.paused(); // need some other interaction with mock between polls for InOrder
Message<?> received2 = source.receive(); // inflight
assertThat(received1.getHeaders().get(KafkaHeaders.OFFSET)).isEqualTo(0L);
AcknowledgmentCallback ack1 = StaticMessageHeaderAccessor.getAcknowledgmentCallback(received1);
Expand Down Expand Up @@ -397,12 +365,9 @@ public void testNackWithLaterInflight() {
source.destroy();
assertThat(received1).isNull();
InOrder inOrder = inOrder(consumer, log1, log2);
inOrder.verify(consumer).paused();
inOrder.verify(consumer).poll(anyLong());
inOrder.verify(consumer).pause(anyCollection());
inOrder.verify(consumer).paused();
inOrder.verify(consumer).poll(anyLong());
inOrder.verify(consumer).pause(anyCollection()); // in flight
inOrder.verify(consumer).seek(topicPartition, 0L); // rollback
inOrder.verify(log1).isWarnEnabled();
ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
Expand All @@ -416,20 +381,11 @@ public void testNackWithLaterInflight() {
assertThat(captor.getValue())
.contains("Cannot commit offset for ConsumerRecord")
.contains("; an earlier offset was rolled back");
inOrder.verify(consumer).paused();
inOrder.verify(consumer).resume(anyCollection());
inOrder.verify(consumer).poll(anyLong());
inOrder.verify(consumer).pause(anyCollection());
inOrder.verify(consumer).commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(1L)));
inOrder.verify(consumer).paused();
inOrder.verify(consumer).resume(anyCollection());
inOrder.verify(consumer).poll(anyLong());
inOrder.verify(consumer).pause(anyCollection());
inOrder.verify(consumer).commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(2L)));
inOrder.verify(consumer).paused();
inOrder.verify(consumer).resume(anyCollection());
inOrder.verify(consumer).poll(anyLong());
inOrder.verify(consumer).pause(anyCollection());
inOrder.verify(consumer).close(30, TimeUnit.SECONDS);
inOrder.verifyNoMoreInteractions();
}
Expand Down

0 comments on commit 623f9de

Please sign in to comment.