diff --git a/deps/rabbit/BUILD.bazel b/deps/rabbit/BUILD.bazel index dec75fea42ef..64ec732e9bc7 100644 --- a/deps/rabbit/BUILD.bazel +++ b/deps/rabbit/BUILD.bazel @@ -371,6 +371,11 @@ suites = [ name = "disconnect_detected_during_alarm_SUITE", size = "medium", ), + rabbitmq_integration_suite( + PACKAGE, + name = "disk_monitor_SUITE", + size = "medium", + ), rabbitmq_integration_suite( PACKAGE, name = "dynamic_ha_SUITE", @@ -869,11 +874,6 @@ suites = [ name = "unit_credit_flow_SUITE", size = "medium", ), - rabbitmq_integration_suite( - PACKAGE, - name = "unit_disk_monitor_mocks_SUITE", - size = "medium", - ), rabbitmq_integration_suite( PACKAGE, name = "unit_disk_monitor_SUITE", diff --git a/deps/rabbit/src/rabbit_disk_monitor.erl b/deps/rabbit/src/rabbit_disk_monitor.erl index 93016b112e6b..c7939d8a4994 100644 --- a/deps/rabbit/src/rabbit_disk_monitor.erl +++ b/deps/rabbit/src/rabbit_disk_monitor.erl @@ -62,7 +62,11 @@ %% on start-up retries, %% Interval between retries - interval + interval, + %% Operating system in use + os, + %% Port running sh to execute df commands + port }). %%---------------------------------------------------------------------------- @@ -97,9 +101,9 @@ get_max_check_interval() -> set_max_check_interval(Interval) -> gen_server:call(?MODULE, {set_max_check_interval, Interval}). --spec get_disk_free() -> (integer() | 'unknown'). +-spec get_disk_free() -> (integer() | 'NaN'). get_disk_free() -> - safe_ets_lookup(disk_free, unknown). + safe_ets_lookup(disk_free, 'NaN'). -spec set_enabled(string()) -> 'ok'. set_enabled(Enabled) -> @@ -114,6 +118,9 @@ start_link(Args) -> gen_server:start_link({local, ?SERVER}, ?MODULE, [Args], []). init([Limit]) -> + process_flag(trap_exit, true), + process_flag(priority, low), + Dir = dir(), {ok, Retries} = application:get_env(rabbit, disk_monitor_failure_retries), {ok, Interval} = application:get_env(rabbit, disk_monitor_failure_retry_interval), @@ -126,7 +133,21 @@ init([Limit]) -> interval = Interval}, State1 = set_min_check_interval(?DEFAULT_MIN_DISK_CHECK_INTERVAL, State0), State2 = set_max_check_interval(?DEFAULT_MAX_DISK_CHECK_INTERVAL, State1), - {ok, enable(State2)}. + + OS = os:type(), + Port = case OS of + {unix, _} -> + start_portprogram(); + {win32, _OSname} -> + not_used; + _ -> + exit({unsupported_os, OS}) + end, + State3 = State2#state{port=Port, os=OS}, + + State4 = enable(State3), + + {ok, State4}. handle_call({set_disk_free_limit, _}, _From, #state{enabled = false} = State) -> rabbit_log:info("Cannot set disk free limit: " @@ -149,6 +170,7 @@ handle_call({set_enabled, _Enabled = true}, _From, State) -> start_timer(set_disk_limits(State, State#state.limit)), rabbit_log:info("Free disk space monitor was enabled"), {reply, ok, State#state{enabled = true}}; + handle_call({set_enabled, _Enabled = false}, _From, State) -> erlang:cancel_timer(State#state.timer), rabbit_log:info("Free disk space monitor was manually disabled"), @@ -162,10 +184,15 @@ handle_cast(_Request, State) -> handle_info(try_enable, #state{retries = Retries} = State) -> {noreply, enable(State#state{retries = Retries - 1})}; + handle_info(update, State) -> {noreply, start_timer(internal_update(State))}; -handle_info(_Info, State) -> +handle_info({'EXIT', Port, Reason}, #state{port=Port}=State) -> + {stop, {port_died, Reason}, State#state{port=not_used}}; + +handle_info(Info, State) -> + rabbit_log:debug("~p unhandled msg: ~p", [?MODULE, Info]), {noreply, State}. terminate(_Reason, _State) -> @@ -175,9 +202,43 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. %%---------------------------------------------------------------------------- -%% Server Internals +%% Internal functions %%---------------------------------------------------------------------------- +start_portprogram() -> + Args = ["-s", "rabbit_disk_monitor"], + Opts = [stderr_to_stdout, stream, {args, Args}], + erlang:open_port({spawn_executable, "/bin/sh"}, Opts). + +run_port_cmd(Cmd0, Port) -> + %% Insert a carriage return, ^M or ASCII 13, after the command, + %% to indicate end of output + Cmd = io_lib:format("(~s\n) + receive + {Port, {data, N}} -> + case newline(N, O) of + {ok, Str} -> Str; + {more, Acc} -> get_reply(Port, Acc) + end; + {'EXIT', Port, Reason} -> + exit({port_died, Reason}) + end. + +% Character 13 is ^M or carriage return +newline([13|_], B) -> + {ok, lists:reverse(B)}; +newline([H|T], B) -> + newline(T, [H|B]); +newline([], B) -> + {more, B}. + +find_cmd(Cmd) -> + os:find_executable(Cmd). + safe_ets_lookup(Key, Default) -> try case ets:lookup(?ETS_NAME, Key) of @@ -210,10 +271,12 @@ set_disk_limits(State, Limit0) -> ets:insert(?ETS_NAME, {disk_free_limit, Limit}), internal_update(State1). -internal_update(State = #state { limit = Limit, - dir = Dir, - alarmed = Alarmed}) -> - CurrentFree = get_disk_free(Dir), +internal_update(State = #state{limit = Limit, + dir = Dir, + alarmed = Alarmed, + os = OS, + port = Port}) -> + CurrentFree = get_disk_free(Dir, OS, Port), NewAlarmed = CurrentFree < Limit, case {Alarmed, NewAlarmed} of {false, true} -> @@ -228,17 +291,14 @@ internal_update(State = #state { limit = Limit, ets:insert(?ETS_NAME, {disk_free, CurrentFree}), State#state{alarmed = NewAlarmed, actual = CurrentFree}. -get_disk_free(Dir) -> - get_disk_free(Dir, os:type()). - -get_disk_free(Dir, {unix, Sun}) +get_disk_free(Dir, {unix, Sun}, Port) when Sun =:= sunos; Sun =:= sunos4; Sun =:= solaris -> - Df = os:find_executable("df"), - parse_free_unix(run_cmd(Df ++ " -k " ++ Dir)); -get_disk_free(Dir, {unix, _}) -> - Df = os:find_executable("df"), - parse_free_unix(run_cmd(Df ++ " -kP " ++ Dir)); -get_disk_free(Dir, {win32, _}) -> + Df = find_cmd("df"), + parse_free_unix(run_port_cmd(Df ++ " -k " ++ Dir, Port)); +get_disk_free(Dir, {unix, _}, Port) -> + Df = find_cmd("df"), + parse_free_unix(run_port_cmd(Df ++ " -kP " ++ Dir, Port)); +get_disk_free(Dir, {win32, _}, not_used) -> % Dir: % "c:/Users/username/AppData/Roaming/RabbitMQ/db/rabbit2@username-z01-mnesia" case win32_get_drive_letter(Dir) of @@ -258,6 +318,8 @@ get_disk_free(Dir, {win32, _}) -> end end. +parse_free_unix({error, Error}) -> + exit({unparseable, Error}); parse_free_unix(Str) -> case string:tokens(Str, "\n") of [_, S | _] -> case string:tokens(S, " \t") of @@ -279,7 +341,7 @@ win32_get_disk_free_pwsh(DriveLetter) when (DriveLetter >= $A andalso DriveLetter =< $Z) -> % DriveLetter $c PoshCmd = "powershell.exe -NoLogo -NoProfile -NonInteractive -Command (Get-PSDrive " ++ [DriveLetter] ++ ").Free", - case run_cmd(PoshCmd) of + case run_os_cmd(PoshCmd) of {error, timeout} -> error; PoshResult -> @@ -309,11 +371,16 @@ win32_get_disk_free_dir(Dir) -> %% See the following page to learn more about this: %% https://ss64.com/nt/syntax-filenames.html RawDir = "\\\\?\\" ++ string:replace(Dir, "/", "\\", all), - CommandResult = run_cmd("dir /-C /W \"" ++ RawDir ++ "\""), - LastLine = lists:last(string:tokens(CommandResult, "\r\n")), - {match, [Free]} = re:run(lists:reverse(LastLine), "(\\d+)", - [{capture, all_but_first, list}]), - {ok, list_to_integer(lists:reverse(Free))}. + case run_os_cmd("dir /-C /W \"" ++ RawDir ++ "\"") of + {error, Error} -> + exit({unparseable, Error}); + CommandResult -> + LastLine0 = lists:last(string:tokens(CommandResult, "\r\n")), + LastLine1 = lists:reverse(LastLine0), + {match, [Free]} = re:run(LastLine1, "(\\d+)", + [{capture, all_but_first, list}]), + {ok, list_to_integer(lists:reverse(Free))} + end. interpret_limit({mem_relative, Relative}) when is_number(Relative) -> @@ -348,23 +415,28 @@ interval(#state{limit = Limit, trunc(erlang:max(MinInterval, erlang:min(MaxInterval, IdealInterval))). enable(#state{retries = 0} = State) -> + rabbit_log:error("Free disk space monitor failed to start!"), State; -enable(#state{dir = Dir, interval = Interval, limit = Limit, retries = Retries} - = State) -> - case {catch get_disk_free(Dir), +enable(#state{dir = Dir, + interval = Interval, + limit = Limit, + retries = Retries, + os = OS, + port = Port} = State) -> + case {catch get_disk_free(Dir, OS, Port), vm_memory_monitor:get_total_memory()} of {N1, N2} when is_integer(N1), is_integer(N2) -> rabbit_log:info("Enabling free disk space monitoring", []), start_timer(set_disk_limits(State, Limit)); Err -> - rabbit_log:info("Free disk space monitor encountered an error " - "(e.g. failed to parse output from OS tools): ~p, retries left: ~b", + rabbit_log:error("Free disk space monitor encountered an error " + "(e.g. failed to parse output from OS tools): ~p, retries left: ~b", [Err, Retries]), erlang:send_after(Interval, self(), try_enable), State#state{enabled = false} end. -run_cmd(Cmd) -> +run_os_cmd(Cmd) -> Pid = self(), Ref = make_ref(), CmdFun = fun() -> diff --git a/deps/rabbit/test/unit_disk_monitor_mocks_SUITE.erl b/deps/rabbit/test/disk_monitor_SUITE.erl similarity index 61% rename from deps/rabbit/test/unit_disk_monitor_mocks_SUITE.erl rename to deps/rabbit/test/disk_monitor_SUITE.erl index ae16cbb37937..3a10a745a1eb 100644 --- a/deps/rabbit/test/unit_disk_monitor_mocks_SUITE.erl +++ b/deps/rabbit/test/disk_monitor_SUITE.erl @@ -5,15 +5,13 @@ %% Copyright (c) 2011-2021 VMware, Inc. or its affiliates. All rights reserved. %% --module(unit_disk_monitor_mocks_SUITE). +-module(disk_monitor_SUITE). -include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). -compile(export_all). --define(TIMEOUT, 30000). - all() -> [ {group, sequential_tests} @@ -63,50 +61,35 @@ end_per_testcase(Testcase, Config) -> %% ------------------------------------------------------------------- disk_monitor(Config) -> - passed = rabbit_ct_broker_helpers:rpc(Config, 0, - ?MODULE, disk_monitor1, [Config]). + ok = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, + disk_monitor1, [Config]). disk_monitor1(_Config) -> %% Issue: rabbitmq-server #91 %% os module could be mocked using 'unstick', however it may have undesired %% side effects in following tests. Thus, we mock at rabbit_misc level - ok = meck:new(rabbit_misc, [passthrough]), - ok = meck:expect(rabbit_misc, os_cmd, fun(_) -> "\n" end), ok = rabbit_sup:stop_child(rabbit_disk_monitor_sup), ok = rabbit_sup:start_delayed_restartable_child(rabbit_disk_monitor, [1000]), - meck:unload(rabbit_misc), - passed. + Value = rabbit_disk_monitor:get_disk_free(), + true = is_integer(Value) andalso Value >= 0, + ok. disk_monitor_enable(Config) -> - passed = rabbit_ct_broker_helpers:rpc(Config, 0, - ?MODULE, disk_monitor_enable1, [Config]). + ok = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, + disk_monitor_enable1, [Config]). disk_monitor_enable1(_Config) -> - ok = meck:new(rabbit_misc, [passthrough]), - ok = meck:expect(rabbit_misc, os_cmd, fun(_) -> "\n" end), application:set_env(rabbit, disk_monitor_failure_retries, 20000), application:set_env(rabbit, disk_monitor_failure_retry_interval, 100), ok = rabbit_sup:stop_child(rabbit_disk_monitor_sup), - ok = rabbit_sup:start_delayed_restartable_child(rabbit_disk_monitor, [1000]), - unknown = rabbit_disk_monitor:get_disk_free(), - Cmd = case os:type() of - {win32, _} -> " Le volume dans le lecteur C n’a pas de nom.\n" - " Le numéro de série du volume est 707D-5BDC\n" - "\n" - " Répertoire de C:\Users\n" - "\n" - "10/12/2015 11:01