Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an Informer example in client-go #44446

Closed
wants to merge 16 commits into from
Closed
47 changes: 47 additions & 0 deletions staging/src/k8s.io/client-go/examples/informer/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package(default_visibility = ["//visibility:public"])

licenses(["notice"])

load(
"@io_bazel_rules_go//go:def.bzl",
"go_binary",
"go_library",
"go_test",
)

go_binary(
name = "informer",
library = ":go_default_library",
tags = ["automanaged"],
)

go_library(
name = "go_default_library",
srcs = ["main.go"],
tags = ["automanaged"],
deps = [
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/client-go/informers:go_default_library",
"//vendor/k8s.io/client-go/informers/core/v1:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/tools/clientcmd:go_default_library",
],
)

go_test(
name = "go_default_test",
srcs = ["main_test.go"],
library = ":go_default_library",
tags = ["automanaged"],
deps = [
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/client-go/informers:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
"//vendor/k8s.io/client-go/testing:go_default_library",
],
)
34 changes: 34 additions & 0 deletions staging/src/k8s.io/client-go/examples/informer/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Informer Example

Informers provide a high-level API for creating custom controllers for Kubernetes resources.

This particular example demonstrates:

* How to write an Informer against a core resource type.
* How to handle add, update and delete events.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you link to the thirdparty resource example? Something like "The thirdparty-resources example demonstrates how to use informers against a thirdparty resource"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

The [thirdparty-resources example](../third-party-resources) demonstrates how to use informers against a thirdparty resource.

## Running

To run the example outside the Kubernetes cluster you need to supply the path to a Kubernetes config file.

```sh
go run main.go -kubeconfig=$HOME/.kube/config -logtostderr
```

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain what happens when someone runs this program? Example of this is at: https://github.com/kubernetes/client-go/tree/master/examples/create-update-delete-deployment We're trying to keep READMEs of all the client examples similar to each other.

By default `glog` logs to files.
Use the `-logtostderr` command line argument so that you can see the output on the console.

## Running Inside a Kubernetes Cluster

You can also run the example inside a Kubernetes cluster.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we omit the instruction on how to start minikube? After all there are many other ways to get a kubernetes cluster. Instead just something like "Check out this guide on how to setup a Kubernetes cluster".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

In this case it will use [in Cluster configuration](../in-cluster/),
and you don't need to supply `-kubeconfig` or `-master` command line flags.

## Use Cases

* Building controllers that coordinate other resources.
Most controllers in [k8s.io/kubernetes/pkg/controller](https://godoc.org/k8s.io/kubernetes/pkg/controller) use informers.
* Capturing resource events for logging to external systems
(e.g. monitor non-"Normal" events and publish metrics to a time series database)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a ## Cleanup section people can run to clean up the artifacts created by running this example? (For instance: https://github.com/kubernetes/client-go/tree/master/examples/create-update-delete-deployment or https://github.com/kubernetes/client-go/tree/master/examples/in-cluster has this section).

This will help us keep the README files for each client-go example similar.

137 changes: 137 additions & 0 deletions staging/src/k8s.io/client-go/examples/informer/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Note: the example only works with the code within the same release/branch.
package main

import (
"flag"
"fmt"

"github.com/golang/glog"

"k8s.io/api/core/v1"
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
// Only required to authenticate against GKE clusters
// _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
)

// PodLoggingController logs the name and namespace of pods that are added,
// deleted, or updated
type PodLoggingController struct {
informerFactory informers.SharedInformerFactory
podInformer coreinformers.PodInformer
}

// Run starts shared informers and waits for the shared informer cache to
// synchronize.
func (c *PodLoggingController) Run(stopCh chan struct{}) error {
// Starts all the shared informers that have been created by the factory so
// far.
c.informerFactory.Start(stopCh)
// wait for the initial synchronization of the local cache.
if !cache.WaitForCacheSync(stopCh, c.podInformer.Informer().HasSynced) {
return fmt.Errorf("Failed to sync")
}
return nil
}

func (c *PodLoggingController) podAdd(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
glog.Infof("POD CREATED. %q", key)
} else {
glog.Error(err)
}
}

