Skip to content

Commit

Permalink
Some fixes:
Browse files Browse the repository at this point in the history
- Some test utils function are added.
- New management of mutatiion server shutdown.
- New function mergePodNodeSelectorWithTheImposedOne.
- Some comments are added.
- New function getMergedNodeSelector with some tests.
  • Loading branch information
Andreagit97 committed Jun 26, 2021
1 parent 8bc7831 commit 11e075a
Show file tree
Hide file tree
Showing 8 changed files with 618 additions and 507 deletions.
18 changes: 13 additions & 5 deletions cmd/liqo-webhook/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package main

import (
"context"
"os/signal"
"syscall"

"k8s.io/klog"

Expand All @@ -10,19 +12,25 @@ import (

func main() {
config := &mutate.MutationConfig{}

setOptions(config)

klog.Info("Starting server ...")

ctx, cancel := context.WithCancel(context.Background())
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()

s, err := mutate.NewMutationServer(ctx, config)
if err != nil {
klog.Fatal(err)
}

idleConnsClosed := make(chan struct{})
go func() {
<-ctx.Done()
s.Shutdown(ctx)
close(idleConnsClosed)
}()

s.Serve()
// todo: not sure about cancel, is it necessary ?
cancel()
ctx.Done()
<-idleConnsClosed
}
12 changes: 6 additions & 6 deletions pkg/liqonet/natmappinginflater/natMappingInflater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/fake"

liqonetapi "github.com/liqotech/liqo/apis/net/v1alpha1"
netv1alpha1 "github.com/liqotech/liqo/apis/net/v1alpha1"
"github.com/liqotech/liqo/pkg/consts"
liqoneterrors "github.com/liqotech/liqo/pkg/liqonet/errors"
)
Expand Down Expand Up @@ -45,7 +45,7 @@ func setDynClient() error {
Group: "net.liqo.io",
Version: "v1alpha1",
Kind: "natmappings",
}, &liqonetapi.NatMapping{})
}, &netv1alpha1.NatMapping{})

var m = make(map[schema.GroupVersionResource]string)

