Skip to content

Commit

Permalink
Merge pull request #355 from kafkaex/use_kafka_0p11_for_tests
Browse files Browse the repository at this point in the history
Use kafka 0.11 for tests
  • Loading branch information
joshuawscott committed Jul 8, 2019
2 parents f85e67c + 9dc38b4 commit 08e91ca
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 70 deletions.
12 changes: 6 additions & 6 deletions docker-compose.yml
Expand Up @@ -5,32 +5,32 @@ services:
ports:
- "2181:2181"
kafka1:
image: wurstmeister/kafka:0.10.1.0
image: wurstmeister/kafka:0.11.0.1
ports:
- "9092:9092"
depends_on:
- zookeeper
volumes:
- ./kafka1/server.properties.in:/opt/kafka_2.11-0.10.1.0/config/server.properties.in
- ./kafka1/server.properties.in:/opt/kafka/config/server.properties.in
- ./scripts/docker-start-kafka.sh:/usr/bin/start-kafka.sh
- ./ssl:/ssl
kafka2:
image: wurstmeister/kafka:0.10.1.0
image: wurstmeister/kafka:0.11.0.1
ports:
- "9093:9093"
depends_on:
- zookeeper
volumes:
- ./kafka2/server.properties.in:/opt/kafka_2.11-0.10.1.0/config/server.properties.in
- ./kafka2/server.properties.in:/opt/kafka/config/server.properties.in
- ./scripts/docker-start-kafka.sh:/usr/bin/start-kafka.sh
- ./ssl:/ssl
kafka3:
image: wurstmeister/kafka:0.10.1.0
image: wurstmeister/kafka:0.11.0.1
ports:
- "9094:9094"
depends_on:
- zookeeper
volumes:
- ./kafka3/server.properties.in:/opt/kafka_2.11-0.10.1.0/config/server.properties.in
- ./kafka3/server.properties.in:/opt/kafka/config/server.properties.in
- ./scripts/docker-start-kafka.sh:/usr/bin/start-kafka.sh
- ./ssl:/ssl
57 changes: 31 additions & 26 deletions lib/kafka_ex/consumer_group.ex
Expand Up @@ -86,6 +86,8 @@ defmodule KafkaEx.ConsumerGroup do
changes as consumers start/stop. Default: 5000 (5 seconds).
* `:session_timeout` - Consumer group session timeout in milliseconds.
Default: 30000 (30 seconds). See below.
* `:session_timeout_padding` - Timeout padding for consumer group options.
Default: 10000 (10 seconds). See below.
* Any of `t:KafkaEx.GenConsumer.option/0`,
which will be passed on to consumers
* `:gen_server_opts` - `t:GenServer.options/0` passed on to the manager
Expand All @@ -102,11 +104,14 @@ defmodule KafkaEx.ConsumerGroup do
for `group.min.session.timeout.ms` and `group.max.session.timeout.ms` (6000
and 30000 by default). See
[https://kafka.apache.org/documentation/#configuration](https://kafka.apache.org/documentation/#configuration).
You may need to adjust `session_timeout_padding` on high-latency clusters to
avoid timing out when joining/syncing consumer groups.
"""
@type option ::
KafkaEx.GenConsumer.option()
| {:heartbeat_interval, pos_integer}
| {:session_timeout, pos_integer}
| {:session_timeout_padding, pos_integer}
| {:partition_assignment_callback, PartitionAssignment.callback()}
| {:gen_server_opts, GenServer.options()}
| {:name, Supervisor.name()}
Expand Down Expand Up @@ -177,9 +182,9 @@ defmodule KafkaEx.ConsumerGroup do
The generation id is provided by the broker on sync. Returns `nil` if
queried before the initial sync has completed.
"""
@spec generation_id(Supervisor.supervisor()) :: integer | nil
def generation_id(supervisor_pid) do
call_manager(supervisor_pid, :generation_id)
@spec generation_id(Supervisor.supervisor(), timeout) :: integer | nil
def generation_id(supervisor_pid, timeout \\ 5000) do
call_manager(supervisor_pid, :generation_id, timeout)
end

@doc """
Expand All @@ -188,9 +193,9 @@ defmodule KafkaEx.ConsumerGroup do
The id is assigned by the broker. Returns `nil` if queried before the
initial sync has completed.
"""
@spec member_id(Supervisor.supervisor()) :: binary | nil
def member_id(supervisor_pid) do
call_manager(supervisor_pid, :member_id)
@spec member_id(Supervisor.supervisor(), timeout) :: binary | nil
def member_id(supervisor_pid, timeout \\ 5000) do
call_manager(supervisor_pid, :member_id, timeout)
end

@doc """
Expand All @@ -199,9 +204,9 @@ defmodule KafkaEx.ConsumerGroup do
This is provided by the broker on sync. Returns `nil` if queried before the
initial sync has completed
"""
@spec leader_id(Supervisor.supervisor()) :: binary | nil
def leader_id(supervisor_pid) do
call_manager(supervisor_pid, :leader_id)
@spec leader_id(Supervisor.supervisor(), timeout) :: binary | nil
def leader_id(supervisor_pid, timeout \\ 5000) do
call_manager(supervisor_pid, :leader_id, timeout)
end

@doc """
Expand All @@ -210,9 +215,9 @@ defmodule KafkaEx.ConsumerGroup do
Leaders are elected by the broker and are responsible for assigning
partitions. Returns false if queried before the intiial sync has completed.
"""
@spec leader?(Supervisor.supervisor()) :: boolean
def leader?(supervisor_pid) do
call_manager(supervisor_pid, :am_leader)
@spec leader?(Supervisor.supervisor(), timeout) :: boolean
def leader?(supervisor_pid, timeout \\ 5000) do
call_manager(supervisor_pid, :am_leader, timeout)
end

@doc """
Expand All @@ -221,11 +226,11 @@ defmodule KafkaEx.ConsumerGroup do
These are assigned by the leader and communicated by the broker on sync.
"""
@spec assignments(Supervisor.supervisor()) :: [
@spec assignments(Supervisor.supervisor(), timeout) :: [
{topic :: binary, partition_id :: non_neg_integer}
]
def assignments(supervisor_pid) do
call_manager(supervisor_pid, :assignments)
def assignments(supervisor_pid, timeout \\ 5000) do
call_manager(supervisor_pid, :assignments, timeout)
end

@doc """
Expand All @@ -234,9 +239,9 @@ defmodule KafkaEx.ConsumerGroup do
Returns `nil` if called before the initial sync.
"""
@spec consumer_supervisor_pid(Supervisor.supervisor()) :: nil | pid
def consumer_supervisor_pid(supervisor_pid) do
call_manager(supervisor_pid, :consumer_supervisor_pid)
@spec consumer_supervisor_pid(Supervisor.supervisor(), timeout) :: nil | pid
def consumer_supervisor_pid(supervisor_pid, timeout \\ 5000) do
call_manager(supervisor_pid, :consumer_supervisor_pid, timeout)
end

@doc """
Expand All @@ -252,9 +257,9 @@ defmodule KafkaEx.ConsumerGroup do
@doc """
Returns the name of the consumer group
"""
@spec group_name(Supervisor.supervisor()) :: binary
def group_name(supervisor_pid) do
call_manager(supervisor_pid, :group_name)
@spec group_name(Supervisor.supervisor(), timeout) :: binary
def group_name(supervisor_pid, timeout \\ 5000) do
call_manager(supervisor_pid, :group_name, timeout)
end

@doc """
Expand All @@ -275,9 +280,9 @@ defmodule KafkaEx.ConsumerGroup do
@doc """
Returns true if at least one child consumer process is alive
"""
@spec active?(Supervisor.supervisor()) :: boolean
def active?(supervisor_pid) do
consumer_supervisor = consumer_supervisor_pid(supervisor_pid)
@spec active?(Supervisor.supervisor(), timeout) :: boolean
def active?(supervisor_pid, timeout \\ 5000) do
consumer_supervisor = consumer_supervisor_pid(supervisor_pid, timeout)

if consumer_supervisor && Process.alive?(consumer_supervisor) do
GenConsumer.Supervisor.active?(consumer_supervisor)
Expand Down Expand Up @@ -354,9 +359,9 @@ defmodule KafkaEx.ConsumerGroup do
supervise(children, strategy: :one_for_all, max_restarts: 0, max_seconds: 1)
end

defp call_manager(supervisor_pid, call) do
defp call_manager(supervisor_pid, call, timeout) do
supervisor_pid
|> get_manager_pid
|> GenServer.call(call)
|> GenServer.call(call, timeout)
end
end
34 changes: 26 additions & 8 deletions lib/kafka_ex/consumer_group/manager.ex
Expand Up @@ -23,6 +23,7 @@ defmodule KafkaEx.ConsumerGroup.Manager do
:worker_name,
:heartbeat_interval,
:session_timeout,
:session_timeout_padding,
:gen_consumer_module,
:consumer_module,
:consumer_opts,
Expand All @@ -43,7 +44,7 @@ defmodule KafkaEx.ConsumerGroup.Manager do

@heartbeat_interval 5_000
@session_timeout 30_000
@session_timeout_padding 5_000
@session_timeout_padding 10_000

@type assignments :: [{binary(), integer()}]

Expand All @@ -56,8 +57,7 @@ defmodule KafkaEx.ConsumerGroup.Manager do
binary,
[binary],
KafkaEx.GenConsumer.options()
) ::
GenServer.on_start()
) :: GenServer.on_start()
def start_link(
{gen_consumer_module, consumer_module},
group_name,
Expand Down Expand Up @@ -92,6 +92,17 @@ defmodule KafkaEx.ConsumerGroup.Manager do
Application.get_env(:kafka_ex, :session_timeout, @session_timeout)
)

