Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 12 additions & 144 deletions lib/kafka_ex/server_0_p_9_p_0.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,14 @@ defmodule KafkaEx.Server0P9P0 do
use KafkaEx.Server
alias KafkaEx.Protocol.ConsumerMetadata
alias KafkaEx.Protocol.ConsumerMetadata.Response, as: ConsumerMetadataResponse
alias KafkaEx.Protocol.Fetch
alias KafkaEx.Protocol.Fetch.Request, as: FetchRequest
alias KafkaEx.Protocol.Heartbeat
alias KafkaEx.Protocol.JoinGroup
alias KafkaEx.Protocol.JoinGroup.Request, as: JoinGroupRequest
alias KafkaEx.Protocol.Metadata.Broker
alias KafkaEx.Protocol.Metadata.Response, as: MetadataResponse
alias KafkaEx.Protocol.OffsetFetch
alias KafkaEx.Protocol.OffsetCommit
alias KafkaEx.Protocol.SyncGroup
alias KafkaEx.Server.State
alias KafkaEx.NetworkClient
alias KafkaEx.Server0P8P2

@consumer_group_update_interval 30_000

Expand All @@ -29,6 +25,17 @@ defmodule KafkaEx.Server0P9P0 do
GenServer.start_link(__MODULE__, [args, name], [name: name])
end

# The functions below are all defined in KafkaEx.Server0P8P2 and their
# implementation is exactly the same across both versions of Kafka.

defdelegate kafka_server_consumer_group(state), to: Server0P8P2
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May be worth adding a comment here to the effect that the implementation for these is the same in 0.9.0 as it is in 0.8.2?

# Delegated to Server0P8P2 — these handlers are byte-for-byte identical
# between the 0.8.2 and 0.9.0 protocol implementations, so 0.9.0 reuses them.
defdelegate kafka_server_fetch(fetch_request, state), to: Server0P8P2
defdelegate kafka_server_offset_fetch(offset_fetch, state), to: Server0P8P2
defdelegate kafka_server_offset_commit(offset_commit_request, state), to: Server0P8P2
defdelegate kafka_server_consumer_group_metadata(state), to: Server0P8P2
defdelegate kafka_server_start_streaming(fetch_request, state), to: Server0P8P2
defdelegate kafka_server_update_consumer_metadata(state), to: Server0P8P2

def kafka_server_init([args]) do
kafka_server_init([args, self()])
end
Expand Down Expand Up @@ -61,54 +68,6 @@ defmodule KafkaEx.Server0P9P0 do
{:ok, state}
end

# Reply with the consumer group currently configured on this worker.
def kafka_server_consumer_group(state),
  do: {:reply, state.consumer_group, state}

# Handle a synchronous fetch request.  Auto-commit is only legal when a
# consumer group is configured, so assert that first, then delegate the
# actual network round-trip to fetch/2.
def kafka_server_fetch(fetch_request, state) do
  true = consumer_group_if_auto_commit?(fetch_request.auto_commit, state)

  {response, updated_state} = fetch(fetch_request, state)
  {:reply, response, updated_state}
end

# Fetch committed offsets for a consumer group via the group coordinator.
# Requires a consumer group to be configured (the `true =` match crashes the
# worker otherwise — intentional let-it-crash assertion).  Replies with the
# parsed response, or :topic_not_found when no coordinator broker is known.
def kafka_server_offset_fetch(offset_fetch, state) do
  true = consumer_group?(state)
  {broker, state} = broker_for_consumer_group_with_update(state)

  # if the request is for a specific consumer group, use that
  # otherwise use the worker's consumer group
  consumer_group = offset_fetch.consumer_group || state.consumer_group
  offset_fetch = %{offset_fetch | consumer_group: consumer_group}

  offset_fetch_request = OffsetFetch.create_request(state.correlation_id, @client_id, offset_fetch)

  {response, state} = case broker do
    nil ->
      Logger.log(:error, "Coordinator for topic #{offset_fetch.topic} is not available")
      {:topic_not_found, state}
    _ ->
      # Synchronous round-trip to the coordinator; only bump the correlation
      # id when a request was actually sent.
      response = broker
      |> NetworkClient.send_sync_request(offset_fetch_request, state.sync_timeout)
      |> OffsetFetch.parse_response
      {response, %{state | correlation_id: state.correlation_id + 1}}
  end

  {:reply, response, state}
end

# Commit offsets on behalf of a caller and reply with the broker's response.
# All of the real work (coordinator lookup, request build, send) lives in
# the private offset_commit/2 helper.
def kafka_server_offset_commit(request, state) do
  {result, new_state} = offset_commit(state, request)
  {:reply, result, new_state}
end

# Refresh and reply with the consumer-group coordinator metadata.
# Crashes (via the `true =` match) if no consumer group is configured.
def kafka_server_consumer_group_metadata(state) do
  true = consumer_group?(state)
  {metadata, updated_state} = update_consumer_metadata(state)
  {:reply, metadata, updated_state}
end

def kafka_server_join_group(topics, session_timeout, state) do
true = consumer_group?(state)
{broker, state} = broker_for_consumer_group_with_update(state)
Expand Down Expand Up @@ -146,39 +105,6 @@ defmodule KafkaEx.Server0P9P0 do
{:reply, response, %{state | correlation_id: state.correlation_id + 1}}
end

def kafka_server_start_streaming(_, state = %State{event_pid: nil}) do
  # our streaming could have been canceled with a streaming update in-flight
  {:noreply, state}
