New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement list topics #1
Conversation
Co-Authored-By: Derrick Carr <derrick.the.future@gmail.com>
src/main/scala/fable/Consumer.scala
Outdated
topics.asScala.map { | ||
case ((topic, partitionInfos)) => | ||
val partitions: List[Partition] = | ||
partitionInfos.asScala.toList.map(partitionInfo => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When we directly call asScala on the java list that gets returned, we get a Buffer. Given the name of the method it felt better to return a list type. Any thoughts on if a Buffer would be useful here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you could also use Seq
if you don't care about the particular type of sequence, and then the calling user can call toList
if they need an operation that's only on list.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, so a sequence makes sense to us because it would easy to transform it a buffer, list or anything else that extends Sequence. Should we rename it since we're not returning a list? Are we trying to match the kafkaClients API exactly?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've been trying to stay really close to the kafka-clients API. My thinking is that it makes it easier to use if you already know kafka-clients, and you can easily look up the underlying behavior if you need to.
topicsAndPartitions <- consumer.use { instance => | ||
for { | ||
_ <- instance.subscribe(fableTopic) | ||
_ <- instance.poll |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So if we don't poll here for some reason the topic doesn't get returned in from our listTopics method. Not sure why that would be?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
KafkaConsumer
is kind of weird in when it fetches data. I had the same issue with partitions, and I resolved it by using poll
. It would be nice to find another solution, but I haven't learned one yet.
src/main/scala/fable/Consumer.scala
Outdated
@@ -175,6 +175,25 @@ class Consumer[F[_]: ContextShift: Monad: Sync, K, V] private[fable] ( | |||
}.toMap | |||
} | |||
|
|||
/** | |||
* Get metadata about partitions for all topics that the user is authorized to view. This method will issue a remote call to the server. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is longer than 80 characters?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did run scalafmt, does that not run on comments?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it doesn't wrap comments for some reason, unfortunately.
src/main/scala/fable/Consumer.scala
Outdated
* | ||
* @see [https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#listTopics--] | ||
*/ | ||
def listTopics(): F[Map[Topic, List[Partition]]] = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thoughts on dropping the empty parenthesis?
partitionInfos.asScala.toList.map(partitionInfo => | ||
Partition(Topic(topic), partitionInfo.partition)) | ||
(Topic(topic), partitions) | ||
}.toMap |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because Partition
already has the Topic
, does it make sense for this to be Seq[Partition]
rather than Map[Topic, List[Partition]
? What is the advantage of the Map
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It might, we're not entirely clear on the use case for this(we picked it up because it looked relatively straight ahead). That being said returning the partitions makes it feel as though the method should be listPartitions. Perhaps that would be a more useful method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given your above comment about matching the API and given how the existing KafkaClient works I would be tempted to keep the existing return type.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, I definitely misread the type at first. I think what you're doing makes total sense, because it maps to the KafkaConsumer
method.
Co-Authored-By: Derrick Carr <derrick.the.future@gmail.com>
Co-Authored-By: Derrick Carr <derrick.the.future@gmail.com>
@jferris I think we addressed all your comments & we added to the contribution doc |
This PR implements the listTopics method from Kafka Clients.