Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 51 additions & 11 deletions include/cppkafka/utils/buffered_producer.h
Original file line number Diff line number Diff line change
Expand Up @@ -691,6 +691,7 @@ class CPPKAFKA_API BufferedProducer {
void do_add_message(BuilderType&& builder, QueueKind queue_kind, FlushAction flush_action);
template <typename BuilderType>
void produce_message(BuilderType&& builder);
bool sync_produce(const MessageBuilder& builder, std::chrono::milliseconds timeout, bool throw_on_error);
Configuration prepare_configuration(Configuration config);
void on_delivery_report(const Message& message);
template <typename BuilderType>
Expand Down Expand Up @@ -787,12 +788,19 @@ void BufferedProducer<BufferType, Allocator>::produce(const MessageBuilder& buil

template <typename BufferType, typename Allocator>
void BufferedProducer<BufferType, Allocator>::sync_produce(const MessageBuilder& builder) {
sync_produce(builder, infinite_timeout);
sync_produce(builder, infinite_timeout, true);
}

template <typename BufferType, typename Allocator>
bool BufferedProducer<BufferType, Allocator>::sync_produce(const MessageBuilder& builder,
std::chrono::milliseconds timeout) {
return sync_produce(builder, infinite_timeout, true);
}

template <typename BufferType, typename Allocator>
bool BufferedProducer<BufferType, Allocator>::sync_produce(const MessageBuilder& builder,
std::chrono::milliseconds timeout,
bool throw_on_error) {
if (enable_message_retries_) {
//Adding a retry tracker requires copying the builder since
//we cannot modify the original instance. Cloning is a fast operation
Expand All @@ -802,12 +810,32 @@ bool BufferedProducer<BufferType, Allocator>::sync_produce(const MessageBuilder&
// produce until we succeed or we reach max retry limit
auto endTime = std::chrono::steady_clock::now() + timeout;
do {
tracker->prepare_to_retry();
produce_message(builder_clone);
//Wait w/o timeout since we must get the ack to avoid a race condition.
//Otherwise retry_again() will block as the producer won't get flushed
//and the delivery callback will never be invoked.
wait_for_current_thread_acks();
try {
tracker->prepare_to_retry();
produce_message(builder_clone);
//Wait w/o timeout since we must get the ack to avoid a race condition.
//Otherwise retry_again() will block as the producer won't get flushed
//and the delivery callback will never be invoked.
wait_for_current_thread_acks();
}
catch (const HandleException& ex) {
// If we have a flush failure callback and it returns true, we retry producing this message later
CallbackInvoker<FlushFailureCallback> callback("flush failure", flush_failure_callback_, &producer_);
if (!callback || callback(builder, ex.get_error())) {
if (tracker && tracker->has_retries_left()) {
tracker->decrement_retries();
continue;
}
}
++total_messages_dropped_;
// Call the flush termination callback
CallbackInvoker<FlushTerminationCallback>("flush termination", flush_termination_callback_, &producer_)
(builder, ex.get_error());
if (throw_on_error) {
throw;
}
break;
}
}
while (tracker->retry_again() &&
((timeout == infinite_timeout) ||
Expand All @@ -816,10 +844,22 @@ bool BufferedProducer<BufferType, Allocator>::sync_produce(const MessageBuilder&
}
else {
// produce once
produce_message(builder);
wait_for_current_thread_acks(timeout);
return !ack_monitor_.has_current_thread_pending_acks();
try {
produce_message(builder);
wait_for_current_thread_acks(timeout);
return !ack_monitor_.has_current_thread_pending_acks();
}
catch (const HandleException& ex) {
++total_messages_dropped_;
// Call the flush termination callback
CallbackInvoker<FlushTerminationCallback>("flush termination", flush_termination_callback_, &producer_)
(builder, ex.get_error());
if (throw_on_error) {
throw;
}
}
}
return false;
}

template <typename BufferType, typename Allocator>
Expand Down Expand Up @@ -851,7 +891,7 @@ bool BufferedProducer<BufferType, Allocator>::flush(std::chrono::milliseconds ti
if (preserve_order) {
//When preserving order, we must ensure that each message
//gets delivered before producing the next one.
sync_produce(flush_queue.front(), timeout);
sync_produce(flush_queue.front(), timeout, false);
}
else {
//Produce as fast as possible w/o waiting. If one or more
Expand Down