Skip to content

Commit

Permalink
fix fast elect send old vote issue
Browse files Browse the repository at this point in the history
  • Loading branch information
xinmingyao committed Jul 17, 2012
1 parent 183debc commit 7c700c3
Showing 1 changed file with 37 additions and 19 deletions.
56 changes: 37 additions & 19 deletions src/zabe_fast_elect.erl
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,15 @@
recv_votes::ets,
outof_election::ets,
wait_outof_timer::reference(),
vote_timer::reference(),
logical_clock::integer()
}).


-define(WAIT_TIMEOUT,500).
-define(PROPOSED,proposed).
-define(VOTE_TIMER,2800).




Expand Down Expand Up @@ -79,15 +81,15 @@ start_link(Election) ->
%% @end
%%--------------------------------------------------------------------
init([#election{logical_clock=LogicalClock,parent=ManagerName,last_zxid=LastZxid,ensemble=Ensemble,quorum=Quorum,last_commit_zxid=LastCommitZxid}]) ->
lager:debug("elect start"),
lager:info("elect start"),
% {Epoch,_TxnId}=LastZxid,
LogicalClock=1,
V=#vote{from=node(),leader=node(),zxid=LastZxid,
last_commit_zxid=LastCommitZxid,
epoch=LogicalClock,state=?LOOKING},

send_notifications(V,Ensemble,ManagerName),
{ok,TimeRef}=timer:apply_interval(3000,?MODULE,send_notifications,[V,Ensemble,ManagerName]),
gen_fsm:send_event_after(?VOTE_TIMER,send_notifications),
Recv=ets:new(list_to_atom(atom_to_list(ManagerName)++"_1"),[{keypos,2}]),
OutOf=ets:new(list_to_atom(atom_to_list(ManagerName)++"_2"),[{keypos,2}]),
put(?PROPOSED,V),
Expand All @@ -97,7 +99,6 @@ init([#election{logical_clock=LogicalClock,parent=ManagerName,last_zxid=LastZxid
quorum=Quorum,
recv_votes=Recv,
outof_election=OutOf,
vote_timer=TimeRef,
logical_clock=LogicalClock
}}.

Expand Down Expand Up @@ -134,17 +135,25 @@ looking(V=#vote{},State)->
{next_state,leading,State};
_->
{next_state,following,State}
end
end;
_:_ ->
{stop,"error"}
end
;

looking(send_notifications,State=#state{ensemble=Ensemble,manager_name=ManagerName})->
V=get(?PROPOSED),
send_notifications(V,Ensemble,ManagerName),
gen_fsm:send_event_after(?VOTE_TIMER,send_notifications),
{next_state, looking, State};
looking(_Event, State) ->
{next_state, looking, State}.


looking1(Vote=#vote{from=_From,leader=Leader,state=PeerState,epoch=PeerEpoch}, State=#state{manager_name=ManagerName,ensemble=Ensemble,vote_timer=VoteTimer}) ->
looking1(Vote=#vote{from=_From,leader=Leader,state=PeerState,epoch=PeerEpoch}, State=#state{manager_name=ManagerName,ensemble=Ensemble}) ->

P1=get(?PROPOSED),

Epoch=P1#vote.epoch,
case
PeerState of
Expand All @@ -171,12 +180,14 @@ looking1(Vote=#vote{from=_From,leader=Leader,state=PeerState,epoch=PeerEpoch}, S
case
total_order_predicate(Vote#vote.leader,Vote#vote.zxid,P1#vote.leader,P1#vote.zxid) of
true->

V1=P1#vote{leader=Vote#vote.leader,zxid=Vote#vote.zxid,epoch=PeerEpoch},

send_notifications(V1,Ensemble,ManagerName),

put(?PROPOSED,V1);
false->

ok

end
Expand All @@ -188,7 +199,7 @@ looking1(Vote=#vote{from=_From,leader=Leader,state=PeerState,epoch=PeerEpoch}, S
ReceiveAll=ets:info(RecvVote,size) == ordsets:size(Ensemble),
HaveQuorm=is_have_quorm(State#state.quorum,Vote,RecvVote),
if ReceiveAll ->
timer:cancel(VoteTimer),

notify_manager(ManagerName,ets:tab2list(RecvVote)),
throw(finish);
HaveQuorm->
Expand All @@ -211,7 +222,7 @@ looking1(Vote=#vote{from=_From,leader=Leader,state=PeerState,epoch=PeerEpoch}, S
CheckLeader=check_leader(R2,Vote#vote.leader,node()),
if Vote#vote.state ==?LEADING orelse (CheckLeader andalso HaveQuorm)->
put(?PROPOSED,Vote#vote{from=node()}),
timer:cancel(VoteTimer),

notify_manager(ManagerName,ets:tab2list(State#state.recv_votes)),
throw(finish);
true ->
Expand All @@ -227,7 +238,7 @@ looking1(Vote=#vote{from=_From,leader=Leader,state=PeerState,epoch=PeerEpoch}, S
% V1=P1#vote{epoch=PeerEpoch},
% put(?PROPOSED,V1),
put(?PROPOSED,Vote#vote{from=node()}),
timer:cancel(VoteTimer),

notify_manager(ManagerName,ets:tab2list(State#state.recv_votes)),
ets:delete_all_objects(State#state.recv_votes),%??
throw(finish)
Expand All @@ -240,8 +251,8 @@ looking1(Vote=#vote{from=_From,leader=Leader,state=PeerState,epoch=PeerEpoch}, S
end
.

wait_outof_election({timeout,_,wait_timeout},State=#state{manager_name=M,vote_timer=VoteTimer})->
timer:cancel(VoteTimer),
wait_outof_election({timeout,_,wait_timeout},State=#state{manager_name=M})->

notify_manager(M,ets:tab2list(State#state.recv_votes)),
V=get(?PROPOSED),
case V#vote.leader of
Expand All @@ -263,7 +274,13 @@ wait_outof_election(Vote=#vote{leader=_Leader},State) ->
%TimeRef=gen_fsm:start_timer(?WAIT_TIMEOUT,wait_timeout),
% {next_state,wait_outof_election, State}
% end.
.
;
wait_outof_election(send_notifications,State=#state{ensemble=Ensemble,manager_name=ManagerName})->
V=get(?PROPOSED),
send_notifications(V,Ensemble,ManagerName),
gen_fsm:send_event_after(?VOTE_TIMER,send_notifications),
{next_state, wait_outof_election, State}.

leading(#vote{from=From,leader=_Leader,state=?LOOKING},State)->
V=get(?PROPOSED),
Msg=#msg{cmd=?VOTE_CMD,value=V#vote{state=?LEADING}},
Expand All @@ -278,13 +295,14 @@ leading({re_elect,NewZxid,NewLastCommitZxid},State=#state{recv_votes=Recv,outof_
epoch=L2,state=?LOOKING},

send_notifications(V,Ensemble,ManagerName),
{ok,TimeRef}=timer:apply_interval(3000,?MODULE,send_notifications,[V,Ensemble,ManagerName]),
gen_fsm:send_event_after(?VOTE_TIMER,send_notifications),

ets:delete_all_objects(Recv),
ets:delete_all_objects(Outof),
put(?PROPOSED,V),
{next_state, looking, State#state{
logical_clock=LogicalClock,
vote_timer=TimeRef}
logical_clock=LogicalClock
}
};

leading(_,State) ->
Expand All @@ -305,13 +323,13 @@ following({re_elect,NewZxid,NewLastCommitZxid},State=#state{recv_votes=Recv,outo
epoch=L2,state=?LOOKING},

send_notifications(V,Ensemble,ManagerName),
{ok,TimeRef}=timer:apply_interval(3000,?MODULE,send_notifications,[V,Ensemble,ManagerName]),
gen_fsm:send_event_after(?VOTE_TIMER,send_notifications),
ets:delete_all_objects(Recv),
ets:delete_all_objects(Outof),
put(?PROPOSED,V),
{next_state, looking, State#state{
logical_clock=LogicalClock,
vote_timer=TimeRef}
logical_clock=LogicalClock
}
};
following(_,State) ->
%flush msg
Expand Down Expand Up @@ -439,7 +457,7 @@ code_change(_OldVsn, StateName, State, _Extra) ->
%%%===================================================================

send_notifications(V=#vote{},Ensemble,ManagerName)->
lager:debug("broadcast send ~p ~p",[V,ManagerName]),
lager:debug("broadcast send ~p ~p ~p ",[V,ManagerName, self()]),
Msg=#msg{cmd=?VOTE_CMD,value=V},
lists:map(fun(N)->
catch erlang:send({ManagerName,N},Msg) end ,Ensemble).
Expand Down

0 comments on commit 7c700c3

Please sign in to comment.