Skip to content

Commit

Permalink
Merge branch 'master' into mas-i164-hotbackup
Browse files Browse the repository at this point in the history
  • Loading branch information
martinsumner committed Sep 7, 2018
2 parents 9a8ce88 + faec45a commit b99cde9
Show file tree
Hide file tree
Showing 9 changed files with 574 additions and 104 deletions.
6 changes: 6 additions & 0 deletions include/leveled.hrl
Expand Up @@ -56,6 +56,9 @@
root_path :: string() | undefined,
cdb_options :: #cdb_options{} | undefined,
start_snapshot = false :: boolean(),
%% so a snapshot can monitor the bookie and
%% terminate when it does
bookies_pid :: pid() | undefined,
source_inker :: pid() | undefined,
reload_strategy = [] :: list(),
waste_retention_period :: integer() | undefined,
Expand All @@ -70,6 +73,9 @@
max_inmemory_tablesize :: integer() | undefined,
start_snapshot = false :: boolean(),
snapshot_query,
%% so a snapshot can monitor the bookie and
%% terminate when it does
bookies_pid :: pid() | undefined,
bookies_mem :: tuple() | undefined,
source_penciller :: pid() | undefined,
snapshot_longrunning = true :: boolean(),
Expand Down
315 changes: 311 additions & 4 deletions src/leveled_bookie.erl

Large diffs are not rendered by default.

