Skip to content

Commit

Permalink
Wait for resource being deleted, retry reconciliation (#1621)
Browse files Browse the repository at this point in the history
Signed-off-by: Alena Varkockova <varkockova.a@gmail.com>
  • Loading branch information
alenkacz committed Jul 28, 2020
1 parent 4ada4e2 commit f78cf0d
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 7 deletions.
16 changes: 10 additions & 6 deletions pkg/apis/kudo/v1beta1/instance_types.go
Expand Up @@ -126,15 +126,19 @@ func (s *PhaseStatus) SetWithMessage(status ExecutionStatus, message string) {
}

func (s *PlanStatus) Set(status ExecutionStatus) {
s.LastUpdatedTimestamp = &metav1.Time{Time: time.Now()}
s.Status = status
s.Message = ""
if s.Status != status {
s.LastUpdatedTimestamp = &metav1.Time{Time: time.Now()}
s.Status = status
s.Message = ""
}
}

func (s *PlanStatus) SetWithMessage(status ExecutionStatus, message string) {
s.LastUpdatedTimestamp = &metav1.Time{Time: time.Now()}
s.Status = status
s.Message = message
if s.Status != status || s.Message != message {
s.LastUpdatedTimestamp = &metav1.Time{Time: time.Now()}
s.Status = status
s.Message = message
}
}

// ExecutionStatus captures the state of the rollout.
Expand Down
21 changes: 20 additions & 1 deletion pkg/controller/instance/instance_controller.go
Expand Up @@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"log"
"math"
"reflect"
"time"

Expand Down Expand Up @@ -255,7 +256,25 @@ func (r *Reconciler) Reconcile(request ctrl.Request) (ctrl.Result, error) {
r.Recorder.Event(instance, "Normal", "PlanFinished", fmt.Sprintf("Execution of plan %s finished with status %s", newStatus.Name, newStatus.Status))
}

return reconcile.Result{}, nil
return computeTheReconcileResult(instance, time.Now), nil
}

// computeTheReconcileResult decides whether retry reconciliation or not
// if plan was finished, reconciliation is not retried
// for others it uses LastUpdatedTimestamp of a current plan
// for plan updated less than a minute ago, the backoff would be a second, then it increases linearly for every additional minute of plan runtime
// maximum backoff is one minute
//
// all this is necessary because we have a health check for deletion (waiting for deleted object disappear from client cache)
// and we cannot setup watches to all types users can create within KUDO (because we don't know ALL the types)
// a pragmatic solution that prevents stalling is periodically schedule reconciliation for unfinished plan with a backoff
func computeTheReconcileResult(instance *kudov1beta1.Instance, timeNow func() time.Time) reconcile.Result {
if instance.Spec.PlanExecution.Status.IsTerminal() {
return reconcile.Result{}
}
lastUpdatedTime := instance.Status.PlanStatus[instance.Spec.PlanExecution.PlanName].LastUpdatedTimestamp.Time
secondsBackoffCount := int(math.Min(59, timeNow().Sub(lastUpdatedTime).Minutes())) + 1
return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(secondsBackoffCount) * time.Second}
}

func (r *Reconciler) resolveDependencies(i *kudov1beta1.Instance, ov *kudov1beta1.OperatorVersion) error {
Expand Down
58 changes: 58 additions & 0 deletions pkg/controller/instance/instance_controller_test.go
Expand Up @@ -6,6 +6,8 @@ import (
"testing"
"time"

"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -588,3 +590,59 @@ func Test_ensurePlanStatusInitialized(t *testing.T) {
})
}
}

func Test_retryReconciliation(t *testing.T) {
instance := &v1beta1.Instance{
TypeMeta: metav1.TypeMeta{APIVersion: "kudo.dev/v1beta1", Kind: "Instance"},
ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "test"},
Spec: v1beta1.InstanceSpec{
PlanExecution: v1beta1.PlanExecution{
Status: v1beta1.ExecutionComplete,
},
},
Status: v1beta1.InstanceStatus{},
}
timeNow := time.Now()
deployPlanName := "deploy"

