Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 84 additions & 47 deletions deps/rabbit/src/rabbit_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ start_connection(Parent, HelperSups, RanchRef, Deb, Sock) ->
{PeerHost, PeerPort, Host, Port} =
socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end),
?store_proc_name(Name),
ConnectedAt = os:system_time(milli_seconds),
State = #v1{parent = Parent,
ranch_ref = RanchRef,
sock = RealSocket,
Expand All @@ -337,8 +338,7 @@ start_connection(Parent, HelperSups, RanchRef, Deb, Sock) ->
capabilities = [],
auth_mechanism = none,
auth_state = none,
connected_at = os:system_time(
milli_seconds)},
connected_at = ConnectedAt},
callback = uninitialized_callback,
recv_len = 0,
pending_recv = false,
Expand All @@ -362,17 +362,23 @@ start_connection(Parent, HelperSups, RanchRef, Deb, Sock) ->
handshake, 8)]}) of
%% connection was closed cleanly by the client
#v1{connection = #connection{user = #user{username = Username},
vhost = VHost}} ->
rabbit_log_connection:info("closing AMQP connection (~ts, vhost: '~ts', user: '~ts')",
[dynamic_connection_name(Name), VHost, Username]);
vhost = VHost,
connected_at = ConnectedAt0}} ->
ConnName = dynamic_connection_name(Name),
ConnDuration = connection_duration(ConnectedAt0),
rabbit_log_connection:info("closing AMQP connection (~ts, vhost: '~ts', user: '~ts', duration: '~ts')",
[ConnName, VHost, Username, ConnDuration]);
%% just to be more defensive
_ ->
rabbit_log_connection:info("closing AMQP connection (~ts)",
[dynamic_connection_name(Name)])
end
ConnName = dynamic_connection_name(Name),
ConnDuration = connection_duration(ConnectedAt),
rabbit_log_connection:info("closing AMQP connection (~ts, duration: '~ts')",
[ConnName, ConnDuration])
end
catch
Ex ->
log_connection_exception(dynamic_connection_name(Name), Ex)
ConnNameEx = dynamic_connection_name(Name),
log_connection_exception(ConnNameEx, ConnectedAt, Ex)
after
%% We don't call gen_tcp:close/1 here since it waits for
%% pending output to be sent, which results in unnecessary
Expand Down Expand Up @@ -400,56 +406,67 @@ start_connection(Parent, HelperSups, RanchRef, Deb, Sock) ->
end,
done.

log_connection_exception(Name, Ex) ->
log_connection_exception(Name, ConnectedAt, Ex) ->
Severity = case Ex of
connection_closed_with_no_data_received -> debug;
{connection_closed_abruptly, _} -> warning;
connection_closed_abruptly -> warning;
_ -> error
end,
log_connection_exception(Severity, Name, Ex).
log_connection_exception(Severity, Name, ConnectedAt, Ex).

