Skip to content

Commit

Permalink
Add an option for aggregator
Browse files Browse the repository at this point in the history
  • Loading branch information
jindijamie authored and enj committed Sep 8, 2022
1 parent cd37983 commit 474db72
Show file tree
Hide file tree
Showing 7 changed files with 132 additions and 8 deletions.
9 changes: 5 additions & 4 deletions cmd/kube-apiserver/app/aggregator.go
Expand Up @@ -111,10 +111,11 @@ func createAggregatorConfig(
SharedInformerFactory: externalInformers,
},
ExtraConfig: aggregatorapiserver.ExtraConfig{
ProxyClientCertFile: commandOptions.ProxyClientCertFile,
ProxyClientKeyFile: commandOptions.ProxyClientKeyFile,
ServiceResolver: serviceResolver,
ProxyTransport: proxyTransport,
ProxyClientCertFile: commandOptions.ProxyClientCertFile,
ProxyClientKeyFile: commandOptions.ProxyClientKeyFile,
ServiceResolver: serviceResolver,
ProxyTransport: proxyTransport,
RejectForwardingRedirects: commandOptions.AggregatorRejectForwardingRedirects,
},
}

Expand Down
9 changes: 7 additions & 2 deletions cmd/kube-apiserver/app/options/options.go
Expand Up @@ -76,7 +76,8 @@ type ServerRunOptions struct {
ProxyClientCertFile string
ProxyClientKeyFile string

EnableAggregatorRouting bool
EnableAggregatorRouting bool
AggregatorRejectForwardingRedirects bool

MasterCount int
EndpointReconcilerType string
Expand Down Expand Up @@ -132,7 +133,8 @@ func NewServerRunOptions() *ServerRunOptions {
},
HTTPTimeout: time.Duration(5) * time.Second,
},
ServiceNodePortRange: kubeoptions.DefaultServiceNodePortRange,
ServiceNodePortRange: kubeoptions.DefaultServiceNodePortRange,
AggregatorRejectForwardingRedirects: true,
}

// Overwrite the default for storage data format.
Expand Down Expand Up @@ -244,6 +246,9 @@ func (s *ServerRunOptions) Flags() (fss cliflag.NamedFlagSets) {
fs.BoolVar(&s.EnableAggregatorRouting, "enable-aggregator-routing", s.EnableAggregatorRouting,
"Turns on aggregator routing requests to endpoints IP rather than cluster IP.")

fs.BoolVar(&s.AggregatorRejectForwardingRedirects, "aggregator-reject-forwarding-redirect", s.AggregatorRejectForwardingRedirects,
"Aggregator reject forwarding redirect response back to client.")

fs.StringVar(&s.ServiceAccountSigningKeyFile, "service-account-signing-key-file", s.ServiceAccountSigningKeyFile, ""+
"Path to the file that contains the current private key of the service account token issuer. The issuer will sign issued ID tokens with this private key.")

Expand Down
5 changes: 3 additions & 2 deletions cmd/kube-apiserver/app/options/options_test.go
Expand Up @@ -318,8 +318,9 @@ func TestAddFlags(t *testing.T) {
Traces: &apiserveroptions.TracingOptions{
ConfigFile: "/var/run/kubernetes/tracing_config.yaml",
},
IdentityLeaseDurationSeconds: 3600,
IdentityLeaseRenewIntervalSeconds: 10,
IdentityLeaseDurationSeconds: 3600,
IdentityLeaseRenewIntervalSeconds: 10,
AggregatorRejectForwardingRedirects: true,
}

if !reflect.DeepEqual(expected, s) {
Expand Down
27 changes: 27 additions & 0 deletions staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware.go
Expand Up @@ -83,6 +83,8 @@ type UpgradeAwareHandler struct {
MaxBytesPerSec int64
// Responder is passed errors that occur while setting up proxying.
Responder ErrorResponder
// Reject to forward redirect response
RejectForwardingRedirects bool
}

const defaultFlushInterval = 200 * time.Millisecond
Expand Down Expand Up @@ -257,6 +259,31 @@ func (h *UpgradeAwareHandler) ServeHTTP(w http.ResponseWriter, req *http.Request
proxy.Transport = h.Transport
proxy.FlushInterval = h.FlushInterval
proxy.ErrorLog = log.New(noSuppressPanicError{}, "", log.LstdFlags)
if h.RejectForwardingRedirects {
oldModifyResponse := proxy.ModifyResponse
proxy.ModifyResponse = func(response *http.Response) error {
code := response.StatusCode
if code >= 300 && code <= 399 {
// close the original response
response.Body.Close()
msg := "the backend attempted to redirect this request, which is not permitted"
// replace the response
*response = http.Response{
StatusCode: http.StatusBadGateway,
Status: fmt.Sprintf("%d %s", response.StatusCode, http.StatusText(response.StatusCode)),
Body: io.NopCloser(strings.NewReader(msg)),
ContentLength: int64(len(msg)),
}
} else {
if oldModifyResponse != nil {
if err := oldModifyResponse(response); err != nil {
return err
}
}
}
return nil
}
}
if h.Responder != nil {
// if an optional error interceptor/responder was provided wire it
// the custom responder might be used for providing a unified error reporting
Expand Down
Expand Up @@ -704,6 +704,83 @@ func TestProxyUpgradeErrorResponse(t *testing.T) {
}
}

func TestRejectForwardingRedirectsOption(t *testing.T) {
originalBody := []byte(`some data`)
testCases := []struct {
name string
rejectForwardingRedirects bool
serverStatusCode int
expectStatusCode int
expectBody []byte
}{
{
name: "reject redirection enabled in proxy, backend server sending 200 response",
rejectForwardingRedirects: true,
serverStatusCode: 200,
expectStatusCode: 200,
expectBody: originalBody,
},
{
name: "reject redirection enabled in proxy, backend server sending 301 response",
rejectForwardingRedirects: true,
serverStatusCode: 301,
expectStatusCode: 502,
expectBody: []byte(`the backend attempted to redirect this request, which is not permitted`),
},
{
name: "reject redirection disabled in proxy, backend server sending 200 response",
rejectForwardingRedirects: false,
serverStatusCode: 200,
expectStatusCode: 200,
expectBody: originalBody,
},
{
name: "reject redirection disabled in proxy, backend server sending 301 response",
rejectForwardingRedirects: false,
serverStatusCode: 301,
expectStatusCode: 301,
expectBody: originalBody,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Set up a backend server
backendServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(tc.serverStatusCode)
w.Write(originalBody)
}))
defer backendServer.Close()
backendServerURL, _ := url.Parse(backendServer.URL)

// Set up a proxy pointing to the backend
proxyHandler := NewUpgradeAwareHandler(backendServerURL, nil, false, false, &fakeResponder{t: t})
proxyHandler.RejectForwardingRedirects = tc.rejectForwardingRedirects
proxy := httptest.NewServer(proxyHandler)
defer proxy.Close()
proxyURL, _ := url.Parse(proxy.URL)

conn, err := net.Dial("tcp", proxyURL.Host)
require.NoError(t, err)
bufferedReader := bufio.NewReader(conn)

req, _ := http.NewRequest("GET", proxyURL.String(), nil)
require.NoError(t, req.Write(conn))
// Verify we get the correct response and message body content
resp, err := http.ReadResponse(bufferedReader, nil)
require.NoError(t, err)
assert.Equal(t, tc.expectStatusCode, resp.StatusCode)
data, err := ioutil.ReadAll(resp.Body)
require.NoError(t, err)
assert.Equal(t, tc.expectBody, data)
assert.Equal(t, int64(len(tc.expectBody)), resp.ContentLength)
resp.Body.Close()

// clean up
conn.Close()
})
}
}

