# Merge pull request #13535 from wallyworld/fix-ha-removenode

Each controller runs its own peer grouper worker. With mongo 4.0 we could construct the desired replica set and have each worker write the same copy to mongo; multiple workers doing this was not a problem.

With mongo 4.4 the changes have to be applied one at a time, which is done by calculating a diff. That becomes a problem if multiple workers try to do it at the same time. This PR ensures that only the peer grouper running on the mongo primary changes the replica set. Small test changes were needed to accommodate this, since the tests now need to control which machine the worker considers to be the primary.
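The core idea is small enough to sketch. Below is a self-contained, simplified illustration of the gating: every worker still computes the desired replica set, but only the worker whose controller id maps to the mongo primary goes on to apply the changes. The types and names here (`memberStatus`, `member`, `isPrimary`) are illustrative stand-ins, not the actual juju/replicaset types; the real logic lives in `peerGroupInfo.isPrimary` and `pgWorker.updateReplicaSet` in this PR.

```
package main

import "fmt"

// memberStatus mirrors the bits of a replica-set member status the check
// needs: the member id and whether mongo reports it as PRIMARY.
type memberStatus struct {
	id      int
	primary bool
}

// member mirrors a replica-set member that juju tagged with its controller id.
type member struct {
	id           int
	controllerID string // from the "juju-machine-id" tag
}

// isPrimary reports whether the given controller id owns the mongo primary:
// find the member id reported as primary, then match it to a recognised
// member's juju-machine-id tag.
func isPrimary(workerControllerID string, statuses []memberStatus, recognised []member) (bool, error) {
	primaryID := -1
	for _, s := range statuses {
		if s.primary {
			primaryID = s.id
			break
		}
	}
	if primaryID == -1 {
		return false, fmt.Errorf("HA primary machine not found")
	}
	for _, m := range recognised {
		if m.id == primaryID {
			return m.controllerID == workerControllerID, nil
		}
	}
	return false, fmt.Errorf("HA primary machine not found")
}

func main() {
	statuses := []memberStatus{{id: 1, primary: true}, {id: 2}, {id: 3}}
	recognised := []member{{1, "0"}, {2, "1"}, {3, "2"}}

	// Each controller's peer grouper asks the same question; only the one
	// running on the primary (controller "0" here) applies the replica-set
	// changes, the others leave the replica set untouched.
	for _, controllerID := range []string{"0", "1", "2"} {
		primary, err := isPrimary(controllerID, statuses, recognised)
		if err != nil {
			fmt.Println(err)
			continue
		}
		fmt.Printf("controller %s applies changes: %v\n", controllerID, primary)
	}
}
```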

## QA steps

`./main.sh controller`

Also:

```
juju bootstrap
juju enable-ha
```
`juju show-controller`
Check `rs.config()` (one way to inspect the replica set is sketched after these steps).

Remove a secondary:
`juju remove-machine 1`
Check `rs.config()`.

`juju enable-ha`
`juju show-controller`
Check `rs.config()`.

Remove the primary:
`juju remove-machine 0`
`juju show-controller`
Check `rs.config()`.

`juju enable-ha`
`juju show-controller`
Check `rs.config()`.
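
For the `rs.config()` checks above, the sketch below shows one possible way to inspect the replica set and to watch which peer grouper considers itself to be on the primary. The mongo connection details are deployment-specific (credentials live in the machine agent's agent.conf), so treat the commented steps as assumptions to adapt, not an exact recipe.

```
# ssh to a controller machine in the controller model
juju ssh -m controller 0

# On the machine, open a mongo shell against the juju replica set (connection
# flags and credentials come from /var/lib/juju/agents/machine-*/agent.conf and
# vary by deployment; this part is an assumption, adjust as needed), then run:
#   rs.config()   # expect one member per controller, tagged with juju-machine-id
#   rs.status()   # shows which member is currently PRIMARY

# Optionally, watch the peer grouper report whether it is running on the
# primary (the debug line added in worker.go); this assumes the peergrouper
# module is set to DEBUG:
juju model-config -m controller logging-config="<root>=INFO;juju.worker.peergrouper=DEBUG"
juju debug-log -m controller --include-module=juju.worker.peergrouper
```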
jujubot committed Dec 1, 2021
2 parents af34439 + 7cd47f6 commit 8a154b7
Showing 8 changed files with 172 additions and 50 deletions.
2 changes: 1 addition & 1 deletion state/enableha.go
@@ -467,7 +467,7 @@ func (st *State) HAPrimaryMachine() (names.MachineTag, error) {
}
for _, m := range ms {
if m.Id == nodeID {
if machineID, k := m.Tags["juju-machine-id"]; k {
if machineID, ok := m.Tags["juju-machine-id"]; ok {
return names.NewMachineTag(machineID), nil
}
}
87 changes: 65 additions & 22 deletions worker/peergrouper/desired.go
@@ -36,6 +36,7 @@ type peerGroupInfo struct {
// Replica-set member statuses sourced from the Mongo session.
statuses map[string]replicaset.MemberStatus

toRemove []replicaset.Member
extra []replicaset.Member
maxMemberId int
mongoPort int
@@ -97,34 +98,63 @@ func newPeerGroupInfo(
}

// Iterate over the input members and associate them with a controller if
// possible; add any unassociated members to the "extra" slice.
// possible; add any non-juju unassociated members to the "extra" slice.
// Unassociated members with the juju machine id tag are to be removed.
// Link the statuses with the controller node IDs where associated.
// Keep track of the highest member ID that we observe.
for _, m := range members {
if m.Id > info.maxMemberId {
info.maxMemberId = m.Id
}

controllerId, ok := m.Tags[jujuNodeKey]
if !ok {
info.extra = append(info.extra, m)
continue
}
found := false
if id, ok := m.Tags[jujuNodeKey]; ok {
if controllers[id] != nil {
info.recognised[id] = m
found = true
}
if controllers[controllerId] != nil {
info.recognised[controllerId] = m
found = true
}

// This invariably makes for N^2, but we anticipate small N.
for _, sts := range statuses {
if sts.Id == m.Id {
info.statuses[id] = sts
}
// This invariably makes for N^2, but we anticipate small N.
for _, sts := range statuses {
if sts.Id == m.Id {
info.statuses[controllerId] = sts
}
}
if !found {
info.extra = append(info.extra, m)
info.toRemove = append(info.toRemove, m)
}
}

if m.Id > info.maxMemberId {
info.maxMemberId = m.Id
return &info, nil
}

// isPrimary returns true if the given controller node id is the mongo primary.
func (info *peerGroupInfo) isPrimary(workerControllerId string) (bool, error) {
primaryNodeId := -1
// Current status of replicaset contains node state.
// Here we determine node id of the primary node.
for _, m := range info.statuses {
if m.State == replicaset.PrimaryState {
primaryNodeId = m.Id
break
}
}
if primaryNodeId == -1 {
return false, errors.NotFoundf("HA primary machine")
}

return &info, nil
for _, m := range info.recognised {
if m.Id == primaryNodeId {
if primaryControllerId, ok := m.Tags[jujuNodeKey]; ok {
return primaryControllerId == workerControllerId, nil
}
}
}
return false, errors.NotFoundf("HA primary machine")
}

// getLogMessage generates a nicely formatted log message from the known peer
@@ -134,20 +164,29 @@ func (info *peerGroupInfo) getLogMessage() string {
fmt.Sprintf("calculating desired peer group\ndesired voting members: (maxId: %d)", info.maxMemberId),
}

template := "\n %#v: rs_id=%d, rs_addr=%s"
template := "\n %#v: rs_id=%d, rs_addr=%s, rs_primary=%v"
ids := make([]string, 0, len(info.recognised))
for id := range info.recognised {
ids = append(ids, id)
}
sortAsInts(ids)
for _, id := range ids {
rm := info.recognised[id]
lines = append(lines, fmt.Sprintf(template, info.controllers[id], rm.Id, rm.Address))
isPrimary := isPrimaryMember(info, id)
lines = append(lines, fmt.Sprintf(template, info.controllers[id], rm.Id, rm.Address, isPrimary))
}

if len(info.extra) > 0 {
lines = append(lines, "\nother members:")
if len(info.toRemove) > 0 {
lines = append(lines, "\nmembers to remove:")
template := "\n rs_id=%d, rs_addr=%s, tags=%v, vote=%t"
for _, em := range info.toRemove {
vote := em.Votes != nil && *em.Votes > 0
lines = append(lines, fmt.Sprintf(template, em.Id, em.Address, em.Tags, vote))
}
}

if len(info.extra) > 0 {
lines = append(lines, "\nother non-juju members:")
template := "\n rs_id=%d, rs_addr=%s, tags=%v, vote=%t"
for _, em := range info.extra {
vote := em.Votes != nil && *em.Votes > 0
@@ -243,19 +282,23 @@ func (p *peerGroupChanges) checkExtraMembers() error {
// Given that Juju is in control of the replicaset we don't really just 'accept' that some other node has a vote.
// *maybe* we could allow non-voting members that would be used by 3rd parties to provide a warm database backup.
// But I think the right answer is probably to downgrade unknown members from voting.
// Note: (wallyworld) notwithstanding the above, each controller runs its own peer grouper worker. The
// mongo primary will remove nodes as needed from the replicaset. There will be a short time where
// Juju managed nodes will not yet be accounted for by the other secondary workers. These are accounted
// for in the 'toRemove' list.
for _, member := range p.info.extra {
if isVotingMember(&member) {
return fmt.Errorf("non voting member %v found in peer group", member)
return fmt.Errorf("non juju voting member %v found in peer group", member)
}
}
if len(p.info.extra) > 0 {
if len(p.info.toRemove) > 0 || len(p.info.extra) > 0 {
p.desired.isChanged = true
}
return nil
}

// sortAsInts converts all the vals to an integer to sort them as numbers instead of strings
// If any of the values are not valid integers, they will be sorted as stirngs, and added to the end
// If any of the values are not valid integers, they will be sorted as strings, and added to the end
// the slice will be sorted in place.
// (generally this should only be used for strings we expect to represent ints, but we don't want to error if
// something isn't an int.)
38 changes: 33 additions & 5 deletions worker/peergrouper/desired_test.go
@@ -128,14 +128,14 @@ func desiredPeerGroupTests(ipVersion TestIPVersion) []desiredPeerGroupTest {
}, {
about: "extra member with nil Vote",
machines: mkMachines("11v", ipVersion),
members: mkMembers("1v 2v", ipVersion),
members: mkMembers("1vT 2v", ipVersion),
statuses: mkStatuses("1p 2s", ipVersion),
expectVoting: []bool{true},
expectErr: "non voting member.* found in peer group",
expectErr: "non juju voting member.* found in peer group",
}, {
about: "extra member with >1 votes",
machines: mkMachines("11v", ipVersion),
members: append(mkMembers("1v", ipVersion), replicaset.Member{
members: append(mkMembers("1vT", ipVersion), replicaset.Member{
Id: 2,
Votes: newInt(2),
Address: net.JoinHostPort(
@@ -145,7 +145,7 @@ func desiredPeerGroupTests(ipVersion TestIPVersion) []desiredPeerGroupTest {
}),
statuses: mkStatuses("1p 2s", ipVersion),
expectVoting: []bool{true},
expectErr: "non voting member.* found in peer group",
expectErr: "non juju voting member.* found in peer group",
}, {
about: "one controller has become ready to vote (no change)",
machines: mkMachines("11v 12v", ipVersion),
@@ -410,6 +410,25 @@ func (s *desiredPeerGroupSuite) doTestDesiredPeerGroup(c *gc.C, ipVersion TestIP
}
}

func (s *desiredPeerGroupSuite) TestIsPrimary(c *gc.C) {
machines := mkMachines("11v 12v 13v", testIPv4)
trackerMap := make(map[string]*controllerTracker)
for _, m := range machines {
c.Assert(trackerMap[m.Id()], gc.IsNil)
trackerMap[m.Id()] = m
}
members := mkMembers("1v 2v 3v", testIPv4)
statuses := mkStatuses("1p 2s 3s", testIPv4)
info, err := newPeerGroupInfo(trackerMap, statuses, members, mongoPort, network.SpaceInfo{})
c.Assert(err, jc.ErrorIsNil)
isPrimary, err := info.isPrimary("11")
c.Assert(err, jc.ErrorIsNil)
c.Assert(isPrimary, jc.IsTrue)
isPrimary, err = info.isPrimary("12")
c.Assert(err, jc.ErrorIsNil)
c.Assert(isPrimary, jc.IsFalse)
}

func (s *desiredPeerGroupSuite) TestNewPeerGroupInfoErrWhenNoMembers(c *gc.C) {
_, err := newPeerGroupInfo(nil, nil, nil, 666, network.SpaceInfo{})
c.Check(err, gc.ErrorMatches, "current member set is empty")
@@ -421,7 +440,7 @@ func (s *desiredPeerGroupSuite) TestCheckExtraMembersReturnsErrorWhenVoterFound(
info: &peerGroupInfo{extra: []replicaset.Member{{Votes: &v}}},
}
err := peerChanges.checkExtraMembers()
c.Check(err, gc.ErrorMatches, "non voting member .+ found in peer group")
c.Check(err, gc.ErrorMatches, "non juju voting member .+ found in peer group")
}

func (s *desiredPeerGroupSuite) TestCheckExtraMembersReturnsTrueWhenCheckMade(c *gc.C) {
@@ -434,6 +453,15 @@ func (s *desiredPeerGroupSuite) TestCheckExtraMembersReturnsTrueWhenCheckMade(c
c.Check(err, jc.ErrorIsNil)
}

func (s *desiredPeerGroupSuite) TestCheckToRemoveMembersReturnsTrueWhenCheckMade(c *gc.C) {
peerChanges := peerGroupChanges{
info: &peerGroupInfo{toRemove: []replicaset.Member{{}}},
}
err := peerChanges.checkExtraMembers()
c.Check(peerChanges.desired.isChanged, jc.IsTrue)
c.Check(err, jc.ErrorIsNil)
}

func (s *desiredPeerGroupSuite) TestCheckExtraMembersReturnsFalseWhenEmpty(c *gc.C) {
peerChanges := peerGroupChanges{
info: &peerGroupInfo{},
3 changes: 3 additions & 0 deletions worker/peergrouper/manifold.go
@@ -122,6 +122,9 @@ func (config ManifoldConfig) start(context dependency.Context) (worker.Worker, e
ControllerAPIPort: stateServingInfo.ControllerAPIPort,
SupportsHA: supportsHA,
PrometheusRegisterer: config.PrometheusRegisterer,
// On machine models, the controller id is the same as the machine/agent id.
// TODO(wallyworld) - revisit when we add HA to k8s.
ControllerId: agentConfig.Tag().Id,
})
if err != nil {
_ = stTracker.Done()
7 changes: 7 additions & 0 deletions worker/peergrouper/manifold_test.go
@@ -8,6 +8,7 @@ import (

"github.com/juju/clock/testclock"
"github.com/juju/errors"
"github.com/juju/names/v4"
"github.com/juju/testing"
jc "github.com/juju/testing/checkers"
"github.com/juju/worker/v3"
@@ -116,6 +117,8 @@ func (s *ManifoldSuite) TestStart(c *gc.C) {
c.Assert(args[0], gc.FitsTypeOf, peergrouper.Config{})
config := args[0].(peergrouper.Config)

c.Assert(config.ControllerId(), gc.Equals, "10")
config.ControllerId = nil
c.Assert(config, jc.DeepEquals, peergrouper.Config{
State: peergrouper.StateShim{s.State},
MongoSession: peergrouper.MongoSessionShim{s.State.MongoSession()},
@@ -182,6 +185,10 @@ type mockAgentConfig struct {
info *controller.StateServingInfo
}

func (c *mockAgentConfig) Tag() names.Tag {
return names.NewMachineTag("10")
}

func (c *mockAgentConfig) StateServingInfo() (controller.StateServingInfo, bool) {
if c.info != nil {
return *c.info, true
19 changes: 15 additions & 4 deletions worker/peergrouper/mock_test.go
@@ -166,10 +166,6 @@ func checkInvariants(st *fakeState) error {
if m == nil {
return fmt.Errorf("voting member with controller id %q has no associated Controller", id)
}

if !m.doc().hasVote {
return fmt.Errorf("controller %q should be marked as having the vote, but does not", id)
}
}
}
}
@@ -506,6 +502,21 @@ func (session *fakeMongoSession) CurrentStatus() (*replicaset.Status, error) {
return deepCopy(session.status.Get()).(*replicaset.Status), nil
}

func (session *fakeMongoSession) currentPrimary() string {
members := session.members.Get().([]replicaset.Member)
status := session.status.Get().(*replicaset.Status)
for _, statusMember := range status.Members {
if statusMember.State == replicaset.PrimaryState {
for _, member := range members {
if member.Id == statusMember.Id {
return member.Tags["juju-machine-id"]
}
}
}
}
return ""
}

// setStatus sets the status of the current members of the session.
func (session *fakeMongoSession) setStatus(members []replicaset.MemberStatus) {
session.status.Set(deepCopy(&replicaset.Status{
31 changes: 29 additions & 2 deletions worker/peergrouper/worker.go
@@ -150,6 +150,11 @@ type Config struct {
APIPort int
ControllerAPIPort int

// ControllerId is the id of the controller running this worker.
// It is used in checking if this worker is running on the
// primary mongo node.
ControllerId func() string

// Kubernetes controllers do not support HA yet.
SupportsHA bool

@@ -633,6 +638,17 @@ func (w *pgWorker) updateReplicaSet() (map[string]*replicaset.Member, error) {
}
}

// Figure out if we are running on the mongo primary.
controllerId := w.config.ControllerId()
isPrimary, err := info.isPrimary(controllerId)
if err != nil && !errors.IsNotFound(err) {
return nil, errors.Annotatef(err, "determining primary status of controller %q", controllerId)
}
logger.Debugf("controller node %q primary: %v", controllerId, isPrimary)
if !isPrimary {
return desired.members, nil
}

// We cannot change the HasVote flag of a controller in state at exactly
// the same moment as changing its voting status in the replica set.
//
@@ -675,11 +691,22 @@ func (w *pgWorker) updateReplicaSet() (map[string]*replicaset.Member, error) {
if err := setHasVote(added, true); err != nil {
return nil, errors.Annotate(err, "adding new voters")
}

// Currently k8s controllers do not support HA, so only update
// the replicaset config if HA is enabled and there is a change.
// Only controllers corresponding with the mongo primary should
// update the replicaset, otherwise there will be a race since
// a diff needs to be calculated so the changes can be applied
// one at a time.
if w.config.SupportsHA && desired.isChanged {
ms := make([]replicaset.Member, 0, len(desired.members))
for _, m := range desired.members {
ids := make([]string, 0, len(desired.members))
for id := range desired.members {
ids = append(ids, id)
}
sortAsInts(ids)
for _, id := range ids {
m := desired.members[id]
ms = append(ms, *m)
}
if err := w.config.MongoSession.Set(ms); err != nil {
@@ -714,7 +741,7 @@ func (w *pgWorker) updateReplicaSet() (map[string]*replicaset.Member, error) {
}
for _, removedTracker := range removed {
if removedTracker.host.Life() == state.Alive {
logger.Debugf("vote removed from %v but controller is %s", removedTracker.Id(), state.Alive)
logger.Infof("vote removed from %v but controller is %s, should soon die", removedTracker.Id(), state.Alive)
}
}
return desired.members, nil
