-
Notifications
You must be signed in to change notification settings - Fork 7.6k
/
push_context.go
2478 lines (2179 loc) · 91.8 KB
/
push_context.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package model
import (
"cmp"
"encoding/json"
"fmt"
"math"
"sort"
"strings"
"sync"
"time"
"go.uber.org/atomic"
"k8s.io/apimachinery/pkg/types"
extensions "istio.io/api/extensions/v1alpha1"
meshconfig "istio.io/api/mesh/v1alpha1"
networking "istio.io/api/networking/v1alpha3"
"istio.io/api/security/v1beta1"
"istio.io/istio/pilot/pkg/features"
"istio.io/istio/pilot/pkg/serviceregistry/provider"
"istio.io/istio/pkg/cluster"
"istio.io/istio/pkg/config"
"istio.io/istio/pkg/config/constants"
"istio.io/istio/pkg/config/host"
"istio.io/istio/pkg/config/labels"
"istio.io/istio/pkg/config/schema/gvk"
"istio.io/istio/pkg/config/schema/kind"
"istio.io/istio/pkg/config/security"
"istio.io/istio/pkg/config/visibility"
"istio.io/istio/pkg/jwt"
"istio.io/istio/pkg/monitoring"
"istio.io/istio/pkg/network"
"istio.io/istio/pkg/slices"
"istio.io/istio/pkg/spiffe"
"istio.io/istio/pkg/util/sets"
"istio.io/istio/pkg/workloadapi"
"istio.io/istio/pkg/xds"
)
// Metrics is an interface for capturing metrics on a per-node basis.
type Metrics interface {
// AddMetric records a case (one occurrence) against the given metric.
// key identifies the entry, proxyID is the affected proxy and msg is a free-form message.
AddMetric(metric monitoring.Metric, key string, proxyID, msg string)
}
// Compile-time assertion that *PushContext implements Metrics.
var _ Metrics = &PushContext{}
// serviceIndex is an index of all services by various fields for easy access during push.
type serviceIndex struct {
// privateByNamespace are services that are reachable only within the same namespace, with exportTo "."
privateByNamespace map[string][]*Service
// public are services reachable within the mesh with exportTo "*"
public []*Service
// exportedToNamespace are services that were made visible to this namespace
// by an exportTo explicitly specifying this namespace.
exportedToNamespace map[string][]*Service
// HostnameAndNamespace has all services, indexed by hostname then namespace.
// It is exported but excluded from JSON output via the `json:"-"` tag.
HostnameAndNamespace map[host.Name]map[string]*Service `json:"-"`
// instancesByPort maps a service key to that service's endpoints, grouped by port number.
// It is stored here to avoid recomputations during push. This caches instanceByPort calls
// with empty labels.
// Call InstancesByPort directly when instances need to be filtered by actual labels.
instancesByPort map[string]map[int][]*IstioEndpoint
}
// newServiceIndex returns a serviceIndex in which every map and slice is
// allocated and empty, so entries can be inserted without nil checks.
func newServiceIndex() serviceIndex {
	var idx serviceIndex
	idx.public = []*Service{}
	idx.privateByNamespace = make(map[string][]*Service)
	idx.exportedToNamespace = make(map[string][]*Service)
	idx.HostnameAndNamespace = make(map[host.Name]map[string]*Service)
	idx.instancesByPort = make(map[string]map[int][]*IstioEndpoint)
	return idx
}
// exportToDefaults contains the default exportTo values, one visibility set per
// resource kind. PushContext documents its instance as "Synthesized from env.Mesh".
type exportToDefaults struct {
// service is the default visibility set for services.
service sets.Set[visibility.Instance]
// virtualService is the default visibility set for virtual services.
virtualService sets.Set[visibility.Instance]
// destinationRule is the default visibility set for destination rules.
destinationRule sets.Set[visibility.Instance]
}
// virtualServiceIndex is the index of virtual services by various fields.
type virtualServiceIndex struct {
// exportedToNamespaceByGateway holds virtual services keyed by a namespace/gateway pair.
// NOTE(review): presumably these are the virtual services explicitly exported to that
// namespace — confirm against the code that populates this map.
exportedToNamespaceByGateway map[types.NamespacedName][]config.Config
// this contains all the virtual services with exportTo "." and current namespace. The keys are namespace,gateway.
privateByNamespaceAndGateway map[types.NamespacedName][]config.Config
// This contains all virtual services whose exportTo is "*", keyed by gateway
publicByGateway map[string][]config.Config
// root vs namespace/name ->delegate vs virtualservice gvk/namespace/name
delegates map[ConfigKey][]ConfigKey
// This contains destination hosts of virtual services, keyed by gateway's namespace/name,
// only used when PILOT_FILTER_GATEWAY_CLUSTER_CONFIG is enabled.
// newVirtualServiceIndex leaves this map nil when features.FilterGatewayClusterConfig is false.
destinationsByGateway map[string]sets.String
// Map of VS hostname -> referenced hostnames
referencedDestinations map[string]sets.String
}
// newVirtualServiceIndex returns a virtualServiceIndex with its maps allocated.
// destinationsByGateway is only allocated when features.FilterGatewayClusterConfig
// is set; otherwise it stays nil.
func newVirtualServiceIndex() virtualServiceIndex {
	var destinations map[string]sets.String
	if features.FilterGatewayClusterConfig {
		destinations = make(map[string]sets.String)
	}
	return virtualServiceIndex{
		exportedToNamespaceByGateway: map[types.NamespacedName][]config.Config{},
		privateByNamespaceAndGateway: map[types.NamespacedName][]config.Config{},
		publicByGateway:              map[string][]config.Config{},
		delegates:                    map[ConfigKey][]ConfigKey{},
		destinationsByGateway:        destinations,
		referencedDestinations:       map[string]sets.String{},
	}
}
// destinationRuleIndex is the index of destination rules by various fields.
type destinationRuleIndex struct {
// namespaceLocal contains all public/private dest rules pertaining to a service defined in a given namespace.
namespaceLocal map[string]*consolidatedDestRules
// exportedByNamespace contains all dest rules pertaining to a service exported by a namespace.
exportedByNamespace map[string]*consolidatedDestRules
// rootNamespaceLocal is not allocated by newDestinationRuleIndex and starts out nil.
// NOTE(review): presumably the dest rules local to the mesh root namespace — confirm
// against the code that assigns it.
rootNamespaceLocal *consolidatedDestRules
}
// newDestinationRuleIndex returns a destinationRuleIndex with both per-namespace
// maps allocated. rootNamespaceLocal is left nil.
func newDestinationRuleIndex() destinationRuleIndex {
	var idx destinationRuleIndex
	idx.namespaceLocal = make(map[string]*consolidatedDestRules)
	idx.exportedByNamespace = make(map[string]*consolidatedDestRules)
	return idx
}
// sidecarIndex is the index of sidecar rules.
type sidecarIndex struct {
// user configured sidecars for each namespace if available.
sidecarsByNamespace map[string][]*SidecarScope
// the Sidecar for the root namespace (if present). This applies to any namespace without its own Sidecar.
meshRootSidecarConfig *config.Config
// meshRootSidecarsByNamespace contains the default sidecar for namespaces that do not have a sidecar.
// These are converted from root namespace sidecar if it exists.
// These are lazy-loaded. Access protected by derivedSidecarMutex.
meshRootSidecarsByNamespace map[string]*SidecarScope
// defaultSidecarsByNamespace contains the default sidecar for namespaces that do not have a sidecar.
// These are *always* computed from DefaultSidecarScopeForNamespace i.e. a sidecar that has listeners
// for all services in the mesh. This will be used if there is no sidecar specified in root namespace.
// These are lazy-loaded. Access protected by derivedSidecarMutex.
defaultSidecarsByNamespace map[string]*SidecarScope
// sidecarsForGatewayByNamespace contains the default sidecar for gateways and waypoints.
// These are *always* computed from DefaultSidecarScopeForGateway.
// These are lazy-loaded. Access protected by derivedSidecarMutex.
sidecarsForGatewayByNamespace map[string]*SidecarScope
// mutex to protect derived sidecars i.e. not specified by user.
// Held as a pointer; newSidecarIndex allocates it.
derivedSidecarMutex *sync.RWMutex
}
// newSidecarIndex returns a sidecarIndex with every map allocated and a fresh
// RWMutex guarding the derived sidecar maps. meshRootSidecarConfig is left nil.
func newSidecarIndex() sidecarIndex {
	var idx sidecarIndex
	idx.sidecarsByNamespace = make(map[string][]*SidecarScope)
	idx.meshRootSidecarsByNamespace = make(map[string]*SidecarScope)
	idx.defaultSidecarsByNamespace = make(map[string]*SidecarScope)
	idx.sidecarsForGatewayByNamespace = make(map[string]*SidecarScope)
	idx.derivedSidecarMutex = new(sync.RWMutex)
	return idx
}
// gatewayIndex is the index of gateways by various fields.
type gatewayIndex struct {
// namespace maps a namespace name to the gateway configs in that namespace.
namespace map[string][]config.Config
// all contains all gateways, regardless of namespace.
all []config.Config
}
// newGatewayIndex returns a gatewayIndex with an allocated, empty namespace
// map and an allocated, empty (non-nil) slice of all gateways.
func newGatewayIndex() gatewayIndex {
	var idx gatewayIndex
	idx.namespace = make(map[string][]config.Config)
	idx.all = []config.Config{}
	return idx
}
// serviceAccountKey is the key type of PushContext.serviceAccounts: a service
// hostname together with a namespace.
type serviceAccountKey struct {
hostname host.Name
namespace string
}
// PushContext tracks the status of a push - metrics and errors.
// Metrics are reset after a push - at the beginning all
// values are zero, and when push completes the status is reset.
// The struct is exposed in a debug endpoint - fields public to allow
// easy serialization as json.
type PushContext struct {
// proxyStatusMutex guards ProxyStatus (see AddMetric, StatusJSON and UpdateMetrics).
proxyStatusMutex sync.RWMutex
// ProxyStatus is keyed by the metric name, and holds a map keyed
// by the ID (the key passed to AddMetric).
ProxyStatus map[string]map[string]ProxyPushStatus
// Synthesized from env.Mesh
exportToDefaults exportToDefaults
// ServiceIndex is the index of services by various fields.
ServiceIndex serviceIndex
// serviceAccounts contains a map of hostname and namespace to service accounts.
serviceAccounts map[serviceAccountKey][]string
// virtualServiceIndex is the index of virtual services by various fields.
virtualServiceIndex virtualServiceIndex
// destinationRuleIndex is the index of destination rules by various fields.
destinationRuleIndex destinationRuleIndex
// gatewayIndex is the index of gateways.
gatewayIndex gatewayIndex
// clusterLocalHosts extracted from the MeshConfig
clusterLocalHosts ClusterLocalHosts
// sidecarIndex stores sidecar resources
sidecarIndex sidecarIndex
// envoy filters for each namespace including global config namespace
envoyFiltersByNamespace map[string][]*EnvoyFilterWrapper
// wasm plugins for each namespace including global config namespace.
// Not allocated by NewPushContext.
wasmPluginsByNamespace map[string][]*WasmPluginWrapper
// AuthnPolicies contains Authn policies by namespace.
AuthnPolicies *AuthenticationPolicies `json:"-"`
// AuthzPolicies stores the existing authorization policies in the cluster. Could be nil if there
// are no authorization policies in the cluster.
AuthzPolicies *AuthorizationPolicies `json:"-"`
// Telemetry stores the existing Telemetry resources for the cluster.
Telemetry *Telemetries `json:"-"`
// ProxyConfigs stores the existing ProxyConfig resources for the cluster.
ProxyConfigs *ProxyConfigs `json:"-"`
// The following data is either a global index or used in the inbound path.
// Namespace specific views do not apply here.
// Mesh configuration for the mesh.
Mesh *meshconfig.MeshConfig `json:"-"`
// PushVersion describes the push version this push context was computed for
PushVersion string
// LedgerVersion is the version of the configuration ledger
LedgerVersion string
// JwtKeyResolver holds a reference to the JWT key resolver instance.
JwtKeyResolver *JwksResolver
// GatewayAPIController holds a reference to the gateway API controller.
GatewayAPIController GatewayController
// cache gateways addresses for each network
// this is mainly used for kubernetes multi-cluster scenario
networkMgr *NetworkManager
// Networks holds the mesh networks configuration.
Networks *meshconfig.MeshNetworks
// InitDone is an atomic flag.
// NOTE(review): presumably set once initialization of this context has completed — confirm.
InitDone atomic.Bool
// NOTE(review): presumably serializes initialization of this context — confirm against its users.
initializeMutex sync.Mutex
// NOTE(review): presumably the ambient-mesh indexes used by this push — confirm.
ambientIndex AmbientIndexes
}
// consolidatedDestRules groups merged destination rules by host, together with
// the visibility each host's rule was exported with.
type consolidatedDestRules struct {
// Map of dest rule host to the set of namespaces to which this destination rule has been exported
exportTo map[host.Name]sets.Set[visibility.Instance]
// Map of dest rule host and the merged destination rules for that host.
// Only stores specific non-wildcard destination rules
specificDestRules map[host.Name][]*ConsolidatedDestRule
// Map of dest rule host and the merged destination rules for that host.
// Only stores wildcard destination rules
wildcardDestRules map[host.Name][]*ConsolidatedDestRule
}
// ConsolidatedDestRule represents a merged destination rule together with the
// original destination rules it was consolidated from.
type ConsolidatedDestRule struct {
// rule is merged from the following destinationRules.
rule *config.Config
// the original dest rules (namespace/name) from which above rule is merged.
from []types.NamespacedName
}
// XDSUpdater is used for direct updates of the xDS model and incremental push.
// Pilot uses multiple registries - for example each K8S cluster is a registry
// instance. Each registry is responsible for tracking a set
// of endpoints associated with mesh services, and calling the EDSUpdate on changes.
// A registry may group endpoints for a service in smaller subsets - for example by
// deployment, or to deal with very large number of endpoints for a service. We want
// to avoid passing around large objects - like full list of endpoints for a registry,
// or the full list of endpoints for a service across registries, since it limits
// scalability.
//
// Future optimizations will include grouping the endpoints by labels, gateway or region to
// reduce the time when subsetting or split-horizon is used. This design assumes pilot
// tracks all endpoints in the mesh and they fit in RAM - so limit is few M endpoints.
// It is possible to split the endpoint tracking in future.
type XDSUpdater interface {
// EDSUpdate is called when the list of endpoints or labels in a Service is changed.
// For each cluster and hostname, the full list of active endpoints (including empty list)
// must be sent. The shard name is used as a key - current implementation is using the
// registry name.
EDSUpdate(shard ShardKey, hostname string, namespace string, entry []*IstioEndpoint)
// EDSCacheUpdate is called when the list of endpoints or labels in a Service is changed.
// For each cluster and hostname, the full list of active endpoints (including empty list)
// must be sent. The shard name is used as a key - current implementation is using the
// registry name.
// Note: the difference from `EDSUpdate` is that it only updates the cache rather than requesting a push.
EDSCacheUpdate(shard ShardKey, hostname string, namespace string, entry []*IstioEndpoint)
// SvcUpdate is called when a service definition is updated/deleted.
SvcUpdate(shard ShardKey, hostname string, namespace string, event Event)
// ConfigUpdate is called to notify the XDS server of config updates and request a push.
// The requests may be collapsed and throttled.
ConfigUpdate(req *PushRequest)
// ProxyUpdate is called to notify the XDS server to send a push to the specified proxy.
// The requests may be collapsed and throttled.
ProxyUpdate(clusterID cluster.ID, ip string)
// RemoveShard removes all endpoints for the given shard key
RemoveShard(shardKey ShardKey)
}
// PushRequest defines a request to push to proxies.
// It is used to send updates to the config update debouncer and pass to the PushQueue.
type PushRequest struct {
// Full determines whether a full push is required or not. If false, an incremental update will be sent.
// Incremental pushes:
// * Do not recompute the push context
// * Do not recompute proxy state (such as ServiceInstances)
// * Are not reported in standard metrics such as push time
// As a result, configuration updates should never be incremental. Generally, only EDS will set this, but
// in the future SDS will as well.
Full bool
// ConfigsUpdated keeps track of configs that have changed.
// This is used as an optimization to avoid unnecessary pushes to proxies that are scoped with a Sidecar.
// If this is empty, then all proxies will get an update.
// Otherwise only proxies that depend on these configs will get an update.
// The kind of resources are defined in pkg/config/schemas.
ConfigsUpdated sets.Set[ConfigKey]
// Push stores the push context to use for the update. This may initially be nil, as we will
// debounce changes before a PushContext is eventually created.
Push *PushContext
// Start represents the time a push was started. This represents the time of adding to the PushQueue.
// Note that this does not include time spent debouncing.
Start time.Time
// Reason represents the reason for requesting a push. This should only be a fixed set of values,
// to avoid unbounded cardinality in metrics. If this is not set, it may be automatically filled in later.
// There should only be multiple reasons if the push request is the result of two distinct triggers, rather than
// classifying a single trigger as having multiple reasons.
Reason ReasonStats
// Delta defines the resources that were added or removed as part of this push request.
// This is set only on requests from the client which change the set of resources they (un)subscribe from.
// Neither Merge nor CopyMerge combines this field.
Delta ResourceDelta
}
// ResourceDelta is an alias for xds.ResourceDelta, the type of PushRequest.Delta.
type ResourceDelta = xds.ResourceDelta
// ReasonStats counts how many times each TriggerReason contributed to a push.
type ReasonStats map[TriggerReason]int

