Skip to content

Commit

Permalink
replication of namespaced resources in tenant namespaces
Browse files Browse the repository at this point in the history
  • Loading branch information
aleoli committed May 21, 2021
1 parent 8476758 commit 25934cc
Show file tree
Hide file tree
Showing 24 changed files with 235 additions and 54 deletions.
1 change: 1 addition & 0 deletions apis/discovery/v1alpha1/resourcerequest_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type ResourceRequest struct {
}

// +kubebuilder:object:root=true
// +kubebuilder:subresource:status

// ResourceRequestList contains a list of ResourceRequest.
type ResourceRequestList struct {
Expand Down
25 changes: 22 additions & 3 deletions cmd/crd-replicator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@ import (

configv1alpha1 "github.com/liqotech/liqo/apis/config/v1alpha1"
discoveryv1alpha1 "github.com/liqotech/liqo/apis/discovery/v1alpha1"
"github.com/liqotech/liqo/internal/crdReplicator"
crdreplicator "github.com/liqotech/liqo/internal/crdReplicator"
"github.com/liqotech/liqo/pkg/clusterid"
identitymanager "github.com/liqotech/liqo/pkg/identityManager"
util "github.com/liqotech/liqo/pkg/liqonet"
"github.com/liqotech/liqo/pkg/mapperUtils"
tenantcontrolnamespace "github.com/liqotech/liqo/pkg/tenantControlNamespace"
)

var (
Expand All @@ -34,7 +37,14 @@ func init() {
}

func main() {
var useNewAuth bool

flag.BoolVar(&useNewAuth, "useNewAuth", false, "Enable the new authentication flow, with certificates and namespaced resources")
klog.InitFlags(nil)
flag.Parse()

flag.Parse()

cfg := ctrl.GetConfigOrDie()
mgr, err := ctrl.NewManager(cfg, ctrl.Options{
MapperProvider: mapperUtils.LiqoMapperProvider(scheme),
Expand Down Expand Up @@ -70,9 +80,12 @@ func main() {
} else {
klog.Infof("setting local clusterID to: %s", clusterID)
}
clusterIDInterface := clusterid.NewStaticClusterID(clusterID)
namespaceManager := tenantcontrolnamespace.NewTenantControlNamespaceManager(k8sClient)
dynClient := dynamic.NewForConfigOrDie(cfg)
dynFac := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynClient, crdReplicator.ResyncPeriod, metav1.NamespaceAll, crdReplicator.SetLabelsForLocalResources)
d := &crdReplicator.Controller{
dynFac := dynamicinformer.NewFilteredDynamicSharedInformerFactory(
dynClient, crdreplicator.ResyncPeriod, metav1.NamespaceAll, crdreplicator.SetLabelsForLocalResources)
d := &crdreplicator.Controller{
Scheme: mgr.GetScheme(),
Client: mgr.GetClient(),
ClientSet: k8sClient,
Expand All @@ -85,6 +98,12 @@ func main() {
LocalWatchers: make(map[string]chan struct{}),
RemoteWatchers: make(map[string]map[string]chan struct{}),
RemoteDynSharedInformerFactory: make(map[string]dynamicinformer.DynamicSharedInformerFactory),
UseNewAuth: useNewAuth,
NamespaceManager: namespaceManager,
IdentityManager: identitymanager.NewCertificateIdentityManager(
k8sClient, clusterIDInterface, namespaceManager),
LocalToRemoteNamespaceMapper: map[string]string{},
RemoteToLocalNamespaceMapper: map[string]string{},
}
if err = d.SetupWithManager(mgr); err != nil {
klog.Error(err, "unable to setup the crdreplicator-operator")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ rules:
resources:
- resourcerequests
verbs:
- create
- delete
- get
- list
Expand All @@ -15,6 +16,7 @@ rules:
resources:
- resourcerequests/status
verbs:
- create
- delete
- get
- list
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ rules:
resources:
- networkconfigs
verbs:
- create
- delete
- get
- list
Expand All @@ -15,6 +16,7 @@ rules:
resources:
- networkconfigs/status
verbs:
- create
- delete
- get
- list
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ rules:
resources:
- resourceoffers
verbs:
- create
- delete
- get
- list
Expand All @@ -15,6 +16,7 @@ rules:
resources:
- resourceoffers/status
verbs:
- create
- delete
- get
- list
Expand All @@ -26,6 +28,7 @@ rules:
resources:
- networkconfigs
verbs:
- create
- delete
- get
- list
Expand All @@ -37,6 +40,7 @@ rules:
resources:
- networkconfigs/status
verbs:
- create
- delete
- get
- list
Expand Down
2 changes: 1 addition & 1 deletion internal/crdReplicator/crdReplicator-config.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package crdReplicator
package crdreplicator

import (
"reflect"
Expand Down
2 changes: 1 addition & 1 deletion internal/crdReplicator/crdReplicator-config_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package crdReplicator
package crdreplicator

import (
"testing"
Expand Down
66 changes: 58 additions & 8 deletions internal/crdReplicator/crdReplicator-operator.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package crdReplicator
package crdreplicator

import (
"context"
Expand Down Expand Up @@ -28,7 +28,9 @@ import (
"sigs.k8s.io/controller-runtime/pkg/predicate"

"github.com/liqotech/liqo/apis/discovery/v1alpha1"
identitymanager "github.com/liqotech/liqo/pkg/identityManager"
utils "github.com/liqotech/liqo/pkg/liqonet"
tenantcontrolnamespace "github.com/liqotech/liqo/pkg/tenantControlNamespace"
)

var (
Expand Down Expand Up @@ -60,6 +62,12 @@ type Controller struct {
UnregisteredResources []string // each time a resource is removed from the configuration it is saved in this list, it stays here until the associated watcher, if running, is stopped
LocalWatchers map[string]chan struct{} // we save all the running watchers monitoring the local resources:(registeredResource, chan))
RemoteWatchers map[string]map[string]chan struct{} // for each peering cluster we save all the running watchers monitoring the replicated resources:(clusterID, (registeredResource, chan))

UseNewAuth bool
NamespaceManager tenantcontrolnamespace.TenantControlNamespaceManager
IdentityManager identitymanager.IdentityManager
LocalToRemoteNamespaceMapper map[string]string
RemoteToLocalNamespaceMapper map[string]string
}

// cluster-role
Expand Down Expand Up @@ -140,6 +148,22 @@ func (c *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
if dynClientOk && dynFacOk {
return result, nil
}

if c.UseNewAuth {
if fc.Status.TenantControlNamespace.Local == "" || fc.Status.TenantControlNamespace.Remote == "" {
klog.V(4).Infof("%s -> tenantControlNamespace is not set in resource %s for remote peering cluster %s",
c.ClusterID, req.NamespacedName, remoteClusterID)
return result, nil
}
config, err := c.IdentityManager.GetConfig(remoteClusterID, fc.Status.TenantControlNamespace.Local)
if err != nil {
klog.Errorf("%s -> unable to retrieve config from resource %s for remote peering cluster %s: %s",
c.ClusterID, req.NamespacedName, remoteClusterID, err)
return result, nil
}
return result, c.setUpConnectionToPeeringCluster(config, remoteClusterID, &fc)
}

// check if the config of the peering cluster is ready
// first we check the outgoing connection
if fc.Status.Outgoing.AvailableIdentity {
Expand All @@ -149,15 +173,15 @@ func (c *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
klog.Errorf("%s -> unable to retrieve config from resource %s for remote peering cluster %s: %s", c.ClusterID, req.NamespacedName, remoteClusterID, err)
return result, nil
}
return result, c.setUpConnectionToPeeringCluster(config, remoteClusterID)
return result, c.setUpConnectionToPeeringCluster(config, remoteClusterID, &fc)
} else if fc.Status.Incoming.AvailableIdentity {
// retrieve the config
config, err := c.getKubeConfig(c.ClientSet, fc.Status.Incoming.IdentityRef, remoteClusterID)
if err != nil {
klog.Errorf("%s -> unable to retrieve config from resource %s for remote peering cluster %s: %s", c.ClusterID, req.NamespacedName, remoteClusterID, err)
return result, err
}
return result, c.setUpConnectionToPeeringCluster(config, remoteClusterID)
return result, c.setUpConnectionToPeeringCluster(config, remoteClusterID, &fc)
}
return result, nil
}
Expand Down Expand Up @@ -191,7 +215,16 @@ func (c *Controller) getKubeConfig(clientset kubernetes.Interface, reference *co
return cnf, nil
}

func (c *Controller) setUpConnectionToPeeringCluster(config *rest.Config, remoteClusterID string) error {
func (c *Controller) setUpConnectionToPeeringCluster(config *rest.Config, remoteClusterID string, fc *v1alpha1.ForeignCluster) error {
var remoteNamespace string
if c.UseNewAuth {
c.LocalToRemoteNamespaceMapper[fc.Status.TenantControlNamespace.Local] = fc.Status.TenantControlNamespace.Remote
c.RemoteToLocalNamespaceMapper[fc.Status.TenantControlNamespace.Remote] = fc.Status.TenantControlNamespace.Local
remoteNamespace = fc.Status.TenantControlNamespace.Remote
} else {
remoteNamespace = metav1.NamespaceAll
}

// check if the dynamic dynamic client exists
if _, ok := c.RemoteDynClients[remoteClusterID]; !ok {
dynClient, err := dynamic.NewForConfig(config)
Expand All @@ -206,7 +239,8 @@ func (c *Controller) setUpConnectionToPeeringCluster(config *rest.Config, remote
}
// check if the dynamic shared informer factory exists
if _, ok := c.RemoteDynSharedInformerFactory[remoteClusterID]; !ok {
f := dynamicinformer.NewFilteredDynamicSharedInformerFactory(c.RemoteDynClients[remoteClusterID], ResyncPeriod, metav1.NamespaceAll, c.SetLabelsForRemoteResources)
f := dynamicinformer.NewFilteredDynamicSharedInformerFactory(
c.RemoteDynClients[remoteClusterID], ResyncPeriod, remoteNamespace, c.SetLabelsForRemoteResources)
c.RemoteDynSharedInformerFactory[remoteClusterID] = f
klog.Infof("%s -> dynamic shared informer factory created", remoteClusterID)
}
Expand Down Expand Up @@ -267,10 +301,13 @@ func (c *Controller) remoteModifiedWrapper(oldObj, newObj interface{}) {
func (c *Controller) RemoteResourceModifiedHandler(obj *unstructured.Unstructured, gvr schema.GroupVersionResource, remoteClusterId string) {
name := obj.GetName()
namespace := obj.GetNamespace()

localNamespace := c.remoteToLocalNamespace(namespace)

localDynClient := c.LocalDynClient
clusterID := c.ClusterID
// we check if the resource exists in the local cluster
localObj, found, err := c.GetResource(localDynClient, gvr, name, namespace, clusterID)
localObj, found, err := c.GetResource(localDynClient, gvr, name, localNamespace, clusterID)
if err != nil {
klog.Errorf("%s -> an error occurred while getting resource %s of type %s: %s", clusterID, name, gvr.String(), err)
return
Expand All @@ -279,8 +316,8 @@ func (c *Controller) RemoteResourceModifiedHandler(obj *unstructured.Unstructure
// do nothing? remove the remote replication?
// current choice -> remove the remote one as well
if !found {
klog.Infof("%s -> resource %s in namespace %s of type %s not found", clusterID, name, namespace, gvr.String())
klog.Infof("%s -> removing resource %s in namespace %s of type %s", remoteClusterId, name, namespace, gvr.String())
klog.Infof("%s -> resource %s in namespace %s of type %s not found", clusterID, name, localNamespace, gvr.String())
klog.Infof("%s -> removing resource %s in namespace %s of type %s", remoteClusterId, name, localNamespace, gvr.String())
err := c.DeleteResource(c.RemoteDynClients[remoteClusterId], gvr, obj, remoteClusterId)
if err != nil {
return
Expand Down Expand Up @@ -380,6 +417,7 @@ func (c *Controller) CreateResource(client dynamic.Interface, gvr schema.GroupVe
// check if the resource exists
name := obj.GetName()
namespace := obj.GetNamespace()

klog.Infof("%s -> creating resource %s of type %s", clusterID, name, gvr.String())
r, found, err := c.GetResource(client, gvr, name, namespace, clusterID)
if err != nil {
Expand Down Expand Up @@ -523,6 +561,10 @@ func (c *Controller) AddedHandler(obj *unstructured.Unstructured, gvr schema.Gro
klog.Infof("%s -> resource %s %s of type %s has not a destination label with the ID of the peering cluster", c.ClusterID, obj.GetName(), obj.GetNamespace(), gvr.String())
return
}

remoteNamespace := c.localToRemoteNamespace(obj.GetNamespace())
obj.SetNamespace(remoteNamespace)

if dynClient, ok := c.RemoteDynClients[remoteClusterID]; !ok {
klog.Infof("%s -> a connection to the peering cluster with id: %s does not exist", c.ClusterID, remoteClusterID)
return
Expand Down Expand Up @@ -560,6 +602,10 @@ func (c *Controller) ModifiedHandler(obj *unstructured.Unstructured, gvr schema.
name := obj.GetName()
namespace := obj.GetNamespace()
clusterID := remoteClusterID

namespace = c.localToRemoteNamespace(namespace)
obj.SetNamespace(namespace)

// we check if the resource exists in the remote cluster
_, found, err := c.GetResource(dynClient, gvr, name, namespace, clusterID)
if err != nil {
Expand Down Expand Up @@ -609,6 +655,10 @@ func (c *Controller) DeletedHandler(obj *unstructured.Unstructured, gvr schema.G
namespace := obj.GetNamespace()
dynClient := dynClient
clusterID := remoteClusterID

namespace = c.localToRemoteNamespace(namespace)
obj.SetNamespace(namespace)

// we check if the resource exists in the remote cluster
_, found, err := c.GetResource(dynClient, gvr, name, namespace, clusterID)
if err != nil {
Expand Down
40 changes: 39 additions & 1 deletion internal/crdReplicator/crdReplicator-operator_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package crdReplicator
package crdreplicator

import (
"context"
Expand All @@ -14,6 +14,9 @@ import (
"k8s.io/client-go/dynamic/dynamicinformer"

netv1alpha1 "github.com/liqotech/liqo/apis/net/v1alpha1"
"github.com/liqotech/liqo/pkg/clusterid"
identitymanager "github.com/liqotech/liqo/pkg/identityManager"
tenantcontrolnamespace "github.com/liqotech/liqo/pkg/tenantControlNamespace"
)

var (
Expand Down Expand Up @@ -52,6 +55,8 @@ func getLabels() map[string]string {
}

func getCRDReplicator() Controller {
tenantmanager := tenantcontrolnamespace.NewTenantControlNamespaceManager(k8sclient)
clusterIDInterface := clusterid.NewStaticClusterID(localClusterID)
return Controller{
Scheme: nil,
ClusterID: localClusterID,
Expand All @@ -63,6 +68,12 @@ func getCRDReplicator() Controller {
LocalDynClient: dynClient,
LocalDynSharedInformerFactory: localDynFac,
LocalWatchers: map[string]chan struct{}{},

UseNewAuth: false,
NamespaceManager: tenantmanager,
IdentityManager: identitymanager.NewCertificateIdentityManager(k8sclient, clusterIDInterface, tenantmanager),
LocalToRemoteNamespaceMapper: map[string]string{},
RemoteToLocalNamespaceMapper: map[string]string{},
}
}

Expand Down Expand Up @@ -360,3 +371,30 @@ func TestGetStatus(t *testing.T) {
assert.Nil(t, err, "error should be nil")
assert.Nil(t, objStatus, "the spec should be nil")
}

func TestNamespaceTranslation(t *testing.T) {
d := getCRDReplicator()

localNamespace := "local"
remoteNamespace := "remote"
otherNamespace := "other"

d.LocalToRemoteNamespaceMapper[localNamespace] = remoteNamespace
d.RemoteToLocalNamespaceMapper[remoteNamespace] = localNamespace

// namespaces present in the map

mappedNamespace := d.localToRemoteNamespace(localNamespace)
assert.Equal(t, mappedNamespace, remoteNamespace, "these namespace names have to be equal")

demappedNamespace := d.remoteToLocalNamespace(mappedNamespace)
assert.Equal(t, demappedNamespace, localNamespace, "these namespace names have to be equal")

// namespaces not present in the map

mappedNamespace = d.localToRemoteNamespace(otherNamespace)
assert.Equal(t, mappedNamespace, otherNamespace, "these namespace names have to be equal")

demappedNamespace = d.remoteToLocalNamespace(mappedNamespace)
assert.Equal(t, demappedNamespace, otherNamespace, "these namespace names have to be equal")
}
3 changes: 3 additions & 0 deletions internal/crdReplicator/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
// Package crdreplicator implements the logic for the replication of CustomResourceDefinitions
// between the peered clusters.
package crdreplicator
Loading

0 comments on commit 25934cc

Please sign in to comment.