Skip to content

Commit

Permalink
Allow configuration of auto reset, fetch size
Browse files Browse the repository at this point in the history
These are common values to configure for setting up consumers, and the
fetch size is required to be able to run in a throttled instance like
Heroku.
  • Loading branch information
jferris committed Mar 27, 2019
1 parent 77c9bc6 commit eeb01b0
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 0 deletions.
3 changes: 3 additions & 0 deletions build.sbt
Expand Up @@ -8,6 +8,7 @@ val kafkaVersion = "2.1.+"
val log4CatsVersion = "0.2.+"
val pureConfigVersion = "0.10.+"
val scalaTestVersion = "3.0.+"
val squantsVersion = "1.3.+"

/** Locate the jar on a classpath whose file path contains the given name.
  *
  * @param classPath attributed classpath entries to search
  * @param name substring expected somewhere in the jar's file path
  * @return the first classpath entry whose path contains `name`
  * @throws NoSuchElementException if no entry matches, with a message naming
  *   the missing jar (same exception type `.get` would have thrown, but
  *   actionable instead of empty)
  */
def findJar(classPath: Seq[Attributed[File]], name: String): File =
  classPath
    .find(_.data.toString.containsSlice(name))
    .getOrElse(throw new NoSuchElementException(
      s"No jar containing '$name' found on the class path"))
    .data
Expand Down Expand Up @@ -75,12 +76,14 @@ lazy val fable = (project in file("."))
libraryDependencies ++= Seq(
"co.fs2" %% "fs2-core" % fs2Version,
"com.github.pureconfig" %% "pureconfig" % pureConfigVersion,
"com.github.pureconfig" %% "pureconfig-squants" % pureConfigVersion,
"com.heroku.sdk" % "env-keystore" % envKeyStoreVersion,
"io.chrisdavenport" %% "log4cats-slf4j" % log4CatsVersion,
"org.apache.kafka" % "kafka-clients" % kafkaVersion,
"org.scalatest" %% "scalatest" % scalaTestVersion % Test,
"org.typelevel" %% "cats-core" % catsVersion,
"org.typelevel" %% "cats-effect" % catsEffectVersion,
"org.typelevel" %% "squants" % squantsVersion,
"ch.qos.logback" % "logback-classic" % "1.2.3" % Test
)
)
40 changes: 40 additions & 0 deletions src/main/scala/fable/Config.scala
Expand Up @@ -10,8 +10,10 @@ import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.config.SslConfigs
import pureconfig.ConfigReader
import pureconfig.generic.semiauto.deriveReader
import pureconfig.module.squants._
import scala.concurrent.duration.FiniteDuration
import scala.util.Try
import squants.information.Information

/**
* Configuration objects for Fable. These objects can be constructed manually,
Expand All @@ -29,9 +31,11 @@ import scala.util.Try
*
* test-consumer {
* auto-commit = false
* auto-offset-reset = "latest"
* batch-size = 1024
* client-id = "fable-test"
* group-id = "fable-test"
* fetch-max-size = 1k
* max-poll-records = 1024
* polling-timeout = 1 second
* session-timeout = 30 seconds
Expand All @@ -57,9 +61,12 @@ object Config {
*
* @constructor
* @param autoCommit whether to automatically commit the previous offset
* @param autoOffsetReset what to do when there is no initial offset in
* Kafka or if the current offset does not exist any more on the server
* @param clientCertificate SSL certificate used to authenticate the client
* @param clientId identifier for tracking which client is making requests
* @param clientKey SSL private key used to authenticate the client
* @param fetchMaxSize the maximum data size to fetch in a single poll
* @param groupId identifier for the consumer group this consumer will join
* each time [[Consumer.poll]] is invoked.
* @param maxPollRecords the maximum number of records to return each time
Expand All @@ -77,9 +84,11 @@ object Config {
*/
case class Consumer(
autoCommit: Boolean,
autoOffsetReset: Config.Consumer.AutoOffsetReset,
clientCertificate: Option[String],
clientId: String,
clientKey: Option[String],
fetchMaxSize: Option[Information],
groupId: Option[GroupId],
maxPollRecords: Int,
pollingTimeout: FiniteDuration,
Expand Down Expand Up @@ -116,6 +125,10 @@ object Config {
target.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
new Integer(maxPollRecords))

fetchMaxSize
.map(_.toBytes.toInt.toString)
.foreach(target.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, _))

requestTimeout
.map(_.toMillis.toString)
.foreach(target.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, _))
Expand Down Expand Up @@ -178,6 +191,33 @@ object Config {
}

object Consumer {

  /** Strategy applied when Kafka has no initial offset for this consumer
    * group, or when the current offset no longer exists on the server.
    * Mirrors the Kafka `auto.offset.reset` consumer setting.
    */
  sealed trait AutoOffsetReset {
    /** The exact string value Kafka expects for `auto.offset.reset`. */
    def renderString: String
  }

  object AutoOffsetReset {
    /** Reset to the earliest available offset. */
    case object Earliest extends AutoOffsetReset {
      val renderString = "earliest"
    }

    /** Reset to the latest offset (skip any missed records). */
    case object Latest extends AutoOffsetReset {
      val renderString = "latest"
    }

    /** Raise an error to the consumer instead of resetting. */
    case object None extends AutoOffsetReset {
      val renderString = "none"
    }

    /** Reads an [[AutoOffsetReset]] from its Kafka string form
      * ("earliest", "latest", or "none"); any other value is a
      * configuration error.
      */
    implicit val autoOffsetResetReader: ConfigReader[AutoOffsetReset] =
      pureconfig.ConfigReader[String].emap { string =>
        // `find` replaces filter(...).headOption; note "Unknown" typo fixed
        // in the error message.
        List(Earliest, Latest, None)
          .find(_.renderString == string)
          .toRight(
            pureconfig.error
              .CannotConvert(string, "auto offset", "Unknown value"))
      }
  }

  /** Derived reader for the kebab-case `consumer` configuration section. */
  implicit val consumerConfigReader: ConfigReader[Consumer] = deriveReader
}

Expand Down
1 change: 1 addition & 0 deletions src/test/resources/application.conf
@@ -1,6 +1,7 @@
kafka {
consumer {
auto-commit = false
auto-offset-reset = "earliest"
batch-size = 1024
client-id = "fable-test"
group-id = "fable-test"
Expand Down

0 comments on commit eeb01b0

Please sign in to comment.