Skip to content

Commit

Permalink
ref
Browse files Browse the repository at this point in the history
  • Loading branch information
wojtek-t committed May 20, 2021
1 parent 76a1b35 commit 5f19207
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ var waitingMark = &requestWatermark{
var atomicMutatingExecuting, atomicReadOnlyExecuting int32
var atomicMutatingWaiting, atomicReadOnlyWaiting int32

// newInitializationSignal is defined for testing purposes.
var newInitializationSignal = utilflowcontrol.NewInitializationSignal

// WithPriorityAndFairness limits the number of in-flight
// requests in a fine-grained way.
func WithPriorityAndFairness(
Expand Down Expand Up @@ -79,8 +82,6 @@ func WithPriorityAndFairness(
// Skip tracking long running non-watch requests.
if longRunningRequestCheck != nil && longRunningRequestCheck(r, requestInfo) && !isWatchRequest {
klog.V(6).Infof("Serving RequestInfo=%#+v, user.Info=%#+v as longrunning\n", requestInfo, user)
// FIXME: Remove before submitting.
klog.Errorf("UUUUU Serving RequestInfo=%#+v, user.Info=%#+v as longrunning\n", requestInfo, user)
handler.ServeHTTP(w, r)
return
}
Expand Down Expand Up @@ -119,9 +120,7 @@ func WithPriorityAndFairness(
innerCtx := ctx
var watchInitializationSignal utilflowcontrol.InitializationSignal
if isWatchRequest {
// FIXME: Remove before submitting.
klog.Errorf("AAA Setting initialization channel")
watchInitializationSignal = utilflowcontrol.NewInitializationSignal()
watchInitializationSignal = newInitializationSignal()
innerCtx = utilflowcontrol.WithInitializationSignal(ctx, watchInitializationSignal)
}
innerReq := r.Clone(innerCtx)
Expand All @@ -139,8 +138,6 @@ func WithPriorityAndFairness(
}()

watchInitializationSignal.Wait()
// FIXME: Remove before submitting.
klog.Errorf("BBB Initialization observed")
} else {
handler.ServeHTTP(w, innerReq)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,16 +138,21 @@ func newApfServerWithSingleRequest(decision mockDecision, t *testing.T) *httptes
}

func newApfServerWithHooks(decision mockDecision, onExecute, postExecute, postEnqueue, postDequeue func(), t *testing.T) *httptest.Server {
fakeFilter := fakeApfFilter{
mockDecision: decision,
postEnqueue: postEnqueue,
postDequeue: postDequeue,
}
return newApfServerWithFilter(fakeFilter, onExecute, postExecute, t)
}

func newApfServerWithFilter(flowControlFilter utilflowcontrol.Interface, onExecute, postExecute func(), t *testing.T) *httptest.Server {
requestInfoFactory := &apirequest.RequestInfoFactory{APIPrefixes: sets.NewString("apis", "api"), GrouplessAPIPrefixes: sets.NewString("api")}
longRunningRequestCheck := BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString("proxy"))

apfHandler := WithPriorityAndFairness(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
onExecute()
}), longRunningRequestCheck, fakeApfFilter{
mockDecision: decision,
postEnqueue: postEnqueue,
postDequeue: postDequeue,
})
}), longRunningRequestCheck, flowControlFilter)