log_connection_exception(Severity, Name, {heartbeat_timeout, TimeoutSec}) ->
log_connection_exception(Severity, Name, ConnectedAt, {heartbeat_timeout, TimeoutSec}) ->
ConnDuration = connection_duration(ConnectedAt),
Fmt = "closing AMQP connection ~tp (~ts, duration: '~ts'):~n"
"missed heartbeats from client, timeout: ~ps",
%% Long line to avoid extra spaces and line breaks in log
log_connection_exception_with_severity(Severity,
"closing AMQP connection ~tp (~ts):~n"
"missed heartbeats from client, timeout: ~ps",
[self(), Name, TimeoutSec]);
log_connection_exception(Severity, Name, {connection_closed_abruptly,
#v1{connection = #connection{user = #user{username = Username},
vhost = VHost}}}) ->
log_connection_exception_with_severity(Severity,
"closing AMQP connection ~tp (~ts, vhost: '~ts', user: '~ts'):~nclient unexpectedly closed TCP connection",
[self(), Name, VHost, Username]);
log_connection_exception_with_severity(Severity, Fmt,
[self(), Name, ConnDuration, TimeoutSec]);
log_connection_exception(Severity, Name, _ConnectedAt,
{connection_closed_abruptly,
#v1{connection = #connection{user = #user{username = Username},
vhost = VHost,
connected_at = ConnectedAt}}}) ->
ConnDuration = connection_duration(ConnectedAt),
Fmt = "closing AMQP connection ~tp (~ts, vhost: '~ts', user: '~ts', duration: '~ts'):~n"
"client unexpectedly closed TCP connection",
log_connection_exception_with_severity(Severity, Fmt,
[self(), Name, VHost, Username, ConnDuration]);
%% when client abruptly closes connection before connection.open/authentication/authorization
%% succeeded, don't log username and vhost as 'none'
log_connection_exception(Severity, Name, {connection_closed_abruptly, _}) ->
log_connection_exception_with_severity(Severity,
"closing AMQP connection ~tp (~ts):~nclient unexpectedly closed TCP connection",
[self(), Name]);
log_connection_exception(Severity, Name, ConnectedAt, {connection_closed_abruptly, _}) ->
ConnDuration = connection_duration(ConnectedAt),
Fmt = "closing AMQP connection ~tp (~ts, duration: '~ts'):~n"
"client unexpectedly closed TCP connection",
log_connection_exception_with_severity(Severity, Fmt,
[self(), Name, ConnDuration]);
%% failed connection.tune negotiations
log_connection_exception(Severity, Name, {handshake_error, tuning, _Channel,
{exit, #amqp_error{explanation = Explanation},
_Method, _Stacktrace}}) ->
log_connection_exception_with_severity(Severity,
"closing AMQP connection ~tp (~ts):~nfailed to negotiate connection parameters: ~ts",
[self(), Name, Explanation]);
log_connection_exception(Severity, Name, {sasl_required, ProtocolId}) ->
log_connection_exception_with_severity(
Severity,
"closing AMQP 1.0 connection (~ts): RabbitMQ requires SASL "
"security layer (expected protocol ID 3, but client sent protocol ID ~b)",
[Name, ProtocolId]);
log_connection_exception(Severity, Name, ConnectedAt, {handshake_error, tuning, _Channel,
{exit, #amqp_error{explanation = Explanation},
_Method, _Stacktrace}}) ->
ConnDuration = connection_duration(ConnectedAt),
Fmt = "closing AMQP connection ~tp (~ts):~n"
"failed to negotiate connection parameters: ~ts",
log_connection_exception_with_severity(Severity, Fmt, [self(), Name, ConnDuration, Explanation]);
log_connection_exception(Severity, Name, ConnectedAt, {sasl_required, ProtocolId}) ->
ConnDuration = connection_duration(ConnectedAt),
Fmt = "closing AMQP 1.0 connection (~ts, duration: '~ts'): RabbitMQ requires SASL "
"security layer (expected protocol ID 3, but client sent protocol ID ~b)",
log_connection_exception_with_severity(Severity, Fmt,
[Name, ConnDuration, ProtocolId]);
%% old exception structure
log_connection_exception(Severity, Name, connection_closed_abruptly) ->
log_connection_exception_with_severity(Severity,
"closing AMQP connection ~tp (~ts):~n"
"client unexpectedly closed TCP connection",
[self(), Name]);
log_connection_exception(Severity, Name, Ex) ->
log_connection_exception_with_severity(Severity,
"closing AMQP connection ~tp (~ts):~n~tp",
[self(), Name, Ex]).
log_connection_exception(Severity, Name, ConnectedAt, connection_closed_abruptly) ->
ConnDuration = connection_duration(ConnectedAt),
Fmt = "closing AMQP connection ~tp (~ts, duration: '~ts'):~n"
"client unexpectedly closed TCP connection",
log_connection_exception_with_severity(Severity, Fmt,
[self(), Name, ConnDuration]);
log_connection_exception(Severity, Name, ConnectedAt, Ex) ->
ConnDuration = connection_duration(ConnectedAt),
Fmt = "closing AMQP connection ~tp (~ts, duration: '~ts'):~n"
"~tp",
log_connection_exception_with_severity(Severity, Fmt,
[self(), Name, ConnDuration, Ex]).

log_connection_exception_with_severity(Severity, Fmt, Args) ->
case Severity of
Expand Down Expand Up @@ -1828,3 +1845,23 @@ get_client_value_detail(channel_max, 0) ->
" (no limit)";
get_client_value_detail(_Field, _ClientValue) ->
"".

connection_duration(ConnectedAt) ->
Now = os:system_time(milli_seconds),
DurationMillis = Now - ConnectedAt,
if
DurationMillis >= 1000 ->
DurationSecs = DurationMillis div 1000,
case calendar:seconds_to_daystime(DurationSecs) of
{0, {0, 0, Seconds}} ->
io_lib:format("~Bs", [Seconds]);
{0, {0, Minutes, Seconds}} ->
io_lib:format("~BM, ~Bs", [Minutes, Seconds]);
{0, {Hours, Minutes, Seconds}} ->
io_lib:format("~BH, ~BM, ~Bs", [Hours, Minutes, Seconds]);
{Days, {Hours, Minutes, Seconds}} ->
io_lib:format("~BD, ~BH, ~BM, ~Bs", [Days, Hours, Minutes, Seconds])
end;
true ->
io_lib:format("~Bms", [DurationMillis])
end.