// NewReasonStats returns a non-nil ReasonStats in which every given reason has
// been counted once per occurrence.
func NewReasonStats(reasons ...TriggerReason) ReasonStats {
	stats := ReasonStats{}
	for i := range reasons {
		stats.Add(reasons[i])
	}
	return stats
}

// Add increments the count for reason by one. The receiver must be non-nil.
func (r ReasonStats) Add(reason TriggerReason) {
	r[reason] = r[reason] + 1
}

// Merge adds every count in other to the receiver, in place.
func (r ReasonStats) Merge(other ReasonStats) {
	for reason := range other {
		r[reason] += other[reason]
	}
}

// CopyMerge returns the combined counts of r and other without mutating either.
// When one side is empty the other side is returned as-is (not copied), so the
// result may alias an input.
func (r ReasonStats) CopyMerge(other ReasonStats) ReasonStats {
	switch {
	case len(r) == 0:
		return other
	case len(other) == 0:
		return r
	}
	combined := make(ReasonStats, len(r)+len(other))
	for reason, count := range r {
		combined[reason] += count
	}
	for reason, count := range other {
		combined[reason] += count
	}
	return combined
}

// Count returns the sum of all counts.
func (r ReasonStats) Count() int {
	total := 0
	for reason := range r {
		total += r[reason]
	}
	return total
}

