Skip to content

Commit

Permalink
migrating route manager to the new overlay network and direct routing
Browse files Browse the repository at this point in the history
  • Loading branch information
alacuku committed Jun 19, 2021
1 parent 3c09091 commit 485857b
Show file tree
Hide file tree
Showing 13 changed files with 251 additions and 79 deletions.
56 changes: 49 additions & 7 deletions cmd/liqonet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"sync"
"time"

"github.com/liqotech/liqo/pkg/liqonet/utils"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
Expand All @@ -36,13 +38,21 @@ import (
"github.com/liqotech/liqo/internal/liqonet/tunnelEndpointCreator"
liqoconst "github.com/liqotech/liqo/pkg/consts"
liqonetOperator "github.com/liqotech/liqo/pkg/liqonet"
"github.com/liqotech/liqo/pkg/liqonet/overlay"
"github.com/liqotech/liqo/pkg/liqonet/wireguard"
"github.com/liqotech/liqo/pkg/mapperUtils"
// +kubebuilder:scaffold:imports
)

var (
scheme = runtime.NewScheme()
scheme = runtime.NewScheme()
vxlanConfig = &overlay.VxlanDeviceAttrs{
Vni: 18952,
Name: "liqo.vxlan",
VtepPort: 4789,
VtepAddr: nil,
Mtu: 1420,
}
)

