Skip to content

Commit

Permalink
Merge branch 'master' into read-purged-binlog
Browse files Browse the repository at this point in the history
  • Loading branch information
IANTHEREAL committed Jul 13, 2019
2 parents 2475654 + bda8ebd commit 4c7cf4a
Show file tree
Hide file tree
Showing 49 changed files with 1,138 additions and 321 deletions.
2 changes: 1 addition & 1 deletion arbiter/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ var (
var Registry = prometheus.NewRegistry()

func init() {
Registry.MustRegister(prometheus.NewProcessCollector(os.Getpid(), ""))
Registry.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}))
Registry.MustRegister(prometheus.NewGoCollector())

Registry.MustRegister(checkpointTSOGauge)
Expand Down
24 changes: 19 additions & 5 deletions arbiter/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,10 +180,15 @@ func (s *Server) Run() error {
wg.Done()
}()

var syncErr error

wg.Add(1)
go func() {
syncBinlogs(s.kafkaReader.Messages(), s.load)
wg.Done()
defer wg.Done()
syncErr = syncBinlogs(s.kafkaReader.Messages(), s.load)
if syncErr != nil {
s.Close()
}
}()

err := s.load.Run()
Expand All @@ -197,6 +202,10 @@ func (s *Server) Run() error {
return errors.Trace(err)
}

if syncErr != nil {
return errors.Trace(syncErr)
}

if err = s.saveFinishTS(StatusNormal); err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -262,16 +271,21 @@ func (s *Server) loadStatus() (int, error) {
return status, errors.Trace(err)
}