func TestDefaultProxyTransport(t *testing.T) {
tests := []struct {
name,
Expand Down
7 changes: 7 additions & 0 deletions staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go
Expand Up @@ -86,6 +86,8 @@ type ExtraConfig struct {

// Mechanism by which the Aggregator will resolve services. Required.
ServiceResolver ServiceResolver

RejectForwardingRedirects bool
}

// Config represents the configuration needed to create an APIAggregator.
Expand Down Expand Up @@ -155,6 +157,9 @@ type APIAggregator struct {
// egressSelector selects the proper egress dialer to communicate with the custom apiserver
// overwrites proxyTransport dialer if not nil
egressSelector *egressselector.EgressSelector

// rejectForwardingRedirects is whether to allow to forward redirect response
rejectForwardingRedirects bool
}

// Complete fills in any fields not set that are required to have valid data. It's mutating the receiver.
Expand Down Expand Up @@ -212,6 +217,7 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
openAPIV3Config: c.GenericConfig.OpenAPIV3Config,
egressSelector: c.GenericConfig.EgressSelector,
proxyCurrentCertKeyContent: func() (bytes []byte, bytes2 []byte) { return nil, nil },
rejectForwardingRedirects: c.ExtraConfig.RejectForwardingRedirects,
}

// used later to filter the served resource by those that have expired.
Expand Down Expand Up @@ -442,6 +448,7 @@ func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error {
proxyTransport: s.proxyTransport,
serviceResolver: s.serviceResolver,
egressSelector: s.egressSelector,
rejectForwardingRedirects: s.rejectForwardingRedirects,
}
proxyHandler.updateAPIService(apiService)
if s.openAPIAggregationController != nil {
Expand Down
Expand Up @@ -68,6 +68,9 @@ type proxyHandler struct {
// egressSelector selects the proper egress dialer to communicate with the custom apiserver
// overwrites proxyTransport dialer if not nil
egressSelector *egressselector.EgressSelector

// reject to forward redirect response
rejectForwardingRedirects bool
}

type proxyHandlingInfo struct {
Expand Down Expand Up @@ -172,6 +175,9 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}

handler := proxy.NewUpgradeAwareHandler(location, proxyRoundTripper, true, upgrade, &responder{w: w})
if r.rejectForwardingRedirects {
handler.RejectForwardingRedirects = true
}
utilflowcontrol.RequestDelegated(req.Context())
handler.ServeHTTP(w, newReq)
}
Expand Down

0 comments on commit 474db72

Please sign in to comment.