Skip to content

Commit

Permalink
Merge pull request #615 from wallyworld/agent-version-on-startup-master
Browse files Browse the repository at this point in the history
Agent version on startup master

Fixes: https://bugs.launchpad.net/bugs/1359800

Was reverted because it was thought to be causing a regression, but that wasn't the case. So resubmitting.
  • Loading branch information
jujubot committed Aug 27, 2014
2 parents 38d6c6c + 35dcef2 commit 8d0e514
Show file tree
Hide file tree
Showing 13 changed files with 134 additions and 39 deletions.
21 changes: 18 additions & 3 deletions cmd/jujud/machine.go
Expand Up @@ -42,6 +42,7 @@ import (
apiagent "github.com/juju/juju/state/api/agent"
"github.com/juju/juju/state/api/params"
"github.com/juju/juju/state/apiserver"
coretools "github.com/juju/juju/tools"
"github.com/juju/juju/version"
"github.com/juju/juju/worker"
"github.com/juju/juju/worker/apiaddressupdater"
Expand Down Expand Up @@ -288,6 +289,13 @@ func (a *MachineAgent) APIWorker() (worker.Worker, error) {
}
}

// Before starting any workers, ensure we record the Juju version this machine
// agent is running.
currentTools := &coretools.Tools{Version: version.Current}
if err := st.Upgrader().SetVersion(agentConfig.Tag().String(), currentTools.Version); err != nil {
return nil, errors.Annotate(err, "cannot set machine agent version")
}

providerType := agentConfig.Value(agent.ProviderType)

