Skip to content

Commit

Permalink
Add virtual node check.
Browse files Browse the repository at this point in the history
Added virtual node test.
Removed case ReadyToReady because useless.
Renamed the debug variable in errorUtils.go
  • Loading branch information
giuse2596 committed Aug 5, 2021
1 parent 054d60b commit ea36aff
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 37 deletions.
6 changes: 3 additions & 3 deletions cmd/liqo-controller-manager/main.go
Expand Up @@ -78,14 +78,14 @@ func main() {
var metricsAddr, localKubeconfig, clusterId string
var probeAddr string
var enableLeaderElection bool
var debug bool
var enablePanic bool
var liqoNamespace, kubeletImage, initKubeletImage string
var resyncPeriod int64
var offloadingStatusControllerRequeueTime int64
var offerUpdateThreshold uint64
var namespaceMapControllerRequeueTime int64

flag.BoolVar(&debug, "debug", false, "flag to enable the debug mode")
flag.BoolVar(&enablePanic, "panic-on-unexpected-errors", false, "flag to enable panic if unexpected errors occur")
flag.StringVar(&metricsAddr, "metrics-addr", defaultMetricsaddr, "The address the metric endpoint binds to.")
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
flag.BoolVar(&enableLeaderElection,
Expand All @@ -111,7 +111,7 @@ func main() {
klog.InitFlags(nil)
flag.Parse()

errorsmanagement.SetDebug(debug)
errorsmanagement.SetPanicMode(enablePanic)

if clusterId == "" {
klog.Error("Cluster ID must be provided")
Expand Down
30 changes: 17 additions & 13 deletions internal/resource-request-operator/broadcaster.go
Expand Up @@ -18,6 +18,7 @@ import (

configv1alpha1 "github.com/liqotech/liqo/apis/config/v1alpha1"
"github.com/liqotech/liqo/internal/resource-request-operator/interfaces"
"github.com/liqotech/liqo/pkg/consts"
crdclient "github.com/liqotech/liqo/pkg/crdClient"
"github.com/liqotech/liqo/pkg/utils"
errorsmanagement "github.com/liqotech/liqo/pkg/utils/errorsManagement"
Expand Down Expand Up @@ -137,7 +138,7 @@ func (b *Broadcaster) getLastRead(remoteClusterID string) corev1.ResourceList {
// react to a Node Creation/First informer run.
func (b *Broadcaster) onNodeAdd(obj interface{}) {
node := obj.(*corev1.Node)
if utils.IsNodeReady(node) {
if utils.IsNodeReady(node) && !isVirtualNode(node) {
klog.V(4).Infof("Adding Node %s\n", node.Name)
toAdd := &node.Status.Allocatable
currentResources := b.readClusterResources()
Expand All @@ -154,7 +155,7 @@ func (b *Broadcaster) onNodeUpdate(oldObj, newObj interface{}) {
newNodeResources := newNode.Status.Allocatable
currentResources := b.readClusterResources()
klog.V(4).Infof("Updating Node %s in %v\n", oldNode.Name, newNode)
if utils.IsNodeReady(newNode) {
if utils.IsNodeReady(newNode) && !isVirtualNode(newNode) {
// node was already Ready, update with possible new resources.
if utils.IsNodeReady(oldNode) {
updateResources(currentResources, oldNodeResources, newNodeResources)
Expand All @@ -163,7 +164,7 @@ func (b *Broadcaster) onNodeUpdate(oldObj, newObj interface{}) {
addResources(currentResources, newNodeResources)
}
// node is terminating or stopping, delete all its resources.
} else if utils.IsNodeReady(oldNode) && !utils.IsNodeReady(newNode) {
} else if utils.IsNodeReady(oldNode) && !utils.IsNodeReady(newNode) && !isVirtualNode(newNode) {
subResources(currentResources, oldNodeResources)
}
b.writeClusterResources(currentResources)
Expand All @@ -174,7 +175,7 @@ func (b *Broadcaster) onNodeDelete(obj interface{}) {
node := obj.(*corev1.Node)
toDelete := &node.Status.Allocatable
currentResources := b.readClusterResources()
if utils.IsNodeReady(node) {
if utils.IsNodeReady(node) && !isVirtualNode(node) {
klog.V(4).Infof("Deleting Node %s\n", node.Name)
subResources(currentResources, *toDelete)
b.writeClusterResources(currentResources)
Expand Down Expand Up @@ -213,15 +214,6 @@ func (b *Broadcaster) onPodUpdate(oldObj, newObj interface{}) {
currentPodsResources := b.readPodResources(clusterID)

switch getPodTransitionState(oldPod, newPod) {
// pod already Ready, just update resources.
case ReadyToReady:
updateResources(currentResources, oldResources, newResources)
if clusterID != "" {
klog.V(4).Infof("OnPodUpdate: Pod %s:%s passed ClusterID check. ClusterID = %s\n", newPod.Namespace, newPod.Name, clusterID)
// update the resource of this pod in the map clusterID => resources to be used in ReadResources() function.
// this action is done to correct the computation not considering pod offloaded by the cluster with this ClusterID
updateResources(currentPodsResources, oldResources, newResources)
}
// pod is becoming Ready, same of onPodAdd case.
case PendingToReady:
subResources(currentResources, newResources)
Expand All @@ -240,6 +232,9 @@ func (b *Broadcaster) onPodUpdate(oldObj, newObj interface{}) {
// this action is done to correct the computation not considering pod offloaded by the cluster with this ClusterID
subResources(currentPodsResources, oldResources)
}
// pod resources request are immutable. See the doc https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/
case ReadyToReady:
return
case PendingToPending:
return
}
Expand Down Expand Up @@ -457,3 +452,12 @@ func getPodTransitionState(oldPod, newPod *corev1.Pod) PodTransition {

return PendingToPending
}

// isVirtualNode reports whether the given node is a liqo virtual node,
// i.e., it carries the liqo type label (consts.TypeLabel) set to the
// virtual-node value (consts.TypeNode).
func isVirtualNode(node *corev1.Node) bool {
	// Indexing a map with a missing key yields the zero value (""), which
	// never equals consts.TypeNode, so no explicit existence check is needed.
	return node.Labels[consts.TypeLabel] == consts.TypeNode
}
40 changes: 25 additions & 15 deletions internal/resource-request-operator/resourceRequest_operator_test.go
Expand Up @@ -23,6 +23,7 @@ import (
sharingv1alpha1 "github.com/liqotech/liqo/apis/sharing/v1alpha1"
crdreplicator "github.com/liqotech/liqo/internal/crdReplicator"
"github.com/liqotech/liqo/pkg/clusterid"
"github.com/liqotech/liqo/pkg/consts"
"github.com/liqotech/liqo/pkg/discovery"
"github.com/liqotech/liqo/pkg/virtualKubelet/forge"
)
Expand Down Expand Up @@ -50,6 +51,11 @@ func createNewNode(nodeName string, virtual bool) (*corev1.Node, error) {
Name: nodeName,
},
}
if virtual {
node.Labels = map[string]string{
consts.TypeLabel: consts.TypeNode,
}
}
node.Status = corev1.NodeStatus{
Capacity: resources,
Allocatable: resources,
Expand Down Expand Up @@ -550,20 +556,6 @@ var _ = Describe("ResourceRequest Operator", func() {
}
return checkResourceOfferUpdate(nodeList, podList)
}, timeout, interval).Should(BeTrue())
By("Changing pod resources")
podReq.Cpu().Add(*resource.NewQuantity(2, resource.DecimalSI))
podWithoutLabel.Spec.Containers[0].Resources.Requests = podReq
podWithoutLabel, err = clientset.CoreV1().Pods("default").UpdateStatus(ctx, podWithoutLabel, metav1.UpdateOptions{})
Expect(err).ToNot(HaveOccurred())
Eventually(func() bool {
nodeList := []corev1.ResourceList{
0: node2.Status.Allocatable,
}
podList := []corev1.ResourceList{
0: podReq,
}
return checkResourceOfferUpdate(nodeList, podList)
}, timeout, interval).Should(BeTrue())
By("Adding pod offloaded by cluster which refers the ResourceOffer. Expected no change in resources")
_, err = createNewPod("pod-offloaded-"+ClusterID1, ClusterID1)
Expect(err).ToNot(HaveOccurred())
Expand Down Expand Up @@ -610,10 +602,28 @@ var _ = Describe("ResourceRequest Operator", func() {
}, timeout, interval).Should(BeTrue())
By("Update threshold with huge amount to test isAboveThreshold function")
newBroadcaster.setThreshold(80)
node2.Status.Allocatable.Cpu().Add(*resource.NewQuantity(1, resource.DecimalSI))
cpu := node2.Status.Allocatable[corev1.ResourceCPU]
cpu.Add(*resource.NewQuantity(2, resource.DecimalSI))
node2.Status.Allocatable[corev1.ResourceCPU] = cpu
node2, err = clientset.CoreV1().Nodes().UpdateStatus(ctx, node2, metav1.UpdateOptions{})
Expect(err).ToNot(HaveOccurred())
Expect(newBroadcaster.isAboveThreshold(ClusterID1)).ShouldNot(BeTrue())
})
It("Test virtual node creation", func() {
podReq, _ := resourcehelper.PodRequestsAndLimits(podWithoutLabel)
_, err := createNewNode("test-virtual-node", true)
Expect(err).ToNot(HaveOccurred())
By("Expected no change on resources")
Eventually(func() bool {
nodeList := []corev1.ResourceList{
0: node2.Status.Allocatable,
1: node1.Status.Allocatable,
}
podList := []corev1.ResourceList{
0: podReq,
}
return checkResourceOfferUpdate(nodeList, podList)
}, timeout, interval).Should(BeTrue())
})
})
})
2 changes: 1 addition & 1 deletion internal/resource-request-operator/suite_test.go
Expand Up @@ -71,7 +71,7 @@ func createCluster() {
MetricsBindAddress: "0", // this avoids port binding collision
})
Expect(err).ToNot(HaveOccurred())
errorsmanagement.SetDebug(true)
errorsmanagement.SetPanicMode(true)
clientset = kubernetes.NewForConfigOrDie(k8sManager.GetConfig())
homeClusterID = clusterid.NewStaticClusterID("test-cluster").GetClusterID()
updater := OfferUpdater{}
Expand Down
10 changes: 5 additions & 5 deletions pkg/utils/errorsManagement/errorUtils.go
Expand Up @@ -2,19 +2,19 @@ package errorsmanagement

import "k8s.io/klog/v2"

var debug = false
var panicMode = false

// SetDebug can be used to set or unset the debug mode.
func SetDebug(status bool) {
debug = status
// SetPanicMode enables or disables the panic mode used by Must: when
// enabled, Must panics on unexpected errors instead of logging them.
func SetPanicMode(enabled bool) {
	panicMode = enabled
}

// Must wraps a function call that can return an error. If some error occurred Must has two possible behaviors:
// panic if debug = true or log the error and return false in order to recover the error.
// Returns true if no error occurred.
func Must(err error) bool {
if err != nil {
if debug {
if panicMode {
panic(err)
} else {
klog.Errorf("%s", err)
Expand Down

0 comments on commit ea36aff

Please sign in to comment.