session_timeout_padding =
Keyword.get(
opts,
:session_timeout_padding,
Application.get_env(
:kafka_ex,
:session_timeout_padding,
@session_timeout_padding
)
)

partition_assignment_callback =
Keyword.get(
opts,
Expand Down Expand Up @@ -125,6 +136,7 @@ defmodule KafkaEx.ConsumerGroup.Manager do
worker_name: worker_name,
heartbeat_interval: heartbeat_interval,
session_timeout: session_timeout,
session_timeout_padding: session_timeout_padding,
consumer_module: consumer_module,
gen_consumer_module: gen_consumer_module,
partition_assignment_callback: partition_assignment_callback,
Expand Down Expand Up @@ -238,6 +250,7 @@ defmodule KafkaEx.ConsumerGroup.Manager do
%State{
worker_name: worker_name,
session_timeout: session_timeout,
session_timeout_padding: session_timeout_padding,
group_name: group_name,
topics: topics,
member_id: member_id
Expand All @@ -254,16 +267,19 @@ defmodule KafkaEx.ConsumerGroup.Manager do
KafkaEx.join_group(
join_request,
worker_name: worker_name,
timeout: session_timeout + @session_timeout_padding
timeout: session_timeout + session_timeout_padding
)

# crash the worker if we recieve an error, but do it with a meaningful
# error message
case join_response do
%{error_code: :no_error} -> :ok
%{error_code: :no_error} ->
:ok

%{error_code: error_code} ->
raise "Error joining consumer group #{group_name}: " <>
"#{inspect(error_code)}"

{:error, reason} ->
raise "Error joining consumer group #{group_name}: " <>
"#{inspect(reason)}"
Expand Down Expand Up @@ -308,7 +324,8 @@ defmodule KafkaEx.ConsumerGroup.Manager do
member_id: member_id,
generation_id: generation_id,
worker_name: worker_name,
session_timeout: session_timeout
session_timeout: session_timeout,
session_timeout_padding: session_timeout_padding
} = state,
assignments
) do
Expand All @@ -323,7 +340,7 @@ defmodule KafkaEx.ConsumerGroup.Manager do
KafkaEx.sync_group(
sync_request,
worker_name: worker_name,
timeout: session_timeout + @session_timeout_padding
timeout: session_timeout + session_timeout_padding
)

