Skip to content

Commit

Permalink
Implement support for watch initialization in P&F
Browse files Browse the repository at this point in the history
  • Loading branch information
wojtek-t committed May 17, 2021
1 parent c08526c commit 76a1b35
Show file tree
Hide file tree
Showing 8 changed files with 187 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ const (
observationMaintenancePeriod = 10 * time.Second
)

var nonMutatingRequestVerbs = sets.NewString("get", "list", "watch")
var (
nonMutatingRequestVerbs = sets.NewString("get", "list", "watch")
watchVerbs = sets.NewString("watch")
)

func handleError(w http.ResponseWriter, r *http.Request, err error) {
errorMsg := fmt.Sprintf("Internal Server Error: %#v", r.RequestURI)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ limitations under the License.
package filters

import (
"context"
"fmt"
"net/http"
"sync"
"sync/atomic"

flowcontrol "k8s.io/api/flowcontrol/v1beta1"
Expand All @@ -31,10 +31,6 @@ import (
"k8s.io/klog/v2"
)

type priorityAndFairnessKeyType int

const priorityAndFairnessKey priorityAndFairnessKeyType = iota

// PriorityAndFairnessClassification identifies the results of
// classification for API Priority and Fairness
type PriorityAndFairnessClassification struct {
Expand All @@ -44,12 +40,6 @@ type PriorityAndFairnessClassification struct {
PriorityLevelUID apitypes.UID
}

// GetClassification returns the classification associated with the
// given context, if any, otherwise nil.
//
// The comma-ok form of the type assertion is used on purpose: a plain
// assertion panics when the context carries no classification (or a value
// of an unexpected type), which would contradict the "otherwise nil"
// contract stated above.
func GetClassification(ctx context.Context) *PriorityAndFairnessClassification {
	classification, _ := ctx.Value(priorityAndFairnessKey).(*PriorityAndFairnessClassification)
	return classification
}

// waitingMark tracks requests waiting rather than being executed
var waitingMark = &requestWatermark{
phase: epmetrics.WaitingPhase,
Expand Down Expand Up @@ -84,9 +74,13 @@ func WithPriorityAndFairness(
return
}

// Skip tracking long running requests.
if longRunningRequestCheck != nil && longRunningRequestCheck(r, requestInfo) {
isWatchRequest := watchVerbs.Has(requestInfo.Verb)

// Skip tracking long running non-watch requests.
if longRunningRequestCheck != nil && longRunningRequestCheck(r, requestInfo) && !isWatchRequest {
klog.V(6).Infof("Serving RequestInfo=%#+v, user.Info=%#+v as longrunning\n", requestInfo, user)
// FIXME: Remove before submitting.
klog.Errorf("UUUUU Serving RequestInfo=%#+v, user.Info=%#+v as longrunning\n", requestInfo, user)
handler.ServeHTTP(w, r)
return
}
Expand Down Expand Up @@ -116,15 +110,40 @@ func WithPriorityAndFairness(
waitingMark.recordReadOnly(int(atomic.AddInt32(&atomicReadOnlyWaiting, delta)))
}
}
wg := sync.WaitGroup{}
execute := func() {
noteExecutingDelta(1)
defer noteExecutingDelta(-1)
served = true
innerCtx := context.WithValue(ctx, priorityAndFairnessKey, classification)

innerCtx := ctx
var watchInitializationSignal utilflowcontrol.InitializationSignal
if isWatchRequest {
// FIXME: Remove before submitting.
klog.Errorf("AAA Setting initialization channel")
watchInitializationSignal = utilflowcontrol.NewInitializationSignal()
innerCtx = utilflowcontrol.WithInitializationSignal(ctx, watchInitializationSignal)
}
innerReq := r.Clone(innerCtx)
setResponseHeaders(classification, w)

handler.ServeHTTP(w, innerReq)
if isWatchRequest {
wg.Add(1)
go func() {
defer wg.Done()
// Protect against situations where the request never reaches the storage
// layer and the initialization signal is therefore never sent.
defer watchInitializationSignal.Signal()

handler.ServeHTTP(w, innerReq)
}()

watchInitializationSignal.Wait()
// FIXME: Remove before submitting.
klog.Errorf("BBB Initialization observed")
} else {
handler.ServeHTTP(w, innerReq)
}
}
digest := utilflowcontrol.RequestDigest{RequestInfo: requestInfo, User: user}
fcIfc.Handle(ctx, digest, note, func(inQueue bool) {
Expand All @@ -143,9 +162,13 @@ func WithPriorityAndFairness(
epmetrics.DroppedRequests.WithContext(ctx).WithLabelValues(epmetrics.ReadOnlyKind).Inc()
}
epmetrics.RecordRequestTermination(r, requestInfo, epmetrics.APIServerComponent, http.StatusTooManyRequests)
if isWatchRequest {
wg.Done()
}
tooManyRequests(r, w)
}

// In case of watch, from P&F POV it already finished, but we need to wait until the request itself finishes.
wg.Wait()
})
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func TestApfSkipLongRunningRequest(t *testing.T) {
StartPriorityAndFairnessWatermarkMaintenance(ctx.Done())

// send a watch request to test skipping long running request
if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces?watch=true", server.URL), http.StatusOK); err != nil {
if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/foos/foo/proxy", server.URL), http.StatusOK); err != nil {
// request should not be rejected
t.Error(err)
}
Expand Down
9 changes: 9 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
utiltrace "k8s.io/utils/trace"
Expand Down Expand Up @@ -1413,6 +1414,14 @@ func (c *cacheWatcher) process(ctx context.Context, initEvents []*watchCacheEven
klog.V(2).Infof("processing %d initEvents of %s (%s) took %v", len(initEvents), objType, c.identifier, processingTime)
}

// At this point we already start processing incoming watch events.
// However, the init events can still be processed because their serialization
// and sending to the client happen asynchronously.
// TODO: As described in the KEP, we would like to estimate that by delaying
// the initialization signal proportionally to the number of events to
// process, but we're leaving this to the tuning phase.
utilflowcontrol.WatchInitialized(ctx)

defer close(c.result)
defer c.Stop()
for {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"k8s.io/apiserver/pkg/apis/example"
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
"k8s.io/apiserver/pkg/storage"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
)

var (
Expand Down Expand Up @@ -701,6 +702,26 @@ func TestCacherNoLeakWithMultipleWatchers(t *testing.T) {
}
}

// TestWatchInitializationSignal checks that starting a watch through the
// cacher delivers the initialization signal attached to the watch context.
func TestWatchInitializationSignal(t *testing.T) {
	backingStorage := &dummyStorage{}
	cacher, _, err := newTestCacher(backingStorage)
	if err != nil {
		t.Fatalf("Couldn't create cacher: %v", err)
	}
	defer cacher.Stop()

	// Keep the cancel function and always call it so the timeout context
	// (and its timer) is released when the test finishes.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	initSignal := utilflowcontrol.NewInitializationSignal()
	ctx = utilflowcontrol.WithInitializationSignal(ctx, initSignal)

	_, err = cacher.Watch(ctx, "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
	if err != nil {
		t.Fatalf("Failed to create watch: %v", err)
	}

	// Wait() has no timeout of its own, so run it in a goroutine and race it
	// against the context deadline; a missing signal then fails the test
	// instead of hanging it.
	initialized := make(chan struct{})
	go func() {
		initSignal.Wait()
		close(initialized)
	}()
	select {
	case <-initialized:
	case <-ctx.Done():
		t.Fatalf("Timed out waiting for watch initialization signal: %v", ctx.Err())
	}
}

func testCacherSendBookmarkEvents(t *testing.T, allowWatchBookmarks, expectedBookmarks bool) {
backingStorage := &dummyStorage{}
cacher, _, err := newTestCacher(backingStorage)
Expand Down
9 changes: 9 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/etcd3/metrics"
"k8s.io/apiserver/pkg/storage/value"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"

"go.etcd.io/etcd/clientv3"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -120,6 +121,14 @@ func (w *watcher) Watch(ctx context.Context, key string, rev int64, recursive, p
}
wc := w.createWatchChan(ctx, key, rev, recursive, progressNotify, pred)
go wc.run()

// For etcd watch we don't have an easy way to answer whether the watch
// has already caught up. So in the initial version (given that watchcache
// is by default enabled for all resources but Events), we just deliver
// the initialization signal immediately. Improving this will be explored
// in the future.
utilflowcontrol.WatchInitialized(ctx)

return wc, nil
}

Expand Down
19 changes: 19 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"k8s.io/apiserver/pkg/apis/example"
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
"k8s.io/apiserver/pkg/storage"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
)

func TestWatch(t *testing.T) {
Expand Down Expand Up @@ -313,6 +314,24 @@ func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) {
}
}

// TestWatchInitializationSignal checks that starting a watch directly against
// the etcd3 store delivers the initialization signal attached to the watch
// context.
func TestWatchInitializationSignal(t *testing.T) {
	_, store, cluster := testSetup(t)
	defer cluster.Terminate(t)

	// Keep the cancel function and always call it so the timeout context
	// (and its timer) is released when the test finishes.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	initSignal := utilflowcontrol.NewInitializationSignal()
	ctx = utilflowcontrol.WithInitializationSignal(ctx, initSignal)

	key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
	_, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything})
	if err != nil {
		t.Fatalf("Watch failed: %v", err)
	}

	// Wait() has no timeout of its own, so run it in a goroutine and race it
	// against the context deadline; a missing signal then fails the test
	// instead of hanging it.
	initialized := make(chan struct{})
	go func() {
		initSignal.Wait()
		close(initialized)
	}()
	select {
	case <-initialized:
	case <-ctx.Done():
		t.Fatalf("Timed out waiting for watch initialization signal: %v", ctx.Err())
	}
}

