Skip to content

Commit

Permalink
Bump EndpointSlices API to v1
Browse files Browse the repository at this point in the history
  • Loading branch information
palexster committed Jun 18, 2021
1 parent c891f0a commit 718f118
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 60 deletions.
Expand Up @@ -6,7 +6,7 @@ import (

"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
discoveryv1beta1 "k8s.io/api/discovery/v1beta1"
discoveryv1 "k8s.io/api/discovery/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
Expand Down Expand Up @@ -42,7 +42,7 @@ func (r *EndpointSlicesReflector) SetSpecializedPreProcessingHandlers() {

func (r *EndpointSlicesReflector) HandleEvent(e interface{}) {
event := e.(watch.Event)
ep, ok := event.Object.(*discoveryv1beta1.EndpointSlice)
ep, ok := event.Object.(*discoveryv1.EndpointSlice)
if !ok {
klog.Error("REFLECTION: cannot cast object to EndpointSlice")
return
Expand All @@ -52,7 +52,7 @@ func (r *EndpointSlicesReflector) HandleEvent(e interface{}) {

switch event.Type {
case watch.Added:
_, err := r.GetForeignClient().DiscoveryV1beta1().EndpointSlices(ep.Namespace).Create(context.TODO(), ep, metav1.CreateOptions{})
_, err := r.GetForeignClient().DiscoveryV1().EndpointSlices(ep.Namespace).Create(context.TODO(), ep, metav1.CreateOptions{})
if kerrors.IsAlreadyExists(err) {
klog.V(4).Infof("REFLECTION: The remote endpointslices %v/%v has not been created because already existing", ep.Namespace, ep.Name)
break
Expand All @@ -65,7 +65,7 @@ func (r *EndpointSlicesReflector) HandleEvent(e interface{}) {

case watch.Modified:
if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
_, newErr := r.GetForeignClient().DiscoveryV1beta1().EndpointSlices(ep.Namespace).Update(context.TODO(), ep, metav1.UpdateOptions{})
_, newErr := r.GetForeignClient().DiscoveryV1().EndpointSlices(ep.Namespace).Update(context.TODO(), ep, metav1.UpdateOptions{})
return newErr
}); err != nil {
klog.Errorf("REFLECTION: Error while updating the remote EndpointSlice %v - ERR: %v", key, err)
Expand All @@ -74,7 +74,7 @@ func (r *EndpointSlicesReflector) HandleEvent(e interface{}) {
}

case watch.Deleted:
if err := r.GetForeignClient().DiscoveryV1beta1().EndpointSlices(ep.Namespace).Delete(context.TODO(), ep.Name, metav1.DeleteOptions{}); err != nil {
if err := r.GetForeignClient().DiscoveryV1().EndpointSlices(ep.Namespace).Delete(context.TODO(), ep.Name, metav1.DeleteOptions{}); err != nil {
klog.Errorf("REFLECTION: Error while deleting the remote EndpointSlice %v - ERR: %v", key, err)
} else {
klog.V(3).Infof("REFLECTION: remote EndpointSlice %v correctly deleted", key)
Expand All @@ -83,7 +83,7 @@ func (r *EndpointSlicesReflector) HandleEvent(e interface{}) {
}

func (r *EndpointSlicesReflector) PreAdd(obj interface{}) (interface{}, watch.EventType) {
epLocal := obj.(*discoveryv1beta1.EndpointSlice).DeepCopy()
epLocal := obj.(*discoveryv1.EndpointSlice).DeepCopy()
nattedNs, err := r.NattingTable().NatNamespace(epLocal.Namespace, false)
if err != nil {
klog.Error(err)
Expand Down Expand Up @@ -123,14 +123,14 @@ func (r *EndpointSlicesReflector) PreAdd(obj interface{}) (interface{}, watch.Ev
},
}

epsRemote := &discoveryv1beta1.EndpointSlice{
epsRemote := &discoveryv1.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Name: epLocal.Name,
Namespace: nattedNs,
Labels: labels,
OwnerReferences: svcOwnerRef,
},
AddressType: discoveryv1beta1.AddressTypeIPv4,
AddressType: discoveryv1.AddressTypeIPv4,
Endpoints: filterEndpoints(epLocal, r.IpamClient, string(r.VirtualNodeName.Value())),
Ports: epLocal.Ports,
}
Expand All @@ -139,7 +139,7 @@ func (r *EndpointSlicesReflector) PreAdd(obj interface{}) (interface{}, watch.Ev
}

func (r *EndpointSlicesReflector) PreUpdate(newObj, _ interface{}) (interface{}, watch.EventType) {
endpointSliceHome := newObj.(*discoveryv1beta1.EndpointSlice).DeepCopy()
endpointSliceHome := newObj.(*discoveryv1.EndpointSlice).DeepCopy()
endpointSliceName := endpointSliceHome.Name

nattedNs, err := r.NattingTable().NatNamespace(endpointSliceHome.Namespace, false)
Expand All @@ -158,7 +158,7 @@ func (r *EndpointSlicesReflector) PreUpdate(newObj, _ interface{}) (interface{},
klog.Error(err)
return nil, watch.Modified
}
RemoteEpSlice := oldRemoteObj.(*discoveryv1beta1.EndpointSlice).DeepCopy()
RemoteEpSlice := oldRemoteObj.(*discoveryv1.EndpointSlice).DeepCopy()

RemoteEpSlice.Endpoints = filterEndpoints(endpointSliceHome, r.IpamClient, string(r.VirtualNodeName.Value()))
RemoteEpSlice.Ports = endpointSliceHome.Ports
Expand All @@ -168,7 +168,7 @@ func (r *EndpointSlicesReflector) PreUpdate(newObj, _ interface{}) (interface{},

func (r *EndpointSlicesReflector) PreDelete(obj interface{}) (interface{}, watch.EventType) {
clusterID := strings.TrimPrefix(string(r.VirtualNodeName.Value()), "liqo-")
endpointSliceLocal := obj.(*discoveryv1beta1.EndpointSlice)
endpointSliceLocal := obj.(*discoveryv1.EndpointSlice)
nattedNs, err := r.NattingTable().NatNamespace(endpointSliceLocal.Namespace, false)
if err != nil {
klog.Error(err)
Expand All @@ -186,23 +186,22 @@ func (r *EndpointSlicesReflector) PreDelete(obj interface{}) (interface{}, watch
return endpointSliceLocal, watch.Deleted
}

func filterEndpoints(slice *discoveryv1beta1.EndpointSlice, ipamClient liqonet.IpamClient, nodeName string) []discoveryv1beta1.Endpoint {
var epList []discoveryv1beta1.Endpoint
func filterEndpoints(slice *discoveryv1.EndpointSlice, ipamClient liqonet.IpamClient, nodeName string) []discoveryv1.Endpoint {
var epList []discoveryv1.Endpoint
// Two possibilities: (1) exclude all virtual nodes (2)
for _, v := range slice.Endpoints {
t := v.Topology["kubernetes.io/hostname"]
for index := range slice.Endpoints {
t := *slice.Endpoints[index].Hostname
if t != nodeName {
response, err := ipamClient.MapEndpointIP(context.Background(),
&liqonet.MapRequest{ClusterID: utils.GetClusterIDFromNodeName(nodeName), Ip: v.Addresses[0]})
&liqonet.MapRequest{ClusterID: utils.GetClusterIDFromNodeName(nodeName), Ip: slice.Endpoints[index].Addresses[0]})
if err != nil {
klog.Error(err)
}
newEp := discoveryv1beta1.Endpoint{
newEp := discoveryv1.Endpoint{
Addresses: []string{response.GetIp()},
Conditions: v.Conditions,
Conditions: slice.Endpoints[index].Conditions,
Hostname: nil,
TargetRef: nil,
Topology: nil,
}
epList = append(epList, newEp)
}
Expand Down Expand Up @@ -233,17 +232,17 @@ func (r *EndpointSlicesReflector) CleanupNamespace(localNamespace string) {
}
}
for _, obj := range objects {
eps := obj.(*discoveryv1beta1.EndpointSlice)
eps := obj.(*discoveryv1.EndpointSlice)
if err := retry.OnError(retry.DefaultBackoff, retriable, func() error {
return r.GetForeignClient().DiscoveryV1beta1().EndpointSlices(foreignNamespace).Delete(context.TODO(), eps.Name, metav1.DeleteOptions{})
return r.GetForeignClient().DiscoveryV1().EndpointSlices(foreignNamespace).Delete(context.TODO(), eps.Name, metav1.DeleteOptions{})
}); err != nil {
klog.Errorf("Error while deleting remote endpointslice %v/%v", eps.Namespace, eps.Name)
}
}
}

func (r *EndpointSlicesReflector) isAllowed(_ context.Context, obj interface{}) bool {
eps, ok := obj.(*discoveryv1beta1.EndpointSlice)
eps, ok := obj.(*discoveryv1.EndpointSlice)
if !ok {
klog.Error("cannot convert obj to service")
return false
Expand Down
@@ -1,26 +1,29 @@
package reflection
package outgoing

import (
"context"
"testing"

"gotest.tools/assert"
v1 "k8s.io/api/core/v1"
"k8s.io/api/discovery/v1beta1"
discoveryv1 "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/klog"
"k8s.io/klog/v2"

liqonetTest "github.com/liqotech/liqo/pkg/liqonet/test"

apimgmt "github.com/liqotech/liqo/pkg/virtualKubelet/apiReflection"
api "github.com/liqotech/liqo/pkg/virtualKubelet/apiReflection/reflectors"
"github.com/liqotech/liqo/pkg/virtualKubelet/apiReflection/reflectors/outgoing"
"github.com/liqotech/liqo/pkg/virtualKubelet/namespacesMapping/test"
"github.com/liqotech/liqo/pkg/virtualKubelet/options/types"
storageTest "github.com/liqotech/liqo/pkg/virtualKubelet/storage/test"
)

var (
localNode = "worker-3"
virtualNode = "vk-node"
)

func TestEndpointAdd(t *testing.T) {
foreignClient := fake.NewSimpleClientset()
cacheManager := &storageTest.MockManager{
Expand All @@ -35,14 +38,14 @@ func TestEndpointAdd(t *testing.T) {
CacheManager: cacheManager,
}

reflector := &outgoing.EndpointSlicesReflector{
reflector := &EndpointSlicesReflector{
APIReflector: Greflector,
VirtualNodeName: types.NewNetworkingOption("VirtualNodeName", "vk-node"),
IpamClient: &liqonetTest.MockIpam{LocalRemappedPodCIDR: "10.0.0.0/16"},
}
reflector.SetSpecializedPreProcessingHandlers()

epslice := &v1beta1.EndpointSlice{
epslice := &discoveryv1.EndpointSlice{
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{
Name: "name",
Expand All @@ -59,20 +62,18 @@ func TestEndpointAdd(t *testing.T) {
},
},
},
Endpoints: []v1beta1.Endpoint{
Endpoints: []discoveryv1.Endpoint{
{
Addresses: []string{"10.0.0.15"},
Conditions: v1beta1.EndpointConditions{},
Hostname: nil,
Conditions: discoveryv1.EndpointConditions{},
Hostname: &localNode,
TargetRef: nil,
Topology: map[string]string{"kubernetes.io/hostname": "worker-3"},
},
{
Addresses: []string{"10.0.0.20"},
Conditions: v1beta1.EndpointConditions{},
Hostname: nil,
Conditions: discoveryv1.EndpointConditions{},
Hostname: &virtualNode,
TargetRef: nil,
Topology: map[string]string{"kubernetes.io/hostname": "vk-node"},
}},
Ports: nil,
}
Expand All @@ -97,7 +98,7 @@ func TestEndpointAdd(t *testing.T) {
}

pa, _ := reflector.PreProcessAdd(epslice)
postadd := pa.(*v1beta1.EndpointSlice)
postadd := pa.(*discoveryv1.EndpointSlice)

assert.Equal(t, postadd.Namespace, "homeNamespace-natted", "Asserting namespace natting")
assert.Equal(t, len(postadd.Endpoints), 1, "Asserting node-based filtering")
Expand All @@ -118,14 +119,14 @@ func TestEndpointAdd2(t *testing.T) {
CacheManager: cacheManager,
}

reflector := &outgoing.EndpointSlicesReflector{
reflector := &EndpointSlicesReflector{
APIReflector: Greflector,
VirtualNodeName: types.NewNetworkingOption("VirtualNodeName", "vk-node"),
IpamClient: &liqonetTest.MockIpam{LocalRemappedPodCIDR: "10.0.0.0/16"},
}
reflector.SetSpecializedPreProcessingHandlers()

epslice := &v1beta1.EndpointSlice{
epSlice := &discoveryv1.EndpointSlice{
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{
Name: "name",
Expand All @@ -142,20 +143,18 @@ func TestEndpointAdd2(t *testing.T) {
},
},
},
Endpoints: []v1beta1.Endpoint{
Endpoints: []discoveryv1.Endpoint{
{
Addresses: []string{"10.10.0.15"},
Conditions: v1beta1.EndpointConditions{},
Hostname: nil,
Conditions: discoveryv1.EndpointConditions{},
Hostname: &localNode,
TargetRef: nil,
Topology: map[string]string{"kubernetes.io/hostname": "worker-3"},
},
{
Addresses: []string{"10.10.0.20"},
Conditions: v1beta1.EndpointConditions{},
Hostname: nil,
Conditions: discoveryv1.EndpointConditions{},
Hostname: &virtualNode,
TargetRef: nil,
Topology: map[string]string{"kubernetes.io/hostname": "vk-node"},
}},
Ports: nil,
}
Expand All @@ -179,8 +178,8 @@ func TestEndpointAdd2(t *testing.T) {
t.Fail()
}

pa, _ := reflector.PreProcessAdd(epslice)
postadd := pa.(*v1beta1.EndpointSlice)
pa, _ := reflector.PreProcessAdd(epSlice)
postadd := pa.(*discoveryv1.EndpointSlice)

assert.Equal(t, postadd.Namespace, "homeNamespace-natted", "Asserting namespace natting")
assert.Equal(t, len(postadd.Endpoints), 1, "Asserting node-based filtering")
Expand Down
@@ -1,4 +1,4 @@
package reflection
package outgoing

import (
"testing"
Expand All @@ -10,7 +10,6 @@ import (

apimgmt "github.com/liqotech/liqo/pkg/virtualKubelet/apiReflection"
api "github.com/liqotech/liqo/pkg/virtualKubelet/apiReflection/reflectors"
"github.com/liqotech/liqo/pkg/virtualKubelet/apiReflection/reflectors/outgoing"
"github.com/liqotech/liqo/pkg/virtualKubelet/namespacesMapping/test"
storageTest "github.com/liqotech/liqo/pkg/virtualKubelet/storage/test"
)
Expand All @@ -29,7 +28,7 @@ func TestSecretAdd(t *testing.T) {
CacheManager: cacheManager,
}

reflector := &outgoing.SecretsReflector{
reflector := &SecretsReflector{
APIReflector: Greflector,
}
reflector.SetSpecializedPreProcessingHandlers()
Expand Down Expand Up @@ -68,7 +67,7 @@ func TestSASecretAdd(t *testing.T) {
CacheManager: cacheManager,
}

reflector := &outgoing.SecretsReflector{
reflector := &SecretsReflector{
APIReflector: Greflector,
}
reflector.SetSpecializedPreProcessingHandlers()
Expand Down Expand Up @@ -114,7 +113,7 @@ func TestSecretUpdate(t *testing.T) {
CacheManager: cacheManager,
}

reflector := &outgoing.SecretsReflector{
reflector := &SecretsReflector{
APIReflector: Greflector,
}
reflector.SetSpecializedPreProcessingHandlers()
Expand Down
6 changes: 3 additions & 3 deletions pkg/virtualKubelet/forge/apiForger.go
Expand Up @@ -8,7 +8,7 @@ import (
"google.golang.org/grpc"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
discoveryv1beta1 "k8s.io/api/discovery/v1beta1"
discoveryv1 "k8s.io/api/discovery/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog/v2"

Expand Down Expand Up @@ -41,8 +41,8 @@ func HomeToForeign(homeObj, foreignObj runtime.Object, reflectionType string) (r
switch homeObj.(type) {
case *corev1.ConfigMap:
return forger.configmapHomeToForeign(homeObj.(*corev1.ConfigMap), foreignObj.(*corev1.ConfigMap))
case *discoveryv1beta1.EndpointSlice:
return forger.endpointsliceHomeToForeign(homeObj.(*discoveryv1beta1.EndpointSlice), foreignObj.(*discoveryv1beta1.EndpointSlice))
case *discoveryv1.EndpointSlice:
return forger.endpointsliceHomeToForeign(homeObj.(*discoveryv1.EndpointSlice), foreignObj.(*discoveryv1.EndpointSlice))
case *corev1.Pod:
return forger.podHomeToForeign(homeObj, foreignObj, reflectionType)
case *corev1.Service:
Expand Down
4 changes: 2 additions & 2 deletions pkg/virtualKubelet/forge/endpointslices.go
@@ -1,9 +1,9 @@
package forge

import (
discoveryv1beta1 "k8s.io/api/discovery/v1beta1"
discoveryv1 "k8s.io/api/discovery/v1"
)

func (f *apiForger) endpointsliceHomeToForeign(homeEndpointslice, foreignEndpointslice *discoveryv1beta1.EndpointSlice) (*discoveryv1beta1.EndpointSlice, error) {
func (f *apiForger) endpointsliceHomeToForeign(homeEndpointslice, foreignEndpointslice *discoveryv1.EndpointSlice) (*discoveryv1.EndpointSlice, error) {
panic("to implement")
}
4 changes: 2 additions & 2 deletions pkg/virtualKubelet/storage/indexers.go
Expand Up @@ -6,7 +6,7 @@ import (

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
discoveryv1beta1 "k8s.io/api/discovery/v1beta1"
discoveryv1 "k8s.io/api/discovery/v1"
"k8s.io/client-go/tools/cache"

"github.com/liqotech/liqo/pkg/virtualKubelet"
Expand Down Expand Up @@ -39,7 +39,7 @@ func configmapsIndexers() cache.Indexers {
func endpointSlicesIndexers() cache.Indexers {
i := cache.Indexers{}
i["endpointslices"] = func(obj interface{}) ([]string, error) {
endpointSlice, ok := obj.(*discoveryv1beta1.EndpointSlice)
endpointSlice, ok := obj.(*discoveryv1.EndpointSlice)
if !ok {
return []string{}, errors.New("cannot convert obj to endpointslice")
}
Expand Down

0 comments on commit 718f118

Please sign in to comment.