diff --git a/src/couch_set_view/src/couch_set_view_compactor.erl b/src/couch_set_view/src/couch_set_view_compactor.erl index 1872ee252..7a7878369 100644 --- a/src/couch_set_view/src/couch_set_view_compactor.erl +++ b/src/couch_set_view/src/couch_set_view_compactor.erl @@ -142,31 +142,44 @@ compact_group(Group0, EmptyGroup, TmpDir, UpdaterPid, Owner, UserStatus) -> % For spatial, use erlang view compactor NewGroup = case Mod of spatial_view -> - FilterFun = case ?set_cbitmask(Group) of - 0 -> - fun(_Kv) -> true end; - _ -> - fun({_Key, <>}) -> - ((1 bsl PartId) band ?set_cbitmask(Group)) =:= 0 - end - end, - - BeforeKVWriteFun = fun(KV, Acc) -> - {KV, update_task(Acc, 1)} - end, + Parent = self(), + ChildPid = spawn_link(fun() -> + FilterFun = case ?set_cbitmask(Group) of + 0 -> + fun(_Kv) -> true end; + _ -> + fun({_Key, <>}) -> + ((1 bsl PartId) band ?set_cbitmask(Group)) =:= 0 + end + end, + + BeforeKVWriteFun = fun(KV, Acc) -> + {KV, update_task(Acc, 1)} + end, + + ok = couch_set_view_util:open_raw_read_fd(Group), + {NewViews, _} = lists:mapfoldl(fun({View, EmptyView}, Acc) -> + Mod:compact_view(Fd, View, EmptyView, FilterFun, BeforeKVWriteFun, Acc) + end, Acc1, lists:zip(Views, EmptyViews)), + ok = couch_set_view_util:close_raw_read_fd(Group), + Header = NewGroup0#set_view_group.index_header, + SpatialGroup0 = NewGroup0#set_view_group{ + views = NewViews, + index_header = Header#set_view_index_header{ + view_states = [Mod:get_state(V#set_view.indexer) || V <- NewViews] + } + }, + Parent ! {spatial_group, self(), SpatialGroup0} + end), - ok = couch_set_view_util:open_raw_read_fd(Group), - {NewViews, _} = lists:mapfoldl(fun({View, EmptyView}, Acc) -> - Mod:compact_view(Fd, View, EmptyView, FilterFun, BeforeKVWriteFun, Acc) - end, Acc1, lists:zip(Views, EmptyViews)), - ok = couch_set_view_util:close_raw_read_fd(Group), - Header = NewGroup0#set_view_group.index_header, - NewGroup0#set_view_group{ - views = NewViews, - index_header = Header#set_view_index_header{ - view_states = [Mod:get_state(V#set_view.indexer) || V <- NewViews] - } - }; + receive + % Handle force compactor stop + stop -> + couch_util:shutdown_sync(ChildPid), + exit(shutdown); + {spatial_group, ChildPid, SpatialGroup} -> + SpatialGroup + end; mapreduce_view -> NewGroup0 end, @@ -382,10 +395,19 @@ compact_btrees_wait_loop(Port, Group, EmptyGroup, Acc0, ResultAcc) -> compact_btrees_wait_loop(Port, Group, EmptyGroup, Acc2, ResultAcc); {Port, {exit_status, 0}} -> {ok, {Group, ResultAcc}}; + {Port, {exit_status, 1}} -> + ?LOG_INFO("Set view `~s`, ~s group `~s`, index compactor stopped successfully.", + [SetName, Type, DDocId]), + exit(shutdown); {Port, {exit_status, Status}} -> throw({view_group_index_compactor_exit, Status}); {Port, Error} -> - throw({view_group_index_compactor_error, Error}) + throw({view_group_index_compactor_error, Error}); + stop -> + ?LOG_INFO("Set view `~s`, ~s group `~s`, sending stop message to index compactor.", + [SetName, Type, DDocId]), + true = port_command(Port, "exit"), + compact_btrees_wait_loop(Port, Group, EmptyGroup, Acc, ResultAcc) end; <<"Header Len : ", Data/binary>> -> % Read resulting group from stdout diff --git a/src/couch_set_view/src/couch_set_view_group.erl b/src/couch_set_view/src/couch_set_view_group.erl index c8f299d2e..48f58331c 100644 --- a/src/couch_set_view/src/couch_set_view_group.erl +++ b/src/couch_set_view/src/couch_set_view_group.erl @@ -1472,7 +1472,7 @@ terminate(Reason, #state{group = #set_view_group{sig = Sig} = Group} = State) -> catch couch_db_set:close(?db_set(State3)), couch_set_view_util:shutdown_wait(State3#state.cleaner_pid), couch_set_view_util:shutdown_wait(State3#state.updater_pid), - couch_util:shutdown_sync(State3#state.compactor_pid), + couch_set_view_util:shutdown_wait(State3#state.compactor_pid), couch_util:shutdown_sync(State3#state.compactor_file), couch_util:shutdown_sync(State3#state.replica_group), Group = State#state.group, @@ -2754,7 +2754,7 @@ start_compactor(State, CompactFun) -> stop_compactor(#state{compactor_pid = nil} = State) -> State; stop_compactor(#state{compactor_pid = Pid, compactor_file = CompactFd} = State) -> - couch_util:shutdown_sync(Pid), + couch_set_view_util:shutdown_wait(Pid), couch_util:shutdown_sync(CompactFd), CompactFile = compact_file_name(State), ok = couch_file:delete(?root_dir(State), CompactFile),