From af0646b2707b535b45ae9a0a5c0fc615b7e8960b Mon Sep 17 00:00:00 2001 From: bjhaid Date: Fri, 13 Jan 2017 16:50:35 -0600 Subject: [PATCH 1/2] Cleanup code duplicated between Server0P9P0 and Server0P8P2. --- lib/kafka_ex/server_0_p_9_p_0.ex | 154 ++----------------------------- 1 file changed, 10 insertions(+), 144 deletions(-) diff --git a/lib/kafka_ex/server_0_p_9_p_0.ex b/lib/kafka_ex/server_0_p_9_p_0.ex index 95b6c78a..f134a3ad 100644 --- a/lib/kafka_ex/server_0_p_9_p_0.ex +++ b/lib/kafka_ex/server_0_p_9_p_0.ex @@ -5,18 +5,14 @@ defmodule KafkaEx.Server0P9P0 do use KafkaEx.Server alias KafkaEx.Protocol.ConsumerMetadata alias KafkaEx.Protocol.ConsumerMetadata.Response, as: ConsumerMetadataResponse - alias KafkaEx.Protocol.Fetch - alias KafkaEx.Protocol.Fetch.Request, as: FetchRequest alias KafkaEx.Protocol.Heartbeat alias KafkaEx.Protocol.JoinGroup alias KafkaEx.Protocol.JoinGroup.Request, as: JoinGroupRequest alias KafkaEx.Protocol.Metadata.Broker - alias KafkaEx.Protocol.Metadata.Response, as: MetadataResponse - alias KafkaEx.Protocol.OffsetFetch - alias KafkaEx.Protocol.OffsetCommit alias KafkaEx.Protocol.SyncGroup alias KafkaEx.Server.State alias KafkaEx.NetworkClient + alias KafkaEx.Server0P8P2 @consumer_group_update_interval 30_000 @@ -29,6 +25,15 @@ defmodule KafkaEx.Server0P9P0 do GenServer.start_link(__MODULE__, [args, name], [name: name]) end + defdelegate kafka_server_consumer_group(state), to: Server0P8P2 + defdelegate kafka_server_fetch(fetch_request, state), to: Server0P8P2 + defdelegate kafka_server_offset_fetch(offset_fetch, state), to: Server0P8P2 + defdelegate kafka_server_offset_commit(offset_commit_request, state), to: Server0P8P2 + + defdelegate kafka_server_consumer_group_metadata(state), to: Server0P8P2 + defdelegate kafka_server_start_streaming(fetch_request, state), to: Server0P8P2 + defdelegate kafka_server_update_consumer_metadata(state), to: Server0P8P2 + def kafka_server_init([args]) do kafka_server_init([args, self()]) end @@ -61,54 
+66,6 @@ defmodule KafkaEx.Server0P9P0 do {:ok, state} end - def kafka_server_consumer_group(state) do - {:reply, state.consumer_group, state} - end - - def kafka_server_fetch(fetch_request, state) do - true = consumer_group_if_auto_commit?(fetch_request.auto_commit, state) - {response, state} = fetch(fetch_request, state) - - {:reply, response, state} - end - - def kafka_server_offset_fetch(offset_fetch, state) do - true = consumer_group?(state) - {broker, state} = broker_for_consumer_group_with_update(state) - - # if the request is for a specific consumer group, use that - # otherwise use the worker's consumer group - consumer_group = offset_fetch.consumer_group || state.consumer_group - offset_fetch = %{offset_fetch | consumer_group: consumer_group} - - offset_fetch_request = OffsetFetch.create_request(state.correlation_id, @client_id, offset_fetch) - - {response, state} = case broker do - nil -> - Logger.log(:error, "Coordinator for topic #{offset_fetch.topic} is not available") - {:topic_not_found, state} - _ -> - response = broker - |> NetworkClient.send_sync_request(offset_fetch_request, state.sync_timeout) - |> OffsetFetch.parse_response - {response, %{state | correlation_id: state.correlation_id + 1}} - end - - {:reply, response, state} - end - - def kafka_server_offset_commit(offset_commit_request, state) do - {response, state} = offset_commit(state, offset_commit_request) - - {:reply, response, state} - end - - def kafka_server_consumer_group_metadata(state) do - true = consumer_group?(state) - {consumer_metadata, state} = update_consumer_metadata(state) - {:reply, consumer_metadata, state} - end - def kafka_server_join_group(topics, session_timeout, state) do true = consumer_group?(state) {broker, state} = broker_for_consumer_group_with_update(state) @@ -146,39 +103,6 @@ defmodule KafkaEx.Server0P9P0 do {:reply, response, %{state | correlation_id: state.correlation_id + 1}} end - def kafka_server_start_streaming(_, state = %State{event_pid: nil}) do - # 
our streaming could have been canceled with a streaming update in-flight - {:noreply, state} - end - def kafka_server_start_streaming(fetch_request, state) do - true = consumer_group_if_auto_commit?(fetch_request.auto_commit, state) - - {response, state} = fetch(fetch_request, state) - offset = case response do - :topic_not_found -> - fetch_request.offset - _ -> - message = response |> hd |> Map.get(:partitions) |> hd - Enum.each(message.message_set, fn(message_set) -> GenEvent.notify(state.event_pid, message_set) end) - case message.last_offset do - nil -> fetch_request.offset - last_offset -> last_offset + 1 - end - end - - ref = Process.send_after( - self(), {:start_streaming, %{fetch_request | offset: offset}}, 500 - ) - - {:noreply, %{state | stream_timer: ref}} - end - - def kafka_server_update_consumer_metadata(state) do - true = consumer_group?(state) - {_, state} = update_consumer_metadata(state) - {:noreply, state} - end - defp update_consumer_metadata(state), do: update_consumer_metadata(state, @retry_count, 0) defp update_consumer_metadata(state = %State{consumer_group: consumer_group}, 0, error_code) do @@ -199,64 +123,6 @@ defmodule KafkaEx.Server0P9P0 do end end - defp updated_broker_for_topic(state, partition, topic) do - case MetadataResponse.broker_for_topic(state.metadata, state.brokers, topic, partition) do - nil -> - updated_state = update_metadata(state) - {MetadataResponse.broker_for_topic(updated_state.metadata, updated_state.brokers, topic, partition), updated_state} - broker -> {broker, state} - end - end - - defp fetch(fetch_request, state) do - true = consumer_group_if_auto_commit?(fetch_request.auto_commit, state) - fetch_data = Fetch.create_request(%FetchRequest{ - fetch_request | - client_id: @client_id, - correlation_id: state.correlation_id, - }) - {broker, state} = updated_broker_for_topic(state, fetch_request.partition, fetch_request.topic) - - case broker do - nil -> - Logger.log(:error, "Leader for topic #{fetch_request.topic} 
is not available") - {:topic_not_found, state} - _ -> - response = broker - |> NetworkClient.send_sync_request(fetch_data, state.sync_timeout) - |> Fetch.parse_response - state = %{state | correlation_id: state.correlation_id + 1} - last_offset = response |> hd |> Map.get(:partitions) |> hd |> Map.get(:last_offset) - if last_offset != nil && fetch_request.auto_commit do - offset_commit_request = %OffsetCommit.Request{ - topic: fetch_request.topic, - offset: last_offset, - partition: fetch_request.partition, - consumer_group: state.consumer_group} - {_, state} = offset_commit(state, offset_commit_request) - {response, state} - else - {response, state} - end - end - end - - defp offset_commit(state, offset_commit_request) do - {broker, state} = broker_for_consumer_group_with_update(state, true) - - # if the request has a specific consumer group, use that - # otherwise use the worker's consumer group - consumer_group = offset_commit_request.consumer_group || state.consumer_group - offset_commit_request = %{offset_commit_request | consumer_group: consumer_group} - - offset_commit_request_payload = OffsetCommit.create_request(state.correlation_id, @client_id, offset_commit_request) - response = broker - |> NetworkClient.send_sync_request(offset_commit_request_payload, state.sync_timeout) - |> OffsetCommit.parse_response - - {response, %{state | correlation_id: state.correlation_id + 1}} - end - defp broker_for_consumer_group(state) do ConsumerMetadataResponse.broker_for_consumer_group(state.brokers, state.consumer_metadata) end From 3dd368181436af13816464ba652c4672ef30dadd Mon Sep 17 00:00:00 2001 From: bjhaid Date: Fri, 13 Jan 2017 17:15:03 -0600 Subject: [PATCH 2/2] Add comment depicting where to find the delegated functions. 
--- lib/kafka_ex/server_0_p_9_p_0.ex | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/kafka_ex/server_0_p_9_p_0.ex b/lib/kafka_ex/server_0_p_9_p_0.ex index f134a3ad..8c1698cc 100644 --- a/lib/kafka_ex/server_0_p_9_p_0.ex +++ b/lib/kafka_ex/server_0_p_9_p_0.ex @@ -25,11 +25,13 @@ defmodule KafkaEx.Server0P9P0 do GenServer.start_link(__MODULE__, [args, name], [name: name]) end + # The functions below are all defined in KafkaEx.Server0P8P2 and their + # implementation is exactly the same across both versions of Kafka. + defdelegate kafka_server_consumer_group(state), to: Server0P8P2 defdelegate kafka_server_fetch(fetch_request, state), to: Server0P8P2 defdelegate kafka_server_offset_fetch(offset_fetch, state), to: Server0P8P2 defdelegate kafka_server_offset_commit(offset_commit_request, state), to: Server0P8P2 - defdelegate kafka_server_consumer_group_metadata(state), to: Server0P8P2 defdelegate kafka_server_start_streaming(fetch_request, state), to: Server0P8P2 defdelegate kafka_server_update_consumer_metadata(state), to: Server0P8P2