Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Facade/state work for action abort #11214

Merged
merged 1 commit into from
Feb 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
45 changes: 45 additions & 0 deletions api/uniter/action_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,55 @@ func (s *actionSuite) TestActionComplete(c *gc.C) {
action, err := s.uniterSuite.wordpressUnit.AddAction(operationID, "fakeaction", nil)
c.Assert(err, jc.ErrorIsNil)

err = s.uniter.ActionBegin(action.ActionTag())
c.Assert(err, jc.ErrorIsNil)

actionResult := map[string]interface{}{"output": "it worked!"}
err = s.uniter.ActionFinish(action.ActionTag(), params.ActionCompleted, actionResult, "")
c.Assert(err, jc.ErrorIsNil)

completed, err = s.uniterSuite.wordpressUnit.CompletedActions()
c.Assert(err, jc.ErrorIsNil)
c.Assert(len(completed), gc.Equals, 1)
c.Assert(completed[0].Status(), gc.Equals, state.ActionCompleted)
res, errstr := completed[0].Results()
c.Assert(errstr, gc.Equals, "")
c.Assert(res, gc.DeepEquals, actionResult)
c.Assert(completed[0].Name(), gc.Equals, "fakeaction")
operation, err := s.Model.Operation(operationID)
c.Assert(err, jc.ErrorIsNil)
c.Assert(operation.Status(), gc.Equals, state.ActionCompleted)
}

func (s *actionSuite) TestActionStatus(c *gc.C) {
completed, err := s.uniterSuite.wordpressUnit.CompletedActions()
c.Assert(err, jc.ErrorIsNil)
c.Assert(completed, gc.DeepEquals, ([]state.Action)(nil))

operationID, err := s.Model.EnqueueOperation("a test")
c.Assert(err, jc.ErrorIsNil)
action, err := s.uniterSuite.wordpressUnit.AddAction(operationID, "fakeaction", nil)
c.Assert(err, jc.ErrorIsNil)

status, err := s.uniter.ActionStatus(action.ActionTag())
c.Assert(err, jc.ErrorIsNil)
c.Assert(status, gc.Equals, params.ActionPending)

err = s.uniter.ActionBegin(action.ActionTag())
c.Assert(err, jc.ErrorIsNil)

status, err = s.uniter.ActionStatus(action.ActionTag())
c.Assert(err, jc.ErrorIsNil)
c.Assert(status, gc.Equals, params.ActionRunning)

actionResult := map[string]interface{}{"output": "it worked!"}
err = s.uniter.ActionFinish(action.ActionTag(), params.ActionCompleted, actionResult, "")
c.Assert(err, jc.ErrorIsNil)

status, err = s.uniter.ActionStatus(action.ActionTag())
c.Assert(err, jc.ErrorIsNil)
c.Assert(status, gc.Equals, params.ActionCompleted)

completed, err = s.uniterSuite.wordpressUnit.CompletedActions()
c.Assert(err, jc.ErrorIsNil)
c.Assert(len(completed), gc.Equals, 1)
Expand Down
27 changes: 27 additions & 0 deletions api/uniter/uniter.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,33 @@ func (st *State) getOneAction(tag *names.ActionTag) (params.ActionResult, error)
return result, nil
}

// ActionStatus provides the status of a single action.
func (st *State) ActionStatus(tag names.ActionTag) (string, error) {
args := params.Entities{
Entities: []params.Entity{
{Tag: tag.String()},
},
}

var results params.StringResults
err := st.facade.FacadeCall("ActionStatus", args, &results)
if err != nil {
return "", err
}

if len(results.Results) > 1 {
return "", fmt.Errorf("expected only 1 action query result, got %d", len(results.Results))
}

// handle server errors
result := results.Results[0]
if err := result.Error; err != nil {
return "", err
}

return result.Result, nil
}

