diff --git a/src/osiris.erl b/src/osiris.erl
index c787c53..8d95d04 100644
--- a/src/osiris.erl
+++ b/src/osiris.erl
@@ -83,8 +83,7 @@
 -type reader_options() :: #{transport => tcp | ssl,
                             chunk_selector => all | user_data,
                             filter_spec => osiris_bloom:filter_spec(),
-                            read_ahead => boolean(),
-                            read_ahead_limit => pos_integer()
+                            read_ahead => boolean() | non_neg_integer()
                            }.
 
 -export_type([name/0,
diff --git a/src/osiris_log.erl b/src/osiris_log.erl
index 5a215fc..4a339b1 100644
--- a/src/osiris_log.erl
+++ b/src/osiris_log.erl
@@ -423,10 +423,11 @@
          filter_size = ?DEFAULT_FILTER_SIZE :: osiris_bloom:filter_size()
         }).
 -record(ra,
-        {on = true :: boolean(),
-         size = ?HEADER_SIZE_B + ?DEFAULT_FILTER_SIZE :: non_neg_integer(),
+        {size = ?HEADER_SIZE_B + ?DEFAULT_FILTER_SIZE :: non_neg_integer(),
          buf :: undefined | {Pos :: non_neg_integer(), binary()},
-         limit = ?DEFAULT_READ_AHEAD_LIMIT :: pos_integer()
+         %% Max number of bytes that can be read ahead. Set to zero to
+         %% turn off read-ahead.
+         limit = ?DEFAULT_READ_AHEAD_LIMIT :: non_neg_integer()
         }).
 -record(read,
         {type :: data | offset,
@@ -3312,7 +3313,7 @@ iter_guess_size(Credit0, NumEntries, DataSize) ->
     (DataSize div NumEntries * Credit).
 
 iter_read_ahead(Fd, Pos, MinReqSize, Credit0, DataSize, NumEntries,
-                #ra{limit = ReadAheadLimit} = Ra0)
+                #ra{limit = ReadAheadLimit0} = Ra0)
   when is_integer(Credit0) andalso
        MinReqSize =< DataSize ->
     %% if the minimum request size can be served from read ahead then we
@@ -3335,6 +3336,8 @@
             %% needed to serve that, else we read up to the readahead
             %% limit but not beyond the end of the chunk and not less
             %% that the minimum request size
+            ReadAheadLimit = max(ReadAheadLimit0,
+                                 ?DEFAULT_READ_AHEAD_LIMIT),
             MinSize = max(MinReqSize, min(ReadAheadLimit, DataSize)),
             Size = max(MinSize,
                        iter_guess_size(Credit0, NumEntries, DataSize)),
@@ -3364,15 +3367,17 @@ ra_read(_Pos, _Len, _Ra) ->
     undefined.
 
 ra_update_size(undefined, FilterSize, LastDataSize,
-               #ra{on = true, size = Sz, limit = Limit} = Ra)
-  when Sz < Limit andalso
+               #ra{size = Sz, limit = Limit} = Ra)
+  when Limit =/= 0 andalso
+       Sz < Limit andalso
        LastDataSize =< (Limit - ?HEADER_SIZE_B -
                         FilterSize - ?REC_HDR_SZ_SUBBATCH_B) ->
     %% no filter and last data size was small so enable data read ahead
     Ra#ra{size = Limit};
 ra_update_size(undefined, FilterSize, LastDataSize,
-               #ra{on = true, size = Limit, limit = Limit} = Ra)
-  when LastDataSize =< (Limit - ?HEADER_SIZE_B -
+               #ra{size = Limit, limit = Limit} = Ra)
+  when Limit =/= 0 andalso
+       LastDataSize =< (Limit - ?HEADER_SIZE_B -
                         FilterSize - ?REC_HDR_SZ_SUBBATCH_B) ->
     Ra;
 ra_update_size(_Filter, FilterSize, _LastDataSize, #ra{size = Sz} = Ra) ->
@@ -3399,11 +3404,11 @@ ra_fill(Fd, Pos, #ra{size = Sz} = Ra) ->
 
 -spec ra(config()) -> #ra{}.
 ra(#{options := #{read_ahead := false}}) ->
-    #ra{on = false};
-ra(#{options := #{read_ahead_limit := Limit}}) when is_integer(Limit) ->
+    #ra{limit = 0};
+ra(#{options := #{read_ahead := Limit}}) when is_integer(Limit) ->
     #ra{limit = Limit};
 ra(_) ->
-    #ra{on = true}.
+    #ra{}.
 
 generate_log(Msg, MsgsPerChunk, NumMessages, Directory) ->
     Name = filename:basename(Directory),
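
Note (not part of the patch): the hunks above consolidate the reader options so that read_ahead alone carries the configuration. true keeps read-ahead with the default limit, false is now mapped to #ra{limit = 0}, and an integer sets the read-ahead byte limit directly, replacing the removed read_ahead_limit key; ra_update_size/4 uses Limit =/= 0 as the "enabled" check. The sketch below only illustrates the three accepted option shapes; the module and function names are hypothetical and not taken from osiris.

-module(read_ahead_example).
-export([reader_opts/1]).

%% Builds a reader_options() map for each of the three read_ahead shapes
%% accepted after this change. transport is included only to show the
%% option sitting alongside the other reader options.
-spec reader_opts(default | off | pos_integer()) -> map().
reader_opts(default) ->
    %% true: keep read-ahead with the built-in default limit
    #{transport => tcp, read_ahead => true};
reader_opts(off) ->
    %% false: read-ahead disabled, mapped to #ra{limit = 0} by ra/1
    #{transport => tcp, read_ahead => false};
reader_opts(Limit) when is_integer(Limit), Limit > 0 ->
    %% integer: explicit read-ahead byte limit (was read_ahead_limit)
    #{transport => tcp, read_ahead => Limit}.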