Skip to content

Commit

Permalink
Enable the image trigger controller with policy
Browse files Browse the repository at this point in the history
Remove the old build image trigger controller
  • Loading branch information
smarterclayton committed May 18, 2017
1 parent 45b9087 commit 9e46c40
Show file tree
Hide file tree
Showing 8 changed files with 256 additions and 76 deletions.
53 changes: 0 additions & 53 deletions pkg/build/controller/factory/factory.go
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/openshift/origin/pkg/build/controller/policy"
strategy "github.com/openshift/origin/pkg/build/controller/strategy"
osclient "github.com/openshift/origin/pkg/client"
oscache "github.com/openshift/origin/pkg/client/cache"
controller "github.com/openshift/origin/pkg/controller"
imageapi "github.com/openshift/origin/pkg/image/api"
errors "github.com/openshift/origin/pkg/util/errors"
Expand Down Expand Up @@ -214,58 +213,6 @@ func (keyListerGetter) GetByKey(key string) (interface{}, bool, error) {
return "", true, nil
}

// ImageChangeControllerFactory can create an ImageChangeController which obtains ImageStreams
// from a queue populated from a watch of all ImageStreams.
type ImageChangeControllerFactory struct {
Client osclient.Interface
BuildConfigInstantiator buildclient.BuildConfigInstantiator
BuildConfigIndex oscache.StoreToBuildConfigLister
BuildConfigIndexSynced func() bool
// Stop may be set to allow controllers created by this factory to be terminated.
Stop <-chan struct{}
}

// Create creates a new ImageChangeController which is used to trigger builds when a new
// image is available
func (factory *ImageChangeControllerFactory) Create() controller.RunnableController {
queue := cache.NewResyncableFIFO(cache.MetaNamespaceKeyFunc)
cache.NewReflector(newImageStreamLW(factory.Client), &imageapi.ImageStream{}, queue, 2*time.Minute).RunUntil(factory.Stop)

imageChangeController := &buildcontroller.ImageChangeController{
BuildConfigIndex: factory.BuildConfigIndex,
BuildConfigInstantiator: factory.BuildConfigInstantiator,
}

// Wait for the bc store to sync before starting any work in this controller.
factory.waitForSyncedStores()

return &controller.RetryController{
Queue: queue,
RetryManager: controller.NewQueueRetryManager(
queue,
cache.MetaNamespaceKeyFunc,
retryFunc("ImageStream update", nil),
flowcontrol.NewTokenBucketRateLimiter(1, 10),
),
Handle: func(obj interface{}) error {
imageRepo := obj.(*imageapi.ImageStream)
return imageChangeController.HandleImageStream(imageRepo)
},
}
}

func (factory *ImageChangeControllerFactory) waitForSyncedStores() {
for !factory.BuildConfigIndexSynced() {
glog.V(4).Infof("Waiting for the bc caches to sync before starting the imagechange buildconfig controller worker")
select {
case <-time.After(storeSyncedPollPeriod):
case <-factory.Stop:
return
}

}
}

