Skip to content

Commit

Permalink
Add separate cluster for configured kiali instance
Browse files Browse the repository at this point in the history
  • Loading branch information
nrfox committed Dec 6, 2023
1 parent 9a11e24 commit b0bdf4e
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 24 deletions.
58 changes: 44 additions & 14 deletions business/mesh.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
core_v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/kiali/kiali/config"
"github.com/kiali/kiali/kubernetes"
Expand Down Expand Up @@ -308,6 +309,42 @@ func (in *MeshService) GetClusters(r *http.Request) ([]kubernetes.Cluster, error
clusters = append(clusters, meshCluster)
}

clusterNameSet := make(map[string]struct{})
for _, cluster := range clusters {
clusterNameSet[cluster.Name] = struct{}{}
}

for _, cfgurl := range in.conf.KialiFeatureFlags.Clustering.KialiURLs {
if cfgurl.ClusterName == "" {
continue
}
if _, found := clusterNameSet[cfgurl.ClusterName]; !found {
// Add a cluster for each configured Kiali instance that doesn't have a cluster yet.
cluster := kubernetes.Cluster{
Name: cfgurl.ClusterName,
IsKialiHome: false,
KialiInstances: []kubernetes.KialiInstance{{
ServiceName: cfgurl.InstanceName,
Namespace: cfgurl.Namespace,
Url: cfgurl.URL,
}},
}
clusters = append(clusters, cluster)
} else {
// Add the Kiali instance to the cluster that already exists.
for idx := range clusters {
if clusters[idx].Name == cfgurl.ClusterName {
clusters[idx].KialiInstances = append(clusters[idx].KialiInstances, kubernetes.KialiInstance{
ServiceName: cfgurl.InstanceName,
Namespace: cfgurl.Namespace,
Url: cfgurl.URL,
})
break
}
}
}
}

in.kialiCache.SetClusters(clusters)

return clusters, nil
Expand All @@ -331,21 +368,17 @@ func (in *MeshService) discoverKiali(ctx context.Context, clusterName string, r
if err != nil {
log.Warningf("Discovery for Kiali instances in cluster [%s] failed. Unable to find kube cache for cluster [%s]", clusterName, clusterName)
// But still the Kiali instance can be configured to show the instance and the URL in Mesh page
return in.appendKialiInstancesFromConfig([]kubernetes.KialiInstance{}, clusterName)
return in.appendKialiInstancesFromConfig([]kubernetes.KialiInstance{})
}

// The operator and the helm charts set this fixed label. It's also
// present in the Istio addon manifest of Kiali.
kialiAppLabel := map[string]string{"app.kubernetes.io/part-of": "kiali"}
// TODO: getting services will fail spectacularly if cluster wide mode is not enabled - only call when in cluster-wide mode
if !config.Get().Deployment.ClusterWideAccess {
return in.appendKialiInstancesFromConfig([]kubernetes.KialiInstance{}, clusterName)
}
services, err := kubeCache.GetServices("", kialiAppLabel)
kialiAppLabel := "app.kubernetes.io/part-of=kiali"
services, err := kubeCache.GetServices(metav1.NamespaceAll, kialiAppLabel)
if err != nil {
log.Warningf("Discovery for Kiali instances in cluster [%s] failed: %s", clusterName, err.Error())
// But still the Kiali instance can be configured to show the instance and the URL in Mesh page
return in.appendKialiInstancesFromConfig([]kubernetes.KialiInstance{}, clusterName)
return in.appendKialiInstancesFromConfig([]kubernetes.KialiInstance{})
}

var instances []kubernetes.KialiInstance
Expand All @@ -366,16 +399,13 @@ func (in *MeshService) discoverKiali(ctx context.Context, clusterName string, r
instances = append(instances, kiali)
}
// Read the rest of Kiali instances configured
instances = in.appendKialiInstancesFromConfig(instances, clusterName)
instances = in.appendKialiInstancesFromConfig(instances)
return instances
}

