Skip to content

Commit

Permalink
Consolidate Cache work (#1749)
Browse files Browse the repository at this point in the history
* Cache v2
Focus on cache per namespace
Don't capture cache errors, it's a hack that create locking
Simplify the HasSync operation in same way as Istio is doing

* Add Istio indexers and split logic in different files

* Initialize the cache and use it from the business package

* Get Services and VirtualServices from Kiali Cache

* Document cache config and add extra logs

* Perf: include/exclude controllers for Workloads

* Connect business layer with cache for Services/Deployments

* Remove old cache v1
Fix format issues

* Fetch VS/DR from kialiCache

* Add Pods and ReplicaSets into Kiali cache

* Add Gateway and ServiceEntries to the cache

* Add Istio cached types to config

* Optimize query istio types

* Update operator with cache settings

* Introduce simple timeout caching for namespaces

* Fix bad check in fetch DR.
Exclude by default DeploymentConfig resources.

* Protect kialiCache with NS permissions

* Update cache config in operator cr

* Fix shadowed err assignments

* Fix lint reported issues

* Use seconds for cache configuration

* Refresh namespaces cache on GetNamespace() expiration

* Add GetDeployment() to cache scenario

* Fix format

* Fix bad indentation introduced by IDE

* Fix constants

* Fix initialization Mutex flaws
  • Loading branch information
lucasponce committed Oct 18, 2019
1 parent eb3cf5c commit 973a34c
Show file tree
Hide file tree
Showing 30 changed files with 1,226 additions and 965 deletions.
23 changes: 16 additions & 7 deletions business/apps.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ import (

// AppService deals with fetching Workloads group by "app" label, which will be identified as an "application"
type AppService struct {
prom prometheus.ClientInterface
k8s kubernetes.IstioClientInterface
prom prometheus.ClientInterface
k8s kubernetes.IstioClientInterface
businessLayer *Layer
}

// GetAppList is the API handler to fetch the list of applications in a given namespace
Expand All @@ -31,7 +32,7 @@ func (in *AppService) GetAppList(namespace string) (models.AppList, error) {
Namespace: models.Namespace{Name: namespace},
Apps: []models.AppListItem{},
}
apps, err := fetchNamespaceApps(in.k8s, namespace, "")
apps, err := fetchNamespaceApps(in.businessLayer, namespace, "")
if err != nil {
return *appList, err
}
Expand All @@ -57,7 +58,7 @@ func (in *AppService) GetApp(namespace string, appName string) (models.App, erro
defer promtimer.ObserveNow(&err)

appInstance := &models.App{Namespace: models.Namespace{Name: namespace}, Name: appName}
namespaceApps, err := fetchNamespaceApps(in.k8s, namespace, appName)
namespaceApps, err := fetchNamespaceApps(in.businessLayer, namespace, appName)
if err != nil {
return *appInstance, err
}
Expand Down Expand Up @@ -134,7 +135,7 @@ func castAppDetails(services []core_v1.Service, ws models.Workloads) namespaceAp
// Helper method to fetch all applications for a given namespace.
// Optionally if appName parameter is provided, it filters apps for that name.
// Return an error on any problem.
func fetchNamespaceApps(k8s kubernetes.IstioClientInterface, namespace string, appName string) (namespaceApps, error) {
func fetchNamespaceApps(layer *Layer, namespace string, appName string) (namespaceApps, error) {
var services []core_v1.Service
var ws models.Workloads
cfg := config.Get()
Expand All @@ -151,7 +152,15 @@ func fetchNamespaceApps(k8s kubernetes.IstioClientInterface, namespace string, a
go func() {
defer wg.Done()
var err error
services, err = k8s.GetServices(namespace, nil)
// Check if namespace is cached
if kialiCache != nil && kialiCache.CheckNamespace(namespace) {
// Cache uses Kiali ServiceAccount, check if user can access to the namespace
if _, err = layer.Namespace.GetNamespace(namespace); err == nil {
services, err = kialiCache.GetServices(namespace, nil)
}
} else {
services, err = layer.k8s.GetServices(namespace, nil)
}
if appName != "" {
selector := labels.Set(map[string]string{cfg.IstioLabels.AppLabelName: appName}).AsSelector()
services = kubernetes.FilterServicesForSelector(selector, services)
Expand All @@ -165,7 +174,7 @@ func fetchNamespaceApps(k8s kubernetes.IstioClientInterface, namespace string, a
go func() {
defer wg.Done()
var err error
ws, err = fetchWorkloads(k8s, namespace, labelSelector)
ws, err = fetchWorkloads(layer, namespace, labelSelector)
if err != nil {
log.Errorf("Error fetching Workload per namespace %s: %s", namespace, err)
errChan <- err
Expand Down
5 changes: 4 additions & 1 deletion business/apps_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ import (

func setupAppService(k8s *kubetest.K8SClientMock) AppService {
prom := new(prometheustest.PromClientMock)
return AppService{k8s: k8s, prom: prom}
layer := Layer{
k8s: k8s,
}
return AppService{k8s: k8s, prom: prom, businessLayer: &layer}
}

func TestGetAppListFromDeployments(t *testing.T) {
Expand Down
26 changes: 17 additions & 9 deletions business/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ import (

// HealthService deals with fetching health from various sources and convert to kiali model
type HealthService struct {
prom prometheus.ClientInterface
k8s kubernetes.IstioClientInterface
prom prometheus.ClientInterface
k8s kubernetes.IstioClientInterface
businessLayer *Layer
}

// GetServiceHealth returns a service health (service request error rate)
Expand All @@ -43,7 +44,7 @@ func (in *HealthService) GetAppHealth(namespace, app, rateInterval string, query
selectorLabels[appLabel] = app
labelSelector := labels.FormatLabels(selectorLabels)

ws, err := fetchWorkloads(in.k8s, namespace, labelSelector)
ws, err := fetchWorkloads(in.businessLayer, namespace, labelSelector)
if err != nil {
log.Errorf("Error fetching Workloads per namespace %s and app %s: %s", namespace, app, err)
return models.AppHealth{}, err
Expand Down Expand Up @@ -84,7 +85,7 @@ func (in *HealthService) GetWorkloadHealth(namespace, workload, rateInterval str
promtimer := internalmetrics.GetGoFunctionMetric("business", "HealthService", "GetWorkloadHealth")
defer promtimer.ObserveNow(&err)

w, err := fetchWorkload(in.k8s, namespace, workload)
w, err := fetchWorkload(in.businessLayer, namespace, workload)
if err != nil {
return models.WorkloadHealth{}, err
}
Expand Down Expand Up @@ -116,7 +117,7 @@ func (in *HealthService) GetNamespaceAppHealth(namespace, rateInterval string, q
promtimer := internalmetrics.GetGoFunctionMetric("business", "HealthService", "GetNamespaceAppHealth")
defer promtimer.ObserveNow(&err)

appEntities, err := fetchNamespaceApps(in.k8s, namespace, "")
appEntities, err := fetchNamespaceApps(in.businessLayer, namespace, "")
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -163,12 +164,19 @@ func (in *HealthService) GetNamespaceServiceHealth(namespace, rateInterval strin
var err error
promtimer := internalmetrics.GetGoFunctionMetric("business", "HealthService", "GetNamespaceServiceHealth")
defer promtimer.ObserveNow(&err)

services, err := in.k8s.GetServices(namespace, nil)
var services []core_v1.Service
// Check if namespace is cached
if kialiCache != nil && kialiCache.CheckNamespace(namespace) {
// Cache uses Kiali ServiceAccount, check if user can access to the namespace
if _, err = in.businessLayer.Namespace.GetNamespace(namespace); err == nil {
services, err = kialiCache.GetServices(namespace, nil)
}
} else {
services, err = in.k8s.GetServices(namespace, nil)
}
if err != nil {
return nil, err
}

return in.getNamespaceServiceHealth(namespace, services, rateInterval, queryTime), nil
}

Expand Down Expand Up @@ -204,7 +212,7 @@ func (in *HealthService) GetNamespaceWorkloadHealth(namespace, rateInterval stri
promtimer := internalmetrics.GetGoFunctionMetric("business", "HealthService", "GetNamespaceWorkloadHealth")
defer promtimer.ObserveNow(&err)

wl, err := fetchWorkloads(in.k8s, namespace, "")
wl, err := fetchWorkloads(in.businessLayer, namespace, "")
if err != nil {
return nil, err
}
Expand Down
25 changes: 20 additions & 5 deletions business/health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,13 @@ func TestGetAppHealth(t *testing.T) {

// Setup mocks
k8s := new(kubetest.K8SClientMock)
layer := Layer{
k8s: k8s,
}
prom := new(prometheustest.PromClientMock)
conf := config.NewConfig()
config.Set(conf)
hs := HealthService{k8s: k8s, prom: prom}
hs := HealthService{k8s: k8s, prom: prom, businessLayer: &layer}

k8s.On("IsOpenShift").Return(true)
k8s.MockEmptyWorkloads("ns")
Expand All @@ -70,10 +73,13 @@ func TestGetWorkloadHealth(t *testing.T) {

// Setup mocks
k8s := new(kubetest.K8SClientMock)
layer := Layer{
k8s: k8s,
}
prom := new(prometheustest.PromClientMock)
conf := config.NewConfig()
config.Set(conf)
hs := HealthService{k8s: k8s, prom: prom}
hs := HealthService{k8s: k8s, prom: prom, businessLayer: &layer}
k8s.On("IsOpenShift").Return(true)
k8s.MockEmptyWorkload("ns", "reviews-v1")
k8s.On("GetDeployment", "ns", "reviews-v1").Return(&fakeDeploymentsHealthReview()[0], nil)
Expand All @@ -97,10 +103,13 @@ func TestGetAppHealthWithoutIstio(t *testing.T) {

// Setup mocks
k8s := new(kubetest.K8SClientMock)
layer := Layer{
k8s: k8s,
}
prom := new(prometheustest.PromClientMock)
conf := config.NewConfig()
config.Set(conf)
hs := HealthService{k8s: k8s, prom: prom}
hs := HealthService{k8s: k8s, prom: prom, businessLayer: &layer}

k8s.On("IsOpenShift").Return(true)
k8s.MockEmptyWorkloads("ns")
Expand All @@ -121,10 +130,13 @@ func TestGetWorkloadHealthWithoutIstio(t *testing.T) {

// Setup mocks
k8s := new(kubetest.K8SClientMock)
layer := Layer{
k8s: k8s,
}
prom := new(prometheustest.PromClientMock)
conf := config.NewConfig()
config.Set(conf)
hs := HealthService{k8s: k8s, prom: prom}
hs := HealthService{k8s: k8s, prom: prom, businessLayer: &layer}
k8s.On("IsOpenShift").Return(true)
k8s.MockEmptyWorkload("ns", "reviews-v1")
k8s.On("GetDeployment", "ns", "reviews-v1").Return(&fakeDeploymentsHealthReview()[0], nil)
Expand All @@ -142,10 +154,13 @@ func TestGetWorkloadHealthWithoutIstio(t *testing.T) {
func TestGetNamespaceAppHealthWithoutIstio(t *testing.T) {
// Setup mocks
k8s := new(kubetest.K8SClientMock)
layer := Layer{
k8s: k8s,
}
prom := new(prometheustest.PromClientMock)
conf := config.NewConfig()
config.Set(conf)
hs := HealthService{k8s: k8s, prom: prom}
hs := HealthService{k8s: k8s, prom: prom, businessLayer: &layer}

k8s.On("IsOpenShift").Return(false)
k8s.MockEmptyWorkloads("ns")
Expand Down
55 changes: 50 additions & 5 deletions business/istio_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ import (
)

type IstioConfigService struct {
k8s kubernetes.IstioClientInterface
k8s kubernetes.IstioClientInterface
businessLayer *Layer
}

type IstioConfigCriteria struct {
Expand Down Expand Up @@ -133,7 +134,18 @@ func (in *IstioConfigService) GetIstioConfigList(criteria IstioConfigCriteria) (
go func(errChan chan error) {
defer wg.Done()
if criteria.IncludeGateways {
if gg, ggErr := in.k8s.GetGateways(criteria.Namespace); ggErr == nil {
var gg []kubernetes.IstioObject
var ggErr error
// Check if namespace is cached
if kialiCache != nil && kialiCache.CheckIstioResource(kubernetes.GatewayType) && kialiCache.CheckNamespace(criteria.Namespace) {
// Cache uses Kiali ServiceAccount, check if user can access to the namespace
if _, ggErr = in.businessLayer.Namespace.GetNamespace(criteria.Namespace); ggErr == nil {
gg, ggErr = kialiCache.GetIstioResources(kubernetes.GatewayType, criteria.Namespace)
}
} else {
gg, ggErr = in.k8s.GetGateways(criteria.Namespace)
}
if ggErr == nil {
(&istioConfigList.Gateways).Parse(gg)
} else {
errChan <- ggErr
Expand All @@ -144,7 +156,18 @@ func (in *IstioConfigService) GetIstioConfigList(criteria IstioConfigCriteria) (
go func(errChan chan error) {
defer wg.Done()
if criteria.IncludeVirtualServices {
if vs, vsErr := in.k8s.GetVirtualServices(criteria.Namespace, ""); vsErr == nil {
var vs []kubernetes.IstioObject
var vsErr error
// Check if namespace is cached
if kialiCache != nil && kialiCache.CheckIstioResource(kubernetes.VirtualServiceType) && kialiCache.CheckNamespace(criteria.Namespace) {
// Cache uses Kiali ServiceAccount, check if user can access to the namespace
if _, vsErr = in.businessLayer.Namespace.GetNamespace(criteria.Namespace); vsErr == nil {
vs, vsErr = kialiCache.GetIstioResources(kubernetes.VirtualServiceType, criteria.Namespace)
}
} else {
vs, vsErr = in.k8s.GetVirtualServices(criteria.Namespace, "")
}
if vsErr == nil {
(&istioConfigList.VirtualServices).Parse(vs)
} else {
errChan <- vsErr
Expand All @@ -155,7 +178,18 @@ func (in *IstioConfigService) GetIstioConfigList(criteria IstioConfigCriteria) (
go func(errChan chan error) {
defer wg.Done()
if criteria.IncludeDestinationRules {
if dr, drErr := in.k8s.GetDestinationRules(criteria.Namespace, ""); drErr == nil {
var dr []kubernetes.IstioObject
var drErr error
// Check if namespace is cached
if kialiCache != nil && kialiCache.CheckIstioResource(kubernetes.DestinationRuleType) && kialiCache.CheckNamespace(criteria.Namespace) {
// Cache uses Kiali ServiceAccount, check if user can access to the namespace
if _, drErr = in.businessLayer.Namespace.GetNamespace(criteria.Namespace); drErr == nil {
dr, drErr = kialiCache.GetIstioResources(kubernetes.DestinationRuleType, criteria.Namespace)
}
} else {
dr, drErr = in.k8s.GetDestinationRules(criteria.Namespace, "")
}
if drErr == nil {
(&istioConfigList.DestinationRules).Parse(dr)
} else {
errChan <- drErr
Expand All @@ -166,7 +200,18 @@ func (in *IstioConfigService) GetIstioConfigList(criteria IstioConfigCriteria) (
go func(errChan chan error) {
defer wg.Done()
if criteria.IncludeServiceEntries {
if se, seErr := in.k8s.GetServiceEntries(criteria.Namespace); seErr == nil {
var se []kubernetes.IstioObject
var seErr error
// Check if namespace is cached
if kialiCache != nil && kialiCache.CheckIstioResource(kubernetes.ServiceentryType) && kialiCache.CheckNamespace(criteria.Namespace) {
// Cache uses Kiali ServiceAccount, check if user can access to the namespace
if _, seErr = in.businessLayer.Namespace.GetNamespace(criteria.Namespace); seErr == nil {
se, seErr = kialiCache.GetIstioResources(kubernetes.ServiceentryType, criteria.Namespace)
}
} else {
se, seErr = in.k8s.GetServiceEntries(criteria.Namespace)
}
if seErr == nil {
(&istioConfigList.ServiceEntries).Parse(se)
} else {
errChan <- seErr
Expand Down
Loading

0 comments on commit 973a34c

Please sign in to comment.