diff --git a/lib/kafka_ex.ex b/lib/kafka_ex.ex index 688eef45..2fde081d 100644 --- a/lib/kafka_ex.ex +++ b/lib/kafka_ex.ex @@ -471,7 +471,18 @@ defmodule KafkaEx do } end - defp build_worker_options(worker_init) do + @doc """ + Builds options to be used with workers + + Merges the given options with defaults from the application env config. + Returns p{:error, :invalid_consumer_options}` if the consumer group + configuation is invalid, and `{:ok, merged_options}` otherwise. + + Note this happens automatically when using `KafkaEx.create_worker`. + """ + @spec build_worker_options(worker_init) :: + {:ok, worker_init} | {:error, :invalid_consumer_group} + def build_worker_options(worker_init) do defaults = [ uris: Application.get_env(:kafka_ex, :brokers), consumer_group: Application.get_env(:kafka_ex, :consumer_group), diff --git a/lib/kafka_ex/server.ex b/lib/kafka_ex/server.ex index 7cbcdc23..444d79fc 100644 --- a/lib/kafka_ex/server.ex +++ b/lib/kafka_ex/server.ex @@ -3,6 +3,7 @@ defmodule KafkaEx.Server do Defines the KafkaEx.Server behavior that all Kafka API servers must implement, this module also provides some common callback functions that are injected into the servers that `use` it. """ + alias KafkaEx.NetworkClient alias KafkaEx.Protocol.ConsumerMetadata alias KafkaEx.Protocol.Heartbeat.Request, as: HeartbeatRequest alias KafkaEx.Protocol.JoinGroup.Request, as: JoinGroupRequest @@ -21,6 +22,9 @@ defmodule KafkaEx.Server do defmodule State do @moduledoc false + alias KafkaEx.Protocol.Metadata.Response, as: MetadataResponse + alias KafkaEx.Protocol.Metadata.Broker + defstruct( metadata: %Metadata.Response{}, brokers: [], @@ -47,6 +51,21 @@ defmodule KafkaEx.Server do ssl_options: KafkaEx.ssl_options, use_ssl: boolean } + + @spec increment_correlation_id(t) :: t + def increment_correlation_id(state = %State{correlation_id: cid}) do + %{state | correlation_id: cid + 1} + end + + @spec broker_for_partition(t, binary, integer) :: Broker.t | nil + def broker_for_partition(state, topic, partition) do + MetadataResponse.broker_for_topic( + state.metadata, + state.brokers, + topic, + partition + ) + end end @callback kafka_server_init(args :: [term]) :: @@ -376,6 +395,110 @@ defmodule KafkaEx.Server do kafka_server_metadata: 2, kafka_server_update_metadata: 1, ] + defp kafka_common_init(args, name) do + use_ssl = Keyword.get(args, :use_ssl, false) + ssl_options = Keyword.get(args, :ssl_options, []) + + uris = Keyword.get(args, :uris, []) + metadata_update_interval = Keyword.get( + args, + :metadata_update_interval, + @metadata_update_interval + ) + + brokers = for {host, port} <- uris do + connect_broker(host, port, ssl_options, use_ssl) + end + + {correlation_id, metadata} = retrieve_metadata( + brokers, + 0, + config_sync_timeout() + ) + + state = %State{ + metadata: metadata, + brokers: brokers, + correlation_id: correlation_id, + metadata_update_interval: metadata_update_interval, + ssl_options: ssl_options, + use_ssl: use_ssl, + worker_name: name + } + + state = update_metadata(state) + {:ok, _} = :timer.send_interval( + state.metadata_update_interval, + :update_metadata + ) + + state + end + + defp connect_broker(host, port, ssl_opts, use_ssl) do + %Broker{ + host: host, + port: port, + socket: NetworkClient.create_socket(host, port, ssl_opts, use_ssl) + } + end + + defp client_request(request, state) do + %{ + request | + client_id: @client_id, + correlation_id: state.correlation_id + } + end + + # gets the broker for a given partition, updating metadata if necessary + # returns {broker, maybe_updated_state} + defp broker_for_partition_with_update(state, topic, partition) do + case State.broker_for_partition(state, topic, partition) do + nil -> + updated_state = update_metadata(state) + { + State.broker_for_partition(updated_state, topic, partition), + updated_state + } + broker -> + {broker, state} + end + end + + # assumes module.create_request(request) and module.parse_response + # both work + defp network_request(request, module, state) do + {broker, updated_state} = broker_for_partition_with_update( + state, + request.topic, + request.partition + ) + + case broker do + nil -> + Logger.error(fn -> + "Leader for topic #{request.topic} is not available" + end) + {{:error, :topic_not_found}, updated_state} + _ -> + wire_request = request + |> client_request(updated_state) + |> module.create_request + + response = broker + |> NetworkClient.send_sync_request( + wire_request, + config_sync_timeout() + ) + |> module.parse_response + + state_out = State.increment_correlation_id(updated_state) + + {response, state_out} + end + end + defp remove_stale_brokers(brokers, metadata_brokers) do {brokers_to_keep, brokers_to_remove} = Enum.partition(brokers, fn(broker) -> Enum.find_value(metadata_brokers, &(broker.node_id == -1 || (broker.node_id == &1.node_id) && broker.socket && Socket.info(broker.socket))) diff --git a/lib/kafka_ex/server_0_p_8_p_0.ex b/lib/kafka_ex/server_0_p_8_p_0.ex index b6745793..a6c0d3ff 100644 --- a/lib/kafka_ex/server_0_p_8_p_0.ex +++ b/lib/kafka_ex/server_0_p_8_p_0.ex @@ -18,25 +18,21 @@ defmodule KafkaEx.Server0P8P0 do use KafkaEx.Server alias KafkaEx.Protocol.Fetch - alias KafkaEx.Protocol.Fetch.Request, as: FetchRequest - alias KafkaEx.Protocol.Metadata.Broker - alias KafkaEx.Protocol.Metadata.Response, as: MetadataResponse - alias KafkaEx.Server.State - alias KafkaEx.NetworkClient def kafka_server_init([args]) do kafka_server_init([args, self()]) end def kafka_server_init([args, name]) do - uris = Keyword.get(args, :uris, []) - metadata_update_interval = Keyword.get(args, :metadata_update_interval, @metadata_update_interval) - brokers = Enum.map(uris, fn({host, port}) -> %Broker{host: host, port: port, socket: NetworkClient.create_socket(host, port)} end) - {correlation_id, metadata} = retrieve_metadata(brokers, 0, config_sync_timeout()) - state = %State{metadata: metadata, brokers: brokers, correlation_id: correlation_id, metadata_update_interval: metadata_update_interval, worker_name: name} - # Get the initial "real" broker list and start a regular refresh cycle. - state = update_metadata(state) - {:ok, _} = :timer.send_interval(state.metadata_update_interval, :update_metadata) + # warn if ssl is configured + if Keyword.get(args, :use_ssl) do + Logger.warn(fn -> + "KafkaEx is being configured to use ssl with a broker version that " <> + "does not support ssl" + end) + end + + state = kafka_common_init(args, name) {:ok, state} end @@ -67,28 +63,10 @@ defmodule KafkaEx.Server0P8P0 do def kafka_server_heartbeat(_, _, _state), do: raise "Heartbeat is not supported in 0.8.0 version of kafka" def kafka_server_update_consumer_metadata(_state), do: raise "Consumer Group Metadata is not supported in 0.8.0 version of kafka" - defp fetch(fetch_request, state) do - fetch_data = Fetch.create_request(%FetchRequest{ - fetch_request | - client_id: @client_id, - correlation_id: state.correlation_id, - }) - {broker, state} = case MetadataResponse.broker_for_topic(state.metadata, state.brokers, fetch_request.topic, fetch_request.partition) do - nil -> - updated_state = update_metadata(state) - {MetadataResponse.broker_for_topic(state.metadata, state.brokers, fetch_request.topic, fetch_request.partition), updated_state} - broker -> {broker, state} - end - - case broker do - nil -> - Logger.log(:error, "Leader for topic #{fetch_request.topic} is not available") - {:topic_not_found, state} - _ -> - response = broker - |> NetworkClient.send_sync_request(fetch_data, config_sync_timeout()) - |> Fetch.parse_response - {response, %{state | correlation_id: state.correlation_id + 1}} + defp fetch(request, state) do + case network_request(request, Fetch, state) do + {{:error, error}, state_out} -> {error, state_out} + {response, state_out} -> {response, state_out} end end end diff --git a/scripts/ci_tests.sh b/scripts/ci_tests.sh index a853c67b..48dd5b95 100755 --- a/scripts/ci_tests.sh +++ b/scripts/ci_tests.sh @@ -25,5 +25,5 @@ then echo "First tests passed, skipping repeat" else echo "Repeating tests" - mix "$TEST_COMMAND" --include integration --include consumer_group --include server_0_p_9_p_0 + mix "$TEST_COMMAND" --include integration --include consumer_group --include server_0_p_9_p_0 --include server_0_p_8_p_0 fi diff --git a/scripts/docker_up.sh b/scripts/docker_up.sh index 953fb0e2..5ae17653 100755 --- a/scripts/docker_up.sh +++ b/scripts/docker_up.sh @@ -48,4 +48,4 @@ done docker-compose up -d # create topics needed for testing -docker-compose exec kafka3 /bin/bash -c "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 KAFKA_PORT=9094 KAFKA_CREATE_TOPICS=consumer_group_implementation_test:4:2 create-topics.sh" +docker-compose exec kafka3 /bin/bash -c "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 KAFKA_PORT=9094 KAFKA_CREATE_TOPICS=consumer_group_implementation_test:4:2,test0p8p0:4:2 create-topics.sh" diff --git a/test/integration/server0_p_8_p_0_test.exs b/test/integration/server0_p_8_p_0_test.exs new file mode 100644 index 00000000..a2cd5b26 --- /dev/null +++ b/test/integration/server0_p_8_p_0_test.exs @@ -0,0 +1,56 @@ +defmodule KafkaEx.Server0P8P0.Test do + use ExUnit.Case + import TestHelper + + @moduletag :server_0_p_8_p_0 + + alias KafkaEx.Server0P8P0, as: Server + + @topic "test0p8p0" + + setup do + {:ok, args} = KafkaEx.build_worker_options([]) + {:ok, worker} = Server.start_link(args, :no_name) + + # we don't want to crash if the worker crashes + Process.unlink(worker) + + on_exit fn -> + if Process.alive?(worker) do + Process.exit(worker, :normal) + end + end + + {:ok, [worker: worker]} + end + + test "can produce and fetch a message", %{worker: worker}do + now = :erlang.monotonic_time + msg = "test message #{now}" + partition = 0 + :ok = KafkaEx.produce(@topic, partition, msg, worker_name: worker) + + wait_for(fn -> + [got] = KafkaEx.fetch( + @topic, + partition, + worker_name: worker, + offset: 1, + auto_commit: false + ) + [got_partition] = got.partitions + Enum.any?(got_partition.message_set, fn(m) -> m.value == msg end) + end) + end + + test "when the partition is not found", %{worker: worker} do + partition = 42 + assert :topic_not_found == KafkaEx.fetch( + @topic, + partition, + worker_name: worker, + offset: 1, + auto_commit: false + ) + end +end