Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve error handling #144

Merged
merged 8 commits into from
Jul 10, 2019
92 changes: 92 additions & 0 deletions cmd/e2e/broken_stack_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package main

import (
"fmt"
"testing"
"time"

"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/intstr"
)

func TestBrokenStacks(t *testing.T) {
t.Parallel()

stacksetName := "stackset-broken-stacks"
factory := NewTestStacksetSpecFactory(stacksetName).Ingress().StackGC(1, 30)

firstVersion := "v1"
firstStack := fmt.Sprintf("%s-%s", stacksetName, firstVersion)
spec := factory.Create(firstVersion)
err := createStackSet(stacksetName, 0, spec)
require.NoError(t, err)
_, err = waitForStack(t, stacksetName, firstVersion)
require.NoError(t, err)

unhealthyVersion := "v2"
unhealthyStack := fmt.Sprintf("%s-%s", stacksetName, unhealthyVersion)
spec = factory.Create(unhealthyVersion)
spec.StackTemplate.Spec.Service.Ports = []v1.ServicePort{
{
Protocol: corev1.ProtocolTCP,
TargetPort: intstr.FromString("foobar"),
},
}
err = updateStackset(stacksetName, spec)
require.NoError(t, err)
_, err = waitForStack(t, stacksetName, unhealthyVersion)
require.NoError(t, err)

_, err = waitForIngress(t, stacksetName)
require.NoError(t, err)

initialWeights := map[string]float64{firstStack: 100}
err = trafficWeightsUpdated(t, stacksetName, weightKindActual, initialWeights, nil).await()
require.NoError(t, err)

// Switch traffic to the second stack, this should fail
desiredWeights := map[string]float64{unhealthyStack: 100}
err = setDesiredTrafficWeights(stacksetName, desiredWeights)
require.NoError(t, err)
err = trafficWeightsUpdated(t, stacksetName, weightKindActual, desiredWeights, nil).await()
require.Error(t, err)

// Create a healthy stack
healthyVersion := "v3"
healthyStack := fmt.Sprintf("%s-%s", stacksetName, healthyVersion)
spec = factory.Create(healthyVersion)
err = updateStackset(stacksetName, spec)
require.NoError(t, err)
_, err = waitForStack(t, stacksetName, healthyVersion)
require.NoError(t, err)

healthyWeights := map[string]float64{healthyStack: 100}
err = setDesiredTrafficWeights(stacksetName, healthyWeights)
require.NoError(t, err)
err = trafficWeightsUpdated(t, stacksetName, weightKindActual, healthyWeights, nil).await()
require.NoError(t, err)

// Create another healthy stack so we can test GC
finalVersion := "v4"
finalStack := fmt.Sprintf("%s-%s", stacksetName, finalVersion)
spec = factory.Create(finalVersion)
err = updateStackset(stacksetName, spec)
require.NoError(t, err)
_, err = waitForStack(t, stacksetName, finalVersion)
require.NoError(t, err)

finalWeights := map[string]float64{finalStack: 100}
err = setDesiredTrafficWeights(stacksetName, finalWeights)
require.NoError(t, err)
err = trafficWeightsUpdated(t, stacksetName, weightKindActual, finalWeights, nil).await()
require.NoError(t, err)

// Check that the unhealthy stack was deleted
for _, stack := range []string{firstStack, unhealthyStack} {
err := resourceDeleted(t, "stack", stack, stackInterface()).withTimeout(time.Second * 60).await()
require.NoError(t, err)
}

}
128 changes: 73 additions & 55 deletions controller/stackset.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ const (
ResetHPAMinReplicasDelayAnnotationKey = "alpha.stackset-controller.zalando.org/reset-hpa-min-replicas-delay"
StacksetControllerControllerAnnotationKey = "stackset-controller.zalando.org/controller"

reasonFailedManageStackSet = "FailedManageStackSet"

defaultResetMinReplicasDelay = 10 * time.Minute
)

