Permalink
Browse files

Only run update-status hook after everything else and when no error, …

…and refactor implementation
  • Loading branch information...
1 parent 5fb1ce9 commit 1fbd501e88ee29456b7c471ce617877f6f1393ad @wallyworld wallyworld committed Sep 3, 2015
View
@@ -313,7 +313,7 @@ func (s *RunTestSuite) runListenerForAgent(c *gc.C, agent string) {
c.Assert(err, jc.ErrorIsNil)
c.Assert(listener, gc.NotNil)
s.AddCleanup(func(*gc.C) {
- listener.Close()
+ c.Assert(listener.Close(), jc.ErrorIsNil)
})
}
@@ -47,7 +47,8 @@ func (hi Info) Validate() error {
return fmt.Errorf("%q hook requires a remote unit", hi.Kind)
}
fallthrough
- case hooks.Install, hooks.Start, hooks.ConfigChanged, hooks.UpgradeCharm, hooks.Stop, hooks.RelationBroken, hooks.CollectMetrics, hooks.MeterStatusChanged, hooks.UpdateStatus:
+ case hooks.Install, hooks.Start, hooks.ConfigChanged, hooks.UpgradeCharm, hooks.Stop, hooks.RelationBroken,
+ hooks.CollectMetrics, hooks.MeterStatusChanged, hooks.UpdateStatus:
return nil
case hooks.Action:
return fmt.Errorf("hooks.Kind Action is deprecated")
@@ -48,6 +48,9 @@ type Snapshot struct {
// version of the leader settings for the service.
LeaderSettingsVersion int
+ // UpdateStatusRequired indicates if an UpdateStatus hook should run.
+ UpdateStatusRequired bool
+
// Actions is the list of pending actions to
// be peformed by this unit.
Actions []string
@@ -33,7 +33,9 @@ type RemoteStateWatcher struct {
storageAttachmentWatchers map[names.StorageTag]*storageAttachmentWatcher
storageAttachmentChanges chan storageAttachmentChange
leadershipTracker leadership.Tracker
- tomb tomb.Tomb
+ updateStatusChannel func() <-chan time.Time
+
+ tomb tomb.Tomb
out chan struct{}
mu sync.Mutex
@@ -43,9 +45,10 @@ type RemoteStateWatcher struct {
// WatcherConfig holds configuration parameters for the
// remote state watcher.
type WatcherConfig struct {
- State State
- LeadershipTracker leadership.Tracker
- UnitTag names.UnitTag
+ State State
+ LeadershipTracker leadership.Tracker
+ UpdateStatusChannel func() <-chan time.Time
+ UnitTag names.UnitTag
}
// TimedSignal is the signature of a function used to generate a
@@ -62,6 +65,7 @@ func NewWatcher(config WatcherConfig) (*RemoteStateWatcher, error) {
storageAttachmentWatchers: make(map[names.StorageTag]*storageAttachmentWatcher),
storageAttachmentChanges: make(chan storageAttachmentChange),
leadershipTracker: config.LeadershipTracker,
+ updateStatusChannel: config.UpdateStatusChannel,
// Note: it is important that the out channel be buffered!
// The remote state watcher will perform a non-blocking send
// on the channel to wake up the observer. It is non-blocking
@@ -121,6 +125,10 @@ func (w *RemoteStateWatcher) Snapshot() Snapshot {
for i, action := range w.current.Actions {
snapshot.Actions[i] = action
}
+ // We return a snapshot with the current UpdateStatusRequired value.
+ // We reset it so that subsequent snapshots wait until the timer is
+ // triggered again before setting the value again.
+ w.current.UpdateStatusRequired = false
return snapshot
}
@@ -376,13 +384,27 @@ func (w *RemoteStateWatcher) loop(unitTag names.UnitTag) (err error) {
if err := w.relationUnitsChanged(change); err != nil {
return err
}
+
+ case <-w.updateStatusChannel():
+ logger.Debugf("update status timer triggered")
+ if err := w.updateStatusChanged(); err != nil {
+ return err
+ }
}
// Something changed.
fire()
}
}
+// updateStatusChanged is called when the update status timer expires.
+func (w *RemoteStateWatcher) updateStatusChanged() error {
+ w.mu.Lock()
+ w.current.UpdateStatusRequired = true
+ w.mu.Unlock()
+ return nil
+}
+
// unitChanged responds to changes in the unit.
func (w *RemoteStateWatcher) unitChanged() error {
if err := w.unit.Refresh(); err != nil {
@@ -4,6 +4,9 @@
package remotestate_test
import (
+ "time"
+
+ "github.com/juju/names"
jc "github.com/juju/testing/checkers"
gc "gopkg.in/check.v1"
"gopkg.in/juju/charm.v5"
@@ -12,7 +15,6 @@ import (
"github.com/juju/juju/state/multiwatcher"
"github.com/juju/juju/testing"
"github.com/juju/juju/worker/uniter/remotestate"
- "github.com/juju/names"
)
type WatcherSuite struct {
@@ -21,6 +23,7 @@ type WatcherSuite struct {
st mockState
leadership mockLeadershipTracker
watcher *remotestate.RemoteStateWatcher
+ clock *testing.Clock
}
var _ = gc.Suite(&WatcherSuite{})
@@ -61,10 +64,18 @@ func (s *WatcherSuite) SetUpTest(c *gc.C) {
minionTicket: mockTicket{make(chan struct{}, 1), true},
}
+ s.clock = testing.NewClock(time.Now())
+ statusTicker := func() <-chan time.Time {
+ // Duration is arbitrary, we'll trigger the ticker
+ // by advancing the clock past the duration.
+ return s.clock.After(10 * time.Second)
+ }
+
w, err := remotestate.NewWatcher(remotestate.WatcherConfig{
- State: &s.st,
- LeadershipTracker: &s.leadership,
- UnitTag: s.st.unit.tag,
+ State: &s.st,
+ LeadershipTracker: &s.leadership,
+ UnitTag: s.st.unit.tag,
+ UpdateStatusChannel: statusTicker,
})
c.Assert(err, jc.ErrorIsNil)
s.watcher = w
@@ -167,13 +178,22 @@ func (s *WatcherSuite) TestRemoteStateChanged(c *gc.C) {
s.st.unit.service.relationsWatcher.changes <- []string{}
assertOneChange()
+
+ s.clock.Advance(15 * time.Second)
+ assertOneChange()
}
func (s *WatcherSuite) TestActionsReceived(c *gc.C) {
+ statusTicker := func() <-chan time.Time {
+ // Duration is arbitrary, we'll trigger the ticker
+ // by advancing the clock past the duration.
+ return s.clock.After(10 * time.Second)
+ }
config := remotestate.WatcherConfig{
- State: &s.st,
- LeadershipTracker: &s.leadership,
- UnitTag: s.st.unit.tag,
+ State: &s.st,
+ LeadershipTracker: &s.leadership,
+ UnitTag: s.st.unit.tag,
+ UpdateStatusChannel: statusTicker,
}
w, err := remotestate.NewWatcher(config)
c.Assert(err, jc.ErrorIsNil)
@@ -469,3 +489,27 @@ func (s *WatcherSuite) TestRelationUnitsChanged(c *gc.C) {
map[string]int64{"mysql/2": 1},
)
}
+
+func (s *WatcherSuite) TestUpdateStatusTicker(c *gc.C) {
+ signalAll(&s.st, &s.leadership)
+ assertNotifyEvent(c, s.watcher.RemoteStateChanged(), "waiting for remote state change")
+ c.Assert(s.watcher.Snapshot().UpdateStatusRequired, jc.IsFalse)
+
+ // Advance the clock past the triiger time.
+ s.clock.Advance(11 * time.Second)
+ assertNotifyEvent(c, s.watcher.RemoteStateChanged(), "waiting for remote state change")
+ c.Assert(s.watcher.Snapshot().UpdateStatusRequired, jc.IsTrue)
+ // Flag is reset after snapshot is first read.
+ c.Assert(s.watcher.Snapshot().UpdateStatusRequired, jc.IsFalse)
+
+ // Advance again but not past the trigger time.
+ s.clock.Advance(6 * time.Second)
+ assertNoNotifyEvent(c, s.watcher.RemoteStateChanged(), "unexpected remote state change")
+ c.Assert(s.watcher.Snapshot().UpdateStatusRequired, jc.IsFalse)
+
+ // And we ht the trigger time.
+ s.clock.Advance(5 * time.Second)
+ assertNotifyEvent(c, s.watcher.RemoteStateChanged(), "waiting for remote state change")
+ c.Assert(s.watcher.Snapshot().UpdateStatusRequired, jc.IsTrue)
+ c.Assert(s.watcher.Snapshot().UpdateStatusRequired, jc.IsFalse)
+}
View
@@ -207,5 +207,15 @@ func (s *uniterResolver) nextOp(
return opFactory.NewRunHook(hook.Info{Kind: hooks.ConfigChanged})
}
- return s.relationsResolver.NextOp(localState, remoteState, opFactory)
+ op, err := s.relationsResolver.NextOp(localState, remoteState, opFactory)
+ if errors.Cause(err) != resolver.ErrNoOperation {
+ return op, err
+ }
+
+ // UpdateStatus hook runs if nothing else needs to.
+ if remoteState.UpdateStatusRequired {
+ return opFactory.NewRunHook(hook.Info{Kind: hooks.UpdateStatus})
+ }
+
+ return nil, resolver.ErrNoOperation
}
@@ -4,15 +4,12 @@
package resolver
import (
- "time"
-
"github.com/juju/errors"
"gopkg.in/juju/charm.v5"
"gopkg.in/juju/charm.v5/hooks"
"launchpad.net/tomb"
"github.com/juju/juju/worker/charmdir"
- "github.com/juju/juju/worker/uniter/hook"
"github.com/juju/juju/worker/uniter/operation"
"github.com/juju/juju/worker/uniter/remotestate"
)
@@ -23,7 +20,6 @@ type LoopConfig struct {
Watcher remotestate.Watcher
Executor operation.Executor
Factory operation.Factory
- UpdateStatusChannel func() <-chan time.Time
CharmURL *charm.URL
Conflicted bool
Dying <-chan struct{}
@@ -68,8 +64,6 @@ func Loop(cfg LoopConfig) (LocalState, error) {
updateCharmDir(cfg.Executor.State(), cfg.CharmDirLocker)
for {
- // TODO(axw) move update status to the watcher.
- updateStatus := cfg.UpdateStatusChannel()
rf.RemoteState = cfg.Watcher.Snapshot()
rf.LocalState.State = cfg.Executor.State()
@@ -107,15 +101,6 @@ func Loop(cfg LoopConfig) (LocalState, error) {
case <-cfg.Dying:
return rf.LocalState, tomb.ErrDying
case <-cfg.Watcher.RemoteStateChanged():
- case <-updateStatus:
- // TODO(axw) move this to a resolver.
- op, err := cfg.Factory.NewRunHook(hook.Info{Kind: hooks.UpdateStatus})
- if err != nil {
- return rf.LocalState, errors.Trace(err)
- }
- if err := cfg.Executor.Run(op); err != nil {
- return rf.LocalState, errors.Trace(err)
- }
}
}
}
@@ -53,10 +53,6 @@ func (s *LoopSuite) loop() (resolver.LocalState, error) {
Factory: s.opFactory,
Watcher: s.watcher,
Executor: s.executor,
- UpdateStatusChannel: func() <-chan time.Time {
- // TODO(axw) test update status channel
- return nil
- },
CharmURL: s.charmURL,
Dying: s.dying,
OnIdle: s.onIdle,
@@ -39,7 +39,7 @@ type CommandRunner interface {
}
// RunListener is responsible for listening on the network connection and
-// seting up the rpc server on that net connection. Also starts the go routine
+// setting up the rpc server on that net connection. Also starts the go routine
// that listens and hands off the work.
type RunListener struct {
listener net.Listener
@@ -122,9 +122,11 @@ func (s *RunListener) Run() (err error) {
// Close immediately stops accepting connections, and blocks until all existing
// connections have been closed.
-func (s *RunListener) Close() {
+func (s *RunListener) Close() error {
+ defer func() {
+ <-s.closed
+ logger.Debugf("juju-run listener stopped")
+ }()
close(s.closing)
- s.listener.Close()
- <-s.closed
- logger.Debugf("juju-run listener stopped")
+ return s.listener.Close()
}
@@ -42,7 +42,7 @@ func (s *ListenerSuite) NewRunListener(c *gc.C) *uniter.RunListener {
c.Assert(err, jc.ErrorIsNil)
c.Assert(listener, gc.NotNil)
s.AddCleanup(func(*gc.C) {
- listener.Close()
+ c.Assert(listener.Close(), jc.ErrorIsNil)
})
return listener
}
@@ -56,7 +56,7 @@ func (s *ListenerSuite) TestNewRunListenerOnExistingSocketRemovesItAndSucceeds(c
listener, err := uniter.NewRunListener(&mockRunner{}, s.socketPath)
c.Assert(err, jc.ErrorIsNil)
c.Assert(listener, gc.NotNil)
- listener.Close()
+ c.Assert(listener.Close(), jc.ErrorIsNil)
}
func (s *ListenerSuite) TestClientCall(c *gc.C) {
Oops, something went wrong.

0 comments on commit 1fbd501

Please sign in to comment.