
Merge branch 'master' into import-data-race
IANTHEREAL committed Nov 13, 2019
2 parents 709088b + 699e7ed commit 15548df
Showing 11 changed files with 157 additions and 120 deletions.
15 changes: 8 additions & 7 deletions _utils/terror_gen/errors_release.txt
@@ -37,7 +37,7 @@ ErrAddWatchForRelayLogDir,[code=11030:class=functional:scope=internal:level=high
ErrWatcherStart,[code=11031:class=functional:scope=internal:level=high],"watcher starts for relay log dir %s"
ErrWatcherChanClosed,[code=11032:class=functional:scope=internal:level=high],"watcher's %s chan for relay log dir %s closed"
ErrWatcherChanRecvError,[code=11033:class=functional:scope=internal:level=high],"watcher receives error, relay log dir %s"
ErrRelayLogFileSizeSmaller,[code=11034:class=functional:scope=internal:level=high],"file size of relay log %s become smaller"
ErrRelayLogFileSizeSmaller,[code=11034:class=functional:scope=internal:level=high],"file size of relay log %s become smaller, please check the status of relay log and re-pull it. If you want to re-pull it, you should open relay.meta, set the binlog-name to the error pos name, set binlog-pos to 4, delete the stashed relay log and run `resume-relay` in dmctl"
ErrBinlogFileNotSpecified,[code=11035:class=functional:scope=internal:level=high],"binlog file must be specified"
ErrNoRelayLogMatchPos,[code=11036:class=functional:scope=internal:level=high],"no relay log files in dir %s match pos %s"
ErrFirstRelayLogNotMatchPos,[code=11037:class=functional:scope=internal:level=high],"the first relay log %s not match the start pos %v"
@@ -109,6 +109,7 @@ ErrTracingGetTSO,[code=11102:class=functional:scope=internal:level=high],"get ts
ErrBackoffArgsNotValid,[code=11103:class=functional:scope=internal:level=medium],"backoff argument %s value %v not valid"
ErrInitLoggerFail,[code=11104:class=functional:scope=internal:level=medium],"init logger failed"
ErrGTIDTruncateInvalid,[code=11105:class=functional:scope=internal:level=high],"truncate GTID sets %v to %v not valid"
ErrRelayLogGivenPosTooBig,[code=11106:class=functional:scope=internal:level=high],"the given relay log pos %s of meta config is too big, please check it again"
ErrConfigCheckItemNotSupport,[code=20001:class=config:scope=internal:level=medium],"checking item %s is not supported\n%s"
ErrConfigTomlTransform,[code=20002:class=config:scope=internal:level=medium],"%s"
ErrConfigTaskYamlTransform,[code=20003:class=config:scope=internal:level=medium],"%s"
@@ -173,7 +174,7 @@ ErrRelayReaderNeedStart,[code=30011:class=relay-unit:scope=internal:level=high],
ErrRelayTCPReaderStartSync,[code=30012:class=relay-unit:scope=upstream:level=high],"start sync from position %s"
ErrRelayTCPReaderNilGTID,[code=30013:class=relay-unit:scope=internal:level=high],"nil GTID set not valid"
ErrRelayTCPReaderStartSyncGTID,[code=30014:class=relay-unit:scope=upstream:level=high],"start sync from GTID set %s"
ErrRelayTCPReaderGetEvent,[code=30015:class=relay-unit:scope=upstream:level=high],"TCPReader get event"
ErrRelayTCPReaderGetEvent,[code=30015:class=relay-unit:scope=upstream:level=high],"TCPReader get relay event with error"
ErrRelayWriterNotStateNew,[code=30016:class=relay-unit:scope=internal:level=high],"stage %s, expect %s, already started"
ErrRelayWriterStateCannotClose,[code=30017:class=relay-unit:scope=internal:level=high],"stage %s, expect %s, can not close"
ErrRelayWriterNeedStart,[code=30018:class=relay-unit:scope=internal:level=high],"stage %s, expect %s, please start the writer first"
@@ -307,9 +308,9 @@ ErrMasterGetWorkerCfgExtractor,[code=38027:class=dm-master:scope=internal:level=
ErrMasterTaskConfigExtractor,[code=38028:class=dm-master:scope=internal:level=high],""
ErrMasterWorkerArgsExtractor,[code=38029:class=dm-master:scope=internal:level=high],""
ErrMasterQueryWorkerConfig,[code=38030:class=dm-master:scope=internal:level=high],""
ErrMasterOperNotFound,[code=38031:class=dm-master:scope=internal:level=high],"operation %d of task %s not found, please execute `query-status` to check status"
ErrMasterOperRespNotSuccess,[code=38032:class=dm-master:scope=internal:level=high],"operation not success: %s"
ErrMasterOperRequestTimeout,[code=38033:class=dm-master:scope=internal:level=high],"request is timeout, but request may be successful, please execute `query-status` to check status"
ErrMasterOperNotFound,[code=38031:class=dm-master:scope=internal:level=high],"operation %d of task %s on worker %s not found, please execute `query-status` to check status"
ErrMasterOperRespNotSuccess,[code=38032:class=dm-master:scope=internal:level=high],"operation %d of task %s on worker %s not success: %s"
ErrMasterOperRequestTimeout,[code=38033:class=dm-master:scope=internal:level=high],"request to dm-worker %s is timeout, but request may be successful, please execute `query-status` to check status"
ErrMasterHandleHTTPApis,[code=38034:class=dm-master:scope=internal:level=high],"serve http apis to grpc"
ErrMasterHostPortNotValid,[code=38035:class=dm-master:scope=internal:level=high],"host:port '%s' not valid"
ErrMasterGetHostnameFail,[code=38036:class=dm-master:scope=internal:level=high],"get hostname fail"
@@ -381,8 +382,8 @@ ErrWorkerDDLLockInfoExists,[code=40062:class=dm-worker:scope=internal:level=high
ErrWorkerCacheDDLInfoExists,[code=40063:class=dm-worker:scope=internal:level=high],"CacheDDLInfo for task %s already exists"
ErrWorkerExecSkipDDLConflict,[code=40064:class=dm-worker:scope=internal:level=high],"execDDL and skipDDL can not specify both at the same time"
ErrWorkerExecDDLSyncerOnly,[code=40065:class=dm-worker:scope=internal:level=high],"only syncer support ExecuteDDL, but current unit is %s"
ErrWorkerExecDDLTimeout,[code=40066:class=dm-worker:scope=internal:level=high],"ExecuteDDL timeout, try use `query-status` to query whether the DDL is still blocking"
ErrWorkerWaitRelayCatchupTimeout,[code=40067:class=dm-worker:scope=internal:level=high],"wait relay catchup timeout, loader end binlog pos: %s, relay binlog pos: %s"
ErrWorkerExecDDLTimeout,[code=40066:class=dm-worker:scope=internal:level=high],"ExecuteDDL timeout (exceeding %s), try use `query-status` to query whether the DDL is still blocking"
ErrWorkerWaitRelayCatchupTimeout,[code=40067:class=dm-worker:scope=internal:level=high],"waiting for relay binlog pos to catch up with loader end binlog pos is timeout (exceeding %s), loader end binlog pos: %s, relay binlog pos: %s"
ErrWorkerRelayIsPurging,[code=40068:class=dm-worker:scope=internal:level=high],"relay log purger is purging, cannot start sub task %s, please try again later"
ErrWorkerHostPortNotValid,[code=40069:class=dm-worker:scope=internal:level=high],"host:port '%s' not valid"
ErrTracerParseFlagSet,[code=42001:class=dm-tracer:scope=internal:level=medium],"parse dm-tracer config flag set"
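A note on the message templates above: each entry is a fmt-style format string, and the Go hunks below suggest DM's terror package fills the verbs from the arguments passed to `Generate`/`Delegate`. That is why widening a template, for example adding the worker ID to ErrMasterOperNotFound, forces the call-site changes in dm/master/server.go further down. A minimal, illustrative sketch using plain fmt rather than the real terror package, with made-up argument values:

package main

import "fmt"

// Template copied from the updated ErrMasterOperNotFound entry above; it now
// carries three verbs (%d, %s, %s) instead of the previous two (%d, %s).
const errMasterOperNotFound = "operation %d of task %s on worker %s not found, please execute `query-status` to check status"

func main() {
	// Example values only: operation log ID, task name, dm-worker address.
	fmt.Println(fmt.Sprintf(errMasterOperNotFound, 7, "test", "127.0.0.1:8262"))
}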
2 changes: 1 addition & 1 deletion dm/config/task.go
@@ -314,7 +314,7 @@ func (c *TaskConfig) DecodeFile(fpath string) error {
func (c *TaskConfig) Decode(data string) error {
err := yaml.UnmarshalStrict([]byte(data), c)
if err != nil {
return terror.ErrConfigTaskYamlTransform.Delegate(err, "decode config from data")
return terror.ErrConfigTaskYamlTransform.Delegate(err, "decode task config failed")
}

return c.adjust()
26 changes: 13 additions & 13 deletions dm/master/server.go
@@ -31,7 +31,7 @@ import (

"github.com/pingcap/dm/checker"
"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/dm/master/sql-operator"
operator "github.com/pingcap/dm/dm/master/sql-operator"
"github.com/pingcap/dm/dm/master/workerrpc"
"github.com/pingcap/dm/dm/pb"
"github.com/pingcap/dm/pkg/log"
@@ -240,7 +240,7 @@ func (s *Server) StartTask(ctx context.Context, req *pb.StartTaskRequest) (*pb.S
StartSubTask: &pb.StartSubTaskRequest{Task: stCfgToml},
}
resp, err := cli.SendRequest(ctx, request, s.cfg.RPCTimeout)
workerResp := s.handleOperationResult(ctx, cli, taskName, err, resp)
workerResp := s.handleOperationResult(ctx, cli, taskName, worker, err, resp)
workerResp.Meta.Worker = worker
workerRespCh <- workerResp.Meta
}, func(args ...interface{}) {
@@ -332,7 +332,7 @@ func (s *Server) OperateTask(ctx context.Context, req *pb.OperateTaskRequest) (*
return
}
resp, err := cli.SendRequest(ctx, subReq, s.cfg.RPCTimeout)
workerResp := s.handleOperationResult(ctx, cli, req.Name, err, resp)
workerResp := s.handleOperationResult(ctx, cli, req.Name, worker1, err, resp)
workerResp.Op = req.Op
workerResp.Meta.Worker = worker1
workerRespCh <- workerResp
@@ -424,7 +424,7 @@ func (s *Server) UpdateTask(ctx context.Context, req *pb.UpdateTaskRequest) (*pb
UpdateSubTask: &pb.UpdateSubTaskRequest{Task: stCfgToml},
}
resp, err := cli.SendRequest(ctx, request, s.cfg.RPCTimeout)
workerResp := s.handleOperationResult(ctx, cli, taskName, err, resp)
workerResp := s.handleOperationResult(ctx, cli, taskName, worker, err, resp)
workerResp.Meta.Worker = worker
workerRespCh <- workerResp.Meta
}, func(args ...interface{}) {
@@ -1853,11 +1853,11 @@ var (
retryInterval = time.Second
)

func (s *Server) waitOperationOk(ctx context.Context, cli workerrpc.Client, name string, opLogID int64) error {
func (s *Server) waitOperationOk(ctx context.Context, cli workerrpc.Client, taskName, workerID string, opLogID int64) error {
req := &workerrpc.Request{
Type: workerrpc.CmdQueryTaskOperation,
QueryTaskOperation: &pb.QueryTaskOperationRequest{
Name: name,
Name: taskName,
LogID: opLogID,
},
}
@@ -1866,18 +1866,18 @@ func (s *Server) waitOperationOk(ctx context.Context, cli workerrpc.Client, name
resp, err := cli.SendRequest(ctx, req, s.cfg.RPCTimeout)
var queryResp *pb.QueryTaskOperationResponse
if err != nil {
log.L().Error("fail to query task operation", zap.String("task", name), zap.Int64("operation log ID", opLogID), log.ShortError(err))
log.L().Error("fail to query task operation", zap.String("task", taskName), zap.String("worker", workerID), zap.Int64("operation log ID", opLogID), log.ShortError(err))
} else {
queryResp = resp.QueryTaskOperation
respLog := queryResp.Log
if respLog == nil {
return terror.ErrMasterOperNotFound.Generate(opLogID, name)
return terror.ErrMasterOperNotFound.Generate(opLogID, taskName, workerID)
} else if respLog.Success {
return nil
} else if len(respLog.Message) != 0 {
return terror.ErrMasterOperRespNotSuccess.Generate(respLog.Message)
return terror.ErrMasterOperRespNotSuccess.Generate(opLogID, taskName, workerID, respLog.Message)
}
log.L().Info("wait op log result", zap.String("task", name), zap.Int64("operation log ID", opLogID), zap.Stringer("result", resp.QueryTaskOperation))
log.L().Info("wait op log result", zap.String("task", taskName), zap.String("worker", workerID), zap.Int64("operation log ID", opLogID), zap.Stringer("result", resp.QueryTaskOperation))
}

select {
@@ -1887,10 +1887,10 @@
}
}

return terror.ErrMasterOperRequestTimeout.Generate()
return terror.ErrMasterOperRequestTimeout.Generate(workerID)
}

func (s *Server) handleOperationResult(ctx context.Context, cli workerrpc.Client, name string, err error, resp *workerrpc.Response) *pb.OperateSubTaskResponse {
func (s *Server) handleOperationResult(ctx context.Context, cli workerrpc.Client, taskName, workerID string, err error, resp *workerrpc.Response) *pb.OperateSubTaskResponse {
if err != nil {
return &pb.OperateSubTaskResponse{
Meta: errorCommonWorkerResponse(errors.ErrorStack(err), ""),
@@ -1910,7 +1910,7 @@ func (s *Server) handleOperationResult(ctx context.Context, cli workerrpc.Client
return response
}

err = s.waitOperationOk(ctx, cli, name, response.LogID)
err = s.waitOperationOk(ctx, cli, taskName, workerID, response.LogID)
if err != nil {
response.Meta = errorCommonWorkerResponse(errors.ErrorStack(err), "")
}
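The server.go changes above thread the worker address through waitOperationOk and handleOperationResult, so the not-found, not-success and timeout errors all name the dm-worker involved. The full retry loop is not visible in these hunks; the sketch below only illustrates the shape, under the assumption that the master polls the operation status at retryInterval until it succeeds or gives up. Names such as waitOperationOkSketch, queryOnce and maxRetries are invented for the example.

package main

import (
	"context"
	"fmt"
	"time"
)

// waitOperationOkSketch polls an operation until it reports success; on giving
// up it returns an error that names the dm-worker, which is the point of
// passing workerID down in the real code above.
func waitOperationOkSketch(ctx context.Context, taskName, workerID string, opLogID int64,
	queryOnce func(context.Context) (success bool, err error)) error {
	const maxRetries = 10
	retryInterval := time.Second
	for i := 0; i < maxRetries; i++ {
		if ok, err := queryOnce(ctx); err == nil && ok {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(retryInterval):
		}
	}
	return fmt.Errorf("request to dm-worker %s is timeout for operation %d of task %s, but request may be successful", workerID, opLogID, taskName)
}

func main() {
	calls := 0
	// Pretend the worker reports success on the third poll.
	query := func(context.Context) (bool, error) { calls++; return calls >= 3, nil }
	fmt.Println(waitOperationOkSketch(context.Background(), "test", "127.0.0.1:8262", 7, query))
}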
7 changes: 4 additions & 3 deletions dm/worker/subtask.go
@@ -514,7 +514,7 @@ func (st *SubTask) ExecuteDDL(ctx context.Context, req *pb.ExecDDLRequest) error
case <-ctx.Done():
return ctx.Err()
case <-ctxTimeout.Done():
return terror.ErrWorkerExecDDLTimeout.Generate()
return terror.ErrWorkerExecDDLTimeout.Generate(timeout)
}
}

@@ -623,7 +623,8 @@ func (st *SubTask) unitTransWaitCondition() error {
if pu != nil && pu.Type() == pb.UnitType_Load && cu.Type() == pb.UnitType_Sync {
st.l.Info("wait condition between two units", zap.Stringer("previous unit", pu.Type()), zap.Stringer("unit", cu.Type()))
hub := GetConditionHub()
ctx, cancel := context.WithTimeout(hub.w.ctx, 5*time.Minute)
waitRelayCatchupTimeout := 5 * time.Minute
ctx, cancel := context.WithTimeout(hub.w.ctx, waitRelayCatchupTimeout)
defer cancel()

loadStatus := pu.Status().(*pb.LoadStatus)
@@ -644,7 +645,7 @@

select {
case <-ctx.Done():
return terror.ErrWorkerWaitRelayCatchupTimeout.Generate(pos1, pos2)
return terror.ErrWorkerWaitRelayCatchupTimeout.Generate(waitRelayCatchupTimeout, pos1, pos2)
case <-time.After(time.Millisecond * 50):
}
}
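Both worker-side timeout errors now report the duration that was exceeded: code 40066 via the existing timeout value, code 40067 via the new waitRelayCatchupTimeout variable above. Extracting the literal 5*time.Minute into a named variable is what makes the duration available to the error. A self-contained sketch of the same pattern, with a shortened timeout and plain errors instead of the terror package:

package main

import (
	"context"
	"fmt"
	"time"
)

// waitCatchup waits until caughtUp is closed or the named timeout elapses.
// Naming the duration lets the timeout error report how long was waited,
// mirroring the waitRelayCatchupTimeout change above (5 minutes in DM).
func waitCatchup(parent context.Context, caughtUp <-chan struct{}) error {
	waitRelayCatchupTimeout := 2 * time.Second // shortened here for the example
	ctx, cancel := context.WithTimeout(parent, waitRelayCatchupTimeout)
	defer cancel()

	select {
	case <-caughtUp:
		return nil
	case <-ctx.Done():
		return fmt.Errorf("waiting for relay binlog pos to catch up is timeout (exceeding %s)", waitRelayCatchupTimeout)
	}
}

func main() {
	never := make(chan struct{}) // never closed, so the wait times out
	fmt.Println(waitCatchup(context.Background(), never))
}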
24 changes: 24 additions & 0 deletions pkg/streamer/reader.go
@@ -15,6 +15,7 @@ package streamer

import (
"context"
"os"
"path"
"path/filepath"
"strings"
@@ -83,6 +84,25 @@ func NewBinlogReader(tctx *tcontext.Context, cfg *BinlogReaderConfig) *BinlogRea
}
}

// checkRelayPos will check whether the given relay pos is too big
func (r *BinlogReader) checkRelayPos(pos mysql.Position) error {
currentUUID, _, realPos, err := binlog.ExtractPos(pos, r.uuids)
if err != nil {
return terror.Annotatef(err, "parse relay dir with pos %s", pos)
}
pos = realPos
relayFilepath := path.Join(r.cfg.RelayDir, currentUUID, pos.Name)
r.tctx.L().Info("start to check relay log file", zap.String("path", relayFilepath), zap.Stringer("position", pos))
fi, err := os.Stat(relayFilepath)
if err != nil {
return terror.ErrGetRelayLogStat.Delegate(err, relayFilepath)
}
if fi.Size() < int64(pos.Pos) {
return terror.ErrRelayLogGivenPosTooBig.Generate(pos)
}
return nil
}

// StartSync start syncon
// TODO: thread-safe?
func (r *BinlogReader) StartSync(pos mysql.Position) (Streamer, error) {
@@ -99,6 +119,10 @@ func (r *BinlogReader) StartSync(pos mysql.Position) (Streamer, error) {
if err != nil {
return nil, err
}
err = r.checkRelayPos(pos)
if err != nil {
return nil, err
}

r.latestServerID = 0
r.running = true
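The new checkRelayPos above stats the relay log file that the requested position points into and rejects the request with ErrRelayLogGivenPosTooBig when the offset lies past the end of the file, before any streaming starts. A stripped-down, stand-alone version of the same check, using plain strings and errors instead of mysql.Position, the reader's UUID list and terror:

package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// checkPosNotPastEOF refuses a start offset larger than the relay log file
// size, the same idea as checkRelayPos above but without BinlogReader state.
func checkPosNotPastEOF(relayDir, uuid, binlogName string, pos uint32) error {
	fp := filepath.Join(relayDir, uuid, binlogName)
	fi, err := os.Stat(fp)
	if err != nil {
		return fmt.Errorf("get stat for relay log %s: %w", fp, err)
	}
	if fi.Size() < int64(pos) {
		return fmt.Errorf("the given relay log pos %s:%d of meta config is too big, please check it again", binlogName, pos)
	}
	return nil
}

func main() {
	// Example layout only: a 100-byte fake relay log asked for offset 10000,
	// the scenario the new test case below exercises.
	dir, _ := os.MkdirTemp("", "relay")
	defer os.RemoveAll(dir)
	_ = os.MkdirAll(filepath.Join(dir, "uuid-000001"), 0o700)
	_ = os.WriteFile(filepath.Join(dir, "uuid-000001", "mysql-bin.000001"), make([]byte, 100), 0o600)

	fmt.Println(checkPosNotPastEOF(dir, "uuid-000001", "mysql-bin.000001", 10000))
	fmt.Println(checkPosNotPastEOF(dir, "uuid-000001", "mysql-bin.000001", 4))
}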
28 changes: 18 additions & 10 deletions pkg/streamer/reader_test.go
@@ -603,11 +603,10 @@ func (t *testReaderSuite) TestStartSyncError(c *C) {
startPos = gmysql.Position{Name: "test-mysql-bin|000001.000001"} // from the first relay log file in the first sub directory
)

ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()

tctx := tcontext.Background()
r := NewBinlogReader(tctx, cfg)
err := r.checkRelayPos(startPos)
c.Assert(err, ErrorMatches, ".*empty UUIDs not valid.*")

// no startup pos specified
s, err := r.StartSync(gmysql.Position{})
@@ -616,11 +615,8 @@

// empty UUIDs
s, err = r.StartSync(startPos)
c.Assert(err, IsNil)
ev, err := s.GetEvent(ctx)
c.Assert(err, ErrorMatches, ".*empty UUIDs not valid.*")
c.Assert(ev, IsNil)
r.Close()
c.Assert(s, IsNil)

// write UUIDs into index file
r = NewBinlogReader(tctx, cfg) // create a new reader
@@ -630,16 +626,28 @@

// the startup relay log file not found
s, err = r.StartSync(startPos)
c.Assert(err, IsNil)
ev, err = s.GetEvent(ctx)
c.Assert(err, ErrorMatches, fmt.Sprintf(".*%s.*not found.*", startPos.Name))
c.Assert(ev, IsNil)
c.Assert(s, IsNil)

// can not re-start the reader
r.running = true
s, err = r.StartSync(startPos)
c.Assert(terror.ErrReaderAlreadyRunning.Equal(err), IsTrue)
c.Assert(s, IsNil)
r.Close()

// too big startPos
uuid := UUIDs[0]
err = os.MkdirAll(filepath.Join(baseDir, uuid), 0700)
c.Assert(err, IsNil)
parsedStartPosName := "test-mysql-bin.000001"
relayLogFilePath := filepath.Join(baseDir, uuid, parsedStartPosName)
err = ioutil.WriteFile(relayLogFilePath, make([]byte, 100), 0600)
c.Assert(err, IsNil)
startPos.Pos = 10000
s, err = r.StartSync(startPos)
c.Assert(terror.ErrRelayLogGivenPosTooBig.Equal(err), IsTrue)
c.Assert(s, IsNil)
}

func (t *testReaderSuite) genBinlogEvents(c *C, latestPos uint32) []*replication.BinlogEvent {