50 changes: 50 additions & 0 deletions src/leveled_inker.erl
Expand Up @@ -142,6 +142,7 @@
cdb_options :: #cdb_options{} | undefined,
clerk :: pid() | undefined,
compaction_pending = false :: boolean(),
bookie_monref :: reference() | undefined,
is_snapshot = false :: boolean(),
compression_method = native :: lz4|native,
compress_on_receipt = false :: boolean(),
Expand Down Expand Up @@ -440,6 +441,9 @@ init([InkerOpts]) ->
case {InkerOpts#inker_options.root_path,
InkerOpts#inker_options.start_snapshot} of
{undefined, true} ->
%% monitor the bookie, and close the snapshot when bookie
%% exits
BookieMonitor = erlang:monitor(process, InkerOpts#inker_options.bookies_pid),
SrcInker = InkerOpts#inker_options.source_inker,
{Manifest,
ActiveJournalDB,
Expand All @@ -448,6 +452,7 @@ init([InkerOpts]) ->
active_journaldb = ActiveJournalDB,
source_inker = SrcInker,
journal_sqn = JournalSQN,
bookie_monref = BookieMonitor,
is_snapshot = true}};
%% Need to do something about timeout
{_RootPath, false} ->
Expand Down Expand Up @@ -642,6 +647,12 @@ handle_cast({release_snapshot, Snapshot}, State) ->
leveled_log:log("I0004", [length(Rs)]),
{noreply, State#state{registered_snapshots=Rs}}.

%% handle the bookie stopping and stop this snapshot
handle_info({'DOWN', BookieMonRef, process, _BookiePid, _Info},
State=#state{bookie_monref = BookieMonRef}) ->
%% Monitor only registered on snapshots
ok = ink_releasesnapshot(State#state.source_inker, self()),
{stop, normal, State};
handle_info(_Info, State) ->
{noreply, State}.

Expand Down Expand Up @@ -1379,4 +1390,43 @@ coverage_cheat_test() ->
{noreply, _State0} = handle_info(timeout, #state{}),
{ok, _State1} = code_change(null, #state{}, null).

handle_down_test() ->
RootPath = "../test/journal",
build_dummy_journal(),
CDBopts = #cdb_options{max_size=300000, binary_mode=true},
{ok, Ink1} = ink_start(#inker_options{root_path=RootPath,
cdb_options=CDBopts,
compression_method=native,
compress_on_receipt=true}),

FakeBookie = spawn(fun loop/0),

Mon = erlang:monitor(process, FakeBookie),

SnapOpts = #inker_options{start_snapshot=true,
bookies_pid = FakeBookie,
source_inker=Ink1},

{ok, Snap1} = ink_snapstart(SnapOpts),

FakeBookie ! stop,

receive
{'DOWN', Mon, process, FakeBookie, normal} ->
%% Now we know that inker should have received this too!
%% (better than timer:sleep/1)
ok
end,

?assertEqual(undefined, erlang:process_info(Snap1)),

ink_close(Ink1),
clean_testdir(RootPath).

loop() ->
receive
stop ->
ok
end.

-endif.
66 changes: 65 additions & 1 deletion src/leveled_penciller.erl
Expand Up @@ -257,6 +257,7 @@
is_snapshot = false :: boolean(),
snapshot_fully_loaded = false :: boolean(),
source_penciller :: pid() | undefined,
bookie_monref :: reference() | undefined,
levelzero_astree :: list() | undefined,

work_ongoing = false :: boolean(), % i.e. compaction work
Expand Down Expand Up @@ -583,14 +584,19 @@ init([PCLopts]) ->
{undefined, _Snapshot=true, Query, BookiesMem} ->
SrcPenciller = PCLopts#penciller_options.source_penciller,
LongRunning = PCLopts#penciller_options.snapshot_longrunning,
%% monitor the bookie, and close the snapshot when bookie
%% exits
BookieMonitor = erlang:monitor(process, PCLopts#penciller_options.bookies_pid),

{ok, State} = pcl_registersnapshot(SrcPenciller,
self(),
Query,
BookiesMem,
LongRunning),
leveled_log:log("P0001", [self()]),
{ok, State#state{is_snapshot=true,
source_penciller=SrcPenciller}};
bookie_monref = BookieMonitor,
source_penciller=SrcPenciller}};
{_RootPath, _Snapshot=false, _Q, _BM} ->
start_from_file(PCLopts)
end.
Expand Down Expand Up @@ -943,6 +949,11 @@ handle_cast(work_for_clerk, State) ->
end.


%% handle the bookie stopping and stop this snapshot
handle_info({'DOWN', BookieMonRef, process, _BookiePid, _Info},
State=#state{bookie_monref = BookieMonRef}) ->
ok = pcl_releasesnapshot(State#state.source_penciller, self()),
{stop, normal, State};
handle_info(_Info, State) ->
{noreply, State}.

Expand Down Expand Up @@ -2145,4 +2156,57 @@ coverage_cheat_test() ->
{noreply, _State0} = handle_info(timeout, #state{}),
{ok, _State1} = code_change(null, #state{}, null).

handle_down_test() ->
RootPath = "../test/ledger",
clean_testdir(RootPath),
{ok, PCLr} = pcl_start(#penciller_options{root_path=RootPath,
max_inmemory_tablesize=1000,
compression_method=native}),
FakeBookie = spawn(fun loop/0),

Mon = erlang:monitor(process, FakeBookie),

FakeBookie ! {snap, PCLr, self()},

{ok, PclSnap, null} =
receive
{FakeBookie, {ok, Snap, null}} ->
{ok, Snap, null}
end,

FakeBookie ! stop,

receive
{'DOWN', Mon, process, FakeBookie, normal} ->
%% Now we know that pclr should have received this too!
%% (better than timer:sleep/1)
ok
end,

?assertEqual(undefined, erlang:process_info(PclSnap)),

pcl_close(PCLr),
clean_testdir(RootPath).

%% the fake bookie. Some calls to leveled_bookie (like the two below)
%% do not go via the gen_server (but it looks like they expect to be
%% called by the gen_server, internally!) they use "self()" to
%% populate the bookie's pid in the pclr. This process wrapping the
%% calls ensures that the TEST controls the bookie's Pid. The
%% FakeBookie.
loop() ->
receive
{snap, PCLr, TestPid} ->
Res = leveled_bookie:snapshot_store(leveled_bookie:empty_ledgercache(),
PCLr,
null,
ledger,
undefined,
false),
TestPid ! {self(), Res},
loop();
stop ->
ok
end.

-endif.
26 changes: 14 additions & 12 deletions test/end_to_end/basic_SUITE.erl
Expand Up @@ -579,8 +579,9 @@ space_clear_ondelete(_Config) ->
G2),

FoldKeysFun = fun(B, K, Acc) -> [{B, K}|Acc] end,
AllKeyQuery = {keylist, o_rkv, {FoldKeysFun, []}},
{async, F1} = leveled_bookie:book_returnfolder(Book1, AllKeyQuery),

{async, F1} = leveled_bookie:book_keylist(Book1, o_rkv, {FoldKeysFun, []}),

SW1 = os:timestamp(),
KL1 = F1(),
ok = case length(KL1) of
Expand All @@ -594,19 +595,20 @@ space_clear_ondelete(_Config) ->
{ok, FNsA_J} = file:list_dir(RootPath ++ "/journal/journal_files"),
io:format("FNsA - Bookie created ~w journal files and ~w ledger files~n",
[length(FNsA_J), length(FNsA_L)]),

% Get an iterator to lock the inker during compaction
FoldObjectsFun = fun(B, K, ObjBin, Acc) ->
[{B, K, erlang:phash2(ObjBin)}|Acc] end,
{async, HTreeF1} = leveled_bookie:book_returnfolder(Book1,
{foldobjects_allkeys,
?RIAK_TAG,
FoldObjectsFun,
false}),

{async, HTreeF1} = leveled_bookie:book_objectfold(Book1,
?RIAK_TAG,
{FoldObjectsFun, []},
false),

% This query does not Snap PreFold - and so will not prevent
% pending deletes from prompting actual deletes

{async, KF1} = leveled_bookie:book_returnfolder(Book1, AllKeyQuery),
{async, KF1} = leveled_bookie:book_keylist(Book1, o_rkv, {FoldKeysFun, []}),
% This query does Snap PreFold, and so will prevent deletes from
% the ledger

Expand Down Expand Up @@ -662,7 +664,7 @@ space_clear_ondelete(_Config) ->
"after deletes~n",
[PointB_Journals, length(FNsB_L)]),

{async, F2} = leveled_bookie:book_returnfolder(Book1, AllKeyQuery),
{async, F2} = leveled_bookie:book_keylist(Book1, o_rkv, {FoldKeysFun, []}),
SW3 = os:timestamp(),
KL2 = F2(),
ok = case length(KL2) of
Expand All @@ -674,7 +676,7 @@ space_clear_ondelete(_Config) ->
ok = leveled_bookie:book_close(Book1),

{ok, Book2} = leveled_bookie:book_start(StartOpts1),
{async, F3} = leveled_bookie:book_returnfolder(Book2, AllKeyQuery),
{async, F3} = leveled_bookie:book_keylist(Book2, o_rkv, {FoldKeysFun, []}),
SW4 = os:timestamp(),
KL3 = F3(),
ok = case length(KL3) of
Expand Down Expand Up @@ -842,4 +844,4 @@ many_put_fetch_switchcompression(_Config) ->
testutil:check_forlist(Bookie3, ChkListFixed),
testutil:check_forobject(Bookie3, TestObject),
testutil:check_formissingobject(Bookie3, "Bookie1", "MissingKey0123"),
ok = leveled_bookie:book_destroy(Bookie3).
ok = leveled_bookie:book_destroy(Bookie3).

0 comments on commit b99cde9

Please sign in to comment.