Skip to content

Commit

Permalink
openshift-tests: add resource watch monitor
Browse files Browse the repository at this point in the history
  • Loading branch information
mfojtik committed Apr 8, 2020
1 parent 2836c10 commit bb8acd9
Show file tree
Hide file tree
Showing 7 changed files with 620 additions and 0 deletions.
2 changes: 2 additions & 0 deletions cmd/openshift-tests/openshift-tests.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/openshift/library-go/pkg/serviceability"
"github.com/openshift/origin/pkg/monitor"
"github.com/openshift/origin/pkg/monitor/resourcewatch/cmd"
testginkgo "github.com/openshift/origin/pkg/test/ginkgo"
"github.com/openshift/origin/pkg/version"
exutil "github.com/openshift/origin/test/extended/util"
Expand Down Expand Up @@ -48,6 +49,7 @@ func main() {
newRunUpgradeCommand(),
newRunTestCommand(),
newRunMonitorCommand(),
cmd.NewRunResourceWatchCommand(),
)

pflag.CommandLine = pflag.NewFlagSet("empty", pflag.ExitOnError)
Expand Down
20 changes: 20 additions & 0 deletions pkg/monitor/resourcewatch/cmd/resourcewatch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package cmd

import (
"github.com/spf13/cobra"

"github.com/openshift/library-go/pkg/controller/controllercmd"

"github.com/openshift/origin/pkg/monitor/resourcewatch/operator"
"github.com/openshift/origin/pkg/version"
)

