leaderelection.go (forked from kubevirt/containerized-data-importer)
package main

import (
	"context"
	"os"
	"time"

	corev1 "k8s.io/api/core/v1"
	k8serrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/uuid"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
	"k8s.io/client-go/tools/record"
	"k8s.io/klog"

	"kubevirt.io/containerized-data-importer/pkg/common"
	"kubevirt.io/containerized-data-importer/pkg/operator"
	"kubevirt.io/containerized-data-importer/pkg/util"
)

const (
	configMapName = "cdi-controller-leader-election-helper"
	componentName = "cdi-controller"
)

// startLeaderElection creates the leader-election ConfigMap and resource lock,
// then runs the leader elector in a goroutine. leaderFunc is invoked once the
// leadership lease is acquired.
func startLeaderElection(ctx context.Context, config *rest.Config, leaderFunc func()) error {
	client := kubernetes.NewForConfigOrDie(config)
	namespace := util.GetNamespace()

	// create manually so it has CDI component label
	err := createConfigMap(client, namespace, configMapName)
	if err != nil && !k8serrors.IsAlreadyExists(err) {
		return err
	}

	resourceLock, err := createResourceLock(client, namespace, configMapName)
	if err != nil {
		return err
	}

	leaderElector, err := createLeaderElector(resourceLock, leaderFunc)
	if err != nil {
		return err
	}

	klog.Info("Attempting to acquire leader lease")
	go leaderElector.Run(ctx)

	return nil
}

// createConfigMap creates the ConfigMap used for leader election, labeled as a
// CDI component and owned by the CDI operator.
func createConfigMap(client kubernetes.Interface, namespace, name string) error {
	cm := &corev1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{
			Name:      name,
			Namespace: namespace,
			Labels: map[string]string{
				common.CDIComponentLabel: componentName,
			},
		},
	}

	err := operator.SetOwner(client, cm)
	if err != nil {
		return err
	}

	_, err = client.CoreV1().ConfigMaps(namespace).Create(cm)
	return err
}

// createResourceLock builds a ConfigMap-based resource lock whose identity is
// the hostname plus a random UUID.
func createResourceLock(client kubernetes.Interface, namespace, name string) (resourcelock.Interface, error) {
	// Leader id, needs to be unique
	id, err := os.Hostname()
	if err != nil {
		return nil, err
	}
	id = id + "_" + string(uuid.NewUUID())

	return resourcelock.New(resourcelock.ConfigMapsResourceLock,
		namespace,
		name,
		client.CoreV1(),
		resourcelock.ResourceLockConfig{
			Identity:      id,
			EventRecorder: createEventRecorder(client, namespace, componentName),
		})
}

// createLeaderElector configures the leader elector with the lease/renew/retry
// timings; losing leadership is fatal so the process exits and restarts.
func createLeaderElector(resourceLock resourcelock.Interface, leaderFunc func()) (*leaderelection.LeaderElector, error) {
	return leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
		Lock:          resourceLock,
		LeaseDuration: 15 * time.Second,
		RenewDeadline: 10 * time.Second,
		RetryPeriod:   2 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(_ context.Context) {
				klog.Info("Successfully acquired leadership lease")
				leaderFunc()
			},
			OnStoppedLeading: func() {
				klog.Fatal("NO LONGER LEADER, EXITING")
			},
		},
	})
}

// createEventRecorder returns an event recorder that posts leader-election
// events to the given namespace under the CDI controller component name.
func createEventRecorder(client kubernetes.Interface, namespace, name string) record.EventRecorder {
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: client.CoreV1().Events(namespace)})
	return eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: componentName})
}