diff --git a/lib/kafka_ex.ex b/lib/kafka_ex.ex index 2fde081d..97c7111d 100644 --- a/lib/kafka_ex.ex +++ b/lib/kafka_ex.ex @@ -25,7 +25,7 @@ defmodule KafkaEx do alias KafkaEx.Server alias KafkaEx.Stream - @type uri() :: [{binary|char_list, number}] + @type uri() :: [{binary|[char], number}] @type worker_init :: [worker_setting] @type ssl_options :: [{:cacertfile, binary} | {:certfile, binary} | diff --git a/lib/kafka_ex/consumer_group/partition_assignment.ex b/lib/kafka_ex/consumer_group/partition_assignment.ex index d2c53ecc..b082710a 100644 --- a/lib/kafka_ex/consumer_group/partition_assignment.ex +++ b/lib/kafka_ex/consumer_group/partition_assignment.ex @@ -32,7 +32,7 @@ defmodule KafkaEx.ConsumerGroup.PartitionAssignment do @typedoc """ A function that can assign partitions. - `members` is a list of member IDs and `partitions` is a list of partitions + `members` is a list of member IDs and `partitions` is a list of partitions that need to be assigned to a group member. The return value must be a map with member IDs as keys and a list of diff --git a/lib/kafka_ex/gen_consumer.ex b/lib/kafka_ex/gen_consumer.ex index 4ccfe38b..df390c5a 100644 --- a/lib/kafka_ex/gen_consumer.ex +++ b/lib/kafka_ex/gen_consumer.ex @@ -492,7 +492,6 @@ defmodule KafkaEx.GenConsumer do defp consume( %State{ - worker_name: worker_name, topic: topic, partition: partition, current_offset: offset, @@ -554,9 +553,6 @@ defmodule KafkaEx.GenConsumer do worker_name: worker_name, topic: topic, partition: partition, - current_offset: current_offset, - committed_offset: committed_offset, - acked_offset: acked_offset, auto_offset_reset: auto_offset_reset } = state ) do diff --git a/lib/kafka_ex/network_client.ex b/lib/kafka_ex/network_client.ex index 6f82515c..9ea1a89a 100644 --- a/lib/kafka_ex/network_client.ex +++ b/lib/kafka_ex/network_client.ex @@ -51,11 +51,11 @@ defmodule KafkaEx.NetworkClient do response end - @spec format_host(binary) :: char_list | :inet.ip_address + @spec format_host(binary) :: [char] | :inet.ip_address def format_host(host) do case Regex.scan(~r/^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})$/, host) do [match_data] = [[_, _, _, _, _]] -> match_data |> tl |> List.flatten |> Enum.map(&String.to_integer/1) |> List.to_tuple - _ -> to_char_list(host) + _ -> apply(String, :to_char_list, [host]) # to_char_list is deprecated from Elixir 1.3 onward end end diff --git a/lib/kafka_ex/server.ex b/lib/kafka_ex/server.ex index c2faae38..34b96dcc 100644 --- a/lib/kafka_ex/server.ex +++ b/lib/kafka_ex/server.ex @@ -275,7 +275,7 @@ defmodule KafkaEx.Server do def terminate(_, state) do Logger.log(:debug, "Shutting down worker #{inspect state.worker_name}") if state.event_pid do - GenEvent.stop(state.event_pid) + :gen_event.stop(state.event_pid) end Enum.each(state.brokers, fn(broker) -> NetworkClient.close_socket(broker.socket) end) end @@ -501,9 +501,9 @@ defmodule KafkaEx.Server do end defp remove_stale_brokers(brokers, metadata_brokers) do - {brokers_to_keep, brokers_to_remove} = Enum.partition(brokers, fn(broker) -> + {brokers_to_keep, brokers_to_remove} = apply(Enum, :partition, [brokers, fn(broker) -> Enum.find_value(metadata_brokers, &(broker.node_id == -1 || (broker.node_id == &1.node_id) && broker.socket && Socket.info(broker.socket))) - end) + end]) case length(brokers_to_keep) do 0 -> brokers_to_remove _ -> Enum.each(brokers_to_remove, fn(broker) -> diff --git a/lib/kafka_ex/server_0_p_9_p_0.ex b/lib/kafka_ex/server_0_p_9_p_0.ex index 3b9bee5c..e55cce1e 100644 --- a/lib/kafka_ex/server_0_p_9_p_0.ex +++ b/lib/kafka_ex/server_0_p_9_p_0.ex @@ -129,7 +129,7 @@ defmodule KafkaEx.Server0P9P0 do state ) do unless consumer_group?(state) do - raise ConnsumerGroupRequiredError, request + raise ConsumerGroupRequiredError, request end {broker, state} = broker_for_consumer_group_with_update(state) diff --git a/lib/kafka_ex/stream.ex b/lib/kafka_ex/stream.ex index e4effd78..cdf1cf57 100644 --- a/lib/kafka_ex/stream.ex +++ b/lib/kafka_ex/stream.ex @@ -44,6 +44,10 @@ defmodule KafkaEx.Stream do {:error, __MODULE__} end + def slice(_stream) do + {:error, __MODULE__} + end + ###################################################################### # Main stream flow control