Expand Down Expand Up @@ -78,6 +80,21 @@ func NewStackSetController(client clientset.Interface, controllerID string, inte
}
}

func (c *StackSetController) stacksetLogger(ssc *core.StackSetContainer) *log.Entry {
return c.logger.WithFields(map[string]interface{}{
"namespace": ssc.StackSet.Namespace,
"stackset": ssc.StackSet.Name,
})
}

func (c *StackSetController) stackLogger(ssc *core.StackSetContainer, sc *core.StackContainer) *log.Entry {
return c.logger.WithFields(map[string]interface{}{
"namespace": ssc.StackSet.Namespace,
"stackset": ssc.StackSet.Name,
"stack": sc.Name(),
})
}

// Run runs the main loop of the StackSetController. Before the loops it
// sets up a watcher to watch StackSet resources. The watch will send
// changes over a channel which is polled from the main loop.
Expand All @@ -99,17 +116,14 @@ func (c *StackSetController) Run(ctx context.Context) {

var reconcileGroup errgroup.Group
for stackset, container := range stackContainers {
container := *container
container := container

reconcileGroup.Go(func() error {
if _, ok := c.stacksetStore[stackset]; ok {
err := c.ReconcileStackSet(container)
if err != nil {
c.logger.
WithField("namespace", container.StackSet.Namespace).
WithField("name", container.StackSet.Name).
Errorf("unable to reconcile a stackset: %v", err)
return c.errorEventf(container.StackSet, "FailedManageStackSet", err)
c.stacksetLogger(container).Errorf("unable to reconcile a stackset: %v", err)
return c.errorEventf(container.StackSet, reasonFailedManageStackSet, err)
}
}
return nil
Expand All @@ -122,10 +136,7 @@ func (c *StackSetController) Run(ctx context.Context) {
}
case e := <-c.stacksetEvents:
stackset := *e.StackSet
// set TypeMeta manually because of this bug:
// https://github.com/kubernetes/client-go/issues/308
stackset.APIVersion = core.APIVersion
stackset.Kind = core.KindStackSet
fixupStackSetTypeMeta(&stackset)

// update/delete existing entry
if _, ok := c.stacksetStore[stackset.UID]; ok {
Expand Down Expand Up @@ -245,11 +256,7 @@ func (c *StackSetController) collectStacks(stacksets map[types.UID]*core.StackSe
if uid, ok := getOwnerUID(stack.ObjectMeta); ok {
if s, ok := stacksets[uid]; ok {
stack := stack

// set TypeMeta manually because of this bug:
// https://github.com/kubernetes/client-go/issues/308
stack.APIVersion = core.APIVersion
stack.Kind = core.KindStack
fixupStackTypeMeta(&stack)

s.StackContainers[stack.UID] = &core.StackContainer{
Stack: &stack,
Expand Down Expand Up @@ -460,7 +467,7 @@ func retryUpdate(updateFn func(retry bool) error) error {
}

// ReconcileStatuses reconciles the statuses of StackSets and Stacks.
func (c *StackSetController) ReconcileStatuses(ssc core.StackSetContainer) error {
func (c *StackSetController) ReconcileStatuses(ssc *core.StackSetContainer) error {
for _, sc := range ssc.StackContainers {
stack := sc.Stack.DeepCopy()
status := *sc.GenerateStackStatus()
Expand Down Expand Up @@ -506,21 +513,17 @@ func (c *StackSetController) ReconcileStatuses(ssc core.StackSetContainer) error
}

// CreateCurrentStack creates a new Stack object for the current stack, if needed
func (c *StackSetController) CreateCurrentStack(ssc core.StackSetContainer) error {
func (c *StackSetController) CreateCurrentStack(ssc *core.StackSetContainer) error {
newStack, newStackVersion := ssc.NewStack()
if newStack == nil {
return nil
}

created, err := c.client.ZalandoV1().Stacks(newStack.Namespace()).Create(newStack.Stack)
if err != nil {
return c.errorEventf(ssc.StackSet, "FailedCreateStack", err)
return err
}

// set TypeMeta manually because of this bug:
// https://github.com/kubernetes/client-go/issues/308
created.APIVersion = core.APIVersion
created.Kind = core.KindStack
fixupStackTypeMeta(created)

c.recorder.Eventf(
ssc.StackSet,
Expand All @@ -529,13 +532,15 @@ func (c *StackSetController) CreateCurrentStack(ssc core.StackSetContainer) erro
"Created stack %s",
newStack.Name())

// Update observedStackVersion
ssc.StackSet.Status.ObservedStackVersion = newStackVersion
updated, err := c.client.ZalandoV1().StackSets(ssc.StackSet.Namespace).UpdateStatus(ssc.StackSet)
// Persist ObservedStackVersion in the status
updated := ssc.StackSet.DeepCopy()
updated.Status.ObservedStackVersion = newStackVersion
result, err := c.client.ZalandoV1().StackSets(ssc.StackSet.Namespace).UpdateStatus(updated)
if err != nil {
return err
}
ssc.StackSet = updated
fixupStackSetTypeMeta(result)
ssc.StackSet = result

ssc.StackContainers[created.UID] = &core.StackContainer{
Stack: created,
Expand All @@ -546,7 +551,7 @@ func (c *StackSetController) CreateCurrentStack(ssc core.StackSetContainer) erro
}

// CleanupOldStacks deletes stacks that are no longer needed.
func (c *StackSetController) CleanupOldStacks(ssc core.StackSetContainer) error {
func (c *StackSetController) CleanupOldStacks(ssc *core.StackSetContainer) error {
for _, sc := range ssc.StackContainers {
if !sc.PendingRemoval {
continue
Expand Down Expand Up @@ -627,14 +632,7 @@ func (c *StackSetController) ReconcileStackSetIngress(stackset *zv1.StackSet, ex
return nil
}

func (c *StackSetController) ReconcileResources(ssc core.StackSetContainer) error {
for _, sc := range ssc.StackContainers {
err := c.ReconcileStackResources(ssc, sc)
if err != nil {
return c.errorEventf(sc.Stack, "FailedManageStack", err)
}
}

func (c *StackSetController) ReconcileStackSetResources(ssc *core.StackSetContainer) error {
err := c.ReconcileStackSetIngress(ssc.StackSet, ssc.Ingress, ssc.GenerateIngress)
if err != nil {
return c.errorEventf(ssc.StackSet, "FailedManageIngress", err)
Expand All @@ -658,7 +656,7 @@ func (c *StackSetController) ReconcileResources(ssc core.StackSetContainer) erro
return nil
}

func (c *StackSetController) ReconcileStackResources(ssc core.StackSetContainer, sc *core.StackContainer) error {
func (c *StackSetController) ReconcileStackResources(ssc *core.StackSetContainer, sc *core.StackContainer) error {
err := c.ReconcileStackDeployment(sc.Stack, sc.Resources.Deployment, sc.GenerateDeployment)
if err != nil {
return c.errorEventf(sc.Stack, "FailedManageDeployment", err)
Expand All @@ -682,26 +680,24 @@ func (c *StackSetController) ReconcileStackResources(ssc core.StackSetContainer,
return nil
}

func (c *StackSetController) ReconcileStackSet(container core.StackSetContainer) error {
// Create current stack, if needed
func (c *StackSetController) ReconcileStackSet(container *core.StackSetContainer) error {
// Create current stack, if needed. Proceed on errors.
err := c.CreateCurrentStack(container)
if err != nil {
return err
err = c.errorEventf(container.StackSet, "FailedCreateStack", err)
c.stacksetLogger(container).Errorf("Unable to create stack: %v", err)
}

// Update statuses from external resources (ingresses, deployments, etc)
// Update statuses from external resources (ingresses, deployments, etc). Abort on errors.
err = container.UpdateFromResources()
if err != nil {
return err
}

// Update the stacks with the currently selected traffic reconciler
// Update the stacks with the currently selected traffic reconciler. Proceed on errors.
err = container.ManageTraffic(time.Now())
if err != nil {
if !core.IsTrafficSwitchError(err) {
return err
}
c.logger.Warnf("Traffic reconciliation for %s/%s failed: %v", container.StackSet.Namespace, container.StackSet.Name, err)
c.stacksetLogger(container).Errorf("Traffic reconciliation failed: %v", err)
c.recorder.Eventf(
container.StackSet,
v1.EventTypeWarning,
Expand All @@ -710,24 +706,32 @@ func (c *StackSetController) ReconcileStackSet(container core.StackSetContainer)
}

// Mark stacks that should be removed
err = container.MarkExpiredStacks()
if err != nil {
return err
container.MarkExpiredStacks()

// Reconcile stack resources. Proceed on errors.
for _, sc := range container.StackContainers {
err := c.ReconcileStackResources(container, sc)
if err != nil {
err = c.errorEventf(sc.Stack, "FailedManageStack", err)
c.stackLogger(container, sc).Errorf("Unable to reconcile stack resources: %v", err)
}
}

// Create or update resources
err = c.ReconcileResources(container)
// Reconcile stackset resources. Proceed on errors.
err = c.ReconcileStackSetResources(container)
if err != nil {
return err
err = c.errorEventf(container.StackSet, reasonFailedManageStackSet, err)
c.stacksetLogger(container).Errorf("Unable to reconcile stackset resources: %v", err)
}

// Delete old stacks
// Delete old stacks. Proceed on errors.
err = c.CleanupOldStacks(container)
if err != nil {
return err
err = c.errorEventf(container.StackSet, reasonFailedManageStackSet, err)
c.stacksetLogger(container).Errorf("Unable to delete old stacks: %v", err)
}

// Update statuses
// Update statuses.
err = c.ReconcileStatuses(container)
if err != nil {
return err
Expand All @@ -749,3 +753,17 @@ func getResetMinReplicasDelay(annotations map[string]string) (time.Duration, boo
}
return resetDelay, true
}

func fixupStackSetTypeMeta(stackset *zv1.StackSet) {
// set TypeMeta manually because of this bug:
// https://github.com/kubernetes/client-go/issues/308
stackset.APIVersion = core.APIVersion
stackset.Kind = core.KindStackSet
}

func fixupStackTypeMeta(stack *zv1.Stack) {
// set TypeMeta manually because of this bug:
// https://github.com/kubernetes/client-go/issues/308
stack.APIVersion = core.APIVersion
stack.Kind = core.KindStack
}
6 changes: 3 additions & 3 deletions controller/stackset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func TestCreateCurrentStack(t *testing.T) {
_, err = env.client.ZalandoV1().Stacks(stackset.Namespace).Get("foo-v1", metav1.GetOptions{})
require.True(t, errors.IsNotFound(err))

container := core.StackSetContainer{
container := &core.StackSetContainer{
StackSet: &stackset,
StackContainers: map[types.UID]*core.StackContainer{},
TrafficReconciler: &core.SimpleTrafficReconciler{},
Expand All @@ -292,7 +292,7 @@ func TestCreateCurrentStack(t *testing.T) {
Stack: stack,
},
}, container.StackContainers)
require.Equal(t, "v1", stackset.Status.ObservedStackVersion)
require.Equal(t, "v1", container.StackSet.Status.ObservedStackVersion)

// Check that we don't create the stack if not needed
stackset.Status.ObservedStackVersion = "v2"
Expand Down Expand Up @@ -320,7 +320,7 @@ func TestCleanupOldStacks(t *testing.T) {
err = env.CreateStacks([]zv1.Stack{testStack1, testStack2, testStack3, testStack4})
require.NoError(t, err)

container := core.StackSetContainer{
container := &core.StackSetContainer{
StackSet: &stackset,
StackContainers: map[types.UID]*core.StackContainer{
testStack1.UID: {
Expand Down
Loading