Use kiali svc token to create shared tracing and prom clients (kiali#…
nrfox committed Dec 20, 2023
1 parent effdb25 commit c777d85
Showing 15 changed files with 375 additions and 370 deletions.
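
In short: instead of each request lazily building Prometheus and tracing clients inside business.Get (with the tracing client bound to the logged-in user's token), the clients are now created once from the Kiali service-account token and injected via business.Start. A minimal sketch of the new startup wiring in Go — kialiSAToken, clientFactory, cpm, and kialiCache are illustrative names, not part of this diff:

	// Hypothetical server startup: build both shared clients once, from the
	// Kiali service-account token, then hand them to the business layer.
	prom, err := prometheus.NewClient()
	if err != nil {
		log.Fatalf("error creating Prometheus client: %v", err)
	}

	traceClient, err := tracing.NewClient(kialiSAToken) // SA token, not a per-user token
	if err != nil {
		log.Fatalf("error creating tracing client: %v", err)
	}

	business.Start(clientFactory, cpm, kialiCache, prom, traceClient)

In the real wiring the tracing client is presumably only created when tracing is enabled; the new TracingService.client() guard (see business/tracing.go below) returns an error otherwise.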
36 changes: 18 additions & 18 deletions business/istio_status_test.go
@@ -197,7 +197,7 @@ func TestGrafanaWorking(t *testing.T) {

clients := make(map[string]kubernetes.ClientInterface)
clients[conf.KubernetesConfig.ClusterName] = k8s
-	iss := NewWithBackends(clients, clients, nil, mockJaeger).IstioStatus
+	iss := NewWithBackends(clients, clients, nil, mockJaeger()).IstioStatus
icsl, error := iss.GetStatus(context.TODO(), conf.KubernetesConfig.ClusterName)
assert.NoError(error)

@@ -235,7 +235,7 @@ func TestGrafanaDisabled(t *testing.T) {

clients := make(map[string]kubernetes.ClientInterface)
clients[conf.KubernetesConfig.ClusterName] = k8s
-	iss := NewWithBackends(clients, clients, nil, mockJaeger).IstioStatus
+	iss := NewWithBackends(clients, clients, nil, mockJaeger()).IstioStatus
icsl, error := iss.GetStatus(context.TODO(), conf.KubernetesConfig.ClusterName)
assert.NoError(error)

@@ -290,7 +290,7 @@ func TestGrafanaNotWorking(t *testing.T) {

clients := make(map[string]kubernetes.ClientInterface)
clients[conf.KubernetesConfig.ClusterName] = k8s
-	iss := NewWithBackends(clients, clients, nil, mockJaeger).IstioStatus
+	iss := NewWithBackends(clients, clients, nil, mockJaeger()).IstioStatus
icsl, error := iss.GetStatus(context.TODO(), conf.KubernetesConfig.ClusterName)
assert.NoError(error)

@@ -321,7 +321,7 @@ func TestFailingTracingService(t *testing.T) {

clients := make(map[string]kubernetes.ClientInterface)
clients[conf.KubernetesConfig.ClusterName] = k8s
-	iss := NewWithBackends(clients, clients, nil, mockFailingJaeger).IstioStatus
+	iss := NewWithBackends(clients, clients, nil, mockFailingJaeger()).IstioStatus
icsl, error := iss.GetStatus(context.TODO(), conf.KubernetesConfig.ClusterName)
assert.NoError(error)

@@ -347,7 +347,7 @@ func TestOverriddenUrls(t *testing.T) {

clients := make(map[string]kubernetes.ClientInterface)
clients[conf.KubernetesConfig.ClusterName] = k8s
-	iss := NewWithBackends(clients, clients, nil, mockJaeger).IstioStatus
+	iss := NewWithBackends(clients, clients, nil, mockJaeger()).IstioStatus
icsl, error := iss.GetStatus(context.TODO(), conf.KubernetesConfig.ClusterName)
assert.NoError(error)

@@ -377,7 +377,7 @@ func TestCustomDashboardsMainPrometheus(t *testing.T) {

clients := make(map[string]kubernetes.ClientInterface)
clients[conf.KubernetesConfig.ClusterName] = k8s
-	iss := NewWithBackends(clients, clients, nil, mockJaeger).IstioStatus
+	iss := NewWithBackends(clients, clients, nil, mockJaeger()).IstioStatus
icsl, error := iss.GetStatus(context.TODO(), conf.KubernetesConfig.ClusterName)
assert.NoError(error)

@@ -403,7 +403,7 @@ func TestNoIstioComponentFoundError(t *testing.T) {
clients := make(map[string]kubernetes.ClientInterface)
clients[conf.KubernetesConfig.ClusterName] = k8s

-	iss := NewWithBackends(clients, clients, nil, mockJaeger).IstioStatus
+	iss := NewWithBackends(clients, clients, nil, mockJaeger()).IstioStatus
_, error := iss.GetStatus(context.TODO(), conf.KubernetesConfig.ClusterName)
assert.Error(error)
}
@@ -429,7 +429,7 @@ func TestDefaults(t *testing.T) {

clients := make(map[string]kubernetes.ClientInterface)
clients[conf.KubernetesConfig.ClusterName] = k8s
-	iss := NewWithBackends(clients, clients, nil, mockJaeger).IstioStatus
+	iss := NewWithBackends(clients, clients, nil, mockJaeger()).IstioStatus

icsl, err := iss.GetStatus(context.TODO(), conf.KubernetesConfig.ClusterName)
assert.NoError(err)
@@ -478,7 +478,7 @@ func TestNonDefaults(t *testing.T) {

clients := make(map[string]kubernetes.ClientInterface)
clients[conf.KubernetesConfig.ClusterName] = k8s
-	iss := NewWithBackends(clients, clients, nil, mockJaeger).IstioStatus
+	iss := NewWithBackends(clients, clients, nil, mockJaeger()).IstioStatus

icsl, error := iss.GetStatus(context.TODO(), conf.KubernetesConfig.ClusterName)
assert.NoError(error)
@@ -525,7 +525,7 @@ func TestIstiodNotReady(t *testing.T) {

clients := make(map[string]kubernetes.ClientInterface)
clients[conf.KubernetesConfig.ClusterName] = k8s
-	iss := NewWithBackends(clients, clients, nil, mockJaeger).IstioStatus
+	iss := NewWithBackends(clients, clients, nil, mockJaeger()).IstioStatus

icsl, error := iss.GetStatus(context.TODO(), conf.KubernetesConfig.ClusterName)
assert.NoError(error)
@@ -586,7 +586,7 @@ func TestIstiodUnreachable(t *testing.T) {

clients := make(map[string]kubernetes.ClientInterface)
clients[conf.KubernetesConfig.ClusterName] = k8s
-	iss := NewWithBackends(clients, clients, nil, mockJaeger).IstioStatus
+	iss := NewWithBackends(clients, clients, nil, mockJaeger()).IstioStatus

icsl, error := iss.GetStatus(context.TODO(), conf.KubernetesConfig.ClusterName)
assert.NoError(error)
@@ -641,7 +641,7 @@ func TestCustomizedAppLabel(t *testing.T) {

clients := make(map[string]kubernetes.ClientInterface)
clients[conf.KubernetesConfig.ClusterName] = k8s
-	iss := NewWithBackends(clients, clients, nil, mockJaeger).IstioStatus
+	iss := NewWithBackends(clients, clients, nil, mockJaeger()).IstioStatus

icsl, error := iss.GetStatus(context.TODO(), conf.KubernetesConfig.ClusterName)
assert.NoError(error)
@@ -692,7 +692,7 @@ func TestDaemonSetComponentHealthy(t *testing.T) {

clients := make(map[string]kubernetes.ClientInterface)
clients[conf.KubernetesConfig.ClusterName] = k8s
-	iss := NewWithBackends(clients, clients, nil, mockJaeger).IstioStatus
+	iss := NewWithBackends(clients, clients, nil, mockJaeger()).IstioStatus

icsl, error := iss.GetStatus(context.TODO(), conf.KubernetesConfig.ClusterName)
assert.NoError(error)
@@ -739,7 +739,7 @@ func TestDaemonSetComponentUnhealthy(t *testing.T) {

clients := make(map[string]kubernetes.ClientInterface)
clients[conf.KubernetesConfig.ClusterName] = k8s
-	iss := NewWithBackends(clients, clients, nil, mockJaeger).IstioStatus
+	iss := NewWithBackends(clients, clients, nil, mockJaeger()).IstioStatus

icsl, error := iss.GetStatus(context.TODO(), conf.KubernetesConfig.ClusterName)
assert.NoError(error)
@@ -780,16 +780,16 @@ func assertNotPresent(assert *assert.Assertions, icsl kubernetes.IstioComponentS
assert.False(componentFound)
}

-func mockJaeger() (tracing.ClientInterface, error) {
+func mockJaeger() tracing.ClientInterface {
j := new(tracingtest.TracingClientMock)
j.On("GetServiceStatus").Return(true, nil)
-	return tracing.ClientInterface(j), nil
+	return j
}

-func mockFailingJaeger() (tracing.ClientInterface, error) {
+func mockFailingJaeger() tracing.ClientInterface {
j := new(tracingtest.TracingClientMock)
j.On("GetServiceStatus").Return(false, errors.New("error connecting with tracing service"))
-	return tracing.ClientInterface(j), nil
+	return j
}

// Setup K8S api call to fetch Pods
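
All of the test changes above are one mechanical pattern: NewWithBackends no longer accepts a loader function, so the mock helpers return a ready tracing.ClientInterface and call sites invoke them eagerly:

	// Before: a loader func was passed and called lazily inside the layer.
	iss := NewWithBackends(clients, clients, nil, mockJaeger).IstioStatus

	// After: the mock client is built up front and passed directly.
	iss := NewWithBackends(clients, clients, nil, mockJaeger()).IstioStatus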
32 changes: 6 additions & 26 deletions business/layer.go
@@ -44,10 +44,12 @@ var (

// Start sets the globals necessary for the business layer.
// TODO: Refactor out global vars.
-func Start(cf kubernetes.ClientFactory, controlPlaneMonitor ControlPlaneMonitor, cache cache.KialiCache) {
+func Start(cf kubernetes.ClientFactory, controlPlaneMonitor ControlPlaneMonitor, cache cache.KialiCache, prom prometheus.ClientInterface, traceClient tracing.ClientInterface) {
clientFactory = cf
kialiCache = cache
poller = controlPlaneMonitor
+	prometheusClient = prom
+	tracingClient = traceClient
}

// Get the business.Layer
@@ -58,30 +60,8 @@ func Get(authInfo *api.AuthInfo) (*Layer, error) {
return nil, err
}

-	// Use an existing Prometheus client if it exists, otherwise create and use in the future
-	if prometheusClient == nil {
-		prom, err := prometheus.NewClient()
-		if err != nil {
-			prometheusClient = nil
-			return nil, err
-		}
-		prometheusClient = prom
-	}
-
-	// Create Tracing client
-	tracingLoader := func() (tracing.ClientInterface, error) {
-		var err error
-		if tracingClient == nil {
-			tracingClient, err = tracing.NewClient(authInfo.Token)
-			if err != nil {
-				tracingClient = nil
-			}
-		}
-		return tracingClient, err
-	}

kialiSAClient := clientFactory.GetSAClients()
-	return NewWithBackends(userClients, kialiSAClient, prometheusClient, tracingLoader), nil
+	return NewWithBackends(userClients, kialiSAClient, prometheusClient, tracingClient), nil
}

// SetWithBackends allows for specifying the ClientFactory and Prometheus clients to be used.
@@ -94,7 +74,7 @@ func SetWithBackends(cf kubernetes.ClientFactory, prom prometheus.ClientInterfac
// NewWithBackends creates the business layer using the passed k8sClients and prom clients.
// Note that the client passed here should *not* be the Kiali ServiceAccount client.
// It should be the user client based on the logged in user's token.
-func NewWithBackends(userClients map[string]kubernetes.ClientInterface, kialiSAClients map[string]kubernetes.ClientInterface, prom prometheus.ClientInterface, tracingClient TracingLoader) *Layer {
+func NewWithBackends(userClients map[string]kubernetes.ClientInterface, kialiSAClients map[string]kubernetes.ClientInterface, prom prometheus.ClientInterface, traceClient tracing.ClientInterface) *Layer {
temporaryLayer := &Layer{}
conf := config.Get()

@@ -105,7 +85,6 @@ func NewWithBackends(userClients map[string]kubernetes.ClientInterface, kialiSAC
temporaryLayer.IstioConfig = IstioConfigService{config: *conf, userClients: userClients, kialiCache: kialiCache, businessLayer: temporaryLayer, controlPlaneMonitor: poller}
temporaryLayer.IstioStatus = NewIstioStatusService(userClients, temporaryLayer, poller)
temporaryLayer.IstioCerts = IstioCertsService{k8s: userClients[homeClusterName], businessLayer: temporaryLayer}
-	temporaryLayer.Tracing = TracingService{loader: tracingClient, businessLayer: temporaryLayer}
temporaryLayer.Namespace = NewNamespaceService(userClients, kialiSAClients, kialiCache, *conf)
temporaryLayer.Mesh = NewMeshService(kialiSAClients, kialiCache, temporaryLayer.Namespace, *conf)
temporaryLayer.OpenshiftOAuth = OpenshiftOAuthService{k8s: userClients[homeClusterName], kialiSAClient: kialiSAClients[homeClusterName]}
@@ -118,6 +97,7 @@ func NewWithBackends(userClients map[string]kubernetes.ClientInterface, kialiSAC
temporaryLayer.TokenReview = NewTokenReview(userClients[homeClusterName])
temporaryLayer.Validations = IstioValidationsService{userClients: userClients, businessLayer: temporaryLayer}
temporaryLayer.Workload = *NewWorkloadService(userClients, prom, kialiCache, temporaryLayer, conf)
+	temporaryLayer.Tracing = NewTracingService(conf, traceClient, &temporaryLayer.Svc, &temporaryLayer.Workload)

return temporaryLayer
}
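
The block deleted from Get is the heart of the change: it initialized the package-global tracingClient from whichever user token arrived first (authInfo.Token) and mutated that global from inside a per-request closure. With the shared service-account client injected through Start, NewWithBackends just receives a concrete tracing.ClientInterface. A sketch of a caller under the new contract — saClients, promClient, traceClient, and the query arguments are placeholders:

	// Build the layer with a ready tracing client; no loader indirection.
	layer := NewWithBackends(userClients, saClients, promClient, traceClient)

	// Tracing requests flow through the injected client (test-style handling).
	spans, err := layer.Tracing.GetServiceSpans(ctx, "bookinfo", "productpage", query)
	if err != nil {
		t.Fatal(err)
	}
	assert.NotEmpty(spans)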
51 changes: 26 additions & 25 deletions business/tracing.go
@@ -2,6 +2,7 @@ package business

import (
"context"
"fmt"
"strings"
"sync"
"time"
@@ -16,37 +17,42 @@
)

type (
-	TracingLoader = func() (tracing.ClientInterface, error)
-	SpanFilter    = func(span *jaegerModels.Span) bool
+	SpanFilter = func(span *jaegerModels.Span) bool
)

type TracingService struct {
-	loader        TracingLoader
-	loaderErr     error
-	tracing       tracing.ClientInterface
-	businessLayer *Layer
+	conf     *config.Config
+	svc      *SvcService
+	tracing  tracing.ClientInterface
+	workload *WorkloadService
}

+func NewTracingService(conf *config.Config, tracing tracing.ClientInterface, svcService *SvcService, workloadService *WorkloadService) TracingService {
+	return TracingService{
+		conf:     conf,
+		svc:      svcService,
+		tracing:  tracing,
+		workload: workloadService,
+	}
+}

func (in *TracingService) client() (tracing.ClientInterface, error) {
-	if in.tracing != nil {
-		return in.tracing, nil
-	} else if in.loaderErr != nil {
-		return nil, in.loaderErr
+	if !in.conf.ExternalServices.Tracing.Enabled {
+		return nil, fmt.Errorf("Tracing is not enabled")
	}
-	in.tracing, in.loaderErr = in.loader()
-	return in.tracing, in.loaderErr
+	return in.tracing, nil
}

func (in *TracingService) getFilteredSpans(ns, app string, query models.TracingQuery, filter SpanFilter) ([]model.TracingSpan, error) {
// This is info needed for Tempo as it is not in the results by default
-	if config.Get().ExternalServices.Tracing.Provider == tracing.TEMPO {
+	if in.conf.ExternalServices.Tracing.Provider == config.TempoProvider {
query.Tags["http.method"] = ".*"
}
r, err := in.GetAppTraces(ns, app, query)
if err != nil {
return []model.TracingSpan{}, err
}
-	spans := tracesToSpans(app, r, filter)
+	spans := tracesToSpans(app, r, filter, in.conf)
return spans, nil
}

@@ -79,9 +85,7 @@ func (in *TracingService) GetServiceSpans(ctx context.Context, ns, service strin
)
defer end()

-	// TODO: Need to include cluster here. This will require custom tracing labeling of traces to add the cluster name
-	// since it is not standard.
-	app, err := in.businessLayer.Svc.GetServiceAppName(ctx, query.Cluster, ns, service)
+	app, err := in.svc.GetServiceAppName(ctx, query.Cluster, ns, service)
if err != nil {
return nil, err
}
@@ -112,7 +116,7 @@ func (in *TracingService) GetWorkloadSpans(ctx context.Context, ns, workload str
)
defer end()

-	app, err := in.businessLayer.Workload.GetWorkloadAppName(ctx, query.Cluster, ns, workload)
+	app, err := in.workload.GetWorkloadAppName(ctx, query.Cluster, ns, workload)
if err != nil {
return nil, err
}
@@ -165,9 +169,7 @@ func (in *TracingService) GetServiceTraces(ctx context.Context, ns, service stri
)
defer end()

-	// TODO: Need to include cluster here. This will require custom tracing labeling of traces to add the cluster name
-	// since it is not standard.
-	app, err := in.businessLayer.Svc.GetServiceAppName(ctx, query.Cluster, ns, service)
+	app, err := in.svc.GetServiceAppName(ctx, query.Cluster, ns, service)
if err != nil {
return nil, err
}
@@ -209,7 +211,7 @@ func (in *TracingService) GetWorkloadTraces(ctx context.Context, ns, workload st
)
defer end()

-	app, err := in.businessLayer.Workload.GetWorkloadAppName(ctx, query.Cluster, ns, workload)
+	app, err := in.workload.GetWorkloadAppName(ctx, query.Cluster, ns, workload)
if err != nil {
return nil, err
}
@@ -339,13 +341,13 @@ func spanMatchesWorkload(span *jaegerModels.Span, namespace, workload string) bo
return false
}

-func tracesToSpans(app string, r *model.TracingResponse, filter SpanFilter) []model.TracingSpan {
+func tracesToSpans(app string, r *model.TracingResponse, filter SpanFilter, conf *config.Config) []model.TracingSpan {
spans := []model.TracingSpan{}
for _, trace := range r.Data {
// Different for Tempo & Jaeger:
// for Tempo, the process is matched with the service name of the trace batch,
// so it is already filtered in the query.
-		if config.Get().ExternalServices.Tracing.Provider == tracing.TEMPO {
+		if conf.ExternalServices.Tracing.Provider == config.TempoProvider {
// Second, find spans for these processes
for _, span := range trace.Spans {
if span.Process.ServiceName == r.TracingServiceName {
@@ -378,7 +380,6 @@ func tracesToSpans(app string, r *model.TracingResponse, filter SpanFilter) []mo
}
}
}

}
log.Tracef("Found %d spans in the %d traces for app %s", len(spans), len(r.Data), app)
return spans
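
One behavioral consequence of the new client() guard: when tracing is disabled in the Kiali config, every tracing entry point fails fast with an explicit error instead of lazily attempting (and caching) a connection per request. A package-internal sketch — traceStatus is a hypothetical helper, built on the GetServiceStatus call the tests mock above:

	// Hypothetical helper inside package business.
	func traceStatus(ts TracingService) (bool, error) {
		client, err := ts.client()
		if err != nil {
			// conf.ExternalServices.Tracing.Enabled == false:
			// callers get "Tracing is not enabled" immediately.
			return false, err
		}
		return client.GetServiceStatus() // health probe; returns (bool, error)
	}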
