Permalink
Browse files

COUCHDB-1444 Broken couch_view_group

It appears that exit signals were propogating between couch_view_updater
processes through couch_os_process and friends. This changes the use of
the 'EXIT' signal to a '$gen_cast' message.
  • Loading branch information...
1 parent bb83977 commit 53490d915038c04ed47f0c8a00b771583c483b0e @davisp davisp committed Aug 10, 2012
Showing with 56 additions and 41 deletions.
  1. +20 −6 src/couchdb/couch_view_compactor.erl
  2. +31 −32 src/couchdb/couch_view_group.erl
  3. +5 −3 src/couchdb/couch_view_updater.erl
@@ -117,13 +117,27 @@ maybe_retry_compact(#db{name = DbName} = Db, GroupId, NewGroup) ->
couch_db:close(Db);
update ->
{ok, Db2} = couch_db:reopen(Db),
- {_, Ref} = erlang:spawn_monitor(fun() ->
- couch_view_updater:update(nil, NewGroup, Db2)
+ Self = self(),
+ {MPid, MRef} = erlang:spawn_monitor(fun() ->
+ couch_view_updater:update(Self, NewGroup, Db2)
end),
- receive
- {'DOWN', Ref, _, _, {new_group, NewGroup2}} ->
- maybe_retry_compact(Db2, GroupId, NewGroup2)
- end
+ NewGroup1 = get_new_group(MPid, MRef),
+ erlang:demonitor(MRef, [flush]),
+ maybe_retry_compact(Db2, GroupId, NewGroup1)
+ end.
+
+get_new_group(Pid, Ref) ->
+ receive
+ {'DOWN', Ref, _, _, {new_group, NewGroup}} ->
+ NewGroup;
+ {'DOWN', Ref, _, _, Reason} ->
+ erlang:error({view_compaction_error, Reason});
+ {'$gen_cast', {Pid, new_group, NewGroup}} ->
+ NewGroup;
+ {'$gen_cast', {partial_update, _, _}} ->
+ get_new_group(Pid, Ref);
+ Else ->
+ erlang:error({view_compaction_error, Else})
end.
%% @spec compact_view(View, EmptyView, Retry, Acc) -> {CompactView, NewAcc}
@@ -271,6 +271,37 @@ handle_cast({partial_update, Pid, #group{sig=GroupSig}=NewGroup}, #group_state{u
handle_cast({partial_update, _, _}, State) ->
%% message from an old (probably pre-compaction) updater; ignore
{noreply, State};
+handle_cast({FromPid, new_group, #group{sig=GroupSig} = Group},
+ #group_state{db_name=DbName,
+ group=#group{sig=GroupSig},
+ updater_pid=UpPid,
+ ref_counter=RefCounter,
+ waiting_list=WaitList,
+ shutdown=Shutdown,
+ waiting_commit=WaitingCommit}=State) when UpPid == FromPid ->
+ if not WaitingCommit ->
+ erlang:send_after(1000, self(), delayed_commit);
+ true -> ok
+ end,
+ case reply_with_group(Group, WaitList, [], RefCounter) of
+ [] ->
+ case Shutdown of
+ true ->
+ {stop, normal, State};
+ false ->
+ {noreply, State#group_state{waiting_commit=true, waiting_list=[],
+ group=Group, updater_pid=nil}}
+ end;
+ StillWaiting ->
+ % we still have some waiters, reopen the database and reupdate the index
+ Owner = self(),
+ Pid = spawn_link(fun() -> couch_view_updater:update(Owner, Group, DbName) end),
+ {noreply, State#group_state{waiting_commit=true,
+ waiting_list=StillWaiting, updater_pid=Pid}}
+ end;
+handle_cast({_, new_group, _}, State) ->
+ %% message from an old (probably pre-compaction) updater; ignore
+ {noreply, State};
handle_cast(ddoc_updated, State) ->
#group_state{
db_name = DbName,
@@ -317,38 +348,6 @@ handle_info(delayed_commit, #group_state{db_name=DbName,group=Group}=State) ->
{noreply, State#group_state{waiting_commit=true}}
end;
-handle_info({'EXIT', FromPid, {new_group, #group{sig=GroupSig} = Group}},
- #group_state{db_name=DbName,
- group=#group{sig=GroupSig},
- updater_pid=UpPid,
- ref_counter=RefCounter,
- waiting_list=WaitList,
- shutdown=Shutdown,
- waiting_commit=WaitingCommit}=State) when UpPid == FromPid ->
- if not WaitingCommit ->
- erlang:send_after(1000, self(), delayed_commit);
- true -> ok
- end,
- case reply_with_group(Group, WaitList, [], RefCounter) of
- [] ->
- case Shutdown of
- true ->
- {stop, normal, State};
- false ->
- {noreply, State#group_state{waiting_commit=true, waiting_list=[],
- group=Group, updater_pid=nil}}
- end;
- StillWaiting ->
- % we still have some waiters, reopen the database and reupdate the index
- Owner = self(),
- Pid = spawn_link(fun() -> couch_view_updater:update(Owner, Group, DbName) end),
- {noreply, State#group_state{waiting_commit=true,
- waiting_list=StillWaiting, updater_pid=Pid}}
- end;
-handle_info({'EXIT', _, {new_group, _}}, State) ->
- %% message from an old (probably pre-compaction) updater; ignore
- {noreply, State};
-
handle_info({'EXIT', UpPid, reset},
#group_state{init_args=InitArgs, updater_pid=UpPid} = State) ->
case prepare_group(InitArgs, true) of
@@ -92,9 +92,11 @@ update(Owner, Group, #db{name = DbName} = Db) ->
end,
ok, []),
couch_work_queue:close(MapQueue),
- receive {new_group, NewGroup} ->
- exit({new_group,
- NewGroup#group{current_seq=couch_db:get_update_seq(Db)}})
+ receive {new_group, NewGroup0} ->
+ NewGroup = NewGroup0#group{
+ current_seq=couch_db:get_update_seq(Db)
+ },
+ gen_server:cast(Owner, {self(), new_group, NewGroup})
end.

0 comments on commit 53490d9

Please sign in to comment.