diff --git a/src/osiris_log.erl b/src/osiris_log.erl
index 5e3a75ab..a15b486b 100644
--- a/src/osiris_log.erl
+++ b/src/osiris_log.erl
@@ -386,7 +386,11 @@
          timestamp :: non_neg_integer(),
          epoch :: epoch(),
          num :: non_neg_integer(),
-         type :: chunk_type()
+         type :: chunk_type(),
+         %% size of data + trailer
+         size :: non_neg_integer(),
+         %% position in segment file
+         pos :: integer()
         }).
 -record(seg_info, {file :: file:filename(),
@@ -795,131 +799,92 @@ truncate_to(Name, Range, [{E, ChId} | NextEOs], SegInfos) ->
                   empty | {offset(), offset()}}} |
                  {error, {invalid_last_offset_epoch, offset(), offset()}}.
-init_data_reader({StartOffset, PrevEO}, #{dir := Dir,
-                                          readers_counter_fun := Fun} = Config) ->
+init_data_reader({StartChunkId, PrevEOT}, #{dir := Dir} = Config) ->
     SegInfos = build_log_overview(Dir),
     Range = offset_range_from_segment_infos(SegInfos),
     ?DEBUG("osiris_segment:init_data_reader/2 at ~b prev "
-           "~w range: ~w",
-           [StartOffset, PrevEO, Range]),
+           "~w local range: ~w",
+           [StartChunkId, PrevEOT, Range]),
     %% Invariant: there is always at least one segment left on disk
     case Range of
-        {F, _L} when StartOffset < F ->
-            %% if a lower than exisiting is request simply forward
-            %% it to the first offset of the log
-            %% in this case we cannot validate PrevEO - instead
-            %% the replica should truncate all of it's exisiting log
-            case find_segment_for_offset(F, SegInfos) of
-                not_found ->
-                    %% this is unexpected and thus an error
-                    exit({segment_not_found, F, SegInfos});
-                {_, StartSegmentInfo} ->
-                    {ok,
-                     init_data_reader_from_segment(Config, StartSegmentInfo, F, Fun)}
-            end;
-        empty when StartOffset > 0 ->
+        {FstOffs, LastOffs}
+          when StartChunkId < FstOffs
+               orelse StartChunkId > LastOffs + 1 ->
             {error, {offset_out_of_range, Range}};
-        {_F, L} when StartOffset > L + 1 ->
-            %% if we are trying to attach to anything larger than
-            %% the next offset (i.e last +1) this is in out of range
-            %% error
+        empty when StartChunkId > 0 ->
             {error, {offset_out_of_range, Range}};
-        _ ->
+        _ when PrevEOT == empty ->
             %% this assumes the offset is in range
             %% first we need to validate PrevEO
-            case PrevEO of
-                empty ->
-                    case find_segment_for_offset(StartOffset, SegInfos) of
-                        not_found ->
-                            %% this is unexpected and thus an error
-                            exit({segment_not_found, StartOffset, SegInfos});
-                        {_, StartSegmentInfo} ->
-                            {ok,
-                             init_data_reader_from_segment(Config,
-                                                           StartSegmentInfo,
-                                                           StartOffset,
-                                                           Fun)}
-                    end;
-                {PrevE, PrevO, _PrevTs} ->
-                    case find_segment_for_offset(PrevO, SegInfos) of
-                        not_found ->
-                            %% this is unexpected and thus an error
-                            {error,
-                             {invalid_last_offset_epoch, PrevE, unknown}};
-                        {_, SegmentInfo = #seg_info{file = PrevSeg}} ->
-                            %% prev segment exists, does it have the correct
-                            %% epoch?
-                            {ok, Fd} = file:open(PrevSeg, [raw, binary, read]),
-                            %% TODO: next offset needs to be a chunk offset
-                            {_, FilePos} = scan_idx(PrevO, SegmentInfo),
-                            {ok, FilePos} = file:position(Fd, FilePos),
-                            case file:read(Fd, ?HEADER_SIZE_B) of
-                                {ok,
-                                 <<?MAGIC:4/unsigned,
-                                   ?VERSION:4/unsigned,
-                                   _ChType:8/unsigned,
-                                   _NumEntries:16/unsigned,
-                                   _NumRecords:32/unsigned,
-                                   _Timestamp:64/signed,
-                                   PrevE:64/unsigned,
-                                   PrevO:64/unsigned,
-                                   _Crc:32/integer>>} ->
-                                    ok = file:close(Fd),
-                                    {ok,
-                                     init_data_reader_from_segment(Config,
-                                                                   element(2,
-                                                                           find_segment_for_offset(StartOffset,
                                                                                                    SegInfos)),
-                                                                   StartOffset,
-                                                                   Fun)};
-                                {ok,
-                                 <<?MAGIC:4/unsigned,
-                                   ?VERSION:4/unsigned,
-                                   _ChType:8/unsigned,
-                                   _NumEntries:16/unsigned,
-                                   _NumRecords:32/unsigned,
-                                   _Timestamp:64/signed,
-                                   OtherE:64/unsigned,
-                                   _:64/unsigned,
-                                   _Crc:32/integer>>} ->
-                                    ok = file:close(Fd),
-                                    {error,
-                                     {invalid_last_offset_epoch, PrevE, OtherE}}
-                            end
-                    end
+            {ok, init_data_reader_from(
+                   StartChunkId,
+                   find_segment_for_offset(StartChunkId, SegInfos),
+                   Config)};
+        _ ->
+            {PrevEpoch, PrevChunkId, _PrevTs} = PrevEOT,
+            case check_chunk_has_expected_epoch(PrevChunkId, PrevEpoch, SegInfos) of
+                ok ->
+                    {ok, init_data_reader_from(
+                           StartChunkId,
+                           find_segment_for_offset(StartChunkId, SegInfos),
+                           Config)};
+                {error, _} = Err ->
+                    Err
+            end
     end.
+
+check_chunk_has_expected_epoch(ChunkId, Epoch, SegInfos) ->
+    case find_segment_for_offset(ChunkId, SegInfos) of
+        not_found ->
+            %% this is unexpected and thus an error
+            {error,
+             {invalid_last_offset_epoch, Epoch, unknown}};
+        {found, SegmentInfo = #seg_info{file = _PrevSeg}} ->
+            %% prev segment exists, does it have the correct
+            %% epoch?
+            case scan_idx(ChunkId, SegmentInfo) of
+                {ChunkId, Epoch, _PrevPos} ->
+                    ok;
+                {ChunkId, OtherEpoch, _} ->
+                    {error,
+                     {invalid_last_offset_epoch, Epoch, OtherEpoch}}
+            end
+    end.
 
-init_data_reader_from_segment(#{dir := Dir, name := Name} = Config,
-                              SegmentInfo = #seg_info{file = StartSegment},
-                              NextOffs, CounterFun) ->
-    {ok, Fd} = file:open(StartSegment, [raw, binary, read]),
-    %% TODO: next offset needs to be a chunk offset
-    {_, FilePos} = scan_idx(NextOffs, SegmentInfo),
-    {ok, _Pos} = file:position(Fd, FilePos),
+init_data_reader_at(ChunkId, FilePos, File,
+                    #{dir := Dir, name := Name,
+                      readers_counter_fun := CountersFun} = Config) ->
+    {ok, Fd} = file:open(File, [raw, binary, read]),
+    {ok, FilePos} = file:position(Fd, FilePos),
     Cnt = make_counter(Config),
-    counters:put(Cnt, ?C_OFFSET, NextOffs - 1),
-    CounterFun(1),
+    counters:put(Cnt, ?C_OFFSET, ChunkId - 1),
+    CountersFun(1),
     #?MODULE{cfg = #cfg{directory = Dir,
                         counter = Cnt,
                         name = Name,
-                        readers_counter_fun = CounterFun,
+                        readers_counter_fun = CountersFun,
                         first_offset_fun = fun (_) -> ok end},
              mode = #read{type = data,
                           offset_ref = maps:get(offset_ref, Config, undefined),
-                          next_offset = NextOffs,
+                          next_offset = ChunkId,
                           chunk_selector = all,
                           transport = maps:get(transport, Config, tcp)},
              fd = Fd}.
 
+init_data_reader_from(ChunkId,
+                      {end_of_log, #seg_info{file = File,
+                                             last = LastChunk}},
+                      Config) ->
+    {ChunkId, AttachPos} = next_location(LastChunk),
+    init_data_reader_at(ChunkId, AttachPos, File, Config);
+init_data_reader_from(ChunkId,
+                      {found, #seg_info{file = File} = SegInfo},
+                      Config) ->
+    {ChunkId, _Epoch, FilePos} = scan_idx(ChunkId, SegInfo),
+    init_data_reader_at(ChunkId, FilePos, File, Config).
+
 %% @doc Initialise a new offset reader
 %% @param OffsetSpec specifies where in the log to attach the reader
 %% `first': Attach at first available offset.
@@ -940,9 +905,9 @@ init_data_reader_from_segment(#{dir := Dir, name := Name} = Config,
          {ok, state()} |
          {error,
           {offset_out_of_range,
-           empty | {From :: offset(), To :: offset()}}}.
+           empty | {From :: offset(), To :: offset()}}} |
+         {error, {invalid_chunk_header, term()}}.
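For reviewers, a hedged sketch (not part of the patch) of the three outcomes the reworked `init_data_reader/2` can now produce; the offsets, epochs and the `Conf` map are invented, and `Ts` stands for any timestamp:

```erlang
%% assumes a log whose chunk at id 100 was written in epoch 5
data_reader_outcomes(Conf, Ts) ->
    %% nothing to validate: the previous epoch/offset/timestamp is `empty'
    {ok, R1} = osiris_log:init_data_reader({0, empty}, Conf),
    %% the chunk at id 100 carries epoch 5, so epoch validation passes
    {ok, R2} = osiris_log:init_data_reader({101, {5, 100, Ts}}, Conf),
    %% attaching outside [first, last + 1] is now rejected up front
    {error, {offset_out_of_range, _Range}} =
        osiris_log:init_data_reader({9999, empty}, Conf),
    {R1, R2}.
```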
 init_offset_reader({abs, Offs}, #{dir := Dir} = Conf) ->
-    %% TODO: some unnecessary computation here
     Range = offset_range_from_segment_infos(build_log_overview(Dir)),
     case Range of
         empty ->
@@ -957,10 +922,13 @@ init_offset_reader({timestamp, Ts}, #{dir := Dir} = Conf) ->
     case build_log_overview(Dir) of
         [] ->
             init_offset_reader(next, Conf);
-        [#seg_info{first = #chunk_info{timestamp = Fst}} | _]
+        [#seg_info{file = SegmentFile,
+                   first = #chunk_info{timestamp = Fst,
+                                       pos = FilePos,
+                                       id = ChunkId}} | _]
             when is_integer(Fst) andalso Fst > Ts ->
             %% timestamp is lower than the first timestamp available
-            init_offset_reader(first, Conf);
+            open_offset_reader_at(SegmentFile, ChunkId, FilePos, Conf);
         SegInfos ->
             case lists:search(fun (#seg_info{first = #chunk_info{timestamp = F},
                                              last = #chunk_info{timestamp = L}})
@@ -972,100 +940,146 @@ init_offset_reader({timestamp, Ts}, #{dir := Dir} = Conf) ->
                               end,
                               SegInfos)
             of
-                {value, Info} ->
+                {value, #seg_info{file = SegmentFile} = Info} ->
                     %% segment was found, now we need to scan index to
                     %% find nearest offset
-                    ChunkId = chunk_id_for_timestamp(Info, Ts),
-                    init_offset_reader(ChunkId, Conf);
+                    {ChunkId, FilePos} = chunk_location_for_timestamp(Info, Ts),
+                    open_offset_reader_at(SegmentFile, ChunkId, FilePos, Conf);
                 false ->
                     %% segment was not found, attach next
+                    %% this should be rare so no need to call the more optimal
+                    %% open_offset_reader_at/4 function
                     init_offset_reader(next, Conf)
             end
     end;
-init_offset_reader(OffsetSpec,
-                   #{dir := Dir,
-                     name := Name,
-                     offset_ref := OffsetRef,
-                     readers_counter_fun := ReaderCounterFun,
-                     options := Options} =
-                       Conf) ->
+init_offset_reader(first, #{dir := Dir} = Conf) ->
+    case build_log_overview(Dir) of
+        [#seg_info{file = File,
+                   first = undefined}] ->
+            %% empty log, attach at 0
+            open_offset_reader_at(File, 0, ?LOG_HEADER_SIZE, Conf);
+        [#seg_info{file = File,
+                   first = #chunk_info{id = FirstChunkId,
+                                       pos = FilePos}} | _] ->
+            open_offset_reader_at(File, FirstChunkId, FilePos, Conf);
+        _ ->
+            exit(no_segments_found)
+    end;
+init_offset_reader(next, #{dir := Dir} = Conf) ->
+    SegInfos = build_log_overview(Dir),
+    case lists:reverse(SegInfos) of
+        [#seg_info{file = File,
+                   last = LastChunk} | _] ->
+            {NextChunkId, FilePos} = next_location(LastChunk),
+            open_offset_reader_at(File, NextChunkId, FilePos, Conf);
+        _ ->
+            exit(no_segments_found)
+    end;
+init_offset_reader(last, #{dir := Dir} = Conf) ->
+    SegInfos = build_log_overview(Dir),
+    case lists:reverse(SegInfos) of
+        [#seg_info{file = File,
+                   last = undefined}] ->
+            %% empty log, attach at 0
+            open_offset_reader_at(File, 0, ?LOG_HEADER_SIZE, Conf);
+        [#seg_info{file = File,
+                   last = #chunk_info{type = ?CHNK_USER,
+                                      id = LastChunkId,
+                                      pos = FilePos}} | _] ->
+            open_offset_reader_at(File, LastChunkId, FilePos, Conf);
+        _ ->
+            case last_user_chunk_location(SegInfos) of
+                not_found ->
+                    ?DEBUG("~s:~s user chunk not found, fall back to next",
+                           [?MODULE, ?FUNCTION_NAME]),
+                    %% no user chunks in stream, this is awkward, fall back to next
+                    init_offset_reader(next, Conf);
+                {ChunkId, FilePos, #seg_info{file = File}} ->
+                    open_offset_reader_at(File, ChunkId, FilePos, Conf)
+            end
+    end;
+init_offset_reader(OffsetSpec, #{dir := Dir} = Conf)
+  when is_integer(OffsetSpec) ->
     SegInfos = build_log_overview(Dir),
     ChunkRange = chunk_range_from_segment_infos(SegInfos),
     Range = offset_range_from_chunk_range(ChunkRange),
-    ?DEBUG("osiris_log:init_offset_reader/2 spec ~w range "
-           "~w ",
+    ?DEBUG("osiris_log:init_offset_reader/2 spec ~w range ~w ",
            [OffsetSpec,
            Range]),
-    StartOffset =
-        case {OffsetSpec, Range} of
-            {_, empty} ->
-                0;
-            {first, {F, _}} ->
-                F;
-            {last, {_, L}} ->
-                case ChunkRange of
-                    {_FirstChunk, #chunk_info{type = ?CHNK_USER}} ->
-                        %% Last chunk in last segment is a user chunk.
-                        L;
-                    _ ->
-                        %% Last chunk in last segment is not a user chunk (but e.g. a tracking chunk).
-                        %% Therefore, find the last user chunk by searching the index files backwards.
-                        last_user_chunk_id(SegInfos)
-                end;
-            {next, {_, L}} ->
-                L + 1;
-            {Offset, {S, E}} when is_integer(Offset) ->
-                max(S, min(Offset, E + 1))
-        end,
-    %% find the appopriate segment and scan the index to find the
-    %% postition of the next chunk to read
-    case find_segment_for_offset(StartOffset, SegInfos) of
-        not_found ->
-            {error, {offset_out_of_range, Range}};
-        {_, SegmentInfo = #seg_info{file = StartSegment}} ->
-            try
-                {ok, Fd} = open(StartSegment, [raw, binary, read]),
-                {ChOffs, FilePos} =
+    try
+        StartOffset =
+            case {OffsetSpec, Range} of
+                {_, empty} ->
+                    0;
+                {Offset, {_, LastOffs}}
+                  when Offset > LastOffs ->
+                    %% out of range, clamp as `next`
+                    throw({retry_with, next, Conf});
+                {Offset, {FirstOffs, _LastOffs}} ->
+                    max(FirstOffs, Offset)
+            end,
+        %% find the appropriate segment and scan the index to find the
+        %% position of the next chunk to read
+        case find_segment_for_offset(StartOffset, SegInfos) of
+            not_found ->
+                {error, {offset_out_of_range, Range}};
+            {end_of_log, #seg_info{file = SegmentFile,
+                                   last = LastChunk}} ->
+                {ChunkId, FilePos} = next_location(LastChunk),
+                open_offset_reader_at(SegmentFile, ChunkId, FilePos, Conf);
+            {found, SegmentInfo = #seg_info{file = SegmentFile}} ->
+                {ChunkId, _Epoch, FilePos} =
                     case scan_idx(StartOffset, SegmentInfo) of
                         eof ->
-                            {StartOffset, 0};
+                            exit(offset_out_of_range);
                         enoent ->
                             %% index file was not found
-                            %% just retry
-                            _ = file:close(Fd),
-                            init_offset_reader(OffsetSpec, Conf);
+                            %% throw should be caught and trigger a retry
+                            throw(missing_file);
+                        offset_out_of_range ->
+                            exit(offset_out_of_range);
                         IdxResult when is_tuple(IdxResult) ->
                             IdxResult
                     end,
-                ?DEBUG("osiris_log:init_offset_reader/2 resolved chunk_id ~b"
-                       " at file pos: ~w ", [ChOffs, FilePos]),
-                {ok, _Pos} = file:position(Fd, FilePos),
-                Cnt = make_counter(Conf),
-                ReaderCounterFun(1),
-                {ok,
-                 #?MODULE{cfg =
-                              #cfg{directory = Dir,
-                                   counter = Cnt,
-                                   name = Name,
-                                   readers_counter_fun = ReaderCounterFun,
-                                   first_offset_fun = fun (_) -> ok end
-                                  },
-                          mode =
-                              #read{type = offset,
-                                    chunk_selector = maps:get(chunk_selector, Options, user_data),
-                                    offset_ref = OffsetRef,
-                                    next_offset = ChOffs,
-                                    transport = maps:get(transport, Options, tcp)},
-                          fd = Fd}}
-            catch
-                missing_file ->
-                    %% Retention policies are likely being applied, let's try again
-                    %% TODO: should we limit the number of retries?
-                    init_offset_reader(OffsetSpec, Conf)
-            end
+                ?DEBUG("osiris_log:init_offset_reader/2 resolved chunk_id ~b"
+                       " at file pos: ~w ", [ChunkId, FilePos]),
+                open_offset_reader_at(SegmentFile, ChunkId, FilePos, Conf)
+        end
+    catch
+        missing_file ->
+            %% Retention policies are likely being applied, let's try again
+            %% TODO: should we limit the number of retries?
+            init_offset_reader(OffsetSpec, Conf);
+        {retry_with, NewOffsSpec, NewConf} ->
+            init_offset_reader(NewOffsSpec, NewConf)
     end.
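The integer-spec clause above collapses the old `first`/`last`/`next` clamping table into two rules. A standalone sketch of just that resolution logic, under the assumption that `resolve_offset_spec/2` is a hypothetical helper name and the `next` atom stands in for the `{retry_with, next, Conf}` throw in the real code:

```erlang
resolve_offset_spec(_Offset, empty) ->
    %% empty log: attach at offset 0
    0;
resolve_offset_spec(Offset, {_FirstOffs, LastOffs}) when Offset > LastOffs ->
    %% above the range: the real clause throws {retry_with, next, Conf}
    %% and re-enters init_offset_reader/2 with the `next' spec
    next;
resolve_offset_spec(Offset, {FirstOffs, _LastOffs}) ->
    %% at or below the range: clamp up to the first available offset
    max(FirstOffs, Offset).
```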
+open_offset_reader_at(SegmentFile, NextChunkId, FilePos,
+                      #{dir := Dir,
+                        name := Name,
+                        offset_ref := OffsetRef,
+                        readers_counter_fun := ReaderCounterFun,
+                        options := Options} =
+                          Conf) ->
+    {ok, Fd} = open(SegmentFile, [raw, binary, read]),
+    {ok, FilePos} = file:position(Fd, FilePos),
+    Cnt = make_counter(Conf),
+    ReaderCounterFun(1),
+    {ok, #?MODULE{cfg = #cfg{directory = Dir,
+                             counter = Cnt,
+                             name = Name,
+                             readers_counter_fun = ReaderCounterFun,
+                             first_offset_fun = fun (_) -> ok end
+                            },
+                  mode = #read{type = offset,
+                               chunk_selector = maps:get(chunk_selector, Options,
+                                                         user_data),
+                               offset_ref = OffsetRef,
+                               next_offset = NextChunkId,
+                               transport = maps:get(transport, Options, tcp)},
+                  fd = Fd}}.
+
 %% Searches the index files backwards for the ID of the last user chunk.
-last_user_chunk_id(SegInfos) when is_list(SegInfos) ->
+last_user_chunk_location(SegInfos) when is_list(SegInfos) ->
     {Time, Result} = timer:tc(
         fun() ->
                 last_user_chunk_id0(lists:reverse(SegInfos))
@@ -1075,17 +1089,17 @@
 last_user_chunk_id0([]) ->
     %% There are no user chunks in any index files.
-    0;
-last_user_chunk_id0([#seg_info{index = IdxFile} | Rest]) ->
+    not_found;
+last_user_chunk_id0([#seg_info{index = IdxFile} = Info | Rest]) ->
     try
         %% Do not read-ahead since we read the index file backwards chunk by chunk.
         {ok, IdxFd} = open(IdxFile, [read, raw, binary]),
         file:position(IdxFd, eof),
         Last = last_user_chunk_id_in_index(IdxFd),
-        file:close(IdxFd),
+        _ = file:close(IdxFd),
         case Last of
-            {ok, Id} ->
-                Id;
+            {ok, Id, Pos} ->
+                {Id, Pos, Info};
             {error, Reason} ->
                 ?DEBUG("Could not find user chunk in index file ~s (~p)",
                        [IdxFile, Reason]),
                 last_user_chunk_id0(Rest)
@@ -1107,14 +1121,14 @@ last_user_chunk_id_in_index(IdxFd) ->
              <<Offset:64/unsigned,
                _Timestamp:64/signed,
                _Epoch:64/unsigned,
-               _FileOffset:32/unsigned,
+               FilePos:32/unsigned,
                ?CHNK_USER:8/unsigned>>} ->
-                {ok, Offset};
+                {ok, Offset, FilePos};
             {ok,
              <<_Offset:64/unsigned,
                _Timestamp:64/signed,
                _Epoch:64/unsigned,
-               _FileOffset:32/unsigned,
+               _FilePos:32/unsigned,
                _ChType:8/unsigned>>} ->
                 last_user_chunk_id_in_index(IdxFd);
             {error, _} = Error ->
@@ -1426,9 +1440,9 @@ build_log_overview(Dir) when is_list(Dir) ->
         fun() ->
            try
                IdxFiles =
-                  lists:sort(
-                      filelib:wildcard(
-                          filename:join(Dir, "*.index"))),
+                   lists:sort(
+                       filelib:wildcard(
+                           filename:join(Dir, "*.index"))),
                build_log_overview0(IdxFiles, [])
            catch
                missing_file ->
@@ -1493,6 +1507,9 @@ build_segment_info(SegFile, LastChunkPos, IdxFile, Acc0) ->
                 FirstTs:64/signed,
                 FirstEpoch:64/unsigned,
                 FirstChId:64/unsigned,
+                _FirstCrc:32/integer,
+                FirstSize:32/unsigned,
+                FirstTSize:32/unsigned,
                 _/binary>>} ->
                 {ok, LastChunkPos} = file:position(Fd, LastChunkPos),
                 {ok,
@@ -1523,13 +1540,17 @@ build_segment_info(SegFile, LastChunkPos, IdxFile, Acc0) ->
                                          timestamp = FirstTs,
                                          id = FirstChId,
                                          num = FirstNumRecords,
-                                         type = FirstChType},
+                                         type = FirstChType,
+                                         size = FirstSize + FirstTSize,
+                                         pos = ?LOG_HEADER_SIZE},
                      last = #chunk_info{epoch = LastEpoch,
                                         timestamp = LastTs,
                                         id = LastChId,
                                         num = LastNumRecords,
-                                        type = LastChType}}
+                                        type = LastChType,
+                                        size = LastSize + LastTSize,
+                                        pos = LastChunkPos}}
                  | Acc0]
         end
     catch
@@ -1914,64 +1935,61 @@ open_index_read(File) ->
     _ = file:read(Fd, ?IDX_HEADER_SIZE),
     Fd.
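All of the index scans in this patch decode the same fixed-width index record. For reference, a sketch of that layout with the field widths taken from the binary patterns above; `parse_index_record/1` is a hypothetical helper, not part of the module:

```erlang
%% 29 bytes per record: 8 (offset) + 8 (timestamp) + 8 (epoch)
%% + 4 (file position) + 1 (chunk type)
parse_index_record(<<Offset:64/unsigned,
                     Timestamp:64/signed,
                     Epoch:64/unsigned,
                     FilePos:32/unsigned,
                     ChType:8/unsigned>>) ->
    #{offset => Offset,
      timestamp => Timestamp,
      epoch => Epoch,
      file_pos => FilePos,
      chunk_type => ChType}.
```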
-scan_idx(Offset, SegmentInfo = #seg_info{index = IndexFile, last = LastChunkInSegment}) ->
+scan_idx(Offset, #seg_info{index = IndexFile,
+                           last = LastChunkInSegment} = SegmentInfo) ->
     {Time, Result} = timer:tc(
         fun() ->
                 case offset_range_from_segment_infos([SegmentInfo]) of
                     empty ->
-                        %% if the index is empty do we really know the offset will be next
-                        %% this relies on us always reducing the Offset to within the log range
-                        {0, ?LOG_HEADER_SIZE};
-                    {SegmentStart, SegmentEnd} ->
-                        case Offset < SegmentStart orelse Offset > SegmentEnd + 1 of
-                            true -> offset_out_of_range;
+                        eof;
+                    {SegmentStartOffs, SegmentEndOffs} ->
+                        case Offset < SegmentStartOffs orelse
+                             Offset > SegmentEndOffs of
+                            true ->
+                                offset_out_of_range;
                             false ->
                                 IndexFd = open_index_read(IndexFile),
-                                Result = scan_idx(IndexFd, Offset, LastChunkInSegment),
-                                file:close(IndexFd),
+                                Result = scan_idx(IndexFd, Offset,
+                                                  LastChunkInSegment),
+                                _ = file:close(IndexFd),
                                 Result
                         end
                 end
         end),
-    ?DEBUG("~s:~s/~b completed in ~fs", [?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY, Time/1000000]),
+    ?DEBUG("~s:~s/~b completed in ~fs",
+           [?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY, Time/1000000]),
     Result.
 
 scan_idx(Fd, Offset, #chunk_info{id = LastChunkInSegmentId,
                                  num = LastChunkInSegmentNum}) ->
     case file:read(Fd, ?INDEX_RECORD_SIZE_B) of
-        {ok,
-         <<ChunkId:64/unsigned,
-           _Timestamp:64/signed,
-           _Epoch:64/unsigned,
-           FilePos:32/unsigned,
-           _ChType:8/unsigned>>} ->
-            case Offset < ChunkId of
+        {ok, <<ChunkId:64/unsigned,
+               _Timestamp:64/signed,
+               Epoch:64/unsigned,
+               FilePos:32/unsigned,
+               _ChType:8/unsigned>>} ->
+            LastOffsetInSegment = LastChunkInSegmentId + LastChunkInSegmentNum - 1,
+            case Offset < ChunkId orelse Offset > LastOffsetInSegment of
                 true ->
-                    %% offset is lower than the first chunk in this segment
                     %% shouldn't really happen as we check the range above
                     offset_out_of_range;
                 false ->
-                    case Offset > (LastChunkInSegmentId + LastChunkInSegmentNum - 1) of
-                        true ->
-                            %% offset higher than the last chunk in this segment
-                            {LastChunkInSegmentId + LastChunkInSegmentNum, eof};
-                        false ->
-                            %% offset is in this segment
-                            scan_idx(Fd, Offset, {ChunkId, FilePos})
-                    end
+                    %% offset is in this segment
+                    scan_idx0(Fd, Offset, {ChunkId, Epoch, FilePos})
             end;
         eof ->
             %% this should never happen - offset is in the range and we are reading the first record
             eof;
         {error, Posix} ->
             Posix
-    end;
-scan_idx(Fd, Offset, PreviousChunk) ->
+    end.
+
+scan_idx0(Fd, Offset, PreviousChunk) ->
     case file:read(Fd, ?INDEX_RECORD_SIZE_B) of
         {ok,
          <<ChunkId:64/unsigned,
           _Timestamp:64/signed,
-          _Epoch:64/unsigned,
+          Epoch:64/unsigned,
           FilePos:32/unsigned,
           _ChType:8/unsigned>>} ->
             case Offset < ChunkId of
@@ -1982,11 +2000,13 @@
                     %% return the previous chunk
                     PreviousChunk;
                 false ->
-                    scan_idx(Fd, Offset, {ChunkId, FilePos})
+                    scan_idx0(Fd, Offset, {ChunkId, Epoch, FilePos})
             end;
         eof ->
-            %% Offset is in the last chunk
-            PreviousChunk
+            %% Offset must be in the last chunk as there is no more data
+            PreviousChunk;
+        {error, Posix} ->
+            Posix
     end.
 
 throw_missing({error, enoent}) ->
@@ -1997,11 +2017,11 @@
 open(SegFile, Options) ->
     throw_missing(file:open(SegFile, Options)).
 
-chunk_id_for_timestamp(#seg_info{index = Idx}, Ts) ->
+chunk_location_for_timestamp(#seg_info{index = Idx}, Ts) ->
     Fd = open_index_read(Idx),
     %% scan index file for nearest timestamp
-    {ChunkId, _Timestamp, _Epoch, _FilePos} = timestamp_idx_scan(Fd, Ts),
-    ChunkId.
+    {ChunkId, _Timestamp, _Epoch, FilePos} = timestamp_idx_scan(Fd, Ts),
+    {ChunkId, FilePos}.
 
 timestamp_idx_scan(Fd, Ts) ->
     case file:read(Fd, ?INDEX_RECORD_SIZE_B) of
@@ -2232,6 +2252,14 @@ trigger_retention_eval(#?MODULE{cfg =
                  end),
     State.
 
+next_location(undefined) ->
+    {0, ?LOG_HEADER_SIZE};
+next_location(#chunk_info{id = Id,
+                          num = Num,
+                          pos = Pos,
+                          size = Size}) ->
+    {Id + Num, Pos + Size + ?HEADER_SIZE_B}.
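A worked example of what `next_location/1` computes, written as if it sat next to the function inside `osiris_log.erl`; the chunk values are made up, and the arithmetic relies only on the `pos` and `size` fields recorded per chunk above:

```erlang
%% sketch: the last chunk holds offsets 100..109 (id = 100, num = 10),
%% starts at byte 5000 and has 1000 bytes of data + trailer, so the next
%% chunk is expected at offset 110, one fixed-size header past its data
next_location_example() ->
    {110, Pos} = next_location(#chunk_info{id = 100, num = 10,
                                           pos = 5000, size = 1000}),
    Pos = 5000 + 1000 + ?HEADER_SIZE_B,
    %% an empty segment attaches at offset 0, right after the log header
    {0, ?LOG_HEADER_SIZE} = next_location(undefined),
    ok.
```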
+
 -ifdef(TEST).
 
 % -include_lib("eunit/include/eunit.hrl").
diff --git a/test/osiris_SUITE.erl b/test/osiris_SUITE.erl
index 15aef575..dba09745 100644
--- a/test/osiris_SUITE.erl
+++ b/test/osiris_SUITE.erl
@@ -895,7 +895,6 @@ retention_overtakes_offset_reader(Config) ->
     SegSize = 50000 * 1000,
     [LeaderNode | Replicas] = Nodes =
         [start_child_node(N, DataDir) || N <- [s1, s2, s3]],
-    % [Replica1, Replica2] = Replicas,
     Conf0 =
         #{name => Name,
           epoch => 1,
diff --git a/test/osiris_log_SUITE.erl b/test/osiris_log_SUITE.erl
index df0e7b8a..49433da9 100644
--- a/test/osiris_log_SUITE.erl
+++ b/test/osiris_log_SUITE.erl
@@ -48,6 +48,7 @@ all_tests() ->
      init_offset_reader_no_user_chunk_in_segments,
      init_offset_reader_timestamp,
      init_offset_reader_truncated,
+     init_data_reader_next,
      init_data_reader_empty_log,
      init_data_reader_truncated,
      init_epoch_offsets_empty,
@@ -331,8 +332,7 @@ write_multi_log(Config) ->
     ?assertEqual(2, length(Segments)),
 
     OffRef = atomics:new(2, []),
-    atomics:put(OffRef, 1,
-                1011), %% takes a single offset tracking data into account
+    atomics:put(OffRef, 1, 1011), %% takes a single offset tracking data into account
     %% ensure all records can be read
     {ok, R0} =
         osiris_log:init_offset_reader(first, Conf#{offset_ref => OffRef}),
@@ -371,7 +371,7 @@ init_offset_reader_empty(Config) ->
     LDir = ?config(leader_dir, Config),
     LLog0 = seed_log(LDir, [], Config),
     osiris_log:close(LLog0),
-    RConf = Conf#{dir => LDir, offset_ref => ?FUNCTION_NAME},
+    RConf = Conf#{dir => LDir},
     %% first and last falls back to next
     {ok, L1} = osiris_log:init_offset_reader(first, RConf),
     {ok, L2} = osiris_log:init_offset_reader(last, RConf),
@@ -404,9 +404,10 @@ init_offset_reader(Config, Mode) ->
         [{1, [<<"one">>]}, {2, [<<"two">>]}, {3, [<<"three">>, <<"four">>]}],
     LDir = ?config(leader_dir, Config),
     Conf = ?config(osiris_conf, Config),
+    set_offset_ref(Conf, 3),
     LLog0 = seed_log(LDir, EpochChunks, Config),
     osiris_log:close(LLog0),
-    RConf = Conf#{dir => LDir, offset_ref => ?FUNCTION_NAME, mode => Mode},
+    RConf = Conf#{dir => LDir, mode => Mode},
 
     {ok, L1} = osiris_log:init_offset_reader(first, RConf),
     ?assertEqual(0, osiris_log:next_offset(L1)),
@@ -464,7 +465,8 @@ init_offset_reader_last_chunk_is_not_user_chunk(Config) ->
     osiris_log:close(S2),
 
     Conf = ?config(osiris_conf, Config),
-    RConf = Conf#{dir => LDir, offset_ref => ?FUNCTION_NAME},
+    RConf = Conf#{dir => LDir},
+    set_offset_ref(RConf, 4),
     % Test that 'last' returns last user chunk
     {ok, L1} = osiris_log:init_offset_reader(last, RConf),
     ?assertEqual(1, osiris_log:next_offset(L1)),
@@ -484,6 +486,7 @@ init_offset_reader_no_user_chunk_in_last_segment(Config) ->
     Conf0 = ?config(osiris_conf, Config),
     Conf = Conf0#{max_segment_size_bytes => 120},
     S0 = seed_log(Conf, [{1, [<<"one">>]}], Config),
+    set_offset_ref(Conf, 2),
     S1 = osiris_log:write([<<"1st tracking delta chunk">>],
                           ?CHNK_TRK_DELTA,
                           ?LINE,
@@ -496,7 +499,7 @@ init_offset_reader_no_user_chunk_in_last_segment(Config) ->
                           S1),
     osiris_log:close(S2),
 
-    RConf = Conf#{offset_ref => ?FUNCTION_NAME},
+    RConf = Conf,
     % test that 'last' returns last user chunk
     {ok, L1} = osiris_log:init_offset_reader(last, RConf),
     ?assertEqual(0, osiris_log:next_offset(L1)),
@@ -521,10 +524,11 @@ init_offset_reader_no_user_chunk_in_segments(Config) ->
     osiris_log:close(S1),
 
     Conf = ?config(osiris_conf, Config),
-    RConf = Conf#{dir => LDir, offset_ref => ?FUNCTION_NAME},
-    % Test that 'last' returns 0 when there is no user chunk
+    RConf = Conf#{dir => LDir},
+    % Test that 'last' falls back to `next` behaviour when there is no user chunk
+    % present in the log
     {ok, L1} = osiris_log:init_offset_reader(last, RConf),
-    ?assertEqual(0, osiris_log:next_offset(L1)),
+    ?assertEqual(1, osiris_log:next_offset(L1)),
     osiris_log:close(L1),
 
     {ok, L2} = osiris_log:init_offset_reader(next, RConf),
@@ -542,8 +546,9 @@ init_offset_reader_timestamp(Config) ->
     LDir = ?config(leader_dir, Config),
     Conf = ?config(osiris_conf, Config),
     LLog0 = seed_log(LDir, EpochChunks, Config),
+    set_offset_ref(Conf, 3),
     osiris_log:close(LLog0),
-    RConf = Conf#{dir => LDir, offset_ref => ?FUNCTION_NAME},
+    RConf = Conf#{dir => LDir},
     {ok, L1} =
         osiris_log:init_offset_reader({timestamp, Now - 8000}, RConf),
@@ -571,7 +576,8 @@ init_offset_reader_truncated(Config) ->
     Conf = ?config(osiris_conf, Config),
     LDir = ?config(leader_dir, Config),
     LLog0 = seed_log(LDir, EpochChunks, Config),
-    RConf = Conf#{dir => LDir, offset_ref => ?FUNCTION_NAME},
+    set_offset_ref(Conf, 1000),
+    RConf = Conf#{dir => LDir},
     osiris_log:close(LLog0),
 
     %% "Truncate" log by deleting first segment
@@ -608,6 +614,20 @@ init_offset_reader_truncated(Config) ->
     osiris_log:close(L5),
     ok.
 
+init_data_reader_next(Config) ->
+    Conf = ?config(osiris_conf, Config),
+    LDir = ?config(leader_dir, Config),
+    FDir = ?config(follower1_dir, Config),
+    _LLog0 = seed_log(LDir, [{1, ["one"]}], Config),
+    %% seed is up to date
+    FLog0 = seed_log(FDir, [{1, ["one"]}], Config),
+    RRConf = Conf#{dir => LDir},
+    %% attach at the follower's next offset, i.e. its tail
+    {ok, _RLog0} =
+        osiris_log:init_data_reader(
+            osiris_log:tail_info(FLog0), RRConf),
+    ok.
+
 init_data_reader_empty_log(Config) ->
     Conf = ?config(osiris_conf, Config),
     LDir = ?config(leader_dir, Config),
@@ -644,7 +664,7 @@ init_data_reader_truncated(Config) ->
     Conf = ?config(osiris_conf, Config),
     LDir = ?config(leader_dir, Config),
     LLog0 = seed_log(LDir, EpochChunks, Config),
-    RConf = Conf#{dir => LDir, offset_ref => ?FUNCTION_NAME},
+    RConf = Conf#{dir => LDir},
     osiris_log:close(LLog0),
 
     %% "Truncate" log by deleting first segment
@@ -656,10 +676,7 @@ init_data_reader_truncated(Config) ->
                 filename:join(LDir, "00000000000000000000.segment")),
 
     %% when requesting a lower offset than the start of the log
-    %% it should automatically attach at the first available offset
-    {ok, L1} = osiris_log:init_data_reader({0, empty}, RConf),
-    ?assert(0 < osiris_log:next_offset(L1)),
-    osiris_log:close(L1),
+    %% an out of range error should be returned
+    {error, {offset_out_of_range, _}} = osiris_log:init_data_reader({0, empty}, RConf),
 
     %% attaching inside the log should be ok too
     {ok, L2} = osiris_log:init_data_reader({750, {1, 700, ?LINE}}, RConf),
@@ -1100,3 +1117,6 @@ make_trailer(Type, K, V) ->
     (byte_size(K)):8/unsigned,
     K/binary,
     V:64/unsigned>>.
+
+set_offset_ref(#{offset_ref := Ref}, Value) ->
+    atomics:put(Ref, 1, Value).
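Finally, a hedged sketch of the replica attach flow that `init_data_reader_next` exercises; `attach_replica/2` is a hypothetical wrapper, not part of osiris — only `tail_info/1` and `init_data_reader/2` are real API:

```erlang
%% The follower reports its tail as {NextChunkId, {Epoch, LastChunkId, Ts}}
%% (or {0, empty} for an empty log); the leader only hands out a reader if
%% the chunk at LastChunkId carries the expected epoch.
attach_replica(FollowerLog, LeaderConf) ->
    TailInfo = osiris_log:tail_info(FollowerLog),
    case osiris_log:init_data_reader(TailInfo, LeaderConf) of
        {ok, Reader} ->
            {ok, Reader};
        {error, {offset_out_of_range, _Range}} ->
            %% the follower is outside the leader's range, e.g. because
            %% retention has removed the segments it would resume from
            {error, out_of_range};
        {error, {invalid_last_offset_epoch, _Expected, _Actual}} ->
            %% epochs diverged: the follower must truncate before retrying
            {error, needs_truncation}
    end.
```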