From 51945c8d3b98c13cc7c465be02a38f557e371616 Mon Sep 17 00:00:00 2001 From: Mick Gregg Date: Wed, 23 Nov 2016 16:01:56 +0000 Subject: [PATCH] Add IPv6 address support for mongodb > 2.7.4 MongoDB of versions greater than 2.7.4 can use the correct bracketed IPv6 format ([addr]:port). This change adds support for that format. --- replicaset.go | 109 ++++++++++++++++++++++++++++++--------------- replicaset_test.go | 29 ++++++++---- 2 files changed, 93 insertions(+), 45 deletions(-) diff --git a/replicaset.go b/replicaset.go index 2942bfa..93be24c 100644 --- a/replicaset.go +++ b/replicaset.go @@ -38,11 +38,6 @@ const ( // initiateAttemptStatusDelay is the amount of time to sleep between failed // attempts to replSetGetStatus. initiateAttemptStatusDelay = 500 * time.Millisecond - - // rsMembersUnreachableError is the error message returned from mongo - // when it thinks that replicaset members are unreachable. This can - // occur if replSetInitiate is executed shortly after starting up mongo. - rsMembersUnreachableError = "all members and seeds must be reachable to initiate set" ) var logger = loggo.GetLogger("juju.replicaset") @@ -52,6 +47,22 @@ var ( isReady = IsReady ) +// attemptInitiate will attempt to initiate a mongodb replicaset with each of +// the given configs, returning as soon as one config is successful. +func attemptInitiate(monotonicSession *mgo.Session, cfg []Config) error { + var err error + for _, c := range cfg { + logger.Infof("Initiating replicaset with config %#v", c) + err = monotonicSession.Run(bson.D{{"replSetInitiate", c}}, nil) + if err != nil { + logger.Infof("Unsuccessful attempt to initiate replicaset: %v", err) + } else { + return nil + } + } + return err +} + // Initiate sets up a replica set with the given replica set name with the // single given member. It need be called only once for a given mongo replica // set. The tags specified will be added as tags on the member that is created @@ -66,25 +77,40 @@ func Initiate(session *mgo.Session, address, name string, tags map[string]string monotonicSession := session.Clone() defer monotonicSession.Close() monotonicSession.SetMode(mgo.Monotonic, true) - cfg := Config{ - Name: name, - Version: 1, - Members: []Member{{ - Id: 1, - Address: fixIpv6Address(address), - Tags: tags, - }}, - } - logger.Infof("Initiating replicaset with config %#v", cfg) + // We don't know mongod's ability to use a correct IPv6 addr format + // until the server is started, but we need to know before we can start + // it. Try the older, incorrect format, if the correct format fails. + cfg := []Config{ + Config{ + Name: name, + Version: 1, + Members: []Member{{ + Id: 1, + Address: address, + Tags: tags, + }}, + }, + Config{ + Name: name, + Version: 1, + Members: []Member{{ + Id: 1, + Address: formatIPv6AddressWithoutBrackets(address), + Tags: tags, + }}, + }, + } + var err error + // Attempt replSetInitiate, with potential retries. for i := 0; i < maxInitiateAttempts; i++ { monotonicSession.Refresh() - err = monotonicSession.Run(bson.D{{"replSetInitiate", cfg}}, nil) - if err != nil && err.Error() == rsMembersUnreachableError { + err = attemptInitiate(monotonicSession, cfg) + if err != nil { time.Sleep(initiateAttemptDelay) - continue + } else { + break } - break } // Wait for replSetInitiate to complete. Even if err != nil, @@ -99,9 +125,9 @@ func Initiate(session *mgo.Session, address, name string, tags map[string]string } if err != nil || len(status.Members) == 0 { time.Sleep(initiateAttemptStatusDelay) - continue + } else { + break } - break } return err } @@ -167,11 +193,20 @@ func applyReplSetConfig(cmd string, session *mgo.Session, oldconfig, newconfig * logger.Debugf("%s() changing replica set\nfrom %s\n to %s", cmd, fmtConfigForLog(oldconfig), fmtConfigForLog(newconfig)) - // newConfig here is internal and safe to mutate - for index, member := range newconfig.Members { - newconfig.Members[index].Address = fixIpv6Address(member.Address) + buildInfo, err := session.BuildInfo() + if err != nil { + return err + } + // https://jira.mongodb.org/browse/SERVER-5436 + if !buildInfo.VersionAtLeast(2, 7, 4) { + // newConfig here is internal and safe to mutate + for index, member := range newconfig.Members { + newconfig.Members[index].Address = formatIPv6AddressWithoutBrackets(member.Address) + logger.Debugf("replica set using IP addr %s", + newconfig.Members[index].Address) + } } - err := session.Run(bson.D{{"replSetReconfig", newconfig}}, nil) + err = session.Run(bson.D{{"replSetReconfig", newconfig}}, nil) if err == io.EOF { // If the primary changes due to replSetReconfig, then all // current connections are dropped. @@ -315,10 +350,10 @@ func IsMaster(session *mgo.Session) (*IsMasterResults, error) { return nil, err } - results.Address = unFixIpv6Address(results.Address) - results.PrimaryAddress = unFixIpv6Address(results.PrimaryAddress) + results.Address = formatIPv6AddressWithBrackets(results.Address) + results.PrimaryAddress = formatIPv6AddressWithBrackets(results.PrimaryAddress) for index, address := range results.Addresses { - results.Addresses[index] = unFixIpv6Address(address) + results.Addresses[index] = formatIPv6AddressWithBrackets(address) } return results, nil } @@ -367,7 +402,7 @@ func currentConfig(session *mgo.Session) (*Config, error) { members := make([]Member, len(cfg.Members), len(cfg.Members)) for index, member := range cfg.Members { - member.Address = unFixIpv6Address(member.Address) + member.Address = formatIPv6AddressWithBrackets(member.Address) members[index] = member } cfg.Members = members @@ -391,7 +426,7 @@ func CurrentStatus(session *mgo.Session) (*Status, error) { } for index, member := range status.Members { - status.Members[index].Address = unFixIpv6Address(member.Address) + status.Members[index].Address = formatIPv6AddressWithBrackets(member.Address) } return status, nil } @@ -557,17 +592,19 @@ func (state MemberState) String() string { return memberStateStrings[state] } -// Turn normal ipv6 addresses into the "bad format" that mongo requires us -// to use. (Mongo can't parse square brackets in ipv6 addresses.) -func fixIpv6Address(address string) string { +// formatIPv6AddressWithoutBrackets turns correctly formatted IPv6 addresses +// into the "bad format" (without brackets around the address) that mongo <2.7 +// require use. +func formatIPv6AddressWithoutBrackets(address string) string { address = strings.Replace(address, "[", "", 1) address = strings.Replace(address, "]", "", 1) return address } -// Turn "bad format" ipv6 addresses ("::1:port"), that mongo uses, into good -// format addresses ("[::1]:port"). -func unFixIpv6Address(address string) string { +// formatIPv6AddressWithBrackets turns the "bad format" IPv6 addresses +// (":") that mongo <2.7 uses into correctly format addresses +// ("[]:"). +func formatIPv6AddressWithBrackets(address string) string { if strings.Count(address, ":") >= 2 && strings.Count(address, "[") == 0 { lastColon := strings.LastIndex(address, ":") host := address[:lastColon] diff --git a/replicaset_test.go b/replicaset_test.go index 7b56d17..b1e5f9f 100644 --- a/replicaset_test.go +++ b/replicaset_test.go @@ -64,6 +64,19 @@ func (s *MongoSuite) TearDownTest(c *gc.C) { var initialTags = map[string]string{"foo": "bar"} +// assertMembers asserts the known field values of a retrieved and expected +// Members slice are equal. +func assertMembers(c *gc.C, mems []Member, expectedMembers []Member) { + // 2.x and 3.2 seem to use different default values for bool. For + // example, in 3.2 false is false, not nil, and we can't know the + // pointer value to check with DeepEquals. + for i, _ := range mems { + c.Assert(mems[i].Id, gc.Equals, expectedMembers[i].Id) + c.Assert(mems[i].Address, gc.Equals, expectedMembers[i].Address) + c.Assert(mems[i].Tags, jc.DeepEquals, expectedMembers[i].Tags) + } +} + func dialAndTestInitiate(c *gc.C, inst *testing.MgoInstance, addr string) { session := inst.MustDialDirect() defer session.Close() @@ -84,9 +97,8 @@ func dialAndTestInitiate(c *gc.C, inst *testing.MgoInstance, addr string) { mems, err := CurrentMembers(session) c.Assert(err, jc.ErrorIsNil) - c.Assert(mems, jc.DeepEquals, expectedMembers) + assertMembers(c, mems, expectedMembers) - // now add some data so we get a more real-life test loadData(session, c) } @@ -127,7 +139,7 @@ func loadData(session *mgo.Session, c *gc.C) { } for col := 0; col < 10; col++ { - foos := make([]foo, 10000) + foos := make([]interface{}, 10000) for n := range foos { foos[n] = foo{ Name: fmt.Sprintf("name_%d_%d", col, n), @@ -136,7 +148,7 @@ func loadData(session *mgo.Session, c *gc.C) { } } - err := session.DB("testing").C(fmt.Sprintf("data%d", col)).Insert(foos) + err := session.DB("testing").C(fmt.Sprintf("data%d", col)).Insert(foos...) c.Assert(err, jc.ErrorIsNil) } } @@ -221,7 +233,7 @@ func assertAddRemoveSet(c *gc.C, root *testing.MgoInstance, getAddr func(*testin c.Assert(cfg.Version, gc.Equals, 2) mems := cfg.Members - c.Assert(mems, jc.DeepEquals, expectedMembers) + assertMembers(c, mems, expectedMembers) // Now remove the last two Members... attemptLoop(c, strategy, "Remove()", func() error { @@ -235,7 +247,7 @@ func assertAddRemoveSet(c *gc.C, root *testing.MgoInstance, getAddr func(*testin mems, err = CurrentMembers(session) return err }) - c.Assert(mems, jc.DeepEquals, expectedMembers) + assertMembers(c, mems, expectedMembers) // now let's mix it up and set the new members to a mix of the previous // plus the new arbiter @@ -268,7 +280,7 @@ func assertAddRemoveSet(c *gc.C, root *testing.MgoInstance, getAddr func(*testin mems, err = CurrentMembers(session) return err }) - c.Assert(mems, jc.DeepEquals, expectedMembers) + assertMembers(c, mems, expectedMembers) } func (s *MongoSuite) TestIsMaster(c *gc.C) { @@ -588,11 +600,10 @@ type MongoIPV6Suite struct { var _ = gc.Suite(&MongoIPV6Suite{}) func (s *MongoIPV6Suite) TestAddRemoveSetIPv6(c *gc.C) { - c.Skip("Skipping test until mgo issue 22 is fixed") - root := newServer(c) defer root.Destroy() dialAndTestInitiate(c, root, ipv6GetAddr(root)) + assertAddRemoveSet(c, root, ipv6GetAddr) }