Expand Down Expand Up @@ -133,7 +133,7 @@ var _ = Describe("NatMappingInflater", func() {
Group: "net.liqo.io",
Version: "v1alpha1",
Kind: "natmappings",
}, &liqonetapi.NatMapping{})
}, &netv1alpha1.NatMapping{})
var m = make(map[schema.GroupVersionResource]string)
m[schema.GroupVersionResource{
Group: "net.liqo.io",
Expand Down Expand Up @@ -162,7 +162,7 @@ var _ = Describe("NatMappingInflater", func() {
Group: "net.liqo.io",
Version: "v1alpha1",
Kind: "natmappings",
}, &liqonetapi.NatMapping{})
}, &netv1alpha1.NatMapping{})
var m = make(map[schema.GroupVersionResource]string)
m[schema.GroupVersionResource{
Group: "net.liqo.io",
Expand Down Expand Up @@ -241,7 +241,7 @@ var _ = Describe("NatMappingInflater", func() {
Group: "net.liqo.io",
Version: "v1alpha1",
Kind: "natmappings",
}, &liqonetapi.NatMapping{})
}, &netv1alpha1.NatMapping{})
var m = make(map[schema.GroupVersionResource]string)
m[schema.GroupVersionResource{
Group: "net.liqo.io",
Expand All @@ -266,7 +266,7 @@ var _ = Describe("NatMappingInflater", func() {

// Check if there is only one resouce
list, err := inflater.dynClient.
Resource(liqonetapi.NatMappingGroupResource).
Resource(netv1alpha1.NatMappingGroupResource).
List(context.Background(), metav1.ListOptions{
LabelSelector: fmt.Sprintf("%s=%s,%s=%s",
consts.NatMappingResourceLabelKey,
Expand Down
117 changes: 71 additions & 46 deletions pkg/mutate/manage_pod_mutation.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package mutate

import (
"fmt"

corev1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"

offv1alpha1 "github.com/liqotech/liqo/apis/offloading/v1alpha1"
liqoconst "github.com/liqotech/liqo/pkg/consts"
)

// getVirtualNodeToleration returns a new Toleration for the Liqo's virtual-node.
// getVirtualNodeToleration returns a new Toleration for the Liqo's virtual-nodes.
func getVirtualNodeToleration() corev1.Toleration {
return corev1.Toleration{
Key: liqoconst.VirtualNodeTolerationKey,
Expand All @@ -18,29 +20,29 @@ func getVirtualNodeToleration() corev1.Toleration {
}

// createTolerationFromNamespaceOffloading creates a new virtualNodeToleration in case of LocalAndRemotePodOffloadingStrategyType
// or RemotePodOffloadingStrategyType. In case of LocalPodOffloadingStrategyType returns an empty Toleration.
func createTolerationFromNamespaceOffloading(strategy offv1alpha1.PodOffloadingStrategyType) corev1.Toleration {
// or RemotePodOffloadingStrategyType. In case of PodOffloadingStrategyType not recognized, returns an error.
func createTolerationFromNamespaceOffloading(strategy offv1alpha1.PodOffloadingStrategyType) (corev1.Toleration, error) {
var toleration corev1.Toleration
switch {
case strategy == offv1alpha1.LocalAndRemotePodOffloadingStrategyType, strategy == offv1alpha1.RemotePodOffloadingStrategyType:
// The virtual-node toleration must be added.
toleration = getVirtualNodeToleration()
default:
// LocalPodOffloadingStrategyType, default is less clear but safer
// No toleration has to be added.
toleration = corev1.Toleration{}
err := fmt.Errorf("PodOffloadingStrategyType '%s' not recognized", strategy)
klog.Error(err)
return corev1.Toleration{}, err
}
return toleration
return toleration, nil
}

// createNodeSelectorFromNamespaceOffloading creates the right NodeSelector according to the PodOffloadingStrategy chosen.
func createNodeSelectorFromNamespaceOffloading(nsoff *offv1alpha1.NamespaceOffloading) corev1.NodeSelector {
var nodeSelector corev1.NodeSelector
func createNodeSelectorFromNamespaceOffloading(nsoff *offv1alpha1.NamespaceOffloading) (corev1.NodeSelector, error) {
nodeSelector := nsoff.Spec.ClusterSelector
switch {
case nsoff.Spec.PodOffloadingStrategy == offv1alpha1.RemotePodOffloadingStrategyType:
// To ensure that the pod is not scheduled on local nodes is necessary to add a new NodeSelectorRequirement which
// requires the label "liqo.io/type=virtual-node", to every NodeSelectorTerm.
nodeSelector = nsoff.Spec.ClusterSelector
// To ensure that the pod is not scheduled on local nodes is necessary to add to every NodeSelectorTerm a
// new NodeSelectorRequirement. This NodeSelectorRequirement requires explicitly the label
// "liqo.io/type=virtual-node" to exclude local nodes from the scheduler choice.
for i := range nodeSelector.NodeSelectorTerms {
nodeSelector.NodeSelectorTerms[i].MatchExpressions = append(nodeSelector.NodeSelectorTerms[i].MatchExpressions,
corev1.NodeSelectorRequirement{
Expand All @@ -50,55 +52,67 @@ func createNodeSelectorFromNamespaceOffloading(nsoff *offv1alpha1.NamespaceOfflo
})
}
case nsoff.Spec.PodOffloadingStrategy == offv1alpha1.LocalAndRemotePodOffloadingStrategyType:
nodeSelector = nsoff.Spec.ClusterSelector
// A new NodeSelectorTerm terms that allows scheduling the pod also on local nodes is added.
newNodeSelectorTerm := corev1.NodeSelectorTerm{
MatchExpressions: append([]corev1.NodeSelectorRequirement{}, corev1.NodeSelectorRequirement{
MatchExpressions: []corev1.NodeSelectorRequirement{{
Key: liqoconst.TypeLabel,
Operator: corev1.NodeSelectorOpNotIn,
Values: []string{liqoconst.TypeNode},
}),
}},
}
nodeSelector.NodeSelectorTerms = append(nodeSelector.NodeSelectorTerms, newNodeSelectorTerm)
case nsoff.Spec.PodOffloadingStrategy == offv1alpha1.LocalPodOffloadingStrategyType:
// In local strategy there is no necessity to impose a particular NodeSelector.
nodeSelector = corev1.NodeSelector{NodeSelectorTerms: []corev1.NodeSelectorTerm{}}
default:
err := fmt.Errorf("PodOffloadingStrategyType '%s' not recognized", nsoff.Spec.PodOffloadingStrategy)
klog.Error(err)
return corev1.NodeSelector{}, err
}
return nodeSelector
return nodeSelector, nil
}

// fillPodWithImposedNodeSelector gets the previously computed NodeSelector imposed by the PodOffloadingStrategy and
// merges it with the Pod NodeSelector if it is already present. It simply adds it to the Pod if there is merge to do.
func fillPodWithImposedNodeSelector(imposedNodeSelector *corev1.NodeSelector, pod *corev1.Pod) {
// getMergedNodeSelector gets the old PodNodeSelector and merges it with the ImposedNodeSelector.
// Every MatchExpression of the PodNodeSelector must be merged with all the MatchExpressions of the ImposedNodeSelector:
// n PodNodeSelector MatchExpressions.
// m ImposedNodeSelector MatchExpressions.
// m * n MergedNoseSelector MatchExpressions.
func getMergedNodeSelector(podNodeSelector *corev1.NodeSelector,
imposedNodeSelector *corev1.NodeSelector) corev1.NodeSelector {
mergedNodeSelector := corev1.NodeSelector{NodeSelectorTerms: []corev1.NodeSelectorTerm{}}
for i := range podNodeSelector.NodeSelectorTerms {
for j := range imposedNodeSelector.NodeSelectorTerms {
newMatchExpression := append(imposedNodeSelector.NodeSelectorTerms[j].MatchExpressions,
podNodeSelector.NodeSelectorTerms[i].MatchExpressions...)
mergedNodeSelector.NodeSelectorTerms = append(mergedNodeSelector.NodeSelectorTerms, corev1.NodeSelectorTerm{
MatchExpressions: newMatchExpression,
})
}
}
return mergedNodeSelector
}

// fillPodWithTheNewNodeSelector gets the previously computed NodeSelector imposed by the PodOffloadingStrategy and
// merges it with the Pod NodeSelector if it is already present. It simply adds it to the Pod if previously unset.
func fillPodWithTheNewNodeSelector(imposedNodeSelector *corev1.NodeSelector, pod *corev1.Pod) {
// To preserve the Pod Affinity content, it is necessary to add the imposedNodeSelector according to what
// is already present in the Pod Affinity.
switch {
case pod.Spec.Affinity == nil:
pod.Spec.Affinity = &corev1.Affinity{
NodeAffinity: &corev1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{},
RequiredDuringSchedulingIgnoredDuringExecution: imposedNodeSelector.DeepCopy(),
},
}
case pod.Spec.Affinity.NodeAffinity == nil:
pod.Spec.Affinity.NodeAffinity = &corev1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{},
}
case pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil:
pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution = &corev1.NodeSelector{}
case len(pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms) != 0:
oldNodeSelector := *pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution
tmpSelector := corev1.NodeSelector{NodeSelectorTerms: []corev1.NodeSelectorTerm{}}
for i := range oldNodeSelector.NodeSelectorTerms {
for j := range imposedNodeSelector.NodeSelectorTerms {
newMatchExpression := append(imposedNodeSelector.NodeSelectorTerms[j].MatchExpressions, oldNodeSelector.NodeSelectorTerms[i].MatchExpressions...)
tmpSelector.NodeSelectorTerms = append(tmpSelector.NodeSelectorTerms, corev1.NodeSelectorTerm{
MatchExpressions: newMatchExpression,
})
}
RequiredDuringSchedulingIgnoredDuringExecution: imposedNodeSelector.DeepCopy(),
}
*imposedNodeSelector = tmpSelector
case pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil ||
len(pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms) == 0:
pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution = imposedNodeSelector.DeepCopy()
default:
*pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution =
getMergedNodeSelector(pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution,
imposedNodeSelector)
}
*pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution = *imposedNodeSelector
}

// mutatePod checks the NamespaceOffloading CR associated with the Pod's Namespace.
Expand All @@ -111,22 +125,33 @@ func mutatePod(namespaceOffloading *offv1alpha1.NamespaceOffloading, pod *corev1
// the NodeSelector inserted by the user (ClusterSelector field).
klog.V(5).Infof("Chosen strategy: %s", namespaceOffloading.Spec.PodOffloadingStrategy)

// If strategy is equal to LocalPodOffloadingStrategy there is nothing to do
if namespaceOffloading.Spec.PodOffloadingStrategy == offv1alpha1.LocalPodOffloadingStrategyType {
return
}

// Create the right Toleration according to the PodOffloadingStrategy case.
toleration := createTolerationFromNamespaceOffloading(namespaceOffloading.Spec.PodOffloadingStrategy)
toleration, err := createTolerationFromNamespaceOffloading(namespaceOffloading.Spec.PodOffloadingStrategy)
if err != nil {
klog.Errorf("The NamespaceOffloading in namespace '%s' has a not known strategy '%s'",
namespaceOffloading.Namespace, namespaceOffloading.Spec.PodOffloadingStrategy)
return
}
klog.V(5).Infof("Generated Toleration: %s", toleration)

// Create the right NodeSelector according to the PodOffloadingStrategy case.
imposedNodeSelector := createNodeSelectorFromNamespaceOffloading(namespaceOffloading)
klog.V(5).Infof("ImposedNodeSelector: %s", imposedNodeSelector)

// If there is no toleration, return (LocalPodOffloadingStrategy case).
if toleration.MatchToleration(&corev1.Toleration{}) {
imposedNodeSelector, err := createNodeSelectorFromNamespaceOffloading(namespaceOffloading)
if err != nil {
klog.Errorf("The NamespaceOffloading in namespace '%s' has a not known strategy '%s'",
namespaceOffloading.Namespace, namespaceOffloading.Spec.PodOffloadingStrategy)
return
}
klog.V(5).Infof("ImposedNodeSelector: %s", imposedNodeSelector)

// It is necessary to add the just created toleration.
pod.Spec.Tolerations = append(pod.Spec.Tolerations, toleration)

// Enforce the new NodeSelector policy imposed by the NamespaceOffloading creator.
fillPodWithImposedNodeSelector(&imposedNodeSelector, pod)
fillPodWithTheNewNodeSelector(&imposedNodeSelector, pod)
klog.V(5).Infof("Pod NodeSelector: %s", imposedNodeSelector)
}
5 changes: 2 additions & 3 deletions pkg/mutate/mutate.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,13 @@ func (s *MutationServer) Mutate(body []byte) ([]byte, error) {
return nil, err
}

newPod := pod.DeepCopy()
newPod.Spec.Tolerations = append(newPod.Spec.Tolerations, corev1.Toleration{
pod.Spec.Tolerations = append(pod.Spec.Tolerations, corev1.Toleration{
Key: liqoconst.VirtualNodeTolerationKey,
Operator: corev1.TolerationOpExists,
Effect: corev1.TaintEffectNoExecute,
})

target, err := json.Marshal(newPod)
target, err := json.Marshal(pod)
if err != nil {
klog.Error(err)
return nil, err
Expand Down
30 changes: 19 additions & 11 deletions pkg/mutate/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"time"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -31,11 +31,12 @@ type MutationServer struct {
config *MutationConfig
}

func getWebhookClient(ctx context.Context) client.Client {
func getWebhookClient(ctx context.Context) (client.Client, error) {
conf := ctrl.GetConfigOrDie()
if conf == nil {
klog.Error("mutation server: unable to get config file for cluster home")
return nil
err := fmt.Errorf("mutation server: unable to get config file for cluster home")
klog.Error(err)
return nil, err
}

scheme := runtime.NewScheme()
Expand All @@ -44,13 +45,13 @@ func getWebhookClient(ctx context.Context) client.Client {
mapper, err := (mapperUtils.LiqoMapperProvider(scheme))(conf)
if err != nil {
klog.Errorf("mutation server mapper: %s", err)
return nil
return nil, err
}

webhookCache, err := cache.New(conf, cache.Options{Scheme: scheme, Mapper: mapper})
if err != nil {
klog.Errorf("mutation server cache: %s", err)
return nil
return nil, err
}

go func() {
Expand All @@ -62,19 +63,19 @@ func getWebhookClient(ctx context.Context) client.Client {
webhookClient, err := cluster.DefaultNewClient(webhookCache, conf, client.Options{Scheme: scheme, Mapper: mapper})
if err != nil {
klog.Errorf("mutation server webhookClient: %s", err)
return nil
return nil, err
}
return webhookClient
return webhookClient, nil
}

// NewMutationServer creates a new mutation server.
func NewMutationServer(ctx context.Context, c *MutationConfig) (*MutationServer, error) {
s := &MutationServer{}
s.config = c

s.webhookClient = getWebhookClient(ctx)
if s.webhookClient == nil {
return nil, fmt.Errorf(" webhook Client is empty")
var err error
if s.webhookClient, err = getWebhookClient(ctx); err != nil {
return nil, err
}

s.mux = http.NewServeMux()
Expand Down Expand Up @@ -125,3 +126,10 @@ func (s *MutationServer) sendError(err error, w http.ResponseWriter) {
func (s *MutationServer) Serve() {
klog.Fatal(s.server.ListenAndServeTLS(s.config.CertFile, s.config.KeyFile))
}

func (s *MutationServer) Shutdown(ctx context.Context) {
if err := s.server.Shutdown(ctx); err != nil {
// Error from closing listeners, or context timeout:
klog.Errorf("HTTP server Shutdown: %v", err)
}
}

0 comments on commit 11e075a

Please sign in to comment.