/
shim_controller.go
138 lines (121 loc) · 3.42 KB
/
shim_controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package controller
import (
"context"
"fmt"
"time"
"github.com/kudobuilder/shim/shim-controller/pkg/apis/kudoshim/v1alpha1"
"github.com/kudobuilder/shim/shim-controller/pkg/client"
"github.com/kudobuilder/shim/shim-controller/pkg/kudoshim/shim"
log "github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
uruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
// Controller watches ShimInstance resources through a shared informer
// and reconciles them via the shim processor.
type Controller struct {
	// client provides access to the shim API group.
	client *client.Client
	// queue holds object keys awaiting reconciliation, with rate limiting.
	queue workqueue.RateLimitingInterface
	// informer caches ShimInstance objects and feeds add/update/delete
	// events into queue.
	informer cache.SharedIndexInformer
	// maxRetries is the number of rate-limited requeues a failing key
	// gets before it is dropped.
	maxRetries int
	// shim performs the actual processing of ShimInstance objects.
	shim *shim.Shim
}
// NewController returns a Controller wired to the given client. The
// queue and informer are created later, in Run. maxRetries is fixed at
// 1, so a failing key is requeued once before being dropped.
func NewController(client *client.Client) *Controller {
	// Named s rather than shim so the local does not shadow the
	// imported shim package.
	s := &shim.Shim{
		Client: client,
	}
	return &Controller{
		client:     client,
		shim:       s,
		maxRetries: 1,
	}
}
// Run starts the controller: it builds the work queue and a shared
// informer over ShimInstance objects in all namespaces, registers event
// handlers that enqueue object keys, waits for the informer cache to
// sync, then processes queue items until ctx is cancelled.
func (c *Controller) Run(ctx context.Context) {
	// Derive the stop channel from the caller's context so cancelling
	// ctx actually stops the informer and the worker loop. Previously a
	// locally-created channel was only closed by a defer when Run
	// returned — but wait.Until below blocks on that same channel, so
	// Run could never return and the controller could never be stopped.
	stopCh := ctx.Done()

	c.queue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	// Shut the queue down on exit so a worker blocked in queue.Get
	// observes quit == true and returns instead of leaking.
	defer c.queue.ShutDown()

	c.informer = cache.NewSharedIndexInformer(
		&cache.ListWatch{
			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
				// Use the caller's ctx (not context.TODO()) so list
				// calls are cancelled together with the controller.
				return c.client.Shim.KudoshimV1alpha1().ShimInstances("").List(ctx, options)
			},
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
				return c.client.Shim.KudoshimV1alpha1().ShimInstances("").Watch(ctx, options)
			},
		},
		&v1alpha1.ShimInstance{},
		0, // no resync
		cache.Indexers{},
	)

	c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
			if err == nil {
				c.queue.Add(key)
			}
		},
		UpdateFunc: func(old, new interface{}) {
			oldObj, okOld := old.(*v1alpha1.ShimInstance)
			newObj, okNew := new.(*v1alpha1.ShimInstance)
			// Guard the assertions: the previous code ignored the ok
			// result and would nil-panic on an unexpected object type.
			if !okOld || !okNew {
				return
			}
			// Skip no-op notifications by comparing resource versions.
			if oldObj.GetResourceVersion() != newObj.GetResourceVersion() {
				key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(newObj)
				if err == nil {
					c.queue.Add(key)
				}
			}
		},
		DeleteFunc: func(obj interface{}) {
			// DeletionHandlingMetaNamespaceKeyFunc also handles the
			// DeletedFinalStateUnknown tombstones a watch can deliver.
			key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
			if err == nil {
				c.queue.Add(key)
			}
		},
	})

	go c.informer.Run(stopCh)
	log.Infoln("Controller started.")

	if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
		uruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
		return
	}
	log.Infoln("Controller synced.")

	// Block here, re-running the worker loop once per second until the
	// context is cancelled.
	wait.Until(c.runWorker, time.Second, stopCh)
}
// runWorker drains the work queue, looping until processNext reports
// that the queue has been shut down.
func (c *Controller) runWorker() {
	for {
		if !c.processNext() {
			return
		}
	}
}
// processNext pops one key from the work queue and reconciles it,
// requeueing on failure up to maxRetries times. It returns false only
// when the queue has been shut down.
func (c *Controller) processNext() bool {
	item, shutdown := c.queue.Get()
	if shutdown {
		return false
	}
	defer c.queue.Done(item)

	if err := c.processItem(item.(string)); err != nil {
		if c.queue.NumRequeues(item) < c.maxRetries {
			// Transient failure budget remains: back off and retry.
			log.Errorf("Error processing %s (will retry): %v", item, err)
			c.queue.AddRateLimited(item)
		} else {
			// Retries exhausted: drop the key and surface the error.
			log.Errorf("Error processing %s (giving up): %v", item, err)
			c.queue.Forget(item)
			uruntime.HandleError(err)
		}
		return true
	}

	// Success: clear any rate-limiting history for this key.
	c.queue.Forget(item)
	return true
}
// processItem looks the key up in the informer's local store and hands
// the cached object to the shim for processing. A key that is no longer
// present in the store is treated as already handled.
func (c *Controller) processItem(key string) error {
	cached, _, err := c.informer.GetStore().GetByKey(key)
	if err != nil {
		return fmt.Errorf("error fetching object with key %s from store: %v", key, err)
	}
	if cached == nil {
		// Object was deleted before we got to it; nothing to do.
		return nil
	}

	runtimeObj, ok := cached.(runtime.Object)
	if !ok {
		return fmt.Errorf("object with key %s is not a runtime.Object", key)
	}
	return c.shim.Process(runtimeObj)
}