Skip to content

Commit

Permalink
Use shared informers in image controllers
Browse files Browse the repository at this point in the history
  • Loading branch information
soltysh authored and Michal Fojtik committed May 25, 2017
1 parent 4b3473a commit 016b65a
Show file tree
Hide file tree
Showing 6 changed files with 551 additions and 362 deletions.
37 changes: 16 additions & 21 deletions pkg/cmd/server/origin/run_components.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
utilwait "k8s.io/apimachinery/pkg/util/wait"
kv1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/util/cert"
"k8s.io/client-go/util/flowcontrol"
kctrlmgr "k8s.io/kubernetes/cmd/kube-controller-manager/app"
cmapp "k8s.io/kubernetes/cmd/kube-controller-manager/app/options"
kapi "k8s.io/kubernetes/pkg/api"
Expand Down Expand Up @@ -407,31 +406,27 @@ func (c *MasterConfig) RunServiceServingCertController(client kclientsetinternal

// RunImageImportController starts the image import trigger controller process.
func (c *MasterConfig) RunImageImportController() {
osclient := c.ImageImportControllerClient()
controller := imagecontroller.NewImageStreamController(c.ImageImportControllerClient(), c.Informers.ImageStreams())
scheduledController := imagecontroller.NewScheduledImageStreamController(c.ImageImportControllerClient(), c.Informers.ImageStreams(), imagecontroller.ScheduledImageStreamControllerOptions{
Resync: time.Duration(c.Options.ImagePolicyConfig.ScheduledImageImportMinimumIntervalSeconds) * time.Second,

var limiter flowcontrol.RateLimiter = nil
if c.Options.ImagePolicyConfig.MaxScheduledImageImportsPerMinute <= 0 {
limiter = flowcontrol.NewFakeAlwaysRateLimiter()
} else {
importRate := float32(c.Options.ImagePolicyConfig.MaxScheduledImageImportsPerMinute) / float32(time.Minute/time.Second)
importBurst := c.Options.ImagePolicyConfig.MaxScheduledImageImportsPerMinute * 2
limiter = flowcontrol.NewTokenBucketRateLimiter(importRate, importBurst)
}
Enabled: !c.Options.ImagePolicyConfig.DisableScheduledImport,
DefaultBucketSize: 4, // TODO: Make this configurable?
MaxImageImportsPerMinute: c.Options.ImagePolicyConfig.MaxScheduledImageImportsPerMinute,
})

factory := imagecontroller.ImportControllerFactory{
Client: osclient,
ResyncInterval: 10 * time.Minute,
MinimumCheckInterval: time.Duration(c.Options.ImagePolicyConfig.ScheduledImageImportMinimumIntervalSeconds) * time.Second,
ImportRateLimiter: limiter,
ScheduleEnabled: !c.Options.ImagePolicyConfig.DisableScheduledImport,
}
controller, scheduledController := factory.Create()
controller.Run()
// Setup notifier on the main controller so that it informs the scheduled controller when streams are being imported
controller.SetNotifier(scheduledController)

// TODO align with https://github.com/openshift/origin/pull/13579 once it merges
stopCh := make(chan struct{})
go controller.Run(5, stopCh)
if c.Options.ImagePolicyConfig.DisableScheduledImport {
glog.V(2).Infof("Scheduled image import is disabled - the 'scheduled' flag on image streams will be ignored")
} else {
scheduledController.RunUntil(utilwait.NeverStop)
return
}

go scheduledController.Run(stopCh)
}

// RunSecurityAllocationController starts the security allocation controller process.
Expand Down
187 changes: 75 additions & 112 deletions pkg/image/controller/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,139 +3,102 @@ package controller
import (
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/client-go/util/workqueue"

"github.com/openshift/origin/pkg/client"
"github.com/openshift/origin/pkg/controller"
"github.com/openshift/origin/pkg/image/api"
ctrl "github.com/openshift/origin/pkg/controller"
"github.com/openshift/origin/pkg/controller/shared"
)

// ImportControllerFactory can create an ImportController.
type ImportControllerFactory struct {
Client client.Interface
ResyncInterval time.Duration
MinimumCheckInterval time.Duration
ImportRateLimiter flowcontrol.RateLimiter
ScheduleEnabled bool
}
// ImageStreamControllerOptions represents a configuration for the scheduled image stream
// import controller.
type ScheduledImageStreamControllerOptions struct {
Resync time.Duration

// Create creates an ImportController.
func (f *ImportControllerFactory) Create() (controller.RunnableController, controller.StoppableController) {
lw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return f.Client.ImageStreams(metav1.NamespaceAll).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return f.Client.ImageStreams(metav1.NamespaceAll).Watch(options)
},
}
q := cache.NewResyncableFIFO(cache.MetaNamespaceKeyFunc)
cache.NewReflector(lw, &api.ImageStream{}, q, f.ResyncInterval).Run()
// Enabled indicates that the scheduled imports for images are allowed.
Enabled bool

// instantiate a scheduled importer using a number of buckets
buckets := 4
switch {
case f.MinimumCheckInterval > time.Hour:
buckets = 8
case f.MinimumCheckInterval < 10*time.Minute:
buckets = 2
}
seconds := f.MinimumCheckInterval / time.Second
bucketQPS := 1.0 / float32(seconds) * float32(buckets)

limiter := flowcontrol.NewTokenBucketRateLimiter(bucketQPS, 1)
b := newScheduled(f.ScheduleEnabled, f.Client, buckets, limiter, f.ImportRateLimiter)

// instantiate an importer for changes that happen to the image stream
changed := &controller.RetryController{
Queue: q,
RetryManager: controller.NewQueueRetryManager(
q,
cache.MetaNamespaceKeyFunc,
func(obj interface{}, err error, retries controller.Retry) bool {
utilruntime.HandleError(err)
return retries.Count < 5
},
flowcontrol.NewTokenBucketRateLimiter(1, 10),
),
Handle: b.Handle,
}
// DefaultBucketSize is the default bucket size used by QPS.
DefaultBucketSize int

return changed, b.scheduler
// MaxImageImportsPerMinute sets the maximum number of simultaneous image imports per
// minute.
MaxImageImportsPerMinute int
}

