Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

resourcemanager: support worker pool #42513

Merged
merged 10 commits into from
Mar 25, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
32 changes: 32 additions & 0 deletions resourcemanager/pool/workerpool/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "workerpool",
srcs = ["workerpool.go"],
importpath = "github.com/pingcap/tidb/resourcemanager/pool/workerpool",
visibility = ["//visibility:public"],
deps = [
"//resourcemanager",
"//resourcemanager/util",
"//util",
"@com_github_sasha_s_go_deadlock//:go-deadlock",
"@org_uber_go_atomic//:atomic",
],
)

go_test(
name = "workerpool_test",
srcs = [
hawkingrei marked this conversation as resolved.
Show resolved Hide resolved
"main_test.go",
"workpool_test.go",
],
embed = [":workerpool"],
deps = [
"//resourcemanager/util",
"//testkit/testsetup",
"//util/logutil",
"@com_github_stretchr_testify//require",
"@org_uber_go_goleak//:goleak",
"@org_uber_go_zap//:zap",
],
)
33 changes: 33 additions & 0 deletions resourcemanager/pool/workerpool/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package workerpool

import (
"testing"

"github.com/pingcap/tidb/testkit/testsetup"
"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
opts := []goleak.Option{
goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"),
goleak.IgnoreTopFunction("github.com/lestrrat-go/httprc.runFetchWorker"),
goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"),
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
}
testsetup.SetupForCommonTest()
goleak.VerifyTestMain(m, opts...)
}
140 changes: 140 additions & 0 deletions resourcemanager/pool/workerpool/workerpool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package workerpool

import (
"time"

"github.com/pingcap/tidb/resourcemanager"
"github.com/pingcap/tidb/resourcemanager/util"
tidbutil "github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/syncutil"
atomicutil "go.uber.org/atomic"
)

// Worker is worker interface.
type Worker interface {
HandleTask(task any)
Close()
}

// WorkerPool is a pool of workers.
type WorkerPool[T any] struct {
name string
numWorkers int32
runningWorkers atomicutil.Int32
tangenta marked this conversation as resolved.
Show resolved Hide resolved
taskChan chan T
quitChan chan struct{}
wg tidbutil.WaitGroupWrapper
createWorker func() Worker
lastTuneTs atomicutil.Time
mu syncutil.RWMutex
}

// NewWorkerPool creates a new worker pool.
func NewWorkerPool[T any](name string, component util.Component, numWorkers int, createWorker func() Worker) (*WorkerPool[T], error) {
if numWorkers <= 0 {
numWorkers = 1
}

p := &WorkerPool[T]{
name: name,
numWorkers: int32(numWorkers),
taskChan: make(chan T),
quitChan: make(chan struct{}),
createWorker: createWorker,
}

err := resourcemanager.InstanceResourceManager.Register(p, name, component)
if err != nil {
return nil, err
}

// Start default count of workers.
for i := 0; i < int(p.numWorkers); i++ {
p.runAWorker()
}

return p, nil
}

func (p *WorkerPool[T]) runAWorker() {
p.wg.Run(func() {
w := p.createWorker()
for {
select {
case task := <-p.taskChan:
p.runningWorkers.Add(1)
w.HandleTask(task)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add recoveryhere to avoid panic?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let the worker handle it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If wokrer forgets to handle it, how to ensure the correctness here.

p.runningWorkers.Add(-1)
case <-p.quitChan:
w.Close()
return
}
}
})
}

// AddTask adds a task to the pool.
func (p *WorkerPool[T]) AddTask(task T) {
p.taskChan <- task
}

// Tune tunes the pool to the specified number of workers.
func (p *WorkerPool[T]) Tune(numWorkers int32) {
if numWorkers <= 0 {
numWorkers = 1
}
p.lastTuneTs.Store(time.Now())
p.mu.Lock()
defer p.mu.Unlock()
diff := numWorkers - p.numWorkers
if diff > 0 {
// Add workers
for i := 0; i < int(diff); i++ {
p.runAWorker()
}
} else if diff < 0 {
// Remove workers
for i := 0; i < int(-diff); i++ {
p.quitChan <- struct{}{}
}
}
p.numWorkers = numWorkers
}

func (p *WorkerPool[T]) LastTunerTs() time.Time {
return p.lastTuneTs.Load()
}

func (p *WorkerPool[T]) Cap() int32 {
p.mu.RLock()
defer p.mu.RUnlock()
return p.numWorkers
}

func (p *WorkerPool[T]) Running() int32 {
return p.runningWorkers.Load()
}

func (p *WorkerPool[T]) Name() string {
return p.name
}

func (p *WorkerPool[T]) ReleaseAndWait() {
close(p.quitChan)
p.wg.Wait()
resourcemanager.InstanceResourceManager.Unregister(p.Name())
}
93 changes: 93 additions & 0 deletions resourcemanager/pool/workerpool/workpool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package workerpool

import (
"sync"
"sync/atomic"
"testing"

"github.com/pingcap/tidb/resourcemanager/util"
"github.com/pingcap/tidb/util/logutil"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

var globalCnt atomic.Int64
var cntWg sync.WaitGroup

type MyWorker struct {
id int
}

func (w *MyWorker) HandleTask(task any) {
globalCnt.Add(task.(int64))
cntWg.Done()
logutil.BgLogger().Info("Worker handling task")
}

func (w *MyWorker) Close() {
wjhuang2016 marked this conversation as resolved.
Show resolved Hide resolved
logutil.BgLogger().Info("Close worker", zap.Any("id", w.id))
}

func createMyWorker() Worker {
return &MyWorker{}
}

func TestWorkerPool(t *testing.T) {
// Create a worker pool with 3 workers.
pool, err := NewWorkerPool[int64]("test", util.UNKNOWN, 3, createMyWorker)
require.NoError(t, err)
globalCnt.Store(0)

// Add some tasks to the pool.
cntWg.Add(10)
for i := 0; i < 10; i++ {
pool.AddTask(int64(i))
}

cntWg.Wait()
require.Equal(t, int32(3), pool.Cap())
require.Equal(t, int64(45), globalCnt.Load())

// Enlarge the pool to 5 workers.
pool.Tune(5)

// Add some more tasks to the pool.
cntWg.Add(10)
for i := 0; i < 10; i++ {
pool.AddTask(int64(i))
}

cntWg.Wait()
require.Equal(t, int32(5), pool.Cap())
require.Equal(t, int64(90), globalCnt.Load())

// Decrease the pool to 2 workers.
pool.Tune(2)

// Add some more tasks to the pool.
cntWg.Add(10)
for i := 0; i < 10; i++ {
pool.AddTask(int64(i))
}

cntWg.Wait()
require.Equal(t, int32(2), pool.Cap())
require.Equal(t, int64(135), globalCnt.Load())

// Wait for the tasks to be completed.
pool.ReleaseAndWait()
}