// Has reports whether reason has been counted at least once.
func (r ReasonStats) Has(reason TriggerReason) bool {
	count, found := r[reason]
	return found && count > 0
}

// TriggerReason names what caused a push.
type TriggerReason string
// If adding a new reason, update xds/monitoring.go:triggerMetric
const (
// EndpointUpdate describes a push triggered by an Endpoint change
EndpointUpdate TriggerReason = "endpoint"
// HeadlessEndpointUpdate describes a push triggered by an Endpoint change for headless service
HeadlessEndpointUpdate TriggerReason = "headlessendpoint"
// ConfigUpdate describes a push triggered by a config (generally an Istio CRD) change.
ConfigUpdate TriggerReason = "config"
// ServiceUpdate describes a push triggered by a Service change
ServiceUpdate TriggerReason = "service"
// ProxyUpdate describes a push triggered by a change to an individual proxy (such as label change)
ProxyUpdate TriggerReason = "proxy"
// GlobalUpdate describes a push triggered by a change to global config, such as mesh config
GlobalUpdate TriggerReason = "global"
// AmbientUpdate describes a push triggered by a change to ambient mesh config
AmbientUpdate TriggerReason = "ambient"
// UnknownTrigger describes a push triggered by an unknown reason
UnknownTrigger TriggerReason = "unknown"
// DebugTrigger describes a push triggered for debugging
DebugTrigger TriggerReason = "debug"
// SecretTrigger describes a push triggered for a Secret change
SecretTrigger TriggerReason = "secret"
// NetworksTrigger describes a push triggered for Networks change
NetworksTrigger TriggerReason = "networks"
// ProxyRequest describes a push triggered based on proxy request
ProxyRequest TriggerReason = "proxyrequest"
// DependentResource describes a push triggered based on a proxy request for a
// resource that depends on this resource (e.g. a CDS request triggers an EDS response as well)
// This is mainly used in Delta for now.
// NOTE(review): the value is spelled "depdendentresource"; it is a runtime string, so it is
// left unchanged here — confirm nothing depends on the spelling before correcting it.
DependentResource TriggerReason = "depdendentresource"
// NamespaceUpdate describes a push triggered by a Namespace change
NamespaceUpdate TriggerReason = "namespace"
// ClusterUpdate describes a push triggered by a Cluster change
ClusterUpdate TriggerReason = "cluster"
)
// Merge folds other into pr and returns the result.
// It behaves like a list append; usage should be in the form `a = a.Merge(b)`.
// Merge may return either input (a nil side yields the other side untouched) and
// mutates pr in place, so neither input should be used after the call.
//
// Merge rules:
//   - Start: pr's (older) start time is kept.
//   - Reason: counts are summed, never deduplicated, to avoid under counting.
//   - Full: true if either request is full.
//   - Push: other's context wins when it is non-nil, as it is presumed newer.
//   - ConfigsUpdated: the union, unless either side is empty, in which case the
//     result is cleared to nil.
//
// Delta is not merged.
func (pr *PushRequest) Merge(other *PushRequest) *PushRequest {
	switch {
	case pr == nil:
		return other
	case other == nil:
		return pr
	}

	pr.Full = pr.Full || other.Full

	if other.Push != nil {
		pr.Push = other.Push
	}

	if len(other.Reason) > 0 {
		if pr.Reason == nil {
			pr.Reason = make(map[TriggerReason]int)
		}
		pr.Reason.Merge(other.Reason)
	}

	if len(pr.ConfigsUpdated) > 0 && len(other.ConfigsUpdated) > 0 {
		for updated := range other.ConfigsUpdated {
			pr.ConfigsUpdated.Insert(updated)
		}
	} else {
		// An empty set on either side means the merged request is not scoped.
		pr.ConfigsUpdated = nil
	}

	return pr
}
// CopyMerge merges two update requests together. Unlike Merge, this will not mutate either input.
// This should be used when we are modifying a shared PushRequest (typically any time it's in the context
// of a single proxy).
//
// If either request is nil the other is returned as-is (not copied). Otherwise a
// new PushRequest is returned with:
//   - Start: pr's (older) start time.
//   - Full: true if either request is full.
//   - Push: other's context, which is presumed newer; pr's context is used when
//     other has none, matching Merge.
//   - Reason: summed counts (not deduplicated); nil when both are empty.
//   - ConfigsUpdated: the union, or nil when either side is empty.
//
// Delta is not carried over.
func (pr *PushRequest) CopyMerge(other *PushRequest) *PushRequest {
	if pr == nil {
		return other
	}
	if other == nil {
		return pr
	}

	var reason ReasonStats
	if len(pr.Reason)+len(other.Reason) > 0 {
		reason = make(ReasonStats)
		reason.Merge(pr.Reason)
		reason.Merge(other.Reason)
	}

	// Prefer the newer context, but do not drop an existing one when other has none.
	push := other.Push
	if push == nil {
		push = pr.Push
	}

	merged := &PushRequest{
		// Keep the first (older) start time
		Start: pr.Start,

		// If either is full we need a full push
		Full: pr.Full || other.Full,

		Push: push,

		// Merge the two reasons. Note that we shouldn't deduplicate here, or we would under count
		Reason: reason,
	}

	// Do not merge when any one is empty
	if len(pr.ConfigsUpdated) > 0 && len(other.ConfigsUpdated) > 0 {
		merged.ConfigsUpdated = make(sets.Set[ConfigKey], len(pr.ConfigsUpdated)+len(other.ConfigsUpdated))
		merged.ConfigsUpdated.Merge(pr.ConfigsUpdated)
		merged.ConfigsUpdated.Merge(other.ConfigsUpdated)
	}

	return merged
}
// IsRequest reports whether ProxyRequest is the one and only reason recorded
// on this push request.
func (pr *PushRequest) IsRequest() bool {
	if len(pr.Reason) != 1 {
		return false
	}
	return pr.Reason.Has(ProxyRequest)
}
// IsProxyUpdate reports whether ProxyUpdate is among the reasons recorded on
// this push request.
func (pr *PushRequest) IsProxyUpdate() bool {
	reasons := pr.Reason
	return reasons.Has(ProxyUpdate)
}
// PushReason returns " request" (with a leading space) when IsRequest is true,
// and the empty string otherwise.
func (pr *PushRequest) PushReason() string {
	if !pr.IsRequest() {
		return ""
	}
	return " request"
}
// ProxyPushStatus represents an event captured during config push to proxies.
// It may contain additional message and the affected proxy.
// Both fields are omitted from JSON output when empty.
type ProxyPushStatus struct {
// Proxy is the ID of the affected proxy (the proxyID passed to AddMetric).
Proxy string `json:"proxy,omitempty"`
// Message is the free-form message passed to AddMetric.
Message string `json:"message,omitempty"`
}
// AddMetric records a case for the given metric.
//
// The entry is stored in ps.ProxyStatus under metric.Name() and then key; an
// existing entry for the same metric and key is overwritten. proxyID and msg
// are stored in the resulting ProxyPushStatus.
//
// A nil receiver is tolerated: the event is only logged. A PushContext that was
// not created through NewPushContext (so ProxyStatus is nil) is also tolerated:
// the map is allocated on first use instead of panicking on a nil-map write.
func (ps *PushContext) AddMetric(metric monitoring.Metric, key string, proxyID, msg string) {
	if ps == nil {
		log.Infof("Metric without context %s %v %s", key, proxyID, msg)
		return
	}
	ps.proxyStatusMutex.Lock()
	defer ps.proxyStatusMutex.Unlock()

	if ps.ProxyStatus == nil {
		ps.ProxyStatus = map[string]map[string]ProxyPushStatus{}
	}

	name := metric.Name()
	metricMap, found := ps.ProxyStatus[name]
	if !found {
		metricMap = map[string]ProxyPushStatus{}
		ps.ProxyStatus[name] = metricMap
	}

	metricMap[key] = ProxyPushStatus{Message: msg, Proxy: proxyID}
}
// Gauges recorded from PushContext.ProxyStatus, plus the package-level record of the last push.
var (
// EndpointNoPod tracks endpoints without an associated pod. This is an error condition, since
// we can't figure out the labels. It may be a transient problem, if endpoint is processed before
// pod.
EndpointNoPod = monitoring.NewGauge(
"endpoint_no_pod",
"Endpoints without an associated pod.",
)
// ProxyStatusNoService represents proxies not selected by any service
// This can be normal - for workloads that act only as client, or are not covered by a Service.
// It can also be an error, for example in cases the Endpoint list of a service was not updated by the time
// the sidecar calls.
// Updated by GetProxyServiceTargets
ProxyStatusNoService = monitoring.NewGauge(
"pilot_no_ip",
"Pods not found in the endpoint table, possibly invalid.",
)
// ProxyStatusEndpointNotReady represents proxies found not to be ready.
// Updated by GetProxyServiceTargets. Normal condition when starting
// an app with readiness, error if it doesn't change to 0.
ProxyStatusEndpointNotReady = monitoring.NewGauge(
"pilot_endpoint_not_ready",
"Endpoint found in unready state.",
)
// ProxyStatusConflictOutboundListenerTCPOverTCP metric tracks number of
// TCP listeners that conflicted with existing TCP listeners on same port
ProxyStatusConflictOutboundListenerTCPOverTCP = monitoring.NewGauge(
"pilot_conflict_outbound_listener_tcp_over_current_tcp",
"Number of conflicting tcp listeners with current tcp listener.",
)
// ProxyStatusConflictInboundListener tracks cases of multiple inbound
// listeners - 2 services selecting the same port of the pod.
ProxyStatusConflictInboundListener = monitoring.NewGauge(
"pilot_conflict_inbound_listener",
"Number of conflicting inbound listeners.",
)
// DuplicatedClusters tracks duplicate clusters seen while computing CDS
DuplicatedClusters = monitoring.NewGauge(
"pilot_duplicate_envoy_clusters",
"Duplicate envoy clusters caused by service entries with same hostname",
)
// DNSNoEndpointClusters tracks dns clusters without endpoints
DNSNoEndpointClusters = monitoring.NewGauge(
"pilot_dns_cluster_without_endpoints",
"DNS clusters without endpoints caused by the endpoint field in "+
"STRICT_DNS type cluster is not set or the corresponding subset cannot select any endpoint",
)
// ProxyStatusClusterNoInstances tracks clusters (services) without workloads.
ProxyStatusClusterNoInstances = monitoring.NewGauge(
"pilot_eds_no_instances",
"Number of clusters without instances.",
)
// DuplicatedDomains tracks rejected VirtualServices due to duplicated hostname.
DuplicatedDomains = monitoring.NewGauge(
"pilot_vservice_dup_domain",
"Virtual services with dup domains.",
)
// DuplicatedSubsets tracks duplicate subsets that we rejected while merging multiple destination rules for same host
DuplicatedSubsets = monitoring.NewGauge(
"pilot_destrule_subsets",
"Duplicate subsets across destination rules for same host",
)
// totalVirtualServices tracks the total number of virtual services.
// It is not part of the metrics list below, so UpdateMetrics does not record it.
totalVirtualServices = monitoring.NewGauge(
"pilot_virt_services",
"Total virtual services known to pilot.",
)
// LastPushStatus preserves the metrics and data collected during the last global push.
// It can be used by debugging tools to inspect the push event. It will be reset after each push with the
// new version.
LastPushStatus *PushContext
// LastPushMutex will protect the LastPushStatus
LastPushMutex sync.Mutex
// All metrics we registered. UpdateMetrics records, for each of these, the number of
// ProxyStatus entries stored under the metric's name.
metrics = []monitoring.Metric{
DNSNoEndpointClusters,
EndpointNoPod,
ProxyStatusNoService,
ProxyStatusEndpointNotReady,
ProxyStatusConflictOutboundListenerTCPOverTCP,
ProxyStatusConflictInboundListener,
DuplicatedClusters,
ProxyStatusClusterNoInstances,
DuplicatedDomains,
DuplicatedSubsets,
}
)
// NewPushContext creates a new PushContext structure to track push status.
// The service, virtual service, destination rule, sidecar and gateway indexes
// are initialized through their constructors, and the ProxyStatus,
// serviceAccounts and envoyFiltersByNamespace maps are allocated empty. All
// other fields keep their zero value.
func NewPushContext() *PushContext {
	ctx := new(PushContext)
	ctx.ProxyStatus = map[string]map[string]ProxyPushStatus{}
	ctx.ServiceIndex = newServiceIndex()
	ctx.serviceAccounts = map[serviceAccountKey][]string{}
	ctx.virtualServiceIndex = newVirtualServiceIndex()
	ctx.destinationRuleIndex = newDestinationRuleIndex()
	ctx.gatewayIndex = newGatewayIndex()
	ctx.sidecarIndex = newSidecarIndex()
	ctx.envoyFiltersByNamespace = map[string][]*EnvoyFilterWrapper{}
	return ctx
}
// AddPublicServices appends the given services to the context's public
// services - mainly used in tests.
func (ps *PushContext) AddPublicServices(services []*Service) {
	idx := &ps.ServiceIndex
	idx.public = append(idx.public, services...)
}
// AddServiceInstances appends the given per-port endpoints to the cached
// instances of service - mainly used in tests.
// The per-service map is created on demand, and only when instances has at
// least one port entry.
func (ps *PushContext) AddServiceInstances(service *Service, instances map[int][]*IstioEndpoint) {
	key := service.Key()
	byService := ps.ServiceIndex.instancesByPort
	for port, endpoints := range instances {
		byPort, found := byService[key]
		if !found {
			byPort = make(map[int][]*IstioEndpoint)
			byService[key] = byPort
		}
		byPort[port] = append(byPort[port], endpoints...)
	}
}
// StatusJSON returns ProxyStatus serialized as indented JSON, holding the
// read lock on proxyStatusMutex while marshalling.
// A nil receiver yields the empty JSON object "{}".
func (ps *PushContext) StatusJSON() ([]byte, error) {
	if ps != nil {
		ps.proxyStatusMutex.RLock()
		defer ps.proxyStatusMutex.RUnlock()
		return json.MarshalIndent(ps.ProxyStatus, "", " ")
	}
	return []byte("{}"), nil
}
// OnConfigChange is called when a config change is detected.
// It publishes ps as LastPushStatus (under LastPushMutex) and then refreshes
// the metrics from ps, after the mutex has been released.
func (ps *PushContext) OnConfigChange() {
	func() {
		LastPushMutex.Lock()
		defer LastPushMutex.Unlock()
		LastPushStatus = ps
	}()
	ps.UpdateMetrics()
}
// UpdateMetrics will update the prometheus metrics based on the
// current status of the push: each registered metric is set to the number of
// ProxyStatus entries stored under its name (zero when there are none).
// The read lock on proxyStatusMutex is held for the duration.
func (ps *PushContext) UpdateMetrics() {
	ps.proxyStatusMutex.RLock()
	defer ps.proxyStatusMutex.RUnlock()

	for i := range metrics {
		m := metrics[i]
		entries := ps.ProxyStatus[m.Name()]
		m.Record(float64(len(entries)))
	}
}
// virtualServiceDestinations collects every destination host referenced by v,
// mapped to the set of port numbers used for that host.
// HTTP routes, the HTTP mirror and mirrors, TCP routes and TLS routes are all
// visited. A port number of 0 is a sentinel meaning that at least one
// destination for that host did not specify a port.
// It is called after virtual service short host name is resolved to FQDN.
// A nil v yields a nil map.
func virtualServiceDestinations(v *networking.VirtualService) map[string]sets.Set[int] {
	if v == nil {
		return nil
	}

	destinations := make(map[string]sets.Set[int])
	record := func(host string, port *networking.PortSelector) {
		var number int // stays 0 when no port is given
		if port != nil {
			number = int(port.Number)
		}
		sets.InsertOrNew(destinations, host, number)
	}

	for _, httpRoute := range v.Http {
		for _, route := range httpRoute.Route {
			if dest := route.Destination; dest != nil {
				record(dest.Host, dest.GetPort())
			}
		}
		if mirror := httpRoute.Mirror; mirror != nil {
			record(mirror.Host, mirror.GetPort())
		}
		for _, mirror := range httpRoute.Mirrors {
			if dest := mirror.Destination; dest != nil {
				record(dest.Host, dest.GetPort())
			}
		}
	}

	for _, tcpRoute := range v.Tcp {
		for _, route := range tcpRoute.Route {
			if dest := route.Destination; dest != nil {
				record(dest.Host, dest.GetPort())
			}
		}
	}

	for _, tlsRoute := range v.Tls {
		for _, route := range tlsRoute.Route {
			if dest := route.Destination; dest != nil {
				record(dest.Host, dest.GetPort())
			}
		}
	}

	return destinations
}
// ExtraWaypointServices returns the set of extra service hostnames computed
// for the proxy by extraServicesForProxy.
func (ps *PushContext) ExtraWaypointServices(proxy *Proxy) sets.String {
	extra := ps.extraServicesForProxy(proxy)
	return extra
}
// GatewayServices returns the set of services which are referred from the proxy gateways.
//
// The candidates are the services of the proxy's SidecarScope; a candidate is
// kept when its hostname is either one of the extra services for the proxy or
// a virtual service destination indexed under one of the proxy's gateways.
// It returns nil when the proxy has no MergedGateway.
func (ps *PushContext) GatewayServices(proxy *Proxy) []*Service {
	candidates := proxy.SidecarScope.services

	// MergedGateway will be nil when there are no configs in the
	// system during initial installation.
	if proxy.MergedGateway == nil {
		return nil
	}

	// Build the set of hostnames exposed through this proxy.
	exposed := ps.extraServicesForProxy(proxy)
	for _, gatewayName := range proxy.MergedGateway.GatewayNameForServer {
		exposed.Merge(ps.virtualServiceIndex.destinationsByGateway[gatewayName])
	}
	log.Debugf("GatewayServices: gateway %v is exposing these hosts:%v", proxy.ID, exposed)

	filtered := make([]*Service, 0, len(candidates))
	for _, svc := range candidates {
		if !exposed.Contains(string(svc.Hostname)) {
			continue
		}
		filtered = append(filtered, svc)
	}

	log.Debugf("GatewayServices: gateways len(services)=%d, len(filtered)=%d", len(candidates), len(filtered))
	return filtered
}
// ServicesAttachedToMesh returns the referencedDestinations map kept in the
// virtual service index. The map is handed out as-is, without copying.
func (ps *PushContext) ServicesAttachedToMesh() map[string]sets.String {
	referenced := ps.virtualServiceIndex.referencedDestinations
	return referenced
}
// ServiceAttachedToGateway reports whether hostname is reachable through the
// proxy's gateways. It is true when the merged gateway contains auto
// passthrough gateways, when hostname is a virtual service destination indexed
// under one of the proxy's gateways, or when hostname is one of the extra
// services for the proxy. It is false when the proxy has no MergedGateway.
func (ps *PushContext) ServiceAttachedToGateway(hostname string, proxy *Proxy) bool {
	merged := proxy.MergedGateway
	switch {
	case merged == nil:
		// MergedGateway will be nil when there are no configs in the
		// system during initial installation.
		return false
	case merged.ContainsAutoPassthroughGateways:
		return true
	}

	for _, gatewayName := range merged.GatewayNameForServer {
		destinations := ps.virtualServiceIndex.destinationsByGateway[gatewayName]
		if destinations != nil && destinations.Contains(hostname) {
			return true
		}
	}

	return ps.extraServicesForProxy(proxy).Contains(hostname)
}
// wellknownProviders is the set of names of all known providers.
// AssertProvidersHandled panics when the number of providers a caller claims
// to handle differs from the size of this set.
var wellknownProviders = sets.New(
	"envoy_ext_authz_http",
	"envoy_ext_authz_grpc",
	"zipkin",
	"lightstep",
	"datadog",
	"opencensus",
	"stackdriver",
	"prometheus",
	"skywalking",
	"envoy_http_als",
	"envoy_tcp_als",
	"envoy_otel_als",
	"opentelemetry",
	"envoy_file_access_log",
)
// AssertProvidersHandled panics unless expected equals the number of entries
// in wellknownProviders.
func AssertProvidersHandled(expected int) {
	known := len(wellknownProviders)
	if expected == known {
		return
	}
	panic(fmt.Sprintf("Not all providers handled; This function handles %v but there are %v known providers", expected, known))
}
// addHostsFromMeshConfigProvidersHandled is the number of provider types
// handled by the type switch in extraServicesForProxy.
// It exists to keep that switch in sync as new providers are added.
// STOP. DO NOT UPDATE THIS WITHOUT UPDATING extraServicesForProxy.
const addHostsFromMeshConfigProvidersHandled = 14
// extraServicesForProxy returns the hostnames of additional services the proxy
// refers to, gathered from:
//  1. MeshConfig.ExtensionProviders
//  2. RequestAuthentication.JwtRules.JwksUri (skipped when
//     features.JwksFetchMode is jwt.Istiod)
//
// TODO: include cluster from EnvoyFilter such as global ratelimit [demo](https://istio.io/latest/docs/tasks/policy-enforcement/rate-limit/#global-rate-limit)
func (ps *PushContext) extraServicesForProxy(proxy *Proxy) sets.String {
	extra := sets.String{}

	// Services named by MeshConfig.ExtensionProviders.
	for _, provider := range ps.Mesh.ExtensionProviders {
		var service string
		switch p := provider.Provider.(type) {
		case *meshconfig.MeshConfig_ExtensionProvider_EnvoyExtAuthzHttp:
			service = p.EnvoyExtAuthzHttp.Service
		case *meshconfig.MeshConfig_ExtensionProvider_EnvoyExtAuthzGrpc:
			service = p.EnvoyExtAuthzGrpc.Service
		case *meshconfig.MeshConfig_ExtensionProvider_Zipkin:
			service = p.Zipkin.Service
		//nolint: staticcheck // Lightstep deprecated
		case *meshconfig.MeshConfig_ExtensionProvider_Lightstep:
			service = p.Lightstep.Service
		case *meshconfig.MeshConfig_ExtensionProvider_Datadog:
			service = p.Datadog.Service
		case *meshconfig.MeshConfig_ExtensionProvider_Skywalking:
			service = p.Skywalking.Service
		case *meshconfig.MeshConfig_ExtensionProvider_Opencensus:
			//nolint: staticcheck
			service = p.Opencensus.Service
		case *meshconfig.MeshConfig_ExtensionProvider_Opentelemetry:
			service = p.Opentelemetry.Service
		case *meshconfig.MeshConfig_ExtensionProvider_EnvoyHttpAls:
			service = p.EnvoyHttpAls.Service
		case *meshconfig.MeshConfig_ExtensionProvider_EnvoyTcpAls:
			service = p.EnvoyTcpAls.Service
		case *meshconfig.MeshConfig_ExtensionProvider_EnvoyOtelAls:
			service = p.EnvoyOtelAls.Service
		case *meshconfig.MeshConfig_ExtensionProvider_EnvoyFileAccessLog,
			*meshconfig.MeshConfig_ExtensionProvider_Prometheus,
			*meshconfig.MeshConfig_ExtensionProvider_Stackdriver:
			// No services
			continue
		default:
			// Any other provider type contributes nothing.
			continue
		}
		extra.Insert(service)
	}

	// Services named by RequestAuthentication.JwtRules.JwksUri.
	if features.JwksFetchMode == jwt.Istiod {
		return extra
	}
	matcher := PolicyMatcherForProxy(proxy)
	for _, policy := range ps.AuthnPolicies.GetJwtPoliciesForWorkload(matcher) {
		spec := policy.Spec.(*v1beta1.RequestAuthentication)
		for _, rule := range spec.JwtRules {
			uri := rule.GetJwksUri()
			if len(uri) == 0 {
				continue
			}
			// URIs that fail to parse are skipped.
			if info, err := security.ParseJwksURI(uri); err == nil {
				extra.Insert(info.Hostname.String())
			}
		}
	}
	return extra
}
// servicesExportedToNamespace returns the list of services that are visible to a namespace.
//
// For a specific namespace the result is, in order: that namespace's private
// services, the services explicitly exported to it, and the public services.
// For NamespaceAll the result is the private services of every namespace
// followed by the public services; exportedToNamespace is not consulted in
// that case.
//
// The returned slice is freshly allocated and sized to hold exactly the
// services it contains.
func (ps *PushContext) servicesExportedToNamespace(ns string) []*Service {
	public := ps.ServiceIndex.public

	if ns == NamespaceAll {
		// Count the private services themselves rather than the number of
		// namespaces, so the slice does not have to grow while appending.
		privateCount := 0
		for _, privateServices := range ps.ServiceIndex.privateByNamespace {
			privateCount += len(privateServices)
		}

		out := make([]*Service, 0, privateCount+len(public))
		for _, privateServices := range ps.ServiceIndex.privateByNamespace {
			out = append(out, privateServices...)
		}
		return append(out, public...)
	}

	private := ps.ServiceIndex.privateByNamespace[ns]
	exported := ps.ServiceIndex.exportedToNamespace[ns]

	out := make([]*Service, 0, len(private)+len(exported)+len(public))
	// First the private and explicitly exportedTo services, then the public ones.
	out = append(out, private...)
	out = append(out, exported...)
	return append(out, public...)
}
// GetAllServices returns the total services within the mesh, i.e. the result
// of servicesExportedToNamespace for NamespaceAll.
// Note: per proxy services should use SidecarScope.Services.
func (ps *PushContext) GetAllServices() []*Service {
	all := ps.servicesExportedToNamespace(NamespaceAll)
	return all
}
// ServiceForHostname returns the service associated with a given hostname following SidecarScope.
//
// When the proxy has a SidecarScope, the lookup is answered from that scope's
// servicesByHostname map. Otherwise an arbitrary service registered for the
// hostname in the service index is returned, or nil if there is none.
func (ps *PushContext) ServiceForHostname(proxy *Proxy, hostname host.Name) *Service {
	if proxy == nil || proxy.SidecarScope == nil {
		// SidecarScope shouldn't be null here. If it is, we can't disambiguate the hostname to use for a namespace,
		// so the selection must be undefined: take whichever entry iteration yields first.
		for _, candidate := range ps.ServiceIndex.HostnameAndNamespace[hostname] {
			return candidate
		}
		// No service found
		return nil
	}
	return proxy.SidecarScope.servicesByHostname[hostname]
}
// IsServiceVisible returns true if the input service is visible to the given namespace.
func (ps *PushContext) IsServiceVisible(service *Service, namespace string) bool {