Skip to content

Commit

Permalink
finatra-kafka: Implicit implementation of Flaggable[SeekStrategy] a…
Browse files Browse the repository at this point in the history
…nd `Flaggable[OffsetResetStrategy]`

Problem

When creating a flag for type `c.t.finatra.kafka.domain.SeekStrategy` or
`org.apache.kafka.clients.consumer.OffsetResetStrategy` it is not possible
to be parsed into an instance without a defined `Flaggable` for the type.

Solution

Create an implicit implementation of `Flaggable[SeekStrategy]`
and `Flaggable[OffsetResetStrategy]`

Result

Users can now simply define a flag for a `c.t.finatra.kafka.domain.SeekStrategy` as

```
    private val seekStrategyFlag = flag[SeekStrategy](
      "seek.strategy.flag",
      SeekStrategy.RESUME,
      "This is the seek strategy flag"
    )
```

The flag accepts the values `resume`, `beginning`, `rewind`, and `end`.

or for an `org.apache.kafka.clients.consumer.OffsetResetStrategy`

```
  private val offsetResetStrategyFlag = flag[OffsetResetStrategy](
    "offset.reset.strategy.flag",
    OffsetResetStrategy.LATEST,
    "This is the offset reset strategy flag"
  )
```

The flag accepts the values "latest", "earliest", and "none".

JIRA Issues: DHIS-2963

Differential Revision: https://phabricator.twitter.biz/D271098
  • Loading branch information
Russell Teabeault authored and jenkins committed Feb 8, 2019
1 parent df29f6a commit ef071e5
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 0 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ Unreleased
Added
~~~~~

* finatra-kafka: Adding an implicit implementation of
`c.t.app.Flaggable[c.t.finatra.kafka.domain.SeekStrategy]`
and `c.t.app.Flaggable[org.apache.kafka.clients.consumer.OffsetResetStrategy]`.
``PHAB_ID=D271098``

* finatra-http: Added support to serve `c.t.io.Reader` as a streaming response in
`c.t.finatra.http.internal.marshalling.CallbackConverter`. ``PHAB_ID=D266863``

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package com.twitter.finatra.kafka.consumers

import com.twitter.app.Flaggable
import com.twitter.finatra.kafka.domain.SeekStrategy
import org.apache.kafka.clients.consumer.OffsetResetStrategy

/**
* Contains implicit Flaggable implementations for various kafka configuration types.
*/
object Flaggables {

/**
* Allows you to create a flag which will convert the flag's input String into a
* [[com.twitter.finatra.kafka.domain.SeekStrategy]]
*
* {{{
* import com.twitter.fanatra.kafka.consumers.Flaggables.seekStrategyFlaggable
*
* private val seekStrategyFlag = flag[SeekStrategy](
* "seek.strategy.flag",
* SeekStrategy.RESUME,
* "This is the seek strategy flag"
* )
* }}}
*/
implicit val seekStrategyFlaggable: Flaggable[SeekStrategy] = new Flaggable[SeekStrategy] {
override def parse(s: String): SeekStrategy = s match {
case "beginning" => SeekStrategy.BEGINNING
case "end" => SeekStrategy.END
case "resume" => SeekStrategy.RESUME
case "rewind" => SeekStrategy.REWIND
case _ => throw new IllegalArgumentException(s"$s is not a valid seek strategy.")
}
}

/**
* Allows you to create a flag which will convert the flag's input String into a
* [[org.apache.kafka.clients.consumer.OffsetResetStrategy]]
*
* {{{
* import org.apache.kafka.clients.consumer.OffsetResetStrategy
*
* private val offsetResetStrategyFlag = flag[OffsetResetStrategy](
* "offset.reset.strategy.flag",
* OffsetResetStrategy.LATEST,
* "This is the offset reset strategy flag"
* )
* }}}
*/
implicit val offsetResetStrategyFlaggable: Flaggable[OffsetResetStrategy] =
new Flaggable[OffsetResetStrategy] {
override def parse(s: String): OffsetResetStrategy = s match {
case "latest" => OffsetResetStrategy.LATEST
case "earliest" => OffsetResetStrategy.EARLIEST
case "none" => OffsetResetStrategy.NONE
case _ => throw new IllegalArgumentException(s"$s is not a valid offset reset strategy")
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.twitter.finatra.kafka.consumers

import com.twitter.finatra.kafka.consumers.Flaggables.{offsetResetStrategyFlaggable, seekStrategyFlaggable}
import com.twitter.finatra.kafka.domain.SeekStrategy
import com.twitter.inject.Test
import org.apache.kafka.clients.consumer.OffsetResetStrategy

class FlaggablesTest extends Test {

test("Flaggables#seekStrategyFlaggable") {
seekStrategyFlaggable.parse("beginning") should equal(SeekStrategy.BEGINNING)
seekStrategyFlaggable.parse("resume") should equal(SeekStrategy.RESUME)
seekStrategyFlaggable.parse("rewind") should equal(SeekStrategy.REWIND)
seekStrategyFlaggable.parse("end") should equal(SeekStrategy.END)
an [IllegalArgumentException] should be thrownBy seekStrategyFlaggable.parse("unknown")
}

test("Flaggables#offsetResetStrategyFlaggable ") {
offsetResetStrategyFlaggable.parse("latest") should equal(OffsetResetStrategy.LATEST)
offsetResetStrategyFlaggable.parse("earliest") should equal(OffsetResetStrategy.EARLIEST)
offsetResetStrategyFlaggable.parse("none") should equal(OffsetResetStrategy.NONE)
an [IllegalArgumentException] should be thrownBy offsetResetStrategyFlaggable.parse("unknown")
}
}

0 comments on commit ef071e5

Please sign in to comment.