Skip to content

Commit

Permalink
backup: use history iterator to scan ddl jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
3pointer committed Jun 19, 2024
1 parent a8de589 commit 25cd3c3
Showing 1 changed file with 50 additions and 29 deletions.
79 changes: 50 additions & 29 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1000,6 +1000,38 @@ func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, g glue.Glue, store kv.S
if err != nil {
return errors.Trace(err)
}

handleJobsFn := func(jobs []*model.Job) (bool, error) {
for _, job := range jobs {
if skipUnsupportedDDLJob(job) {
continue
}

if (job.State == model.JobStateDone || job.State == model.JobStateSynced) &&
(job.BinlogInfo != nil && job.BinlogInfo.SchemaVersion > lastSchemaVersion && job.BinlogInfo.SchemaVersion <= backupSchemaVersion) {
if job.BinlogInfo.DBInfo != nil {
// ignore all placement policy info during incremental backup for now.
job.BinlogInfo.DBInfo.PlacementPolicyRef = nil
}
if job.BinlogInfo.TableInfo != nil {
// ignore all placement policy info during incremental backup for now.
job.BinlogInfo.TableInfo.ClearPlacement()
}
jobBytes, err := json.Marshal(job)
if err != nil {
return true, errors.Trace(err)
}
err = metaWriter.Send(jobBytes, metautil.AppendDDL)
if err != nil {
return true, errors.Trace(err)
}
}
// early exits to stop unnecessary scan
return true, nil
}
return true, nil
}

newestMeta := meta.NewSnapshotMeta(store.GetSnapshot(kv.NewVersion(version.Ver)))
allJobs := make([]*model.Job, 0)
err = g.UseOneShotSession(store, !needDomain, func(se glue.Session) error {
Expand All @@ -1014,42 +1046,31 @@ func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, g glue.Glue, store kv.S
return errors.Trace(err)
}

historyJobs, err := ddl.GetAllHistoryDDLJobs(newestMeta)
historyJobsIter, err := ddl.GetLastHistoryDDLJobsIterator(newestMeta)
if err != nil {
return errors.Trace(err)
}
log.Debug("get history jobs", zap.Int("jobs", len(historyJobs)))
allJobs = append(allJobs, historyJobs...)

count := 0
for _, job := range allJobs {
if skipUnsupportedDDLJob(job) {
continue
}
_, err = handleJobsFn(allJobs)
if err != nil {
log.Error("failed to handle job", zap.Error(err))
return errors.Trace(err)
}
count := len(allJobs)

if (job.State == model.JobStateDone || job.State == model.JobStateSynced) &&
(job.BinlogInfo != nil && job.BinlogInfo.SchemaVersion > lastSchemaVersion && job.BinlogInfo.SchemaVersion <= backupSchemaVersion) {
if job.BinlogInfo.DBInfo != nil {
// ignore all placement policy info during incremental backup for now.
job.BinlogInfo.DBInfo.PlacementPolicyRef = nil
}
if job.BinlogInfo.TableInfo != nil {
// ignore all placement policy info during incremental backup for now.
job.BinlogInfo.TableInfo.ClearPlacement()
}
jobBytes, err := json.Marshal(job)
if err != nil {
return errors.Trace(err)
}
err = metaWriter.Send(jobBytes, metautil.AppendDDL)
if err != nil {
return errors.Trace(err)
}
count++
cacheJobs := make([]*model.Job, 0, ddl.DefNumHistoryJobs)
for {
cacheJobs, err = historyJobsIter.GetLastJobs(ddl.DefNumHistoryJobs, cacheJobs)
if err != nil || len(cacheJobs) == 0 {
return errors.Trace(err)
}
count += len(cacheJobs)
finish, err := handleJobsFn(cacheJobs)
if err != nil || finish {
log.Debug("get complete jobs", zap.Int("jobs", count))
return errors.Trace(err)
}
}
log.Debug("get completed jobs", zap.Int("jobs", count))
return nil
}

func (bc *Client) BuildProgressRangeTree(ranges []rtree.Range) (rtree.ProgressRangeTree, error) {
Expand Down

0 comments on commit 25cd3c3

Please sign in to comment.