Skip to content
This repository has been archived by the owner on Oct 17, 2018. It is now read-only.

Commit

Permalink
use placement manager to look up shard set id
Browse files Browse the repository at this point in the history
  • Loading branch information
Chao Wang committed Aug 29, 2018
1 parent fe3f4a0 commit 0c04ddc
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 17 deletions.
18 changes: 12 additions & 6 deletions aggregator/election_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ type electionManagerMetrics struct {
verifyCampaignDisabled tally.Counter
verifyPendingChangeStale tally.Counter
verifyPlacementErrors tally.Counter
verifyInstanceErrors tally.Counter
verifyLeaderNotInPlacement tally.Counter
followerResign tally.Counter
resignTimeout tally.Counter
Expand Down Expand Up @@ -219,6 +220,7 @@ func newElectionManagerMetrics(scope tally.Scope) electionManagerMetrics {
verifyCampaignDisabled: verifyScope.Counter("campaign-disabled"),
verifyPendingChangeStale: verifyScope.Counter("pending-change-stale"),
verifyPlacementErrors: verifyScope.Counter("placement-errors"),
verifyInstanceErrors: verifyScope.Counter("instance-errors"),
verifyLeaderNotInPlacement: verifyScope.Counter("leader-not-in-placement"),
followerResign: resignScope.Counter("follower-resign"),
resignTimeout: resignScope.Counter("timeout"),
Expand Down Expand Up @@ -265,7 +267,6 @@ type electionManager struct {
doneCh chan struct{}
campaigning int32
campaignStateWatchable watch.Watchable
shardSetID uint32
electionKey string
electionStateWatchable watch.Watchable
nextGoalStateID int64
Expand Down Expand Up @@ -334,7 +335,6 @@ func (mgr *electionManager) Open(shardSetID uint32) error {
if mgr.state != electionManagerNotOpen {
return errElectionManagerAlreadyOpenOrClosed
}
mgr.shardSetID = shardSetID
mgr.electionKey = fmt.Sprintf(mgr.electionKeyFmt, shardSetID)
_, stateChangeWatch, err := mgr.goalStateWatchable.Watch()
if err != nil {
Expand Down Expand Up @@ -516,16 +516,22 @@ func (mgr *electionManager) verifyPendingFollower(watch watch.Watch) {
mgr.logError("error getting placement", err)
return err
}
instance, exist := p.Instance(leader)
leaderInstance, exist := p.Instance(leader)
if !exist {
mgr.metrics.verifyLeaderNotInPlacement.Inc(1)
err := fmt.Errorf("received invalid leader value: [%s], which is not available in placement", leader)
mgr.logError("invalid leader value", err)
return err
}
ssID := instance.ShardSetID()
if ssID != mgr.shardSetID {
err := fmt.Errorf("received invalid leader value: [%s] which owns shardSet %v, while this aggregator owns shardSet %v", leader, ssID, mgr.shardSetID)
instance, err := mgr.placementManager.Instance()
if err != nil {
mgr.metrics.verifyInstanceErrors.Inc(1)
mgr.logError("error getting instance", err)
return err
}
if leaderInstance.ShardSetID() != instance.ShardSetID() {
err := fmt.Errorf("received invalid leader value: [%s] which owns shardSet %v, while this aggregator owns shardSet %v",
leader, leaderInstance.ShardSetID(), instance.ShardSetID())
mgr.logError("invalid leader value", err)
return err
}
Expand Down
47 changes: 36 additions & 11 deletions aggregator/election_mgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,9 +301,13 @@ func TestElectionManagerResignSuccess(t *testing.T) {
opts := testElectionManagerOptions(t, ctrl).
SetCampaignOptions(campaignOpts).
SetLeaderService(leaderService)
i := placement.NewInstance().SetID("myself")
opts.PlacementManager().(*MockPlacementManager).
EXPECT().
Instance().
Return(i, nil)
p := placement.NewPlacement().SetInstances([]placement.Instance{
placement.NewInstance().SetID("myself").SetShardSetID(testShardSetID),
placement.NewInstance().SetID("someone else").SetShardSetID(testShardSetID),
i, placement.NewInstance().SetID("someone else"),
})
opts.PlacementManager().(*MockPlacementManager).
EXPECT().
Expand Down Expand Up @@ -377,9 +381,14 @@ func TestElectionManagerCampaignLoop(t *testing.T) {
opts := testElectionManagerOptions(t, ctrl).
SetCampaignOptions(campaignOpts).
SetLeaderService(leaderService)
i := placement.NewInstance().SetID("myself")
opts.PlacementManager().(*MockPlacementManager).
EXPECT().
Instance().
Return(i, nil).
AnyTimes()
p := placement.NewPlacement().SetInstances([]placement.Instance{
placement.NewInstance().SetID("myself").SetShardSetID(testShardSetID),
placement.NewInstance().SetID("someone else").SetShardSetID(testShardSetID),
i, placement.NewInstance().SetID("someone else"),
})
opts.PlacementManager().(*MockPlacementManager).
EXPECT().
Expand Down Expand Up @@ -493,14 +502,20 @@ func TestElectionManagerVerifyLeaderDelayWithValidLeader(t *testing.T) {
opts := testElectionManagerOptions(t, ctrl).
SetCampaignOptions(campaignOpts).
SetLeaderService(leaderService)
i := placement.NewInstance().SetID("myself")
opts.PlacementManager().(*MockPlacementManager).
EXPECT().
Instance().
Return(i, nil).
AnyTimes()
p := placement.NewPlacement().SetInstances([]placement.Instance{
placement.NewInstance().SetID("myself"),
placement.NewInstance().SetID("someone else"),
i, placement.NewInstance().SetID("someone else"),
})
opts.PlacementManager().(*MockPlacementManager).
EXPECT().
Placement().
Return(nil, p, nil)
Return(nil, p, nil).
AnyTimes()
mgr := NewElectionManager(opts).(*electionManager)
retryOpts := retry.NewOptions().
SetInitialBackoff(10 * time.Millisecond).
Expand Down Expand Up @@ -611,9 +626,14 @@ func TestElectionManagerVerifyLeaderDelayWithLeaderOwningDifferentShardSet(t *te
opts := testElectionManagerOptions(t, ctrl).
SetCampaignOptions(campaignOpts).
SetLeaderService(leaderService)
i := placement.NewInstance().SetID("myself")
opts.PlacementManager().(*MockPlacementManager).
EXPECT().
Instance().
Return(i, nil).
AnyTimes()
p := placement.NewPlacement().SetInstances([]placement.Instance{
placement.NewInstance().SetID("myself"),
placement.NewInstance().SetID("someone else").SetShardSetID(100),
i, placement.NewInstance().SetID("someone else").SetShardSetID(100),
})
opts.PlacementManager().(*MockPlacementManager).
EXPECT().
Expand Down Expand Up @@ -673,9 +693,14 @@ func TestElectionManagerVerifyWithLeaderErrors(t *testing.T) {
opts := testElectionManagerOptions(t, ctrl).
SetCampaignOptions(campaignOpts).
SetLeaderService(leaderService)
i := placement.NewInstance().SetID("myself")
opts.PlacementManager().(*MockPlacementManager).
EXPECT().
Instance().
Return(i, nil).
AnyTimes()
p := placement.NewPlacement().SetInstances([]placement.Instance{
placement.NewInstance().SetID("myself"),
placement.NewInstance().SetID("someone else"),
i, placement.NewInstance().SetID("someone else"),
})
opts.PlacementManager().(*MockPlacementManager).
EXPECT().
Expand Down

0 comments on commit 0c04ddc

Please sign in to comment.