diff --git a/README.md b/README.md index f6b2a149..773ccf5e 100644 --- a/README.md +++ b/README.md @@ -149,19 +149,8 @@ iex> KafkaEx.create_worker(:no_name) # indicates to the server process not to na ### Use KafkaEx with a pooling library -Note that KafkaEx has a supervisor to manage its workers. If you are using Poolboy or a similar -library, you will want to manually create a worker so that it is not supervised by `KafkaEx.Supervisor`. -To do this, you will need to call: - -```elixir -GenServer.start_link(KafkaEx.Config.server_impl, - [ - [uris: KafkaEx.Config.brokers(), - consumer_group: Application.get_env(:kafka_ex, :consumer_group)], - :no_name - ] -) -``` +Note that KafkaEx has a supervisor to manage its workers started with `KafkaEx.create_worker`. If you are using Poolboy or a similar +library, you will want to manually create a worker with `KafkaEx.start_worker` so that it is not supervised by `KafkaEx.Supervisor`. ### Retrieve kafka metadata For all metadata diff --git a/lib/kafka_ex.ex b/lib/kafka_ex.ex index b678bacd..117f39d1 100644 --- a/lib/kafka_ex.ex +++ b/lib/kafka_ex.ex @@ -82,6 +82,35 @@ defmodule KafkaEx do end end + @doc """ + start_worker start_link KafkaEx workers. Arguments are the same as `KafkaEx.create_worker`. + The only difference is that start_worker starts worker without supervisor and links it with current process + + ## Example + + ```elixir + iex> KafkaEx.stop_worker(:pr) # where :pr is the name of the worker created + {:ok, #PID<0.171.0>} + iex> KafkaEx.stop_worker(:pr, uris: [{"localhost", 9092}]) + {:ok, #PID<0.172.0>} + iex> KafkaEx.stop_worker(:pr, [uris: [{"localhost", 9092}], consumer_group: "foo"]) + {:ok, #PID<0.173.0>} + iex> KafkaEx.stop_worker(:pr, consumer_group: nil) + {:error, :invalid_consumer_group} + ``` + """ + @spec start_worker(atom, KafkaEx.worker_init()) :: + GenServer.on_start() + def start_worker(name, worker_init \\ []) do + case build_worker_options(worker_init) do + {:ok, worker_init} -> + apply(KafkaEx.Config.server_impl, :start_link, [worker_init, name]) + + {:error, error} -> + {:error, error} + end + end + @doc """ Stop a worker created with create_worker/2 diff --git a/lib/kafka_ex/consumer_group/manager.ex b/lib/kafka_ex/consumer_group/manager.ex index 6869e921..96a7c5a8 100644 --- a/lib/kafka_ex/consumer_group/manager.ex +++ b/lib/kafka_ex/consumer_group/manager.ex @@ -115,7 +115,7 @@ defmodule KafkaEx.ConsumerGroup.Manager do worker_opts = Keyword.take(opts, [:uris]) {:ok, worker_name} = - KafkaEx.create_worker( + KafkaEx.start_worker( :no_name, [consumer_group: group_name] ++ worker_opts ) @@ -199,8 +199,7 @@ defmodule KafkaEx.ConsumerGroup.Manager do def terminate(_reason, %State{} = state) do {:ok, _state} = leave(state) - Process.unlink(state.worker_name) - KafkaEx.stop_worker(state.worker_name) + GenServer.stop(state.worker_name) end ### Helpers diff --git a/lib/kafka_ex/gen_consumer.ex b/lib/kafka_ex/gen_consumer.ex index dcb1517d..f7e79a70 100644 --- a/lib/kafka_ex/gen_consumer.ex +++ b/lib/kafka_ex/gen_consumer.ex @@ -537,7 +537,7 @@ defmodule KafkaEx.GenConsumer do worker_opts = Keyword.take(opts, [:uris]) {:ok, worker_name} = - KafkaEx.create_worker( + KafkaEx.start_worker( :no_name, [consumer_group: group_name] ++ worker_opts ) @@ -654,8 +654,7 @@ defmodule KafkaEx.GenConsumer do def terminate(_reason, %State{} = state) do commit(state) - Process.unlink(state.worker_name) - KafkaEx.stop_worker(state.worker_name) + GenServer.stop(state.worker_name) end # Helpers