diff --git a/README.md b/README.md index 1c575ac1..a1885ebf 100644 --- a/README.md +++ b/README.md @@ -164,6 +164,12 @@ GenServer.start_link(KafkaEx.Config.server_impl, ) ``` +Alternatively, you can call + +``` +KafkaEx.start_link_worker(:no_name) +``` + ### Retrieve kafka metadata For all metadata diff --git a/lib/kafka_ex.ex b/lib/kafka_ex.ex index d6a5cec0..9095bce9 100644 --- a/lib/kafka_ex.ex +++ b/lib/kafka_ex.ex @@ -550,6 +550,28 @@ defmodule KafkaEx do } end + @doc """ + Start and link a worker outside of a supervision tree + + This takes the same arguments as `create_worker/2` except that it adds + + - `server_impl` - This is the GenServer that will be used for the + client genserver implementation - e.g., `KafkaEx.Server0P8P0`, + `KafkaEx.Server0P10AndLater`, `KafkaEx.New.Client`. Defaults to the value + of `KafkaEx.Config.server_impl/0` which is determined by the `kafka_version` + setting. + """ + @spec start_link_worker(atom, [ + KafkaEx.worker_setting() | {:server_impl, module} + ]) :: GenServer.on_start() + def start_link_worker(name, worker_init \\ []) do + {server_impl, worker_init} = + Keyword.pop(worker_init, :server_impl, Config.server_impl()) + + {:ok, full_worker_init} = build_worker_options(worker_init) + server_impl.start_link(full_worker_init, name) + end + @doc """ Builds options to be used with workers diff --git a/test/integration/kayrock/compatibility_0_p_10_and_later_test.exs b/test/integration/kayrock/compatibility_0_p_10_and_later_test.exs index 9040d91f..5a696433 100644 --- a/test/integration/kayrock/compatibility_0_p_10_and_later_test.exs +++ b/test/integration/kayrock/compatibility_0_p_10_and_later_test.exs @@ -13,13 +13,11 @@ defmodule KafkaEx.KayrockCompatibility0p10AndLaterTest do @moduletag :new_client @num_partitions 10 - alias KafkaEx.New.Client alias KafkaEx.New.KafkaExAPI setup do - {:ok, args} = KafkaEx.build_worker_options([]) - - {:ok, pid} = Client.start_link(args, :no_name) + {:ok, pid} = + KafkaEx.start_link_worker(:no_name, server_impl: KafkaEx.New.Client) {:ok, %{client: pid}} end diff --git a/test/integration/kayrock/compatibility_0_p_8_p_0_test.exs b/test/integration/kayrock/compatibility_0_p_8_p_0_test.exs index c96e985e..8525d670 100644 --- a/test/integration/kayrock/compatibility_0_p_8_p_0_test.exs +++ b/test/integration/kayrock/compatibility_0_p_8_p_0_test.exs @@ -14,12 +14,9 @@ defmodule KafkaEx.KayrockCompatibility0p8p0Test do @topic "test0p8p0" - alias KafkaEx.New.Client - setup do - {:ok, args} = KafkaEx.build_worker_options([]) - - {:ok, pid} = Client.start_link(args, :no_name) + {:ok, pid} = + KafkaEx.start_link_worker(:no_name, server_impl: KafkaEx.New.Client) {:ok, %{client: pid}} end diff --git a/test/integration/kayrock/compatibility_0_p_9_p_0_test.exs b/test/integration/kayrock/compatibility_0_p_9_p_0_test.exs index b4d468a6..45d73625 100644 --- a/test/integration/kayrock/compatibility_0_p_9_p_0_test.exs +++ b/test/integration/kayrock/compatibility_0_p_9_p_0_test.exs @@ -17,12 +17,9 @@ defmodule KafkaEx.KayrockCompatibility0p9p0Test do alias KafkaEx.Protocol.LeaveGroup.Request, as: LeaveGroupRequest alias KafkaEx.Protocol.SyncGroup.Request, as: SyncGroupRequest - alias KafkaEx.New.Client - setup do - {:ok, args} = KafkaEx.build_worker_options([]) - - {:ok, pid} = Client.start_link(args, :no_name) + {:ok, pid} = + KafkaEx.start_link_worker(:no_name, server_impl: KafkaEx.New.Client) {:ok, %{client: pid}} end diff --git a/test/integration/kayrock/compatibility_consumer_group_implementation_test.exs b/test/integration/kayrock/compatibility_consumer_group_implementation_test.exs index b55f2090..9ee2e8b2 100644 --- a/test/integration/kayrock/compatibility_consumer_group_implementation_test.exs +++ b/test/integration/kayrock/compatibility_consumer_group_implementation_test.exs @@ -16,7 +16,6 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupImplementationTest do alias KafkaEx.ConsumerGroup alias KafkaEx.GenConsumer - alias KafkaEx.New.Client alias KafkaEx.Protocol.OffsetFetch # note this topic is created by docker_up.sh @@ -165,9 +164,8 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupImplementationTest do setup do {:ok, _} = TestPartitioner.start_link() - {:ok, client_args} = KafkaEx.build_worker_options([]) - - {:ok, client_pid} = Client.start_link(client_args, :no_name) + {:ok, client_pid} = + KafkaEx.start_link_worker(:no_name, server_impl: KafkaEx.New.Client) # the client will die on its own, so don't count that ports_before = num_open_ports() diff --git a/test/integration/kayrock/compatibility_consumer_group_test.exs b/test/integration/kayrock/compatibility_consumer_group_test.exs index 0ce80cfc..90a175f1 100644 --- a/test/integration/kayrock/compatibility_consumer_group_test.exs +++ b/test/integration/kayrock/compatibility_consumer_group_test.exs @@ -14,13 +14,11 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupTest do alias KafkaEx.Protocol, as: Proto - alias KafkaEx.New.Client alias KafkaEx.New.KafkaExAPI setup do - {:ok, args} = KafkaEx.build_worker_options([]) - - {:ok, pid} = Client.start_link(args, :no_name) + {:ok, pid} = + KafkaEx.start_link_worker(:no_name, server_impl: KafkaEx.New.Client) {:ok, %{client: pid}} end diff --git a/test/integration/kayrock/compatibility_test.exs b/test/integration/kayrock/compatibility_test.exs index bccdc975..aaa430bd 100644 --- a/test/integration/kayrock/compatibility_test.exs +++ b/test/integration/kayrock/compatibility_test.exs @@ -20,9 +20,8 @@ defmodule KafkaEx.KayrockCompatibilityTest do alias KafkaEx.New.KafkaExAPI setup do - {:ok, args} = KafkaEx.build_worker_options([]) - - {:ok, pid} = Client.start_link(args, :no_name) + {:ok, pid} = + KafkaEx.start_link_worker(:no_name, server_impl: KafkaEx.New.Client) {:ok, %{client: pid}} end diff --git a/test/integration/kayrock/offset_test.exs b/test/integration/kayrock/offset_test.exs index f5181adc..bfebd40e 100644 --- a/test/integration/kayrock/offset_test.exs +++ b/test/integration/kayrock/offset_test.exs @@ -5,16 +5,14 @@ defmodule KafkaEx.KayrockOffsetTest do use ExUnit.Case - alias KafkaEx.New.Client alias KafkaEx.Protocol.OffsetCommit alias KafkaEx.Protocol.OffsetFetch @moduletag :new_client setup do - {:ok, args} = KafkaEx.build_worker_options([]) - - {:ok, pid} = Client.start_link(args, :no_name) + {:ok, pid} = + KafkaEx.start_link_worker(:no_name, server_impl: KafkaEx.New.Client) {:ok, %{client: pid}} end diff --git a/test/integration/kayrock/record_batch_test.exs b/test/integration/kayrock/record_batch_test.exs index 9f5398d0..5ab15bcc 100644 --- a/test/integration/kayrock/record_batch_test.exs +++ b/test/integration/kayrock/record_batch_test.exs @@ -5,14 +5,11 @@ defmodule KafkaEx.KayrockRecordBatchTest do use ExUnit.Case - alias KafkaEx.New.Client - @moduletag :new_client setup do - {:ok, args} = KafkaEx.build_worker_options([]) - - {:ok, pid} = Client.start_link(args, :no_name) + {:ok, pid} = + KafkaEx.start_link_worker(:no_name, server_impl: KafkaEx.New.Client) {:ok, %{client: pid}} end diff --git a/test/integration/kayrock/timestamp_test.exs b/test/integration/kayrock/timestamp_test.exs index c343b23a..24037d30 100644 --- a/test/integration/kayrock/timestamp_test.exs +++ b/test/integration/kayrock/timestamp_test.exs @@ -5,7 +5,6 @@ defmodule KafkaEx.KayrockTimestampTest do use ExUnit.Case - alias KafkaEx.New.Client alias KafkaEx.TimestampNotSupportedError require Logger @@ -13,9 +12,8 @@ defmodule KafkaEx.KayrockTimestampTest do @moduletag :new_client setup do - {:ok, args} = KafkaEx.build_worker_options([]) - - {:ok, pid} = Client.start_link(args, :no_name) + {:ok, pid} = + KafkaEx.start_link_worker(:no_name, server_impl: KafkaEx.New.Client) {:ok, %{client: pid}} end