// Unit provides access to methods of a state.Unit through the facade.
func (st *State) Unit(tag names.UnitTag) (*Unit, error) {
unit := &Unit{
Expand Down
31 changes: 30 additions & 1 deletion apiserver/common/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ func ParamsActionExecutionResultsToStateActionResults(arg params.ActionExecution
status = state.ActionFailed
case params.ActionPending:
status = state.ActionPending
case params.ActionAborting:
status = state.ActionAborting
case params.ActionAborted:
status = state.ActionAborted
default:
return state.ActionResults{}, errors.Errorf("unrecognized action status '%s'", arg.Status)
}
Expand Down Expand Up @@ -162,7 +166,8 @@ func Actions(args params.Entities, actionFn func(string) (state.Action, error))
return results
}

// WatchOneActionReceiverNotifications to create a watcher for one receiver.
// WatchOneActionReceiverNotifications returns a function for creating a
// watcher on all action notifications (action adds + changes) for one receiver.
// It needs a tagToActionReceiver function and a registerFunc to register
// resources.
// It's a helper function currently used by the uniter and by machineactions
Expand All @@ -185,6 +190,30 @@ func WatchOneActionReceiverNotifications(tagToActionReceiver func(tag string) (s
}
}

// WatchPendingActionsForReceiver returns a function for creating a
// watcher on new pending Actions for one receiver.
// It needs a tagToActionReceiver function and a registerFunc to register
// resources.
// It's a helper function currently used by the uniter and by machineactions
func WatchPendingActionsForReceiver(tagToActionReceiver func(tag string) (state.ActionReceiver, error), registerFunc func(r facade.Resource) string) func(names.Tag) (params.StringsWatchResult, error) {
return func(tag names.Tag) (params.StringsWatchResult, error) {
nothing := params.StringsWatchResult{}
receiver, err := tagToActionReceiver(tag.String())
if err != nil {
return nothing, err
}
watch := receiver.WatchPendingActionNotifications()

if changes, ok := <-watch.Changes(); ok {
return params.StringsWatchResult{
StringsWatcherId: registerFunc(watch),
Changes: changes,
}, nil
}
return nothing, watcher.EnsureErr(watch)
}
}

// WatchActionNotifications returns a StringsWatcher for observing incoming actions towards an actionreceiver.
// It's a helper function currently used by the uniter and by machineactions
// canAccess is passed in by the respective caller to provide authorization.
Expand Down
41 changes: 41 additions & 0 deletions apiserver/common/action_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,43 @@ func (s *actionsSuite) TestWatchOneActionReceiverNotifications(c *gc.C) {
}
}

func (s *actionsSuite) TestWatchPendingActionsForReceiver(c *gc.C) {
expectErr := errors.New("zwoosh")
registerFunc := func(facade.Resource) string { return "bambalam" }
tagToActionReceiver := common.TagToActionReceiverFn(makeFindEntity(map[string]state.Entity{
"machine-1": &fakeActionReceiver{watcher: &fakeWatcher{}},
"machine-2": &fakeActionReceiver{watcher: &fakeWatcher{err: expectErr}},
}))

watchOneFn := common.WatchPendingActionsForReceiver(tagToActionReceiver, registerFunc)

for i, test := range []struct {
tag names.Tag
err string
watcherId string
}{{
tag: names.NewMachineTag("0"),
err: "machine-0 not found",
}, {
tag: names.NewMachineTag("1"),
watcherId: "bambalam",
}, {
tag: names.NewMachineTag("2"),
err: "zwoosh",
}} {
c.Logf("test %d", i)
c.Logf(test.tag.String())
result, err := watchOneFn(test.tag)
if test.err != "" {
c.Check(err, gc.ErrorMatches, test.err)
c.Check(result, jc.DeepEquals, params.StringsWatchResult{})
} else {
c.Check(err, jc.ErrorIsNil)
c.Check(result.StringsWatcherId, gc.Equals, test.watcherId)
}
}
}

