Skip to content

Commit

Permalink
Merge pull request kubernetes#105640 from caesarxuchao/automated-cher…
Browse files Browse the repository at this point in the history
…ry-pick-of-#104985-kubernetes#105475-kubernetes#105582-upstream-release-1.22

Automated cherry pick of kubernetes#104985: Aggregator uses the regular transport even if the request
kubernetes#105475: apiserver aggregator upgrade unit test
kubernetes#105582: Verifying the auth headers are set for upgraded aggregated
  • Loading branch information
k8s-ci-robot committed Oct 14, 2021
2 parents 98556f6 + b30f24e commit a6a28f5
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,16 @@ func NewEgressSelector(config *apiserver.EgressSelectorConfiguration) (*EgressSe
return cs, nil
}

// NewEgressSelectorWithMap returns a EgressSelector with the supplied EgressType to DialFunc map.
func NewEgressSelectorWithMap(m map[EgressType]utilnet.DialFunc) *EgressSelector {
if m == nil {
m = make(map[EgressType]utilnet.DialFunc)
}
return &EgressSelector{
egressToDialer: m,
}
}

// Lookup gets the dialer function for the network context.
// This is configured for the Kubernetes API Server at startup.
func (cs *EgressSelector) Lookup(networkContext NetworkContext) (utilnet.DialFunc, error) {
Expand Down
2 changes: 0 additions & 2 deletions staging/src/k8s.io/kube-aggregator/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 8 additions & 32 deletions staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/apimachinery/pkg/util/httpstream/spdy"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/proxy"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
Expand Down Expand Up @@ -161,23 +160,21 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
return
}

// we need to wrap the roundtripper in another roundtripper which will apply the front proxy headers
proxyRoundTripper, upgrade, err := maybeWrapForConnectionUpgrades(handlingInfo.restConfig, handlingInfo.proxyRoundTripper, req)
if err != nil {
proxyError(w, req, err.Error(), http.StatusInternalServerError)
return
}
proxyRoundTripper := handlingInfo.proxyRoundTripper
upgrade := httpstream.IsUpgradeRequest(req)

proxyRoundTripper = transport.NewAuthProxyRoundTripper(user.GetName(), user.GetGroups(), user.GetExtra(), proxyRoundTripper)

// if we are upgrading, then the upgrade path tries to use this request with the TLS config we provide, but it does
// NOT use the roundtripper. Its a direct call that bypasses the round tripper. This means that we have to
// attach the "correct" user headers to the request ahead of time. After the initial upgrade, we'll be back
// at the roundtripper flow, so we only have to muck with this request, but we do have to do it.
// If we are upgrading, then the upgrade path tries to use this request with the TLS config we provide, but it does
// NOT use the proxyRoundTripper. It's a direct dial that bypasses the proxyRoundTripper. This means that we have to
// attach the "correct" user headers to the request ahead of time.
if upgrade {
transport.SetAuthProxyHeaders(newReq, user.GetName(), user.GetGroups(), user.GetExtra())
}

handler := proxy.NewUpgradeAwareHandler(location, proxyRoundTripper, true, upgrade, &responder{w: w})
handler.InterceptRedirects = utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StreamingProxyRedirects)
handler.RequireSameHostRedirects = utilfeature.DefaultFeatureGate.Enabled(genericfeatures.ValidateProxyRedirects)
handler.ServeHTTP(w, newReq)
}

Expand Down Expand Up @@ -212,27 +209,6 @@ func newRequestForProxy(location *url.URL, req *http.Request) (*http.Request, co
return newReq, cancelFn
}

// maybeWrapForConnectionUpgrades wraps the roundtripper for upgrades. The bool indicates if it was wrapped
func maybeWrapForConnectionUpgrades(restConfig *restclient.Config, rt http.RoundTripper, req *http.Request) (http.RoundTripper, bool, error) {
if !httpstream.IsUpgradeRequest(req) {
return rt, false, nil
}

tlsConfig, err := restclient.TLSConfigFor(restConfig)
if err != nil {
return nil, true, err
}
followRedirects := utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StreamingProxyRedirects)
requireSameHostRedirects := utilfeature.DefaultFeatureGate.Enabled(genericfeatures.ValidateProxyRedirects)
upgradeRoundTripper := spdy.NewRoundTripper(tlsConfig, followRedirects, requireSameHostRedirects)
wrappedRT, err := restclient.HTTPWrappersForConfig(restConfig, upgradeRoundTripper)
if err != nil {
return nil, true, err
}

return wrappedRT, true, nil
}

