Skip to content

Commit

Permalink
Added advertisement status in PeeringRequest
Browse files Browse the repository at this point in the history
When the adv is accepted/refused by the foreign cluster, the status is copied in the local PeeringRequest to notify home cluster
  • Loading branch information
fraborg authored and palexster committed Aug 10, 2020
1 parent 8f22507 commit 390ce8f
Show file tree
Hide file tree
Showing 8 changed files with 211 additions and 34 deletions.
61 changes: 61 additions & 0 deletions api/discovery/v1/peeringRequestClient.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package v1

import (
"errors"
"github.com/liqoTech/liqo/pkg/crdClient"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
)

// create a client for ClusterConfig CR using a provided kubeconfig
func CreatePeeringRequestClient(kubeconfig string) (*crdClient.CRDClient, error) {
var config *rest.Config
var err error

if err = AddToScheme(scheme.Scheme); err != nil {
panic(err)
}

GroupResource := schema.GroupResource{Group: GroupVersion.Group, Resource: "peeringrequests"}

crdClient.AddToRegistry("peeringrequests", &PeeringRequest{}, &PeeringRequestList{}, Keyer, GroupResource)

config, err = crdClient.NewKubeconfig(kubeconfig, &GroupVersion)
if err != nil {
panic(err)
}

clientSet, err := crdClient.NewFromConfig(config)
if err != nil {
return nil, err
}

store, stop, err := crdClient.WatchResources(clientSet,
"peeringrequests",
"",
0,
cache.ResourceEventHandlerFuncs{},
metav1.ListOptions{})

if err != nil {
return nil, err
}

clientSet.Store = store
clientSet.Stop = stop

return clientSet, nil
}