func TestProgressNotify(t *testing.T) {
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
clusterConfig := &integration.ClusterConfig{
Expand Down
85 changes: 85 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package flowcontrol

import (
"context"
"sync"

"k8s.io/klog/v2"
)

// priorityAndFairnessKeyType is a private type for context keys defined in
// this package, so that they cannot collide with keys declared elsewhere.
type priorityAndFairnessKeyType int

// priorityAndFairnessInitializationSignalKey is the context key under which
// the initialization signal for watch requests is stored.
const priorityAndFairnessInitializationSignalKey priorityAndFairnessKeyType = iota

// WithInitializationSignal returns a context derived from ctx that carries
// the given priority and fairness initialization signal.
func WithInitializationSignal(ctx context.Context, signal InitializationSignal) context.Context {
	derived := context.WithValue(ctx, priorityAndFairnessInitializationSignalKey, signal)
	return derived
}

// initializationSignalFrom looks up the initialization signal stored in ctx.
// The boolean result is true only when a non-nil InitializationSignal was
// found; otherwise the returned signal is nil and the boolean is false.
func initializationSignalFrom(ctx context.Context) (InitializationSignal, bool) {
	stored := ctx.Value(priorityAndFairnessInitializationSignalKey)
	signal, ok := stored.(InitializationSignal)
	if !ok || signal == nil {
		return signal, false
	}
	return signal, true
}

// WatchInitialized sends a signal to priority and fairness dispatcher
// that a given watch request has already been initialized.
//
// It is a no-op when ctx carries no initialization signal.
func WatchInitialized(ctx context.Context) {
	if signal, ok := initializationSignalFrom(ctx); ok {
		// This runs once per initialized watch, so it must not log at error
		// level; keep it available only at high verbosity for debugging.
		klog.V(6).Infof("Marking watch initialized")
		signal.Signal()
	}
}

// InitializationSignal is an interface that allows sending and handling
// initialization signals.
type InitializationSignal interface {
	// Signal notifies the dispatcher about finished initialization.
	// It is safe to call Signal more than once; only the first call has
	// an effect.
	Signal()
	// Wait blocks until Signal has been called at least once. It returns
	// immediately if the signal was already sent.
	Wait()
}

// initializationSignal is the default InitializationSignal implementation.
// The signal is represented by closing the done channel, guarded by once so
// that repeated Signal calls do not close the channel twice.
type initializationSignal struct {
	once sync.Once
	done chan struct{}
}

// Compile-time check that *initializationSignal satisfies InitializationSignal.
var _ InitializationSignal = (*initializationSignal)(nil)

// NewInitializationSignal returns a new, not yet signaled
// InitializationSignal.
func NewInitializationSignal() InitializationSignal {
	// The zero value of sync.Once is ready to use, so only the channel
	// needs explicit initialization.
	return &initializationSignal{
		done: make(chan struct{}),
	}
}

// Signal marks the initialization as finished and releases all current and
// future Wait callers. Calls after the first one are no-ops.
func (i *initializationSignal) Signal() {
	i.once.Do(func() { close(i.done) })
}

// Wait blocks until Signal has been called.
func (i *initializationSignal) Wait() {
	<-i.done
}

0 comments on commit 76a1b35

Please sign in to comment.