Skip to content

Commit

Permalink
migrating route manager to the new overlay network and direct routing
Browse files Browse the repository at this point in the history
  • Loading branch information
alacuku committed Jun 19, 2021
1 parent 3c09091 commit 1b1453b
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 44 deletions.
49 changes: 42 additions & 7 deletions cmd/liqonet/main.go
Expand Up @@ -21,6 +21,8 @@ import (
"sync"
"time"

"github.com/liqotech/liqo/pkg/liqonet/utils"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
Expand All @@ -36,13 +38,21 @@ import (
"github.com/liqotech/liqo/internal/liqonet/tunnelEndpointCreator"
liqoconst "github.com/liqotech/liqo/pkg/consts"
liqonetOperator "github.com/liqotech/liqo/pkg/liqonet"
"github.com/liqotech/liqo/pkg/liqonet/overlay"
"github.com/liqotech/liqo/pkg/liqonet/wireguard"
"github.com/liqotech/liqo/pkg/mapperUtils"
// +kubebuilder:scaffold:imports
)

var (
scheme = runtime.NewScheme()
scheme = runtime.NewScheme()
vxlanConfig = &overlay.VxlanDeviceAttrs{
Vni: 18952,
Name: "liqo.vxlan",
VtepPort: 4789,
VtepAddr: nil,
Mtu: 1420,
}
)

func init() {
Expand Down Expand Up @@ -76,22 +86,47 @@ func main() {
clientset := kubernetes.NewForConfigOrDie(mgr.GetConfig())
switch runAs {
case route_operator.OperatorName:
wgc, err := wireguard.NewWgClient()
// Get the pod ip and parse to net.IP
podIP, err := utils.GetPodIP()
if err != nil {
klog.Errorf("an error occurred while creating wireguard client: %v", err)
klog.Errorf("unable to get podIP: %v", err)
os.Exit(1)
}
r, err := route_operator.NewRouteController(mgr, wgc, wireguard.NewNetLinker())
vxlanConfig.VtepAddr = podIP
vxlanDevice, err := overlay.NewVxlanDevice(vxlanConfig)
if err != nil {
klog.Errorf("an error occurred while creating vxlan device : %v", err)
os.Exit(2)
}
r, err := route_operator.NewRouteController(mgr, *vxlanDevice)
if err != nil {
klog.Errorf("an error occurred while creating the route operator -> %v", err)
os.Exit(1)
}
r.StartPodWatcher()
r.StartServiceWatcher()
if err = r.SetupWithManager(mgr); err != nil {
klog.Errorf("unable to setup controller: %s", err)
os.Exit(1)
}
mutex := &sync.RWMutex{}
nodeMap := map[string]string{}
ovc, err := route_operator.NewOverlayController("", podIP.String(), route_operator.PodLabelSelector, *vxlanDevice, mutex, nodeMap, mgr.GetClient())
if err != nil {
klog.Errorf("an error occurred while creating overlay controller: %v", err)
os.Exit(3)
}
if err = ovc.SetupWithManager(mgr); err != nil {
klog.Errorf("unable to setup overlay controller: %s", err)
os.Exit(1)
}

smc, err := route_operator.NewSymmetricRoutingOperator("", route_operator.RoutingTableID, *vxlanDevice, mutex, nodeMap, mgr.GetClient())
if err != nil {
klog.Errorf("an error occurred while creting symmetric routing controller: %v", err)
os.Exit(4)
}
if err = smc.SetupWithManager(mgr); err != nil {
klog.Errorf("unable to setup overlay controller: %s", err)
os.Exit(1)
}
if err := mgr.Start(r.SetupSignalHandlerForRouteOperator()); err != nil {
klog.Errorf("unable to start controller: %s", err)
os.Exit(1)
Expand Down
10 changes: 10 additions & 0 deletions deployments/liqo/files/liqo-route-ClusterRole.yaml
Expand Up @@ -24,6 +24,16 @@ rules:
- nodes
verbs:
- get
- apiGroups:
- ""
resources:
- pods
verbs:
- get
- list
- patch
- update
- watch
- apiGroups:
- net.liqo.io
resources:
Expand Down
10 changes: 0 additions & 10 deletions deployments/liqo/files/liqo-route-Role.yaml
@@ -1,14 +1,4 @@
rules:
- apiGroups:
- ""
resources:
- pods
verbs:
- get
- list
- patch
- update
- watch
- apiGroups:
- ""
resources:
Expand Down
3 changes: 3 additions & 0 deletions internal/liqonet/route-operator/overlayOperator.go
Expand Up @@ -191,6 +191,9 @@ func (ovc *OverlayController) delPeer(req ctrl.Request) (bool, error) {
// annotation is already present.
func (ovc *OverlayController) addAnnotation(obj client.Object, aKey, aValue string) bool {
annotations := obj.GetAnnotations()
if annotations == nil {
annotations = make(map[string]string, 1)
}
oldAnnValue, ok := annotations[aKey]
// If the annotations does not exist or is outdated then set it.
if !ok || oldAnnValue != aValue {
Expand Down
63 changes: 36 additions & 27 deletions internal/liqonet/route-operator/routeOperator.go
Expand Up @@ -22,6 +22,8 @@ import (
"strings"
"time"

liqorouting "github.com/liqotech/liqo/pkg/liqonet/routing"

k8sApiErrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
Expand All @@ -37,7 +39,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/predicate"

netv1alpha1 "github.com/liqotech/liqo/apis/net/v1alpha1"
"github.com/liqotech/liqo/pkg/liqonet"
"github.com/liqotech/liqo/pkg/liqonet/overlay"
"github.com/liqotech/liqo/pkg/liqonet/utils"
"github.com/liqotech/liqo/pkg/liqonet/wireguard"
Expand All @@ -48,13 +49,16 @@ var (
result = ctrl.Result{}
// OperatorName holds the name of the route operator.
OperatorName = "liqo-route"
// RoutingTableID number of the custom routing table used by liqo-route.
RoutingTableID = 18952
overlayNetPrefix = "240"
)

// RouteController reconciles a TunnelEndpoint object.
type RouteController struct {
client.Client
record.EventRecorder
liqonet.NetLink
liqorouting.Routing
clientSet *kubernetes.Clientset
nodeName string
namespace string
Expand All @@ -65,7 +69,7 @@ type RouteController struct {
}

// NewRouteController returns a configure route controller ready to be started.
func NewRouteController(mgr ctrl.Manager, wgc wireguard.Client, nl wireguard.Netlinker) (*RouteController, error) {
func NewRouteController(mgr ctrl.Manager, vxlanDevice overlay.VxlanDevice) (*RouteController, error) {
dynClient := dynamic.NewForConfigOrDie(mgr.GetConfig())
clientSet := kubernetes.NewForConfigOrDie(mgr.GetConfig())
// get node name
Expand All @@ -79,33 +83,27 @@ func NewRouteController(mgr ctrl.Manager, wgc wireguard.Client, nl wireguard.Net
klog.Errorf("unable to create the controller: %v", err)
return nil, err
}
nodePodCIDR, err := utils.GetNodePodCIDR(nodeName, clientSet)
if err != nil {
klog.Errorf("unable to create the controller: %v", err)
return nil, err
}

namespace, err := utils.GetPodNamespace()
if err != nil {
klog.Errorf("unable to create the controller: %v", err)
return nil, err
}
overlayIP := strings.Join([]string{overlay.GetOverlayIP(podIP.String()), "4"}, "/")
wg, err := overlay.CreateInterface(nodeName, namespace, overlayIP, clientSet, wgc, nl)
rm, err := liqorouting.NewVxlanRoutingManager(RoutingTableID, podIP.String(), overlayNetPrefix, vxlanDevice)
if err != nil {
klog.Errorf("unable to create the controller: %v", err)
return nil, err
}
r := &RouteController{
Client: mgr.GetClient(),
clientSet: clientSet,
podIP: podIP.String(),
nodePodCIDR: nodePodCIDR,
namespace: namespace,
wg: wg,
nodeName: nodeName,
DynClient: dynClient,
Client: mgr.GetClient(),
Routing: rm,
EventRecorder: mgr.GetEventRecorderFor("liqo-route"),
clientSet: clientSet,
podIP: podIP.String(),
namespace: namespace,
nodeName: nodeName,
DynClient: dynClient,
}
r.setUpRouteManager(mgr.GetEventRecorderFor(strings.Join([]string{OperatorName, nodeName}, "-")))
return r, nil
}

Expand All @@ -117,7 +115,7 @@ func NewRouteController(mgr ctrl.Manager, wgc wireguard.Client, nl wireguard.Net
// +kubebuilder:rbac:groups=core,resources=nodes,verbs=get
// role
// +kubebuilder:rbac:groups=core,namespace="do-not-care",resources=secrets,verbs=create;update;patch;get;list;watch;delete
// +kubebuilder:rbac:groups=core,namespace="do-not-care",resources=pods,verbs=update;patch;get;list;watch
// +kubebuilder:rbac:groups=core,resources=pods,verbs=update;patch;get;list;watch
// +kubebuilder:rbac:groups=core,namespace="do-not-care",resources=services,verbs=update;patch;get;list;watch

// Reconcile handle requests on TunnelEndpoint object to create and configure routes on Nodes.
Expand All @@ -134,10 +132,11 @@ func (r *RouteController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl
return result, err
}
// Here we check that the tunnelEndpoint resource has been fully processed. If not we do nothing.
if tep.Status.RemoteNATPodCIDR == "" {
if tep.Status.RemoteNATPodCIDR == "" && tep.Status.GatewayIP == "" {
return result, nil
}
clusterID := tep.Spec.ClusterID
_, remotePodCIDR := utils.GetPodCIDRS(&tep)
// examine DeletionTimestamp to determine if object is under deletion
if tep.ObjectMeta.DeletionTimestamp.IsZero() {
if !controllerutil.ContainsFinalizer(&tep, routeOperatorFinalizer) {
Expand All @@ -156,9 +155,16 @@ func (r *RouteController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl
// event on the resource to notify the user
// the finalizer is not removed
if controllerutil.ContainsFinalizer(&tep, routeOperatorFinalizer) {
if err := r.RemoveRoutesPerCluster(&tep); err != nil {
deleted, err := r.RemoveRoutesPerCluster(&tep)
if err != nil {
klog.Errorf("%s -> unable to remove route for destination {%s}: %s", clusterID, remotePodCIDR, err)
r.Eventf(&tep, "Warning", "Processing", "unable to remove route: %s", err.Error())
return result, err
}
if deleted {
klog.Infof("%s -> route for destination {%s} correctly removed", clusterID, remotePodCIDR)
r.Eventf(&tep, "Normal", "Processing", "route for destination {%s} correctly removed", remotePodCIDR)
}
// remove the finalizer from the list and update it.
controllerutil.RemoveFinalizer(&tep, routeOperatorFinalizer)
if err := r.Update(ctx, &tep); err != nil {
Expand All @@ -168,9 +174,16 @@ func (r *RouteController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl
}
return result, nil
}
if err := r.EnsureRoutesPerCluster(r.wg.GetDeviceName(), &tep); err != nil {
added, err := r.EnsureRoutesPerCluster(&tep)
if err != nil {
klog.Errorf("%s -> unable to configure route for destination {%s}: %s", clusterID, remotePodCIDR, err)
r.Eventf(&tep, "Warning", "Processing", "unable to configure route for destination {%s}: %s", remotePodCIDR, err.Error())
return result, err
}
if added {
klog.Infof("%s -> route for destination {%s} correctly configured", clusterID, remotePodCIDR)
r.Eventf(&tep, "Normal", "Processing", "route for destination {%s} configured", remotePodCIDR)
}
return result, nil
}

Expand All @@ -183,10 +196,6 @@ func (r *RouteController) deleteOverlayIFace() {
}
}

func (r *RouteController) setUpRouteManager(recorder record.EventRecorder) {
r.NetLink = liqonet.NewRouteManager(recorder)
}

// SetupWithManager used to set up the controller with a given manager.
func (r *RouteController) SetupWithManager(mgr ctrl.Manager) error {
resourceToBeProccesedPredicate := predicate.Funcs{
Expand Down

0 comments on commit 1b1453b

Please sign in to comment.