Skip to content

Commit

Permalink
UPSTREAM: 97206: clean up executing request on panic
Browse files Browse the repository at this point in the history
  • Loading branch information
tkashem committed Jan 6, 2021
1 parent b1e9f0d commit d188091
Show file tree
Hide file tree
Showing 5 changed files with 376 additions and 8 deletions.
7 changes: 7 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/server/filters/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@ go_test(
"//staging/src/k8s.io/api/flowcontrol/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/endpoints/filters:go_default_library",
Expand All @@ -34,7 +37,11 @@ go_test(
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/component-base/metrics/legacyregistry:go_default_library",
"//staging/src/k8s.io/component-base/metrics/testutil:go_default_library",
"//vendor/golang.org/x/net/http2:go_default_library",
"//vendor/k8s.io/klog/v2:go_default_library",
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,19 @@ import (
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"reflect"
"strings"
"sync"
"testing"
"time"

flowcontrol "k8s.io/api/flowcontrol/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap"
"k8s.io/apiserver/pkg/authentication/user"
apifilters "k8s.io/apiserver/pkg/endpoints/filters"
Expand All @@ -36,7 +43,11 @@ import (
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
fcmetrics "k8s.io/apiserver/pkg/util/flowcontrol/metrics"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/component-base/metrics/testutil"
)

type mockDecision int
Expand Down Expand Up @@ -333,6 +344,243 @@ func TestApfCancelWaitRequest(t *testing.T) {
})
}

// TestPriorityAndFairnessWithPanicRecoverAndTimeoutFilter runs the priority
// and fairness filter behind the timeout and panic recovery filters and
// verifies that a panic in the request handler does not prevent the next
// request from being served when the priority level concurrency is 1.
func TestPriorityAndFairnessWithPanicRecoverAndTimeoutFilter(t *testing.T) {
	fcmetrics.Register()

	t.Run("priority level concurrency is set to 1, request handler panics, next request should not be rejected", func(t *testing.T) {
		const (
			requestTimeout                                        = time.Minute
			userName                                              = "alice"
			fsName                                                = "test-fs"
			plName                                                = "test-pl"
			serverConcurrency, plConcurrencyShares, plConcurrency = 1, 1, 1
		)

		objects := newConfiguration(fsName, plName, userName, flowcontrol.LimitResponseTypeReject, plConcurrencyShares)
		clientset := newClientset(t, objects...)
		// this test does not rely on resync, so resync period is set to zero
		factory := informers.NewSharedInformerFactory(clientset, 0)
		controller := utilflowcontrol.New(factory, clientset.FlowcontrolV1beta1(), serverConcurrency, requestTimeout/4)

		stopCh, controllerCompletedCh := make(chan struct{}), make(chan struct{})
		factory.Start(stopCh)

		// wait for the informer cache to sync.
		timeout, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
		defer cancel()
		cacheSyncDone := factory.WaitForCacheSync(timeout.Done())
		if names := unsyncedInformers(cacheSyncDone); len(names) > 0 {
			t.Fatalf("WaitForCacheSync did not successfully complete, resources=%#v", names)
		}

		// controllerErr is written by the goroutine below and only read after
		// controllerCompletedCh has been closed.
		var controllerErr error
		go func() {
			defer close(controllerCompletedCh)
			controllerErr = controller.Run(stopCh)
		}()

		// make sure that apf controller syncs the priority level configuration object we are using in this test.
		// read the metrics and ensure the concurrency limit for our priority level is set to the expected value.
		pollErr := wait.PollImmediate(100*time.Millisecond, 5*time.Second, func() (done bool, err error) {
			if err := gaugeValueMatch("apiserver_flowcontrol_request_concurrency_limit", map[string]string{"priority_level": plName}, plConcurrency); err != nil {
				t.Logf("polling retry - error: %s", err)
				return false, nil
			}
			return true, nil
		})
		if pollErr != nil {
			t.Fatalf("expected the apf controller to sync the priority level configuration object: %s, error: %s", plName, pollErr)
		}

		var executed bool
		// we will raise a panic for the first request.
		firstRequestPathPanic := "/request/panic"
		requestHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			executed = true
			expectMatchingAPFHeaders(t, w, fsName, plName)

			if r.URL.Path == firstRequestPathPanic {
				panic(fmt.Errorf("request handler panic'd as designed - %#v", r.RequestURI))
			}
		})
		handler := newHandlerChain(t, requestHandler, controller, userName, requestTimeout)

		server, requestGetter := newHTTP2ServerWithClient(handler)
		defer server.Close()

		var err error
		_, err = requestGetter(firstRequestPathPanic)
		if !executed {
			t.Errorf("expected inner handler to be executed for request: %s", firstRequestPathPanic)
		}
		expectResetStreamError(t, err)

		executed = false
		// the second request should be served successfully.
		secondRequestPathShouldWork := "/request/should-work"
		response, err := requestGetter(secondRequestPathShouldWork)
		if !executed {
			t.Errorf("expected inner handler to be executed for request: %s", secondRequestPathShouldWork)
		}
		if err != nil {
			t.Errorf("expected request: %s to succeed, but got error: %#v", secondRequestPathShouldWork, err)
		} else {
			// only inspect the response when the request succeeded; on error
			// there is no response to look at. The body is closed once the
			// subtest returns.
			defer response.Body.Close()
			if response.StatusCode != http.StatusOK {
				t.Errorf("expected HTTP status code: %d for request: %s, but got: %#v", http.StatusOK, secondRequestPathShouldWork, response)
			}
		}

		close(stopCh)
		t.Log("waiting for the controller to shutdown")
		<-controllerCompletedCh

		if controllerErr != nil {
			t.Errorf("expected a nil error from controller, but got: %#v", controllerErr)
		}
	})
}

