diff --git a/deps/rabbit/src/rabbit_msg_store.erl b/deps/rabbit/src/rabbit_msg_store.erl index 03326e1841f1..3dc1d2c1f3dc 100644 --- a/deps/rabbit/src/rabbit_msg_store.erl +++ b/deps/rabbit/src/rabbit_msg_store.erl @@ -899,7 +899,7 @@ handle_cast({write, CRef, MsgRef, MsgId, Flow}, %% or the non-current files. If the message *is* in the %% current file then the cache entry will be removed by %% the normal logic for that in write_message/4 and - %% maybe_roll_to_new_file/2. + %% flush_or_roll_to_new_file/2. case index_lookup(MsgId, State) of [#msg_location { file = File }] when File == State #msstate.current_file -> @@ -1208,26 +1208,102 @@ gc_candidate(File, State = #msstate{ gc_candidates = Candidates, gc_candidate(File, State = #msstate{ gc_candidates = Candidates }) -> State#msstate{ gc_candidates = Candidates#{ File => true }}. -write_message(MsgId, Msg, - State0 = #msstate { current_file_handle = CurHdl, - current_file = CurFile, - current_file_offset = CurOffset, - file_summary_ets = FileSummaryEts }) -> - {MaybeFlush, TotalSize} = writer_append(CurHdl, MsgId, Msg), - State = case MaybeFlush of - flush -> internal_sync(State0); - ok -> State0 - end, +-define(LARGE_MESSAGE_THRESHOLD, 4194304). %% 4MB. + +write_message(MsgId, MsgBody, State) -> + MsgBodyBin = term_to_binary(MsgBody), + %% Large messages get written to their own files. + if + byte_size(MsgBodyBin) >= ?LARGE_MESSAGE_THRESHOLD -> + write_large_message(MsgId, MsgBodyBin, State); + true -> + write_small_message(MsgId, MsgBodyBin, State) + end. + +write_small_message(MsgId, MsgBodyBin, + State = #msstate { current_file_handle = CurHdl, + current_file = CurFile, + current_file_offset = CurOffset, + file_summary_ets = FileSummaryEts }) -> + {MaybeFlush, TotalSize} = writer_append(CurHdl, MsgId, MsgBodyBin), ok = index_insert( #msg_location { msg_id = MsgId, ref_count = 1, file = CurFile, offset = CurOffset, total_size = TotalSize }, State), [_,_] = ets:update_counter(FileSummaryEts, CurFile, [{#file_summary.valid_total_size, TotalSize}, {#file_summary.file_size, TotalSize}]), - maybe_roll_to_new_file(CurOffset + TotalSize, + flush_or_roll_to_new_file(CurOffset + TotalSize, MaybeFlush, State #msstate { current_file_offset = CurOffset + TotalSize }). +flush_or_roll_to_new_file( + Offset, _MaybeFlush, + State = #msstate { dir = Dir, + current_file_handle = CurHdl, + current_file = CurFile, + file_summary_ets = FileSummaryEts, + cur_file_cache_ets = CurFileCacheEts, + file_size_limit = FileSizeLimit }) + when Offset >= FileSizeLimit -> + State1 = internal_sync(State), + ok = writer_close(CurHdl), + NextFile = CurFile + 1, + {ok, NextHdl} = writer_open(Dir, NextFile), + true = ets:insert_new(FileSummaryEts, #file_summary { + file = NextFile, + valid_total_size = 0, + file_size = 0, + locked = false }), + %% Delete messages from the cache that were written to disk. + true = ets:match_delete(CurFileCacheEts, {'_', '_', 0}), + State1 #msstate { current_file_handle = NextHdl, + current_file = NextFile, + current_file_offset = 0 }; +%% If we need to flush, do so here. +flush_or_roll_to_new_file(_, flush, State) -> + internal_sync(State); +flush_or_roll_to_new_file(_, _, State) -> + State. + +write_large_message(MsgId, MsgBodyBin, + State0 = #msstate { dir = Dir, + current_file_handle = CurHdl, + current_file = CurFile, + file_summary_ets = FileSummaryEts, + cur_file_cache_ets = CurFileCacheEts }) -> + %% Flush the current file and close it. + ok = writer_flush(CurHdl), + ok = writer_close(CurHdl), + %% Open a new file, write the message directly and close it. + LargeMsgFile = CurFile + 1, + {ok, LargeMsgHdl} = writer_open(Dir, LargeMsgFile), + TotalSize = writer_direct_write(LargeMsgHdl, MsgId, MsgBodyBin), + ok = writer_close(LargeMsgHdl), + %% Update ets with the new information directly. + ok = index_insert( + #msg_location { msg_id = MsgId, ref_count = 1, file = LargeMsgFile, + offset = 0, total_size = TotalSize }, State0), + true = ets:insert_new(FileSummaryEts, #file_summary { + file = LargeMsgFile, + valid_total_size = TotalSize, + file_size = TotalSize, + locked = false }), + %% Roll over to the next file. + NextFile = LargeMsgFile + 1, + {ok, NextHdl} = writer_open(Dir, NextFile), + true = ets:insert_new(FileSummaryEts, #file_summary { + file = NextFile, + valid_total_size = 0, + file_size = 0, + locked = false }), + %% Delete messages from the cache that were written to disk. + true = ets:match_delete(CurFileCacheEts, {'_', '_', 0}), + %% Process confirms (this won't flush; we already did) and continue. + State = internal_sync(State0), + State #msstate { current_file_handle = NextHdl, + current_file = NextFile, + current_file_offset = 0 }. + contains_message(MsgId, From, State) -> MsgLocation = index_lookup_positive_ref_count(MsgId, State), gen_server2:reply(From, MsgLocation =/= not_found), @@ -1325,8 +1401,7 @@ writer_recover(Dir, Num, Offset) -> ok = file:truncate(Fd), {ok, #writer{fd = Fd, buffer = prim_buffer:new()}}. -writer_append(#writer{buffer = Buffer}, MsgId, MsgBody) -> - MsgBodyBin = term_to_binary(MsgBody), +writer_append(#writer{buffer = Buffer}, MsgId, MsgBodyBin) -> MsgBodyBinSize = byte_size(MsgBodyBin), EntrySize = MsgBodyBinSize + 16, %% Size of MsgId + MsgBodyBin. %% We send an iovec to the buffer instead of building a binary. @@ -1354,6 +1429,21 @@ writer_flush(#writer{fd = Fd, buffer = Buffer}) -> file:write(Fd, prim_buffer:read_iovec(Buffer, Size)) end. +%% For large messages we don't buffer anything. Large messages +%% are kept within their own files. +%% +%% This is basically the same as writer_append except no buffering. +writer_direct_write(#writer{fd = Fd}, MsgId, MsgBodyBin) -> + MsgBodyBinSize = byte_size(MsgBodyBin), + EntrySize = MsgBodyBinSize + 16, %% Size of MsgId + MsgBodyBin. + file:write(Fd, [ + <>, + MsgId, + MsgBodyBin, + <<255>> %% OK marker. + ]), + EntrySize + 9. + writer_close(#writer{fd = Fd}) -> file:close(Fd). @@ -1700,33 +1790,6 @@ rebuild_index(Gatherer, Files, State) -> %% garbage collection / compaction / aggregation -- internal %%---------------------------------------------------------------------------- -maybe_roll_to_new_file( - Offset, - State = #msstate { dir = Dir, - current_file_handle = CurHdl, - current_file = CurFile, - file_summary_ets = FileSummaryEts, - cur_file_cache_ets = CurFileCacheEts, - file_size_limit = FileSizeLimit }) - when Offset >= FileSizeLimit -> - State1 = internal_sync(State), - ok = writer_close(CurHdl), - NextFile = CurFile + 1, - {ok, NextHdl} = writer_open(Dir, NextFile), - true = ets:insert_new(FileSummaryEts, #file_summary { - file = NextFile, - valid_total_size = 0, - file_size = 0, - locked = false }), - %% We only delete messages from the cache that were written to disk - %% in the previous file. - true = ets:match_delete(CurFileCacheEts, {'_', '_', 0}), - State1 #msstate { current_file_handle = NextHdl, - current_file = NextFile, - current_file_offset = 0 }; -maybe_roll_to_new_file(_, State) -> - State. - %% We keep track of files that have seen removes and %% check those periodically for compaction. We only %% compact files that have less than half valid data.