diff --git a/cmd/e2e/broken_stack_test.go b/cmd/e2e/broken_stack_test.go new file mode 100644 index 00000000..137263ea --- /dev/null +++ b/cmd/e2e/broken_stack_test.go @@ -0,0 +1,92 @@ +package main + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/intstr" +) + +func TestBrokenStacks(t *testing.T) { + t.Parallel() + + stacksetName := "stackset-broken-stacks" + factory := NewTestStacksetSpecFactory(stacksetName).Ingress().StackGC(1, 30) + + firstVersion := "v1" + firstStack := fmt.Sprintf("%s-%s", stacksetName, firstVersion) + spec := factory.Create(firstVersion) + err := createStackSet(stacksetName, 0, spec) + require.NoError(t, err) + _, err = waitForStack(t, stacksetName, firstVersion) + require.NoError(t, err) + + unhealthyVersion := "v2" + unhealthyStack := fmt.Sprintf("%s-%s", stacksetName, unhealthyVersion) + spec = factory.Create(unhealthyVersion) + spec.StackTemplate.Spec.Service.Ports = []v1.ServicePort{ + { + Protocol: corev1.ProtocolTCP, + TargetPort: intstr.FromString("foobar"), + }, + } + err = updateStackset(stacksetName, spec) + require.NoError(t, err) + _, err = waitForStack(t, stacksetName, unhealthyVersion) + require.NoError(t, err) + + _, err = waitForIngress(t, stacksetName) + require.NoError(t, err) + + initialWeights := map[string]float64{firstStack: 100} + err = trafficWeightsUpdated(t, stacksetName, weightKindActual, initialWeights, nil).await() + require.NoError(t, err) + + // Switch traffic to the second stack, this should fail + desiredWeights := map[string]float64{unhealthyStack: 100} + err = setDesiredTrafficWeights(stacksetName, desiredWeights) + require.NoError(t, err) + err = trafficWeightsUpdated(t, stacksetName, weightKindActual, desiredWeights, nil).await() + require.Error(t, err) + + // Create a healthy stack + healthyVersion := "v3" + healthyStack := fmt.Sprintf("%s-%s", stacksetName, healthyVersion) + spec = factory.Create(healthyVersion) + err = updateStackset(stacksetName, spec) + require.NoError(t, err) + _, err = waitForStack(t, stacksetName, healthyVersion) + require.NoError(t, err) + + healthyWeights := map[string]float64{healthyStack: 100} + err = setDesiredTrafficWeights(stacksetName, healthyWeights) + require.NoError(t, err) + err = trafficWeightsUpdated(t, stacksetName, weightKindActual, healthyWeights, nil).await() + require.NoError(t, err) + + // Create another healthy stack so we can test GC + finalVersion := "v4" + finalStack := fmt.Sprintf("%s-%s", stacksetName, finalVersion) + spec = factory.Create(finalVersion) + err = updateStackset(stacksetName, spec) + require.NoError(t, err) + _, err = waitForStack(t, stacksetName, finalVersion) + require.NoError(t, err) + + finalWeights := map[string]float64{finalStack: 100} + err = setDesiredTrafficWeights(stacksetName, finalWeights) + require.NoError(t, err) + err = trafficWeightsUpdated(t, stacksetName, weightKindActual, finalWeights, nil).await() + require.NoError(t, err) + + // Check that the unhealthy stack was deleted + for _, stack := range []string{firstStack, unhealthyStack} { + err := resourceDeleted(t, "stack", stack, stackInterface()).withTimeout(time.Second * 60).await() + require.NoError(t, err) + } + +} diff --git a/controller/stackset.go b/controller/stackset.go index 49b11bb3..fba104ab 100644 --- a/controller/stackset.go +++ b/controller/stackset.go @@ -34,6 +34,8 @@ const ( ResetHPAMinReplicasDelayAnnotationKey = "alpha.stackset-controller.zalando.org/reset-hpa-min-replicas-delay" StacksetControllerControllerAnnotationKey = "stackset-controller.zalando.org/controller" + reasonFailedManageStackSet = "FailedManageStackSet" + defaultResetMinReplicasDelay = 10 * time.Minute ) @@ -78,6 +80,21 @@ func NewStackSetController(client clientset.Interface, controllerID string, inte } } +func (c *StackSetController) stacksetLogger(ssc *core.StackSetContainer) *log.Entry { + return c.logger.WithFields(map[string]interface{}{ + "namespace": ssc.StackSet.Namespace, + "stackset": ssc.StackSet.Name, + }) +} + +func (c *StackSetController) stackLogger(ssc *core.StackSetContainer, sc *core.StackContainer) *log.Entry { + return c.logger.WithFields(map[string]interface{}{ + "namespace": ssc.StackSet.Namespace, + "stackset": ssc.StackSet.Name, + "stack": sc.Name(), + }) +} + // Run runs the main loop of the StackSetController. Before the loops it // sets up a watcher to watch StackSet resources. The watch will send // changes over a channel which is polled from the main loop. @@ -99,17 +116,14 @@ func (c *StackSetController) Run(ctx context.Context) { var reconcileGroup errgroup.Group for stackset, container := range stackContainers { - container := *container + container := container reconcileGroup.Go(func() error { if _, ok := c.stacksetStore[stackset]; ok { err := c.ReconcileStackSet(container) if err != nil { - c.logger. - WithField("namespace", container.StackSet.Namespace). - WithField("name", container.StackSet.Name). - Errorf("unable to reconcile a stackset: %v", err) - return c.errorEventf(container.StackSet, "FailedManageStackSet", err) + c.stacksetLogger(container).Errorf("unable to reconcile a stackset: %v", err) + return c.errorEventf(container.StackSet, reasonFailedManageStackSet, err) } } return nil @@ -122,10 +136,7 @@ func (c *StackSetController) Run(ctx context.Context) { } case e := <-c.stacksetEvents: stackset := *e.StackSet - // set TypeMeta manually because of this bug: - // https://github.com/kubernetes/client-go/issues/308 - stackset.APIVersion = core.APIVersion - stackset.Kind = core.KindStackSet + fixupStackSetTypeMeta(&stackset) // update/delete existing entry if _, ok := c.stacksetStore[stackset.UID]; ok { @@ -245,11 +256,7 @@ func (c *StackSetController) collectStacks(stacksets map[types.UID]*core.StackSe if uid, ok := getOwnerUID(stack.ObjectMeta); ok { if s, ok := stacksets[uid]; ok { stack := stack - - // set TypeMeta manually because of this bug: - // https://github.com/kubernetes/client-go/issues/308 - stack.APIVersion = core.APIVersion - stack.Kind = core.KindStack + fixupStackTypeMeta(&stack) s.StackContainers[stack.UID] = &core.StackContainer{ Stack: &stack, @@ -460,7 +467,7 @@ func retryUpdate(updateFn func(retry bool) error) error { } // ReconcileStatuses reconciles the statuses of StackSets and Stacks. -func (c *StackSetController) ReconcileStatuses(ssc core.StackSetContainer) error { +func (c *StackSetController) ReconcileStatuses(ssc *core.StackSetContainer) error { for _, sc := range ssc.StackContainers { stack := sc.Stack.DeepCopy() status := *sc.GenerateStackStatus() @@ -506,7 +513,7 @@ func (c *StackSetController) ReconcileStatuses(ssc core.StackSetContainer) error } // CreateCurrentStack creates a new Stack object for the current stack, if needed -func (c *StackSetController) CreateCurrentStack(ssc core.StackSetContainer) error { +func (c *StackSetController) CreateCurrentStack(ssc *core.StackSetContainer) error { newStack, newStackVersion := ssc.NewStack() if newStack == nil { return nil @@ -514,13 +521,9 @@ func (c *StackSetController) CreateCurrentStack(ssc core.StackSetContainer) erro created, err := c.client.ZalandoV1().Stacks(newStack.Namespace()).Create(newStack.Stack) if err != nil { - return c.errorEventf(ssc.StackSet, "FailedCreateStack", err) + return err } - - // set TypeMeta manually because of this bug: - // https://github.com/kubernetes/client-go/issues/308 - created.APIVersion = core.APIVersion - created.Kind = core.KindStack + fixupStackTypeMeta(created) c.recorder.Eventf( ssc.StackSet, @@ -529,13 +532,15 @@ func (c *StackSetController) CreateCurrentStack(ssc core.StackSetContainer) erro "Created stack %s", newStack.Name()) - // Update observedStackVersion - ssc.StackSet.Status.ObservedStackVersion = newStackVersion - updated, err := c.client.ZalandoV1().StackSets(ssc.StackSet.Namespace).UpdateStatus(ssc.StackSet) + // Persist ObservedStackVersion in the status + updated := ssc.StackSet.DeepCopy() + updated.Status.ObservedStackVersion = newStackVersion + result, err := c.client.ZalandoV1().StackSets(ssc.StackSet.Namespace).UpdateStatus(updated) if err != nil { return err } - ssc.StackSet = updated + fixupStackSetTypeMeta(result) + ssc.StackSet = result ssc.StackContainers[created.UID] = &core.StackContainer{ Stack: created, @@ -546,7 +551,7 @@ func (c *StackSetController) CreateCurrentStack(ssc core.StackSetContainer) erro } // CleanupOldStacks deletes stacks that are no longer needed. -func (c *StackSetController) CleanupOldStacks(ssc core.StackSetContainer) error { +func (c *StackSetController) CleanupOldStacks(ssc *core.StackSetContainer) error { for _, sc := range ssc.StackContainers { if !sc.PendingRemoval { continue @@ -627,14 +632,7 @@ func (c *StackSetController) ReconcileStackSetIngress(stackset *zv1.StackSet, ex return nil } -func (c *StackSetController) ReconcileResources(ssc core.StackSetContainer) error { - for _, sc := range ssc.StackContainers { - err := c.ReconcileStackResources(ssc, sc) - if err != nil { - return c.errorEventf(sc.Stack, "FailedManageStack", err) - } - } - +func (c *StackSetController) ReconcileStackSetResources(ssc *core.StackSetContainer) error { err := c.ReconcileStackSetIngress(ssc.StackSet, ssc.Ingress, ssc.GenerateIngress) if err != nil { return c.errorEventf(ssc.StackSet, "FailedManageIngress", err) @@ -658,7 +656,7 @@ func (c *StackSetController) ReconcileResources(ssc core.StackSetContainer) erro return nil } -func (c *StackSetController) ReconcileStackResources(ssc core.StackSetContainer, sc *core.StackContainer) error { +func (c *StackSetController) ReconcileStackResources(ssc *core.StackSetContainer, sc *core.StackContainer) error { err := c.ReconcileStackDeployment(sc.Stack, sc.Resources.Deployment, sc.GenerateDeployment) if err != nil { return c.errorEventf(sc.Stack, "FailedManageDeployment", err) @@ -682,26 +680,24 @@ func (c *StackSetController) ReconcileStackResources(ssc core.StackSetContainer, return nil } -func (c *StackSetController) ReconcileStackSet(container core.StackSetContainer) error { - // Create current stack, if needed +func (c *StackSetController) ReconcileStackSet(container *core.StackSetContainer) error { + // Create current stack, if needed. Proceed on errors. err := c.CreateCurrentStack(container) if err != nil { - return err + err = c.errorEventf(container.StackSet, "FailedCreateStack", err) + c.stacksetLogger(container).Errorf("Unable to create stack: %v", err) } - // Update statuses from external resources (ingresses, deployments, etc) + // Update statuses from external resources (ingresses, deployments, etc). Abort on errors. err = container.UpdateFromResources() if err != nil { return err } - // Update the stacks with the currently selected traffic reconciler + // Update the stacks with the currently selected traffic reconciler. Proceed on errors. err = container.ManageTraffic(time.Now()) if err != nil { - if !core.IsTrafficSwitchError(err) { - return err - } - c.logger.Warnf("Traffic reconciliation for %s/%s failed: %v", container.StackSet.Namespace, container.StackSet.Name, err) + c.stacksetLogger(container).Errorf("Traffic reconciliation failed: %v", err) c.recorder.Eventf( container.StackSet, v1.EventTypeWarning, @@ -710,24 +706,32 @@ func (c *StackSetController) ReconcileStackSet(container core.StackSetContainer) } // Mark stacks that should be removed - err = container.MarkExpiredStacks() - if err != nil { - return err + container.MarkExpiredStacks() + + // Reconcile stack resources. Proceed on errors. + for _, sc := range container.StackContainers { + err := c.ReconcileStackResources(container, sc) + if err != nil { + err = c.errorEventf(sc.Stack, "FailedManageStack", err) + c.stackLogger(container, sc).Errorf("Unable to reconcile stack resources: %v", err) + } } - // Create or update resources - err = c.ReconcileResources(container) + // Reconcile stackset resources. Proceed on errors. + err = c.ReconcileStackSetResources(container) if err != nil { - return err + err = c.errorEventf(container.StackSet, reasonFailedManageStackSet, err) + c.stacksetLogger(container).Errorf("Unable to reconcile stackset resources: %v", err) } - // Delete old stacks + // Delete old stacks. Proceed on errors. err = c.CleanupOldStacks(container) if err != nil { - return err + err = c.errorEventf(container.StackSet, reasonFailedManageStackSet, err) + c.stacksetLogger(container).Errorf("Unable to delete old stacks: %v", err) } - // Update statuses + // Update statuses. err = c.ReconcileStatuses(container) if err != nil { return err @@ -749,3 +753,17 @@ func getResetMinReplicasDelay(annotations map[string]string) (time.Duration, boo } return resetDelay, true } + +func fixupStackSetTypeMeta(stackset *zv1.StackSet) { + // set TypeMeta manually because of this bug: + // https://github.com/kubernetes/client-go/issues/308 + stackset.APIVersion = core.APIVersion + stackset.Kind = core.KindStackSet +} + +func fixupStackTypeMeta(stack *zv1.Stack) { + // set TypeMeta manually because of this bug: + // https://github.com/kubernetes/client-go/issues/308 + stack.APIVersion = core.APIVersion + stack.Kind = core.KindStack +} diff --git a/controller/stackset_test.go b/controller/stackset_test.go index 796d3d08..c9d20050 100644 --- a/controller/stackset_test.go +++ b/controller/stackset_test.go @@ -270,7 +270,7 @@ func TestCreateCurrentStack(t *testing.T) { _, err = env.client.ZalandoV1().Stacks(stackset.Namespace).Get("foo-v1", metav1.GetOptions{}) require.True(t, errors.IsNotFound(err)) - container := core.StackSetContainer{ + container := &core.StackSetContainer{ StackSet: &stackset, StackContainers: map[types.UID]*core.StackContainer{}, TrafficReconciler: &core.SimpleTrafficReconciler{}, @@ -292,7 +292,7 @@ func TestCreateCurrentStack(t *testing.T) { Stack: stack, }, }, container.StackContainers) - require.Equal(t, "v1", stackset.Status.ObservedStackVersion) + require.Equal(t, "v1", container.StackSet.Status.ObservedStackVersion) // Check that we don't create the stack if not needed stackset.Status.ObservedStackVersion = "v2" @@ -320,7 +320,7 @@ func TestCleanupOldStacks(t *testing.T) { err = env.CreateStacks([]zv1.Stack{testStack1, testStack2, testStack3, testStack4}) require.NoError(t, err) - container := core.StackSetContainer{ + container := &core.StackSetContainer{ StackSet: &stackset, StackContainers: map[types.UID]*core.StackContainer{ testStack1.UID: { diff --git a/pkg/core/stackset.go b/pkg/core/stackset.go index 154c451d..aaf55c3c 100644 --- a/pkg/core/stackset.go +++ b/pkg/core/stackset.go @@ -98,7 +98,7 @@ func (ssc *StackSetContainer) NewStack() (*StackContainer, string) { } // MarkExpiredStacks marks stacks that should be deleted -func (ssc *StackSetContainer) MarkExpiredStacks() error { +func (ssc *StackSetContainer) MarkExpiredStacks() { historyLimit := defaultStackLifecycleLimit if ssc.StackSet.Spec.StackLifecycle.Limit != nil { historyLimit = int(*ssc.StackSet.Spec.StackLifecycle.Limit) @@ -115,7 +115,7 @@ func (ssc *StackSetContainer) MarkExpiredStacks() error { // only garbage collect if history limit is reached if len(gcCandidates) <= historyLimit { - return nil + return } // sort candidates by oldest @@ -128,8 +128,6 @@ func (ssc *StackSetContainer) MarkExpiredStacks() error { for _, sc := range gcCandidates[:excessStacks] { sc.PendingRemoval = true } - - return nil } func (ssc *StackSetContainer) GenerateIngress() (*extensions.Ingress, error) { diff --git a/pkg/core/stackset_test.go b/pkg/core/stackset_test.go index 2f2fed69..55573423 100644 --- a/pkg/core/stackset_test.go +++ b/pkg/core/stackset_test.go @@ -1,6 +1,7 @@ package core import ( + "strconv" "testing" "time" @@ -114,8 +115,7 @@ func TestExpiredStacks(t *testing.T) { c.StackContainers[types.UID(stack.Name())] = stack } - err := c.MarkExpiredStacks() - require.NoError(t, err) + c.MarkExpiredStacks() for _, stack := range tc.stacks { require.Equal(t, tc.expected[stack.Name()], stack.PendingRemoval, "stack %s", stack.Stack.Name) } @@ -308,6 +308,47 @@ func TestStackUpdateFromResources(t *testing.T) { }) } + deployment := func(stackGeneration int64, generation int64, observedGeneration int64) *apps.Deployment { + return &apps.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Generation: generation, + Annotations: map[string]string{ + stackGenerationAnnotationKey: strconv.FormatInt(stackGeneration, 10), + }, + }, + Status: apps.DeploymentStatus{ + ObservedGeneration: observedGeneration, + }, + } + } + service := func(stackGeneration int64) *v1.Service { + return &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + stackGenerationAnnotationKey: strconv.FormatInt(stackGeneration, 10), + }, + }, + } + } + ingress := func(stackGeneration int64) *extensions.Ingress { + return &extensions.Ingress{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + stackGenerationAnnotationKey: strconv.FormatInt(stackGeneration, 10), + }, + }, + } + } + hpa := func(stackGeneration int64) *autoscaling.HorizontalPodAutoscaler { + return &autoscaling.HorizontalPodAutoscaler{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + stackGenerationAnnotationKey: strconv.FormatInt(stackGeneration, 10), + }, + }, + } + } + hourAgo := time.Now().Add(-time.Hour) runTest("stackset replicas default to 1", func(t *testing.T, container *StackContainer) { @@ -345,9 +386,9 @@ func TestStackUpdateFromResources(t *testing.T) { require.EqualValues(t, hourAgo, container.noTrafficSince) }) - runTest("missing deployment is handled fine", func(t *testing.T, container *StackContainer) { + runTest("missing resources are handled fine", func(t *testing.T, container *StackContainer) { container.updateFromResources() - require.EqualValues(t, false, container.deploymentUpdated) + require.EqualValues(t, false, container.resourcesUpdated) require.EqualValues(t, 0, container.createdReplicas) require.EqualValues(t, 0, container.readyReplicas) require.EqualValues(t, 0, container.updatedReplicas) @@ -380,51 +421,80 @@ func TestStackUpdateFromResources(t *testing.T) { }) runTest("deployment isn't considered updated if the generation is different", func(t *testing.T, container *StackContainer) { container.Stack.Generation = 11 - container.Resources.Deployment = &apps.Deployment{ - ObjectMeta: metav1.ObjectMeta{ - Generation: 5, - Annotations: map[string]string{ - stackGenerationAnnotationKey: "10", - }, - }, - Status: apps.DeploymentStatus{ - ObservedGeneration: 5, - }, - } + container.Resources.Deployment = deployment(10, 5, 5) + container.Resources.Service = service(11) container.updateFromResources() - require.EqualValues(t, false, container.deploymentUpdated) + require.EqualValues(t, false, container.resourcesUpdated) }) runTest("deployment isn't considered updated if observedGeneration is different", func(t *testing.T, container *StackContainer) { container.Stack.Generation = 11 - container.Resources.Deployment = &apps.Deployment{ - ObjectMeta: metav1.ObjectMeta{ - Generation: 5, - Annotations: map[string]string{ - stackGenerationAnnotationKey: "11", - }, - }, - Status: apps.DeploymentStatus{ - ObservedGeneration: 4, - }, - } + container.Resources.Deployment = deployment(11, 5, 4) + container.Resources.Service = service(11) container.updateFromResources() - require.EqualValues(t, false, container.deploymentUpdated) + require.EqualValues(t, false, container.resourcesUpdated) }) - runTest("deployment is considered updated if observedGeneration is the same", func(t *testing.T, container *StackContainer) { + + runTest("service isn't considered updated if the generation is different", func(t *testing.T, container *StackContainer) { container.Stack.Generation = 11 - container.Resources.Deployment = &apps.Deployment{ - ObjectMeta: metav1.ObjectMeta{ - Generation: 5, - Annotations: map[string]string{ - stackGenerationAnnotationKey: "11", - }, - }, - Status: apps.DeploymentStatus{ - ObservedGeneration: 5, - }, - } + container.Resources.Deployment = deployment(11, 5, 5) + container.Resources.Service = service(10) + container.updateFromResources() + require.EqualValues(t, false, container.resourcesUpdated) + }) + + runTest("ingress isn't considered updated if the generation is different", func(t *testing.T, container *StackContainer) { + container.Stack.Generation = 11 + container.ingressSpec = &zv1.StackSetIngressSpec{} + container.Resources.Deployment = deployment(11, 5, 5) + container.Resources.Service = service(11) + container.Resources.Ingress = ingress(10) + container.updateFromResources() + require.EqualValues(t, false, container.resourcesUpdated) + }) + runTest("ingress isn't considered updated if it should be gone", func(t *testing.T, container *StackContainer) { + container.Stack.Generation = 11 + container.Resources.Deployment = deployment(11, 5, 5) + container.Resources.Service = service(11) + container.Resources.Ingress = ingress(11) + container.updateFromResources() + require.EqualValues(t, false, container.resourcesUpdated) + }) + + runTest("hpa isn't considered updated if the generation is different", func(t *testing.T, container *StackContainer) { + container.Stack.Generation = 11 + container.Stack.Spec.Autoscaler = &zv1.Autoscaler{} + container.Resources.Deployment = deployment(11, 5, 5) + container.Resources.Service = service(11) + container.Resources.HPA = hpa(10) + container.updateFromResources() + require.EqualValues(t, false, container.resourcesUpdated) + }) + runTest("hpa isn't considered updated if it should be gone", func(t *testing.T, container *StackContainer) { + container.Stack.Generation = 11 + container.Resources.Deployment = deployment(11, 5, 5) + container.Resources.Service = service(11) + container.Resources.HPA = hpa(11) + container.updateFromResources() + require.EqualValues(t, false, container.resourcesUpdated) + }) + + runTest("resources are recognised as updated correctly (deployment and service)", func(t *testing.T, container *StackContainer) { + container.Stack.Generation = 11 + container.Resources.Deployment = deployment(11, 5, 5) + container.Resources.Service = service(11) + container.updateFromResources() + require.EqualValues(t, true, container.resourcesUpdated) + }) + runTest("resources are recognised as updated correctly (all resources)", func(t *testing.T, container *StackContainer) { + container.Stack.Generation = 11 + container.ingressSpec = &zv1.StackSetIngressSpec{} + container.Stack.Spec.Autoscaler = &zv1.Autoscaler{} + container.Resources.Deployment = deployment(11, 5, 5) + container.Resources.Service = service(11) + container.Resources.Ingress = ingress(11) + container.Resources.HPA = hpa(11) container.updateFromResources() - require.EqualValues(t, true, container.deploymentUpdated) + require.EqualValues(t, true, container.resourcesUpdated) }) runTest("prescaling information is parsed from the status", func(t *testing.T, container *StackContainer) { diff --git a/pkg/core/test_helpers.go b/pkg/core/test_helpers.go index 4547620a..324a7bfc 100644 --- a/pkg/core/test_helpers.go +++ b/pkg/core/test_helpers.go @@ -29,15 +29,15 @@ func testStack(name string) *testStackFactory { } func (f *testStackFactory) ready(replicas int32) *testStackFactory { - f.container.deploymentUpdated = true + f.container.resourcesUpdated = true f.container.deploymentReplicas = replicas f.container.updatedReplicas = replicas f.container.readyReplicas = replicas return f } -func (f *testStackFactory) deployment(updated bool, deploymentReplicas, updatedReplicas, readyReplicas int32) *testStackFactory { - f.container.deploymentUpdated = updated +func (f *testStackFactory) deployment(resourcesUpdated bool, deploymentReplicas, updatedReplicas, readyReplicas int32) *testStackFactory { + f.container.resourcesUpdated = resourcesUpdated f.container.deploymentReplicas = deploymentReplicas f.container.updatedReplicas = updatedReplicas f.container.readyReplicas = readyReplicas diff --git a/pkg/core/traffic.go b/pkg/core/traffic.go index 486a6a4d..f108d29d 100644 --- a/pkg/core/traffic.go +++ b/pkg/core/traffic.go @@ -1,7 +1,6 @@ package core import ( - "fmt" "time" ) @@ -10,23 +9,6 @@ const ( backendWeightsAnnotationKey = "zalando.org/backend-weights" ) -type trafficSwitchError struct { - reason string -} - -func (e *trafficSwitchError) Error() string { - return e.reason -} - -func IsTrafficSwitchError(err error) bool { - _, ok := err.(*trafficSwitchError) - return ok -} - -func newTrafficSwitchError(format string, args ...interface{}) error { - return &trafficSwitchError{reason: fmt.Sprintf(format, args...)} -} - type TrafficReconciler interface { // Handle the traffic switching and/or scaling logic. Reconcile(stacks map[string]*StackContainer, currentTimestamp time.Time) error diff --git a/pkg/core/traffic_prescaling.go b/pkg/core/traffic_prescaling.go index e0ee47b1..d4a37e24 100644 --- a/pkg/core/traffic_prescaling.go +++ b/pkg/core/traffic_prescaling.go @@ -1,6 +1,7 @@ package core import ( + "fmt" "math" "sort" "strings" @@ -101,7 +102,7 @@ func (r PrescalingTrafficReconciler) Reconcile(stacks map[string]*StackContainer if len(nonReadyStacks) > 0 { sort.Strings(nonReadyStacks) - return newTrafficSwitchError("stacks not ready: %s", strings.Join(nonReadyStacks, ", ")) + return fmt.Errorf("stacks not ready: %s", strings.Join(nonReadyStacks, ", ")) } // TODO: think of case were all are zero and the service/deployment is deleted. diff --git a/pkg/core/traffic_simple.go b/pkg/core/traffic_simple.go index 388932f4..264ed135 100644 --- a/pkg/core/traffic_simple.go +++ b/pkg/core/traffic_simple.go @@ -1,6 +1,7 @@ package core import ( + "fmt" "sort" "strings" "time" @@ -23,7 +24,7 @@ func (SimpleTrafficReconciler) Reconcile(stacks map[string]*StackContainer, curr } if len(nonReadyStacks) > 0 { sort.Strings(nonReadyStacks) - return newTrafficSwitchError("stacks not ready: %s", strings.Join(nonReadyStacks, ", ")) + return fmt.Errorf("stacks not ready: %s", strings.Join(nonReadyStacks, ", ")) } // TODO: think of case were all are zero and the service/deployment is deleted. diff --git a/pkg/core/traffic_test.go b/pkg/core/traffic_test.go index 3468009c..993aabec 100644 --- a/pkg/core/traffic_test.go +++ b/pkg/core/traffic_test.go @@ -50,28 +50,28 @@ func TestTrafficSwitchSimpleNotReady(t *testing.T) { for _, tc := range []struct { name string stack *StackContainer - deploymentUpdated bool + resourcesUpdated bool deploymentReplicas int32 updatedReplicas int32 readyReplicas int32 }{ { name: "deployment not updated yet", - deploymentUpdated: false, + resourcesUpdated: false, deploymentReplicas: 3, updatedReplicas: 3, readyReplicas: 3, }, { name: "not enough updated replicas", - deploymentUpdated: true, + resourcesUpdated: true, deploymentReplicas: 3, updatedReplicas: 2, readyReplicas: 3, }, { name: "not enough ready replicas", - deploymentUpdated: true, + resourcesUpdated: true, deploymentReplicas: 3, updatedReplicas: 3, readyReplicas: 2, @@ -85,16 +85,14 @@ func TestTrafficSwitchSimpleNotReady(t *testing.T) { }, }, StackContainers: map[types.UID]*StackContainer{ - "v1": testStack("foo-v1").traffic(70, 30).deployment(tc.deploymentUpdated, tc.deploymentReplicas, tc.updatedReplicas, tc.readyReplicas).stack(), + "v1": testStack("foo-v1").traffic(70, 30).deployment(tc.resourcesUpdated, tc.deploymentReplicas, tc.updatedReplicas, tc.readyReplicas).stack(), "v2": testStack("foo-v2").traffic(30, 70).ready(3).stack(), }, TrafficReconciler: SimpleTrafficReconciler{}, } err := c.ManageTraffic(time.Now()) - expected := &trafficSwitchError{ - reason: "stacks not ready: foo-v1", - } - require.Equal(t, expected, err) + require.Error(t, err) + require.Equal(t, "stacks not ready: foo-v1", err.Error()) }) } } diff --git a/pkg/core/types.go b/pkg/core/types.go index e0dcadca..4765b4f8 100644 --- a/pkg/core/types.go +++ b/pkg/core/types.go @@ -68,7 +68,7 @@ type StackContainer struct { stackReplicas int32 // Fields from the stack resources - deploymentUpdated bool + resourcesUpdated bool deploymentReplicas int32 createdReplicas int32 readyReplicas int32 @@ -102,12 +102,8 @@ func (sc *StackContainer) HasTraffic() bool { } func (sc *StackContainer) IsReady() bool { - // Haven't updated yet - if !sc.deploymentUpdated { - return false - } - - return sc.deploymentReplicas == sc.updatedReplicas && sc.deploymentReplicas == sc.readyReplicas + // Stacks are considered ready when all subresources have been updated, and we have enough replicas + return sc.resourcesUpdated && sc.deploymentReplicas == sc.updatedReplicas && sc.deploymentReplicas == sc.readyReplicas } func (sc *StackContainer) MaxReplicas() int32 { @@ -245,20 +241,41 @@ func (ssc *StackSetContainer) TrafficChanges() []TrafficChange { func (sc *StackContainer) updateFromResources() { sc.stackReplicas = effectiveReplicas(sc.Stack.Spec.Replicas) + var deploymentUpdated, serviceUpdated, ingressUpdated, hpaUpdated bool + + // deployment if sc.Resources.Deployment != nil { deployment := sc.Resources.Deployment sc.deploymentReplicas = effectiveReplicas(deployment.Spec.Replicas) sc.createdReplicas = deployment.Status.Replicas sc.readyReplicas = deployment.Status.ReadyReplicas sc.updatedReplicas = deployment.Status.UpdatedReplicas - sc.deploymentUpdated = IsResourceUpToDate(sc.Stack, sc.Resources.Deployment.ObjectMeta) && deployment.Status.ObservedGeneration == deployment.Generation + deploymentUpdated = IsResourceUpToDate(sc.Stack, sc.Resources.Deployment.ObjectMeta) && deployment.Status.ObservedGeneration == deployment.Generation + } + + // service + serviceUpdated = sc.Resources.Service != nil && IsResourceUpToDate(sc.Stack, sc.Resources.Service.ObjectMeta) + + // ingress + if sc.ingressSpec != nil { + ingressUpdated = sc.Resources.Ingress != nil && IsResourceUpToDate(sc.Stack, sc.Resources.Ingress.ObjectMeta) } else { - sc.deploymentUpdated = false + ingressUpdated = sc.Resources.Ingress == nil } + // hpa if sc.Resources.HPA != nil { - sc.desiredReplicas = sc.Resources.HPA.Status.DesiredReplicas + hpa := sc.Resources.HPA + sc.desiredReplicas = hpa.Status.DesiredReplicas } + if sc.IsAutoscaled() { + hpaUpdated = sc.Resources.HPA != nil && IsResourceUpToDate(sc.Stack, sc.Resources.HPA.ObjectMeta) + } else { + hpaUpdated = sc.Resources.HPA == nil + } + + // aggregated 'resources updated' for the readiness + sc.resourcesUpdated = deploymentUpdated && serviceUpdated && ingressUpdated && hpaUpdated status := sc.Stack.Status sc.noTrafficSince = unwrapTime(status.NoTrafficSince)