diff --git a/cmd/jujud/agent/machine.go b/cmd/jujud/agent/machine.go index 7be6667f93..8a8705c888 100644 --- a/cmd/jujud/agent/machine.go +++ b/cmd/jujud/agent/machine.go @@ -116,21 +116,20 @@ var ( ) // Variable to override in tests, default is true -var EnableJournaling = true +var ProductionMongoWriteConcern = true func init() { stateWorkerDialOpts = mongo.DefaultDialOpts() stateWorkerDialOpts.PostDial = func(session *mgo.Session) error { - safe := mgo.Safe{ - // Wait for group commit if journaling is enabled, - // which is always true in production. - J: EnableJournaling, - } - _, err := replicaset.CurrentConfig(session) - if err == nil { - // set mongo to write-majority (writes only returned after - // replicated to a majority of replica-set members). - safe.WMode = "majority" + safe := mgo.Safe{} + if ProductionMongoWriteConcern { + safe.J = true + _, err := replicaset.CurrentConfig(session) + if err == nil { + // set mongo to write-majority (writes only returned after + // replicated to a majority of replica-set members). + safe.WMode = "majority" + } } session.SetSafe(&safe) return nil @@ -1224,6 +1223,8 @@ func (a *MachineAgent) limitLoginsDuringUpgrade(req params.LoginRequest) error { } } +var stateWorkerServingConfigErr = errors.New("state worker started with no state serving info") + // ensureMongoServer ensures that mongo is installed and running, // and ready for opening a state connection. func (a *MachineAgent) ensureMongoServer(agentConfig agent.Config) (err error) { @@ -1239,55 +1240,47 @@ func (a *MachineAgent) ensureMongoServer(agentConfig agent.Config) (err error) { } }() - servingInfo, ok := agentConfig.StateServingInfo() + // Many of the steps here, such as adding the state server to the + // admin DB and initiating the replicaset, are once-only actions, + // required when upgrading from a pre-HA-capable + // environment. These calls won't do anything if the thing they + // need to set up has already been done. + + if _, err := a.ensureMongoAdminUser(agentConfig); err != nil { + return errors.Trace(err) + } + + if err := a.ensureMongoSharedSecret(agentConfig); err != nil { + return errors.Trace(err) + } + agentConfig = a.CurrentConfig() // ensureMongoSharedSecret may have updated the config + + mongoInfo, ok := agentConfig.MongoInfo() if !ok { - return fmt.Errorf("state worker was started with no state serving info") - } - - // When upgrading from a pre-HA-capable environment, - // we must add machine-0 to the admin database and - // initiate its replicaset. - // - // TODO(axw) remove this when we no longer need - // to upgrade from pre-HA-capable environments. - var shouldInitiateMongoServer bool - var addrs []network.Address - if isPreHAVersion(a.previousAgentVersion) { - _, err := a.ensureMongoAdminUser(agentConfig) - if err != nil { - return err - } - if servingInfo.SharedSecret == "" { - servingInfo.SharedSecret, err = mongo.GenerateSharedSecret() - if err != nil { - return err - } - if err = a.ChangeConfig(func(config agent.ConfigSetter) error { - config.SetStateServingInfo(servingInfo) - return nil - }); err != nil { - return err - } - agentConfig = a.CurrentConfig() - } - // Note: we set Direct=true in the mongo options because it's - // possible that we've previously upgraded the mongo server's - // configuration to form a replicaset, but failed to initiate it. - st, m, err := openState(agentConfig, mongo.DialOpts{Direct: true}) + return errors.New("unable to retrieve mongo info to check replicaset") + } + + haveReplicaset, err := isReplicasetConfigured(mongoInfo) + if err != nil { + return errors.Annotate(err, "error while checking replicaset") + } + + // If the replicaset is to be initialised the machine addresses + // need to be retrieved *before* MongoDB is restarted with the + // --replset option (in EnsureMongoServer). Once MongoDB is + // started with --replset it won't respond to queries until the + // replicaset is initiated. + var machineAddrs []network.Address + if !haveReplicaset { + logger.Infof("replicaset not yet configured") + + machineAddrs, err = getMachineAddresses(agentConfig) if err != nil { - return err + return errors.Trace(err) } - ssi := cmdutil.ParamsStateServingInfoToStateStateServingInfo(servingInfo) - if err := st.SetStateServingInfo(ssi); err != nil { - st.Close() - return fmt.Errorf("cannot set state serving info: %v", err) - } - st.Close() - addrs = m.Addresses() - shouldInitiateMongoServer = true } - // ensureMongoServer installs/upgrades the init config as necessary. + // EnsureMongoServer installs/upgrades the init config as necessary. ensureServerParams, err := cmdutil.NewEnsureServerParams(agentConfig) if err != nil { return err @@ -1295,45 +1288,30 @@ func (a *MachineAgent) ensureMongoServer(agentConfig agent.Config) (err error) { if err := cmdutil.EnsureMongoServer(ensureServerParams); err != nil { return err } - if !shouldInitiateMongoServer { - return nil - } - // Initiate the replicaset for upgraded environments. - // - // TODO(axw) remove this when we no longer need - // to upgrade from pre-HA-capable environments. - stateInfo, ok := agentConfig.MongoInfo() - if !ok { - return fmt.Errorf("state worker was started with no state serving info") - } - dialInfo, err := mongo.DialInfo(stateInfo.Info, mongo.DefaultDialOpts()) - if err != nil { - return err - } - peerAddr := mongo.SelectPeerAddress(addrs) - if peerAddr == "" { - return fmt.Errorf("no appropriate peer address found in %q", addrs) - } - if err := maybeInitiateMongoServer(peergrouper.InitiateMongoParams{ - DialInfo: dialInfo, - MemberHostPort: net.JoinHostPort(peerAddr, fmt.Sprint(servingInfo.StatePort)), - // TODO(dfc) InitiateMongoParams should take a Tag - User: stateInfo.Tag.String(), - Password: stateInfo.Password, - }); err != nil && err != peergrouper.ErrReplicaSetAlreadyInitiated { - return err + // Create the replicaset it hasn't been set up yet. + if !haveReplicaset { + servingInfo, ok := agentConfig.StateServingInfo() + if !ok { + return stateWorkerServingConfigErr + } + if err := initiateReplicaSet(mongoInfo, servingInfo.StatePort, machineAddrs); err != nil { + return err + } } + return nil } +// ensureMongoAdminUser ensures that the machine's mongo user is in +// the admin DB. func (a *MachineAgent) ensureMongoAdminUser(agentConfig agent.Config) (added bool, err error) { - stateInfo, ok1 := agentConfig.MongoInfo() + mongoInfo, ok1 := agentConfig.MongoInfo() servingInfo, ok2 := agentConfig.StateServingInfo() if !ok1 || !ok2 { - return false, fmt.Errorf("no state serving info configuration") + return false, stateWorkerServingConfigErr } - dialInfo, err := mongo.DialInfo(stateInfo.Info, mongo.DefaultDialOpts()) + dialInfo, err := mongo.DialInfo(mongoInfo.Info, mongo.DefaultDialOpts()) if err != nil { return false, err } @@ -1346,13 +1324,118 @@ func (a *MachineAgent) ensureMongoAdminUser(agentConfig agent.Config) (added boo Namespace: agentConfig.Value(agent.Namespace), DataDir: agentConfig.DataDir(), Port: servingInfo.StatePort, - User: stateInfo.Tag.String(), - Password: stateInfo.Password, + User: mongoInfo.Tag.String(), + Password: mongoInfo.Password, }) } -func isPreHAVersion(v version.Number) bool { - return v.Compare(version.MustParse("1.19.0")) < 0 +// ensureMongoSharedSecret generates a MongoDB shared secret if +// required, updating the agent's config and state. +func (a *MachineAgent) ensureMongoSharedSecret(agentConfig agent.Config) error { + servingInfo, ok := agentConfig.StateServingInfo() + if !ok { + return stateWorkerServingConfigErr + } + + if servingInfo.SharedSecret != "" { + return nil // Already done + } + + logger.Infof("state serving info has no shared secret - generating") + + var err error + servingInfo.SharedSecret, err = mongo.GenerateSharedSecret() + if err != nil { + return err + } + logger.Debugf("updating state serving info in agent config") + if err = a.ChangeConfig(func(config agent.ConfigSetter) error { + config.SetStateServingInfo(servingInfo) + return nil + }); err != nil { + return err + } + agentConfig = a.CurrentConfig() + + logger.Debugf("updating state serving info in state") + + // Note: we set Direct=true in the mongo options because it's + // possible that we've previously upgraded the mongo server's + // configuration to form a replicaset, but failed to initiate it. + st, _, err := openState(agentConfig, mongo.DialOpts{Direct: true}) + if err != nil { + return err + } + defer st.Close() + + ssi := cmdutil.ParamsStateServingInfoToStateStateServingInfo(servingInfo) + if err := st.SetStateServingInfo(ssi); err != nil { + return errors.Errorf("cannot set state serving info: %v", err) + } + + logger.Infof("shared secret updated in state serving info") + return nil +} + +// isReplicasetConfigured returns true if the replicaset has been +// successfully initiated. +func isReplicasetConfigured(mongoInfo *mongo.MongoInfo) (bool, error) { + dialInfo, err := mongo.DialInfo(mongoInfo.Info, mongo.DefaultDialOpts()) + if err != nil { + return false, errors.Annotate(err, "cannot generate dial info to check replicaset") + } + dialInfo.Username = mongoInfo.Tag.String() + dialInfo.Password = mongoInfo.Password + + session, err := mgo.DialWithInfo(dialInfo) + if err != nil { + return false, errors.Annotate(err, "cannot dial mongo to check replicaset") + } + defer session.Close() + + cfg, err := replicaset.CurrentConfig(session) + if err != nil { + logger.Debugf("couldn't retrieve replicaset config (not fatal): %v", err) + return false, nil + } + numMembers := len(cfg.Members) + logger.Debugf("replicaset member count: %d", numMembers) + return numMembers > 0, nil +} + +// getMachineAddresses connects to state to determine the machine's +// network addresses. +func getMachineAddresses(agentConfig agent.Config) ([]network.Address, error) { + logger.Debugf("opening state to get machine addresses") + st, m, err := openState(agentConfig, mongo.DialOpts{Direct: true}) + if err != nil { + return nil, errors.Annotate(err, "failed to open state to retrieve machine addresses") + } + defer st.Close() + return m.Addresses(), nil +} + +// initiateReplicaSet connects to MongoDB and sets up the replicaset. +func initiateReplicaSet(mongoInfo *mongo.MongoInfo, statePort int, machineAddrs []network.Address) error { + peerAddr := mongo.SelectPeerAddress(machineAddrs) + if peerAddr == "" { + return errors.Errorf("no appropriate peer address found in %q", machineAddrs) + } + + dialInfo, err := mongo.DialInfo(mongoInfo.Info, mongo.DefaultDialOpts()) + if err != nil { + return errors.Annotate(err, "cannot generate dial info to initiate replicaset") + } + + if err := maybeInitiateMongoServer(peergrouper.InitiateMongoParams{ + DialInfo: dialInfo, + MemberHostPort: net.JoinHostPort(peerAddr, fmt.Sprint(statePort)), + User: mongoInfo.Tag.String(), // TODO(dfc) InitiateMongoParams should take a Tag + Password: mongoInfo.Password, + }); err != nil && err != peergrouper.ErrReplicaSetAlreadyInitiated { + return err + } + return nil } func openState(agentConfig agent.Config, dialOpts mongo.DialOpts) (_ *state.State, _ *state.Machine, err error) { diff --git a/cmd/jujud/agent/machine_test.go b/cmd/jujud/agent/machine_test.go index 22a3b54033..4b8a171473 100644 --- a/cmd/jujud/agent/machine_test.go +++ b/cmd/jujud/agent/machine_test.go @@ -96,7 +96,7 @@ func TestPackage(t *testing.T) { type commonMachineSuite struct { singularRecord *singularRunnerRecord lxctesting.TestSuite - fakeEnsureMongo agenttesting.FakeEnsure + fakeEnsureMongo *agenttesting.FakeEnsureMongo AgentSuite } @@ -134,9 +134,8 @@ func (s *commonMachineSuite) SetUpTest(c *gc.C) { return newDummyWorker(), nil }) - s.fakeEnsureMongo = agenttesting.FakeEnsure{} - s.AgentSuite.PatchValue(&cmdutil.EnsureMongoServer, s.fakeEnsureMongo.FakeEnsureMongo) - s.AgentSuite.PatchValue(&maybeInitiateMongoServer, s.fakeEnsureMongo.FakeInitiateMongo) + s.fakeEnsureMongo = agenttesting.InstallFakeEnsureMongo(s) + s.AgentSuite.PatchValue(&maybeInitiateMongoServer, s.fakeEnsureMongo.InitiateMongo) } func fakeCmd(path string) { @@ -1478,6 +1477,8 @@ func (s *MachineSuite) TestMachineAgentUpgradeMongo(c *gc.C) { err = s.State.MongoSession().DB("admin").RemoveUser(m.Tag().String()) c.Assert(err, jc.ErrorIsNil) + s.fakeEnsureMongo.ReplicasetInitiated = false + s.AgentSuite.PatchValue(&ensureMongoAdminUser, func(p mongo.EnsureAdminUserParams) (bool, error) { err := s.State.MongoSession().DB("admin").AddUser(p.User, p.Password, false) c.Assert(err, jc.ErrorIsNil) @@ -1583,6 +1584,42 @@ func (s *MachineSuite) TestNewEnvironmentStartsNewWorkers(c *gc.C) { c.Assert(workers, jc.DeepEquals, perEnvSingularWorkers) } +func (s *MachineSuite) TestReplicasetInitiation(c *gc.C) { + if runtime.GOOS == "windows" { + c.Skip("state servers on windows aren't supported") + } + + s.fakeEnsureMongo.ReplicasetInitiated = false + + m, _, _ := s.primeAgent(c, version.Current, state.JobManageEnviron) + a := s.newAgent(c, m) + agentConfig := a.CurrentConfig() + + err := a.ensureMongoServer(agentConfig) + c.Assert(err, jc.ErrorIsNil) + + c.Assert(s.fakeEnsureMongo.EnsureCount, gc.Equals, 1) + c.Assert(s.fakeEnsureMongo.InitiateCount, gc.Equals, 1) +} + +func (s *MachineSuite) TestReplicasetAlreadyInitiated(c *gc.C) { + if runtime.GOOS == "windows" { + c.Skip("state servers on windows aren't supported") + } + + s.fakeEnsureMongo.ReplicasetInitiated = true + + m, _, _ := s.primeAgent(c, version.Current, state.JobManageEnviron) + a := s.newAgent(c, m) + agentConfig := a.CurrentConfig() + + err := a.ensureMongoServer(agentConfig) + c.Assert(err, jc.ErrorIsNil) + + c.Assert(s.fakeEnsureMongo.EnsureCount, gc.Equals, 1) + c.Assert(s.fakeEnsureMongo.InitiateCount, gc.Equals, 0) +} + // MachineWithCharmsSuite provides infrastructure for tests which need to // work with charms. type MachineWithCharmsSuite struct { diff --git a/cmd/jujud/agent/testing/agent.go b/cmd/jujud/agent/testing/agent.go index 656049eb66..479c8a529d 100644 --- a/cmd/jujud/agent/testing/agent.go +++ b/cmd/jujud/agent/testing/agent.go @@ -10,12 +10,15 @@ import ( "time" "github.com/juju/cmd" + "github.com/juju/errors" "github.com/juju/names" jc "github.com/juju/testing/checkers" gc "gopkg.in/check.v1" + "gopkg.in/mgo.v2" "github.com/juju/juju/agent" agenttools "github.com/juju/juju/agent/tools" + cmdutil "github.com/juju/juju/cmd/jujud/util" "github.com/juju/juju/environs" "github.com/juju/juju/environs/filestorage" envtesting "github.com/juju/juju/environs/testing" @@ -23,6 +26,8 @@ import ( "github.com/juju/juju/juju/testing" "github.com/juju/juju/mongo" "github.com/juju/juju/network" + "github.com/juju/juju/replicaset" + jujureplicaset "github.com/juju/juju/replicaset" "github.com/juju/juju/state" coretesting "github.com/juju/juju/testing" coretools "github.com/juju/juju/tools" @@ -30,18 +35,47 @@ import ( "github.com/juju/juju/worker/peergrouper" ) -type FakeEnsure struct { - EnsureCount int - InitiateCount int - DataDir string - Namespace string - OplogSize int - Info state.StateServingInfo - InitiateParams peergrouper.InitiateMongoParams - Err error +type patchingSuite interface { + PatchValue(interface{}, interface{}) } -func (f *FakeEnsure) FakeEnsureMongo(args mongo.EnsureServerParams) error { +// InstallFakeEnsureMongo creates a new FakeEnsureMongo, patching +// out replicaset.CurrentConfig and cmdutil.EnsureMongoServer. +func InstallFakeEnsureMongo(suite patchingSuite) *FakeEnsureMongo { + f := &FakeEnsureMongo{ + ReplicasetInitiated: true, + } + suite.PatchValue(&replicaset.CurrentConfig, f.CurrentConfig) + suite.PatchValue(&cmdutil.EnsureMongoServer, f.EnsureMongo) + return f +} + +// FakeEnsureMongo provides test fakes for the functions used to +// initialise MongoDB. +type FakeEnsureMongo struct { + EnsureCount int + InitiateCount int + DataDir string + Namespace string + OplogSize int + Info state.StateServingInfo + InitiateParams peergrouper.InitiateMongoParams + Err error + ReplicasetInitiated bool +} + +func (f *FakeEnsureMongo) CurrentConfig(*mgo.Session) (*jujureplicaset.Config, error) { + if f.ReplicasetInitiated { + // Return a dummy replicaset config that's good enough to + // indicate that the replicaset is initiated. + return &jujureplicaset.Config{ + Members: []jujureplicaset.Member{{}}, + }, nil + } + return nil, errors.NotFoundf("replicaset") +} + +func (f *FakeEnsureMongo) EnsureMongo(args mongo.EnsureServerParams) error { f.EnsureCount++ f.DataDir, f.Namespace, f.OplogSize = args.DataDir, args.Namespace, args.OplogSize f.Info = state.StateServingInfo{ @@ -56,7 +90,7 @@ func (f *FakeEnsure) FakeEnsureMongo(args mongo.EnsureServerParams) error { return f.Err } -func (f *FakeEnsure) FakeInitiateMongo(p peergrouper.InitiateMongoParams) error { +func (f *FakeEnsureMongo) InitiateMongo(p peergrouper.InitiateMongoParams) error { f.InitiateCount++ f.InitiateParams = p return nil diff --git a/cmd/jujud/bootstrap_test.go b/cmd/jujud/bootstrap_test.go index a525bfbf45..01722efae1 100644 --- a/cmd/jujud/bootstrap_test.go +++ b/cmd/jujud/bootstrap_test.go @@ -66,7 +66,7 @@ type BootstrapSuite struct { dataDir string logDir string mongoOplogSize string - fakeEnsureMongo agenttesting.FakeEnsure + fakeEnsureMongo *agenttesting.FakeEnsureMongo bootstrapName string toolsStorage storage.Storage @@ -75,9 +75,6 @@ type BootstrapSuite struct { var _ = gc.Suite(&BootstrapSuite{}) func (s *BootstrapSuite) SetUpSuite(c *gc.C) { - s.PatchValue(&cmdutil.EnsureMongoServer, s.fakeEnsureMongo.FakeEnsureMongo) - s.PatchValue(&maybeInitiateMongoServer, s.fakeEnsureMongo.FakeInitiateMongo) - storageDir := c.MkDir() restorer := gitjujutesting.PatchValue(&envtools.DefaultBaseURL, storageDir) s.AddSuiteCleanup(func(*gc.C) { @@ -108,7 +105,8 @@ func (s *BootstrapSuite) SetUpTest(c *gc.C) { s.dataDir = c.MkDir() s.logDir = c.MkDir() s.mongoOplogSize = "1234" - s.fakeEnsureMongo = agenttesting.FakeEnsure{} + s.fakeEnsureMongo = agenttesting.InstallFakeEnsureMongo(s) + s.PatchValue(&maybeInitiateMongoServer, s.fakeEnsureMongo.InitiateMongo) // Create fake tools.tar.gz and downloaded-tools.txt. toolsDir := filepath.FromSlash(agenttools.SharedToolsDir(s.dataDir, version.Current)) diff --git a/featuretests/leadership_test.go b/featuretests/leadership_test.go index 78dcd0a60d..ccd56cb025 100644 --- a/featuretests/leadership_test.go +++ b/featuretests/leadership_test.go @@ -23,7 +23,6 @@ import ( "github.com/juju/juju/apiserver/params" agentcmd "github.com/juju/juju/cmd/jujud/agent" agenttesting "github.com/juju/juju/cmd/jujud/agent/testing" - cmdutil "github.com/juju/juju/cmd/jujud/util" "github.com/juju/juju/mongo" "github.com/juju/juju/state" statetesting "github.com/juju/juju/state/testing" @@ -50,12 +49,8 @@ func (s *leadershipSuite) SetUpTest(c *gc.C) { defer file.Close() s.AgentSuite.PatchValue(&agentcmd.JujuRun, file.Name()) - if runtime.GOOS == "windows" { - s.AgentSuite.PatchValue(&agentcmd.EnableJournaling, false) - } - - fakeEnsureMongo := agenttesting.FakeEnsure{} - s.AgentSuite.PatchValue(&cmdutil.EnsureMongoServer, fakeEnsureMongo.FakeEnsureMongo) + agenttesting.InstallFakeEnsureMongo(s) + s.PatchValue(&agentcmd.ProductionMongoWriteConcern, false) // Create a machine to manage the environment. stateServer, password := s.Factory.MakeMachineReturningPassword(c, &factory.MachineParams{ @@ -301,13 +296,8 @@ func (s *uniterLeadershipSuite) SetUpTest(c *gc.C) { file, _ := ioutil.TempFile("", "juju-run") defer file.Close() s.AgentSuite.PatchValue(&agentcmd.JujuRun, file.Name()) - - if runtime.GOOS == "windows" { - s.AgentSuite.PatchValue(&agentcmd.EnableJournaling, false) - } - - fakeEnsureMongo := agenttesting.FakeEnsure{} - s.AgentSuite.PatchValue(&cmdutil.EnsureMongoServer, fakeEnsureMongo.FakeEnsureMongo) + agenttesting.InstallFakeEnsureMongo(s) + s.PatchValue(&agentcmd.ProductionMongoWriteConcern, false) s.factory = factory.NewFactory(s.State) diff --git a/replicaset/replicaset.go b/replicaset/replicaset.go index b3c5876712..47d8ffe0c8 100644 --- a/replicaset/replicaset.go +++ b/replicaset/replicaset.go @@ -345,7 +345,9 @@ func CurrentMembers(session *mgo.Session) ([]Member, error) { // CurrentConfig returns the Config for the given session's replica set. If // there is no current config, the error returned will be mgo.ErrNotFound. -func CurrentConfig(session *mgo.Session) (*Config, error) { +var CurrentConfig = currentConfig // To support patching out in tests. + +func currentConfig(session *mgo.Session) (*Config, error) { cfg := &Config{} monotonicSession := session.Clone() defer monotonicSession.Close() diff --git a/worker/peergrouper/initiate.go b/worker/peergrouper/initiate.go index b05da03f1c..bd6a7c7347 100644 --- a/worker/peergrouper/initiate.go +++ b/worker/peergrouper/initiate.go @@ -62,21 +62,18 @@ func InitiateMongoServer(p InitiateMongoParams, force bool) error { } // Initiate may fail while mongo is initialising, so we retry until - // we succssfully populate the replicaset config. + // we successfully populate the replicaset config. var err error for attempt := initiateAttemptStrategy.Start(); attempt.Next(); { err = attemptInitiateMongoServer(p.DialInfo, p.MemberHostPort, force) - if err == nil { + if err == nil || err == ErrReplicaSetAlreadyInitiated { logger.Infof("replica set initiated") - return nil + return err } if attempt.HasNext() { logger.Debugf("replica set initiation failed, will retry: %v", err) } } - if err == ErrReplicaSetAlreadyInitiated { - return err - } return errors.Annotatef(err, "cannot initiate replica set") }