Skip to content

Commit

Permalink
add ttl job manager framework
Browse files Browse the repository at this point in the history
Signed-off-by: YangKeao <yangkeao@chunibyo.icu>
  • Loading branch information
YangKeao committed Dec 5, 2022
1 parent 04cd9b8 commit c9fc7f9
Show file tree
Hide file tree
Showing 9 changed files with 1,508 additions and 14 deletions.
1 change: 1 addition & 0 deletions domain/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ go_library(
"//statistics",
"//statistics/handle",
"//telemetry",
"//ttl/ttlworker",
"//types",
"//util",
"//util/chunk",
Expand Down
29 changes: 29 additions & 0 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ import (
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/telemetry"
"github.com/pingcap/tidb/ttl/ttlworker"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/dbterror"
Expand Down Expand Up @@ -120,6 +121,7 @@ type Domain struct {
expiredTimeStamp4PC types.Time
logBackupAdvancer *daemon.OwnerDaemon
historicalStatsWorker *HistoricalStatsWorker
ttlJobManager *ttlworker.JobManager

serverID uint64
serverIDSession *concurrency.Session
Expand Down Expand Up @@ -1058,6 +1060,10 @@ func (do *Domain) Init(
return err
}

do.wg.Run(func() {
do.runTTLJobManager(ctx)
})

return nil
}

Expand Down Expand Up @@ -2374,6 +2380,29 @@ func (do *Domain) serverIDKeeper() {
}
}

func (do *Domain) runTTLJobManager(ctx context.Context) {
ttlJobManager := ttlworker.NewJobManager(do.ddl.GetID(), do.sysSessionPool)
ttlJobManager.Start()
do.ttlJobManager = ttlJobManager

// TODO: read the worker count from `do.sysVarCache` and resize the workers
ttlworker.ResizeScanWorkers(ctx, 4)
ttlworker.ResizeDelWorkers(ctx, 4)

<-do.exit

ttlJobManager.Stop()
err := ttlJobManager.WaitStopped(ctx, 30*time.Second)
if err != nil {
logutil.BgLogger().Warn("fail to wait until the ttl job manager stop", zap.Error(err))
}
}

// TTLJobManager returns the ttl job manager on this domain
func (do *Domain) TTLJobManager() *ttlworker.JobManager {
return do.ttlJobManager
}

func init() {
initByLDFlagsForGlobalKill()
telemetry.GetDomainInfoSchema = func(ctx sessionctx.Context) infoschema.InfoSchema {
Expand Down
18 changes: 4 additions & 14 deletions ttl/cache/infoschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@ package cache
import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/ttl/session"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)
Expand All @@ -41,18 +39,10 @@ func NewInfoSchemaCache(updateInterval time.Duration) *InfoSchemaCache {
}

// Update updates the info schema cache
func (isc *InfoSchemaCache) Update(sctx sessionctx.Context) error {
is, ok := sctx.GetDomainInfoSchema().(infoschema.InfoSchema)
if !ok {
return errors.New("fail to get domain info schema from session")
}

ext, ok := is.(*infoschema.SessionExtendedInfoSchema)
if !ok {
return errors.New("fail to get extended info schema")
}
func (isc *InfoSchemaCache) Update(se session.Session) error {
is := se.SessionInfoSchema()

if isc.schemaVer == ext.SchemaMetaVersion() {
if isc.schemaVer == is.SchemaMetaVersion() {
return nil
}

Expand Down
15 changes: 15 additions & 0 deletions ttl/ttlworker/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,18 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "ttlworker",
srcs = [
"config.go",
"del.go",
"job.go",
"job_manager.go",
"scan.go",
"session.go",
"worker.go",
],
importpath = "github.com/pingcap/tidb/ttl/ttlworker",
visibility = ["//visibility:public"],
deps = [
"//kv",
"//parser/terror",
"//sessionctx",
"//sessionctx/variable",
Expand All @@ -25,6 +29,7 @@ go_library(
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_errors//:errors",
"@org_golang_x_time//rate",
"@org_uber_go_multierr//:multierr",
"@org_uber_go_zap//:zap",
],
)
Expand All @@ -33,23 +38,33 @@ go_test(
name = "ttlworker_test",
srcs = [
"del_test.go",
"job_manager_test.go",
"job_test.go",
"scan_test.go",
"session_test.go",
"ttlworker_test.go",
"worker_test.go",
],
embed = [":ttlworker"],
deps = [
"//infoschema",
"//parser/ast",
"//parser/model",
"//parser/mysql",
"//server",
"//sessionctx",
"//sessionctx/variable",
"//testkit",
"//ttl/cache",
"//ttl/session",
"//types",
"//util/chunk",
"//util/logutil",
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_errors//:errors",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@org_golang_x_time//rate",
"@org_uber_go_zap//:zap",
],
)
117 changes: 117 additions & 0 deletions ttl/ttlworker/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ttlworker

import (
"context"
"time"
)

// TODO: the following functions should be put in the variable pkg to avoid cyclic dependency after adding variables for the TTL
// some of them are only used in test

// ResizeScanWorkers resizes the number of scan workers
var ResizeScanWorkers func(context.Context, int) error

// ResizeDelWorkers resizes the number of delete workers
var ResizeDelWorkers func(context.Context, int) error

func (m *JobManager) registerConfigFunctions() {
ResizeScanWorkers = m.resizeScanWorkers
ResizeDelWorkers = m.resizeScanWorkers
}

type jobManagerConfig struct {
// this ticker is not writable
jobManagerLoopTicker time.Duration

updateInfoSchemaCacheInterval time.Duration
updateTTLTableStatusCacheInterval time.Duration

ttlJobRunInterval time.Duration
}

const defJobManagerLoopTicker = 10 * time.Second

const defUpdateInfoSchemaCacheInterval = time.Minute
const defUpdateTTLTableStatusCacheInterval = 10 * time.Minute

const defUpdateTTLTableStatusCacheTimeout = 30 * time.Second
const defSyncInfoSchemaWithTTLTableStatusTimeout = 30 * time.Second
const defUpdateHeartBeatTimeout = 30 * time.Second
const defUpdateStateTimeout = 30 * time.Second
const defTTLJobTimeout = time.Hour

const defTiDBTTLJobRunInterval = 1 * time.Hour

func newJobManagerConfig() jobManagerConfig {
return jobManagerConfig{
jobManagerLoopTicker: defJobManagerLoopTicker,
updateInfoSchemaCacheInterval: defUpdateInfoSchemaCacheInterval,
updateTTLTableStatusCacheInterval: defUpdateTTLTableStatusCacheInterval,
ttlJobRunInterval: defTiDBTTLJobRunInterval,
}
}

func (m *JobManager) setUpdateInfoSchemaCacheInterval(interval time.Duration) {
m.Lock()
defer m.Unlock()

m.config.updateInfoSchemaCacheInterval = interval
m.infoSchemaCache.SetInterval(interval)
}

func (m *JobManager) setUpdateTTLTableStatusCacheInterval(interval time.Duration) {
m.Lock()
defer m.Unlock()

m.config.updateTTLTableStatusCacheInterval = interval
m.tableStatusCache.SetInterval(interval)
}

func (m *JobManager) setTTLJobRunInterval(interval time.Duration) {
m.Lock()
defer m.Unlock()

m.config.ttlJobRunInterval = interval
}

func (m *JobManager) getJobManagerLoopTicker() time.Duration {
m.Lock()
defer m.Unlock()

return m.config.jobManagerLoopTicker
}

func (m *JobManager) getUpdateInfoSchemaCacheInterval() time.Duration {
m.Lock()
defer m.Unlock()

return m.config.updateInfoSchemaCacheInterval
}

func (m *JobManager) getUpdateTTLTableStatusCacheInterval() time.Duration {
m.Lock()
defer m.Unlock()

return m.config.updateTTLTableStatusCacheInterval
}

func (m *JobManager) getTTLJobRunInterval() time.Duration {
m.Lock()
defer m.Unlock()

return m.config.ttlJobRunInterval
}

0 comments on commit c9fc7f9

Please sign in to comment.