forked from joeholley/supergloo
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcache.go
115 lines (97 loc) · 4.12 KB
/
cache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package smi
import (
"context"
"sync"
"time"
"github.com/solo-io/solo-kit/pkg/api/v1/clients/kube/controller"
accessclient "github.com/solo-io/supergloo/imported/deislabs/smi-sdk-go/pkg/gen/client/access/clientset/versioned"
accessinformer "github.com/solo-io/supergloo/imported/deislabs/smi-sdk-go/pkg/gen/client/access/informers/externalversions"
accessv1alpha1 "github.com/solo-io/supergloo/imported/deislabs/smi-sdk-go/pkg/gen/client/access/listers/access/v1alpha1"
specsclient "github.com/solo-io/supergloo/imported/deislabs/smi-sdk-go/pkg/gen/client/specs/clientset/versioned"
specsinformer "github.com/solo-io/supergloo/imported/deislabs/smi-sdk-go/pkg/gen/client/specs/informers/externalversions"
specsv1alpha1 "github.com/solo-io/supergloo/imported/deislabs/smi-sdk-go/pkg/gen/client/specs/listers/specs/v1alpha1"
splitclient "github.com/solo-io/supergloo/imported/deislabs/smi-sdk-go/pkg/gen/client/split/clientset/versioned"
splitinformer "github.com/solo-io/supergloo/imported/deislabs/smi-sdk-go/pkg/gen/client/split/informers/externalversions"
splitv1alpha1 "github.com/solo-io/supergloo/imported/deislabs/smi-sdk-go/pkg/gen/client/split/listers/split/v1alpha1"
)
type Cache interface {
TrafficTargetLister() accessv1alpha1.TrafficTargetLister
HTTPRouteGroupLister() specsv1alpha1.HTTPRouteGroupLister
TrafficSplitLister() splitv1alpha1.TrafficSplitLister
Subscribe() <-chan struct{}
Unsubscribe(<-chan struct{})
}
type smiCache struct {
TrafficTargets accessv1alpha1.TrafficTargetLister
HTTPRouteGroups specsv1alpha1.HTTPRouteGroupLister
TrafficSplits splitv1alpha1.TrafficSplitLister
cacheUpdatedWatchers []chan struct{}
cacheUpdatedWatchersMutex sync.Mutex
}
// This context should live as long as the cache is desired. i.e. if the cache is shared
// across clients, it should get a context that has a longer lifetime than the clients themselves
func NewSMICache(ctx context.Context,
accessClient accessclient.Interface,
specsClient specsclient.Interface,
splitClient splitclient.Interface) (*smiCache, error) {
resyncDuration := 12 * time.Hour
accessInformerFactory := accessinformer.NewSharedInformerFactory(accessClient, resyncDuration)
specsInformerFactory := specsinformer.NewSharedInformerFactory(specsClient, resyncDuration)
splitInformerFactory := splitinformer.NewSharedInformerFactory(splitClient, resyncDuration)
trafficTargets := accessInformerFactory.Access().V1alpha1().TrafficTargets()
httpRouteGroups := specsInformerFactory.Specs().V1alpha1().HTTPRouteGroups()
trafficSplits := splitInformerFactory.Smispec().V1alpha1().TrafficSplits()
k := &smiCache{
TrafficTargets: trafficTargets.Lister(),
HTTPRouteGroups: httpRouteGroups.Lister(),
TrafficSplits: trafficSplits.Lister(),
}
kubeController := controller.NewController("linkerd-resources-cache",
controller.NewLockingSyncHandler(k.updatedOccured),
trafficTargets.Informer(),
httpRouteGroups.Informer(),
trafficSplits.Informer(),
)
stop := ctx.Done()
err := kubeController.Run(2, stop)
if err != nil {
return nil, err
}
return k, nil
}
func (k *smiCache) TrafficTargetLister() accessv1alpha1.TrafficTargetLister {
return k.TrafficTargets
}
func (k *smiCache) HTTPRouteGroupLister() specsv1alpha1.HTTPRouteGroupLister {
return k.HTTPRouteGroups
}
func (k *smiCache) TrafficSplitLister() splitv1alpha1.TrafficSplitLister {
return k.TrafficSplits
}
func (k *smiCache) Subscribe() <-chan struct{} {
k.cacheUpdatedWatchersMutex.Lock()
defer k.cacheUpdatedWatchersMutex.Unlock()
c := make(chan struct{}, 10)
k.cacheUpdatedWatchers = append(k.cacheUpdatedWatchers, c)
return c
}
func (k *smiCache) Unsubscribe(c <-chan struct{}) {
k.cacheUpdatedWatchersMutex.Lock()
defer k.cacheUpdatedWatchersMutex.Unlock()
for i, cacheUpdated := range k.cacheUpdatedWatchers {
if cacheUpdated == c {
k.cacheUpdatedWatchers = append(k.cacheUpdatedWatchers[:i], k.cacheUpdatedWatchers[i+1:]...)
return
}
}
}
func (k *smiCache) updatedOccured() {
k.cacheUpdatedWatchersMutex.Lock()
defer k.cacheUpdatedWatchersMutex.Unlock()
for _, cacheUpdated := range k.cacheUpdatedWatchers {
select {
case cacheUpdated <- struct{}{}:
default:
}
}
}