end
# One iteration of the streaming loop: fetch a batch, push each message_set
# into the GenEvent manager, compute the next offset, and schedule the next
# iteration in 500ms (keeping the timer ref so streaming can be canceled).
def kafka_server_start_streaming(fetch_request, state) do
  true = consumer_group_if_auto_commit?(fetch_request.auto_commit, state)

  {response, state} = fetch(fetch_request, state)
  offset = case response do
    :topic_not_found ->
      # nothing fetched — retry later from the same offset
      fetch_request.offset
    _ ->
      # NOTE(review): assumes the first topic's first partition is the one we
      # asked for — appears to rely on single-topic/partition fetch requests.
      message = response |> hd |> Map.get(:partitions) |> hd
      Enum.each(message.message_set, fn(message_set) -> GenEvent.notify(state.event_pid, message_set) end)
      case message.last_offset do
        nil -> fetch_request.offset
        last_offset -> last_offset + 1
      end
  end

  ref = Process.send_after(
    self(), {:start_streaming, %{fetch_request | offset: offset}}, 500
  )

  {:noreply, %{state | stream_timer: ref}}
end

# Periodic consumer-metadata refresh (fire-and-forget: the result is
# discarded, only the updated state is kept).  Requires a consumer group.
def kafka_server_update_consumer_metadata(state) do
  true = consumer_group?(state)
  {_metadata, refreshed_state} = update_consumer_metadata(state)
  {:noreply, refreshed_state}
end

# Entry point: start a metadata refresh with the full @retry_count budget.
defp update_consumer_metadata(state), do: update_consumer_metadata(state, @retry_count, 0)

defp update_consumer_metadata(state = %State{consumer_group: consumer_group}, 0, error_code) do
Expand All @@ -199,64 +125,6 @@ defmodule KafkaEx.Server0P9P0 do
end
end

# Resolve the leader broker for a topic/partition pair.  When the cached
# metadata has no answer, refresh it once and retry the lookup; returns
# {broker_or_nil, possibly_updated_state}.
defp updated_broker_for_topic(state, partition, topic) do
  case MetadataResponse.broker_for_topic(state.metadata, state.brokers, topic, partition) do
    nil ->
      refreshed_state = update_metadata(state)

      retried_broker =
        MetadataResponse.broker_for_topic(refreshed_state.metadata, refreshed_state.brokers, topic, partition)

      {retried_broker, refreshed_state}

    broker ->
      {broker, state}
  end
end

# Perform a synchronous fetch against the partition leader.
# Returns {response, state} on success or {:topic_not_found, state} when no
# leader broker can be resolved.  When auto_commit is enabled and messages
# were received, the last offset is committed for the worker's consumer
# group before returning.  The correlation id is bumped once per request sent.
defp fetch(fetch_request, state) do
  true = consumer_group_if_auto_commit?(fetch_request.auto_commit, state)

  # FIX: the original had a trailing comma after `correlation_id: ...,`
  # inside the struct-update literal, which is invalid Elixir syntax.
  fetch_data = Fetch.create_request(%FetchRequest{
    fetch_request |
    client_id: @client_id,
    correlation_id: state.correlation_id
  })
  {broker, state} = updated_broker_for_topic(state, fetch_request.partition, fetch_request.topic)

  case broker do
    nil ->
      Logger.log(:error, "Leader for topic #{fetch_request.topic} is not available")
      {:topic_not_found, state}
    _ ->
      response = broker
      |> NetworkClient.send_sync_request(fetch_data, state.sync_timeout)
      |> Fetch.parse_response
      state = %{state | correlation_id: state.correlation_id + 1}
      # NOTE(review): takes the first topic's first partition from the
      # response — assumes single-topic/partition fetch requests.
      last_offset = response |> hd |> Map.get(:partitions) |> hd |> Map.get(:last_offset)
      if last_offset != nil && fetch_request.auto_commit do
        offset_commit_request = %OffsetCommit.Request{
          topic: fetch_request.topic,
          offset: last_offset,
          partition: fetch_request.partition,
          consumer_group: state.consumer_group}
        # Commit result is intentionally ignored; only the state is kept.
        {_, state} = offset_commit(state, offset_commit_request)
        {response, state}
      else
        {response, state}
      end
  end
end

# Send an OffsetCommit request to the group coordinator (forcing a metadata
# update via broker_for_consumer_group_with_update(state, true)) and return
# {parsed_response, state} with the correlation id incremented.
# NOTE(review): unlike kafka_server_offset_fetch, a nil broker is not
# handled here — NetworkClient.send_sync_request would receive nil.  Verify
# whether the forced update guarantees a coordinator.
defp offset_commit(state, offset_commit_request) do
  {broker, state} = broker_for_consumer_group_with_update(state, true)

  # if the request has a specific consumer group, use that
  # otherwise use the worker's consumer group
  consumer_group = offset_commit_request.consumer_group || state.consumer_group
  offset_commit_request = %{offset_commit_request | consumer_group: consumer_group}

  offset_commit_request_payload = OffsetCommit.create_request(state.correlation_id, @client_id, offset_commit_request)
  response = broker
  |> NetworkClient.send_sync_request(offset_commit_request_payload, state.sync_timeout)
  |> OffsetCommit.parse_response

  {response, %{state | correlation_id: state.correlation_id + 1}}
end

# Look up the coordinator broker from the cached consumer metadata
# (no network call; may return nil if the metadata has no match).
defp broker_for_consumer_group(state),
  do: ConsumerMetadataResponse.broker_for_consumer_group(state.brokers, state.consumer_metadata)
Expand Down