Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

change to sharedInformer #183

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 22 additions & 13 deletions pkg/data/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func NewFromInterface(client dynamic.Interface, opa opa_client.Data, ns types.Re
return s
}

//WithBackoff tunes the values of exponential backoff and jitter factor
// WithBackoff tunes the values of exponential backoff and jitter factor
func WithBackoff(min, max time.Duration, jitterFactor float64) Option {
return func(s *GenericSync) {
s.limiter = workqueue.NewItemExponentialFailureRateLimiter(min, max)
Expand Down Expand Up @@ -119,11 +119,12 @@ func (s *GenericSync) RunContext(ctx context.Context) error {
}

// setup the store and queue for this GenericSync instance
func (s *GenericSync) setup(ctx context.Context) (cache.Store, workqueue.DelayingInterface) {
func (s *GenericSync) setup(ctx context.Context) (cache.SharedInformer, workqueue.DelayingInterface) {

resource := s.client.ResourceFor(s.ns, metav1.NamespaceAll)
queue := workqueue.NewNamedDelayingQueue(s.ns.String())
store, controller := cache.NewInformer(

informer := cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return resource.List(ctx, options)
Expand All @@ -134,19 +135,27 @@ func (s *GenericSync) setup(ctx context.Context) (cache.Store, workqueue.Delayin
},
&unstructured.Unstructured{},
0,
resourceEventQueue{queue},
cache.Indexers{},
)

informer.AddEventHandler(resourceEventQueue{queue})
start, quit := time.Now(), ctx.Done()
go controller.Run(quit)
for !cache.WaitForCacheSync(quit, controller.HasSynced) {
go informer.Run(quit)
for !cache.WaitForCacheSync(quit, informer.HasSynced) {
logrus.Warnf("Failed to sync cache for %v, retrying...", s.ns)
}
if controller.HasSynced() {
if informer.HasSynced() {
logrus.Infof("Initial informer sync for %v completed, took %v", s.ns, time.Since(start))
}

return store, queue
//Add the list after initial startup
for _, item := range informer.GetStore().ListKeys() {
queue.AddAfter(item, 1*time.Second)
}

stopCh := make(chan struct{})
defer close(stopCh)
return informer, queue
}

// resourceEventQueue is a cache.ResourceEventHandler that queues all events
Expand Down Expand Up @@ -203,7 +212,7 @@ const initPath = ""
// loop starts replicating Kubernetes resources into OPA. If an error occurs
// during the replication process, this function will backoff and reload
// all resources into OPA from scratch.
func (s *GenericSync) loop(store cache.Store, queue workqueue.DelayingInterface) {
func (s *GenericSync) loop(store cache.SharedInformer, queue workqueue.DelayingInterface) {

logrus.Infof("Syncing %v.", s.ns)
defer func() {
Expand Down Expand Up @@ -234,14 +243,14 @@ func (s *GenericSync) loop(store cache.Store, queue workqueue.DelayingInterface)
}
}

func (s *GenericSync) processNext(store cache.Store, path string, syncDone *bool) error {
func (s *GenericSync) processNext(store cache.SharedInformer, path string, syncDone *bool) error {

// On receiving the initPath, load a full dump of the data store
if path == initPath {
if *syncDone {
return nil
}
start, list := time.Now(), store.List()
start, list := time.Now(), store.GetStore().List()
if err := s.syncAll(list); err != nil {
return err
}
Expand All @@ -255,7 +264,7 @@ func (s *GenericSync) processNext(store cache.Store, path string, syncDone *bool
return nil
}

obj, exists, err := store.GetByKey(path)
obj, exists, err := store.GetStore().GetByKey(path)
if err != nil {
return fmt.Errorf("store error: %w", err)
}
Expand All @@ -278,7 +287,7 @@ func (s *GenericSync) syncAll(objs []interface{}) error {
if err != nil {
return err
}

return s.opa.PutData("/", payload)
}

Expand Down