// newHTTP2ServerWithClient starts a TLS test server with HTTP/2 enabled that
// serves the given handler. It returns the started server together with a
// function that sends a GET request for the given path to that server, using
// the client obtained from the server itself.
func newHTTP2ServerWithClient(handler http.Handler) (*httptest.Server, func(path string) (*http.Response, error)) {
	srv := httptest.NewUnstartedServer(handler)
	srv.EnableHTTP2 = true
	srv.StartTLS()

	get := func(path string) (*http.Response, error) {
		target := srv.URL + path
		return srv.Client().Get(target)
	}
	return srv, get
}

// expectMatchingAPFHeaders verifies that the headers of w carry the expected
// flow schema and priority level configuration UIDs.
//
// Mismatches are reported with t.Error/t.Errorf rather than t.Fatal/t.Fatalf:
// this helper is called from inside an HTTP request handler, which the server
// runs on its own goroutine, and FailNow-style calls must only be made from
// the goroutine running the test function.
func expectMatchingAPFHeaders(t *testing.T, w http.ResponseWriter, expectedFS, expectedPL string) {
	t.Helper()

	if w == nil {
		t.Error("expected a non nil HTTP response")
		// nothing further can be inspected without a response writer.
		return
	}

	key := flowcontrol.ResponseHeaderMatchedFlowSchemaUID
	if value := w.Header().Get(key); expectedFS != value {
		t.Errorf("expected HTTP header %s to have value %q, but got: %q", key, expectedFS, value)
	}

	key = flowcontrol.ResponseHeaderMatchedPriorityLevelConfigurationUID
	if value := w.Header().Get(key); expectedPL != value {
		t.Errorf("expected HTTP header %s to have value %q, but got %q", key, expectedPL, value)
	}
}

// expectResetStreamError fails the test unless err is a non-nil *url.Error
// whose message contains INTERNAL_ERROR, which is what the client observes
// when a request panics and http2 resets the stream.
func expectResetStreamError(t *testing.T, err error) {
	switch e := err.(type) {
	case nil:
		t.Fatalf("expected the server to send an error, but got nil")
	case *url.Error:
		if msg := e.Error(); !strings.Contains(msg, "INTERNAL_ERROR") {
			t.Fatalf("expected a stream reset error, but got: %s", msg)
		}
	default:
		t.Fatalf("expected the error to be of type *url.Error, but got: %T", err)
	}
}

// newClientset passes the given objects to fake.NewSimpleClientset and
// returns the result, failing the test immediately if it is nil.
func newClientset(t *testing.T, objects ...runtime.Object) clientset.Interface {
	fakeClient := fake.NewSimpleClientset(objects...)
	if fakeClient != nil {
		return fakeClient
	}

	t.Fatal("unable to create fake client set")
	return fakeClient
}

