Skip to content

Commit

Permalink
finatra-kafka: Add NullKafkaProducer
Browse files Browse the repository at this point in the history
Problem

FinagleKafkaProducer's class member `producer` will try to make network
connections during class construction time. When running unit tests and
deploying the producer to a personal instance, the network connection
failure can be annoying. Since this class member `producer` is evaluated
during class construction time, a subclass of `FinagleKafkaProducer` cannot
override the construction-time network connection behavior either.

Solution

Introduce `KafkaProducerBase` class and `NullKafkaProducer` class where
`FinagleKafkaProducer` and `NullKafkaProducer` classes both extend
`KafkaProducerBase`.

Result

Unit tests and services deployed to a personal instance can use
`NullKafkaProducer` and they will not show network connection failures.

Differential Revision: https://phabricator.twitter.biz/D429004
  • Loading branch information
tad-zhang authored and jenkins committed Feb 5, 2020
1 parent 0f9269a commit d8d4d5d
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 4 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.rst
Expand Up @@ -7,6 +7,11 @@ Note that ``RB_ID=#`` and ``PHAB_ID=#`` correspond to associated message in comm
Unreleased
----------

Added
~~~~~
* finatra: Add NullKafkaProducer for unit tests to avoid network connection failures in the log
``PHAB_ID=D429004``.

20.1.0
------

Expand Down
Expand Up @@ -2,7 +2,6 @@ package com.twitter.finatra.kafka.producers

import com.twitter.finagle.stats.Stat
import com.twitter.finatra.kafka.stats.KafkaFinagleMetricsReporter.sanitizeMetricName
import com.twitter.inject.Logging
import com.twitter.util._
import java.util
import java.util.concurrent.TimeUnit._
Expand All @@ -11,9 +10,17 @@ import org.apache.kafka.clients.producer._
import org.apache.kafka.common.{PartitionInfo, TopicPartition}
import scala.collection.JavaConverters._

/**
* A standard implementation of [[KafkaProducerBase]] that forwards
* events in key/value pairs to [[org.apache.kafka.clients.producer.KafkaProducer]]
*
* @param config a configuration of Kafka producer, including the key
* serializer and the value serializer
* @tparam K type of the key in key/value pairs to be published to Kafka
* @tparam V type of the value in key/value pairs to be published to Kafka
*/
class FinagleKafkaProducer[K, V](config: FinagleKafkaProducerConfig[K, V])
extends Closable
with Logging {
extends KafkaProducerBase[K, V] {

private val keySerializer = config.keySerializer.get
private val valueSerializer = config.valueSerializer.get
Expand Down Expand Up @@ -101,7 +108,7 @@ class FinagleKafkaProducer[K, V](config: FinagleKafkaProducerConfig[K, V])
producer.partitionsFor(topic)
}

override def close(deadline: Time): Future[Unit] = {
def close(deadline: Time): Future[Unit] = {
// com.twitter.Util.Closable.close() calls com.twitter.Util.Closable.close(Duration) with
// a duration of Time.Bottom to wait for the resource to be completely relinquished.
// However, the underlying KafkaProducer will throw an IllegalArgumentException when
Expand Down
@@ -0,0 +1,46 @@
package com.twitter.finatra.kafka.producers

import com.twitter.inject.Logging
import com.twitter.util.{Closable, Future, Time}
import java.util
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata}
import org.apache.kafka.common.{PartitionInfo, TopicPartition}

/**
* An interface for publishing events in key/value pairs to Kafka and
* returning a [[com.twitter.util.Future]]
*
* @tparam K type of the key in key/value pairs to be published to Kafka
* @tparam V type of the value in key/value pairs to be published to Kafka
*/
trait KafkaProducerBase[K, V] extends Closable with Logging {
def send(
topic: String,
key: K,
value: V,
timestamp: Long,
partitionIdx: Option[Integer] = None
): Future[RecordMetadata]

def send(producerRecord: ProducerRecord[K, V]): Future[RecordMetadata]

def initTransactions(): Unit

def beginTransaction(): Unit

def sendOffsetsToTransaction(
offsets: Map[TopicPartition, OffsetAndMetadata],
consumerGroupId: String
): Unit

def commitTransaction(): Unit

def abortTransaction(): Unit

def flush(): Unit

def partitionsFor(topic: String): util.List[PartitionInfo]

def close(deadline: Time): Future[Unit]
}
@@ -0,0 +1,56 @@
package com.twitter.finatra.kafka.producers
import com.twitter.util.{Future, Time}
import java.util
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata}
import org.apache.kafka.common.{PartitionInfo, TopicPartition}

/**
* A no-op [[KafkaProducerBase]]. No network connection is created and
* events are discarded, making this producer useful in unit tests
* and as defaults in situations where event publishing is not needed.
*
* @tparam K type of the key in key/value pairs to be published to Kafka
* @tparam V type of the value in key/value pairs to be published to Kafka
*/
class NullKafkaProducer[K, V] extends KafkaProducerBase[K, V] {

val DataRecord = new RecordMetadata(
new TopicPartition("", 0),
0L, 0L, 0L,
0L, 0, 0
)

val EmptyList = new util.ArrayList[PartitionInfo]()

def send(
topic: String,
key: K,
value: V,
timestamp: Long,
partitionIdx: Option[Integer]
): Future[RecordMetadata] = Future.value(DataRecord)

def send(producerRecord: ProducerRecord[K, V]): Future[RecordMetadata] =
Future.value(DataRecord)

def initTransactions(): Unit = ()

def beginTransaction(): Unit = ()

def sendOffsetsToTransaction(
offsets: Map[TopicPartition, OffsetAndMetadata],
consumerGroupId: String
): Unit = ()

def commitTransaction(): Unit = ()

def abortTransaction(): Unit = ()

def flush(): Unit = ()

def partitionsFor(topic: String): util.List[PartitionInfo] =
EmptyList

def close(deadline: Time): Future[Unit] = Future.Unit
}

0 comments on commit d8d4d5d

Please sign in to comment.