Skip to content

Commit

Permalink
More tweaks. Removed handoff tuple and now have a modindex and separa…
Browse files Browse the repository at this point in the history
…te node field in the handoff_status. send_outbound kills existing handoff (if different) before starting up.
  • Loading branch information
massung committed Dec 22, 2011
1 parent 422c163 commit 742347e
Showing 1 changed file with 51 additions and 51 deletions.
102 changes: 51 additions & 51 deletions src/riak_core_handoff_manager.erl
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@

-type mod() :: atom().
-type index() :: integer().
-type node_() :: atom().

-record(handoff_status,
{ handoff :: {mod(),index(),node_()},
{ modindex :: {mod(),index()},
node :: atom(),
direction :: inbound | outbound,
transport_pid :: pid(),
timestamp :: tuple(),
Expand Down Expand Up @@ -102,8 +102,7 @@ handle_call({get_exclusions, Module}, _From, State=#state{excl=Excl}) ->
Reply = [I || {M, I} <- ordsets:to_list(Excl), M =:= Module],
{reply, {ok, Reply}, State};
handle_call({add_outbound,Mod,Idx,Node,Pid},_From,State=#state{handoffs=HS}) ->
ShouldStart=enqueue_handoff(Mod,Idx,Node,Pid,HS),
case ShouldStart andalso send_handoff(Mod,Idx,Node,Pid) of
case send_handoff(Mod,Idx,Node,Pid,HS) of
{ok,Handoff=#handoff_status{transport_pid=Sender}} ->
{reply,{ok,Sender},State#state{handoffs=HS ++ [Handoff]}};
Error ->
Expand All @@ -117,7 +116,8 @@ handle_call({add_inbound,SSLOpts},_From,State=#state{handoffs=HS}) ->
{reply,Error,State}
end;
handle_call(handoff_status,_From,State=#state{handoffs=HS}) ->
Handoffs=[{H,D,active,S} || #handoff_status{ handoff=H,direction=D,status=S } <- HS],
Handoffs=[{{M,I,N},D,active,S} ||
#handoff_status{modindex={M,I},node=N,direction=D,status=S} <- HS],
{reply, {ok, Handoffs}, State};
handle_call({set_concurrency,Limit},_From,State=#state{handoffs=HS}) ->
application:set_env(riak_core,handoff_concurrency,Limit),
Expand Down Expand Up @@ -146,25 +146,28 @@ handle_cast({add_exclusion, {Mod, Idx}}, State=#state{excl=Excl}) ->

handle_info({'DOWN',_Ref,process,Pid,Reason},State=#state{handoffs=HS}) ->
case lists:keytake(Pid,#handoff_status.transport_pid,HS) of
{value,H=#handoff_status{handoff={Mod,Index,_},direction=Dir},NewHS} ->
{value,
#handoff_status{modindex={M,I},direction=Dir,vnode_pid=Vnode},
NewHS
} ->
WarnVnode =
case Reason of
%% if the reason the handoff process died was anything other
%% than 'normal' we should log the reason why as an error
normal ->
false;
max_concurrency ->
lager:info("An ~w handoff of partition ~w ~w was terminated for reason: ~w~n", [Dir,Mod,Index,Reason]),
lager:info("An ~w handoff of partition ~w ~w was terminated for reason: ~w~n", [Dir,M,I,Reason]),
true;
_ ->
lager:error("An ~w handoff of partition ~w ~w was terminated for reason: ~w~n", [Dir,Mod,Index,Reason]),
lager:error("An ~w handoff of partition ~w ~w was terminated for reason: ~w~n", [Dir,M,I,Reason]),
true
end,

%% if we have the vnode process pid, tell the vnode why the
%% handoff stopped so it can clean up its state
case WarnVnode andalso H#handoff_status.vnode_pid of
Vnode when is_pid(Vnode) ->
case WarnVnode andalso is_pid(Vnode) of
true ->
riak_core_vnode:handoff_error(Vnode,'DOWN',Reason);
_ ->
ok
Expand Down Expand Up @@ -201,52 +204,48 @@ handoff_concurrency_limit_reached () ->
ActiveSenders=proplists:get_value(active,Senders),
get_concurrency_limit() =< (ActiveReceivers + ActiveSenders).

%% Decide whether a new outbound handoff for {Mod,Idx} should be started.
%%
%% Folds over the tracked handoffs starting from 'true':
%%   * inbound entries, and outbound entries for another module/index,
%%     leave the decision untouched;
%%   * an outbound entry for the same module/index with the same target
%%     node and the same vnode pid turns the decision into 'false';
%%   * an outbound entry for the same module/index with a different target
%%     or vnode has its transport process exited with reason
%%     'resubmit_handoff_change' and turns the decision into 'true'.
%% The last entry for {Mod,Idx} in the list determines the result.
enqueue_handoff(Mod,Idx,Node,VnodePid,Handoffs) ->
    Decide =
        fun (#handoff_status{direction=inbound},Acc) ->
                Acc;
            (#handoff_status{handoff={M,I,N},vnode_pid=Pid},_Acc)
              when M == Mod, I == Idx, N == Node, Pid == VnodePid ->
                %% identical handoff is already running, keep it
                false;
            (#handoff_status{handoff={M,I,_},transport_pid=Sender},_Acc)
              when M == Mod, I == Idx ->
                %% same partition but a different target or vnode:
                %% kill the old one
                erlang:exit(Sender,resubmit_handoff_change),
                true;
            (#handoff_status{handoff={_,_,_}},Acc) ->
                %% a different module or index, nothing to decide here
                Acc
        end,
    lists:foldl(Decide,true,Handoffs).

%% spawn a sender process
%%
%% NOTE(review): this span shows two function heads (a 4-argument and a
%% 5-argument one) and two sender start-up sequences back to back; it looks
%% like the pre- and post-change versions rendered together -- confirm
%% against the actual module before relying on it.
%%
%% For the 5-argument form (Mod,Idx,Node,Vnode,HS):
%%   * returns {error, max_concurrency} when
%%     handoff_concurrency_limit_reached() is true;
%%   * otherwise looks up {Mod,Idx} in HS by the modindex field:
%%       - no entry: a new sender is started;
%%       - an entry with the same node and vnode pid: that entry is
%%         returned as {ok, CurrentHandoff} and nothing new is started;
%%       - any other entry: its transport process is exited with reason
%%         resubmit_handoff_change and a new sender is started;
%%   * a newly started sender is monitored and returned as
%%     {ok, #handoff_status{}} with direction=outbound.
send_handoff (Module,Index,TargetNode,VnodePid) ->
send_handoff (Mod,Idx,Node,Vnode,HS) ->
case handoff_concurrency_limit_reached() of
true ->
{error, max_concurrency};
false ->
{ok,Pid}=riak_core_handoff_sender_sup:start_sender(TargetNode,
Module,
Index,
VnodePid),
erlang:monitor(process,Pid),
ShouldHandoff=
case lists:keyfind({Mod,Idx},#handoff_status.modindex,HS) of
false ->
true;
%% Node and Vnode are already bound by the function head, so this
%% clause only matches an entry with the same target node and the
%% same vnode pid
Handoff=#handoff_status{node=Node,vnode_pid=Vnode} ->
{false,Handoff};
#handoff_status{transport_pid=Sender} ->
%% found a running handoff with a different vnode
%% source or a different target node, kill the current
%% one and the new one will start up
erlang:exit(Sender,resubmit_handoff_change),
true
end,

%% successfully started up a new sender handoff
{ok, #handoff_status{ transport_pid=Pid,
direction=outbound,
timestamp=now(),
handoff={Module,Index,TargetNode},
vnode_pid=VnodePid
}
}
case ShouldHandoff of
true ->
%% start the sender process
{ok,Pid}=riak_core_handoff_sender_sup:start_sender(Node,
Mod,
Idx,
Vnode),
%% monitor the sender process started above
erlang:monitor(process,Pid),

%% successfully started up a new sender handoff
{ok, #handoff_status{ transport_pid=Pid,
direction=outbound,
timestamp=now(),
node=Node,
modindex={Mod,Idx},
vnode_pid=Vnode
}
};
{false,CurrentHandoff} ->
%% handoff already going, just return it
{ok, CurrentHandoff}
end
end.

%% spawn a receiver process
Expand All @@ -262,7 +261,8 @@ receive_handoff (SSLOpts) ->
{ok, #handoff_status{ transport_pid=Pid,
direction=inbound,
timestamp=now(),
handoff={undefined,undefined,undefined}
modindex={undefined,undefined},
node=undefined
}
}
end.
Expand Down

0 comments on commit 742347e

Please sign in to comment.