// Run the upgrader and the upgrade-steps worker without waiting for
Expand Down Expand Up @@ -343,6 +351,10 @@ func (a *MachineAgent) APIWorker() (worker.Worker, error) {

// Perform the operations needed to set up hosting for containers.
if err := a.setupContainerSupport(runner, st, entity, agentConfig); err != nil {
cause := errors.Cause(err)
if params.IsCodeDead(cause) || cause == worker.ErrTerminateAgent {
return nil, worker.ErrTerminateAgent
}
return nil, fmt.Errorf("setting up container support: %v", err)
}
for _, job := range entity.Jobs() {
Expand Down Expand Up @@ -412,17 +424,20 @@ func (a *MachineAgent) updateSupportedContainers(
return err
}
machine, err := pr.Machine(tag)
if errors.IsNotFound(err) || err == nil && machine.Life() == params.Dead {
return worker.ErrTerminateAgent
}
if err != nil {
return fmt.Errorf("%s is not in state: %v", tag, err)
return errors.Annotatef(err, "cannot load machine %s from state", tag)
}
if len(containers) == 0 {
if err := machine.SupportsNoContainers(); err != nil {
return fmt.Errorf("clearing supported containers for %s: %v", tag, err)
return errors.Annotatef(err, "clearing supported containers for %s", tag)
}
return nil
}
if err := machine.SetSupportedContainers(containers...); err != nil {
return fmt.Errorf("setting supported containers for %s: %v", tag, err)
return errors.Annotatef(err, "setting supported containers for %s", tag)
}
initLock, err := hookExecutionLock(agentConfig.DataDir())
if err != nil {
Expand Down
37 changes: 37 additions & 0 deletions cmd/jujud/machine_test.go
Expand Up @@ -248,6 +248,8 @@ func (s *MachineSuite) TestDyingMachine(c *gc.C) {
defer func() {
c.Check(a.Stop(), gc.IsNil)
}()
// Wait for configuration to be finished
<-a.WorkersStarted()
err := m.Destroy()
c.Assert(err, gc.IsNil)
select {
Expand Down Expand Up @@ -694,6 +696,41 @@ func (s *MachineSuite) TestManageEnvironServesAPI(c *gc.C) {
})
}

func (s *MachineSuite) assertAgentSetsToolsVersion(c *gc.C, job state.MachineJob) {
vers := version.Current
vers.Minor = version.Current.Minor + 1
m, _, _ := s.primeAgent(c, vers, job)
a := s.newAgent(c, m)
go func() { c.Check(a.Run(nil), gc.IsNil) }()
defer func() { c.Check(a.Stop(), gc.IsNil) }()

timeout := time.After(coretesting.LongWait)
for done := false; !done; {
select {
case <-timeout:
c.Fatalf("timeout while waiting for agent version to be set")
case <-time.After(coretesting.ShortWait):
err := m.Refresh()
c.Assert(err, gc.IsNil)
agentTools, err := m.AgentTools()
c.Assert(err, gc.IsNil)
if agentTools.Version.Minor != version.Current.Minor {
continue
}
c.Assert(agentTools.Version, gc.DeepEquals, version.Current)
done = true
}
}
}

func (s *MachineSuite) TestAgentSetsToolsVersionManageEnviron(c *gc.C) {
s.assertAgentSetsToolsVersion(c, state.JobManageEnviron)
}

func (s *MachineSuite) TestAgentSetsToolsVersionHostUnits(c *gc.C) {
s.assertAgentSetsToolsVersion(c, state.JobHostUnits)
}

func (s *MachineSuite) TestManageEnvironRunsCleaner(c *gc.C) {
s.assertJobWithState(c, state.JobManageEnviron, func(conf agent.Config, agentState *state.State) {
// Create a service and unit, and destroy the service.
Expand Down
10 changes: 10 additions & 0 deletions cmd/jujud/unit.go
Expand Up @@ -8,12 +8,14 @@ import (
"runtime"

"github.com/juju/cmd"
"github.com/juju/errors"
"github.com/juju/loggo"
"github.com/juju/names"
"launchpad.net/gnuflag"
"launchpad.net/tomb"

"github.com/juju/juju/network"
"github.com/juju/juju/tools"
"github.com/juju/juju/version"
"github.com/juju/juju/worker"
"github.com/juju/juju/worker/apiaddressupdater"
Expand Down Expand Up @@ -93,6 +95,14 @@ func (a *UnitAgent) APIWorkers() (worker.Worker, error) {
if err != nil {
return nil, err
}

// Before starting any workers, ensure we record the Juju version this unit
// agent is running.
currentTools := &tools.Tools{Version: version.Current}
if err := st.Upgrader().SetVersion(agentConfig.Tag().String(), currentTools.Version); err != nil {
return nil, errors.Annotate(err, "cannot set unit agent version")
}

runner := worker.NewRunner(connectionIsFatal(st), moreImportant)
runner.StartWorker("upgrader", func() (worker.Worker, error) {
return upgrader.NewUpgrader(
Expand Down
30 changes: 30 additions & 0 deletions cmd/jujud/unit_test.go
Expand Up @@ -263,6 +263,36 @@ func (s *UnitSuite) TestRsyslogConfigWorker(c *gc.C) {
}
}

func (s *UnitSuite) TestAgentSetsToolsVersion(c *gc.C) {
_, unit, _, _ := s.primeAgent(c)
vers := version.Current
vers.Minor = version.Current.Minor + 1
err := unit.SetAgentVersion(vers)
c.Assert(err, gc.IsNil)

a := s.newAgent(c, unit)
go func() { c.Check(a.Run(nil), gc.IsNil) }()
defer func() { c.Check(a.Stop(), gc.IsNil) }()

timeout := time.After(coretesting.LongWait)
for done := false; !done; {
select {
case <-timeout:
c.Fatalf("timeout while waiting for agent version to be set")
case <-time.After(coretesting.ShortWait):
err := unit.Refresh()
c.Assert(err, gc.IsNil)
agentTools, err := unit.AgentTools()
c.Assert(err, gc.IsNil)
if agentTools.Version.Minor != version.Current.Minor {
continue
}
c.Assert(agentTools.Version, gc.DeepEquals, version.Current)
done = true
}
}
}

func (s *UnitSuite) TestUnitAgentRunsAPIAddressUpdaterWorker(c *gc.C) {
_, unit, _, _ := s.primeAgent(c)
a := s.newAgent(c, unit)
Expand Down
5 changes: 5 additions & 0 deletions state/api/params/apierror.go
Expand Up @@ -41,6 +41,7 @@ const (
CodeUnitHasSubordinates = "unit has subordinates"
CodeNotAssigned = "not assigned"
CodeStopped = "stopped"
CodeDead = "dead"
CodeHasAssignedUnits = "machine has assigned units"
CodeNotProvisioned = "not provisioned"
CodeNoAddressSet = "no address set"
Expand Down Expand Up @@ -117,6 +118,10 @@ func IsCodeStopped(err error) bool {
return ErrCode(err) == CodeStopped
}

func IsCodeDead(err error) bool {
return ErrCode(err) == CodeDead
}

func IsCodeHasAssignedUnits(err error) bool {
return ErrCode(err) == CodeHasAssignedUnits
}
Expand Down
1 change: 1 addition & 0 deletions state/apiserver/common/errors.go
Expand Up @@ -79,6 +79,7 @@ var singletonErrorCodes = map[error]string{
state.ErrCannotEnterScopeYet: params.CodeCannotEnterScopeYet,
state.ErrCannotEnterScope: params.CodeCannotEnterScope,
state.ErrUnitHasSubordinates: params.CodeUnitHasSubordinates,
state.ErrDead: params.CodeDead,
txn.ErrExcessiveContention: params.CodeExcessiveContention,
ErrBadId: params.CodeNotFound,
ErrBadCreds: params.CodeUnauthorized,
Expand Down
4 changes: 4 additions & 0 deletions state/apiserver/common/errors_test.go
Expand Up @@ -44,6 +44,10 @@ var errorTransformTests = []struct {
err: state.ErrCannotEnterScope,
code: params.CodeCannotEnterScope,
helperFunc: params.IsCodeCannotEnterScope,
}, {
err: state.ErrDead,
code: params.CodeDead,
helperFunc: params.IsCodeDead,
}, {
err: txn.ErrExcessiveContention,
code: params.CodeExcessiveContention,
Expand Down
12 changes: 7 additions & 5 deletions state/machine.go
Expand Up @@ -255,7 +255,7 @@ func (m *Machine) SetHasVote(hasVote bool) error {
Update: bson.D{{"$set", bson.D{{"hasvote", hasVote}}}},
}}
if err := m.st.runTransaction(ops); err != nil {
return fmt.Errorf("cannot set HasVote of machine %v: %v", m, onAbort(err, errDead))
return fmt.Errorf("cannot set HasVote of machine %v: %v", m, onAbort(err, ErrDead))
}
m.doc.HasVote = hasVote
return nil
Expand Down Expand Up @@ -323,7 +323,7 @@ func (m *Machine) SetAgentVersion(v version.Binary) (err error) {
Update: bson.D{{"$set", bson.D{{"tools", tools}}}},
}}
if err := m.st.runTransaction(ops); err != nil {
return onAbort(err, errDead)
return onAbort(err, ErrDead)
}
m.doc.Tools = tools
return nil
Expand Down Expand Up @@ -358,7 +358,7 @@ func (m *Machine) setPasswordHash(passwordHash string) error {
Update: bson.D{{"$set", bson.D{{"passwordhash", passwordHash}}}},
}}
if err := m.st.runTransaction(ops); err != nil {
return fmt.Errorf("cannot set password of machine %v: %v", m, onAbort(err, errDead))
return fmt.Errorf("cannot set password of machine %v: %v", m, onAbort(err, ErrDead))
}
m.doc.PasswordHash = passwordHash
return nil
Expand Down Expand Up @@ -1010,7 +1010,7 @@ func (m *Machine) setAddresses(addresses []network.Address, field *[]address, fi
}
}
if m.doc.Life == Dead {
return nil, errDead
return nil, ErrDead
}
op := txn.Op{
C: machinesC,
Expand Down Expand Up @@ -1329,7 +1329,9 @@ func (m *Machine) updateSupportedContainers(supportedContainers []instance.Conta
},
}
if err = m.st.runTransaction(ops); err != nil {
return fmt.Errorf("cannot update supported containers of machine %v: %v", m, onAbort(err, errDead))
err = onAbort(err, ErrDead)
logger.Errorf("cannot update supported containers of machine %v: %v", m, err)
return err
}
m.doc.SupportedContainers = supportedContainers
m.doc.SupportedContainersKnown = true
Expand Down
2 changes: 1 addition & 1 deletion state/state.go
Expand Up @@ -402,7 +402,7 @@ func (st *State) SetEnvironConstraints(cons constraints.Value) error {
return writeConstraints(st, environGlobalKey, cons)
}

var errDead = fmt.Errorf("not found or dead")
var ErrDead = fmt.Errorf("not found or dead")
var errNotAlive = fmt.Errorf("not found or not alive")

func onAbort(txnErr, err error) error {
Expand Down
12 changes: 6 additions & 6 deletions state/unit.go
Expand Up @@ -195,7 +195,7 @@ func (u *Unit) SetAgentVersion(v version.Binary) (err error) {
Update: bson.D{{"$set", bson.D{{"tools", tools}}}},
}}
if err := u.st.runTransaction(ops); err != nil {
return onAbort(err, errDead)
return onAbort(err, ErrDead)
}
u.doc.Tools = tools
return nil
Expand All @@ -221,7 +221,7 @@ func (u *Unit) setPasswordHash(passwordHash string) error {
}}
err := u.st.runTransaction(ops)
if err != nil {
return fmt.Errorf("cannot set password of unit %q: %v", u, onAbort(err, errDead))
return fmt.Errorf("cannot set password of unit %q: %v", u, onAbort(err, ErrDead))
}
u.doc.PasswordHash = passwordHash
return nil
Expand Down Expand Up @@ -724,7 +724,7 @@ func (u *Unit) SetStatus(status params.Status, info string, data params.StatusDa
}
err := u.st.runTransaction(ops)
if err != nil {
return fmt.Errorf("cannot set status of unit %q: %v", u, onAbort(err, errDead))
return fmt.Errorf("cannot set status of unit %q: %v", u, onAbort(err, ErrDead))
}
return nil
}
Expand Down Expand Up @@ -784,7 +784,7 @@ func (u *Unit) openUnitPort(protocol string, number int) (err error) {
}}
err = u.st.runTransaction(ops)
if err != nil {
return onAbort(err, errDead)
return onAbort(err, ErrDead)
}
found := false
for _, p := range u.doc.Ports {
Expand Down Expand Up @@ -813,7 +813,7 @@ func (u *Unit) closeUnitPort(protocol string, number int) (err error) {
}}
err = u.st.runTransaction(ops)
if err != nil {
return onAbort(err, errDead)
return onAbort(err, ErrDead)
}
newPorts := make([]network.Port, 0, len(u.doc.Ports))
for _, p := range u.doc.Ports {
Expand Down Expand Up @@ -1700,7 +1700,7 @@ func (u *Unit) SetResolved(mode ResolvedMode) (err error) {
if ok, err := isNotDead(u.st.db, unitsC, u.doc.Name); err != nil {
return err
} else if !ok {
return errDead
return ErrDead
}
// For now, the only remaining assert is that resolved was unset.
return fmt.Errorf("already resolved")
Expand Down
8 changes: 1 addition & 7 deletions worker/provisioner/container_initialisation.go
Expand Up @@ -86,7 +86,7 @@ func (cs *ContainerSetup) Handle(containerIds []string) (resultError error) {
return nil
}

logger.Tracef("initial container setup with ids: %v", containerIds)
logger.Infof("initial container setup with ids: %v", containerIds)
for _, id := range containerIds {
containerType := state.ContainerTypeFromId(id)
// If this container type has been dealt with, do nothing.
Expand Down Expand Up @@ -119,12 +119,6 @@ func (cs *ContainerSetup) initialiseAndStartProvisioner(containerType instance.C
}
}

// We only care about the initial container creation.
// This worker has done its job so stop it.
// We do not expect there will be an error, and there's not much we can do anyway.
if err := cs.runner.StopWorker(cs.workerName); err != nil {
logger.Warningf("stopping machine agent container watcher: %v", err)
}
if initialiser, broker, err := cs.getContainerArtifacts(containerType); err != nil {
return fmt.Errorf("initialising container infrastructure on host machine: %v", err)
} else {
Expand Down
9 changes: 2 additions & 7 deletions worker/upgrader/upgrader.go
Expand Up @@ -104,11 +104,6 @@ func allowedTargetVersion(
}

func (u *Upgrader) loop() error {
currentTools := &coretools.Tools{Version: version.Current}
err := u.st.SetVersion(u.tag.String(), currentTools.Version)
if err != nil {
return err
}
versionWatcher, err := u.st.WatchAPIVersion(u.tag.String())
if err != nil {
return err
Expand Down Expand Up @@ -142,7 +137,7 @@ func (u *Upgrader) loop() error {
case <-dying:
return nil
}
if wantVersion == currentTools.Version.Number {
if wantVersion == version.Current.Number {
continue
} else if !allowedTargetVersion(u.origAgentVersion, version.Current.Number,
u.isUpgradeRunning(), wantVersion) {
Expand All @@ -155,7 +150,7 @@ func (u *Upgrader) loop() error {
wantVersion, version.Current)
continue
}
logger.Infof("upgrade requested from %v to %v", currentTools.Version, wantVersion)
logger.Infof("upgrade requested from %v to %v", version.Current, wantVersion)

// Check if tools have already been downloaded.
wantVersionBinary := toBinaryVersion(wantVersion)
Expand Down

0 comments on commit 8d0e514

Please sign in to comment.