From 6d08a4c9e7b70e08e759382425eb7be4ad0e672a Mon Sep 17 00:00:00 2001 From: Dan Swain Date: Thu, 16 Nov 2017 18:38:05 -0500 Subject: [PATCH 1/2] More gracefully handle errors on leaving consumer group This seems to be related to #247 though it may not be the root cause. We were seeing the leave group fail with an `unknown_member_id`, which happens when there is a problem between the client and the broker. This would cause the manager to exit abruptly and may be causing the rest of the consumer group's processes to exit uncleanly. Regardless of the error returned, we want to exit the consumer group cleanly. --- lib/kafka_ex/consumer_group/manager.ex | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/lib/kafka_ex/consumer_group/manager.ex b/lib/kafka_ex/consumer_group/manager.ex index 691d9604..5f47a9bf 100644 --- a/lib/kafka_ex/consumer_group/manager.ex +++ b/lib/kafka_ex/consumer_group/manager.ex @@ -339,10 +339,17 @@ defmodule KafkaEx.ConsumerGroup.Manager do member_id: member_id, } - %LeaveGroupResponse{error_code: :no_error} = + leave_group_response = KafkaEx.leave_group(leave_request, worker_name: worker_name) - Logger.debug(fn -> "Left consumer group #{group_name}" end) + if leave_group_response.error_code == :no_error do + Logger.debug(fn -> "Left consumer group #{group_name}" end) + else + Logger.warn(fn -> + "Received error #{inspect leave_group_response.error_code}, " + "consumer group manager will exit regardless." + end) + end :ok end From 966175afeb740803c0bff63dd7fa240d7a5808a0 Mon Sep 17 00:00:00 2001 From: Dan Swain Date: Thu, 16 Nov 2017 18:49:32 -0500 Subject: [PATCH 2/2] Fix string concatenation --- lib/kafka_ex/consumer_group/manager.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/kafka_ex/consumer_group/manager.ex b/lib/kafka_ex/consumer_group/manager.ex index 5f47a9bf..bc45237f 100644 --- a/lib/kafka_ex/consumer_group/manager.ex +++ b/lib/kafka_ex/consumer_group/manager.ex @@ -346,7 +346,7 @@ defmodule KafkaEx.ConsumerGroup.Manager do Logger.debug(fn -> "Left consumer group #{group_name}" end) else Logger.warn(fn -> - "Received error #{inspect leave_group_response.error_code}, " + "Received error #{inspect leave_group_response.error_code}, " <> "consumer group manager will exit regardless." end) end