Enable leader election for liqo-gateway controller
Add a new operator to liqo-gateway that labels the current leader pod so that it receives the network traffic
alacuku committed Jul 9, 2021
1 parent 90e7ef6 commit 327a31d
Showing 13 changed files with 641 additions and 124 deletions.
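For orientation, here is a minimal, illustrative sketch of the leader-labeling idea: the elected gateway instance labels its own pod so that a Service selector can steer traffic to the leader. The LabelerReconciler type, the gatewayLeaderLabel key, and the write-back logic below are assumptions for illustration only; the controller actually introduced by this commit is tunneloperator.NewLabelerController, shown in the main.go diff below.

package labeler

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// gatewayLeaderLabel is an assumed label key marking the active gateway pod.
const gatewayLeaderLabel = "net.liqo.io/gateway-leader"

// LabelerReconciler labels the pod of the currently elected gateway instance
// so that a Service selecting on gatewayLeaderLabel routes traffic to it.
type LabelerReconciler struct {
	client.Client
	PodIP string // IP of the pod running this (leader) instance.
}

func (r *LabelerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	var pod corev1.Pod
	if err := r.Get(ctx, req.NamespacedName, &pod); err != nil {
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}
	labels := pod.GetLabels()
	if labels == nil {
		labels = map[string]string{}
	}
	// Mark our own pod as the active gateway; clear the mark on every other pod.
	desired := "false"
	if pod.Status.PodIP == r.PodIP {
		desired = "true"
	}
	if labels[gatewayLeaderLabel] == desired {
		return ctrl.Result{}, nil
	}
	labels[gatewayLeaderLabel] = desired
	pod.SetLabels(labels)
	return ctrl.Result{}, r.Update(ctx, &pod)
}

func (r *LabelerReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).For(&corev1.Pod{}).Complete(r)
}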
119 changes: 90 additions & 29 deletions cmd/liqonet/main.go
@@ -26,8 +26,10 @@ import (
"k8s.io/client-go/kubernetes"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"

clusterConfig "github.com/liqotech/liqo/apis/config/v1alpha1"
netv1alpha1 "github.com/liqotech/liqo/apis/net/v1alpha1"
@@ -60,34 +62,32 @@ func init() {
}

func main() {
var metricsAddr string
var metricsAddr, runAs string
var enableLeaderElection bool
var runAs string

flag.StringVar(&metricsAddr, "metrics-addr", ":0", "The address the metric endpoint binds to.")
flag.BoolVar(&enableLeaderElection, "enable-leader-election", false,
leaseDuration := 7 * time.Second
renewDeadLine := 5 * time.Second
retryPeriod := 2 * time.Second
flag.StringVar(&metricsAddr, "metrics-bind-addr", ":0", "The address the metric endpoint binds to.")
flag.BoolVar(&enableLeaderElection, "leader-elect", false,
"Enable leader election for controller manager. Enabling this will ensure there is only one active controller manager.")
flag.StringVar(&runAs, "run-as", liqoconst.LiqoGatewayOperatorName,
"The accepted values are: liqo-gateway, liqo-route, tunnelEndpointCreator-operator. The default value is \"liqo-gateway\"")
flag.Parse()
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
MapperProvider: mapperUtils.LiqoMapperProvider(scheme),
Scheme: scheme,
MetricsBindAddress: metricsAddr,
LeaderElection: enableLeaderElection,
Port: 9443,
})
if err != nil {
klog.Errorf("unable to get manager: %s", err)
os.Exit(1)
}
clientset := kubernetes.NewForConfigOrDie(mgr.GetConfig())

switch runAs {
case liqoconst.LiqoRouteOperatorName:
mutex := &sync.RWMutex{}
nodeMap := map[string]string{}
// Get the pod ip and parse to net.IP
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
MapperProvider: mapperUtils.LiqoMapperProvider(scheme),
Scheme: scheme,
MetricsBindAddress: metricsAddr,
})
if err != nil {
klog.Errorf("unable to get manager: %s", err)
os.Exit(1)
}
// Get the pod ip and parse to net.IP.
podIP, err := utils.GetPodIP()
if err != nil {
klog.Errorf("unable to get podIP: %v", err)
@@ -141,7 +141,7 @@ func main() {
os.Exit(1)
}
case liqoconst.LiqoGatewayOperatorName:
// Get the pod ip and parse to net.IP
// Get the pod ip and parse to net.IP.
podIP, err := utils.GetPodIP()
if err != nil {
klog.Errorf("unable to get podIP: %v", err)
@@ -152,7 +152,48 @@
klog.Errorf("unable to get pod namespace: %v", err)
os.Exit(1)
}
eventRecorder := mgr.GetEventRecorderFor(liqoconst.LiqoGatewayOperatorName + "." + podIP.String())
// This manager is used for the label operator. The label operator needs to watch the
// gateway pods, which live in a specific namespace. The main manager, on the other
// hand, needs permissions to watch the Liqo CRDs in every namespace. To limit the
// permissions needed by the gateway components on pod resources, we use a
// different manager with limited permissions.
labelMgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
MapperProvider: mapperUtils.LiqoMapperProvider(scheme),
Scheme: scheme,
MetricsBindAddress: metricsAddr,
Namespace: podNamespace,
// The operator will receive notifications/events only for the pod objects
// that satisfy the LabelSelector.
NewCache: cache.BuilderWithOptions(cache.Options{
SelectorsByObject: tunneloperator.LabelSelector,
}),
})
if err != nil {
klog.Errorf("unable to get manager for the label operator: %s", err)
os.Exit(1)
}
// The mainMgr is the one used for the tunnelendpoints.net.liqo.io CRDs. It needs
// cluster-wide permissions on tep resources. It has leader election enabled,
// ensuring that only one instance is active at a time.
mainMgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
MapperProvider: mapperUtils.LiqoMapperProvider(scheme),
Scheme: scheme,
MetricsBindAddress: metricsAddr,
LeaderElection: enableLeaderElection,
LeaderElectionID: liqoconst.GatewayLeaderElectionID,
LeaderElectionNamespace: podNamespace,
LeaderElectionReleaseOnCancel: true,
LeaderElectionResourceLock: resourcelock.LeasesResourceLock,
LeaseDuration: &leaseDuration,
RenewDeadline: &renewDeadLine,
RetryPeriod: &retryPeriod,
})
if err != nil {
klog.Errorf("unable to get main manager: %s", err)
os.Exit(1)
}
clientset := kubernetes.NewForConfigOrDie(mainMgr.GetConfig())
eventRecorder := mainMgr.GetEventRecorderFor(liqoconst.LiqoGatewayOperatorName + "." + podIP.String())
// This map is updated by the tunnel operator after a successful tunnel creation
// and is consumed by the natmapping operator to check whether the tunnel is ready or not.
var readyClustersMutex sync.Mutex
@@ -167,35 +208,56 @@ func main() {
os.Exit(1)
}
klog.Infof("created custom network namespace {%s}", liqoconst.GatewayNetnsName)

labelController := tunneloperator.NewLabelerController(podIP.String(), labelMgr.GetClient())
if err = labelController.SetupWithManager(labelMgr); err != nil {
klog.Errorf("unable to setup labeler controller: %s", err)
os.Exit(1)
}
tunnelController, err := tunneloperator.NewTunnelController(podIP.String(),
podNamespace, eventRecorder, clientset, mgr.GetClient(), &readyClustersMutex,
podNamespace, eventRecorder, clientset, mainMgr.GetClient(), &readyClustersMutex,
readyClusters, gatewayNetns)
if err != nil {
klog.Errorf("an error occurred while creating the tunnel controller: %v", err)
_ = tunnelController.CleanUpConfiguration(liqoconst.GatewayNetnsName, liqoconst.HostVethName)
tunnelController.RemoveAllTunnels()
os.Exit(1)
}
if err = tunnelController.SetupWithManager(mgr); err != nil {
if err = tunnelController.SetupWithManager(mainMgr); err != nil {
klog.Errorf("unable to setup tunnel controller: %s", err)
os.Exit(1)
}

nmc, err := tunneloperator.NewNatMappingController(mgr.GetClient(), &readyClustersMutex, readyClusters, gatewayNetns)
natMappingController, err := tunneloperator.NewNatMappingController(mainMgr.GetClient(), &readyClustersMutex,
readyClusters, gatewayNetns)
if err != nil {
klog.Errorf("an error occurred while creating the natmapping controller: %v", err)
os.Exit(1)
}
if err = nmc.SetupWithManager(mgr); err != nil {
if err = natMappingController.SetupWithManager(mainMgr); err != nil {
klog.Errorf("unable to setup natmapping controller: %s", err)
os.Exit(1)
}

if err := mainMgr.Add(labelMgr); err != nil {
klog.Errorf("unable to merge managers: %s", err)
os.Exit(1)
}
klog.Info("Starting manager as Tunnel-Operator")
if err := mgr.Start(tunnelController.SetupSignalHandlerForTunnelOperator()); err != nil {
if err := mainMgr.Start(tunnelController.SetupSignalHandlerForTunnelOperator()); err != nil {
klog.Errorf("unable to start tunnel controller: %s", err)
os.Exit(1)
}
case "tunnelEndpointCreator-operator":
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
MapperProvider: mapperUtils.LiqoMapperProvider(scheme),
Scheme: scheme,
MetricsBindAddress: metricsAddr,
})
if err != nil {
klog.Errorf("unable to get manager: %s", err)
os.Exit(1)
}
clientset := kubernetes.NewForConfigOrDie(mgr.GetConfig())
dynClient := dynamic.NewForConfigOrDie(mgr.GetConfig())
ipam := liqonetIpam.NewIPAM()
err = ipam.Init(liqonetIpam.Pools, dynClient, liqoconst.NetworkManagerIpamPort)
Expand All @@ -215,9 +277,8 @@ func main() {
Configured: make(chan bool, 1),
ForeignClusterStartWatcher: make(chan bool, 1),
ForeignClusterStopWatcher: make(chan struct{}),

IPManager: ipam,
RetryTimeout: 30 * time.Second,
IPManager: ipam,
RetryTimeout: 30 * time.Second,
}
r.WaitConfig.Add(3)
//starting configuration watcher
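The tunneloperator.LabelSelector passed to the label manager's cache above is not shown in this diff. A plausible sketch of such a selector, restricting the cache to gateway pods, could look like the following; the label key/value and the exact shape of the selector literal depend on the controller-runtime version and on Liqo's chart labels, so treat them as assumptions.

package tunneloperator

import (
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/labels"
	"sigs.k8s.io/controller-runtime/pkg/cache"
)

// LabelSelector limits the label manager's informers to gateway pods only,
// so the labeler controller never receives events for unrelated pods.
var LabelSelector = cache.SelectorsByObject{
	&corev1.Pod{}: {
		Label: labels.SelectorFromSet(labels.Set{"app.kubernetes.io/name": "gateway"}),
	},
}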
10 changes: 0 additions & 10 deletions deployments/liqo/files/liqo-gateway-ClusterRole.yaml
@@ -1,14 +1,4 @@
rules:
- apiGroups:
- config.liqo.io
resources:
- clusterconfigs
verbs:
- create
- get
- list
- update
- watch
- apiGroups:
- ""
resources:
17 changes: 8 additions & 9 deletions deployments/liqo/files/liqo-gateway-Role.yaml
@@ -1,31 +1,30 @@
rules:
- apiGroups:
- ""
- coordination.k8s.io
resources:
- pods
- leases
verbs:
- create
- get
- list
- update
- watch
- apiGroups:
- ""
resources:
- secrets
- pods
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- ""
resources:
- services
- secrets
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
9 changes: 5 additions & 4 deletions deployments/liqo/templates/liqo-gateway-deployment.yaml
@@ -22,7 +22,6 @@ spec:
{{- end }}
labels:
{{- include "liqo.labels" $gatewayConfig | nindent 8 }}
{{- include "liqo.gatewayPodLabels" . | nindent 8 }}
{{- if .Values.gateway.pod.labels }}
{{- toYaml .Values.gateway.pod.labels | nindent 8 }}
{{- end }}
@@ -35,11 +34,13 @@
ports:
- containerPort: 5871
command: ["/usr/bin/liqonet"]
args: ["-run-as=liqo-gateway"]
args:
- -run-as=liqo-gateway
- -leader-elect=true
resources:
limits:
cpu: 10m
memory: 30M
cpu: 500m
memory: 300M
requests:
cpu: 10m
memory: 30M
33 changes: 6 additions & 27 deletions internal/liqonet/route-operator/overlayOperator.go
@@ -18,6 +18,7 @@ import (

liqoerrors "github.com/liqotech/liqo/pkg/liqonet/errors"
"github.com/liqotech/liqo/pkg/liqonet/overlay"
liqoutils "github.com/liqotech/liqo/pkg/liqonet/utils"
)

var (
@@ -86,7 +87,7 @@ func (ovc *OverlayController) Reconcile(ctx context.Context, req ctrl.Request) (
}
// If it is our pod then add the MAC address annotation.
if ovc.podIP == pod.Status.PodIP {
if ovc.addAnnotation(&pod, vxlanMACAddressKey, ovc.vxlanDev.Link.HardwareAddr.String()) {
if liqoutils.AddAnnotationToObj(&pod, vxlanMACAddressKey, ovc.vxlanDev.Link.HardwareAddr.String()) {
if err := ovc.Update(ctx, &pod); err != nil {
klog.Errorf("an error occurred while adding mac address annotation to pod {%s}: %v", req.String(), err)
return ctrl.Result{}, err
Expand All @@ -100,12 +101,12 @@ func (ovc *OverlayController) Reconcile(ctx context.Context, req ctrl.Request) (
added, err := ovc.addPeer(req, &pod)
if err != nil {
klog.Errorf("an error occurred while adding peer {%s} with IP address {%s} and MAC address {%s} to the vxlan overlay network: %v",
req.String(), pod.Status.PodIP, ovc.getAnnotationValue(&pod, vxlanMACAddressKey), err)
req.String(), pod.Status.PodIP, liqoutils.GetAnnotationValueFromObj(&pod, vxlanMACAddressKey), err)
return ctrl.Result{}, err
}
if added {
klog.Errorf("successfully added peer {%s} with IP address {%s} and MAC address {%s} to the vxlan overlay network",
req.String(), pod.Status.PodIP, ovc.getAnnotationValue(&pod, vxlanMACAddressKey))
req.String(), pod.Status.PodIP, liqoutils.GetAnnotationValueFromObj(&pod, vxlanMACAddressKey))
}
return ctrl.Result{}, nil
}
@@ -219,30 +220,6 @@ func (ovc *OverlayController) delPeer(req ctrl.Request) (bool, error) {
return deleted, nil
}

// addAnnotation for a given object it adds the annotation with the given key and value.
// It return a bool which is true when the annotations has been added or false if the
// annotation is already present.
func (ovc *OverlayController) addAnnotation(obj client.Object, aKey, aValue string) bool {
annotations := obj.GetAnnotations()
if annotations == nil {
annotations = make(map[string]string, 1)
}
oldAnnValue, ok := annotations[aKey]
// If the annotations does not exist or is outdated then set it.
if !ok || oldAnnValue != aValue {
annotations[aKey] = aValue
obj.SetAnnotations(annotations)
return true
}
return false
}

// getAnnotationValue all objects passed to this function has the annotations set.
// The podFilter functions makes sure that we reconcile only objects with the annotation set.
func (ovc *OverlayController) getAnnotationValue(obj client.Object, akey string) string {
return obj.GetAnnotations()[akey]
}

// podFilter is used to filter out all the pods that are not instances of the route operator
// daemon set. It checks that pods are route operator instances and have the vxlanMACAddressKey
// annotation set, or that the current pod we are considering is our own pod. In this case
@@ -264,6 +241,8 @@ func (ovc *OverlayController) podFilter(obj client.Object) bool {
}
// If it is not our pod then check if the vxlan mac address has been set.
annotations := obj.GetAnnotations()

// Here we make sure that only objects with the annotation set can be reconciled.
if _, ok := annotations[vxlanMACAddressKey]; ok {
return true
}
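The addAnnotation and getAnnotationValue methods removed above are replaced by package-level helpers in pkg/liqonet/utils (imported as liqoutils). Their bodies are not part of this diff; a sketch consistent with the removed logic and with the call sites AddAnnotationToObj and GetAnnotationValueFromObj could look like this:

package utils

import "sigs.k8s.io/controller-runtime/pkg/client"

// AddAnnotationToObj sets the given annotation on obj and returns true when the
// object has been modified, i.e. the annotation was missing or outdated.
func AddAnnotationToObj(obj client.Object, key, value string) bool {
	annotations := obj.GetAnnotations()
	if annotations == nil {
		annotations = make(map[string]string, 1)
	}
	if old, ok := annotations[key]; ok && old == value {
		return false
	}
	annotations[key] = value
	obj.SetAnnotations(annotations)
	return true
}

// GetAnnotationValueFromObj returns the value of the given annotation, or the
// empty string when the annotation is not set.
func GetAnnotationValueFromObj(obj client.Object, key string) string {
	return obj.GetAnnotations()[key]
}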
