ddl: fix runnable ingest job checking #52503

Merged
merged 15 commits on Apr 12, 2024
8 changes: 1 addition & 7 deletions pkg/ddl/ddl.go
@@ -831,6 +831,7 @@ func (d *ddl) Start(ctxPool *pools.ResourcePool) error {
if err != nil {
logutil.BgLogger().Error("error when getting the ddl history count", zap.Error(err))
}
d.runningJobs.clear()
@lance6716 (Contributor) commented on Apr 11, 2024:
Now we reuse startDispatchLoop to initialize runningJobs, relying on the deterministic order of "order by processing desc, job_id" and adding the jobs one by one.

Consider the case where the former DDL owner wrongly marked jobs as running, e.g. 100 (running), 101 (pending), 102 (wrongly running). The new DDL owner would then drive the states from (running, pending, pending) to (finished, pending, running), whereas the correct progression is (running, pending, pending) to (finished, running, pending).

I slightly prefer that the new DDL owner re-compute the running jobs rather than reuse the state from the persistent table.

A contributor commented:

“Consider the case where the former DDL owner wrongly marked jobs as running, e.g. 100 (running), 101 (pending), 102 (wrongly running).”
How come?

@lance6716 (Contributor) commented on Apr 12, 2024:

“Consider the case where the former DDL owner wrongly marked jobs as running, e.g. 100 (running), 101 (pending), 102 (wrongly running).” How come?

The old owner was running the code from before this PR and marked 102 as running, as in the linked issue.
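
To make this ordering concern concrete, here is a small, hypothetical Go sketch (jobRow and the replay loop are illustrative stand-ins, not code from this PR) of what happens when a new owner trusts the persisted processing flags:

package main

import "fmt"

// jobRow is a simplified stand-in for a row of the DDL job table.
type jobRow struct {
	id         int64
	processing bool
}

func main() {
	// Rows as a new owner would read them back, ordered by processing desc, job_id.
	// Job 102 was wrongly left marked as processing by the old owner.
	rows := []jobRow{
		{id: 100, processing: true},
		{id: 102, processing: true},
		{id: 101, processing: false},
	}

	// Seeding the in-memory running set directly from the persisted flags
	// inherits the wrong state for 102.
	running := make(map[int64]struct{})
	for _, r := range rows {
		if r.processing {
			running[r.id] = struct{}{}
		}
	}
	fmt.Println(running) // map[100:{} 102:{}]

	// After 100 finishes, the owner resumes 102 before 101:
	// (finished, pending, running) instead of the expected (finished, running, pending).
	// Re-computing the running set on ownership change avoids inheriting the wrong flag.
}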

})

d.delRangeMgr = d.newDeleteRangeManager(ctxPool == nil)
@@ -869,13 +870,6 @@ func (d *ddl) Start(ctxPool *pools.ResourcePool) error {
defer d.sessPool.Put(ctx)

ingest.InitGlobalLightningEnv()
d.ownerManager.SetRetireOwnerHook(func() {
// Since this instance is not DDL owner anymore, we clean up the processing job info.
if ingest.LitBackCtxMgr != nil {
ingest.LitBackCtxMgr.MarkJobFinish()
}
d.runningJobs.clear()
})

return nil
}
32 changes: 32 additions & 0 deletions pkg/ddl/ddl_running_jobs.go
@@ -22,8 +22,11 @@ import (
"strconv"
"strings"
"sync"
"time"

"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/util/logutil"
"go.uber.org/zap"
)

type runningJobs struct {
@@ -36,6 +39,11 @@ type runningJobs struct {
// It is not necessarily being processed by a worker.
unfinishedIDs map[int64]struct{}
unfinishedSchema map[string]map[string]struct{} // database -> table -> struct{}

// processingReorgJobID records the ID of the ingest job that is being processed by a worker.
// TODO(tangenta): remove this when we support running multiple concurrent ingest jobs.
processingIngestJobID int64
lastLoggingTime time.Time
}

func newRunningJobs() *runningJobs {
@@ -47,6 +55,8 @@ func newRunningJobs() *runningJobs {
}

func (j *runningJobs) clear() {
j.Lock()
defer j.Unlock()
j.unfinishedIDs = make(map[int64]struct{})
j.unfinishedSchema = make(map[string]map[string]struct{})
}
@@ -56,6 +66,9 @@ func (j *runningJobs) add(job *model.Job) {
defer j.Unlock()
j.processingIDs[job.ID] = struct{}{}
j.updateInternalRunningJobIDs()
if isIngestJob(job) {
j.processingIngestJobID = job.ID
}

if _, ok := j.unfinishedIDs[job.ID]; ok {
// Already exists, no need to add it again.
@@ -75,6 +88,9 @@ func (j *runningJobs) remove(job *model.Job) {
defer j.Unlock()
delete(j.processingIDs, job.ID)
j.updateInternalRunningJobIDs()
if isIngestJob(job) && job.ID == j.processingIngestJobID {
j.processingIngestJobID = 0
}

if job.IsFinished() || job.IsSynced() {
delete(j.unfinishedIDs, job.ID)
@@ -115,6 +131,16 @@ func (j *runningJobs) checkRunnable(job *model.Job) bool {
// Already processing by a worker. Skip running it again.
return false
}
if isIngestJob(job) && j.processingIngestJobID != 0 {
// We only allow one task to use ingest at the same time in order to limit the CPU/memory usage.
if time.Since(j.lastLoggingTime) > 1*time.Minute {
logutil.BgLogger().Info("ingest backfill worker is already in used by another DDL job",
zap.String("category", "ddl-ingest"),
zap.Int64("processing job ID", j.processingIngestJobID))
j.lastLoggingTime = time.Now()
}
return false
}
for _, info := range job.GetInvolvingSchemaInfo() {
if _, ok := j.unfinishedSchema[model.InvolvingAll]; ok {
return false
@@ -136,3 +162,9 @@ func (j *runningJobs) checkRunnable(job *model.Job) bool {
}
return true
}

func isIngestJob(job *model.Job) bool {
return (job.Type == model.ActionAddIndex || job.Type == model.ActionAddPrimaryKey) &&
job.ReorgMeta != nil &&
job.ReorgMeta.IsFastReorg
}
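
For reference, here is a minimal, self-contained sketch of the gating behavior that the new checkRunnable/add/remove code above implements; the job and runningJobs types below are simplified stand-ins for illustration, not the TiDB types:

package main

import (
	"fmt"
	"sync"
)

type job struct {
	id     int64
	ingest bool // stands in for isIngestJob(job) in the diff above
}

type runningJobs struct {
	sync.Mutex
	processingIngestJobID int64 // 0 means the single ingest slot is free
}

func (j *runningJobs) checkRunnable(jb job) bool {
	j.Lock()
	defer j.Unlock()
	// Only one ingest job may be processed at a time, to bound CPU/memory usage.
	if jb.ingest && j.processingIngestJobID != 0 {
		return false
	}
	return true
}

func (j *runningJobs) add(jb job) {
	j.Lock()
	defer j.Unlock()
	if jb.ingest {
		j.processingIngestJobID = jb.id
	}
}

func (j *runningJobs) remove(jb job) {
	j.Lock()
	defer j.Unlock()
	if jb.ingest && jb.id == j.processingIngestJobID {
		j.processingIngestJobID = 0
	}
}

func main() {
	rj := &runningJobs{}
	a := job{id: 100, ingest: true}
	b := job{id: 101, ingest: true}

	fmt.Println(rj.checkRunnable(a)) // true: the slot is free
	rj.add(a)
	fmt.Println(rj.checkRunnable(b)) // false: job 100 holds the ingest slot
	rj.remove(a)
	fmt.Println(rj.checkRunnable(b)) // true: the slot was released when 100 finished
}
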
11 changes: 0 additions & 11 deletions pkg/ddl/ddl_worker.go
@@ -26,7 +26,6 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/pkg/ddl/ingest"
sess "github.com/pingcap/tidb/pkg/ddl/internal/session"
"github.com/pingcap/tidb/pkg/ddl/util"
"github.com/pingcap/tidb/pkg/kv"
@@ -693,7 +692,6 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {
startTime := time.Now()
defer func() {
metrics.DDLWorkerHistogram.WithLabelValues(metrics.WorkerFinishDDLJob, job.Type.String(), metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
markJobFinish(job)
}()

if JobNeedGC(job) {
@@ -743,15 +741,6 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {
return errors.Trace(err)
}

func markJobFinish(job *model.Job) {
if (job.Type == model.ActionAddIndex || job.Type == model.ActionAddPrimaryKey) &&
job.ReorgMeta != nil &&
job.ReorgMeta.IsFastReorg &&
ingest.LitBackCtxMgr != nil {
ingest.LitBackCtxMgr.MarkJobFinish()
}
}

func (w *worker) writeDDLSeqNum(job *model.Job) {
w.ddlSeqNumMu.Lock()
w.ddlSeqNumMu.seqNum++
42 changes: 9 additions & 33 deletions pkg/ddl/ingest/backend_mgr.go
@@ -19,16 +19,17 @@ import (
"fmt"
"math"
"strconv"
"sync"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/lightning/backend/local"
"github.com/pingcap/tidb/pkg/lightning/config"
"github.com/pingcap/tidb/pkg/util/generic"
"github.com/pingcap/tidb/pkg/util/logutil"
kvutil "github.com/tikv/client-go/v2/util"
pd "github.com/tikv/pd/client"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/atomic"
"go.uber.org/zap"
)

@@ -48,18 +49,12 @@ type BackendCtxMgr interface {
) (BackendCtx, error)
Unregister(jobID int64)
Load(jobID int64) (BackendCtx, bool)

MarkJobProcessing(jobID int64) (ok bool)
MarkJobFinish()
}

type litBackendCtxMgr struct {
generic.SyncMap[int64, *litBackendCtx]
memRoot MemRoot
diskRoot DiskRoot
processingJobID int64
lastLoggingTime time.Time
mu sync.Mutex
memRoot MemRoot
diskRoot DiskRoot
}

func newLitBackendCtxMgr(path string, memQuota uint64) BackendCtxMgr {
@@ -80,30 +75,6 @@ func newLitBackendCtxMgr(path string, memQuota uint64) BackendCtxMgr {
return mgr
}

// MarkJobProcessing marks ingest backfill is processing.
func (m *litBackendCtxMgr) MarkJobProcessing(jobID int64) bool {
m.mu.Lock()
defer m.mu.Unlock()
if m.processingJobID == 0 || m.processingJobID == jobID {
m.processingJobID = jobID
return true
}
if time.Since(m.lastLoggingTime) > 1*time.Minute {
logutil.BgLogger().Info("ingest backfill worker is already in used by another DDL job",
zap.String("category", "ddl-ingest"),
zap.Int64("processing job ID", m.processingJobID))
m.lastLoggingTime = time.Now()
}
return false
}

// MarkJobFinish marks ingest backfill is finished.
func (m *litBackendCtxMgr) MarkJobFinish() {
m.mu.Lock()
m.processingJobID = 0
m.mu.Unlock()
}

// CheckAvailable checks if the ingest backfill is available.
func (m *litBackendCtxMgr) CheckAvailable() (bool, error) {
if err := m.diskRoot.PreCheckUsage(); err != nil {
@@ -113,6 +84,8 @@ func (m *litBackendCtxMgr) CheckAvailable() (bool, error) {
return true, nil
}

var ResignOwnerForTest = atomic.NewBool(false)

// Register creates a new backend and registers it to the backend context.
func (m *litBackendCtxMgr) Register(
ctx context.Context,
@@ -137,6 +110,9 @@ func (m *litBackendCtxMgr) Register(
logutil.Logger(ctx).Warn(LitWarnConfigError, zap.Int64("job ID", jobID), zap.Error(err))
return nil, err
}
failpoint.Inject("beforeCreateLocalBackend", func() {
ResignOwnerForTest.Store(true)
})
bd, err := createLocalBackend(ctx, cfg, pdSvcDiscovery)
if err != nil {
logutil.Logger(ctx).Error(LitErrCreateBackendFail, zap.Int64("job ID", jobID), zap.Error(err))
9 changes: 0 additions & 9 deletions pkg/ddl/ingest/mock.go
@@ -43,15 +43,6 @@ func NewMockBackendCtxMgr(sessCtxProvider func() sessionctx.Context) *MockBackendCtxMgr {
}
}

// MarkJobProcessing implements BackendCtxMgr.MarkJobProcessing interface.
func (*MockBackendCtxMgr) MarkJobProcessing(_ int64) bool {
return true
}

// MarkJobFinish implements BackendCtxMgr.MarkJobFinish interface.
func (*MockBackendCtxMgr) MarkJobFinish() {
}

// CheckAvailable implements BackendCtxMgr.Available interface.
func (m *MockBackendCtxMgr) CheckAvailable() (bool, error) {
return len(m.runningJobs) == 0, nil
21 changes: 10 additions & 11 deletions pkg/ddl/job_table.go
@@ -228,17 +228,6 @@ func (d *ddl) getReorgJob(sess *sess.Session) (*model.Job, error) {
if !d.runningJobs.checkRunnable(job) {
return false, nil
}
if (job.Type == model.ActionAddIndex || job.Type == model.ActionAddPrimaryKey) &&
job.State == model.JobStateQueueing &&
job.ReorgMeta != nil &&
job.ReorgMeta.IsFastReorg &&
ingest.LitBackCtxMgr != nil {
succeed := ingest.LitBackCtxMgr.MarkJobProcessing(job.ID)
if !succeed {
// We only allow one task to use ingest at the same time in order to limit the CPU/memory usage.
return false, nil
}
}
// Check if there is any block ddl running, like drop schema and flashback cluster.
sql := fmt.Sprintf("select job_id from mysql.tidb_ddl_job where "+
"(CONCAT(',', schema_ids, ',') REGEXP CONCAT(',', %s, ',') != 0 and type = %d and processing) "+
@@ -292,6 +281,16 @@ func (d *ddl) startDispatchLoop() {
time.Sleep(dispatchLoopWaitingDuration)
continue
}
failpoint.Inject("ownerResignAfterDispatchLoopCheck", func() {
if ingest.ResignOwnerForTest.Load() {
err2 := d.ownerManager.ResignOwner(context.Background())
if err2 != nil {
logutil.BgLogger().Info("resign meet error", zap.Error(err2))
}
time.Sleep(500 * time.Millisecond)
ingest.ResignOwnerForTest.Store(false)
}
})
ywqzzy marked this conversation as resolved.
select {
case <-d.ddlJobCh:
case <-ticker.C:
19 changes: 19 additions & 0 deletions pkg/lightning/backend/local/engine_mgr.go
@@ -19,9 +19,12 @@ import (
"math"
"os"
"path/filepath"
"strings"
"sync"
"time"

"github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/vfs"
"github.com/docker/go-units"
"github.com/google/uuid"
"github.com/pingcap/errors"
@@ -581,6 +584,19 @@ func (em *engineManager) getBufferPool() *membuf.Pool {
return em.bufferPool
}

// only used in tests
type slowCreateFS struct {
vfs.FS
}

func (s slowCreateFS) Create(name string) (vfs.File, error) {
if strings.Contains(name, "temporary") {
// print stack
time.Sleep(time.Second)
}
return s.FS.Create(name)
}

func openDuplicateDB(storeDir string) (*pebble.DB, error) {
dbPath := filepath.Join(storeDir, duplicateDBName)
// TODO: Optimize the opts for better write.
@@ -589,6 +605,9 @@ func openDuplicateDB(storeDir string) (*pebble.DB, error) {
newRangePropertiesCollector,
},
}
failpoint.Inject("slowCreateFS", func() {
opts.FS = slowCreateFS{vfs.Default}
})
return pebble.Open(dbPath, opts)
}

12 changes: 1 addition & 11 deletions pkg/owner/manager.go
@@ -65,8 +65,6 @@ type Manager interface {

// SetBeOwnerHook sets a hook. The hook is called before becoming an owner.
SetBeOwnerHook(hook func())
// SetRetireOwnerHook will be called after retiring the owner.
SetRetireOwnerHook(hook func())
}

const (
@@ -118,8 +116,7 @@ type ownerManager struct {
wg sync.WaitGroup
campaignCancel context.CancelFunc

beOwnerHook func()
retireOwnerHook func()
beOwnerHook func()
}

// NewOwnerManager creates a new Manager.
@@ -164,10 +161,6 @@ func (m *ownerManager) SetBeOwnerHook(hook func()) {
m.beOwnerHook = hook
}

func (m *ownerManager) SetRetireOwnerHook(hook func()) {
m.retireOwnerHook = hook
}

// ManagerSessionTTL is the etcd session's TTL in seconds. It's exported for testing.
var ManagerSessionTTL = 60

@@ -230,9 +223,6 @@ func (m *ownerManager) toBeOwner(elec *concurrency.Election) {

// RetireOwner make the manager to be a not owner.
func (m *ownerManager) RetireOwner() {
if m.retireOwnerHook != nil {
m.retireOwnerHook()
}
atomic.StorePointer(&m.elec, nil)
}

5 changes: 0 additions & 5 deletions pkg/owner/mock.go
@@ -174,11 +174,6 @@ func (m *mockManager) SetBeOwnerHook(hook func()) {
m.beOwnerHook = hook
}

// SetRetireOwnerHook implements Manager.SetRetireOwnerHook interface.
func (m *mockManager) SetRetireOwnerHook(hook func()) {
m.retireHook = hook
}

// CampaignCancel implements Manager.CampaignCancel interface
func (m *mockManager) CampaignCancel() {
m.campaignDone <- struct{}{}