/
rollouts.go
229 lines (183 loc) · 6.49 KB
/
rollouts.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
package admiral
import (
"context"
"fmt"
"sync"
"time"
argo "github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
argoclientset "github.com/argoproj/argo-rollouts/pkg/client/clientset/versioned"
argoprojv1alpha1 "github.com/argoproj/argo-rollouts/pkg/client/clientset/versioned/typed/rollouts/v1alpha1"
argoinformers "github.com/argoproj/argo-rollouts/pkg/client/informers/externalversions"
"github.com/istio-ecosystem/admiral/admiral/pkg/controller/common"
"github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
)
// Handler interface contains the methods that are required
type RolloutHandler interface {
Added(ctx context.Context, obj *argo.Rollout)
Updated(ctx context.Context, obj *argo.Rollout)
Deleted(ctx context.Context, obj *argo.Rollout)
}
type RolloutsEntry struct {
Identity string
Rollout *argo.Rollout
}
type RolloutClusterEntry struct {
Identity string
Rollouts map[string]*argo.Rollout
}
type RolloutController struct {
K8sClient kubernetes.Interface
RolloutClient argoprojv1alpha1.ArgoprojV1alpha1Interface
RolloutHandler RolloutHandler
informer cache.SharedIndexInformer
Cache *rolloutCache
labelSet *common.LabelSet
}
type rolloutCache struct {
//map of dependencies key=identity value array of onboarded identities
cache map[string]*RolloutClusterEntry
mutex *sync.Mutex
}
func (p *rolloutCache) Put(rolloutEntry *RolloutClusterEntry) {
defer p.mutex.Unlock()
p.mutex.Lock()
p.cache[rolloutEntry.Identity] = rolloutEntry
}
func (p *rolloutCache) getKey(rollout *argo.Rollout) string {
return common.GetRolloutGlobalIdentifier(rollout)
}
func (p *rolloutCache) Get(key string, env string) *argo.Rollout {
defer p.mutex.Unlock()
p.mutex.Lock()
rce := p.cache[key]
if rce == nil {
return nil
} else {
return rce.Rollouts[env]
}
}
func (p *rolloutCache) Delete(pod *RolloutClusterEntry) {
defer p.mutex.Unlock()
p.mutex.Lock()
delete(p.cache, pod.Identity)
}
func (p *rolloutCache) UpdateRolloutToClusterCache(key string, rollout *argo.Rollout) {
defer p.mutex.Unlock()
p.mutex.Lock()
env := common.GetEnvForRollout(rollout)
rce := p.cache[key]
if rce == nil {
rce = &RolloutClusterEntry{
Identity: key,
Rollouts: make(map[string]*argo.Rollout),
}
}
rce.Rollouts[env] = rollout
p.cache[rce.Identity] = rce
}
func (p *rolloutCache) DeleteFromRolloutToClusterCache(key string, rollout *argo.Rollout) {
defer p.mutex.Unlock()
p.mutex.Lock()
env := common.GetEnvForRollout(rollout)
rce := p.cache[key]
if rce != nil {
delete(rce.Rollouts, env)
}
}
func (d *RolloutController) shouldIgnoreBasedOnLabelsForRollout(ctx context.Context, rollout *argo.Rollout) bool {
if rollout.Spec.Template.Labels[d.labelSet.AdmiralIgnoreLabel] == "true" { //if we should ignore, do that and who cares what else is there
return true
}
if rollout.Spec.Template.Annotations[d.labelSet.DeploymentAnnotation] != "true" { //Not sidecar injected, we don't want to inject
return true
}
if rollout.Annotations[common.AdmiralIgnoreAnnotation] == "true" {
return true
}
ns, err := d.K8sClient.CoreV1().Namespaces().Get(ctx, rollout.Namespace, meta_v1.GetOptions{})
if err != nil {
log.Warnf("Failed to get namespace object for rollout with namespace %v, err: %v", rollout.Namespace, err)
return false
}
if ns.Annotations[common.AdmiralIgnoreAnnotation] == "true" {
return true
}
return false //labels are fine, we should not ignore
}
func NewRolloutsController(clusterID string, stopCh <-chan struct{}, handler RolloutHandler, config *rest.Config, resyncPeriod time.Duration) (*RolloutController, error) {
roController := RolloutController{}
roController.RolloutHandler = handler
roController.labelSet = common.GetLabelSet()
rolloutCache := rolloutCache{}
rolloutCache.cache = make(map[string]*RolloutClusterEntry)
rolloutCache.mutex = &sync.Mutex{}
roController.Cache = &rolloutCache
var err error
rolloutClient, err := argoclientset.NewForConfig(config)
if err != nil {
return nil, fmt.Errorf("failed to create rollouts controller argo client: %v", err)
}
roController.K8sClient, err = K8sClientFromConfig(config)
if err != nil {
return nil, fmt.Errorf("failed to create rollouts controller k8s client: %v", err)
}
roController.RolloutClient = rolloutClient.ArgoprojV1alpha1()
argoRolloutsInformerFactory := argoinformers.NewSharedInformerFactoryWithOptions(
rolloutClient,
resyncPeriod,
argoinformers.WithNamespace(meta_v1.NamespaceAll))
//Initialize informer
roController.informer = argoRolloutsInformerFactory.Argoproj().V1alpha1().Rollouts().Informer()
mcd := NewMonitoredDelegator(&roController, clusterID, "rollout")
NewController("rollouts-ctrl-"+clusterID, stopCh, mcd, roController.informer)
return &roController, nil
}
func (roc *RolloutController) Added(ctx context.Context, ojb interface{}) {
HandleAddUpdateRollout(ctx, ojb, roc)
}
func (roc *RolloutController) Updated(ctx context.Context, ojb interface{}, oldObj interface{}) {
HandleAddUpdateRollout(ctx, ojb, roc)
}
func HandleAddUpdateRollout(ctx context.Context, ojb interface{}, roc *RolloutController) {
rollout := ojb.(*argo.Rollout)
key := roc.Cache.getKey(rollout)
if len(key) > 0 {
if !roc.shouldIgnoreBasedOnLabelsForRollout(ctx, rollout) {
roc.Cache.UpdateRolloutToClusterCache(key, rollout)
roc.RolloutHandler.Added(ctx, rollout)
} else {
roc.Cache.DeleteFromRolloutToClusterCache(key, rollout)
log.Debugf("ignoring rollout %v based on labels", rollout.Name)
}
}
}
func (roc *RolloutController) Deleted(ctx context.Context, ojb interface{}) {
rollout := ojb.(*argo.Rollout)
key := roc.Cache.getKey(rollout)
if len(key) > 0 {
roc.Cache.DeleteFromRolloutToClusterCache(key, rollout)
}
roc.RolloutHandler.Deleted(ctx, rollout)
}
func (d *RolloutController) GetRolloutBySelectorInNamespace(ctx context.Context, serviceSelector map[string]string, namespace string) []argo.Rollout {
matchedRollouts, err := d.RolloutClient.Rollouts(namespace).List(ctx, meta_v1.ListOptions{})
if err != nil {
logrus.Errorf("Failed to list rollouts in cluster, error: %v", err)
return nil
}
if matchedRollouts.Items == nil {
return make([]argo.Rollout, 0)
}
filteredRollouts := make([]argo.Rollout, 0)
for _, rollout := range matchedRollouts.Items {
if common.IsServiceMatch(serviceSelector, rollout.Spec.Selector) {
filteredRollouts = append(filteredRollouts, rollout)
}
}
return filteredRollouts
}