// appendKialiInstancesFromConfig appends the rest of Kiali instances which are configured in KialiFeatureFlags.Clustering.KialiURLs into existing list of instances.
func (in *MeshService) appendKialiInstancesFromConfig(instances []kubernetes.KialiInstance, clusterName string) []kubernetes.KialiInstance {
for _, cfgurl := range config.Get().KialiFeatureFlags.Clustering.KialiURLs {
if cfgurl.ClusterName != clusterName {
continue
}
func (in *MeshService) appendKialiInstancesFromConfig(instances []kubernetes.KialiInstance) []kubernetes.KialiInstance {
for _, cfgurl := range in.conf.KialiFeatureFlags.Clustering.KialiURLs {
found := false
for _, kiali := range instances {
if cfgurl.InstanceName == kiali.ServiceName && cfgurl.Namespace == kiali.Namespace {
Expand Down
62 changes: 62 additions & 0 deletions business/mesh_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1115,3 +1115,65 @@ func TestIstioConfigMapName(t *testing.T) {
})
}
}

func TestGetClustersShowsConfiguredKialiInstances(t *testing.T) {
require := require.New(t)
assert := assert.New(t)
conf := config.NewConfig()
conf.KialiFeatureFlags.Clustering.KialiURLs = []config.KialiURL{{
InstanceName: "kiali",
Namespace: "istio-system",
ClusterName: "west",
URL: "kiali.istio-system.west",
}}
kubernetes.SetConfig(t, *conf)

k8s := kubetest.NewFakeK8sClient()
business.SetupBusinessLayer(t, k8s, *conf)
clients := map[string]kubernetes.ClientInterface{conf.KubernetesConfig.ClusterName: k8s}
svc := business.NewWithBackends(clients, clients, nil, nil).Mesh
clusters, err := svc.GetClusters(nil)

require.NoError(err)
require.Len(clusters, 1)
require.Len(clusters[0].KialiInstances, 1)
kialiInstance := clusters[0].KialiInstances[0]
assert.Equal("kiali", kialiInstance.ServiceName)
assert.Equal("istio-system", kialiInstance.Namespace)
assert.Equal("kiali.istio-system.west", kialiInstance.Url)
}

func TestGetClustersWorksWithNamespacedScope(t *testing.T) {
require := require.New(t)
assert := assert.New(t)
conf := config.NewConfig()
conf.Server.WebFQDN = "localhost"
conf.Server.WebSchema = "http"
conf.Deployment.ClusterWideAccess = false
conf.Deployment.AccessibleNamespaces = []string{"istio-system"}
kubernetes.SetConfig(t, *conf)

kialiService := &core_v1.Service{
ObjectMeta: v1.ObjectMeta{
Name: "kiali",
Namespace: "istio-system",
Labels: map[string]string{"app.kubernetes.io/part-of": "kiali"},
},
}
k8s := kubetest.NewFakeK8sClient(
&core_v1.Namespace{ObjectMeta: v1.ObjectMeta{Name: "istio-system"}},
kialiService,
)
business.SetupBusinessLayer(t, k8s, *conf)
clients := map[string]kubernetes.ClientInterface{conf.KubernetesConfig.ClusterName: k8s}
svc := business.NewWithBackends(clients, clients, nil, nil).Mesh
clusters, err := svc.GetClusters(nil)

require.NoError(err)
require.Len(clusters, 1)
require.Len(clusters[0].KialiInstances, 1)
kialiInstance := clusters[0].KialiInstances[0]
assert.Equal("kiali", kialiInstance.ServiceName)
assert.Equal("istio-system", kialiInstance.Namespace)
assert.Equal("http://localhost:20001", kialiInstance.Url)
}
2 changes: 1 addition & 1 deletion business/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (in *SvcService) getServiceListForCluster(ctx context.Context, criteria Ser
log.Warningf("Services not filtered. Selector %s not valid", criteria.ServiceSelector)
}
}
svcs, err2 = kubeCache.GetServices(criteria.Namespace, selectorLabels)
svcs, err2 = kubeCache.GetServicesBySelectorLabels(criteria.Namespace, selectorLabels)
if err2 != nil {
log.Errorf("Error fetching Services per namespace %s: %s", criteria.Namespace, err2)
errChan <- err2
Expand Down
2 changes: 1 addition & 1 deletion graph/telemetry/istio/appender/health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ type servicesError struct {
errorMsg string
}

func (s *servicesError) GetServices(namespace string, selectorLabels map[string]string) ([]core_v1.Service, error) {
func (s *servicesError) GetServicesBySelectorLabels(namespace string, selectorLabels map[string]string) ([]core_v1.Service, error) {
return nil, fmt.Errorf(s.errorMsg)
}

Expand Down
56 changes: 48 additions & 8 deletions kubernetes/cache/kube_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ type KubeCache interface {
GetEndpoints(namespace, name string) (*core_v1.Endpoints, error)
GetStatefulSets(namespace string) ([]apps_v1.StatefulSet, error)
GetStatefulSet(namespace, name string) (*apps_v1.StatefulSet, error)
GetServices(namespace string, selectorLabels map[string]string) ([]core_v1.Service, error)
GetServicesBySelectorLabels(namespace string, selectorLabels map[string]string) ([]core_v1.Service, error)
GetServices(namespace string, labelSelector string) ([]core_v1.Service, error)
GetService(namespace string, name string) (*core_v1.Service, error)
GetPods(namespace, labelSelector string) ([]core_v1.Pod, error)
GetReplicaSets(namespace string) ([]apps_v1.ReplicaSet, error)
Expand Down Expand Up @@ -691,24 +692,63 @@ func (c *kubeCache) GetStatefulSet(namespace, name string) (*apps_v1.StatefulSet
return retSet, nil
}

// GetServices returns list of services filtered by Spec.Selector instead of Metadata.Labels
func (c *kubeCache) GetServices(namespace string, selectorLabels map[string]string) ([]core_v1.Service, error) {
// GetServices returns list of services filtered by the labelSelector.
func (c *kubeCache) GetServices(namespace string, labelSelector string) ([]core_v1.Service, error) {
// Read lock will prevent the cache from being refreshed while we are reading from the lister
// but it won't prevent other routines from reading from the lister.
defer c.cacheLock.RUnlock()
c.cacheLock.RLock()

var services []*core_v1.Service
var err error
selector, err := labels.Parse(labelSelector)
if err != nil {
return nil, err
}

services := []*core_v1.Service{}
if namespace == metav1.NamespaceAll {
services, err = c.getCacheLister(namespace).serviceLister.List(labels.Everything())
if c.clusterScoped {
services, err = c.clusterCacheLister.serviceLister.List(selector)
if err != nil {
return nil, err
}
} else {
for _, nsCacheLister := range c.nsCacheLister {
servicesNamespaced, err := nsCacheLister.serviceLister.List(selector)
if err != nil {
return nil, err
}
services = append(services, servicesNamespaced...)
}
}
} else {
services, err = c.getCacheLister(namespace).serviceLister.Services(namespace).List(labels.Everything())
services, err = c.getCacheLister(namespace).serviceLister.Services(namespace).List(selector)
if err != nil {
return nil, err
}
}

log.Tracef("[Kiali Cache] Get [resource: Service] for [namespace: %s] = %d", namespace, len(services))

var retServices []core_v1.Service
for _, ss := range services {
s := ss.DeepCopy()
s.Kind = kubernetes.ServiceType
retServices = append(retServices, *s)
}
return retServices, nil
}

// GetServicesBySelectorLabels returns list of services filtered by Spec.Selector instead of Metadata.Labels
func (c *kubeCache) GetServicesBySelectorLabels(namespace string, selectorLabels map[string]string) ([]core_v1.Service, error) {
// Read lock will prevent the cache from being refreshed while we are reading from the lister
// but it won't prevent other routines from reading from the lister.
defer c.cacheLock.RUnlock()
c.cacheLock.RLock()

services, err := c.GetServices(namespace, labels.Everything().String())
if err != nil {
return nil, err
}
log.Tracef("[Kiali Cache] Get [resource: Service] for [namespace: %s] = %d", namespace, len(services))

selector := labels.Set(selectorLabels)
retServices := []core_v1.Service{}
Expand Down

0 comments on commit b0bdf4e

Please sign in to comment.