Skip to content

Commit

Permalink
MB-9730 Integrate native viewgroup cleanup support
Browse files Browse the repository at this point in the history
An external native program `couch_view_group_cleanup`
is invoked to perform viewgroup cleanup. On successful cleanup,
viewgroup couch_file processes are refreshed to use new views
after cleanup.

A comparison of time taken for cleanup before and after this change
benchmarked by running `test/11-updates-cleanup-many-views.t` with
6400000 documents are as follows:

Before:
Removed 30000 values from the index in 4.468 seconds

After:
Removed 30000 values from the index in 2.364 seconds

Change-Id: I1ef010a5c1dad8366e90dbbba4bf93117a92377f
Reviewed-on: http://review.couchbase.org/30787
Reviewed-by: Filipe David Borba Manana <fdmanana@gmail.com>
Tested-by: Filipe David Borba Manana <fdmanana@gmail.com>
  • Loading branch information
t3rm1n4l authored and fdmanana committed Jan 6, 2014
1 parent cafce84 commit 37f038a
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 24 deletions.
17 changes: 11 additions & 6 deletions src/couch_set_view/src/couch_set_view_group.erl
Expand Up @@ -1195,8 +1195,9 @@ handle_info({updater_info, _Pid, {state, _UpdaterState}}, State) ->
% Message from an old updater, ignore.
{noreply, State, ?GET_TIMEOUT(State)};