// newHandlerChain assembles a handler chain that includes the panic recovery
// and timeout filters, so the behavior of a real apiserver can be simulated.
// From the outside in, a request passes through: panic recovery, request
// info, timeout for non long-running requests, a handler that stores the
// specified user in the request context as the authenticated user, priority
// and fairness, and finally the supplied handler.
func newHandlerChain(t *testing.T, handler http.Handler, filter utilflowcontrol.Interface, userName string, requestTimeout time.Duration) http.Handler {
	infoFactory := &apirequest.RequestInfoFactory{
		APIPrefixes:          sets.NewString("apis", "api"),
		GrouplessAPIPrefixes: sets.NewString("api"),
	}
	isLongRunning := BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString("proxy"))

	// innermost filter: priority and fairness around the supplied handler.
	apf := WithPriorityAndFairness(handler, isLongRunning, filter)

	// record the specified user in the request context before handing the
	// request over to the priority and fairness filter.
	withUser := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
		authenticated := &user.DefaultInfo{
			Name:   userName,
			Groups: []string{user.AllAuthenticated},
		}
		ctx := apirequest.WithUser(req.Context(), authenticated)
		apf.ServeHTTP(w, req.WithContext(ctx))
	})

	withTimeout := WithTimeoutForNonLongRunningRequests(withUser, isLongRunning, requestTimeout)
	withRequestInfo := apifilters.WithRequestInfo(withTimeout, infoFactory)
	return WithPanicRecovery(withRequestInfo, infoFactory, func() bool { return false })
}

// unsyncedInformers returns a name for every type in status that is marked
// as not synced. The result is never nil; it is empty when every entry is
// synced. The order of the names follows map iteration and is therefore
// unspecified.
func unsyncedInformers(status map[reflect.Type]bool) []string {
	names := make([]string, 0, len(status))

	for objType, synced := range status {
		if synced {
			continue
		}

		// reflect.Type.Name returns "" for unnamed types such as pointers,
		// which would make the entry unidentifiable in a failure message;
		// fall back to the full string representation of the type.
		name := objType.Name()
		if name == "" {
			name = objType.String()
		}
		names = append(names, name)
	}

	return names
}

// newConfiguration returns a flow schema and a priority level configuration,
// in that order. The flow schema is named fsName, references the priority
// level plName, and has a single rule whose subject is the given user and
// which covers all verbs on all non-resource URLs. The priority level is of
// the limited type, with the given limit response type and with concurrency
// as its assured concurrency shares. Each object uses its name as its UID.
func newConfiguration(fsName, plName, user string, responseType flowcontrol.LimitResponseType, concurrency int32) []runtime.Object {
	subject := flowcontrol.Subject{
		Kind: flowcontrol.SubjectKindUser,
		User: &flowcontrol.UserSubject{Name: user},
	}
	anyNonResourceRequest := flowcontrol.NonResourcePolicyRule{
		Verbs:           []string{flowcontrol.VerbAll},
		NonResourceURLs: []string{flowcontrol.NonResourceAll},
	}
	rule := flowcontrol.PolicyRulesWithSubjects{
		Subjects:         []flowcontrol.Subject{subject},
		NonResourceRules: []flowcontrol.NonResourcePolicyRule{anyNonResourceRequest},
	}

	flowSchema := &flowcontrol.FlowSchema{
		ObjectMeta: metav1.ObjectMeta{Name: fsName, UID: types.UID(fsName)},
		Spec: flowcontrol.FlowSchemaSpec{
			MatchingPrecedence:         1,
			PriorityLevelConfiguration: flowcontrol.PriorityLevelConfigurationReference{Name: plName},
			DistinguisherMethod: &flowcontrol.FlowDistinguisherMethod{
				Type: flowcontrol.FlowDistinguisherMethodByUserType,
			},
			Rules: []flowcontrol.PolicyRulesWithSubjects{rule},
		},
	}

	limited := &flowcontrol.LimitedPriorityLevelConfiguration{
		AssuredConcurrencyShares: concurrency,
		LimitResponse:            flowcontrol.LimitResponse{Type: responseType},
	}
	priorityLevel := &flowcontrol.PriorityLevelConfiguration{
		ObjectMeta: metav1.ObjectMeta{Name: plName, UID: types.UID(plName)},
		Spec: flowcontrol.PriorityLevelConfigurationSpec{
			Type:    flowcontrol.PriorityLevelEnablementLimited,
			Limited: limited,
		},
	}

	return []runtime.Object{flowSchema, priorityLevel}
}

