From 3440e374d06295cddbf0b49a5d6bb1386b086d0f Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Wed, 4 Sep 2024 11:50:52 -0700 Subject: [PATCH 1/3] Add connection duration to "closing AMQP connection..." msgs A while back, @mkuratczyk noted that we keep the timestamp of when a connection is established in the connection state and related ETS table. This PR uses the `connected_at` timestamp to calculate the duration of the connection, to make it easier to identify short-running connections via the log files. --- deps/rabbit/src/rabbit_reader.erl | 131 +++++++++++++++++++----------- 1 file changed, 84 insertions(+), 47 deletions(-) diff --git a/deps/rabbit/src/rabbit_reader.erl b/deps/rabbit/src/rabbit_reader.erl index 228d12ba2ac9..8c77727dceee 100644 --- a/deps/rabbit/src/rabbit_reader.erl +++ b/deps/rabbit/src/rabbit_reader.erl @@ -318,6 +318,7 @@ start_connection(Parent, HelperSups, RanchRef, Deb, Sock) -> {PeerHost, PeerPort, Host, Port} = socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end), ?store_proc_name(Name), + ConnectedAt = os:system_time(milli_seconds), State = #v1{parent = Parent, ranch_ref = RanchRef, sock = RealSocket, @@ -337,8 +338,7 @@ start_connection(Parent, HelperSups, RanchRef, Deb, Sock) -> capabilities = [], auth_mechanism = none, auth_state = none, - connected_at = os:system_time( - milli_seconds)}, + connected_at = ConnectedAt}, callback = uninitialized_callback, recv_len = 0, pending_recv = false, @@ -362,17 +362,23 @@ start_connection(Parent, HelperSups, RanchRef, Deb, Sock) -> handshake, 8)]}) of %% connection was closed cleanly by the client #v1{connection = #connection{user = #user{username = Username}, - vhost = VHost}} -> - rabbit_log_connection:info("closing AMQP connection (~ts, vhost: '~ts', user: '~ts')", - [dynamic_connection_name(Name), VHost, Username]); + vhost = VHost, + connected_at = ConnectedAt0}} -> + ConnName = dynamic_connection_name(Name), + ConnDuration = connection_duration(ConnectedAt0), + rabbit_log_connection:info("closing AMQP connection (~ts, vhost: '~ts', user: '~ts', duration: '~ts')", + [ConnName, VHost, Username, ConnDuration]); %% just to be more defensive _ -> - rabbit_log_connection:info("closing AMQP connection (~ts)", - [dynamic_connection_name(Name)]) - end + ConnName = dynamic_connection_name(Name), + ConnDuration = connection_duration(ConnectedAt), + rabbit_log_connection:info("closing AMQP connection (~ts, duration: '~ts')", + [ConnName, ConnDuration]) + end catch Ex -> - log_connection_exception(dynamic_connection_name(Name), Ex) + ConnNameEx = dynamic_connection_name(Name), + log_connection_exception(ConnNameEx, ConnectedAt, Ex) after %% We don't call gen_tcp:close/1 here since it waits for %% pending output to be sent, which results in unnecessary @@ -400,56 +406,67 @@ start_connection(Parent, HelperSups, RanchRef, Deb, Sock) -> end, done. -log_connection_exception(Name, Ex) -> +log_connection_exception(Name, ConnectedAt, Ex) -> Severity = case Ex of connection_closed_with_no_data_received -> debug; {connection_closed_abruptly, _} -> warning; connection_closed_abruptly -> warning; _ -> error end, - log_connection_exception(Severity, Name, Ex). + log_connection_exception(Severity, Name, ConnectedAt, Ex). -log_connection_exception(Severity, Name, {heartbeat_timeout, TimeoutSec}) -> +log_connection_exception(Severity, Name, ConnectedAt, {heartbeat_timeout, TimeoutSec}) -> + ConnDuration = connection_duration(ConnectedAt), + Fmt = "closing AMQP connection ~tp (~ts, duration: '~ts'):~n" + "missed heartbeats from client, timeout: ~ps", %% Long line to avoid extra spaces and line breaks in log - log_connection_exception_with_severity(Severity, - "closing AMQP connection ~tp (~ts):~n" - "missed heartbeats from client, timeout: ~ps", - [self(), Name, TimeoutSec]); -log_connection_exception(Severity, Name, {connection_closed_abruptly, - #v1{connection = #connection{user = #user{username = Username}, - vhost = VHost}}}) -> - log_connection_exception_with_severity(Severity, - "closing AMQP connection ~tp (~ts, vhost: '~ts', user: '~ts'):~nclient unexpectedly closed TCP connection", - [self(), Name, VHost, Username]); + log_connection_exception_with_severity(Severity, Fmt, + [self(), Name, ConnDuration, TimeoutSec]); +log_connection_exception(Severity, Name, _ConnectedAt, + {connection_closed_abruptly, + #v1{connection = #connection{user = #user{username = Username}, + vhost = VHost, + connected_at = ConnectedAt}}}) -> + ConnDuration = connection_duration(ConnectedAt), + Fmt = "closing AMQP connection ~tp (~ts, vhost: '~ts', user: '~ts', duration: '~ts'):~n" + "client unexpectedly closed TCP connection", + log_connection_exception_with_severity(Severity, Fmt, + [self(), Name, VHost, Username, ConnDuration]); %% when client abruptly closes connection before connection.open/authentication/authorization %% succeeded, don't log username and vhost as 'none' -log_connection_exception(Severity, Name, {connection_closed_abruptly, _}) -> - log_connection_exception_with_severity(Severity, - "closing AMQP connection ~tp (~ts):~nclient unexpectedly closed TCP connection", - [self(), Name]); +log_connection_exception(Severity, Name, ConnectedAt, {connection_closed_abruptly, _}) -> + ConnDuration = connection_duration(ConnectedAt), + Fmt = "closing AMQP connection ~tp (~ts, duration: '~ts'):~n" + "client unexpectedly closed TCP connection", + log_connection_exception_with_severity(Severity, Fmt, + [self(), Name, ConnDuration]); %% failed connection.tune negotiations -log_connection_exception(Severity, Name, {handshake_error, tuning, _Channel, - {exit, #amqp_error{explanation = Explanation}, - _Method, _Stacktrace}}) -> - log_connection_exception_with_severity(Severity, - "closing AMQP connection ~tp (~ts):~nfailed to negotiate connection parameters: ~ts", - [self(), Name, Explanation]); -log_connection_exception(Severity, Name, {sasl_required, ProtocolId}) -> - log_connection_exception_with_severity( - Severity, - "closing AMQP 1.0 connection (~ts): RabbitMQ requires SASL " - "security layer (expected protocol ID 3, but client sent protocol ID ~b)", - [Name, ProtocolId]); +log_connection_exception(Severity, Name, ConnectedAt, {handshake_error, tuning, _Channel, + {exit, #amqp_error{explanation = Explanation}, + _Method, _Stacktrace}}) -> + ConnDuration = connection_duration(ConnectedAt), + Fmt = "closing AMQP connection ~tp (~ts):~n" + "failed to negotiate connection parameters: ~ts", + log_connection_exception_with_severity(Severity, Fmt, [self(), Name, ConnDuration, Explanation]); +log_connection_exception(Severity, Name, ConnectedAt, {sasl_required, ProtocolId}) -> + ConnDuration = connection_duration(ConnectedAt), + Fmt = "closing AMQP 1.0 connection (~ts, duration: '~ts'): RabbitMQ requires SASL " + "security layer (expected protocol ID 3, but client sent protocol ID ~b)", + log_connection_exception_with_severity(Severity, Fmt, + [Name, ConnDuration, ProtocolId]); %% old exception structure -log_connection_exception(Severity, Name, connection_closed_abruptly) -> - log_connection_exception_with_severity(Severity, - "closing AMQP connection ~tp (~ts):~n" - "client unexpectedly closed TCP connection", - [self(), Name]); -log_connection_exception(Severity, Name, Ex) -> - log_connection_exception_with_severity(Severity, - "closing AMQP connection ~tp (~ts):~n~tp", - [self(), Name, Ex]). +log_connection_exception(Severity, Name, ConnectedAt, connection_closed_abruptly) -> + ConnDuration = connection_duration(ConnectedAt), + Fmt = "closing AMQP connection ~tp (~ts, duration: '~ts'):~n" + "client unexpectedly closed TCP connection", + log_connection_exception_with_severity(Severity, Fmt, + [self(), Name, ConnDuration]); +log_connection_exception(Severity, Name, ConnectedAt, Ex) -> + ConnDuration = connection_duration(ConnectedAt), + Fmt = "closing AMQP connection ~tp (~ts, duration: '~ts'):~n" + "~tp", + log_connection_exception_with_severity(Severity, Fmt, + [self(), Name, ConnDuration, Ex]). log_connection_exception_with_severity(Severity, Fmt, Args) -> case Severity of @@ -1828,3 +1845,23 @@ get_client_value_detail(channel_max, 0) -> " (no limit)"; get_client_value_detail(_Field, _ClientValue) -> "". + +connection_duration(ConnectedAt) -> + Now = os:system_time(milli_seconds), + DurationMillis = Now - ConnectedAt, + if + DurationMillis >= 1000 -> + DurationSecs = DurationMillis div 1000, + case calendar:seconds_to_daystime(DurationSecs) of + {0, {0, 0, Seconds}} -> + io_lib:format("~Bs", [Seconds]); + {0, {0, Minutes, Seconds}} -> + io_lib:format("~BM, ~Bs", [Minutes, Seconds]); + {0, {Hours, Minutes, Seconds}} -> + io_lib:format("~BH, ~BM, ~Bs", [Hours, Minutes, Seconds]); + {Days, {Hours, Minutes, Seconds}} -> + io_lib:format("~BD, ~BH, ~BM, ~Bs", [Days, Hours, Minutes, Seconds]) + end; + true -> + io_lib:format("~Bms", [DurationMillis]) + end. From 4299e1ddc38916b7a0cb644a82f377d82f42691e Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Wed, 4 Sep 2024 21:33:38 -0400 Subject: [PATCH 2/3] Do not quote connection duration It cannot contain spaces like username, virtual host and user-provided connection name can. --- deps/rabbit/src/rabbit_reader.erl | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/deps/rabbit/src/rabbit_reader.erl b/deps/rabbit/src/rabbit_reader.erl index 8c77727dceee..740a51622784 100644 --- a/deps/rabbit/src/rabbit_reader.erl +++ b/deps/rabbit/src/rabbit_reader.erl @@ -366,13 +366,13 @@ start_connection(Parent, HelperSups, RanchRef, Deb, Sock) -> connected_at = ConnectedAt0}} -> ConnName = dynamic_connection_name(Name), ConnDuration = connection_duration(ConnectedAt0), - rabbit_log_connection:info("closing AMQP connection (~ts, vhost: '~ts', user: '~ts', duration: '~ts')", + rabbit_log_connection:info("closing AMQP connection (~ts, vhost: '~ts', user: '~ts', duration: ~ts)", [ConnName, VHost, Username, ConnDuration]); %% just to be more defensive _ -> ConnName = dynamic_connection_name(Name), ConnDuration = connection_duration(ConnectedAt), - rabbit_log_connection:info("closing AMQP connection (~ts, duration: '~ts')", + rabbit_log_connection:info("closing AMQP connection (~ts, duration: ~ts)", [ConnName, ConnDuration]) end catch @@ -417,7 +417,7 @@ log_connection_exception(Name, ConnectedAt, Ex) -> log_connection_exception(Severity, Name, ConnectedAt, {heartbeat_timeout, TimeoutSec}) -> ConnDuration = connection_duration(ConnectedAt), - Fmt = "closing AMQP connection ~tp (~ts, duration: '~ts'):~n" + Fmt = "closing AMQP connection ~tp (~ts, duration: ~ts):~n" "missed heartbeats from client, timeout: ~ps", %% Long line to avoid extra spaces and line breaks in log log_connection_exception_with_severity(Severity, Fmt, @@ -428,7 +428,7 @@ log_connection_exception(Severity, Name, _ConnectedAt, vhost = VHost, connected_at = ConnectedAt}}}) -> ConnDuration = connection_duration(ConnectedAt), - Fmt = "closing AMQP connection ~tp (~ts, vhost: '~ts', user: '~ts', duration: '~ts'):~n" + Fmt = "closing AMQP connection ~tp (~ts, vhost: '~ts', user: '~ts', duration: ~ts):~n" "client unexpectedly closed TCP connection", log_connection_exception_with_severity(Severity, Fmt, [self(), Name, VHost, Username, ConnDuration]); @@ -436,7 +436,7 @@ log_connection_exception(Severity, Name, _ConnectedAt, %% succeeded, don't log username and vhost as 'none' log_connection_exception(Severity, Name, ConnectedAt, {connection_closed_abruptly, _}) -> ConnDuration = connection_duration(ConnectedAt), - Fmt = "closing AMQP connection ~tp (~ts, duration: '~ts'):~n" + Fmt = "closing AMQP connection ~tp (~ts, duration: ~ts):~n" "client unexpectedly closed TCP connection", log_connection_exception_with_severity(Severity, Fmt, [self(), Name, ConnDuration]); @@ -450,20 +450,20 @@ log_connection_exception(Severity, Name, ConnectedAt, {handshake_error, tuning, log_connection_exception_with_severity(Severity, Fmt, [self(), Name, ConnDuration, Explanation]); log_connection_exception(Severity, Name, ConnectedAt, {sasl_required, ProtocolId}) -> ConnDuration = connection_duration(ConnectedAt), - Fmt = "closing AMQP 1.0 connection (~ts, duration: '~ts'): RabbitMQ requires SASL " + Fmt = "closing AMQP 1.0 connection (~ts, duration: ~ts): RabbitMQ requires SASL " "security layer (expected protocol ID 3, but client sent protocol ID ~b)", log_connection_exception_with_severity(Severity, Fmt, [Name, ConnDuration, ProtocolId]); %% old exception structure log_connection_exception(Severity, Name, ConnectedAt, connection_closed_abruptly) -> ConnDuration = connection_duration(ConnectedAt), - Fmt = "closing AMQP connection ~tp (~ts, duration: '~ts'):~n" + Fmt = "closing AMQP connection ~tp (~ts, duration: ~ts):~n" "client unexpectedly closed TCP connection", log_connection_exception_with_severity(Severity, Fmt, [self(), Name, ConnDuration]); log_connection_exception(Severity, Name, ConnectedAt, Ex) -> ConnDuration = connection_duration(ConnectedAt), - Fmt = "closing AMQP connection ~tp (~ts, duration: '~ts'):~n" + Fmt = "closing AMQP connection ~tp (~ts, duration: ~ts):~n" "~tp", log_connection_exception_with_severity(Severity, Fmt, [self(), Name, ConnDuration, Ex]). From a866ad3fd5561c3842f9061ef863f280d6009df2 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Wed, 4 Sep 2024 22:02:49 -0400 Subject: [PATCH 3/3] Revert "Do not quote connection duration" This reverts commit 4299e1ddc38916b7a0cb644a82f377d82f42691e. It can contain spaces as it is formatted to human-readable values such as '4m, 36s' --- deps/rabbit/src/rabbit_reader.erl | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/deps/rabbit/src/rabbit_reader.erl b/deps/rabbit/src/rabbit_reader.erl index 740a51622784..8c77727dceee 100644 --- a/deps/rabbit/src/rabbit_reader.erl +++ b/deps/rabbit/src/rabbit_reader.erl @@ -366,13 +366,13 @@ start_connection(Parent, HelperSups, RanchRef, Deb, Sock) -> connected_at = ConnectedAt0}} -> ConnName = dynamic_connection_name(Name), ConnDuration = connection_duration(ConnectedAt0), - rabbit_log_connection:info("closing AMQP connection (~ts, vhost: '~ts', user: '~ts', duration: ~ts)", + rabbit_log_connection:info("closing AMQP connection (~ts, vhost: '~ts', user: '~ts', duration: '~ts')", [ConnName, VHost, Username, ConnDuration]); %% just to be more defensive _ -> ConnName = dynamic_connection_name(Name), ConnDuration = connection_duration(ConnectedAt), - rabbit_log_connection:info("closing AMQP connection (~ts, duration: ~ts)", + rabbit_log_connection:info("closing AMQP connection (~ts, duration: '~ts')", [ConnName, ConnDuration]) end catch @@ -417,7 +417,7 @@ log_connection_exception(Name, ConnectedAt, Ex) -> log_connection_exception(Severity, Name, ConnectedAt, {heartbeat_timeout, TimeoutSec}) -> ConnDuration = connection_duration(ConnectedAt), - Fmt = "closing AMQP connection ~tp (~ts, duration: ~ts):~n" + Fmt = "closing AMQP connection ~tp (~ts, duration: '~ts'):~n" "missed heartbeats from client, timeout: ~ps", %% Long line to avoid extra spaces and line breaks in log log_connection_exception_with_severity(Severity, Fmt, @@ -428,7 +428,7 @@ log_connection_exception(Severity, Name, _ConnectedAt, vhost = VHost, connected_at = ConnectedAt}}}) -> ConnDuration = connection_duration(ConnectedAt), - Fmt = "closing AMQP connection ~tp (~ts, vhost: '~ts', user: '~ts', duration: ~ts):~n" + Fmt = "closing AMQP connection ~tp (~ts, vhost: '~ts', user: '~ts', duration: '~ts'):~n" "client unexpectedly closed TCP connection", log_connection_exception_with_severity(Severity, Fmt, [self(), Name, VHost, Username, ConnDuration]); @@ -436,7 +436,7 @@ log_connection_exception(Severity, Name, _ConnectedAt, %% succeeded, don't log username and vhost as 'none' log_connection_exception(Severity, Name, ConnectedAt, {connection_closed_abruptly, _}) -> ConnDuration = connection_duration(ConnectedAt), - Fmt = "closing AMQP connection ~tp (~ts, duration: ~ts):~n" + Fmt = "closing AMQP connection ~tp (~ts, duration: '~ts'):~n" "client unexpectedly closed TCP connection", log_connection_exception_with_severity(Severity, Fmt, [self(), Name, ConnDuration]); @@ -450,20 +450,20 @@ log_connection_exception(Severity, Name, ConnectedAt, {handshake_error, tuning, log_connection_exception_with_severity(Severity, Fmt, [self(), Name, ConnDuration, Explanation]); log_connection_exception(Severity, Name, ConnectedAt, {sasl_required, ProtocolId}) -> ConnDuration = connection_duration(ConnectedAt), - Fmt = "closing AMQP 1.0 connection (~ts, duration: ~ts): RabbitMQ requires SASL " + Fmt = "closing AMQP 1.0 connection (~ts, duration: '~ts'): RabbitMQ requires SASL " "security layer (expected protocol ID 3, but client sent protocol ID ~b)", log_connection_exception_with_severity(Severity, Fmt, [Name, ConnDuration, ProtocolId]); %% old exception structure log_connection_exception(Severity, Name, ConnectedAt, connection_closed_abruptly) -> ConnDuration = connection_duration(ConnectedAt), - Fmt = "closing AMQP connection ~tp (~ts, duration: ~ts):~n" + Fmt = "closing AMQP connection ~tp (~ts, duration: '~ts'):~n" "client unexpectedly closed TCP connection", log_connection_exception_with_severity(Severity, Fmt, [self(), Name, ConnDuration]); log_connection_exception(Severity, Name, ConnectedAt, Ex) -> ConnDuration = connection_duration(ConnectedAt), - Fmt = "closing AMQP connection ~tp (~ts, duration: ~ts):~n" + Fmt = "closing AMQP connection ~tp (~ts, duration: '~ts'):~n" "~tp", log_connection_exception_with_severity(Severity, Fmt, [self(), Name, ConnDuration, Ex]).