Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,12 @@ GenServer.start_link(KafkaEx.Config.server_impl,
)
```

Alternatively, you can call

```
KafkaEx.start_link_worker(:no_name)
```

### Retrieve kafka metadata
For all metadata

Expand Down
22 changes: 22 additions & 0 deletions lib/kafka_ex.ex
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,28 @@ defmodule KafkaEx do
}
end

@doc """
Start and link a worker outside of a supervision tree

This takes the same arguments as `create_worker/2` except that it adds

- `server_impl` - This is the GenServer that will be used for the
client genserver implementation - e.g., `KafkaEx.Server0P8P0`,
`KafkaEx.Server0P10AndLater`, `KafkaEx.New.Client`. Defaults to the value
of `KafkaEx.Config.server_impl/0` which is determined by the `kafka_version`
setting.
"""
@spec start_link_worker(atom, [
KafkaEx.worker_setting() | {:server_impl, module}
]) :: GenServer.on_start()
def start_link_worker(name, worker_init \\ []) do
{server_impl, worker_init} =
Keyword.pop(worker_init, :server_impl, Config.server_impl())

{:ok, full_worker_init} = build_worker_options(worker_init)
server_impl.start_link(full_worker_init, name)
end

@doc """
Builds options to be used with workers

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,11 @@ defmodule KafkaEx.KayrockCompatibility0p10AndLaterTest do
@moduletag :new_client
@num_partitions 10

alias KafkaEx.New.Client
alias KafkaEx.New.KafkaExAPI

setup do
{:ok, args} = KafkaEx.build_worker_options([])

{:ok, pid} = Client.start_link(args, :no_name)
{:ok, pid} =
KafkaEx.start_link_worker(:no_name, server_impl: KafkaEx.New.Client)

{:ok, %{client: pid}}
end
Expand Down
7 changes: 2 additions & 5 deletions test/integration/kayrock/compatibility_0_p_8_p_0_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,9 @@ defmodule KafkaEx.KayrockCompatibility0p8p0Test do

@topic "test0p8p0"

alias KafkaEx.New.Client

setup do
{:ok, args} = KafkaEx.build_worker_options([])

{:ok, pid} = Client.start_link(args, :no_name)
{:ok, pid} =
KafkaEx.start_link_worker(:no_name, server_impl: KafkaEx.New.Client)

{:ok, %{client: pid}}
end
Expand Down
7 changes: 2 additions & 5 deletions test/integration/kayrock/compatibility_0_p_9_p_0_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,9 @@ defmodule KafkaEx.KayrockCompatibility0p9p0Test do
alias KafkaEx.Protocol.LeaveGroup.Request, as: LeaveGroupRequest
alias KafkaEx.Protocol.SyncGroup.Request, as: SyncGroupRequest

alias KafkaEx.New.Client

setup do
{:ok, args} = KafkaEx.build_worker_options([])

{:ok, pid} = Client.start_link(args, :no_name)
{:ok, pid} =
KafkaEx.start_link_worker(:no_name, server_impl: KafkaEx.New.Client)

{:ok, %{client: pid}}
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupImplementationTest do

alias KafkaEx.ConsumerGroup
alias KafkaEx.GenConsumer
alias KafkaEx.New.Client
alias KafkaEx.Protocol.OffsetFetch

# note this topic is created by docker_up.sh
Expand Down Expand Up @@ -165,9 +164,8 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupImplementationTest do
setup do
{:ok, _} = TestPartitioner.start_link()

{:ok, client_args} = KafkaEx.build_worker_options([])

{:ok, client_pid} = Client.start_link(client_args, :no_name)
{:ok, client_pid} =
KafkaEx.start_link_worker(:no_name, server_impl: KafkaEx.New.Client)

# the client will die on its own, so don't count that
ports_before = num_open_ports()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,11 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupTest do

alias KafkaEx.Protocol, as: Proto

alias KafkaEx.New.Client
alias KafkaEx.New.KafkaExAPI

setup do
{:ok, args} = KafkaEx.build_worker_options([])

{:ok, pid} = Client.start_link(args, :no_name)
{:ok, pid} =
KafkaEx.start_link_worker(:no_name, server_impl: KafkaEx.New.Client)

{:ok, %{client: pid}}
end
Expand Down
5 changes: 2 additions & 3 deletions test/integration/kayrock/compatibility_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@ defmodule KafkaEx.KayrockCompatibilityTest do
alias KafkaEx.New.KafkaExAPI

setup do
{:ok, args} = KafkaEx.build_worker_options([])

{:ok, pid} = Client.start_link(args, :no_name)
{:ok, pid} =
KafkaEx.start_link_worker(:no_name, server_impl: KafkaEx.New.Client)

{:ok, %{client: pid}}
end
Expand Down
6 changes: 2 additions & 4 deletions test/integration/kayrock/offset_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,14 @@ defmodule KafkaEx.KayrockOffsetTest do

use ExUnit.Case

alias KafkaEx.New.Client
alias KafkaEx.Protocol.OffsetCommit
alias KafkaEx.Protocol.OffsetFetch

@moduletag :new_client

setup do
{:ok, args} = KafkaEx.build_worker_options([])

{:ok, pid} = Client.start_link(args, :no_name)
{:ok, pid} =
KafkaEx.start_link_worker(:no_name, server_impl: KafkaEx.New.Client)

{:ok, %{client: pid}}
end
Expand Down
7 changes: 2 additions & 5 deletions test/integration/kayrock/record_batch_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,11 @@ defmodule KafkaEx.KayrockRecordBatchTest do

use ExUnit.Case

alias KafkaEx.New.Client

@moduletag :new_client

setup do
{:ok, args} = KafkaEx.build_worker_options([])

{:ok, pid} = Client.start_link(args, :no_name)
{:ok, pid} =
KafkaEx.start_link_worker(:no_name, server_impl: KafkaEx.New.Client)

{:ok, %{client: pid}}
end
Expand Down
6 changes: 2 additions & 4 deletions test/integration/kayrock/timestamp_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,15 @@ defmodule KafkaEx.KayrockTimestampTest do

use ExUnit.Case

alias KafkaEx.New.Client
alias KafkaEx.TimestampNotSupportedError

require Logger

@moduletag :new_client

setup do
{:ok, args} = KafkaEx.build_worker_options([])

{:ok, pid} = Client.start_link(args, :no_name)
{:ok, pid} =
KafkaEx.start_link_worker(:no_name, server_impl: KafkaEx.New.Client)

{:ok, %{client: pid}}
end
Expand Down