func NewRunResourceWatchCommand() *cobra.Command {
cmd := controllercmd.
NewControllerCommandConfig("run-resourcewatch", version.Get(), operator.RunOperator).
NewCommand()
cmd.Use = "run-resourcewatch"
cmd.Short = "Run watching resource changes"

return cmd
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package clusteroperatormetric

import (
"context"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"

configv1client "github.com/openshift/client-go/config/clientset/versioned/typed/config/v1"
"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/events"
)

var clusterOperatorStateMetric *metrics.GaugeVec

func init() {
clusterOperatorStateMetric = metrics.NewGaugeVec(
&metrics.GaugeOpts{
Name: "openshift_ci_monitor_operator_cluster_operator_status",
Help: "A metric that tracks individual cluster operator status.",
},
[]string{"name", "condition", "status"},
)

legacyregistry.MustRegister(clusterOperatorStateMetric)
}

type ClusterOperatorMetricController struct {
clusterOperatorClient configv1client.ClusterOperatorsGetter
}

func NewClusterOperatorMetricController(clusterOperatorInformer cache.SharedInformer, clusterOperatorGetter configv1client.ClusterOperatorsGetter, recorder events.Recorder) factory.Controller {
c := &ClusterOperatorMetricController{
clusterOperatorClient: clusterOperatorGetter,
}
return factory.New().WithInformers(clusterOperatorInformer).WithSync(c.sync).ResyncEvery(1*time.Minute).ToController("ClusterOperatorMetricController", recorder.WithComponentSuffix("cluster-operator-metric"))
}

func (c *ClusterOperatorMetricController) sync(ctx context.Context, syncCtx factory.SyncContext) error {
clusterOperators, err := c.clusterOperatorClient.ClusterOperators().List(ctx, metav1.ListOptions{})
if err != nil {
return err
}
for _, operator := range clusterOperators.Items {
for _, condition := range operator.Status.Conditions {
clusterOperatorStateMetric.WithLabelValues(operator.Name, string(condition.Type), string(condition.Status)).Set(float64(condition.LastTransitionTime.Unix()))
}
}
return nil
}
163 changes: 163 additions & 0 deletions pkg/monitor/resourcewatch/controller/configmonitor/crd_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package configmonitor

import (
"context"
"fmt"
"strings"
"time"

apiextensionsv1beta1lister "k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/v1beta1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/discovery"
"k8s.io/client-go/discovery/cached/memory"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/restmapper"
"k8s.io/client-go/tools/cache"
"k8s.io/klog"

"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/events"
)

var (
defaultResyncDuration = 1 * time.Minute
)

type ConfigObserverController struct {
crdLister apiextensionsv1beta1lister.CustomResourceDefinitionLister
crdInformer cache.SharedIndexInformer
dynamicClient dynamic.Interface
dynamicInformers []*dynamicConfigInformer
cachedDiscovery discovery.CachedDiscoveryInterface
monitoredResources []schema.GroupVersion
storageHandler cache.ResourceEventHandler
}

func NewConfigObserverController(
dynamicClient dynamic.Interface,
crdInformer cache.SharedIndexInformer,
discoveryClient *discovery.DiscoveryClient,
configStorage cache.ResourceEventHandler,
monitoredResources []schema.GroupVersion,
recorder events.Recorder,
) factory.Controller {
c := &ConfigObserverController{
dynamicClient: dynamicClient,
crdInformer: crdInformer,
storageHandler: configStorage,
monitoredResources: monitoredResources,
cachedDiscovery: memory.NewMemCacheClient(discoveryClient),
}
c.crdLister = apiextensionsv1beta1lister.NewCustomResourceDefinitionLister(c.crdInformer.GetIndexer())

return factory.New().WithInformers(c.crdInformer).ResyncEvery(defaultResyncDuration).WithSync(c.sync).ToController("ConfigObserverController", recorder.WithComponentSuffix("config-observer-controller"))
}

// currentResourceKinds returns list of group version configKind for OpenShift configuration types.
func (c *ConfigObserverController) currentResourceKinds() ([]schema.GroupVersionKind, error) {
observedCrds, err := c.crdLister.List(labels.Everything())
if err != nil {
return nil, err
}
var (
currentConfigResources []schema.GroupVersionKind
currentKinds = sets.NewString()
)
for _, crd := range observedCrds {
for _, gv := range c.monitoredResources {
if !strings.HasSuffix(crd.GetName(), "."+gv.Group) {
continue
}
for _, version := range crd.Spec.Versions {
if !version.Served {
continue
}
gvk := schema.GroupVersionKind{
Group: gv.Group,
Version: gv.Version,
Kind: crd.Spec.Names.Kind,
}
if currentKinds.Has(gvk.Kind) {
continue
}
currentKinds.Insert(gvk.Kind)
currentConfigResources = append(currentConfigResources, gvk)
}
}

}
return currentConfigResources, nil
}

func (c *ConfigObserverController) sync(ctx context.Context, syncCtx factory.SyncContext) error {
current, err := c.currentResourceKinds()
if err != nil {
return err
}

// TODO: The CRD delete case is not handled
var (
currentList []string
needObserverList []string
kindNeedObserver []schema.GroupVersionKind
)
for _, configKind := range current {
currentList = append(currentList, configKind.String())
hasObserver := false
for _, o := range c.dynamicInformers {
if o.isKind(configKind) {
hasObserver = true
break
}
}
if !hasObserver {
kindNeedObserver = append(kindNeedObserver, configKind)
needObserverList = append(needObserverList, configKind.String())
}
}

var (
waitForCacheSyncFn []cache.InformerSynced
syntheticRequeueErr error
)

// If we have new CRD refresh the discovery info and update the mapper
if len(kindNeedObserver) > 0 {
// NOTE: this is very time expensive, only do this when we have new kinds
c.cachedDiscovery.Invalidate()
gr, err := restmapper.GetAPIGroupResources(c.cachedDiscovery)
if err != nil {
return err
}

mapper := restmapper.NewDiscoveryRESTMapper(gr)
for _, kind := range kindNeedObserver {
mapping, err := mapper.RESTMapping(kind.GroupKind(), kind.Version)
if err != nil {
syncCtx.Recorder().Warningf("Unable to find REST mapping for %s/%s: %w", kind.GroupKind().String(), kind.Version, err)
syntheticRequeueErr = err
continue
}

// we got mapping, lets run the dynamicInformer for the config and install GIT storageHandler event handlers
dynamicInformer := newDynamicConfigInformer(kind.Kind, mapping.Resource, c.dynamicClient, c.storageHandler)
waitForCacheSyncFn = append(waitForCacheSyncFn, dynamicInformer.hasSynced)

go func(k schema.GroupVersionKind) {
klog.Infof("Starting dynamic informer for %q ...", k.String())
dynamicInformer.run(ctx)
}(kind)
c.dynamicInformers = append(c.dynamicInformers, dynamicInformer)
}
}

cacheCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
if !cache.WaitForCacheSync(cacheCtx.Done(), waitForCacheSyncFn...) {
return fmt.Errorf("timeout while waiting for dynamic informers to start: %#v", kindNeedObserver)
}

return syntheticRequeueErr
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package configmonitor

import (
"context"

"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/tools/cache"
)

type dynamicConfigInformer struct {
informer cache.SharedIndexInformer
hasSynced cache.InformerSynced

groupVersionResource schema.GroupVersionResource
configKind string
}

func newDynamicConfigInformer(kind string, configResource schema.GroupVersionResource, client dynamic.Interface, resourceHandlers ...cache.ResourceEventHandler) *dynamicConfigInformer {
observer := &dynamicConfigInformer{
informer: dynamicinformer.NewDynamicSharedInformerFactory(client, defaultResyncDuration).ForResource(configResource).Informer(),
configKind: kind,
groupVersionResource: configResource,
}
observer.hasSynced = observer.informer.HasSynced
for _, handler := range resourceHandlers {
observer.informer.AddEventHandler(handler)
}
return observer
}

func (c dynamicConfigInformer) isKind(kind schema.GroupVersionKind) bool {
return schema.GroupVersionKind{
Group: c.groupVersionResource.Group,
Version: c.groupVersionResource.Version,
Kind: c.configKind,
} == kind
}

func (c *dynamicConfigInformer) run(ctx context.Context) {
c.informer.Run(ctx.Done())
}
83 changes: 83 additions & 0 deletions pkg/monitor/resourcewatch/operator/starter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package operator

import (
"context"
"os"
"time"

apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
apiextensionsv1beta1informer "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions/apiextensions/v1beta1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/cache"

configv1client "github.com/openshift/client-go/config/clientset/versioned"
configv1informer "github.com/openshift/client-go/config/informers/externalversions/config/v1"
"github.com/openshift/library-go/pkg/controller/controllercmd"

"github.com/openshift/origin/pkg/monitor/resourcewatch/controller/clusteroperatormetric"
"github.com/openshift/origin/pkg/monitor/resourcewatch/controller/configmonitor"
"github.com/openshift/origin/pkg/monitor/resourcewatch/storage"
)

func RunOperator(ctx context.Context, controllerCtx *controllercmd.ControllerContext) error {
kubeClient, err := apiextensionsclient.NewForConfig(controllerCtx.ProtoKubeConfig)
if err != nil {
return err
}

dynamicClient, err := dynamic.NewForConfig(controllerCtx.KubeConfig)
if err != nil {
return err
}

discoveryClient, err := discovery.NewDiscoveryClientForConfig(controllerCtx.KubeConfig)
if err != nil {
return err
}

repositoryPath := "/repository"
if repositoryPathEnv := os.Getenv("REPOSITORY_PATH"); len(repositoryPathEnv) > 0 {
repositoryPath = repositoryPathEnv
}

configStore, err := storage.NewGitStorage(repositoryPath)
if err != nil {
return err
}

configClient, err := configv1client.NewForConfig(controllerCtx.KubeConfig)
if err != nil {
return err
}

configInformer := configv1informer.NewClusterOperatorInformer(configClient, time.Minute, cache.Indexers{})
crdInformer := apiextensionsv1beta1informer.NewCustomResourceDefinitionInformer(kubeClient, time.Minute, cache.Indexers{})

openshiftConfigObserver := configmonitor.NewConfigObserverController(
dynamicClient,
crdInformer,
discoveryClient,
configStore,
[]schema.GroupVersion{
{
Group: "config.openshift.io", // Track everything under *.config.openshift.io
Version: "v1",
},
},
controllerCtx.EventRecorder,
)

clusterOperatorMetric := clusteroperatormetric.NewClusterOperatorMetricController(configInformer, configClient.ConfigV1(), controllerCtx.EventRecorder)

go crdInformer.Run(ctx.Done())
go configInformer.Run(ctx.Done())

go openshiftConfigObserver.Run(ctx, 1)
go clusterOperatorMetric.Run(ctx, 1)

<-ctx.Done()

return nil
}

0 comments on commit bb8acd9

Please sign in to comment.