Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/osiris.erl
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,7 @@
-type reader_options() :: #{transport => tcp | ssl,
chunk_selector => all | user_data,
filter_spec => osiris_bloom:filter_spec(),
read_ahead => boolean(),
read_ahead_limit => pos_integer()
read_ahead => boolean() | non_neg_integer()
}.

-export_type([name/0,
Expand Down
27 changes: 16 additions & 11 deletions src/osiris_log.erl
Original file line number Diff line number Diff line change
Expand Up @@ -423,10 +423,11 @@
filter_size = ?DEFAULT_FILTER_SIZE :: osiris_bloom:filter_size()
}).
-record(ra,
{on = true :: boolean(),
size = ?HEADER_SIZE_B + ?DEFAULT_FILTER_SIZE :: non_neg_integer(),
{size = ?HEADER_SIZE_B + ?DEFAULT_FILTER_SIZE :: non_neg_integer(),
buf :: undefined | {Pos :: non_neg_integer(), binary()},
limit = ?DEFAULT_READ_AHEAD_LIMIT :: pos_integer()
%% Max number of bytes that can be read ahead. Set to zero to
%% turn off read-ahead.
limit = ?DEFAULT_READ_AHEAD_LIMIT :: non_neg_integer()
}).
-record(read,
{type :: data | offset,
Expand Down Expand Up @@ -3312,7 +3313,7 @@ iter_guess_size(Credit0, NumEntries, DataSize) ->
(DataSize div NumEntries * Credit).

iter_read_ahead(Fd, Pos, MinReqSize, Credit0, DataSize, NumEntries,
#ra{limit = ReadAheadLimit} = Ra0)
#ra{limit = ReadAheadLimit0} = Ra0)
when is_integer(Credit0) andalso
MinReqSize =< DataSize ->
%% if the minimum request size can be served from read ahead then we
Expand All @@ -3335,6 +3336,8 @@ iter_read_ahead(Fd, Pos, MinReqSize, Credit0, DataSize, NumEntries,
%% needed to serve that, else we read up to the readahead
%% limit but not beyond the end of the chunk and not less
%% that the minimum request size
ReadAheadLimit = max(ReadAheadLimit0,
?DEFAULT_READ_AHEAD_LIMIT),
MinSize = max(MinReqSize, min(ReadAheadLimit, DataSize)),
Size = max(MinSize, iter_guess_size(Credit0, NumEntries,
DataSize)),
Expand Down Expand Up @@ -3364,15 +3367,17 @@ ra_read(_Pos, _Len, _Ra) ->
undefined.

ra_update_size(undefined, FilterSize, LastDataSize,
#ra{on = true, size = Sz, limit = Limit} = Ra)
when Sz < Limit andalso
#ra{size = Sz, limit = Limit} = Ra)
when Limit =/= 0 andalso
Sz < Limit andalso
LastDataSize =< (Limit - ?HEADER_SIZE_B -
FilterSize - ?REC_HDR_SZ_SUBBATCH_B) ->
%% no filter and last data size was small so enable data read ahead
Ra#ra{size = Limit};
ra_update_size(undefined, FilterSize, LastDataSize,
#ra{on = true, size = Limit, limit = Limit} = Ra)
when LastDataSize =< (Limit - ?HEADER_SIZE_B -
#ra{size = Limit, limit = Limit} = Ra)
when Limit =/= 0 andalso
LastDataSize =< (Limit - ?HEADER_SIZE_B -
FilterSize - ?REC_HDR_SZ_SUBBATCH_B) ->
Ra;
ra_update_size(_Filter, FilterSize, _LastDataSize, #ra{size = Sz} = Ra) ->
Expand All @@ -3399,11 +3404,11 @@ ra_fill(Fd, Pos, #ra{size = Sz} = Ra) ->

-spec ra(config()) -> #ra{}.
ra(#{options := #{read_ahead := false}}) ->
#ra{on = false};
ra(#{options := #{read_ahead_limit := Limit}}) when is_integer(Limit) ->
#ra{limit = 0};
ra(#{options := #{read_ahead := Limit}}) when is_integer(Limit) ->
#ra{limit = Limit};
ra(_) ->
#ra{on = true}.
#ra{}.

generate_log(Msg, MsgsPerChunk, NumMessages, Directory) ->
Name = filename:basename(Directory),
Expand Down