Skip to content

Commit

Permalink
Facade/state work for action abort
Browse files Browse the repository at this point in the history
- Added Cancel to Action state for handling running/pending actions.
- Added facade call on uniter facade to get ActionStatus.
- Extended ActionNotifications to support changes not just new actions.
- Added support for running operations to receive updates to remote
state.
  • Loading branch information
hpidcock committed Feb 20, 2020
1 parent 29900ab commit c5b378e
Show file tree
Hide file tree
Showing 49 changed files with 1,250 additions and 230 deletions.
45 changes: 45 additions & 0 deletions api/uniter/action_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,55 @@ func (s *actionSuite) TestActionComplete(c *gc.C) {
action, err := s.uniterSuite.wordpressUnit.AddAction(operationID, "fakeaction", nil)
c.Assert(err, jc.ErrorIsNil)

err = s.uniter.ActionBegin(action.ActionTag())
c.Assert(err, jc.ErrorIsNil)

actionResult := map[string]interface{}{"output": "it worked!"}
err = s.uniter.ActionFinish(action.ActionTag(), params.ActionCompleted, actionResult, "")
c.Assert(err, jc.ErrorIsNil)

completed, err = s.uniterSuite.wordpressUnit.CompletedActions()
c.Assert(err, jc.ErrorIsNil)
c.Assert(len(completed), gc.Equals, 1)
c.Assert(completed[0].Status(), gc.Equals, state.ActionCompleted)
res, errstr := completed[0].Results()
c.Assert(errstr, gc.Equals, "")
c.Assert(res, gc.DeepEquals, actionResult)
c.Assert(completed[0].Name(), gc.Equals, "fakeaction")
operation, err := s.Model.Operation(operationID)
c.Assert(err, jc.ErrorIsNil)
c.Assert(operation.Status(), gc.Equals, state.ActionCompleted)
}

func (s *actionSuite) TestActionStatus(c *gc.C) {
completed, err := s.uniterSuite.wordpressUnit.CompletedActions()
c.Assert(err, jc.ErrorIsNil)
c.Assert(completed, gc.DeepEquals, ([]state.Action)(nil))

operationID, err := s.Model.EnqueueOperation("a test")
c.Assert(err, jc.ErrorIsNil)
action, err := s.uniterSuite.wordpressUnit.AddAction(operationID, "fakeaction", nil)
c.Assert(err, jc.ErrorIsNil)

status, err := s.uniter.ActionStatus(action.ActionTag())
c.Assert(err, jc.ErrorIsNil)
c.Assert(status, gc.Equals, params.ActionPending)

err = s.uniter.ActionBegin(action.ActionTag())
c.Assert(err, jc.ErrorIsNil)

status, err = s.uniter.ActionStatus(action.ActionTag())
c.Assert(err, jc.ErrorIsNil)
c.Assert(status, gc.Equals, params.ActionRunning)

actionResult := map[string]interface{}{"output": "it worked!"}
err = s.uniter.ActionFinish(action.ActionTag(), params.ActionCompleted, actionResult, "")
c.Assert(err, jc.ErrorIsNil)

status, err = s.uniter.ActionStatus(action.ActionTag())
c.Assert(err, jc.ErrorIsNil)
c.Assert(status, gc.Equals, params.ActionCompleted)

completed, err = s.uniterSuite.wordpressUnit.CompletedActions()
c.Assert(err, jc.ErrorIsNil)
c.Assert(len(completed), gc.Equals, 1)
Expand Down
27 changes: 27 additions & 0 deletions api/uniter/uniter.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,33 @@ func (st *State) getOneAction(tag *names.ActionTag) (params.ActionResult, error)
return result, nil
}

// ActionStatus provides the status of a single action.
func (st *State) ActionStatus(tag names.ActionTag) (string, error) {
args := params.Entities{
Entities: []params.Entity{
{Tag: tag.String()},
},
}

var results params.StringResults
err := st.facade.FacadeCall("ActionStatus", args, &results)
if err != nil {
return "", err
}

if len(results.Results) > 1 {
return "", fmt.Errorf("expected only 1 action query result, got %d", len(results.Results))
}

// handle server errors
result := results.Results[0]
if err := result.Error; err != nil {
return "", err
}

return result.Result, nil
}

