Skip to content

Commit

Permalink
Merge pull request #14596 from csrwng/buildconfig_controller
Browse files Browse the repository at this point in the history
Refactor BuildConfig controller to use Informers
  • Loading branch information
smarterclayton committed Jun 15, 2017
2 parents dc2fa5a + da9b120 commit a0d7ce2
Show file tree
Hide file tree
Showing 11 changed files with 398 additions and 382 deletions.
25 changes: 25 additions & 0 deletions pkg/build/client/cache/buildconfigs.go
@@ -0,0 +1,25 @@
package cache

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

buildapi "github.com/openshift/origin/pkg/build/api"
cacheclient "github.com/openshift/origin/pkg/client/cache"
)

// NewBuildConfigGetter returns an object that implements the buildclient BuildConfigGetter interface
// using a StoreToBuildConfigLister
func NewBuildConfigGetter(lister cacheclient.StoreToBuildConfigLister) *buildConfigGetter {
	getter := &buildConfigGetter{lister: lister}
	return getter
}

// buildConfigGetter adapts a StoreToBuildConfigLister to the buildclient
// BuildConfigGetter interface, serving buildconfig reads from the shared
// informer cache instead of the API server.
type buildConfigGetter struct {
	// lister is the informer-backed cache lister used for all lookups.
	lister cacheclient.StoreToBuildConfigLister
}

// Get retrieves a buildconfig from the cache
func (g *buildConfigGetter) Get(namespace, name string, options metav1.GetOptions) (*buildapi.BuildConfig, error) {
	namespaced := g.lister.BuildConfigs(namespace)
	return namespaced.Get(name, options)
}
49 changes: 49 additions & 0 deletions pkg/build/client/cache/builds.go
@@ -0,0 +1,49 @@
package cache

import (
"fmt"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"

buildapi "github.com/openshift/origin/pkg/build/api"
cacheclient "github.com/openshift/origin/pkg/client/cache"
)

// NewBuildLister returns an object that implements the buildclient BuildLister interface
// using a StoreToBuildLister
func NewBuildLister(lister *cacheclient.StoreToBuildLister) *buildLister {
	l := &buildLister{lister: lister}
	return l
}

// buildLister adapts a StoreToBuildLister to the buildclient BuildLister
// interface, serving build list requests from the shared informer cache.
type buildLister struct {
	// lister is the informer-backed cache lister used for all list calls.
	lister *cacheclient.StoreToBuildLister
}

// List returns a BuildList with the given namespace and get options. Only the LabelSelector
// from the ListOptions is honored.
func (l *buildLister) List(namespace string, opts metav1.ListOptions) (*buildapi.BuildList, error) {
	selector, parseErr := labels.Parse(opts.LabelSelector)
	if parseErr != nil {
		return nil, fmt.Errorf("invalid label selector %q: %v", opts.LabelSelector, parseErr)
	}
	matched, listErr := l.lister.Builds(namespace).List(selector)
	if listErr != nil {
		return nil, listErr
	}
	return buildList(matched), nil
}

