fs2-kafka-client

Simple Kafka Client for fs2

This project has been deprecated in favour of the fs2-kafka library.

Existing sources and artifacts remain available, but users are encouraged to migrate.
The migration guide below shows how to rewrite the old examples using fs2-kafka.

Migration Guide

Consuming

If you're using consume:

  • the keyDeserializer and valueDeserializer have been moved to ConsumerSettings,
  • switch to consumerStream, which returns an fs2.kafka.KafkaConsumer instance,
  • the KafkaConsumer instance has a subscribe function for subscribing to topics,
  • auto commits are disabled by default; enable them with withEnableAutoCommit(true), as in the sketch after this list,
  • the maxParallelism setting no longer exists, but stream combined with mapAsync achieves the same parallelism,
  • you need a consumer ExecutionContext; a sensible default is consumerExecutionContextStream.
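
For example, to approximate the old auto-committing behaviour, a minimal sketch reusing the ConsumerSettings API shown in the examples below (only the withEnableAutoCommit call is new here):

import fs2.kafka._
import org.apache.kafka.common.serialization._
import scala.concurrent.ExecutionContext

// The same settings as in the example below, with auto commits
// switched back on to match the old fs2-kafka-client default.
val autoCommittingSettings = (executionContext: ExecutionContext) =>
  ConsumerSettings(
    keyDeserializer = new StringDeserializer,
    valueDeserializer = new StringDeserializer,
    executionContext = executionContext
  )
  .withEnableAutoCommit(true)
  .withBootstrapServers("localhost:9092")
  .withGroupId("my-group-id")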

Before, with fs2-kafka-client:

import cats.effect.IO
import com.ovoenergy.fs2.kafka._
import org.apache.kafka.clients.consumer._
import org.apache.kafka.common.serialization._
import scala.concurrent.duration._

val consumerSettings =
  ConsumerSettings(
    pollTimeout = 250.milliseconds,
    maxParallelism = 4,
    nativeSettings = Map(
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false",
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "my-group-id",
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
    )
  )

consume[IO](
  TopicSubscription(Set("my-topic")),
  new StringDeserializer,
  new StringDeserializer,
  consumerSettings
)

After, with fs2-kafka:

import cats.data.NonEmptyList
import cats.effect.IO
import fs2.kafka._
import org.apache.kafka.common.serialization._
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

val consumerSettings = (executionContext: ExecutionContext) =>
  ConsumerSettings(
    keyDeserializer = new StringDeserializer,
    valueDeserializer = new StringDeserializer,
    executionContext = executionContext
  )
  .withAutoOffsetReset(AutoOffsetReset.Earliest)
  .withBootstrapServers("localhost:9092")
  .withPollTimeout(250.milliseconds)
  .withGroupId("my-group-id")

for {
  executionContext <- consumerExecutionContextStream[IO]
  consumer <- consumerStream[IO].using(consumerSettings(executionContext))
  _ <- consumer.subscribe(NonEmptyList.one("my-topic"))
  message <- consumer.stream
} yield message.record
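
Note that the for-comprehension above only builds an fs2.Stream[IO, _]; nothing runs until the stream is compiled. A minimal sketch of actually running it, assuming cats-effect's IOApp and that the for-comprehension has been bound to a hypothetical consumerRecords value:

import cats.effect.{ExitCode, IO, IOApp}
import cats.syntax.functor._

object ConsumerMain extends IOApp {
  def run(args: List[String]): IO[ExitCode] =
    consumerRecords // the for-comprehension above, bound to a val
      .compile
      .drain        // run the stream purely for its effects
      .as(ExitCode.Success)
}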

If you're using consumeProcessAndCommit or consumeProcessBatchWithPipeAndCommit:

  • see the notes above on migrating the consume function,
  • switch to consumerStream, which gives you more flexibility over when to commit,
  • the example below commits every 500 records or every 15 seconds, whichever happens first.

Before, with fs2-kafka-client:

import cats.effect.IO
import com.ovoenergy.fs2.kafka._
import org.apache.kafka.clients.consumer._
import org.apache.kafka.common.serialization._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext

implicit val executionContext: ExecutionContext =
  ExecutionContext.global

val consumerSettings =
  ConsumerSettings(
    pollTimeout = 250.milliseconds,
    maxParallelism = 4,
    nativeSettings = Map(
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false",
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "my-group-id",
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
    )
  )

def processRecord(record: ConsumerRecord[String, String]): IO[Unit] =
  IO.unit // Apply some side-effect

consumeProcessAndCommit[IO](
  TopicSubscription(Set("my-topic")),
  new StringDeserializer,
  new StringDeserializer,
  consumerSettings
)(processRecord)

After, with fs2-kafka:

import cats.data.NonEmptyList
import cats.effect.IO
import cats.syntax.functor._
import fs2.kafka._
import org.apache.kafka.common.serialization._
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

val consumerSettings = (executionContext: ExecutionContext) =>
  ConsumerSettings(
    keyDeserializer = new StringDeserializer,
    valueDeserializer = new StringDeserializer,
    executionContext = executionContext
  )
  .withAutoOffsetReset(AutoOffsetReset.Earliest)
  .withBootstrapServers("localhost:9092")
  .withPollTimeout(250.milliseconds)
  .withGroupId("my-group-id")

def processRecord(record: ConsumerRecord[String, String]): IO[Unit] =
  IO.unit // Apply some side-effect

for {
  executionContext <- consumerExecutionContextStream[IO]
  consumer <- consumerStream[IO].using(consumerSettings(executionContext))
  _ <- consumer.subscribe(NonEmptyList.one("my-topic"))
  _ <- consumer.stream
    // process up to 4 records in parallel (the old maxParallelism setting)
    .mapAsync(4) { message =>
      processRecord(message.record)
        .as(message.committableOffset)
    }
    .groupWithin(500, 15.seconds)
    .map(_.foldLeft(CommittableOffsetBatch.empty[IO])(_ updated _))
    .evalMap(_.commit)
} yield ()
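
Depending on your fs2-kafka version, the groupWithin-and-fold combination above may also be available as a ready-made commitBatchWithin pipe; a sketch under that assumption, reusing the imports, settings, and processRecord from above:

for {
  executionContext <- consumerExecutionContextStream[IO]
  consumer <- consumerStream[IO].using(consumerSettings(executionContext))
  _ <- consumer.subscribe(NonEmptyList.one("my-topic"))
  _ <- consumer.stream
    .mapAsync(4) { message =>
      processRecord(message.record)
        .as(message.committableOffset)
    }
    // commitBatchWithin(500, 15.seconds) is assumed to exist in your
    // fs2-kafka version; it commits offsets on the same count-or-time
    // policy as the groupWithin version above.
    .through(commitBatchWithin(500, 15.seconds))
} yield ()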

Producing

If you're using producerStream:

  • the keySerializer and valueSerializer have been moved to ProducerSettings,
  • switch to fs2-kafka's producerStream, which returns an fs2.kafka.KafkaProducer, replacing produceRecord,
  • the KafkaProducer exposes produce and produceBatched functions for producing records,
  • both functions can produce zero, one, or multiple records (see the sketch following the examples below),
  • once the records have been produced, a single passthrough value is emitted,
  • for parity with produceRecord, wrap the record in ProducerMessage.single with a Unit passthrough.

Before, with fs2-kafka-client:

import cats.effect.IO
import com.ovoenergy.fs2.kafka._
import fs2._
import org.apache.kafka.clients.producer._
import org.apache.kafka.common.serialization._

val producerSettings =
  ProducerSettings(
    Map(
      ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",
      ProducerConfig.ACKS_CONFIG -> "all",
      ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION -> "1"
    )
  )

val topic = "my-topic"
val key = "my-key"
val value = "my-value"

producerStream[IO](
  producerSettings,
  new StringSerializer,
  new StringSerializer
).evalMap { producer =>
  produceRecord[IO](producer, new ProducerRecord(topic, key, value))
}

After, with fs2-kafka:

import cats.effect.IO
import fs2.kafka._
import org.apache.kafka.clients.producer._
import org.apache.kafka.common.serialization._

val producerSettings =
  ProducerSettings(
    keySerializer = new StringSerializer,
    valueSerializer = new StringSerializer
  )
  .withMaxInFlightRequestsPerConnection(1)
  .withBootstrapServers("localhost:9092")
  .withAcks(Acks.All)

val topic = "my-topic"
val key = "my-key"
val value = "my-value"

producerStream[IO]
  .using(producerSettings)
  .evalMap { producer =>
    val record = new ProducerRecord(topic, key, value)
    val message = ProducerMessage.single(record, ())
    producer.produce(message)
  }
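
To produce several records in one message, a minimal sketch, assuming a ProducerMessage.multiple constructor mirroring ProducerMessage.single, and reusing the producerSettings and topic defined above:

producerStream[IO]
  .using(producerSettings)
  .evalMap { producer =>
    val records = List(
      new ProducerRecord(topic, "key-1", "value-1"),
      new ProducerRecord(topic, "key-2", "value-2")
    )
    // ProducerMessage.multiple is an assumption here: it wraps a
    // collection of records together with a single passthrough value.
    producer.produce(ProducerMessage.multiple(records, ()))
  }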

If you're using produceRecord (without producerStream):

  • you can either switch to producerStream (see above) or use producerResource,
  • the producerResource function yields a KafkaProducer, just like producerStream,
  • an example using producerResource is shown below.

Before, with fs2-kafka-client:

import cats.effect.IO
import com.ovoenergy.fs2.kafka._
import java.util.Properties
import org.apache.kafka.clients.producer._
import org.apache.kafka.common.serialization._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext

implicit val executionContext: ExecutionContext =
  ExecutionContext.global

val topic = "my-topic"
val key = "my-key"
val value = "my-value"

val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaExampleProducer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)

val producer: Producer[String, String] =
  new KafkaProducer[String, String](props)

produceRecord[IO](producer, new ProducerRecord(topic, key, value))

After, with fs2-kafka:

import cats.effect.IO
import fs2.kafka._
import org.apache.kafka.clients.producer._
import org.apache.kafka.common.serialization._

val producerSettings =
  ProducerSettings(
    keySerializer = new StringSerializer,
    valueSerializer = new StringSerializer
  )
  .withBootstrapServers("localhost:9092")
  .withClientId("KafkaExampleProducer")

val topic = "my-topic"
val key = "my-key"
val value = "my-value"

producerResource[IO]
  .using(producerSettings)
  .use { producer =>
    val record = new ProducerRecord(topic, key, value)
    val message = ProducerMessage.single(record, ())
    producer.produce(message)
  }
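
Since producerResource yields a cats.effect.Resource, the underlying producer is created when use begins and closed automatically once the supplied function's IO completes or fails, so there is no need to close the producer by hand as with the raw KafkaProducer in the previous example.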