Skip to content

Commit

Permalink
Implement a new RetryClient that wraps the Kubernetes Client with ret…
Browse files Browse the repository at this point in the history
…ries. (#547)

Fixes #526

This ensures that we properly retry on network failures in our integration tests to prevent an flaky tests.
  • Loading branch information
jbarrick-mesosphere committed Jul 10, 2019
1 parent e5f7d0b commit 3befd46
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 34 deletions.
2 changes: 1 addition & 1 deletion pkg/test/harness.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (h *Harness) Client(forceNew bool) (client.Client, error) {
return nil, err
}

h.client, err = client.New(config, client.Options{
h.client, err = testutils.NewRetryClient(config, client.Options{
Scheme: testutils.Scheme(),
})
return h.client, err
Expand Down
46 changes: 17 additions & 29 deletions pkg/test/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,28 +79,21 @@ func (s *Step) DeleteExisting(namespace string) error {
}

if ref.Labels != nil && len(ref.Labels) != 0 {
// If the reference has a label selector, List all objects that match
if err := testutils.Retry(context.TODO(), func(ctx context.Context) error {
u := &unstructured.UnstructuredList{}
u.SetGroupVersionKind(gvk)

listOptions := []client.ListOptionFunc{client.MatchingLabels(ref.Labels)}
if objNs != "" {
listOptions = append(listOptions, client.InNamespace(objNs))
}

err := s.Client.List(ctx, u, listOptions...)
if err != nil {
return errors.Wrap(err, "listing matching resources")
}

for index := range u.Items {
toDelete = append(toDelete, &u.Items[index])
}

return nil
}, testutils.IsJSONSyntaxError); err != nil {
return err
u := &unstructured.UnstructuredList{}
u.SetGroupVersionKind(gvk)

listOptions := []client.ListOptionFunc{client.MatchingLabels(ref.Labels)}
if objNs != "" {
listOptions = append(listOptions, client.InNamespace(objNs))
}

err := s.Client.List(context.TODO(), u, listOptions...)
if err != nil {
return errors.Wrap(err, "listing matching resources")
}

for index := range u.Items {
toDelete = append(toDelete, &u.Items[index])
}
} else {
// Otherwise just append the object specified.
Expand All @@ -109,13 +102,8 @@ func (s *Step) DeleteExisting(namespace string) error {
}

for _, obj := range toDelete {
if err := testutils.Retry(context.TODO(), func(ctx context.Context) error {
err := s.Client.Delete(context.TODO(), obj.DeepCopyObject())
if err != nil && k8serrors.IsNotFound(err) {
return nil
}
return err
}, testutils.IsJSONSyntaxError); err != nil {
err := s.Client.Delete(context.TODO(), obj.DeepCopyObject())
if err != nil && !k8serrors.IsNotFound(err) {
return err
}
}
Expand Down
91 changes: 89 additions & 2 deletions pkg/test/utils/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,93 @@ func Retry(ctx context.Context, fn func(context.Context) error, errValidationFun
return lastErr
}

// RetryClient implements the Client interface, with retries built in.
type RetryClient struct {
Client client.Client
}

// RetryStatusWriter implements the StatusWriter interface, with retries built in.
type RetryStatusWriter struct {
StatusWriter client.StatusWriter
}

// NewRetryClient initializes a new Kubernetes client that automatically retries on network-related errors.
func NewRetryClient(cfg *rest.Config, opts client.Options) (*RetryClient, error) {
client, err := client.New(cfg, opts)
return &RetryClient{Client: client}, err
}

// Create saves the object obj in the Kubernetes cluster.
func (r *RetryClient) Create(ctx context.Context, obj runtime.Object, opts ...client.CreateOptionFunc) error {
return Retry(ctx, func(ctx context.Context) error {
return r.Client.Create(ctx, obj, opts...)
}, IsJSONSyntaxError)
}

// Delete deletes the given obj from Kubernetes cluster.
func (r *RetryClient) Delete(ctx context.Context, obj runtime.Object, opts ...client.DeleteOptionFunc) error {
return Retry(ctx, func(ctx context.Context) error {
return r.Client.Delete(ctx, obj, opts...)
}, IsJSONSyntaxError)
}

// Update updates the given obj in the Kubernetes cluster. obj must be a
// struct pointer so that obj can be updated with the content returned by the Server.
func (r *RetryClient) Update(ctx context.Context, obj runtime.Object, opts ...client.UpdateOptionFunc) error {
return Retry(ctx, func(ctx context.Context) error {
return r.Client.Update(ctx, obj, opts...)
}, IsJSONSyntaxError)
}

// Patch patches the given obj in the Kubernetes cluster. obj must be a
// struct pointer so that obj can be updated with the content returned by the Server.
func (r *RetryClient) Patch(ctx context.Context, obj runtime.Object, patch client.Patch, opts ...client.PatchOptionFunc) error {
return Retry(ctx, func(ctx context.Context) error {
return r.Client.Patch(ctx, obj, patch, opts...)
}, IsJSONSyntaxError)
}

// Get retrieves an obj for the given object key from the Kubernetes Cluster.
// obj must be a struct pointer so that obj can be updated with the response
// returned by the Server.
func (r *RetryClient) Get(ctx context.Context, key client.ObjectKey, obj runtime.Object) error {
return Retry(ctx, func(ctx context.Context) error {
return r.Client.Get(ctx, key, obj)
}, IsJSONSyntaxError)
}

// List retrieves list of objects for a given namespace and list options. On a
// successful call, Items field in the list will be populated with the
// result returned from the server.
func (r *RetryClient) List(ctx context.Context, list runtime.Object, opts ...client.ListOptionFunc) error {
return Retry(ctx, func(ctx context.Context) error {
return r.Client.List(ctx, list, opts...)
}, IsJSONSyntaxError)
}

// Status returns a client which can update status subresource for kubernetes objects.
func (r *RetryClient) Status() client.StatusWriter {
return &RetryStatusWriter{
StatusWriter: r.Client.Status(),
}
}

// Update updates the given obj in the Kubernetes cluster. obj must be a
// struct pointer so that obj can be updated with the content returned by the Server.
func (r *RetryStatusWriter) Update(ctx context.Context, obj runtime.Object, opts ...client.UpdateOptionFunc) error {
return Retry(ctx, func(ctx context.Context) error {
return r.StatusWriter.Update(ctx, obj, opts...)
}, IsJSONSyntaxError)
}

// Patch patches the given obj in the Kubernetes cluster. obj must be a
// struct pointer so that obj can be updated with the content returned by the Server.
func (r *RetryStatusWriter) Patch(ctx context.Context, obj runtime.Object, patch client.Patch, opts ...client.PatchOptionFunc) error {
return Retry(ctx, func(ctx context.Context) error {
return r.StatusWriter.Patch(ctx, obj, patch, opts...)
}, IsJSONSyntaxError)
}

// Scheme returns an initialized Kubernetes Scheme.
func Scheme() *runtime.Scheme {
apis.AddToScheme(scheme.Scheme)
Expand Down Expand Up @@ -454,7 +541,7 @@ func CreateOrUpdate(ctx context.Context, client client.Client, obj runtime.Objec
validators := []func(err error) bool{}

if retryOnError {
validators = append(validators, k8serrors.IsConflict, IsJSONSyntaxError)
validators = append(validators, k8serrors.IsConflict)
}

return updated, Retry(ctx, func(ctx context.Context) error {
Expand Down Expand Up @@ -566,7 +653,7 @@ func StartTestEnvironment() (env TestEnvironment, err error) {
return
}

env.Client, err = client.New(env.Config, client.Options{})
env.Client, err = NewRetryClient(env.Config, client.Options{})
if err != nil {
return
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/test/utils/kubernetes_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestCreateOrUpdate(t *testing.T) {

func TestWaitForCRDs(t *testing.T) {
// Kubernetes client caches the types, se we need to re-initialize it.
testClient, err := client.New(testenv.Config, client.Options{
testClient, err := NewRetryClient(testenv.Config, client.Options{
Scheme: Scheme(),
})
assert.Nil(t, err)
Expand All @@ -88,7 +88,7 @@ func TestWaitForCRDs(t *testing.T) {
assert.Nil(t, WaitForCRDs(testenv.DiscoveryClient, crds))

// Kubernetes client caches the types, se we need to re-initialize it.
testClient, err = client.New(testenv.Config, client.Options{
testClient, err = NewRetryClient(testenv.Config, client.Options{
Scheme: Scheme(),
})
assert.Nil(t, err)
Expand Down

0 comments on commit 3befd46

Please sign in to comment.