Skip to content

Commit

Permalink
Implement list topics (#1)
Browse files Browse the repository at this point in the history
This PR implements the listTopics method from Kafka Clients.

Co-Authored-By: Derrick Carr <derrick.the.future@gmail.com>
  • Loading branch information
newuser1992 and whyderrick committed Feb 27, 2019
1 parent 0c2df85 commit a31fb86
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 1 deletion.
13 changes: 12 additions & 1 deletion CONTRIBUTING.md
Expand Up @@ -5,7 +5,7 @@ agree to abide by the thoughtbot [code of conduct].

[code of conduct]: https://thoughtbot.com/open-source-code-of-conduct

Here are some ways *you* can contribute:
Here are some ways _you_ can contribute:

* by using alpha, beta, and prerelease versions
* by reporting bugs
Expand All @@ -17,9 +17,18 @@ Here are some ways *you* can contribute:
* by closing [issues][]
* by reviewing patches

## Development

* In order to run the library, you will need to install Kafka
* On Macs you can install it using brew: `brew install kafka`
* `brew services start kafka`
* Zookeeper is a Kafka manager that installs with Kafka. Run it with `brew services start zookeeper`
* In order to run tests, run `sbt test`. Sbt is the Scala Build Tool. On Macs you can install it via `brew install sbt`

[issues]: https://github.com/thoughtbot/fable/issues

## Submitting an Issue

We use the [GitHub issue tracker][issues] to track bugs and features. Before
submitting a bug report or feature request, check to make sure it hasn't
already been submitted. When submitting a bug report, please include a [Gist][]
Expand All @@ -30,13 +39,15 @@ Ideally, a bug report should include a pull request with failing specs.
[gist]: https://gist.github.com/

## Submitting a Pull Request

1. [Fork][fork] the [official repository][repo].
2. [Create a topic branch.][branch]
3. Implement your feature or bug fix.
4. Add, commit, and push your changes.
5. [Submit a pull request.][pr]

## Notes

* Please add tests if you changed code. Contributions without tests won't be accepted.
* Please don't update the version.

Expand Down
19 changes: 19 additions & 0 deletions src/main/scala/fable/Consumer.scala
Expand Up @@ -175,6 +175,25 @@ class Consumer[F[_]: ContextShift: Monad: Sync, K, V] private[fable] (
}.toMap
}

/**
* Get metadata about partitions for all topics that the user is authorized
* to view. This method will issue a remote call to the server.
*
* @see [https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#listTopics--]
*/
def listTopics: F[Map[Topic, Seq[Partition]]] =
for {
topics <- eval(_.listTopics())
} yield {
topics.asScala.map {
case ((topic, partitionInfos)) =>
val partitions: Seq[Partition] =
partitionInfos.asScala.toSeq.map(partitionInfo =>
Partition(Topic(topic), partitionInfo.partition))
(Topic(topic), partitions)
}.toMap
}

/**
* Perform an operation using the underlying KafkaConsumer and return the
* result suspended in F.
Expand Down
22 changes: 22 additions & 0 deletions src/test/scala/fable/ConsumerSpec.scala
Expand Up @@ -184,6 +184,28 @@ class ConsumerSpec extends AsyncFunSuite {
}).unsafeToFuture
}

test("listTopics") {
val fableTopic = Topic("fable-test")
val consumer = Consumer.resource[IO, String, String](config)

(for {
_ <- createTopic(fableTopic.name)
topicsAndPartitions <- consumer.use { instance =>
for {
_ <- instance.subscribe(fableTopic)
_ <- instance.poll
topics <- instance.listTopics
partitions <- instance.partitionsFor(fableTopic)
} yield (topics, partitions)
}
} yield {
val topic = topicsAndPartitions._1.find(t => {
t._1 == fableTopic
})
assert(topic === Some(fableTopic -> topicsAndPartitions._2.toList))
}).unsafeToFuture
}

private def createTopic(topic: String): IO[Unit] =
IO.delay {
val properties = new Properties
Expand Down

0 comments on commit a31fb86

Please sign in to comment.