func Keyer(obj runtime.Object) (string, error) {
config, ok := obj.(*PeeringRequest)
if !ok {
return "", errors.New("cannot cast received object to PeeringRequest")
}

return config.Name, nil
}
3 changes: 1 addition & 2 deletions api/discovery/v1/peeringrequest_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ type PeeringRequestSpec struct {

// PeeringRequestStatus defines the observed state of PeeringRequest
type PeeringRequestStatus struct {
// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
// Important: Run "make" to regenerate code after modifying this file
AdvertisementStatus string `json:"advertisementStatus,omitempty"`
}

// +kubebuilder:object:root=true
Expand Down
30 changes: 25 additions & 5 deletions api/discovery/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion deployments/liqo_chart/crds/peering-request-crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,9 @@ spec:
type: object
description: KubeConfig file (with Advertisement creation role) secret reference
required:
- clusterID
- clusterID
status:
description: PeeringRequestStatus defines the observed state of PeeringRequest
properties:
advertisementStatus:
type: string
2 changes: 1 addition & 1 deletion internal/advertisement-operator/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func (b *AdvertisementBroadcaster) GenerateAdvertisement() {
// start the remote watcher over this Advertisement; the watcher must be launched only once
go once.Do(func() {
foreignAdvName := "advertisement-" + b.ForeignClusterId
WatchAdvertisement(b.LocalClient, b.RemoteClient, adv.Name, foreignAdvName)
b.WatchAdvertisement(adv.Name, foreignAdvName)
})

time.Sleep(10 * time.Minute)
Expand Down
76 changes: 54 additions & 22 deletions internal/advertisement-operator/remote-watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,24 @@ package advertisement_operator

import (
protocolv1 "github.com/liqoTech/liqo/api/advertisement-operator/v1"
"github.com/liqoTech/liqo/pkg/crdClient"
discoveryv1 "github.com/liqoTech/liqo/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/klog"
)

func WatchAdvertisement(localClient, remoteClient *crdClient.CRDClient, homeAdvName, foreignAdvName string) {
func (b *AdvertisementBroadcaster) WatchAdvertisement(homeAdvName, foreignAdvName string) {

klog.V(6).Info("starting remote advertisement watcher")
watcher, err := remoteClient.Resource("advertisements").Watch(metav1.ListOptions{
klog.Info("starting remote advertisement watcher")
watcher, err := b.RemoteClient.Resource("advertisements").Watch(metav1.ListOptions{
FieldSelector: "metadata.name=" + homeAdvName,
Watch: true,
})
if err != nil {
klog.Error(err)
return
}
klog.V(6).Info("correctly created watcher for " + homeAdvName)
klog.Info("correctly created watcher for " + homeAdvName)

// events are triggered only by modifications on the Advertisement created by the broadcaster on the remote cluster
// homeClusterAdv is the Advertisement created by home cluster on foreign cluster -> stored remotely
Expand All @@ -32,29 +32,61 @@ func WatchAdvertisement(localClient, remoteClient *crdClient.CRDClient, homeAdvN
}
switch event.Type {
case watch.Added, watch.Modified:
// check if the triggering event is a modification made by the tunnelEndpoint creator
if homeClusterAdv.Status.RemoteRemappedPodCIDR == "" {
continue
// the triggering event is a modification made by the tunnelEndpoint creator
if homeClusterAdv.Status.RemoteRemappedPodCIDR != "" {
err = b.setNetworkRemapping(homeClusterAdv, foreignAdvName)
if err != nil {
klog.Error(err)
} else {
klog.Info("correctly set network remapping for foreign advertisement " + foreignAdvName)
}
}

// get the Advertisement of the foreign cluster (stored in the local cluster)
obj, err := localClient.Resource("advertisements").Get(foreignAdvName, metav1.GetOptions{})
if err != nil {
klog.Error(err)
continue
// the triggering event is the acceptance/refusal of the Advertisement
if homeClusterAdv.Status.AdvertisementStatus != "" {
err = b.saveAdvStatus(homeClusterAdv)
if err != nil {
klog.Error(err)
} else {
klog.Info("correctly set peering request status to " + homeClusterAdv.Status.AdvertisementStatus)
}
}
foreignClusterAdv := obj.(*protocolv1.Advertisement)
// set the status of the foreign cluster Advertisement with the information given by the tunnelEndpoint creator
foreignClusterAdv.Status.LocalRemappedPodCIDR = homeClusterAdv.Status.RemoteRemappedPodCIDR
_, err = localClient.Resource("advertisements").UpdateStatus(foreignAdvName, foreignClusterAdv, metav1.UpdateOptions{})
if err != nil {
klog.Error(err)
continue
}
klog.V(6).Info("correctly set status of foreign advertisement " + foreignAdvName)
case watch.Deleted:
klog.Info("Adv " + homeAdvName + " has been deleted")
watcher.Stop()
}
}
}

func (b *AdvertisementBroadcaster) setNetworkRemapping(homeClusterAdv *protocolv1.Advertisement, foreignAdvName string) error {
// get the Advertisement of the foreign cluster (stored in the local cluster)
obj, err := b.LocalClient.Resource("advertisements").Get(foreignAdvName, metav1.GetOptions{})
if err != nil {
return err
}
foreignClusterAdv := obj.(*protocolv1.Advertisement)
// set the status of the foreign cluster Advertisement with the information given by the tunnelEndpoint creator
foreignClusterAdv.Status.LocalRemappedPodCIDR = homeClusterAdv.Status.RemoteRemappedPodCIDR
_, err = b.LocalClient.Resource("advertisements").UpdateStatus(foreignAdvName, foreignClusterAdv, metav1.UpdateOptions{})
if err != nil {
return err
}
return nil
}

func (b *AdvertisementBroadcaster) saveAdvStatus(adv *protocolv1.Advertisement) error {
// get the PeeringRequest from the foreign cluster which requested resources
tmp, err := b.DiscoveryClient.Resource("peeringrequests").Get(b.PeeringRequestName, metav1.GetOptions{})
if err != nil {
return err
}
pr := tmp.(*discoveryv1.PeeringRequest)

// save the advertisement status (ACCEPTED/REFUSED) in the PeeringRequest
pr.Status.AdvertisementStatus = adv.Status.AdvertisementStatus
_, err = b.DiscoveryClient.Resource("peeringrequests").UpdateStatus(b.PeeringRequestName, pr, metav1.UpdateOptions{})
if err != nil {
return err
}
return nil
}
10 changes: 9 additions & 1 deletion test/unit/advertisement-operator/broadcaster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
protocolv1 "github.com/liqoTech/liqo/api/advertisement-operator/v1"
policyv1 "github.com/liqoTech/liqo/api/cluster-config/v1"
discoveryv1 "github.com/liqoTech/liqo/api/discovery/v1"
"github.com/liqoTech/liqo/internal/advertisement-operator"
"github.com/liqoTech/liqo/internal/kubernetes/test"
pkg "github.com/liqoTech/liqo/pkg/advertisement-operator"
Expand Down Expand Up @@ -34,6 +35,12 @@ func createBroadcaster(clusterConfig policyv1.ClusterConfigSpec) advertisement_o
panic(err)
}

// create the discovery client
discoveryClient, err := discoveryv1.CreatePeeringRequestClient("")
if err != nil {
panic(err)
}

secret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Expand All @@ -46,13 +53,14 @@ func createBroadcaster(clusterConfig policyv1.ClusterConfigSpec) advertisement_o

return advertisement_operator.AdvertisementBroadcaster{
LocalClient: homeClient,
DiscoveryClient: nil,
DiscoveryClient: discoveryClient,
KubeconfigSecretForForeign: secret,
RemoteClient: foreignClient,
HomeClusterId: test.HomeClusterId,
ForeignClusterId: test.ForeignClusterId,
GatewayPrivateIP: "10.0.0.1",
ClusterConfig: clusterConfig,
PeeringRequestName: test.ForeignClusterId,
}
}

Expand Down
Loading

0 comments on commit 390ce8f

Please sign in to comment.