package manager

import (
	"context"
	"fmt"
	"time"

	"github.com/kube-vip/kube-vip/pkg/vip"
	log "github.com/sirupsen/logrus"
	"github.com/vishvananda/netlink"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
)
// startTableMode starts the Manager in routing-table mode: it watches services and,
// depending on configuration, runs leader election before managing their routes
func (sm *Manager) startTableMode(id string) error {
var ns string
var err error
// use a Go context so we can tell the leaderelection code when we
// want to step down
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
log.Infof("all routing table entries will exist in table [%d] with protocol [%d]", sm.config.RoutingTableID, sm.config.RoutingProtocol)
if sm.config.CleanRoutingTable {
go func() {
			// assume that after 10 seconds all services have been configured, so any
			// remaining routes in the managed table are redundant and can be removed
time.Sleep(time.Second * 10)
if err := sm.cleanRoutes(); err != nil {
log.Errorf("error checking for old routes: %v", err)
}
}()
}
	// Shutdown goroutine: block until a termination signal arrives, then cancel the
	// context so that leadership is released and the watchers stop
go func() {
<-sm.signalChan
log.Info("Received termination, signaling shutdown")
// Cancel the context, which will in turn cancel the leadership
cancel()
}()
ns, err = returnNameSpace()
if err != nil {
log.Warnf("unable to auto-detect namespace, dropping to [%s]", sm.config.Namespace)
ns = sm.config.Namespace
}
	// Start a services watcher (all kube-vip pods watch services); when a new service
	// appears, a per-service lock is created and every pod runs leader election on it
if sm.config.EnableServicesElection {
log.Infof("beginning watching services, leaderelection will happen for every service")
err = sm.startServicesWatchForLeaderElection(ctx)
if err != nil {
return err
}
} else if sm.config.EnableLeaderElection {
log.Infof("beginning services leadership, namespace [%s], lock name [%s], id [%s]", ns, plunderLock, id)
// we use the Lease lock type since edits to Leases are less common
// and fewer objects in the cluster watch "all Leases".
lock := &resourcelock.LeaseLock{
LeaseMeta: metav1.ObjectMeta{
Name: plunderLock,
Namespace: ns,
},
Client: sm.clientSet.CoordinationV1(),
LockConfig: resourcelock.ResourceLockConfig{
Identity: id,
},
}
// start the leader election code loop
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
Lock: lock,
// IMPORTANT: you MUST ensure that any code you have that
// is protected by the lease must terminate **before**
// you call cancel. Otherwise, you could have a background
// loop still running and another process could
// get elected before your background loop finished, violating
// the stated goal of the lease.
ReleaseOnCancel: true,
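			// LeaseDuration is how long a non-leader candidate waits before trying to
			// take over an un-renewed lease; RenewDeadline is how long the acting leader
			// keeps retrying to renew before giving up; RetryPeriod is the wait between
			// those retries. All three are taken from the kube-vip configuration (seconds).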
LeaseDuration: time.Duration(sm.config.LeaseDuration) * time.Second,
RenewDeadline: time.Duration(sm.config.RenewDeadline) * time.Second,
RetryPeriod: time.Duration(sm.config.RetryPeriod) * time.Second,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
err = sm.servicesWatcher(ctx, sm.syncServices)
if err != nil {
log.Fatal(err)
}
},
OnStoppedLeading: func() {
// we can do cleanup here
log.Infof("leader lost: %s", id)
for _, instance := range sm.serviceInstances {
for _, cluster := range instance.clusters {
cluster.Stop()
}
}
log.Fatal("lost leadership, restarting kube-vip")
},
OnNewLeader: func(identity string) {
// we're notified when new leader elected
if identity == id {
// I just got the lock
return
}
log.Infof("new leader elected: %s", identity)
},
},
})
} else {
log.Infof("beginning watching services without leader election")
err = sm.servicesWatcher(ctx, sm.syncServices)
if err != nil {
log.Errorf("Cannot watch services, %v", err)
}
}
return nil
}
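
// cleanRoutes removes routes from the configured routing table (matching the
// configured routing protocol) that no longer correspond to any service instance
// managed by this kube-vip instance.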
func (sm *Manager) cleanRoutes() error {
routes, err := vip.ListRoutes(sm.config.RoutingTableID, sm.config.RoutingProtocol)
if err != nil {
return fmt.Errorf("error getting routes: %w", err)
}
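	// For every route currently in the table, check whether it matches a route that
	// would be created for one of the known service instances; anything unmatched is
	// considered stale and deleted.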
for i := range routes {
found := false
for _, instance := range sm.serviceInstances {
for _, cluster := range instance.clusters {
for n := range cluster.Network {
r := cluster.Network[n].PrepareRoute()
if r.Dst.String() == routes[i].Dst.String() {
found = true
}
}
}
}
		if !found {
			if err := netlink.RouteDel(&routes[i]); err != nil {
				log.Errorf("[route] error deleting route %v: %v", routes[i], err)
				continue
			}
			log.Debugf("[route] deleted route: %v", routes[i])
		}
}
return nil
}