Skip to content

Commit

Permalink
GH-881: Null check in closeProducers()
Browse files Browse the repository at this point in the history
Fixes #881

`getAssignedPartitions()` can return `null`.

# Conflicts:
#	spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

* Add import for `@Nullable`
  • Loading branch information
garyrussell authored and artembilan committed Nov 19, 2018
1 parent 66a7162 commit d6f2f4c
Showing 1 changed file with 15 additions and 11 deletions.
Expand Up @@ -62,6 +62,7 @@
import org.springframework.kafka.support.TopicPartitionInitialOffset.SeekPosition;
import org.springframework.kafka.support.TransactionSupport;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.lang.Nullable;
import org.springframework.scheduling.SchedulingAwareRunnable;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
Expand Down Expand Up @@ -153,6 +154,7 @@ public void setClientIdSuffix(String clientIdSuffix) {
* @return the {@link TopicPartition}s currently assigned to this container,
* either explicitly or by Kafka; may be null if not assigned yet.
*/
@Nullable
public Collection<TopicPartition> getAssignedPartitions() {
ListenerConsumer listenerConsumer = this.listenerConsumer;
if (listenerConsumer != null) {
Expand Down Expand Up @@ -1258,17 +1260,19 @@ public void seekToEnd(String topic, int partition) {
this.seeks.add(new TopicPartitionInitialOffset(topic, partition, SeekPosition.END));
}

private void closeProducers(Collection<TopicPartition> partitions) {
ProducerFactory<?, ?> producerFactory = this.kafkaTxManager.getProducerFactory();
partitions.forEach(tp -> {
try {
producerFactory.closeProducerFor(zombieFenceTxIdSuffix(tp.topic(), tp.partition()));
}
catch (Exception e) {
this.logger.error("Failed to close producer with transaction id suffix: "
+ zombieFenceTxIdSuffix(tp.topic(), tp.partition()), e);
}
});
private void closeProducers(@Nullable Collection<TopicPartition> partitions) {
if (partitions != null) {
ProducerFactory<?, ?> producerFactory = this.kafkaTxManager.getProducerFactory();
partitions.forEach(tp -> {
try {
producerFactory.closeProducerFor(zombieFenceTxIdSuffix(tp.topic(), tp.partition()));
}
catch (Exception e) {
this.logger.error("Failed to close producer with transaction id suffix: "
+ zombieFenceTxIdSuffix(tp.topic(), tp.partition()), e);
}
});
}
}

private String zombieFenceTxIdSuffix(String topic, int partition) {
Expand Down

0 comments on commit d6f2f4c

Please sign in to comment.