diff --git a/deps/rabbit/Makefile b/deps/rabbit/Makefile index 9e17257fd00a..a65e1e72f5e8 100644 --- a/deps/rabbit/Makefile +++ b/deps/rabbit/Makefile @@ -28,9 +28,6 @@ define PROJECT_ENV {heartbeat, 60}, {msg_store_file_size_limit, 16777216}, {msg_store_shutdown_timeout, 600000}, - {fhc_write_buffering, true}, - {fhc_read_buffering, false}, - {queue_index_max_journal_entries, 32768}, {queue_index_embed_msgs_below, 4096}, {default_user, <<"guest">>}, {default_pass, <<"guest">>}, @@ -269,7 +266,7 @@ PARALLEL_CT_SET_1_D = amqqueue_backward_compatibility channel_interceptor channe PARALLEL_CT_SET_2_A = cluster confirms_rejects consumer_timeout rabbit_access_control rabbit_confirms rabbit_core_metrics_gc rabbit_cuttlefish rabbit_db_binding rabbit_db_exchange PARALLEL_CT_SET_2_B = crashing_queues deprecated_features direct_exchange_routing_v2 disconnect_detected_during_alarm exchanges unit_gen_server2 -PARALLEL_CT_SET_2_C = disk_monitor dynamic_qq unit_disk_monitor unit_file_handle_cache unit_log_management unit_operator_policy prevent_startup_if_node_was_reset +PARALLEL_CT_SET_2_C = disk_monitor dynamic_qq unit_disk_monitor unit_log_management unit_operator_policy prevent_startup_if_node_was_reset PARALLEL_CT_SET_2_D = queue_length_limits queue_parallel quorum_queue_member_reconciliation rabbit_fifo rabbit_fifo_dlx rabbit_stream_coordinator PARALLEL_CT_SET_3_A = definition_import per_user_connection_channel_limit_partitions per_vhost_connection_limit_partitions policy priority_queue_recovery rabbit_fifo_v0 rabbit_stream_sac_coordinator_v4 rabbit_stream_sac_coordinator unit_credit_flow unit_queue_consumers unit_queue_location unit_quorum_queue diff --git a/deps/rabbit/src/code_server_cache.erl b/deps/rabbit/src/code_server_cache.erl deleted file mode 100644 index 3d3f750ad68e..000000000000 --- a/deps/rabbit/src/code_server_cache.erl +++ /dev/null @@ -1,83 +0,0 @@ -%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*- -%% ex: ts=4 sw=4 et -%% This Source Code Form is subject to the terms of the Mozilla Public -%% License, v. 2.0. If a copy of the MPL was not distributed with this -%% file, You can obtain one at https://mozilla.org/MPL/2.0/. -%% -%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. -%% - --module(code_server_cache). - --include_lib("kernel/include/logger.hrl"). - --behaviour(gen_server). - -%% API --export([start_link/0, - maybe_call_mfa/4]). - -%% gen_server callbacks --export([init/1, - handle_call/3, - handle_cast/2, - handle_info/2, - terminate/2, - code_change/3]). - --record(state, { - modules = #{} :: #{atom() => boolean()} -}). - -%% API -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). - -maybe_call_mfa(Module, Function, Args, Default) -> - gen_server:call(?MODULE, {maybe_call_mfa, {Module, Function, Args, Default}}). - -%% gen_server callbacks - -init([]) -> - {ok, #state{}}. - -handle_call({maybe_call_mfa, {Mod, _F, _A, _D} = MFA}, _From, #state{modules = ModuleMap} = State0) -> - Value = maps:get(Mod, ModuleMap, true), - {ok, Reply, State1} = handle_maybe_call_mfa(Value, MFA, State0), - {reply, Reply, State1}; -handle_call(_Request, _From, State) -> - {reply, ignored, State}. - -handle_cast(_Msg, State) -> - {noreply, State}. - -handle_info(_Info, State) -> - {noreply, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%% Internal functions - -handle_maybe_call_mfa(false, {_M, _F, _A, Default}, State) -> - {ok, Default, State}; -handle_maybe_call_mfa(true, {Module, Function, Args, Default}, State) -> - try - Reply = erlang:apply(Module, Function, Args), - {ok, Reply, State} - catch - error:undef -> - handle_maybe_call_mfa_error(Module, Default, State); - Err:Reason -> - ?LOG_ERROR("Calling ~tp:~tp failed: ~tp:~tp", - [Module, Function, Err, Reason]), - handle_maybe_call_mfa_error(Module, Default, State) - end. - -handle_maybe_call_mfa_error(Module, Default, #state{modules = ModuleMap0} = State0) -> - ModuleMap1 = maps:put(Module, false, ModuleMap0), - State1 = State0#state{modules = ModuleMap1}, - {ok, Default, State1}. diff --git a/deps/rabbit/src/file_handle_cache.erl b/deps/rabbit/src/file_handle_cache.erl deleted file mode 100644 index 46ed39cbc4d2..000000000000 --- a/deps/rabbit/src/file_handle_cache.erl +++ /dev/null @@ -1,1615 +0,0 @@ -%% This Source Code Form is subject to the terms of the Mozilla Public -%% License, v. 2.0. If a copy of the MPL was not distributed with this -%% file, You can obtain one at https://mozilla.org/MPL/2.0/. -%% -%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. -%% - --module(file_handle_cache). - --include_lib("kernel/include/logger.hrl"). - -%% A File Handle Cache -%% -%% This extends a subset of the functionality of the Erlang file -%% module. In the below, we use "file handle" to specifically refer to -%% file handles, and "file descriptor" to refer to descriptors which -%% are not file handles, e.g. sockets. -%% -%% Some constraints -%% 1) This supports one writer, multiple readers per file. Nothing -%% else. -%% 2) Do not open the same file from different processes. Bad things -%% may happen, especially for writes. -%% 3) Writes are all appends. You cannot write to the middle of a -%% file, although you can truncate and then append if you want. -%% 4) There are read and write buffers. Feel free to use the read_ahead -%% mode, but beware of the interaction between that buffer and the write -%% buffer. -%% -%% Some benefits -%% 1) You do not have to remember to call sync before close -%% 2) Buffering is much more flexible than with the plain file module, -%% and you can control when the buffer gets flushed out. This means -%% that you can rely on reads-after-writes working, without having to -%% call the expensive sync. -%% 3) Unnecessary calls to position and sync get optimised out. -%% 4) You can find out what your 'real' offset is, and what your -%% 'virtual' offset is (i.e. where the hdl really is, and where it -%% would be after the write buffer is written out). -%% -%% There is also a server component which serves to limit the number -%% of open file descriptors. This is a hard limit: the server -%% component will ensure that clients do not have more file -%% descriptors open than it's configured to allow. -%% -%% On open, the client requests permission from the server to open the -%% required number of file handles. The server may ask the client to -%% close other file handles that it has open, or it may queue the -%% request and ask other clients to close file handles they have open -%% in order to satisfy the request. Requests are always satisfied in -%% the order they arrive, even if a latter request (for a small number -%% of file handles) can be satisfied before an earlier request (for a -%% larger number of file handles). On close, the client sends a -%% message to the server. These messages allow the server to keep -%% track of the number of open handles. The client also keeps a -%% gb_tree which is updated on every use of a file handle, mapping the -%% time at which the file handle was last used (timestamp) to the -%% handle. Thus the smallest key in this tree maps to the file handle -%% that has not been used for the longest amount of time. This -%% smallest key is included in the messages to the server. As such, -%% the server keeps track of when the least recently used file handle -%% was used *at the point of the most recent open or close* by each -%% client. -%% -%% Note that this data can go very out of date, by the client using -%% the least recently used handle. -%% -%% When the limit is exceeded (i.e. the number of open file handles is -%% at the limit and there are pending 'open' requests), the server -%% calculates the average age of the last reported least recently used -%% file handle of all the clients. It then tells all the clients to -%% close any handles not used for longer than this average, by -%% invoking the callback the client registered. The client should -%% receive this message and pass it into -%% set_maximum_since_use/1. However, it is highly possible this age -%% will be greater than the ages of all the handles the client knows -%% of because the client has used its file handles in the mean -%% time. Thus at this point the client reports to the server the -%% current timestamp at which its least recently used file handle was -%% last used. The server will check two seconds later that either it -%% is back under the limit, in which case all is well again, or if -%% not, it will calculate a new average age. Its data will be much -%% more recent now, and so it is very likely that when this is -%% communicated to the clients, the clients will close file handles. -%% (In extreme cases, where it's very likely that all clients have -%% used their open handles since they last sent in an update, which -%% would mean that the average will never cause any file handles to -%% be closed, the server can send out an average age of 0, resulting -%% in all available clients closing all their file handles.) -%% -%% Care is taken to ensure that (a) processes which are blocked -%% waiting for file descriptors to become available are not sent -%% requests to close file handles; and (b) given it is known how many -%% file handles a process has open, when the average age is forced to -%% 0, close messages are only sent to enough processes to release the -%% correct number of file handles and the list of processes is -%% randomly shuffled. This ensures we don't cause processes to -%% needlessly close file handles, and ensures that we don't always -%% make such requests of the same processes. -%% -%% The advantage of this scheme is that there is only communication -%% from the client to the server on open, close, and when in the -%% process of trying to reduce file handle usage. There is no -%% communication from the client to the server on normal file handle -%% operations. This scheme forms a feed-back loop - the server does -%% not care which file handles are closed, just that some are, and it -%% checks this repeatedly when over the limit. -%% -%% Handles which are closed as a result of the server are put into a -%% "soft-closed" state in which the handle is closed (data flushed out -%% and sync'd first) but the state is maintained. The handle will be -%% fully reopened again as soon as needed, thus users of this library -%% do not need to worry about their handles being closed by the server -%% - reopening them when necessary is handled transparently. -%% -%% The server also supports obtain, release and transfer. obtain/{0,1} -%% blocks until a file descriptor is available, at which point the -%% requesting process is considered to 'own' more descriptor(s). -%% release/{0,1} is the inverse operation and releases previously obtained -%% descriptor(s). transfer/{1,2} transfers ownership of file descriptor(s) -%% between processes. It is non-blocking. Obtain has a -%% lower limit, set by the ?OBTAIN_LIMIT/1 macro. File handles can use -%% the entire limit, but will be evicted by obtain calls up to the -%% point at which no more obtain calls can be satisfied by the obtains -%% limit. Thus there will always be some capacity available for file -%% handles. Processes that use obtain are never asked to return them, -%% and they are not managed in any way by the server. It is simply a -%% mechanism to ensure that processes that need file descriptors such -%% as sockets can do so in such a way that the overall number of open -%% file descriptors is managed. -%% -%% The callers of register_callback/3, obtain, and the argument of -%% transfer are monitored, reducing the count of handles in use -%% appropriately when the processes terminate. - --behaviour(gen_server2). - --export([register_callback/3]). --export([open/3, close/1, read/2, append/2, needs_sync/1, sync/1, position/2, - truncate/1, current_virtual_offset/1, current_raw_offset/1, flush/1, - copy/3, set_maximum_since_use/1, delete/1, clear/1, - open_with_absolute_path/3]). --export([obtain/0, obtain/1, release/0, release/1, transfer/1, transfer/2, - set_limit/1, get_limit/0, info_keys/0, with_handle/1, with_handle/2, - info/0, info/1, clear_read_cache/0, clear_process_read_cache/0]). --export([set_reservation/0, set_reservation/1, release_reservation/0]). --export([ulimit/0]). - --export([start_link/0, start_link/2, init/1, handle_call/3, handle_cast/2, - handle_info/2, terminate/2, code_change/3, prioritise_cast/3]). - --export([clear_metrics_of/1, list_elders/0, list_clients/0, get_client_state/1]). - --define(SERVER, ?MODULE). -%% Reserve 3 handles for ra usage: wal, segment writer and a dets table --define(RESERVED_FOR_OTHERS, 100 + 3). - --define(FILE_HANDLES_LIMIT_OTHER, 1024). --define(FILE_HANDLES_CHECK_INTERVAL, 2000). - --define(OBTAIN_LIMIT(LIMIT), trunc((LIMIT * 0.9) - 2)). --define(CLIENT_ETS_TABLE, file_handle_cache_client). --define(ELDERS_ETS_TABLE, file_handle_cache_elders). - --import(rabbit_misc, [safe_ets_update_counter/3, safe_ets_update_counter/4, - safe_ets_update_element/3, safe_ets_update_element/4]). - -%%---------------------------------------------------------------------------- - --record(file, - { reader_count, - has_writer - }). - --record(handle, - { hdl, - ref, - offset, - is_dirty, - write_buffer_size, - write_buffer_size_limit, - write_buffer, - read_buffer, - read_buffer_pos, - read_buffer_rem, %% Num of bytes from pos to end - read_buffer_size, %% Next size of read buffer to use - read_buffer_size_limit, %% Max size of read buffer to use - read_buffer_usage, %% Bytes we have read from it, for tuning - at_eof, - path, - mode, - options, - is_write, - is_read, - last_used_at - }). - --record(fhc_state, - { elders, - limit, - open_count, - open_pending, - obtain_limit, %%socket - obtain_count_socket, - obtain_count_file, - obtain_pending_socket, - obtain_pending_file, - clients, - timer_ref, - alarm_set, - alarm_clear, - reserve_count_socket, - reserve_count_file - }). - --record(cstate, - { pid, - callback, - opened, - obtained_socket, - obtained_file, - blocked, - pending_closes, - reserved_socket, - reserved_file - }). - --record(pending, - { kind, - pid, - requested, - from - }). - -%%---------------------------------------------------------------------------- -%% Specs -%%---------------------------------------------------------------------------- - --type ref() :: any(). --type ok_or_error() :: 'ok' | {'error', any()}. --type val_or_error(T) :: {'ok', T} | {'error', any()}. --type position() :: ('bof' | 'eof' | non_neg_integer() | - {('bof' |'eof'), non_neg_integer()} | - {'cur', integer()}). --type offset() :: non_neg_integer(). - --spec register_callback(atom(), atom(), [any()]) -> 'ok'. --spec open - (file:filename(), [any()], - [{'write_buffer', (non_neg_integer() | 'infinity' | 'unbuffered')} | - {'read_buffer', (non_neg_integer() | 'unbuffered')}]) -> - val_or_error(ref()). --spec open_with_absolute_path - (file:filename(), [any()], - [{'write_buffer', (non_neg_integer() | 'infinity' | 'unbuffered')} | - {'read_buffer', (non_neg_integer() | 'unbuffered')}]) -> - val_or_error(ref()). --spec close(ref()) -> ok_or_error(). --spec read - (ref(), non_neg_integer()) -> val_or_error([char()] | binary()) | 'eof'. --spec append(ref(), iodata()) -> ok_or_error(). --spec sync(ref()) -> ok_or_error(). --spec position(ref(), position()) -> val_or_error(offset()). --spec truncate(ref()) -> ok_or_error(). --spec current_virtual_offset(ref()) -> val_or_error(offset()). --spec current_raw_offset(ref()) -> val_or_error(offset()). --spec flush(ref()) -> ok_or_error(). --spec copy(ref(), ref(), non_neg_integer()) -> val_or_error(non_neg_integer()). --spec delete(ref()) -> ok_or_error(). --spec clear(ref()) -> ok_or_error(). --spec set_maximum_since_use(non_neg_integer()) -> 'ok'. --spec obtain() -> 'ok'. --spec obtain(non_neg_integer()) -> 'ok'. --spec release() -> 'ok'. --spec release(non_neg_integer()) -> 'ok'. --spec transfer(pid()) -> 'ok'. --spec transfer(pid(), non_neg_integer()) -> 'ok'. --spec with_handle(fun(() -> A)) -> A. --spec with_handle(non_neg_integer(), fun(() -> A)) -> A. --spec set_limit(non_neg_integer()) -> 'ok'. --spec get_limit() -> non_neg_integer(). --spec info_keys() -> rabbit_types:info_keys(). --spec info() -> rabbit_types:infos(). --spec info([atom()]) -> rabbit_types:infos(). --spec ulimit() -> 'unknown' | non_neg_integer(). - -%%---------------------------------------------------------------------------- --define(INFO_KEYS, [total_limit, total_used, sockets_limit, sockets_used]). - -%%---------------------------------------------------------------------------- -%% Public API -%%---------------------------------------------------------------------------- - -start_link() -> - start_link(fun alarm_handler:set_alarm/1, fun alarm_handler:clear_alarm/1). - -start_link(AlarmSet, AlarmClear) -> - gen_server2:start_link({local, ?SERVER}, ?MODULE, [AlarmSet, AlarmClear], - [{timeout, infinity}]). - -register_callback(M, F, A) - when is_atom(M) andalso is_atom(F) andalso is_list(A) -> - gen_server2:cast(?SERVER, {register_callback, self(), {M, F, A}}). - -open(Path, Mode, Options) -> - open_with_absolute_path(filename:absname(Path), Mode, Options). - -open_with_absolute_path(Path, Mode, Options) -> - File1 = #file { reader_count = RCount, has_writer = HasWriter } = - case get({Path, fhc_file}) of - File = #file {} -> File; - undefined -> #file { reader_count = 0, - has_writer = false } - end, - Mode1 = append_to_write(Mode), - IsWriter = is_writer(Mode1), - case IsWriter andalso HasWriter of - true -> {error, writer_exists}; - false -> {ok, Ref} = new_closed_handle(Path, Mode1, Options), - case get_or_reopen_timed([{Ref, new}]) of - {ok, [_Handle1]} -> - RCount1 = case is_reader(Mode1) of - true -> RCount + 1; - false -> RCount - end, - HasWriter1 = HasWriter orelse IsWriter, - put({Path, fhc_file}, - File1 #file { reader_count = RCount1, - has_writer = HasWriter1 }), - {ok, Ref}; - Error -> - erase({Ref, fhc_handle}), - Error - end - end. - -close(Ref) -> - case erase({Ref, fhc_handle}) of - undefined -> ok; - Handle -> case hard_close(Handle) of - ok -> ok; - {Error, Handle1} -> put_handle(Ref, Handle1), - Error - end - end. - -read(Ref, Count) -> - with_flushed_handles( - [Ref], keep, - fun ([#handle { is_read = false }]) -> - {error, not_open_for_reading}; - ([#handle{read_buffer_size_limit = 0, - hdl = Hdl, offset = Offset} = Handle]) -> - %% The read buffer is disabled. This is just an - %% optimization: the clauses below can handle this case. - case prim_file_read(Hdl, Count) of - {ok, Data} -> {{ok, Data}, - [Handle#handle{offset = Offset+size(Data)}]}; - eof -> {eof, [Handle #handle { at_eof = true }]}; - Error -> {Error, Handle} - end; - ([Handle = #handle{read_buffer = Buf, - read_buffer_pos = BufPos, - read_buffer_rem = BufRem, - read_buffer_usage = BufUsg, - offset = Offset}]) - when BufRem >= Count -> - <<_:BufPos/binary, Res:Count/binary, _/binary>> = Buf, - {{ok, Res}, [Handle#handle{offset = Offset + Count, - read_buffer_pos = BufPos + Count, - read_buffer_rem = BufRem - Count, - read_buffer_usage = BufUsg + Count }]}; - ([Handle0]) -> - maybe_reduce_read_cache([Ref]), - Handle = #handle{read_buffer = Buf, - read_buffer_pos = BufPos, - read_buffer_rem = BufRem, - read_buffer_size = BufSz, - hdl = Hdl, - offset = Offset} - = tune_read_buffer_limit(Handle0, Count), - WantedCount = Count - BufRem, - case prim_file_read(Hdl, max(BufSz, WantedCount)) of - {ok, Data} -> - <<_:BufPos/binary, BufTl/binary>> = Buf, - ReadCount = size(Data), - case ReadCount < WantedCount of - true -> - OffSet1 = Offset + BufRem + ReadCount, - {{ok, <>}, - [reset_read_buffer( - Handle#handle{offset = OffSet1})]}; - false -> - <> = Data, - OffSet1 = Offset + BufRem + WantedCount, - BufRem1 = ReadCount - WantedCount, - {{ok, <>}, - [Handle#handle{offset = OffSet1, - read_buffer = Data, - read_buffer_pos = WantedCount, - read_buffer_rem = BufRem1, - read_buffer_usage = WantedCount}]} - end; - eof -> - {eof, [Handle #handle { at_eof = true }]}; - Error -> - {Error, [reset_read_buffer(Handle)]} - end - end). - -append(Ref, Data) -> - with_handles( - [Ref], - fun ([#handle { is_write = false }]) -> - {error, not_open_for_writing}; - ([Handle]) -> - case maybe_seek(eof, Handle) of - {{ok, _Offset}, #handle { hdl = Hdl, offset = Offset, - write_buffer_size_limit = 0, - at_eof = true } = Handle1} -> - Offset1 = Offset + iolist_size(Data), - {prim_file_write(Hdl, Data), - [Handle1 #handle { is_dirty = true, offset = Offset1 }]}; - {{ok, _Offset}, #handle { write_buffer = WriteBuffer, - write_buffer_size = Size, - write_buffer_size_limit = Limit, - at_eof = true } = Handle1} -> - WriteBuffer1 = [Data | WriteBuffer], - Size1 = Size + iolist_size(Data), - Handle2 = Handle1 #handle { write_buffer = WriteBuffer1, - write_buffer_size = Size1 }, - case Limit =/= infinity andalso Size1 > Limit of - true -> {Result, Handle3} = write_buffer(Handle2), - {Result, [Handle3]}; - false -> {ok, [Handle2]} - end; - {{error, _} = Error, Handle1} -> - {Error, [Handle1]} - end - end). - -sync(Ref) -> - with_flushed_handles( - [Ref], keep, - fun ([#handle { is_dirty = false, write_buffer = [] }]) -> - ok; - ([Handle = #handle { hdl = Hdl, - is_dirty = true, write_buffer = [] }]) -> - case prim_file_sync(Hdl) of - ok -> {ok, [Handle #handle { is_dirty = false }]}; - Error -> {Error, [Handle]} - end - end). - -needs_sync(Ref) -> - %% This must *not* use with_handles/2; see bug 25052 - case get({Ref, fhc_handle}) of - #handle { is_dirty = false, write_buffer = [] } -> false; - #handle {} -> true - end. - -position(Ref, NewOffset) -> - with_flushed_handles( - [Ref], keep, - fun ([Handle]) -> {Result, Handle1} = maybe_seek(NewOffset, Handle), - {Result, [Handle1]} - end). - -truncate(Ref) -> - with_flushed_handles( - [Ref], - fun ([Handle1 = #handle { hdl = Hdl }]) -> - case prim_file:truncate(Hdl) of - ok -> {ok, [Handle1 #handle { at_eof = true }]}; - Error -> {Error, [Handle1]} - end - end). - -current_virtual_offset(Ref) -> - with_handles([Ref], fun ([#handle { at_eof = true, is_write = true, - offset = Offset, - write_buffer_size = Size }]) -> - {ok, Offset + Size}; - ([#handle { offset = Offset }]) -> - {ok, Offset} - end). - -current_raw_offset(Ref) -> - with_handles([Ref], fun ([Handle]) -> {ok, Handle #handle.offset} end). - -flush(Ref) -> - with_flushed_handles([Ref], fun ([Handle]) -> {ok, [Handle]} end). - -copy(Src, Dest, Count) -> - with_flushed_handles( - [Src, Dest], - fun ([SHandle = #handle { is_read = true, hdl = SHdl, offset = SOffset }, - DHandle = #handle { is_write = true, hdl = DHdl, offset = DOffset }] - ) -> - case prim_file:copy(SHdl, DHdl, Count) of - {ok, Count1} = Result1 -> - {Result1, - [SHandle #handle { offset = SOffset + Count1 }, - DHandle #handle { offset = DOffset + Count1, - is_dirty = true }]}; - Error -> - {Error, [SHandle, DHandle]} - end; - (_Handles) -> - {error, incorrect_handle_modes} - end). - -delete(Ref) -> - case erase({Ref, fhc_handle}) of - undefined -> - ok; - Handle = #handle { path = Path } -> - case hard_close(Handle #handle { is_dirty = false, - write_buffer = [] }) of - ok -> prim_file:delete(Path); - {Error, Handle1} -> put_handle(Ref, Handle1), - Error - end - end. - -clear(Ref) -> - with_handles( - [Ref], - fun ([#handle { at_eof = true, write_buffer_size = 0, offset = 0 }]) -> - ok; - ([Handle]) -> - case maybe_seek(bof, Handle#handle{write_buffer = [], - write_buffer_size = 0}) of - {{ok, 0}, Handle1 = #handle { hdl = Hdl }} -> - case prim_file:truncate(Hdl) of - ok -> {ok, [Handle1 #handle { at_eof = true }]}; - Error -> {Error, [Handle1]} - end; - {{error, _} = Error, Handle1} -> - {Error, [Handle1]} - end - end). - -set_maximum_since_use(MaximumAge) -> - Now = erlang:monotonic_time(), - case lists:foldl( - fun ({{Ref, fhc_handle}, - Handle = #handle { hdl = Hdl, last_used_at = Then }}, Rep) -> - case Hdl =/= closed andalso - erlang:convert_time_unit(Now - Then, - native, - micro_seconds) - >= MaximumAge of - true -> soft_close(Ref, Handle) orelse Rep; - false -> Rep - end; - (_KeyValuePair, Rep) -> - Rep - end, false, get()) of - false -> age_tree_change(), ok; - true -> ok - end. - -obtain() -> obtain(1). -set_reservation() -> set_reservation(1). -release() -> release(1). -release_reservation() -> release_reservation(file). -%% @todo This isn't used. -transfer(Pid) -> transfer(Pid, 1). - -obtain(Count) -> obtain(Count, socket). -set_reservation(Count) -> set_reservation(Count, file). -release(Count) -> release(Count, socket). - -with_handle(Fun) -> - with_handle(1, Fun). - -with_handle(N, Fun) -> - ok = obtain(N, file), - try Fun() - after ok = release(N, file) - end. - -obtain(Count, Type) when Count > 0 -> - %% If the FHC isn't running, obtains succeed immediately. - case whereis(?SERVER) of - undefined -> ok; - _ -> gen_server2:call( - ?SERVER, {obtain, Count, Type, self()}, infinity) - end. - -set_reservation(Count, Type) when Count > 0 -> - %% If the FHC isn't running, reserve succeed immediately. - case whereis(?SERVER) of - undefined -> ok; - _ -> gen_server2:cast(?SERVER, {set_reservation, Count, Type, self()}) - end. - -release(Count, Type) when Count > 0 -> - gen_server2:cast(?SERVER, {release, Count, Type, self()}). - -release_reservation(Type) -> - gen_server2:cast(?SERVER, {release_reservation, Type, self()}). - -transfer(Pid, Count) when Count > 0 -> - gen_server2:cast(?SERVER, {transfer, Count, self(), Pid}). - -set_limit(Limit) -> - gen_server2:call(?SERVER, {set_limit, Limit}, infinity). - -get_limit() -> - gen_server2:call(?SERVER, get_limit, infinity). - -info_keys() -> ?INFO_KEYS. - -info() -> info(?INFO_KEYS). -info(Items) -> gen_server2:call(?SERVER, {info, Items}, infinity). - -clear_read_cache() -> - gen_server2:cast(?SERVER, clear_read_cache). - -clear_process_read_cache() -> - [ - begin - Handle1 = reset_read_buffer(Handle), - put({Ref, fhc_handle}, Handle1) - end || - {{Ref, fhc_handle}, Handle} <- get(), - size(Handle#handle.read_buffer) > 0 - ]. - -%% Only used for testing -clear_metrics_of(Pid) -> - case whereis(?SERVER) of - undefined -> ok; - _ -> gen_server2:cast(?SERVER, {clear_metrics_of, Pid}) - end. - -%% Only used for testing -list_elders() -> - case whereis(?SERVER) of - undefined -> ok; - _ -> gen_server2:call(?SERVER, list_elders) - end. - -%% Only used for testing -list_clients() -> - case whereis(?SERVER) of - undefined -> ok; - _ -> gen_server2:call(?SERVER, list_clients) - end. - -%% Only used for testing -get_client_state(Pid) -> - case whereis(?SERVER) of - undefined -> ok; - _ -> gen_server2:call(?SERVER, {get_client_state, Pid}) - end. - -%%---------------------------------------------------------------------------- -%% Internal functions -%%---------------------------------------------------------------------------- - -prim_file_read(Hdl, Size) -> - prim_file:read(Hdl, Size). - -prim_file_write(Hdl, Bytes) -> - prim_file:write(Hdl, Bytes). - -prim_file_sync(Hdl) -> - prim_file:sync(Hdl). - -prim_file_position(Hdl, NewOffset) -> - prim_file:position(Hdl, NewOffset). - -is_reader(Mode) -> lists:member(read, Mode). - -is_writer(Mode) -> lists:member(write, Mode). - -append_to_write(Mode) -> - case lists:member(append, Mode) of - true -> [write | Mode -- [append, write]]; - false -> Mode - end. - -with_handles(Refs, Fun) -> - with_handles(Refs, reset, Fun). - -with_handles(Refs, ReadBuffer, Fun) -> - case get_or_reopen_timed([{Ref, reopen} || Ref <- Refs]) of - {ok, Handles0} -> - Handles = case ReadBuffer of - reset -> [reset_read_buffer(H) || H <- Handles0]; - keep -> Handles0 - end, - case Fun(Handles) of - {Result, Handles1} when is_list(Handles1) -> - _ = lists:zipwith(fun put_handle/2, Refs, Handles1), - Result; - Result -> - Result - end; - Error -> - Error - end. - -with_flushed_handles(Refs, Fun) -> - with_flushed_handles(Refs, reset, Fun). - -with_flushed_handles(Refs, ReadBuffer, Fun) -> - with_handles( - Refs, ReadBuffer, - fun (Handles) -> - case lists:foldl( - fun (Handle, {ok, HandlesAcc}) -> - {Res, Handle1} = write_buffer(Handle), - {Res, [Handle1 | HandlesAcc]}; - (Handle, {Error, HandlesAcc}) -> - {Error, [Handle | HandlesAcc]} - end, {ok, []}, Handles) of - {ok, Handles1} -> - Fun(lists:reverse(Handles1)); - {Error, Handles1} -> - {Error, lists:reverse(Handles1)} - end - end). - -get_or_reopen_timed(RefNewOrReopens) -> - get_or_reopen(RefNewOrReopens). - -get_or_reopen(RefNewOrReopens) -> - case partition_handles(RefNewOrReopens) of - {OpenHdls, []} -> - {ok, [Handle || {_Ref, Handle} <- OpenHdls]}; - {OpenHdls, ClosedHdls} -> - Oldest = oldest(get_age_tree(), - fun () -> erlang:monotonic_time() end), - case gen_server2:call(?SERVER, {open, self(), length(ClosedHdls), - Oldest}, infinity) of - ok -> - case reopen(ClosedHdls) of - {ok, RefHdls} -> sort_handles(RefNewOrReopens, - OpenHdls, RefHdls, []); - Error -> Error - end; - close -> - [soft_close(Ref, Handle) || - {{Ref, fhc_handle}, Handle = #handle { hdl = Hdl }} <- - get(), - Hdl =/= closed], - get_or_reopen(RefNewOrReopens) - end - end. - -reopen(ClosedHdls) -> reopen(ClosedHdls, get_age_tree(), []). - -reopen([], Tree, RefHdls) -> - put_age_tree(Tree), - {ok, lists:reverse(RefHdls)}; -reopen([{Ref, NewOrReopen, Handle = #handle { hdl = closed, - path = Path, - mode = Mode0, - offset = Offset, - last_used_at = undefined }} | - RefNewOrReopenHdls] = ToOpen, Tree, RefHdls) -> - Mode = case NewOrReopen of - new -> Mode0; - reopen -> [read | Mode0] - end, - case prim_file:open(Path, Mode) of - {ok, Hdl} -> - Now = erlang:monotonic_time(), - {{ok, _Offset}, Handle1} = - maybe_seek(Offset, reset_read_buffer( - Handle#handle{hdl = Hdl, - offset = 0, - last_used_at = Now})), - put({Ref, fhc_handle}, Handle1), - reopen(RefNewOrReopenHdls, gb_trees:insert({Now, Ref}, true, Tree), - [{Ref, Handle1} | RefHdls]); - Error -> - %% NB: none of the handles in ToOpen are in the age tree - Oldest = oldest(Tree, fun () -> undefined end), - [gen_server2:cast(?SERVER, {close, self(), Oldest}) || _ <- ToOpen], - put_age_tree(Tree), - Error - end. - -partition_handles(RefNewOrReopens) -> - lists:foldr( - fun ({Ref, NewOrReopen}, {Open, Closed}) -> - case get({Ref, fhc_handle}) of - #handle { hdl = closed } = Handle -> - {Open, [{Ref, NewOrReopen, Handle} | Closed]}; - #handle {} = Handle -> - {[{Ref, Handle} | Open], Closed} - end - end, {[], []}, RefNewOrReopens). - -sort_handles([], [], [], Acc) -> - {ok, lists:reverse(Acc)}; -sort_handles([{Ref, _} | RefHdls], [{Ref, Handle} | RefHdlsA], RefHdlsB, Acc) -> - sort_handles(RefHdls, RefHdlsA, RefHdlsB, [Handle | Acc]); -sort_handles([{Ref, _} | RefHdls], RefHdlsA, [{Ref, Handle} | RefHdlsB], Acc) -> - sort_handles(RefHdls, RefHdlsA, RefHdlsB, [Handle | Acc]). - -put_handle(Ref, Handle = #handle { last_used_at = Then }) -> - Now = erlang:monotonic_time(), - age_tree_update(Then, Now, Ref), - put({Ref, fhc_handle}, Handle #handle { last_used_at = Now }). - -with_age_tree(Fun) -> put_age_tree(Fun(get_age_tree())). - -get_age_tree() -> - case get(fhc_age_tree) of - undefined -> gb_trees:empty(); - AgeTree -> AgeTree - end. - -put_age_tree(Tree) -> put(fhc_age_tree, Tree). - -age_tree_update(Then, Now, Ref) -> - with_age_tree( - fun (Tree) -> - gb_trees:insert({Now, Ref}, true, - gb_trees:delete_any({Then, Ref}, Tree)) - end). - -age_tree_delete(Then, Ref) -> - with_age_tree( - fun (Tree) -> - Tree1 = gb_trees:delete_any({Then, Ref}, Tree), - Oldest = oldest(Tree1, fun () -> undefined end), - gen_server2:cast(?SERVER, {close, self(), Oldest}), - Tree1 - end). - -age_tree_change() -> - with_age_tree( - fun (Tree) -> - case gb_trees:is_empty(Tree) of - true -> Tree; - false -> {{Oldest, _Ref}, _} = gb_trees:smallest(Tree), - gen_server2:cast(?SERVER, {update, self(), Oldest}), - Tree - end - end). - -oldest(Tree, DefaultFun) -> - case gb_trees:is_empty(Tree) of - true -> DefaultFun(); - false -> {{Oldest, _Ref}, _} = gb_trees:smallest(Tree), - Oldest - end. - -new_closed_handle(Path, Mode, Options) -> - WriteBufferSize = - case application:get_env(rabbit, fhc_write_buffering) of - {ok, false} -> 0; - {ok, true} -> - case proplists:get_value(write_buffer, Options, unbuffered) of - unbuffered -> 0; - infinity -> infinity; - N when is_integer(N) -> N - end - end, - ReadBufferSize = - case application:get_env(rabbit, fhc_read_buffering) of - {ok, false} -> 0; - {ok, true} -> - case proplists:get_value(read_buffer, Options, unbuffered) of - unbuffered -> 0; - N2 when is_integer(N2) -> N2 - end - end, - Ref = make_ref(), - put({Ref, fhc_handle}, #handle { hdl = closed, - ref = Ref, - offset = 0, - is_dirty = false, - write_buffer_size = 0, - write_buffer_size_limit = WriteBufferSize, - write_buffer = [], - read_buffer = <<>>, - read_buffer_pos = 0, - read_buffer_rem = 0, - read_buffer_size = ReadBufferSize, - read_buffer_size_limit = ReadBufferSize, - read_buffer_usage = 0, - at_eof = false, - path = Path, - mode = Mode, - options = Options, - is_write = is_writer(Mode), - is_read = is_reader(Mode), - last_used_at = undefined }), - {ok, Ref}. - -soft_close(Ref, Handle) -> - {Res, Handle1} = soft_close(Handle), - case Res of - ok -> put({Ref, fhc_handle}, Handle1), - true; - _ -> put_handle(Ref, Handle1), - false - end. - -soft_close(Handle = #handle { hdl = closed }) -> - {ok, Handle}; -soft_close(Handle) -> - case write_buffer(Handle) of - {ok, #handle { hdl = Hdl, - ref = Ref, - is_dirty = IsDirty, - last_used_at = Then } = Handle1 } -> - ok = case IsDirty of - true -> prim_file_sync(Hdl); - false -> ok - end, - ok = prim_file:close(Hdl), - age_tree_delete(Then, Ref), - {ok, Handle1 #handle { hdl = closed, - is_dirty = false, - last_used_at = undefined }}; - {_Error, _Handle} = Result -> - Result - end. - -hard_close(Handle) -> - case soft_close(Handle) of - {ok, #handle { path = Path, - is_read = IsReader, is_write = IsWriter }} -> - #file { reader_count = RCount, has_writer = HasWriter } = File = - get({Path, fhc_file}), - RCount1 = case IsReader of - true -> RCount - 1; - false -> RCount - end, - HasWriter1 = HasWriter andalso not IsWriter, - case RCount1 =:= 0 andalso not HasWriter1 of - true -> erase({Path, fhc_file}); - false -> put({Path, fhc_file}, - File #file { reader_count = RCount1, - has_writer = HasWriter1 }) - end, - ok; - {_Error, _Handle} = Result -> - Result - end. - -maybe_seek(New, Handle = #handle{hdl = Hdl, - offset = Old, - read_buffer_pos = BufPos, - read_buffer_rem = BufRem, - at_eof = AtEoF}) -> - {AtEoF1, NeedsSeek} = needs_seek(AtEoF, Old, New), - case NeedsSeek of - true when is_number(New) andalso - ((New >= Old andalso New =< BufRem + Old) - orelse (New < Old andalso Old - New =< BufPos)) -> - Diff = New - Old, - {{ok, New}, Handle#handle{offset = New, - at_eof = AtEoF1, - read_buffer_pos = BufPos + Diff, - read_buffer_rem = BufRem - Diff}}; - true -> - case prim_file_position(Hdl, New) of - {ok, Offset1} = Result -> - {Result, reset_read_buffer(Handle#handle{offset = Offset1, - at_eof = AtEoF1})}; - {error, _} = Error -> - {Error, Handle} - end; - false -> - {{ok, Old}, Handle} - end. - -needs_seek( AtEoF, _CurOffset, cur ) -> {AtEoF, false}; -needs_seek( AtEoF, _CurOffset, {cur, 0}) -> {AtEoF, false}; -needs_seek( true, _CurOffset, eof ) -> {true , false}; -needs_seek( true, _CurOffset, {eof, 0}) -> {true , false}; -needs_seek( false, _CurOffset, eof ) -> {true , true }; -needs_seek( false, _CurOffset, {eof, 0}) -> {true , true }; -needs_seek( AtEoF, 0, bof ) -> {AtEoF, false}; -needs_seek( AtEoF, 0, {bof, 0}) -> {AtEoF, false}; -needs_seek( AtEoF, CurOffset, CurOffset) -> {AtEoF, false}; -needs_seek( true, CurOffset, {bof, DesiredOffset}) - when DesiredOffset >= CurOffset -> - {true, true}; -needs_seek( true, _CurOffset, {cur, DesiredOffset}) - when DesiredOffset > 0 -> - {true, true}; -needs_seek( true, CurOffset, DesiredOffset) %% same as {bof, DO} - when is_integer(DesiredOffset) andalso DesiredOffset >= CurOffset -> - {true, true}; -%% because we can't really track size, we could well end up at EoF and not know -needs_seek(_AtEoF, _CurOffset, _DesiredOffset) -> - {false, true}. - -write_buffer(Handle = #handle { write_buffer = [] }) -> - {ok, Handle}; -write_buffer(Handle = #handle { hdl = Hdl, offset = Offset, - write_buffer = WriteBuffer, - write_buffer_size = DataSize, - at_eof = true }) -> - case prim_file_write(Hdl, lists:reverse(WriteBuffer)) of - ok -> - Offset1 = Offset + DataSize, - {ok, Handle #handle { offset = Offset1, is_dirty = true, - write_buffer = [], write_buffer_size = 0 }}; - {error, _} = Error -> - {Error, Handle} - end. - -reset_read_buffer(Handle) -> - Handle#handle{read_buffer = <<>>, - read_buffer_pos = 0, - read_buffer_rem = 0}. - -%% We come into this function whenever there's been a miss while -%% reading from the buffer - but note that when we first start with a -%% new handle the usage will be 0. Therefore in that case don't take -%% it as meaning the buffer was useless, we just haven't done anything -%% yet! -tune_read_buffer_limit(Handle = #handle{read_buffer_usage = 0}, _Count) -> - Handle; -%% In this head we have been using the buffer but now tried to read -%% outside it. So how did we do? If we used less than the size of the -%% buffer, make the new buffer the size of what we used before, but -%% add one byte (so that next time we can distinguish between getting -%% the buffer size exactly right and actually wanting more). If we -%% read 100% of what we had, then double it for next time, up to the -%% limit that was set when we were created. -tune_read_buffer_limit(Handle = #handle{read_buffer = Buf, - read_buffer_usage = Usg, - read_buffer_size = Sz, - read_buffer_size_limit = Lim}, Count) -> - %% If the buffer is <<>> then we are in the first read after a - %% reset, the read_buffer_usage is the total usage from before the - %% reset. But otherwise we are in a read which read off the end of - %% the buffer, so really the size of this read should be included - %% in the usage. - TotalUsg = case Buf of - <<>> -> Usg; - _ -> Usg + Count - end, - Handle#handle{read_buffer_usage = 0, - read_buffer_size = erlang:min(case TotalUsg < Sz of - true -> Usg + 1; - false -> Usg * 2 - end, Lim)}. - -maybe_reduce_read_cache(SparedRefs) -> - case vm_memory_monitor:get_memory_use(bytes) of - {_, infinity} -> ok; - {MemUse, MemLimit} when MemUse < MemLimit -> ok; - {MemUse, MemLimit} -> reduce_read_cache( - (MemUse - MemLimit) * 2, - SparedRefs) - end. - -reduce_read_cache(MemToFree, SparedRefs) -> - Handles = lists:sort( - fun({_, H1}, {_, H2}) -> H1 < H2 end, - [{R, H} || {{R, fhc_handle}, H} <- get(), - not lists:member(R, SparedRefs) - andalso size(H#handle.read_buffer) > 0]), - FreedMem = lists:foldl( - fun - (_, Freed) when Freed >= MemToFree -> - Freed; - ({Ref, #handle{read_buffer = Buf} = Handle}, Freed) -> - Handle1 = reset_read_buffer(Handle), - put({Ref, fhc_handle}, Handle1), - Freed + size(Buf) - end, 0, Handles), - if - FreedMem < MemToFree andalso SparedRefs =/= [] -> - reduce_read_cache(MemToFree - FreedMem, []); - true -> - ok - end. - -infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. - -i(total_limit, #fhc_state{limit = Limit}) -> Limit; -i(total_used, State) -> used(State); -i(sockets_limit, _) -> 0; -i(sockets_used, _) -> 0; -i(files_reserved, #fhc_state{reserve_count_file = RCount}) -> RCount; -i(Item, _) -> throw({bad_argument, Item}). - -used(#fhc_state{open_count = C1, - obtain_count_socket = C2, - obtain_count_file = C3, - reserve_count_socket = C4, - reserve_count_file = C5}) -> C1 + C2 + C3 + C4 + C5. - -%%---------------------------------------------------------------------------- -%% gen_server2 callbacks -%%---------------------------------------------------------------------------- - -init([AlarmSet, AlarmClear]) -> - Limit = case application:get_env(file_handles_high_watermark) of - {ok, Watermark} when (is_integer(Watermark) andalso - Watermark > 0) -> - Watermark; - _ -> - case ulimit() of - unknown -> ?FILE_HANDLES_LIMIT_OTHER; - Lim -> lists:max([2, Lim - ?RESERVED_FOR_OTHERS]) - end - end, - ObtainLimit = obtain_limit(Limit), - ?LOG_INFO("Limiting to approx ~tp file handles (~tp sockets)", - [Limit, ObtainLimit]), - Clients = ets:new(?CLIENT_ETS_TABLE, [set, private, {keypos, #cstate.pid}]), - Elders = ets:new(?ELDERS_ETS_TABLE, [set, private]), - {ok, #fhc_state { elders = Elders, - limit = Limit, - open_count = 0, - open_pending = pending_new(), - obtain_limit = ObtainLimit, - obtain_count_file = 0, - obtain_pending_file = pending_new(), - obtain_count_socket = 0, - obtain_pending_socket = pending_new(), - clients = Clients, - timer_ref = undefined, - alarm_set = AlarmSet, - alarm_clear = AlarmClear, - reserve_count_file = 0, - reserve_count_socket = 0 }}. - -prioritise_cast(Msg, _Len, _State) -> - case Msg of - {release, _, _, _} -> 5; - {release_reservation, _, _, _} -> 5; - _ -> 0 - end. - -handle_call({open, Pid, Requested, EldestUnusedSince}, From, - State = #fhc_state { open_count = Count, - open_pending = Pending, - elders = Elders, - clients = Clients }) - when EldestUnusedSince =/= undefined -> - true = ets:insert(Elders, {Pid, EldestUnusedSince}), - Item = #pending { kind = open, - pid = Pid, - requested = Requested, - from = From }, - ok = track_client(Pid, Clients), - case needs_reduce(State #fhc_state { open_count = Count + Requested }) of - true -> case ets:lookup(Clients, Pid) of - [#cstate { opened = 0 }] -> - _ = safe_ets_update_element( - Clients, Pid, {#cstate.blocked, true}), - {noreply, - reduce(State #fhc_state { - open_pending = pending_in(Item, Pending) })}; - [#cstate { opened = Opened }] -> - _ = safe_ets_update_element( - Clients, Pid, - {#cstate.pending_closes, Opened}), - {reply, close, State} - end; - false -> {noreply, run_pending_item(Item, State)} - end; - -handle_call({obtain, N, Type, Pid}, From, - State = #fhc_state { clients = Clients }) -> - Count = obtain_state(Type, count, State), - Pending = obtain_state(Type, pending, State), - ok = track_client(Pid, Clients), - Item = #pending { kind = {obtain, Type}, pid = Pid, - requested = N, from = From }, - Enqueue = fun () -> - _ = safe_ets_update_element(Clients, Pid, - {#cstate.blocked, true}), - set_obtain_state(Type, pending, - pending_in(Item, Pending), State) - end, - {noreply, - case obtain_limit_reached(Type, State) of - true -> Enqueue(); - false -> case needs_reduce( - set_obtain_state(Type, count, Count + 1, State)) of - true -> reduce(Enqueue()); - false -> adjust_alarm( - State, run_pending_item(Item, State)) - end - end}; - -handle_call({set_limit, Limit}, _From, State) -> - {reply, ok, adjust_alarm( - State, maybe_reduce( - process_pending( - State #fhc_state { - limit = Limit, - obtain_limit = obtain_limit(Limit) })))}; - -handle_call(get_limit, _From, State = #fhc_state { limit = Limit }) -> - {reply, Limit, State}; - -handle_call({info, Items}, _From, State) -> - {reply, infos(Items, State), State}; - -handle_call(list_elders, _From, State = #fhc_state { elders = Elders }) -> - {reply, ets:tab2list(Elders), State}; - -handle_call(list_clients, _From, State = #fhc_state { clients = Clients }) -> - {reply, ets:tab2list(Clients), State}; - -handle_call({get_client_state, ID}, _From, State = #fhc_state { clients = Clients }) -> - {reply, ets:lookup(Clients, ID), State}. - -handle_cast({register_callback, Pid, MFA}, - State = #fhc_state { clients = Clients }) -> - ok = track_client(Pid, Clients), - _ = safe_ets_update_element(Clients, Pid, {#cstate.callback, MFA}), - {noreply, State}; - -handle_cast({update, Pid, EldestUnusedSince}, - State = #fhc_state { elders = Elders }) - when EldestUnusedSince =/= undefined -> - true = ets:insert(Elders, {Pid, EldestUnusedSince}), - %% don't call maybe_reduce from here otherwise we can create a - %% storm of messages - {noreply, State}; - -handle_cast({release, N, Type, Pid}, State) -> - State1 = process_pending(update_counts({obtain, Type}, Pid, -N, State)), - {noreply, adjust_alarm(State, State1)}; - -handle_cast({close, Pid, EldestUnusedSince}, - State = #fhc_state { elders = Elders, clients = Clients }) -> - true = case EldestUnusedSince of - undefined -> ets:delete(Elders, Pid); - _ -> ets:insert(Elders, {Pid, EldestUnusedSince}) - end, - safe_ets_update_counter(Clients, Pid, {#cstate.pending_closes, -1, 0, 0}), - {noreply, adjust_alarm(State, process_pending( - update_counts(open, Pid, -1, State)))}; - -handle_cast({transfer, N, FromPid, ToPid}, State) -> - ok = track_client(ToPid, State#fhc_state.clients), - {noreply, process_pending( - update_counts({obtain, socket}, ToPid, +N, - update_counts({obtain, socket}, FromPid, -N, - State)))}; - -handle_cast(clear_read_cache, State) -> - _ = clear_process_read_cache(), - {noreply, State}; - -handle_cast({release_reservation, Type, Pid}, State) -> - State1 = process_pending(update_counts({reserve, Type}, Pid, 0, State)), - {noreply, adjust_alarm(State, State1)}; - -handle_cast({set_reservation, N, Type, Pid}, - State = #fhc_state { clients = Clients }) -> - ok = track_client(Pid, Clients), - NewState = process_pending(update_counts({reserve, Type}, Pid, N, State)), - {noreply, case needs_reduce(NewState) of - true -> reduce(NewState); - false -> adjust_alarm(State, NewState) - end}; - -handle_cast({clear_metrics_of, Pid}, - State = #fhc_state { elders = Elders, clients = Clients }) -> - ets:delete(Elders, Pid), - ets:delete(Clients, Pid), - safe_ets_update_counter(Clients, Pid, {#cstate.pending_closes, -1, 0, 0}), - {noreply, adjust_alarm(State, process_pending( - update_counts(open, Pid, -1, State)))}. - -handle_info(check_counts, State) -> - {noreply, maybe_reduce(State #fhc_state { timer_ref = undefined })}; - -handle_info({'DOWN', _MRef, process, Pid, _Reason}, - State = #fhc_state { elders = Elders, - open_count = OpenCount, - open_pending = OpenPending, - obtain_count_file = ObtainCountF, - obtain_count_socket = ObtainCountS, - obtain_pending_file = ObtainPendingF, - obtain_pending_socket = ObtainPendingS, - reserve_count_file = ReserveCountF, - reserve_count_socket = ReserveCountS, - clients = Clients }) -> - [#cstate { opened = Opened, - obtained_file = ObtainedFile, - obtained_socket = ObtainedSocket, - reserved_file = ReservedFile, - reserved_socket = ReservedSocket }] = - ets:lookup(Clients, Pid), - true = ets:delete(Clients, Pid), - true = ets:delete(Elders, Pid), - Fun = fun (#pending { pid = Pid1 }) -> Pid1 =/= Pid end, - State1 = process_pending( - State #fhc_state { - open_count = OpenCount - Opened, - open_pending = filter_pending(Fun, OpenPending), - obtain_count_file = ObtainCountF - ObtainedFile, - obtain_count_socket = ObtainCountS - ObtainedSocket, - obtain_pending_file = filter_pending(Fun, ObtainPendingF), - obtain_pending_socket = filter_pending(Fun, ObtainPendingS), - reserve_count_file = ReserveCountF - ReservedFile, - reserve_count_socket = ReserveCountS - ReservedSocket}), - {noreply, adjust_alarm(State, State1)}. - -terminate(_Reason, State = #fhc_state { clients = Clients, - elders = Elders }) -> - ets:delete(Clients), - ets:delete(Elders), - State. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%---------------------------------------------------------------------------- -%% pending queue abstraction helpers -%%---------------------------------------------------------------------------- - -queue_fold(Fun, Init, Q) -> - case queue:out(Q) of - {empty, _Q} -> Init; - {{value, V}, Q1} -> queue_fold(Fun, Fun(V, Init), Q1) - end. - -filter_pending(Fun, {Count, Queue}) -> - {Delta, Queue1} = - queue_fold( - fun (Item = #pending { requested = Requested }, {DeltaN, QueueN}) -> - case Fun(Item) of - true -> {DeltaN, queue:in(Item, QueueN)}; - false -> {DeltaN - Requested, QueueN} - end - end, {0, queue:new()}, Queue), - {Count + Delta, Queue1}. - -pending_new() -> - {0, queue:new()}. - -pending_in(Item = #pending { requested = Requested }, {Count, Queue}) -> - {Count + Requested, queue:in(Item, Queue)}. - -pending_out({0, _Queue} = Pending) -> - {empty, Pending}; -pending_out({N, Queue}) -> - {{value, #pending { requested = Requested }} = Result, Queue1} = - queue:out(Queue), - {Result, {N - Requested, Queue1}}. - -pending_count({Count, _Queue}) -> - Count. - -%%---------------------------------------------------------------------------- -%% server helpers -%%---------------------------------------------------------------------------- - -obtain_limit(infinity) -> infinity; -obtain_limit(Limit) -> case ?OBTAIN_LIMIT(Limit) of - OLimit when OLimit < 0 -> 0; - OLimit -> OLimit - end. - -obtain_limit_reached(socket, State) -> obtain_limit_reached(State); -obtain_limit_reached(file, State) -> needs_reduce(State). - -obtain_limit_reached(#fhc_state{obtain_limit = Limit, - obtain_count_socket = Count, - reserve_count_socket = RCount}) -> - Limit =/= infinity andalso (RCount + Count) >= Limit. - -obtain_state(file, count, #fhc_state{obtain_count_file = N}) -> N; -obtain_state(socket, count, #fhc_state{obtain_count_socket = N}) -> N; -obtain_state(file, pending, #fhc_state{obtain_pending_file = N}) -> N; -obtain_state(socket, pending, #fhc_state{obtain_pending_socket = N}) -> N. - -set_obtain_state(file, count, N, S) -> S#fhc_state{obtain_count_file = N}; -set_obtain_state(socket, count, N, S) -> S#fhc_state{obtain_count_socket = N}; -set_obtain_state(file, pending, N, S) -> S#fhc_state{obtain_pending_file = N}; -set_obtain_state(socket, pending, N, S) -> S#fhc_state{obtain_pending_socket = N}. - -adjust_alarm(OldState = #fhc_state { alarm_set = AlarmSet, - alarm_clear = AlarmClear }, NewState) -> - case {obtain_limit_reached(OldState), obtain_limit_reached(NewState)} of - {false, true} -> AlarmSet({file_descriptor_limit, []}); - {true, false} -> AlarmClear(file_descriptor_limit); - _ -> ok - end, - NewState. - -process_pending(State = #fhc_state { limit = infinity }) -> - State; -process_pending(State) -> - process_open(process_obtain(socket, process_obtain(file, State))). - -process_open(State = #fhc_state { limit = Limit, - open_pending = Pending}) -> - {Pending1, State1} = process_pending(Pending, Limit - used(State), State), - State1 #fhc_state { open_pending = Pending1 }. - -process_obtain(socket, State = #fhc_state { limit = Limit, - obtain_limit = ObtainLimit, - open_count = OpenCount, - obtain_count_socket = ObtainCount, - obtain_pending_socket = Pending, - obtain_count_file = ObtainCountF, - reserve_count_file = ReserveCountF, - reserve_count_socket = ReserveCount}) -> - Quota = min(ObtainLimit - ObtainCount, - Limit - (OpenCount + ObtainCount + ObtainCountF + ReserveCount + ReserveCountF)), - {Pending1, State1} = process_pending(Pending, Quota, State), - State1#fhc_state{obtain_pending_socket = Pending1}; -process_obtain(file, State = #fhc_state { limit = Limit, - open_count = OpenCount, - obtain_count_socket = ObtainCountS, - obtain_count_file = ObtainCountF, - obtain_pending_file = Pending, - reserve_count_file = ReserveCountF, - reserve_count_socket = ReserveCountS}) -> - Quota = Limit - (OpenCount + ObtainCountS + ObtainCountF + ReserveCountF + ReserveCountS), - {Pending1, State1} = process_pending(Pending, Quota, State), - State1#fhc_state{obtain_pending_file = Pending1}. - -process_pending(Pending, Quota, State) when Quota =< 0 -> - {Pending, State}; -process_pending(Pending, Quota, State) -> - case pending_out(Pending) of - {empty, _Pending} -> - {Pending, State}; - {{value, #pending { requested = Requested }}, _Pending1} - when Requested > Quota -> - {Pending, State}; - {{value, #pending { requested = Requested } = Item}, Pending1} -> - process_pending(Pending1, Quota - Requested, - run_pending_item(Item, State)) - end. - -run_pending_item(#pending { kind = Kind, - pid = Pid, - requested = Requested, - from = From }, - State = #fhc_state { clients = Clients }) -> - gen_server2:reply(From, ok), - safe_ets_update_element(Clients, Pid, {#cstate.blocked, false}), - update_counts(Kind, Pid, Requested, State). - -update_counts(open, Pid, Delta, - State = #fhc_state { open_count = OpenCount, - clients = Clients }) -> - safe_ets_update_counter(Clients, Pid, {#cstate.opened, Delta}, - fun() -> ?LOG_WARNING("FHC: failed to update counter 'opened', client pid: ~p", [Pid]) end), - State #fhc_state { open_count = OpenCount + Delta}; -update_counts({obtain, file}, Pid, Delta, - State = #fhc_state {obtain_count_file = ObtainCountF, - clients = Clients }) -> - safe_ets_update_counter(Clients, Pid, {#cstate.obtained_file, Delta}, - fun() -> ?LOG_WARNING("FHC: failed to update counter 'obtained_file', client pid: ~p", [Pid]) end), - State #fhc_state { obtain_count_file = ObtainCountF + Delta}; -update_counts({obtain, socket}, Pid, Delta, - State = #fhc_state {obtain_count_socket = ObtainCountS, - clients = Clients }) -> - safe_ets_update_counter(Clients, Pid, {#cstate.obtained_socket, Delta}, - fun() -> ?LOG_WARNING("FHC: failed to update counter 'obtained_socket', client pid: ~p", [Pid]) end), - State #fhc_state { obtain_count_socket = ObtainCountS + Delta}; -update_counts({reserve, file}, Pid, NewReservation, - State = #fhc_state {reserve_count_file = ReserveCountF, - clients = Clients }) -> - [#cstate{reserved_file = R}] = ets:lookup(Clients, Pid), - Delta = NewReservation - R, - safe_ets_update_counter(Clients, Pid, {#cstate.reserved_file, Delta}, - fun() -> ?LOG_WARNING("FHC: failed to update counter 'reserved_file', client pid: ~p", [Pid]) end), - State #fhc_state { reserve_count_file = ReserveCountF + Delta}; -update_counts({reserve, socket}, Pid, NewReservation, - State = #fhc_state {reserve_count_socket = ReserveCountS, - clients = Clients }) -> - [#cstate{reserved_file = R}] = ets:lookup(Clients, Pid), - Delta = NewReservation - R, - safe_ets_update_counter(Clients, Pid, {#cstate.reserved_socket, Delta}, - fun() -> ?LOG_WARNING("FHC: failed to update counter 'reserved_socket', client pid: ~p", [Pid]) end), - State #fhc_state { reserve_count_socket = ReserveCountS + Delta}. - -maybe_reduce(State) -> - case needs_reduce(State) of - true -> reduce(State); - false -> State - end. - -needs_reduce(#fhc_state { limit = Limit, - open_count = OpenCount, - open_pending = {OpenPending, _}, - obtain_limit = ObtainLimit, - obtain_count_socket = ObtainCountS, - obtain_count_file = ObtainCountF, - obtain_pending_file = {ObtainPendingF, _}, - obtain_pending_socket = {ObtainPendingS, _}, - reserve_count_socket = ReserveCountS, - reserve_count_file = ReserveCountF}) -> - Limit =/= infinity - andalso (((OpenCount + ObtainCountS + ObtainCountF + ReserveCountS + ReserveCountF) > Limit) - orelse (OpenPending =/= 0) - orelse (ObtainPendingF =/= 0) - orelse (ObtainCountS < ObtainLimit - andalso (ObtainPendingS =/= 0))). - -reduce(State = #fhc_state { open_pending = OpenPending, - obtain_pending_file = ObtainPendingFile, - obtain_pending_socket = ObtainPendingSocket, - elders = Elders, - clients = Clients, - timer_ref = TRef }) -> - Now = erlang:monotonic_time(), - {CStates, Sum, ClientCount} = - ets:foldl(fun ({Pid, Eldest}, {CStatesAcc, SumAcc, CountAcc} = Accs) -> - [#cstate { pending_closes = PendingCloses, - opened = Opened, - blocked = Blocked } = CState] = - ets:lookup(Clients, Pid), - TimeDiff = erlang:convert_time_unit( - Now - Eldest, native, micro_seconds), - case Blocked orelse PendingCloses =:= Opened of - true -> Accs; - false -> {[CState | CStatesAcc], - SumAcc + TimeDiff, - CountAcc + 1} - end - end, {[], 0, 0}, Elders), - case CStates of - [] -> ok; - _ -> case (Sum / ClientCount) - - (1000 * ?FILE_HANDLES_CHECK_INTERVAL) of - AverageAge when AverageAge > 0 -> - notify_age(CStates, AverageAge); - _ -> - notify_age0(Clients, CStates, - pending_count(OpenPending) + - pending_count(ObtainPendingFile) + - pending_count(ObtainPendingSocket)) - end - end, - case TRef of - undefined -> TRef1 = erlang:send_after( - ?FILE_HANDLES_CHECK_INTERVAL, ?SERVER, - check_counts), - State #fhc_state { timer_ref = TRef1 }; - _ -> State - end. - -notify_age(CStates, AverageAge) -> - lists:foreach( - fun (#cstate { callback = undefined }) -> ok; - (#cstate { callback = {M, F, A} }) -> apply(M, F, A ++ [AverageAge]) - end, CStates). - -notify_age0(Clients, CStates, Required) -> - case [CState || CState <- CStates, CState#cstate.callback =/= undefined] of - [] -> ok; - Notifications -> S = rand:uniform(length(Notifications)), - {L1, L2} = lists:split(S, Notifications), - notify(Clients, Required, L2 ++ L1) - end. - -notify(_Clients, _Required, []) -> - ok; -notify(_Clients, Required, _Notifications) when Required =< 0 -> - ok; -notify(Clients, Required, [#cstate{ pid = Pid, - callback = {M, F, A}, - opened = Opened } | Notifications]) -> - apply(M, F, A ++ [0]), - safe_ets_update_element(Clients, Pid, {#cstate.pending_closes, Opened}), - notify(Clients, Required - Opened, Notifications). - -track_client(Pid, Clients) -> - case ets:insert_new(Clients, #cstate { pid = Pid, - callback = undefined, - opened = 0, - obtained_file = 0, - obtained_socket = 0, - blocked = false, - pending_closes = 0, - reserved_file = 0, - reserved_socket = 0 }) of - true -> _MRef = erlang:monitor(process, Pid), - ok; - false -> ok - end. - - -%% To increase the number of file descriptors: on Windows set ERL_MAX_PORTS -%% environment variable, on Linux set `ulimit -n`. -ulimit() -> - IOStats = case erlang:system_info(check_io) of - [Val | _] when is_list(Val) -> Val; - Val when is_list(Val) -> Val; - _Other -> [] - end, - case proplists:get_value(max_fds, IOStats) of - MaxFds when is_integer(MaxFds) andalso MaxFds > 1 -> - case os:type() of - {win32, _OsName} -> - %% On Windows max_fds is twice the number of open files: - %% https://github.com/yrashk/erlang/blob/e1282325ed75e52a98d5/erts/emulator/sys/win32/sys.c#L2459-2466 - MaxFds div 2; - _Any -> - %% For other operating systems trust Erlang. - MaxFds - end; - _ -> - unknown - end. diff --git a/deps/rabbit/src/rabbit.erl b/deps/rabbit/src/rabbit.erl index c213e13abd00..815ccc48cb2d 100644 --- a/deps/rabbit/src/rabbit.erl +++ b/deps/rabbit/src/rabbit.erl @@ -17,8 +17,7 @@ stop_and_halt/0, await_startup/0, await_startup/1, await_startup/3, status/0, is_running/0, is_serving/0, alarms/0, is_running/1, is_serving/1, environment/0, rotate_logs/0, - force_event_refresh/1, - start_fhc/0]). + force_event_refresh/1]). -export([start/2, stop/1, prep_stop/1]). -export([start_apps/1, start_apps/2, stop_apps/1]). @@ -82,7 +81,7 @@ -rabbit_boot_step({database, [{mfa, {rabbit_db, init, []}}, - {requires, file_handle_cache}, + {requires, rabbit_registry}, {enables, external_infrastructure}]}). -rabbit_boot_step({networking_metadata_store, @@ -97,19 +96,6 @@ {requires, database}, {enables, external_infrastructure}]}). --rabbit_boot_step({code_server_cache, - [{description, "code_server cache server"}, - {mfa, {rabbit_sup, start_child, [code_server_cache]}}, - {requires, rabbit_alarm}, - {enables, file_handle_cache}]}). - --rabbit_boot_step({file_handle_cache, - [{description, "file handle cache server"}, - {mfa, {rabbit, start_fhc, []}}, - %% FHC needs memory monitor to be running - {requires, code_server_cache}, - {enables, worker_pool}]}). - -rabbit_boot_step({worker_pool, [{description, "default worker pool"}, {mfa, {rabbit_sup, start_supervisor_child, @@ -765,9 +751,6 @@ status() -> get_disk_free_limit, []}}, {disk_free, {rabbit_disk_monitor, get_disk_free, []}}]), - S3 = rabbit_misc:with_exit_handler( - fun () -> [] end, - fun () -> [{file_descriptors, file_handle_cache:info()}] end), S4 = [{processes, [{limit, erlang:system_info(process_limit)}, {used, erlang:system_info(process_count)}]}, {run_queue, erlang:statistics(run_queue)}, @@ -802,7 +785,7 @@ status() -> (_) -> false end, maps:to_list(product_info())), - S1 ++ S2 ++ S3 ++ S4 ++ S5 ++ S6 ++ S7 ++ S8. + S1 ++ S2 ++ S4 ++ S5 ++ S6 ++ S7 ++ S8. alarms() -> Alarms = rabbit_misc:with_exit_handler(rabbit_misc:const([]), @@ -1642,63 +1625,6 @@ home_dir() -> config_files() -> rabbit_config:config_files(). -%% We don't want this in fhc since it references rabbit stuff. And we can't put -%% this in the bootstep directly. -start_fhc() -> - ok = rabbit_sup:start_restartable_child( - file_handle_cache, - [fun(_) -> ok end, fun(_) -> ok end]), - ensure_working_fhc(), - maybe_warn_low_fd_limit(). - -ensure_working_fhc() -> - %% To test the file handle cache, we simply read a file we know it - %% exists (Erlang kernel's .app file). - %% - %% To avoid any pollution of the application process' dictionary by - %% file_handle_cache, we spawn a separate process. - Parent = self(), - TestFun = fun() -> - ReadBuf = case application:get_env(rabbit, fhc_read_buffering) of - {ok, true} -> "ON"; - {ok, false} -> "OFF" - end, - WriteBuf = case application:get_env(rabbit, fhc_write_buffering) of - {ok, true} -> "ON"; - {ok, false} -> "OFF" - end, - ?LOG_INFO("FHC read buffering: ~ts", [ReadBuf], - #{domain => ?RMQLOG_DOMAIN_GLOBAL}), - ?LOG_INFO("FHC write buffering: ~ts", [WriteBuf], - #{domain => ?RMQLOG_DOMAIN_GLOBAL}), - Filename = filename:join(code:lib_dir(kernel), "ebin/kernel.app"), - {ok, Fd} = file_handle_cache:open(Filename, [raw, binary, read], []), - {ok, _} = file_handle_cache:read(Fd, 1), - ok = file_handle_cache:close(Fd), - Parent ! fhc_ok - end, - TestPid = spawn_link(TestFun), - %% Because we are waiting for the test fun, abuse the - %% 'mnesia_table_loading_retry_timeout' parameter to find a sane timeout - %% value. - Timeout = rabbit_table:retry_timeout(), - receive - fhc_ok -> ok; - {'EXIT', TestPid, Exception} -> throw({ensure_working_fhc, Exception}) - after Timeout -> - throw({ensure_working_fhc, {timeout, TestPid}}) - end. - -maybe_warn_low_fd_limit() -> - case file_handle_cache:ulimit() of - %% unknown is included as atom() > integer(). - L when L > 1024 -> - ok; - L -> - ?LOG_WARNING("Available file handles: ~tp. " - "Please consider increasing system limits", [L]) - end. - %% Any configuration that %% 1. is not allowed to change while RabbitMQ is running, and %% 2. is read often diff --git a/deps/rabbit/src/rabbit_amqqueue_process.erl b/deps/rabbit/src/rabbit_amqqueue_process.erl index a658747318d8..a15a7968bc54 100644 --- a/deps/rabbit/src/rabbit_amqqueue_process.erl +++ b/deps/rabbit/src/rabbit_amqqueue_process.erl @@ -1466,11 +1466,7 @@ handle_call(purge, _From, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> {Count, BQS1} = BQ:purge(BQS), State1 = State#q{backing_queue_state = BQS1}, - reply({ok, Count}, notify_decorators_if_became_empty(Count =:= 0, State1)); - -handle_call({requeue, AckTags, ChPid}, From, State) -> - gen_server2:reply(From, ok), - noreply(requeue(AckTags, ChPid, State)). + reply({ok, Count}, notify_decorators_if_became_empty(Count =:= 0, State1)). new_single_active_consumer_after_basic_cancel(ChPid, ConsumerTag, CurrentSingleActiveConsumer, _SingleActiveConsumerIsOn = true, Consumers) -> diff --git a/deps/rabbit/src/rabbit_direct.erl b/deps/rabbit/src/rabbit_direct.erl index 62d34c49776a..698641b5c437 100644 --- a/deps/rabbit/src/rabbit_direct.erl +++ b/deps/rabbit/src/rabbit_direct.erl @@ -16,9 +16,6 @@ -export([list_local/0, conserve_resources/3]). -%% For testing only --export([extract_extra_auth_props/4]). - -include_lib("rabbit_common/include/rabbit.hrl"). -include_lib("rabbit_common/include/rabbit_misc.hrl"). -include_lib("kernel/include/logger.hrl"). @@ -85,7 +82,7 @@ auth_fun({Username, Password}, VHost, ExtraAuthProps) -> %% during the first authentication. However, we do have the outcome from such successful authentication. connect(Creds, VHost, Protocol, Pid, Infos) -> - ExtraAuthProps = append_authz_backends(extract_extra_auth_props(Creds, VHost, Pid, Infos), Infos), + ExtraAuthProps = get_authz_backends(Infos), AuthFun = auth_fun(Creds, VHost, ExtraAuthProps), case rabbit_boot_state:has_reached_and_is_active(core_started) of @@ -119,37 +116,8 @@ connect(Creds, VHost, Protocol, Pid, Infos) -> false -> {error, broker_not_found_on_node} end. -extract_extra_auth_props(Creds, VHost, Pid, Infos) -> - case extract_protocol(Infos) of - undefined -> - []; - Protocol -> - maybe_call_connection_info_module(Protocol, Creds, VHost, Pid, Infos) - end. - - -append_authz_backends(AuthProps, Infos) -> - case proplists:get_value(authz_backends, Infos, undefined) of - undefined -> AuthProps; - AuthzBackends -> AuthProps ++ AuthzBackends - end. - -extract_protocol(Infos) -> - case proplists:get_value(protocol, Infos, undefined) of - {Protocol, _Version} -> - Protocol; - _ -> - undefined - end. - -maybe_call_connection_info_module(Protocol, Creds, VHost, Pid, Infos) -> - Module = rabbit_data_coercion:to_atom(string:to_lower( - "rabbit_" ++ - lists:flatten(string:replace(rabbit_data_coercion:to_list(Protocol), " ", "_", all)) ++ - "_connection_info") - ), - Args = [Creds, VHost, Pid, Infos], - code_server_cache:maybe_call_mfa(Module, additional_authn_params, Args, []). +get_authz_backends(Infos) -> + proplists:get_value(authz_backends, Infos, []). is_vhost_alive(VHost, {Username, _Password}, Pid) -> PrintedUsername = case Username of diff --git a/deps/rabbit/src/rabbit_fhc_helpers.erl b/deps/rabbit/src/rabbit_fhc_helpers.erl deleted file mode 100644 index c107bdc429f3..000000000000 --- a/deps/rabbit/src/rabbit_fhc_helpers.erl +++ /dev/null @@ -1,44 +0,0 @@ -%% This Source Code Form is subject to the terms of the Mozilla Public -%% License, v. 2.0. If a copy of the MPL was not distributed with this -%% file, You can obtain one at https://mozilla.org/MPL/2.0/. -%% -%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. -%% - --module(rabbit_fhc_helpers). - --export([clear_read_cache/0]). - --include("amqqueue.hrl"). - -clear_read_cache() -> - case application:get_env(rabbit, fhc_read_buffering) of - {ok, true} -> - file_handle_cache:clear_read_cache(), - clear_vhost_read_cache(rabbit_vhost:list_names()); - _ -> %% undefined or {ok, false} - ok - end. - -clear_vhost_read_cache([]) -> - ok; -clear_vhost_read_cache([VHost | Rest]) -> - clear_queue_read_cache(rabbit_amqqueue:list(VHost)), - clear_vhost_read_cache(Rest). - -clear_queue_read_cache([]) -> - ok; -clear_queue_read_cache([Q | Rest]) when ?is_amqqueue(Q) -> - MPid = amqqueue:get_pid(Q), - %% Limit the action to the current node. - Pids = [P || P <- [MPid], node(P) =:= node()], - %% This function is executed in the context of the backing queue - %% process because the read buffer is stored in the process - %% dictionary. - Fun = fun(_, State) -> - _ = file_handle_cache:clear_process_read_cache(), - State - end, - [rabbit_amqqueue:run_backing_queue(Pid, rabbit_variable_queue, Fun) - || Pid <- Pids], - clear_queue_read_cache(Rest). diff --git a/deps/rabbit/src/rabbit_variable_queue.erl b/deps/rabbit/src/rabbit_variable_queue.erl index e8161631c37d..9860670c5fc9 100644 --- a/deps/rabbit/src/rabbit_variable_queue.erl +++ b/deps/rabbit/src/rabbit_variable_queue.erl @@ -22,7 +22,7 @@ -export([start/2, stop/1]). %% exported for testing only --export([start_msg_store/3, stop_msg_store/1]). +-export([start_msg_store/3, stop_msg_store/1, ram_pending_acks/1]). -include("mc.hrl"). -include_lib("stdlib/include/qlc.hrl"). @@ -123,62 +123,113 @@ -behaviour(rabbit_backing_queue). --record(vqstate, - { q_head, - q_tail, - next_seq_id, - %% seq_id() of first undelivered message - %% everything before this seq_id() was delivered at least once - next_deliver_seq_id, - ram_pending_ack, %% msgs still in RAM - disk_pending_ack, %% msgs in store, paged out - index_state, - store_state, - msg_store_clients, - durable, - transient_threshold, - qi_embed_msgs_below, - - bytes, %% w/o unacked - unacked_bytes, - persistent_count, %% w unacked - persistent_bytes, %% w unacked - - ram_msg_count, %% w/o unacked - ram_msg_count_prev, - ram_ack_count_prev, - ram_bytes, %% w unacked - out_counter, - in_counter, - rates, - %% There are two confirms paths: either store/index produce confirms - %% separately (v2 with per-vhost message store) or the confirms - %% are produced all at once while syncing/flushing (v2 with per-queue - %% message store). The latter is more efficient as it avoids many - %% sets operations. - msgs_on_disk, - msg_indices_on_disk, - unconfirmed, - confirmed, - ack_out_counter, - ack_in_counter, - %% Unlike the other counters these two do not feed into - %% #rates{} and get reset - disk_read_count, - disk_write_count, - - %% Fast path for confirms handling. Instead of having - %% index/store keep track of confirms separately and - %% doing intersect/subtract/union we just put the messages - %% here and on sync move them to 'confirmed'. - %% - %% Note: This field used to be 'memory_reduction_run_count'. - unconfirmed_simple, - %% Queue data is grouped by VHost. We need to store it - %% to work with queue index. - virtual_host, - waiting_bump = false - }). +-record(vqstate, { + %% Head of the queue. Index information has been loaded into + %% memory, and message body may have been depending on size. + q_head, + %% Tail of the queue, fully on disk. + q_tail, + + %% SeqId of the next message published. + next_seq_id, + %% Everything before this SeqId was delivered at least once. + %% @todo Do we really need this if we add delivery_count? + %% No we don't, we will just check delivery_count to know if was already delivered (delivery_count > 1). + %% NO!! We can also remove the is_delivered in the msg_status since that value --doesn't-- DOES!! survive restarts. + %% Actually we are using next_deliver_seq_id to know whether a message was already delivered and that survives restarts. + %% But we could very well do the same with a delivery_count map. So making the delivery_count map survive restarts + %% (and properly clean up on restart by removing transients) is the key to getting rid of these things. + next_deliver_seq_id, + + %% Messages pending acks. These messages have been delivered to the channel + %% and we are expecting an ack (or requeue) back. Messages are in ram or disk + %% depending on whether the #msg_status{} record contains the message body. + %% Typically only smaller message bodies are kept in memory, larger ones are + %% read only when needed. + ram_pending_ack, + disk_pending_ack, + + %% Index, queue store and shared message store states. In the latter's case + %% since the shared message store is separate processes, the state only + %% contains information to access it. + index_state, + store_state, + msg_store_clients, + + %% Whether the queue is durable. Used to determine whether messages are + %% truly persistent (both messages and queue must be durable). + durable, + + %% We must keep the virtual host information around in order to write + %% terms when terminating as the terms file is per-vhost. + virtual_host, + + %% SeqId of the first persistent message. Determined during recovery and + %% used to identify which transient messages belong to a previous + %% incarnation of the node. We don't remove transient messages during + %% recovery to keep recovery fast (otherwise we'd have to go over the + %% entire queue contents), instead we mark where we left off and drop + %% the messages when they would have been consumed. + transient_threshold, + + %% Maximum size of messages written to the queue store. The queue + %% store is meant to contain smaller messages, while larger messages + %% go to the shared message store. The shared message store benefits + %% from mechanisms like compaction and deduplication. + qi_embed_msgs_below, + + %% There are two confirms paths: either store/index produce confirms + %% separately (per-vhost message store) or the confirms + %% are produced all at once while syncing/flushing (per-queue + %% message store). The latter is more efficient as it avoids many + %% sets operations. + msgs_on_disk, + msg_indices_on_disk, + unconfirmed, + confirmed, + %% Fast path for confirms handling. Instead of having + %% index/store keep track of confirms separately and + %% doing intersect/subtract/union we just put the messages + %% here and on sync move them to 'confirmed'. + unconfirmed_simple, + + %% Metrics that are also used for sanity checking. + %% + %% They measure (with "unacked" meaning "messages pending acks"): + %% * the number of bytes in the queue (excluding unacked) + %% * the number of bytes in messages pending acks + %% * the number of persistent messages (including unacked) + %% * the number of bytes for persistent messages (including unacked) + %% * the number of messages currently in memory + %% (excluding unacked because we can get those via `map_size(RPA)`) + %% * the number of bytes of messages currently in memory (including unacked) + %% + %% The total number of bytes of messages in the queue (including unacked) + %% is bytes + unacked_bytes. + %% + %% Messages can be both persistent and in memory at the same time, + %% for example when they are close to being delivered. + bytes, + unacked_bytes, + persistent_count, + persistent_bytes, + ram_msg_count, + ram_bytes, + + %% Metrics for outgoing and ingoing messages rates. + %% + %% Counters get incremented per event and then an average is calculated + %% periodically into the #rates{} record. + out_counter, + in_counter, + ack_out_counter, + ack_in_counter, + rates, + + %% Metrics totalling message reads and writes from/to disk. + disk_read_count, + disk_write_count +}). -record(rates, { in, out, ack_in, ack_out, timestamp }). @@ -261,8 +312,6 @@ persistent_bytes :: non_neg_integer(), ram_msg_count :: non_neg_integer(), - ram_msg_count_prev :: non_neg_integer(), - ram_ack_count_prev :: non_neg_integer(), ram_bytes :: non_neg_integer(), out_counter :: non_neg_integer(), in_counter :: non_neg_integer(), @@ -562,17 +611,13 @@ ack(AckTags, State) -> ack_out_counter = AckOutCount + length(AckTags) })}. requeue(AckTags, #vqstate { q_head = QHead0, - q_tail = QTail, - in_counter = InCounter } = State) -> - {SeqIds, QHead, MsgIds, State1} = requeue_merge(lists:sort(AckTags), QHead0, [], - q_tail_limit(QTail), State), - {QTail1, MsgIds1, State2} = q_tail_merge(SeqIds, QTail, MsgIds, State1), - MsgCount = length(MsgIds1), - {MsgIds1, a( - maybe_update_rates( - State2 #vqstate { q_head = QHead, - q_tail = QTail1, - in_counter = InCounter + MsgCount }))}. + in_counter = InCounter } = State0) -> + {QHead, MsgIds, State} = requeue_merge(lists:sort(AckTags), QHead0, [], State0), + MsgCount = length(MsgIds), + {MsgIds, a(maybe_update_rates(State#vqstate{ + q_head = QHead, + in_counter = InCounter + MsgCount + }))}. ackfold(MsgFun, Acc, State, AckTags) -> {AccN, StateN} = @@ -1022,8 +1067,6 @@ init(IsDurable, IndexState, StoreState, DiskCount, DiskBytes, Terms, persistent_bytes = DiskBytes1, ram_msg_count = 0, - ram_msg_count_prev = 0, - ram_ack_count_prev = 0, ram_bytes = 0, unacked_bytes = 0, out_counter = 0, @@ -1159,17 +1202,15 @@ stats_acked_pending(MS = #msg_status{is_persistent = false}, St) -> St#vqstate{?UP(unacked_bytes, ram_bytes, -msg_size(MS))}. %% Notice that this is the reverse of stats_pending_acks. +%% Note that messages are always requeued to memory in the current +%% implementation because they are necessarily at the front of the +%% queue which is in memory. stats_requeued_memory(MS = #msg_status{msg = undefined}, St) -> St#vqstate{?UP(bytes, +msg_size(MS)), ?UP(unacked_bytes, -msg_size(MS))}; stats_requeued_memory(MS, St) -> St#vqstate{?UP(ram_msg_count, +1), ?UP(bytes, +msg_size(MS)), ?UP(unacked_bytes, -msg_size(MS))}. -stats_requeued_disk(MS = #msg_status{is_persistent = true}, St) -> - St#vqstate{?UP(bytes, +msg_size(MS)), ?UP(unacked_bytes, -msg_size(MS))}; -stats_requeued_disk(MS = #msg_status{is_persistent = false}, St) -> - St#vqstate{?UP(unacked_bytes, -msg_size(MS))}. - msg_size(#msg_status{msg_props = #message_properties{size = Size}}) -> Size. msg_in_ram(#msg_status{msg = Msg}) -> Msg =/= undefined. @@ -1593,6 +1634,9 @@ prepare_to_store(Msg) -> %% Internal gubbins for acks %%---------------------------------------------------------------------------- +ram_pending_acks(#vqstate{ ram_pending_ack = RPA }) -> + RPA. + record_pending_ack(#msg_status { seq_id = SeqId } = MsgStatus, State = #vqstate { ram_pending_ack = RPA, disk_pending_ack = DPA, @@ -1757,46 +1801,27 @@ msgs_written_to_disk(Callback, MsgIdSet, written) -> %%---------------------------------------------------------------------------- %% Rebuild queue, inserting sequence ids to maintain ordering -requeue_merge(SeqIds, Q, MsgIds, Limit, State) -> - requeue_merge(SeqIds, Q, ?QUEUE:new(), MsgIds, - Limit, State). +requeue_merge(SeqIds, Q, MsgIds, State) -> + requeue_merge(SeqIds, Q, ?QUEUE:new(), MsgIds, State). -requeue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds, - Limit, State) - when Limit == undefined orelse SeqId < Limit -> +requeue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds, State) -> case ?QUEUE:out(Q) of {{value, #msg_status { seq_id = SeqIdQ } = MsgStatus}, Q1} when SeqIdQ < SeqId -> %% enqueue from the remaining queue - requeue_merge(SeqIds, Q1, ?QUEUE:in(MsgStatus, Front), MsgIds, - Limit, State); + requeue_merge(SeqIds, Q1, ?QUEUE:in(MsgStatus, Front), MsgIds, State); {_, _Q1} -> %% enqueue from the remaining list of sequence ids case msg_from_pending_ack(SeqId, State) of {none, _} -> - requeue_merge(Rest, Q, Front, MsgIds, Limit, State); + requeue_merge(Rest, Q, Front, MsgIds, State); {#msg_status { msg_id = MsgId } = MsgStatus, State1} -> State2 = stats_requeued_memory(MsgStatus, State1), - requeue_merge(Rest, Q, ?QUEUE:in(MsgStatus, Front), [MsgId | MsgIds], - Limit, State2) + requeue_merge(Rest, Q, ?QUEUE:in(MsgStatus, Front), [MsgId | MsgIds], State2) end end; -requeue_merge(SeqIds, Q, Front, MsgIds, - _Limit, State) -> - {SeqIds, ?QUEUE:join(Front, Q), MsgIds, State}. - -q_tail_merge([], QTail, MsgIds, State) -> - {QTail, MsgIds, State}; -q_tail_merge(SeqIds, QTail, MsgIds, State) -> - lists:foldl(fun (SeqId, {QTail0, MsgIds0, State0} = Acc) -> - case msg_from_pending_ack(SeqId, State0) of - {none, _} -> - Acc; - {#msg_status { msg_id = MsgId } = MsgStatus, State1} -> - {expand_q_tail(SeqId, QTail0), [MsgId | MsgIds0], - stats_requeued_disk(MsgStatus, State1)} - end - end, {QTail, MsgIds, State}, SeqIds). +requeue_merge([], Q, Front, MsgIds, State) -> + {?QUEUE:join(Front, Q), MsgIds, State}. %% Mostly opposite of record_pending_ack/2 msg_from_pending_ack(SeqId, State) -> @@ -1809,9 +1834,6 @@ msg_from_pending_ack(SeqId, State) -> State1} end. -q_tail_limit(?BLANK_Q_TAIL_PATTERN(_)) -> undefined; -q_tail_limit(#q_tail{ start_seq_id = StartSeqId }) -> StartSeqId. - %%---------------------------------------------------------------------------- %% Phase changes %%---------------------------------------------------------------------------- diff --git a/deps/rabbit/test/backing_queue_SUITE.erl b/deps/rabbit/test/backing_queue_SUITE.erl index 1990388f3acd..ec02a45967bc 100644 --- a/deps/rabbit/test/backing_queue_SUITE.erl +++ b/deps/rabbit/test/backing_queue_SUITE.erl @@ -769,20 +769,13 @@ gen_result([{msg, MsgId, Msg}|Tail], Offset, Acc) -> %% ------------------------------------------------------------------- setup_backing_queue_test_group(Config) -> - {ok, MaxJournal} = - application:get_env(rabbit, queue_index_max_journal_entries), - application:set_env(rabbit, queue_index_max_journal_entries, 128), {ok, Bytes} = application:get_env(rabbit, queue_index_embed_msgs_below), rabbit_ct_helpers:set_config(Config, [ - {rmq_queue_index_max_journal_entries, MaxJournal}, {rmq_queue_index_embed_msgs_below, Bytes} ]). teardown_backing_queue_test_group(Config) -> - %% FIXME: Undo all the setup function did. - application:set_env(rabbit, queue_index_max_journal_entries, - ?config(rmq_queue_index_max_journal_entries, Config)), %% We will have restarted the message store, and thus changed %% the order of the children of rabbit_sup. This will cause %% problems if there are subsequent failures - see bug 24262. diff --git a/deps/rabbit/test/channel_operation_timeout_test_queue.erl b/deps/rabbit/test/channel_operation_timeout_test_queue.erl index 68759d1bd900..4ef67b84c18b 100644 --- a/deps/rabbit/test/channel_operation_timeout_test_queue.erl +++ b/deps/rabbit/test/channel_operation_timeout_test_queue.erl @@ -5,7 +5,6 @@ %% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. %% -%% @todo This module also needs to be updated when variable queue changes. -module(channel_operation_timeout_test_queue). -export([init/3, terminate/2, delete_and_terminate/2, delete_crashed/1, @@ -28,69 +27,11 @@ %% the test message has been published, and is awaiting acknowledgement in the %% queue index. Test message is "timeout_test_msg!". %% +%% Only access the #msg_status{} record needs to be updated if it changes. %%---------------------------------------------------------------------------- -behaviour(rabbit_backing_queue). --record(vqstate, - { q_head, - q_tail, - next_seq_id, - %% seq_id() of first undelivered message - %% everything before this seq_id() was delivered at least once - next_deliver_seq_id, - ram_pending_ack, %% msgs still in RAM - disk_pending_ack, %% msgs in store, paged out - index_state, - store_state, - msg_store_clients, - durable, - transient_threshold, - qi_embed_msgs_below, - - bytes, %% w/o unacked - unacked_bytes, - persistent_count, %% w unacked - persistent_bytes, %% w unacked - - ram_msg_count, %% w/o unacked - ram_msg_count_prev, - ram_ack_count_prev, - ram_bytes, %% w unacked - out_counter, - in_counter, - rates, - %% There are two confirms paths: either store/index produce confirms - %% separately (v2 with per-vhost message store) or the confirms - %% are produced all at once while syncing/flushing (v2 with per-queue - %% message store). The latter is more efficient as it avoids many - %% sets operations. - msgs_on_disk, - msg_indices_on_disk, - unconfirmed, - confirmed, - ack_out_counter, - ack_in_counter, - %% Unlike the other counters these two do not feed into - %% #rates{} and get reset - disk_read_count, - disk_write_count, - - %% Fast path for confirms handling. Instead of having - %% index/store keep track of confirms separately and - %% doing intersect/subtract/union we just put the messages - %% here and on sync move them to 'confirmed'. - %% - %% Note: This field used to be 'memory_reduction_run_count'. - unconfirmed_simple, - %% Queue data is grouped by VHost. We need to store it - %% to work with queue index. - virtual_host, - waiting_bump = false - }). - --record(rates, { in, out, ack_in, ack_out, timestamp }). - -record(msg_status, { seq_id, msg_id, @@ -103,79 +44,9 @@ msg_props }). --record(q_tail, - { start_seq_id, %% start_seq_id is inclusive - count, - end_seq_id %% end_seq_id is exclusive - }). - - -include_lib("rabbit_common/include/rabbit.hrl"). --define(QUEUE, lqueue). -define(TIMEOUT_TEST_MSG, <<"timeout_test_msg!">>). -%%---------------------------------------------------------------------------- - --type seq_id() :: non_neg_integer(). - --type rates() :: #rates { in :: float(), - out :: float(), - ack_in :: float(), - ack_out :: float(), - timestamp :: rabbit_types:timestamp()}. - --type q_tail() :: #q_tail { start_seq_id :: non_neg_integer(), - count :: non_neg_integer(), - end_seq_id :: non_neg_integer() }. - -%% The compiler (rightfully) complains that ack() and state() are -%% unused. For this reason we duplicate a -spec from -%% rabbit_backing_queue with the only intent being to remove -%% warnings. The problem here is that we can't parameterise the BQ -%% behaviour by these two types as we would like to. We still leave -%% these here for documentation purposes. --type ack() :: seq_id(). --type state() :: #vqstate { - q_head :: ?QUEUE:?QUEUE(), - q_tail :: q_tail(), - next_seq_id :: seq_id(), - next_deliver_seq_id :: seq_id(), - ram_pending_ack :: map(), - disk_pending_ack :: map(), - index_state :: any(), - store_state :: any(), - msg_store_clients :: 'undefined' | {{any(), binary()}, - {any(), binary()}}, - durable :: boolean(), - transient_threshold :: non_neg_integer(), - qi_embed_msgs_below :: non_neg_integer(), - - bytes :: non_neg_integer(), - unacked_bytes :: non_neg_integer(), - persistent_count :: non_neg_integer(), - persistent_bytes :: non_neg_integer(), - - ram_msg_count :: non_neg_integer(), - ram_msg_count_prev :: non_neg_integer(), - ram_ack_count_prev :: non_neg_integer(), - ram_bytes :: non_neg_integer(), - out_counter :: non_neg_integer(), - in_counter :: non_neg_integer(), - rates :: rates(), - msgs_on_disk :: gb_sets:set(), - msg_indices_on_disk :: gb_sets:set(), - unconfirmed :: gb_sets:set(), - confirmed :: gb_sets:set(), - ack_out_counter :: non_neg_integer(), - ack_in_counter :: non_neg_integer(), - disk_read_count :: non_neg_integer(), - disk_write_count :: non_neg_integer(), - - unconfirmed_simple :: sets:set()}. - -%% Duplicated from rabbit_backing_queue --spec ack([ack()], state()) -> {[rabbit_guid:guid()], state()}. - %%---------------------------------------------------------------------------- %% Public API %%---------------------------------------------------------------------------- @@ -198,12 +69,8 @@ delete_and_terminate(Reason, State) -> delete_crashed(Q) -> rabbit_variable_queue:delete_crashed(Q). -purge(State = #vqstate { ram_pending_ack= QPA }) -> - maybe_delay(QPA), - rabbit_variable_queue:purge(State); -%% For v4.2.x because the state has changed. purge(State) -> - QPA = element(9, State), + QPA = ram_pending_acks(State), maybe_delay(QPA), rabbit_variable_queue:purge(State). @@ -236,24 +103,16 @@ drop(AckRequired, State) -> ack(List, State) -> rabbit_variable_queue:ack(List, State). -requeue(AckTags, #vqstate { ram_pending_ack = QPA } = State) -> - maybe_delay(QPA), - rabbit_variable_queue:requeue(AckTags, State); -%% For v4.2.x because the state has changed. requeue(AckTags, State) -> - QPA = element(9, State), + QPA = ram_pending_acks(State), maybe_delay(QPA), rabbit_variable_queue:requeue(AckTags, State). ackfold(MsgFun, Acc, State, AckTags) -> rabbit_variable_queue:ackfold(MsgFun, Acc, State, AckTags). -len(#vqstate { ram_pending_ack = QPA } = State) -> - maybe_delay(QPA), - rabbit_variable_queue:len(State); -%% For v4.2.x because the state has changed. len(State) -> - QPA = element(9, State), + QPA = ram_pending_acks(State), maybe_delay(QPA), rabbit_variable_queue:len(State). @@ -298,6 +157,13 @@ set_queue_mode(_, State) -> set_queue_version(_, State) -> State. +ram_pending_acks(State) -> + case erlang:function_exported(rabbit_variable_queue, ram_pending_acks, 1) of + true -> rabbit_variable_queue:ram_pending_acks(State); + %% For v4.2.x because the state has changed. + false -> element(9, State) + end. + %% Delay maybe_delay(QPA) -> %% The structure for ram_pending_acks has changed to maps in 3.12. diff --git a/deps/rabbit/test/rabbit_dummy_protocol_connection_info.erl b/deps/rabbit/test/rabbit_dummy_protocol_connection_info.erl deleted file mode 100644 index 9b1f3808eb6d..000000000000 --- a/deps/rabbit/test/rabbit_dummy_protocol_connection_info.erl +++ /dev/null @@ -1,19 +0,0 @@ -%% This Source Code Form is subject to the terms of the Mozilla Public -%% License, v. 2.0. If a copy of the MPL was not distributed with this -%% file, You can obtain one at https://mozilla.org/MPL/2.0/. -%% -%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. -%% - -%% Dummy module to test rabbit_direct:extract_extra_auth_props - --module(rabbit_dummy_protocol_connection_info). - -%% API --export([additional_authn_params/4]). - -additional_authn_params(_Creds, _VHost, Pid, _Infos) -> - case Pid of - -1 -> throw(error); - _ -> [{client_id, <<"DummyClientId">>}] - end. diff --git a/deps/rabbit/test/rabbit_foo_protocol_connection_info.erl b/deps/rabbit/test/rabbit_foo_protocol_connection_info.erl deleted file mode 100644 index 16e2259d6b92..000000000000 --- a/deps/rabbit/test/rabbit_foo_protocol_connection_info.erl +++ /dev/null @@ -1,25 +0,0 @@ -%% This Source Code Form is subject to the terms of the Mozilla Public -%% License, v. 2.0. If a copy of the MPL was not distributed with this -%% file, You can obtain one at https://mozilla.org/MPL/2.0/. -%% -%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. -%% --module(rabbit_foo_protocol_connection_info). - -%% Dummy module to test authentication context propagation - -%% API --export([additional_authn_params/4]). - -additional_authn_params(_Creds, _VHost, _Pid, Infos) -> - case proplists:get_value(variable_map, Infos, undefined) of - VariableMap when is_map(VariableMap) -> - case maps:get(<<"key1">>, VariableMap, []) of - Value when is_binary(Value)-> - [{key1, Value}]; - [] -> - [] - end; - _ -> - [] - end. diff --git a/deps/rabbit/test/unit_access_control_SUITE.erl b/deps/rabbit/test/unit_access_control_SUITE.erl index 4d94565ecf50..330a9fea5d7f 100644 --- a/deps/rabbit/test/unit_access_control_SUITE.erl +++ b/deps/rabbit/test/unit_access_control_SUITE.erl @@ -30,8 +30,7 @@ groups() -> login_of_passwordless_user, set_tags_for_passwordless_user, change_password, - auth_backend_internal_expand_topic_permission, - rabbit_direct_extract_extra_auth_props + auth_backend_internal_expand_topic_permission ]} ]. @@ -221,28 +220,6 @@ set_tags_for_passwordless_user1(_Config) -> passed. -rabbit_direct_extract_extra_auth_props(_Config) -> - {ok, CSC} = code_server_cache:start_link(), - % no protocol to extract - [] = rabbit_direct:extract_extra_auth_props( - {<<"guest">>, <<"guest">>}, <<"/">>, 1, - [{name,<<"127.0.0.1:52366 -> 127.0.0.1:1883">>}]), - % protocol to extract, but no module to call - [] = rabbit_direct:extract_extra_auth_props( - {<<"guest">>, <<"guest">>}, <<"/">>, 1, - [{protocol, {'PROTOCOL_WITHOUT_MODULE', "1.0"}}]), - % see rabbit_dummy_protocol_connection_info module - % protocol to extract, module that returns a client ID - [{client_id, <<"DummyClientId">>}] = rabbit_direct:extract_extra_auth_props( - {<<"guest">>, <<"guest">>}, <<"/">>, 1, - [{protocol, {'DUMMY_PROTOCOL', "1.0"}}]), - % protocol to extract, but error thrown in module - [] = rabbit_direct:extract_extra_auth_props( - {<<"guest">>, <<"guest">>}, <<"/">>, -1, - [{protocol, {'DUMMY_PROTOCOL', "1.0"}}]), - gen_server:stop(CSC), - ok. - auth_backend_internal_expand_topic_permission(_Config) -> ExpandMap = #{<<"username">> => <<"guest">>, <<"vhost">> => <<"default">>}, %% simple case diff --git a/deps/rabbit/test/unit_access_control_authn_authz_context_propagation_SUITE.erl b/deps/rabbit/test/unit_access_control_authn_authz_context_propagation_SUITE.erl index 52a1d5c948f6..c4a823f41f78 100644 --- a/deps/rabbit/test/unit_access_control_authn_authz_context_propagation_SUITE.erl +++ b/deps/rabbit/test/unit_access_control_authn_authz_context_propagation_SUITE.erl @@ -83,17 +83,11 @@ propagate_context_to_auth_backend1() -> password = <<"guest">>, adapter_info = #amqp_adapter_info{additional_info = [ {variable_map, #{<<"key1">> => <<"value1">>}} - ], - protocol = {'FOO_PROTOCOL', '1.0'} %% this will trigger a call to rabbit_foo_protocol_connection_info + ] } }, {ok, Conn} = amqp_connection:start(AmqpParams), - %% rabbit_direct will call the rabbit_foo_protocol_connection_info module to extract information - %% this information will be propagated to the authentication backend - [{authentication, AuthProps}] = rabbit_auth_backend_context_propagation_mock:get(authentication), - ?assertEqual(<<"value1">>, proplists:get_value(key1, AuthProps)), - %% variable_map is propagated from rabbit_direct to the authorization backend [{vhost_access, AuthzData}] = rabbit_auth_backend_context_propagation_mock:get(vhost_access), ?assertEqual(<<"value1">>, maps:get(<<"key1">>, AuthzData)), diff --git a/deps/rabbit/test/unit_file_handle_cache_SUITE.erl b/deps/rabbit/test/unit_file_handle_cache_SUITE.erl deleted file mode 100644 index 15a80f455124..000000000000 --- a/deps/rabbit/test/unit_file_handle_cache_SUITE.erl +++ /dev/null @@ -1,213 +0,0 @@ -%% This Source Code Form is subject to the terms of the Mozilla Public -%% License, v. 2.0. If a copy of the MPL was not distributed with this -%% file, You can obtain one at https://mozilla.org/MPL/2.0/. -%% -%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. -%% - --module(unit_file_handle_cache_SUITE). - --include_lib("eunit/include/eunit.hrl"). --include_lib("eunit/include/eunit.hrl"). --include_lib("kernel/include/file.hrl"). --include_lib("amqp_client/include/amqp_client.hrl"). - --compile(export_all). - --define(TIMEOUT, 30000). - -all() -> - [ - {group, non_parallel_tests} - ]. - -groups() -> - [ - {non_parallel_tests, [], [ - file_handle_cache, %% Change FHC limit. - file_handle_cache_reserve_release, - file_handle_cache_reserve_monitor, - file_handle_cache_reserve_open_file_above_limit - ]} - ]. - -%% ------------------------------------------------------------------- -%% Testsuite setup/teardown. -%% ------------------------------------------------------------------- - -init_per_suite(Config) -> - rabbit_ct_helpers:log_environment(), - rabbit_ct_helpers:run_setup_steps(Config). - -end_per_suite(Config) -> - rabbit_ct_helpers:run_teardown_steps(Config). - -init_per_group(Group, Config) -> - Config1 = rabbit_ct_helpers:set_config(Config, [ - {rmq_nodename_suffix, Group}, - {rmq_nodes_count, 2} - ]), - rabbit_ct_helpers:run_steps(Config1, - rabbit_ct_broker_helpers:setup_steps() ++ - rabbit_ct_client_helpers:setup_steps() ++ [ - fun setup_file_handle_cache/1 - ]). - -setup_file_handle_cache(Config) -> - ok = rabbit_ct_broker_helpers:rpc(Config, 0, - ?MODULE, setup_file_handle_cache1, []), - Config. - -setup_file_handle_cache1() -> - %% FIXME: Why are we doing this? - application:set_env(rabbit, file_handles_high_watermark, 10), - ok = file_handle_cache:set_limit(10), - ok. - -end_per_group(_Group, Config) -> - rabbit_ct_helpers:run_steps(Config, - rabbit_ct_client_helpers:teardown_steps() ++ - rabbit_ct_broker_helpers:teardown_steps()). - -init_per_testcase(Testcase, Config) -> - rabbit_ct_helpers:testcase_started(Config, Testcase). - -end_per_testcase(Testcase, Config) -> - rabbit_ct_helpers:testcase_finished(Config, Testcase). - - -%% --------------------------------------------------------------------------- -%% file_handle_cache. -%% --------------------------------------------------------------------------- - -file_handle_cache(Config) -> - passed = rabbit_ct_broker_helpers:rpc(Config, 0, - ?MODULE, file_handle_cache1, [Config]). - -file_handle_cache1(_Config) -> - %% test copying when there is just one spare handle - Limit = file_handle_cache:get_limit(), - ok = file_handle_cache:set_limit(5), %% 1 or 2 sockets, 2 msg_stores - TmpDir = filename:join(rabbit:data_dir(), "tmp"), - ok = filelib:ensure_dir(filename:join(TmpDir, "nothing")), - [Src1, Dst1, Src2, Dst2] = Files = - [filename:join(TmpDir, Str) || Str <- ["file1", "file2", "file3", "file4"]], - Content = <<"foo">>, - CopyFun = fun (Src, Dst) -> - {ok, Hdl} = prim_file:open(Src, [binary, write]), - ok = prim_file:write(Hdl, Content), - ok = prim_file:sync(Hdl), - prim_file:close(Hdl), - - {ok, SrcHdl} = file_handle_cache:open(Src, [read], []), - {ok, DstHdl} = file_handle_cache:open(Dst, [write], []), - Size = size(Content), - {ok, Size} = file_handle_cache:copy(SrcHdl, DstHdl, Size), - ok = file_handle_cache:delete(SrcHdl), - ok = file_handle_cache:delete(DstHdl) - end, - Pid = spawn(fun () -> {ok, Hdl} = file_handle_cache:open( - filename:join(TmpDir, "file5"), - [write], []), - receive {next, Pid1} -> Pid1 ! {next, self()} end, - file_handle_cache:delete(Hdl), - %% This will block and never return, so we - %% exercise the fhc tidying up the pending - %% queue on the death of a process. - ok = CopyFun(Src1, Dst1) - end), - ok = CopyFun(Src1, Dst1), - ok = file_handle_cache:set_limit(2), - Pid ! {next, self()}, - receive {next, Pid} -> ok end, - timer:sleep(100), - Pid1 = spawn(fun () -> CopyFun(Src2, Dst2) end), - timer:sleep(100), - erlang:monitor(process, Pid), - erlang:monitor(process, Pid1), - exit(Pid, kill), - exit(Pid1, kill), - receive {'DOWN', _MRef, process, Pid, _Reason} -> ok end, - receive {'DOWN', _MRef1, process, Pid1, _Reason1} -> ok end, - [file:delete(File) || File <- Files], - ok = file_handle_cache:set_limit(Limit), - passed. - -file_handle_cache_reserve_release(Config) -> - passed = rabbit_ct_broker_helpers:rpc(Config, 0, - ?MODULE, file_handle_cache_reserve_release1, [Config]). - -file_handle_cache_reserve_release1(_Config) -> - ok = file_handle_cache:set_reservation(7), - ?assertEqual([{files_reserved, 7}], file_handle_cache:info([files_reserved])), - ok = file_handle_cache:set_reservation(3), - ?assertEqual([{files_reserved, 3}], file_handle_cache:info([files_reserved])), - ok = file_handle_cache:release_reservation(), - ?assertEqual([{files_reserved, 0}], file_handle_cache:info([files_reserved])), - passed. - -file_handle_cache_reserve_open_file_above_limit(Config) -> - passed = rabbit_ct_broker_helpers:rpc(Config, 0, - ?MODULE, file_handle_cache_reserve_open_file_above_limit1, [Config]). - -file_handle_cache_reserve_open_file_above_limit1(_Config) -> - Limit = file_handle_cache:get_limit(), - ok = file_handle_cache:set_limit(5), - %% Reserves are always accepted, even if above the limit - %% These are for special processes such as quorum queues - ok = file_handle_cache:set_reservation(7), - - Self = self(), - TmpDir = filename:join(rabbit:data_dir(), "tmp"), - spawn(fun () -> {ok, _} = file_handle_cache:open( - filename:join(TmpDir, "file_above_limit"), - [write], []), - Self ! opened - end), - - Props = file_handle_cache:info([files_reserved]), - ?assertEqual(7, proplists:get_value(files_reserved, Props)), - - %% The open should still be blocked, as there are no file handles - %% available - receive - opened -> - throw(error_file_opened) - after 30_000 -> - %% Let's release 5 file handles, that should leave - %% enough free for the `open` to go through - file_handle_cache:set_reservation(2), - Props0 = file_handle_cache:info([files_reserved, total_used]), - ?assertEqual(2, proplists:get_value(files_reserved, Props0)), - receive - opened -> - ok = file_handle_cache:set_limit(Limit), - passed - after 30_000 -> - throw(error_file_not_released) - end - end. - -file_handle_cache_reserve_monitor(Config) -> - passed = rabbit_ct_broker_helpers:rpc(Config, 0, - ?MODULE, file_handle_cache_reserve_monitor1, [Config]). - -file_handle_cache_reserve_monitor1(_Config) -> - %% Check that if the process that does the reserve dies, the file handlers are - %% released by the cache - Self = self(), - Pid = spawn(fun () -> - ok = file_handle_cache:set_reservation(2), - Self ! done, - receive - stop -> ok - end - end), - receive - done -> ok - end, - ?assertEqual([{files_reserved, 2}], file_handle_cache:info([files_reserved])), - Pid ! stop, - timer:sleep(500), - ?assertEqual([{files_reserved, 0}], file_handle_cache:info([files_reserved])), - passed. diff --git a/deps/rabbit_common/src/rabbit_runtime.erl b/deps/rabbit_common/src/rabbit_runtime.erl index 81e30b750067..b3bfd376cb26 100644 --- a/deps/rabbit_common/src/rabbit_runtime.erl +++ b/deps/rabbit_common/src/rabbit_runtime.erl @@ -17,6 +17,7 @@ -export([guess_number_of_cpu_cores/0, msacc_stats/1]). -export([get_gc_info/1, gc_all_processes/0]). -export([get_erl_path/0]). +-export([ulimit/0]). -spec guess_number_of_cpu_cores() -> pos_integer(). guess_number_of_cpu_cores() -> @@ -63,3 +64,26 @@ get_erl_path() -> _ -> filename:join(BinDir, "erl") end. + +%% To increase the number of file descriptors: on Windows set ERL_MAX_PORTS +%% environment variable, on Linux set `ulimit -n`. +ulimit() -> + IOStats = case erlang:system_info(check_io) of + [Val | _] when is_list(Val) -> Val; + Val when is_list(Val) -> Val; + _Other -> [] + end, + case proplists:get_value(max_fds, IOStats) of + MaxFds when is_integer(MaxFds) andalso MaxFds > 1 -> + case os:type() of + {win32, _OsName} -> + %% On Windows max_fds is twice the number of open files: + %% https://github.com/erlang/otp/blob/64c8ae9966a720b9127f6d5a7e1fb4f9aeaca9b6/erts/emulator/sys/win32/sys.c#L2773-L2782 + MaxFds div 2; + _Any -> + %% For other operating systems trust Erlang. + MaxFds + end; + _ -> + unknown + end. diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/status_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/status_command.ex index 202feb8995e7..6d043091180c 100644 --- a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/status_command.ex +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/status_command.ex @@ -174,11 +174,6 @@ defmodule RabbitMQ.CLI.Ctl.Commands.StatusCommand do "#{category}: #{IU.convert(val[:bytes], unit)} #{unit} (#{val[:percentage]} %)" end) - file_descriptors = [ - "\n#{bright("File Descriptors")}\n", - "Total: #{m[:file_descriptors][:total_used]}, limit: #{m[:file_descriptors][:total_limit]}" - ] - disk_space_section = [ "\n#{bright("Free Disk Space")}\n", "Low free disk space watermark: #{IU.convert(m[:disk_free_limit], unit)} #{unit}", @@ -210,7 +205,7 @@ defmodule RabbitMQ.CLI.Ctl.Commands.StatusCommand do alarms_section ++ tags_section ++ memory_section ++ - file_descriptors ++ disk_space_section ++ totals_section ++ listeners_section + disk_space_section ++ totals_section ++ listeners_section {:ok, Enum.join(lines, line_separator())} end @@ -273,7 +268,6 @@ defmodule RabbitMQ.CLI.Ctl.Commands.StatusCommand do vm_memory_high_watermark_limit: Keyword.get(result, :vm_memory_limit), disk_free_limit: Keyword.get(result, :disk_free_limit), disk_free: Keyword.get(result, :disk_free), - file_descriptors: Enum.into(Keyword.get(result, :file_descriptors), %{}), alarms: Keyword.get(result, :alarms), tags: Keyword.get(result, :tags, []), listeners: listener_maps(Keyword.get(result, :listeners, [])), diff --git a/deps/rabbitmq_cli/test/json_formatting_test.exs b/deps/rabbitmq_cli/test/json_formatting_test.exs index 83beeb3a4d9a..fe0ed19da49a 100644 --- a/deps/rabbitmq_cli/test/json_formatting_test.exs +++ b/deps/rabbitmq_cli/test/json_formatting_test.exs @@ -32,7 +32,6 @@ defmodule JSONFormattingTest do {:ok, doc} = JSON.decode(output) assert Map.has_key?(doc, "memory") - assert Map.has_key?(doc, "file_descriptors") assert Map.has_key?(doc, "listeners") assert Map.has_key?(doc, "processes") assert Map.has_key?(doc, "os") diff --git a/deps/rabbitmq_management_agent/src/rabbit_mgmt_external_stats.erl b/deps/rabbitmq_management_agent/src/rabbit_mgmt_external_stats.erl index 072617ec13e9..1ed3d632334b 100644 --- a/deps/rabbitmq_management_agent/src/rabbit_mgmt_external_stats.erl +++ b/deps/rabbitmq_management_agent/src/rabbit_mgmt_external_stats.erl @@ -344,7 +344,7 @@ format_mochiweb_option(_K, V) -> init([]) -> {ok, Interval} = application:get_env(rabbit, collect_statistics_interval), - State = #state{fd_total = file_handle_cache:ulimit(), + State = #state{fd_total = rabbit_runtime:ulimit(), fhc_stats = get_fhc_stats(), node_owners = sets:new(), interval = Interval}, @@ -370,8 +370,7 @@ code_change(_, State, _) -> {ok, State}. %%-------------------------------------------------------------------- -emit_update(State0) -> - State1 = update_state(State0), +emit_update(State1) -> {State2, MStats} = infos(?METRICS_KEYS, [], State1), {State3, PStats} = infos(?PERSISTER_KEYS, [], State2), {State4, OStats} = infos(?OTHER_KEYS, [], State3), @@ -399,12 +398,6 @@ emit_node_node_stats(State = #state{node_owners = Owners}) -> end || {Node, _Owner, Stats} <- Links], State#state{node_owners = NewOwners}. -update_state(State0) -> - %% Store raw data, the average operation time is calculated during querying - %% from the accumulated total - FHC = get_fhc_stats(), - State0#state{fhc_stats = FHC}. - %% @todo All these stats are zeroes. Remove eventually. get_fhc_stats() -> dict:to_list(dict:merge(fun(_, V1, V2) -> V1 + V2 end, diff --git a/deps/rabbitmq_prelaunch/src/rabbit_prelaunch.erl b/deps/rabbitmq_prelaunch/src/rabbit_prelaunch.erl index 832ecd1cc1a0..460500820f00 100644 --- a/deps/rabbitmq_prelaunch/src/rabbit_prelaunch.erl +++ b/deps/rabbitmq_prelaunch/src/rabbit_prelaunch.erl @@ -106,6 +106,9 @@ do_run() -> rabbit_env:context_to_code_path(Context), rabbit_env:context_to_app_env_vars(Context), + %% 0. Ulimit check. + ok = rabbit_prelaunch_ulimit:check(Context), + %% 1. Erlang/OTP compatibility check. ok = rabbit_prelaunch_erlang_compat:check(Context), diff --git a/deps/rabbitmq_prelaunch/src/rabbit_prelaunch_ulimit.erl b/deps/rabbitmq_prelaunch/src/rabbit_prelaunch_ulimit.erl new file mode 100644 index 000000000000..a7d0947c8f5f --- /dev/null +++ b/deps/rabbitmq_prelaunch/src/rabbit_prelaunch_ulimit.erl @@ -0,0 +1,21 @@ +-module(rabbit_prelaunch_ulimit). + +-include_lib("kernel/include/logger.hrl"). + +-include_lib("rabbit_common/include/logging.hrl"). + +-export([check/1]). + +%% Absolute minimum number of FDs recommended to run RabbitMQ. +-define(ULIMIT_MINIMUM, 1024). + +check(_Context) -> + case rabbit_runtime:ulimit() of + %% unknown is included as atom() > integer(). + L when L > ?ULIMIT_MINIMUM -> + ok; + L -> + ?LOG_WARNING("Available file handles: ~tp. " + "Please consider increasing system limits", [L], + #{domain => ?RMQLOG_DOMAIN_PRELAUNCH}) + end. diff --git a/deps/rabbitmq_stomp/src/rabbit_stomp_connection_info.erl b/deps/rabbitmq_stomp/src/rabbit_stomp_connection_info.erl deleted file mode 100644 index d9528799ec57..000000000000 --- a/deps/rabbitmq_stomp/src/rabbit_stomp_connection_info.erl +++ /dev/null @@ -1,17 +0,0 @@ -%% This Source Code Form is subject to the terms of the Mozilla Public -%% License, v. 2.0. If a copy of the MPL was not distributed with this -%% file, You can obtain one at https://mozilla.org/MPL/2.0/. -%% -%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. -%% - --module(rabbit_stomp_connection_info). - -%% Note: this is necessary to prevent code:get_object_code from -%% backing up due to a missing module. See VESC-888. - -%% API --export([additional_authn_params/4]). - -additional_authn_params(_Creds, _VHost, _Pid, _Infos) -> - []. diff --git a/deps/rabbitmq_web_mqtt/priv/schema/rabbitmq_web_mqtt.schema b/deps/rabbitmq_web_mqtt/priv/schema/rabbitmq_web_mqtt.schema index 981f038a1600..fdab7d2e4d2b 100644 --- a/deps/rabbitmq_web_mqtt/priv/schema/rabbitmq_web_mqtt.schema +++ b/deps/rabbitmq_web_mqtt/priv/schema/rabbitmq_web_mqtt.schema @@ -183,7 +183,7 @@ end}. [{datatype, {enum, [true, false]}}]}. %% -%% File Handle Cache +%% File Handle Cache (no-op, kept for compatibility). %% {mapping, "web_mqtt.use_file_handle_cache", "rabbitmq_web_mqtt.use_file_handle_cache", diff --git a/deps/rabbitmq_web_stomp/priv/schema/rabbitmq_web_stomp.schema b/deps/rabbitmq_web_stomp/priv/schema/rabbitmq_web_stomp.schema index 1e618c627b26..d09e1945d448 100644 --- a/deps/rabbitmq_web_stomp/priv/schema/rabbitmq_web_stomp.schema +++ b/deps/rabbitmq_web_stomp/priv/schema/rabbitmq_web_stomp.schema @@ -191,10 +191,10 @@ end}. [{datatype, {enum, [true, false]}}]}. %% -%% File Handle Cache +%% File Handle Cache (no-op, kept for compatibility). %% {mapping, "web_stomp.use_file_handle_cache", "rabbitmq_web_stomp.use_file_handle_cache", [ {datatype, {enum, [true, false]}} - ]}. \ No newline at end of file + ]}.