Skip to content
This repository has been archived by the owner on Oct 17, 2018. It is now read-only.

Commit

Permalink
Resign from ongoing campaign when election manager is closed
Browse files Browse the repository at this point in the history
  • Loading branch information
xichen2020 committed Dec 4, 2017
1 parent 89cc40a commit 5b2fbdc
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 8 deletions.
7 changes: 4 additions & 3 deletions aggregator/election_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -710,9 +710,10 @@ func (mgr *electionManager) campaignLoop(campaignStateWatch watch.Watch) {
}); err == nil {
atomic.StoreInt32(&mgr.campaigning, 1)
} else {
// If we get here, either the manager is closed or the campaign is disabled.
// If the manager is closed, we return immediately. Otherwise we wait for a
// change in the campaign enabled status before continuing campaigning.
// If we get here, the campaign failed and either the manager is closed or
// the campaign is disabled. If the manager is closed, we return immediately.
// Otherwise we wait for a change in the campaign enabled status before continuing
// campaigning.
select {
case <-mgr.doneCh:
return
Expand Down
28 changes: 23 additions & 5 deletions aggregator/election_mgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ func TestElectionManagerResignSuccess(t *testing.T) {
defer cancel()

var (
statusCh = make(chan campaign.Status)
statusCh = make(chan campaign.Status, 2)
mgr *electionManager
)

Expand All @@ -259,7 +259,6 @@ func TestElectionManagerResignSuccess(t *testing.T) {
},
resignFn: func(electionID string) error {
statusCh <- campaign.Status{State: campaign.Follower}
close(statusCh)
return nil
},
}
Expand Down Expand Up @@ -297,6 +296,7 @@ func TestElectionManagerCloseSuccess(t *testing.T) {

func TestElectionManagerCampaignLoop(t *testing.T) {
iter := 0
var resigned int32
leaderValue := "myself"
campaignCh := make(chan campaign.Status)
nextCampaignCh := make(chan campaign.Status)
Expand All @@ -315,8 +315,7 @@ func TestElectionManagerCampaignLoop(t *testing.T) {
return "someone else", nil
},
resignFn: func(electionID string) error {
campaignCh <- campaign.NewStatus(campaign.Follower)
close(campaignCh)
atomic.StoreInt32(&resigned, 1)
return nil
},
}
Expand Down Expand Up @@ -400,6 +399,13 @@ func TestElectionManagerCampaignLoop(t *testing.T) {
nextCampaignCh <- campaign.NewStatus(campaign.Leader)

require.NoError(t, mgr.Close())

for {
if atomic.LoadInt32(&resigned) == 1 {
break
}
time.Sleep(10 * time.Millisecond)
}
}

func TestElectionManagerVerifyLeaderDelay(t *testing.T) {
Expand Down Expand Up @@ -870,9 +876,21 @@ func testElectionManagerOptions(t *testing.T) ElectionManagerOptions {
}), nil
},
}
leaderService := &mockLeaderService{
campaignFn: func(
electionID string,
opts services.CampaignOptions,
) (<-chan campaign.Status, error) {
return make(chan campaign.Status), nil
},
resignFn: func(electionID string) error {
return nil
},
}
return NewElectionManagerOptions().
SetCampaignOptions(campaignOpts).
SetPlacementManager(placementManager)
SetPlacementManager(placementManager).
SetLeaderService(leaderService)
}

type enabledRes struct {
Expand Down

0 comments on commit 5b2fbdc

Please sign in to comment.