handle_info({'EXIT', Pid, {clean_group, NewGroup0, Count, Time}}, #state{cleaner_pid = Pid} = State) ->
handle_info({'EXIT', Pid, {clean_group, CleanGroup, Count, Time}}, #state{cleaner_pid = Pid} = State) ->
#state{group = OldGroup} = State,
{ok, NewGroup0} = couch_set_view_util:refresh_viewgroup_header(CleanGroup),
NewGroup = update_clean_group_seqs(OldGroup, NewGroup0),
?LOG_INFO("Cleanup finished for set view `~s`, ~s (~s) group `~s`~n"
"Removed ~p values from the index in ~.3f seconds~n"
Expand All @@ -1211,7 +1212,7 @@ handle_info({'EXIT', Pid, {clean_group, NewGroup0, Count, Time}}, #state{cleaner
"Current set of replica partitions: ~w~n"
"Current set of replicas on transfer: ~w~n";
false ->
[]
[]
end,
[?set_name(State), ?type(State), ?category(State), ?group_id(State),
Count, Time,
Expand All @@ -1234,7 +1235,8 @@ handle_info({'EXIT', Pid, {clean_group, NewGroup0, Count, Time}}, #state{cleaner
inc_cleanups(State2#state.group, Time, Count, false),
{noreply, maybe_apply_pending_transition(State2)};

handle_info({'EXIT', Pid, Reason}, #state{cleaner_pid = Pid} = State) ->
handle_info({'EXIT', Pid, Reason}, #state{cleaner_pid = Pid, group = Group} = State) ->
ok = couch_file:refresh_eof(Group#set_view_group.fd),
?LOG_ERROR("Set view `~s`, ~s (~s) group `~s`, cleanup process ~p"
" died with unexpected reason: ~p",
[?set_name(State), ?type(State), ?category(State),
Expand Down Expand Up @@ -2624,7 +2626,7 @@ maybe_start_cleaner(#state{group = Group} = State) ->
-spec stop_cleaner(#state{}) -> #state{}.
stop_cleaner(#state{cleaner_pid = nil} = State) ->
State;
stop_cleaner(#state{cleaner_pid = Pid} = State) when is_pid(Pid) ->
stop_cleaner(#state{cleaner_pid = Pid, group = Group} = State) when is_pid(Pid) ->
MRef = erlang:monitor(process, Pid),
Pid ! stop,
unlink(Pid),
Expand All @@ -2638,6 +2640,7 @@ stop_cleaner(#state{cleaner_pid = Pid} = State) when is_pid(Pid) ->
after_cleaner_stopped(State, Reason)
after 5000 ->
couch_util:shutdown_sync(Pid),
ok = couch_file:refresh_eof(Group#set_view_group.fd),
?LOG_ERROR("Timeout stopping cleanup process ~p for"
" set view `~s`, ~s (~s) group `~s`",
[Pid, ?set_name(State), ?type(State), ?category(State),
Expand All @@ -2648,8 +2651,9 @@ stop_cleaner(#state{cleaner_pid = Pid} = State) when is_pid(Pid) ->
NewState.


after_cleaner_stopped(State, {clean_group, NewGroup0, Count, Time}) ->
after_cleaner_stopped(State, {clean_group, CleanGroup, Count, Time}) ->
#state{group = OldGroup} = State,
{ok, NewGroup0} = couch_set_view_util:refresh_viewgroup_header(CleanGroup),
NewGroup = update_clean_group_seqs(OldGroup, NewGroup0),
?LOG_INFO("Stopped cleanup process for"
" set view `~s`, ~s (~s) group `~s`.~n"
Expand All @@ -2670,7 +2674,8 @@ after_cleaner_stopped(State, {clean_group, NewGroup0, Count, Time}) ->
group = NewGroup,
cleaner_pid = nil
};
after_cleaner_stopped(#state{cleaner_pid = Pid} = State, Reason) ->
after_cleaner_stopped(#state{cleaner_pid = Pid, group = Group} = State, Reason) ->
ok = couch_file:refresh_eof(Group#set_view_group.fd),
?LOG_ERROR("Cleanup process ~p for set view `~s`, ~s (~s) group `~s`,"
" died with reason: ~p",
[Pid, ?set_name(State), ?type(State), ?category(State),
Expand Down
45 changes: 43 additions & 2 deletions src/couch_set_view/src/couch_set_view_util.erl
Expand Up @@ -31,6 +31,7 @@
-export([get_part_seq/2, has_part_seq/2, find_part_seq/2]).
-export([set_view_sig/1]).
-export([check_primary_key_size/5, check_primary_value_size/5]).
-export([refresh_viewgroup_header/1]).


-include("couch_db.hrl").
Expand Down Expand Up @@ -230,11 +231,20 @@ compute_indexed_bitmap(Mod, IdBtree, Views) ->
cleanup_group(Group) when ?set_cbitmask(Group) == 0 ->
{ok, Group, 0};
cleanup_group(Group) ->
#set_view_group{mod = Mod} = Group,
case Mod of
mapreduce_view ->
Mod:cleanup_view_group(Group);
_ ->
cleanup_group(Mod, Group)
end.


cleanup_group(Mod, Group) ->
#set_view_group{
index_header = Header,
id_btree = IdBtree,
views = Views,
mod = Mod
views = Views
} = Group,
PurgeFun = make_btree_purge_fun(Group),
ok = couch_set_view_util:open_raw_read_fd(Group),
Expand Down Expand Up @@ -611,3 +621,34 @@ check_primary_value_size(Bin, Max, Key, DocId, Group) when byte_size(Bin) > Max
throw({error, Error});
check_primary_value_size(_Bin, _Max, _Key, _DocId, _Group) ->
ok.


% Read latest header from index file and update viewgroup
-spec refresh_viewgroup_header(#set_view_group{}) -> {'ok', #set_view_group{}}.
refresh_viewgroup_header(Group) ->
#set_view_group{
fd = Fd,
mod = Mod,
id_btree = IdBtree,
views = Views
} = Group,
ok = couch_file:refresh_eof(Fd),
{ok, HeaderBin, NewHeaderPos} = couch_file:read_header_bin(Fd),
NewHeader = couch_set_view_util:header_bin_to_term(HeaderBin),
#set_view_index_header{
id_btree_state = IdBtreeState,
view_states = ViewStates
} = NewHeader,
NewIdBtree = couch_btree:set_state(IdBtree, IdBtreeState),
NewViews = lists:zipwith(fun(NewState, SetView) ->
View = SetView#set_view.indexer,
NewView = Mod:set_state(View, NewState),
SetView#set_view{indexer = NewView}
end, ViewStates, Views),
NewGroup = Group#set_view_group{
header_pos = NewHeaderPos,
id_btree = NewIdBtree,
views = NewViews,
index_header = NewHeader
},
{ok, NewGroup}.
63 changes: 47 additions & 16 deletions src/couch_set_view/src/mapreduce_view.erl
Expand Up @@ -23,7 +23,7 @@
-export([design_doc_to_set_view_group/2, view_group_data_size/2,
reset_view/1, setup_views/5]).
% For the utils
-export([clean_views/5]).
-export([cleanup_view_group/1]).
% For the compactor
-export([compact_view/6, apply_log/2]).
% For the main module
Expand Down Expand Up @@ -458,22 +458,53 @@ setup_views(Fd, BtreeOptions, Group, ViewStates, Views) ->
ViewStates, Views).


% XXX vmx 2013-01-04: Check if we really need the full function, or just the
% `guided_purge` in this indexer specific module
clean_views(_, _, [], Count, Acc) ->
{Count, lists:reverse(Acc)};
clean_views(stop, _, Rest, Count, Acc) ->
{Count, lists:reverse(Acc, Rest)};
clean_views(go, PurgeFun, [SetView | Rest], Count, Acc) ->
View = SetView#set_view.indexer,
Btree = View#mapreduce_view.btree,
{ok, NewBtree, {Go, PurgedCount}} =
couch_btree:guided_purge(Btree, PurgeFun, {go, Count}),
NewAcc = [SetView#set_view{
indexer = View#mapreduce_view{btree = NewBtree}
} | Acc],
clean_views(Go, PurgeFun, Rest, PurgedCount, NewAcc).
% Native viewgroup cleanup
cleanup_view_group(Group) ->
case os:find_executable("couch_view_group_cleanup") of
false ->
Cmd = nil,
throw(<<"couch_view_group_cleanup command not found">>);
Cmd ->
ok
end,
Options = [exit_status, use_stdio, stderr_to_stdout, {line, 4096}, binary],
Port = open_port({spawn_executable, Cmd}, Options),
send_group_info(Group, Port),
PurgedCount = try cleanup_view_group_wait_loop(Port, Group, [], 0) of
{ok, Count0} ->
Count0
catch
Error ->
exit(Error)
after
catch port_close(Port)
end,
{ok, Group, PurgedCount}.

cleanup_view_group_wait_loop(Port, Group, Acc, PurgedCount) ->
receive
{Port, {exit_status, 0}} ->
{ok, PurgedCount};
{Port, {exit_status, Status}} ->
throw({view_group_cleanup_exit, Status});
{Port, {data, {noeol, Data}}} ->
cleanup_view_group_wait_loop(Port, Group, [Data | Acc], PurgedCount);
{Port, {data, {eol, <<"PurgedCount ", Data/binary>>}}} ->
{Count,[]} = string:to_integer(erlang:binary_to_list(Data)),
cleanup_view_group_wait_loop(Port, Group, Acc, Count);
{Port, {data, {eol, Data}}} ->
#set_view_group{
set_name = SetName,
name = DDocId,
type = Type
} = Group,
Msg = lists:reverse([Data | Acc]),
?LOG_ERROR("Set view `~s`, ~s group `~s`, received error from index cleanup: ~s",
[SetName, Type, DDocId, Msg]),
cleanup_view_group_wait_loop(Port, Group, [], PurgedCount);
{Port, Error} ->
throw({view_group_cleanup_error, Error})
end.

compact_view(Fd, SetView, EmptySetView, FilterFun, BeforeKVWriteFun, Acc0) ->
EmptyView = EmptySetView#set_view.indexer,
Expand Down

0 comments on commit 37f038a

Please sign in to comment.