
Implementation of the KafkaConsumer #24

Merged (6 commits) on Sep 26, 2022

Conversation

@felixschlegel (Contributor) commented Sep 6, 2022

Modifications:

  • implemented the KafkaConsumer

  • KafkaConsumer has two public initializers:

    // Subscribe to topics as part of a consumer group
    KafkaConsumer(
        topics: [String],
        groupID: String,
        config: KafkaConfig = KafkaConfig(),
        logger: Logger
    ) async throws

    // Assign consumer to a particular topic partition
    KafkaConsumer(
        topic: String,
        partition: KafkaPartition,
        offset: Int64 = Int64(RD_KAFKA_OFFSET_END),
        config: KafkaConfig = KafkaConfig(),
        logger: Logger
    ) async throws
  • incoming messages can be read using KafkaConsumer's messages AsyncSequence, which has backpressure implemented

  • added the ability for manual offset management by implementing the commitSync() method for KafkaConsumer

  • implemented Integration Tests

Result:

Users can consume messages from a Kafka cluster.
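
For illustration, a minimal usage sketch based on the initializers and the messages sequence described above (topic, group and logger names are placeholders, and exact signatures may differ; in this PR the sequence element is a Result):

import Logging

func consumeExample() async throws {
    let logger = Logger(label: "kafka.consumer.example")
    let consumer = try await KafkaConsumer(
        topics: ["example-topic"],   // placeholder topic
        groupID: "example-group",    // placeholder consumer group
        config: KafkaConfig(),       // commitSync() requires "enable.auto.commit" set to "false"
        logger: logger
    )

    // Backpressure-aware AsyncSequence of consumed messages.
    for await result in consumer.messages {
        switch result {
        case .success(let message):
            // Process the message, then commit its offset manually.
            try await consumer.commitSync(message)
        case .failure(let error):
            logger.error("Consumer error: \(error)")
        }
    }
}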

@felixschlegel felixschlegel force-pushed the fs-kafka-consumer branch 5 times, most recently from 88b2742 to 8f0a618 on September 11, 2022 at 17:34
func poll(timeout: Int32 = 100) async throws -> KafkaConsumerMessage? {
// TODO: clock API
// TODO: ideally: make private
// TODO: is this state needed here?

Contributor Author:

  1. This is a good candidate for Clock API — shall I create a ticket?
  2. Again, this function is not meant to be used from outside the KafkaConsumer — we could therefore think about omitting state here as well

Contributor:

  1. I don't think we need the Clock API here. There is no real value as far as I can see
  2. Yeah let's make it private and remove the state

try await self._commitSync(message)
}

// TODO: commit multiple messages at once -> different topics + test

Contributor Author:

What do you think about refactoring this function into a function that takes an array of KafkaConsumerMessage as a parameter and commits all of them at once?

This would then always commit the highest offset for each topic.
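
A rough sketch of what that batched commit could look like (hypothetical, not part of this PR; it assumes KafkaConsumerMessage exposes topic, partition and offset, that KafkaPartition is Hashable, and it reuses the existing per-message commit):

// Inside KafkaConsumer (sketch only).
func commitSync(_ messages: [KafkaConsumerMessage]) async throws {
    struct TopicPartition: Hashable {
        let topic: String
        let partition: KafkaPartition
    }

    // Keep only the message with the highest offset for each topic-partition pair.
    var highestPerPartition = [TopicPartition: KafkaConsumerMessage]()
    for message in messages {
        let key = TopicPartition(topic: message.topic, partition: message.partition)
        if let current = highestPerPartition[key], current.offset >= message.offset {
            continue
        }
        highestPerPartition[key] = message
    }

    // Commit each topic-partition once, at its highest consumed offset.
    for message in highestPerPartition.values {
        try await self._commitSync(message)
    }
}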

// let errorString = errorStringBuffer.readString(length: errorStringBuffer.readableBytes)

// TODO: what to do with error string?
// TODO: handle errors here or in consumer?

Contributor Author:

Handle errors here or in KafkaConsumer?

Contributor:

I think handling it here and throwing is fine, but shouldn't we do it as the first thing in this method?

Contributor Author:

We could move it up but we would have to move up the valuePointer and the valueBufferPointer as well — I'll implement it in my next commit

@felixschlegel felixschlegel marked this pull request as ready for review September 11, 2022 18:01
@felixschlegel felixschlegel changed the title from "Implementation of the Kafka Consumers" to "Implementation of the KafkaConsumer" on Sep 11, 2022
Modifications:

* created a new .testTarget "IntegrationTests" with a dedicated
  directory
* made KafkaConsumerMessage and KafkaAcknowledgedMessage use
  KafkaPartition as type for the partition property
* implemented first consumer that enables users to subscribe to topics
  and poll the consumer
* built a KafkaSequenceConsumer on top of KafkaConsumer that enables
  users to consume messages in an AsyncSequence
* enforced that the integration-test-topic is created in the Kafka
  docker container

Result:

Users are able to consume messages from a Kafka cluster

Modifications:

* created Tests for KafkaConsumer
* updated Docker compose Kafka server to create different topics for
  different tests to avoid overlaps
* introduced two initializers for KafkaConsumer, allowing users to subscribe to
  a topic as a group or assign the KafkaConsumer to a particular
  topic+partition pair
* added ability to manually commit offsets to Kafka

Modifications:

* made KafkaConsumer an actor
* implemented backpressure for KafkaConsumer

Modifications:

* made KafkaConsumer.close() method async instead of blocking

@mr-swifter (Collaborator):

Hello @felixschlegel !

I was looking at the PR and have one small question, just out of curiosity.

Do you have plans to address the fact that some of librdkafka's blocking functions are now called from Tasks? In particular, I see:

  • rd_kafka_commit from commitSync/_commitSync
  • rd_kafka_consumer_poll from produceMore

Or is this beyond the scope? It would be very nice to make the client behave truly async.

I would be happy to discuss how that can be addressed.

@felixschlegel (Contributor Author) commented Sep 19, 2022


Hey @mr-swifter,

Nice to hear that you are getting involved in developing this package!

Thanks for pointing out that the Tasks might get us into trouble. In this comment, I already addressed the desire to get rid of the Task for the produceMore method.

It is possible to make the commitSync method behave async in a non-blocking manner. However, we would only be able to know if the commit was successfully scheduled and not if the commit itself was successful.

I hope I understood your comment correctly. Please let me know what you think!

@mr-swifter (Collaborator):

Hi @felixschlegel !

Yep, it seems I missed this comment when I was looking at the changes. It's good that you have plans to fix this!

Regarding commitSync, I mean it would be nice if await consumer.commitSync only returned once the commit has been sent and Kafka has replied, so the client can return a proper result to the user. In the meantime the Task where commitSync happens would be properly suspended rather than sitting on a blocking call, so the thread can execute other tasks.

I do something like this in my version; maybe not perfect, but it works in my case and provides the desired behaviour:

func commitSync(...) async { 
    await withCheckedContinuation { continuation in
        kafkaDispatchQueue.async { // schedule execution onto a separate queue managed by my kafka client package, outside of the cooperative pool
            let result = rd_kafka_commit(...) // or any other blocking or timeout-based blocking call like rd_kafka_consumer_poll
            // wrap up the result if needed
            continuation.resume(returning: result)
        }
    }
}

...
let result = await consumer.commitSync()
// here we know the result of the commit

So, basically I use a separate execution queue for scheduling blocking calls and use a continuation to resume task execution when the librdkafka blocking function is over.
At the moment it's a single queue, but I can see that I may need to migrate to a few queues for different purposes (like poll vs commits vs transactions etc.).

@FranzBusch (Contributor):

@mr-swifter Thanks for raising this. I fully agree with you that we should not block any of the cooperative thread pool's threads here, since that can lead to various problems. The proper thing to do is to back the consumer with a queue and dispatch every call onto this queue, so that we only block the thread backing the queue and just take a continuation that is resumed once the blocking call has finished.

The interesting challenge here is going to be handling cancellation; however, in the beginning we could just punt on that and implement it at a later point. Having a queue is already a bigger refactor and brings us a lot of benefits.

Modifications:

* KafkaConsumer blocking operations are now executed on a serial
  `DispatchQueue`
* remove `deinit` as `close` is a blocking function and shall be invoked
  by the user anyway

@felixschlegel (Contributor Author):

Hey @mr-swifter, hey @FranzBusch,

thank you for your comments! I have implemented a single serial DispatchQueue that takes care of all our blocking methods. Let me know what you think!

@FranzBusch (Contributor) left a comment


Looks really good already, I left a few comments inline. There are a few larger points which I would like us to track in issues.

  1. We should think again about whether we want to split this type into KafkaSinglePartitionConsumer and KafkaGroupConsumer
  2. librdkafka exposes a callback-based API for consuming messages which ought to be faster than the poll-based one. We should investigate this

Comment on lines 96 to 101
private typealias Element = Result<KafkaConsumerMessage, Error> // TODO: replace with a more specific Error type
private let messagesSource: NIOAsyncSequenceProducer<
Element,
ConsumerMessagesAsyncSequence.HighLowWatermark,
ConsumerMessagesAsyncSequenceDelegate
>.Source

Contributor:

Wondering if we should make this a throwing sequence instead of a result-based one. It's always hard to say what happens when a consumed message produces an error but more messages can come afterward.

Contributor Author:

Can we postpone this topic?


/// This method blocks for a maximum of `timeout` milliseconds.
/// - Parameter timeout: Maximum amount of milliseconds this method waits for a new message.
/// - Returns: A ``KafkaConsumerMessage`` or `nil` if there are no new messages.
func poll(timeout: Int32 = 100) throws -> KafkaConsumerMessage? {

Contributor:

Can we add a precondition in this method to check that we are on the current serial queue, please?
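
One way to express that check (a sketch; the actual assertion added in the PR may differ) is Dispatch's dispatchPrecondition, called at the top of every queue-confined method:

import Dispatch

func poll(timeout: Int32 = 100) throws -> KafkaConsumerMessage? {
    // Traps if this method is not executing on the consumer's serial queue.
    dispatchPrecondition(condition: .onQueue(self.serialQueue))
    // ... existing polling logic ...
}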

Comment on lines 428 to 432
try await withCheckedThrowingContinuation { continuation in
self.serialQueue.async {
body(continuation)
}
}

Contributor:

We should add a note here that we are not supporting cancellation at the moment and that we need to fix this up. Please also create an issue for this.

Contributor Author:

#33

}

/// Stop consuming messages. This step is irreversible.
public func close() async throws {

Contributor:

Should we really expose this? IMO we should just do this through the didTerminate callback on the AsyncSequence.

// TODO: commit multiple messages at once -> different topics + test
// TODO: docc: https://github.com/segmentio/kafka-go#explicit-commits note about highest offset
private func _commitSync(_ message: KafkaConsumerMessage) throws {
guard self.config.value(forKey: "enable.auto.commit") == "false" else {

Contributor:

Can we add a precondition for the queue here as well, please?

@mr-swifter (Collaborator):

Great stuff! Thanks @felixschlegel , @FranzBusch !

Modifications:

* move from Swift 5.6 to Swift 5.7
* add offset property to assignment consumer initializer
* remove default config object in consumer initializer
* added preconditions in KafkaConsumer that check if functions are
  called on the serial queue
* make ConsumerMessagesAsyncSequenceDelegate a struct containing
  closures
* close KafkaConsumer on NIOAsyncSequenceProducerDelegate.didTerminate
  instead of a public close function
* add test utility functions to create and delete topics
* Docker: build librdkafka from source instead of installing from apt
  because apt only provides librdkafka version 1.2.1 whereas we
  currently use 1.9.2
* replace KafkaConsumer.State with simple boolean "closed" and
  assertions
* refactor KafkaConsumerMessage
* test IntegrationTests with multiple messages for each test

@felixschlegel (Contributor Author):

Hey @FranzBusch,

I have implemented your suggested changes and created new issues for some of your comments. Please let me know what you think!

With the new topic creation method, I get this error when testing commitSync in the IntegrationTests:

%5|1663939258.150|PARTCNT|rdkafka#consumer-9| [thrd:main]: Topic 6B1E56D4-3387-4CEA-AC41-FB92471FBAFD partition count changed from 1 to 0
%4|1663939258.189|COMMITFAIL|rdkafka#consumer-9| [thrd:main]: Offset commit (unassigned partitions) failed for 1/1 partition(s) in join-state init: Broker: Unknown topic or partition: 6B1E56D4-3387-4CEA-AC41-FB92471FBAFD[0]@10(Broker: Unknown topic or partition)

However, the test still succeeds. Just wanted to let you know, maybe you have an idea what's going on here!

let messageResult: Element
do {
guard let message = try self.poll() else {
self.produceMore()

Contributor Author:

@FranzBusch
Is this behaviour correct or will the NIOAsyncSequence invoke produceMore() again anyway?

Contributor:

I think for the shape that we are doing here it is okay to call self.produceMore() again. The NIOAsyncSequenceProducer will only call produceMore once when an element has been consumed and you need to make sure that at some point that leads to a new message.
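
For context, a simplified sketch of that production loop (condensed from the code in this PR; details such as error handling and the stop-producing case differ):

// Sketch: poll once, yield the result into the NIOAsyncSequenceProducer source,
// and poll again when the back-pressure strategy asks for more elements.
private func produceMore() {
    self.serialQueue.async {
        let messageResult: Element
        do {
            guard let message = try self.poll() else {
                // No message available yet: poll again.
                self.produceMore()
                return
            }
            messageResult = .success(message)
        } catch {
            messageResult = .failure(error)
        }

        switch self.messagesSource.yield(messageResult) {
        case .produceMore:
            self.produceMore()
        case .stopProducing, .dropped:
            // Either the delegate will ask for more later, or the sequence terminated.
            break
        }
    }
}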

@FranzBusch (Contributor) left a comment

Let's get this merged. I think we made some good progress here on an initial consumer and opened a few issues to improve the API.

Comment on lines +117 to +121
let messagesSequenceDelegate = ConsumerMessagesAsyncSequenceDelegate { [weak self] in
self?.produceMore()
} didTerminateClosure: { [weak self] in
self?.close()
}

Contributor:

We need to find a way around weak here; can we open an issue for this, please? weak has some serious performance implications, sadly.

Contributor Author:

#37

