Skip to content

Commit

Permalink
MB-7629 Fix view compactor stopper to stop native process
Browse files Browse the repository at this point in the history
Compactor stopper has to send a stop message to the
native compactor process and it has to die gracefully.
It should be waiting for native process to die after
sending stop message.

Change-Id: I07c02bd5585538b51e05e9d59595f5b684d8cc3c
Reviewed-on: http://review.couchbase.org/32310
Reviewed-by: Filipe David Borba Manana <fdmanana@gmail.com>
Tested-by: Filipe David Borba Manana <fdmanana@gmail.com>
  • Loading branch information
t3rm1n4l authored and fdmanana committed Jan 21, 2014
1 parent 19393cd commit d0d76be
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 27 deletions.
72 changes: 47 additions & 25 deletions src/couch_set_view/src/couch_set_view_compactor.erl
Expand Up @@ -142,31 +142,44 @@ compact_group(Group0, EmptyGroup, TmpDir, UpdaterPid, Owner, UserStatus) ->
% For spatial, use erlang view compactor
NewGroup = case Mod of
spatial_view ->
FilterFun = case ?set_cbitmask(Group) of
0 ->
fun(_Kv) -> true end;
_ ->
fun({_Key, <<PartId:16, _/binary>>}) ->
((1 bsl PartId) band ?set_cbitmask(Group)) =:= 0
end
end,

BeforeKVWriteFun = fun(KV, Acc) ->
{KV, update_task(Acc, 1)}
end,
Parent = self(),
ChildPid = spawn_link(fun() ->
FilterFun = case ?set_cbitmask(Group) of
0 ->
fun(_Kv) -> true end;
_ ->
fun({_Key, <<PartId:16, _/binary>>}) ->
((1 bsl PartId) band ?set_cbitmask(Group)) =:= 0
end
end,

BeforeKVWriteFun = fun(KV, Acc) ->
{KV, update_task(Acc, 1)}
end,

ok = couch_set_view_util:open_raw_read_fd(Group),
{NewViews, _} = lists:mapfoldl(fun({View, EmptyView}, Acc) ->
Mod:compact_view(Fd, View, EmptyView, FilterFun, BeforeKVWriteFun, Acc)
end, Acc1, lists:zip(Views, EmptyViews)),
ok = couch_set_view_util:close_raw_read_fd(Group),
Header = NewGroup0#set_view_group.index_header,
SpatialGroup0 = NewGroup0#set_view_group{
views = NewViews,
index_header = Header#set_view_index_header{
view_states = [Mod:get_state(V#set_view.indexer) || V <- NewViews]
}
},
Parent ! {spatial_group, self(), SpatialGroup0}
end),

ok = couch_set_view_util:open_raw_read_fd(Group),
{NewViews, _} = lists:mapfoldl(fun({View, EmptyView}, Acc) ->
Mod:compact_view(Fd, View, EmptyView, FilterFun, BeforeKVWriteFun, Acc)
end, Acc1, lists:zip(Views, EmptyViews)),
ok = couch_set_view_util:close_raw_read_fd(Group),
Header = NewGroup0#set_view_group.index_header,
NewGroup0#set_view_group{
views = NewViews,
index_header = Header#set_view_index_header{
view_states = [Mod:get_state(V#set_view.indexer) || V <- NewViews]
}
};
receive
% Handle force compactor stop
stop ->
couch_util:shutdown_sync(ChildPid),
exit(shutdown);
{spatial_group, ChildPid, SpatialGroup} ->
SpatialGroup
end;
mapreduce_view ->
NewGroup0
end,
Expand Down Expand Up @@ -382,10 +395,19 @@ compact_btrees_wait_loop(Port, Group, EmptyGroup, Acc0, ResultAcc) ->
compact_btrees_wait_loop(Port, Group, EmptyGroup, Acc2, ResultAcc);
{Port, {exit_status, 0}} ->
{ok, {Group, ResultAcc}};
{Port, {exit_status, 1}} ->
?LOG_INFO("Set view `~s`, ~s group `~s`, index compactor stopped successfully.",
[SetName, Type, DDocId]),
exit(shutdown);
{Port, {exit_status, Status}} ->
throw({view_group_index_compactor_exit, Status});
{Port, Error} ->
throw({view_group_index_compactor_error, Error})
throw({view_group_index_compactor_error, Error});
stop ->
?LOG_INFO("Set view `~s`, ~s group `~s`, sending stop message to index compactor.",
[SetName, Type, DDocId]),
true = port_command(Port, "exit"),
compact_btrees_wait_loop(Port, Group, EmptyGroup, Acc, ResultAcc)
end;
<<"Header Len : ", Data/binary>> ->
% Read resulting group from stdout
Expand Down
4 changes: 2 additions & 2 deletions src/couch_set_view/src/couch_set_view_group.erl
Expand Up @@ -1472,7 +1472,7 @@ terminate(Reason, #state{group = #set_view_group{sig = Sig} = Group} = State) ->
catch couch_db_set:close(?db_set(State3)),
couch_set_view_util:shutdown_wait(State3#state.cleaner_pid),
couch_set_view_util:shutdown_wait(State3#state.updater_pid),
couch_util:shutdown_sync(State3#state.compactor_pid),
couch_set_view_util:shutdown_wait(State3#state.compactor_pid),
couch_util:shutdown_sync(State3#state.compactor_file),
couch_util:shutdown_sync(State3#state.replica_group),
Group = State#state.group,
Expand Down Expand Up @@ -2754,7 +2754,7 @@ start_compactor(State, CompactFun) ->
stop_compactor(#state{compactor_pid = nil} = State) ->
State;
stop_compactor(#state{compactor_pid = Pid, compactor_file = CompactFd} = State) ->
couch_util:shutdown_sync(Pid),
couch_set_view_util:shutdown_wait(Pid),
couch_util:shutdown_sync(CompactFd),
CompactFile = compact_file_name(State),
ok = couch_file:delete(?root_dir(State), CompactFile),
Expand Down

0 comments on commit d0d76be

Please sign in to comment.