Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

drainer: reduce memory usage when restart #1030

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 127 additions & 0 deletions drainer/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,18 @@ package drainer

import (
"encoding/json"
"math"
"sort"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb-binlog/pkg/filter"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"go.uber.org/zap"
)

Expand All @@ -46,6 +52,7 @@ type Schema struct {
hasImplicitCol bool

jobs []*model.Job
store kv.Storage
version2SchemaTable map[int64]TableName
currentVersion int64
}
Expand Down Expand Up @@ -267,7 +274,127 @@ func (s *Schema) addJob(job *model.Job) {
}
}

var mDDLJobHistoryKey = []byte("DDLJobHistory")

func (s *Schema) restoreFromSnapshot(version int64) error {
var (
jobs []*model.Job
droppingColumns = make(map[int64]int64)
jobsMeta *meta.Meta
dom *domain.Domain
latestJob *model.Job
err error
)
log.Info("restore from snapshot", zap.Int64("version", version))

jobsMeta, dom, err = loadHistoryMeta(s.store)
if err != nil {
log.Error("load history meta failed", zap.Error(err))
return errors.Trace(err)
}
txn := jobsMeta.GetTxn()

i := 0
err = txn.IterateHash(mDDLJobHistoryKey, func(field []byte, value []byte) error {
i++
job := &model.Job{}
err := job.Decode(value)
if err != nil {
return errors.Trace(err)
}
if i%1000 == 0 {
log.Info("get batched job",
zap.Int("i", i),
zap.Int64("schema version", job.BinlogInfo.SchemaVersion))
}
for _, job := range jobs {
if job.BinlogInfo.SchemaVersion > version {
s.jobs = append(s.jobs, job)
if latestJob == nil {
latestJob = job
} else {
if job.SnapshotVer < latestJob.SnapshotVer {
latestJob = job
}
}
} else {
s.tableSchemaVersion[job.TableID] = job.BinlogInfo.SchemaVersion
switch job.Type {
case model.ActionTruncateTable:
s.truncateTableID[job.TableID] = struct{}{}
case model.ActionDropColumn:
droppingColumn := droppingColumns[job.TableID]
if job.SchemaState == model.StateDeleteOnly {
log.Info("Got DeleteOnly Job", zap.Stringer("job", job))
droppingColumn++
} else {
log.Info("Finished dropping column", zap.Stringer("job", job))
droppingColumn--
}
droppingColumns[job.TableID] = droppingColumn
}
}
}
return nil
})
if err != nil {
return errors.Trace(err)
}
log.Info("load all history")

for tableID, droppingColumn := range droppingColumns {
if droppingColumn > 0 {
s.tblsDroppingCol[tableID] = true
}
}

// jobs from GetLastHistoryDDLJobsIterator are sorted by job id, need sorted by schema version
sort.Slice(s.jobs, func(i, j int) bool {
return s.jobs[i].BinlogInfo.SchemaVersion < s.jobs[j].BinlogInfo.SchemaVersion
})

snapshotTS := uint64(math.MaxUint64)
if latestJob != nil {
snapshotTS = uint64(latestJob.SnapshotVer)
}
snapshotSchema, err := dom.GetSnapshotInfoSchema(snapshotTS)
if err != nil {
return errors.Trace(err)
}

s.loadFromSnapshotSchema(snapshotSchema)

s.store.Close()
log.Info("clear store")
s.store = nil
return nil
}

func (s *Schema) loadFromSnapshotSchema(snapshotSchema infoschema.InfoSchema) {
schemas := snapshotSchema.AllSchemas()
version := snapshotSchema.SchemaMetaVersion()
s.schemaMetaVersion = version
for _, schema := range schemas {
s.schemas[schema.ID] = schema
s.schemaNameToID[schema.Name.O] = schema.ID
tables := schema.Tables
for _, table := range tables {
s.tables[table.ID] = []schemaVersionTableInfo{{
SchemaVersion: version,
TableInfo: table,
}}
s.tableIDToName[table.ID] = TableName{Schema: schema.Name.O, Table: table.Name.O}
}
}
}