tests := []struct {
name string
i *v1beta1.Instance
want reconcile.Result
}{
{"finished plan", instance, reconcile.Result{}},
{"just started plan", func() *v1beta1.Instance {
i := instance.DeepCopy()
i.Spec.PlanExecution.Status = v1beta1.ExecutionInProgress
i.Spec.PlanExecution.PlanName = deployPlanName
i.Status.PlanStatus = map[string]v1beta1.PlanStatus{
deployPlanName: {LastUpdatedTimestamp: &metav1.Time{Time: timeNow}},
}
return i
}(), reconcile.Result{Requeue: true, RequeueAfter: 1 * time.Second}},
{"2 minutes old update", func() *v1beta1.Instance {
i := instance.DeepCopy()
i.Spec.PlanExecution.Status = v1beta1.ExecutionInProgress
i.Spec.PlanExecution.PlanName = deployPlanName
i.Status.PlanStatus = map[string]v1beta1.PlanStatus{
deployPlanName: {LastUpdatedTimestamp: &metav1.Time{Time: timeNow.Add(-2 * time.Minute)}},
}
return i
}(), reconcile.Result{Requeue: true, RequeueAfter: 3 * time.Second}},
{"long stalled plan", func() *v1beta1.Instance {
i := instance.DeepCopy()
i.Spec.PlanExecution.Status = v1beta1.ExecutionInProgress
i.Spec.PlanExecution.PlanName = deployPlanName
i.Status.PlanStatus = map[string]v1beta1.PlanStatus{
deployPlanName: {LastUpdatedTimestamp: &metav1.Time{Time: timeNow.Add(-2 * time.Hour)}},
}
return i
}(), reconcile.Result{Requeue: true, RequeueAfter: 60 * time.Second}},
}
for _, tt := range tests {
result := computeTheReconcileResult(tt.i, func() time.Time { return timeNow })
if result != tt.want {
t.Errorf("%s: expected %v but got %v", tt.name, tt.want, result)
}
}
}
21 changes: 21 additions & 0 deletions pkg/engine/health/health.go
@@ -1,20 +1,26 @@
package health

import (
"context"
"errors"
"fmt"
"log"
"reflect"

"k8s.io/client-go/discovery"
"sigs.k8s.io/controller-runtime/pkg/client"

appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/kubectl/pkg/polymorphichelpers"

kudov1beta1 "github.com/kudobuilder/kudo/pkg/apis/kudo/v1beta1"
"github.com/kudobuilder/kudo/pkg/engine"
"github.com/kudobuilder/kudo/pkg/engine/resource"
"github.com/kudobuilder/kudo/pkg/kudoctl/clog"
)

Expand All @@ -28,6 +34,21 @@ func isJobTerminallyFailed(job *batchv1.Job) (bool, string) {
return false, ""
}

func IsDeleted(client client.Client, discovery discovery.CachedDiscoveryInterface, objs []runtime.Object) error {
for _, obj := range objs {
key, err := resource.ObjectKeyFromObject(obj, discovery)
if err != nil {
return err
}
newObj := obj.DeepCopyObject()
err = client.Get(context.TODO(), key, newObj)
if !apierrors.IsNotFound(err) {
return fmt.Errorf("%s/%s is not deleted", key.Namespace, key.Name)
}
}
return nil
}

// IsHealthy returns whether an object is healthy. Must be implemented for each type.
func IsHealthy(obj runtime.Object) error {
if obj == nil {
Expand Down
8 changes: 8 additions & 0 deletions pkg/engine/task/task_delete.go
Expand Up @@ -3,6 +3,9 @@ package task
import (
"context"
"fmt"
"log"

"github.com/kudobuilder/kudo/pkg/engine/health"

apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -52,6 +55,11 @@ func (dt DeleteTask) Run(ctx Context) (bool, error) {
}

// 6. - Check health: always true for Delete task -
err = health.IsDeleted(ctx.Client, ctx.Discovery, objs)
if err != nil {
log.Printf("TaskExecution: %v", err)
return false, nil
}
return true, nil
}

Expand Down

0 comments on commit f78cf0d

Please sign in to comment.