Skip to content

Commit

Permalink
Merge pull request #2057 from npinaeva/merge-7-feb-2024
Browse files: browse the repository at this point in the history
OCPBUGS-20336,OCPBUGS-28558: [DownstreamMerge] 7 Feb 2024
  • Loading branch information
openshift-merge-bot[bot] committed Feb 8, 2024
2 parents b5425d6 + 615b5f2 commit c74dcd4
Show file tree
Hide file tree
Showing 16 changed files with 149 additions and 81 deletions.
18 changes: 11 additions & 7 deletions go-controller/cmd/ovnkube/ovnkube.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,17 +469,17 @@ func runOvnKube(ctx context.Context, runMode *ovnkubeRunMode, ovnClientset *util
defer clusterManagerWatchFactory.Shutdown()
}

cm, err := clustermanager.NewClusterManager(ovnClientset.GetClusterManagerClientset(), clusterManagerWatchFactory,
clusterManager, err := clustermanager.NewClusterManager(ovnClientset.GetClusterManagerClientset(), clusterManagerWatchFactory,
runMode.identity, wg, eventRecorder)
if err != nil {
return fmt.Errorf("failed to create new cluster manager: %w", err)
}
metrics.RegisterClusterManagerFunctional()
err = cm.Start(ctx)
err = clusterManager.Start(ctx)
if err != nil {
return fmt.Errorf("failed to start cluster manager: %w", err)
}
defer cm.Stop()
defer clusterManager.Stop()

// record delay until ready
metrics.MetricClusterManagerReadyDuration.Set(time.Since(startTime).Seconds())
Expand All @@ -498,7 +498,7 @@ func runOvnKube(ctx context.Context, runMode *ovnkubeRunMode, ovnClientset *util
return fmt.Errorf("error when trying to initialize libovsdb SB client: %v", err)
}

cm, err := controllerManager.NewNetworkControllerManager(ovnClientset, masterWatchFactory, libovsdbOvnNBClient, libovsdbOvnSBClient, eventRecorder, wg)
networkControllerManager, err := controllerManager.NewNetworkControllerManager(ovnClientset, masterWatchFactory, libovsdbOvnNBClient, libovsdbOvnSBClient, eventRecorder, wg)
if err != nil {
return err
}
Expand All @@ -510,7 +510,7 @@ func runOvnKube(ctx context.Context, runMode *ovnkubeRunMode, ovnClientset *util
ovnkubeControllerWG.Add(1)
go func() {
defer ovnkubeControllerWG.Done()
err = cm.Start(ctx)
err = networkControllerManager.Start(ctx)
if err != nil {
ovnkubeControllerStartErr = fmt.Errorf("failed to start ovnkube controller: %w", err)
klog.Error(ovnkubeControllerStartErr)
Expand All @@ -519,9 +519,12 @@ func runOvnKube(ctx context.Context, runMode *ovnkubeRunMode, ovnClientset *util
// record delay until ready
metrics.MetricOVNKubeControllerReadyDuration.Set(time.Since(startTime).Seconds())
}()
// make sure ovnkubeController started in a separate goroutine will execute .Stop() on shutdown.
// Stop() only makes sense to call if Start() succeeded.
defer func() {
if ovnkubeControllerStartErr != nil {
cm.Stop()
ovnkubeControllerWG.Wait()
if ovnkubeControllerStartErr == nil {
networkControllerManager.Stop()
}
}()
}
Expand Down Expand Up @@ -560,6 +563,7 @@ func runOvnKube(ctx context.Context, runMode *ovnkubeRunMode, ovnClientset *util
metrics.MetricNodeReadyDuration.Set(time.Since(startTime).Seconds())
}

// wait for ovnkubeController to start and check error
if runMode.ovnkubeController {
ovnkubeControllerWG.Wait()
if ovnkubeControllerStartErr != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,20 +168,20 @@ func (c *Controller) Start(threadiness int) error {
defer utilruntime.HandleCrash()

klog.Infof("Starting Egress Services Controller")
if !util.WaitForNamedCacheSyncWithTimeout("egressservices", c.stopCh, c.egressServiceSynced) {
return fmt.Errorf("timed out waiting for egressservice caches to sync")
if !util.WaitForInformerCacheSyncWithTimeout("egressservices", c.stopCh, c.egressServiceSynced) {
return fmt.Errorf("timed out waiting for egress service caches to sync")
}

if !util.WaitForNamedCacheSyncWithTimeout("egressservices_services", c.stopCh, c.servicesSynced) {
return fmt.Errorf("timed out waiting for caches to sync")
if !util.WaitForInformerCacheSyncWithTimeout("egressservices_services", c.stopCh, c.servicesSynced) {
return fmt.Errorf("timed out waiting for service caches (for egress services) to sync")
}

if !util.WaitForNamedCacheSyncWithTimeout("egressservices_endpointslices", c.stopCh, c.endpointSlicesSynced) {
return fmt.Errorf("timed out waiting for caches to sync")
if !util.WaitForInformerCacheSyncWithTimeout("egressservices_endpointslices", c.stopCh, c.endpointSlicesSynced) {
return fmt.Errorf("timed out waiting for endpoint slice caches (for egress services) to sync")
}

if !util.WaitForNamedCacheSyncWithTimeout("egressservices_nodes", c.stopCh, c.nodesSynced) {
return fmt.Errorf("timed out waiting for caches to sync")
if !util.WaitForInformerCacheSyncWithTimeout("egressservices_nodes", c.stopCh, c.nodesSynced) {
return fmt.Errorf("timed out waiting for node caches (for egress services) to sync")
}

klog.Infof("Repairing Egress Services")
Expand Down
4 changes: 2 additions & 2 deletions go-controller/pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ func (c *controller[T]) Start(threadiness int) error {
return fmt.Errorf("failed to add event handler: %w", err)
}

if !util.WaitForNamedCacheSyncWithTimeout(c.name, c.stopChan, c.eventHandler.HasSynced) {
return fmt.Errorf("timed out waiting for egressservice caches to sync")
if !util.WaitForInformerCacheSyncWithTimeout(c.name, c.stopChan, c.config.Informer.HasSynced) {
return fmt.Errorf("timed out waiting for %s informer cache to sync", c.name)
}

// now we have already started receiving events and putting keys in the queue.
Expand Down
27 changes: 26 additions & 1 deletion go-controller/pkg/factory/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ var _ = Describe("Watch Factory Operations", func() {
adminNetworkPolicies []*anpapi.AdminNetworkPolicy
baselineAdminNetworkPolicies []*anpapi.BaselineAdminNetworkPolicy
err error
shutdown bool
)

const (
Expand Down Expand Up @@ -478,10 +479,13 @@ var _ = Describe("Watch Factory Operations", func() {
}
return true, obj, nil
})
shutdown = false
})

AfterEach(func() {
wf.Shutdown()
if !shutdown {
wf.Shutdown()
}
})

Context("when a processExisting is given", func() {
Expand Down Expand Up @@ -745,6 +749,27 @@ var _ = Describe("Watch Factory Operations", func() {
baselineAdminNetworkPolicies = append(baselineAdminNetworkPolicies, newBaselineAdminNetworkPolicy("myBANP2"))
testExisting(BaselineAdminNetworkPolicyType)
})
It("doesn't deadlock when factory is shutdown", func() {
// every queue has length 10, but some events may be handled before the stop channel event is selected,
// so multiply by 15 instead of 10 to ensure overflow
for i := uint32(1); i <= defaultNumEventQueues*15; i++ {
pods = append(pods, newPod(fmt.Sprintf("pod%d", i), "default"))
}
wf, err = NewMasterWatchFactory(ovnClientset)
Expect(err).NotTo(HaveOccurred())
err = wf.Start()
Expect(err).NotTo(HaveOccurred())
wf.Shutdown()
shutdown = true
h, err := wf.addHandler(PodType, "", nil,
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {},
UpdateFunc: func(old, new interface{}) {},
DeleteFunc: func(obj interface{}) {},
}, nil, wf.GetHandlerPriority(PodType))
Expect(err).NotTo(HaveOccurred())
wf.removeHandler(PodType, h)
})
})

Context("when EgressIP is disabled", func() {
Expand Down
39 changes: 23 additions & 16 deletions go-controller/pkg/factory/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,10 @@ type initialAddFn func(*Handler, []interface{})

type queueMap struct {
sync.Mutex
entries map[ktypes.NamespacedName]*queueMapEntry
queues []chan *event
wg *sync.WaitGroup
entries map[ktypes.NamespacedName]*queueMapEntry
queues []chan *event
wg *sync.WaitGroup
stopChan chan struct{}
}

type queueMapEntry struct {
Expand Down Expand Up @@ -221,19 +222,20 @@ func (i *informer) removeHandler(handler *Handler) {
}()
}

func newQueueMap(numEventQueues uint32, wg *sync.WaitGroup) *queueMap {
func newQueueMap(numEventQueues uint32, wg *sync.WaitGroup, stopChan chan struct{}) *queueMap {
qm := &queueMap{
entries: make(map[ktypes.NamespacedName]*queueMapEntry),
queues: make([]chan *event, numEventQueues),
wg: wg,
entries: make(map[ktypes.NamespacedName]*queueMapEntry),
queues: make([]chan *event, numEventQueues),
wg: wg,
stopChan: stopChan,
}
for j := 0; j < int(numEventQueues); j++ {
qm.queues[j] = make(chan *event, 10)
}
return qm
}

func (qm *queueMap) processEvents(queue chan *event, stopChan <-chan struct{}) {
func (qm *queueMap) processEvents(queue chan *event) {
defer qm.wg.Done()
for {
select {
Expand All @@ -242,16 +244,16 @@ func (qm *queueMap) processEvents(queue chan *event, stopChan <-chan struct{}) {
return
}
e.process(e)
case <-stopChan:
case <-qm.stopChan:
return
}
}
}

func (qm *queueMap) start(stopChan chan struct{}) {
func (qm *queueMap) start() {
qm.wg.Add(len(qm.queues))
for _, q := range qm.queues {
go qm.processEvents(q, stopChan)
go qm.processEvents(q)
}
}

Expand Down Expand Up @@ -352,14 +354,19 @@ func (qm *queueMap) releaseQueueMapEntry(key ktypes.NamespacedName, entry *queue
// enqueueEvent adds an event to the appropriate queue for the object
func (qm *queueMap) enqueueEvent(oldObj, obj interface{}, oType reflect.Type, isDel bool, processFunc func(*event)) {
key, entry := qm.getQueueMapEntry(oType, obj)
qm.queues[entry.queue] <- &event{
event := &event{
obj: obj,
oldObj: oldObj,
process: func(e *event) {
processFunc(e)
qm.releaseQueueMapEntry(key, entry, isDel)
},
}
select {
case qm.queues[entry.queue] <- event:
case <-qm.stopChan:
return
}
}

func ensureObjectOnDelete(obj interface{}, expectedType reflect.Type) (interface{}, error) {
Expand Down Expand Up @@ -565,17 +572,17 @@ func newQueuedInformer(oType reflect.Type, sharedInformer cache.SharedIndexInfor
if err != nil {
return nil, err
}
i.queueMap = newQueueMap(numEventQueues, &i.shutdownWg)
i.queueMap.start(stopChan)
i.queueMap = newQueueMap(numEventQueues, &i.shutdownWg, stopChan)
i.queueMap.start()

i.initialAddFunc = func(h *Handler, items []interface{}) {
// Make a handler-specific channel array across which the
// initial add events will be distributed. When a new handler
// is added, only that handler should receive events for all
// existing objects.
addsWg := &sync.WaitGroup{}
addsMap := newQueueMap(numEventQueues, addsWg)
addsMap.start(stopChan)
addsMap := newQueueMap(numEventQueues, addsWg, stopChan)
addsMap.start()

// Distribute the existing items into the handler-specific
// channel array.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ func (nadController *NetAttachDefinitionController) Start() error {

func (nadController *NetAttachDefinitionController) start() error {
nadController.nadFactory.Start(nadController.stopChan)
if !util.WaitForNamedCacheSyncWithTimeout(nadController.name, nadController.stopChan, nadController.netAttachDefSynced) {
return fmt.Errorf("stop requested while syncing caches")
if !util.WaitForInformerCacheSyncWithTimeout(nadController.name, nadController.stopChan, nadController.netAttachDefSynced) {
return fmt.Errorf("stop requested while syncing %s caches", nadController.name)
}

err := nadController.SyncNetworkControllers()
Expand Down
2 changes: 1 addition & 1 deletion go-controller/pkg/node/controllers/egressip/egressip.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup, threads int
syncWg.Add(1)
go func(resourceName string, syncFn cache.InformerSynced) {
defer syncWg.Done()
if !util.WaitForNamedCacheSyncWithTimeout(resourceName, stopCh, syncFn) {
if !util.WaitForInformerCacheSyncWithTimeout(resourceName, stopCh, syncFn) {
syncErrs = append(syncErrs, fmt.Errorf("timed out waiting for %q caches to sync", resourceName))
}
}(se.resourceName, se.syncFn)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ func runController(testNS ns.NetNS, c *Controller) (cleanupFn, error) {
{"eippod", c.podInformer.HasSynced},
} {
func(resourceName string, syncFn cache.InformerSynced) {
if !util.WaitForNamedCacheSyncWithTimeout(resourceName, stopCh, syncFn) {
if !util.WaitForInformerCacheSyncWithTimeout(resourceName, stopCh, syncFn) {
gomega.PanicWith(fmt.Sprintf("timed out waiting for %q caches to sync", resourceName))
}
}(se.resourceName, se.syncFn)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,16 +171,16 @@ func (c *Controller) Run(wg *sync.WaitGroup, threadiness int) error {

klog.Infof("Starting Egress Services Controller")

if !util.WaitForNamedCacheSyncWithTimeout("egressservices", c.stopCh, c.egressServiceSynced) {
return fmt.Errorf("timed out waiting for caches to sync")
if !util.WaitForInformerCacheSyncWithTimeout("egressservices", c.stopCh, c.egressServiceSynced) {
return fmt.Errorf("timed out waiting for egress service caches to sync")
}

if !util.WaitForNamedCacheSyncWithTimeout("egressservices_services", c.stopCh, c.servicesSynced) {
return fmt.Errorf("timed out waiting for caches to sync")
if !util.WaitForInformerCacheSyncWithTimeout("egressservices_services", c.stopCh, c.servicesSynced) {
return fmt.Errorf("timed out waiting for service caches (for egress services) to sync")
}

if !util.WaitForNamedCacheSyncWithTimeout("egressservices_endpointslices", c.stopCh, c.endpointSlicesSynced) {
return fmt.Errorf("timed out waiting for caches to sync")
if !util.WaitForInformerCacheSyncWithTimeout("egressservices_endpointslices", c.stopCh, c.endpointSlicesSynced) {
return fmt.Errorf("timed out waiting for endpoint slice caches (for egress services) to sync")
}

klog.Infof("Repairing Egress Services")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,8 @@ func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) {

// Wait for the caches to be synced
klog.Info("Waiting for informer caches to sync")
if !util.WaitForNamedCacheSyncWithTimeout(c.controllerName, stopCh, c.anpCacheSynced, c.banpCacheSynced, c.anpNamespaceSynced, c.anpPodSynced) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
if !util.WaitForInformerCacheSyncWithTimeout(c.controllerName, stopCh, c.anpCacheSynced, c.banpCacheSynced, c.anpNamespaceSynced, c.anpPodSynced) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for admin network policy caches to sync"))
klog.Errorf("Error syncing caches for admin network policy and baseline admin network policy")
return
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,20 +204,20 @@ func (c *Controller) Run(wg *sync.WaitGroup, threadiness int) error {

klog.Infof("Starting Egress Services Controller")

if !util.WaitForNamedCacheSyncWithTimeout("egressservices", c.stopCh, c.egressServiceSynced) {
return fmt.Errorf("timed out waiting for caches to sync")
if !util.WaitForInformerCacheSyncWithTimeout("egressservices", c.stopCh, c.egressServiceSynced) {
return fmt.Errorf("timed out waiting for egress service caches to sync")
}

if !util.WaitForNamedCacheSyncWithTimeout("egressservices_services", c.stopCh, c.servicesSynced) {
return fmt.Errorf("timed out waiting for caches to sync")
if !util.WaitForInformerCacheSyncWithTimeout("egressservices_services", c.stopCh, c.servicesSynced) {
return fmt.Errorf("timed out waiting for service caches (for egress services) to sync")
}

if !util.WaitForNamedCacheSyncWithTimeout("egressservices_endpointslices", c.stopCh, c.endpointSlicesSynced) {
return fmt.Errorf("timed out waiting for caches to sync")
if !util.WaitForInformerCacheSyncWithTimeout("egressservices_endpointslices", c.stopCh, c.endpointSlicesSynced) {
return fmt.Errorf("timed out waiting for endpoint slice caches (for egress services) to sync")
}

if !util.WaitForNamedCacheSyncWithTimeout("egressservices_nodes", c.stopCh, c.nodesSynced) {
return fmt.Errorf("timed out waiting for caches to sync")
if !util.WaitForInformerCacheSyncWithTimeout("egressservices_nodes", c.stopCh, c.nodesSynced) {
return fmt.Errorf("timed out waiting for node caches (for egress services) to sync")
}

klog.Infof("Repairing Egress Services")
Expand Down

0 comments on commit c74dcd4

Please sign in to comment.