Skip to content

Commit

Permalink
Disk monitor strikes back!
Browse files Browse the repository at this point in the history
* Crash when a sub-command times out
* Use atom `NaN` when free space can not be determined

Fixes #5721

Use port to run /bin/sh on `unix` systems to then run `df` command

Update disk monitor tests to not use mocks because we no longer use rabbit_misc:os_cmd/1
  • Loading branch information
lukebakken committed Sep 8, 2022
1 parent 20216ef commit 7c6f4aa
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 63 deletions.
130 changes: 101 additions & 29 deletions deps/rabbit/src/rabbit_disk_monitor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,11 @@
%% on start-up
retries,
%% Interval between retries
interval
interval,
%% Operating system in use
os,
%% Port running sh to execute df commands
port
}).

%%----------------------------------------------------------------------------
Expand Down Expand Up @@ -97,9 +101,9 @@ get_max_check_interval() ->
set_max_check_interval(Interval) ->
gen_server:call(?MODULE, {set_max_check_interval, Interval}).

-spec get_disk_free() -> (integer() | 'unknown').
-spec get_disk_free() -> (integer() | 'NaN').
get_disk_free() ->
safe_ets_lookup(disk_free, unknown).
safe_ets_lookup(disk_free, 'NaN').

-spec set_enabled(string()) -> 'ok'.
set_enabled(Enabled) ->
Expand All @@ -114,6 +118,9 @@ start_link(Args) ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [Args], []).

init([Limit]) ->
process_flag(trap_exit, true),
process_flag(priority, low),

Dir = dir(),
{ok, Retries} = application:get_env(rabbit, disk_monitor_failure_retries),
{ok, Interval} = application:get_env(rabbit, disk_monitor_failure_retry_interval),
Expand All @@ -126,7 +133,21 @@ init([Limit]) ->
interval = Interval},
State1 = set_min_check_interval(?DEFAULT_MIN_DISK_CHECK_INTERVAL, State0),
State2 = set_max_check_interval(?DEFAULT_MAX_DISK_CHECK_INTERVAL, State1),
{ok, enable(State2)}.

OS = os:type(),
Port = case OS of
{unix, _} ->
start_portprogram();
{win32, _OSname} ->
not_used;
_ ->
exit({unsupported_os, OS})
end,
State3 = State2#state{port=Port, os=OS},

State4 = enable(State3),

{ok, State4}.