func (s *Schema) handlePreviousDDLJobIfNeed(version int64) error {
if s.store != nil {
if err := s.restoreFromSnapshot(version); err != nil {
return err
}
}

var i int
for i = 0; i < len(s.jobs); i++ {
job := s.jobs[i]
Expand Down
9 changes: 2 additions & 7 deletions drainer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/gorilla/mux"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb-binlog/drainer/checkpoint"
"github.com/pingcap/tidb-binlog/pkg/flags"
"github.com/pingcap/tidb-binlog/pkg/node"
Expand Down Expand Up @@ -197,14 +198,8 @@ func createSyncer(etcdURLs string, cp checkpoint.CheckPoint, cfg *SyncerConfig)
if err != nil {
return nil, errors.Trace(err)
}
defer tiStore.Close()

jobs, err := loadHistoryDDLJobs(tiStore)
if err != nil {
return nil, errors.Trace(err)
}

syncer, err = NewSyncer(cp, cfg, jobs)
syncer, err = NewSyncer(cp, cfg, []*model.Job{}, tiStore)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
12 changes: 6 additions & 6 deletions drainer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,20 @@ import (
"sync/atomic"
"time"

"github.com/pingcap/tidb-binlog/drainer/loopbacksync"
"github.com/pingcap/tidb-binlog/pkg/loader"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/parser/model"
"go.uber.org/zap"

"github.com/pingcap/tidb-binlog/drainer/checkpoint"
"github.com/pingcap/tidb-binlog/drainer/loopbacksync"
"github.com/pingcap/tidb-binlog/drainer/relay"
dsync "github.com/pingcap/tidb-binlog/drainer/sync"
"github.com/pingcap/tidb-binlog/drainer/translator"
"github.com/pingcap/tidb-binlog/pkg/filter"
"github.com/pingcap/tidb-binlog/pkg/loader"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv/oracle"
pb "github.com/pingcap/tipb/go-binlog"
"go.uber.org/zap"
)

// runWaitThreshold is the expected time for `Syncer.run` to quit
Expand Down Expand Up @@ -63,7 +62,7 @@ type Syncer struct {
}

// NewSyncer returns a Drainer instance
func NewSyncer(cp checkpoint.CheckPoint, cfg *SyncerConfig, jobs []*model.Job) (*Syncer, error) {
func NewSyncer(cp checkpoint.CheckPoint, cfg *SyncerConfig, jobs []*model.Job, store kv.Storage) (*Syncer, error) {
syncer := new(Syncer)
syncer.cfg = cfg
syncer.cp = cp
Expand All @@ -85,6 +84,7 @@ func NewSyncer(cp checkpoint.CheckPoint, cfg *SyncerConfig, jobs []*model.Job) (
if err != nil {
return nil, errors.Trace(err)
}
syncer.schema.store = store

syncer.dsyncer, err = createDSyncer(cfg, syncer.schema, syncer.loopbackSync)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion drainer/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (s *syncerSuite) TestNewSyncer(c *check.C) {
cp, err := checkpoint.NewFile(0, cpFile)
c.Assert(err, check.IsNil)

syncer, err := NewSyncer(cp, cfg, nil)
syncer, err := NewSyncer(cp, cfg, nil, nil)
c.Assert(err, check.IsNil)

// run syncer
Expand Down
28 changes: 15 additions & 13 deletions drainer/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,20 @@ package drainer

import (
"fmt"
"github.com/pingcap/tidb/session"
"math"
"net"
"net/url"
"os"
"path"
"sort"
"sync"

"github.com/Shopify/sarama"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb-binlog/drainer/checkpoint"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/store/tikv/oracle"
Expand Down Expand Up @@ -150,23 +151,16 @@ func getDDLJob(tiStore kv.Storage, id int64) (*model.Job, error) {
return job, nil
}

// loadHistoryDDLJobs loads all history DDL jobs from TiDB
func loadHistoryDDLJobs(tiStore kv.Storage) ([]*model.Job, error) {
func loadHistoryMeta(tiStore kv.Storage) (*meta.Meta, *domain.Domain, error) {
snapMeta, err := getSnapshotMeta(tiStore)
if err != nil {
return nil, errors.Trace(err)
return nil, nil, errors.Trace(err)
}
jobs, err := snapMeta.GetAllHistoryDDLJobs()
dom, err := getDomain(tiStore)
if err != nil {
return nil, errors.Trace(err)
return nil, nil, errors.Trace(err)
}

// jobs from GetAllHistoryDDLJobs are sorted by job id, need sorted by schema version
sort.Slice(jobs, func(i, j int) bool {
return jobs[i].BinlogInfo.SchemaVersion < jobs[j].BinlogInfo.SchemaVersion
})

return jobs, nil
return snapMeta, dom, nil
}

func getSnapshotMeta(tiStore kv.Storage) (*meta.Meta, error) {
Expand All @@ -178,6 +172,14 @@ func getSnapshotMeta(tiStore kv.Storage) (*meta.Meta, error) {
return meta.NewSnapshotMeta(snapshot), nil
}

func getDomain(tiStore kv.Storage) (*domain.Domain, error) {
dom, err := session.GetDomain(tiStore)
if err != nil {
return nil, errors.Trace(err)
}
return dom, nil
}

func genDrainerID(listenAddr string) (string, error) {
urllis, err := url.Parse(listenAddr)
if err != nil {
Expand Down
12 changes: 8 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ require (
github.com/Shopify/sarama v1.24.1
github.com/dgraph-io/ristretto v0.0.2 // indirect
github.com/dustin/go-humanize v1.0.0
github.com/fatih/color v1.10.0 // indirect
github.com/go-sql-driver/mysql v1.5.0
github.com/gogo/protobuf v1.3.1
github.com/golang/mock v1.4.3
github.com/golang/protobuf v1.3.4
github.com/google/gofuzz v1.0.0
github.com/gorilla/mux v1.7.4
github.com/kami-zh/go-capturer v0.0.0-20171211120116-e492ea43421d
github.com/mattn/go-colorable v0.1.7 // indirect
github.com/mattn/go-runewidth v0.0.9 // indirect
github.com/onsi/ginkgo v1.11.0 // indirect
github.com/onsi/gomega v1.8.1 // indirect
Expand All @@ -37,11 +37,15 @@ require (
go.etcd.io/bbolt v1.3.5 // indirect
go.etcd.io/etcd v0.5.0-alpha.5.0.20200824191128-ae9734ed278b
go.uber.org/zap v1.16.0
golang.org/x/net v0.0.0-20200904194848-62affa334b73
golang.org/x/net v0.0.0-20201110031124-69a78807bb2b
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9
golang.org/x/sys v0.0.0-20200819171115-d785dc25833f
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f
golang.org/x/tools v0.0.0-20201120155355-20be4ac4bd6e // indirect
google.golang.org/grpc v1.27.1
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 // indirect
)

go 1.13

replace github.com/pingcap/tidb => github.com/you06/tidb v1.1.0-beta.0.20201229074854-c45f38760eff
Loading