-
Notifications
You must be signed in to change notification settings - Fork 52
/
zero_controller.go
400 lines (362 loc) · 15.2 KB
/
zero_controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
package controllers
import (
"context"
"fmt"
"reflect"
"strings"
"time"
"github.com/go-logr/logr"
v1 "github.com/infinispan/infinispan-operator/api/v1"
consts "github.com/infinispan/infinispan-operator/controllers/constants"
"github.com/infinispan/infinispan-operator/pkg/infinispan/client/api"
"github.com/infinispan/infinispan-operator/pkg/infinispan/version"
kube "github.com/infinispan/infinispan-operator/pkg/kubernetes"
. "github.com/infinispan/infinispan-operator/pkg/reconcile/pipeline/infinispan/handler/provision"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
// zeroCapacityResource is the adapter interface that allows the zero-capacity
// controller to interact with the underlying k8 resource (e.g. Backup/Restore CRs).
type zeroCapacityResource interface {
	// Cluster returns the name of the Infinispan cluster CR that the zero-pod should join
	Cluster() string
	// Phase returns the current execution phase of the controller
	Phase() zeroCapacityPhase
	// UpdatePhase updates the current state of the resource to reflect the most recent Phase
	UpdatePhase(phase zeroCapacityPhase, phaseErr error) error
	// Init ensures that all prerequisite resources are available and creates any required resources before returning the zero spec
	Init() (*zeroCapacitySpec, error)
	// Exec performs the operation(s) that are required on the zero-capacity pod
	Exec(client api.Infinispan) error
	// ExecStatus returns ZeroSucceeded/ZeroFailed when the operation(s) have completed, otherwise an in-progress phase
	ExecStatus(api api.Infinispan) (zeroCapacityPhase, error)
	// AsMeta is a utility method to return a metav1.Object in order to set the controller reference
	AsMeta() metav1.Object
}
// zeroCapacityReconciler is implemented once per CR kind that is reconciled
// via the shared zero-capacity controller machinery.
type zeroCapacityReconciler interface {
	// Type returns the k8 struct being handled by this controller
	Type() client.Object
	// ResourceInstance creates a new instance of the zero Resource wrapping the actual k8 type
	ResourceInstance(ctx context.Context, name types.NamespacedName, ctrl *zeroCapacityController) (zeroCapacityResource, error)
}
// zeroCapacitySpec describes how the zero-capacity pod should be provisioned.
type zeroCapacitySpec struct {
	// Volume is the VolumeSpec to utilise on the zero-capacity pod
	Volume zeroCapacityVolumeSpec
	// Container is the container spec (resources, env, …) to be used by the zero-capacity pod
	Container v1.InfinispanContainerSpec
	// PodLabels are additional labels to apply to the zero-capacity pod
	PodLabels map[string]string
}
// zeroCapacityVolumeSpec describes the data volume mounted on the zero-capacity pod.
type zeroCapacityVolumeSpec struct {
	// UpdatePermissions, if true, adds a chmod initContainer to the pod to update the permissions of the MountPath
	UpdatePermissions bool
	// MountPath is the path within the container at which the volume should be mounted
	MountPath string
	// VolumeSource is the VolumeSource to utilise on the zero-capacity pod
	VolumeSource corev1.VolumeSource
}
// zeroCapacityController is the shared reconciler that manages the lifecycle of a
// zero-capacity Infinispan pod on behalf of a concrete zeroCapacityReconciler.
type zeroCapacityController struct {
	client.Client
	// Name of the controller, used for logging and event-recorder naming
	Name string
	// Reconciler is the CR-specific adapter driven by this controller
	Reconciler zeroCapacityReconciler
	// Kube provides helper access to the Kubernetes API
	Kube *kube.Kubernetes
	Log  logr.Logger
	// Scheme is used when setting controller references on created pods
	Scheme   *runtime.Scheme
	EventRec record.EventRecorder
	// VersionManager resolves the Infinispan operand version for the cluster
	VersionManager *version.Manager
}
// zeroCapacityPhase tracks the lifecycle of a zero-capacity operation:
// "" -> Initializing -> Initialized -> Running -> Succeeded | Failed.
type zeroCapacityPhase string

const (
	// ZeroInitializing means the request has been accepted by the system, but the underlying resources are still
	// being initialized.
	ZeroInitializing zeroCapacityPhase = "Initializing"
	// ZeroInitialized means that all required resources have been initialized
	ZeroInitialized zeroCapacityPhase = "Initialized"
	// ZeroRunning means that the required action has been initiated on the infinispan server.
	ZeroRunning zeroCapacityPhase = "Running"
	// ZeroSucceeded means that the action on the server has completed and the zero pod has been terminated.
	ZeroSucceeded zeroCapacityPhase = "Succeeded"
	// ZeroFailed means that the action failed on the infinispan server and the zero pod has terminated.
	ZeroFailed zeroCapacityPhase = "Failed"
	// ZeroUnknown means that for some reason the state of the action could not be obtained, typically due
	// to an error in communicating with the underlying zero pod.
	ZeroUnknown zeroCapacityPhase = "Unknown"
)
// newZeroCapacityController wires a zeroCapacityReconciler into the manager, creating
// the controller, its logger, event recorder and operand version manager.
func newZeroCapacityController(name string, reconciler zeroCapacityReconciler, mgr ctrl.Manager) error {
	versionManager, err := version.ManagerFromEnv(v1.OperatorOperandVersionEnvVarName)
	if err != nil {
		return err
	}
	recorderName := strings.ToLower(name) + "-controller"
	zc := &zeroCapacityController{
		Client:         mgr.GetClient(),
		Name:           name,
		Reconciler:     reconciler,
		Kube:           kube.NewKubernetesFromController(mgr),
		Log:            ctrl.Log.WithName("controllers").WithName(name),
		Scheme:         mgr.GetScheme(),
		EventRec:       mgr.GetEventRecorderFor(recorderName),
		VersionManager: versionManager,
	}
	// Watch the wrapped CR type and any pods it owns
	builder := ctrl.NewControllerManagedBy(mgr).
		WithOptions(controller.Options{Reconciler: zc}).
		For(reconciler.Type()).
		Owns(&corev1.Pod{})
	return builder.Complete(zc)
}
// Reconcile drives the wrapped CR through the zero-capacity phases, dispatching
// to the initialize/execute/wait/cleanup handlers based on the current phase.
func (z *zeroCapacityController) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
	resourceKind := reflect.TypeOf(z.Reconciler.Type()).Elem().Name()
	reqLogger := z.Log.WithValues("Request.Namespace", request.Namespace, "Request.Name", request.Name)
	reqLogger.Info("Reconciling " + resourceKind)
	defer reqLogger.Info("----- End Reconciling " + resourceKind)

	instance, err := z.Reconciler.ResourceInstance(ctx, request.NamespacedName, z)
	if err != nil {
		// The CR could have been deleted after the reconcile request was queued.
		// Owned objects are automatically garbage collected; for additional cleanup
		// logic use finalizers. Return and don't requeue.
		if errors.IsNotFound(err) {
			return reconcile.Result{}, nil
		}
		// Error reading the object - requeue the request.
		return reconcile.Result{}, fmt.Errorf("unable to fetch %s CR '%s': %w", resourceKind, request.Name, err)
	}

	phase := instance.Phase()
	if phase == "" {
		return reconcile.Result{}, instance.UpdatePhase(ZeroInitializing, nil)
	}
	if phase == ZeroInitializing {
		return z.initializeResources(request, instance, ctx)
	}

	clusterName := instance.Cluster()
	infinispan := &v1.Infinispan{}
	clusterKey := types.NamespacedName{
		Namespace: request.Namespace,
		Name:      clusterName,
	}
	if err := z.Get(ctx, clusterKey, infinispan); err != nil {
		if errors.IsNotFound(err) {
			// If the cluster no longer exists and the operation has already reached a
			// terminal phase, there is nothing left to do
			if phase == ZeroSucceeded || phase == ZeroFailed {
				return reconcile.Result{}, nil
			}
			return reconcile.Result{}, fmt.Errorf("CR '%s' not found", clusterName)
		}
		// Error reading the object - requeue the request.
		return reconcile.Result{}, fmt.Errorf("unable to fetch CR '%s': %w", clusterName, err)
	}

	ispnClient, err := NewInfinispanForPod(ctx, instance.AsMeta().GetName(), infinispan, z.VersionManager, z.Kube)
	if err != nil {
		return reconcile.Result{}, err
	}

	switch phase {
	case ZeroInitialized:
		return z.execute(ispnClient, request, instance, ctx)
	case ZeroSucceeded, ZeroFailed:
		return z.cleanupResources(ispnClient, request, ctx)
	default:
		// Phase must be ZeroRunning, so wait for execution to complete
		return z.waitForExecutionToComplete(ispnClient, request, instance)
	}
}
// initializeResources creates the zero-capacity pod (and its volumes) once the target
// Infinispan cluster is stable, then advances the phase to ZeroInitialized.
// It requeues while the cluster is missing, unstable, or has no pods yet.
func (z *zeroCapacityController) initializeResources(request reconcile.Request, instance zeroCapacityResource, ctx context.Context) (reconcile.Result, error) {
	name := request.Name
	namespace := request.Namespace
	clusterName := instance.Cluster()
	clusterKey := types.NamespacedName{
		Namespace: namespace,
		Name:      clusterName,
	}
	infinispan := &v1.Infinispan{}
	if err := z.Client.Get(ctx, clusterKey, infinispan); err != nil {
		z.Log.Info(fmt.Sprintf("Unable to load Infinispan Cluster '%s': %s", clusterName, err))
		if errors.IsNotFound(err) {
			return reconcile.Result{RequeueAfter: consts.DefaultWaitOnCluster}, nil
		}
		return reconcile.Result{}, err
	}
	if err := infinispan.EnsureClusterStability(); err != nil {
		z.Log.Info(fmt.Sprintf("Infinispan '%s' not ready: %s", clusterName, err.Error()))
		return reconcile.Result{RequeueAfter: consts.DefaultWaitOnCluster}, nil
	}
	podList := &corev1.PodList{}
	podLabels := infinispan.PodSelectorLabels()
	if err := z.Kube.ResourcesList(infinispan.Namespace, podLabels, podList, ctx); err != nil {
		z.Log.Error(err, "Failed to list pods")
		return reconcile.Result{}, err
	}
	// Guard against an index-out-of-range panic: the pod list can transiently be
	// empty even after the stability check, so requeue instead of crashing
	if len(podList.Items) == 0 {
		z.Log.Info(fmt.Sprintf("No pods found for Infinispan cluster '%s', requeuing", clusterName))
		return reconcile.Result{RequeueAfter: consts.DefaultWaitOnCluster}, nil
	}
	// Reuse the cluster pods' security context so the zero pod can access the same volumes
	podSecurityCtx := podList.Items[0].Spec.SecurityContext
	spec, err := instance.Init()
	if err != nil {
		return reconcile.Result{}, err
	}
	err = z.Get(ctx, types.NamespacedName{Namespace: namespace, Name: name}, &corev1.Pod{})
	if err != nil && !errors.IsNotFound(err) {
		// Previously any non-NotFound error here was silently ignored and the phase
		// advanced to Initialized without a pod ever being created. Requeue instead.
		return reconcile.Result{}, fmt.Errorf("unable to check for existing zero-capacity pod '%s': %w", name, err)
	}
	if errors.IsNotFound(err) {
		pod, err := z.zeroPodSpec(name, namespace, podSecurityCtx, infinispan, spec)
		if err != nil {
			return reconcile.Result{}, fmt.Errorf("unable to compute Spec for zero-capacity pod: %w", err)
		}
		// Ensure the pod is garbage collected together with the owning CR
		if err := controllerutil.SetControllerReference(instance.AsMeta(), pod, z.Scheme); err != nil {
			return reconcile.Result{}, fmt.Errorf("unable to setControllerReference for zero-capacity pod: %w", err)
		}
		if infinispan.IsSiteTLSEnabled() {
			// JGroups uses SSL sockets with cross-site so we need to mount the keystore and truststore
			AddSecretVolume(infinispan.GetSiteTransportSecretName(), SiteTransportKeystoreVolumeName, consts.SiteTransportKeyStoreRoot, &pod.Spec, InfinispanContainer)
			// The truststore secret is optional; mount it only if it exists
			trustStoreSecret := &corev1.Secret{}
			err := z.Get(ctx, types.NamespacedName{Namespace: namespace, Name: infinispan.GetSiteTrustoreSecretName()}, trustStoreSecret)
			if err == nil {
				// truststore secret exists, mount the volume
				AddSecretVolume(infinispan.GetSiteTrustoreSecretName(), SiteTruststoreVolumeName, consts.SiteTrustStoreRoot, &pod.Spec, InfinispanContainer)
			} else if !errors.IsNotFound(err) {
				return reconcile.Result{}, fmt.Errorf("unable to create zero-capacity pod: %w", err)
			}
		}
		if err := z.Create(ctx, pod); err != nil {
			return reconcile.Result{}, fmt.Errorf("unable to create zero-capacity pod: %w", err)
		}
	}
	// Update status
	return reconcile.Result{}, instance.UpdatePhase(ZeroInitialized, nil)
}
// execute triggers the CR-specific action on the zero-capacity pod once it is ready,
// moving the phase to ZeroRunning on success or ZeroFailed on error.
func (z *zeroCapacityController) execute(ispnClient api.Infinispan, request reconcile.Request, instance zeroCapacityResource, ctx context.Context) (reconcile.Result, error) {
	// The pod must be Ready before the server can accept the request; poll until it is
	if ready := z.isZeroPodReady(request, ctx); !ready {
		return reconcile.Result{RequeueAfter: time.Second}, nil
	}
	execErr := instance.Exec(ispnClient)
	if execErr != nil {
		z.Log.Error(execErr, "unable to execute action on zero-capacity pod", "request.Name", request.Name)
		return reconcile.Result{}, instance.UpdatePhase(ZeroFailed, execErr)
	}
	return reconcile.Result{}, instance.UpdatePhase(ZeroRunning, nil)
}
// waitForExecutionToComplete polls the server for the status of the running action,
// updating the phase to a terminal state or requeueing while it is still in flight.
func (z *zeroCapacityController) waitForExecutionToComplete(ispnClient api.Infinispan, request reconcile.Request, instance zeroCapacityResource) (reconcile.Result, error) {
	status, statusErr := instance.ExecStatus(ispnClient)
	switch {
	case statusErr != nil || status == ZeroFailed:
		z.Log.Error(statusErr, "execution failed", "request.Name", request.Name)
		return reconcile.Result{}, instance.UpdatePhase(ZeroFailed, statusErr)
	case status == ZeroSucceeded:
		return reconcile.Result{}, instance.UpdatePhase(ZeroSucceeded, nil)
	default:
		// Execution has not completed, or its state is unknown, wait 1 second before retrying
		return reconcile.Result{RequeueAfter: 1 * time.Second}, nil
	}
}
// cleanupResources stops the zero-capacity server (if it is still ready) so that it
// leaves the Infinispan cluster once the operation has reached a terminal phase.
func (z *zeroCapacityController) cleanupResources(ispnClient api.Infinispan, request reconcile.Request, ctx context.Context) (reconcile.Result, error) {
	if !z.isZeroPodReady(request, ctx) {
		// Nothing to stop; the pod is gone or not serving
		return reconcile.Result{}, nil
	}
	if stopErr := ispnClient.Server().Stop(); stopErr != nil {
		wrapped := fmt.Errorf("unable to stop zero-capacity server: %w", stopErr)
		z.Log.Error(wrapped, "error encountered when cleaning up zero-capacity pod")
		return reconcile.Result{}, wrapped
	}
	return reconcile.Result{}, nil
}
// isZeroPodReady reports whether the zero-capacity pod for this request exists
// and has reached the Ready condition.
func (z *zeroCapacityController) isZeroPodReady(request reconcile.Request, ctx context.Context) bool {
	var pod corev1.Pod
	if err := z.Get(ctx, request.NamespacedName, &pod); err != nil {
		// Treat any lookup failure (including NotFound) as not ready
		return false
	}
	return kube.IsPodReady(pod)
}
// zeroPodSpec builds the corev1.Pod definition for the zero-capacity node: a single
// Infinispan container started with the zero-capacity XML config, plus the volumes
// for operator config, ephemeral server data, security identities and the
// CR-specific data volume supplied by zeroSpec.
func (z *zeroCapacityController) zeroPodSpec(name, namespace string, podSecurityCtx *corev1.PodSecurityContext, ispn *v1.Infinispan, zeroSpec *zeroCapacitySpec) (*corev1.Pod, error) {
	// NOTE(review): the error from WithRef is discarded — presumably ispn.Spec.Version
	// has already been validated/defaulted upstream; confirm, otherwise operand may be
	// a zero value here.
	operand, _ := z.VersionManager.WithRef(ispn.Spec.Version)
	podResources, err := PodResources(zeroSpec.Container)
	if err != nil {
		return nil, err
	}
	dataVolName := name + "-data"
	// Inherit the cluster's pod labels, tag the pod as a zero pod, then apply any
	// CR-specific labels (which may override the defaults)
	labels := ispn.PodLabels()
	labels["app"] = "infinispan-zero-pod"
	for k, v := range zeroSpec.PodLabels {
		labels[k] = v
	}
	pod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:        name,
			Namespace:   namespace,
			Labels:      labels,
			Annotations: ispn.PodAnnotations(),
		},
		Spec: corev1.PodSpec{
			// Reuse the security context of the existing cluster pods
			SecurityContext: podSecurityCtx,
			Containers: []corev1.Container{{
				Image:          ispn.ImageName(),
				Name:           InfinispanContainer,
				Env:            PodEnv(ispn, &[]corev1.EnvVar{{Name: "IDENTITIES_BATCH", Value: consts.ServerOperatorSecurity + "/" + consts.ServerIdentitiesBatchFilename}}),
				Lifecycle:      PodLifecycle(),
				LivenessProbe:  PodLivenessProbe(ispn, operand),
				Ports: []corev1.ContainerPort{
					{ContainerPort: consts.InfinispanAdminPort, Name: consts.InfinispanAdminPortName, Protocol: corev1.ProtocolTCP},
					{ContainerPort: consts.InfinispanPingPort, Name: consts.InfinispanPingPortName, Protocol: corev1.ProtocolTCP},
				},
				ReadinessProbe: PodReadinessProbe(ispn, operand),
				Resources:      *podResources,
				StartupProbe:   PodStartupProbe(ispn, operand),
				// Start the server with the zero-capacity configuration and operator log4j config
				Args: []string{"-c", "operator/infinispan-zero.xml", "-l", OperatorConfMountPath + "/log4j.xml"},
				VolumeMounts: []corev1.VolumeMount{
					{
						Name:      ConfigVolumeName,
						MountPath: OperatorConfMountPath,
					},
					// Utilise Ephemeral vol as we're only interested in data related to CR
					{
						Name:      dataVolName,
						MountPath: DataMountPath,
					},
					// Mount configured volume at /zero path so that any created content is stored independent of server data
					{
						Name:      name,
						MountPath: zeroSpec.Volume.MountPath,
					}, {
						Name:      InfinispanSecurityVolumeName,
						MountPath: consts.ServerOperatorSecurity,
					},
				},
			}},
			// The zero pod is a one-shot worker: never restart it
			RestartPolicy: corev1.RestartPolicyNever,
			Volumes: []corev1.Volume{
				// Volume for mounting zero-capacity yaml configmap
				{
					Name: ConfigVolumeName,
					VolumeSource: corev1.VolumeSource{
						ConfigMap: &corev1.ConfigMapVolumeSource{
							LocalObjectReference: corev1.LocalObjectReference{Name: ispn.GetConfigName()},
						},
					}},
				// Volume for reading/writing data
				{
					Name:         name,
					VolumeSource: zeroSpec.Volume.VolumeSource,
				},
				// EmptyDir for Infinispan data volume as the zero-capacity node does not store traditional data
				{
					Name: dataVolName,
					VolumeSource: corev1.VolumeSource{
						EmptyDir: &corev1.EmptyDirVolumeSource{},
					},
				}, {
					Name: InfinispanSecurityVolumeName,
					VolumeSource: corev1.VolumeSource{
						Secret: &corev1.SecretVolumeSource{
							SecretName: ispn.GetInfinispanSecuritySecretName(),
						},
					},
				},
			},
		},
	}
	// Optionally fix up volume permissions before the server starts
	if zeroSpec.Volume.UpdatePermissions {
		AddVolumeChmodInitContainer("backup-chmod-pv", name, zeroSpec.Volume.MountPath, &pod.Spec)
	}
	return pod, nil
}