case error_code do
Expand Down Expand Up @@ -351,7 +368,6 @@ defmodule KafkaEx.ConsumerGroup.Manager do
member_id: member_id
} = state
) do

leave_request = %LeaveGroupRequest{
group_name: group_name,
member_id: member_id
Expand All @@ -363,11 +379,13 @@ defmodule KafkaEx.ConsumerGroup.Manager do
case leave_group_response do
%{error_code: :no_error} ->
Logger.debug(fn -> "Left consumer group #{group_name}" end)

%{error_code: error_code} ->
Logger.warn(fn ->
"Received error #{inspect(error_code)}, " <>
"consumer group manager will exit regardless."
end)

{:error, reason} ->
Logger.warn(fn ->
"Received error #{inspect(reason)}, " <>
Expand Down
10 changes: 6 additions & 4 deletions test/integration/consumer_group_implementation_test.exs
Expand Up @@ -140,7 +140,8 @@ defmodule KafkaEx.ConsumerGroupImplementationTest do
@consumer_group_name,
[@topic_name],
heartbeat_interval: 100,
partition_assignment_callback: &TestPartitioner.assign_partitions/2
partition_assignment_callback: &TestPartitioner.assign_partitions/2,
session_timeout_padding: 30000
)

{:ok, consumer_group_pid2} =
Expand All @@ -149,13 +150,14 @@ defmodule KafkaEx.ConsumerGroupImplementationTest do
@consumer_group_name,
[@topic_name],
heartbeat_interval: 100,
partition_assignment_callback: &TestPartitioner.assign_partitions/2
partition_assignment_callback: &TestPartitioner.assign_partitions/2,
session_timeout_padding: 30000
)

# wait for both consumer groups to join
wait_for(fn ->
ConsumerGroup.active?(consumer_group_pid1) &&
ConsumerGroup.active?(consumer_group_pid2)
ConsumerGroup.active?(consumer_group_pid1, 30000) &&
ConsumerGroup.active?(consumer_group_pid2, 30000)
end)

on_exit(fn ->
Expand Down
3 changes: 0 additions & 3 deletions test/integration/integration_test.exs
Expand Up @@ -569,7 +569,6 @@ defmodule KafkaEx.Integration.Test do
topic_name,
0,
worker_name: :stream,
max_bytes: 50,
offset: 0,
auto_commit: false
)
Expand Down Expand Up @@ -607,7 +606,6 @@ defmodule KafkaEx.Integration.Test do
topic_name,
0,
worker_name: :stream,
max_bytes: 50,
offset: 0,
auto_commit: false
)
Expand Down Expand Up @@ -660,7 +658,6 @@ defmodule KafkaEx.Integration.Test do
topic_name,
0,
worker_name: :stream,
max_bytes: 50,
offset: 0,
auto_commit: false,
no_wait_at_logend: true
Expand Down

0 comments on commit 08e91ca

Please sign in to comment.