diff --git a/state/enableha.go b/state/enableha.go index 77a66ae7153..9cf3555cee5 100644 --- a/state/enableha.go +++ b/state/enableha.go @@ -467,7 +467,7 @@ func (st *State) HAPrimaryMachine() (names.MachineTag, error) { } for _, m := range ms { if m.Id == nodeID { - if machineID, k := m.Tags["juju-machine-id"]; k { + if machineID, ok := m.Tags["juju-machine-id"]; ok { return names.NewMachineTag(machineID), nil } } diff --git a/worker/peergrouper/desired.go b/worker/peergrouper/desired.go index 7079859a298..4e8e417b87e 100644 --- a/worker/peergrouper/desired.go +++ b/worker/peergrouper/desired.go @@ -36,6 +36,7 @@ type peerGroupInfo struct { // Replica-set member statuses sourced from the Mongo session. statuses map[string]replicaset.MemberStatus + toRemove []replicaset.Member extra []replicaset.Member maxMemberId int mongoPort int @@ -97,34 +98,63 @@ func newPeerGroupInfo( } // Iterate over the input members and associate them with a controller if - // possible; add any unassociated members to the "extra" slice. + // possible; add any non-juju unassociated members to the "extra" slice. + // Unassociated members with the juju machine id tag are to be removed. // Link the statuses with the controller node IDs where associated. // Keep track of the highest member ID that we observe. for _, m := range members { + if m.Id > info.maxMemberId { + info.maxMemberId = m.Id + } + + controllerId, ok := m.Tags[jujuNodeKey] + if !ok { + info.extra = append(info.extra, m) + continue + } found := false - if id, ok := m.Tags[jujuNodeKey]; ok { - if controllers[id] != nil { - info.recognised[id] = m - found = true - } + if controllers[controllerId] != nil { + info.recognised[controllerId] = m + found = true + } - // This invariably makes for N^2, but we anticipate small N. - for _, sts := range statuses { - if sts.Id == m.Id { - info.statuses[id] = sts - } + // This invariably makes for N^2, but we anticipate small N. + for _, sts := range statuses { + if sts.Id == m.Id { + info.statuses[controllerId] = sts } } if !found { - info.extra = append(info.extra, m) + info.toRemove = append(info.toRemove, m) } + } - if m.Id > info.maxMemberId { - info.maxMemberId = m.Id + return &info, nil +} + +// isPrimary returns true if the given controller node id is the mongo primary. +func (info *peerGroupInfo) isPrimary(workerControllerId string) (bool, error) { + primaryNodeId := -1 + // Current status of replicaset contains node state. + // Here we determine node id of the primary node. 
+ for _, m := range info.statuses { + if m.State == replicaset.PrimaryState { + primaryNodeId = m.Id + break } } + if primaryNodeId == -1 { + return false, errors.NotFoundf("HA primary machine") + } - return &info, nil + for _, m := range info.recognised { + if m.Id == primaryNodeId { + if primaryControllerId, ok := m.Tags[jujuNodeKey]; ok { + return primaryControllerId == workerControllerId, nil + } + } + } + return false, errors.NotFoundf("HA primary machine") } // getLogMessage generates a nicely formatted log message from the known peer @@ -134,7 +164,7 @@ func (info *peerGroupInfo) getLogMessage() string { fmt.Sprintf("calculating desired peer group\ndesired voting members: (maxId: %d)", info.maxMemberId), } - template := "\n %#v: rs_id=%d, rs_addr=%s" + template := "\n %#v: rs_id=%d, rs_addr=%s, rs_primary=%v" ids := make([]string, 0, len(info.recognised)) for id := range info.recognised { ids = append(ids, id) @@ -142,12 +172,21 @@ func (info *peerGroupInfo) getLogMessage() string { sortAsInts(ids) for _, id := range ids { rm := info.recognised[id] - lines = append(lines, fmt.Sprintf(template, info.controllers[id], rm.Id, rm.Address)) + isPrimary := isPrimaryMember(info, id) + lines = append(lines, fmt.Sprintf(template, info.controllers[id], rm.Id, rm.Address, isPrimary)) } - if len(info.extra) > 0 { - lines = append(lines, "\nother members:") + if len(info.toRemove) > 0 { + lines = append(lines, "\nmembers to remove:") + template := "\n rs_id=%d, rs_addr=%s, tags=%v, vote=%t" + for _, em := range info.toRemove { + vote := em.Votes != nil && *em.Votes > 0 + lines = append(lines, fmt.Sprintf(template, em.Id, em.Address, em.Tags, vote)) + } + } + if len(info.extra) > 0 { + lines = append(lines, "\nother non-juju members:") template := "\n rs_id=%d, rs_addr=%s, tags=%v, vote=%t" for _, em := range info.extra { vote := em.Votes != nil && *em.Votes > 0 @@ -243,19 +282,23 @@ func (p *peerGroupChanges) checkExtraMembers() error { // Given that Juju is in control of the replicaset we don't really just 'accept' that some other node has a vote. // *maybe* we could allow non-voting members that would be used by 3rd parties to provide a warm database backup. // But I think the right answer is probably to downgrade unknown members from voting. + // Note: (wallyworld) notwithstanding the above, each controller runs its own peer grouper worker. The + // mongo primary will remove nodes as needed from the replicaset. There will be a short time where + // Juju managed nodes will not yet be accounted for by the other secondary workers. These are accounted + // for in the 'toRemove' list. for _, member := range p.info.extra { if isVotingMember(&member) { - return fmt.Errorf("non voting member %v found in peer group", member) + return fmt.Errorf("non juju voting member %v found in peer group", member) } } - if len(p.info.extra) > 0 { + if len(p.info.toRemove) > 0 || len(p.info.extra) > 0 { p.desired.isChanged = true } return nil } // sortAsInts converts all the vals to an integer to sort them as numbers instead of strings -// If any of the values are not valid integers, they will be sorted as stirngs, and added to the end +// If any of the values are not valid integers, they will be sorted as strings, and added to the end // the slice will be sorted in place. // (generally this should only be used for strings we expect to represent ints, but we don't want to error if // something isn't an int.) 
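Aside — a minimal, self-contained Go sketch (not part of the patch) of the classification performed by newPeerGroupInfo above and of the primary lookup in isPrimary: members carrying the juju node tag but matching no known controller go to toRemove, untagged members go to extra, and the primary is resolved by mapping the primary replica-set id from the statuses back to a recognised member's tag. The member/memberStatus types below are simplified stand-ins for the replicaset package types, assumed purely for illustration.

package main

import "fmt"

type member struct {
	id   int
	tags map[string]string
}

type memberStatus struct {
	id      int
	primary bool
}

const jujuNodeKey = "juju-machine-id"

func main() {
	// Known controller ids (stand-in for the controllers map passed to newPeerGroupInfo).
	controllers := map[string]bool{"11": true, "12": true}
	members := []member{
		{id: 1, tags: map[string]string{jujuNodeKey: "11"}}, // recognised
		{id: 2, tags: map[string]string{jujuNodeKey: "99"}}, // juju-tagged but unknown -> toRemove
		{id: 3, tags: nil},                                  // no juju tag -> extra
	}
	statuses := []memberStatus{{id: 1, primary: true}, {id: 2}, {id: 3}}

	recognised := map[string]member{}
	var toRemove, extra []member
	for _, m := range members {
		id, ok := m.tags[jujuNodeKey]
		if !ok {
			extra = append(extra, m)
			continue
		}
		if controllers[id] {
			recognised[id] = m
		} else {
			toRemove = append(toRemove, m)
		}
	}

	// Resolve the primary: find the primary member id in the statuses,
	// then map it back to a recognised member's juju node tag.
	primaryID := -1
	for _, s := range statuses {
		if s.primary {
			primaryID = s.id
			break
		}
	}
	for id, m := range recognised {
		if m.id == primaryID {
			fmt.Printf("controller %s is the mongo primary\n", id)
		}
	}
	fmt.Printf("recognised=%d toRemove=%d extra=%d\n", len(recognised), len(toRemove), len(extra))
}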
diff --git a/worker/peergrouper/desired_test.go b/worker/peergrouper/desired_test.go index 8331171f11a..46fdc551a04 100644 --- a/worker/peergrouper/desired_test.go +++ b/worker/peergrouper/desired_test.go @@ -128,14 +128,14 @@ func desiredPeerGroupTests(ipVersion TestIPVersion) []desiredPeerGroupTest { }, { about: "extra member with nil Vote", machines: mkMachines("11v", ipVersion), - members: mkMembers("1v 2v", ipVersion), + members: mkMembers("1vT 2v", ipVersion), statuses: mkStatuses("1p 2s", ipVersion), expectVoting: []bool{true}, - expectErr: "non voting member.* found in peer group", + expectErr: "non juju voting member.* found in peer group", }, { about: "extra member with >1 votes", machines: mkMachines("11v", ipVersion), - members: append(mkMembers("1v", ipVersion), replicaset.Member{ + members: append(mkMembers("1vT", ipVersion), replicaset.Member{ Id: 2, Votes: newInt(2), Address: net.JoinHostPort( @@ -145,7 +145,7 @@ func desiredPeerGroupTests(ipVersion TestIPVersion) []desiredPeerGroupTest { }), statuses: mkStatuses("1p 2s", ipVersion), expectVoting: []bool{true}, - expectErr: "non voting member.* found in peer group", + expectErr: "non juju voting member.* found in peer group", }, { about: "one controller has become ready to vote (no change)", machines: mkMachines("11v 12v", ipVersion), @@ -410,6 +410,25 @@ func (s *desiredPeerGroupSuite) doTestDesiredPeerGroup(c *gc.C, ipVersion TestIP } } +func (s *desiredPeerGroupSuite) TestIsPrimary(c *gc.C) { + machines := mkMachines("11v 12v 13v", testIPv4) + trackerMap := make(map[string]*controllerTracker) + for _, m := range machines { + c.Assert(trackerMap[m.Id()], gc.IsNil) + trackerMap[m.Id()] = m + } + members := mkMembers("1v 2v 3v", testIPv4) + statuses := mkStatuses("1p 2s 3s", testIPv4) + info, err := newPeerGroupInfo(trackerMap, statuses, members, mongoPort, network.SpaceInfo{}) + c.Assert(err, jc.ErrorIsNil) + isPrimary, err := info.isPrimary("11") + c.Assert(err, jc.ErrorIsNil) + c.Assert(isPrimary, jc.IsTrue) + isPrimary, err = info.isPrimary("12") + c.Assert(err, jc.ErrorIsNil) + c.Assert(isPrimary, jc.IsFalse) +} + func (s *desiredPeerGroupSuite) TestNewPeerGroupInfoErrWhenNoMembers(c *gc.C) { _, err := newPeerGroupInfo(nil, nil, nil, 666, network.SpaceInfo{}) c.Check(err, gc.ErrorMatches, "current member set is empty") @@ -421,7 +440,7 @@ func (s *desiredPeerGroupSuite) TestCheckExtraMembersReturnsErrorWhenVoterFound( info: &peerGroupInfo{extra: []replicaset.Member{{Votes: &v}}}, } err := peerChanges.checkExtraMembers() - c.Check(err, gc.ErrorMatches, "non voting member .+ found in peer group") + c.Check(err, gc.ErrorMatches, "non juju voting member .+ found in peer group") } func (s *desiredPeerGroupSuite) TestCheckExtraMembersReturnsTrueWhenCheckMade(c *gc.C) { @@ -434,6 +453,15 @@ func (s *desiredPeerGroupSuite) TestCheckExtraMembersReturnsTrueWhenCheckMade(c c.Check(err, jc.ErrorIsNil) } +func (s *desiredPeerGroupSuite) TestCheckToRemoveMembersReturnsTrueWhenCheckMade(c *gc.C) { + peerChanges := peerGroupChanges{ + info: &peerGroupInfo{toRemove: []replicaset.Member{{}}}, + } + err := peerChanges.checkExtraMembers() + c.Check(peerChanges.desired.isChanged, jc.IsTrue) + c.Check(err, jc.ErrorIsNil) +} + func (s *desiredPeerGroupSuite) TestCheckExtraMembersReturnsFalseWhenEmpty(c *gc.C) { peerChanges := peerGroupChanges{ info: &peerGroupInfo{}, diff --git a/worker/peergrouper/manifold.go b/worker/peergrouper/manifold.go index 1b5684accc1..c8147af556b 100644 --- a/worker/peergrouper/manifold.go +++ 
b/worker/peergrouper/manifold.go
@@ -122,6 +122,9 @@ func (config ManifoldConfig) start(context dependency.Context) (worker.Worker, e
 ControllerAPIPort: stateServingInfo.ControllerAPIPort,
 SupportsHA: supportsHA,
 PrometheusRegisterer: config.PrometheusRegisterer,
+ // On machine models, the controller id is the same as the machine/agent id.
+ // TODO(wallyworld) - revisit when we add HA to k8s.
+ ControllerId: agentConfig.Tag().Id,
 })
 if err != nil {
 _ = stTracker.Done()
diff --git a/worker/peergrouper/manifold_test.go b/worker/peergrouper/manifold_test.go
index e8062d0a982..9a3fa5d5b39 100644
--- a/worker/peergrouper/manifold_test.go
+++ b/worker/peergrouper/manifold_test.go
@@ -8,6 +8,7 @@ import (
 "github.com/juju/clock/testclock"
 "github.com/juju/errors"
+ "github.com/juju/names/v4"
 "github.com/juju/testing"
 jc "github.com/juju/testing/checkers"
 "github.com/juju/worker/v3"
@@ -116,6 +117,8 @@ func (s *ManifoldSuite) TestStart(c *gc.C) {
 c.Assert(args[0], gc.FitsTypeOf, peergrouper.Config{})
 config := args[0].(peergrouper.Config)
+ c.Assert(config.ControllerId(), gc.Equals, "10")
+ config.ControllerId = nil
 c.Assert(config, jc.DeepEquals, peergrouper.Config{
 State: peergrouper.StateShim{s.State},
 MongoSession: peergrouper.MongoSessionShim{s.State.MongoSession()},
@@ -182,6 +185,10 @@ type mockAgentConfig struct {
 info *controller.StateServingInfo
 }
+func (c *mockAgentConfig) Tag() names.Tag {
+ return names.NewMachineTag("10")
+}
+
 func (c *mockAgentConfig) StateServingInfo() (controller.StateServingInfo, bool) {
 if c.info != nil {
 return *c.info, true
diff --git a/worker/peergrouper/mock_test.go b/worker/peergrouper/mock_test.go
index 26c078c1d9b..b2f6d5b5d34 100644
--- a/worker/peergrouper/mock_test.go
+++ b/worker/peergrouper/mock_test.go
@@ -166,10 +166,6 @@ func checkInvariants(st *fakeState) error {
 if m == nil {
 return fmt.Errorf("voting member with controller id %q has no associated Controller", id)
 }
-
- if !m.doc().hasVote {
- return fmt.Errorf("controller %q should be marked as having the vote, but does not", id)
- }
 }
 }
 }
@@ -506,6 +502,21 @@ func (session *fakeMongoSession) CurrentStatus() (*replicaset.Status, error) {
 return deepCopy(session.status.Get()).(*replicaset.Status), nil
 }
+func (session *fakeMongoSession) currentPrimary() string {
+ members := session.members.Get().([]replicaset.Member)
+ status := session.status.Get().(*replicaset.Status)
+ for _, statusMember := range status.Members {
+ if statusMember.State == replicaset.PrimaryState {
+ for _, member := range members {
+ if member.Id == statusMember.Id {
+ return member.Tags["juju-machine-id"]
+ }
+ }
+ }
+ }
+ return ""
+}
+
 // setStatus sets the status of the current members of the session.
 func (session *fakeMongoSession) setStatus(members []replicaset.MemberStatus) {
 session.status.Set(deepCopy(&replicaset.Status{
diff --git a/worker/peergrouper/worker.go b/worker/peergrouper/worker.go
index ffbe2f1a88b..748a62c233d 100644
--- a/worker/peergrouper/worker.go
+++ b/worker/peergrouper/worker.go
@@ -150,6 +150,11 @@ type Config struct {
 APIPort int
 ControllerAPIPort int
+ // ControllerId is the id of the controller running this worker.
+ // It is used in checking if this worker is running on the
+ // primary mongo node.
+ ControllerId func() string
+
 // Kubernetes controllers do not support HA yet.
 SupportsHA bool
@@ -633,6 +638,17 @@ func (w *pgWorker) updateReplicaSet() (map[string]*replicaset.Member, error) {
 }
 }
+ // Figure out if we are running on the mongo primary.
+ controllerId := w.config.ControllerId()
+ isPrimary, err := info.isPrimary(controllerId)
+ if err != nil && !errors.IsNotFound(err) {
+ return nil, errors.Annotatef(err, "determining primary status of controller %q", controllerId)
+ }
+ logger.Debugf("controller node %q primary: %v", controllerId, isPrimary)
+ if !isPrimary {
+ return desired.members, nil
+ }
+
 // We cannot change the HasVote flag of a controller in state at exactly
 // the same moment as changing its voting status in the replica set.
 //
@@ -675,11 +691,22 @@ func (w *pgWorker) updateReplicaSet() (map[string]*replicaset.Member, error) {
 if err := setHasVote(added, true); err != nil {
 return nil, errors.Annotate(err, "adding new voters")
 }
+
 // Currently k8s controllers do not support HA, so only update
 // the replicaset config if HA is enabled and there is a change.
+ // Only the controller corresponding to the mongo primary should
+ // update the replicaset, otherwise there will be a race since
+ // a diff needs to be calculated so the changes can be applied
+ // one at a time.
 if w.config.SupportsHA && desired.isChanged {
 ms := make([]replicaset.Member, 0, len(desired.members))
- for _, m := range desired.members {
+ ids := make([]string, 0, len(desired.members))
+ for id := range desired.members {
+ ids = append(ids, id)
+ }
+ sortAsInts(ids)
+ for _, id := range ids {
+ m := desired.members[id]
 ms = append(ms, *m)
 }
 if err := w.config.MongoSession.Set(ms); err != nil {
@@ -714,7 +741,7 @@ func (w *pgWorker) updateReplicaSet() (map[string]*replicaset.Member, error) {
 }
 for _, removedTracker := range removed {
 if removedTracker.host.Life() == state.Alive {
- logger.Debugf("vote removed from %v but controller is %s", removedTracker.Id(), state.Alive)
+ logger.Infof("vote removed from %v but controller is %s, should soon die", removedTracker.Id(), state.Alive)
 }
 }
 return desired.members, nil
diff --git a/worker/peergrouper/worker_test.go b/worker/peergrouper/worker_test.go
index deec62befb9..51c04613dba 100644
--- a/worker/peergrouper/worker_test.go
+++ b/worker/peergrouper/worker_test.go
@@ -181,7 +181,7 @@ func (s *workerSuite) doTestSetAndUpdateMembers(c *gc.C, ipVersion TestIPVersion
 // Update the status of the new members
 // and check that they become voting.
 c.Logf("\nupdating new member status")
- st.session.setStatus(mkStatuses("0s 1p 2s", ipVersion))
+ st.session.setStatus(mkStatuses("0p 1s 2s", ipVersion))
 mustNext(c, memberWatcher, "new member status")
 assertMembers(c, memberWatcher.Value(), mkMembers("0v 1v 2v", ipVersion))
@@ -197,8 +197,14 @@ func (s *workerSuite) doTestSetAndUpdateMembers(c *gc.C, ipVersion TestIPVersion
 // controller. Also set the status of the new controller to healthy.
 c.Logf("\nremoving vote from controller 10 and adding it to controller 13")
 st.controller("10").setWantsVote(false)
+ // Controller 11 or 12 becomes the new primary (it is randomised).
mustNext(c, memberWatcher, "waiting for vote switch") - assertMembers(c, memberWatcher.Value(), mkMembers("0 1v 2 3", ipVersion)) + + if st.session.currentPrimary() == "11" { + assertMembers(c, memberWatcher.Value(), mkMembers("0 1v 2 3", ipVersion)) + } else { + assertMembers(c, memberWatcher.Value(), mkMembers("0 1 2v 3", ipVersion)) + } st.controller("13").setWantsVote(true) @@ -232,8 +238,8 @@ func (s *workerSuite) doTestHasVoteMaintainsEvenWhenReplicaSetFails(c *gc.C, ipV // Simulate a state where we have four controllers, // one has gone down, and we're replacing it: - // 0 - hasvote true, wantsvote false, down - // 1 - hasvote true, wantsvote true + // 0 - hasvote true, wantsvote true, primary + // 1 - hasvote true, wantsvote false, down // 2 - hasvote true, wantsvote true // 3 - hasvote false, wantsvote true // @@ -251,14 +257,14 @@ func (s *workerSuite) doTestHasVoteMaintainsEvenWhenReplicaSetFails(c *gc.C, ipV err = st.controller("13").SetHasVote(false) c.Assert(err, jc.ErrorIsNil) - st.controller("10").setWantsVote(false) - st.controller("11").setWantsVote(true) + st.controller("10").setWantsVote(true) + st.controller("11").setWantsVote(false) st.controller("12").setWantsVote(true) st.controller("13").setWantsVote(true) err = st.session.Set(mkMembers("0v 1v 2v 3", ipVersion)) c.Assert(err, jc.ErrorIsNil) - st.session.setStatus(mkStatuses("0H 1p 2s 3s", ipVersion)) + st.session.setStatus(mkStatuses("0p 1H 2s 3s", ipVersion)) // Make the worker fail to set HasVote to false // after changing the replica set membership. @@ -273,7 +279,7 @@ func (s *workerSuite) doTestHasVoteMaintainsEvenWhenReplicaSetFails(c *gc.C, ipV // Wait for the worker to set the initial members. mustNext(c, memberWatcher, "initial members") - assertMembers(c, memberWatcher.Value(), mkMembers("0 1v 2v 3v", ipVersion)) + assertMembers(c, memberWatcher.Value(), mkMembers("0v 1 2v 3v", ipVersion)) // The worker should encounter an error setting the // has-vote status to false and exit. @@ -309,7 +315,7 @@ loop: correct := true for i := 10; i < 14; i++ { hasVote := st.controller(fmt.Sprint(i)).HasVote() - expectHasVote := i != 10 + expectHasVote := i != 11 if hasVote != expectHasVote { correct = false } @@ -572,6 +578,7 @@ func (s *workerSuite) TestControllersPublishedWithControllerAPIPort(c *gc.C) { State: st, MongoSession: st.session, APIHostPortsSetter: nopAPIHostPortsSetter{}, + ControllerId: func() string { return "10" }, MongoPort: mongoPort, APIPort: apiPort, ControllerAPIPort: controllerAPIPort, @@ -1034,11 +1041,7 @@ func (s *workerSuite) TestRemovePrimaryValidSecondaries(c *gc.C) { // the other secondary. 
We first unset the invariant checker, because we are intentionally going to an even number // of voters, but this is not the normal condition st.setCheck(nil) - if primaryMemberIndex == 1 { - st.controller("11").setWantsVote(false) - } else { - st.controller("12").setWantsVote(false) - } + st.controller(st.session.currentPrimary()).setWantsVote(false) // member watcher must fire first mustNext(c, memberWatcher, "observing member step down") assertMembers(c, memberWatcher.Value(), mkMembers("0 1v 2v", testIPv4)) @@ -1050,7 +1053,6 @@ func (s *workerSuite) TestRemovePrimaryValidSecondaries(c *gc.C) { s.clock.Advance(2 * pollInterval) testStatus = mustNextStatus(c, statusWatcher, "stepping down new primary") if primaryMemberIndex == 1 { - // 11 was the primary, now 12 is c.Check(testStatus.Members[1].State, gc.Equals, replicaset.MemberState(replicaset.SecondaryState)) c.Check(testStatus.Members[2].State, gc.Equals, replicaset.MemberState(replicaset.PrimaryState)) } else { @@ -1170,7 +1172,7 @@ func (s *workerSuite) newWorkerWithConfig( func (s *workerSuite) newWorker( c *gc.C, st State, - session MongoSession, + session *fakeMongoSession, apiHostPortsSetter APIHostPortsSetter, supportsHA bool, ) worker.Worker { @@ -1178,6 +1180,7 @@ func (s *workerSuite) newWorker( State: st, MongoSession: session, APIHostPortsSetter: apiHostPortsSetter, + ControllerId: session.currentPrimary, MongoPort: mongoPort, APIPort: apiPort, Hub: s.hub,
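Aside — the worker.go hunk above builds the replica-set member list by iterating ids sorted with sortAsInts instead of ranging directly over the desired.members map, so the members are passed to MongoSession.Set in a stable, numeric order rather than Go's randomised map order. The sketch below (not part of the patch) shows the sorting behaviour described by the sortAsInts doc comment in desired.go; sortIDsAsInts is an assumed re-implementation for illustration, not the Juju helper itself.

package main

import (
	"fmt"
	"sort"
	"strconv"
)

// sortIDsAsInts sorts numeric ids as numbers; any id that is not a valid
// integer is sorted as a string and placed after the numeric ids.
func sortIDsAsInts(ids []string) {
	sort.Slice(ids, func(i, j int) bool {
		vi, ei := strconv.Atoi(ids[i])
		vj, ej := strconv.Atoi(ids[j])
		switch {
		case ei == nil && ej == nil:
			return vi < vj // both numeric: compare as numbers
		case ei == nil:
			return true // numeric ids come before non-numeric ones
		case ej == nil:
			return false
		default:
			return ids[i] < ids[j] // both non-numeric: compare as strings
		}
	})
}

func main() {
	ids := []string{"12", "2", "node-x", "10", "1"}
	sortIDsAsInts(ids)
	fmt.Println(ids) // [1 2 10 12 node-x]
}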