/
controller.go
147 lines (120 loc) · 4.42 KB
/
controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package controller
import (
contextpkg "context"
"fmt"
"time"
"github.com/tliron/commonlog"
kubernetesutil "github.com/tliron/kutil/kubernetes"
reposureclientset "github.com/tliron/reposure/apis/clientset/versioned"
reposureinformers "github.com/tliron/reposure/apis/informers/externalversions"
reposurelisters "github.com/tliron/reposure/apis/listers/reposure.puccini.cloud/v1alpha1"
adminclient "github.com/tliron/reposure/client/admin"
reposureresources "github.com/tliron/reposure/resources/reposure.puccini.cloud/v1alpha1"
apiextensionspkg "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
dynamicpkg "k8s.io/client-go/dynamic"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
restpkg "k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
)
//
// Controller
//
// Controller groups the Kubernetes and Reposure clients, informer factories,
// listers, and processors that NewController wires together and that Run
// starts.
//
type Controller struct {
// REST client configuration, as handed to NewController; also forwarded to
// the admin client.
Config *restpkg.Config
// Dynamic client wrapper built with kubernetesutil.NewDynamic.
Dynamic *kubernetesutil.Dynamic
// Kubernetes client interface.
Kubernetes kubernetes.Interface
// Reposure client interface.
Reposure reposureclientset.Interface
// Admin client; the registry processor uses it to fetch registries.
Client *adminclient.Client
// NOTE(review): not assigned by NewController; presumably set by the
// caller after construction — confirm.
CachePath string
// Passed to the informer factories and processors on start; Run blocks
// until a receive from it completes.
StopChannel <-chan struct{}
// Processors registered by NewController (currently one, for registries).
Processors *kubernetesutil.Processors
// Event recorder created with the name "Reposure".
Events record.EventRecorder
// Informer factory for Kubernetes resources; restricted to a namespace
// unless NewController was called in cluster mode.
KubernetesInformerFactory informers.SharedInformerFactory
// Informer factory for Reposure resources; restricted to a namespace
// unless NewController was called in cluster mode.
ReposureInformerFactory reposureinformers.SharedInformerFactory
// Lister obtained from the registry informer.
Registries reposurelisters.RegistryLister
// Context handed to NewController; also passed to the dynamic and admin
// clients.
Context contextpkg.Context
// Logger named "<toolName>.controller".
Log commonlog.Logger
}
// NewController creates a Controller together with its admin client, informer
// factories, registry lister, and registry processor.
//
// Parameters:
//   - context: stored on the controller and passed to the dynamic and admin
//     clients.
//   - toolName: used to name the logger ("<toolName>.controller"), the admin
//     client logger ("<toolName>.client"), the dynamic client, and the
//     processors.
//   - clusterMode: when true, the namespace is cleared, the informer factories
//     are created without a namespace restriction, and an empty clusterRole
//     falls back to "cluster-admin".
//   - clusterRole: cluster role name forwarded to the admin client; an
//     explicitly provided value is always kept.
//   - namespace: namespace used for the clients and informers when not in
//     cluster mode.
//   - dynamic, kubernetes, apiExtensions, reposure, config: the Kubernetes
//     client interfaces and REST configuration to use.
//   - informerResyncPeriod: resync period given to both informer factories.
//   - stopChannel: stored on the controller for use by Run.
//
// The returned controller is not started; call Run to start it.
func NewController(context contextpkg.Context, toolName string, clusterMode bool, clusterRole string, namespace string, dynamic dynamicpkg.Interface, kubernetes kubernetes.Interface, apiExtensions apiextensionspkg.Interface, reposure reposureclientset.Interface, config *restpkg.Config, informerResyncPeriod time.Duration, stopChannel <-chan struct{}) *Controller {
	if clusterMode {
		namespace = ""

		// Only fall back to the default role when the caller did not choose
		// one; a provided role must not be overwritten.
		if clusterRole == "" {
			clusterRole = "cluster-admin"
		}
	}

	log := commonlog.GetLoggerf("%s.controller", toolName)

	self := Controller{
		Config:      config,
		Dynamic:     kubernetesutil.NewDynamic(toolName, dynamic, kubernetes.Discovery(), namespace, context),
		Kubernetes:  kubernetes,
		Reposure:    reposure,
		StopChannel: stopChannel,
		Processors:  kubernetesutil.NewProcessors(toolName),
		Events:      kubernetesutil.CreateEventRecorder(kubernetes, "Reposure", log),
		Context:     context,
		Log:         log,
	}

	self.Client = adminclient.NewClient(
		kubernetes,
		apiExtensions,
		reposure,
		kubernetes.CoreV1().RESTClient(),
		config,
		context,
		clusterMode,
		clusterRole,
		namespace,
		NamePrefix,
		PartOf,
		ManagedBy,
		OperatorImageReference,
		SurrogateImageReference,
		SimpleImageReference,
		fmt.Sprintf("%s.client", toolName),
	)

	if clusterMode {
		self.KubernetesInformerFactory = informers.NewSharedInformerFactory(kubernetes, informerResyncPeriod)
		self.ReposureInformerFactory = reposureinformers.NewSharedInformerFactory(reposure, informerResyncPeriod)
	} else {
		self.KubernetesInformerFactory = informers.NewSharedInformerFactoryWithOptions(kubernetes, informerResyncPeriod, informers.WithNamespace(namespace))
		self.ReposureInformerFactory = reposureinformers.NewSharedInformerFactoryWithOptions(reposure, informerResyncPeriod, reposureinformers.WithNamespace(namespace))
	}

	// Informers

	registryInformer := self.ReposureInformerFactory.Reposure().V1alpha1().Registries()

	// Listers

	self.Registries = registryInformer.Lister()

	// Processors

	// NOTE(review): the exact meaning of this period is defined by
	// kubernetesutil.NewProcessor — confirm there.
	processorPeriod := 5 * time.Second

	// The closures below capture the local "self" variable, which is the same
	// variable whose address is returned, so they observe the final controller.
	self.Processors.Add(reposureresources.RegistryGVK, kubernetesutil.NewProcessor(
		toolName,
		"registries",
		registryInformer.Informer(),
		processorPeriod,
		func(name string, namespace string) (any, error) {
			return self.Client.GetRegistry(namespace, name)
		},
		func(object any) (bool, error) {
			return self.processRegistry(object.(*reposureresources.Registry))
		},
	))

	return &self
}
// Run starts both informer factories, waits for the processor informer caches
// to sync, starts the processors with the given concurrency, and then blocks
// until a receive from StopChannel completes.
//
// A cache sync error is reported through utilruntime.HandleError and does not
// abort the run. If startup is not nil it is launched in its own goroutine
// once the processors have been started. The processors are shut down when
// Run returns. The returned error is always nil.
func (self *Controller) Run(concurrency uint, startup func()) error {
	defer utilruntime.HandleCrash()

	stop := self.StopChannel

	self.Log.Info("starting informer factories")
	self.KubernetesInformerFactory.Start(stop)
	self.ReposureInformerFactory.Start(stop)

	self.Log.Info("waiting for processor informer caches to sync")
	syncErr := self.Processors.WaitForCacheSync(stop)
	utilruntime.HandleError(syncErr)

	self.Log.Infof("starting processors (concurrency=%d)", concurrency)
	processors := self.Processors
	processors.Start(concurrency, stop)
	defer processors.ShutDown()

	if startup != nil {
		go startup()
	}

	// Block here until the stop channel is signaled.
	<-stop

	self.Log.Info("shutting down")
	return nil
}