handle_call({set_disk_free_limit, _}, _From, #state{enabled = false} = State) ->
rabbit_log:info("Cannot set disk free limit: "
Expand All @@ -149,6 +170,7 @@ handle_call({set_enabled, _Enabled = true}, _From, State) ->
start_timer(set_disk_limits(State, State#state.limit)),
rabbit_log:info("Free disk space monitor was enabled"),
{reply, ok, State#state{enabled = true}};

handle_call({set_enabled, _Enabled = false}, _From, State) ->
erlang:cancel_timer(State#state.timer),
rabbit_log:info("Free disk space monitor was manually disabled"),
Expand All @@ -162,9 +184,13 @@ handle_cast(_Request, State) ->

handle_info(try_enable, #state{retries = Retries} = State) ->
{noreply, enable(State#state{retries = Retries - 1})};

handle_info(update, State) ->
{noreply, start_timer(internal_update(State))};

handle_info({'EXIT', Port, Reason}, #state{port=Port}=State) ->
{stop, {port_died, Reason}, State#state{port=not_used}};

handle_info(Info, State) ->
rabbit_log:debug("~p unhandled msg: ~p", [?MODULE, Info]),
{noreply, State}.
Expand All @@ -176,9 +202,43 @@ code_change(_OldVsn, State, _Extra) ->
{ok, State}.

%%----------------------------------------------------------------------------
%% Server Internals
%% Internal functions
%%----------------------------------------------------------------------------

start_portprogram() ->
Args = ["-s", "rabbit_disk_monitor"],
Opts = [stderr_to_stdout, stream, {args, Args}],
erlang:open_port({spawn_executable, "/bin/sh"}, Opts).

run_port_cmd(Cmd0, Port) ->
%% Insert a carriage return, ^M or ASCII 13, after the command,
%% to indicate end of output
Cmd = io_lib:format("(~s\n) </dev/null; echo \"\^M\"\n", [Cmd0]),
Port ! {self(), {command, [Cmd, 10]}}, % The 10 at the end is a newline
get_reply(Port, []).

get_reply(Port, O) ->
receive
{Port, {data, N}} ->
case newline(N, O) of
{ok, Str} -> Str;
{more, Acc} -> get_reply(Port, Acc)
end;
{'EXIT', Port, Reason} ->
exit({port_died, Reason})
end.

% Character 13 is ^M or carriage return
newline([13|_], B) ->
{ok, lists:reverse(B)};
newline([H|T], B) ->
newline(T, [H|B]);
newline([], B) ->
{more, B}.

find_cmd(Cmd) ->
os:find_executable(Cmd).

safe_ets_lookup(Key, Default) ->
try
case ets:lookup(?ETS_NAME, Key) of
Expand Down Expand Up @@ -211,10 +271,12 @@ set_disk_limits(State, Limit0) ->
ets:insert(?ETS_NAME, {disk_free_limit, Limit}),
internal_update(State1).

internal_update(State = #state { limit = Limit,
dir = Dir,
alarmed = Alarmed}) ->
CurrentFree = get_disk_free(Dir),
internal_update(State = #state{limit = Limit,
dir = Dir,
alarmed = Alarmed,
os = OS,
port = Port}) ->
CurrentFree = get_disk_free(Dir, OS, Port),
NewAlarmed = CurrentFree < Limit,
case {Alarmed, NewAlarmed} of
{false, true} ->
Expand All @@ -229,17 +291,14 @@ internal_update(State = #state { limit = Limit,
ets:insert(?ETS_NAME, {disk_free, CurrentFree}),
State#state{alarmed = NewAlarmed, actual = CurrentFree}.

get_disk_free(Dir) ->
get_disk_free(Dir, os:type()).

get_disk_free(Dir, {unix, Sun})
get_disk_free(Dir, {unix, Sun}, Port)
when Sun =:= sunos; Sun =:= sunos4; Sun =:= solaris ->
Df = os:find_executable("df"),
parse_free_unix(run_cmd(Df ++ " -k " ++ Dir));
get_disk_free(Dir, {unix, _}) ->
Df = os:find_executable("df"),
parse_free_unix(run_cmd(Df ++ " -kP " ++ Dir));
get_disk_free(Dir, {win32, _}) ->
Df = find_cmd("df"),
parse_free_unix(run_port_cmd(Df ++ " -k " ++ Dir, Port));
get_disk_free(Dir, {unix, _}, Port) ->
Df = find_cmd("df"),
parse_free_unix(run_port_cmd(Df ++ " -kP " ++ Dir, Port));
get_disk_free(Dir, {win32, _}, not_used) ->
% Dir:
% "c:/Users/username/AppData/Roaming/RabbitMQ/db/rabbit2@username-z01-mnesia"
case win32_get_drive_letter(Dir) of
Expand Down Expand Up @@ -272,6 +331,8 @@ get_disk_free(Dir, {win32, _}) ->
list_to_integer(FreeBytesAvailableToCallerStr)
end.

parse_free_unix({error, Error}) ->
exit({unparseable, Error});
parse_free_unix(Str) ->
case string:tokens(Str, "\n") of
[_, S | _] -> case string:tokens(S, " \t") of
Expand Down Expand Up @@ -311,11 +372,16 @@ win32_get_disk_free_dir(Dir) ->
%% See the following page to learn more about this:
%% https://ss64.com/nt/syntax-filenames.html
RawDir = "\\\\?\\" ++ string:replace(Dir, "/", "\\", all),
CommandResult = run_cmd("dir /-C /W \"" ++ RawDir ++ "\""),
LastLine = lists:last(string:tokens(CommandResult, "\r\n")),
{match, [Free]} = re:run(lists:reverse(LastLine), "(\\d+)",
[{capture, all_but_first, list}]),
{ok, list_to_integer(lists:reverse(Free))}.
case run_os_cmd("dir /-C /W \"" ++ RawDir ++ "\"") of
{error, Error} ->
exit({unparseable, Error});
CommandResult ->
LastLine0 = lists:last(string:tokens(CommandResult, "\r\n")),
LastLine1 = lists:reverse(LastLine0),
{match, [Free]} = re:run(LastLine1, "(\\d+)",
[{capture, all_but_first, list}]),
{ok, list_to_integer(lists:reverse(Free))}
end.

interpret_limit({mem_relative, Relative})
when is_number(Relative) ->
Expand Down Expand Up @@ -350,22 +416,28 @@ interval(#state{limit = Limit,
trunc(erlang:max(MinInterval, erlang:min(MaxInterval, IdealInterval))).

enable(#state{retries = 0} = State) ->
rabbit_log:error("Free disk space monitor failed to start!"),
State;
enable(#state{dir = Dir, interval = Interval, limit = Limit, retries = Retries} = State) ->
case {catch get_disk_free(Dir),
enable(#state{dir = Dir,
interval = Interval,
limit = Limit,
retries = Retries,
os = OS,
port = Port} = State) ->
case {catch get_disk_free(Dir, OS, Port),
vm_memory_monitor:get_total_memory()} of
{N1, N2} when is_integer(N1), is_integer(N2) ->
rabbit_log:info("Enabling free disk space monitoring", []),
start_timer(set_disk_limits(State, Limit));
Err ->
rabbit_log:info("Free disk space monitor encountered an error "
"(e.g. failed to parse output from OS tools): ~p, retries left: ~b",
rabbit_log:error("Free disk space monitor encountered an error "
"(e.g. failed to parse output from OS tools): ~p, retries left: ~b",
[Err, Retries]),
erlang:send_after(Interval, self(), try_enable),
State#state{enabled = false}
end.

run_cmd(Cmd) ->
run_os_cmd(Cmd) ->
Pid = self(),
Ref = make_ref(),
CmdFun = fun() ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,13 @@
%% Copyright (c) 2011-2022 VMware, Inc. or its affiliates. All rights reserved.
%%

-module(unit_disk_monitor_mocks_SUITE).
-module(disk_monitor_SUITE).

-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").

-compile(export_all).

-define(TIMEOUT, 30000).

all() ->
[
{group, sequential_tests}
Expand Down Expand Up @@ -63,50 +61,35 @@ end_per_testcase(Testcase, Config) ->
%% -------------------------------------------------------------------

disk_monitor(Config) ->
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
?MODULE, disk_monitor1, [Config]).
ok = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE,
disk_monitor1, [Config]).

disk_monitor1(_Config) ->
%% Issue: rabbitmq-server #91
%% os module could be mocked using 'unstick', however it may have undesired
%% side effects in following tests. Thus, we mock at rabbit_misc level
ok = meck:new(rabbit_misc, [passthrough]),
ok = meck:expect(rabbit_misc, os_cmd, fun(_) -> "\n" end),
ok = rabbit_sup:stop_child(rabbit_disk_monitor_sup),
ok = rabbit_sup:start_delayed_restartable_child(rabbit_disk_monitor, [1000]),
meck:unload(rabbit_misc),
passed.
Value = rabbit_disk_monitor:get_disk_free(),
true = is_integer(Value) andalso Value >= 0,
ok.

disk_monitor_enable(Config) ->
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
?MODULE, disk_monitor_enable1, [Config]).
ok = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE,
disk_monitor_enable1, [Config]).

disk_monitor_enable1(_Config) ->
ok = meck:new(rabbit_misc, [passthrough]),
ok = meck:expect(rabbit_misc, os_cmd, fun(_) -> "\n" end),
application:set_env(rabbit, disk_monitor_failure_retries, 20000),
application:set_env(rabbit, disk_monitor_failure_retry_interval, 100),
ok = rabbit_sup:stop_child(rabbit_disk_monitor_sup),
ok = rabbit_sup:start_delayed_restartable_child(rabbit_disk_monitor, [1000]),
unknown = rabbit_disk_monitor:get_disk_free(),
Cmd = case os:type() of
{win32, _} -> " Le volume dans le lecteur C n’a pas de nom.\n"
" Le numéro de série du volume est 707D-5BDC\n"
"\n"
" Répertoire de C:\Users\n"
"\n"
"10/12/2015 11:01 <DIR> .\n"
"10/12/2015 11:01 <DIR> ..\n"
" 0 fichier(s) 0 octets\n"
" 2 Rép(s) 758537121792 octets libres\n";
_ -> "Filesystem 1024-blocks Used Available Capacity iused ifree %iused Mounted on\n"
"/dev/disk1 975798272 234783364 740758908 25% 58759839 185189727 24% /\n"
end,
ok = meck:expect(rabbit_misc, os_cmd, fun(_) -> Cmd end),
timer:sleep(1000),
Bytes = 740758908 * 1024,
Bytes = rabbit_disk_monitor:get_disk_free(),
meck:unload(rabbit_misc),

Value0 = rabbit_disk_monitor:get_disk_free(),
true = (is_integer(Value0) andalso Value0 >= 0),

Value1 = rabbit_disk_monitor:get_disk_free(),
true = (is_integer(Value1) andalso Value1 >= 0),

application:set_env(rabbit, disk_monitor_failure_retries, 10),
application:set_env(rabbit, disk_monitor_failure_retry_interval, 120000),
passed.
ok.
2 changes: 1 addition & 1 deletion deps/rabbit/test/unit_disk_monitor_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ set_disk_free_limit_command(Config) ->
set_disk_free_limit_command1(_Config) ->
F = fun () ->
DiskFree = rabbit_disk_monitor:get_disk_free(),
DiskFree =/= unknown
DiskFree =/= 'NaN'
end,
rabbit_ct_helpers:await_condition(F),

Expand Down

0 comments on commit 7c6f4aa

Please sign in to comment.