Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#50102 Task 3: Until, backed by retry watcher #67350

Merged
merged 3 commits into from Feb 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
19 changes: 15 additions & 4 deletions pkg/client/tests/listwatch_test.go
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package tests

import (
"context"
"net/http/httptest"
"net/url"
"testing"
Expand Down Expand Up @@ -194,11 +195,20 @@ func (w lw) Watch(options metav1.ListOptions) (watch.Interface, error) {
func TestListWatchUntil(t *testing.T) {
fw := watch.NewFake()
go func() {
var obj *v1.Pod
obj := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "2",
},
}
fw.Modify(obj)
}()
listwatch := lw{
list: &v1.PodList{Items: []v1.Pod{{}}},
list: &v1.PodList{
ListMeta: metav1.ListMeta{
ResourceVersion: "1",
},
Items: []v1.Pod{{}},
},
watch: fw,
}

Expand All @@ -213,8 +223,9 @@ func TestListWatchUntil(t *testing.T) {
},
}

timeout := 10 * time.Second
lastEvent, err := watchtools.ListWatchUntil(timeout, listwatch, conditions...)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
lastEvent, err := watchtools.ListWatchUntil(ctx, listwatch, conditions...)
if err != nil {
t.Fatalf("expected nil error, got %#v", err)
}
Expand Down
14 changes: 12 additions & 2 deletions staging/src/k8s.io/client-go/tools/cache/listwatch.go
Expand Up @@ -27,15 +27,25 @@ import (
"k8s.io/client-go/tools/pager"
)

// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
type ListerWatcher interface {
// Lister is any object that knows how to perform an initial list.
type Lister interface {
// List should return a list type object; the Items field will be extracted, and the
// ResourceVersion field will be used to start the watch in the right place.
List(options metav1.ListOptions) (runtime.Object, error)
}

// Watcher is any object that knows how to start a watch on a resource.
type Watcher interface {
// Watch should begin a watch at the specified version.
Watch(options metav1.ListOptions) (watch.Interface, error)
}

// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
type ListerWatcher interface {
Lister
Watcher
}

// ListFunc knows how to list resources
type ListFunc func(options metav1.ListOptions) (runtime.Object, error)

Expand Down
7 changes: 7 additions & 0 deletions staging/src/k8s.io/client-go/tools/watch/BUILD
Expand Up @@ -4,18 +4,22 @@ go_library(
name = "go_default_library",
srcs = [
"informerwatcher.go",
"retrywatcher.go",
"until.go",
],
importmap = "k8s.io/kubernetes/vendor/k8s.io/client-go/tools/watch",
importpath = "k8s.io/client-go/tools/watch",
visibility = ["//visibility:public"],
deps = [
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/github.com/davecgh/go-spew/spew:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)
Expand All @@ -24,11 +28,13 @@ go_test(
name = "go_default_test",
srcs = [
"informerwatcher_test.go",
"retrywatcher_test.go",
"until_test.go",
],
embed = [":go_default_library"],
deps = [
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
Expand All @@ -39,6 +45,7 @@ go_test(
"//staging/src/k8s.io/client-go/testing:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/github.com/davecgh/go-spew/spew:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)

Expand Down
283 changes: 283 additions & 0 deletions staging/src/k8s.io/client-go/tools/watch/retrywatcher.go
@@ -0,0 +1,283 @@
/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package watch

import (
"context"
"errors"
"fmt"
"io"
"net/http"
"time"

"github.com/davecgh/go-spew/spew"

apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
"k8s.io/klog"
)

// resourceVersionGetter is an interface used to get resource version from events.
// We can't reuse an interface from meta otherwise it would be a cyclic dependency and we need just this one method
type resourceVersionGetter interface {
GetResourceVersion() string
}

// RetryWatcher will make sure that in case the underlying watcher is closed (e.g. due to API timeout or etcd timeout)
// it will get restarted from the last point without the consumer even knowing about it.
// RetryWatcher does that by inspecting events and keeping track of resourceVersion.
// Especially useful when using watch.UntilWithoutRetry where premature termination is causing issues and flakes.
// Please note that this is not resilient to etcd cache not having the resource version anymore - you would need to
// use Informers for that.
type RetryWatcher struct {
lastResourceVersion string
watcherClient cache.Watcher
resultChan chan watch.Event
stopChan chan struct{}
doneChan chan struct{}
minRestartDelay time.Duration
}

// NewRetryWatcher creates a new RetryWatcher.
// It will make sure that watches gets restarted in case of recoverable errors.
// The initialResourceVersion will be given to watch method when first called.
func NewRetryWatcher(initialResourceVersion string, watcherClient cache.Watcher) (*RetryWatcher, error) {
return newRetryWatcher(initialResourceVersion, watcherClient, 1*time.Second)
}

func newRetryWatcher(initialResourceVersion string, watcherClient cache.Watcher, minRestartDelay time.Duration) (*RetryWatcher, error) {
switch initialResourceVersion {
case "", "0":
// TODO: revisit this if we ever get WATCH v2 where it means start "now"
// without doing the synthetic list of objects at the beginning (see #74022)
return nil, fmt.Errorf("initial RV %q is not supported due to issues with underlying WATCH", initialResourceVersion)
default:
break
}

rw := &RetryWatcher{
lastResourceVersion: initialResourceVersion,
watcherClient: watcherClient,
stopChan: make(chan struct{}),
doneChan: make(chan struct{}),
resultChan: make(chan watch.Event, 0),
minRestartDelay: minRestartDelay,
}

go rw.receive()
return rw, nil
}

func (rw *RetryWatcher) send(event watch.Event) bool {
// Writing to an unbuffered channel is blocking operation
// and we need to check if stop wasn't requested while doing so.
select {
case rw.resultChan <- event:
return true
case <-rw.stopChan:
return false
}
}

// doReceive returns true when it is done, false otherwise.
// If it is not done the second return value holds the time to wait before calling it again.
func (rw *RetryWatcher) doReceive() (bool, time.Duration) {
tnozicka marked this conversation as resolved.
Show resolved Hide resolved
watcher, err := rw.watcherClient.Watch(metav1.ListOptions{
ResourceVersion: rw.lastResourceVersion,
})
// We are very unlikely to hit EOF here since we are just establishing the call,
// but it may happen that the apiserver is just shutting down (e.g. being restarted)
// This is consistent with how it is handled for informers
switch err {
case nil:
break

case io.EOF:
// watch closed normally
return false, 0
tnozicka marked this conversation as resolved.
Show resolved Hide resolved

case io.ErrUnexpectedEOF:
klog.V(1).Infof("Watch closed with unexpected EOF: %v", err)
return false, 0
tnozicka marked this conversation as resolved.
Show resolved Hide resolved

default:
msg := "Watch failed: %v"
if net.IsProbableEOF(err) {
klog.V(5).Infof(msg, err)
// Retry
return false, 0
}

klog.Errorf(msg, err)
// Retry
return false, 0
}

if watcher == nil {
klog.Error("Watch returned nil watcher")
// Retry
return false, 0
}

ch := watcher.ResultChan()
defer watcher.Stop()

for {
select {
case <-rw.stopChan:
klog.V(4).Info("Stopping RetryWatcher.")
return true, 0
case event, ok := <-ch:
if !ok {
klog.V(4).Infof("Failed to get event! Re-creating the watcher. Last RV: %s", rw.lastResourceVersion)
return false, 0
}

// We need to inspect the event and get ResourceVersion out of it
switch event.Type {
case watch.Added, watch.Modified, watch.Deleted:
metaObject, ok := event.Object.(resourceVersionGetter)
if !ok {
_ = rw.send(watch.Event{
Type: watch.Error,
Object: &apierrors.NewInternalError(errors.New("retryWatcher: doesn't support resourceVersion")).ErrStatus,
})
// We have to abort here because this might cause lastResourceVersion inconsistency by skipping a potential RV with valid data!
return true, 0
}

resourceVersion := metaObject.GetResourceVersion()
if resourceVersion == "" {
_ = rw.send(watch.Event{
Type: watch.Error,
Object: &apierrors.NewInternalError(fmt.Errorf("retryWatcher: object %#v doesn't support resourceVersion", event.Object)).ErrStatus,
})
// We have to abort here because this might cause lastResourceVersion inconsistency by skipping a potential RV with valid data!
return true, 0
}

// All is fine; send the event and update lastResourceVersion
ok = rw.send(event)
if !ok {
return true, 0
}
rw.lastResourceVersion = resourceVersion

continue

case watch.Error:
status, ok := event.Object.(*metav1.Status)
if !ok {
klog.Error(spew.Sprintf("Received an error which is not *metav1.Status but %#+v", event.Object))
// Retry unknown errors
return false, 0
}

statusDelay := time.Duration(0)
if status.Details != nil {
statusDelay = time.Duration(status.Details.RetryAfterSeconds) * time.Second
}

switch status.Code {
case http.StatusGone:
// Never retry RV too old errors
_ = rw.send(event)
return true, 0

case http.StatusGatewayTimeout, http.StatusInternalServerError:
// Retry
return false, statusDelay

default:
liggitt marked this conversation as resolved.
Show resolved Hide resolved
// We retry by default. RetryWatcher is meant to proceed unless it is certain
// that it can't. If we are not certain, we proceed with retry and leave it
// up to the user to timeout if needed.

// Log here so we have a record of hitting the unexpected error
// and we can whitelist some error codes if we missed any that are expected.
klog.V(5).Info(spew.Sprintf("Retrying after unexpected error: %#+v", event.Object))

// Retry
return false, statusDelay
}

default:
klog.Errorf("Failed to recognize Event type %q", event.Type)
_ = rw.send(watch.Event{
Type: watch.Error,
Object: &apierrors.NewInternalError(fmt.Errorf("retryWatcher failed to recognize Event type %q", event.Type)).ErrStatus,
})
// We are unable to restart the watch and have to stop the loop or this might cause lastResourceVersion inconsistency by skipping a potential RV with valid data!
return true, 0
}
}
}
}

// receive reads the result from a watcher, restarting it if necessary.
func (rw *RetryWatcher) receive() {
defer close(rw.doneChan)
liggitt marked this conversation as resolved.
Show resolved Hide resolved
tnozicka marked this conversation as resolved.
Show resolved Hide resolved
defer close(rw.resultChan)

klog.V(4).Info("Starting RetryWatcher.")
defer klog.V(4).Info("Stopping RetryWatcher.")

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
select {
case <-rw.stopChan:
cancel()
return
case <-ctx.Done():
return
}
}()

// We use non sliding until so we don't introduce delays on happy path when WATCH call
// timeouts or gets closed and we need to reestablish it while also avoiding hot loops.
wait.NonSlidingUntilWithContext(ctx, func(ctx context.Context) {
liggitt marked this conversation as resolved.
Show resolved Hide resolved
done, retryAfter := rw.doReceive()
if done {
cancel()
return
}

time.Sleep(retryAfter)

klog.V(4).Infof("Restarting RetryWatcher at RV=%q", rw.lastResourceVersion)
}, rw.minRestartDelay)
}

// ResultChan implements Interface.
func (rw *RetryWatcher) ResultChan() <-chan watch.Event {
return rw.resultChan
}

// Stop implements Interface.
func (rw *RetryWatcher) Stop() {
close(rw.stopChan)
}

// Done allows the caller to be notified when Retry watcher stops.
func (rw *RetryWatcher) Done() <-chan struct{} {
return rw.doneChan
}