Skip to content

Commit

Permalink
Add a node labeling FV test
Browse files Browse the repository at this point in the history
  • Loading branch information
caseydavenport committed Dec 17, 2018
1 parent c5a5e82 commit dcb2aff
Show file tree
Hide file tree
Showing 4 changed files with 276 additions and 19 deletions.
2 changes: 1 addition & 1 deletion pkg/config/config.go
Expand Up @@ -44,7 +44,7 @@ type Config struct {
HealthEnabled bool `default:"true"`

// Enable syncing of node labels
SyncNodeLabels bool `default:"false" split_words:"true"`
SyncNodeLabels bool `default:"true" split_words:"true"`
}

// Parse parses envconfig and stores in Config struct
Expand Down
42 changes: 24 additions & 18 deletions pkg/controllers/node/node_controller.go
Expand Up @@ -17,7 +17,6 @@ package node
import (
"context"
"encoding/json"
"fmt"
"time"

log "github.com/sirupsen/logrus"
Expand All @@ -42,6 +41,7 @@ const (
RateLimitCalicoList = "calico-list"
RateLimitK8s = "k8s"
RateLimitCalicoDelete = "calico-delete"
nodeLabelAnnotation = "projectcalico.org/kube-labels"
)

var (
Expand Down Expand Up @@ -77,10 +77,6 @@ func NewNodeController(ctx context.Context, k8sClientset *kubernetes.Clientset,
// just one additional sync.
nc.schedule = make(chan interface{}, 1)

// Start the syncer.
nc.initSyncer()
nc.syncer.Start()

// Create a Node watcher.
listWatcher := cache.NewListWatchFromClient(k8sClientset.CoreV1().RESTClient(), "nodes", "", fields.Everything())

Expand All @@ -92,12 +88,18 @@ func NewNodeController(ctx context.Context, k8sClientset *kubernetes.Clientset,
kick(nc.schedule)
}}

// TODO: Only setup node labels syncer if env var is set
handlers.AddFunc = func(obj interface{}) {
nc.syncNodeLabels(obj.(*v1.Node))
}
handlers.UpdateFunc = func(_, obj interface{}) {
nc.syncNodeLabels(obj.(*v1.Node))
if cfg.SyncNodeLabels {
// Start the syncer.
nc.initSyncer()
nc.syncer.Start()

// Add handlers for node add/update events from k8s.
handlers.AddFunc = func(obj interface{}) {
nc.syncNodeLabels(obj.(*v1.Node))
}
handlers.UpdateFunc = func(_, obj interface{}) {
nc.syncNodeLabels(obj.(*v1.Node))
}
}

// Informer handles managing the watch and signals us when nodes are deleted.
Expand All @@ -108,15 +110,18 @@ func NewNodeController(ctx context.Context, k8sClientset *kubernetes.Clientset,
}

// syncNodeLabels syncs the labels found in v1.Node to the Calico node object.
func (nc *NodeController) syncNodeLabels(node *v1.Node) error {
// It uses an annotation on the Calico node object to keep track of which labels have
// beend synced from Kubernetes, so that it doesn't overwrite user provided labels (e.g.,
// via calicoctl or another Calico controller).
func (nc *NodeController) syncNodeLabels(node *v1.Node) {
// On failure, we retry a certain number of times.
for n := 1; n < maxAttempts; n++ {
// Get the Calico node representation.
name, ok := nc.nodemapper[node.Name]
if !ok {
// We havent learned this Calico node yet.
log.Debugf("Skipping update for node with no Calico equivalent")
return nil
return
}
calNode, err := nc.calicoClient.Nodes().Get(context.Background(), name, options.GetOptions{})
if err != nil {
Expand All @@ -135,7 +140,7 @@ func (nc *NodeController) syncNodeLabels(node *v1.Node) error {
needsUpdate := false

// Check if it has the annotation for k8s labels.
a, ok := calNode.Annotations["projectcalico.org/kube-labels"]
a, ok := calNode.Annotations[nodeLabelAnnotation]

// If there are labels present, then parse them. Otherwise this is
// a first-time sync, in which case there are no old labels.
Expand Down Expand Up @@ -171,9 +176,10 @@ func (nc *NodeController) syncNodeLabels(node *v1.Node) error {
bytes, err := json.Marshal(node.Labels)
if err != nil {
log.WithError(err).Errorf("Error marshalling node labels")
panic(err) // TODO
time.Sleep(retrySleepTime)
continue
}
calNode.Annotations["projectcalico.org/kube-labels"] = string(bytes)
calNode.Annotations[nodeLabelAnnotation] = string(bytes)

// Update the node in the datastore.
if needsUpdate {
Expand All @@ -184,9 +190,9 @@ func (nc *NodeController) syncNodeLabels(node *v1.Node) error {
}
log.WithField("node", node.ObjectMeta.Name).Info("Successfully synced node labels")
}
return nil
return
}
return fmt.Errorf("Too many retries when updating node")
log.Errorf("Too many retries when updating node")
}

// getK8sNodeName is a helper method that searches a calicoNode for its kubernetes nodeRef.
Expand Down
79 changes: 79 additions & 0 deletions pkg/controllers/node/node_syncer.go
@@ -0,0 +1,79 @@
// Copyright (c) 2017 Tigera, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package node

import (
apiv3 "github.com/projectcalico/libcalico-go/lib/apis/v3"
bapi "github.com/projectcalico/libcalico-go/lib/backend/api"
"github.com/projectcalico/libcalico-go/lib/backend/model"
"github.com/projectcalico/libcalico-go/lib/backend/watchersyncer"
"github.com/sirupsen/logrus"
"k8s.io/api/core/v1"
)

func (c *NodeController) initSyncer() {
resourceTypes := []watchersyncer.ResourceType{
{
ListInterface: model.ResourceListOptions{Kind: apiv3.KindNode},
},
}
type accessor interface {
Backend() bapi.Client
}
c.syncer = watchersyncer.New(c.calicoClient.(accessor).Backend(), resourceTypes, c)
}

func (c *NodeController) OnStatusUpdated(status bapi.SyncStatus) {
logrus.Infof("Node controller syncer status updated: %s", status)
}

func (c *NodeController) OnUpdates(updates []bapi.Update) {
// Update local cache.
logrus.Debugf("Node controller syncer received updates: %#v", updates)
for _, upd := range updates {
switch upd.UpdateType {
case bapi.UpdateTypeKVNew:
fallthrough
case bapi.UpdateTypeKVUpdated:
n := upd.KVPair.Value.(*apiv3.Node)
if kn := getK8sNodeName(*n); kn != "" {
// Create a mapping from Kubernetes node -> Calico node.
logrus.Debugf("Mapping Calico node to k8s node. %s -> %s", n.Name, kn)
c.nodemapper[kn] = n.Name

// It has a node reference - get that Kubernetes node, and if
// it exists perform a sync.
obj, ok, err := c.indexer.GetByKey(kn)
if !ok {
logrus.Debugf("No corresponding kubernetes node")
return
} else if err != nil {
logrus.WithError(err).Warnf("Couldn't get node from indexer")
return
}
c.syncNodeLabels(obj.(*v1.Node))
}
case bapi.UpdateTypeKVDeleted:
logrus.Debug("Node deleted")
n := upd.KVPair.Value.(*apiv3.Node)
if kn := getK8sNodeName(*n); kn != "" {
// Remove it from the node map.
delete(c.nodemapper, kn)
}
default:
logrus.Errorf("Unhandled update type")
}
}
}
172 changes: 172 additions & 0 deletions tests/fv/node_label_test.go
@@ -0,0 +1,172 @@
// Copyright (c) 2017 Tigera, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package fv_test

import (
"context"
"fmt"
"io/ioutil"
"os"
"reflect"
"time"

"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/sirupsen/logrus"

"github.com/projectcalico/felix/fv/containers"
"github.com/projectcalico/kube-controllers/tests/testutils"
api "github.com/projectcalico/libcalico-go/lib/apis/v3"
client "github.com/projectcalico/libcalico-go/lib/clientv3"
"github.com/projectcalico/libcalico-go/lib/options"
)

var _ = Describe("Node labeling tests", func() {
var (
etcd *containers.Container
policyController *containers.Container
apiserver *containers.Container
c client.Interface
k8sClient *kubernetes.Clientset
controllerManager *containers.Container
)

const kNodeName = "k8snodename"
const cNodeName = "calinodename"

BeforeEach(func() {
// Run etcd.
etcd = testutils.RunEtcd()
c = testutils.GetCalicoClient(etcd.IP)

// Run apiserver.
apiserver = testutils.RunK8sApiserver(etcd.IP)

// Write out a kubeconfig file
kfconfigfile, err := ioutil.TempFile("", "ginkgo-policycontroller")
Expect(err).NotTo(HaveOccurred())
defer os.Remove(kfconfigfile.Name())
data := fmt.Sprintf(testutils.KubeconfigTemplate, apiserver.IP)
kfconfigfile.Write([]byte(data))

// Run the controller.
policyController = testutils.RunPolicyController(etcd.IP, kfconfigfile.Name())

k8sClient, err = testutils.GetK8sClient(kfconfigfile.Name())
Expect(err).NotTo(HaveOccurred())

// Wait for the apiserver to be available.
Eventually(func() error {
_, err := k8sClient.CoreV1().Namespaces().List(metav1.ListOptions{})
return err
}, 30*time.Second, 1*time.Second).Should(BeNil())

// Run controller manager. Empirically it can take around 10s until the
// controller manager is ready to create default service accounts, even
// when the hyperkube image has already been downloaded to run the API
// server. We use Eventually to allow for possible delay when doing
// initial pod creation below.
controllerManager = testutils.RunK8sControllerManager(apiserver.IP)
})

AfterEach(func() {
controllerManager.Stop()
policyController.Stop()
apiserver.Stop()
etcd.Stop()
})

It("should sync labels from k8s -> calico", func() {
// Create a kubernetes node with some labels.
kn := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: kNodeName,
Labels: map[string]string{
"label1": "value1",
},
},
}
_, err := k8sClient.CoreV1().Nodes().Create(kn)
Expect(err).NotTo(HaveOccurred())

// Create a Calico node with a reference to it.
cn := api.NewNode()
cn.Name = cNodeName
cn.Labels = map[string]string{"calico-label": "calico-value", "label1": "badvalue"}
cn.Spec = api.NodeSpec{
OrchRefs: []api.OrchRef{
{
NodeName: kNodeName,
Orchestrator: "k8s",
},
},
}
_, err = c.Nodes().Create(context.Background(), cn, options.SetOptions{})
Expect(err).NotTo(HaveOccurred())

// Expect the node label to sync.
expected := map[string]string{"label1": "value1", "calico-label": "calico-value"}
Eventually(func() error { return expectLabels(c, expected, cNodeName) },
time.Second*15, 500*time.Millisecond).Should(BeNil())

// Update the Kubernetes node labels.
kn, err = k8sClient.CoreV1().Nodes().Get(kn.Name, metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())
kn.Labels["label1"] = "value2"
_, err = k8sClient.CoreV1().Nodes().Update(kn)
Expect(err).NotTo(HaveOccurred())

// Expect the node label to sync.
expected = map[string]string{"label1": "value2", "calico-label": "calico-value"}
Eventually(func() error { return expectLabels(c, expected, cNodeName) },
time.Second*15, 500*time.Millisecond).Should(BeNil())

// Delete the label, add a different one.
kn, err = k8sClient.CoreV1().Nodes().Get(kn.Name, metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())
kn.Labels = map[string]string{"label2": "value1"}
_, err = k8sClient.CoreV1().Nodes().Update(kn)
Expect(err).NotTo(HaveOccurred())

// Expect the node labels to sync.
expected = map[string]string{"label2": "value1", "calico-label": "calico-value"}
Eventually(func() error { return expectLabels(c, expected, cNodeName) },
time.Second*15, 500*time.Millisecond).Should(BeNil())

// Delete the Kubernetes node.
k8sClient.CoreV1().Nodes().Delete(kNodeName, &metav1.DeleteOptions{})
Eventually(func() *api.Node {
node, _ := c.Nodes().Get(context.Background(), cNodeName, options.GetOptions{})
return node
}, time.Second*2, 500*time.Millisecond).Should(BeNil())
})
})

func expectLabels(c client.Interface, labels map[string]string, node string) error {
cn, err := c.Nodes().Get(context.Background(), node, options.GetOptions{})
if err != nil {
return err
}
if !reflect.DeepEqual(cn.Labels, labels) {
s := fmt.Sprintf("Labels do not match.\n\nExpected: %#v\n Actual: %#v\n", labels, cn.Labels)
logrus.Warn(s)
return fmt.Errorf(s)
}
return nil
}

0 comments on commit dcb2aff

Please sign in to comment.