Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disk monitor strikes back! (backport #5726) #5739

Merged
merged 3 commits into from
Sep 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 5 additions & 5 deletions deps/rabbit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,11 @@ suites = [
name = "disconnect_detected_during_alarm_SUITE",
size = "medium",
),
rabbitmq_integration_suite(
PACKAGE,
name = "disk_monitor_SUITE",
size = "medium",
),
rabbitmq_integration_suite(
PACKAGE,
name = "dynamic_ha_SUITE",
Expand Down Expand Up @@ -912,11 +917,6 @@ suites = [
name = "unit_credit_flow_SUITE",
size = "medium",
),
rabbitmq_integration_suite(
PACKAGE,
name = "unit_disk_monitor_mocks_SUITE",
size = "medium",
),
rabbitmq_integration_suite(
PACKAGE,
name = "unit_disk_monitor_SUITE",
Expand Down
130 changes: 101 additions & 29 deletions deps/rabbit/src/rabbit_disk_monitor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,11 @@
%% on start-up
retries,
%% Interval between retries
interval
interval,
%% Operating system in use
os,
%% Port running sh to execute df commands
port
}).

%%----------------------------------------------------------------------------
Expand Down Expand Up @@ -97,9 +101,9 @@ get_max_check_interval() ->
set_max_check_interval(Interval) ->
gen_server:call(?MODULE, {set_max_check_interval, Interval}).

-spec get_disk_free() -> (integer() | 'unknown').
-spec get_disk_free() -> (integer() | 'NaN').
get_disk_free() ->
safe_ets_lookup(disk_free, unknown).
safe_ets_lookup(disk_free, 'NaN').

-spec set_enabled(string()) -> 'ok'.
set_enabled(Enabled) ->
Expand All @@ -114,6 +118,9 @@ start_link(Args) ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [Args], []).

init([Limit]) ->
process_flag(trap_exit, true),
process_flag(priority, low),

Dir = dir(),
{ok, Retries} = application:get_env(rabbit, disk_monitor_failure_retries),
{ok, Interval} = application:get_env(rabbit, disk_monitor_failure_retry_interval),
Expand All @@ -126,7 +133,21 @@ init([Limit]) ->
interval = Interval},
State1 = set_min_check_interval(?DEFAULT_MIN_DISK_CHECK_INTERVAL, State0),
State2 = set_max_check_interval(?DEFAULT_MAX_DISK_CHECK_INTERVAL, State1),
{ok, enable(State2)}.

OS = os:type(),
Port = case OS of
{unix, _} ->
start_portprogram();
{win32, _OSname} ->
not_used;
_ ->
exit({unsupported_os, OS})
end,
State3 = State2#state{port=Port, os=OS},

State4 = enable(State3),

{ok, State4}.

