Skip to content

Commit

Permalink
Virtual kubelet: working queue-based reflection manager
Browse files Browse the repository at this point in the history
  • Loading branch information
giorio94 committed Oct 5, 2021
1 parent e54701f commit a9c8d18
Show file tree
Hide file tree
Showing 37 changed files with 1,758 additions and 64 deletions.
Expand Up @@ -186,7 +186,7 @@ func (b *Broadcaster) onPodAdd(obj interface{}) {
// subtract the pod resource from cluster resources. This action is done for all pods to extract actual available resources.
subResources(currentResources, podResources)
b.writeClusterResources(currentResources)
if clusterID := podAdded.Labels[forge.LiqoOriginClusterID]; clusterID != "" {
if clusterID := podAdded.Labels[forge.LiqoOriginClusterIDKey]; clusterID != "" {
klog.V(4).Infof("OnPodAdd: Pod %s:%s passed ClusterID check. ClusterID = %s", podAdded.Namespace, podAdded.Name, clusterID)
currentPodsResources := b.readPodResources(clusterID)
// add the resource of this pod in the map clusterID => resources to be used in ReadResources() function.
Expand All @@ -204,7 +204,7 @@ func (b *Broadcaster) onPodUpdate(oldObj, newObj interface{}) {
newResources := extractPodResources(newPod)
oldResources := extractPodResources(oldPod)
currentResources := b.readClusterResources()
clusterID := newPod.Labels[forge.LiqoOriginClusterID]
clusterID := newPod.Labels[forge.LiqoOriginClusterIDKey]
// empty if clusterID has not a valid value.
currentPodsResources := b.readPodResources(clusterID)

Expand Down Expand Up @@ -247,7 +247,7 @@ func (b *Broadcaster) onPodDelete(obj interface{}) {
// Resources used by the pod will become available again so add them to the total allocatable ones.
addResources(currentResources, podResources)
b.writeClusterResources(currentResources)
if clusterID := podDeleted.Labels[forge.LiqoOriginClusterID]; clusterID != "" {
if clusterID := podDeleted.Labels[forge.LiqoOriginClusterIDKey]; clusterID != "" {
klog.V(4).Infof("OnPodDelete: Pod %s:%s passed ClusterID check. ClusterID = %s", podDeleted.Namespace, podDeleted.Name, clusterID)
currentPodsResources := b.readPodResources(clusterID)
subResources(currentPodsResources, podResources)
Expand Down
Expand Up @@ -90,7 +90,7 @@ func CreateNewPod(ctx context.Context, podName, clusterID string, shadow bool, c
}
if clusterID != "" {
pod.Labels[forge.LiqoOutgoingKey] = "test"
pod.Labels[forge.LiqoOriginClusterID] = clusterID
pod.Labels[forge.LiqoOriginClusterIDKey] = clusterID
}
if shadow {
pod.Labels[consts.LocalPodLabelKey] = consts.LocalPodLabelValue
Expand Down
Expand Up @@ -87,7 +87,7 @@ func (r *ResourceOfferReconciler) checkVirtualKubeletDeployment(
func (r *ResourceOfferReconciler) createVirtualKubeletDeployment(
ctx context.Context, resourceOffer *sharingv1alpha1.ResourceOffer) error {
name := virtualKubelet.VirtualKubeletPrefix + resourceOffer.Spec.ClusterId
nodeName := virtualKubelet.VirtualNodePrefix + resourceOffer.Spec.ClusterId
nodeName := virtualKubelet.VirtualNodeName(resourceOffer.Spec.ClusterId)

namespace := resourceOffer.Namespace
remoteClusterID := resourceOffer.Spec.ClusterId
Expand Down
60 changes: 60 additions & 0 deletions pkg/utils/testutil/notfound.go
@@ -0,0 +1,60 @@
// Copyright 2019-2021 The Liqo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package testutil

import (
"fmt"

"github.com/onsi/gomega/format"
"github.com/onsi/gomega/types"
"k8s.io/apimachinery/pkg/api/errors"
)

// IsNotFoundErrorMatcher is a custom matcher to check when kubernetes resources do not exist.
type IsNotFoundErrorMatcher struct{}

// FailBecauseNotFound returns a new IsNotFoundErrorMatcher to catch k8s not-found errors.
func FailBecauseNotFound() types.GomegaMatcher {
return &IsNotFoundErrorMatcher{}
}

// BeNotFound returns a new IsNotFoundErrorMatcher to catch k8s not-found errors.
func BeNotFound() types.GomegaMatcher {
return &IsNotFoundErrorMatcher{}
}

// Match is a GomegaMatcher interface method to actually run the matcher.
func (s *IsNotFoundErrorMatcher) Match(actual interface{}) (success bool, err error) {
if actual == nil {
return false, nil
}

switch e := actual.(type) {
case error:
return errors.IsNotFound(e), nil
default:
return false, fmt.Errorf("IsNotFoundErrorMatcher can match errors only")
}
}

// FailureMessage is called when the matcher fails positively.
func (s *IsNotFoundErrorMatcher) FailureMessage(actual interface{}) (message string) {
return format.Message(actual, "to match NotFoundError")
}

// NegatedFailureMessage is called when the matcher fails negatively.
func (s *IsNotFoundErrorMatcher) NegatedFailureMessage(actual interface{}) (message string) {
return format.Message(actual, "not to match NotFoundError")
}
21 changes: 13 additions & 8 deletions pkg/virtualKubelet/const.go
Expand Up @@ -14,19 +14,24 @@

package virtualKubelet

type ContextKey string

const (
VirtualNodePrefix = "liqo-"
VirtualKubeletPrefix = "virtual-kubelet-"
VirtualKubeletSecPrefix = "vk-kubeconfig-secret-"
AdvertisementPrefix = "advertisement-"
ReflectedpodKey = "virtualkubelet.liqo.io/source-pod"
HomePodFinalizer = "virtual-kubelet.liqo.io/provider"
// VirtualNodePrefix -> the prefix used to generate the virtual node name.
VirtualNodePrefix = "liqo-"
// VirtualKubeletPrefix -> the prefix used to generate the virtual kubelet deployment name.
VirtualKubeletPrefix = "virtual-kubelet-"
// ReflectedpodKey -> the key of the label added to reflected pods.
ReflectedpodKey = "virtualkubelet.liqo.io/source-pod"
// HomePodFinalizer -> the finalizer added to local pods when reflected.
HomePodFinalizer = "virtual-kubelet.liqo.io/provider"

// Clients configuration.
HOME_CLIENT_QPS = 1000
HOME_CLIENTS_BURST = 5000
FOREIGN_CLIENT_QPS = 1000
FOREIGN_CLIENT_BURST = 5000
)

// VirtualNodeName generates the virtual node name based on the cluster ID.
func VirtualNodeName(clusterID string) string {
return VirtualNodePrefix + clusterID
}
16 changes: 12 additions & 4 deletions pkg/virtualKubelet/forge/apiForger.go
Expand Up @@ -33,6 +33,13 @@ import (
"github.com/liqotech/liqo/pkg/virtualKubelet/options/types"
)

var (
// LocalClusterID -> the cluster ID associated with the local cluster.
LocalClusterID string
// RemoteClusterID -> the cluster ID associated with the remote cluster.
RemoteClusterID string
)

func ForeignToHomeStatus(foreignObj, homeObj runtime.Object) (runtime.Object, error) {
switch foreignObj.(type) {
case *corev1.Pod:
Expand Down Expand Up @@ -78,9 +85,8 @@ type apiForger struct {
nattingTable namespacesmapping.NamespaceNatter
ipamClient liqonetIpam.IpamClient

virtualNodeName options.ReadOnlyOption
liqoIpamServer options.ReadOnlyOption
offloadClusterID options.ReadOnlyOption
virtualNodeName options.ReadOnlyOption
liqoIpamServer options.ReadOnlyOption
}

var forger apiForger
Expand All @@ -92,8 +98,10 @@ func InitForger(nattingTable namespacesmapping.NamespaceNatter, opts ...options.
switch opt.Key() {
case types.VirtualNodeName:
forger.virtualNodeName = opt
case types.LocalClusterID:
LocalClusterID = opt.Value().ToString()
case types.RemoteClusterID:
forger.offloadClusterID = opt
RemoteClusterID = opt.Value().ToString()
case types.LiqoIpamServer:
forger.liqoIpamServer = opt
initIpamClient()
Expand Down
16 changes: 16 additions & 0 deletions pkg/virtualKubelet/forge/doc.go
@@ -0,0 +1,16 @@
// Copyright 2019-2021 The Liqo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package forge groups the methods used to forge the Kubernetes object definitions for the reflection logic.
package forge
27 changes: 27 additions & 0 deletions pkg/virtualKubelet/forge/forge_suite_test.go
@@ -0,0 +1,27 @@
// Copyright 2019-2021 The Liqo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package forge_test

import (
"testing"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)

func TestForge(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Forge Suite")
}
40 changes: 21 additions & 19 deletions pkg/virtualKubelet/forge/forge_test.go
Expand Up @@ -12,46 +12,48 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package forge
package forge_test

import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/liqotech/liqo/pkg/virtualKubelet/forge"
"github.com/liqotech/liqo/pkg/virtualKubelet/namespacesmapping/test"
"github.com/liqotech/liqo/pkg/virtualKubelet/options"
"github.com/liqotech/liqo/pkg/virtualKubelet/options/types"
)

var _ = Describe("Virtual Kubelet labels test", func() {
var (
namespaceNattingTable *test.MockNamespaceMapper
foreignClusterID options.Option
)

Context("Testing Labels attached to offloaded pods", func() {
const (
homeNamespace = "home-namespace"
remoteNamespace = "remote-namespace"
)

var homePod corev1.Pod

BeforeEach(
func() {
namespaceNattingTable = &test.MockNamespaceMapper{Cache: map[string]string{}}
namespaceNattingTable.Cache["homeNamespace"] = "homeNamespace-natted"
foreignClusterID = types.NewNetworkingOption(types.RemoteClusterID, "foreign-id")
InitForger(namespaceNattingTable, foreignClusterID)
namespaceNattingTable := &test.MockNamespaceMapper{Cache: map[string]string{}}
namespaceNattingTable.Cache[homeNamespace] = remoteNamespace
localClusterID := types.NewNetworkingOption(types.LocalClusterID, "local-id")
forge.InitForger(namespaceNattingTable, localClusterID)

homePod = corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: homeNamespace}}
},
)

It("Creating new pod to offload", func() {
foreignObj, err := HomeToForeign(nil, nil, LiqoOutgoingKey)
foreignObj, err := forge.HomeToForeign(&homePod, nil, forge.LiqoOutgoingKey)
Expect(err).NotTo(HaveOccurred())
foreignPod := foreignObj.(*corev1.Pod)
Expect(foreignPod.Labels[LiqoOutgoingKey]).ShouldNot(BeNil())
Expect(foreignPod.Labels[LiqoOriginClusterID]).ShouldNot(BeNil())
Expect(foreignPod.Labels[LiqoOriginClusterID]).Should(Equal("foreign-id"))

Expect(foreignPod.Labels[forge.LiqoOutgoingKey]).ShouldNot(BeNil())
Expect(foreignPod.Labels[forge.LiqoOriginClusterIDKey]).ShouldNot(BeNil())
Expect(foreignPod.Labels[forge.LiqoOriginClusterIDKey]).Should(Equal("local-id"))
})

})

})

var _ = Describe("Forge toleration test", func() {
Expand All @@ -78,7 +80,7 @@ var _ = Describe("Forge toleration test", func() {
It("Filtering tolerations", func() {
input := []corev1.Toleration{tol1, tol2}
expected := []corev1.Toleration{tol2}
output := forgeTolerations(input)
output := forge.Tolerations(input)
Expect(output).To(Equal(expected))
})

Expand Down
35 changes: 28 additions & 7 deletions pkg/virtualKubelet/forge/meta.go
Expand Up @@ -14,15 +14,18 @@

package forge

import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
)

const (
// LiqoOutgoingKey is a label to set on all offloaded resources.
// LiqoOutgoingKey is a label to set on all offloaded resources (deprecated).
LiqoOutgoingKey = "virtualkubelet.liqo.io/outgoing"
// LiqoOriginClusterID is a label to set on all offloaded resources to identify the origin cluster.
LiqoOriginClusterID = "virtualkubelet.liqo.io/originClusterId"
// LiqoIncomingKey is a label for incoming resources.
LiqoIncomingKey = "virtualkubelet.liqo.io/incoming"
// LiqoOriginClusterIDKey is the key of a label identifying the origin cluster of a reflected resource.
LiqoOriginClusterIDKey = "virtualkubelet.liqo.io/origin"
// LiqoDestinationClusterIDKey is the key of a label identifying the destination cluster of a reflected resource.
LiqoDestinationClusterIDKey = "virtualkubelet.liqo.io/destination"
)

var (
Expand All @@ -34,11 +37,29 @@ var (
}
)

// ReflectionLabels returns the labels assigned to the objects reflected from the local to the remote cluster.
func ReflectionLabels() labels.Set {
return map[string]string{
LiqoOriginClusterIDKey: LocalClusterID,
LiqoDestinationClusterIDKey: RemoteClusterID,
}
}

// ReflectedLabelSelector returns a label selector matching the objects reflected from the local to the remote cluster.
func ReflectedLabelSelector() labels.Selector {
return ReflectionLabels().AsSelectorPreValidated()
}

// IsReflected returns whether the current object has been reflected from the local to the remote cluster.
func IsReflected(obj metav1.Object) bool {
return ReflectedLabelSelector().Matches(labels.Set(obj.GetLabels()))
}

func (f *apiForger) forgeForeignMeta(homeMeta, foreignMeta *metav1.ObjectMeta, foreignNamespace, reflectionType string) {
forgeObjectMeta(homeMeta, foreignMeta)

foreignMeta.Namespace = foreignNamespace
foreignMeta.Labels[LiqoOriginClusterID] = f.offloadClusterID.Value().ToString()
foreignMeta.Labels[LiqoOriginClusterIDKey] = LocalClusterID
foreignMeta.Labels[reflectionType] = LiqoNodeName()
}

Expand Down

0 comments on commit a9c8d18

Please sign in to comment.