diff --git a/lib/kafka_ex.ex b/lib/kafka_ex.ex index 051200be..688eef45 100644 --- a/lib/kafka_ex.ex +++ b/lib/kafka_ex.ex @@ -73,6 +73,17 @@ defmodule KafkaEx do end end + @doc """ + Stop a worker created with create_worker/2 + + Returns `:ok` on success or `:error` if `worker` is not a valid worker + """ + @spec stop_worker(atom | pid) :: :ok | + {:error, :not_found} | {:error, :simple_one_for_one} + def stop_worker(worker) do + KafkaEx.Supervisor.stop_child(worker) + end + @doc """ Returns the name of the consumer group for the given worker. diff --git a/lib/kafka_ex/consumer_group.ex b/lib/kafka_ex/consumer_group.ex index 4be18363..f679b329 100644 --- a/lib/kafka_ex/consumer_group.ex +++ b/lib/kafka_ex/consumer_group.ex @@ -258,6 +258,24 @@ defmodule KafkaEx.ConsumerGroup do end end + @doc """ + Returns the pid of the `KafkaEx.ConsumerGroup.Manager` process for the + given consumer group supervisor. + + Intended for introspection usage only. + """ + @spec get_manager_pid(Supervisor.supervisor) :: pid + def get_manager_pid(supervisor_pid) do + {_, pid, _, _} = Enum.find( + Supervisor.which_children(supervisor_pid), + fn + ({KafkaEx.ConsumerGroup.Manager, _, _, _}) -> true + ({_, _, _, _}) -> false + end + ) + pid + end + @doc false # used by ConsumerGroup.Manager to set partition assignments def start_consumer(pid, consumer_module, group_name, assignments, opts) do child = supervisor( @@ -302,15 +320,4 @@ defmodule KafkaEx.ConsumerGroup do |> get_manager_pid |> GenServer.call(call) end - - defp get_manager_pid(supervisor_pid) do - {_, pid, _, _} = Enum.find( - Supervisor.which_children(supervisor_pid), - fn - ({KafkaEx.ConsumerGroup.Manager, _, _, _}) -> true - ({_, _, _, _}) -> false - end - ) - pid - end end diff --git a/lib/kafka_ex/consumer_group/manager.ex b/lib/kafka_ex/consumer_group/manager.ex index bc45237f..58cdc743 100644 --- a/lib/kafka_ex/consumer_group/manager.ex +++ b/lib/kafka_ex/consumer_group/manager.ex @@ -12,7 +12,6 @@ defmodule KafkaEx.ConsumerGroup.Manager do alias KafkaEx.Protocol.Heartbeat.Request, as: HeartbeatRequest alias KafkaEx.Protocol.Heartbeat.Response, as: HeartbeatResponse alias KafkaEx.Protocol.LeaveGroup.Request, as: LeaveGroupRequest - alias KafkaEx.Protocol.LeaveGroup.Response, as: LeaveGroupResponse alias KafkaEx.Protocol.Metadata.Response, as: MetadataResponse alias KafkaEx.Protocol.SyncGroup.Request, as: SyncGroupRequest alias KafkaEx.Protocol.SyncGroup.Response, as: SyncGroupResponse @@ -169,6 +168,8 @@ defmodule KafkaEx.ConsumerGroup.Manager do def terminate(_reason, %State{generation_id: nil, member_id: nil}), do: :ok def terminate(_reason, %State{} = state) do :ok = leave(state) + Process.unlink(state.worker_name) + KafkaEx.stop_worker(state.worker_name) end ### Helpers diff --git a/lib/kafka_ex/gen_consumer.ex b/lib/kafka_ex/gen_consumer.ex index 261111ca..02fcfb6d 100644 --- a/lib/kafka_ex/gen_consumer.ex +++ b/lib/kafka_ex/gen_consumer.ex @@ -454,6 +454,8 @@ defmodule KafkaEx.GenConsumer do def terminate(_reason, %State{} = state) do commit(state) + Process.unlink(state.worker_name) + KafkaEx.stop_worker(state.worker_name) end # Helpers diff --git a/lib/kafka_ex/supervisor.ex b/lib/kafka_ex/supervisor.ex index c1cdd5d3..57687f36 100644 --- a/lib/kafka_ex/supervisor.ex +++ b/lib/kafka_ex/supervisor.ex @@ -1,12 +1,17 @@ defmodule KafkaEx.Supervisor do - use Supervisor @moduledoc false + use Supervisor + def start_link(server, max_restarts, max_seconds) do {:ok, pid} = Supervisor.start_link(__MODULE__, [server, max_restarts, max_seconds], [name: __MODULE__]) {:ok, pid} end + def stop_child(child) do + Supervisor.terminate_child(__MODULE__, child) + end + def init([server, max_restarts, max_seconds]) do children = [ worker(server, []) diff --git a/test/integration/consumer_group_implementation_test.exs b/test/integration/consumer_group_implementation_test.exs index df02cf48..dcfcd5e4 100644 --- a/test/integration/consumer_group_implementation_test.exs +++ b/test/integration/consumer_group_implementation_test.exs @@ -98,7 +98,15 @@ defmodule KafkaEx.ConsumerGroupImplementationTest do end) end + def num_open_ports() do + :erlang.ports + |> Enum.map(&(:erlang.port_info(&1, :name))) + |> Enum.filter(&(&1 == {:name, 'tcp_inet'})) + |> length + end + setup do + ports_before = num_open_ports() {:ok, _} = TestPartitioner.start_link {:ok, consumer_group_pid1} = ConsumerGroup.start_link( TestConsumer, @@ -129,11 +137,14 @@ defmodule KafkaEx.ConsumerGroupImplementationTest do { :ok, consumer_group_pid1: consumer_group_pid1, - consumer_group_pid2: consumer_group_pid2 + consumer_group_pid2: consumer_group_pid2, + ports_before: ports_before } end test "basic startup, consume, and shutdown test", context do + assert num_open_ports() > context[:ports_before] + assert TestPartitioner.calls > 0 generation_id1 = ConsumerGroup.generation_id(context[:consumer_group_pid1]) @@ -238,9 +249,14 @@ defmodule KafkaEx.ConsumerGroupImplementationTest do ending_offset == last_offset + 1 end) end + + # ports should be released + assert context[:ports_before] == num_open_ports() end test "starting/stopping consumers rebalances assignments", context do + assert num_open_ports() > context[:ports_before] + Process.unlink(context[:consumer_group_pid1]) sync_stop(context[:consumer_group_pid1]) @@ -279,5 +295,7 @@ defmodule KafkaEx.ConsumerGroupImplementationTest do Process.unlink(consumer_group_pid3) sync_stop(consumer_group_pid3) + + assert context[:ports_before] == num_open_ports() end end