Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 65 additions & 69 deletions lib/kafka_ex/server_0_p_9_p_0.ex
Original file line number Diff line number Diff line change
Expand Up @@ -73,96 +73,92 @@ defmodule KafkaEx.Server0P9P0 do
{:ok, state}
end

def kafka_server_join_group(join_group_request, network_timeout, state) do
true = consumer_group?(state)
{broker, state} = broker_for_consumer_group_with_update(state)
wire_request = JoinGroup.create_request(
state.correlation_id,
@client_id,
join_group_request
def kafka_server_join_group(request, network_timeout, state_in) do
true = consumer_group?(state_in)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it seems like we could drop this true = consumer_group?(state_in) in each of these functions and just pattern match in consumer_group_sync_request/4? This would also raise a match error, rather than giving a decent failure, which we could do in the shared function.
e.g.

def consumer_group_sync_request(_req, _mod, _timeout, %State{consumer_group: :no_consumer_group}) do
  raise "You are trying to consumer group but you aren't consumer grouping"
end

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea. At first I had this in the shared function, but I wanted it to error out in the parent so that you would see that in the stacktrace. Making it raise an error and decorating the error with info from the request struct is even better 👍


{response, state_out} = consumer_group_sync_request(
request,
JoinGroup,
network_timeout,
state_in
)
sync_timeout = config_sync_timeout(network_timeout)
response = broker
|> NetworkClient.send_sync_request(wire_request, sync_timeout)
|> JoinGroup.parse_response

if response.error_code == :not_coordinator_for_consumer do
{_, updated_state} = update_consumer_metadata(state)
kafka_server_join_group(
join_group_request,
network_timeout,
updated_state
)
else
{:reply, response, %{state | correlation_id: state.correlation_id + 1}}
end
{:reply, response, state_out}
end

def kafka_server_sync_group(sync_group_request, network_timeout, state) do
true = consumer_group?(state)
{broker, state} = broker_for_consumer_group_with_update(state)
wire_request = SyncGroup.create_request(
state.correlation_id,
@client_id,
sync_group_request
def kafka_server_sync_group(request, network_timeout, state_in) do
true = consumer_group?(state_in)

{response, state_out} = consumer_group_sync_request(
request,
SyncGroup,
network_timeout,
state_in
)
sync_timeout = config_sync_timeout(network_timeout)
response = broker
|> NetworkClient.send_sync_request(wire_request, sync_timeout)
|> SyncGroup.parse_response

if response.error_code == :not_coordinator_for_consumer do
{_, updated_state} = update_consumer_metadata(state)
kafka_server_sync_group(
sync_group_request,
network_timeout,
updated_state
)
else
{:reply, response, %{state | correlation_id: state.correlation_id + 1}}
end
{:reply, response, state_out}
end

def kafka_server_leave_group(request, network_timeout, state) do
true = consumer_group?(state)
{broker, state} = broker_for_consumer_group_with_update(state)
wire_request = LeaveGroup.create_request(
state.correlation_id,
@client_id,
request
def kafka_server_leave_group(request, network_timeout, state_in) do
true = consumer_group?(state_in)

{response, state_out} = consumer_group_sync_request(
request,
LeaveGroup,
network_timeout,
state_in
)

sync_timeout = config_sync_timeout(network_timeout)
response = broker
|> NetworkClient.send_sync_request(wire_request, sync_timeout)
|> LeaveGroup.parse_response
{:reply, response, state_out}
end

if response.error_code == :not_coordinator_for_consumer do
{_, updated_state} = update_consumer_metadata(state)
kafka_server_leave_group(request, network_timeout, updated_state)
else
{:reply, response, %{state | correlation_id: state.correlation_id + 1}}
end
def kafka_server_heartbeat(request, network_timeout, state_in) do
true = consumer_group?(state_in)

{response, state_out} = consumer_group_sync_request(
request,
Heartbeat,
network_timeout,
state_in
)

{:reply, response, state_out}
end

def kafka_server_heartbeat(request, network_timeout, state) do
true = consumer_group?(state)
defp consumer_group_sync_request(
request,
protocol_module,
network_timeout,
state
) do
{broker, state} = broker_for_consumer_group_with_update(state)
wire_request = Heartbeat.create_request(

sync_timeout = config_sync_timeout(network_timeout)

wire_request = protocol_module.create_request(
state.correlation_id,
@client_id,
request
)
sync_timeout = config_sync_timeout(network_timeout)
response = broker
|> NetworkClient.send_sync_request(wire_request, sync_timeout)
|> Heartbeat.parse_response
wire_response = NetworkClient.send_sync_request(
broker,
wire_request,
sync_timeout
)
response = protocol_module.parse_response(wire_response)

state_out = %{state | correlation_id: state.correlation_id + 1}

if response.error_code == :not_coordinator_for_consumer do
{_, updated_state} = update_consumer_metadata(state)
kafka_server_heartbeat(request, network_timeout, updated_state)
{_, updated_state_out} = update_consumer_metadata(state_out)
consumer_group_sync_request(
request,
protocol_module,
network_timeout,
updated_state_out
)
else
{:reply, response, %{state | correlation_id: state.correlation_id + 1}}
{response, state_out}
end
end

Expand Down