Skip to content

Commit

Permalink
Remove prefixes
Browse files Browse the repository at this point in the history
These can be accomplished entirely in configuration.
  • Loading branch information
jferris committed Jan 28, 2019
1 parent bcb7a1b commit aef576e
Show file tree
Hide file tree
Showing 5 changed files with 8 additions and 52 deletions.
7 changes: 1 addition & 6 deletions src/main/scala/fable/Config.scala
Expand Up @@ -48,21 +48,16 @@ object Config {
* General configuration options for [[Kafka]] instances.
*
* @constructor
* @param prefix optional prefix to apply to topic names and consumer group
* IDs.
* @param uris bootstrap URIs used to connect to a Kafka cluster.
*/
case class Kafka(prefix: Option[String], uris: URIList) {
case class Kafka(uris: URIList) {
private[fable] def properties: Properties = {
val result = new Properties()
result.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
uris.bootstrapServers)
result.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, uris.scheme)
result
}

def prefix[A](name: String)(f: String => A): A =
f(s"${prefix.getOrElse("")}$name")
}

object Kafka {
Expand Down
14 changes: 0 additions & 14 deletions src/main/scala/fable/Kafka.scala
Expand Up @@ -38,20 +38,6 @@ import org.apache.kafka.clients.consumer.KafkaConsumer
*/
class Kafka[F[_]: ContextShift: Monad: Sync](config: Config.Kafka) {

/**
* Construct a [[Topic]] from the given name. Applies the prefix from
* configuration if one is present.
*/
def topic(name: String): Topic =
config.prefix(name)(Topic.apply)

/**
* Construct a [[GroupId]] from the given name. Applies the prefix from
* configuration if one is present.
*/
def groupId(name: String): GroupId =
config.prefix(name)(GroupId.apply)

/**
* Allocate a [[Consumer]] as a [[cats.effect.Resource]]. The consumer will
* be closed when the resource is released.
Expand Down
2 changes: 1 addition & 1 deletion src/test/resources/logback.xml
Expand Up @@ -2,7 +2,7 @@
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>[%level] [%logger{0}] %msg%n</pattern>
<pattern>[%level] [%logger] %msg%n</pattern>
</encoder>
</appender>

Expand Down
12 changes: 6 additions & 6 deletions src/test/scala/fable/ConsumerSpec.scala
Expand Up @@ -14,7 +14,7 @@ import scala.concurrent.ExecutionContext

class ConsumerSpec extends AsyncFunSuite {
test("poll") {
val topic = kafka.topic("fable-test-example")
val topic = Topic("fable-test-example")
val consumer = kafka.consumer[String, String](consumerConfig)

(for {
Expand All @@ -35,7 +35,7 @@ class ConsumerSpec extends AsyncFunSuite {
}

test("commit") {
val topic = kafka.topic("fable-test-example")
val topic = Topic("fable-test-example")
val consumer = Eval.always {
kafka.consumer[String, String](consumerConfig)
}
Expand Down Expand Up @@ -66,7 +66,7 @@ class ConsumerSpec extends AsyncFunSuite {
}

test("records") {
val topic = kafka.topic("fable-test-example")
val topic = Topic("fable-test-example")
val consumer =
kafka.consumer[String, String](consumerConfig.copy(maxPollRecords = 2))

Expand All @@ -90,7 +90,7 @@ class ConsumerSpec extends AsyncFunSuite {
}

test("partitionsFor") {
val topic = kafka.topic("fable-test-example")
val topic = Topic("fable-test-example")
val consumer = kafka.consumer[String, String](consumerConfig)

(for {
Expand All @@ -102,8 +102,8 @@ class ConsumerSpec extends AsyncFunSuite {
}

test("assign") {
val first = kafka.topic("fable-test-one")
val second = kafka.topic("fable-test-two")
val first = Topic("fable-test-one")
val second = Topic("fable-test-two")
val consumer = kafka.consumer[String, String](consumerConfig)

(for {
Expand Down
25 changes: 0 additions & 25 deletions src/test/scala/fable/KafkaSpec.scala

This file was deleted.

0 comments on commit aef576e

Please sign in to comment.