Skip to content

Commit

Permalink
Add IPv6 address support for mongodb > 2.7.4
Browse files Browse the repository at this point in the history
MongoDB of versions greater than 2.7.4 can use the correct bracketed
IPv6 format ([addr]:port). This change adds support for that format.
  • Loading branch information
Mick Gregg committed Nov 24, 2016
1 parent fb7294c commit 51945c8
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 45 deletions.
109 changes: 73 additions & 36 deletions replicaset.go
Expand Up @@ -38,11 +38,6 @@ const (
// initiateAttemptStatusDelay is the amount of time to sleep between failed
// attempts to replSetGetStatus.
initiateAttemptStatusDelay = 500 * time.Millisecond

// rsMembersUnreachableError is the error message returned from mongo
// when it thinks that replicaset members are unreachable. This can
// occur if replSetInitiate is executed shortly after starting up mongo.
rsMembersUnreachableError = "all members and seeds must be reachable to initiate set"
)

var logger = loggo.GetLogger("juju.replicaset")
Expand All @@ -52,6 +47,22 @@ var (
isReady = IsReady
)

// attemptInitiate will attempt to initiate a mongodb replicaset with each of
// the given configs, returning as soon as one config is successful.
func attemptInitiate(monotonicSession *mgo.Session, cfg []Config) error {
var err error
for _, c := range cfg {
logger.Infof("Initiating replicaset with config %#v", c)
err = monotonicSession.Run(bson.D{{"replSetInitiate", c}}, nil)
if err != nil {
logger.Infof("Unsuccessful attempt to initiate replicaset: %v", err)
} else {
return nil
}
}
return err
}

