Skip to content

Commit

Permalink
stream coordinator: further logging improvements
Browse files Browse the repository at this point in the history
Also increased the tick timeout so that the check for new rabbit nodes
to auto-add does not run too often.

Also increased the sleep times after nodedown errors so that retries happen less often.
  • Loading branch information
kjnilsson committed Aug 26, 2021
1 parent e167cbc commit df8d976
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 35 deletions.
70 changes: 36 additions & 34 deletions deps/rabbit/src/rabbit_stream_coordinator.erl
Expand Up @@ -646,8 +646,8 @@ phase_start_replica(StreamId, #{epoch := Epoch,
fun() ->
try osiris_replica:start(Node, Conf0) of
{ok, Pid} ->
rabbit_log:debug("~s: ~s: replica started on ~s in ~b pid ~w",
[?MODULE, StreamId, Node, Epoch, Pid]),
rabbit_log:info("~s: ~s: replica started on ~s in ~b pid ~w",
[?MODULE, StreamId, Node, Epoch, Pid]),
send_self_command({member_started, StreamId,
Args#{pid => Pid}});
{error, already_present} ->
Expand All @@ -662,13 +662,13 @@ phase_start_replica(StreamId, #{epoch := Epoch,
send_self_command({member_started, StreamId,
Args#{pid => Pid}});
{error, Reason} ->
rabbit_log:warning("~s: Error while starting replica for ~s on node ~s in ~b : ~W",
[?MODULE, maps:get(name, Conf0), Node, Epoch, Reason, 10]),
maybe_sleep(Reason),
rabbit_log:warning("~s: Error while starting replica for ~s : ~W",
[?MODULE, maps:get(name, Conf0), Reason, 10]),
send_action_failed(StreamId, starting, Args)
catch _:Error ->
rabbit_log:warning("~s: Error while starting replica for ~s : ~W",
[?MODULE, maps:get(name, Conf0), Error, 10]),
rabbit_log:warning("~s: Error while starting replica for ~s on node ~s in ~b : ~W",
[?MODULE, maps:get(name, Conf0), Node, Epoch, Error, 10]),
maybe_sleep(Error),
send_action_failed(StreamId, starting, Args)
end
Expand Down Expand Up @@ -710,25 +710,25 @@ phase_stop_member(StreamId, #{node := Node,
[?MODULE, StreamId, Node, Epoch, Tail]),
send_self_command({member_stopped, StreamId, Arg});
Err ->
rabbit_log:warning("Stream coordinator failed to get tail
of member ~s ~w Error: ~w",
[StreamId, Node, Err]),
rabbit_log:warning("~s: failed to get tail of member ~s on ~s in ~b Error: ~w",
[?MODULE, StreamId, Node, Epoch, Err]),
maybe_sleep(Err),
send_action_failed(StreamId, stopping, Arg0)
catch _:Err ->
rabbit_log:warning("Stream coordinator failed to get
tail of member ~s ~w Error: ~w",
[StreamId, Node, Err]),
rabbit_log:warning("~s: failed to get tail of member ~s on ~s in ~b Error: ~w",
[?MODULE, StreamId, Node, Epoch, Err]),
maybe_sleep(Err),
send_action_failed(StreamId, stopping, Arg0)
end;
Err ->
rabbit_log:warning("Stream coordinator failed to stop
member ~s ~w Error: ~w",
[StreamId, Node, Err]),
rabbit_log:warning("~s: failed to stop "
"member ~s ~w Error: ~w",
[?MODULE, StreamId, Node, Err]),
maybe_sleep(Err),
send_action_failed(StreamId, stopping, Arg0)
catch _:Err ->
rabbit_log:warning("Stream coordinator failed to stop
member ~s ~w Error: ~w",
[StreamId, Node, Err]),
rabbit_log:warning("~s: failed to stop member ~s ~w Error: ~w",
[?MODULE, StreamId, Node, Err]),
maybe_sleep(Err),
send_action_failed(StreamId, stopping, Arg0)
end
Expand All @@ -740,19 +740,18 @@ phase_start_writer(StreamId, #{epoch := Epoch,
try osiris_writer:start(Conf) of
{ok, Pid} ->
Args = Args0#{epoch => Epoch, pid => Pid},
rabbit_log:warning("~s: started writer ~s on ~w in ~b",
[?MODULE, StreamId, Node, Epoch]),
rabbit_log:info("~s: started writer ~s on ~w in ~b",
[?MODULE, StreamId, Node, Epoch]),
send_self_command({member_started, StreamId, Args});
Err ->
%% no sleep for writer failures
rabbit_log:warning("~s: failed to start
writer ~s ~w Error: ~w",
[?MODULE, StreamId, Node, Err]),
%% no sleep for writer failures as we want to trigger a new
%% election asap
rabbit_log:warning("~s: failed to start writer ~s on ~s in ~b Error: ~w",
[?MODULE, StreamId, Node, Epoch, Err]),
send_action_failed(StreamId, starting, Args0)
catch _:Err ->
rabbit_log:warning("~s: failed to start
writer ~s ~w Error: ~w",
[?MODULE, StreamId, Node, Err]),
rabbit_log:warning("~s: failed to start writer ~s on ~s in ~b Error: ~w",
[?MODULE, StreamId, Node, Epoch, Err]),
send_action_failed(StreamId, starting, Args0)
end
end.
Expand All @@ -763,14 +762,13 @@ phase_update_retention(StreamId, #{pid := Pid,
try osiris:update_retention(Pid, Retention) of
ok ->
send_self_command({retention_updated, StreamId, Args});
{error, Err} ->
rabbit_log:warning("~s: failed to update
retention for ~s ~w Error: ~w",
[?MODULE, StreamId, node(Pid), Err]),
{error, Reason} = Err ->
rabbit_log:warning("~s: failed to update retention for ~s ~w Reason: ~w",
[?MODULE, StreamId, node(Pid), Reason]),
maybe_sleep(Err),
send_action_failed(StreamId, update_retention, Args)
catch _:Err ->
rabbit_log:warning("~s: failed to update
retention for ~s ~w Error: ~w",
rabbit_log:warning("~s: failed to update retention for ~s ~w Error: ~w",
[?MODULE, StreamId, node(Pid), Err]),
maybe_sleep(Err),
send_action_failed(StreamId, update_retention, Args)
Expand Down Expand Up @@ -1512,9 +1510,13 @@ select_leader(Offsets) ->
Node.

maybe_sleep({{nodedown, _}, _}) ->
timer:sleep(5000);
timer:sleep(10000);
maybe_sleep({noproc, _}) ->
timer:sleep(5000);
maybe_sleep({error, nodedown}) ->
timer:sleep(5000);
maybe_sleep({error, _}) ->
timer:sleep(5000);
maybe_sleep(_) ->
ok.

Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_stream_coordinator.hrl
@@ -1,6 +1,6 @@

-define(STREAM_COORDINATOR_STARTUP, {stream_coordinator_startup, self()}).
-define(TICK_TIMEOUT, 1000).
-define(TICK_TIMEOUT, 30000).
-define(RESTART_TIMEOUT, 1000).
-define(PHASE_RETRY_TIMEOUT, 10000).
-define(CMD_TIMEOUT, 30000).
Expand Down

0 comments on commit df8d976

Please sign in to comment.