From 0ab12d3ce7acdb120f4c6bce12f6f7a508b0d6a0 Mon Sep 17 00:00:00 2001
From: Karl Nilsson
Date: Wed, 8 Sep 2021 14:52:06 +0100
Subject: [PATCH 1/2] Refactoring and bug fixes for osiris_log

Refactoring to better handle cases where the `next` offset spec would
try to attach at the eof position, which can vary depending on whether
the log is being written to during reader init. Previously this would
result in many unexpected_chunk_id exceptions during load.

Also refactoring and simplifying other aspects of reader init to make
the code clearer and in some cases faster.
---
 src/osiris_log.erl        | 494 +++++++++++++++++++++-----------------
 test/osiris_SUITE.erl     |   1 -
 test/osiris_log_SUITE.erl |  52 ++--
 3 files changed, 304 insertions(+), 243 deletions(-)

diff --git a/src/osiris_log.erl b/src/osiris_log.erl
index 5e3a75ab..d24eae9b 100644
--- a/src/osiris_log.erl
+++ b/src/osiris_log.erl
@@ -386,7 +386,11 @@
          timestamp :: non_neg_integer(),
          epoch :: epoch(),
          num :: non_neg_integer(),
-         type :: chunk_type()
+         type :: chunk_type(),
+         %% size of data + trailer
+         size :: non_neg_integer(),
+         %% position in segment file
+         pos :: integer()
         }).
 -record(seg_info,
         {file :: file:filename(),
@@ -795,131 +799,102 @@ truncate_to(Name, Range, [{E, ChId} | NextEOs], SegInfos) ->
                   empty | {offset(), offset()}}} |
                  {error, {invalid_last_offset_epoch, offset(), offset()}}.
-init_data_reader({StartOffset, PrevEO}, #{dir := Dir,
-                                          readers_counter_fun := Fun} = Config) ->
+init_data_reader({StartChunkId, PrevEOT}, #{dir := Dir} = Config) ->
     SegInfos = build_log_overview(Dir),
     Range = offset_range_from_segment_infos(SegInfos),
     ?DEBUG("osiris_segment:init_data_reader/2 at ~b prev "
-           "~w range: ~w",
-           [StartOffset, PrevEO, Range]),
+           "~w local range: ~w",
+           [StartChunkId, PrevEOT, Range]),
     %% Invariant: there is always at least one segment left on disk
     case Range of
-        {F, _L} when StartOffset < F ->
-            %% if a lower than exisiting is request simply forward
-            %% it to the first offset of the log
-            %% in this case we cannot validate PrevEO - instead
-            %% the replica should truncate all of it's exisiting log
-            case find_segment_for_offset(F, SegInfos) of
-                not_found ->
-                    %% this is unexpected and thus an error
-                    exit({segment_not_found, F, SegInfos});
-                {_, StartSegmentInfo} ->
-                    {ok,
-                     init_data_reader_from_segment(Config, StartSegmentInfo, F, Fun)}
-            end;
-        empty when StartOffset > 0 ->
+        {FstOffs, LastOffs}
+          when StartChunkId < FstOffs
+               orelse StartChunkId > LastOffs + 1 ->
            {error, {offset_out_of_range, Range}};
-        {_F, L} when StartOffset > L + 1 ->
-            %% if we are trying to attach to anything larger than
-            %% the next offset (i.e last +1) this is in out of range
-            %% error
+        empty when StartChunkId > 0 ->
            {error, {offset_out_of_range, Range}};
-        _ ->
+        _ when PrevEOT == empty ->
            %% this assumes the offset is in range
            %% first we need to validate PrevEO
-            case PrevEO of
-                empty ->
-                    case find_segment_for_offset(StartOffset, SegInfos) of
-                        not_found ->
-                            %% this is unexpected and thus an error
-                            exit({segment_not_found, StartOffset, SegInfos});
-                        {_, StartSegmentInfo} ->
-                            {ok,
-                             init_data_reader_from_segment(Config,
-                                                           StartSegmentInfo,
-                                                           StartOffset,
-                                                           Fun)}
-                    end;
-                {PrevE, PrevO, _PrevTs} ->
-                    case find_segment_for_offset(PrevO, SegInfos) of
-                        not_found ->
-                            %% this is unexpected and thus an error
-                            {error,
-                             {invalid_last_offset_epoch, PrevE, unknown}};
-                        {_, SegmentInfo = #seg_info{file = PrevSeg}} ->
-                            %% prev segment exists, does it have the correct
-                            %% epoch?
-                            {ok, Fd} = file:open(PrevSeg, [raw, binary, read]),
-                            %% TODO: next offset needs to be a chunk offset
-                            {_, FilePos} = scan_idx(PrevO, SegmentInfo),
-                            {ok, FilePos} = file:position(Fd, FilePos),
-                            case file:read(Fd, ?HEADER_SIZE_B) of
-                                {ok,
-                                 <<?MAGIC:4/unsigned,
-                                   ?VERSION:4/unsigned,
-                                   _ChType:8/unsigned,
-                                   _NumEntries:16/unsigned,
-                                   _NumRecords:32/unsigned,
-                                   _Timestamp:64/signed,
-                                   PrevE:64/unsigned,
-                                   PrevO:64/unsigned,
-                                   _Crc:32/integer,
-                                   _/binary>>} ->
-                                    ok = file:close(Fd),
-                                    {ok,
-                                     init_data_reader_from_segment(Config,
-                                                                   element(2,
-                                                                           find_segment_for_offset(StartOffset,
-                                                                                                   SegInfos)),
-                                                                   StartOffset,
-                                                                   Fun)};
-                                {ok,
-                                 <<?MAGIC:4/unsigned,
-                                   ?VERSION:4/unsigned,
-                                   _ChType:8/unsigned,
-                                   _NumEntries:16/unsigned,
-                                   _NumRecords:32/unsigned,
-                                   _Timestamp:64/signed,
-                                   OtherE:64/unsigned,
-                                   PrevO:64/unsigned,
-                                   _Crc:32/integer,
-                                   _/binary>>} ->
-                                    ok = file:close(Fd),
-                                    {error,
-                                     {invalid_last_offset_epoch, PrevE, OtherE}}
-                            end
-                    end
+            {ok, init_data_reader_from(
+                   StartChunkId,
+                   find_segment_for_offset(StartChunkId, SegInfos),
+                   Config)};
+        _ ->
+            {PrevEpoch, PrevChunkId, _PrevTs} = PrevEOT,
+            case check_chunk_has_expected_epoch(PrevChunkId, PrevEpoch, SegInfos) of
+                ok ->
+                    {ok, init_data_reader_from(
+                           StartChunkId,
+                           find_segment_for_offset(StartChunkId, SegInfos),
+                           Config)};
+                {error, _} = Err ->
+                    Err
            end
     end.
 
-init_data_reader_from_segment(#{dir := Dir, name := Name} = Config,
-                              SegmentInfo = #seg_info{file = StartSegment},
-                              NextOffs, CounterFun) ->
-    {ok, Fd} = file:open(StartSegment, [raw, binary, read]),
-    %% TODO: next offset needs to be a chunk offset
-    {_, FilePos} = scan_idx(NextOffs, SegmentInfo),
-    {ok, _Pos} = file:position(Fd, FilePos),
+check_chunk_has_expected_epoch(ChunkId, Epoch, SegInfos) ->
+    case find_segment_for_offset(ChunkId, SegInfos) of
+        not_found ->
+            %% this is unexpected and thus an error
+            {error,
+             {invalid_last_offset_epoch, Epoch, unknown}};
+        {found, SegmentInfo = #seg_info{file = _PrevSeg}} ->
+            %% prev segment exists, does it have the correct
+            %% epoch?
+            case scan_idx(ChunkId, SegmentInfo) of
+                {ChunkId, Epoch, _PrevPos} ->
+                    ok;
+                {ChunkId, OtherEpoch, _} ->
+                    {error,
+                     {invalid_last_offset_epoch, Epoch, OtherEpoch}}
+            end
+    end.
+
+init_data_reader_at(ChunkId, FilePos, File,
+                    #{dir := Dir, name := Name,
+                      readers_counter_fun := CountersFun} = Config) ->
+    {ok, Fd} = file:open(File, [raw, binary, read]),
+    {ok, FilePos} = file:position(Fd, FilePos),
     Cnt = make_counter(Config),
-    counters:put(Cnt, ?C_OFFSET, NextOffs - 1),
-    CounterFun(1),
+    counters:put(Cnt, ?C_OFFSET, ChunkId - 1),
+    CountersFun(1),
     #?MODULE{cfg =
                  #cfg{directory = Dir,
                       counter = Cnt,
                       name = Name,
-                      readers_counter_fun = CounterFun,
+                      readers_counter_fun = CountersFun,
                       first_offset_fun = fun (_) -> ok end},
             mode =
                 #read{type = data,
                       offset_ref = maps:get(offset_ref, Config, undefined),
-                      next_offset = NextOffs,
+                      next_offset = ChunkId,
                       chunk_selector = all,
                       transport = maps:get(transport, Config, tcp)},
             fd = Fd}.
 
+init_data_reader_from(ChunkId,
+                      {end_of_log, #seg_info{file = File} = SegInfo},
+                      Config) ->
+    case SegInfo of
+        #seg_info{file = File, last = undefined} ->
+            init_data_reader_at(0, ?LOG_HEADER_SIZE, File, Config);
+        #seg_info{file = File,
+                  last = #chunk_info{id = Id,
+                                     num = Num,
+                                     pos = Pos,
+                                     size = Size}} ->
+            ChunkId = Id + Num, %% assertion
+            AttachPos = Pos + Size + ?HEADER_SIZE_B,
+            init_data_reader_at(ChunkId, AttachPos, File, Config)
+    end;
+init_data_reader_from(ChunkId,
+                      {found, #seg_info{file = File} = SegInfo},
+                      Config) ->
+    %% TODO: next offset needs to be a chunk offset
+    {ChunkId, _Epoch, FilePos} = scan_idx(ChunkId, SegInfo),
+    init_data_reader_at(ChunkId, FilePos, File, Config).
+
 %% @doc Initialise a new offset reader
 %% @param OffsetSpec specifies where in the log to attach the reader
 %% `first': Attach at first available offset.
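
For reference, the replica-side call path that exercises the new
init_data_reader/2 clauses above looks roughly like the sketch below.
It is not part of the patch: attach_data_reader_sketch is a
hypothetical helper, and the config map shows only the keys this code
path pattern-matches; a real caller passes the full osiris config.

    attach_data_reader_sketch(Dir, FollowerLog) ->
        %% tail_info/1 returns {NextChunkId, {Epoch, LastChunkId, Timestamp}}
        %% (or {0, empty} for an empty log), which is exactly the tuple
        %% init_data_reader/2 takes as its first argument
        TailInfo = osiris_log:tail_info(FollowerLog),
        osiris_log:init_data_reader(TailInfo,
                                    #{dir => Dir,
                                      name => <<"sketch">>,
                                      readers_counter_fun => fun(_) -> ok end}).
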
@@ -940,9 +915,9 @@ init_data_reader_from_segment(#{dir := Dir, name := Name} = Config, {ok, state()} | {error, {offset_out_of_range, - empty | {From :: offset(), To :: offset()}}}. + empty | {From :: offset(), To :: offset()}}} | + {error, {invalid_chunk_header, term()}}. init_offset_reader({abs, Offs}, #{dir := Dir} = Conf) -> - %% TODO: some unnecessary computation here Range = offset_range_from_segment_infos(build_log_overview(Dir)), case Range of empty -> @@ -957,10 +932,13 @@ init_offset_reader({timestamp, Ts}, #{dir := Dir} = Conf) -> case build_log_overview(Dir) of [] -> init_offset_reader(next, Conf); - [#seg_info{first = #chunk_info{timestamp = Fst}} | _] + [#seg_info{file = SegmentFile, + first = #chunk_info{timestamp = Fst, + pos = FilePos, + id = ChunkId}} | _] when is_integer(Fst) andalso Fst > Ts -> %% timestamp is lower than the first timestamp available - init_offset_reader(first, Conf); + open_offset_reader_at(SegmentFile, ChunkId, FilePos, Conf); SegInfos -> case lists:search(fun (#seg_info{first = #chunk_info{timestamp = F}, last = #chunk_info{timestamp = L}}) @@ -972,100 +950,153 @@ init_offset_reader({timestamp, Ts}, #{dir := Dir} = Conf) -> end, SegInfos) of - {value, Info} -> + {value, #seg_info{file = SegmentFile} = Info} -> %% segment was found, now we need to scan index to %% find nearest offset - ChunkId = chunk_id_for_timestamp(Info, Ts), - init_offset_reader(ChunkId, Conf); + {ChunkId, FilePos} = chunk_location_for_timestamp(Info, Ts), + open_offset_reader_at(SegmentFile, ChunkId, FilePos, Conf); false -> %% segment was not found, attach next + %% this should be rare so no need to call the more optimal + %% open_offset_reader_at/4 function init_offset_reader(next, Conf) end end; -init_offset_reader(OffsetSpec, - #{dir := Dir, - name := Name, - offset_ref := OffsetRef, - readers_counter_fun := ReaderCounterFun, - options := Options} = - Conf) -> +init_offset_reader(first, #{dir := Dir} = Conf) -> + case build_log_overview(Dir) of + [#seg_info{file = File, + last = undefined}] -> + %% empty log, attach at 0 + open_offset_reader_at(File, 0, ?LOG_HEADER_SIZE, Conf); + [#seg_info{file = File, + first = #chunk_info{id = FirstChunkId, + pos = FilePos}} | _] -> + open_offset_reader_at(File, FirstChunkId, FilePos, Conf); + _ -> + %% TODO + exit({?FUNCTION_NAME, first}) + end; +init_offset_reader(next, #{dir := Dir} = Conf) -> + SegInfos = build_log_overview(Dir), + case lists:reverse(SegInfos) of + [#seg_info{file = File, + last = undefined}] -> + %% empty log, attach at 0 + open_offset_reader_at(File, 0, ?LOG_HEADER_SIZE, Conf); + [#seg_info{file = File, + last = #chunk_info{size = Size, + num = LastNum, + id = LastChunkId, + pos = LastChPos}} | _] -> + FilePos = LastChPos + Size + ?HEADER_SIZE_B, + NextChunkId = LastChunkId + LastNum, + open_offset_reader_at(File, NextChunkId, FilePos, Conf); + _ -> + %% TODO + exit({?FUNCTION_NAME, next}) + end; +init_offset_reader(last, #{dir := Dir} = Conf) -> + SegInfos = build_log_overview(Dir), + case lists:reverse(SegInfos) of + [#seg_info{file = File, + last = undefined}] -> + %% empty log, attach at 0 + open_offset_reader_at(File, 0, ?LOG_HEADER_SIZE, Conf); + [#seg_info{file = File, + last = #chunk_info{type = ?CHNK_USER, + id = LastChunkId, + pos = FilePos}} | _] -> + open_offset_reader_at(File, LastChunkId, FilePos, Conf); + _ -> + case last_user_chunk_location(SegInfos) of + not_found -> + ?DEBUG("~s:~s use chunk not found, fall back to next", + [?MODULE, ?FUNCTION_NAME]), + %% no user chunks in stream, this is awkward, 
fall back to next + init_offset_reader(next, Conf); + {ChunkId, FilePos, #seg_info{file = File}} -> + open_offset_reader_at(File, ChunkId, FilePos, Conf) + end + end; +init_offset_reader(OffsetSpec, #{dir := Dir} = Conf) -> SegInfos = build_log_overview(Dir), ChunkRange = chunk_range_from_segment_infos(SegInfos), Range = offset_range_from_chunk_range(ChunkRange), - ?DEBUG("osiris_log:init_offset_reader/2 spec ~w range " - "~w ", + ?DEBUG("osiris_log:init_offset_reader/2 spec ~w range ~w ", [OffsetSpec, Range]), - StartOffset = - case {OffsetSpec, Range} of - {_, empty} -> - 0; - {first, {F, _}} -> - F; - {last, {_, L}} -> - case ChunkRange of - {_FirstChunk, #chunk_info{type = ?CHNK_USER}} -> - %% Last chunk in last segment is a user chunk. - L; - _ -> - %% Last chunk in last segment is not a user chunk (but e.g. a tracking chunk). - %% Therefore, find the last user chunk by searching the index files backwards. - last_user_chunk_id(SegInfos) - end; - {next, {_, L}} -> - L + 1; - {Offset, {S, E}} when is_integer(Offset) -> - max(S, min(Offset, E + 1)) - end, - %% find the appopriate segment and scan the index to find the - %% postition of the next chunk to read - case find_segment_for_offset(StartOffset, SegInfos) of - not_found -> - {error, {offset_out_of_range, Range}}; - {_, SegmentInfo = #seg_info{file = StartSegment}} -> - try - {ok, Fd} = open(StartSegment, [raw, binary, read]), - {ChOffs, FilePos} = + try + StartOffset = + case {OffsetSpec, Range} of + {_, empty} -> + 0; + {Offset, {_, LastOffs}} + when is_integer(Offset) andalso + Offset > LastOffs -> + %% out of range, clamp as `next` + throw({retry_with, next, Conf}); + {Offset, {FirstOffs, _LastOffs}} when is_integer(Offset) -> + max(FirstOffs, Offset) + end, + %% find the appopriate segment and scan the index to find the + %% postition of the next chunk to read + case find_segment_for_offset(StartOffset, SegInfos) of + not_found -> + {error, {offset_out_of_range, Range}}; + {_, SegmentInfo = #seg_info{file = SegmentFile}} -> + {ChunkId, _Epoch, FilePos} = case scan_idx(StartOffset, SegmentInfo) of eof -> - {StartOffset, 0}; + exit(offset_out_of_range); enoent -> %% index file was not found - %% just retry - _ = file:close(Fd), - init_offset_reader(OffsetSpec, Conf); + %% throw should be caught and trigger a retry + throw(missing_file); + offset_out_of_range -> + exit(offset_out_of_range); IdxResult when is_tuple(IdxResult) -> IdxResult end, - ?DEBUG("osiris_log:init_offset_reader/2 resolved chunk_id ~b" - " at file pos: ~w ", [ChOffs, FilePos]), - {ok, _Pos} = file:position(Fd, FilePos), - Cnt = make_counter(Conf), - ReaderCounterFun(1), - {ok, - #?MODULE{cfg = - #cfg{directory = Dir, - counter = Cnt, - name = Name, - readers_counter_fun = ReaderCounterFun, - first_offset_fun = fun (_) -> ok end - }, - mode = - #read{type = offset, - chunk_selector = maps:get(chunk_selector, Options, user_data), - offset_ref = OffsetRef, - next_offset = ChOffs, - transport = maps:get(transport, Options, tcp)}, - fd = Fd}} - catch - missing_file -> - %% Retention policies are likely being applied, let's try again - %% TODO: should we limit the number of retries? - init_offset_reader(OffsetSpec, Conf) - end + ?DEBUG("osiris_log:init_offset_reader/2 resolved chunk_id ~b" + " at file pos: ~w ", [ChunkId, FilePos]), + + open_offset_reader_at(SegmentFile, ChunkId, FilePos, Conf) + end + catch + missing_file -> + %% Retention policies are likely being applied, let's try again + %% TODO: should we limit the number of retries? 
+            init_offset_reader(OffsetSpec, Conf);
+        {retry_with, NewOffsSpec, NewConf} ->
+            init_offset_reader(NewOffsSpec, NewConf)
     end.
 
+open_offset_reader_at(SegmentFile, NextChunkId, FilePos,
+                      #{dir := Dir,
+                        name := Name,
+                        offset_ref := OffsetRef,
+                        readers_counter_fun := ReaderCounterFun,
+                        options := Options} =
+                          Conf) ->
+    {ok, Fd} = open(SegmentFile, [raw, binary, read]),
+    {ok, FilePos} = file:position(Fd, FilePos),
+    Cnt = make_counter(Conf),
+    ReaderCounterFun(1),
+    {ok, #?MODULE{cfg = #cfg{directory = Dir,
+                             counter = Cnt,
+                             name = Name,
+                             readers_counter_fun = ReaderCounterFun,
+                             first_offset_fun = fun (_) -> ok end
+                            },
+                  mode = #read{type = offset,
+                               chunk_selector = maps:get(chunk_selector, Options,
+                                                         user_data),
+                               offset_ref = OffsetRef,
+                               next_offset = NextChunkId,
+                               transport = maps:get(transport, Options, tcp)},
+                  fd = Fd}}.
+
 %% Searches the index files backwards for the ID of the last user chunk.
-last_user_chunk_id(SegInfos) when is_list(SegInfos) ->
+last_user_chunk_location(SegInfos) when is_list(SegInfos) ->
     {Time, Result} = timer:tc(
         fun() ->
                 last_user_chunk_id0(lists:reverse(SegInfos))
@@ -1075,17 +1106,17 @@
 
 last_user_chunk_id0([]) ->
     %% There are no user chunks in any index files.
-    0;
-last_user_chunk_id0([#seg_info{index = IdxFile} | Rest]) ->
+    not_found;
+last_user_chunk_id0([#seg_info{index = IdxFile} = Info | Rest]) ->
     try
         %% Do not read-ahead since we read the index file backwards chunk by chunk.
         {ok, IdxFd} = open(IdxFile, [read, raw, binary]),
         file:position(IdxFd, eof),
         Last = last_user_chunk_id_in_index(IdxFd),
-        file:close(IdxFd),
+        _ = file:close(IdxFd),
         case Last of
-            {ok, Id} ->
-                Id;
+            {ok, Id, Pos} ->
+                {Id, Pos, Info};
             {error, Reason} ->
                 ?DEBUG("Could not find user chunk in index file ~s (~p)", [IdxFile, Reason]),
                 last_user_chunk_id0(Rest)
@@ -1107,14 +1138,14 @@ last_user_chunk_id_in_index(IdxFd) ->
          <<Offset:64/unsigned,
            _Timestamp:64/signed,
            _Epoch:64/unsigned,
            FilePos:32/unsigned,
            ?CHNK_USER:8/unsigned>>} ->
-            {ok, Offset};
+            {ok, Offset, FilePos};
         {ok,
          <<_Offset:64/unsigned,
            _Timestamp:64/signed,
            _Epoch:64/unsigned,
-           _FileOffset:32/unsigned,
+           _FilePos:32/unsigned,
            _ChType:8/unsigned>>} ->
             last_user_chunk_id_in_index(IdxFd);
         {error, _} = Error ->
@@ -1426,9 +1457,9 @@ build_log_overview(Dir) when is_list(Dir) ->
         fun() ->
            try
                IdxFiles =
-                    lists:sort(
-                        filelib:wildcard(
-                            filename:join(Dir, "*.index"))),
+                   lists:sort(
+                       filelib:wildcard(
+                           filename:join(Dir, "*.index"))),
                build_log_overview0(IdxFiles, [])
            catch
                missing_file ->
@@ -1493,6 +1524,9 @@ build_segment_info(SegFile, LastChunkPos, IdxFile, Acc0) ->
                    FirstTs:64/signed,
                    FirstEpoch:64/unsigned,
                    FirstChId:64/unsigned,
+                   _FirstCrc:32/integer,
+                   FirstSize:32/unsigned,
+                   FirstTSize:32/unsigned,
                    _/binary>>} ->
                 {ok, LastChunkPos} = file:position(Fd, LastChunkPos),
                 {ok,
@@ -1523,13 +1557,17 @@ build_segment_info(SegFile, LastChunkPos, IdxFile, Acc0) ->
                                         timestamp = FirstTs,
                                         id = FirstChId,
                                         num = FirstNumRecords,
-                                        type = FirstChType},
+                                        type = FirstChType,
+                                        size = FirstSize + FirstTSize,
+                                        pos = ?LOG_HEADER_SIZE},
                    last = #chunk_info{epoch = LastEpoch,
                                       timestamp = LastTs,
                                       id = LastChId,
                                       num = LastNumRecords,
-                                      type = LastChType}}
+                                      type = LastChType,
+                                      size = LastSize + LastTSize,
+                                      pos = LastChunkPos}}
          | Acc0]
            end
    catch
@@ -1914,64 +1952,66 @@ open_index_read(File) ->
     _ = file:read(Fd, ?IDX_HEADER_SIZE),
     Fd.
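
Each index record read above is a fixed-size binary whose layout the
surrounding patterns spell out: chunk id, timestamp, epoch, file
position and chunk type. That is what lets last_user_chunk_id_in_index/1
walk an index file backwards one record at a time. A small parsing
sketch (not part of the patch; parse_index_record_sketch is a
hypothetical helper):

    parse_index_record_sketch(<<ChunkId:64/unsigned,
                                Timestamp:64/signed,
                                Epoch:64/unsigned,
                                FilePos:32/unsigned,
                                ChType:8/unsigned>>) ->
        #{chunk_id => ChunkId,
          timestamp => Timestamp,
          epoch => Epoch,
          file_pos => FilePos,
          chunk_type => ChType}.
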
-scan_idx(Offset, SegmentInfo = #seg_info{index = IndexFile, last = LastChunkInSegment}) ->
+scan_idx(Offset, #seg_info{index = IndexFile,
+                           last = LastChunkInSegment} = SegmentInfo) ->
     {Time, Result} = timer:tc(
         fun() ->
                 case offset_range_from_segment_infos([SegmentInfo]) of
                     empty ->
-                        %% if the index is empty do we really know the offset will be next
-                        %% this relies on us always reducing the Offset to within the log range
-                        {0, ?LOG_HEADER_SIZE};
-                    {SegmentStart, SegmentEnd} ->
-                        case Offset < SegmentStart orelse Offset > SegmentEnd + 1 of
-                            true -> offset_out_of_range;
+                        %% TODO: this does not feel right
+                        %% if the index is empty do we really know
+                        %% the offset will be next
+                        %% this relies on us always reducing the
+                        %% Offset to within the log range
+                        {0, 0, ?LOG_HEADER_SIZE};
+                    {SegmentStartOffs, SegmentEndOffs} ->
+                        case Offset < SegmentStartOffs orelse
+                             Offset > SegmentEndOffs of
+                            true ->
+                                offset_out_of_range;
                             false ->
                                 IndexFd = open_index_read(IndexFile),
-                                Result = scan_idx(IndexFd, Offset, LastChunkInSegment),
-                                file:close(IndexFd),
+                                Result = scan_idx(IndexFd, Offset,
+                                                  LastChunkInSegment),
+                                _ = file:close(IndexFd),
                                 Result
                         end
                 end
         end),
-    ?DEBUG("~s:~s/~b completed in ~fs", [?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY, Time/1000000]),
+    ?DEBUG("~s:~s/~b completed in ~fs",
+           [?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY, Time/1000000]),
     Result.
 
 scan_idx(Fd, Offset,
          #chunk_info{id = LastChunkInSegmentId,
                      num = LastChunkInSegmentNum}) ->
     case file:read(Fd, ?INDEX_RECORD_SIZE_B) of
-        {ok,
-         <<ChunkId:64/unsigned,
-           _Timestamp:64/signed,
-           _Epoch:64/unsigned,
-           FilePos:32/unsigned,
-           _ChType:8/unsigned>>} ->
-            case Offset < ChunkId of
+        {ok, <<ChunkId:64/unsigned,
+               _Timestamp:64/signed,
+               Epoch:64/unsigned,
+               FilePos:32/unsigned,
+               _ChType:8/unsigned>>} ->
+            LastOffsetInSegment = LastChunkInSegmentId + LastChunkInSegmentNum - 1,
+            case Offset < ChunkId orelse Offset > LastOffsetInSegment of
                 true ->
-                    %% offset is lower than the first chunk in this segment
                     %% shouldn't really happen as we check the range above
                     offset_out_of_range;
                 false ->
-                    case Offset > (LastChunkInSegmentId + LastChunkInSegmentNum - 1) of
-                        true ->
-                            %% offset higher than the last chunk in this segment
-                            {LastChunkInSegmentId + LastChunkInSegmentNum, eof};
-                        false ->
-                            %% offset is in this segment
-                            scan_idx(Fd, Offset, {ChunkId, FilePos})
-                    end
+                    %% offset is in this segment
+                    scan_idx0(Fd, Offset, {ChunkId, Epoch, FilePos})
             end;
         eof ->
            %% this should never happen - offset is in the range and we are reading the first record
            eof;
        {error, Posix} ->
            Posix
-    end;
-scan_idx(Fd, Offset, PreviousChunk) ->
+    end.
+
+scan_idx0(Fd, Offset, PreviousChunk) ->
     case file:read(Fd, ?INDEX_RECORD_SIZE_B) of
         {ok,
          <<ChunkId:64/unsigned,
            _Timestamp:64/signed,
            Epoch:64/unsigned,
            FilePos:32/unsigned,
            _ChType:8/unsigned>>} ->
             case Offset < ChunkId of
@@ -1982,11 +2022,13 @@
                     %% return the previous chunk
                     PreviousChunk;
                 false ->
-                    scan_idx(Fd, Offset, {ChunkId, FilePos})
+                    scan_idx0(Fd, Offset, {ChunkId, Epoch, FilePos})
             end;
         eof ->
-            %% Offset is in the last chunk
-            PreviousChunk
+            %% Offset must be in the last chunk as there is no more data
+            PreviousChunk;
+        {error, Posix} ->
+            Posix
     end.
 
 throw_missing({error, enoent}) ->
@@ -1997,11 +2039,11 @@ throw_missing(Any) ->
 
 open(SegFile, Options) ->
     throw_missing(file:open(SegFile, Options)).
 
-chunk_id_for_timestamp(#seg_info{index = Idx}, Ts) ->
+chunk_location_for_timestamp(#seg_info{index = Idx}, Ts) ->
     Fd = open_index_read(Idx),
     %% scan index file for nearest timestamp
-    {ChunkId, _Timestamp, _Epoch, _FilePos} = timestamp_idx_scan(Fd, Ts),
-    ChunkId.
+    {ChunkId, _Timestamp, _Epoch, FilePos} = timestamp_idx_scan(Fd, Ts),
+    {ChunkId, FilePos}.
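
The scan_idx/scan_idx0 pair above resolves an offset to the chunk that
contains it: the last index entry whose chunk id is at or below the
requested offset. The same rule over an in-memory list of
{ChunkId, Epoch, FilePos} entries, as a sketch (not part of the patch;
it assumes Offset is already known to be within the segment's range,
mirroring the range check performed before the scan):

    resolve_chunk_sketch(Offset, [{ChunkId, _, _} = Entry | Rest])
      when Offset >= ChunkId ->
        case Rest of
            [{NextChunkId, _, _} | _] when Offset >= NextChunkId ->
                %% a later chunk also starts at or before Offset, keep going
                resolve_chunk_sketch(Offset, Rest);
            _ ->
                %% Offset falls inside this chunk
                Entry
        end.

For example resolve_chunk_sketch(55, [{0, 1, 100}, {50, 1, 3000},
{120, 1, 9000}]) returns {50, 1, 3000}.
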
timestamp_idx_scan(Fd, Ts) -> case file:read(Fd, ?INDEX_RECORD_SIZE_B) of diff --git a/test/osiris_SUITE.erl b/test/osiris_SUITE.erl index 15aef575..dba09745 100644 --- a/test/osiris_SUITE.erl +++ b/test/osiris_SUITE.erl @@ -895,7 +895,6 @@ retention_overtakes_offset_reader(Config) -> SegSize = 50000 * 1000, [LeaderNode | Replicas] = Nodes = [start_child_node(N, DataDir) || N <- [s1, s2, s3]], - % [Replica1, Replica2] = Replicas, Conf0 = #{name => Name, epoch => 1, diff --git a/test/osiris_log_SUITE.erl b/test/osiris_log_SUITE.erl index df0e7b8a..49433da9 100644 --- a/test/osiris_log_SUITE.erl +++ b/test/osiris_log_SUITE.erl @@ -48,6 +48,7 @@ all_tests() -> init_offset_reader_no_user_chunk_in_segments, init_offset_reader_timestamp, init_offset_reader_truncated, + init_data_reader_next, init_data_reader_empty_log, init_data_reader_truncated, init_epoch_offsets_empty, @@ -331,8 +332,7 @@ write_multi_log(Config) -> ?assertEqual(2, length(Segments)), OffRef = atomics:new(2, []), - atomics:put(OffRef, 1, - 1011), %% takes a single offset tracking data into account + atomics:put(OffRef, 1, 1011), %% takes a single offset tracking data into account %% ensure all records can be read {ok, R0} = osiris_log:init_offset_reader(first, Conf#{offset_ref => OffRef}), @@ -371,7 +371,7 @@ init_offset_reader_empty(Config) -> LDir = ?config(leader_dir, Config), LLog0 = seed_log(LDir, [], Config), osiris_log:close(LLog0), - RConf = Conf#{dir => LDir, offset_ref => ?FUNCTION_NAME}, + RConf = Conf#{dir => LDir}, %% first and last falls back to next {ok, L1} = osiris_log:init_offset_reader(first, RConf), {ok, L2} = osiris_log:init_offset_reader(last, RConf), @@ -404,9 +404,10 @@ init_offset_reader(Config, Mode) -> [{1, [<<"one">>]}, {2, [<<"two">>]}, {3, [<<"three">>, <<"four">>]}], LDir = ?config(leader_dir, Config), Conf = ?config(osiris_conf, Config), + set_offset_ref(Conf, 3), LLog0 = seed_log(LDir, EpochChunks, Config), osiris_log:close(LLog0), - RConf = Conf#{dir => LDir, offset_ref => ?FUNCTION_NAME, mode => Mode}, + RConf = Conf#{dir => LDir, mode => Mode}, {ok, L1} = osiris_log:init_offset_reader(first, RConf), ?assertEqual(0, osiris_log:next_offset(L1)), @@ -464,7 +465,8 @@ init_offset_reader_last_chunk_is_not_user_chunk(Config) -> osiris_log:close(S2), Conf = ?config(osiris_conf, Config), - RConf = Conf#{dir => LDir, offset_ref => ?FUNCTION_NAME}, + RConf = Conf#{dir => LDir}, + set_offset_ref(RConf, 4), % Test that 'last' returns last user chunk {ok, L1} = osiris_log:init_offset_reader(last, RConf), ?assertEqual(1, osiris_log:next_offset(L1)), @@ -484,6 +486,7 @@ init_offset_reader_no_user_chunk_in_last_segment(Config) -> Conf0 = ?config(osiris_conf, Config), Conf = Conf0#{max_segment_size_bytes => 120}, S0 = seed_log(Conf, [{1, [<<"one">>]}], Config), + set_offset_ref(Conf, 2), S1 = osiris_log:write([<<"1st tracking delta chunk">>], ?CHNK_TRK_DELTA, ?LINE, @@ -496,7 +499,7 @@ init_offset_reader_no_user_chunk_in_last_segment(Config) -> S1), osiris_log:close(S2), - RConf = Conf#{offset_ref => ?FUNCTION_NAME}, + RConf = Conf, % test that 'last' returns last user chunk {ok, L1} = osiris_log:init_offset_reader(last, RConf), ?assertEqual(0, osiris_log:next_offset(L1)), @@ -521,10 +524,11 @@ init_offset_reader_no_user_chunk_in_segments(Config) -> osiris_log:close(S1), Conf = ?config(osiris_conf, Config), - RConf = Conf#{dir => LDir, offset_ref => ?FUNCTION_NAME}, - % Test that 'last' returns 0 when there is no user chunk + RConf = Conf#{dir => LDir}, + % Test that 'last' falls back to `next` behaviour when 
there is no user chunk + % present in the log {ok, L1} = osiris_log:init_offset_reader(last, RConf), - ?assertEqual(0, osiris_log:next_offset(L1)), + ?assertEqual(1, osiris_log:next_offset(L1)), osiris_log:close(L1), {ok, L2} = osiris_log:init_offset_reader(next, RConf), @@ -542,8 +546,9 @@ init_offset_reader_timestamp(Config) -> LDir = ?config(leader_dir, Config), Conf = ?config(osiris_conf, Config), LLog0 = seed_log(LDir, EpochChunks, Config), + set_offset_ref(Conf, 3), osiris_log:close(LLog0), - RConf = Conf#{dir => LDir, offset_ref => ?FUNCTION_NAME}, + RConf = Conf#{dir => LDir}, {ok, L1} = osiris_log:init_offset_reader({timestamp, Now - 8000}, RConf), @@ -571,7 +576,8 @@ init_offset_reader_truncated(Config) -> Conf = ?config(osiris_conf, Config), LDir = ?config(leader_dir, Config), LLog0 = seed_log(LDir, EpochChunks, Config), - RConf = Conf#{dir => LDir, offset_ref => ?FUNCTION_NAME}, + set_offset_ref(Conf, 1000), + RConf = Conf#{dir => LDir}, osiris_log:close(LLog0), %% "Truncate" log by deleting first segment @@ -608,6 +614,20 @@ init_offset_reader_truncated(Config) -> osiris_log:close(L5), ok. +init_data_reader_next(Config) -> + Conf = ?config(osiris_conf, Config), + LDir = ?config(leader_dir, Config), + FDir = ?config(follower1_dir, Config), + _LLog0 = seed_log(LDir, [{1, ["one"]}], Config), + %% seed is up to date + FLog0 = seed_log(FDir, [{1, ["one"]}], Config), + RRConf = Conf#{dir => LDir}, + %% the next offset, i.e. offset 0 + {ok, _RLog0} = + osiris_log:init_data_reader( + osiris_log:tail_info(FLog0), RRConf), + ok. + init_data_reader_empty_log(Config) -> Conf = ?config(osiris_conf, Config), LDir = ?config(leader_dir, Config), @@ -644,7 +664,7 @@ init_data_reader_truncated(Config) -> Conf = ?config(osiris_conf, Config), LDir = ?config(leader_dir, Config), LLog0 = seed_log(LDir, EpochChunks, Config), - RConf = Conf#{dir => LDir, offset_ref => ?FUNCTION_NAME}, + RConf = Conf#{dir => LDir}, osiris_log:close(LLog0), %% "Truncate" log by deleting first segment @@ -656,10 +676,7 @@ init_data_reader_truncated(Config) -> filename:join(LDir, "00000000000000000000.segment")), %% when requesting a lower offset than the start of the log - %% it should automatically attach at the first available offset - {ok, L1} = osiris_log:init_data_reader({0, empty}, RConf), - ?assert(0 < osiris_log:next_offset(L1)), - osiris_log:close(L1), + {error, {offset_out_of_range, _}} = osiris_log:init_data_reader({0, empty}, RConf), %% attaching inside the log should be ok too {ok, L2} = osiris_log:init_data_reader({750, {1, 700, ?LINE}}, RConf), @@ -1100,3 +1117,6 @@ make_trailer(Type, K, V) -> (byte_size(K)):8/unsigned, K/binary, V:64/unsigned>>. + +set_offset_ref(#{offset_ref := Ref}, Value) -> + atomics:put(Ref, 1, Value). From 4b825db72e512c2e0f7524afd8fe34caa36a5725 Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Wed, 8 Sep 2021 16:23:05 +0100 Subject: [PATCH 2/2] some further refactoring --- src/osiris_log.erl | 74 +++++++++++++++++++--------------------------- 1 file changed, 30 insertions(+), 44 deletions(-) diff --git a/src/osiris_log.erl b/src/osiris_log.erl index d24eae9b..a15b486b 100644 --- a/src/osiris_log.erl +++ b/src/osiris_log.erl @@ -874,24 +874,14 @@ init_data_reader_at(ChunkId, FilePos, File, fd = Fd}. 
init_data_reader_from(ChunkId, - {end_of_log, #seg_info{file = File} = SegInfo}, - Config) -> - case SegInfo of - #seg_info{file = File, last = undefined} -> - init_data_reader_at(0, ?LOG_HEADER_SIZE, File, Config); - #seg_info{file = File, - last = #chunk_info{id = Id, - num = Num, - pos = Pos, - size = Size}} -> - ChunkId = Id + Num, %% assertion - AttachPos = Pos + Size + ?HEADER_SIZE_B, - init_data_reader_at(ChunkId, AttachPos, File, Config) - end; + {end_of_log, #seg_info{file = File, + last = LastChunk}}, + Config) -> + {ChunkId, AttachPos} = next_location(LastChunk), + init_data_reader_at(ChunkId, AttachPos, File, Config); init_data_reader_from(ChunkId, - {found, #seg_info{file = File} = SegInfo}, - Config) -> - %% TODO: next offset needs to be a chunk offset + {found, #seg_info{file = File} = SegInfo}, + Config) -> {ChunkId, _Epoch, FilePos} = scan_idx(ChunkId, SegInfo), init_data_reader_at(ChunkId, FilePos, File, Config). @@ -965,7 +955,7 @@ init_offset_reader({timestamp, Ts}, #{dir := Dir} = Conf) -> init_offset_reader(first, #{dir := Dir} = Conf) -> case build_log_overview(Dir) of [#seg_info{file = File, - last = undefined}] -> + first = undefined}] -> %% empty log, attach at 0 open_offset_reader_at(File, 0, ?LOG_HEADER_SIZE, Conf); [#seg_info{file = File, @@ -973,27 +963,17 @@ init_offset_reader(first, #{dir := Dir} = Conf) -> pos = FilePos}} | _] -> open_offset_reader_at(File, FirstChunkId, FilePos, Conf); _ -> - %% TODO - exit({?FUNCTION_NAME, first}) + exit(no_segments_found) end; init_offset_reader(next, #{dir := Dir} = Conf) -> SegInfos = build_log_overview(Dir), case lists:reverse(SegInfos) of [#seg_info{file = File, - last = undefined}] -> - %% empty log, attach at 0 - open_offset_reader_at(File, 0, ?LOG_HEADER_SIZE, Conf); - [#seg_info{file = File, - last = #chunk_info{size = Size, - num = LastNum, - id = LastChunkId, - pos = LastChPos}} | _] -> - FilePos = LastChPos + Size + ?HEADER_SIZE_B, - NextChunkId = LastChunkId + LastNum, + last = LastChunk} | _] -> + {NextChunkId, FilePos} = next_location(LastChunk), open_offset_reader_at(File, NextChunkId, FilePos, Conf); _ -> - %% TODO - exit({?FUNCTION_NAME, next}) + exit(no_segments_found) end; init_offset_reader(last, #{dir := Dir} = Conf) -> SegInfos = build_log_overview(Dir), @@ -1018,7 +998,8 @@ init_offset_reader(last, #{dir := Dir} = Conf) -> open_offset_reader_at(File, ChunkId, FilePos, Conf) end end; -init_offset_reader(OffsetSpec, #{dir := Dir} = Conf) -> +init_offset_reader(OffsetSpec, #{dir := Dir} = Conf) + when is_integer(OffsetSpec) -> SegInfos = build_log_overview(Dir), ChunkRange = chunk_range_from_segment_infos(SegInfos), Range = offset_range_from_chunk_range(ChunkRange), @@ -1030,11 +1011,10 @@ init_offset_reader(OffsetSpec, #{dir := Dir} = Conf) -> {_, empty} -> 0; {Offset, {_, LastOffs}} - when is_integer(Offset) andalso - Offset > LastOffs -> + when Offset > LastOffs -> %% out of range, clamp as `next` throw({retry_with, next, Conf}); - {Offset, {FirstOffs, _LastOffs}} when is_integer(Offset) -> + {Offset, {FirstOffs, _LastOffs}} -> max(FirstOffs, Offset) end, %% find the appopriate segment and scan the index to find the @@ -1042,7 +1022,11 @@ init_offset_reader(OffsetSpec, #{dir := Dir} = Conf) -> case find_segment_for_offset(StartOffset, SegInfos) of not_found -> {error, {offset_out_of_range, Range}}; - {_, SegmentInfo = #seg_info{file = SegmentFile}} -> + {end_of_log, #seg_info{file = SegmentFile, + last = LastChunk}} -> + {ChunkId, FilePos} = next_location(LastChunk), + 
open_offset_reader_at(SegmentFile, ChunkId, FilePos, Conf); + {found, SegmentInfo = #seg_info{file = SegmentFile}} -> {ChunkId, _Epoch, FilePos} = case scan_idx(StartOffset, SegmentInfo) of eof -> @@ -1058,7 +1042,6 @@ init_offset_reader(OffsetSpec, #{dir := Dir} = Conf) -> end, ?DEBUG("osiris_log:init_offset_reader/2 resolved chunk_id ~b" " at file pos: ~w ", [ChunkId, FilePos]), - open_offset_reader_at(SegmentFile, ChunkId, FilePos, Conf) end catch @@ -1958,12 +1941,7 @@ scan_idx(Offset, #seg_info{index = IndexFile, fun() -> case offset_range_from_segment_infos([SegmentInfo]) of empty -> - %% TODO: this does not feel right - %% if the index is empty do we really know - %% the offset will be next - %% this relies on us always reducing the - %% Offset to within the log range - {0, 0, ?LOG_HEADER_SIZE}; + eof; {SegmentStartOffs, SegmentEndOffs} -> case Offset < SegmentStartOffs orelse Offset > SegmentEndOffs of @@ -2274,6 +2252,14 @@ trigger_retention_eval(#?MODULE{cfg = end), State. +next_location(undefined) -> + {0, ?LOG_HEADER_SIZE}; +next_location(#chunk_info{id = Id, + num = Num, + pos = Pos, + size = Size}) -> + {Id + Num, Pos + Size + ?HEADER_SIZE_B}. + -ifdef(TEST). % -include_lib("eunit/include/eunit.hrl").
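
To summarise the behaviour of the next_location/1 helper that both
reader init paths now share, a sketch of its two cases follows (not
part of either patch; the field values are invented):

    next_location_sketch() ->
        %% empty segment: attach at chunk id 0, directly after the log header
        {0, ?LOG_HEADER_SIZE} = next_location(undefined),
        %% a last chunk with id 100 spanning 10 records, starting at byte
        %% 35000 with 1200 bytes of data + trailer: the next chunk id is
        %% 110 and the attach position skips the chunk and its fixed header
        Last = #chunk_info{id = 100, num = 10, pos = 35000, size = 1200},
        {110, AttachPos} = next_location(Last),
        AttachPos = 35000 + 1200 + ?HEADER_SIZE_B,
        ok.
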