handle_call({set_disk_free_limit, _}, _From, #state{enabled = false} = State) ->
rabbit_log:info("Cannot set disk free limit: "
Expand All @@ -149,6 +170,7 @@ handle_call({set_enabled, _Enabled = true}, _From, State) ->
start_timer(set_disk_limits(State, State#state.limit)),
rabbit_log:info("Free disk space monitor was enabled"),
{reply, ok, State#state{enabled = true}};

handle_call({set_enabled, _Enabled = false}, _From, State) ->
erlang:cancel_timer(State#state.timer),
rabbit_log:info("Free disk space monitor was manually disabled"),
Expand All @@ -162,9 +184,13 @@ handle_cast(_Request, State) ->

handle_info(try_enable, #state{retries = Retries} = State) ->
{noreply, enable(State#state{retries = Retries - 1})};

handle_info(update, State) ->
{noreply, start_timer(internal_update(State))};

handle_info({'EXIT', Port, Reason}, #state{port=Port}=State) ->
{stop, {port_died, Reason}, State#state{port=not_used}};

handle_info(Info, State) ->
rabbit_log:debug("~p unhandled msg: ~p", [?MODULE, Info]),
{noreply, State}.
Expand All @@ -176,9 +202,43 @@ code_change(_OldVsn, State, _Extra) ->
{ok, State}.

%%----------------------------------------------------------------------------
%% Server Internals
%% Internal functions
%%----------------------------------------------------------------------------

start_portprogram() ->
Args = ["-s", "rabbit_disk_monitor"],
Opts = [stderr_to_stdout, stream, {args, Args}],
erlang:open_port({spawn_executable, "/bin/sh"}, Opts).

run_port_cmd(Cmd0, Port) ->
%% Insert a carriage return, ^M or ASCII 13, after the command,
%% to indicate end of output
Cmd = io_lib:format("(~s\n) </dev/null; echo \"\^M\"\n", [Cmd0]),
Port ! {self(), {command, [Cmd, 10]}}, % The 10 at the end is a newline
get_reply(Port, []).

get_reply(Port, O) ->
receive
{Port, {data, N}} ->
case newline(N, O) of
{ok, Str} -> Str;
{more, Acc} -> get_reply(Port, Acc)
end;
{'EXIT', Port, Reason} ->
exit({port_died, Reason})
end.

% Character 13 is ^M or carriage return
newline([13|_], B) ->
{ok, lists:reverse(B)};
newline([H|T], B) ->
newline(T, [H|B]);
newline([], B) ->
{more, B}.

find_cmd(Cmd) ->
os:find_executable(Cmd).

safe_ets_lookup(Key, Default) ->
try
case ets:lookup(?ETS_NAME, Key) of
Expand Down Expand Up @@ -211,10 +271,12 @@ set_disk_limits(State, Limit0) ->
ets:insert(?ETS_NAME, {disk_free_limit, Limit}),
internal_update(State1).

internal_update(State = #state { limit = Limit,
dir = Dir,
alarmed = Alarmed}) ->
CurrentFree = get_disk_free(Dir),
internal_update(State = #state{limit = Limit,
dir = Dir,
alarmed = Alarmed,
os = OS,
port = Port}) ->
CurrentFree = get_disk_free(Dir, OS, Port),
NewAlarmed = CurrentFree < Limit,
case {Alarmed, NewAlarmed} of
{false, true} ->
Expand All @@ -229,17 +291,14 @@ internal_update(State = #state { limit = Limit,
ets:insert(?ETS_NAME, {disk_free, CurrentFree}),
State#state{alarmed = NewAlarmed, actual = CurrentFree}.

get_disk_free(Dir) ->
get_disk_free(Dir, os:type()).

get_disk_free(Dir, {unix, Sun})
get_disk_free(Dir, {unix, Sun}, Port)
when Sun =:= sunos; Sun =:= sunos4; Sun =:= solaris ->
Df = os:find_executable("df"),
parse_free_unix(run_cmd(Df ++ " -k " ++ Dir));
get_disk_free(Dir, {unix, _}) ->
Df = os:find_executable("df"),
parse_free_unix(run_cmd(Df ++ " -kP " ++ Dir));
get_disk_free(Dir, {win32, _}) ->
Df = find_cmd("df"),
parse_free_unix(run_port_cmd(Df ++ " -k " ++ Dir, Port));
get_disk_free(Dir, {unix, _}, Port) ->
Df = find_cmd("df"),
parse_free_unix(run_port_cmd(Df ++ " -kP " ++ Dir, Port));
get_disk_free(Dir, {win32, _}, not_used) ->
% Dir:
% "c:/Users/username/AppData/Roaming/RabbitMQ/db/rabbit2@username-z01-mnesia"
case win32_get_drive_letter(Dir) of
Expand Down Expand Up @@ -272,6 +331,8 @@ get_disk_free(Dir, {win32, _}) ->
list_to_integer(FreeBytesAvailableToCallerStr)
end.

parse_free_unix({error, Error}) ->
exit({unparseable, Error});
parse_free_unix(Str) ->
case string:tokens(Str, "\n") of
[_, S | _] -> case string:tokens(S, " \t") of
Expand Down Expand Up @@ -311,11 +372,16 @@ win32_get_disk_free_dir(Dir) ->
%% See the following page to learn more about this:
%% https://ss64.com/nt/syntax-filenames.html
RawDir = "\\\\?\\" ++ string:replace(Dir, "/", "\\", all),
CommandResult = run_cmd("dir /-C /W \"" ++ RawDir ++ "\""),
LastLine = lists:last(string:tokens(CommandResult, "\r\n")),
{match, [Free]} = re:run(lists:reverse(LastLine), "(\\d+)",
[{capture, all_but_first, list}]),
{ok, list_to_integer(lists:reverse(Free))}.
case run_os_cmd("dir /-C /W \"" ++ RawDir ++ "\"") of
{error, Error} ->
exit({unparseable, Error});
CommandResult ->
LastLine0 = lists:last(string:tokens(CommandResult, "\r\n")),
LastLine1 = lists:reverse(LastLine0),
{match, [Free]} = re:run(LastLine1, "(\\d+)",
[{capture, all_but_first, list}]),
{ok, list_to_integer(lists:reverse(Free))}
end.

interpret_limit({mem_relative, Relative})
when is_number(Relative) ->
Expand Down Expand Up @@ -350,22 +416,28 @@ interval(#state{limit = Limit,
trunc(erlang:max(MinInterval, erlang:min(MaxInterval, IdealInterval))).

enable(#state{retries = 0} = State) ->
rabbit_log:error("Free disk space monitor failed to start!"),
State;
enable(#state{dir = Dir, interval = Interval, limit = Limit, retries = Retries} = State) ->
case {catch get_disk_free(Dir),
enable(#state{dir = Dir,
interval = Interval,
limit = Limit,
retries = Retries,
os = OS,
port = Port} = State) ->
case {catch get_disk_free(Dir, OS, Port),
vm_memory_monitor:get_total_memory()} of
{N1, N2} when is_integer(N1), is_integer(N2) ->
rabbit_log:info("Enabling free disk space monitoring", []),
start_timer(set_disk_limits(State, Limit));
Err ->
rabbit_log:info("Free disk space monitor encountered an error "
"(e.g. failed to parse output from OS tools): ~p, retries left: ~b",
rabbit_log:error("Free disk space monitor encountered an error "
"(e.g. failed to parse output from OS tools): ~p, retries left: ~b",
[Err, Retries]),
erlang:send_after(Interval, self(), try_enable),
State#state{enabled = false}
end.

run_cmd(Cmd) ->
run_os_cmd(Cmd) ->
Pid = self(),
Ref = make_ref(),
CmdFun = fun() ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,13 @@
%% Copyright (c) 2011-2022 VMware, Inc. or its affiliates. All rights reserved.
%%

-module(unit_disk_monitor_mocks_SUITE).
-module(disk_monitor_SUITE).

-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").

-compile(export_all).

-define(TIMEOUT, 30000).

all() ->
[
{group, sequential_tests}
Expand Down Expand Up @@ -63,50 +61,35 @@ end_per_testcase(Testcase, Config) ->
%% -------------------------------------------------------------------

disk_monitor(Config) ->
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
?MODULE, disk_monitor1, [Config]).
ok = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE,
disk_monitor1, [Config]).

disk_monitor1(_Config) ->
%% Issue: rabbitmq-server #91
%% os module could be mocked using 'unstick', however it may have undesired
%% side effects in following tests. Thus, we mock at rabbit_misc level
ok = meck:new(rabbit_misc, [passthrough]),
ok = meck:expect(rabbit_misc, os_cmd, fun(_) -> "\n" end),
ok = rabbit_sup:stop_child(rabbit_disk_monitor_sup),
ok = rabbit_sup:start_delayed_restartable_child(rabbit_disk_monitor, [1000]),
meck:unload(rabbit_misc),
passed.
Value = rabbit_disk_monitor:get_disk_free(),
true = is_integer(Value) andalso Value >= 0,
ok.

disk_monitor_enable(Config) ->
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
?MODULE, disk_monitor_enable1, [Config]).
ok = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE,
disk_monitor_enable1, [Config]).

disk_monitor_enable1(_Config) ->
ok = meck:new(rabbit_misc, [passthrough]),
ok = meck:expect(rabbit_misc, os_cmd, fun(_) -> "\n" end),
application:set_env(rabbit, disk_monitor_failure_retries, 20000),
application:set_env(rabbit, disk_monitor_failure_retry_interval, 100),
ok = rabbit_sup:stop_child(rabbit_disk_monitor_sup),
ok = rabbit_sup:start_delayed_restartable_child(rabbit_disk_monitor, [1000]),
unknown = rabbit_disk_monitor:get_disk_free(),
Cmd = case os:type() of
{win32, _} -> " Le volume dans le lecteur C n’a pas de nom.\n"
" Le numéro de série du volume est 707D-5BDC\n"
"\n"
" Répertoire de C:\Users\n"
"\n"
"10/12/2015 11:01 <DIR> .\n"
"10/12/2015 11:01 <DIR> ..\n"
" 0 fichier(s) 0 octets\n"
" 2 Rép(s) 758537121792 octets libres\n";
_ -> "Filesystem 1024-blocks Used Available Capacity iused ifree %iused Mounted on\n"
"/dev/disk1 975798272 234783364 740758908 25% 58759839 185189727 24% /\n"
end,
ok = meck:expect(rabbit_misc, os_cmd, fun(_) -> Cmd end),
timer:sleep(1000),
Bytes = 740758908 * 1024,
Bytes = rabbit_disk_monitor:get_disk_free(),
meck:unload(rabbit_misc),
ok = rabbit_sup:start_delayed_restartable_child(rabbit_disk_monitor, [250]),

Value0 = rabbit_disk_monitor:get_disk_free(),
true = (is_integer(Value0) andalso Value0 >= 0),
ok = timer:sleep(500),
Value1 = rabbit_disk_monitor:get_disk_free(),
true = (is_integer(Value1) andalso Value1 >= 0),

application:set_env(rabbit, disk_monitor_failure_retries, 10),
application:set_env(rabbit, disk_monitor_failure_retry_interval, 120000),
passed.
ok.
2 changes: 1 addition & 1 deletion deps/rabbit/test/unit_disk_monitor_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ set_disk_free_limit_command(Config) ->
set_disk_free_limit_command1(_Config) ->
F = fun () ->
DiskFree = rabbit_disk_monitor:get_disk_free(),
DiskFree =/= unknown
DiskFree =/= 'NaN'
end,
rabbit_ct_helpers:await_condition(F),

Expand Down