diff --git a/lib/kafka_ex.ex b/lib/kafka_ex.ex index 0000eaed..7623be5e 100644 --- a/lib/kafka_ex.ex +++ b/lib/kafka_ex.ex @@ -17,9 +17,9 @@ defmodule KafkaEx do @type uri() :: [{binary|char_list, number}] @type worker_init :: [worker_setting] - @type ssl_options :: [{:ssl_ca_cert_file, binary} | - {:ssl_cert_file, binary} | - {:ssl_cert_key_file, binary} | + @type ssl_options :: [{:cacertfile, binary} | + {:certfile, binary} | + {:keyfile, binary} | {:password, binary}] @type worker_setting :: {:uris, uri} | {:consumer_group, binary | :no_consumer_group} | @@ -318,8 +318,8 @@ Optional arguments(KeywordList) defaults = [ uris: Application.get_env(:kafka_ex, :brokers), consumer_group: Application.get_env(:kafka_ex, :consumer_group), - use_ssl: Application.get_env(:kafka_ex, :use_ssl, false), - ssl_options: Application.get_env(:kafka_ex, :ssl_options, []), + use_ssl: Config.use_ssl(), + ssl_options: Config.ssl_options(), ] worker_init = Keyword.merge(defaults, worker_init) diff --git a/lib/kafka_ex/config.ex b/lib/kafka_ex/config.ex index 456d7f75..d43817a1 100644 --- a/lib/kafka_ex/config.ex +++ b/lib/kafka_ex/config.ex @@ -49,12 +49,10 @@ defmodule KafkaEx.Config do verify_ssl_file(options, key, Keyword.get(options, key)) end - defp verify_ssl_file(options, key, nil) do - # cert file not specified - raise( - ArgumentError, - message: "SSL option #{inspect key} was not set in #{inspect options}" - ) + defp verify_ssl_file(options, _key, nil) do + # cert file not present - it will be up to :ssl to determine if this is ok + # given the other settings + options end defp verify_ssl_file(options, key, path) do # make sure the file is readable to us diff --git a/lib/kafka_ex/server_0_p_9_p_0.ex b/lib/kafka_ex/server_0_p_9_p_0.ex index f7c6a045..c09f48b4 100644 --- a/lib/kafka_ex/server_0_p_9_p_0.ex +++ b/lib/kafka_ex/server_0_p_9_p_0.ex @@ -3,7 +3,6 @@ defmodule KafkaEx.Server0P9P0 do Implements kafkaEx.Server behaviors for kafka 0.9.0 API. """ use KafkaEx.Server - alias KafkaEx.Config alias KafkaEx.Protocol.ConsumerMetadata alias KafkaEx.Protocol.ConsumerMetadata.Response, as: ConsumerMetadataResponse alias KafkaEx.Protocol.Heartbeat @@ -51,8 +50,8 @@ defmodule KafkaEx.Server0P9P0 do consumer_group = Keyword.get(args, :consumer_group) true = KafkaEx.valid_consumer_group?(consumer_group) - use_ssl = Config.use_ssl() - ssl_options = Config.ssl_options() + use_ssl = Keyword.get(args, :use_ssl, false) + ssl_options = Keyword.get(args, :ssl_options, []) brokers = Enum.map(uris, fn({host, port}) -> %Broker{host: host, port: port, socket: NetworkClient.create_socket(host, port, ssl_options, use_ssl)} end) sync_timeout = Keyword.get(args, :sync_timeout, Application.get_env(:kafka_ex, :sync_timeout, @sync_timeout)) diff --git a/test/kafka_ex/config_test.exs b/test/kafka_ex/config_test.exs index 81001c78..2a084acf 100644 --- a/test/kafka_ex/config_test.exs +++ b/test/kafka_ex/config_test.exs @@ -31,45 +31,48 @@ defmodule KafkaEx.ConfigTest do assert [] == Config.ssl_options() end - test "ssl_options raises an error if cacertfile is missing or invalid" do + test "ssl_options raises an error if cacertfile is invalid" do Application.put_env(:kafka_ex, :use_ssl, true) ssl_options = Application.get_env(:kafka_ex, :ssl_options) key = :cacertfile - without_file = Keyword.delete(ssl_options, key) - Application.put_env(:kafka_ex, :ssl_options, without_file) - assert_raise(ArgumentError, ~r/not set/, &Config.ssl_options/0) + # the option may be omitted - it is up to :ssl to determine if this is ok + without_key = Keyword.delete(ssl_options, key) + Application.put_env(:kafka_ex, :ssl_options, without_key) + assert without_key == Config.ssl_options() with_invalid_file = Keyword.put(ssl_options, key, "./should_not_exist") Application.put_env(:kafka_ex, :ssl_options, with_invalid_file) assert_raise(ArgumentError, ~r/could not/, &Config.ssl_options/0) end - test "ssl_options raises an error if certfile is missing or invalid" do + test "ssl_options raises an error if certfile is invalid" do Application.put_env(:kafka_ex, :use_ssl, true) ssl_options = Application.get_env(:kafka_ex, :ssl_options) key = :certfile - without_file = Keyword.delete(ssl_options, key) - Application.put_env(:kafka_ex, :ssl_options, without_file) - assert_raise(ArgumentError, ~r/not set/, &Config.ssl_options/0) + # the option may be omitted - it is up to :ssl to determine if this is ok + without_key = Keyword.delete(ssl_options, key) + Application.put_env(:kafka_ex, :ssl_options, without_key) + assert without_key == Config.ssl_options() with_invalid_file = Keyword.put(ssl_options, key, "./should_not_exist") Application.put_env(:kafka_ex, :ssl_options, with_invalid_file) assert_raise(ArgumentError, ~r/could not/, &Config.ssl_options/0) end - test "ssl_options raises an error if keyfile is missing or invalid" do + test "ssl_options raises an error if keyfile is invalid" do Application.put_env(:kafka_ex, :use_ssl, true) ssl_options = Application.get_env(:kafka_ex, :ssl_options) key = :keyfile - without_file = Keyword.delete(ssl_options, key) - Application.put_env(:kafka_ex, :ssl_options, without_file) - assert_raise(ArgumentError, ~r/not set/, &Config.ssl_options/0) + # the option may be omitted - it is up to :ssl to determine if this is ok + without_key = Keyword.delete(ssl_options, key) + Application.put_env(:kafka_ex, :ssl_options, without_key) + assert without_key == Config.ssl_options() with_invalid_file = Keyword.put(ssl_options, key, "./should_not_exist") Application.put_env(:kafka_ex, :ssl_options, with_invalid_file)