From 9d2a5030f0e97b4b58d0da4429352df1342e6e22 Mon Sep 17 00:00:00 2001 From: accelerated <> Date: Wed, 20 Jun 2018 10:19:02 -0400 Subject: [PATCH] Invoke error callback if present instead of log callback --- include/cppkafka/configuration.h | 11 +++++++---- src/consumer.cpp | 15 +++++++++------ 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/include/cppkafka/configuration.h b/include/cppkafka/configuration.h index 5c7d56d1..bddc0d2e 100644 --- a/include/cppkafka/configuration.h +++ b/include/cppkafka/configuration.h @@ -62,19 +62,22 @@ class KafkaHandleBase; class CPPKAFKA_API Configuration : public ConfigurationBase { public: using DeliveryReportCallback = std::function; - using OffsetCommitCallback = std::function; - using ErrorCallback = std::function; using ThrottleCallback = std::function; - using LogCallback = std::function; using StatsCallback = std::function; - using SocketCallback = std::function; + using SocketCallback = std::function; using ConfigurationBase::set; using ConfigurationBase::get; diff --git a/src/consumer.cpp b/src/consumer.cpp index 2243400c..9a308c65 100644 --- a/src/consumer.cpp +++ b/src/consumer.cpp @@ -79,16 +79,19 @@ Consumer::~Consumer() { rebalance_error_callback_ = nullptr; close(); } - catch (const Exception& ex) { - const char* library_name = "cppkafka"; + catch (const HandleException& ex) { ostringstream error_msg; error_msg << "Failed to close consumer [" << get_name() << "]: " << ex.what(); - CallbackInvoker logger("log", get_configuration().get_log_callback(), nullptr); - if (logger) { - logger(*this, static_cast(LogLevel::LOG_ERR), library_name, error_msg.str()); + CallbackInvoker error_cb("error", get_configuration().get_error_callback(), this); + CallbackInvoker logger_cb("log", get_configuration().get_log_callback(), nullptr); + if (error_cb) { + error_cb(*this, static_cast(ex.get_error().get_error()), error_msg.str()); + } + else if (logger_cb) { + logger_cb(*this, static_cast(LogLevel::LOG_ERR), "cppkafka", error_msg.str()); } else { - rd_kafka_log_print(get_handle(), static_cast(LogLevel::LOG_ERR), library_name, error_msg.str().c_str()); + rd_kafka_log_print(get_handle(), static_cast(LogLevel::LOG_ERR), "cppkafka", error_msg.str().c_str()); } } }