Skip to content

Commit

Permalink
Appliy 'mix format' on the project
Browse files Browse the repository at this point in the history
  • Loading branch information
jbruggem committed Nov 2, 2020
1 parent fbefb5b commit 518925e
Show file tree
Hide file tree
Showing 14 changed files with 175 additions and 82 deletions.
3 changes: 1 addition & 2 deletions lib/kafka_ex/consumer_group/heartbeat.ex
Expand Up @@ -81,9 +81,8 @@ defmodule KafkaEx.ConsumerGroup.Heartbeat do
{:stop, {:shutdown, {:error, error_code}}, state}

{:error, reason} ->
Logger.warn("Heartbeat failed, got error reason #{inspect reason}")
Logger.warn("Heartbeat failed, got error reason #{inspect(reason)}")
{:stop, {:shutdown, {:error, reason}}, state}

end
end
end
4 changes: 3 additions & 1 deletion lib/kafka_ex/new/broker.ex
Expand Up @@ -21,6 +21,8 @@ defmodule KafkaEx.New.Broker do
broker.socket != nil && Socket.open?(broker.socket)
end

def has_socket?(%__MODULE__{socket: %Socket{socket: socket}}, socket), do: true
def has_socket?(%__MODULE__{socket: %Socket{socket: socket}}, socket),
do: true

def has_socket?(_, _), do: false
end
4 changes: 3 additions & 1 deletion lib/kafka_ex/protocol/common.ex
Expand Up @@ -22,7 +22,9 @@ defmodule KafkaEx.Protocol.Common do
mod
) do
struct_module = Module.concat(mod, Response)
{partitions, topics_data} = mod.parse_partitions(partitions_size, rest, [], topic)

{partitions, topics_data} =
mod.parse_partitions(partitions_size, rest, [], topic)

