Skip to content

Commit

Permalink
Merge pull request #160 from rabbitmq/replica-format-status
Browse files Browse the repository at this point in the history
Correctly handle case where replica exits in handle_continue.
  • Loading branch information
kjnilsson committed Apr 30, 2024
2 parents f4f2043 + 2a9109e commit 901b71d
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 38 deletions.
1 change: 0 additions & 1 deletion .github/workflows/erlang.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ jobs:
fail-fast: false
matrix:
otp_major:
- "24"
- "25"
- "26"
steps:
Expand Down
62 changes: 34 additions & 28 deletions src/osiris_log.erl
Original file line number Diff line number Diff line change
Expand Up @@ -971,7 +971,8 @@ truncate_to(Name, RemoteRange, [{E, ChId} | NextEOs], IdxFiles) ->
-spec init_data_reader(osiris:tail_info(), config()) ->
{ok, state()} |
{error, {offset_out_of_range, empty | {offset(), offset()}}} |
{error, {invalid_last_offset_epoch, epoch(), offset()}}.
{error, {invalid_last_offset_epoch, epoch(), offset()}} |
{error, file:posix()}.
init_data_reader({StartChunkId, PrevEOT}, #{dir := Dir,
name := Name} = Config) ->
IdxFiles = sorted_index_files(Dir),
Expand All @@ -989,19 +990,19 @@ init_data_reader({StartChunkId, PrevEOT}, #{dir := Dir,
_ when PrevEOT == empty ->
%% this assumes the offset is in range
%% first we need to validate PrevEO
{ok, init_data_reader_from(
StartChunkId,
find_segment_for_offset(StartChunkId, IdxFiles),
Config)};
init_data_reader_from(StartChunkId,
find_segment_for_offset(StartChunkId,
IdxFiles),
Config);
_ ->
{PrevEpoch, PrevChunkId, _PrevTs} = PrevEOT,
case check_chunk_has_expected_epoch(Name, PrevChunkId,
PrevEpoch, IdxFiles) of
ok ->
{ok, init_data_reader_from(
StartChunkId,
find_segment_for_offset(StartChunkId, IdxFiles),
Config)};
init_data_reader_from(StartChunkId,
find_segment_for_offset(StartChunkId,
IdxFiles),
Config);
{error, _} = Err ->
Err
end
Expand Down Expand Up @@ -1029,25 +1030,30 @@ init_data_reader_at(ChunkId, FilePos, File,
#{dir := Dir, name := Name,
shared := Shared,
readers_counter_fun := CountersFun} = Config) ->
{ok, Fd} = file:open(File, [raw, binary, read]),
Cnt = make_counter(Config),
counters:put(Cnt, ?C_OFFSET, ChunkId - 1),
CountersFun(1),
#?MODULE{cfg =
#cfg{directory = Dir,
counter = Cnt,
counter_id = counter_id(Config),
name = Name,
readers_counter_fun = CountersFun,
shared = Shared
},
mode =
#read{type = data,
next_offset = ChunkId,
chunk_selector = all,
position = FilePos,
transport = maps:get(transport, Config, tcp)},
fd = Fd}.
case file:open(File, [raw, binary, read]) of
{ok, Fd} ->
Cnt = make_counter(Config),
counters:put(Cnt, ?C_OFFSET, ChunkId - 1),
CountersFun(1),
{ok,
#?MODULE{cfg =
#cfg{directory = Dir,
counter = Cnt,
counter_id = counter_id(Config),
name = Name,
readers_counter_fun = CountersFun,
shared = Shared
},
mode =
#read{type = data,
next_offset = ChunkId,
chunk_selector = all,
position = FilePos,
transport = maps:get(transport, Config, tcp)},
fd = Fd}};
Err ->
Err
end.

init_data_reader_from(ChunkId,
{end_of_log, #seg_info{file = File,
Expand Down
14 changes: 7 additions & 7 deletions src/osiris_replica.erl
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,8 @@ handle_continue(#{name := Name0,
log = Log,
parse_state = undefined}};
{error, Reason} ->
{stop, Reason, undefined}
?WARN_(Name, " failed to start replica reader. Reason ~0p", [Reason]),
{stop, {shutdown, Reason}, undefined}
end
end.

Expand Down Expand Up @@ -585,10 +586,6 @@ terminate(Reason, #?MODULE{cfg = #cfg{name = Name,
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

format_status(undefined) ->
%% Handle formatting the status when the server shut down before start-up,
%% for example when the rpc call in `handle_continue/2' fails.
undefined;
format_status(#{state := #?MODULE{cfg = #cfg{name = Name,
reference = ExtRef},
log = Log,
Expand All @@ -603,8 +600,11 @@ format_status(#{state := #?MODULE{cfg = #cfg{name = Name,
num_offset_listeners => length(OffsetListeners),
committed_offset => CommittedOffset
},
Status).

Status);
format_status(Status) ->
%% Handle formatting the status when the server shut down before start-up,
%% for example when the rpc call in `handle_continue/2' fails.
Status.
%%%===================================================================
%%% Internal functions
%%%===================================================================
Expand Down
10 changes: 8 additions & 2 deletions src/osiris_replica_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,14 @@ init(#{hosts := Hosts,
{ok, State};
{error, no_process} ->
?WARN_(Name,
"osiris writer for ~0p is down,
replica reader will not start",
"osiris writer for ~0p is down, "
"replica reader will not start",
[ExtRef]),
{stop, normal};
{error, enoent} ->
?WARN_(Name,
"data reader for ~0p encountered an 'enoent' error whilst "
"initialising, replica reader will not start",
[ExtRef]),
{stop, normal};
{error, {offset_out_of_range, Range}} ->
Expand Down

0 comments on commit 901b71d

Please sign in to comment.