handler := apifilters.WithRequestInfo(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
r = r.WithContext(apirequest.WithUser(r.Context(), &user.DefaultInfo{
Expand Down Expand Up @@ -334,6 +339,136 @@ func TestApfExecuteMultipleRequests(t *testing.T) {
})
}

type fakeWatchApfFilter struct {
lock sync.Mutex
inflight int
capacity int
}

func (f *fakeWatchApfFilter) Handle(ctx context.Context,
requestDigest utilflowcontrol.RequestDigest,
_ func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration),
_ fq.QueueNoteFn,
execFn func(),
) {
canExecute := false
func() {
f.lock.Lock()
defer f.lock.Unlock()
if f.inflight < f.capacity {
f.inflight++
canExecute = true
}
}()
if !canExecute {
return
}

execFn()

f.lock.Lock()
defer f.lock.Unlock()
f.inflight--
}

func (f *fakeWatchApfFilter) MaintainObservations(stopCh <-chan struct{}) {
}

func (f *fakeWatchApfFilter) Run(stopCh <-chan struct{}) error {
return nil
}

func (t *fakeWatchApfFilter) Install(c *mux.PathRecorderMux) {
}

func (f *fakeWatchApfFilter) wait() error {
return wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
f.lock.Lock()
defer f.lock.Unlock()
return f.inflight == 0, nil
})
}

func TestApfExecuteWatchRequestsWithInitializationSignal(t *testing.T) {
signalsLock := sync.Mutex{}
signals := []utilflowcontrol.InitializationSignal{}
sendSignals := func() {
signalsLock.Lock()
defer signalsLock.Unlock()
for i := range signals {
signals[i].Signal()
}
signals = signals[:0]
}

newInitializationSignal = func () utilflowcontrol.InitializationSignal {
signalsLock.Lock()
defer signalsLock.Unlock()
signal := utilflowcontrol.NewInitializationSignal()
signals = append(signals, signal)
return signal
}
defer func() {
newInitializationSignal = utilflowcontrol.NewInitializationSignal
}()

// We test if initialization after receiving initialization signal the
// new requests will be allowed to run by:
// - sending N requests that will occupy the whole capacity
// - sending initialiation signals for them
// - ensuring that number of inflight requests will get to zero
concurrentRequests := 5
firstRunning := sync.WaitGroup{}
firstRunning.Add(concurrentRequests)
allRunning := sync.WaitGroup{}
allRunning.Add(2 * concurrentRequests)

fakeFilter := &fakeWatchApfFilter{
capacity: concurrentRequests,
}

onExecuteFunc := func() {
firstRunning.Done()
firstRunning.Wait()

sendSignals()
fakeFilter.wait()

allRunning.Done()
allRunning.Wait()
}

postExecuteFunc := func() {}

server := newApfServerWithFilter(fakeFilter, onExecuteFunc, postExecuteFunc, t)
defer server.Close()

var wg sync.WaitGroup
wg.Add(2 * concurrentRequests)
for i := 0; i < concurrentRequests; i++ {
go func() {
defer wg.Done()
if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default/pods?watch=true", server.URL), http.StatusOK); err != nil {
t.Error(err)
}
}()
}

firstRunning.Wait()
fakeFilter.wait()

firstRunning.Add(concurrentRequests)
for i := 0; i < concurrentRequests; i++ {
go func() {
defer wg.Done()
if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default/pods?watch=true", server.URL), http.StatusOK); err != nil {
t.Error(err)
}
}()
}
wg.Wait()
}

func TestApfCancelWaitRequest(t *testing.T) {
epmetrics.Register()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,6 @@ func TestWatchInitializationSignal(t *testing.T) {
_, store, cluster := testSetup(t)
defer cluster.Terminate(t)


ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
initSignal := utilflowcontrol.NewInitializationSignal()
ctx = utilflowcontrol.WithInitializationSignal(ctx, initSignal)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ package flowcontrol
import (
"context"
"sync"

"k8s.io/klog/v2"
)

type priorityAndFairnessKeyType int
Expand Down Expand Up @@ -50,7 +48,6 @@ func initializationSignalFrom(ctx context.Context) (InitializationSignal, bool)
// that a given watch request has already been initialized.
func WatchInitialized(ctx context.Context) {
if signal, ok := initializationSignalFrom(ctx); ok {
klog.Errorf("QQQ: Marking watch initialized")
signal.Signal()
}
}
Expand Down

0 comments on commit 5f19207

Please sign in to comment.