// buildList converts a slice of build pointers into a BuildList, skipping
// nil entries. Items is always non-nil (even for empty input) so the list
// serializes as [] rather than null.
func buildList(builds []*buildapi.Build) *buildapi.BuildList {
	// Pre-size to the input length to avoid repeated append growth.
	items := make([]buildapi.Build, 0, len(builds))
	for _, b := range builds {
		if b != nil {
			items = append(items, *b)
		}
	}
	return &buildapi.BuildList{
		Items: items,
	}
}
11 changes: 9 additions & 2 deletions pkg/build/controller/build/build_controller.go
Expand Up @@ -27,6 +27,7 @@ import (
buildapi "github.com/openshift/origin/pkg/build/api"
"github.com/openshift/origin/pkg/build/api/validation"
buildclient "github.com/openshift/origin/pkg/build/client"
buildcacheclient "github.com/openshift/origin/pkg/build/client/cache"
"github.com/openshift/origin/pkg/build/controller/common"
"github.com/openshift/origin/pkg/build/controller/policy"
"github.com/openshift/origin/pkg/build/controller/strategy"
Expand Down Expand Up @@ -79,6 +80,7 @@ type BuildController struct {
// create a new BuildController
type BuildControllerParams struct {
BuildInformer shared.BuildInformer
BuildConfigInformer shared.BuildConfigInformer
ImageStreamInformer imageinformers.ImageStreamInformer
PodInformer kcoreinformers.PodInformer
SecretInformer kcoreinformers.SecretInformer
Expand All @@ -98,10 +100,15 @@ func NewBuildController(params *BuildControllerParams) *BuildController {
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(params.KubeClientExternal.Core().RESTClient()).Events("")})

buildClient := buildclient.NewOSClientBuildClient(params.OpenshiftClient)
buildConfigGetter := buildclient.NewOSClientBuildConfigClient(params.OpenshiftClient)
// TODO: Switch to using the cache build lister when we figure out
// what is wrong with retrieving by index
// buildLister := buildcacheclient.NewBuildLister(params.BuildInformer.Lister())
_ = buildcacheclient.NewBuildLister(params.BuildInformer.Lister())
buildLister := buildClient
buildConfigGetter := buildcacheclient.NewBuildConfigGetter(params.BuildConfigInformer.Lister())
c := &BuildController{
buildPatcher: buildClient,
buildLister: buildClient,
buildLister: buildLister,
buildConfigGetter: buildConfigGetter,
buildDeleter: buildClient,
secretStore: params.SecretInformer.Lister(),
Expand Down
1 change: 1 addition & 0 deletions pkg/build/controller/build/build_controller_test.go
Expand Up @@ -991,6 +991,7 @@ func newFakeBuildController(openshiftClient client.Interface, imageClient imagei

params := &BuildControllerParams{
BuildInformer: informers.Builds(),
BuildConfigInformer: informers.BuildConfigs(),
PodInformer: informers.InternalKubernetesInformers().Core().InternalVersion().Pods(),
ImageStreamInformer: imageInformers.Image().InternalVersion().ImageStreams(),
SecretInformer: informers.InternalKubernetesInformers().Core().InternalVersion().Secrets(),
Expand Down
273 changes: 273 additions & 0 deletions pkg/build/controller/buildconfig/buildconfig_controller.go
@@ -0,0 +1,273 @@
package controller

import (
"fmt"
"time"

"github.com/golang/glog"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
clientv1 "k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
kapi "k8s.io/kubernetes/pkg/api"
kexternalclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
kcontroller "k8s.io/kubernetes/pkg/controller"

buildapi "github.com/openshift/origin/pkg/build/api"
buildclient "github.com/openshift/origin/pkg/build/client"
cachebuildclient "github.com/openshift/origin/pkg/build/client/cache"
buildutil "github.com/openshift/origin/pkg/build/controller/common"
buildgenerator "github.com/openshift/origin/pkg/build/generator"
osclient "github.com/openshift/origin/pkg/client"
"github.com/openshift/origin/pkg/controller/shared"
)

const (
maxRetries = 15
)

// configControllerFatalError represents a fatal error while generating a build.
// An operation that fails because of a fatal error should not be retried.
type configControllerFatalError struct {
	// reason describes why the operation must not be retried.
	reason string
}

// Error implements the error interface for configControllerFatalError.
func (e *configControllerFatalError) Error() string {
	return "fatal: " + e.reason
}

// IsFatal reports whether err is a configControllerFatalError.
func IsFatal(err error) bool {
	switch err.(type) {
	case *configControllerFatalError:
		return true
	default:
		return false
	}
}

// BuildConfigController watches buildconfig resources: it prunes old builds
// for each config and instantiates a first build for configs that carry a
// ConfigChange trigger and have never built.
type BuildConfigController struct {
	// buildConfigInstantiator creates new builds from a buildconfig.
	buildConfigInstantiator buildclient.BuildConfigInstantiator
	// buildConfigGetter reads buildconfigs (cache-backed, see constructor).
	buildConfigGetter buildclient.BuildConfigGetter
	// buildLister lists builds for pruning.
	buildLister buildclient.BuildLister
	// buildDeleter removes builds selected for pruning.
	buildDeleter buildclient.BuildDeleter

	// buildConfigInformer is the shared informer whose cache backs lookups
	// and whose events feed the work queue.
	buildConfigInformer cache.SharedIndexInformer

	// queue holds buildconfig keys pending processing, with rate limiting.
	queue workqueue.RateLimitingInterface

	// buildConfigStoreSynced reports whether the informer cache has synced.
	buildConfigStoreSynced func() bool

	// recorder emits events on buildconfigs (e.g. instantiate failures).
	recorder record.EventRecorder
}

// NewBuildConfigController wires a BuildConfigController to the given clients
// and shared informers and registers its add/update event handlers.
func NewBuildConfigController(openshiftClient osclient.Interface, kubeExternalClient kexternalclientset.Interface, buildConfigInformer shared.BuildConfigInformer, buildInformer shared.BuildInformer) *BuildConfigController {
	broadcaster := record.NewBroadcaster()
	broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeExternalClient.Core().RESTClient()).Events("")})

	osBuildClient := buildclient.NewOSClientBuildClient(openshiftClient)
	instantiator := buildclient.NewOSClientBuildConfigInstantiatorClient(openshiftClient)
	configGetter := cachebuildclient.NewBuildConfigGetter(buildConfigInformer.Lister())
	// TODO: Switch to using the cache build lister when we figure out
	// what is wrong with retrieving by index
	// buildLister := cachebuildclient.NewBuildLister(buildInformer.Lister())
	_ = cachebuildclient.NewBuildLister(buildInformer.Lister())

	c := &BuildConfigController{
		buildConfigGetter:       configGetter,
		buildLister:             osBuildClient,
		buildDeleter:            osBuildClient,
		buildConfigInstantiator: instantiator,

		buildConfigInformer: buildConfigInformer.Informer(),

		queue:    workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
		recorder: broadcaster.NewRecorder(kapi.Scheme, clientv1.EventSource{Component: "buildconfig-controller"}),
	}

	c.buildConfigInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    c.buildConfigAdded,
		UpdateFunc: c.buildConfigUpdated,
	})

	c.buildConfigStoreSynced = c.buildConfigInformer.HasSynced
	return c
}