// Unit provides access to methods of a state.Unit through the facade.
func (st *State) Unit(tag names.UnitTag) (*Unit, error) {
unit := &Unit{
Expand Down
31 changes: 30 additions & 1 deletion apiserver/common/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ func ParamsActionExecutionResultsToStateActionResults(arg params.ActionExecution
status = state.ActionFailed
case params.ActionPending:
status = state.ActionPending
case params.ActionAborting:
status = state.ActionAborting
case params.ActionAborted:
status = state.ActionAborted
default:
return state.ActionResults{}, errors.Errorf("unrecognized action status '%s'", arg.Status)
}
Expand Down Expand Up @@ -162,7 +166,8 @@ func Actions(args params.Entities, actionFn func(string) (state.Action, error))
return results
}

// WatchOneActionReceiverNotifications to create a watcher for one receiver.
// WatchOneActionReceiverNotifications returns a function for creating a
// watcher on all action notifications (action adds + changes) for one receiver.
// It needs a tagToActionReceiver function and a registerFunc to register
// resources.
// It's a helper function currently used by the uniter and by machineactions
Expand All @@ -185,6 +190,30 @@ func WatchOneActionReceiverNotifications(tagToActionReceiver func(tag string) (s
}
}

// WatchPendingActionsForReceiver returns a function for creating a
// watcher on new pending Actions for one receiver.
// It needs a tagToActionReceiver function and a registerFunc to register
// resources.
// It's a helper function currently used by the uniter and by machineactions
func WatchPendingActionsForReceiver(tagToActionReceiver func(tag string) (state.ActionReceiver, error), registerFunc func(r facade.Resource) string) func(names.Tag) (params.StringsWatchResult, error) {
return func(tag names.Tag) (params.StringsWatchResult, error) {
nothing := params.StringsWatchResult{}
receiver, err := tagToActionReceiver(tag.String())
if err != nil {
return nothing, err
}
watch := receiver.WatchPendingActionNotifications()

if changes, ok := <-watch.Changes(); ok {
return params.StringsWatchResult{
StringsWatcherId: registerFunc(watch),
Changes: changes,
}, nil
}
return nothing, watcher.EnsureErr(watch)
}
}

// WatchActionNotifications returns a StringsWatcher for observing incoming actions towards an actionreceiver.
// It's a helper function currently used by the uniter and by machineactions
// canAccess is passed in by the respective caller to provide authorization.
Expand Down
41 changes: 41 additions & 0 deletions apiserver/common/action_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,43 @@ func (s *actionsSuite) TestWatchOneActionReceiverNotifications(c *gc.C) {
}
}

func (s *actionsSuite) TestWatchPendingActionsForReceiver(c *gc.C) {
expectErr := errors.New("zwoosh")
registerFunc := func(facade.Resource) string { return "bambalam" }
tagToActionReceiver := common.TagToActionReceiverFn(makeFindEntity(map[string]state.Entity{
"machine-1": &fakeActionReceiver{watcher: &fakeWatcher{}},
"machine-2": &fakeActionReceiver{watcher: &fakeWatcher{err: expectErr}},
}))

watchOneFn := common.WatchPendingActionsForReceiver(tagToActionReceiver, registerFunc)

for i, test := range []struct {
tag names.Tag
err string
watcherId string
}{{
tag: names.NewMachineTag("0"),
err: "machine-0 not found",
}, {
tag: names.NewMachineTag("1"),
watcherId: "bambalam",
}, {
tag: names.NewMachineTag("2"),
err: "zwoosh",
}} {
c.Logf("test %d", i)
c.Logf(test.tag.String())
result, err := watchOneFn(test.tag)
if test.err != "" {
c.Check(err, gc.ErrorMatches, test.err)
c.Check(result, jc.DeepEquals, params.StringsWatchResult{})
} else {
c.Check(err, jc.ErrorIsNil)
c.Check(result.StringsWatcherId, gc.Equals, test.watcherId)
}
}
}

