Skip to content

Commit

Permalink
Merge pull request #35 from gnufied/backport-deadlock-fix
Browse files Browse the repository at this point in the history
[4.7] Bug 1936975: Fix deadlock when enqueuing functions into the pool
  • Loading branch information
openshift-merge-robot committed Mar 24, 2021
2 parents 602c274 + 59177e8 commit 077d6be
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 6 deletions.
8 changes: 4 additions & 4 deletions pkg/operator/operator.go
Expand Up @@ -50,6 +50,8 @@ const (
cloudCredentialsSecretName = "vsphere-cloud-credentials"
// TODO: make it configurable?
parallelVSPhereCalls = 10
// Size of golang channel buffer
channelBufferSize = 100
)

var (
Expand Down Expand Up @@ -151,7 +153,7 @@ func (c *vSphereProblemDetectorController) runChecks(ctx context.Context) (time.
KubeClient: c,
}

checkRunner := NewCheckThreadPool(parallelVSPhereCalls)
checkRunner := NewCheckThreadPool(parallelVSPhereCalls, channelBufferSize)
resultCollector := NewResultsCollector()
c.enqueueClusterChecks(checkContext, checkRunner, resultCollector)
if err := c.enqueueNodeChecks(checkContext, checkRunner, resultCollector); err != nil {
Expand Down Expand Up @@ -251,9 +253,7 @@ func (c *vSphereProblemDetectorController) enqueueSingleNodeChecks(checkContext
for i := range c.nodeChecks {
check := c.nodeChecks[i]
klog.V(4).Infof("Adding node check %s:%s", node.Name, check.Name())
checkRunner.RunGoroutine(checkContext.Context, func() {
c.runSingleNodeSingleCheck(checkContext, resultCollector, node, vm, check)
})
c.runSingleNodeSingleCheck(checkContext, resultCollector, node, vm, check)
}
})
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/operator/pool.go
Expand Up @@ -16,9 +16,9 @@ type CheckThreadPool struct {
}

// Creates a new CheckThreadPool with given max. number of goroutines.
func NewCheckThreadPool(parallelism int) *CheckThreadPool {
func NewCheckThreadPool(parallelism int, channelBufferSize int) *CheckThreadPool {
pool := &CheckThreadPool{
workCh: make(chan func(), 100),
workCh: make(chan func(), channelBufferSize),
}

for i := 0; i < parallelism; i++ {
Expand Down
41 changes: 41 additions & 0 deletions pkg/operator/pool_test.go
@@ -0,0 +1,41 @@
package operator

import (
"context"
"fmt"
"testing"
"time"
)

// TestThreadPool enqueues five parent tasks into a CheckThreadPool created
// with parallelism 5 and a channel buffer of 5, then verifies that
// pool.Wait returns before a 40 second timeout. Each parent task runs five
// sub-tasks inline (not re-enqueued into the pool) via runSomeTask.
func TestThreadPool(t *testing.T) {
	const (
		parentTasks = 5
		subTasks    = 5
		waitTimeout = 40 * time.Second
	)

	pool := NewCheckThreadPool(5, 5)
	ctx := context.TODO()
	startTime := time.Now()

	for i := 0; i < parentTasks; i++ {
		i := i // per-iteration copy for the closure (needed before Go 1.22)
		pool.RunGoroutine(ctx, func() {
			// fmt is used here rather than t.Logf: this closure may still be
			// running after the test has returned on timeout, and logging
			// through t at that point would panic.
			fmt.Printf("running parent task %d\n", i)
			for j := 0; j < subTasks; j++ {
				runSomeTask(fmt.Sprintf("task-%d-%d", i, j))
			}
		})
	}

	// Wait in a separate goroutine so the test can enforce its own timeout
	// instead of hanging forever if the pool never drains.
	done := make(chan struct{})
	go func() {
		pool.Wait(ctx)
		close(done)
	}()

	select {
	case <-done:
		t.Logf("test finished successfully")
	case <-time.After(waitTimeout):
		t.Errorf("test failed to finish")
	}
	t.Logf("time taken is: %v", time.Since(startTime))
}

// runSomeTask simulates one unit of work: it announces the task ID on
// standard output and then blocks the calling goroutine for five seconds.
func runSomeTask(taskID string) {
	const simulatedWork = 5 * time.Second

	fmt.Printf("Running task: %s\n", taskID)
	time.Sleep(simulatedWork)
}

0 comments on commit 077d6be

Please sign in to comment.