Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

safe_gen_call catches more exit exceptions #492

Merged
merged 6 commits into from
Feb 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 13 additions & 1 deletion changelog.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,19 @@
* 3.16.2
* Update kafka_protocol from 4.0.1 to 4.0.3.
Prior to this change the actual time spent in establishing a
Kafka connection might be longer than desired due to the timeout
being used in SSL upgrade (if enabled), then API version query.
This has been fixed by turning the given timeout config
into a deadline, and the sub-steps will try to meet the deadline.
see more details here: https://github.com/kafka4beam/kafka_protocol/pull/9
* Catch `timeout` and other `DOWN` reasons when making `gen_server` call to
`brod_client`, `brod_consumer` and producer/consumer supervisor,
and return as `Reason` in `{error, Reason}`.
Previously only `noproc` reaon is caught. (#492)
* Propagate `connect_timeout` config to `kpro` API functions as `timeout` arg
affected APIs: connect_group_coordinator, create_topics, delete_topics,
resolve_offset, fetch, fold, fetch_committed_offsets
resolve_offset, fetch, fold, fetch_committed_offsets (#458)
* Fix bad field name in group describe request (#486)
* 3.16.1
* Fix `brod` script in `brod-cli` in release.
* Support `rebalance_timeout` consumer group option
Expand Down
2 changes: 1 addition & 1 deletion rebar.config
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{deps, [ {supervisor3, "1.1.11"}
, {kafka_protocol, "4.0.1"}
, {kafka_protocol, "4.0.3"}
, {snappyer, "1.2.8"}
]}.
{edoc_opts, [{preprocess, true}, {macros, [{build_brod_cli, true}]}]}.
Expand Down
6 changes: 4 additions & 2 deletions src/brod.erl
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,8 @@ get_partitions_count(Client, Topic) ->
-spec get_consumer(client(), topic(), partition()) ->
{ok, pid()} | {error, Reason}
when Reason :: client_down
| {consumer_down, noproc}
| {client_down, any()}
| {consumer_down, any()}
| {consumer_not_found, topic()}
| {consumer_not_found, topic(), partition()}.
get_consumer(Client, Topic, Partition) ->
Expand All @@ -465,7 +466,8 @@ get_consumer(Client, Topic, Partition) ->
-spec get_producer(client(), topic(), partition()) ->
{ok, pid()} | {error, Reason}
when Reason :: client_down
| {producer_down, noproc}
| {client_down, any()}
| {producer_down, any()}
| {producer_not_found, topic()}
| {producer_not_found, topic(), partition()}.
get_producer(Client, Topic, Partition) ->
Expand Down
22 changes: 13 additions & 9 deletions src/brod_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,14 @@
| ?CONSUMER_KEY(topic(), partition()).

-type get_producer_error() :: client_down
| {producer_down, noproc}
| {client_down, any()}
| {producer_down, any()}
| {producer_not_found, topic()}
| { producer_not_found
, topic()
, partition()}.
| {producer_not_found, topic(), partition()}.

-type get_consumer_error() :: client_down
| {consumer_down, noproc}
| {client_down, any()}
| {consumer_down, any()}
| {consumer_not_found, topic()}
| {consumer_not_found, topic(), partition()}.

Expand Down Expand Up @@ -829,16 +829,20 @@ ensure_partition_workers(TopicName, State, F) ->
end
end).

%% Catch noproc exit exception when making gen_server:call.
%% Catches exit exceptions when making gen_server:call.
-spec safe_gen_call(pid() | atom(), Call, Timeout) -> Return
when Call :: term(),
Timeout :: infinity | integer(),
Return :: ok | {ok, term()} | {error, client_down | term()}.
Return :: ok | {ok, term()} | {error, Reason},
Reason :: client_down | {client_down, any()} | any().
safe_gen_call(Server, Call, Timeout) ->
try
gen_server:call(Server, Call, Timeout)
catch exit : {noproc, _} ->
{error, client_down}
catch
exit : {noproc, _} ->
{error, client_down};
exit : {Reason, _} ->
{error, {client_down, Reason}}
end.

-spec kf(kpro:field_name(), kpro:struct()) -> kpro:field_value().
Expand Down
9 changes: 5 additions & 4 deletions src/brod_consumer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -794,16 +794,17 @@ reset_buffer(#state{ pending_acks = #pending_acks{queue = Queue}
, last_req_ref = ?undef
}.

%% Catch noproc exit exception when making gen_server:call.
%% Catch exit exceptions when making gen_server:call.
-spec safe_gen_call(pid() | atom(), Call, Timeout) -> Return
when Call :: term(),
Timeout :: infinity | integer(),
Return :: ok | {ok, term()} | {error, consumer_down | term()}.
Return :: ok | {ok, term()} | {error, any()}.
safe_gen_call(Server, Call, Timeout) ->
try
gen_server:call(Server, Call, Timeout)
catch exit : {noproc, _} ->
{error, consumer_down}
catch
exit : {Reason, _} ->
{error, Reason}
end.

%% Init payload connection regardless of subscriber state.
Expand Down
6 changes: 3 additions & 3 deletions src/brod_consumers_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ stop_consumer(SupPid, TopicName) ->
{ok, pid()} | {error, Reason} when
Reason :: {consumer_not_found, brod:topic()}
| {consumer_not_found, brod:topic(), brod:partition()}
| {consumer_down, noproc}.
| {consumer_down, any()}.
find_consumer(SupPid, Topic, Partition) ->
case supervisor3:find_child(SupPid, Topic) of
[] ->
Expand All @@ -83,8 +83,8 @@ find_consumer(SupPid, Topic, Partition) ->
[Pid] ->
{ok, Pid}
end
catch exit : {noproc, _} ->
{error, {consumer_down, noproc}}
catch exit : {Reason, _} ->
{error, {consumer_down, Reason}}
end
end.

Expand Down
6 changes: 3 additions & 3 deletions src/brod_producers_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ stop_producer(SupPid, TopicName) ->
{ok, pid()} | {error, Reason} when
Reason :: {producer_not_found, brod:topic()}
| {producer_not_found, brod:topic(), brod:partition()}
| {producer_down, noproc}.
| {producer_down, any()}.
find_producer(SupPid, Topic, Partition) ->
case supervisor3:find_child(SupPid, Topic) of
[] ->
Expand All @@ -87,8 +87,8 @@ find_producer(SupPid, Topic, Partition) ->
[Pid] ->
{ok, Pid}
end
catch exit : {noproc, _} ->
{error, {producer_down, noproc}}
catch exit : {Reason, _} ->
{error, {producer_down, Reason}}
end
end.

Expand Down