forked from openshift/ovn-kubernetes
-
Notifications
You must be signed in to change notification settings - Fork 0
/
master.go
406 lines (369 loc) · 14.3 KB
/
master.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
package cluster
import (
"fmt"
"net"
kapi "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
"github.com/openvswitch/ovn-kubernetes/go-controller/pkg/ovn"
"github.com/openvswitch/ovn-kubernetes/go-controller/pkg/util"
"github.com/openshift/origin/pkg/util/netutils"
"github.com/sirupsen/logrus"
)
// RebuildOVNDatabase rebuilds the OVN database when the OVN HA option is
// enabled and the database appears to be missing. "Missing" is detected by
// checkMasterIPChange: the MasterOverlayIP annotation on the default
// namespace no longer matches the current master node IP.
//
// The rebuild runs in three steps, stopping at the first error:
//  1. UpdateDBForKubeNodes creates a logical switch for every node that
//     already has a host subnet annotation;
//  2. UpdateKubeNsObjects replays every namespace, with its pods, endpoints
//     and network policies, into the OVN controller oc;
//  3. UpdateMasterNodeIP records the current master node IP on the default
//     namespace, so a later check on the same master sees no IP change.
//
// When HA is disabled this is a no-op. No node IP lookup and no API server
// request is made, so failures in those lookups cannot abort a non-HA master.
func (cluster *OvnClusterController) RebuildOVNDatabase(
	masterNodeName string, oc *ovn.Controller) error {
	logrus.Debugf("Rebuild OVN database for cluster nodes")
	logrus.Debugf("cluster.OvnHA: %t", cluster.OvnHA)
	if !cluster.OvnHA {
		// The master-IP check and the rebuild only matter in HA mode.
		return nil
	}

	ipChange, err := cluster.checkMasterIPChange(masterNodeName)
	if err != nil {
		logrus.Errorf("Error when checking Master Node IP Change: %v", err)
		return err
	}
	if !ipChange {
		return nil
	}

	logrus.Debugf("HA is enabled and DB doesn't exist!")
	// Rebuild OVN DB for the k8s nodes.
	if err = cluster.UpdateDBForKubeNodes(masterNodeName); err != nil {
		return err
	}
	// Rebuild OVN DB for the k8s namespaces and all the resource objects
	// inside them, including pods, endpoints and network policies.
	if err = cluster.UpdateKubeNsObjects(oc); err != nil {
		return err
	}
	// Update the master node IP annotation on the default namespace.
	return cluster.UpdateMasterNodeIP(masterNodeName)
}
// UpdateDBForKubeNodes rebuilds the OVN database entries for k8s nodes. For
// every node that has an OvnHostSubnet annotation, it creates the node's
// logical switch (ls-add --may-exist) and sets two values on it:
//   - other-config:subnet, the annotated subnet;
//   - external-ids:gateway_ip, util.NextIP of the annotated address, with the
//     subnet's prefix length.
//
// Nodes without the annotation are skipped. The first parse or ovn-nbctl
// failure is returned. masterNodeName is currently unused.
func (cluster *OvnClusterController) UpdateDBForKubeNodes(
	masterNodeName string) error {
	nodes, err := cluster.Kube.GetNodes()
	if err != nil {
		logrus.Errorf("Failed to obtain k8s nodes: %v", err)
		return err
	}
	for i := range nodes.Items {
		node := &nodes.Items[i]
		subnet, annotated := node.Annotations[OvnHostSubnet]
		if !annotated {
			continue
		}

		// Create a logical switch for the node.
		logrus.Debugf("ovn_host_subnet: %s", subnet)
		subnetIP, subnetNet, parseErr := net.ParseCIDR(subnet)
		if parseErr != nil {
			return fmt.Errorf("Failed to parse subnet %v: %v", subnet,
				parseErr)
		}
		gatewayIP := util.NextIP(subnetIP)
		prefixLen, _ := subnetNet.Mask.Size()
		gatewayCIDR := fmt.Sprintf("%s/%d", gatewayIP.String(), prefixLen)

		stdout, stderr, nbErr := util.RunOVNNbctl("--may-exist",
			"ls-add", node.Name, "--", "set", "logical_switch",
			node.Name, fmt.Sprintf("other-config:subnet=%s", subnet),
			fmt.Sprintf("external-ids:gateway_ip=%s", gatewayCIDR))
		if nbErr != nil {
			logrus.Errorf("Failed to create logical switch for "+
				"node %s, stdout: %q, stderr: %q, error: %v",
				node.Name, stdout, stderr, nbErr)
			return nbErr
		}
	}
	return nil
}
// UpdateKubeNsObjects rebuilds the OVN database for k8s namespaces and the
// objects inside them. For each namespace it calls oc.AddNamespace, then
// feeds the namespace's pods, endpoints and network policies to the
// controller, in that order.
//
// Each object is passed as a pointer into the listed slice, not as the
// address of the range variable. Before Go 1.22 a range variable is a single
// variable reused across iterations. A pointer to it that the controller
// keeps (for example in a handler closure) would then end up pointing at the
// last element listed.
//
// It returns the first error from listing namespaces, pods, endpoints or
// policies, or from oc.AddEndpoints. Results of AddNamespace,
// AddLogicalPortWithIP and AddNetworkPolicy, if any, are not checked here.
func (cluster *OvnClusterController) UpdateKubeNsObjects(
	oc *ovn.Controller) error {
	namespaces, err := cluster.Kube.GetNamespaces()
	if err != nil {
		logrus.Errorf("Failed to get k8s namespaces: %v", err)
		return err
	}
	for i := range namespaces.Items {
		ns := &namespaces.Items[i]
		oc.AddNamespace(ns)

		pods, err := cluster.Kube.GetPods(ns.Name)
		if err != nil {
			logrus.Errorf("Failed to get k8s pods: %v", err)
			return err
		}
		for j := range pods.Items {
			oc.AddLogicalPortWithIP(&pods.Items[j])
		}

		endpoints, err := cluster.Kube.GetEndpoints(ns.Name)
		if err != nil {
			logrus.Errorf("Failed to get k8s endpoints: %v", err)
			return err
		}
		for j := range endpoints.Items {
			if epErr := oc.AddEndpoints(&endpoints.Items[j]); epErr != nil {
				logrus.Errorf("Error adding endpoints: %v", epErr)
				return epErr
			}
		}

		policies, err := cluster.Kube.GetNetworkPolicies(ns.Name)
		if err != nil {
			logrus.Errorf("Failed to get k8s network policies: %v", err)
			return err
		}
		for j := range policies.Items {
			oc.AddNetworkPolicy(&policies.Items[j])
		}
	}
	return nil
}
// UpdateMasterNodeIP writes the IP of masterNodeName, as resolved by
// netutils.GetNodeIP, into the MasterOverlayIP annotation of the default
// namespace. The annotation is written only when it is missing or holds a
// different value. Lookup and update failures are returned with context.
func (cluster *OvnClusterController) UpdateMasterNodeIP(
	masterNodeName string) error {
	currentIP, err := netutils.GetNodeIP(masterNodeName)
	if err != nil {
		return fmt.Errorf("Failed to obtain local IP from master node "+
			"%q: %v", masterNodeName, err)
	}
	ns, err := cluster.Kube.GetNamespace(DefaultNamespace)
	if err != nil {
		return fmt.Errorf("Failed to get default namespace: %v", err)
	}

	// Nothing to do when the annotation already records the current IP.
	if annotatedIP, present := ns.Annotations[MasterOverlayIP]; present && annotatedIP == currentIP {
		return nil
	}

	if setErr := cluster.Kube.SetAnnotationOnNamespace(ns, MasterOverlayIP,
		currentIP); setErr != nil {
		return fmt.Errorf("Failed to set %s=%s on namespace %s: %v",
			MasterOverlayIP, currentIP, ns.Name, setErr)
	}
	return nil
}
// checkMasterIPChange reports whether the MasterOverlayIP annotation on the
// default namespace differs from the current IP of masterNodeName, as
// resolved by netutils.GetNodeIP. A missing annotation reads as the empty
// string, so it counts as a change whenever the resolved IP is non-empty.
//
// A change is taken to mean that the OVN database was lost together with
// the previous master node and must be rebuilt on this one. Errors from the
// IP lookup or from fetching the default namespace are returned with
// changed == false.
func (cluster *OvnClusterController) checkMasterIPChange(
	masterNodeName string) (bool, error) {
	masterNodeIP, err := netutils.GetNodeIP(masterNodeName)
	if err != nil {
		return false, fmt.Errorf("Failed to obtain local IP from master "+
			"node %q: %v", masterNodeName, err)
	}
	defaultNs, err := cluster.Kube.GetNamespace(DefaultNamespace)
	if err != nil {
		return false, fmt.Errorf("Failed to get default namespace: %v", err)
	}

	masterIP := defaultNs.Annotations[MasterOverlayIP]
	logrus.Debugf("Master IP: %s, Annotated IP: %s", masterNodeIP, masterIP)
	changed := masterIP != masterNodeIP
	if changed {
		logrus.Debugf("Detected Master node IP is different than default " +
			"namespace annotated IP.")
	}
	return changed, nil
}
// StartClusterMaster starts master-side host subnet management:
//
//   - it collects the OvnHostSubnet annotations already present on nodes and
//     builds a subnet allocator over cluster.ClusterIPNet. The allocator hands
//     out subnets of cluster.HostSubnetLength and skips the ones in use;
//   - it allocates and annotates a subnet for every existing node that has no
//     annotation yet. Per-node failures are logged, not returned;
//   - it creates the central router and load balancers via SetupMaster;
//   - finally it installs the node watch (watchNodes). An added node gets a
//     subnet, which is later turned into a logical switch (by the node,
//     though the master could do it too). A deleted node has its subnet
//     released and util.RemoveNode called for it.
//
// TODO: Verify that the cluster was not already called with a different global subnet
// If true, then either quit or perform a complete reconfiguration of the cluster (recreate switches/routers with new subnet values)
func (cluster *OvnClusterController) StartClusterMaster(masterNodeName string) error {
	existingNodes, err := cluster.Kube.GetNodes()
	if err != nil {
		logrus.Errorf("Error in initializing/fetching subnets: %v", err)
		return err
	}

	// Subnets handed out earlier must not be given to anyone else.
	inUse := make([]string, 0, len(existingNodes.Items))
	for i := range existingNodes.Items {
		if hostsubnet, annotated := existingNodes.Items[i].Annotations[OvnHostSubnet]; annotated {
			inUse = append(inUse, hostsubnet)
		}
	}

	// The allocator carves subnets of length HostSubnetLength out of
	// ClusterIPNet, skipping everything listed in inUse.
	cluster.masterSubnetAllocator, err = netutils.NewSubnetAllocator(
		cluster.ClusterIPNet.String(), cluster.HostSubnetLength, inUse)
	if err != nil {
		return err
	}

	// Give every not-yet-annotated node a subnet.
	for i := range existingNodes.Items {
		node := &existingNodes.Items[i]
		if _, annotated := node.Annotations[OvnHostSubnet]; annotated {
			continue
		}
		if addErr := cluster.addNode(node); addErr != nil {
			logrus.Errorf("error creating subnet for node %s: %v", node.Name, addErr)
		}
	}

	if setupErr := cluster.SetupMaster(masterNodeName); setupErr != nil {
		logrus.Errorf("Failed to setup master (%v)", setupErr)
		return setupErr
	}

	return cluster.watchNodes()
}
// SetupMaster creates the central router and load-balancers for the network.
//
// After setupOVNMaster succeeds, it makes sure the following exist in the
// OVN northbound database. Each step uses --may-exist, or first looks for an
// existing object:
//   - a logical router named masterNodeName, tagged
//     external_ids:k8s-cluster-router=yes, shared by the whole cluster;
//   - one TCP and one UDP load balancer for east-west traffic, identified by
//     external_ids:k8s-cluster-lb-tcp=yes / k8s-cluster-lb-udp=yes;
//   - a logical switch "join", attached to the router through router port
//     rtoj-<masterNodeName> (100.64.1.1/24) and switch port
//     jtor-<masterNodeName>;
//   - an nb_global external-ids:gateway-lock entry for gateway-init to
//     coordinate on.
//
// The first failure is logged (for ovn-nbctl steps) and returned; later
// steps are not attempted.
func (cluster *OvnClusterController) SetupMaster(masterNodeName string) error {
	if err := setupOVNMaster(masterNodeName); err != nil {
		return err
	}

	// Single common distributed router for the cluster.
	stdout, stderr, err := util.RunOVNNbctl("--", "--may-exist", "lr-add", masterNodeName,
		"--", "set", "logical_router", masterNodeName, "external_ids:k8s-cluster-router=yes")
	if err != nil {
		logrus.Errorf("Failed to create a single common distributed router for the cluster, stdout: %q, stderr: %q, error: %v", stdout, stderr, err)
		return err
	}

	// ensureClusterLB creates the cluster-wide load balancer for proto
	// ("tcp" or "udp") unless one carrying the matching tag already exists.
	ensureClusterLB := func(proto string) error {
		tag := "external_ids:k8s-cluster-lb-" + proto + "=yes"
		uuid, errOut, runErr := util.RunOVNNbctl("--data=bare", "--no-heading", "--columns=_uuid", "find", "load_balancer", tag)
		if runErr != nil {
			logrus.Errorf("Failed to get %s load-balancer, stderr: %q, error: %v", proto, errOut, runErr)
			return runErr
		}
		if uuid != "" {
			return nil
		}
		out, errOut, runErr := util.RunOVNNbctl("--", "create", "load_balancer", tag, "protocol="+proto)
		if runErr != nil {
			logrus.Errorf("Failed to create %s load-balancer, stdout: %q, stderr: %q, error: %v", proto, out, errOut, runErr)
			return runErr
		}
		return nil
	}
	// East-west traffic: one load balancer handles TCP, another handles UDP.
	for _, proto := range []string{"tcp", "udp"} {
		if lbErr := ensureClusterLB(proto); lbErr != nil {
			return lbErr
		}
	}

	// The "join" switch connects gateway routers to the distributed router;
	// it uses addresses from 100.64.1.0/24.
	stdout, stderr, err = util.RunOVNNbctl("--may-exist", "ls-add", "join")
	if err != nil {
		logrus.Errorf("Failed to create logical switch called \"join\", stdout: %q, stderr: %q, error: %v", stdout, stderr, err)
		return err
	}

	routerPort := "rtoj-" + masterNodeName
	switchPort := "jtor-" + masterNodeName

	// Reuse the router port's MAC if the port already exists; otherwise
	// create the port with a freshly generated MAC.
	routerMac, stderr, err := util.RunOVNNbctl("--if-exist", "get", "logical_router_port", routerPort, "mac")
	if err != nil {
		logrus.Errorf("Failed to get logical router port rtoj-%v, stderr: %q, error: %v", masterNodeName, stderr, err)
		return err
	}
	if routerMac == "" {
		routerMac = util.GenerateMac()
		stdout, stderr, err = util.RunOVNNbctl("--", "--may-exist", "lrp-add", masterNodeName, routerPort, routerMac, "100.64.1.1/24",
			"--", "set", "logical_router_port", routerPort, "external_ids:connect_to_join=yes")
		if err != nil {
			logrus.Errorf("Failed to add logical router port rtoj-%v, stdout: %q, stderr: %q, error: %v", masterNodeName, stdout, stderr, err)
			return err
		}
	}

	// Attach "join" to the router through a router-type switch port.
	stdout, stderr, err = util.RunOVNNbctl("--", "--may-exist", "lsp-add", "join", switchPort,
		"--", "set", "logical_switch_port", switchPort, "type=router",
		"options:router-port="+routerPort, "addresses=\""+routerMac+"\"")
	if err != nil {
		logrus.Errorf("Failed to add logical switch port to logical router, stdout: %q, stderr: %q, error: %v", stdout, stderr, err)
		return err
	}

	// Lock that gateway-init uses to coordinate.
	stdout, stderr, err = util.RunOVNNbctl("--", "set", "nb_global", ".",
		"external-ids:gateway-lock=\"\"")
	if err != nil {
		logrus.Errorf("Failed to create lock for gateways, "+
			"stdout: %q, stderr: %q, error: %v", stdout, stderr, err)
		return err
	}
	return nil
}
// addNode makes sure node carries a valid OvnHostSubnet annotation.
//
// If the node already has an annotation that parses as a CIDR, nothing is
// done. Otherwise (no annotation, or an unparsable one), a fresh subnet is
// taken from masterSubnetAllocator and written to the node. If writing the
// annotation fails, the subnet is handed back to the allocator and an error
// is returned. The release error is intentionally discarded so that the
// annotation error is what gets reported.
func (cluster *OvnClusterController) addNode(node *kapi.Node) error {
	if existing, annotated := node.Annotations[OvnHostSubnet]; annotated {
		if _, _, parseErr := net.ParseCIDR(existing); parseErr == nil {
			return nil
		}
	}

	allocated, err := cluster.masterSubnetAllocator.GetNetwork()
	if err != nil {
		return fmt.Errorf("Error allocating network for node %s: %v", node.Name, err)
	}
	subnetStr := allocated.String()
	if err = cluster.Kube.SetAnnotationOnNode(node, OvnHostSubnet, subnetStr); err != nil {
		_ = cluster.masterSubnetAllocator.ReleaseNetwork(allocated)
		return fmt.Errorf("Error creating subnet %s for node %s: %v", subnetStr, node.Name, err)
	}
	logrus.Infof("Created HostSubnet %s", subnetStr)
	return nil
}
// deleteNode returns the subnet recorded in node's OvnHostSubnet annotation
// to masterSubnetAllocator. It fails if the annotation is missing, cannot be
// parsed as a CIDR, or the allocator rejects the release.
func (cluster *OvnClusterController) deleteNode(node *kapi.Node) error {
	annotated, found := node.Annotations[OvnHostSubnet]
	if !found {
		return fmt.Errorf("Error in obtaining host subnet for node %q for deletion", node.Name)
	}

	_, hostNet, parseErr := net.ParseCIDR(annotated)
	if parseErr != nil {
		return fmt.Errorf("Error in parsing hostsubnet - %v", parseErr)
	}

	if releaseErr := cluster.masterSubnetAllocator.ReleaseNetwork(hostNet); releaseErr != nil {
		return fmt.Errorf("Error deleting subnet %v for node %q: %v", annotated, node.Name, releaseErr)
	}
	logrus.Infof("Deleted HostSubnet %s for node %s", annotated, node.Name)
	return nil
}
// watchNodes registers a node event handler with the watch factory:
//   - on add, a host subnet is allocated for the node if needed (addNode);
//   - updates are ignored;
//   - on delete, the node's subnet is released (deleteNode) and
//     util.RemoveNode is called. Both failures are only logged, and
//     RemoveNode is attempted even if the release failed.
//
// A client-go informer may deliver a cache.DeletedFinalStateUnknown
// tombstone to a delete handler instead of the object itself, when the
// watch missed the final state. Such tombstones are unwrapped here rather
// than crashing on an unchecked type assertion. Objects of any other
// unexpected type are logged and dropped.
func (cluster *OvnClusterController) watchNodes() error {
	_, err := cluster.watchFactory.AddNodeHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			node, ok := obj.(*kapi.Node)
			if !ok {
				logrus.Errorf("Unexpected object type %T in node add event", obj)
				return
			}
			logrus.Debugf("Added event for Node %q", node.Name)
			if err := cluster.addNode(node); err != nil {
				logrus.Errorf("error creating subnet for node %s: %v", node.Name, err)
			}
		},
		UpdateFunc: func(old, new interface{}) {},
		DeleteFunc: func(obj interface{}) {
			node, ok := obj.(*kapi.Node)
			if !ok {
				tombstone, isTombstone := obj.(cache.DeletedFinalStateUnknown)
				if !isTombstone {
					logrus.Errorf("Unexpected object type %T in node delete event", obj)
					return
				}
				node, ok = tombstone.Obj.(*kapi.Node)
				if !ok {
					logrus.Errorf("Tombstone for key %q contained unexpected object type %T",
						tombstone.Key, tombstone.Obj)
					return
				}
			}
			logrus.Debugf("Delete event for Node %q", node.Name)
			if err := cluster.deleteNode(node); err != nil {
				logrus.Errorf("Error deleting node %s: %v", node.Name, err)
			}
			if err := util.RemoveNode(node.Name); err != nil {
				logrus.Errorf("Failed to remove node %s (%v)", node.Name, err)
			}
		},
	}, nil)
	return err
}