func makeWatchOne(mapping map[names.Tag]params.StringsWatchResult) func(names.Tag) (params.StringsWatchResult, error) {
return func(tag names.Tag) (params.StringsWatchResult, error) {
result, ok := mapping[tag]
Expand Down Expand Up @@ -305,6 +342,10 @@ func (mock fakeActionReceiver) WatchActionNotifications() state.StringsWatcher {
return mock.watcher
}

func (mock fakeActionReceiver) WatchPendingActionNotifications() state.StringsWatcher {
return mock.watcher
}

type fakeWatcher struct {
state.StringsWatcher
err error
Expand Down
2 changes: 1 addition & 1 deletion apiserver/facades/agent/machineactions/machineactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (f *Facade) FinishActions(args params.ActionExecutionResults) params.ErrorR
// incoming action calls to a machine.
func (f *Facade) WatchActionNotifications(args params.Entities) params.StringsWatchResults {
tagToActionReceiver := f.backend.TagToActionReceiverFn(f.backend.FindEntity)
watchOne := common.WatchOneActionReceiverNotifications(tagToActionReceiver, f.resources.Register)
watchOne := common.WatchPendingActionsForReceiver(tagToActionReceiver, f.resources.Register)
return common.WatchActionNotifications(args, f.accessMachine, watchOne)
}

Expand Down
43 changes: 42 additions & 1 deletion apiserver/facades/agent/uniter/uniter.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
var logger = loggo.GetLogger("juju.apiserver.uniter")

// UniterAPI implements the latest version (v15) of the Uniter API,
// which adds the State and SetState calls.
// which adds the State, SetState calls and changes WatchActionNotifications to notify on action changes.
type UniterAPI struct {
*common.LifeGetter
*StatusAPI
Expand Down Expand Up @@ -1088,6 +1088,18 @@ func (u *UniterAPI) WatchActionNotifications(args params.Entities) (params.Strin
return common.WatchActionNotifications(args, canAccess, watchOne), nil
}

// WatchActionNotifications preserves previous functionality of the ActionNotifications watcher
// to only trigger once on creation of a pending Action.
func (u *UniterAPIV14) WatchActionNotifications(args params.Entities) (params.StringsWatchResults, error) {
tagToActionReceiver := common.TagToActionReceiverFn(u.st.FindEntity)
watchOne := common.WatchPendingActionsForReceiver(tagToActionReceiver, u.resources.Register)
canAccess, err := u.accessUnit()
if err != nil {
return params.StringsWatchResults{}, err
}
return common.WatchActionNotifications(args, canAccess, watchOne), nil
}

// ConfigSettings returns the complete set of application charm config
// settings available to each given unit.
func (u *UniterAPI) ConfigSettings(args params.Entities) (params.ConfigSettingsResults, error) {
Expand Down Expand Up @@ -1166,6 +1178,35 @@ func (u *UniterAPI) Relation(args params.RelationUnits) (params.RelationResults,
return result, nil
}

// ActionStatus returns the status of Actions by Tags passed in.
func (u *UniterAPI) ActionStatus(args params.Entities) (params.StringResults, error) {
canAccess, err := u.accessUnit()
if err != nil {
return params.StringResults{}, err
}

m, err := u.st.Model()
if err != nil {
return params.StringResults{}, errors.Trace(err)
}

results := params.StringResults{
Results: make([]params.StringResult, len(args.Entities)),
}

actionFn := common.AuthAndActionFromTagFn(canAccess, m.ActionByTag)
for k, entity := range args.Entities {
action, err := actionFn(entity.Tag)
if err != nil {
results.Results[k].Error = common.ServerError(err)
continue
}
results.Results[k].Result = string(action.Status())
}

return results, nil
}

// Actions returns the Actions by Tags passed and ensures that the Unit asking
// for them is the same Unit that has the Actions.
func (u *UniterAPI) Actions(args params.Entities) (params.ActionResults, error) {
Expand Down
97 changes: 97 additions & 0 deletions apiserver/facades/agent/uniter/uniter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1221,6 +1221,36 @@ func (s *uniterSuite) TestLogActionMessage(c *gc.C) {
c.Assert(messages[0].Timestamp(), gc.NotNil)
}

func (s *uniterSuite) TestLogActionMessageAborting(c *gc.C) {
operationID, err := s.Model.EnqueueOperation("a test")
c.Assert(err, jc.ErrorIsNil)
anAction, err := s.wordpressUnit.AddAction(operationID, "fakeaction", nil)
c.Assert(err, jc.ErrorIsNil)
c.Assert(anAction.Messages(), gc.HasLen, 0)
_, err = anAction.Begin()
c.Assert(err, jc.ErrorIsNil)

_, err = anAction.Cancel()
c.Assert(err, jc.ErrorIsNil)

args := params.ActionMessageParams{Messages: []params.EntityString{
{Tag: anAction.Tag().String(), Value: "hello"},
}}
result, err := s.uniter.LogActionsMessages(args)
c.Assert(err, jc.ErrorIsNil)
c.Assert(result, gc.DeepEquals, params.ErrorResults{
Results: []params.ErrorResult{
{},
},
})
anAction, err = s.Model.Action(anAction.Id())
c.Assert(err, jc.ErrorIsNil)
messages := anAction.Messages()
c.Assert(messages, gc.HasLen, 1)
c.Assert(messages[0].Message(), gc.Equals, "hello")
c.Assert(messages[0].Timestamp(), gc.NotNil)
}

func (s *uniterSuite) TestWatchActionNotifications(c *gc.C) {
err := s.wordpressUnit.SetCharmURL(s.wpCharm.URL())
c.Assert(err, jc.ErrorIsNil)
Expand Down Expand Up @@ -1257,6 +1287,13 @@ func (s *uniterSuite) TestWatchActionNotifications(c *gc.C) {
addedAction, err := s.wordpressUnit.AddAction(operationID, "fakeaction", nil)
c.Assert(err, jc.ErrorIsNil)

wc.AssertChange(addedAction.Id())

_, err = addedAction.Begin()
c.Assert(err, jc.ErrorIsNil)
wc.AssertNoChange()

_, err = addedAction.Cancel()
wc.AssertChange(addedAction.Id())
wc.AssertNoChange()
}
Expand Down Expand Up @@ -4962,3 +4999,63 @@ type fakeToken struct {
func (t *fakeToken) Check(int, interface{}) error {
return t.err
}

type uniterV14Suite struct {
uniterSuiteBase
uniterV14 *uniter.UniterAPIV14
}

var _ = gc.Suite(&uniterV14Suite{})

func (s *uniterV14Suite) SetUpTest(c *gc.C) {
s.uniterSuiteBase.SetUpTest(c)

uniterV14, err := uniter.NewUniterAPIV14(s.facadeContext())
c.Assert(err, jc.ErrorIsNil)
s.uniterV14 = uniterV14
}

func (s *uniterV14Suite) TestWatchActionNotificationsLegacy(c *gc.C) {
err := s.wordpressUnit.SetCharmURL(s.wpCharm.URL())
c.Assert(err, jc.ErrorIsNil)

c.Assert(s.resources.Count(), gc.Equals, 0)

args := params.Entities{Entities: []params.Entity{
{Tag: "unit-mysql-0"},
{Tag: "unit-wordpress-0"},
{Tag: "unit-foo-42"},
}}
result, err := s.uniterV14.WatchActionNotifications(args)
c.Assert(err, jc.ErrorIsNil)
c.Assert(result, gc.DeepEquals, params.StringsWatchResults{
Results: []params.StringsWatchResult{
{Error: apiservertesting.ErrUnauthorized},
{StringsWatcherId: "1"},
{Error: apiservertesting.ErrUnauthorized},
},
})

// Verify the resource was registered and stop when done
c.Assert(s.resources.Count(), gc.Equals, 1)
resource := s.resources.Get("1")
defer statetesting.AssertStop(c, resource)

// Check that the Watch has consumed the initial event ("returned" in
// the Watch call)
wc := statetesting.NewStringsWatcherC(c, s.State, resource.(state.StringsWatcher))
wc.AssertNoChange()

operationID, err := s.Model.EnqueueOperation("a test")
c.Assert(err, jc.ErrorIsNil)
addedAction, err := s.wordpressUnit.AddAction(operationID, "fakeaction", nil)
c.Assert(err, jc.ErrorIsNil)
wc.AssertChange(addedAction.Id())

_, err = addedAction.Begin()
c.Assert(err, jc.ErrorIsNil)
wc.AssertNoChange()

_, err = addedAction.Cancel()
wc.AssertNoChange()
}
2 changes: 1 addition & 1 deletion apiserver/facades/client/action/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ func (a *ActionAPI) cancel(arg params.Entities, compat bool) (params.ActionResul
currentResult.Error = common.ServerError(err)
continue
}
result, err := action.Finish(state.ActionResults{Status: state.ActionCancelled, Message: "action cancelled via the API"})
result, err := action.Cancel()
if err != nil {
currentResult.Error = common.ServerError(err)
continue
Expand Down