/
resolver_k8s_handler.go
112 lines (101 loc) · 3.1 KB
/
resolver_k8s_handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package loadbalancingexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter"
import (
"context"
"sync"
"go.opencensus.io/stats"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
)
// Compile-time assertion that *handler satisfies cache.ResourceEventHandler.
var _ cache.ResourceEventHandler = (*handler)(nil)
// handler implements cache.ResourceEventHandler. Its OnAdd, OnUpdate and
// OnDelete methods keep a set of endpoint IP addresses in sync with the
// *corev1.Endpoints objects they receive and invoke callback when that set
// changes.
type handler struct {
// endpoints is the set of tracked IP addresses; the event methods store each
// address as a key with the value true.
endpoints *sync.Map
// callback is invoked with context.Background() after the event methods
// modify endpoints; they discard both of its return values.
callback func(ctx context.Context) ([]string, error)
// logger receives a warning whenever an event carries an unexpected type.
logger *zap.Logger
}
// OnAdd records the addresses of a newly observed *corev1.Endpoints object.
//
// Any other object type is logged as a warning, counted through
// mNumResolutions with the k8sResolverSuccessFalseMutators tags, and otherwise
// ignored. The second argument is not used. The callback is invoked only when
// at least one address was not already present in the tracked set; its return
// values are discarded.
func (h handler) OnAdd(obj any, _ bool) {
	eps, ok := obj.(*corev1.Endpoints)
	if !ok { // unsupported
		h.logger.Warn("Got an unexpected Kubernetes data type during the inclusion of a new pods for the service", zap.Any("obj", obj))
		_ = stats.RecordWithTags(context.Background(), k8sResolverSuccessFalseMutators, mNumResolutions.M(1))
		return
	}

	// Count the addresses that were genuinely new to the set.
	added := 0
	for _, addr := range convertToEndpoints(eps) {
		if _, existed := h.endpoints.LoadOrStore(addr, true); !existed {
			added++
		}
	}
	if added == 0 {
		return
	}

	_, _ = h.callback(context.Background())
}
// OnUpdate reconciles the tracked address set when an Endpoints object changes.
//
// Both oldObj and newObj must be *corev1.Endpoints. They are type-checked
// before anything is mutated, so an unexpected newObj cannot leave the set with
// the old addresses already removed. An unexpected type is logged as a warning
// and counted through mNumResolutions with the k8sResolverSuccessFalseMutators
// tags.
//
// Addresses that appear only in oldObj are deleted from the set, and addresses
// in newObj are stored if missing. Addresses present in both objects are left
// untouched rather than being deleted and re-added. The callback is invoked at
// most once, after the set reflects newObj, and only if an address was removed
// or newly stored; its return values are discarded.
func (h handler) OnUpdate(oldObj, newObj any) {
	oldEps, ok := oldObj.(*corev1.Endpoints)
	if !ok { // unsupported
		h.logger.Warn("Got an unexpected Kubernetes data type during the update of the pods for a service", zap.Any("obj", oldObj))
		_ = stats.RecordWithTags(context.Background(), k8sResolverSuccessFalseMutators, mNumResolutions.M(1))
		return
	}
	newEps, ok := newObj.(*corev1.Endpoints)
	if !ok { // unsupported
		h.logger.Warn("Got an unexpected Kubernetes data type during the update of the pods for a service", zap.Any("obj", newObj))
		_ = stats.RecordWithTags(context.Background(), k8sResolverSuccessFalseMutators, mNumResolutions.M(1))
		return
	}

	// A typed nil pointer passes the assertions above; treat it as an object
	// without addresses instead of dereferencing it.
	var stale, current []string
	if oldEps != nil {
		stale = convertToEndpoints(oldEps)
	}
	if newEps != nil {
		current = convertToEndpoints(newEps)
	}

	// Index the new addresses so that only addresses that really went away
	// are removed.
	keep := make(map[string]struct{}, len(current))
	for _, ep := range current {
		keep[ep] = struct{}{}
	}

	changed := false
	for _, ep := range stale {
		if _, ok := keep[ep]; ok {
			continue
		}
		h.endpoints.Delete(ep)
		changed = true
	}
	for _, ep := range current {
		if _, loaded := h.endpoints.LoadOrStore(ep, true); !loaded {
			changed = true
		}
	}

	if changed {
		_, _ = h.callback(context.Background())
	}
}
// OnDelete removes the addresses of a deleted *corev1.Endpoints object from
// the tracked set.
//
// obj may also be a cache.DeletedFinalStateUnknown tombstone, in which case the
// wrapped object is processed instead. The tombstone is accepted both by value,
// which is how client-go's DeltaFIFO delivers it, and by pointer, which this
// handler has always accepted. Any other type is logged as a warning and
// counted through mNumResolutions with the k8sResolverSuccessFalseMutators
// tags.
//
// The callback is invoked only when the object carried at least one address;
// its return values are discarded.
func (h handler) OnDelete(obj any) {
	var endpoints []string

	switch object := obj.(type) {
	case cache.DeletedFinalStateUnknown:
		h.OnDelete(object.Obj)
		return
	case *cache.DeletedFinalStateUnknown:
		// A nil tombstone has nothing to unwrap; return instead of
		// dereferencing it.
		if object != nil {
			h.OnDelete(object.Obj)
		}
		return
	case *corev1.Endpoints:
		if object != nil {
			endpoints = convertToEndpoints(object)
		}
	default: // unsupported
		h.logger.Warn("Got an unexpected Kubernetes data type during the removal of the pods for a service", zap.Any("obj", obj))
		_ = stats.RecordWithTags(context.Background(), k8sResolverSuccessFalseMutators, mNumResolutions.M(1))
		return
	}

	if len(endpoints) == 0 {
		return
	}
	for _, endpoint := range endpoints {
		h.endpoints.Delete(endpoint)
	}
	_, _ = h.callback(context.Background())
}
// convertToEndpoints flattens the given Endpoints objects into a list of IP
// addresses, taking the IP of every entry in Subsets[*].Addresses in order.
//
// Nil elements are skipped rather than dereferenced. The result is nil when no
// address is found, and duplicates are not removed.
func convertToEndpoints(eps ...*corev1.Endpoints) []string {
	var ipAddress []string
	for _, ep := range eps {
		if ep == nil {
			continue
		}
		for _, subset := range ep.Subsets {
			for _, addr := range subset.Addresses {
				ipAddress = append(ipAddress, addr.IP)
			}
		}
	}
	return ipAddress
}