/
controlplane_monitor.go
510 lines (448 loc) · 18.1 KB
/
controlplane_monitor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
package business
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strings"
"sync"
"time"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
"github.com/kiali/kiali/config"
"github.com/kiali/kiali/kubernetes"
"github.com/kiali/kiali/kubernetes/cache"
"github.com/kiali/kiali/log"
)
// ControlPlaneMonitor describes the operations offered by the control plane
// monitor. It is declared as an interface solely for testing purposes, so
// that port-forwarding and polling can be mocked out.
type ControlPlaneMonitor interface {
	// RefreshIstioCache should update the kiali cache's istio related stores.
	RefreshIstioCache(ctx context.Context) error

	// PollIstiodForProxyStatus keeps the istio related cache data up to date.
	// The implementation in this file refreshes once synchronously and then
	// keeps refreshing in the background until ctx is done.
	PollIstiodForProxyStatus(ctx context.Context)

	// CanConnectToIstiod reports the connectivity status of the istiod(s).
	// The implementation in this file resolves the revision from the istiod
	// deployment named in the configuration.
	CanConnectToIstiod(client kubernetes.ClientInterface) (kubernetes.IstioComponentStatus, error)

	// CanConnectToIstiodForRevision reports the connectivity status of the
	// istiod(s) belonging to the given revision.
	CanConnectToIstiodForRevision(client kubernetes.ClientInterface, revision string) (kubernetes.IstioComponentStatus, error)
}
// NewControlPlaneMonitor wires a controlPlaneMonitor from its collaborators.
// The polling interval is taken from
// conf.ExternalServices.Istio.IstiodPollingIntervalSeconds and converted
// from seconds to a time.Duration.
func NewControlPlaneMonitor(cache cache.KialiCache, clientFactory kubernetes.ClientFactory, conf config.Config, meshService *MeshService) *controlPlaneMonitor {
	pollSeconds := conf.ExternalServices.Istio.IstiodPollingIntervalSeconds

	monitor := new(controlPlaneMonitor)
	monitor.cache = cache
	monitor.clientFactory = clientFactory
	monitor.conf = conf
	monitor.pollingInterval = time.Duration(pollSeconds) * time.Second
	monitor.meshService = meshService
	return monitor
}
// controlPlaneMonitor periodically scrapes the debug endpoint(s) of istiod.
// The set of controlplanes to scrape is obtained through the mesh service,
// and whatever is scraped is written back into the kiali cache.
type controlPlaneMonitor struct {
	// cache is where the scraped proxy status (and registry status) is stored.
	cache cache.KialiCache

	// clientFactory hands out the Kiali Service Account clients for all
	// clusters. Clients are looked up on every refresh instead of being held
	// in a static list because clusters can be added or removed over time.
	clientFactory kubernetes.ClientFactory

	// conf is the Kiali configuration the monitor was created with.
	conf config.Config

	// meshService provides the list of controlplanes to poll.
	meshService *MeshService

	// pollingInterval is the delay between refreshes and also the timeout
	// applied to a single refresh.
	pollingInterval time.Duration
}
// RefreshIstioCache performs a single scrape of the istiod debug endpoint(s)
// and stores the outcome in the kialiCache. Both the proxy status and the
// registry services come from those debug endpoints.
//
// The whole refresh is bounded by a timeout of p.pollingInterval. An error is
// returned only when the mesh cannot be retrieved; failures for individual
// clusters or revisions are logged and skipped so the remaining ones are
// still processed.
func (p *controlPlaneMonitor) RefreshIstioCache(ctx context.Context) error {
	log.Debug("Scraping istiod for debug info")

	ctx, cancel := context.WithTimeout(ctx, p.pollingInterval)
	defer cancel()

	mesh, err := p.meshService.GetMesh(ctx)
	if err != nil {
		return fmt.Errorf("unable to get mesh when refreshing istio cache: %s", err)
	}

	// Group the controlplanes we are going to poll by the cluster they live in.
	byCluster := make(map[string][]ControlPlane)
	for _, cp := range mesh.ControlPlanes {
		name := cp.Cluster.Name
		byCluster[name] = append(byCluster[name], cp)
	}

	// Retry roughly once: the context above times out after p.pollingInterval,
	// which should cancel polling before any further retries happen.
	retryInterval := p.pollingInterval / 2

	// The proxy status endpoint has unique results per controlplane, whereas
	// services/config are duplicated across all controlplanes of a cluster.
	// So proxy status is collected from every controlplane (e.g. both
	// istiod-rev-1 and istiod-rev-2) but services only from one of them.
	var allProxyStatus []*kubernetes.ProxyStatus
	registryByCluster := map[string]*kubernetes.RegistryStatus{}

	for cluster, planes := range byCluster {
		client := p.clientFactory.GetSAClient(cluster)
		if client == nil {
			log.Errorf("client for cluster [%s] does not exist", cluster)
			// One unavailable cluster must not prevent results from the others.
			continue
		}

		for _, plane := range planes {
			statuses, err := p.getProxyStatusWithRetry(ctx, retryInterval, client, plane.Revision, plane.IstiodNamespace)
			if err != nil {
				log.Warningf("Unable to get proxy status from istiod for revision: [%s] and cluster: [%s]. Proxy status may be stale: %s", plane.Revision, client.ClusterInfo().Name, err)
				continue
			}
			allProxyStatus = append(allProxyStatus, statuses...)
		}

		if len(planes) == 0 {
			continue
		}

		// Services are shared across revisions, so they are fetched once per
		// cluster. Any revision will do; take the first one.
		first := planes[0]
		services, err := p.getServicesWithRetry(ctx, retryInterval, client, first.Revision, first.IstiodNamespace)
		if err != nil {
			log.Warningf("Unable to get registry services from istiod for revision: [%s] and cluster: [%s]. Registry services may be stale: %s", first.Revision, client.ClusterInfo().Name, err)
			continue
		}
		registryByCluster[cluster] = &kubernetes.RegistryStatus{Services: services}
	}

	p.cache.SetRegistryStatus(registryByCluster)
	p.cache.SetPodProxyStatus(allProxyStatus)
	return nil
}
// PollIstiodForProxyStatus refreshes the istio cache once, synchronously, and
// then starts a goroutine that refreshes it again every p.pollingInterval
// until ctx is done. Refresh errors are only logged because they may be
// transient; the next interval will try again.
func (p *controlPlaneMonitor) PollIstiodForProxyStatus(ctx context.Context) {
	log.Debugf("Starting polling istiod(s) every %d seconds for proxy status", p.conf.ExternalServices.Istio.IstiodPollingIntervalSeconds)

	refresh := func() {
		if err := p.RefreshIstioCache(ctx); err != nil {
			log.Errorf("Unable to refresh istio cache: %s", err)
		}
	}

	// Prime the pump: do the first refresh right away, before the background
	// loop starts.
	refresh()

	go func() {
		for {
			select {
			case <-time.After(p.pollingInterval):
				refresh()
			case <-ctx.Done():
				log.Debug("Stopping polling for istiod(s) proxy status")
				return
			}
		}
	}()
}
// getProxyStatusWithRetry fetches the proxy status from istiod for the given
// revision and namespace, retrying every interval until a fetch succeeds or
// ctx is done.
//
// On success it returns the proxy status and a nil error. When polling stops
// without a successful fetch it returns a nil slice together with the error
// of the last failed attempt, or the polling error itself if no attempt
// recorded an error.
func (p *controlPlaneMonitor) getProxyStatusWithRetry(ctx context.Context, interval time.Duration, client kubernetes.ClientInterface, revision string, namespace string) ([]*kubernetes.ProxyStatus, error) {
	var (
		proxyStatus []*kubernetes.ProxyStatus
		lastErr     error
	)

	retryErr := wait.PollUntilContextCancel(ctx, interval, true, func(ctx context.Context) (bool, error) {
		log.Tracef("Getting proxy status from istiod in cluster [%s] for revision [%s]", client.ClusterInfo().Name, revision)
		// Assign to the outer variables on purpose. Declaring a new err here
		// would shadow the outer one and the failure reason would be lost,
		// making this function return a nil error on timeout.
		proxyStatus, lastErr = p.getProxyStatus(client, revision, namespace)
		// A nil error tells the poller to keep retrying while the fetch fails.
		return lastErr == nil, nil
	})
	if retryErr != nil {
		if lastErr == nil {
			// Polling ended without any attempt recording a failure; report
			// the polling error so callers never see (nil, nil) on failure.
			lastErr = retryErr
		}
		log.Warningf("Error getting proxy status from istiod. Proxy status may be stale. Err: %v", lastErr)
		return nil, lastErr
	}

	return proxyStatus, nil
}
// getServicesWithRetry fetches the registry services from istiod for the
// given revision and namespace, retrying every interval until a fetch
// succeeds or ctx is done.
//
// On success it returns the registry services and a nil error. When polling
// stops without a successful fetch it returns a nil slice together with the
// error of the last failed attempt, or the polling error itself if no attempt
// recorded an error.
func (p *controlPlaneMonitor) getServicesWithRetry(ctx context.Context, interval time.Duration, client kubernetes.ClientInterface, revision string, namespace string) ([]*kubernetes.RegistryService, error) {
	var (
		registryServices []*kubernetes.RegistryService
		lastErr          error
	)

	retryErr := wait.PollUntilContextCancel(ctx, interval, true, func(ctx context.Context) (bool, error) {
		log.Tracef("Getting services from istiod in cluster [%s] for revision [%s]", client.ClusterInfo().Name, revision)
		// Assign to the outer variables on purpose. Declaring a new err here
		// would shadow the outer one and the failure reason would be lost,
		// making this function return a nil error on timeout.
		registryServices, lastErr = p.getRegistryServices(client, revision, namespace)
		// A nil error tells the poller to keep retrying while the fetch fails.
		return lastErr == nil, nil
	})
	if retryErr != nil {
		if lastErr == nil {
			// Polling ended without any attempt recording a failure; report
			// the polling error so callers never see (nil, nil) on failure.
			lastErr = retryErr
		}
		// This path is about registry services, not proxy status.
		log.Warningf("Error getting registry services from istiod. Registry services may be stale. Err: %v", lastErr)
		return nil, lastErr
	}

	return registryServices, nil
}
// joinURL glues base and path together with a "/" in between. At most one
// trailing "/" is stripped from base and at most one leading "/" is stripped
// from path before joining, so "http://host/" and "/debug" become
// "http://host/debug".
func joinURL(base, path string) string {
	return strings.TrimSuffix(base, "/") + "/" + strings.TrimPrefix(path, "/")
}
// getRequest performs an HTTP GET against url using http.DefaultClient and
// returns the response body. The whole exchange is limited to 30 seconds.
//
// An error is returned when the request cannot be built or sent, when the
// body cannot be read, or when the response status is anything other than
// 200 OK (in which case the status and body are included in the error).
func getRequest(url string) ([]byte, error) {
	reqCtx, stop := context.WithTimeout(context.Background(), 30*time.Second)
	defer stop()

	request, err := http.NewRequestWithContext(reqCtx, http.MethodGet, url, nil)
	if err != nil {
		return nil, err
	}

	response, err := http.DefaultClient.Do(request)
	if err != nil {
		return nil, err
	}
	defer response.Body.Close()

	// The body is read before the status check so that it can be included in
	// the error reported for a non-200 response.
	payload, readErr := io.ReadAll(response.Body)
	switch {
	case readErr != nil:
		return nil, fmt.Errorf("unable to read response body when getting config from remote istiod. Err: %s", readErr)
	case response.StatusCode != http.StatusOK:
		return nil, fmt.Errorf("bad response when getting config from remote istiod. Status: %s. Body: %s", response.Status, payload)
	}

	return payload, nil
}
// getIstiodDebugStatus requests debugPath from every healthy istiod of the
// given revision and returns the raw response bodies keyed by istiod name.
//
// It fails when the connectivity check itself errors, when no istiod is
// reported as reachable (which includes the case of no istiods at all), or
// when not a single istiod returned a response. As long as at least one
// istiod answered, the per-pod failures are dropped and a nil error is
// returned.
func (p *controlPlaneMonitor) getIstiodDebugStatus(client kubernetes.ClientInterface, revision string, namespace string, debugPath string) (map[string][]byte, error) {
	// Check if the kube-api has proxy access to pods in the istio-system
	// https://github.com/kiali/kiali/issues/3494#issuecomment-772486224
	components, err := p.canConnectToIstiodForRevision(client, revision, namespace)
	if err != nil {
		return nil, fmt.Errorf("unable to connect to Istiod pods on cluster [%s] for revision [%s]: %s", client.ClusterInfo().Name, revision, err.Error())
	}

	// One pass classifies the components: anything not unreachable counts as
	// reachable, and only healthy ones are queried below.
	var healthy kubernetes.IstioComponentStatus
	reachable := false
	for _, component := range components {
		if component.Status != kubernetes.ComponentUnreachable {
			reachable = true
		}
		if component.Status == kubernetes.ComponentHealthy {
			healthy = append(healthy, component)
		}
	}
	if !reachable {
		return nil, fmt.Errorf("unable to proxy Istiod pods. " +
			"Make sure your Kubernetes API server has access to the Istio control plane through 8080 port")
	}

	// Each goroutine writes only to its own slot, so no further
	// synchronization is needed besides waiting for all of them.
	type podReply struct {
		body []byte
		err  error
	}
	replies := make([]podReply, len(healthy))
	port := p.conf.ExternalServices.Istio.IstiodPodMonitoringPort

	var pending sync.WaitGroup
	pending.Add(len(healthy))
	for slot, istiod := range healthy {
		go func(slot int, name, namespace string) {
			defer pending.Done()
			// The 15014 port on Istiod is open for control plane monitoring.
			// Here's the Istio doc page about the port usage by istio:
			// https://istio.io/latest/docs/ops/deployment/requirements/#ports-used-by-istio
			body, err := client.ForwardGetRequest(namespace, name, port, debugPath)
			replies[slot] = podReply{body: body, err: err}
		}(slot, istiod.Name, istiod.Namespace)
	}
	pending.Wait()

	bodies := make(map[string][]byte)
	var failures []string
	for slot, reply := range replies {
		podName := healthy[slot].Name
		if reply.err != nil {
			failures = append(failures, fmt.Sprintf("%s: %s", podName, reply.err.Error()))
			continue
		}
		bodies[podName] = reply.body
	}

	if len(bodies) == 0 {
		return nil, errors.New("Error fetching the proxy-status in the following pods: " + strings.Join(failures, "; "))
	}
	return bodies, nil
}
// CanConnectToIstiodForRevision checks if Kiali can reach the istiod pod(s)
// of the given revision in the configured Istio namespace, either via port
// forwarding through the k8s api server or via http if the registry is
// configured with a remote url. An error does not indicate that istiod
// cannot be reached. The kubernetes.IstioComponentStatus must be checked.
func (p *controlPlaneMonitor) CanConnectToIstiodForRevision(client kubernetes.ClientInterface, revision string) (kubernetes.IstioComponentStatus, error) {
	istioNamespace := p.conf.IstioNamespace
	return p.canConnectToIstiodForRevision(client, revision, istioNamespace)
}
// canConnectToIstiodForRevision checks if Kiali can reach the istiod pod(s)
// of the given revision in the given namespace.
//
// When the registry is configured with a remote istiod url, a single http
// GET on its /debug path decides between healthy and unreachable. Otherwise
// every running istiod pod matching the revision is probed on /ready through
// the k8s api server, and one status per pod is returned.
//
// An error does not indicate that istiod cannot be reached. The
// kubernetes.IstioComponentStatus must be checked.
func (p *controlPlaneMonitor) canConnectToIstiodForRevision(client kubernetes.ClientInterface, revision string, namespace string) (kubernetes.IstioComponentStatus, error) {
	if registry := p.conf.ExternalServices.Istio.Registry; registry != nil && registry.IstiodURL != "" {
		remote := kubernetes.ComponentStatus{Name: registry.IstiodURL, Status: kubernetes.ComponentHealthy, IsCore: true}
		// Being able to hit /debug doesn't necessarily mean we are authorized to hit the others.
		if _, err := getRequest(joinURL(registry.IstiodURL, "/debug")); err != nil {
			log.Warningf("Kiali can't connect to remote Istiod: %s", err)
			remote.Status = kubernetes.ComponentUnreachable
		}
		return kubernetes.IstioComponentStatus{remote}, nil
	}

	kubeCache, err := p.cache.GetKubeCache(client.ClusterInfo().Name)
	if err != nil {
		return nil, err
	}

	selector := labels.Set{
		"app":              "istiod",
		IstioRevisionLabel: revision,
	}.String()
	pods, err := kubeCache.GetPods(namespace, selector)
	if err != nil {
		return nil, err
	}

	// Only pods in the Running phase are probed.
	running := make([]*corev1.Pod, 0, len(pods))
	for i := range pods {
		if pods[i].Status.Phase == corev1.PodRunning {
			running = append(running, &pods[i])
		}
	}

	statusCh := make(chan kubernetes.ComponentStatus, len(running))
	var probes sync.WaitGroup
	probes.Add(len(running))
	for _, pod := range running {
		go func(name, namespace string) {
			defer probes.Done()

			// Assume healthy and downgrade if the probe fails.
			result := kubernetes.ComponentStatus{
				Name:      name,
				Namespace: namespace,
				Status:    kubernetes.ComponentHealthy,
				IsCore:    true,
			}
			// The 8080 port is not accessible from outside of the pod. However, it is used for kubernetes to do the live probes.
			// Using the proxy method to make sure that K8s API has access to the Istio Control Plane namespace.
			// By proxying one Istiod, we ensure that the following connection is allowed:
			// Kiali -> K8s API (proxy) -> istiod
			// This scenario is not obvious for private clusters (like GKE private cluster)
			if _, err := client.ForwardGetRequest(namespace, name, 8080, "/ready"); err != nil {
				log.Warningf("Unable to get ready status of istiod: %s/%s. Err: %s", namespace, name, err)
				result.Status = kubernetes.ComponentUnreachable
			}
			statusCh <- result
		}(pod.Name, pod.Namespace)
	}
	probes.Wait()
	close(statusCh)

	ics := kubernetes.IstioComponentStatus{}
	for result := range statusCh {
		ics.Merge(kubernetes.IstioComponentStatus{result})
	}
	return ics, nil
}
// CanConnectToIstiod checks if Kiali can reach istiod for the controlplane
// named in the configuration.
//
// When the registry is configured with a remote istiod url, a single http
// GET on its /debug path decides between healthy and unreachable. Otherwise
// the revision label is read from the configured istiod deployment and the
// check is delegated to CanConnectToIstiodForRevision.
func (p *controlPlaneMonitor) CanConnectToIstiod(client kubernetes.ClientInterface) (kubernetes.IstioComponentStatus, error) {
	if registry := p.conf.ExternalServices.Istio.Registry; registry != nil && registry.IstiodURL != "" {
		remote := kubernetes.ComponentStatus{Name: registry.IstiodURL, Status: kubernetes.ComponentHealthy, IsCore: true}
		// Being able to hit /debug doesn't necessarily mean we are authorized to hit the others.
		if _, err := getRequest(joinURL(registry.IstiodURL, "/debug")); err != nil {
			log.Warningf("Kiali can't connect to remote Istiod: %s", err)
			remote.Status = kubernetes.ComponentUnreachable
		}
		return kubernetes.IstioComponentStatus{remote}, nil
	}

	kubeCache, err := p.cache.GetKubeCache(client.ClusterInfo().Name)
	if err != nil {
		return nil, err
	}

	// Find the rev label for the controlplane that is set in the config.
	deployment, err := kubeCache.GetDeployment(p.conf.IstioNamespace, p.conf.ExternalServices.Istio.IstiodDeploymentName)
	if err != nil {
		return nil, err
	}

	revision := deployment.Labels[IstioRevisionLabel]
	return p.CanConnectToIstiodForRevision(client, revision)
}
// parseProxyStatus decodes the JSON payload returned by each pilot into proxy
// statuses, stamps every entry with the name of the pilot it came from, and
// returns all entries in one slice. The first payload that fails to decode
// aborts the whole parse and its error is returned.
func parseProxyStatus(statuses map[string][]byte) ([]*kubernetes.ProxyStatus, error) {
	var merged []*kubernetes.ProxyStatus
	for pilotName, payload := range statuses {
		var decoded []*kubernetes.ProxyStatus
		if err := json.Unmarshal(payload, &decoded); err != nil {
			return nil, err
		}
		for i := range decoded {
			decoded[i].Pilot = pilotName
		}
		merged = append(merged, decoded...)
	}
	return merged, nil
}
// getProxyStatus retrieves and parses the /debug/syncz output of istiod.
//
// When the registry is configured with a remote istiod url, that url is
// queried over http and the result is attributed to a pilot named "remote".
// Otherwise the istiod pods of the given revision and namespace are queried
// through the client. Failures are logged and returned.
func (p *controlPlaneMonitor) getProxyStatus(client kubernetes.ClientInterface, revision string, namespace string) ([]*kubernetes.ProxyStatus, error) {
	const synczPath = "/debug/syncz"

	registry := p.conf.ExternalServices.Istio.Registry
	if registry == nil || registry.IstiodURL == "" {
		// No remote istiod configured: go through the istiod pods.
		podPayloads, err := p.getIstiodDebugStatus(client, revision, namespace, synczPath)
		if err != nil {
			log.Errorf("Failed to call Istiod endpoint %s error: %s", synczPath, err)
			return nil, err
		}
		return parseProxyStatus(podPayloads)
	}

	payload, err := getRequest(joinURL(registry.IstiodURL, synczPath))
	if err != nil {
		log.Errorf("Failed to get Istiod info from remote endpoint %s error: %s", synczPath, err)
		return nil, err
	}
	return parseProxyStatus(map[string][]byte{"remote": payload})
}
// getRegistryServices retrieves and parses the /debug/registryz output of
// istiod.
//
// When the registry is configured with a remote istiod url, that url is
// queried over http and the result is attributed to a pilot named "remote".
// Otherwise the istiod pods of the given revision and namespace are queried
// through the client. Failures are logged and returned.
func (p *controlPlaneMonitor) getRegistryServices(client kubernetes.ClientInterface, revision string, namespace string) ([]*kubernetes.RegistryService, error) {
	const registryzPath = "/debug/registryz"

	registry := p.conf.ExternalServices.Istio.Registry
	if registry == nil || registry.IstiodURL == "" {
		// No remote istiod configured: go through the istiod pods.
		podPayloads, err := p.getIstiodDebugStatus(client, revision, namespace, registryzPath)
		if err != nil {
			log.Errorf("Failed to call Istiod endpoint %s error: %s", registryzPath, err)
			return nil, err
		}
		return parseRegistryServices(podPayloads)
	}

	payload, err := getRequest(joinURL(registry.IstiodURL, registryzPath))
	if err != nil {
		log.Errorf("Failed to get Istiod info from remote endpoint %s error: %s", registryzPath, err)
		return nil, err
	}
	return parseRegistryServices(map[string][]byte{"remote": payload})
}
// parseRegistryServices decodes the JSON registry payloads returned by the
// pilots and stamps every service with the name of the pilot it came from.
// Parsing stops after the first payload that yields at least one service, so
// the same registry is not read again for every istiod pod. A payload that
// fails to decode is logged and aborts the parse with that error.
func parseRegistryServices(registries map[string][]byte) ([]*kubernetes.RegistryService, error) {
	var collected []*kubernetes.RegistryService
	for pilotName, payload := range registries {
		var decoded []*kubernetes.RegistryService
		if err := json.Unmarshal(payload, &decoded); err != nil {
			log.Errorf("Error parsing RegistryServices results: %s", err)
			return nil, err
		}
		for i := range decoded {
			decoded[i].Pilot = pilotName
		}
		collected = append(collected, decoded...)

		// skip reading registry configs multiple times in a case of multiple istiod pods
		if len(decoded) > 0 {
			break
		}
	}
	return collected, nil
}
// Interface guards: compile-time proof that *controlPlaneMonitor implements
// ControlPlaneMonitor.
var _ ControlPlaneMonitor = (*controlPlaneMonitor)(nil)