forked from projectcalico/felix
-
Notifications
You must be signed in to change notification settings - Fork 0
/
vxlan_resolver.go
566 lines (516 loc) · 19.6 KB
/
vxlan_resolver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
// Copyright (c) 2019 Tigera, Inc. All rights reserved.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package calc
import (
"crypto/sha1"
"fmt"
gonet "net"
"github.com/sirupsen/logrus"
"github.com/projectcalico/felix/dispatcher"
"github.com/projectcalico/felix/ip"
"github.com/projectcalico/felix/proto"
"github.com/projectcalico/libcalico-go/lib/backend/api"
"github.com/projectcalico/libcalico-go/lib/backend/encap"
"github.com/projectcalico/libcalico-go/lib/backend/model"
"github.com/projectcalico/libcalico-go/lib/net"
"github.com/projectcalico/libcalico-go/lib/set"
)
// VXLANResolver is responsible for resolving node IPs, node config, IPAM blocks,
// and IP pools to determine the correct set of programming to send to the dataplane.
// Specifically, it registers to get updates on the following:
//
// - model.HostIPKey
// - model.HostConfigKey
// - model.BlockKey
// - model.IPPoolKey
//
// VXLAN routes are contributed by blocks within IP pools that have VXLAN enabled, and
// must target a VXLAN tunnel endpoint (VTEP) which comprises a node IP address, a VXLAN
// tunnel address, and a MAC address. The MAC address is taken from the node's
// "VXLANTunnelMACAddr" host config when one is present and is otherwise calculated
// deterministically from the node name. The VXLAN resolver ensures that routes are only
// sent to the data plane when a fully specified VTEP exists.
//
// For each VTEP, this component will send a *proto.VXLANTunnelEndpointUpdate followed by
// a *proto.RouteUpdate for each route which targets that VTEP. As routes are added and
// removed, subsequent *proto.RouteUpdate and route-remove messages will be sent.
//
// If a VTEP is no longer fully specified (e.g., due to a VXLAN tunnel address removal),
// a route remove will be sent for each route targeting that VTEP, followed
// by a VTEP remove for the VTEP itself.
//
// If a VTEP's node IP or tunnel address changes, this component treats it as a delete
// followed by an add. A change of only the MAC address re-sends the VTEP update without
// withdrawing routes.
type VXLANResolver struct {
// hostname is the name of the local node. Block-level routes whose block belongs to
// this node are not generated.
hostname string
// callbacks receives the VTEP and route updates/removes produced by the resolver.
callbacks PipelineCallbacks
// Store node metadata indexed by node name, and routes by the
// block that contributed them. The following comprises the full internal data model.
//
// nodeNameToVXLANTunnelAddr holds each node's "IPv4VXLANTunnelAddr" host config value.
nodeNameToVXLANTunnelAddr map[string]string
// nodeNameToIPAddr holds each node's IP address as received via model.HostIPKey updates.
nodeNameToIPAddr map[string]string
// nodeNameToVXLANMac holds each node's "VXLANTunnelMACAddr" host config value, if any.
nodeNameToVXLANMac map[string]string
// blockToRoutes maps a block key (update.Key.String()) to the set of vxlanRoute values
// contributed by that block.
blockToRoutes map[string]set.Set
// vxlanPools holds the known IP pools whose VXLANMode is not encap.Undefined, keyed by
// the pool key's String() form.
vxlanPools map[string]model.IPPool
}
// NewVXLANResolver returns a VXLANResolver for the node named hostname that reports
// its output through callbacks. Every internal index starts out empty but non-nil.
func NewVXLANResolver(hostname string, callbacks PipelineCallbacks) *VXLANResolver {
	resolver := new(VXLANResolver)
	resolver.hostname = hostname
	resolver.callbacks = callbacks

	// Per-node metadata.
	resolver.nodeNameToVXLANTunnelAddr = make(map[string]string)
	resolver.nodeNameToIPAddr = make(map[string]string)
	resolver.nodeNameToVXLANMac = make(map[string]string)

	// Routes per block, and the VXLAN-enabled pools.
	resolver.blockToRoutes = make(map[string]set.Set)
	resolver.vxlanPools = make(map[string]model.IPPool)
	return resolver
}
// RegisterWith hooks the resolver's handlers up to the given dispatcher, one handler
// per key type: host IPs, host config, IPAM blocks and IP pools.
func (c *VXLANResolver) RegisterWith(allUpdDispatcher *dispatcher.Dispatcher) {
	register := allUpdDispatcher.Register

	register(model.HostIPKey{}, c.OnHostIPUpdate)
	register(model.HostConfigKey{}, c.OnHostConfigUpdate)
	register(model.BlockKey{}, c.OnBlockUpdate)
	register(model.IPPoolKey{}, c.OnPoolUpdate)
}
// OnBlockUpdate handles creation, modification and deletion of an IPAM block.
//
// For a create/update it diffs the routes the block now contributes against the
// routes cached for that block, withdraws the ones that disappeared or changed node,
// and then offers the new ones to kickPendingRoutes. For a delete it withdraws every
// cached route of the block and forgets the block. The result is always false.
func (c *VXLANResolver) OnBlockUpdate(update api.Update) (_ bool) {
	blockKey := update.Key.String()

	if update.Value == nil {
		// The block is gone: withdraw everything it contributed and drop its entry.
		if known := c.blockToRoutes[blockKey]; known != nil {
			known.Iter(func(item interface{}) error {
				c.withdrawRoute(item.(vxlanRoute))
				return nil
			})
		}
		delete(c.blockToRoutes, blockKey)
		return
	}

	// The block was created or changed. Multiple blocks with the same CIDR are not
	// allowed, so there is no need to look for duplicates across blocks.
	desired := c.routesFromBlock(blockKey, update.Value.(*model.AllocationBlock))
	cached, found := c.blockToRoutes[blockKey]
	if !found {
		cached = set.New()
		c.blockToRoutes[blockKey] = cached
	}

	// Pass 1: drop cached routes that are no longer wanted. The map key only carries
	// the destination, so a full equality check is needed to catch a route whose
	// node changed.
	stale := set.New()
	cached.Iter(func(item interface{}) error {
		old := item.(vxlanRoute)
		if wanted, present := desired[old.Key()]; !present || wanted != old {
			stale.Add(old)
			return set.RemoveItem
		}
		return nil
	})

	// Pass 2: cache the routes we did not have before and remember them as additions.
	fresh := set.New()
	for _, route := range desired {
		if !cached.Contains(route) {
			cached.Add(route)
			fresh.Add(route)
			continue
		}
		logrus.WithField("newRoute", route).Debug("Desired VXLAN route already exists, skip")
	}

	// Apply the diff: removals go out first so that a route which merely changed node
	// is withdrawn before its replacement is sent.
	stale.Iter(func(item interface{}) error {
		c.withdrawRoute(item.(vxlanRoute))
		return nil
	})
	c.kickPendingRoutes(fresh)
	return
}
// OnHostIPUpdate is the handler for model.HostIPKey updates.
//
// When a node's IP is added or changed, the new value is recorded and a VTEP update
// is attempted; if that succeeds, pending routes are re-evaluated. If a VTEP had
// already been sent and the IP differs, the node's sent routes and the old VTEP are
// withdrawn first, and those routes are re-queued. When the IP is deleted, the node's
// sent routes are withdrawn and a VTEP remove is issued. The result is always false.
func (c *VXLANResolver) OnHostIPUpdate(update api.Update) (_ bool) {
	node := update.Key.(model.HostIPKey).Hostname
	log := logrus.WithField("node", node)

	// Snapshot the route state and VTEP state as they were before this update.
	pending, sent := c.routeSets()
	hadVTEP := c.vtepSent(node)

	if update.Value == nil {
		// Deletion: routes targeting the VTEP go first, then the VTEP itself.
		log.Info("Withdrawing routes and VTEP, node IP address deleted")
		delete(c.nodeNameToIPAddr, node)
		sent.Iter(func(item interface{}) error {
			if route := item.(vxlanRoute); route.node == node {
				c.withdrawRoute(route)
			}
			return nil
		})
		c.sendVTEPRemove(node)
		return
	}

	newIP := update.Value.(*net.IP).String()
	currIP := c.nodeNameToIPAddr[node]
	log = log.WithFields(logrus.Fields{"newIP": newIP, "currIP": currIP})

	if hadVTEP {
		if newIP == currIP {
			// Nothing changed for an already-programmed node.
			log.Debug("Skipping duplicate node IP update")
			return
		}

		// The programmed VTEP is now out of date: tear down its routes and the VTEP,
		// and queue the routes so they can be re-sent against the new VTEP.
		log.Info("Withdrawing routes and VTEP, node changed IP address")
		sent.Iter(func(item interface{}) error {
			if route := item.(vxlanRoute); route.node == node {
				c.withdrawRoute(route)
				pending.Add(route)
			}
			return nil
		})
		c.sendVTEPRemove(node)
	}

	// Record the new IP and see whether the VTEP is now fully specified.
	c.nodeNameToIPAddr[node] = newIP
	if !c.sendVTEPUpdate(node) {
		return
	}

	// A VTEP went out, so routes that were waiting on it may be programmable now.
	log.Info("Sent VTEP to dataplane, check pending routes")
	c.kickPendingRoutes(pending)
	return
}
// OnHostConfigUpdate is the handler for model.HostConfigKey updates. Only two config
// names are acted upon; all others are ignored.
//
// "IPv4VXLANTunnelAddr": on add/change the address is recorded and a VTEP update is
// attempted, re-evaluating pending routes on success; if a VTEP had already been sent
// with a different address, the node's routes and VTEP are withdrawn first. On delete
// the node's sent routes and its VTEP are withdrawn.
//
// "VXLANTunnelMACAddr": on add/change the MAC is recorded and, if a VTEP had already
// been sent with a different MAC, the VTEP is re-sent. On delete the stored MAC is
// dropped and the VTEP is re-sent, which makes it carry the generated MAC.
//
// The result is always false.
func (c *VXLANResolver) OnHostConfigUpdate(update api.Update) (_ bool) {
	// Snapshot of route state before this update is applied.
	pending, sent := c.routeSets()

	cfgKey := update.Key.(model.HostConfigKey)
	node := cfgKey.Hostname

	switch cfgKey.Name {
	case "IPv4VXLANTunnelAddr":
		hadVTEP := c.vtepSent(node)
		log := logrus.WithField("node", node).WithField("value", update.Value)
		log.Debug("IPv4VXLANTunnelAddr update")

		if update.Value == nil {
			// Tunnel address removed: routes targeting the VTEP go first, then the VTEP.
			log.Info("Withdrawing routes and VTEP, node tunnel address deleted")
			delete(c.nodeNameToVXLANTunnelAddr, node)
			sent.Iter(func(item interface{}) error {
				if route := item.(vxlanRoute); route.node == node {
					c.withdrawRoute(route)
				}
				return nil
			})
			c.sendVTEPRemove(node)
			return
		}

		newAddr := update.Value.(string)
		currAddr := c.nodeNameToVXLANTunnelAddr[node]
		log = log.WithFields(logrus.Fields{"newIP": newAddr, "currIP": currAddr})

		if hadVTEP {
			if newAddr == currAddr {
				// Already programmed with exactly this address.
				log.Debug("Skipping duplicate tunnel addr update")
				return
			}

			// The programmed VTEP is stale: revoke its routes and the VTEP, and
			// queue the routes for re-sending.
			log.Info("Withdrawing routes and VTEP, node changed tunnel address")
			sent.Iter(func(item interface{}) error {
				if route := item.(vxlanRoute); route.node == node {
					c.withdrawRoute(route)
					pending.Add(route)
				}
				return nil
			})
			c.sendVTEPRemove(node)
		}

		// Record the address and see whether the VTEP can be (re)sent.
		c.nodeNameToVXLANTunnelAddr[node] = newAddr
		if c.sendVTEPUpdate(node) {
			log.Info("Sent VTEP to dataplane, check pending routes")
			c.kickPendingRoutes(pending)
		}

	case "VXLANTunnelMACAddr":
		hadVTEP := c.vtepSent(node)
		log := logrus.WithField("node", node).WithField("value", update.Value)
		log.Debug("VXLANTunnelMACAddr update")

		if update.Value == nil {
			// Without a configured MAC, vtepMACForHost falls back to the generated one.
			log.Info("Update the VTEP with the system generated MAC address and send it to dataplane")
			delete(c.nodeNameToVXLANMac, node)
			if c.sendVTEPUpdate(node) {
				log.Info("Sent VTEP to dataplane")
			}
			return
		}

		newMAC := update.Value.(string)
		// Read the effective MAC before overwriting the stored one.
		currMAC := c.vtepMACForHost(node)
		log = log.WithFields(logrus.Fields{"newMAC": newMAC, "currMAC": currMAC})
		c.nodeNameToVXLANMac[node] = newMAC

		if !hadVTEP {
			// No VTEP programmed yet; the MAC will be used once one is sent.
			return
		}
		if newMAC == currMAC {
			log.Debug("Skipping duplicate tunnel MAC addr update")
			return
		}
		if c.sendVTEPUpdate(node) {
			log.Info("Sent VTEP to dataplane")
		}
	}
	return
}
// OnPoolUpdate gets called whenever an IP pool changes.
//
//   - Add/update of a pool with VXLAN enabled (VXLANMode != encap.Undefined): the pool is
//     cached and pending routes are kicked to see if any should now be programmed. If
//     the pool was already known and its CIDR is unchanged, nothing is done. If its CIDR
//     changed, the update is handled as a delete followed by a re-create: sent routes
//     within the old CIDR are withdrawn and re-queued before the new pool is cached.
//   - Delete of a known VXLAN pool, or an update that disables VXLAN on it: sent routes
//     within the pool are withdrawn and the pool is forgotten.
//   - Anything else is a pool that never had VXLAN enabled and is ignored.
//
// The result is always false.
func (c *VXLANResolver) OnPoolUpdate(update api.Update) (_ bool) {
	k := update.Key.(model.IPPoolKey)
	poolKey := k.String()

	// Snapshot of which routes are currently pending and which have been sent,
	// taken before any pool state is modified.
	pendingSet, sentSet := c.routeSets()

	if update.Value != nil && update.Value.(*model.IPPool).VXLANMode != encap.Undefined {
		// This is an add/update of a pool with VXLAN enabled.
		newPool := update.Value.(*model.IPPool)
		logrus.WithField("pool", k.CIDR).Info("Update of VXLAN-enabled IP pool.")
		if curr, ok := c.vxlanPools[poolKey]; ok {
			// We already know about this IP pool. Check to see if the CIDR has changed.
			// While this isn't possible directly in the user-facing API, it's possible that
			// we see a delete/recreate as an update over the Syncer in rare cases.
			if curr.CIDR.String() == newPool.CIDR.String() {
				return
			}

			// If the CIDR has changed, treat this as a delete followed by a re-create
			// with the new CIDR. Withdraw every sent route that falls within the OLD
			// CIDR (the same test the deletion branch below uses), and re-queue it so
			// that the kick below re-sends it if it is still covered by a VXLAN pool.
			delete(c.vxlanPools, poolKey)
			sentSet.Iter(func(item interface{}) error {
				r := item.(vxlanRoute)
				if c.containsRoute(curr, r) {
					c.withdrawRoute(r)
					pendingSet.Add(r)
				}
				return nil
			})
		}

		// This is a new VXLAN pool - update the cache and trigger a kick of pending routes.
		logrus.WithField("pool", k.CIDR).Info("New/Updated VXLAN IP pool")
		c.vxlanPools[poolKey] = *newPool
		c.kickPendingRoutes(pendingSet)
	} else if curr, ok := c.vxlanPools[poolKey]; ok {
		// A VXLAN pool has either been deleted, or no longer has VXLAN enabled.
		// Withdraw any routes within the IP pool and remove internal state.
		logrus.WithField("pool", k.CIDR).Info("Removed VXLAN IP pool")
		sentSet.Iter(func(item interface{}) error {
			r := item.(vxlanRoute)
			if c.containsRoute(curr, r) {
				c.withdrawRoute(r)
			}
			return nil
		})
		delete(c.vxlanPools, poolKey)
	} else {
		logrus.WithField("pool", k.CIDR).Debug("Ignoring non-VXLAN IP pool")
	}
	return
}
// containsRoute reports whether the IP of the route's destination network
// (r.dst.ToIPNet().IP) lies inside the given pool's CIDR.
func (c *VXLANResolver) containsRoute(pool model.IPPool, r vxlanRoute) bool {
	dstIP := r.dst.ToIPNet().IP
	return pool.CIDR.Contains(dstIP)
}
// routeSets splits every route cached in blockToRoutes into two fresh sets: pending
// holds the routes for which routeReady is currently false, sent holds those for which
// it is true. Nothing records what was actually sent; membership is derived from the
// current state, i.e. whether each route should have been sent by now.
func (c *VXLANResolver) routeSets() (pending, sent set.Set) {
	pending, sent = set.New(), set.New()

	classify := func(item interface{}) error {
		route := item.(vxlanRoute)
		bucket := pending
		if c.routeReady(route) {
			bucket = sent
		}
		bucket.Add(route)
		return nil
	}

	for _, blockRoutes := range c.blockToRoutes {
		blockRoutes.Iter(classify)
	}
	return pending, sent
}
// vtepSent reports whether the VTEP for the given node should already have been sent,
// judged purely from internal state: both a VXLAN tunnel address and a node IP must be
// recorded for the node.
func (c *VXLANResolver) vtepSent(node string) bool {
	_, haveTunnelAddr := c.nodeNameToVXLANTunnelAddr[node]
	_, haveNodeIP := c.nodeNameToIPAddr[node]
	return haveTunnelAddr && haveNodeIP
}
// routeReady reports whether the route can be sent to the data plane right now. Three
// conditions are checked in order, and the first one that fails is logged at debug
// level: a gateway is known for the route, the route lies within a VXLAN IP pool, and
// the VTEP of the route's node counts as sent.
func (c *VXLANResolver) routeReady(r vxlanRoute) bool {
	logCxt := logrus.WithField("route", r)

	switch {
	case c.determineGatewayForRoute(r) == "":
		logCxt.Debug("No gateway yet for VXLAN route, skip")
	case !c.routeWithinVXLANPool(r):
		logCxt.Debug("Route not within VXLAN IP pool")
	case !c.vtepSent(r.node):
		logCxt.Debug("Don't yet know the VTEP for this route")
	default:
		return true
	}
	return false
}
// kickPendingRoutes walks the given set of routes and sends a VXLAN *proto.RouteUpdate
// through the callbacks for each one that routeReady accepts. Routes that are not ready
// are silently left alone; the set itself is not modified.
func (c *VXLANResolver) kickPendingRoutes(pendingRouteUpdates set.Set) {
	pendingRouteUpdates.Iter(func(item interface{}) error {
		route := item.(vxlanRoute)
		if c.routeReady(route) {
			logrus.WithField("route", route).Info("Sending VXLAN route update")
			msg := &proto.RouteUpdate{
				Type: proto.RouteType_VXLAN,
				Node: route.node,
				Dst:  route.dst.String(),
				Gw:   c.determineGatewayForRoute(route),
			}
			c.callbacks.OnRouteUpdate(msg)
		}
		return nil
	})
}
// withdrawRoute logs the removal and tells the callbacks to remove the route,
// identifying it by the string form of its destination.
func (c *VXLANResolver) withdrawRoute(r vxlanRoute) {
	logCxt := logrus.WithField("route", r)
	logCxt.Info("Sending VXLAN route remove")

	dst := r.dst.String()
	c.callbacks.OnRouteRemove(dst)
}
// sendVTEPUpdate sends a *proto.VXLANTunnelEndpointUpdate for the node if both its
// VXLAN tunnel address and its node IP are known. It returns true when the update was
// handed to the callbacks and false (after logging what is missing) otherwise. The MAC
// in the update comes from vtepMACForHost.
func (c *VXLANResolver) sendVTEPUpdate(node string) bool {
	logCxt := logrus.WithField("node", node)

	tunnelAddr, haveTunnelAddr := c.nodeNameToVXLANTunnelAddr[node]
	if !haveTunnelAddr {
		logCxt.Info("Missing vxlan tunnel address for node, cannot send VTEP yet")
		return false
	}

	nodeIP, haveNodeIP := c.nodeNameToIPAddr[node]
	if !haveNodeIP {
		logCxt.Info("Missing IP for node, cannot send VTEP yet")
		return false
	}

	logCxt.Debug("Sending VTEP to dataplane")
	c.callbacks.OnVTEPUpdate(&proto.VXLANTunnelEndpointUpdate{
		Node:           node,
		Mac:            c.vtepMACForHost(node),
		Ipv4Addr:       tunnelAddr,
		ParentDeviceIp: nodeIP,
	})
	return true
}
// sendVTEPRemove logs the withdrawal and tells the callbacks to remove the VTEP of the
// given node. It does not check whether a VTEP was ever sent for that node.
func (c *VXLANResolver) sendVTEPRemove(node string) {
	logCxt := logrus.WithField("node", node)
	logCxt.Debug("Withdrawing VTEP from dataplane")

	c.callbacks.OnVTEPRemove(node)
}
// routeWithinVXLANPool reports whether at least one of the cached VXLAN-enabled IP
// pools contains the given route (as judged by containsRoute).
func (c *VXLANResolver) routeWithinVXLANPool(r vxlanRoute) bool {
	covered := false
	for poolKey := range c.vxlanPools {
		if c.containsRoute(c.vxlanPools[poolKey], r) {
			covered = true
			break
		}
	}
	return covered
}
// routesFromBlock computes the routes that the given allocation block contributes,
// keyed by vxlanRoute.Key().
//
// Each entry of b.NonAffineAllocations() yields a route to the allocation's address
// targeting the allocation's host; entries without a host are skipped with a warning.
// In addition, if b.Host() is non-empty and is not the local node, a route for the
// block's CIDR targeting that host is included. blockKey is currently not used.
func (c *VXLANResolver) routesFromBlock(blockKey string, b *model.AllocationBlock) map[string]vxlanRoute {
	routes := map[string]vxlanRoute{}
	record := func(r vxlanRoute) { routes[r.Key()] = r }

	// Routes for individual addresses reported by NonAffineAllocations.
	for _, alloc := range b.NonAffineAllocations() {
		if alloc.Host != "" {
			record(vxlanRoute{
				node: alloc.Host,
				dst:  ip.CIDRFromNetIP(alloc.Addr.IP),
			})
			continue
		}
		logrus.WithField("IP", alloc.Addr).Warn(
			"Unable to create VXLAN route for IP; the node it belongs to was not recorded")
	}

	// Route for the block as a whole, unless the block is ours or has no host.
	switch host := b.Host(); host {
	case c.hostname:
		logrus.Debug("Skipping VXLAN routes for local node")
	case "":
		// No host on the block, so there is no block-level route to add.
	default:
		logrus.WithField("host", host).Debug("Block has a host, including host route")
		record(vxlanRoute{
			node: host,
			dst:  ip.CIDRFromCalicoNet(b.CIDR),
		})
	}
	return routes
}
// determineGatewayForRoute returns the gateway to use for the route, which for VXLAN
// routes is the IPv4VXLANTunnelAddr recorded for the route's node. An empty string is
// returned when no tunnel address is recorded for that node.
func (c *VXLANResolver) determineGatewayForRoute(r vxlanRoute) string {
	if tunnelAddr, known := c.nodeNameToVXLANTunnelAddr[r.node]; known {
		return tunnelAddr
	}
	return ""
}
// vtepMACForHost returns the MAC address to advertise for the given node's VTEP.
// A non-empty MAC recorded from the node's host config takes precedence. Otherwise a
// deterministic address is derived from the node name: the byte 'f' followed by the
// first five bytes of the SHA-1 digest of the name, formatted as a hardware address.
// The derived address is expected to match the one assigned to the VXLAN device on
// that node.
func (c *VXLANResolver) vtepMACForHost(nodename string) string {
	if configured := c.nodeNameToVXLANMac[nodename]; configured != "" {
		return configured
	}

	digest := sha1.Sum([]byte(nodename))
	hw := make(gonet.HardwareAddr, 0, 6)
	hw = append(hw, 'f')
	hw = append(hw, digest[:5]...)
	return hw.String()
}
// vxlanRoute is the VXLANResolver's internal representation of a route. Values are
// stored in sets and compared with ==, so both fields take part in equality; Key(),
// by contrast, only reflects the destination.
type vxlanRoute struct {
// node is the name of the node the route targets; the route's gateway is looked up
// from that node's VXLAN tunnel address.
node string
// dst is the destination CIDR of the route.
dst ip.CIDR
}
// Key returns the map key for the route, which is the string form of its destination.
// The target node is not part of the key.
func (r vxlanRoute) Key() string {
	dst := r.dst
	return dst.String()
}
// String renders the route with its destination and target node, for use in logs.
func (r vxlanRoute) String() string {
	dst := r.dst.String()
	return fmt.Sprintf("vxlanRoute(dst: %s, node: %s)", dst, r.node)
}