From ff51c14a3fd4e485d1a6772bfd985c9194a3aadb Mon Sep 17 00:00:00 2001 From: Dan Swain Date: Sat, 14 Jan 2017 10:53:34 -0500 Subject: [PATCH 1/4] Fix per-worker ssl options, random->rand --- lib/kafka_ex.ex | 10 +++++----- lib/kafka_ex/config.ex | 8 +++----- lib/kafka_ex/server_0_p_9_p_0.ex | 5 ++--- test/kafka_ex/config_test.exs | 21 +++------------------ test/test_helper.exs | 4 ++-- 5 files changed, 15 insertions(+), 33 deletions(-) diff --git a/lib/kafka_ex.ex b/lib/kafka_ex.ex index 0000eaed..7623be5e 100644 --- a/lib/kafka_ex.ex +++ b/lib/kafka_ex.ex @@ -17,9 +17,9 @@ defmodule KafkaEx do @type uri() :: [{binary|char_list, number}] @type worker_init :: [worker_setting] - @type ssl_options :: [{:ssl_ca_cert_file, binary} | - {:ssl_cert_file, binary} | - {:ssl_cert_key_file, binary} | + @type ssl_options :: [{:cacertfile, binary} | + {:certfile, binary} | + {:keyfile, binary} | {:password, binary}] @type worker_setting :: {:uris, uri} | {:consumer_group, binary | :no_consumer_group} | @@ -318,8 +318,8 @@ Optional arguments(KeywordList) defaults = [ uris: Application.get_env(:kafka_ex, :brokers), consumer_group: Application.get_env(:kafka_ex, :consumer_group), - use_ssl: Application.get_env(:kafka_ex, :use_ssl, false), - ssl_options: Application.get_env(:kafka_ex, :ssl_options, []), + use_ssl: Config.use_ssl(), + ssl_options: Config.ssl_options(), ] worker_init = Keyword.merge(defaults, worker_init) diff --git a/lib/kafka_ex/config.ex b/lib/kafka_ex/config.ex index 456d7f75..1098d312 100644 --- a/lib/kafka_ex/config.ex +++ b/lib/kafka_ex/config.ex @@ -50,11 +50,9 @@ defmodule KafkaEx.Config do end defp verify_ssl_file(options, key, nil) do - # cert file not specified - raise( - ArgumentError, - message: "SSL option #{inspect key} was not set in #{inspect options}" - ) + # cert file not present - it will be up to :ssl to determine if this is ok + # given the other settings + options end defp verify_ssl_file(options, key, path) do # make sure the file is readable to us diff --git a/lib/kafka_ex/server_0_p_9_p_0.ex b/lib/kafka_ex/server_0_p_9_p_0.ex index f7c6a045..c09f48b4 100644 --- a/lib/kafka_ex/server_0_p_9_p_0.ex +++ b/lib/kafka_ex/server_0_p_9_p_0.ex @@ -3,7 +3,6 @@ defmodule KafkaEx.Server0P9P0 do Implements kafkaEx.Server behaviors for kafka 0.9.0 API. """ use KafkaEx.Server - alias KafkaEx.Config alias KafkaEx.Protocol.ConsumerMetadata alias KafkaEx.Protocol.ConsumerMetadata.Response, as: ConsumerMetadataResponse alias KafkaEx.Protocol.Heartbeat @@ -51,8 +50,8 @@ defmodule KafkaEx.Server0P9P0 do consumer_group = Keyword.get(args, :consumer_group) true = KafkaEx.valid_consumer_group?(consumer_group) - use_ssl = Config.use_ssl() - ssl_options = Config.ssl_options() + use_ssl = Keyword.get(args, :use_ssl, false) + ssl_options = Keyword.get(args, :ssl_options, []) brokers = Enum.map(uris, fn({host, port}) -> %Broker{host: host, port: port, socket: NetworkClient.create_socket(host, port, ssl_options, use_ssl)} end) sync_timeout = Keyword.get(args, :sync_timeout, Application.get_env(:kafka_ex, :sync_timeout, @sync_timeout)) diff --git a/test/kafka_ex/config_test.exs b/test/kafka_ex/config_test.exs index 81001c78..33fa6021 100644 --- a/test/kafka_ex/config_test.exs +++ b/test/kafka_ex/config_test.exs @@ -31,46 +31,31 @@ defmodule KafkaEx.ConfigTest do assert [] == Config.ssl_options() end - test "ssl_options raises an error if cacertfile is missing or invalid" do + test "ssl_options raises an error if cacertfile is invalid" do Application.put_env(:kafka_ex, :use_ssl, true) ssl_options = Application.get_env(:kafka_ex, :ssl_options) key = :cacertfile - without_file = Keyword.delete(ssl_options, key) - - Application.put_env(:kafka_ex, :ssl_options, without_file) - assert_raise(ArgumentError, ~r/not set/, &Config.ssl_options/0) - with_invalid_file = Keyword.put(ssl_options, key, "./should_not_exist") Application.put_env(:kafka_ex, :ssl_options, with_invalid_file) assert_raise(ArgumentError, ~r/could not/, &Config.ssl_options/0) end - test "ssl_options raises an error if certfile is missing or invalid" do + test "ssl_options raises an error if certfile is invalid" do Application.put_env(:kafka_ex, :use_ssl, true) ssl_options = Application.get_env(:kafka_ex, :ssl_options) key = :certfile - without_file = Keyword.delete(ssl_options, key) - - Application.put_env(:kafka_ex, :ssl_options, without_file) - assert_raise(ArgumentError, ~r/not set/, &Config.ssl_options/0) - with_invalid_file = Keyword.put(ssl_options, key, "./should_not_exist") Application.put_env(:kafka_ex, :ssl_options, with_invalid_file) assert_raise(ArgumentError, ~r/could not/, &Config.ssl_options/0) end - test "ssl_options raises an error if keyfile is missing or invalid" do + test "ssl_options raises an error if keyfile is invalid" do Application.put_env(:kafka_ex, :use_ssl, true) ssl_options = Application.get_env(:kafka_ex, :ssl_options) key = :keyfile - without_file = Keyword.delete(ssl_options, key) - - Application.put_env(:kafka_ex, :ssl_options, without_file) - assert_raise(ArgumentError, ~r/not set/, &Config.ssl_options/0) - with_invalid_file = Keyword.put(ssl_options, key, "./should_not_exist") Application.put_env(:kafka_ex, :ssl_options, with_invalid_file) assert_raise(ArgumentError, ~r/could not/, &Config.ssl_options/0) diff --git a/test/test_helper.exs b/test/test_helper.exs index ff42a582..188232f4 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -4,8 +4,8 @@ ExUnit.configure exclude: [integration: true, consumer_group: true, server_0_p_9 defmodule TestHelper do def generate_random_string(string_length \\ 20) do - :random.seed(:os.timestamp) - Enum.map(1..string_length, fn _ -> (:random.uniform * 25 + 65) |> round end) |> to_string + :rand.seed(:os.timestamp) + Enum.map(1..string_length, fn _ -> (:rand.uniform * 25 + 65) |> round end) |> to_string end # Wait for the return value of value_getter to pass the predicate condn From 6b18546c2b1dc688b8b29c9724233ecad6122e39 Mon Sep 17 00:00:00 2001 From: Dan Swain Date: Sat, 14 Jan 2017 11:08:24 -0500 Subject: [PATCH 2/4] Remove :rand.seed, clean up warning :rand no longer requires explicit seeding --- lib/kafka_ex/config.ex | 2 +- test/test_helper.exs | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/kafka_ex/config.ex b/lib/kafka_ex/config.ex index 1098d312..d43817a1 100644 --- a/lib/kafka_ex/config.ex +++ b/lib/kafka_ex/config.ex @@ -49,7 +49,7 @@ defmodule KafkaEx.Config do verify_ssl_file(options, key, Keyword.get(options, key)) end - defp verify_ssl_file(options, key, nil) do + defp verify_ssl_file(options, _key, nil) do # cert file not present - it will be up to :ssl to determine if this is ok # given the other settings options diff --git a/test/test_helper.exs b/test/test_helper.exs index 188232f4..9fa17671 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -4,7 +4,6 @@ ExUnit.configure exclude: [integration: true, consumer_group: true, server_0_p_9 defmodule TestHelper do def generate_random_string(string_length \\ 20) do - :rand.seed(:os.timestamp) Enum.map(1..string_length, fn _ -> (:rand.uniform * 25 + 65) |> round end) |> to_string end From 201ee2057da5207552d419237a150a61c0108fde Mon Sep 17 00:00:00 2001 From: Dan Swain Date: Sat, 14 Jan 2017 11:24:39 -0500 Subject: [PATCH 3/4] Back out :random->:rand changes I was just trying to clean this up but :rand is not available on Erlang 17 and we should be trying to get rid of the random topic names anyways. --- test/test_helper.exs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/test_helper.exs b/test/test_helper.exs index 9fa17671..ff42a582 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -4,7 +4,8 @@ ExUnit.configure exclude: [integration: true, consumer_group: true, server_0_p_9 defmodule TestHelper do def generate_random_string(string_length \\ 20) do - Enum.map(1..string_length, fn _ -> (:rand.uniform * 25 + 65) |> round end) |> to_string + :random.seed(:os.timestamp) + Enum.map(1..string_length, fn _ -> (:random.uniform * 25 + 65) |> round end) |> to_string end # Wait for the return value of value_getter to pass the predicate condn From fe0fe1faa7a463ca909dfa727c084df0f8c2e83b Mon Sep 17 00:00:00 2001 From: Dan Swain Date: Sat, 14 Jan 2017 15:28:16 -0500 Subject: [PATCH 4/4] Test that omitting some ssl certs is ok --- test/kafka_ex/config_test.exs | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/test/kafka_ex/config_test.exs b/test/kafka_ex/config_test.exs index 33fa6021..2a084acf 100644 --- a/test/kafka_ex/config_test.exs +++ b/test/kafka_ex/config_test.exs @@ -36,6 +36,12 @@ defmodule KafkaEx.ConfigTest do ssl_options = Application.get_env(:kafka_ex, :ssl_options) key = :cacertfile + + # the option may be omitted - it is up to :ssl to determine if this is ok + without_key = Keyword.delete(ssl_options, key) + Application.put_env(:kafka_ex, :ssl_options, without_key) + assert without_key == Config.ssl_options() + with_invalid_file = Keyword.put(ssl_options, key, "./should_not_exist") Application.put_env(:kafka_ex, :ssl_options, with_invalid_file) assert_raise(ArgumentError, ~r/could not/, &Config.ssl_options/0) @@ -46,6 +52,12 @@ defmodule KafkaEx.ConfigTest do ssl_options = Application.get_env(:kafka_ex, :ssl_options) key = :certfile + + # the option may be omitted - it is up to :ssl to determine if this is ok + without_key = Keyword.delete(ssl_options, key) + Application.put_env(:kafka_ex, :ssl_options, without_key) + assert without_key == Config.ssl_options() + with_invalid_file = Keyword.put(ssl_options, key, "./should_not_exist") Application.put_env(:kafka_ex, :ssl_options, with_invalid_file) assert_raise(ArgumentError, ~r/could not/, &Config.ssl_options/0) @@ -56,6 +68,12 @@ defmodule KafkaEx.ConfigTest do ssl_options = Application.get_env(:kafka_ex, :ssl_options) key = :keyfile + + # the option may be omitted - it is up to :ssl to determine if this is ok + without_key = Keyword.delete(ssl_options, key) + Application.put_env(:kafka_ex, :ssl_options, without_key) + assert without_key == Config.ssl_options() + with_invalid_file = Keyword.put(ssl_options, key, "./should_not_exist") Application.put_env(:kafka_ex, :ssl_options, with_invalid_file) assert_raise(ArgumentError, ~r/could not/, &Config.ssl_options/0)