Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/kafka_ex.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ defmodule KafkaEx do
alias KafkaEx.Server
alias KafkaEx.Stream

@type uri() :: [{binary|char_list, number}]
@type uri() :: [{binary|[char], number}]
@type worker_init :: [worker_setting]
@type ssl_options :: [{:cacertfile, binary} |
{:certfile, binary} |
Expand Down
2 changes: 1 addition & 1 deletion lib/kafka_ex/consumer_group/partition_assignment.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ defmodule KafkaEx.ConsumerGroup.PartitionAssignment do
@typedoc """
A function that can assign partitions.

`members` is a list of member IDs and `partitions` is a list of partitions
`members` is a list of member IDs and `partitions` is a list of partitions
that need to be assigned to a group member.

The return value must be a map with member IDs as keys and a list of
Expand Down
4 changes: 0 additions & 4 deletions lib/kafka_ex/gen_consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,6 @@ defmodule KafkaEx.GenConsumer do

defp consume(
%State{
worker_name: worker_name,
topic: topic,
partition: partition,
current_offset: offset,
Expand Down Expand Up @@ -554,9 +553,6 @@ defmodule KafkaEx.GenConsumer do
worker_name: worker_name,
topic: topic,
partition: partition,
current_offset: current_offset,
committed_offset: committed_offset,
acked_offset: acked_offset,
auto_offset_reset: auto_offset_reset
} = state
) do
Expand Down
4 changes: 2 additions & 2 deletions lib/kafka_ex/network_client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ defmodule KafkaEx.NetworkClient do
response
end

@spec format_host(binary) :: char_list | :inet.ip_address
@spec format_host(binary) :: [char] | :inet.ip_address
def format_host(host) do
case Regex.scan(~r/^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})$/, host) do
[match_data] = [[_, _, _, _, _]] -> match_data |> tl |> List.flatten |> Enum.map(&String.to_integer/1) |> List.to_tuple
_ -> to_char_list(host)
_ -> apply(String, :to_char_list, [host]) # to_char_list is deprecated from Elixir 1.3 onward

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Line is too long (max is 80, was 99).

end
end

Expand Down
6 changes: 3 additions & 3 deletions lib/kafka_ex/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ defmodule KafkaEx.Server do
def terminate(_, state) do
Logger.log(:debug, "Shutting down worker #{inspect state.worker_name}")
if state.event_pid do
GenEvent.stop(state.event_pid)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is probably dead code, there should be no use of GenEvent in Kafkaex AFAIK

:gen_event.stop(state.event_pid)
end
Enum.each(state.brokers, fn(broker) -> NetworkClient.close_socket(broker.socket) end)
end
Expand Down Expand Up @@ -501,9 +501,9 @@ defmodule KafkaEx.Server do
end

defp remove_stale_brokers(brokers, metadata_brokers) do
{brokers_to_keep, brokers_to_remove} = Enum.partition(brokers, fn(broker) ->
{brokers_to_keep, brokers_to_remove} = apply(Enum, :partition, [brokers, fn(broker) ->

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Line is too long (max is 80, was 94).

Enum.find_value(metadata_brokers, &(broker.node_id == -1 || (broker.node_id == &1.node_id) && broker.socket && Socket.info(broker.socket)))
end)
end])
case length(brokers_to_keep) do
0 -> brokers_to_remove
_ -> Enum.each(brokers_to_remove, fn(broker) ->
Expand Down
2 changes: 1 addition & 1 deletion lib/kafka_ex/server_0_p_9_p_0.ex
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ defmodule KafkaEx.Server0P9P0 do
state
) do
unless consumer_group?(state) do
raise ConnsumerGroupRequiredError, request
raise ConsumerGroupRequiredError, request
end

{broker, state} = broker_for_consumer_group_with_update(state)
Expand Down
4 changes: 4 additions & 0 deletions lib/kafka_ex/stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ defmodule KafkaEx.Stream do
{:error, __MODULE__}
end

def slice(_stream) do
{:error, __MODULE__}
end

######################################################################
# Main stream flow control

Expand Down