Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 2 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,19 +149,8 @@ iex> KafkaEx.create_worker(:no_name) # indicates to the server process not to na

### Use KafkaEx with a pooling library

Note that KafkaEx has a supervisor to manage its workers. If you are using Poolboy or a similar
library, you will want to manually create a worker so that it is not supervised by `KafkaEx.Supervisor`.
To do this, you will need to call:

```elixir
GenServer.start_link(KafkaEx.Config.server_impl,
[
[uris: KafkaEx.Config.brokers(),
consumer_group: Application.get_env(:kafka_ex, :consumer_group)],
:no_name
]
)
```
Note that KafkaEx has a supervisor to manage its workers started with `KafkaEx.create_worker`. If you are using Poolboy or a similar
library, you will want to manually create a worker with `KafkaEx.start_worker` so that it is not supervised by `KafkaEx.Supervisor`.

### Retrieve kafka metadata
For all metadata
Expand Down
29 changes: 29 additions & 0 deletions lib/kafka_ex.ex
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,35 @@ defmodule KafkaEx do
end
end

@doc """
start_worker start_link KafkaEx workers. Arguments are the same as `KafkaEx.create_worker`.
The only difference is that start_worker starts worker without supervisor and links it with current process

## Example

```elixir
iex> KafkaEx.stop_worker(:pr) # where :pr is the name of the worker created
{:ok, #PID<0.171.0>}
iex> KafkaEx.stop_worker(:pr, uris: [{"localhost", 9092}])
{:ok, #PID<0.172.0>}
iex> KafkaEx.stop_worker(:pr, [uris: [{"localhost", 9092}], consumer_group: "foo"])
{:ok, #PID<0.173.0>}
iex> KafkaEx.stop_worker(:pr, consumer_group: nil)
{:error, :invalid_consumer_group}
```
"""
@spec start_worker(atom, KafkaEx.worker_init()) ::
GenServer.on_start()
def start_worker(name, worker_init \\ []) do
case build_worker_options(worker_init) do
{:ok, worker_init} ->
apply(KafkaEx.Config.server_impl, :start_link, [worker_init, name])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure about this. Even without even discussing whether we want to break up the supervision tree like this, I think it is going to cause a lot of confusion when people report these sorts of supervision tree errors, and we have to determine which worker creation function is being used, and deal with supervision trees behaving differently based on that.

I'm certainly open to changing the supervision tree, but we should probably chain it over entirely, and we should also do something to verify that this fixes the issue that you saw.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, this is a very big change. I don't really love the way our supervision tree works, but this would break a lot of people's code.


{:error, error} ->
{:error, error}
end
end

@doc """
Stop a worker created with create_worker/2

Expand Down
5 changes: 2 additions & 3 deletions lib/kafka_ex/consumer_group/manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ defmodule KafkaEx.ConsumerGroup.Manager do
worker_opts = Keyword.take(opts, [:uris])

{:ok, worker_name} =
KafkaEx.create_worker(
KafkaEx.start_worker(
:no_name,
[consumer_group: group_name] ++ worker_opts
)
Expand Down Expand Up @@ -199,8 +199,7 @@ defmodule KafkaEx.ConsumerGroup.Manager do

def terminate(_reason, %State{} = state) do
{:ok, _state} = leave(state)
Process.unlink(state.worker_name)
KafkaEx.stop_worker(state.worker_name)
GenServer.stop(state.worker_name)
end

### Helpers
Expand Down
5 changes: 2 additions & 3 deletions lib/kafka_ex/gen_consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ defmodule KafkaEx.GenConsumer do
worker_opts = Keyword.take(opts, [:uris])

{:ok, worker_name} =
KafkaEx.create_worker(
KafkaEx.start_worker(
:no_name,
[consumer_group: group_name] ++ worker_opts
)
Expand Down Expand Up @@ -654,8 +654,7 @@ defmodule KafkaEx.GenConsumer do

def terminate(_reason, %State{} = state) do
commit(state)
Process.unlink(state.worker_name)
KafkaEx.stop_worker(state.worker_name)
GenServer.stop(state.worker_name)
end

# Helpers
Expand Down