type uniqueItem struct {
uid string
resourceVersion string
// Buckets returns the bucket size calculated based on the resync interval of the
// scheduled image import controller. For resync interval bigger than our the bucket size
// is doubled, for resync lower then 10 minutes bucket size is set to a half of the
// default size.
func (opts ScheduledImageStreamControllerOptions) Buckets() int {
buckets := opts.DefaultBucketSize // 4
switch {
case opts.Resync > time.Hour:
return buckets * 2
case opts.Resync < 10*time.Minute:
return buckets / 2
}
return buckets
}

// scheduled watches for changes to image streams and adds them to the list of streams to be
// periodically imported (later) or directly imported (now).
type scheduled struct {
enabled bool
scheduler *controller.Scheduler
rateLimiter flowcontrol.RateLimiter
controller *ImportController
// BucketsToQPS converts the bucket size to QPS
func (opts ScheduledImageStreamControllerOptions) BucketsToQPS() float32 {
seconds := float32(opts.Resync / time.Second)
return 1.0 / seconds * float32(opts.Buckets())
}

// newScheduled initializes a scheduled import object and sets its scheduler. Limiter is optional.
func newScheduled(enabled bool, client client.ImageStreamsNamespacer, buckets int, bucketLimiter, importLimiter flowcontrol.RateLimiter) *scheduled {
b := &scheduled{
enabled: enabled,
rateLimiter: importLimiter,
controller: &ImportController{
streams: client,
},
// GetRateLimiter returns a flowcontrol rate limiter based on the maximum number of
// imports (MaxImageImportsPerMinute) setting.
func (opts ScheduledImageStreamControllerOptions) GetRateLimiter() flowcontrol.RateLimiter {
if opts.MaxImageImportsPerMinute <= 0 {
return flowcontrol.NewFakeAlwaysRateLimiter()
}
b.scheduler = controller.NewScheduler(buckets, bucketLimiter, b.HandleTimed)
return b
}

// Handle ensures an image stream is checked for scheduling and then runs a direct import
func (b *scheduled) Handle(obj interface{}) error {
stream := obj.(*api.ImageStream)
if b.enabled && needsScheduling(stream) {
key, _ := cache.MetaNamespaceKeyFunc(stream)
b.scheduler.Add(key, uniqueItem{uid: string(stream.UID), resourceVersion: stream.ResourceVersion})
}
return b.controller.Next(stream, b)
importRate := float32(opts.MaxImageImportsPerMinute) / float32(time.Minute/time.Second)
importBurst := opts.MaxImageImportsPerMinute * 2
return flowcontrol.NewTokenBucketRateLimiter(importRate, importBurst)
}

// HandleTimed is invoked when a key is ready to be processed.
func (b *scheduled) HandleTimed(key, value interface{}) {
if !b.enabled {
b.scheduler.Remove(key, value)
return
}
if b.rateLimiter != nil && !b.rateLimiter.TryAccept() {
return
}
namespace, name, _ := cache.SplitMetaNamespaceKey(key.(string))
if err := b.controller.NextTimedByName(namespace, name); err != nil {
// the stream cannot be imported
if err == ErrNotImportable {
// value must match to be removed, so we avoid races against creation by ensuring that we only
// remove the stream if the uid and resource version in the scheduler are exactly the same.
b.scheduler.Remove(key, value)
return
}
utilruntime.HandleError(err)
return
// NewImageStreamController returns a new image stream import controller.
func NewImageStreamController(namespacer client.ImageStreamsNamespacer, informer shared.ImageStreamInformer) *ImageStreamController {
controller := &ImageStreamController{
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),

isNamespacer: namespacer,
lister: temporaryLister{informer.Lister()},
listerSynced: informer.Informer().HasSynced,
}

informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.addImageStream,
UpdateFunc: controller.updateImageStream,
})

return controller
}

// Importing is invoked when the controller decides to import a stream in order to push back
// the next schedule time.
func (b *scheduled) Importing(stream *api.ImageStream) {
if !b.enabled {
return
// NewScheduledImageStreamController returns a new scheduled image stream import
// controller.
func NewScheduledImageStreamController(namespacer client.ImageStreamsNamespacer, informer shared.ImageStreamInformer, opts ScheduledImageStreamControllerOptions) *ScheduledImageStreamController {
bucketLimiter := flowcontrol.NewTokenBucketRateLimiter(opts.BucketsToQPS(), 1)

controller := &ScheduledImageStreamController{
enabled: opts.Enabled,
rateLimiter: opts.GetRateLimiter(),
isNamespacer: namespacer,
lister: temporaryLister{informer.Lister()},
listerSynced: informer.Informer().HasSynced,
}
// Push the current key back to the end of the queue because it's just been imported
key, _ := cache.MetaNamespaceKeyFunc(stream)
b.scheduler.Delay(key)

controller.scheduler = ctrl.NewScheduler(opts.Buckets(), bucketLimiter, controller.syncTimed)

informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.addImageStream,
UpdateFunc: controller.updateImageStream,
DeleteFunc: controller.deleteImageStream,
})

return controller
}
Loading

0 comments on commit 016b65a

Please sign in to comment.