Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/kafka_ex/consumer_group/manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ defmodule KafkaEx.ConsumerGroup.Manager do
case sync_group_response do
%SyncGroupResponse{error_code: :no_error, assignments: assignments} ->
new_state = state
|> stop_consumer()
|> start_consumer(unpack_assignments(assignments))
|> start_heartbeat_timer()
{:ok, new_state}
Expand Down
95 changes: 81 additions & 14 deletions lib/kafka_ex/server_0_p_9_p_0.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ defmodule KafkaEx.Server0P9P0 do
alias KafkaEx.NetworkClient
alias KafkaEx.Server0P8P2

require Logger

@consumer_group_update_interval 30_000

def start_link(args, name \\ __MODULE__)
Expand Down Expand Up @@ -74,41 +76,94 @@ defmodule KafkaEx.Server0P9P0 do
def kafka_server_join_group(join_group_request, network_timeout, state) do
true = consumer_group?(state)
{broker, state} = broker_for_consumer_group_with_update(state)
request = JoinGroup.create_request(state.correlation_id, @client_id, join_group_request)
wire_request = JoinGroup.create_request(
state.correlation_id,
@client_id,
join_group_request
)
sync_timeout = config_sync_timeout(network_timeout)
response = broker
|> NetworkClient.send_sync_request(request, config_sync_timeout(network_timeout))
|> NetworkClient.send_sync_request(wire_request, sync_timeout)
|> JoinGroup.parse_response
{:reply, response, %{state | correlation_id: state.correlation_id + 1}}

if response.error_code == :not_coordinator_for_consumer do
{_, updated_state} = update_consumer_metadata(state)
kafka_server_join_group(
join_group_request,
network_timeout,
updated_state
)
else
{:reply, response, %{state | correlation_id: state.correlation_id + 1}}
end
end

def kafka_server_sync_group(sync_group_request, network_timeout, state) do
true = consumer_group?(state)
{broker, state} = broker_for_consumer_group_with_update(state)
request = SyncGroup.create_request(state.correlation_id, @client_id, sync_group_request)
wire_request = SyncGroup.create_request(
state.correlation_id,
@client_id,
sync_group_request
)
sync_timeout = config_sync_timeout(network_timeout)
response = broker
|> NetworkClient.send_sync_request(request, config_sync_timeout(network_timeout))
|> NetworkClient.send_sync_request(wire_request, sync_timeout)
|> SyncGroup.parse_response
{:reply, response, %{state | correlation_id: state.correlation_id + 1}}

if response.error_code == :not_coordinator_for_consumer do
{_, updated_state} = update_consumer_metadata(state)
kafka_server_sync_group(
sync_group_request,
network_timeout,
updated_state
)
else
{:reply, response, %{state | correlation_id: state.correlation_id + 1}}
end
end

def kafka_server_leave_group(request, network_timeout, state) do
true = consumer_group?(state)
{broker, state} = broker_for_consumer_group_with_update(state)
request = LeaveGroup.create_request(state.correlation_id, @client_id, request)
wire_request = LeaveGroup.create_request(
state.correlation_id,
@client_id,
request
)

sync_timeout = config_sync_timeout(network_timeout)
response = broker
|> NetworkClient.send_sync_request(request, config_sync_timeout(network_timeout))
|> NetworkClient.send_sync_request(wire_request, sync_timeout)
|> LeaveGroup.parse_response
{:reply, response, %{state | correlation_id: state.correlation_id + 1}}

if response.error_code == :not_coordinator_for_consumer do
{_, updated_state} = update_consumer_metadata(state)
kafka_server_leave_group(request, network_timeout, updated_state)
else
{:reply, response, %{state | correlation_id: state.correlation_id + 1}}
end
end

def kafka_server_heartbeat(request, network_timeout, state) do
true = consumer_group?(state)
{broker, state} = broker_for_consumer_group_with_update(state)
request = Heartbeat.create_request(state.correlation_id, @client_id, request)
wire_request = Heartbeat.create_request(
state.correlation_id,
@client_id,
request
)
sync_timeout = config_sync_timeout(network_timeout)
response = broker
|> NetworkClient.send_sync_request(request, config_sync_timeout(network_timeout))
|> NetworkClient.send_sync_request(wire_request, sync_timeout)
|> Heartbeat.parse_response
{:reply, response, %{state | correlation_id: state.correlation_id + 1}}

if response.error_code == :not_coordinator_for_consumer do
{_, updated_state} = update_consumer_metadata(state)
kafka_server_heartbeat(request, network_timeout, updated_state)
else
{:reply, response, %{state | correlation_id: state.correlation_id + 1}}
end
end

defp update_consumer_metadata(state), do: update_consumer_metadata(state, @retry_count, 0)
Expand All @@ -125,9 +180,21 @@ defmodule KafkaEx.Server0P9P0 do
|> ConsumerMetadata.parse_response

case response.error_code do
:no_error -> {response, %{state | consumer_metadata: response, correlation_id: state.correlation_id + 1}}
:no_error ->
{
response,
%{
state |
consumer_metadata: response,
correlation_id: state.correlation_id + 1
}
}
_ -> :timer.sleep(400)
update_consumer_metadata(%{state | correlation_id: state.correlation_id + 1}, retry - 1, response.error_code)
update_consumer_metadata(
%{state | correlation_id: state.correlation_id + 1},
retry - 1,
response.error_code
)
end
end

Expand Down