Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

resourcemanager: support worker pool #42513

Merged
merged 10 commits into from
Mar 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
19 changes: 10 additions & 9 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,16 @@ var (

// metrics labels.
const (
LabelSession = "session"
LabelDomain = "domain"
LabelDDLOwner = "ddl-owner"
LabelDDL = "ddl"
LabelDDLWorker = "ddl-worker"
LabelDistReorg = "dist-reorg"
LabelDDLSyncer = "ddl-syncer"
LabelGCWorker = "gcworker"
LabelAnalyze = "analyze"
LabelSession = "session"
LabelDomain = "domain"
LabelDDLOwner = "ddl-owner"
LabelDDL = "ddl"
LabelDDLWorker = "ddl-worker"
LabelDistReorg = "dist-reorg"
LabelDDLSyncer = "ddl-syncer"
LabelGCWorker = "gcworker"
LabelAnalyze = "analyze"
LabelWorkerPool = "worker-pool"

LabelBatchRecvLoop = "batch-recv-loop"
LabelBatchSendLoop = "batch-send-loop"
Expand Down
36 changes: 36 additions & 0 deletions resourcemanager/pool/workerpool/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "workerpool",
srcs = ["workerpool.go"],
importpath = "github.com/pingcap/tidb/resourcemanager/pool/workerpool",
visibility = ["//visibility:public"],
deps = [
"//metrics",
"//resourcemanager",
"//resourcemanager/util",
"//util",
"//util/syncutil",
"@org_uber_go_atomic//:atomic",
],
)

go_test(
name = "workerpool_test",
timeout = "short",
srcs = [
hawkingrei marked this conversation as resolved.
Show resolved Hide resolved
"main_test.go",
"workpool_test.go",
],
embed = [":workerpool"],
flaky = True,
race = "on",
deps = [
"//resourcemanager/util",
"//testkit/testsetup",
"//util/logutil",
"@com_github_stretchr_testify//require",
"@org_uber_go_goleak//:goleak",
"@org_uber_go_zap//:zap",
],
)
33 changes: 33 additions & 0 deletions resourcemanager/pool/workerpool/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package workerpool

import (
"testing"

"github.com/pingcap/tidb/testkit/testsetup"
"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
opts := []goleak.Option{
goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"),
goleak.IgnoreTopFunction("github.com/lestrrat-go/httprc.runFetchWorker"),
goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"),
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
}
testsetup.SetupForCommonTest()
goleak.VerifyTestMain(m, opts...)
}
153 changes: 153 additions & 0 deletions resourcemanager/pool/workerpool/workerpool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package workerpool

import (
"time"

"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/resourcemanager"
"github.com/pingcap/tidb/resourcemanager/util"
tidbutil "github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/syncutil"
atomicutil "go.uber.org/atomic"
)

// Worker is worker interface.
type Worker[T any] interface {
HandleTask(task T)
Close()
}

// WorkerPool is a pool of workers.
type WorkerPool[T any] struct {
name string
numWorkers int32
runningTask atomicutil.Int32
taskChan chan T
quitChan chan struct{}
wg tidbutil.WaitGroupWrapper
createWorker func() Worker[T]
lastTuneTs atomicutil.Time
mu syncutil.RWMutex
}

// NewWorkerPool creates a new worker pool.
func NewWorkerPool[T any](name string, component util.Component, numWorkers int, createWorker func() Worker[T]) (*WorkerPool[T], error) {
if numWorkers <= 0 {
numWorkers = 1
}

p := &WorkerPool[T]{
name: name,
numWorkers: int32(numWorkers),
taskChan: make(chan T),
quitChan: make(chan struct{}),
createWorker: createWorker,
}

err := resourcemanager.InstanceResourceManager.Register(p, name, component)
if err != nil {
return nil, err
}

// Start default count of workers.
for i := 0; i < int(p.numWorkers); i++ {
p.runAWorker()
}

return p, nil
}

func (p *WorkerPool[T]) handleTaskWithRecover(w Worker[T], task T) {
p.runningTask.Add(1)
defer func() {
tidbutil.Recover(metrics.LabelWorkerPool, "handleTaskWithRecover", nil, false)
p.runningTask.Add(-1)
}()
w.HandleTask(task)
}

func (p *WorkerPool[T]) runAWorker() {
p.wg.Run(func() {
w := p.createWorker()
for {
select {
case task := <-p.taskChan:
p.handleTaskWithRecover(w, task)
case <-p.quitChan:
w.Close()
return
}
}
})
}

// AddTask adds a task to the pool.
func (p *WorkerPool[T]) AddTask(task T) {
p.taskChan <- task
}

// Tune tunes the pool to the specified number of workers.
func (p *WorkerPool[T]) Tune(numWorkers int32) {
if numWorkers <= 0 {
numWorkers = 1
}
p.lastTuneTs.Store(time.Now())
p.mu.Lock()
defer p.mu.Unlock()
diff := numWorkers - p.numWorkers
if diff > 0 {
// Add workers
for i := 0; i < int(diff); i++ {
p.runAWorker()
}
} else if diff < 0 {
// Remove workers
for i := 0; i < int(-diff); i++ {
p.quitChan <- struct{}{}
}
}
p.numWorkers = numWorkers
}

// LastTunerTs returns the last time when the pool was tuned.
func (p *WorkerPool[T]) LastTunerTs() time.Time {
return p.lastTuneTs.Load()
}

// Cap returns the capacity of the pool.
func (p *WorkerPool[T]) Cap() int32 {
p.mu.RLock()
defer p.mu.RUnlock()
return p.numWorkers
}

// Running returns the number of running workers.
func (p *WorkerPool[T]) Running() int32 {
return p.runningTask.Load()
}

// Name returns the name of the pool.
func (p *WorkerPool[T]) Name() string {
return p.name
}

// ReleaseAndWait releases the pool and wait for complete.
func (p *WorkerPool[T]) ReleaseAndWait() {
close(p.quitChan)
p.wg.Wait()
resourcemanager.InstanceResourceManager.Unregister(p.Name())
}
93 changes: 93 additions & 0 deletions resourcemanager/pool/workerpool/workpool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package workerpool

import (
"sync"
"sync/atomic"
"testing"

"github.com/pingcap/tidb/resourcemanager/util"
"github.com/pingcap/tidb/util/logutil"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

var globalCnt atomic.Int64
var cntWg sync.WaitGroup

type MyWorker[T int64] struct {
id int
}

func (w *MyWorker[T]) HandleTask(task int64) {
globalCnt.Add(task)
cntWg.Done()
logutil.BgLogger().Info("Worker handling task")
}

func (w *MyWorker[T]) Close() {
logutil.BgLogger().Info("Close worker", zap.Any("id", w.id))
}

func createMyWorker() Worker[int64] {
return &MyWorker[int64]{}
}

func TestWorkerPool(t *testing.T) {
// Create a worker pool with 3 workers.
pool, err := NewWorkerPool[int64]("test", util.UNKNOWN, 3, createMyWorker)
require.NoError(t, err)
globalCnt.Store(0)

// Add some tasks to the pool.
cntWg.Add(10)
for i := 0; i < 10; i++ {
pool.AddTask(int64(i))
}

cntWg.Wait()
require.Equal(t, int32(3), pool.Cap())
require.Equal(t, int64(45), globalCnt.Load())

// Enlarge the pool to 5 workers.
pool.Tune(5)

// Add some more tasks to the pool.
cntWg.Add(10)
for i := 0; i < 10; i++ {
pool.AddTask(int64(i))
}

cntWg.Wait()
require.Equal(t, int32(5), pool.Cap())
require.Equal(t, int64(90), globalCnt.Load())

// Decrease the pool to 2 workers.
pool.Tune(2)

// Add some more tasks to the pool.
cntWg.Add(10)
for i := 0; i < 10; i++ {
pool.AddTask(int64(i))
}

cntWg.Wait()
require.Equal(t, int32(2), pool.Cap())
require.Equal(t, int64(135), globalCnt.Load())

// Wait for the tasks to be completed.
pool.ReleaseAndWait()
}