Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions pkg/apply/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const (
StatusType
PruneType
DeleteType
WaitType
)

// Event is the type of the objects that will be returned through
Expand Down Expand Up @@ -58,6 +59,9 @@ type Event struct {
// DeleteEvent contains information about object that have been
// deleted.
DeleteEvent DeleteEvent

// WaitEvent contains information about any errors encountered in a WaitTask.
WaitEvent WaitEvent
}

type InitEvent struct {
Expand Down Expand Up @@ -85,6 +89,11 @@ type ErrorEvent struct {
Err error
}

type WaitEvent struct {
GroupName string
Error error
}

//go:generate stringer -type=ActionGroupEventType
type ActionGroupEventType int

Expand Down
24 changes: 3 additions & 21 deletions pkg/apply/taskrunner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,9 @@ func (b *baseRunner) run(ctx context.Context, taskQueue chan Task,
}
}
// A message on the taskChannel means that the current task
// has either completed or failed. If it has failed, we return
// the error. If the abort flag is true, which means something
// has either completed or failed.
// If it has failed, we return the error.
// If the abort flag is true, which means something
// else has gone wrong and we are waiting for the current task to
// finish, we exit.
// If everything is ok, we fetch and start the next task.
Expand All @@ -238,7 +239,6 @@ func (b *baseRunner) run(ctx context.Context, taskQueue chan Task,
},
}
if msg.Err != nil {
b.amendTimeoutError(taskContext, msg.Err)
return msg.Err
}
if abort {
Expand All @@ -262,24 +262,6 @@ func (b *baseRunner) run(ctx context.Context, taskQueue chan Task,
}
}

func (b *baseRunner) amendTimeoutError(taskContext *TaskContext, err error) {
if timeoutErr, ok := err.(*TimeoutError); ok {
var timedOutResources []TimedOutResource
for _, id := range timeoutErr.Identifiers {
result := taskContext.ResourceCache().Get(id)
if timeoutErr.Condition.Meets(result.Status) {
continue
}
timedOutResources = append(timedOutResources, TimedOutResource{
Identifier: id,
Status: result.Status,
Message: result.StatusMessage,
})
}
timeoutErr.TimedOutResources = timedOutResources
}
}

