Merge pull request #2064 from npinaeva/ocpbugs-29185
[release-4.15] OCPBUGS-29185: Wait for ovnkube controller to start before checking result error.
openshift-merge-bot[bot] committed Feb 13, 2024
2 parents c5c6330 + ca7c96b commit 3fdc924
Showing 3 changed files with 60 additions and 24 deletions.
18 changes: 11 additions & 7 deletions go-controller/cmd/ovnkube/ovnkube.go
@@ -469,17 +469,17 @@ func runOvnKube(ctx context.Context, runMode *ovnkubeRunMode, ovnClientset *util
defer clusterManagerWatchFactory.Shutdown()
}

- cm, err := clustermanager.NewClusterManager(ovnClientset.GetClusterManagerClientset(), clusterManagerWatchFactory,
+ clusterManager, err := clustermanager.NewClusterManager(ovnClientset.GetClusterManagerClientset(), clusterManagerWatchFactory,
runMode.identity, wg, eventRecorder)
if err != nil {
return fmt.Errorf("failed to create new cluster manager: %w", err)
}
metrics.RegisterClusterManagerFunctional()
- err = cm.Start(ctx)
+ err = clusterManager.Start(ctx)
if err != nil {
return fmt.Errorf("failed to start cluster manager: %w", err)
}
- defer cm.Stop()
+ defer clusterManager.Stop()

// record delay until ready
metrics.MetricClusterManagerReadyDuration.Set(time.Since(startTime).Seconds())
@@ -498,7 +498,7 @@ func runOvnKube(ctx context.Context, runMode *ovnkubeRunMode, ovnClientset *util
return fmt.Errorf("error when trying to initialize libovsdb SB client: %v", err)
}

- cm, err := controllerManager.NewNetworkControllerManager(ovnClientset, masterWatchFactory, libovsdbOvnNBClient, libovsdbOvnSBClient, eventRecorder, wg)
+ networkControllerManager, err := controllerManager.NewNetworkControllerManager(ovnClientset, masterWatchFactory, libovsdbOvnNBClient, libovsdbOvnSBClient, eventRecorder, wg)
if err != nil {
return err
}
@@ -510,7 +510,7 @@ func runOvnKube(ctx context.Context, runMode *ovnkubeRunMode, ovnClientset *util
ovnkubeControllerWG.Add(1)
go func() {
defer ovnkubeControllerWG.Done()
- err = cm.Start(ctx)
+ err = networkControllerManager.Start(ctx)
if err != nil {
ovnkubeControllerStartErr = fmt.Errorf("failed to start ovnkube controller: %w", err)
klog.Error(ovnkubeControllerStartErr)
@@ -519,9 +519,12 @@ func runOvnKube(ctx context.Context, runMode *ovnkubeRunMode, ovnClientset *util
// record delay until ready
metrics.MetricOVNKubeControllerReadyDuration.Set(time.Since(startTime).Seconds())
}()
+ // make sure ovnkubeController started in a separate goroutine will execute .Stop() on shutdown.
+ // Stop() only makes sense to call if Start() succeeded.
defer func() {
- if ovnkubeControllerStartErr != nil {
- cm.Stop()
+ ovnkubeControllerWG.Wait()
+ if ovnkubeControllerStartErr == nil {
+ networkControllerManager.Stop()
}
}()
}
@@ -560,6 +563,7 @@ func runOvnKube(ctx context.Context, runMode *ovnkubeRunMode, ovnClientset *util
metrics.MetricNodeReadyDuration.Set(time.Since(startTime).Seconds())
}

+ // wait for ovnkubeController to start and check error
if runMode.ovnkubeController {
ovnkubeControllerWG.Wait()
if ovnkubeControllerStartErr != nil {
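The ovnkube.go change above boils down to one pattern: start the network controller manager in its own goroutine, wait on its WaitGroup before inspecting the start error, and register Stop() only when Start() succeeded. A minimal self-contained sketch of that pattern follows; the Controller type and the run function here are hypothetical stand-ins, not the actual ovn-kubernetes code.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// Controller stands in for the real network controller manager.
type Controller struct{ failStart bool }

func (c *Controller) Start(ctx context.Context) error {
	if c.failStart {
		return errors.New("start failed")
	}
	return nil
}

func (c *Controller) Stop() { fmt.Println("controller stopped") }

func run(ctx context.Context, c *Controller) error {
	var wg sync.WaitGroup
	var startErr error

	wg.Add(1)
	go func() {
		defer wg.Done()
		startErr = c.Start(ctx)
	}()

	// Stop() only makes sense if Start() succeeded, so the deferred cleanup
	// first waits for the start goroutine and then checks its result.
	defer func() {
		wg.Wait()
		if startErr == nil {
			c.Stop()
		}
	}()

	// ... other components (e.g. the node runtime) would be started here ...

	// Wait for the controller goroutine before surfacing its error.
	wg.Wait()
	if startErr != nil {
		return fmt.Errorf("failed to start ovnkube controller: %w", startErr)
	}
	return nil
}

func main() {
	fmt.Println("ok run:", run(context.Background(), &Controller{}))
	fmt.Println("failing run:", run(context.Background(), &Controller{failStart: true}))
}
```

Waiting before reading the error also removes the race in the old code, where the deferred function could observe the error variable before the start goroutine had written it.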
27 changes: 26 additions & 1 deletion go-controller/pkg/factory/factory_test.go
@@ -320,6 +320,7 @@ var _ = Describe("Watch Factory Operations", func() {
adminNetworkPolicies []*anpapi.AdminNetworkPolicy
baselineAdminNetworkPolicies []*anpapi.BaselineAdminNetworkPolicy
err error
+ shutdown bool
)

const (
@@ -478,10 +479,13 @@ var _ = Describe("Watch Factory Operations", func() {
}
return true, obj, nil
})
+ shutdown = false
})

AfterEach(func() {
- wf.Shutdown()
+ if !shutdown {
+ wf.Shutdown()
+ }
})

Context("when a processExisting is given", func() {
@@ -745,6 +749,27 @@ var _ = Describe("Watch Factory Operations", func() {
baselineAdminNetworkPolicies = append(baselineAdminNetworkPolicies, newBaselineAdminNetworkPolicy("myBANP2"))
testExisting(BaselineAdminNetworkPolicyType)
})
+ It("doesn't deadlock when factory is shutdown", func() {
+ // every queue has length 10, but some events may be handled before the stop channel event is selected,
+ // so multiply by 15 instead of 10 to ensure overflow
+ for i := uint32(1); i <= defaultNumEventQueues*15; i++ {
+ pods = append(pods, newPod(fmt.Sprintf("pod%d", i), "default"))
+ }
+ wf, err = NewMasterWatchFactory(ovnClientset)
+ Expect(err).NotTo(HaveOccurred())
+ err = wf.Start()
+ Expect(err).NotTo(HaveOccurred())
+ wf.Shutdown()
+ shutdown = true
+ h, err := wf.addHandler(PodType, "", nil,
+ cache.ResourceEventHandlerFuncs{
+ AddFunc: func(obj interface{}) {},
+ UpdateFunc: func(old, new interface{}) {},
+ DeleteFunc: func(obj interface{}) {},
+ }, nil, wf.GetHandlerPriority(PodType))
+ Expect(err).NotTo(HaveOccurred())
+ wf.removeHandler(PodType, h)
+ })
})

Context("when EgressIP is disabled", func() {
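The new "doesn't deadlock" test exercises the case where a handler is added after Shutdown(): the initial-add path then enqueues far more pods than a queue can buffer while no worker is left to drain them. A tiny stand-alone illustration of why a plain channel send would hang there, and how the select on the stop channel (added in handler.go below) avoids it; the names here are illustrative, not the factory's API.

```go
package main

import "fmt"

// enqueue mirrors the select added to queueMap.enqueueEvent: send the event,
// but give up once stop is closed. Without the stop case, the send would block
// forever once the queue is full and its consumer goroutine has already exited.
func enqueue(q chan int, stop <-chan struct{}, ev int) bool {
	select {
	case q <- ev:
		return true
	case <-stop:
		return false
	}
}

func main() {
	q := make(chan int, 10)     // each factory event queue is buffered with capacity 10
	stop := make(chan struct{}) // stands in for the watch factory's stop channel
	close(stop)                 // simulate Shutdown(): the consumers are gone

	sent := 0
	for i := 0; i < 150; i++ { // 15x the queue capacity, as in the test
		if enqueue(q, stop, i) {
			sent++
		}
	}
	// Some sends may still succeed while the buffer has room (select picks a
	// ready case at random), but the loop never blocks.
	fmt.Printf("enqueued %d of 150 events without blocking\n", sent)
}
```

The random choice between ready select cases is also why the test multiplies the queue capacity by 15 rather than 10: a few events may still be accepted or handled before the stop case wins.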
39 changes: 23 additions & 16 deletions go-controller/pkg/factory/handler.go
@@ -81,9 +81,10 @@ type initialAddFn func(*Handler, []interface{})

type queueMap struct {
sync.Mutex
- entries map[ktypes.NamespacedName]*queueMapEntry
- queues []chan *event
- wg *sync.WaitGroup
+ entries map[ktypes.NamespacedName]*queueMapEntry
+ queues []chan *event
+ wg *sync.WaitGroup
+ stopChan chan struct{}
}

type queueMapEntry struct {
@@ -221,19 +222,20 @@ func (i *informer) removeHandler(handler *Handler) {
}()
}

- func newQueueMap(numEventQueues uint32, wg *sync.WaitGroup) *queueMap {
+ func newQueueMap(numEventQueues uint32, wg *sync.WaitGroup, stopChan chan struct{}) *queueMap {
qm := &queueMap{
- entries: make(map[ktypes.NamespacedName]*queueMapEntry),
- queues: make([]chan *event, numEventQueues),
- wg: wg,
+ entries: make(map[ktypes.NamespacedName]*queueMapEntry),
+ queues: make([]chan *event, numEventQueues),
+ wg: wg,
+ stopChan: stopChan,
}
for j := 0; j < int(numEventQueues); j++ {
qm.queues[j] = make(chan *event, 10)
}
return qm
}

- func (qm *queueMap) processEvents(queue chan *event, stopChan <-chan struct{}) {
+ func (qm *queueMap) processEvents(queue chan *event) {
defer qm.wg.Done()
for {
select {
@@ -242,16 +244,16 @@ func (qm *queueMap) processEvents(queue chan *event, stopChan <-chan struct{}) {
return
}
e.process(e)
- case <-stopChan:
+ case <-qm.stopChan:
return
}
}
}

- func (qm *queueMap) start(stopChan chan struct{}) {
+ func (qm *queueMap) start() {
qm.wg.Add(len(qm.queues))
for _, q := range qm.queues {
- go qm.processEvents(q, stopChan)
+ go qm.processEvents(q)
}
}

@@ -352,14 +354,19 @@ func (qm *queueMap) releaseQueueMapEntry(key ktypes.NamespacedName, entry *queue
// enqueueEvent adds an event to the appropriate queue for the object
func (qm *queueMap) enqueueEvent(oldObj, obj interface{}, oType reflect.Type, isDel bool, processFunc func(*event)) {
key, entry := qm.getQueueMapEntry(oType, obj)
- qm.queues[entry.queue] <- &event{
+ event := &event{
obj: obj,
oldObj: oldObj,
process: func(e *event) {
processFunc(e)
qm.releaseQueueMapEntry(key, entry, isDel)
},
}
+ select {
+ case qm.queues[entry.queue] <- event:
+ case <-qm.stopChan:
+ return
+ }
}

func ensureObjectOnDelete(obj interface{}, expectedType reflect.Type) (interface{}, error) {
@@ -565,17 +572,17 @@ func newQueuedInformer(oType reflect.Type, sharedInformer cache.SharedIndexInfor
if err != nil {
return nil, err
}
- i.queueMap = newQueueMap(numEventQueues, &i.shutdownWg)
- i.queueMap.start(stopChan)
+ i.queueMap = newQueueMap(numEventQueues, &i.shutdownWg, stopChan)
+ i.queueMap.start()

i.initialAddFunc = func(h *Handler, items []interface{}) {
// Make a handler-specific channel array across which the
// initial add events will be distributed. When a new handler
// is added, only that handler should receive events for all
// existing objects.
addsWg := &sync.WaitGroup{}
- addsMap := newQueueMap(numEventQueues, addsWg)
- addsMap.start(stopChan)
+ addsMap := newQueueMap(numEventQueues, addsWg, stopChan)
+ addsMap.start()

// Distribute the existing items into the handler-specific
// channel array.
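The initialAddFunc comment in the last hunk describes fanning a new handler's initial adds across a temporary, handler-specific set of queues and waiting for them to drain. A simplified sketch of that fan-out-and-wait idea follows; it closes the queues instead of threading a stop channel through, and all names are hypothetical rather than the factory's real types.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

// distributeInitialAdds fans existing items out over per-key queues that are
// consumed by temporary workers, then waits until every item is processed.
func distributeInitialAdds(items []string, numQueues uint32, process func(string)) {
	queues := make([]chan string, numQueues)
	var wg sync.WaitGroup
	for i := range queues {
		queues[i] = make(chan string, 10)
		wg.Add(1)
		go func(q chan string) {
			defer wg.Done()
			for item := range q {
				process(item)
			}
		}(queues[i])
	}
	for _, item := range items {
		h := fnv.New32a()
		h.Write([]byte(item))
		// the same key always hashes to the same queue, preserving per-object ordering
		queues[h.Sum32()%numQueues] <- item
	}
	for _, q := range queues {
		close(q) // no more initial adds; workers drain their queue and exit
	}
	wg.Wait() // all existing objects have been delivered to the new handler
}

func main() {
	distributeInitialAdds([]string{"pod1", "pod2", "pod3", "pod4"}, 2, func(name string) {
		fmt.Println("initial add:", name)
	})
}
```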
