Skip to content

Commit

Permalink
resolve OOM when restart
Browse files Browse the repository at this point in the history
  • Loading branch information
PingCAP committed Dec 29, 2020
1 parent 7f342eb commit e68b624
Show file tree
Hide file tree
Showing 6 changed files with 649 additions and 55 deletions.
67 changes: 42 additions & 25 deletions drainer/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,18 @@ package drainer

import (
"encoding/json"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"math"
"sort"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb-binlog/pkg/filter"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"go.uber.org/zap"
)

Expand All @@ -51,6 +52,7 @@ type Schema struct {
hasImplicitCol bool

jobs []*model.Job
etcdURLs string
store kv.Storage
version2SchemaTable map[int64]TableName
currentVersion int64
Expand Down Expand Up @@ -273,40 +275,49 @@ func (s *Schema) addJob(job *model.Job) {
}
}

var mDDLJobHistoryKey = []byte("DDLJobHistory")

func (s *Schema) restoreFromSnapshot(version int64) error {
var (
batchSize = 100
jobs []*model.Job
droppingColumns = make(map[int64]int64)
jobsMeta *meta.Meta
dom *domain.Domain
iter *meta.LastJobIterator
latestJob *model.Job
err error
)
log.Debug("restore from snapshot")
log.Info("restore from snapshot", zap.Int64("version", version))

jobsMeta, dom, err = loadHistoryMeta(s.store)
if err != nil {
log.Error("load history meta failed", zap.Error(err))
return errors.Trace(err)
}
log.Debug("load history meta")
iter, err = jobsMeta.GetLastHistoryDDLJobsIterator()
if err != nil {
return errors.Trace(err)
}
log.Debug("get history iterator")
for {
jobs, err = iter.GetLastJobs(batchSize, jobs)
txn := jobsMeta.GetTxn()

i := 0
err = txn.IterateHash(mDDLJobHistoryKey, func(field []byte, value []byte) error {
i++
job := &model.Job{}
err := job.Decode(value)
if err != nil {
return errors.Trace(err)
}
if len(jobs) > 0 {
log.Debug("get batched job", zap.Int64("schema version", jobs[0].BinlogInfo.SchemaVersion))
if i%1000 == 0 {
log.Info("get batched job",
zap.Int("i", i),
zap.Int64("schema version", job.BinlogInfo.SchemaVersion))
}
for _, job := range jobs {
if job.BinlogInfo.SchemaVersion > version {
s.jobs = append(s.jobs, job)
if latestJob == nil {
latestJob = job
} else {
if job.SnapshotVer < latestJob.SnapshotVer {
latestJob = job
}
}
} else {
s.tableSchemaVersion[job.TableID] = job.BinlogInfo.SchemaVersion
switch job.Type {
Expand All @@ -325,10 +336,12 @@ func (s *Schema) restoreFromSnapshot(version int64) error {
}
}
}
if len(jobs) < batchSize {
break
}
return nil
})
if err != nil {
return errors.Trace(err)
}
log.Info("load all history")

for tableID, droppingColumn := range droppingColumns {
if droppingColumn > 0 {
Expand All @@ -341,13 +354,19 @@ func (s *Schema) restoreFromSnapshot(version int64) error {
return s.jobs[i].BinlogInfo.SchemaVersion < s.jobs[j].BinlogInfo.SchemaVersion
})

snapshotSchema, err := dom.GetSnapshotInfoSchema(uint64(version))
snapshotTS := uint64(math.MaxUint64)
if latestJob != nil {
snapshotTS = uint64(latestJob.SnapshotVer)
}
snapshotSchema, err := dom.GetSnapshotInfoSchema(snapshotTS)
if err != nil {
return errors.Trace(err)
}

s.loadFromSnapshotSchema(snapshotSchema)

s.store.Close()
log.Info("clear store")
s.store = nil
return nil
}
Expand All @@ -360,15 +379,13 @@ func (s *Schema) loadFromSnapshotSchema(snapshotSchema infoschema.InfoSchema) {
s.schemas[schema.ID] = schema
s.schemaNameToID[schema.Name.O] = schema.ID
tables := schema.Tables
tableInfos := make([]schemaVersionTableInfo, 0, len(tables))
for _, table := range tables {
tableInfos = append(tableInfos, schemaVersionTableInfo{
s.tables[table.ID] = []schemaVersionTableInfo{{
SchemaVersion: version,
TableInfo: table,
})
}}
s.tableIDToName[table.ID] = TableName{Schema: schema.Name.O, Table: table.Name.O}
}
s.tables[schema.ID] = tableInfos
}
}

Expand Down
9 changes: 1 addition & 8 deletions drainer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package drainer

import (
"fmt"
"github.com/pingcap/parser/model"
"net/http"
"net/url"
"os"
Expand All @@ -27,6 +26,7 @@ import (
"github.com/gorilla/mux"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb-binlog/drainer/checkpoint"
"github.com/pingcap/tidb-binlog/pkg/flags"
"github.com/pingcap/tidb-binlog/pkg/node"
Expand Down Expand Up @@ -198,13 +198,6 @@ func createSyncer(etcdURLs string, cp checkpoint.CheckPoint, cfg *SyncerConfig)
if err != nil {
return nil, errors.Trace(err)
}
defer tiStore.Close()

//jobs, err := loadHistoryDDLJobs(tiStore)
//jobsMeta, dom, err := loadHistoryMeta(tiStore)
//if err != nil {
// return nil, errors.Trace(err)
//}

syncer, err = NewSyncer(cp, cfg, []*model.Job{}, tiStore)
if err != nil {
Expand Down
10 changes: 4 additions & 6 deletions drainer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,25 @@
package drainer

import (
"github.com/pingcap/tidb/kv"
"reflect"
"strings"
"sync/atomic"
"time"

"github.com/pingcap/tidb-binlog/drainer/loopbacksync"
"github.com/pingcap/tidb-binlog/pkg/loader"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/parser/model"
"go.uber.org/zap"

"github.com/pingcap/tidb-binlog/drainer/checkpoint"
"github.com/pingcap/tidb-binlog/drainer/loopbacksync"
"github.com/pingcap/tidb-binlog/drainer/relay"
dsync "github.com/pingcap/tidb-binlog/drainer/sync"
"github.com/pingcap/tidb-binlog/drainer/translator"
"github.com/pingcap/tidb-binlog/pkg/filter"
"github.com/pingcap/tidb-binlog/pkg/loader"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv/oracle"
pb "github.com/pingcap/tipb/go-binlog"
"go.uber.org/zap"
)

// runWaitThreshold is the expected time for `Syncer.run` to quit
Expand Down
24 changes: 11 additions & 13 deletions drainer/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package drainer

import (
"fmt"
"github.com/ngaut/pools"
"math"
"net"
"net/url"
Expand All @@ -32,6 +31,7 @@ import (
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/session"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -171,37 +171,35 @@ func loadHistoryDDLJobs(tiStore kv.Storage) ([]*model.Job, error) {
}

func loadHistoryMeta(tiStore kv.Storage) (*meta.Meta, *domain.Domain, error) {
log.Debug("get snap meta", zap.String("store", fmt.Sprintf("%T", tiStore)))
snapMeta, err := getSnapshotMeta(tiStore)
if err != nil {
return nil, nil, errors.Trace(err)
}
dom := getDomain(tiStore)
log.Debug("get domain")
dom, err := getDomain(tiStore)
if err != nil {
return nil, nil, errors.Trace(err)
}
return snapMeta, dom, nil
}

func getSnapshotMeta(tiStore kv.Storage) (*meta.Meta, error) {
version, err := tiStore.CurrentVersion()
log.Debug("get store")
if err != nil {
return nil, errors.Trace(err)
}
snapshot, err := tiStore.GetSnapshot(version)
log.Debug("get snapshot")
if err != nil {
return nil, errors.Trace(err)
}
return meta.NewSnapshotMeta(snapshot), nil
}

func mockFactory() (pools.Resource, error) {
return nil, errors.New("mock factory should not be called")
}

func getDomain(tiStore kv.Storage) *domain.Domain {
dom := domain.NewDomain(tiStore, 0, 0, mockFactory)
return dom
func getDomain(tiStore kv.Storage) (*domain.Domain, error) {
dom, err := session.GetDomain(tiStore)
if err != nil {
return nil, errors.Trace(err)
}
return dom, nil
}

func genDrainerID(listenAddr string) (string, error) {
Expand Down
17 changes: 14 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ require (
github.com/DATA-DOG/go-sqlmock v1.3.0
github.com/Shopify/sarama v1.24.1
github.com/dustin/go-humanize v1.0.0
github.com/fatih/color v1.10.0 // indirect
github.com/go-openapi/spec v0.20.0 // indirect
github.com/go-playground/overalls v0.0.0-20191218162659-7df9f728c018 // indirect
github.com/go-sql-driver/mysql v1.5.0
github.com/gogo/protobuf v1.3.1
github.com/golang/mock v1.3.1
Expand All @@ -24,18 +27,26 @@ require (
github.com/prometheus/client_golang v1.5.1
github.com/prometheus/client_model v0.2.0
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/samuel/go-zookeeper v0.0.0-20170815201139-e6b59f6144be
github.com/sergi/go-diff v1.1.0 // indirect
github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726
github.com/soheilhy/cmux v0.1.4
github.com/swaggo/swag v1.7.0 // indirect
github.com/syndtr/goleveldb v1.0.1-0.20190625010220-02440ea7a285
github.com/tikv/pd v1.1.0-beta.0.20200921100508-9ee41c4144f3
github.com/uber/jaeger-lib v2.4.0+incompatible // indirect
github.com/unrolled/render v0.0.0-20180914162206-b9786414de4d
go.etcd.io/etcd v0.5.0-alpha.5.0.20191023171146-3cf2f69b5738
go.uber.org/zap v1.16.0
golang.org/x/net v0.0.0-20200813134508-3edf25e44fcc
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208
golang.org/x/sys v0.0.0-20200819171115-d785dc25833f
golang.org/x/mod v0.4.0 // indirect
golang.org/x/net v0.0.0-20201224014010-6772e930b67b
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9
golang.org/x/sys v0.0.0-20201223074533-0d417f636930
golang.org/x/tools v0.0.0-20201226215659-b1c90890d22a // indirect
google.golang.org/grpc v1.26.0
)

go 1.13

replace github.com/pingcap/tidb => github.com/you06/tidb v1.1.0-beta.0.20201229071942-2be3600223c7
Loading

0 comments on commit e68b624

Please sign in to comment.