// completeIfWaitTask checks if the current task is a wait task. If so,
// we invoke the complete function to complete it.
func completeIfWaitTask(currentTask Task, taskContext *TaskContext) {
Expand Down
37 changes: 22 additions & 15 deletions pkg/apply/taskrunner/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,8 @@ func TestBaseRunner(t *testing.T) {
statusEventsDelay time.Duration
statusEvents []pollevent.Event
expectedEventTypes []event.Type
expectedError error
expectedTimedOutResources []TimedOutResource
expectedErrorMsg string
expectedTimeoutErrorMsg string
}{
"wait task runs until condition is met": {
tasks: []Task{
Expand Down Expand Up @@ -112,17 +111,19 @@ func TestBaseRunner(t *testing.T) {
},
},
expectedEventTypes: []event.Type{
event.ActionGroupType,
event.StatusType,
event.WaitType,
event.ActionGroupType,
},
expectedError: &TimeoutError{},
expectedTimedOutResources: []TimedOutResource{
{
Identifier: depID,
Status: status.UnknownStatus,
Message: "resource not cached",
},
},
expectedErrorMsg: "timeout after 2 seconds waiting for 2 resources ([default_cm__ConfigMap default_dep_apps_Deployment]) to reach condition AllCurrent",
expectedTimeoutErrorMsg: "timeout after 2 seconds waiting for 2 resources ([default_cm__ConfigMap default_dep_apps_Deployment]) to reach condition AllCurrent",
},
"wait task times out eventually (InProgress)": {
tasks: []Task{
Expand All @@ -147,16 +148,19 @@ func TestBaseRunner(t *testing.T) {
},
},
expectedEventTypes: []event.Type{
event.ActionGroupType,
event.StatusType,
event.StatusType,
event.WaitType,
event.ActionGroupType,
},
expectedError: &TimeoutError{},
expectedTimedOutResources: []TimedOutResource{
{
Identifier: depID,
Status: status.InProgressStatus,
},
},
expectedErrorMsg: "timeout after 2 seconds waiting for 2 resources ([default_cm__ConfigMap default_dep_apps_Deployment]) to reach condition AllCurrent",
expectedTimeoutErrorMsg: "timeout after 2 seconds waiting for 2 resources ([default_cm__ConfigMap default_dep_apps_Deployment]) to reach condition AllCurrent",
},
"tasks run in order": {
tasks: []Task{
Expand Down Expand Up @@ -244,18 +248,13 @@ func TestBaseRunner(t *testing.T) {
close(eventChannel)
wg.Wait()

if tc.expectedError != nil {
assert.IsType(t, tc.expectedError, err)
if timeoutError, ok := err.(*TimeoutError); ok {
assert.ElementsMatch(t, tc.expectedTimedOutResources,
timeoutError.TimedOutResources)
assert.Equal(t, timeoutError.Error(), tc.expectedErrorMsg)
}
return
} else if err != nil {
if err != nil {
t.Errorf("expected no error, but got %v", err)
}

for _, event := range events {
t.Log(event)
}
if want, got := len(tc.expectedEventTypes), len(events); want != got {
t.Errorf("expected %d events, but got %d", want, got)
}
Expand All @@ -265,6 +264,14 @@ func TestBaseRunner(t *testing.T) {
t.Errorf("expected event type %s, but got %s",
want, got)
}
if e.Type == event.WaitType {
err := e.WaitEvent.Error
if timeoutError, ok := err.(*TimeoutError); ok {
assert.ElementsMatch(t, tc.expectedTimedOutResources,
timeoutError.TimedOutResources)
assert.Equal(t, timeoutError.Error(), tc.expectedTimeoutErrorMsg)
}
}
}
})
}
Expand Down
39 changes: 32 additions & 7 deletions pkg/apply/taskrunner/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (w *WaitTask) Start(taskContext *TaskContext) {

// setTimer creates the timer with the timeout value taken from
// the WaitTask struct. Once the timer expires, it will send
// a message on the TaskChannel provided in the taskContext.
// a message on the EventChannel provided in the taskContext.
func (w *WaitTask) setTimer(taskContext *TaskContext) {
timer := time.NewTimer(w.Timeout)
go func() {
Expand All @@ -111,16 +111,23 @@ func (w *WaitTask) setTimer(taskContext *TaskContext) {
// Timeout is cancelled.
<-timer.C
select {
// We only send the taskResult if no one has gotten
// We only send the TimeoutError to the eventChannel if no one has gotten
// to the token first.
case <-w.token:
taskContext.TaskChannel() <- TaskResult{
Err: &TimeoutError{
Identifiers: w.Ids,
Timeout: w.Timeout,
Condition: w.Condition,
err := &TimeoutError{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I like that we no longer need to amend certain error types in the runner itself. In this case, it seems like we can just create the TimeoutError directly without having to have the two steps here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

amendTimeoutError should probably be refactored to just have a function that returns the IDs that timed out, like pending except also strip out the ones that are Current or NotFound (for apply or prune).

Identifiers: w.Ids,
Timeout: w.Timeout,
Condition: w.Condition,
}
amendTimeoutError(taskContext, err)
taskContext.EventChannel() <- event.Event{
Type: event.WaitType,
WaitEvent: event.WaitEvent{
GroupName: w.Name(),
Error: err,
},
}
taskContext.TaskChannel() <- TaskResult{}
default:
return
}
Expand All @@ -130,6 +137,24 @@ func (w *WaitTask) setTimer(taskContext *TaskContext) {
}
}

func amendTimeoutError(taskContext *TaskContext, err error) {
if timeoutErr, ok := err.(*TimeoutError); ok {
var timedOutResources []TimedOutResource
for _, id := range timeoutErr.Identifiers {
result := taskContext.ResourceCache().Get(id)
if timeoutErr.Condition.Meets(result.Status) {
continue
}
timedOutResources = append(timedOutResources, TimedOutResource{
Identifier: id,
Status: result.Status,
Message: result.StatusMessage,
})
}
timeoutErr.TimedOutResources = timedOutResources
}
}

// checkCondition checks whether the condition set in the task
// is currently met given the status of resources in the cache.
func (w *WaitTask) checkCondition(taskContext *TaskContext) bool {
Expand Down
16 changes: 12 additions & 4 deletions pkg/apply/taskrunner/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ import (
)

func TestWaitTask_TimeoutTriggered(t *testing.T) {
task := NewWaitTask("wait", object.ObjMetadataSet{}, AllCurrent,
taskName := "wait"
task := NewWaitTask(taskName, object.ObjMetadataSet{}, AllCurrent,
2*time.Second, testutil.NewFakeRESTMapper())

eventChannel := make(chan event.Event)
Expand All @@ -28,9 +29,16 @@ func TestWaitTask_TimeoutTriggered(t *testing.T) {
timer := time.NewTimer(3 * time.Second)

select {
case res := <-taskContext.TaskChannel():
if _, ok := IsTimeoutError(res.Err); !ok {
t.Errorf("expected timeout error, but got %v", res.Err)
case e := <-taskContext.EventChannel():
if e.Type != event.WaitType {
t.Errorf("expected a WaitType event, but got a %v event", e.Type)
}
if e.WaitEvent.GroupName != taskName {
t.Errorf("expected WaitEvent.GroupName = %q, but got %q", taskName, e.WaitEvent.GroupName)
}
err := e.WaitEvent.Error
if _, ok := IsTimeoutError(err); !ok {
t.Errorf("expected timeout error, but got %v", err)
}
return
case <-timer.C:
Expand Down
33 changes: 33 additions & 0 deletions pkg/testutil/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type ExpEvent struct {
StatusEvent *ExpStatusEvent
PruneEvent *ExpPruneEvent
DeleteEvent *ExpDeleteEvent
WaitEvent *ExpWaitEvent
}

type ExpInitEvent struct {
Expand Down Expand Up @@ -67,6 +68,11 @@ type ExpDeleteEvent struct {
Error error
}

type ExpWaitEvent struct {
GroupName string
Error error
}

func VerifyEvents(expEvents []ExpEvent, events []event.Event) error {
if len(expEvents) == 0 && len(events) == 0 {
return nil
Expand Down Expand Up @@ -234,6 +240,24 @@ func isMatch(ee ExpEvent, e event.Event) bool {
return de.Error != nil
}
return de.Error == nil

case event.WaitType:
wee := ee.WaitEvent
if wee == nil {
return true
}
we := e.WaitEvent

if wee.GroupName != "" {
if wee.GroupName != we.GroupName {
return false
}
}

if wee.Error != nil {
return cmp.Equal(wee.Error, we.Error, cmpopts.EquateErrors())
}
return we.Error == nil
}
return true
}
Expand Down Expand Up @@ -317,6 +341,15 @@ func EventToExpEvent(e event.Event) ExpEvent {
Error: e.DeleteEvent.Error,
},
}

case event.WaitType:
return ExpEvent{
EventType: event.WaitType,
WaitEvent: &ExpWaitEvent{
GroupName: e.WaitEvent.GroupName,
Error: e.WaitEvent.Error,
},
}
}
return ExpEvent{}
}
Expand Down