use built-in SA token refresh functionality within client-go
fixes: kiali#6924
jmazzitelli committed Jan 16, 2024
1 parent 7eb7107 commit 38c1f76
Showing 16 changed files with 40 additions and 237 deletions.
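
In short: this commit removes Kiali's own token-polling machinery (the cache's watchForClientChanges goroutine and the client factory's refreshClientIfTokenChanged) and leans on client-go instead, by carrying the ServiceAccount token as a file path on the rest.Config. When a rest.Config has BearerTokenFile set, client-go periodically re-reads that file, so a rotated token is picked up without rebuilding clients or caches. A minimal sketch of that client-go behavior, assuming the standard in-cluster token mount (paths and names below are illustrative, not taken from this diff):

package main

import (
	"fmt"

	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

func main() {
	// Assumed in-cluster paths for illustration; not taken from this diff.
	const tokenFile = "/var/run/secrets/kubernetes.io/serviceaccount/token"
	const caFile = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"

	cfg := &rest.Config{
		Host: "https://kubernetes.default.svc",
		// With BearerTokenFile set, client-go periodically re-reads the file,
		// so a rotated ServiceAccount token is used on later requests without
		// rebuilding the client.
		BearerTokenFile: tokenFile,
		TLSClientConfig: rest.TLSClientConfig{CAFile: caFile},
	}

	client, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		fmt.Println("error building client:", err)
		return
	}
	_ = client // requests made through this client pick up refreshed tokens
}
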
2 changes: 1 addition & 1 deletion business/authentication/header_auth_controller.go
@@ -72,7 +72,7 @@ func (c headerAuthController) Authenticate(r *http.Request, w http.ResponseWrite
}
}

kialiToken, err := kubernetes.GetKialiTokenForHomeCluster()
kialiToken, _, err := kubernetes.GetKialiTokenForHomeCluster()
if err != nil {
return nil, err
}
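
A note on the changed call sites in this and the following files: kubernetes.GetKialiTokenForHomeCluster() now returns an extra value that most callers discard. Judging from the call sites in this diff (the kialiTokenFile variable in client_factory.go and the new KialiTokenFileForHomeCluster test variable), the helper presumably now returns both the token contents and the path of the file it was read from, roughly:

func GetKialiTokenForHomeCluster() (token string, tokenFile string, err error)

This signature is inferred from usage; the actual declaration is in one of the changed files not shown in this excerpt.
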
2 changes: 1 addition & 1 deletion business/authentication/openid_auth_controller.go
@@ -247,7 +247,7 @@ func (c OpenIdAuthController) ValidateSession(r *http.Request, w http.ResponseWr
// If RBAC is off, it's assumed that the kubernetes cluster will reject the OpenId token.
// Instead, we use the Kiali token and this has the side effect that all users will share the
// same privileges.
token, err = kubernetes.GetKialiTokenForHomeCluster()
token, _, err = kubernetes.GetKialiTokenForHomeCluster()
if err != nil {
return nil, fmt.Errorf("error reading the Kiali ServiceAccount token: %w", err)
}
2 changes: 1 addition & 1 deletion business/istio_status.go
@@ -270,7 +270,7 @@ func getAddonStatus(name string, enabled bool, isCore bool, auth *config.Auth, u
}

if auth.UseKialiToken {
token, err := kubernetes.GetKialiTokenForHomeCluster()
token, _, err := kubernetes.GetKialiTokenForHomeCluster()
if err != nil {
log.Errorf("Could not read the Kiali Service Account token: %v", err)
}
6 changes: 3 additions & 3 deletions handlers/authentication.go
@@ -35,7 +35,7 @@ type sessionInfo struct {

func NewAuthenticationHandler() (AuthenticationHandler, error) {
// Read token from the filesystem
saToken, err := kubernetes.GetKialiTokenForHomeCluster()
saToken, _, err := kubernetes.GetKialiTokenForHomeCluster()
if err != nil {
return AuthenticationHandler{}, err
}
@@ -62,7 +62,7 @@ func (aHandler AuthenticationHandler) Handle(next http.Handler) http.Handler {
}
case config.AuthStrategyAnonymous:
log.Tracef("Access to the server endpoint is not secured with credentials - letting request come in. Url: [%s]", r.URL.String())
token, err := kubernetes.GetKialiTokenForHomeCluster()
token, _, err := kubernetes.GetKialiTokenForHomeCluster()
if err != nil {
token = ""
}
@@ -133,7 +133,7 @@ func AuthenticationInfo(w http.ResponseWriter, r *http.Request) {

switch conf.Auth.Strategy {
case config.AuthStrategyOpenshift:
token, err := kubernetes.GetKialiTokenForHomeCluster()
token, _, err := kubernetes.GetKialiTokenForHomeCluster()
if err != nil {
RespondWithDetailedError(w, http.StatusInternalServerError, "Error obtaining Kiali SA token", err.Error())
return
1 change: 1 addition & 0 deletions handlers/authentication_test.go
@@ -252,6 +252,7 @@ func (r *rejectClient) GetProjects(labelSelector string) ([]osproject_v1.Project

func mockK8s(t *testing.T, reject bool) {
kubernetes.KialiTokenForHomeCluster = "notrealtoken"
kubernetes.KialiTokenFileForHomeCluster = "notrealtokenpath"
k8s := kubetest.NewFakeK8sClient(&osproject_v1.Project{ObjectMeta: meta_v1.ObjectMeta{Name: "tutorial"}})
k8s.OpenShift = true
var kubeClient kubernetes.ClientInterface = k8s
67 changes: 7 additions & 60 deletions kubernetes/cache/cache.go
@@ -1,7 +1,6 @@
package cache

import (
"context"
"errors"
"fmt"
"sync"
@@ -58,12 +57,7 @@ type kialiCacheImpl struct {
// TODO: Get rid of embedding.
KubeCache

// Stops the background goroutines which refresh the cache's
// service account token and poll for istiod's proxy status.
cleanup func()
clientFactory kubernetes.ClientFactory
// How often the cache will check for kiali SA client changes.
clientRefreshPollingPeriod time.Duration
// Maps a cluster name to a KubeCache
kubeCache map[string]KubeCache
refreshDuration time.Duration
@@ -82,14 +76,13 @@ type kialiCacheImpl struct {

func NewKialiCache(clientFactory kubernetes.ClientFactory, cfg config.Config) (KialiCache, error) {
kialiCacheImpl := kialiCacheImpl{
clientFactory: clientFactory,
clientRefreshPollingPeriod: time.Duration(time.Second * 60),
kubeCache: make(map[string]KubeCache),
refreshDuration: time.Duration(cfg.KubernetesConfig.CacheDuration) * time.Second,
tokenNamespaces: make(map[string]namespaceCache),
tokenNamespaceDuration: time.Duration(cfg.KubernetesConfig.CacheTokenNamespaceDuration) * time.Second,
proxyStatusStore: store.New[*kubernetes.ProxyStatus](),
registryStatusStore: store.New[*kubernetes.RegistryStatus](),
clientFactory: clientFactory,
kubeCache: make(map[string]KubeCache),
refreshDuration: time.Duration(cfg.KubernetesConfig.CacheDuration) * time.Second,
tokenNamespaces: make(map[string]namespaceCache),
tokenNamespaceDuration: time.Duration(cfg.KubernetesConfig.CacheTokenNamespaceDuration) * time.Second,
proxyStatusStore: store.New[*kubernetes.ProxyStatus](),
registryStatusStore: store.New[*kubernetes.RegistryStatus](),
}

for cluster, client := range clientFactory.GetSAClients() {
@@ -114,19 +107,6 @@ func NewKialiCache(clientFactory kubernetes.ClientFactory, cfg config.Config) (K
return nil, errors.New("home cluster not configured in kiali cache")
}

// Starting background goroutines to:
// 1. Refresh the cache's service account token
// These will stop when the context is cancelled.
// Starting goroutines after any errors are handled so as not to leak goroutines.
ctx, cancel := context.WithCancel(context.Background())

// Note that this only watches for changes to the home cluster's token since it is
// expected that the remote cluster tokens will not change. However, that assumption
// may be wrong and in the future the cache may want to watch for changes to all client tokens.
kialiCacheImpl.watchForClientChanges(ctx, clientFactory.GetSAHomeClusterClient().GetToken())

kialiCacheImpl.cleanup = cancel

return &kialiCacheImpl, nil
}

@@ -157,39 +137,6 @@ func (c *kialiCacheImpl) Stop() {
}(kc)
}
wg.Wait()

c.cleanup()
}

// watchForClientChanges watches for changes to the cache's service account client
// and recreates the cache(s) when the client changes. The client is updated when
// the token for the client changes.
func (c *kialiCacheImpl) watchForClientChanges(ctx context.Context, token string) {
ticker := time.NewTicker(c.clientRefreshPollingPeriod)
go func() {
for {
select {
case <-ticker.C:
if c.clientFactory.GetSAHomeClusterClient().GetToken() != token {
log.Info("[Kiali Cache] Updating cache with new token")

if err := c.KubeCache.UpdateClient(c.clientFactory.GetSAHomeClusterClient()); err != nil {
log.Errorf("[Kiali Cache] Error updating cache with new token. Err: %s", err)
// Try again on the next tick without updating the token.
continue
}

token = c.clientFactory.GetSAHomeClusterClient().GetToken()
} else {
log.Debug("[Kiali Cache] Nothing to refresh")
}
case <-ctx.Done():
log.Debug("[Kiali Cache] Stopping watching for service account token changes")
ticker.Stop()
return
}
}
}()
}

func (c *kialiCacheImpl) GetClusters() []kubernetes.Cluster {
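
The deleted watchForClientChanges goroutine polled the home-cluster SA client every 60 seconds and rebuilt the kube cache whenever the token string changed. With the token now handed to client-go as a file path, that polling is redundant: client-go's transport layer wraps the file in a cached token source and re-reads it on its own schedule. A rough sketch of that mechanism, assuming current k8s.io/client-go/transport behavior (the exact refresh interval is an internal detail and may vary by version):

package main

import (
	"fmt"

	"k8s.io/client-go/transport"
)

func main() {
	// Assumed token path for illustration.
	const tokenFile = "/var/run/secrets/kubernetes.io/serviceaccount/token"

	// client-go wires up a caching file token source like this internally when
	// rest.Config.BearerTokenFile is set; calling it directly here only
	// illustrates the mechanism.
	ts := transport.NewCachedFileTokenSource(tokenFile)

	tok, err := ts.Token()
	if err != nil {
		fmt.Println("error reading token:", err)
		return
	}
	// The value is cached for a short window; once the window elapses the file
	// is read again, so a rotated token is eventually returned.
	fmt.Println("token length:", len(tok.AccessToken))
}
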
38 changes: 0 additions & 38 deletions kubernetes/cache/cache_test.go
@@ -1,9 +1,7 @@
package cache

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
apps_v1 "k8s.io/api/apps/v1"
@@ -30,42 +28,6 @@ func (f *fakeKubeCache) getClient() kubernetes.ClientInterface {
return f.kubeCache.client
}

func TestClientUpdatedWhenSAClientChanges(t *testing.T) {
require := require.New(t)
conf := config.NewConfig()
config.Set(conf)

client := kubetest.NewFakeK8sClient()
client.Token = "current-token"
clientFactory := kubetest.NewK8SClientFactoryMock(client)
k8sCache, err := NewKubeCache(client, *conf)
require.NoError(err)

kubeCache := &fakeKubeCache{kubeCache: k8sCache}
kialiCache := &kialiCacheImpl{
clientRefreshPollingPeriod: time.Millisecond,
clientFactory: clientFactory,
KubeCache: kubeCache,
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

kialiCache.watchForClientChanges(ctx, client.Token)

// Update the client. This should trigger a cache refresh.
newClient := kubetest.NewFakeK8sClient()
newClient.Token = "new-token"
clientFactory.SetClients(map[string]kubernetes.ClientInterface{conf.KubernetesConfig.ClusterName: newClient})

require.Eventually(
func() bool { return kubeCache.getClient() != client },
500*time.Millisecond,
5*time.Millisecond,
"client and cache should have been updated",
)
}

func TestNoHomeClusterReturnsError(t *testing.T) {
require := require.New(t)
conf := config.NewConfig()
5 changes: 0 additions & 5 deletions kubernetes/cache/kube_cache.go
@@ -62,11 +62,6 @@ type KubeCache interface {
// using the Kiali Service Account client.
Client() kubernetes.ClientInterface

// UpdateClient will update the client used by the cache.
// Useful for when the token is refreshed for the client.
// This causes a full refresh of the cache.
UpdateClient(client kubernetes.ClientInterface) error

GetConfigMap(namespace, name string) (*core_v1.ConfigMap, error)
GetDaemonSets(namespace string) ([]apps_v1.DaemonSet, error)
GetDaemonSet(namespace, name string) (*apps_v1.DaemonSet, error)
7 changes: 6 additions & 1 deletion kubernetes/client.go
@@ -130,7 +130,12 @@ func getConfigForLocalCluster() (*rest.Config, error) {
remoteSecretPath := kialiconfig.Get().Deployment.RemoteSecretPath
if remoteSecret, readErr := GetRemoteSecret(remoteSecretPath); readErr == nil {
log.Debugf("Using remote secret for local cluster config found at: [%s]. Kiali must be running outside the kube cluster.", remoteSecretPath)
return clientcmd.NewDefaultClientConfig(*remoteSecret, nil).ClientConfig()
cc, err := clientcmd.NewDefaultClientConfig(*remoteSecret, nil).ClientConfig()
if err != nil {
return cc, err
}
cc.BearerTokenFile = remoteSecretPath
return cc, nil
} else {
log.Debugf("Unable to read remote secret. It may or may not exist. Error: %v. Falling back to in cluster config", readErr)
// Fallback to in cluster config
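
For the out-of-cluster case handled above, the remote-secret kubeconfig is still used to build the client config, but BearerTokenFile is now pointed at the same mounted file so that a remounted or rotated secret is eventually re-read. A rough standalone sketch of that pattern, using clientcmd.LoadFromFile as a stand-in for Kiali's GetRemoteSecret helper and an illustrative mount path:

package main

import (
	"fmt"

	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Illustrative mount path; the real path comes from Kiali's Deployment.RemoteSecretPath config.
	remoteSecretPath := "/kiali-remote-secret/kiali"

	// Load the kubeconfig-formatted remote secret from disk
	// (stand-in for Kiali's GetRemoteSecret).
	apiConfig, err := clientcmd.LoadFromFile(remoteSecretPath)
	if err != nil {
		fmt.Println("could not load remote secret:", err)
		return
	}

	restConfig, err := clientcmd.NewDefaultClientConfig(*apiConfig, nil).ClientConfig()
	if err != nil {
		fmt.Println("could not build rest config:", err)
		return
	}

	// Mirrors the change above: point client-go at the mounted file so a
	// rotated secret is re-read without restarting Kiali.
	restConfig.BearerTokenFile = remoteSecretPath
	_ = restConfig
}
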
60 changes: 3 additions & 57 deletions kubernetes/client_factory.go
@@ -185,7 +185,7 @@ func (cf *clientFactory) newClient(authInfo *api.AuthInfo, expirationTime time.D
var err error

if cluster == cf.homeCluster {
kialiToken, err = GetKialiTokenForHomeCluster()
kialiToken, _, err = GetKialiTokenForHomeCluster()
} else {
kialiToken, err = cf.GetSAClient(cluster).GetToken(), nil
}
@@ -417,66 +417,11 @@ func getTokenHash(authInfo *api.AuthInfo) string {

// KialiSAClients returns all clients associated with the Kiali service account across clusters.
func (cf *clientFactory) GetSAClient(cluster string) ClientInterface {
// while we are here, refresh the client
if err := cf.refreshClientIfTokenChanged(cluster); err != nil {
log.Errorf("Unable to refresh Kiali SA client for cluster [%s]: %v", cluster, err)
}

cf.mutex.RLock()
defer cf.mutex.RUnlock()
return cf.saClientEntries[cluster]
}

// Check for kiali token changes and refresh the client when it does.
func (cf *clientFactory) refreshClientIfTokenChanged(cluster string) error {
var refreshTheClient bool // will be true if the client needs to be refreshed
var rci *RemoteClusterInfo

if cluster == cf.homeCluster {
// LOCAL CLUSTER
if newTokenToCheck, err := GetKialiTokenForHomeCluster(); err != nil {
return err
} else {
cf.mutex.RLock()
client, ok := cf.saClientEntries[cluster]
cf.mutex.RUnlock()
if !ok {
return errors.New("there is no home cluster SA client to refresh")
}
refreshTheClient = client.GetToken() != newTokenToCheck
rci = nil
}
} else {
// REMOTE CLUSTER
cf.mutex.RLock()
remoteRci, ok := cf.remoteClusterInfos[cluster]
cf.mutex.RUnlock()
if !ok {
return fmt.Errorf("cannot refresh token for unknown cluster [%s]", cluster)
} else {
if reloadedRci, err := reloadRemoteClusterInfoFromFile(remoteRci); err != nil {
return err
} else {
refreshTheClient = (reloadedRci != nil) // note that anything (not just the token) that changed will trigger the client to be refreshed
rci = reloadedRci
}
}
}

if refreshTheClient {
log.Debugf("Kiali SA token has changed for cluster [%s], refreshing the client", cluster)
newClient, err := cf.newSAClient(rci)
if err != nil {
return err
}
cf.mutex.Lock()
cf.saClientEntries[cluster] = newClient
cf.mutex.Unlock()
}

return nil
}

// KialiSAHomeClusterClient returns the Kiali service account client for the cluster where Kiali is running.
func (cf *clientFactory) GetSAHomeClusterClient() ClientInterface {
return cf.GetSAClient(cf.homeCluster)
@@ -498,13 +443,14 @@ func (cf *clientFactory) getConfig(clusterInfo *RemoteClusterInfo) (*rest.Config
} else {
// Just read the token and then use the base config.
// We're an in cluster client. Read the kiali service account token.
kialiToken, err := GetKialiTokenForHomeCluster()
kialiToken, kialiTokenFile, err := GetKialiTokenForHomeCluster()
if err != nil {
return nil, fmt.Errorf("unable to get Kiali service account token: %s", err)
}

// Copy over the base rest config and the token
clientConfig.BearerToken = kialiToken
clientConfig.BearerTokenFile = kialiTokenFile
}

if !kialiConfig.KialiFeatureFlags.Clustering.EnableExecProvider {
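
One detail worth noting in getConfig above: the copied rest.Config now carries both BearerToken and BearerTokenFile. Per the rest.Config field documentation, the file is read periodically and the last successfully read value takes precedence over the static BearerToken, so the static value effectively only covers the time before the first read. A small self-contained sketch of that combination (paths are illustrative):

package main

import (
	"fmt"
	"os"

	"k8s.io/client-go/rest"
)

func main() {
	// Illustrative path only.
	tokenFile := "/kiali-secret/token"

	seed, err := os.ReadFile(tokenFile)
	if err != nil {
		fmt.Println("could not read token file:", err)
		return
	}

	cfg := &rest.Config{
		Host:            "https://kubernetes.default.svc",
		BearerToken:     string(seed), // static value, used until the file is read
		BearerTokenFile: tokenFile,    // re-read periodically; last read value wins
	}
	_ = cfg
}
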
57 changes: 0 additions & 57 deletions kubernetes/client_factory_test.go
@@ -130,63 +130,6 @@ func TestConcurrentClientFactory(t *testing.T) {
wg.Wait()
}

func TestSAHomeClientUpdatesWhenKialiTokenChanges(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
kialiConfig := config.NewConfig()
config.Set(kialiConfig)
currentToken := KialiTokenForHomeCluster
currentTime := tokenRead
t.Cleanup(func() {
// Other tests use this global var so we need to reset it.
tokenRead = currentTime
KialiTokenForHomeCluster = currentToken
})

tokenRead = time.Now()
KialiTokenForHomeCluster = "current-token"

restConfig := rest.Config{}
clientFactory, err := newClientFactory(&restConfig)
require.NoError(err)

currentClient := clientFactory.GetSAHomeClusterClient()
assert.Equal(KialiTokenForHomeCluster, currentClient.GetToken())
assert.Equal(currentClient, clientFactory.GetSAHomeClusterClient())

KialiTokenForHomeCluster = "new-token"

// Assert that the token has changed and the client has changed.
newClient := clientFactory.GetSAHomeClusterClient()
assert.Equal(KialiTokenForHomeCluster, newClient.GetToken())
assert.NotEqual(currentClient, newClient)
}

func TestSAClientsUpdateWhenKialiTokenChanges(t *testing.T) {
require := require.New(t)
conf := config.NewConfig()
config.Set(conf)
t.Cleanup(func() {
// Other tests use this global var so we need to reset it.
KialiTokenForHomeCluster = ""
})

tokenRead = time.Now()
KialiTokenForHomeCluster = "current-token"

restConfig := rest.Config{}
clientFactory, err := newClientFactory(&restConfig)
require.NoError(err)

client := clientFactory.GetSAClient(conf.KubernetesConfig.ClusterName)
require.Equal(KialiTokenForHomeCluster, client.GetToken())

KialiTokenForHomeCluster = "new-token"

client = clientFactory.GetSAClient(conf.KubernetesConfig.ClusterName)
require.Equal(KialiTokenForHomeCluster, client.GetToken())
}

func TestClientCreatedWithClusterInfo(t *testing.T) {
// Create a fake cluster info file.
// Ensure client gets created with this.
