/
stacklistener.go
104 lines (94 loc) · 2.93 KB
/
stacklistener.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package controller
import (
"time"
"github.com/docker/compose-on-kubernetes/api/client/clientset"
"github.com/docker/compose-on-kubernetes/api/client/informers/compose/v1alpha3"
"github.com/docker/compose-on-kubernetes/api/compose/latest"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"
)
// StackListener listen for changes in stacks from the API
type StackListener struct {
stacks stackIndexer
reconcileQueue chan<- string
reconcileDeletionQueue chan<- *latest.Stack
ownerCache StackOwnerCacher
}
type stackIndexer interface {
GetStore() cache.Store
Run(<-chan struct{})
}
func (s *StackListener) onAdd(obj interface{}) {
n, err := extractStackNameAndNamespace(obj)
if err != nil {
log.Warnf("StackListener: onAdd: %s", err)
return
}
objKey := n.objKey()
s.ownerCache.setDirty(objKey)
log.Debugf("Sending stack reconciliation request: %s", objKey)
s.reconcileQueue <- objKey
}
func (s *StackListener) onUpdate(_, newObj interface{}) {
n, err := extractStackNameAndNamespace(newObj)
if err != nil {
log.Warnf("StackListener: onUpdate: %s", err)
return
}
objKey := n.objKey()
s.ownerCache.setDirty(objKey)
log.Debugf("Sending stack reconciliation request: %s", objKey)
s.reconcileQueue <- objKey
}
func (s *StackListener) onDelete(obj interface{}) {
if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
obj = tombstone.Obj
}
stack, ok := obj.(*latest.Stack)
if !ok {
log.Warnf("StackListener: onDelete: unable to retrive deleted stack")
return
}
log.Debugf("Sending stack deletion request: %s/%s", stack.Namespace, stack.Name)
s.reconcileDeletionQueue <- stack
}
func (s *StackListener) get(key string) (*latest.Stack, error) {
res, exists, err := s.stacks.GetStore().GetByKey(key)
if err != nil {
return nil, err
}
if !exists {
return nil, errors.Errorf("not found: %s", key)
}
stack, ok := res.(*latest.Stack)
if !ok {
return nil, errors.Errorf("object with key %s is not a stack: %T", key, res)
}
return stack, nil
}
// Start starts the underlying informer
func (s *StackListener) Start(stop chan struct{}) {
go s.stacks.Run(stop)
}
// NewStackListener creates a StackListener
func NewStackListener(clientSet clientset.Interface,
reconciliationInterval time.Duration,
reconcileQueue chan<- string,
reconcileDeletionQueue chan<- *latest.Stack,
ownerCache StackOwnerCacher) *StackListener {
stacksInformer := v1alpha3.NewFilteredStackInformer(clientSet, reconciliationInterval, func(o *metav1.ListOptions) {})
result := &StackListener{
stacks: stacksInformer,
reconcileQueue: reconcileQueue,
reconcileDeletionQueue: reconcileDeletionQueue,
ownerCache: ownerCache,
}
stacksInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: result.onAdd,
UpdateFunc: result.onUpdate,
DeleteFunc: result.onDelete,
})
return result
}