Skip to content

Commit

Permalink
Optional request and session timeouts
Browse files Browse the repository at this point in the history
* Add config for request timeouts
* Fall back to Kafka's defaults if they aren't specified
  • Loading branch information
jferris committed Feb 8, 2019
1 parent 310baae commit 51a1f9b
Showing 1 changed file with 13 additions and 3 deletions.
16 changes: 13 additions & 3 deletions src/main/scala/fable/Config.scala
Expand Up @@ -66,6 +66,8 @@ object Config {
* [[Consumer.poll]] is invoked.
* @param pollingTimeout how long to wait before giving up when
* [[Consumer.poll]] is invoked.
* @param requestTimeout how long to wait before giving up on requests to
* Kafka nodes
* @param sessionTimeout how long to wait before assuming a failure has
* occurred when using a consumer group
* @param sslIdentificationAlgorithm the algorithm used to identify the
Expand All @@ -81,7 +83,8 @@ object Config {
groupId: Option[GroupId],
maxPollRecords: Int,
pollingTimeout: FiniteDuration,
sessionTimeout: FiniteDuration,
requestTimeout: Option[FiniteDuration],
sessionTimeout: Option[FiniteDuration],
sslIdentificationAlgorithm: Option[String],
trustedCertificate: Option[String],
uris: URIList
Expand Down Expand Up @@ -112,8 +115,15 @@ object Config {
target.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit.toString)
target.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
new Integer(maxPollRecords))
target.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,
sessionTimeout.toMillis.toString)

requestTimeout
.map(_.toMillis.toString)
.foreach(target.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, _))

sessionTimeout
.map(_.toMillis.toString)
.foreach(target.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, _))

groupId
.map(_.name)
.foreach(
Expand Down

0 comments on commit 51a1f9b

Please sign in to comment.