/
leader.go
92 lines (76 loc) · 2.81 KB
/
leader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package k8s
import (
"context"
"os"
"time"
"github.com/golang/glog"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/tools/record"
)
// newLeaderElector builds a ConfigMap-backed leader elector.
//
// The lock is the ConfigMap named lockName in namespace, accessed through
// client.CoreV1(). The lock identity is taken from the POD_NAME environment
// variable. The event recorder attached to the lock reports as component
// "nginx-ingress-leader-elector" with the local hostname as its host.
// callbacks is handed to the elector unchanged.
//
// The lease duration is 30s, the renew deadline is half of that and the retry
// period a quarter of it. The elector, or the error reported by
// leaderelection.NewLeaderElector, is returned as is.
func newLeaderElector(client kubernetes.Interface, callbacks leaderelection.LeaderCallbacks, namespace string, lockName string) (*leaderelection.LeaderElector, error) {
	const leaseDuration = 30 * time.Second

	podName := os.Getenv("POD_NAME")
	if podName == "" {
		// An unset POD_NAME gives this instance an empty lock identity, which
		// cannot be told apart from any other instance missing the variable.
		// Surface it here; NewLeaderElector still decides whether to accept it.
		glog.Warning("POD_NAME is not set; the leader election lock identity will be empty")
	}

	hostname, err := os.Hostname()
	if err != nil {
		// Best effort: the hostname is only used to label the event source,
		// so carry on with whatever value was returned.
		glog.Warningf("could not determine hostname for the leader election event source: %v", err)
	}

	broadcaster := record.NewBroadcaster()
	source := v1.EventSource{Component: "nginx-ingress-leader-elector", Host: hostname}
	recorder := broadcaster.NewRecorder(scheme.Scheme, source)

	lock := &resourcelock.ConfigMapLock{
		ConfigMapMeta: metav1.ObjectMeta{Namespace: namespace, Name: lockName},
		Client:        client.CoreV1(),
		LockConfig: resourcelock.ResourceLockConfig{
			Identity:      podName,
			EventRecorder: recorder,
		},
	}

	return leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
		Lock:          lock,
		LeaseDuration: leaseDuration,
		RenewDeadline: leaseDuration / 2,
		RetryPeriod:   leaseDuration / 4,
		Callbacks:     callbacks,
	})
}
// createLeaderHandler returns the leader-election callbacks bound to lbc.
//
// On becoming leader the callback refreshes resource statuses:
//   - when lbc.reportIngressStatus is set, the external endpoints of all
//     Ingress resources returned by the configuration;
//   - when lbc.areCustomResourcesEnabled is set, the statuses of
//     VirtualServers, VirtualServerRoutes, Policies and TransportServers,
//     in that order.
//
// Each failure is logged at verbosity 3 and does not stop the remaining
// updates. Losing leadership only logs a message.
func createLeaderHandler(lbc *LoadBalancerController) leaderelection.LeaderCallbacks {
	// The context passed by the elector is not used by the status refresh.
	onStarted := func(_ context.Context) {
		glog.V(3).Info("started leading")

		if lbc.reportIngressStatus {
			ingressResources := lbc.configuration.GetResourcesWithFilter(resourceFilter{Ingresses: true})
			glog.V(3).Infof("Updating status for %v Ingresses", len(ingressResources))
			if err := lbc.statusUpdater.UpdateExternalEndpointsForResources(ingressResources); err != nil {
				glog.V(3).Infof("error updating status when starting leading: %v", err)
			}
		}

		if !lbc.areCustomResourcesEnabled {
			return
		}

		glog.V(3).Info("updating VirtualServer and VirtualServerRoutes status")
		if err := lbc.updateVirtualServersStatusFromEvents(); err != nil {
			glog.V(3).Infof("error updating VirtualServers status when starting leading: %v", err)
		}
		if err := lbc.updateVirtualServerRoutesStatusFromEvents(); err != nil {
			glog.V(3).Infof("error updating VirtualServerRoutes status when starting leading: %v", err)
		}
		if err := lbc.updatePoliciesStatus(); err != nil {
			glog.V(3).Infof("error updating Policies status when starting leading: %v", err)
		}
		if err := lbc.updateTransportServersStatusFromEvents(); err != nil {
			glog.V(3).Infof("error updating TransportServers status when starting leading: %v", err)
		}
	}

	onStopped := func() {
		glog.V(3).Info("stopped leading")
	}

	return leaderelection.LeaderCallbacks{
		OnStartedLeading: onStarted,
		OnStoppedLeading: onStopped,
	}
}