Skip to content

Commit

Permalink
Parallelize namespace-check requests (#2575)
Browse files Browse the repository at this point in the history
  • Loading branch information
Andres Martinez Gotor committed Mar 23, 2021
1 parent cfd6c48 commit 0b26aa8
Show file tree
Hide file tree
Showing 8 changed files with 109 additions and 28 deletions.
7 changes: 7 additions & 0 deletions chart/kubeapps/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ kubectl delete namespace kubeapps
- [Is there any API documentation?](#is-there-any-api-documentation)
- [Why can't I configure global private repositories?](#why-cant-i-configure-global-private-repositories)
- [Does Kubeapps support Operators?](#does-kubeapps-support-operators)
- [Slow response when listing namespaces?](#slow-response-when-listing-namespaces)
- [More questions?](#more-questions)

### How to install Kubeapps for demo purposes?
Expand Down Expand Up @@ -275,6 +276,12 @@ You could alternatively ensure that the `imagePullSecret` is available in all na

Yes! You can get started by following the [operators documentation](https://github.com/kubeapps/kubeapps/blob/master/docs/user/operators.md).

### Slow response when listing namespaces

Kubeapps uses the currently logged-in user credential to retrieve the list of all namespaces. If the user doesn't have permission to list namespaces, the backend will try again with its own service account to list all namespaces and then iterate through each namespace to check if the user has permissions to get secrets for each namespace (to verify if they should be allowed to use that namespace or not and hence whether it is included in the selector). This can lead to a slow response if the number of namespaces on the cluster is large.

To reduce this time, you can increase the number of checks that Kubeapps will perform in parallel (per connection) setting the value: `kubeops.burst=<desired_number>` and `kubeops.QPS=<desired_number>`. The default value, if not set, is 15 burst requests and 10 QPS afterwards.

### More questions?

Feel free to [open an issue](https://github.com/kubeapps/kubeapps/issues/new) if you have any questions!
Expand Down
6 changes: 6 additions & 0 deletions chart/kubeapps/templates/kubeops-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ spec:
{{- if .Values.pinnipedProxy.enabled }}
- --pinniped-proxy-url=http://kubeapps-internal-pinniped-proxy.{{ .Release.Namespace }}:{{ .Values.pinnipedProxy.service.port }}
{{- end }}
{{- if .Values.kubeops.burst }}
- --burst={{ .Values.kubeops.burst }}
{{- end }}
{{- if .Values.kubeops.QPS }}
- --qps={{ .Values.kubeops.QPS }}
{{- end }}
{{- if .Values.clusters }}
volumeMounts:
- name: kubeops-config
Expand Down
4 changes: 4 additions & 0 deletions chart/kubeapps/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,10 @@ kubeops:
nodeSelector: {}
tolerations: []
affinity: {}
## Kubeops QPS and Burst configuration (per user request)
## Used when requesting namespaces
# QPS: 10
# burst: 15

## Assetsvc is used to serve assets metadata over a REST API.
##
Expand Down
4 changes: 3 additions & 1 deletion cmd/kubeops/internal/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ type Options struct {
UserAgent string
KubeappsNamespace string
ClustersConfig kube.ClustersConfig
Burst int
QPS float32
}

// Config represents data needed by each handler to be able to create Helm 3 actions.
Expand Down Expand Up @@ -97,7 +99,7 @@ func WithHandlerConfig(storageForDriver agent.StorageForDriver, options Options)
return
}

kubeHandler, err := kube.NewHandler(options.KubeappsNamespace, options.ClustersConfig)
kubeHandler, err := kube.NewHandler(options.KubeappsNamespace, options.Burst, options.QPS, options.ClustersConfig)
if err != nil {
log.Errorf("Failed to create handler: %v", err)
response.NewErrorResponse(http.StatusInternalServerError, authUserError).Write(w)
Expand Down
8 changes: 7 additions & 1 deletion cmd/kubeops/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ var (
helmDriverArg string
listLimit int
pinnipedProxyURL string
burst int
qps float32
settings environment.EnvSettings
timeout int64
userAgentComment string
Expand All @@ -51,6 +53,8 @@ func init() {
pflag.Int64Var(&timeout, "timeout", 300, "Timeout to perform release operations (install, upgrade, rollback, delete)")
pflag.StringVar(&clustersConfigPath, "clusters-config-path", "", "Configuration for clusters")
pflag.StringVar(&pinnipedProxyURL, "pinniped-proxy-url", "http://kubeapps-internal-pinniped-proxy.kubeapps:3333", "internal url to be used for requests to clusters configured for credential proxying via pinniped")
pflag.IntVar(&burst, "burst", 15, "internal burst capacity")
pflag.Float32Var(&qps, "qps", 10, "internal QPS rate")
}

func main() {
Expand Down Expand Up @@ -79,6 +83,8 @@ func main() {
Timeout: timeout,
KubeappsNamespace: kubeappsNamespace,
ClustersConfig: clustersConfig,
Burst: burst,
QPS: qps,
}

storageForDriver := agent.StorageForSecrets
Expand Down Expand Up @@ -109,7 +115,7 @@ func main() {
addRoute("DELETE", "/clusters/{cluster}/namespaces/{namespace}/releases/{releaseName}", handler.DeleteRelease)

// Backend routes unrelated to kubeops functionality.
err := backendHandlers.SetupDefaultRoutes(r.PathPrefix("/backend/v1").Subrouter(), clustersConfig)
err := backendHandlers.SetupDefaultRoutes(r.PathPrefix("/backend/v1").Subrouter(), options.Burst, options.QPS, clustersConfig)
if err != nil {
log.Fatalf("Unable to setup backend routes: %+v", err)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/http-handler/http-handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,8 +332,8 @@ func CanI(kubeHandler kube.AuthHandler) func(w http.ResponseWriter, req *http.Re
}

// SetupDefaultRoutes enables call-sites to use the backend api's default routes with minimal setup.
func SetupDefaultRoutes(r *mux.Router, clustersConfig kube.ClustersConfig) error {
backendHandler, err := kube.NewHandler(os.Getenv("POD_NAMESPACE"), clustersConfig)
func SetupDefaultRoutes(r *mux.Router, burst int, qps float32, clustersConfig kube.ClustersConfig) error {
backendHandler, err := kube.NewHandler(os.Getenv("POD_NAMESPACE"), burst, qps, clustersConfig)
if err != nil {
return err
}
Expand Down
89 changes: 75 additions & 14 deletions pkg/kube/kube_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ import (
"fmt"
"io"
"io/ioutil"
"math"
"net/http"
"net/url"
"path"
"strings"
"sync"

"github.com/kubeapps/kubeapps/cmd/apprepository-controller/pkg/apis/apprepository/v1alpha1"
apprepoclientset "github.com/kubeapps/kubeapps/cmd/apprepository-controller/pkg/client/clientset/versioned"
Expand Down Expand Up @@ -152,6 +154,7 @@ type combinedClientsetInterface interface {
CoreV1() corev1typed.CoreV1Interface
AuthorizationV1() authorizationv1.AuthorizationV1Interface
RestClient() rest.Interface
MaxWorkers() int
}

// Need to use a type alias to embed the two Clientset's without a name clash.
Expand All @@ -166,6 +169,10 @@ func (c *combinedClientset) RestClient() rest.Interface {
return c.restCli
}

func (c *combinedClientset) MaxWorkers() int {
return int(c.restCli.GetRateLimiter().QPS())
}

// kubeHandler handles http requests for operating on app repositories and k8s resources
// in Kubeapps, without exposing implementation details to 3rd party integrations.
type kubeHandler struct {
Expand Down Expand Up @@ -338,7 +345,7 @@ var ErrEmptyOCIRegistry = fmt.Errorf("You need to specify at least one repositor

// NewHandler returns a handler configured with a service account client set and a config
// with a blank token to be copied when creating user client sets with specific tokens.
func NewHandler(kubeappsNamespace string, clustersConfig ClustersConfig) (AuthHandler, error) {
func NewHandler(kubeappsNamespace string, burst int, qps float32, clustersConfig ClustersConfig) (AuthHandler, error) {
clientConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
clientcmd.NewDefaultClientConfigLoadingRules(),
&clientcmd.ConfigOverrides{
Expand All @@ -356,6 +363,12 @@ func NewHandler(kubeappsNamespace string, clustersConfig ClustersConfig) (AuthHa
if err != nil {
return nil, err
}
// Modify the default number of requests that the given client can do
// This is useful to handle a large number of namespaces which are check in parallel
// Burst is the initial number of request made in parallel
// Then further requests are performed following the QPS rate
config.Burst = burst
config.QPS = qps

svcRestConfig, err := rest.InClusterConfig()
if err != nil {
Expand Down Expand Up @@ -753,24 +766,68 @@ func KubeappsSecretNameForRepo(repoName, namespace string) string {
return fmt.Sprintf("%s-%s", namespace, secretNameForRepo(repoName))
}

func filterAllowedNamespaces(userClientset combinedClientsetInterface, namespaces []corev1.Namespace) ([]corev1.Namespace, error) {
allowedNamespaces := []corev1.Namespace{}
for _, namespace := range namespaces {
type checkNSJob struct {
ns corev1.Namespace
}

type checkNSResult struct {
checkNSJob
allowed bool
Error error
}

func nsCheckerWorker(userClientset combinedClientsetInterface, nsJobs <-chan checkNSJob, resultChan chan checkNSResult) {
for j := range nsJobs {
res, err := userClientset.AuthorizationV1().SelfSubjectAccessReviews().Create(context.TODO(), &authorizationapi.SelfSubjectAccessReview{
Spec: authorizationapi.SelfSubjectAccessReviewSpec{
ResourceAttributes: &authorizationapi.ResourceAttributes{
Group: "",
Resource: "secrets",
Verb: "get",
Namespace: namespace.Name,
Namespace: j.ns.Name,
},
},
}, metav1.CreateOptions{})
if err != nil {
return nil, err
resultChan <- checkNSResult{j, res.Status.Allowed, err}
}
}

func filterAllowedNamespaces(userClientset combinedClientsetInterface, namespaces []corev1.Namespace) ([]corev1.Namespace, error) {
allowedNamespaces := []corev1.Namespace{}

var wg sync.WaitGroup
workers := int(math.Min(float64(len(namespaces)), float64(userClientset.MaxWorkers())))
checkNSJobs := make(chan checkNSJob, workers)
nsCheckRes := make(chan checkNSResult, workers)

// Process maxReq ns at a time
for i := 0; i < workers; i++ {
wg.Add(1)
go func() {
nsCheckerWorker(userClientset, checkNSJobs, nsCheckRes)
wg.Done()
}()
}
go func() {
wg.Wait()
close(nsCheckRes)
}()

go func() {
for _, ns := range namespaces {
checkNSJobs <- checkNSJob{ns}
}
if res.Status.Allowed {
allowedNamespaces = append(allowedNamespaces, namespace)
close(checkNSJobs)
}()

// Start receiving results
for res := range nsCheckRes {
if res.Error == nil {
if res.allowed {
allowedNamespaces = append(allowedNamespaces, res.ns)
}
} else {
log.Errorf("failed to check namespace permissions. Got %v", res.Error)
}
}
return allowedNamespaces, nil
Expand All @@ -789,6 +846,7 @@ func filterActiveNamespaces(namespaces []corev1.Namespace) []corev1.Namespace {
// GetNamespaces return the list of namespaces that the user has permission to access
func (a *userHandler) GetNamespaces() ([]corev1.Namespace, error) {
// Try to list namespaces with the user token, for backward compatibility
var namespaceList []corev1.Namespace
namespaces, err := a.clientset.CoreV1().Namespaces().List(context.TODO(), metav1.ListOptions{})
if err != nil {
if k8sErrors.IsForbidden(err) {
Expand All @@ -801,12 +859,15 @@ func (a *userHandler) GetNamespaces() ([]corev1.Namespace, error) {
} else {
return nil, err
}
}

// Filter namespaces in which the user has permissions to write (secrets) only
namespaceList, err := filterAllowedNamespaces(a.clientset, namespaces.Items)
if err != nil {
return nil, err
// Filter namespaces in which the user has permissions to write (secrets) only
namespaceList, err = filterAllowedNamespaces(a.clientset, namespaces.Items)
if err != nil {
return nil, err
}
} else {
// If the user can list namespaces, do not filter them
namespaceList = namespaces.Items
}

// Filter namespaces that are in terminating state
Expand Down
15 changes: 5 additions & 10 deletions pkg/kube/kube_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,10 @@ func (f fakeCombinedClientset) RestClient() rest.Interface {
return f.rc
}

func (f fakeCombinedClientset) MaxWorkers() int {
return 1
}

func checkErr(t *testing.T, err error, expectedError error) {
if err == nil && expectedError != nil {
t.Errorf("got: nil, want: %+v", expectedError)
Expand Down Expand Up @@ -868,16 +872,6 @@ func TestGetNamespaces(t *testing.T) {
expectedNamespaces: []string{"foo", "bar", "zed"},
allowed: true,
},
{
name: "it does filter the namespaces if returned by the user client",
existingNamespaces: []existingNs{
{"foo", corev1.NamespaceActive},
{"bar", corev1.NamespaceActive},
{"zed", corev1.NamespaceActive},
},
expectedNamespaces: []string{},
allowed: false,
},
{
name: "it lists namespaces if the userclient fails but the service client succeeds",
existingNamespaces: []existingNs{
Expand Down Expand Up @@ -1129,6 +1123,7 @@ func TestNewClusterConfig(t *testing.T) {
inClusterConfig *rest.Config
expectedConfig *rest.Config
errorExpected bool
maxReq int
}{
{
name: "returns an in-cluster with explicit token for the default cluster",
Expand Down

0 comments on commit 0b26aa8

Please sign in to comment.