Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use new K8s clients provider in Flux plugin #5379

Merged
merged 3 commits into from
Sep 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ type NamespacedResourceWatcherCacheConfig struct {
Gvr schema.GroupVersionResource
// this ClientGetter is for running out-of-request interactions with the Kubernetes API server,
// such as watching for resource changes
ClientGetter clientgetter.BackgroundClientGetterFunc
ClientGetter clientgetter.FixedClusterClientProviderInterface
// 'OnAddFunc' hook is called when an object comes about and the cache does not have a
// corresponding entry. Note this may happen as a result of a newly created k8s object
// or a modified object for which there was no entry in the cache
Expand Down
4 changes: 3 additions & 1 deletion cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

// RegisterWithGRPCServer enables a plugin to register with a gRPC server
// returning the server implementation.
//
//nolint:deadcode
func RegisterWithGRPCServer(opts pluginsv1alpha1.GRPCPluginRegistrationOptions) (interface{}, error) {
log.Info("+fluxv2 RegisterWithGRPCServer")
Expand All @@ -25,7 +26,7 @@ func RegisterWithGRPCServer(opts pluginsv1alpha1.GRPCPluginRegistrationOptions)
// 'Shutdown' hook
stopCh := make(chan struct{})

svr, err := NewServer(opts.ConfigGetter, opts.ClustersConfig.KubeappsClusterName, stopCh, opts.PluginConfigPath)
svr, err := NewServer(opts.ConfigGetter, opts.ClustersConfig.KubeappsClusterName, stopCh, opts.PluginConfigPath, opts.ClientQPS, opts.ClientBurst)
if err != nil {
return nil, err
}
Expand All @@ -36,6 +37,7 @@ func RegisterWithGRPCServer(opts pluginsv1alpha1.GRPCPluginRegistrationOptions)

// RegisterHTTPHandlerFromEndpoint enables a plugin to register an http
// handler to translate to the gRPC request.
//
//nolint:deadcode
func RegisterHTTPHandlerFromEndpoint(ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) error {
log.Info("+fluxv2 RegisterHTTPHandlerFromEndpoint")
Expand Down
16 changes: 8 additions & 8 deletions cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,9 @@ func (s *Server) filterReadyReposByName(repoList []sourcev1.HelmRepository, matc
}

// Notes:
// 1. with flux, an available package may be from a repo in any namespace accessible to the caller
// 2. can't rely on cache as a real source of truth for key names
// because redis may evict cache entries due to memory pressure to make room for new ones
// 1. with flux, an available package may be from a repo in any namespace accessible to the caller
// 2. can't rely on cache as a real source of truth for key names
// because redis may evict cache entries due to memory pressure to make room for new ones
func (s *Server) getChartsForRepos(ctx context.Context, match []string) (map[string][]models.Chart, error) {
repoList, err := s.listReposInAllNamespaces(ctx)
if err != nil {
Expand Down Expand Up @@ -551,7 +551,9 @@ func (s *Server) createKubeappsManagedRepoSecret(
// using owner references on the secret so that it can be
// (1) cleaned up automatically and/or
// (2) enable some control (ie. if I add a secret manually
// via kubectl before running kubeapps, it won't get deleted just
//
// via kubectl before running kubeapps, it won't get deleted just
//
// because Kubeapps is deleting it)?
// see https://github.com/vmware-tanzu/kubeapps/pull/4630#discussion_r861446394 for details
func (s *Server) setOwnerReferencesForRepoSecret(
Expand Down Expand Up @@ -753,11 +755,9 @@ func (s *Server) deleteRepo(ctx context.Context, repoRef *corev1.PackageReposito
}
}

//
// implements plug-in specific cache-related functionality
//
type repoEventSink struct {
clientGetter clientgetter.BackgroundClientGetterFunc
clientGetter clientgetter.FixedClusterClientProviderInterface
chartCache *cache.ChartCache // chartCache may be nil only in unit tests
}

Expand Down Expand Up @@ -1282,7 +1282,7 @@ func getRepoTlsConfigAndAuthWithUserManagedSecrets(secret *apiv1.Secret) (*corev

// TODO (gfichtenolt) Per slack discussion
// In fact, keeping the existing API might mean we could return exactly what it already does today
//(i.e. all secrets) if called with an extra explicit option (includeSecrets=true in the request
// (i.e. all secrets) if called with an extra explicit option (includeSecrets=true in the request
// message, not sure, similar to kubectl config view --raw) and by default the secrets are REDACTED
// as you mention? This would mean clients will by default see only REDACTED secrets,
// but can request the full sensitive data when necessary?
Expand Down
12 changes: 6 additions & 6 deletions cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/repo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2466,9 +2466,9 @@ func (s *Server) redisMockExpectGetFromRepoCache(mock redismock.ClientMock, filt
}

func (s *Server) redisMockSetValueForRepo(mock redismock.ClientMock, repo sourcev1.HelmRepository, oldValue []byte) (key string, bytes []byte, err error) {
backgroundClientGetter := func(ctx context.Context) (clientgetter.ClientInterfaces, error) {
return s.clientGetter(ctx, s.kubeappsCluster)
}
backgroundClientGetter := &clientgetter.FixedClusterClientProvider{ClientsFunc: func(ctx context.Context) (*clientgetter.ClientGetter, error) {
return s.clientGetter.GetClients(ctx, s.kubeappsCluster)
}}
sink := repoEventSink{
clientGetter: backgroundClientGetter,
chartCache: nil,
Expand Down Expand Up @@ -2502,9 +2502,9 @@ func redisMockSetValueForRepo(mock redismock.ClientMock, key string, newValue, o
}

func (s *Server) redisKeyValueForRepo(r sourcev1.HelmRepository) (key string, byteArray []byte, err error) {
cg := func(ctx context.Context) (clientgetter.ClientInterfaces, error) {
return s.clientGetter(ctx, s.kubeappsCluster)
}
cg := &clientgetter.FixedClusterClientProvider{ClientsFunc: func(ctx context.Context) (*clientgetter.ClientGetter, error) {
return s.clientGetter.GetClients(ctx, s.kubeappsCluster)
}}
sinkNoChartCache := repoEventSink{clientGetter: cg}
return sinkNoChartCache.redisKeyValueForRepo(r)
}
Expand Down
27 changes: 15 additions & 12 deletions cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ type Server struct {
// non-test implementation.
// It is meant for in-band interactions (i.e. in the context of a caller)
// with k8s API server
clientGetter clientgetter.ClientGetterFunc
clientGetter clientgetter.ClientProviderInterface
// for interactions with k8s API server in the context of
// kubeapps-internal-kubeappsapis service account
serviceAccountClientGetter clientgetter.BackgroundClientGetterFunc
serviceAccountClientGetter clientgetter.FixedClusterClientProviderInterface

actionConfigGetter clientgetter.HelmActionConfigGetterFunc

Expand All @@ -63,7 +63,7 @@ type Server struct {

// NewServer returns a Server automatically configured with a function to obtain
// the k8s client config.
func NewServer(configGetter core.KubernetesConfigGetter, kubeappsCluster string, stopCh <-chan struct{}, pluginConfigPath string) (*Server, error) {
func NewServer(configGetter core.KubernetesConfigGetter, kubeappsCluster string, stopCh <-chan struct{}, pluginConfigPath string, clientQPS float32, clientBurst int) (*Server, error) {
log.Infof("+fluxv2 NewServer(kubeappsCluster: [%v], pluginConfigPath: [%s]",
kubeappsCluster, pluginConfigPath)

Expand Down Expand Up @@ -94,8 +94,7 @@ func NewServer(configGetter core.KubernetesConfigGetter, kubeappsCluster string,
log.Fatalf("%s", err)
}

backgroundClientGetter := clientgetter.NewBackgroundClientGetter(
configGetter, clientgetter.Options{Scheme: scheme})
backgroundClientGetter := clientgetter.NewBackgroundClientProvider(clientgetter.Options{Scheme: scheme}, clientQPS, clientBurst)

s := repoEventSink{
clientGetter: backgroundClientGetter,
Expand Down Expand Up @@ -128,9 +127,12 @@ func NewServer(configGetter core.KubernetesConfigGetter, kubeappsCluster string,
"repoCache", repoCacheConfig, redisCli, stopCh, false); err != nil {
return nil, err
} else {
clientProvider, err := clientgetter.NewClientProvider(configGetter, clientgetter.Options{Scheme: scheme})
if err != nil {
log.Fatalf("%s", err)
}
return &Server{
clientGetter: clientgetter.NewClientGetter(
configGetter, clientgetter.Options{Scheme: scheme}),
clientGetter: clientProvider,
serviceAccountClientGetter: backgroundClientGetter,
actionConfigGetter: clientgetter.NewHelmActionConfigGetter(
configGetter, kubeappsCluster),
Expand Down Expand Up @@ -644,12 +646,13 @@ func (s *Server) SetUserManagedSecrets(ctx context.Context, request *v1alpha1.Se
// aka an "out-of-band" interaction and use cases when the user wants something
// done explicitly, aka "in-band" interaction
func (s *Server) newRepoEventSink() repoEventSink {
cg := func(ctx context.Context) (clientgetter.ClientInterfaces, error) {
return s.clientGetter(ctx, s.kubeappsCluster)
}

// notice a bit of inconsistency here, we are using s.clientGetter
// (i.e. the context of the incoming request) to read the secret
cg := &clientgetter.FixedClusterClientProvider{ClientsFunc: func(ctx context.Context) (*clientgetter.ClientGetter, error) {
return s.clientGetter.GetClients(ctx, s.kubeappsCluster)
}}

// notice a bit of inconsistency here, we are using the context
// of the incoming request to read the secret
// as opposed to s.repoCache.clientGetter (which uses the context of
// User "system:serviceaccount:kubeapps:kubeapps-internal-kubeappsapis")
// which is what is used when the repo is being processed/indexed.
Expand Down
38 changes: 17 additions & 21 deletions cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ package main
import (
"context"
"io"
apiext "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
"k8s.io/client-go/kubernetes"
"reflect"
"strings"
"testing"
Expand Down Expand Up @@ -233,14 +235,11 @@ func newServerWithRepos(t *testing.T, repos []sourcev1.HelmRepository, charts []

apiextIfc := apiextfake.NewSimpleClientset(fluxHelmRepositoryCRD)
ctrlClient := newCtrlClient(repos, nil, nil)
clientGetter := func(context.Context, string) (clientgetter.ClientInterfaces, error) {
return clientgetter.
NewBuilder().
WithTyped(typedClient).
WithApiExt(apiextIfc).
WithControllerRuntime(&ctrlClient).
Build(), nil
}
clientGetter := clientgetter.NewFixedClientProvider(&clientgetter.ClientGetter{
ApiExt: func() (apiext.Interface, error) { return apiextIfc, nil },
Typed: func() (kubernetes.Interface, error) { return typedClient, nil },
ControllerRuntime: func() (ctrlclient.WithWatch, error) { return &ctrlClient, nil },
})
return newServer(t, clientGetter, nil, repos, charts)
}

Expand All @@ -255,14 +254,11 @@ func newServerWithChartsAndReleases(t *testing.T, actionConfig *action.Configura

apiextIfc := apiextfake.NewSimpleClientset(fluxHelmRepositoryCRD)
ctrlClient := newCtrlClient(nil, charts, releases)
clientGetter := func(context.Context, string) (clientgetter.ClientInterfaces, error) {
return clientgetter.
NewBuilder().
WithApiExt(apiextIfc).
WithTyped(typedClient).
WithControllerRuntime(&ctrlClient).
Build(), nil
}
clientGetter := clientgetter.NewFixedClientProvider(&clientgetter.ClientGetter{
ApiExt: func() (apiext.Interface, error) { return apiextIfc, nil },
Typed: func() (kubernetes.Interface, error) { return typedClient, nil },
ControllerRuntime: func() (ctrlclient.WithWatch, error) { return &ctrlClient, nil },
})
return newServer(t, clientGetter, actionConfig, nil, nil)
}

Expand Down Expand Up @@ -318,7 +314,7 @@ func newHelmActionConfig(t *testing.T, namespace string, rels []helmReleaseStub)
// (unlike charts or releases) is that repos are treated special because
// a new instance of a Server object is only returned once the cache has been synced with indexed repos
func newServer(t *testing.T,
clientGetter clientgetter.ClientGetterFunc,
clientGetter clientgetter.ClientProviderInterface,
actionConfig *action.Configuration,
repos []sourcev1.HelmRepository,
charts []testSpecChartWithUrl) (*Server, redismock.ClientMock, error) {
Expand All @@ -332,14 +328,14 @@ func newServer(t *testing.T,
if clientGetter != nil {
// if client getter returns an error, FLUSHDB call does not take place, because
// newCacheWithRedisClient() raises an error before redisCli.FlushDB() call
if _, err := clientGetter(context.TODO(), ""); err == nil {
if _, err := clientGetter.GetClients(context.TODO(), ""); err == nil {
mock.ExpectFlushDB().SetVal("OK")
}
}

backgroundClientGetter := func(ctx context.Context) (clientgetter.ClientInterfaces, error) {
return clientGetter(ctx, KubeappsCluster)
}
backgroundClientGetter := &clientgetter.FixedClusterClientProvider{ClientsFunc: func(ctx context.Context) (*clientgetter.ClientGetter, error) {
return clientGetter.GetClients(ctx, KubeappsCluster)
}}

sink := repoEventSink{
clientGetter: backgroundClientGetter,
Expand Down
Loading