// gathers and checks the metrics.
func checkForExpectedMetrics(t *testing.T, expectedMetrics []string) {
metricsFamily, err := legacyregistry.DefaultGatherer.Gather()
Expand All @@ -353,3 +601,40 @@ func checkForExpectedMetrics(t *testing.T, expectedMetrics []string) {
}
}
}

// gaugeValueMatch gathers metrics from the legacy registry and checks the
// gauge metrics of the family called name whose labels match labelFilter.
// Each matching gauge value is converted to an int and added up; the total
// must equal wantValue. An error is returned when gathering fails, when the
// family is absent, when no metric matches the labels, or when the total
// differs from wantValue.
func gaugeValueMatch(name string, labelFilter map[string]string, wantValue int) error {
	families, err := legacyregistry.DefaultGatherer.Gather()
	if err != nil {
		return fmt.Errorf("failed to gather metrics: %s", err)
	}

	var (
		total       int
		foundFamily bool
		foundLabels bool
	)
	for _, family := range families {
		if family.GetName() == name {
			foundFamily = true
			for _, m := range family.GetMetric() {
				if testutil.LabelsMatch(m, labelFilter) {
					foundLabels = true
					total += int(m.GetGauge().GetValue())
				}
			}
		}
	}

	switch {
	case !foundFamily:
		return fmt.Errorf("expected to find the metric family: %s in the gathered result", name)
	case !foundLabels:
		return fmt.Errorf("expected to find metrics with matching labels: %#+v", labelFilter)
	case total != wantValue:
		return fmt.Errorf("expected the sum to be: %d, but got: %d for gauge metric: %s with labels %#+v", wantValue, total, name, labelFilter)
	}

	return nil
}
19 changes: 13 additions & 6 deletions staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,21 +112,28 @@ func (cfgCtlr *configController) Handle(ctx context.Context, requestDigest Reque
}
klog.V(7).Infof("Handle(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, isExempt=%v, queued=%v", requestDigest, fs.Name, fs.Spec.DistinguisherMethod, pl.Name, isExempt, queued)
var executed bool
idle := req.Finish(func() {
idle, panicking := true, true
defer func() {
klog.V(7).Infof("Handle(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, isExempt=%v, queued=%v, Finish() => panicking=%v idle=%v",
requestDigest, fs.Name, fs.Spec.DistinguisherMethod, pl.Name, isExempt, queued, panicking, idle)
if idle {
cfgCtlr.maybeReap(pl.Name)
}
}()
idle = req.Finish(func() {
if queued {
metrics.ObserveWaitingDuration(pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime))
}
metrics.AddDispatch(pl.Name, fs.Name)
executed = true
startExecutionTime := time.Now()
defer func() {
metrics.ObserveExecutionDuration(pl.Name, fs.Name, time.Since(startExecutionTime))
}()
execFn()
metrics.ObserveExecutionDuration(pl.Name, fs.Name, time.Since(startExecutionTime))
})
if queued && !executed {
metrics.ObserveWaitingDuration(pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime))
}
klog.V(7).Infof("Handle(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, isExempt=%v, queued=%v, Finish() => idle=%v", requestDigest, fs.Name, fs.Spec.DistinguisherMethod, pl.Name, isExempt, queued, idle)
if idle {
cfgCtlr.maybeReap(pl.Name)
}
panicking = false
}
Original file line number Diff line number Diff line change
Expand Up @@ -316,8 +316,15 @@ func (req *request) Finish(execFn func()) bool {
if !exec {
return idle
}
execFn()
return req.qs.finishRequestAndDispatchAsMuchAsPossible(req)
func() {
defer func() {
idle = req.qs.finishRequestAndDispatchAsMuchAsPossible(req)
}()

execFn()
}()

return idle
}

func (req *request) wait() (bool, bool) {
Expand Down
Loading

0 comments on commit d188091

Please sign in to comment.