Skip to content

KafkaConsumer: expose event sequence #97

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged

Conversation

felixschlegel
Copy link
Contributor

Info: this PR sits on top of #96

Motivation:

Like in KafkaProducer (#96), we want to expose an asynchronous
sequence that is able to emit all sorts of events in the future (e.g.
rebalance events for the KafkaConsumer.

Modifications:

  • add a new type KafkaConsumerEvent (currently empty)
  • make KafkaConsumer.init private
  • add factory methods KafkaConsumer.createConsumer and
    KafkaConsumer.createConsumerWithEvents
  • create a new AsyncSequence: KafkaConsumerEvents
  • update README
  • update tests

@felixschlegel felixschlegel force-pushed the fs-kafka-consumer-event-sequence branch from c23cea8 to e7ca788 Compare July 25, 2023 10:29
@felixschlegel felixschlegel marked this pull request as draft July 25, 2023 13:28
@felixschlegel felixschlegel force-pushed the fs-kafka-consumer-event-sequence branch 2 times, most recently from 73fb645 to 8ac4610 Compare July 26, 2023 10:21
@felixschlegel felixschlegel marked this pull request as ready for review July 26, 2023 10:22
Copy link

@gjcairo gjcairo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, just a small comment

Comment on lines +140 to +143
/// - config: The ``KafkaConsumerConfiguration`` for configuring the ``KafkaConsumer``.
/// - logger: A logger.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

client and stateMachine are missing

// MARK: - KafkaConsumerEvents

/// `AsyncSequence` implementation for handling ``KafkaConsumerEvent``s emitted by Kafka.
public struct KafkaConsumerEvents: AsyncSequence {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should make this Sendable

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, also applied this to KafkaProducerEvents

README.md Outdated
@@ -88,7 +88,7 @@ var config = KafkaConsumerConfiguration(
)
config.bootstrapServers = [broker]

let consumer = try KafkaConsumer(
let consumer = try KafkaConsumer.makeConsumer(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall, I still think we should make the init() public so that in the case where users are not interested in event they can just call KafkaProducer() and KafkaConsumer().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, also applied this to KafkaProducer.makeProducer

@felixschlegel felixschlegel requested a review from FranzBusch July 26, 2023 15:48
@felixschlegel felixschlegel force-pushed the fs-kafka-consumer-event-sequence branch from d62516a to e53ac31 Compare July 27, 2023 18:11
Motivation:

Like in `KafkaProducer` (swift-server#96), we want to expose an asynchronous
sequence that is able to emit all sorts of events in the future (e.g.
rebalance events for the `KafkaConsumer`.

Modifications:

* add a new type `KafkaConsumerEvent` (currently empty)
* make `KafkaConsumer.init` private
* add factory methods `KafkaConsumer.createConsumer` and
  `KafkaConsumer.createConsumerWithEvents`
* create a new `AsyncSequence`: `KafkaConsumerEvents`
* update README
* update tests
Modifications:

* rename to `KafkaConsumerEvents.AsyncIterator`
* rename to `KafkaConsumerMessages.AsyncIterator`
Modifications:

* make `KafkaConsumerEvents` `Sendable`
* make `KafkaProducerEvents` `Sendable`
* replace `KafkaConsumer.makeConsumer` with `convenience init`
* replace `KafkaProducer.makeProducer` with `convenience init`
@felixschlegel felixschlegel force-pushed the fs-kafka-consumer-event-sequence branch from e53ac31 to bb53e8c Compare July 28, 2023 09:12
@felixschlegel felixschlegel requested a review from FranzBusch July 28, 2023 09:13
@FranzBusch FranzBusch enabled auto-merge (squash) July 28, 2023 09:15
@FranzBusch FranzBusch merged commit 071f51d into swift-server:main Jul 28, 2023
logger: logger
)

let sourceAndSequence = NIOAsyncSequenceProducer.makeSequence(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure I fully understand an idea here: it seems like nothing is supposed to be supplied in this source...
Shouldn't be sourceAndSequence.source assigned anywhere?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the primary intention of this PR was to expose the KafkaConsumerEvents itself without serving any events at the moment.

Following onto that PR events like rebalance etc. can be easily added into that existing KafkaConsumerEvents sequence.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants