Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions lib/kafka_ex.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ defmodule KafkaEx do

@type uri() :: [{binary|char_list, number}]
@type worker_init :: [worker_setting]
@type ssl_options :: [{:ssl_ca_cert_file, binary} |
{:ssl_cert_file, binary} |
{:ssl_cert_key_file, binary} |
@type ssl_options :: [{:cacertfile, binary} |
{:certfile, binary} |
{:keyfile, binary} |
{:password, binary}]
@type worker_setting :: {:uris, uri} |
{:consumer_group, binary | :no_consumer_group} |
Expand Down Expand Up @@ -318,8 +318,8 @@ Optional arguments(KeywordList)
defaults = [
uris: Application.get_env(:kafka_ex, :brokers),
consumer_group: Application.get_env(:kafka_ex, :consumer_group),
use_ssl: Application.get_env(:kafka_ex, :use_ssl, false),
ssl_options: Application.get_env(:kafka_ex, :ssl_options, []),
use_ssl: Config.use_ssl(),
ssl_options: Config.ssl_options(),
]

worker_init = Keyword.merge(defaults, worker_init)
Expand Down
10 changes: 4 additions & 6 deletions lib/kafka_ex/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,10 @@ defmodule KafkaEx.Config do
verify_ssl_file(options, key, Keyword.get(options, key))
end

defp verify_ssl_file(options, key, nil) do
# cert file not specified
raise(
ArgumentError,
message: "SSL option #{inspect key} was not set in #{inspect options}"
)
defp verify_ssl_file(options, _key, nil) do
# cert file not present - it will be up to :ssl to determine if this is ok
# given the other settings
options
end
defp verify_ssl_file(options, key, path) do
# make sure the file is readable to us
Expand Down
5 changes: 2 additions & 3 deletions lib/kafka_ex/server_0_p_9_p_0.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ defmodule KafkaEx.Server0P9P0 do
Implements kafkaEx.Server behaviors for kafka 0.9.0 API.
"""
use KafkaEx.Server
alias KafkaEx.Config
alias KafkaEx.Protocol.ConsumerMetadata
alias KafkaEx.Protocol.ConsumerMetadata.Response, as: ConsumerMetadataResponse
alias KafkaEx.Protocol.Heartbeat
Expand Down Expand Up @@ -51,8 +50,8 @@ defmodule KafkaEx.Server0P9P0 do
consumer_group = Keyword.get(args, :consumer_group)
true = KafkaEx.valid_consumer_group?(consumer_group)

use_ssl = Config.use_ssl()
ssl_options = Config.ssl_options()
use_ssl = Keyword.get(args, :use_ssl, false)
ssl_options = Keyword.get(args, :ssl_options, [])

brokers = Enum.map(uris, fn({host, port}) -> %Broker{host: host, port: port, socket: NetworkClient.create_socket(host, port, ssl_options, use_ssl)} end)
sync_timeout = Keyword.get(args, :sync_timeout, Application.get_env(:kafka_ex, :sync_timeout, @sync_timeout))
Expand Down
27 changes: 15 additions & 12 deletions test/kafka_ex/config_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -31,45 +31,48 @@ defmodule KafkaEx.ConfigTest do
assert [] == Config.ssl_options()
end

test "ssl_options raises an error if cacertfile is missing or invalid" do
test "ssl_options raises an error if cacertfile is invalid" do
Application.put_env(:kafka_ex, :use_ssl, true)
ssl_options = Application.get_env(:kafka_ex, :ssl_options)

key = :cacertfile
without_file = Keyword.delete(ssl_options, key)

Application.put_env(:kafka_ex, :ssl_options, without_file)
assert_raise(ArgumentError, ~r/not set/, &Config.ssl_options/0)
# the option may be omitted - it is up to :ssl to determine if this is ok
without_key = Keyword.delete(ssl_options, key)
Application.put_env(:kafka_ex, :ssl_options, without_key)
assert without_key == Config.ssl_options()

with_invalid_file = Keyword.put(ssl_options, key, "./should_not_exist")
Application.put_env(:kafka_ex, :ssl_options, with_invalid_file)
assert_raise(ArgumentError, ~r/could not/, &Config.ssl_options/0)
end

test "ssl_options raises an error if certfile is missing or invalid" do
test "ssl_options raises an error if certfile is invalid" do
Application.put_env(:kafka_ex, :use_ssl, true)
ssl_options = Application.get_env(:kafka_ex, :ssl_options)

key = :certfile
without_file = Keyword.delete(ssl_options, key)

Application.put_env(:kafka_ex, :ssl_options, without_file)
assert_raise(ArgumentError, ~r/not set/, &Config.ssl_options/0)
# the option may be omitted - it is up to :ssl to determine if this is ok
without_key = Keyword.delete(ssl_options, key)
Application.put_env(:kafka_ex, :ssl_options, without_key)
assert without_key == Config.ssl_options()

with_invalid_file = Keyword.put(ssl_options, key, "./should_not_exist")
Application.put_env(:kafka_ex, :ssl_options, with_invalid_file)
assert_raise(ArgumentError, ~r/could not/, &Config.ssl_options/0)
end

test "ssl_options raises an error if keyfile is missing or invalid" do
test "ssl_options raises an error if keyfile is invalid" do
Application.put_env(:kafka_ex, :use_ssl, true)
ssl_options = Application.get_env(:kafka_ex, :ssl_options)

key = :keyfile
without_file = Keyword.delete(ssl_options, key)

Application.put_env(:kafka_ex, :ssl_options, without_file)
assert_raise(ArgumentError, ~r/not set/, &Config.ssl_options/0)
# the option may be omitted - it is up to :ssl to determine if this is ok
without_key = Keyword.delete(ssl_options, key)
Application.put_env(:kafka_ex, :ssl_options, without_key)
assert without_key == Config.ssl_options()

with_invalid_file = Keyword.put(ssl_options, key, "./should_not_exist")
Application.put_env(:kafka_ex, :ssl_options, with_invalid_file)
Expand Down