// Initiate sets up a replica set with the given replica set name with the
// single given member. It need be called only once for a given mongo replica
// set. The tags specified will be added as tags on the member that is created
Expand All @@ -66,25 +77,40 @@ func Initiate(session *mgo.Session, address, name string, tags map[string]string
monotonicSession := session.Clone()
defer monotonicSession.Close()
monotonicSession.SetMode(mgo.Monotonic, true)
cfg := Config{
Name: name,
Version: 1,
Members: []Member{{
Id: 1,
Address: fixIpv6Address(address),
Tags: tags,
}},
}
logger.Infof("Initiating replicaset with config %#v", cfg)
// We don't know mongod's ability to use a correct IPv6 addr format
// until the server is started, but we need to know before we can start
// it. Try the older, incorrect format, if the correct format fails.
cfg := []Config{
Config{
Name: name,
Version: 1,
Members: []Member{{
Id: 1,
Address: address,
Tags: tags,
}},
},
Config{
Name: name,
Version: 1,
Members: []Member{{
Id: 1,
Address: formatIPv6AddressWithoutBrackets(address),
Tags: tags,
}},
},
}

var err error
// Attempt replSetInitiate, with potential retries.
for i := 0; i < maxInitiateAttempts; i++ {
monotonicSession.Refresh()
err = monotonicSession.Run(bson.D{{"replSetInitiate", cfg}}, nil)
if err != nil && err.Error() == rsMembersUnreachableError {
err = attemptInitiate(monotonicSession, cfg)
if err != nil {
time.Sleep(initiateAttemptDelay)
continue
} else {
break
}
break
}

// Wait for replSetInitiate to complete. Even if err != nil,
Expand All @@ -99,9 +125,9 @@ func Initiate(session *mgo.Session, address, name string, tags map[string]string
}
if err != nil || len(status.Members) == 0 {
time.Sleep(initiateAttemptStatusDelay)
continue
} else {
break
}
break
}
return err
}
Expand Down Expand Up @@ -167,11 +193,20 @@ func applyReplSetConfig(cmd string, session *mgo.Session, oldconfig, newconfig *
logger.Debugf("%s() changing replica set\nfrom %s\n to %s",
cmd, fmtConfigForLog(oldconfig), fmtConfigForLog(newconfig))

// newConfig here is internal and safe to mutate
for index, member := range newconfig.Members {
newconfig.Members[index].Address = fixIpv6Address(member.Address)
buildInfo, err := session.BuildInfo()
if err != nil {
return err
}
// https://jira.mongodb.org/browse/SERVER-5436
if !buildInfo.VersionAtLeast(2, 7, 4) {
// newConfig here is internal and safe to mutate
for index, member := range newconfig.Members {
newconfig.Members[index].Address = formatIPv6AddressWithoutBrackets(member.Address)
logger.Debugf("replica set using IP addr %s",
newconfig.Members[index].Address)
}
}
err := session.Run(bson.D{{"replSetReconfig", newconfig}}, nil)
err = session.Run(bson.D{{"replSetReconfig", newconfig}}, nil)
if err == io.EOF {
// If the primary changes due to replSetReconfig, then all
// current connections are dropped.
Expand Down Expand Up @@ -315,10 +350,10 @@ func IsMaster(session *mgo.Session) (*IsMasterResults, error) {
return nil, err
}

results.Address = unFixIpv6Address(results.Address)
results.PrimaryAddress = unFixIpv6Address(results.PrimaryAddress)
results.Address = formatIPv6AddressWithBrackets(results.Address)
results.PrimaryAddress = formatIPv6AddressWithBrackets(results.PrimaryAddress)
for index, address := range results.Addresses {
results.Addresses[index] = unFixIpv6Address(address)
results.Addresses[index] = formatIPv6AddressWithBrackets(address)
}
return results, nil
}
Expand Down Expand Up @@ -367,7 +402,7 @@ func currentConfig(session *mgo.Session) (*Config, error) {

members := make([]Member, len(cfg.Members), len(cfg.Members))
for index, member := range cfg.Members {
member.Address = unFixIpv6Address(member.Address)
member.Address = formatIPv6AddressWithBrackets(member.Address)
members[index] = member
}
cfg.Members = members
Expand All @@ -391,7 +426,7 @@ func CurrentStatus(session *mgo.Session) (*Status, error) {
}

for index, member := range status.Members {
status.Members[index].Address = unFixIpv6Address(member.Address)
status.Members[index].Address = formatIPv6AddressWithBrackets(member.Address)
}
return status, nil
}
Expand Down Expand Up @@ -557,17 +592,19 @@ func (state MemberState) String() string {
return memberStateStrings[state]
}

// Turn normal ipv6 addresses into the "bad format" that mongo requires us
// to use. (Mongo can't parse square brackets in ipv6 addresses.)
func fixIpv6Address(address string) string {
// formatIPv6AddressWithoutBrackets turns correctly formatted IPv6 addresses
// into the "bad format" (without brackets around the address) that mongo <2.7
// require use.
func formatIPv6AddressWithoutBrackets(address string) string {
address = strings.Replace(address, "[", "", 1)
address = strings.Replace(address, "]", "", 1)
return address
}

// Turn "bad format" ipv6 addresses ("::1:port"), that mongo uses, into good
// format addresses ("[::1]:port").
func unFixIpv6Address(address string) string {
// formatIPv6AddressWithBrackets turns the "bad format" IPv6 addresses
// ("<addr>:<port>") that mongo <2.7 uses into correctly format addresses
// ("[<addr>]:<port>").
func formatIPv6AddressWithBrackets(address string) string {
if strings.Count(address, ":") >= 2 && strings.Count(address, "[") == 0 {
lastColon := strings.LastIndex(address, ":")
host := address[:lastColon]
Expand Down
29 changes: 20 additions & 9 deletions replicaset_test.go
Expand Up @@ -64,6 +64,19 @@ func (s *MongoSuite) TearDownTest(c *gc.C) {

var initialTags = map[string]string{"foo": "bar"}

// assertMembers asserts the known field values of a retrieved and expected
// Members slice are equal.
func assertMembers(c *gc.C, mems []Member, expectedMembers []Member) {
// 2.x and 3.2 seem to use different default values for bool. For
// example, in 3.2 false is false, not nil, and we can't know the
// pointer value to check with DeepEquals.
for i, _ := range mems {
c.Assert(mems[i].Id, gc.Equals, expectedMembers[i].Id)
c.Assert(mems[i].Address, gc.Equals, expectedMembers[i].Address)
c.Assert(mems[i].Tags, jc.DeepEquals, expectedMembers[i].Tags)
}
}

func dialAndTestInitiate(c *gc.C, inst *testing.MgoInstance, addr string) {
session := inst.MustDialDirect()
defer session.Close()
Expand All @@ -84,9 +97,8 @@ func dialAndTestInitiate(c *gc.C, inst *testing.MgoInstance, addr string) {

mems, err := CurrentMembers(session)
c.Assert(err, jc.ErrorIsNil)
c.Assert(mems, jc.DeepEquals, expectedMembers)
assertMembers(c, mems, expectedMembers)

// now add some data so we get a more real-life test
loadData(session, c)
}

Expand Down Expand Up @@ -127,7 +139,7 @@ func loadData(session *mgo.Session, c *gc.C) {
}

for col := 0; col < 10; col++ {
foos := make([]foo, 10000)
foos := make([]interface{}, 10000)
for n := range foos {
foos[n] = foo{
Name: fmt.Sprintf("name_%d_%d", col, n),
Expand All @@ -136,7 +148,7 @@ func loadData(session *mgo.Session, c *gc.C) {
}
}

err := session.DB("testing").C(fmt.Sprintf("data%d", col)).Insert(foos)
err := session.DB("testing").C(fmt.Sprintf("data%d", col)).Insert(foos...)
c.Assert(err, jc.ErrorIsNil)
}
}
Expand Down Expand Up @@ -221,7 +233,7 @@ func assertAddRemoveSet(c *gc.C, root *testing.MgoInstance, getAddr func(*testin
c.Assert(cfg.Version, gc.Equals, 2)

mems := cfg.Members
c.Assert(mems, jc.DeepEquals, expectedMembers)
assertMembers(c, mems, expectedMembers)

// Now remove the last two Members...
attemptLoop(c, strategy, "Remove()", func() error {
Expand All @@ -235,7 +247,7 @@ func assertAddRemoveSet(c *gc.C, root *testing.MgoInstance, getAddr func(*testin
mems, err = CurrentMembers(session)
return err
})
c.Assert(mems, jc.DeepEquals, expectedMembers)
assertMembers(c, mems, expectedMembers)

// now let's mix it up and set the new members to a mix of the previous
// plus the new arbiter
Expand Down Expand Up @@ -268,7 +280,7 @@ func assertAddRemoveSet(c *gc.C, root *testing.MgoInstance, getAddr func(*testin
mems, err = CurrentMembers(session)
return err
})
c.Assert(mems, jc.DeepEquals, expectedMembers)
assertMembers(c, mems, expectedMembers)
}

func (s *MongoSuite) TestIsMaster(c *gc.C) {
Expand Down Expand Up @@ -588,11 +600,10 @@ type MongoIPV6Suite struct {
var _ = gc.Suite(&MongoIPV6Suite{})

func (s *MongoIPV6Suite) TestAddRemoveSetIPv6(c *gc.C) {
c.Skip("Skipping test until mgo issue 22 is fixed")

root := newServer(c)
defer root.Destroy()
dialAndTestInitiate(c, root, ipv6GetAddr(root))

assertAddRemoveSet(c, root, ipv6GetAddr)
}

Expand Down

0 comments on commit 51945c8

Please sign in to comment.