func syncBinlogs(source <-chan *reader.Message, ld loader.Loader) {
func syncBinlogs(source <-chan *reader.Message, ld loader.Loader) (err error) {
dest := ld.Input()
defer ld.Close()
for msg := range source {
log.Debug("recv msg from kafka reader", zap.Int64("ts", msg.Binlog.CommitTs), zap.Int64("offset", msg.Offset))
txn := loader.SlaveBinlogToTxn(msg.Binlog)
txn, err := loader.SlaveBinlogToTxn(msg.Binlog)
if err != nil {
log.Error("transfer binlog failed, program will stop handling data from loader", zap.Error(err))
return err
}
txn.Metadata = msg
dest <- txn

queueSizeGauge.WithLabelValues("kafka_reader").Set(float64(len(source)))
queueSizeGauge.WithLabelValues("loader_input").Set(float64(len(dest)))
}
ld.Close()
return nil
}
3 changes: 2 additions & 1 deletion arbiter/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,8 @@ func (s *syncBinlogsSuite) TestShouldSendBinlogToLoader(c *C) {
}()
ld := dummyLoader{input: dest}

syncBinlogs(source, &ld)
err := syncBinlogs(source, &ld)
c.Assert(err, IsNil)

c.Assert(len(dest), Equals, 2)
for _, m := range msgs {
Expand Down
12 changes: 8 additions & 4 deletions cmd/reparo/reparo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,29 @@ log-level = "info"
# for print, it just prints decoded value.
dest-type = "mysql"

# Enable safe mode to make reparo reentrant, which value can be "true", "false". If the value is "true", reparo will change the "update" command into "delete+replace".
# The default value of safe-mode is false.
# safe-mode = false

##replicate-do-db priority over replicate-do-table if have same db name
##and we support regular expression , start with '~' declare use regular expression.
#
#replicate-do-db = ["~^b.*","s1"]
#[[replicate-do-table]]
#db-name ="test"
#db-name = "test"
#tbl-name = "log"

#[[replicate-do-table]]
#db-name ="test"
#db-name = "test"
#tbl-name = "~^a.*"

#replicate-ignore-db = ["~^c.*","s2"]
#[[replicate-ignore-table]]
#db-name ="test"
#db-name = "test"
#tbl-name = "~^a.*"

[dest-db]
host = "127.0.0.1"
port = 3309
user = "root"
password = ""
password = ""
2 changes: 1 addition & 1 deletion drainer/checkpoint/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import (

// mysql driver
_ "github.com/go-sql-driver/mysql"
tmysql "github.com/pingcap/parser/mysql"
pkgsql "github.com/pingcap/tidb-binlog/pkg/sql"
tmysql "github.com/pingcap/tidb/mysql"
)

// MysqlCheckPoint is a local savepoint struct for mysql
Expand Down
6 changes: 3 additions & 3 deletions drainer/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"github.com/pingcap/tidb-binlog/pkg/util"
"github.com/pingcap/tidb-binlog/pump"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/store"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/oracle"
"go.uber.org/zap"
Expand Down Expand Up @@ -76,7 +76,7 @@ var (
getDDLJobRetryWait = time.Second

// Make it possible to mock the following functions in tests
newStore = session.NewStore
newStore = store.New
newClient = etcd.NewClientFromCfg
fDDLJobGetter = getDDLJob
)
Expand All @@ -88,7 +88,7 @@ func NewCollector(cfg *Config, clusterID uint64, s *Syncer, cpt checkpoint.Check
return nil, errors.Trace(err)
}

if err := session.RegisterStore("tikv", tikv.Driver{}); err != nil {
if err := store.Register("tikv", tikv.Driver{}); err != nil {
if !strings.Contains(err.Error(), "already registered") {
return nil, errors.Trace(err)
}
Expand Down
4 changes: 1 addition & 3 deletions drainer/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
package drainer

import (
"os"

bf "github.com/pingcap/tidb-binlog/pkg/binlogfile"
"github.com/prometheus/client_golang/prometheus"
)
Expand Down Expand Up @@ -126,7 +124,7 @@ var (
var registry = prometheus.NewRegistry()

func init() {
registry.MustRegister(prometheus.NewProcessCollector(os.Getpid(), ""))
registry.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}))
registry.MustRegister(prometheus.NewGoCollector())
registry.MustRegister(pumpPositionGauge)
registry.MustRegister(ddlJobsCounter)
Expand Down
4 changes: 2 additions & 2 deletions drainer/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb-binlog/pkg/filter"
"github.com/pingcap/tidb/mysql"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -326,7 +326,7 @@ func (s *Schema) handleDDL(job *model.Job) (schemaName string, tableName string,
schemaName = schema.Name.O
tableName = table.Name.O

case model.ActionCreateTable, model.ActionCreateView:
case model.ActionCreateTable, model.ActionCreateView, model.ActionRecoverTable:
table := job.BinlogInfo.TableInfo
if table == nil {
return "", "", "", errors.NotFoundf("table %d", job.TableID)
Expand Down
2 changes: 1 addition & 1 deletion drainer/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/types"
)

Expand Down
12 changes: 6 additions & 6 deletions drainer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
"github.com/pingcap/tidb-binlog/pkg/node"
"github.com/pingcap/tidb-binlog/pkg/util"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/store"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tipb/go-binlog"
Expand Down Expand Up @@ -216,7 +216,7 @@ func (s *Server) heartbeat(ctx context.Context) <-chan error {
s.tg.Go("heartbeat", func() {
defer func() {
close(errc)
s.Close()
go s.Close()
}()

for {
Expand Down Expand Up @@ -251,7 +251,7 @@ func (s *Server) Start() error {
}()

s.tg.GoNoPanic("collect", func() {
defer s.Close()
defer func() { go s.Close() }()
s.collector.Start(s.ctx)
})

Expand All @@ -262,7 +262,7 @@ func (s *Server) Start() error {
}

s.tg.GoNoPanic("syncer", func() {
defer s.Close()
defer func() { go s.Close() }()
if err := s.syncer.Start(); err != nil {
log.Error("syncer exited abnormal", zap.Error(err))
}
Expand Down Expand Up @@ -428,11 +428,11 @@ func createTiStore(urls string) (kv.Storage, error) {
return nil, errors.Trace(err)
}

if err := session.RegisterStore("tikv", tikv.Driver{}); err != nil {
if err := store.Register("tikv", tikv.Driver{}); err != nil {
return nil, errors.Trace(err)
}
tiPath := fmt.Sprintf("tikv://%s?disableGC=true", urlv.HostString())
tiStore, err := session.NewStore(tiPath)
tiStore, err := store.New(tiPath)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
4 changes: 2 additions & 2 deletions drainer/translator/flash.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb-binlog/pkg/dml"
"github.com/pingcap/tidb-binlog/pkg/util"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
tipb "github.com/pingcap/tipb/go-binlog"
Expand Down Expand Up @@ -207,7 +207,7 @@ func GenFlashDDLSQL(sql string, schema string) (string, error) {
ddlParser := getParser()
stmt, err := ddlParser.ParseOneStmt(sql, "", "")
if err != nil {
return "", errors.Trace(err)
return "", errors.Annotatef(err, "parse sql failed: %s", sql)
}

switch stmt := stmt.(type) {
Expand Down
6 changes: 3 additions & 3 deletions drainer/translator/flash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (

. "github.com/pingcap/check"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -323,10 +323,10 @@ func (t *testTranslatorSuite) TestFlashGenDDLSQL(c *C) {
Equals,
"ALTER TABLE `test_schema`.`test` ADD COLUMN `dt` DateTime DEFAULT '1998-08-08 13:13:13';")
// Current_timestamp for Timestamp.
check("alter table Test add column ts timestamp not null default current_timestamp'",
check("alter table Test add column ts timestamp not null default current_timestamp",
Matches,
"ALTER TABLE `test_schema`.`test` ADD COLUMN `ts` DateTime DEFAULT '"+dtRegex+"';")
check("alter table Test add column ts timestamp not null default current_timestamp()'",
check("alter table Test add column ts timestamp not null default current_timestamp()",
Matches,
"ALTER TABLE `test_schema`.`test` ADD COLUMN `ts` DateTime DEFAULT '"+dtRegex+"';")
// Time(duration).
Expand Down
2 changes: 1 addition & 1 deletion drainer/translator/flash_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/opcode"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/types"
"go.uber.org/zap"
Expand Down
2 changes: 1 addition & 1 deletion drainer/translator/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb-binlog/pkg/util"
obinlog "github.com/pingcap/tidb-tools/tidb-binlog/slave_binlog_proto/go-binlog"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
pb "github.com/pingcap/tipb/go-binlog"
Expand Down
2 changes: 1 addition & 1 deletion drainer/translator/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb-binlog/pkg/loader"
"github.com/pingcap/tidb-binlog/pkg/util"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
tipb "github.com/pingcap/tipb/go-binlog"
Expand Down
2 changes: 1 addition & 1 deletion drainer/translator/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ import (

"github.com/pingcap/check"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb-binlog/pkg/loader"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/types"
)

Expand Down
33 changes: 18 additions & 15 deletions drainer/translator/pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,18 +236,19 @@ func genDelete(schema string, table *model.TableInfo, row []byte) (event *pb.Eve
}

func encodeRow(row []types.Datum, colName []string, tp []byte, mysqlType []string) ([][]byte, error) {
var cols [][]byte
var err error
cols := make([][]byte, 0, len(row))
sc := &stmtctx.StatementContext{TimeZone: time.Local}
for i, c := range row {
col := &pb.Column{}
col.Name = colName[i]
col.Tp = []byte{tp[i]}
col.MysqlType = mysqlType[i]
col.Value, err = codec.EncodeValue(sc, nil, []types.Datum{c}...)
val, err := codec.EncodeValue(sc, nil, []types.Datum{c}...)
if err != nil {
return nil, errors.Trace(err)
}
col := pb.Column{
Name: colName[i],
Tp: []byte{tp[i]},
MysqlType: mysqlType[i],
Value: val,
}

colVal, err := col.Marshal()
if err != nil {
Expand All @@ -260,23 +261,25 @@ func encodeRow(row []types.Datum, colName []string, tp []byte, mysqlType []strin
}

func encodeUpdateRow(oldRow []types.Datum, newRow []types.Datum, colName []string, tp []byte, mysqlType []string) ([][]byte, error) {
var cols [][]byte
var err error
cols := make([][]byte, 0, len(oldRow))
sc := &stmtctx.StatementContext{TimeZone: time.Local}
for i, c := range oldRow {
col := &pb.Column{}
col.Name = colName[i]
col.Tp = []byte{tp[i]}
col.MysqlType = mysqlType[i]
col.Value, err = codec.EncodeValue(sc, nil, []types.Datum{c}...)
val, err := codec.EncodeValue(sc, nil, []types.Datum{c}...)
if err != nil {
return nil, errors.Trace(err)
}

col.ChangedValue, err = codec.EncodeValue(sc, nil, []types.Datum{newRow[i]}...)
changedVal, err := codec.EncodeValue(sc, nil, []types.Datum{newRow[i]}...)
if err != nil {
return nil, errors.Trace(err)
}
col := pb.Column{
Name: colName[i],
Tp: []byte{tp[i]},
MysqlType: mysqlType[i],
Value: val,
ChangedValue: changedVal,
}

colVal, err := col.Marshal()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion drainer/translator/pb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import (
"fmt"

"github.com/pingcap/check"
"github.com/pingcap/parser/mysql"
pb "github.com/pingcap/tidb-binlog/proto/binlog"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/codec"
)
Expand Down
Loading

0 comments on commit 4c7cf4a

Please sign in to comment.