// handleBuildConfig prunes old builds for bc and, when bc carries a
// ConfigChange trigger and has never produced a build (LastVersion == 0),
// instantiates its first build. A returned *configControllerFatalError
// signals the caller not to retry.
func (c *BuildConfigController) handleBuildConfig(bc *buildapi.BuildConfig) error {
	glog.V(4).Infof("Handling BuildConfig %s", bcDesc(bc))

	// Pruning failures are logged but never block trigger handling.
	if err := buildutil.HandleBuildPruning(bc.Name, bc.Namespace, c.buildLister, c.buildConfigGetter, c.buildDeleter); err != nil {
		utilruntime.HandleError(err)
	}

	// Only configs with a config-change trigger that have never built get an
	// automatic first build.
	if !buildapi.HasTriggerType(buildapi.ConfigChangeBuildTriggerType, bc) || bc.Status.LastVersion > 0 {
		return nil
	}

	glog.V(4).Infof("Running build for BuildConfig %s", bcDesc(bc))

	// Instantiate the first build for this config.
	lastVersion := int64(0)
	request := &buildapi.BuildRequest{
		TriggeredBy: []buildapi.BuildTriggerCause{
			{Message: buildapi.BuildTriggerCauseConfigMsg},
		},
		ObjectMeta: metav1.ObjectMeta{
			Name:      bc.Name,
			Namespace: bc.Namespace,
		},
		LastVersion: &lastVersion,
	}
	_, err := c.buildConfigInstantiator.Instantiate(bc.Namespace, request)
	if err == nil {
		return nil
	}

	switch {
	case kerrors.IsConflict(err):
		// Conflicts are transient; surface a retryable error.
		instantiateErr := fmt.Errorf("unable to instantiate Build for BuildConfig %s due to a conflicting update: %v", bcDesc(bc), err)
		utilruntime.HandleError(instantiateErr)
		return instantiateErr
	case buildgenerator.IsFatal(err) || kerrors.IsNotFound(err) || kerrors.IsBadRequest(err) || kerrors.IsForbidden(err):
		// These failures will not succeed on retry; give up permanently.
		instantiateErr := fmt.Errorf("gave up on Build for BuildConfig %s due to fatal error: %v", bcDesc(bc), err)
		utilruntime.HandleError(instantiateErr)
		c.recorder.Event(bc, kapi.EventTypeWarning, "BuildConfigInstantiateFailed", instantiateErr.Error())
		return &configControllerFatalError{err.Error()}
	default:
		instantiateErr := fmt.Errorf("error instantiating Build from BuildConfig %s: %v", bcDesc(bc), err)
		c.recorder.Event(bc, kapi.EventTypeWarning, "BuildConfigInstantiateFailed", instantiateErr.Error())
		utilruntime.HandleError(instantiateErr)
		return instantiateErr
	}
}

