From 1e7abb6c3f10b32692144d624129981cb623b992 Mon Sep 17 00:00:00 2001 From: baicheng Date: Tue, 26 Sep 2017 11:19:52 +0200 Subject: [PATCH 1/2] avoid parse nil response exception --- lib/kafka_ex/server_0_p_8_p_0.ex | 11 +++++++---- lib/kafka_ex/server_0_p_8_p_2.ex | 33 +++++++++++++++++--------------- 2 files changed, 25 insertions(+), 19 deletions(-) diff --git a/lib/kafka_ex/server_0_p_8_p_0.ex b/lib/kafka_ex/server_0_p_8_p_0.ex index b6745793..3fa1c8ee 100644 --- a/lib/kafka_ex/server_0_p_8_p_0.ex +++ b/lib/kafka_ex/server_0_p_8_p_0.ex @@ -85,10 +85,13 @@ defmodule KafkaEx.Server0P8P0 do Logger.log(:error, "Leader for topic #{fetch_request.topic} is not available") {:topic_not_found, state} _ -> - response = broker - |> NetworkClient.send_sync_request(fetch_data, config_sync_timeout()) - |> Fetch.parse_response - {response, %{state | correlation_id: state.correlation_id + 1}} + response = broker |> NetworkClient.send_sync_request(fetch_data, config_sync_timeout()) + case response do + nil -> {response, state} + _ -> + response = Fetch.parse_response(response) + {response, %{state | correlation_id: state.correlation_id + 1}} + end end end end diff --git a/lib/kafka_ex/server_0_p_8_p_2.ex b/lib/kafka_ex/server_0_p_8_p_2.ex index 9fdfc302..af158cfa 100644 --- a/lib/kafka_ex/server_0_p_8_p_2.ex +++ b/lib/kafka_ex/server_0_p_8_p_2.ex @@ -165,21 +165,24 @@ defmodule KafkaEx.Server0P8P2 do Logger.log(:error, "Leader for topic #{fetch_request.topic} is not available") {:topic_not_found, state} _ -> - response = broker - |> NetworkClient.send_sync_request(fetch_data, config_sync_timeout()) - |> Fetch.parse_response - state = %{state | correlation_id: state.correlation_id + 1} - last_offset = response |> hd |> Map.get(:partitions) |> hd |> Map.get(:last_offset) - if last_offset != nil && fetch_request.auto_commit do - offset_commit_request = %OffsetCommit.Request{ - topic: fetch_request.topic, - offset: last_offset, - partition: fetch_request.partition, - consumer_group: state.consumer_group} - {_, state} = offset_commit(state, offset_commit_request) - {response, state} - else - {response, state} + response = NetworkClient.send_sync_request(broker, fetch_data, config_sync_timeout()) + case response do + nil -> {response, state} + _ -> + response = Fetch.parse_response(response) + state = %{state | correlation_id: state.correlation_id + 1} + last_offset = response |> hd |> Map.get(:partitions) |> hd |> Map.get(:last_offset) + if last_offset != nil && fetch_request.auto_commit do + offset_commit_request = %OffsetCommit.Request{ + topic: fetch_request.topic, + offset: last_offset, + partition: fetch_request.partition, + consumer_group: state.consumer_group} + {_, state} = offset_commit(state, offset_commit_request) + {response, state} + else + {response, state} + end end end end From 82a17f67b3cfe1a9d9b5497445e06a280e3f831c Mon Sep 17 00:00:00 2001 From: baicheng Date: Tue, 26 Sep 2017 11:19:52 +0200 Subject: [PATCH 2/2] avoid parsing nil response exception --- lib/kafka_ex/server_0_p_8_p_0.ex | 11 +++++++---- lib/kafka_ex/server_0_p_8_p_2.ex | 33 +++++++++++++++++--------------- 2 files changed, 25 insertions(+), 19 deletions(-) diff --git a/lib/kafka_ex/server_0_p_8_p_0.ex b/lib/kafka_ex/server_0_p_8_p_0.ex index b6745793..7cae4ecf 100644 --- a/lib/kafka_ex/server_0_p_8_p_0.ex +++ b/lib/kafka_ex/server_0_p_8_p_0.ex @@ -85,10 +85,13 @@ defmodule KafkaEx.Server0P8P0 do Logger.log(:error, "Leader for topic #{fetch_request.topic} is not available") {:topic_not_found, state} _ -> - response = broker - |> NetworkClient.send_sync_request(fetch_data, config_sync_timeout()) - |> Fetch.parse_response - {response, %{state | correlation_id: state.correlation_id + 1}} + response = NetworkClient.send_sync_request(broker, fetch_data, config_sync_timeout()) + case response do + nil -> {response, state} + _ -> + response = Fetch.parse_response(response) + {response, %{state | correlation_id: state.correlation_id + 1}} + end end end end diff --git a/lib/kafka_ex/server_0_p_8_p_2.ex b/lib/kafka_ex/server_0_p_8_p_2.ex index 9fdfc302..af158cfa 100644 --- a/lib/kafka_ex/server_0_p_8_p_2.ex +++ b/lib/kafka_ex/server_0_p_8_p_2.ex @@ -165,21 +165,24 @@ defmodule KafkaEx.Server0P8P2 do Logger.log(:error, "Leader for topic #{fetch_request.topic} is not available") {:topic_not_found, state} _ -> - response = broker - |> NetworkClient.send_sync_request(fetch_data, config_sync_timeout()) - |> Fetch.parse_response - state = %{state | correlation_id: state.correlation_id + 1} - last_offset = response |> hd |> Map.get(:partitions) |> hd |> Map.get(:last_offset) - if last_offset != nil && fetch_request.auto_commit do - offset_commit_request = %OffsetCommit.Request{ - topic: fetch_request.topic, - offset: last_offset, - partition: fetch_request.partition, - consumer_group: state.consumer_group} - {_, state} = offset_commit(state, offset_commit_request) - {response, state} - else - {response, state} + response = NetworkClient.send_sync_request(broker, fetch_data, config_sync_timeout()) + case response do + nil -> {response, state} + _ -> + response = Fetch.parse_response(response) + state = %{state | correlation_id: state.correlation_id + 1} + last_offset = response |> hd |> Map.get(:partitions) |> hd |> Map.get(:last_offset) + if last_offset != nil && fetch_request.auto_commit do + offset_commit_request = %OffsetCommit.Request{ + topic: fetch_request.topic, + offset: last_offset, + partition: fetch_request.partition, + consumer_group: state.consumer_group} + {_, state} = offset_commit(state, offset_commit_request) + {response, state} + else + {response, state} + end end end end