From a31fb8628cc96024a80feaf63e1d95946aabdf72 Mon Sep 17 00:00:00 2001 From: rmathew1992 Date: Wed, 27 Feb 2019 14:12:31 -0800 Subject: [PATCH] Implement list topics (#1) This PR implements the listTopics method from Kafka Clients. Co-Authored-By: Derrick Carr --- CONTRIBUTING.md | 13 ++++++++++++- src/main/scala/fable/Consumer.scala | 19 +++++++++++++++++++ src/test/scala/fable/ConsumerSpec.scala | 22 ++++++++++++++++++++++ 3 files changed, 53 insertions(+), 1 deletion(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 563e635..72e6e98 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -5,7 +5,7 @@ agree to abide by the thoughtbot [code of conduct]. [code of conduct]: https://thoughtbot.com/open-source-code-of-conduct -Here are some ways *you* can contribute: +Here are some ways _you_ can contribute: * by using alpha, beta, and prerelease versions * by reporting bugs @@ -17,9 +17,18 @@ Here are some ways *you* can contribute: * by closing [issues][] * by reviewing patches +## Development + +* In order to run the library, you will need to install Kafka + * On Macs you can install it using brew: `brew install kafka` + * `brew services start kafka` + * Zookeeper is a Kafka manager that installs with Kafka. Run it with `brew services start zookeeper` +* In order to run tests, run `sbt test`. Sbt is the Scala Build Tool. On Macs you can install it via `brew install sbt` + [issues]: https://github.com/thoughtbot/fable/issues ## Submitting an Issue + We use the [GitHub issue tracker][issues] to track bugs and features. Before submitting a bug report or feature request, check to make sure it hasn't already been submitted. When submitting a bug report, please include a [Gist][] @@ -30,6 +39,7 @@ Ideally, a bug report should include a pull request with failing specs. [gist]: https://gist.github.com/ ## Submitting a Pull Request + 1. [Fork][fork] the [official repository][repo]. 2. [Create a topic branch.][branch] 3. Implement your feature or bug fix. @@ -37,6 +47,7 @@ Ideally, a bug report should include a pull request with failing specs. 5. [Submit a pull request.][pr] ## Notes + * Please add tests if you changed code. Contributions without tests won't be accepted. * Please don't update the version. diff --git a/src/main/scala/fable/Consumer.scala b/src/main/scala/fable/Consumer.scala index 63f76d5..cd1a968 100644 --- a/src/main/scala/fable/Consumer.scala +++ b/src/main/scala/fable/Consumer.scala @@ -175,6 +175,25 @@ class Consumer[F[_]: ContextShift: Monad: Sync, K, V] private[fable] ( }.toMap } + /** + * Get metadata about partitions for all topics that the user is authorized + * to view. This method will issue a remote call to the server. + * + * @see [https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#listTopics--] + */ + def listTopics: F[Map[Topic, Seq[Partition]]] = + for { + topics <- eval(_.listTopics()) + } yield { + topics.asScala.map { + case ((topic, partitionInfos)) => + val partitions: Seq[Partition] = + partitionInfos.asScala.toSeq.map(partitionInfo => + Partition(Topic(topic), partitionInfo.partition)) + (Topic(topic), partitions) + }.toMap + } + /** * Perform an operation using the underlying KafkaConsumer and return the * result suspended in F. diff --git a/src/test/scala/fable/ConsumerSpec.scala b/src/test/scala/fable/ConsumerSpec.scala index 52d8c49..d60df03 100644 --- a/src/test/scala/fable/ConsumerSpec.scala +++ b/src/test/scala/fable/ConsumerSpec.scala @@ -184,6 +184,28 @@ class ConsumerSpec extends AsyncFunSuite { }).unsafeToFuture } + test("listTopics") { + val fableTopic = Topic("fable-test") + val consumer = Consumer.resource[IO, String, String](config) + + (for { + _ <- createTopic(fableTopic.name) + topicsAndPartitions <- consumer.use { instance => + for { + _ <- instance.subscribe(fableTopic) + _ <- instance.poll + topics <- instance.listTopics + partitions <- instance.partitionsFor(fableTopic) + } yield (topics, partitions) + } + } yield { + val topic = topicsAndPartitions._1.find(t => { + t._1 == fableTopic + }) + assert(topic === Some(fableTopic -> topicsAndPartitions._2.toList)) + }).unsafeToFuture + } + private def createTopic(topic: String): IO[Unit] = IO.delay { val properties = new Properties