/
services_cache.go
78 lines (60 loc) · 1.76 KB
/
services_cache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package controllers
import (
"context"
"time"
"sync/atomic"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)
type mapServiceCache map[string]*corev1.Service
type serviceCache struct {
client.Client
allServices atomic.Pointer[mapServiceCache]
allServicesExpires atomic.Pointer[time.Time]
}
func (s *serviceCache) GetByIP(ctx context.Context, ip string) (*corev1.Service, error) {
allServices := s.allServices.Load()
expires := s.allServicesExpires.Load()
if allServices == nil || expires == nil || expires.After(time.Now().UTC()) {
var err error
allServices, err = s.fillCache(ctx)
if err != nil {
return nil, err
}
}
return (*allServices)[ip], nil
}
func (s *serviceCache) fillCache(ctx context.Context) (*mapServiceCache, error) {
allServices := corev1.ServiceList{}
err := s.Client.List(ctx, &allServices, &client.ListOptions{Namespace: metav1.NamespaceAll})
if err != nil {
return nil, err
}
cache := mapServiceCache{}
// store also cluster IPs
for i, service := range allServices.Items {
if service.Spec.ClusterIP != "" {
cache[service.Spec.ClusterIP] = &allServices.Items[i]
}
for _, clusterIP := range service.Spec.ClusterIPs {
cache[clusterIP] = &allServices.Items[i]
}
}
for i, service := range allServices.Items {
if service.Spec.Type != corev1.ServiceTypeLoadBalancer {
continue
}
if len(service.Status.LoadBalancer.Ingress) == 0 {
continue
}
if service.Status.LoadBalancer.Ingress[0].IP == "" {
continue
}
cache[service.Status.LoadBalancer.Ingress[0].IP] = &allServices.Items[i]
}
s.allServices.Store(&cache)
expires := time.Now().UTC().Add(time.Minute * 15)
s.allServicesExpires.Store(&expires)
return &cache, err
}