Skip to content

Commit

Permalink
began turning attention to cache.Controller
Browse files Browse the repository at this point in the history
  • Loading branch information
MikeSpreitzer authored and skilxn-go committed Jan 27, 2020
1 parent 4289b8a commit e5dcd5a
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 31 deletions.
32 changes: 19 additions & 13 deletions staging/src/k8s.io/client-go/tools/cache/controller.go
Expand Up @@ -26,7 +26,16 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
)

// Config contains all the settings for a Controller.
// This file implements a low-level controller that is used in
// sharedIndexInformer, which is an implementation of
// SharedIndexInformer. Such informers, in turn, are key components
// in the high level controllers that form the backbone of the
// Kubernetes control plane. Look at those for examples, or the
// example in
// https://github.com/kubernetes/client-go/tree/master/examples/workqueue
// .

// Config contains all the settings for one of these low-level controllers.
type Config struct {
// The queue for your objects - has to be a DeltaFIFO due to
// assumptions in the implementation. Your Process() function
Expand All @@ -36,19 +45,16 @@ type Config struct {
// Something that can list and watch your objects.
ListerWatcher

// Something that can process your objects.
// Something that can process a popped Deltas.
Process ProcessFunc

// The type of your objects.
// ObjectType is an example object of the type this controller is
// expected to handle. Only the type needs to be right, except
// that when that is `unstructured.Unstructured` the object's
// `"apiVersion"` must also be right.
ObjectType runtime.Object

// Reprocess everything at least this often.
// Note that if it takes longer for you to clear the queue than this
// period, you will end up processing items in the order determined
// by FIFO.Replace(). Currently, this is random. If this is a
// problem, we can change that replacement policy to append new
// things to the end of the queue instead of replacing the entire
// queue.
// FullResyncPeriod is the period at which ShouldResync is invoked.
FullResyncPeriod time.Duration

// ShouldResync, if specified, is invoked when the controller's reflector determines the next
Expand All @@ -71,15 +77,15 @@ type ShouldResyncFunc func() bool
// ProcessFunc processes a single object.
type ProcessFunc func(obj interface{}) error

// Controller is a generic controller framework.
// `*controller` implements Controller
type controller struct {
config Config
reflector *Reflector
reflectorMutex sync.RWMutex
clock clock.Clock
}

// Controller is a generic controller framework.
// Controller is a low-level controller used in sharedIndexInformer.
type Controller interface {
Run(stopCh <-chan struct{})
HasSynced() bool
Expand All @@ -95,7 +101,7 @@ func New(c *Config) Controller {
return ctlr
}

// Run begins processing items, and will continue until a value is sent down stopCh.
// Run begins processing items, and will continue until a value is sent down stopCh or it is closed.
// It's an error to call Run more than once.
// Run blocks; call via go.
func (c *controller) Run(stopCh <-chan struct{}) {
Expand Down
30 changes: 20 additions & 10 deletions staging/src/k8s.io/client-go/tools/cache/reflector.go
Expand Up @@ -55,18 +55,23 @@ type Reflector struct {
// stringification of expectedType otherwise. It is for display
// only, and should not be used for parsing or comparison.
expectedTypeName string
// The type of object we expect to place in the store.
// An example object of the type we expect to place in the store.
// Only the type needs to be right, except that when that is
// `unstructured.Unstructured` the object's `"apiVersion"` must
// also be right.
expectedType reflect.Type
// The GVK of the object we expect to place in the store if unstructured.
expectedGVK *schema.GroupVersionKind
// The destination to sync up with the watch source
store Store
// listerWatcher is used to perform lists and watches.
listerWatcher ListerWatcher
// period controls timing between one watch ending and
// the beginning of the next one.
// period controls timing between an unsuccessful watch ending and
// the beginning of the next list.
period time.Duration
// The period at which ShouldResync is invoked
resyncPeriod time.Duration
// ShouldResync is invoked periodically and whenever it returns `true` the Store's Resync operation is invoked
ShouldResync func() bool
// clock allows tests to manipulate time
clock clock.Clock
Expand Down Expand Up @@ -98,12 +103,16 @@ func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interfa
return indexer, reflector
}

// NewReflector creates a new Reflector object which will keep the given store up to
// date with the server's contents for the given resource. Reflector promises to
// only put things in the store that have the type of expectedType, unless expectedType
// is nil. If resyncPeriod is non-zero, then lists will be executed after every
// resyncPeriod, so that you can use reflectors to periodically process everything as
// well as incrementally processing the things that change.
// NewReflector creates a new Reflector object which will keep the
// given store up to date with the server's contents for the given
// resource. Reflector promises to only put things in the store that
// have the type of expectedType, unless expectedType is nil. If
// resyncPeriod is non-zero, then the reflector will periodically
// consult its ShouldResync function to determine whether to invoke
// the Store's Resync operation; `ShouldResync==nil` means always
// "yes". This enables you to use reflectors to periodically process
// everything as well as incrementally processing the things that
// change.
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
}
Expand Down Expand Up @@ -147,7 +156,8 @@ func (r *Reflector) setExpectedType(expectedType interface{}) {
// call chains to NewReflector, so they'd be low entropy names for reflectors
var internalPackages = []string{"client-go/tools/cache/"}

// Run starts a watch and handles watch events. Will restart the watch if it is closed.
// Run repeatedly uses the reflector's ListAndWatch to fetch all the
// objects and subsequent deltas.
// Run will exit when stopCh is closed.
func (r *Reflector) Run(stopCh <-chan struct{}) {
klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
Expand Down
22 changes: 14 additions & 8 deletions staging/src/k8s.io/client-go/tools/cache/shared_informer.go
Expand Up @@ -168,21 +168,21 @@ type SharedIndexInformer interface {
}

// NewSharedInformer creates a new instance for the listwatcher.
func NewSharedInformer(lw ListerWatcher, objOfWatchedType runtime.Object, resyncPeriod time.Duration) SharedInformer {
return NewSharedIndexInformer(lw, objOfWatchedType, resyncPeriod, Indexers{})
func NewSharedInformer(lw ListerWatcher, exampleObject runtime.Object, resyncPeriod time.Duration) SharedInformer {
return NewSharedIndexInformer(lw, exampleObject, resyncPeriod, Indexers{})
}

// NewSharedIndexInformer creates a new instance for the listwatcher.
func NewSharedIndexInformer(lw ListerWatcher, objOfWatchedType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
realClock := &clock.RealClock{}
sharedIndexInformer := &sharedIndexInformer{
processor: &sharedProcessor{clock: realClock},
indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
listerWatcher: lw,
objectType: objOfWatchedType,
objectType: exampleObject,
resyncCheckPeriod: defaultEventHandlerResyncPeriod,
defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", objOfWatchedType)),
cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
clock: realClock,
}
return sharedIndexInformer
Expand Down Expand Up @@ -245,6 +245,11 @@ type sharedIndexInformer struct {
cacheMutationDetector MutationDetector

listerWatcher ListerWatcher

// objectType is an example object of the type this informer is
// expected to handle. Only the type needs to be right, except
// that when that is `unstructured.Unstructured` the object's
// `"apiVersion"` must also be right.
objectType runtime.Object

// resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
Expand Down Expand Up @@ -609,9 +614,10 @@ type processorListener struct {
// full resync from the shared informer, but modified by two
// adjustments. One is imposing a lower bound,
// `minimumResyncPeriod`. The other is another lower bound, the
// sharedProcessor's `resyncCheckPeriod`, that is imposed in
// AddEventHandlerWithResyncPeriod invocations made after the
// sharedProcessor starts.
// sharedProcessor's `resyncCheckPeriod`, that is imposed (a) only
// in AddEventHandlerWithResyncPeriod invocations made after the
// sharedProcessor starts and (b) only if the informer does
// resyncs at all.
requestedResyncPeriod time.Duration
// resyncPeriod is the threshold that will be used in the logic
// for this listener. This value differs from
Expand Down

0 comments on commit e5dcd5a

Please sign in to comment.