func makeWatchOne(mapping map[names.Tag]params.StringsWatchResult) func(names.Tag) (params.StringsWatchResult, error) {
return func(tag names.Tag) (params.StringsWatchResult, error) {
result, ok := mapping[tag]
Expand Down Expand Up @@ -305,6 +342,10 @@ func (mock fakeActionReceiver) WatchActionNotifications() state.StringsWatcher {
return mock.watcher
}

func (mock fakeActionReceiver) WatchPendingActionNotifications() state.StringsWatcher {
return mock.watcher
}

type fakeWatcher struct {
state.StringsWatcher
err error
Expand Down
2 changes: 1 addition & 1 deletion apiserver/facades/agent/machineactions/machineactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (f *Facade) FinishActions(args params.ActionExecutionResults) params.ErrorR
// incoming action calls to a machine.
func (f *Facade) WatchActionNotifications(args params.Entities) params.StringsWatchResults {
tagToActionReceiver := f.backend.TagToActionReceiverFn(f.backend.FindEntity)
watchOne := common.WatchOneActionReceiverNotifications(tagToActionReceiver, f.resources.Register)
watchOne := common.WatchPendingActionsForReceiver(tagToActionReceiver, f.resources.Register)
return common.WatchActionNotifications(args, f.accessMachine, watchOne)
}

Expand Down
43 changes: 42 additions & 1 deletion apiserver/facades/agent/uniter/uniter.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
var logger = loggo.GetLogger("juju.apiserver.uniter")

// UniterAPI implements the latest version (v15) of the Uniter API,
// which adds the State and SetState calls.
// which adds the State, SetState calls and changes WatchActionNotifications to notify on action changes.
type UniterAPI struct {
*common.LifeGetter
*StatusAPI
Expand Down Expand Up @@ -1088,6 +1088,18 @@ func (u *UniterAPI) WatchActionNotifications(args params.Entities) (params.Strin
return common.WatchActionNotifications(args, canAccess, watchOne), nil
}

// WatchActionNotifications preserves previous functionality of the ActionNotifications watcher
// to only trigger once on creation of a pending Action.
func (u *UniterAPIV14) WatchActionNotifications(args params.Entities) (params.StringsWatchResults, error) {
tagToActionReceiver := common.TagToActionReceiverFn(u.st.FindEntity)
watchOne := common.WatchPendingActionsForReceiver(tagToActionReceiver, u.resources.Register)
canAccess, err := u.accessUnit()
if err != nil {
return params.StringsWatchResults{}, err
}
return common.WatchActionNotifications(args, canAccess, watchOne), nil
}

// ConfigSettings returns the complete set of application charm config
// settings available to each given unit.
func (u *UniterAPI) ConfigSettings(args params.Entities) (params.ConfigSettingsResults, error) {
Expand Down Expand Up @@ -1166,6 +1178,35 @@ func (u *UniterAPI) Relation(args params.RelationUnits) (params.RelationResults,
return result, nil
}

// ActionStatus returns the status of Actions by Tags passed in.
func (u *UniterAPI) ActionStatus(args params.Entities) (params.StringResults, error) {
canAccess, err := u.accessUnit()
if err != nil {
return params.StringResults{}, err
}

m, err := u.st.Model()
if err != nil {
return params.StringResults{}, errors.Trace(err)
}

results := params.StringResults{
Results: make([]params.StringResult, len(args.Entities)),
}

actionFn := common.AuthAndActionFromTagFn(canAccess, m.ActionByTag)
for k, entity := range args.Entities {
action, err := actionFn(entity.Tag)
if err != nil {
results.Results[k].Error = common.ServerError(err)
continue
}
results.Results[k].Result = string(action.Status())
}

return results, nil
}

// Actions returns the Actions by Tags passed and ensures that the Unit asking
// for them is the same Unit that has the Actions.
func (u *UniterAPI) Actions(args params.Entities) (params.ActionResults, error) {
Expand Down
97 changes: 97 additions & 0 deletions apiserver/facades/agent/uniter/uniter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1221,6 +1221,36 @@ func (s *uniterSuite) TestLogActionMessage(c *gc.C) {
c.Assert(messages[0].Timestamp(), gc.NotNil)
}

func (s *uniterSuite) TestLogActionMessageAborting(c *gc.C) {
operationID, err := s.Model.EnqueueOperation("a test")
c.Assert(err, jc.ErrorIsNil)
anAction, err := s.wordpressUnit.AddAction(operationID, "fakeaction", nil)
c.Assert(err, jc.ErrorIsNil)
c.Assert(anAction.Messages(), gc.HasLen, 0)
_, err = anAction.Begin()
c.Assert(err, jc.ErrorIsNil)

_, err = anAction.Cancel()
c.Assert(err, jc.ErrorIsNil)

args := params.ActionMessageParams{Messages: []params.EntityString{
{Tag: anAction.Tag().String(), Value: "hello"},
}}
result, err := s.uniter.LogActionsMessages(args)
c.Assert(err, jc.ErrorIsNil)
c.Assert(result, gc.DeepEquals, params.ErrorResults{
Results: []params.ErrorResult{
{},
},
})
anAction, err = s.Model.Action(anAction.Id())
c.Assert(err, jc.ErrorIsNil)
messages := anAction.Messages()
c.Assert(messages, gc.HasLen, 1)
c.Assert(messages[0].Message(), gc.Equals, "hello")
c.Assert(messages[0].Timestamp(), gc.NotNil)
}

func (s *uniterSuite) TestWatchActionNotifications(c *gc.C) {
err := s.wordpressUnit.SetCharmURL(s.wpCharm.URL())
c.Assert(err, jc.ErrorIsNil)
Expand Down Expand Up @@ -1257,6 +1287,13 @@ func (s *uniterSuite) TestWatchActionNotifications(c *gc.C) {
addedAction, err := s.wordpressUnit.AddAction(operationID, "fakeaction", nil)
c.Assert(err, jc.ErrorIsNil)

wc.AssertChange(addedAction.Id())

_, err = addedAction.Begin()
c.Assert(err, jc.ErrorIsNil)
wc.AssertNoChange()

_, err = addedAction.Cancel()
wc.AssertChange(addedAction.Id())
wc.AssertNoChange()
}
Expand Down Expand Up @@ -4962,3 +4999,63 @@ type fakeToken struct {
func (t *fakeToken) Check(int, interface{}) error {
return t.err
}

type uniterV14Suite struct {
uniterSuiteBase
uniterV14 *uniter.UniterAPIV14
}

var _ = gc.Suite(&uniterV14Suite{})

func (s *uniterV14Suite) SetUpTest(c *gc.C) {
s.uniterSuiteBase.SetUpTest(c)

uniterV14, err := uniter.NewUniterAPIV14(s.facadeContext())
c.Assert(err, jc.ErrorIsNil)
s.uniterV14 = uniterV14
}

func (s *uniterV14Suite) TestWatchActionNotificationsLegacy(c *gc.C) {
err := s.wordpressUnit.SetCharmURL(s.wpCharm.URL())
c.Assert(err, jc.ErrorIsNil)

c.Assert(s.resources.Count(), gc.Equals, 0)

args := params.Entities{Entities: []params.Entity{
{Tag: "unit-mysql-0"},
{Tag: "unit-wordpress-0"},
{Tag: "unit-foo-42"},
}}
result, err := s.uniterV14.WatchActionNotifications(args)
c.Assert(err, jc.ErrorIsNil)
c.Assert(result, gc.DeepEquals, params.StringsWatchResults{
Results: []params.StringsWatchResult{
{Error: apiservertesting.ErrUnauthorized},
{StringsWatcherId: "1"},
{Error: apiservertesting.ErrUnauthorized},
},
})

// Verify the resource was registered and stop when done
c.Assert(s.resources.Count(), gc.Equals, 1)
resource := s.resources.Get("1")
defer statetesting.AssertStop(c, resource)

// Check that the Watch has consumed the initial event ("returned" in
// the Watch call)
wc := statetesting.NewStringsWatcherC(c, s.State, resource.(state.StringsWatcher))
wc.AssertNoChange()

operationID, err := s.Model.EnqueueOperation("a test")
c.Assert(err, jc.ErrorIsNil)
addedAction, err := s.wordpressUnit.AddAction(operationID, "fakeaction", nil)
c.Assert(err, jc.ErrorIsNil)
wc.AssertChange(addedAction.Id())

_, err = addedAction.Begin()
c.Assert(err, jc.ErrorIsNil)
wc.AssertNoChange()

_, err = addedAction.Cancel()
wc.AssertNoChange()
}
2 changes: 1 addition & 1 deletion apiserver/facades/client/action/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ func (a *ActionAPI) cancel(arg params.Entities, compat bool) (params.ActionResul
currentResult.Error = common.ServerError(err)
continue
}
result, err := action.Finish(state.ActionResults{Status: state.ActionCancelled, Message: "action cancelled via the API"})
result, err := action.Cancel()
if err != nil {
currentResult.Error = common.ServerError(err)
continue
Expand Down

0 comments on commit c5b378e

Please sign in to comment.