-
Notifications
You must be signed in to change notification settings - Fork 442
/
controller.go
95 lines (77 loc) · 3.19 KB
/
controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package speaker
import (
"time"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
listerv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
kubeovninformer "github.com/kubeovn/kube-ovn/pkg/client/informers/externalversions"
kubeovnlister "github.com/kubeovn/kube-ovn/pkg/client/listers/kubeovn/v1"
"github.com/kubeovn/kube-ovn/pkg/util"
)
// controllerAgentName is the component name set on the EventSource of the
// event recorder created in NewController.
const controllerAgentName = "ovn-speaker"
// Controller bundles the configuration, listers, cache-sync checks, informer
// factories and event recorder that the speaker works with.
type Controller struct {
	// config is the configuration passed to NewController.
	config *Configuration

	// Pod lister and the check reporting whether its informer has synced.
	podsLister listerv1.PodLister
	podsSynced cache.InformerSynced

	// Subnet lister and the check reporting whether its informer has synced.
	subnetsLister kubeovnlister.SubnetLister
	subnetSynced  cache.InformerSynced

	// Service lister and the check reporting whether its informer has synced.
	servicesLister listerv1.ServiceLister
	servicesSynced cache.InformerSynced

	// Factories backing the informers above; both are started in Run.
	informerFactory        kubeinformers.SharedInformerFactory
	kubeovnInformerFactory kubeovninformer.SharedInformerFactory

	// recorder is the event recorder created under controllerAgentName.
	recorder record.EventRecorder
}
// NewController assembles a Controller from config.
//
// It adds the kube-ovn v1 types to scheme.Scheme (wrapped in
// utilruntime.Must), creates an event broadcaster that logs through
// klog.Infof and records to an event sink built from config.KubeClient, and
// builds one shared informer factory per client with AllowWatchBookmarks
// enabled on list options. Pod, subnet and service listers and their
// HasSynced checks are taken from those factories. The factories are not
// started here; Run does that.
func NewController(config *Configuration) *Controller {
	utilruntime.Must(kubeovnv1.AddToScheme(scheme.Scheme))

	klog.V(4).Info("Creating event broadcaster")
	broadcaster := record.NewBroadcaster()
	broadcaster.StartLogging(klog.Infof)
	eventSink := &typedcorev1.EventSinkImpl{Interface: config.KubeClient.CoreV1().Events("")}
	broadcaster.StartRecordingToSink(eventSink)
	eventSource := corev1.EventSource{Component: controllerAgentName}
	eventRecorder := broadcaster.NewRecorder(scheme.Scheme, eventSource)

	// The same list-options tweak is applied to both informer factories.
	withBookmarks := func(opts *metav1.ListOptions) {
		opts.AllowWatchBookmarks = true
	}
	coreFactory := kubeinformers.NewSharedInformerFactoryWithOptions(
		config.KubeClient, 0, kubeinformers.WithTweakListOptions(withBookmarks))
	ovnFactory := kubeovninformer.NewSharedInformerFactoryWithOptions(
		config.KubeOvnClient, 0, kubeovninformer.WithTweakListOptions(withBookmarks))

	pods := coreFactory.Core().V1().Pods()
	subnets := ovnFactory.Kubeovn().V1().Subnets()
	services := coreFactory.Core().V1().Services()

	return &Controller{
		config: config,

		podsLister:     pods.Lister(),
		podsSynced:     pods.Informer().HasSynced,
		subnetsLister:  subnets.Lister(),
		subnetSynced:   subnets.Informer().HasSynced,
		servicesLister: services.Lister(),
		servicesSynced: services.Informer().HasSynced,

		informerFactory:        coreFactory,
		kubeovnInformerFactory: ovnFactory,
		recorder:               eventRecorder,
	}
}
// Run starts both informer factories, waits for the pod, subnet and service
// caches to sync, launches a goroutine that invokes c.syncSubnetRoutes through
// wait.Until with a 5-second period, and then blocks on a receive from stopCh.
//
// If the cache sync wait fails, util.LogFatalAndExit is called and Run returns
// without starting the sync goroutine.
func (c *Controller) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	// Period handed to wait.Until for the subnet route sync.
	const routeSyncPeriod = 5 * time.Second

	c.informerFactory.Start(stopCh)
	c.kubeovnInformerFactory.Start(stopCh)

	cachesReady := cache.WaitForCacheSync(stopCh, c.podsSynced, c.subnetSynced, c.servicesSynced)
	if !cachesReady {
		util.LogFatalAndExit(nil, "failed to wait for caches to sync")
		return
	}

	klog.Info("Started workers")
	go wait.Until(c.syncSubnetRoutes, routeSyncPeriod, stopCh)

	<-stopCh
	klog.Info("Shutting down workers")
}