// responder implements rest.Responder for assisting a connector in writing objects or errors.
type responder struct {
w http.ResponseWriter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ limitations under the License.
package apiserver

import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"net"
"net/http"
"net/http/httptest"
"net/http/httputil"
Expand All @@ -38,10 +40,12 @@ import (

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/proxy"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/authentication/user"
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/server/egressselector"
"k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
apiregistration "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
Expand Down Expand Up @@ -379,11 +383,43 @@ func TestProxyHandler(t *testing.T) {
}
}

type mockEgressDialer struct {
called int
}

func (m *mockEgressDialer) dial(ctx context.Context, net, addr string) (net.Conn, error) {
m.called++
return http.DefaultTransport.(*http.Transport).DialContext(ctx, net, addr)
}

func (m *mockEgressDialer) dialBroken(ctx context.Context, net, addr string) (net.Conn, error) {
m.called++
return nil, fmt.Errorf("Broken dialer")
}

func newDialerAndSelector() (*mockEgressDialer, *egressselector.EgressSelector) {
dialer := &mockEgressDialer{}
m := make(map[egressselector.EgressType]utilnet.DialFunc)
m[egressselector.Cluster] = dialer.dial
es := egressselector.NewEgressSelectorWithMap(m)
return dialer, es
}

func newBrokenDialerAndSelector() (*mockEgressDialer, *egressselector.EgressSelector) {
dialer := &mockEgressDialer{}
m := make(map[egressselector.EgressType]utilnet.DialFunc)
m[egressselector.Cluster] = dialer.dialBroken
es := egressselector.NewEgressSelectorWithMap(m)
return dialer, es
}

func TestProxyUpgrade(t *testing.T) {
upgradeUser := "upgradeUser"
testcases := map[string]struct {
APIService *apiregistration.APIService
ExpectError bool
ExpectCalled bool
APIService *apiregistration.APIService
NewEgressSelector func() (*mockEgressDialer, *egressselector.EgressSelector)
ExpectError bool
ExpectCalled bool
}{
"valid hostname + CABundle": {
APIService: &apiregistration.APIService{
Expand Down Expand Up @@ -436,18 +472,58 @@ func TestProxyUpgrade(t *testing.T) {
ExpectError: true,
ExpectCalled: false,
},
"valid hostname + CABundle + egress selector": {
APIService: &apiregistration.APIService{
Spec: apiregistration.APIServiceSpec{
CABundle: testCACrt,
Group: "mygroup",
Version: "v1",
Service: &apiregistration.ServiceReference{Name: "test-service", Namespace: "test-ns", Port: pointer.Int32Ptr(443)},
},
Status: apiregistration.APIServiceStatus{
Conditions: []apiregistration.APIServiceCondition{
{Type: apiregistration.Available, Status: apiregistration.ConditionTrue},
},
},
},
NewEgressSelector: newDialerAndSelector,
ExpectError: false,
ExpectCalled: true,
},
"valid hostname + CABundle + egress selector non working": {
APIService: &apiregistration.APIService{
Spec: apiregistration.APIServiceSpec{
CABundle: testCACrt,
Group: "mygroup",
Version: "v1",
Service: &apiregistration.ServiceReference{Name: "test-service", Namespace: "test-ns", Port: pointer.Int32Ptr(443)},
},
Status: apiregistration.APIServiceStatus{
Conditions: []apiregistration.APIServiceCondition{
{Type: apiregistration.Available, Status: apiregistration.ConditionTrue},
},
},
},
NewEgressSelector: newBrokenDialerAndSelector,
ExpectError: true,
ExpectCalled: false,
},
}

for k, tc := range testcases {
tcName := k
path := "/apis/" + tc.APIService.Spec.Group + "/" + tc.APIService.Spec.Version + "/foo"
timesCalled := int32(0)

func() { // Cleanup after each test case.
t.Run(tcName, func(t *testing.T) {
path := "/apis/" + tc.APIService.Spec.Group + "/" + tc.APIService.Spec.Version + "/foo"
timesCalled := int32(0)
backendHandler := http.NewServeMux()
backendHandler.Handle(path, websocket.Handler(func(ws *websocket.Conn) {
atomic.AddInt32(&timesCalled, 1)
defer ws.Close()
req := ws.Request()
user := req.Header.Get("X-Remote-User")
if user != upgradeUser {
t.Errorf("expected user %q, got %q", upgradeUser, user)
}
body := make([]byte, 5)
ws.Read(body)
ws.Write([]byte("hello " + string(body)))
Expand Down Expand Up @@ -475,8 +551,16 @@ func TestProxyUpgrade(t *testing.T) {
proxyTransport: &http.Transport{},
proxyCurrentCertKeyContent: func() ([]byte, []byte) { return emptyCert(), emptyCert() },
}

var dialer *mockEgressDialer
var selector *egressselector.EgressSelector
if tc.NewEgressSelector != nil {
dialer, selector = tc.NewEgressSelector()
proxyHandler.egressSelector = selector
}

proxyHandler.updateAPIService(tc.APIService)
aggregator := httptest.NewServer(contextHandler(proxyHandler, &user.DefaultInfo{Name: "username"}))
aggregator := httptest.NewServer(contextHandler(proxyHandler, &user.DefaultInfo{Name: upgradeUser}))
defer aggregator.Close()

ws, err := websocket.Dial("ws://"+aggregator.Listener.Addr().String()+path, "", "http://127.0.0.1/")
Expand All @@ -487,6 +571,12 @@ func TestProxyUpgrade(t *testing.T) {
return
}
defer ws.Close()

// if the egressselector is configured assume it has to be called
if dialer != nil && dialer.called != 1 {
t.Errorf("expect egress dialer gets called %d times, got %d", 1, dialer.called)
}

if tc.ExpectError {
t.Errorf("%s: expected websocket error, got none", tcName)
return
Expand All @@ -507,7 +597,7 @@ func TestProxyUpgrade(t *testing.T) {
t.Errorf("%s: expected '%#v', got '%#v'", tcName, e, a)
return
}
}()
})
}
}

Expand Down

0 comments on commit a6a28f5

Please sign in to comment.