type BuildConfigControllerFactory struct {
Client osclient.Interface
KubeClient kclientset.Interface
Expand Down
61 changes: 58 additions & 3 deletions pkg/cmd/server/bootstrappolicy/infra_sa_policy.go
Expand Up @@ -10,18 +10,22 @@ import (
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/apis/storage"

// we need the conversions registered for our init block
authorizationapi "github.com/openshift/origin/pkg/authorization/api"
_ "github.com/openshift/origin/pkg/authorization/api/install"
authorizationapiv1 "github.com/openshift/origin/pkg/authorization/api/v1"
buildapi "github.com/openshift/origin/pkg/build/api"

// we need the conversions registered for our init block
_ "github.com/openshift/origin/pkg/authorization/api/install"
deployapi "github.com/openshift/origin/pkg/deploy/api"
imageapi "github.com/openshift/origin/pkg/image/api"
)

const (
InfraBuildControllerServiceAccountName = "build-controller"
BuildControllerRoleName = "system:build-controller"

InfraImageTriggerControllerServiceAccountName = "imagetrigger-controller"
ImageTriggerControllerRoleName = "system:imagetrigger-controller"

InfraDeploymentConfigControllerServiceAccountName = "deploymentconfig-controller"
DeploymentConfigControllerRoleName = "system:deploymentconfig-controller"

Expand Down Expand Up @@ -175,6 +179,57 @@ func init() {
panic(err)
}

err = InfraSAs.addServiceAccount(
InfraImageTriggerControllerServiceAccountName,
authorizationapi.ClusterRole{
ObjectMeta: metav1.ObjectMeta{
Name: ImageTriggerControllerRoleName,
},
Rules: []authorizationapi.PolicyRule{
// List Watch
{
Verbs: sets.NewString("list", "watch"),
APIGroups: []string{imageapi.GroupName, imageapi.LegacyGroupName},
Resources: sets.NewString("imagestreams"),
},
// Spec update on triggerable resources
{
Verbs: sets.NewString("get", "update"),
APIGroups: []string{extensionsGroup},
Resources: sets.NewString("daemonsets"),
},
{
Verbs: sets.NewString("get", "update"),
APIGroups: []string{extensionsGroup, appsGroup},
Resources: sets.NewString("deployments"),
},
{
Verbs: sets.NewString("get", "update"),
APIGroups: []string{appsGroup},
Resources: sets.NewString("statefulsets"),
},
{
Verbs: sets.NewString("get", "update"),
APIGroups: []string{batchGroup},
Resources: sets.NewString("cronjobs"),
},
{
Verbs: sets.NewString("get", "update"),
APIGroups: []string{deployapi.GroupName, deployapi.LegacyGroupName},
Resources: sets.NewString("deploymentconfigs"),
},
{
Verbs: sets.NewString("create"),
APIGroups: []string{buildapi.GroupName, buildapi.LegacyGroupName},
Resources: sets.NewString("buildconfigs/instantiate"),
},
},
},
)
if err != nil {
panic(err)
}

err = InfraSAs.addServiceAccount(
InfraDeploymentConfigControllerServiceAccountName,
authorizationapi.ClusterRole{
Expand Down
9 changes: 9 additions & 0 deletions pkg/cmd/server/origin/master_config.go
Expand Up @@ -1003,6 +1003,15 @@ func (c *MasterConfig) DeploymentControllerClients() (*osclient.Client, kclients
return osClient, internalKubeClientset, externalKubeClientset
}

// ImageTriggerControllerClients returns the trigger controller client objects
func (c *MasterConfig) ImageTriggerControllerClients() (*osclient.Client, kclientsetinternal.Interface, kclientsetexternal.Interface) {
_, osClient, internalKubeClientset, externalKubeClientset, err := c.GetServiceAccountClients(bootstrappolicy.InfraImageTriggerControllerServiceAccountName)
if err != nil {
glog.Fatal(err)
}
return osClient, internalKubeClientset, externalKubeClientset
}

// DeploymentConfigClients returns deploymentConfig and deployment client objects
func (c *MasterConfig) DeploymentConfigClients() (*osclient.Client, kclientsetinternal.Interface) {
return c.PrivilegedLoopbackOpenShiftClient, c.PrivilegedLoopbackKubernetesClientsetInternal
Expand Down
131 changes: 116 additions & 15 deletions pkg/cmd/server/origin/run_components.go
@@ -1,6 +1,7 @@
package origin

import (
"fmt"
"io/ioutil"
"net"
"path"
Expand All @@ -9,16 +10,19 @@ import (

"github.com/golang/glog"

deployclient "github.com/openshift/origin/pkg/deploy/generated/internalclientset/typed/deploy/internalversion"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
utilwait "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/admission"
kv1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/util/cert"
"k8s.io/client-go/util/flowcontrol"
kctrlmgr "k8s.io/kubernetes/cmd/kube-controller-manager/app"
cmapp "k8s.io/kubernetes/cmd/kube-controller-manager/app/options"
kapi "k8s.io/kubernetes/pkg/api"
kapiv1 "k8s.io/kubernetes/pkg/api/v1"
kappsv1beta1 "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
kextensionsv1beta1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
kclientsetexternal "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
kclientsetinternal "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/controller"
Expand Down Expand Up @@ -48,8 +52,13 @@ import (
deploycontroller "github.com/openshift/origin/pkg/deploy/controller/deployment"
deployconfigcontroller "github.com/openshift/origin/pkg/deploy/controller/deploymentconfig"
triggercontroller "github.com/openshift/origin/pkg/deploy/controller/generictrigger"
deployclient "github.com/openshift/origin/pkg/deploy/generated/internalclientset/typed/deploy/internalversion"
"github.com/openshift/origin/pkg/dns"
imagecontroller "github.com/openshift/origin/pkg/image/controller"
imagetriggercontroller "github.com/openshift/origin/pkg/image/controller/trigger"
triggerannotations "github.com/openshift/origin/pkg/image/trigger/annotations"
triggerbuildconfigs "github.com/openshift/origin/pkg/image/trigger/buildconfigs"
triggerdeploymentconfigs "github.com/openshift/origin/pkg/image/trigger/deploymentconfigs"
projectcontroller "github.com/openshift/origin/pkg/project/controller"
quota "github.com/openshift/origin/pkg/quota"
quotacontroller "github.com/openshift/origin/pkg/quota/controller"
Expand Down Expand Up @@ -316,18 +325,6 @@ func (c *MasterConfig) RunBuildPodController() {
go controller.Run(5, utilwait.NeverStop)
}

// RunBuildImageChangeTriggerController starts the build image change trigger controller process.
func (c *MasterConfig) RunBuildImageChangeTriggerController() {
bcClient, _ := c.BuildImageChangeTriggerControllerClients()
bcInstantiator := buildclient.NewOSClientBuildConfigInstantiatorClient(bcClient)
bcIndex := &oscache.StoreToBuildConfigListerImpl{Indexer: c.Informers.BuildConfigs().Indexer()}
bcIndexSynced := c.Informers.BuildConfigs().Informer().HasSynced
factory := buildcontrollerfactory.ImageChangeControllerFactory{Client: bcClient, BuildConfigInstantiator: bcInstantiator, BuildConfigIndex: bcIndex, BuildConfigIndexSynced: bcIndexSynced}
go func() {
factory.Create().Run()
}()
}

// RunBuildConfigChangeController starts the build config change trigger controller process.
func (c *MasterConfig) RunBuildConfigChangeController() {
bcClient, internalKubeClientset, externalKubeClientset := c.BuildConfigChangeControllerClients()
Expand Down Expand Up @@ -388,13 +385,117 @@ func (c *MasterConfig) RunDeploymentConfigController() {
func (c *MasterConfig) RunDeploymentTriggerController() {
dcInfomer := c.Informers.DeploymentConfigs().Informer()
rcInformer := c.Informers.InternalKubernetesInformers().Core().InternalVersion().ReplicationControllers().Informer()
streamInformer := c.Informers.ImageStreams().Informer()
osclient := c.DeploymentTriggerControllerClient()

controller := triggercontroller.NewDeploymentTriggerController(dcInfomer, rcInformer, streamInformer, osclient, c.ExternalVersionCodec)
controller := triggercontroller.NewDeploymentTriggerController(dcInfomer, rcInformer, nil, osclient, c.ExternalVersionCodec)
go controller.Run(5, utilwait.NeverStop)
}

// TODO: remove when generated informers exist
type temporaryLister struct {
*oscache.StoreToImageStreamLister
}

func (l temporaryLister) ImageStreams(namespace string) imagetriggercontroller.ImageStreamNamespaceLister {
return l.StoreToImageStreamLister.ImageStreams(namespace)
}

type podSpecUpdater struct {
kclient kclientsetexternal.Interface
}

func (u podSpecUpdater) Update(obj runtime.Object) error {
switch t := obj.(type) {
case *kextensionsv1beta1.DaemonSet:
_, err := u.kclient.Extensions().DaemonSets(t.Namespace).Update(t)
return err
case *kappsv1beta1.Deployment:
_, err := u.kclient.Apps().Deployments(t.Namespace).Update(t)
return err
case *kappsv1beta1.StatefulSet:
_, err := u.kclient.Apps().StatefulSets(t.Namespace).Update(t)
return err
case *kapiv1.Pod:
_, err := u.kclient.Core().Pods(t.Namespace).Update(t)
return err
default:
return fmt.Errorf("unrecognized object - no trigger update possible for %T", obj)
}
}

func (c *MasterConfig) RunImageTriggerController() {
streamInformer := c.Informers.ImageStreams().Informer()
lister := temporaryLister{c.Informers.ImageStreams().Lister()}

oclient, _, kclient := c.ImageTriggerControllerClients()
updater := podSpecUpdater{kclient}
bcInstantiator := buildclient.NewOSClientBuildConfigInstantiatorClient(oclient)
broadcaster := imagetriggercontroller.NewTriggerEventBroadcaster(kv1core.New(kclient.CoreV1().RESTClient()))

sources := []imagetriggercontroller.TriggerSource{
{
Resource: schema.GroupResource{Group: "apps.openshift.io", Resource: "deploymentconfigs"},
Informer: c.Informers.DeploymentConfigs().Informer(),
Store: c.Informers.DeploymentConfigs().Indexer(),
TriggerFn: triggerdeploymentconfigs.NewDeploymentConfigTriggerIndexer,
Reactor: &triggerdeploymentconfigs.DeploymentConfigReactor{Client: oclient},
},
}
if !c.Options.DisabledFeatures.Has(configapi.FeatureBuilder) {
sources = append(sources, imagetriggercontroller.TriggerSource{
Resource: schema.GroupResource{Group: "build.openshift.io", Resource: "buildconfigs"},
Informer: c.Informers.BuildConfigs().Informer(),
Store: c.Informers.BuildConfigs().Indexer(),
TriggerFn: triggerbuildconfigs.NewBuildConfigTriggerIndexer,
Reactor: &triggerbuildconfigs.BuildConfigReactor{Instantiator: bcInstantiator},
})
}
if !c.Options.DisabledFeatures.Has("triggers.image.openshift.io/deployments") {
sources = append(sources, imagetriggercontroller.TriggerSource{
Resource: schema.GroupResource{Group: "extensions", Resource: "deployments"},
Informer: c.Informers.KubernetesInformers().Apps().V1beta1().Deployments().Informer(),
Store: c.Informers.KubernetesInformers().Apps().V1beta1().Deployments().Informer().GetIndexer(),
TriggerFn: triggerannotations.NewAnnotationTriggerIndexer,
Reactor: &triggerannotations.AnnotationReactor{Updater: updater, Copier: kapi.Scheme},
})
}
if !c.Options.DisabledFeatures.Has("triggers.image.openshift.io/daemonsets") {
sources = append(sources, imagetriggercontroller.TriggerSource{
Resource: schema.GroupResource{Group: "extensions", Resource: "daemonsets"},
Informer: c.Informers.KubernetesInformers().Extensions().V1beta1().DaemonSets().Informer(),
Store: c.Informers.KubernetesInformers().Extensions().V1beta1().DaemonSets().Informer().GetIndexer(),
TriggerFn: triggerannotations.NewAnnotationTriggerIndexer,
Reactor: &triggerannotations.AnnotationReactor{Updater: updater, Copier: kapi.Scheme},
})
}
if !c.Options.DisabledFeatures.Has("triggers.image.openshift.io/statefulsets") {
sources = append(sources, imagetriggercontroller.TriggerSource{
Resource: schema.GroupResource{Group: "apps", Resource: "statefulsets"},
Informer: c.Informers.KubernetesInformers().Apps().V1beta1().StatefulSets().Informer(),
Store: c.Informers.KubernetesInformers().Apps().V1beta1().StatefulSets().Informer().GetIndexer(),
TriggerFn: triggerannotations.NewAnnotationTriggerIndexer,
Reactor: &triggerannotations.AnnotationReactor{Updater: updater, Copier: kapi.Scheme},
})
}
if !c.Options.DisabledFeatures.Has("triggers.image.openshift.io/cronjobs") {
sources = append(sources, imagetriggercontroller.TriggerSource{
Resource: schema.GroupResource{Group: "batch", Resource: "cronjobs"},
Informer: c.Informers.KubernetesInformers().Batch().V2alpha1().CronJobs().Informer(),
Store: c.Informers.KubernetesInformers().Batch().V2alpha1().CronJobs().Informer().GetIndexer(),
TriggerFn: triggerannotations.NewAnnotationTriggerIndexer,
Reactor: &triggerannotations.AnnotationReactor{Updater: updater, Copier: kapi.Scheme},
})
}

trigger := imagetriggercontroller.NewTriggerController(
broadcaster,
streamInformer,
lister,
sources...,
)
go trigger.Run(5, utilwait.NeverStop)
}

// RunSDNController runs openshift-sdn if the said network plugin is provided
func (c *MasterConfig) RunSDNController() {
oClient, kClient := c.SDNControllerClients()
Expand Down
6 changes: 3 additions & 3 deletions pkg/cmd/server/start/start_master.go
Expand Up @@ -701,18 +701,18 @@ func startControllers(oc *origin.MasterConfig, kc *kubernetes.MasterConfig) erro

// no special order
if configapi.IsBuildEnabled(&oc.Options) {
err := oc.RunBuildController(oc.Informers)
if err != nil {
if err := oc.RunBuildController(oc.Informers); err != nil {
glog.Fatalf("Could not start build controller: %v", err)
return err
}
oc.RunBuildPodController()
oc.RunBuildConfigChangeController()
oc.RunBuildImageChangeTriggerController()
}

oc.RunDeploymentController()
oc.RunDeploymentConfigController()
oc.RunDeploymentTriggerController()
oc.RunImageTriggerController()
oc.RunImageImportController()
oc.RunOriginNamespaceController()
oc.RunSDNController()
Expand Down
3 changes: 2 additions & 1 deletion test/integration/authorization_test.go
Expand Up @@ -380,9 +380,10 @@ var globalClusterReaderGroups = sets.NewString("system:cluster-readers", "system

// this list includes any other users who can get DeploymentConfigs
var globalDeploymentConfigGetterUsers = sets.NewString(
"system:serviceaccount:openshift-infra:unidling-controller",
"system:serviceaccount:kube-system:generic-garbage-collector",
"system:serviceaccount:kube-system:namespace-controller",
"system:serviceaccount:openshift-infra:imagetrigger-controller",
"system:serviceaccount:openshift-infra:unidling-controller",
)

type resourceAccessReviewTest struct {
Expand Down
2 changes: 1 addition & 1 deletion test/integration/buildcontroller_test.go
Expand Up @@ -113,7 +113,7 @@ func setupBuildControllerTest(counts controllerCount, t *testing.T) (*client.Cli
openshiftConfig.RunBuildPodController()
}
for i := 0; i < counts.ImageChangeControllers; i++ {
openshiftConfig.RunBuildImageChangeTriggerController()
openshiftConfig.RunImageTriggerController()
}
for i := 0; i < counts.ConfigChangeControllers; i++ {
openshiftConfig.RunBuildConfigChangeController()
Expand Down

0 comments on commit 9e46c40

Please sign in to comment.