diff --git a/deps/rabbit/src/rabbit_msg_store.erl b/deps/rabbit/src/rabbit_msg_store.erl index c007620dde51..95cb9b401562 100644 --- a/deps/rabbit/src/rabbit_msg_store.erl +++ b/deps/rabbit/src/rabbit_msg_store.erl @@ -1515,28 +1515,38 @@ scan_data(<> = Data, %% a remnant from a previous compaction, but it might %% simply be a coincidence. Try the next byte. #{MsgIdInt := true} -> - <<_, Rest2/bits>> = Data, - scan_data(Rest2, Fd, Fun, Offset + 1, FileSize, MsgIdsFound, Acc); + scan_next_byte(Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc); %% Data looks to be a message. _ -> - %% Avoid sub-binary construction. - MsgId = <>, TotalSize = Size + 9, - case Fun({MsgId, TotalSize, Offset}) of - %% Confirmed to be a message by the provided fun. - {valid, Entry} -> + case check_msg(Fun, MsgIdInt, TotalSize, Offset, Acc) of + {continue, NewAcc} -> scan_data(Rest, Fd, Fun, Offset + TotalSize, FileSize, - MsgIdsFound#{MsgIdInt => true}, [Entry|Acc]); - %% Confirmed to be a message but we don't need it anymore. - previously_valid -> - scan_data(Rest, Fd, Fun, Offset + TotalSize, FileSize, - MsgIdsFound#{MsgIdInt => true}, Acc); - %% Not a message, try the next byte. - invalid -> - <<_, Rest2/bits>> = Data, - scan_data(Rest2, Fd, Fun, Offset + 1, FileSize, MsgIdsFound, Acc) + MsgIdsFound#{MsgIdInt => true}, NewAcc); + try_next_byte -> + scan_next_byte(Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc) end end; +%% Large message alone in its own file +scan_data(<> = Data, Fd, Fun, Offset, FileSize, _MsgIdsFound, _Acc) + when Offset == 0, + FileSize == Size + 9 -> + {ok, CurrentPos} = file:position(Fd, cur), + case file:pread(Fd, FileSize - 1, 1) of + {ok, <<255>>} -> + TotalSize = FileSize, + case check_msg(Fun, MsgIdInt, TotalSize, Offset, []) of + {continue, NewAcc} -> + NewAcc; + try_next_byte -> + {ok, _} = file:position(Fd, CurrentPos), + scan_next_byte(Data, Fd, Fun, Offset, FileSize, #{}, []) + end; + _ -> + %% Wrong end marker + {ok, _} = file:position(Fd, CurrentPos), + scan_next_byte(Data, Fd, Fun, Offset, FileSize, #{}, []) + end; %% This might be the start of a message. scan_data(<> = Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc) when byte_size(Rest) < Size + 1, Size < FileSize - Offset -> @@ -1545,9 +1555,27 @@ scan_data(Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc) when byte_size(Data) < 8 -> scan(Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc); %% This is definitely not a message. Try the next byte. -scan_data(<<_, Rest/bits>>, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc) -> +scan_data(Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc) -> + scan_next_byte(Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc). + +scan_next_byte(<<_, Rest/bits>>, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc) -> scan_data(Rest, Fd, Fun, Offset + 1, FileSize, MsgIdsFound, Acc). +check_msg(Fun, MsgIdInt, TotalSize, Offset, Acc) -> + %% Avoid sub-binary construction. + MsgId = <>, + case Fun({MsgId, TotalSize, Offset}) of + %% Confirmed to be a message by the provided fun. + {valid, Entry} -> + {continue, [Entry|Acc]}; + %% Confirmed to be a message but we don't need it anymore. + previously_valid -> + {continue, Acc}; + %% Not a message, try the next byte. + invalid -> + try_next_byte + end. + %%---------------------------------------------------------------------------- %% Ets index %%----------------------------------------------------------------------------