use built-in SA token refresh functionality within client-go #7023

Merged 1 commit on Jan 23, 2024
2 changes: 1 addition & 1 deletion business/authentication/header_auth_controller.go
@@ -72,7 +72,7 @@ func (c headerAuthController) Authenticate(r *http.Request, w http.ResponseWrite
}
}

- kialiToken, err := kubernetes.GetKialiTokenForHomeCluster()
+ kialiToken, _, err := kubernetes.GetKialiTokenForHomeCluster()
if err != nil {
return nil, err
}
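Note: the call sites in this PR now discard a second return value. Below is a minimal sketch of the assumed new shape of the helper, inferred from these callers and from the KialiTokenFileForHomeCluster variable set in handlers/authentication_test.go further down; the real implementation in the kubernetes package may differ.

```go
package kubernetes

import "os"

// Sketch only. Assumption: the second return value is the path of the token
// file, so callers can hand a file (rather than a fixed token string) to
// clients that refresh the token themselves.
var (
	KialiTokenForHomeCluster     string
	KialiTokenFileForHomeCluster = "/var/run/secrets/kubernetes.io/serviceaccount/token"
)

// GetKialiTokenForHomeCluster returns the Kiali SA token, the file it was
// read from, and any read error.
func GetKialiTokenForHomeCluster() (string, string, error) {
	if KialiTokenForHomeCluster != "" {
		return KialiTokenForHomeCluster, KialiTokenFileForHomeCluster, nil
	}
	b, err := os.ReadFile(KialiTokenFileForHomeCluster)
	if err != nil {
		return "", "", err
	}
	KialiTokenForHomeCluster = string(b)
	return KialiTokenForHomeCluster, KialiTokenFileForHomeCluster, nil
}
```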
2 changes: 1 addition & 1 deletion business/authentication/openid_auth_controller.go
@@ -247,7 +247,7 @@ func (c OpenIdAuthController) ValidateSession(r *http.Request, w http.ResponseWr
// If RBAC is off, it's assumed that the kubernetes cluster will reject the OpenId token.
// Instead, we use the Kiali token and this has the side effect that all users will share the
// same privileges.
- token, err = kubernetes.GetKialiTokenForHomeCluster()
+ token, _, err = kubernetes.GetKialiTokenForHomeCluster()
if err != nil {
return nil, fmt.Errorf("error reading the Kiali ServiceAccount token: %w", err)
}
4 changes: 1 addition & 3 deletions business/dashboards.go
@@ -6,8 +6,6 @@ import (
"strings"
"sync"

"k8s.io/client-go/tools/clientcmd/api"

"github.com/kiali/kiali/config"
"github.com/kiali/kiali/config/dashboards"
"github.com/kiali/kiali/log"
@@ -129,7 +127,7 @@ func (in *DashboardsService) resolveReferences(dashboard *dashboards.MonitoringD
}

// GetDashboard returns a dashboard filled-in with target data
- func (in *DashboardsService) GetDashboard(authInfo *api.AuthInfo, params models.DashboardQuery, template string) (*models.MonitoringDashboard, error) {
+ func (in *DashboardsService) GetDashboard(params models.DashboardQuery, template string) (*models.MonitoringDashboard, error) {
promClient, err := in.prom()
if err != nil {
return nil, err
9 changes: 6 additions & 3 deletions business/dashboards_test.go
@@ -4,10 +4,10 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"k8s.io/client-go/tools/clientcmd/api"

"github.com/kiali/kiali/config"
"github.com/kiali/kiali/config/dashboards"
"github.com/kiali/kiali/kubernetes"
"github.com/kiali/kiali/kubernetes/kubetest"
"github.com/kiali/kiali/models"
pmock "github.com/kiali/kiali/prometheus/prometheustest"
@@ -57,7 +57,7 @@ func TestGetDashboard(t *testing.T) {
mockClientFactory := kubetest.NewK8SClientFactoryMock(k8s)
SetWithBackends(mockClientFactory, nil)

- dashboard, err := service.GetDashboard(&api.AuthInfo{Token: ""}, query, "dashboard1")
+ dashboard, err := service.GetDashboard(query, "dashboard1")

assert.Nil(err)
assert.Equal("Dashboard 1", dashboard.Title)
@@ -76,6 +76,9 @@
func TestGetDashboardFromKialiNamespace(t *testing.T) {
assert := assert.New(t)

+ // allows GetDashboard to get the SA client under the covers
+ kubernetes.NewTestingClientFactory(t)
+
// Setup mocks
service, prom := setupService("my-namespace", []dashboards.MonitoringDashboard{*fakeDashboard("1")})

@@ -93,7 +96,7 @@ func TestGetDashboardFromKialiNamespace(t *testing.T) {
prom.MockMetric("my_metric_1_1", expectedLabels, &query.RangeQuery, 10)
prom.MockHistogram("my_metric_1_2", expectedLabels, &query.RangeQuery, 11, 12)

- dashboard, err := service.GetDashboard(&api.AuthInfo{Token: ""}, query, "dashboard1")
+ dashboard, err := service.GetDashboard(query, "dashboard1")

assert.Nil(err)
assert.Equal("Dashboard 1", dashboard.Title)
2 changes: 1 addition & 1 deletion business/istio_status.go
@@ -270,7 +270,7 @@ func getAddonStatus(name string, enabled bool, isCore bool, auth *config.Auth, u
}

if auth.UseKialiToken {
- token, err := kubernetes.GetKialiTokenForHomeCluster()
+ token, _, err := kubernetes.GetKialiTokenForHomeCluster()
if err != nil {
log.Errorf("Could not read the Kiali Service Account token: %v", err)
}
6 changes: 3 additions & 3 deletions handlers/authentication.go
@@ -35,7 +35,7 @@ type sessionInfo struct {

func NewAuthenticationHandler() (AuthenticationHandler, error) {
// Read token from the filesystem
- saToken, err := kubernetes.GetKialiTokenForHomeCluster()
+ saToken, _, err := kubernetes.GetKialiTokenForHomeCluster()
if err != nil {
return AuthenticationHandler{}, err
}
@@ -62,7 +62,7 @@ func (aHandler AuthenticationHandler) Handle(next http.Handler) http.Handler {
}
case config.AuthStrategyAnonymous:
log.Tracef("Access to the server endpoint is not secured with credentials - letting request come in. Url: [%s]", r.URL.String())
- token, err := kubernetes.GetKialiTokenForHomeCluster()
+ token, _, err := kubernetes.GetKialiTokenForHomeCluster()
if err != nil {
token = ""
}
@@ -133,7 +133,7 @@ func AuthenticationInfo(w http.ResponseWriter, r *http.Request) {

switch conf.Auth.Strategy {
case config.AuthStrategyOpenshift:
- token, err := kubernetes.GetKialiTokenForHomeCluster()
+ token, _, err := kubernetes.GetKialiTokenForHomeCluster()
if err != nil {
RespondWithDetailedError(w, http.StatusInternalServerError, "Error obtaining Kiali SA token", err.Error())
return
1 change: 1 addition & 0 deletions handlers/authentication_test.go
@@ -252,6 +252,7 @@ func (r *rejectClient) GetProjects(labelSelector string) ([]osproject_v1.Project

func mockK8s(t *testing.T, reject bool) {
kubernetes.KialiTokenForHomeCluster = "notrealtoken"
+ kubernetes.KialiTokenFileForHomeCluster = "notrealtokenpath"
k8s := kubetest.NewFakeK8sClient(&osproject_v1.Project{ObjectMeta: meta_v1.ObjectMeta{Name: "tutorial"}})
k8s.OpenShift = true
var kubeClient kubernetes.ClientInterface = k8s
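As a usage note, the test only swaps the package-level token and token-file values for fakes. Below is a hedged sketch of an equivalent helper that also backs the path with a real temporary file, in case any code path re-reads it; the helper name setFakeKialiToken is made up for illustration.

```go
package handlers

import (
	"os"
	"path/filepath"
	"testing"

	"github.com/kiali/kiali/kubernetes"
)

// setFakeKialiToken is a hypothetical helper: it writes the fake token to a
// temp file and points the kubernetes package at it, so code that re-reads
// the token file during a test still finds a valid value.
func setFakeKialiToken(t *testing.T) {
	t.Helper()
	tokenFile := filepath.Join(t.TempDir(), "token")
	if err := os.WriteFile(tokenFile, []byte("notrealtoken"), 0o600); err != nil {
		t.Fatal(err)
	}
	kubernetes.KialiTokenForHomeCluster = "notrealtoken"
	kubernetes.KialiTokenFileForHomeCluster = tokenFile
}
```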
8 changes: 1 addition & 7 deletions handlers/dashboards.go
@@ -20,12 +20,6 @@ func CustomDashboard(w http.ResponseWriter, r *http.Request) {
namespace := pathParams["namespace"]
dashboardName := pathParams["dashboard"]

- authInfo, err := getAuthInfo(r)
- if err != nil {
- RespondWithError(w, http.StatusInternalServerError, err.Error())
- return
- }
-
layer, err := getBusiness(r)
if err != nil {
RespondWithError(w, http.StatusInternalServerError, err.Error())
@@ -65,7 +59,7 @@ func CustomDashboard(w http.ResponseWriter, r *http.Request) {
return
}

- dashboard, err := svc.GetDashboard(authInfo, params, dashboardName)
+ dashboard, err := svc.GetDashboard(params, dashboardName)
if err != nil {
if errors.IsNotFound(err) {
RespondWithError(w, http.StatusNotFound, err.Error())
67 changes: 7 additions & 60 deletions kubernetes/cache/cache.go
@@ -1,7 +1,6 @@
package cache

import (
"context"
"errors"
"fmt"
"sync"
@@ -58,12 +57,7 @@ type kialiCacheImpl struct {
// TODO: Get rid of embedding.
KubeCache

- // Stops the background goroutines which refresh the cache's
- // service account token and poll for istiod's proxy status.
- cleanup func()
clientFactory kubernetes.ClientFactory
- // How often the cache will check for kiali SA client changes.
- clientRefreshPollingPeriod time.Duration
// Maps a cluster name to a KubeCache
kubeCache map[string]KubeCache
refreshDuration time.Duration
@@ -82,14 +76,13 @@

func NewKialiCache(clientFactory kubernetes.ClientFactory, cfg config.Config) (KialiCache, error) {
kialiCacheImpl := kialiCacheImpl{
- clientFactory: clientFactory,
- clientRefreshPollingPeriod: time.Duration(time.Second * 60),
- kubeCache: make(map[string]KubeCache),
- refreshDuration: time.Duration(cfg.KubernetesConfig.CacheDuration) * time.Second,
- tokenNamespaces: make(map[string]namespaceCache),
- tokenNamespaceDuration: time.Duration(cfg.KubernetesConfig.CacheTokenNamespaceDuration) * time.Second,
- proxyStatusStore: store.New[*kubernetes.ProxyStatus](),
- registryStatusStore: store.New[*kubernetes.RegistryStatus](),
+ clientFactory: clientFactory,
+ kubeCache: make(map[string]KubeCache),
+ refreshDuration: time.Duration(cfg.KubernetesConfig.CacheDuration) * time.Second,
+ tokenNamespaces: make(map[string]namespaceCache),
+ tokenNamespaceDuration: time.Duration(cfg.KubernetesConfig.CacheTokenNamespaceDuration) * time.Second,
+ proxyStatusStore: store.New[*kubernetes.ProxyStatus](),
+ registryStatusStore: store.New[*kubernetes.RegistryStatus](),
}

for cluster, client := range clientFactory.GetSAClients() {
@@ -114,19 +107,6 @@ func NewKialiCache(clientFactory kubernetes.ClientFactory, cfg config.Config) (K
return nil, errors.New("home cluster not configured in kiali cache")
}

- // Starting background goroutines to:
- // 1. Refresh the cache's service account token
- // These will stop when the context is cancelled.
- // Starting goroutines after any errors are handled so as not to leak goroutines.
- ctx, cancel := context.WithCancel(context.Background())
-
- // Note that this only watches for changes to the home cluster's token since it is
- // expected that the remote cluster tokens will not change. However, that assumption
- // may be wrong and in the future the cache may want to watch for changes to all client tokens.
- kialiCacheImpl.watchForClientChanges(ctx, clientFactory.GetSAHomeClusterClient().GetToken())
-
- kialiCacheImpl.cleanup = cancel
-
return &kialiCacheImpl, nil
}

@@ -157,39 +137,6 @@ func (c *kialiCacheImpl) Stop() {
}(kc)
}
wg.Wait()

- c.cleanup()
}

- // watchForClientChanges watches for changes to the cache's service account client
- // and recreates the cache(s) when the client changes. The client is updated when
- // the token for the client changes.
- func (c *kialiCacheImpl) watchForClientChanges(ctx context.Context, token string) {
- ticker := time.NewTicker(c.clientRefreshPollingPeriod)
- go func() {
- for {
- select {
- case <-ticker.C:
- if c.clientFactory.GetSAHomeClusterClient().GetToken() != token {
- log.Info("[Kiali Cache] Updating cache with new token")
-
- if err := c.KubeCache.UpdateClient(c.clientFactory.GetSAHomeClusterClient()); err != nil {
- log.Errorf("[Kiali Cache] Error updating cache with new token. Err: %s", err)
- // Try again on the next tick without updating the token.
- continue
- }
-
- token = c.clientFactory.GetSAHomeClusterClient().GetToken()
- } else {
- log.Debug("[Kiali Cache] Nothing to refresh")
- }
- case <-ctx.Done():
- log.Debug("[Kiali Cache] Stopping watching for service account token changes")
- ticker.Stop()
- return
- }
- }
- }()
- }

func (c *kialiCacheImpl) GetClusters() []kubernetes.Cluster {
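For context on the PR title: the goroutine removed above polled for a rotated service account token and swapped the cache's client by hand. client-go can handle this transparently when the client is built from a token file rather than a fixed token string, because it re-reads the file and refreshes the bearer token on its own. Below is a minimal sketch of that built-in behavior, assuming an in-cluster configuration; it is illustrative, not Kiali's actual client factory code.

```go
package main

import (
	"fmt"

	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

// newInClusterClient builds a client whose bearer token is backed by the
// mounted service account token file. client-go periodically re-reads that
// file, so rotated tokens are picked up without recreating the client.
func newInClusterClient() (kubernetes.Interface, error) {
	cfg, err := rest.InClusterConfig() // sets BearerTokenFile to the mounted SA token path
	if err != nil {
		return nil, err
	}
	// When building a rest.Config by hand, the same effect comes from setting
	// BearerTokenFile (e.g. /var/run/secrets/kubernetes.io/serviceaccount/token)
	// instead of only BearerToken.
	return kubernetes.NewForConfig(cfg)
}

func main() {
	if _, err := newInClusterClient(); err != nil {
		fmt.Println("not running in a cluster:", err)
	}
}
```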
53 changes: 0 additions & 53 deletions kubernetes/cache/cache_test.go
@@ -1,9 +1,7 @@
package cache

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
apps_v1 "k8s.io/api/apps/v1"
@@ -15,57 +13,6 @@ import (
"github.com/kiali/kiali/kubernetes/kubetest"
)

- // Need to lock the client when we go to check the value of the token
- // but only the tests need this functionality so we can use a fake
- // that has access to the kubeCache's lock and has a getClient() method
- // that returns the client after locking. Without this, the tests will
- // fail with the race detector enabled.
- type fakeKubeCache struct {
- *kubeCache
- }
-
- func (f *fakeKubeCache) getClient() kubernetes.ClientInterface {
- f.kubeCache.cacheLock.RLock()
- defer f.kubeCache.cacheLock.RUnlock()
- return f.kubeCache.client
- }
-
- func TestClientUpdatedWhenSAClientChanges(t *testing.T) {
- require := require.New(t)
- conf := config.NewConfig()
- config.Set(conf)
-
- client := kubetest.NewFakeK8sClient()
- client.Token = "current-token"
- clientFactory := kubetest.NewK8SClientFactoryMock(client)
- k8sCache, err := NewKubeCache(client, *conf)
- require.NoError(err)
-
- kubeCache := &fakeKubeCache{kubeCache: k8sCache}
- kialiCache := &kialiCacheImpl{
- clientRefreshPollingPeriod: time.Millisecond,
- clientFactory: clientFactory,
- KubeCache: kubeCache,
- }
-
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- kialiCache.watchForClientChanges(ctx, client.Token)
-
- // Update the client. This should trigger a cache refresh.
- newClient := kubetest.NewFakeK8sClient()
- newClient.Token = "new-token"
- clientFactory.SetClients(map[string]kubernetes.ClientInterface{conf.KubernetesConfig.ClusterName: newClient})
-
- require.Eventually(
- func() bool { return kubeCache.getClient() != client },
- 500*time.Millisecond,
- 5*time.Millisecond,
- "client and cache should have been updated",
- )
- }
-
func TestNoHomeClusterReturnsError(t *testing.T) {
require := require.New(t)
conf := config.NewConfig()
5 changes: 0 additions & 5 deletions kubernetes/cache/kube_cache.go
@@ -62,11 +62,6 @@ type KubeCache interface {
// using the Kiali Service Account client.
Client() kubernetes.ClientInterface

- // UpdateClient will update the client used by the cache.
- // Useful for when the token is refreshed for the client.
- // This causes a full refresh of the cache.
- UpdateClient(client kubernetes.ClientInterface) error

GetConfigMap(namespace, name string) (*core_v1.ConfigMap, error)
GetDaemonSets(namespace string) ([]apps_v1.DaemonSet, error)
GetDaemonSet(namespace, name string) (*apps_v1.DaemonSet, error)
Expand Down