func (c *PodLoggingController) podUpdate(old, new interface{}) {
newPod := new.(*v1.Pod)
key, err := cache.MetaNamespaceKeyFunc(new)
if err == nil {
glog.Infof("POD UPDATED. %q, %s", key, newPod.Status.Phase)
} else {
glog.Error(err)
}
}

func (c *PodLoggingController) podDelete(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err == nil {
glog.Infof("POD DELETED: %q", key)
} else {
glog.Error(err)
}
}

// NewPodLoggingController creates a PodLoggingController
func NewPodLoggingController(informerFactory informers.SharedInformerFactory) *PodLoggingController {
podInformer := informerFactory.Core().V1().Pods()

c := &PodLoggingController{
informerFactory: informerFactory,
podInformer: podInformer,
}
podInformer.Informer().AddEventHandler(
// Your custom resource event handlers.
cache.ResourceEventHandlerFuncs{
// Called on creation
AddFunc: c.podAdd,
// Called on resource update and every resyncPeriod on existing resources.
UpdateFunc: c.podUpdate,
// Called on resource deletion.
DeleteFunc: c.podDelete,
},
)
return c
}

func main() {
var kubeconfig string
var master string

flag.StringVar(&kubeconfig, "kubeconfig", "", "absolute path to the kubeconfig file")
flag.StringVar(&master, "master", "", "master url")
flag.Parse()

// BuildConfigFromFlags builds configs from a master url or a kubeconfig
// filepath. If neither masterUrl nor kubeconfigPath are passed, it falls back
// to inClusterConfig. If inClusterConfig fails, it falls back to the default
// config.
config, err := clientcmd.BuildConfigFromFlags(master, kubeconfig)
if err != nil {
glog.Fatal(err)
}

clientset, err := kubernetes.NewForConfig(config)
if err != nil {
glog.Fatal(err)
}

factory := informers.NewSharedInformerFactory(clientset, 0)
controller := NewPodLoggingController(factory)
stop := make(chan struct{})
defer close(stop)
err = controller.Run(stop)
if err != nil {
glog.Fatal(err)
}
select {}
}
106 changes: 106 additions & 0 deletions staging/src/k8s.io/client-go/examples/informer/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"fmt"
"math/rand"
"reflect"
"testing"
"testing/quick"
"time"

"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/testing/fuzzer"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
clienttesting "k8s.io/client-go/testing"
"k8s.io/kubernetes/pkg/api"
kapitesting "k8s.io/kubernetes/pkg/api/testing"
)

func podForTest(rand *rand.Rand) *v1.Pod {
apiObjectFuzzer := fuzzer.FuzzerFor(
kapitesting.FuzzerFuncs,
rand,
api.Codecs,
)
var p v1.Pod
apiObjectFuzzer.Fuzz(&p)

p.SetName(fmt.Sprintf("Pod%d", rand.Intn(10)))
p.SetNamespace(fmt.Sprintf("Namespace%d", rand.Intn(10)))
phases := []v1.PodPhase{
"creating",
"starting",
"running",
"stopping",
"deleting",
}
p.Status.Phase = phases[rand.Intn(len(phases))]
p.Spec.InitContainers = nil
p.Status.InitContainerStatuses = nil
return &p
}

func TestMain(t *testing.T) {
clientset := fake.NewSimpleClientset()
fakeWatch := watch.NewFake()
clientset.PrependWatchReactor(
"pods",
clienttesting.DefaultWatchReactor(fakeWatch, nil),
)
factory := informers.NewSharedInformerFactory(clientset, 0)
c := NewPodLoggingController(factory)

stop := make(chan struct{})
defer close(stop)

err := c.Run(stop)
if err != nil {
t.Error(err)
}

f := func(p *v1.Pod, operation func(runtime.Object)) bool {
operation(p)
return true
}

operations := []func(runtime.Object){
fakeWatch.Add,
fakeWatch.Delete,
}

err = quick.Check(
f,
&quick.Config{
MaxCount: 1000,
Values: func(values []reflect.Value, r *rand.Rand) {
p := podForTest(r)
values[0] = reflect.ValueOf(p)
operation := operations[rand.Intn(len(operations))]
values[1] = reflect.ValueOf(operation)
},
},
)
if err != nil {
t.Fatal(err)
}
time.Sleep(time.Second)
}