[
%{
Expand Down
6 changes: 5 additions & 1 deletion lib/kafka_ex/protocol/create_topics.ex
Expand Up @@ -54,7 +54,11 @@ defmodule KafkaEx.Protocol.CreateTopics do
defmodule Request do
@moduledoc false
defstruct create_topic_requests: nil, timeout: nil
@type t :: %Request{create_topic_requests: [TopicRequest.t()], timeout: integer}

@type t :: %Request{
create_topic_requests: [TopicRequest.t()],
timeout: integer
}
end

defmodule TopicError do
Expand Down
16 changes: 9 additions & 7 deletions lib/kafka_ex/protocol/join_group.ex
Expand Up @@ -29,13 +29,15 @@ defmodule KafkaEx.Protocol.JoinGroup do
member_id: nil,
members: []

@type t :: %Response{
error_code: atom | integer,
generation_id: integer,
leader_id: binary,
member_id: binary,
members: [binary]
} | {:error, atom}
@type t ::
%Response{
error_code: atom | integer,
generation_id: integer,
leader_id: binary,
member_id: binary,
members: [binary]
}
| {:error, atom}

def leader?(%__MODULE__{member_id: member_id, leader_id: leader_id}) do
member_id == leader_id
Expand Down
8 changes: 5 additions & 3 deletions lib/kafka_ex/protocol/leave_group.ex
Expand Up @@ -14,9 +14,11 @@ defmodule KafkaEx.Protocol.LeaveGroup do

defstruct error_code: nil

@type t :: %Response{
error_code: atom | integer
} | {:error, atom}
@type t ::
%Response{
error_code: atom | integer
}
| {:error, atom}
end

@spec create_request(integer, binary, Request.t()) :: binary
Expand Down
3 changes: 2 additions & 1 deletion lib/kafka_ex/protocol/produce.ex
Expand Up @@ -103,7 +103,8 @@ defmodule KafkaEx.Protocol.Produce do

{message, msize} = create_message(compressed_message_set, nil, attribute)

{[<<0::64-signed>>, <<msize::32-signed>>, message], @int64_size + @int32_size + msize}
{[<<0::64-signed>>, <<msize::32-signed>>, message],
@int64_size + @int32_size + msize}
end

defp create_message_set_uncompressed([
Expand Down
37 changes: 23 additions & 14 deletions lib/kafka_ex/server.ex
Expand Up @@ -401,7 +401,11 @@ defmodule KafkaEx.Server do

produce_request_data =
try do
Produce.create_request(correlation_id, Config.client_id(), produce_request)
Produce.create_request(
correlation_id,
Config.client_id(),
produce_request
)
rescue
e in FunctionClauseError -> nil
end
Expand Down Expand Up @@ -762,16 +766,18 @@ defmodule KafkaEx.Server do

check_brokers_sockets!(brokers)

{correlation_id, metadata} = try do
retrieve_metadata(
brokers,
0,
config_sync_timeout()
)
rescue e ->
sleep_for_reconnect()
Kernel.reraise(e, System.stacktrace())
end
{correlation_id, metadata} =
try do
retrieve_metadata(
brokers,
0,
config_sync_timeout()
)
rescue
e ->
sleep_for_reconnect()
Kernel.reraise(e, System.stacktrace())
end

state = %State{
metadata: metadata,
Expand Down Expand Up @@ -800,8 +806,9 @@ defmodule KafkaEx.Server do
end

defp check_brokers_sockets!(brokers) do
any_socket_opened = brokers
|> Enum.any?(fn %Broker{socket: socket} -> not is_nil(socket) end)
any_socket_opened =
brokers
|> Enum.any?(fn %Broker{socket: socket} -> not is_nil(socket) end)

if not any_socket_opened do
sleep_for_reconnect()
Expand Down Expand Up @@ -1000,7 +1007,9 @@ defmodule KafkaEx.Server do
Application.get_env(:kafka_ex, :partitioner, KafkaEx.DefaultPartitioner)
end

defp increment_state_correlation_id(%_{correlation_id: correlation_id} = state) do
defp increment_state_correlation_id(
%_{correlation_id: correlation_id} = state
) do
%{state | correlation_id: correlation_id + 1}
end
end
Expand Down
59 changes: 31 additions & 28 deletions lib/kafka_ex/server_0_p_10_and_later.ex
Expand Up @@ -96,31 +96,34 @@ defmodule KafkaEx.Server0P10AndLater do
check_brokers_sockets!(brokers)

{_,
%KafkaEx.Protocol.ApiVersions.Response{
api_versions: api_versions,
error_code: error_code
}, state} = kafka_server_api_versions(%State{brokers: brokers})

%KafkaEx.Protocol.ApiVersions.Response{
api_versions: api_versions,
error_code: error_code
}, state} = kafka_server_api_versions(%State{brokers: brokers})
if error_code == :no_response do
sleep_for_reconnect()
raise "Brokers sockets are closed"
end

:no_error = error_code

api_versions = KafkaEx.ApiVersions.api_versions_map(api_versions)

{correlation_id, metadata} = try do
retrieve_metadata(
brokers,
state.correlation_id,
config_sync_timeout(),
[],
api_versions
)
rescue e ->
sleep_for_reconnect()
Kernel.reraise(e, System.stacktrace())
end
{correlation_id, metadata} =
try do
retrieve_metadata(
brokers,
state.correlation_id,
config_sync_timeout(),
[],
api_versions
)
rescue
e ->
sleep_for_reconnect()
Kernel.reraise(e, System.stacktrace())
end

state = %State{
metadata: metadata,
Expand Down Expand Up @@ -274,18 +277,18 @@ defmodule KafkaEx.Server0P10AndLater do
{:topic_not_found, state}

_ ->
broker
|> NetworkClient.send_sync_request(
main_request,
config_sync_timeout()
)
|> case do
{:error, reason} ->
{{:error, reason}, increment_state_correlation_id(state)}

response ->
{{:ok, response}, increment_state_correlation_id(state)}
end
broker
|> NetworkClient.send_sync_request(
main_request,
config_sync_timeout()
)
|> case do
{:error, reason} ->
{{:error, reason}, increment_state_correlation_id(state)}

response ->
{{:ok, response}, increment_state_correlation_id(state)}
end
end
end

Expand Down
14 changes: 8 additions & 6 deletions lib/kafka_ex/server_0_p_9_p_0.ex
Expand Up @@ -101,12 +101,14 @@ defmodule KafkaEx.Server0P9P0 do

check_brokers_sockets!(brokers)

{correlation_id, metadata} = try do
retrieve_metadata(brokers, 0, config_sync_timeout())
rescue e ->
sleep_for_reconnect()
Kernel.reraise(e, System.stacktrace())
end
{correlation_id, metadata} =
try do
retrieve_metadata(brokers, 0, config_sync_timeout())
rescue
e ->
sleep_for_reconnect()
Kernel.reraise(e, System.stacktrace())
end

state = %State{
metadata: metadata,
Expand Down
4 changes: 4 additions & 0 deletions test/integration/new_client_test.exs
Expand Up @@ -104,19 +104,23 @@ defmodule KafkaEx.New.Client.Test do

test "client can receive {:ssl_closed, _}", %{client: client} do
send(client, {:ssl_closed, :unused})

TestHelper.wait_for(fn ->
{:message_queue_len, m} = Process.info(client, :message_queue_len)
m == 0
end)

assert Process.alive?(client)
end

test "client can receive {:tcp_closed, _}", %{client: client} do
send(client, {:tcp_closed, :unused})

TestHelper.wait_for(fn ->
{:message_queue_len, m} = Process.info(client, :message_queue_len)
m == 0
end)

assert Process.alive?(client)
end
end
15 changes: 9 additions & 6 deletions test/kafka_ex/utils/murmur_test.exs
Expand Up @@ -5,12 +5,15 @@ defmodule KafkaEx.Utils.MurmurTest do

test "murmur2 correctly encodes strings" do
# Taken from https://github.com/apache/kafka/blob/8ab0994919752cd4870e771221ba934a6a539a67/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java#L66-L78
assert Murmur.murmur2("21") == -973932308
assert Murmur.murmur2("foobar") == -790332482
assert Murmur.murmur2("a-little-bit-long-string") == -985981536
assert Murmur.murmur2("a-little-bit-longer-string") == -1486304829
assert Murmur.murmur2("lkjh234lh9fiuh90y23oiuhsafujhadof229phr9h19h89h8") == -58897971
assert Murmur.murmur2("abc") == 479470107
assert Murmur.murmur2("21") == -973_932_308
assert Murmur.murmur2("foobar") == -790_332_482
assert Murmur.murmur2("a-little-bit-long-string") == -985_981_536
assert Murmur.murmur2("a-little-bit-longer-string") == -1_486_304_829

assert Murmur.murmur2("lkjh234lh9fiuh90y23oiuhsafujhadof229phr9h19h89h8") ==
-58_897_971

assert Murmur.murmur2("abc") == 479_470_107
end

test "umurmur2 correctly encodes strings" do
Expand Down
53 changes: 44 additions & 9 deletions test/protocol/delete_topics_test.exs
Expand Up @@ -3,29 +3,64 @@ defmodule KafkaEx.Protocol.DeleteTopicsTest do

describe "create_request/4" do
test "creates a request to delete a single topic" do
expected_request = <<20::16, 0::16, 999::32, 13::16, "the-client-id"::binary, 1::32-signed, 6::16, "topic1"::binary, 100::32-signed>>
delete_request = %KafkaEx.Protocol.DeleteTopics.Request{topics: ["topic1"], timeout: 100}
expected_request =
<<20::16, 0::16, 999::32, 13::16, "the-client-id"::binary, 1::32-signed,
6::16, "topic1"::binary, 100::32-signed>>

delete_response = KafkaEx.Protocol.DeleteTopics.create_request(999, "the-client-id", delete_request, 0)
delete_request = %KafkaEx.Protocol.DeleteTopics.Request{
topics: ["topic1"],
timeout: 100
}

delete_response =
KafkaEx.Protocol.DeleteTopics.create_request(
999,
"the-client-id",
delete_request,
0
)

assert expected_request == delete_response
end

test "creates a request to delete a multiple topic" do
expected_response = <<20::16, 0::16, 999::32, 13::16, "the-client-id"::binary, 3::32-signed, 6::16, "topic3"::binary, 6::16, "topic2"::binary, 6::16, "topic1"::binary, 100::32-signed>>
expected_response =
<<20::16, 0::16, 999::32, 13::16, "the-client-id"::binary, 3::32-signed,
6::16, "topic3"::binary, 6::16, "topic2"::binary, 6::16,
"topic1"::binary, 100::32-signed>>

delete_request = %KafkaEx.Protocol.DeleteTopics.Request{
topics: ["topic1", "topic2", "topic3"],
timeout: 100
}

delete_request = %KafkaEx.Protocol.DeleteTopics.Request{topics: ["topic1", "topic2", "topic3"], timeout: 100}
delete_response = KafkaEx.Protocol.DeleteTopics.create_request(999, "the-client-id", delete_request, 0)
delete_response =
KafkaEx.Protocol.DeleteTopics.create_request(
999,
"the-client-id",
delete_request,
0
)

assert expected_response == delete_response
end

test "raise error when non-zero api_version is sent" do
delete_request = %KafkaEx.Protocol.DeleteTopics.Request{topics: ["topic1"], timeout: 100}
delete_request = %KafkaEx.Protocol.DeleteTopics.Request{
topics: ["topic1"],
timeout: 100
}

assert_raise FunctionClauseError,
"no function clause matching in KafkaEx.Protocol.DeleteTopics.create_request/4",
fn -> KafkaEx.Protocol.DeleteTopics.create_request(999, "the-client-id", delete_request, 1) end
"no function clause matching in KafkaEx.Protocol.DeleteTopics.create_request/4",
fn ->
KafkaEx.Protocol.DeleteTopics.create_request(
999,
"the-client-id",
delete_request,
1
)
end
end
end
end

0 comments on commit 518925e

Please sign in to comment.