// buildConfigAdded is called by the buildconfig informer event handler whenever a
// buildconfig is created.
func (c *BuildConfigController) buildConfigAdded(obj interface{}) {
	// Checked assertion: an unexpected payload from the informer must not
	// panic the controller process.
	bc, ok := obj.(*buildapi.BuildConfig)
	if !ok {
		utilruntime.HandleError(fmt.Errorf("expected BuildConfig, got %T", obj))
		return
	}
	c.enqueueBuildConfig(bc)
}

// buildConfigUpdated gets called by the buildconfig informer event handler whenever a
// buildconfig is updated or there is a relist of buildconfigs.
func (c *BuildConfigController) buildConfigUpdated(old, cur interface{}) {
	// Checked assertion: an unexpected payload from the informer must not
	// panic the controller process.
	bc, ok := cur.(*buildapi.BuildConfig)
	if !ok {
		utilruntime.HandleError(fmt.Errorf("expected BuildConfig, got %T", cur))
		return
	}
	c.enqueueBuildConfig(bc)
}

// enqueueBuildConfig adds the given buildconfig's key to the work queue.
// Key derivation errors are reported and the item is dropped.
func (c *BuildConfigController) enqueueBuildConfig(bc *buildapi.BuildConfig) {
	key, err := kcontroller.KeyFunc(bc)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("couldn't get key for buildconfig %#v: %v", bc, err))
		return
	}
	c.queue.Add(key)
}

// Run begins watching and syncing. It blocks until stopCh is closed, then
// shuts down the work queue so workers exit.
func (c *BuildConfigController) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer c.queue.ShutDown()

	// Do not process anything until the buildconfig cache is populated.
	if ok := cache.WaitForCacheSync(stopCh, c.buildConfigStoreSynced); !ok {
		utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
		return
	}

	glog.Infof("Starting buildconfig controller")

	for w := 0; w < workers; w++ {
		go wait.Until(c.worker, time.Second, stopCh)
	}

	<-stopCh
	glog.Infof("Shutting down buildconfig controller")
}

// worker processes queue items until work reports that the queue shut down.
func (c *BuildConfigController) worker() {
	for !c.work() {
	}
}

// work gets the next buildconfig key from the queue and invokes
// handleBuildConfig on it. It returns true when the queue has shut down.
func (c *BuildConfigController) work() bool {
	key, shutdown := c.queue.Get()
	if shutdown {
		return true
	}
	defer c.queue.Done(key)

	bc, err := c.getBuildConfigByKey(key.(string))
	switch {
	case err != nil:
		c.handleError(err, key)
	case bc == nil:
		// Deleted while queued; nothing to do.
	default:
		c.handleError(c.handleBuildConfig(bc), key)
	}
	return false
}

// handleError decides the fate of a queue key after handleBuildConfig ran:
// forget it on success or fatal error, otherwise requeue it with rate
// limiting until maxRetries attempts have been used up.
func (c *BuildConfigController) handleError(err error, key interface{}) {
	switch {
	case err == nil:
		c.queue.Forget(key)
	case IsFatal(err):
		glog.V(2).Infof("Will not retry fatal error for key %v: %v", key, err)
		c.queue.Forget(key)
	case c.queue.NumRequeues(key) < maxRetries:
		glog.V(4).Infof("Retrying key %v: %v", key, err)
		c.queue.AddRateLimited(key)
	default:
		glog.V(2).Infof("Giving up retrying %v: %v", key, err)
		c.queue.Forget(key)
	}
}

// getBuildConfigByKey looks up a buildconfig by key in the buildConfigInformer
// cache. It returns (nil, nil) when the buildconfig no longer exists.
func (c *BuildConfigController) getBuildConfigByKey(key string) (*buildapi.BuildConfig, error) {
	obj, exists, err := c.buildConfigInformer.GetIndexer().GetByKey(key)
	switch {
	case err != nil:
		glog.V(2).Infof("Unable to retrieve buildconfig %s from store: %v", key, err)
		return nil, err
	case !exists:
		glog.V(2).Infof("Buildconfig %q has been deleted", key)
		return nil, nil
	}
	return obj.(*buildapi.BuildConfig), nil
}

// bcDesc formats a buildconfig as "namespace/name (lastVersion)" for log messages.
func bcDesc(bc *buildapi.BuildConfig) string {
	return fmt.Sprintf("%s/%s (%d)", bc.Namespace, bc.Name, bc.Status.LastVersion)
}

0 comments on commit a0d7ce2

Please sign in to comment.