Skip to content
Permalink
Browse files
Merge pull request #105511 from benluddy/apf-delegated-signal
Free APF seats for watches handled by an aggregated apiserver.
  • Loading branch information
k8s-ci-robot committed Oct 21, 2021
2 parents 14c0f84 + 1873915 commit 313b43a8cb792efb26de0ade8da7bb9abe523674
@@ -52,6 +52,17 @@ func WatchInitialized(ctx context.Context) {
}
}

// RequestDelegated informs the priority and fairness dispatcher that
// a given request has been delegated to an aggregated API
// server. No-op when priority and fairness is disabled.
func RequestDelegated(ctx context.Context) {
// The watch initialization signal doesn't traverse request
// boundaries, so we generously fire it as soon as we know
// that the request won't be serviced locally. Safe to call
// for non-watch requests.
WatchInitialized(ctx)
}

// InitializationSignal is an interface that allows sending and handling
// initialization signals.
type InitializationSignal interface {
@@ -35,6 +35,7 @@ import (
genericfeatures "k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/server/egressselector"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
"k8s.io/apiserver/pkg/util/x509metrics"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/transport"
@@ -175,6 +176,7 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
handler := proxy.NewUpgradeAwareHandler(location, proxyRoundTripper, true, upgrade, &responder{w: w})
handler.InterceptRedirects = utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StreamingProxyRedirects)
handler.RequireSameHostRedirects = utilfeature.DefaultFeatureGate.Enabled(genericfeatures.ValidateProxyRedirects)
utilflowcontrol.RequestDelegated(req.Context())
handler.ServeHTTP(w, newReq)
}

@@ -46,6 +46,7 @@ import (
"k8s.io/apiserver/pkg/authentication/user"
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/server/egressselector"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
"k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
apiregistration "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
@@ -899,6 +900,135 @@ func TestProxyCertReload(t *testing.T) {
}
}

type fcInitSignal struct {
nSignals int32
}

func (s *fcInitSignal) SignalCount() int {
return int(atomic.SwapInt32(&s.nSignals, 0))
}

func (s *fcInitSignal) Signal() {
atomic.AddInt32(&s.nSignals, 1)
}

func (s *fcInitSignal) Wait() {
}

type hookedListener struct {
l net.Listener
onAccept func()
}

func (wl *hookedListener) Accept() (net.Conn, error) {
wl.onAccept()
return wl.l.Accept()
}

func (wl *hookedListener) Close() error {
return wl.l.Close()
}

func (wl *hookedListener) Addr() net.Addr {
return wl.l.Addr()
}

func TestFlowControlSignal(t *testing.T) {
for _, tc := range []struct {
Name string
Local bool
Available bool
Request http.Request
SignalExpected bool
}{
{
Name: "local",
Local: true,
SignalExpected: false,
},
{
Name: "unavailable",
Local: false,
Available: false,
SignalExpected: false,
},
{
Name: "request performed",
Local: false,
Available: true,
SignalExpected: true,
},
{
Name: "upgrade request performed",
Local: false,
Available: true,
Request: http.Request{
Header: http.Header{"Connection": []string{"Upgrade"}},
},
SignalExpected: true,
},
} {
t.Run(tc.Name, func(t *testing.T) {
okh := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
})

var sig fcInitSignal

var signalCountOnAccept int32
backend := httptest.NewUnstartedServer(okh)
backend.Listener = &hookedListener{
l: backend.Listener,
onAccept: func() {
atomic.StoreInt32(&signalCountOnAccept, int32(sig.SignalCount()))
},
}
backend.Start()
defer backend.Close()

p := proxyHandler{
localDelegate: okh,
serviceResolver: &mockedRouter{destinationHost: backend.Listener.Addr().String()},
}

server := httptest.NewServer(contextHandler(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
p.ServeHTTP(w, r.WithContext(utilflowcontrol.WithInitializationSignal(r.Context(), &sig)))
}),
&user.DefaultInfo{
Name: "username",
Groups: []string{"one", "two"},
},
))
defer server.Close()

p.handlingInfo.Store(proxyHandlingInfo{
local: tc.Local,
serviceAvailable: tc.Available,
proxyRoundTripper: backend.Client().Transport,
})

surl, err := url.Parse(server.URL)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

req := tc.Request
req.URL = surl
_, err = server.Client().Do(&req)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

if fired := (atomic.LoadInt32(&signalCountOnAccept) > 0); tc.SignalExpected && !fired {
t.Errorf("flow control signal expected but not fired")
} else if fired && !tc.SignalExpected {
t.Errorf("flow control signal fired but not expected")
}
})
}
}

func getCertAndKeyPaths(t *testing.T) (string, string, string) {
dir, err := ioutil.TempDir(os.TempDir(), "k8s-test-handler-proxy-cert")
if err != nil {

0 comments on commit 313b43a

Please sign in to comment.