func init() {
Expand Down Expand Up @@ -76,22 +86,54 @@ func main() {
clientset := kubernetes.NewForConfigOrDie(mgr.GetConfig())
switch runAs {
case route_operator.OperatorName:
wgc, err := wireguard.NewWgClient()
// Get the pod ip and parse to net.IP
podIP, err := utils.GetPodIP()
if err != nil {
klog.Errorf("an error occurred while creating wireguard client: %v", err)
klog.Errorf("unable to get podIP: %v", err)
os.Exit(1)
}
r, err := route_operator.NewRouteController(mgr, wgc, wireguard.NewNetLinker())

nodeName, err := utils.GetNodeName()
if err != nil {
klog.Errorf("an error occurred while creating the route operator -> %v", err)
klog.Errorf("unable to get node name: %v", err)
os.Exit(1)
}
r.StartPodWatcher()
r.StartServiceWatcher()

vxlanConfig.VtepAddr = podIP
vxlanDevice, err := overlay.NewVxlanDevice(vxlanConfig)
if err != nil {
klog.Errorf("an error occurred while creating vxlan device : %v", err)
os.Exit(2)
}
r, err := route_operator.NewRouteController(mgr, *vxlanDevice)
if err != nil {
klog.Errorf("an error occurred while creating the route operator -> %v", err)
}
if err = r.SetupWithManager(mgr); err != nil {
klog.Errorf("unable to setup controller: %s", err)
os.Exit(1)
}
mutex := &sync.RWMutex{}
nodeMap := map[string]string{}
ovc, err := route_operator.NewOverlayController("", podIP.String(), route_operator.PodLabelSelector, *vxlanDevice, mutex, nodeMap, mgr.GetClient())
if err != nil {
klog.Errorf("an error occurred while creating overlay controller: %v", err)
os.Exit(3)
}
if err = ovc.SetupWithManager(mgr); err != nil {
klog.Errorf("unable to setup overlay controller: %s", err)
os.Exit(1)
}

smc, err := route_operator.NewSymmetricRoutingOperator(nodeName, route_operator.RoutingTableID, *vxlanDevice, mutex, nodeMap, mgr.GetClient())
if err != nil {
klog.Errorf("an error occurred while creting symmetric routing controller: %v", err)
os.Exit(4)
}
if err = smc.SetupWithManager(mgr); err != nil {
klog.Errorf("unable to setup overlay controller: %s", err)
os.Exit(1)
}
if err := mgr.Start(r.SetupSignalHandlerForRouteOperator()); err != nil {
klog.Errorf("unable to start controller: %s", err)
os.Exit(1)
Expand Down
10 changes: 10 additions & 0 deletions deployments/liqo/files/liqo-route-ClusterRole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,16 @@ rules:
- nodes
verbs:
- get
- apiGroups:
- ""
resources:
- pods
verbs:
- get
- list
- patch
- update
- watch
- apiGroups:
- net.liqo.io
resources:
Expand Down
10 changes: 0 additions & 10 deletions deployments/liqo/files/liqo-route-Role.yaml
Original file line number Diff line number Diff line change
@@ -1,14 +1,4 @@
rules:
- apiGroups:
- ""
resources:
- pods
verbs:
- get
- list
- patch
- update
- watch
- apiGroups:
- ""
resources:
Expand Down
49 changes: 44 additions & 5 deletions internal/liqonet/route-operator/overlayOperator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import (
"context"
"net"
"sync"
"syscall"

"github.com/vishvananda/netlink"

corev1 "k8s.io/api/core/v1"
k8sApiErrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -160,25 +163,58 @@ func (ovc *OverlayController) addPeer(req ctrl.Request, pod *corev1.Pod) (bool,
if err != nil {
return added, err
}
// This entry is needed for broadcast and multicast
// traffic (e.g. ARP and IPv6 neighbor discovery).
macZeros, err := net.ParseMAC("00:00:00:00:00:00")
if err != nil {
return false, err
}
peerZero := overlay.Neighbor{
MAC: macZeros,
IP: ip,
}
addedZeros, err := ovc.vxlanDev.AddFDB(peerZero)
if err != nil {
return false, err
}
ovc.vxlanPeers[req.String()] = &peer
ovc.vxlanNodes[pod.Spec.NodeName] = peerIP
ovc.podToNode[req.String()] = pod.Spec.NodeName
return added, nil
if added || addedZeros {
return true, nil
}
return false, nil
}

// delPeer for a given pod it removes the fdb entry for the current vxlan device.
// It return true when the entry exists and is removed, false if the entry does not exist,
// delPeer for a given pod it removes all the fdb entries for the current peer on the vxlan device.
// It return true when entries exist and are removed, false if entries do not exist,
// and error if something goes wrong.
func (ovc *OverlayController) delPeer(req ctrl.Request) (bool, error) {
var deleted bool
ovc.nodesLock.Lock()
defer ovc.nodesLock.Unlock()
peer, ok := ovc.vxlanPeers[req.String()]
if !ok {
return false, nil
}
deleted, err := ovc.vxlanDev.DelFDB(*peer)
// First we list all the fdbs
fdbs, err := netlink.NeighList(ovc.vxlanDev.Link.Index, syscall.AF_BRIDGE)
if err != nil {
return deleted, err
return false, err
}
// Check if the entry exists and remove them.
for i := range fdbs {
if fdbs[i].IP.Equal(peer.IP) {
deleted, err = ovc.vxlanDev.DelFDB(overlay.Neighbor{
MAC: fdbs[i].HardwareAddr,
IP: fdbs[i].IP,
})
if err != nil {
return deleted, err
}
klog.V(4).Infof("fdb entry with mac {%s} and dst {%s} on device {%s} has been removed",
fdbs[i].HardwareAddr.String(), fdbs[i].IP.String(), ovc.vxlanDev.Link.Name)
}
}
delete(ovc.vxlanPeers, req.String())
delete(ovc.vxlanNodes, ovc.podToNode[req.String()])
Expand All @@ -191,6 +227,9 @@ func (ovc *OverlayController) delPeer(req ctrl.Request) (bool, error) {
// annotation is already present.
func (ovc *OverlayController) addAnnotation(obj client.Object, aKey, aValue string) bool {
annotations := obj.GetAnnotations()
if annotations == nil {
annotations = make(map[string]string, 1)
}
oldAnnValue, ok := annotations[aKey]
// If the annotations does not exist or is outdated then set it.
if !ok || oldAnnValue != aValue {
Expand Down
54 changes: 47 additions & 7 deletions internal/liqonet/route-operator/overlayOperator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,12 @@ var (
overlayNamespace = "overlay-namespace"
overlayPodName = "overlay-test-pod"

overlayTestPod *corev1.Pod
overlayReq ctrl.Request
ovc *OverlayController
overlayNeigh overlay.Neighbor
overlayExistingNeigh overlay.Neighbor
overlayTestPod *corev1.Pod
overlayReq ctrl.Request
ovc *OverlayController
overlayNeigh overlay.Neighbor
overlayExistingNeigh overlay.Neighbor
overlayExistingNeighDef overlay.Neighbor
/*** EnvTest Section ***/
overlayScheme = runtime.NewScheme()
overlayEnvTest *envtest.Environment
Expand Down Expand Up @@ -92,7 +93,9 @@ var _ = Describe("OverlayOperator", func() {
vxlanNodes: map[string]string{},
podToNode: map[string]string{},
}
// Add fdb entries for existing peer.
Expect(addFdb(overlayExistingNeigh, vxlanDevice.Link.Attrs().Index))
Expect(addFdb(overlayExistingNeighDef, vxlanDevice.Link.Attrs().Index)).Should(BeNil())
})

JustAfterEach(func() {
Expand Down Expand Up @@ -165,7 +168,16 @@ var _ = Describe("OverlayOperator", func() {
Eventually(func() error { return k8sClient.Get(context.TODO(), overlayReq.NamespacedName, newPod) }).Should(BeNil())
newPod.Status.PodIP = "10.1.11.1"
Eventually(func() error { return k8sClient.Status().Update(context.TODO(), newPod) }).Should(BeNil())
Eventually(func() error { return k8sClient.Get(context.TODO(), overlayReq.NamespacedName, newPod) }).Should(BeNil())
Eventually(func() error {
err := k8sClient.Get(context.TODO(), overlayReq.NamespacedName, newPod)
if err != nil {
return err
}
if newPod.Status.PodIP != "10.1.11.1" {
return fmt.Errorf("pod ip has not been set yet")
}
return nil
}).Should(BeNil())
Eventually(func() error { _, err := ovc.Reconcile(context.TODO(), overlayReq); return err }).Should(BeNil())
_, ok := ovc.vxlanPeers[overlayReq.String()]
Expect(ok).Should(BeTrue())
Expand Down Expand Up @@ -244,6 +256,27 @@ var _ = Describe("OverlayOperator", func() {
nodeName, ok := ovc.podToNode[overlayReq.String()]
Expect(ok).Should(BeTrue())
Expect(nodeName).Should(Equal(overlayTestPod.Spec.NodeName))
// Check that the fdbs entries have been inserted.
fdbs, err := netlink.NeighList(ovc.vxlanDev.Link.Index, syscall.AF_BRIDGE)
Expect(err).To(BeNil())
var checkEntries = func() bool {
var fdb1, fdb2 bool
for _, f := range fdbs {
if f.HardwareAddr.String() == overlayTestPod.GetAnnotations()[overlayAnnKey] {
fdb1 = true
}
}
for _, f := range fdbs {
if f.HardwareAddr.String() == "00:00:00:00:00:00" && f.IP.String() == overlayPodIP {
fdb2 = true
}
}
if fdb2 && fdb1 {
return true
}
return false
}
Expect(checkEntries()).Should(BeTrue())
})
})

Expand All @@ -257,6 +290,10 @@ var _ = Describe("OverlayOperator", func() {
Expect(added).Should(BeFalse())
_, ok := ovc.vxlanPeers[overlayReq.String()]
Expect(ok).Should(BeTrue())
//Check that the entries are only two.
fdbs, err := netlink.NeighList(ovc.vxlanDev.Link.Index, syscall.AF_BRIDGE)
Expect(err).To(BeNil())
Expect(len(fdbs)).Should(BeNumerically("==", 2))

})
})
Expand Down Expand Up @@ -291,7 +328,10 @@ var _ = Describe("OverlayOperator", func() {
//Check that we remove the tuple: (req.string, nodeName)
_, ok = ovc.podToNode[overlayReq.String()]
Expect(ok).Should(BeFalse())

//Check that the entries have been removed.
fdbs, err := netlink.NeighList(ovc.vxlanDev.Link.Index, syscall.AF_BRIDGE)
Expect(err).To(BeNil())
Expect(len(fdbs)).Should(BeNumerically("==", 0))
})
})
})
Expand Down

0 comments on commit 485857b

Please sign in to comment.