-
Notifications
You must be signed in to change notification settings - Fork 30
KafkaProducer
: refactor flushing
#94
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KafkaProducer
: refactor flushing
#94
Conversation
@FranzBusch : how shall we go about vending the |
We should put it into our configuration struct. |
c90ba5a
to
b20104e
Compare
Sources/SwiftKafka/KafkaClient.swift
Outdated
/// Flush any outstanding produce requests. | ||
/// | ||
/// Parameters: | ||
/// |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/// |
/// Default: `10000` | ||
public var flushTimeoutMilliseconds: Int = 10000 { | ||
didSet { | ||
precondition(0...Int(Int32.max) ~= self.flushTimeoutMilliseconds) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we provide a message here as well when we fail the precondition please
Motivation: With our own flushing implementation we were flushing until `rd_kakfa_outq_len` reached ``0`. However `rd_kakfa_outq_len` also takes other events such as statistics into account which led to a race where we were flushing for a very long time because more and more other events were produced to the queue while we were flushing. Modifications: * `KafkaClient`: * remove `var outgoingQueueSize` * add new method `flush(timeoutMilliseconds:)` that executes the blocking `rd_kafka_flush` call on a `DispatchQueue` but vends this as an `async func` * `KafkaProducer`: * rename `KafkaProducer.StateMachine.State.flushing` to `KafkaProducer.StateMachine.State.finishing` * invoke `KafkaClient.flush` before terminating poll loop
Modifications: * add messages to `preconditions` * remove extra line in `RDKafkaClient.flush`'s parameter documentation
6d43007
to
6aa35f5
Compare
Motivation:
With our own flushing implementation we were flushing until
rd_kakfa_outq_len
reached
0
. Howeverrd_kakfa_outq_len
also takes other events suchas statistics into account which led to a race where we were flushing
for a very long time because more and more other events were produced to
the queue while we were flushing.
Modifications:
KafkaClient
:var outgoingQueueSize
flush(timeoutMilliseconds:)
that executes theblocking
rd_kafka_flush
call on aDispatchQueue
but vends thisas an
async func
KafkaProducer
:KafkaProducer.StateMachine.State.flushing
toKafkaProducer.StateMachine.State.finishing
KafkaClient.flush
before terminating poll loop