
Commit fd170da

fix test
Signed-off-by: crazycs520 <crazycs520@gmail.com>
crazycs520 committed Apr 1, 2022
1 parent fba518a commit fd170da
Showing 13 changed files with 144 additions and 127 deletions.
2 changes: 1 addition & 1 deletion ddl/backfilling.go
@@ -709,7 +709,7 @@ func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType ba
// recordIterFunc is used for low-level record iteration.
type recordIterFunc func(h kv.Handle, rowKey kv.Key, rawRecord []byte) (more bool, err error)

func iterateSnapshotRows(ctx *jobContext, store kv.Storage, priority int, t table.Table, version uint64,
func iterateSnapshotRows(ctx *JobContext, store kv.Storage, priority int, t table.Table, version uint64,
startKey kv.Key, endKey kv.Key, fn recordIterFunc) error {
var firstKey kv.Key
if startKey == nil {
6 changes: 3 additions & 3 deletions ddl/column.go
@@ -1009,7 +1009,7 @@ func (w *worker) doModifyColumnTypeWithData(
return ver, errors.Trace(err)
}

reorgInfo, err := getReorgInfo(w.jobContext, d, t, job, tbl, BuildElements(changingCol, changingIdxs))
reorgInfo, err := getReorgInfo(w.JobContext, d, t, job, tbl, BuildElements(changingCol, changingIdxs))
if err != nil || reorgInfo.first {
// If we run reorg firstly, we should update the job snapshot version
// and then run the reorg next time.
@@ -1148,7 +1148,7 @@ func (w *worker) updateColumnAndIndexes(t table.Table, oldCol, col *model.Column
if err != nil {
return errors.Trace(err)
}
originalStartHandle, originalEndHandle, err := getTableRange(w.jobContext, reorgInfo.d, t.(table.PhysicalTable), currentVer.Ver, reorgInfo.Job.Priority)
originalStartHandle, originalEndHandle, err := getTableRange(w.JobContext, reorgInfo.d, t.(table.PhysicalTable), currentVer.Ver, reorgInfo.Job.Priority)
if err != nil {
return errors.Trace(err)
}
@@ -1255,7 +1255,7 @@ func (w *updateColumnWorker) fetchRowColVals(txn kv.Transaction, taskRange reorg
taskDone := false
var lastAccessedHandle kv.Key
oprStartTime := startTime
err := iterateSnapshotRows(w.ddlWorker.jobContext, w.sessCtx.GetStore(), w.priority, w.table, txn.StartTS(), taskRange.startKey, taskRange.endKey,
err := iterateSnapshotRows(w.ddlWorker.JobContext, w.sessCtx.GetStore(), w.priority, w.table, txn.StartTS(), taskRange.startKey, taskRange.endKey,
func(handle kv.Handle, recordKey kv.Key, rawRow []byte) (bool, error) {
oprEndTime := time.Now()
logSlowOperations(oprEndTime.Sub(oprStartTime), "iterateSnapshotRows in updateColumnWorker fetchRowColVals", 0)
2 changes: 1 addition & 1 deletion ddl/ddl.go
@@ -174,7 +174,7 @@ type DDL interface {
// GetID gets the ddl ID.
GetID() string
// GetTableMaxHandle gets the max row ID of a normal table or a partition.
GetTableMaxHandle(ctx *jobContext, startTS uint64, tbl table.PhysicalTable) (kv.Handle, bool, error)
GetTableMaxHandle(ctx *JobContext, startTS uint64, tbl table.PhysicalTable) (kv.Handle, bool, error)
// SetBinlogClient sets the binlog client for DDL worker. It's exported for testing.
SetBinlogClient(*pumpcli.PumpsClient)
// GetHook gets the hook. It's exported for testing.
16 changes: 8 additions & 8 deletions ddl/ddl_worker.go
@@ -97,11 +97,11 @@ type worker struct {
lockSeqNum bool

*ddlCtx
*jobContext
*JobContext
}

// jobContext is the ddl job execution context.
type jobContext struct {
// JobContext is the ddl job execution context.
type JobContext struct {
// below fields are cache for top sql
ddlJobCtx context.Context
cacheSQL string
@@ -115,7 +115,7 @@ func newWorker(ctx context.Context, tp workerType, sessPool *sessionPool, delRan
tp: tp,
ddlJobCh: make(chan struct{}, 1),
ctx: ctx,
jobContext: &jobContext{
JobContext: &JobContext{
ddlJobCtx: context.Background(),
cacheSQL: "",
cacheNormalizedSQL: "",
@@ -466,7 +466,7 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {
updateRawArgs = false
}
w.writeDDLSeqNum(job)
w.jobContext.resetWhenJobFinish()
w.JobContext.resetWhenJobFinish()
err = t.AddHistoryDDLJob(job, updateRawArgs)
return errors.Trace(err)
}
@@ -519,7 +519,7 @@ func newMetaWithQueueTp(txn kv.Transaction, tp workerType) *meta.Meta {
return meta.NewMeta(txn)
}

func (w *jobContext) setDDLLabelForTopSQL(job *model.Job) {
func (w *JobContext) setDDLLabelForTopSQL(job *model.Job) {
if !topsqlstate.TopSQLEnabled() || job == nil {
return
}
@@ -533,7 +533,7 @@ func (w *jobContext) setDDLLabelForTopSQL(job *model.Job) {
}
}

func (w *jobContext) getResourceGroupTaggerForTopSQL() tikvrpc.ResourceGroupTagger {
func (w *JobContext) getResourceGroupTaggerForTopSQL() tikvrpc.ResourceGroupTagger {
if !topsqlstate.TopSQLEnabled() || w.cacheDigest == nil {
return nil
}
@@ -546,7 +546,7 @@ func (w *jobContext) getResourceGroupTaggerForTopSQL() tikvrpc.ResourceGroupTagg
return tagger
}

func (w *jobContext) resetWhenJobFinish() {
func (w *JobContext) resetWhenJobFinish() {
w.ddlJobCtx = context.Background()
w.cacheSQL = ""
w.cacheDigest = nil
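The hunks above export the DDL job execution context by renaming `jobContext` to `JobContext`, while its fields (the Top SQL caches) stay unexported. Code outside the `ddl` package can therefore only construct a zero value, which is what the updated tests in this commit pass. A minimal, illustrative sketch of how an external test helper adapts to the exported type and the new `GetTableMaxHandle` signature (the helper name and setup are assumptions, not part of the diff):

```go
package ddl_test

import (
	"testing"

	"github.com/pingcap/tidb/ddl"
	"github.com/pingcap/tidb/kv"
	"github.com/pingcap/tidb/table"
	"github.com/stretchr/testify/require"
)

// getMaxHandle shows the call pattern used by the updated tests: because
// JobContext is exported but its fields are not, callers outside the ddl
// package pass a zero-value context.
func getMaxHandle(t *testing.T, d ddl.DDL, store kv.Storage, tbl table.PhysicalTable) (kv.Handle, bool) {
	ver, err := store.CurrentVersion(kv.GlobalTxnScope)
	require.NoError(t, err)
	maxHandle, emptyTable, err := d.GetTableMaxHandle(&ddl.JobContext{}, ver.Ver, tbl)
	require.NoError(t, err)
	return maxHandle, emptyTable
}
```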
8 changes: 4 additions & 4 deletions ddl/index.go
@@ -558,7 +558,7 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo
}

elements := []*meta.Element{{ID: indexInfo.ID, TypeKey: meta.IndexElementKey}}
reorgInfo, err := getReorgInfo(w.jobContext, d, t, job, tbl, elements)
reorgInfo, err := getReorgInfo(w.JobContext, d, t, job, tbl, elements)
if err != nil || reorgInfo.first {
// If we run reorg firstly, we should update the job snapshot version
// and then run the reorg next time.
@@ -1141,7 +1141,7 @@ func (w *baseIndexWorker) fetchRowColVals(txn kv.Transaction, taskRange reorgBac
// taskDone means that the reorged handle is out of taskRange.endHandle.
taskDone := false
oprStartTime := startTime
err := iterateSnapshotRows(w.ddlWorker.jobContext, w.sessCtx.GetStore(), w.priority, w.table, txn.StartTS(), taskRange.startKey, taskRange.endKey,
err := iterateSnapshotRows(w.ddlWorker.JobContext, w.sessCtx.GetStore(), w.priority, w.table, txn.StartTS(), taskRange.startKey, taskRange.endKey,
func(handle kv.Handle, recordKey kv.Key, rawRow []byte) (bool, error) {
oprEndTime := time.Now()
logSlowOperations(oprEndTime.Sub(oprStartTime), "iterateSnapshotRows in baseIndexWorker fetchRowColVals", 0)
@@ -1410,7 +1410,7 @@ func (w *worker) updateReorgInfo(t table.PartitionedTable, reorg *reorgInfo) (bo
if err != nil {
return false, errors.Trace(err)
}
start, end, err := getTableRange(w.jobContext, reorg.d, t.GetPartition(pid), currentVer.Ver, reorg.Job.Priority)
start, end, err := getTableRange(w.JobContext, reorg.d, t.GetPartition(pid), currentVer.Ver, reorg.Job.Priority)
if err != nil {
return false, errors.Trace(err)
}
@@ -1594,7 +1594,7 @@ func (w *worker) updateReorgInfoForPartitions(t table.PartitionedTable, reorg *r
if err != nil {
return false, errors.Trace(err)
}
start, end, err := getTableRange(w.jobContext, reorg.d, t.GetPartition(pid), currentVer.Ver, reorg.Job.Priority)
start, end, err := getTableRange(w.JobContext, reorg.d, t.GetPartition(pid), currentVer.Ver, reorg.Job.Priority)
if err != nil {
return false, errors.Trace(err)
}
2 changes: 1 addition & 1 deletion ddl/partition.go
@@ -1124,7 +1124,7 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (
elements = append(elements, &meta.Element{ID: idxInfo.ID, TypeKey: meta.IndexElementKey})
}
}
reorgInfo, err := getReorgInfoFromPartitions(w.jobContext, d, t, job, tbl, physicalTableIDs, elements)
reorgInfo, err := getReorgInfoFromPartitions(w.JobContext, d, t, job, tbl, physicalTableIDs, elements)

if err != nil || reorgInfo.first {
// If we run reorg firstly, we should update the job snapshot version
2 changes: 1 addition & 1 deletion ddl/primary_key_handle_test.go
@@ -40,7 +40,7 @@ import (
func getTableMaxHandle(t *testing.T, d ddl.DDL, tbl table.Table, store kv.Storage) (kv.Handle, bool) {
ver, err := store.CurrentVersion(kv.GlobalTxnScope)
require.NoError(t, err)
maxHandle, emptyTable, err := d.GetTableMaxHandle(ver.Ver, tbl.(table.PhysicalTable))
maxHandle, emptyTable, err := d.GetTableMaxHandle(&ddl.JobContext{}, ver.Ver, tbl.(table.PhysicalTable))
require.NoError(t, err)
return maxHandle, emptyTable
}
10 changes: 5 additions & 5 deletions ddl/reorg.go
@@ -438,7 +438,7 @@ func getColumnsTypes(columns []*model.ColumnInfo) []*types.FieldType {
}

// buildDescTableScan builds a desc table scan upon tblInfo.
func (dc *ddlCtx) buildDescTableScan(ctx *jobContext, startTS uint64, tbl table.PhysicalTable,
func (dc *ddlCtx) buildDescTableScan(ctx *JobContext, startTS uint64, tbl table.PhysicalTable,
handleCols []*model.ColumnInfo, limit uint64) (distsql.SelectResult, error) {
sctx := newContext(dc.store)
dagPB, err := buildDescTableScanDAG(sctx, tbl, handleCols, limit)
@@ -476,7 +476,7 @@ func (dc *ddlCtx) buildDescTableScan(ctx *jobContext, startTS uint64, tbl table.
}

// GetTableMaxHandle gets the max handle of a PhysicalTable.
func (dc *ddlCtx) GetTableMaxHandle(ctx *jobContext, startTS uint64, tbl table.PhysicalTable) (maxHandle kv.Handle, emptyTable bool, err error) {
func (dc *ddlCtx) GetTableMaxHandle(ctx *JobContext, startTS uint64, tbl table.PhysicalTable) (maxHandle kv.Handle, emptyTable bool, err error) {
var handleCols []*model.ColumnInfo
var pkIdx *model.IndexInfo
tblInfo := tbl.Meta()
@@ -542,7 +542,7 @@ func buildCommonHandleFromChunkRow(sctx *stmtctx.StatementContext, tblInfo *mode
}

// getTableRange gets the start and end handle of a table (or partition).
func getTableRange(ctx *jobContext, d *ddlCtx, tbl table.PhysicalTable, snapshotVer uint64, priority int) (startHandleKey, endHandleKey kv.Key, err error) {
func getTableRange(ctx *JobContext, d *ddlCtx, tbl table.PhysicalTable, snapshotVer uint64, priority int) (startHandleKey, endHandleKey kv.Key, err error) {
// Get the start handle of this partition.
err = iterateSnapshotRows(ctx, d.store, priority, tbl, snapshotVer, nil, nil,
func(h kv.Handle, rowKey kv.Key, rawRecord []byte) (bool, error) {
@@ -579,7 +579,7 @@ func getValidCurrentVersion(store kv.Storage) (ver kv.Version, err error) {
return ver, nil
}

func getReorgInfo(ctx *jobContext, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table, elements []*meta.Element) (*reorgInfo, error) {
func getReorgInfo(ctx *JobContext, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table, elements []*meta.Element) (*reorgInfo, error) {
var (
element *meta.Element
start kv.Key
@@ -671,7 +671,7 @@ func getReorgInfo(ctx *jobContext, d *ddlCtx, t *meta.Meta, job *model.Job, tbl
return &info, nil
}

func getReorgInfoFromPartitions(ctx *jobContext, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table, partitionIDs []int64, elements []*meta.Element) (*reorgInfo, error) {
func getReorgInfoFromPartitions(ctx *JobContext, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table, partitionIDs []int64, elements []*meta.Element) (*reorgInfo, error) {
var (
element *meta.Element
start kv.Key
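For orientation, the `getTableRange` hunk above derives a table's (or partition's) start key by visiting the first snapshot row and stopping immediately; the end key is computed separately. A condensed, in-package sketch of that pattern under the signatures shown in this diff (error handling and the end-key logic are omitted, and the helper name is illustrative):

```go
// firstRowKey is a condensed sketch of the pattern in getTableRange: iterate
// snapshot rows from the beginning of the table and stop at the first record,
// whose row key becomes the start of the reorg range.
func firstRowKey(ctx *JobContext, d *ddlCtx, tbl table.PhysicalTable, snapshotVer uint64, priority int) (kv.Key, error) {
	var startKey kv.Key
	err := iterateSnapshotRows(ctx, d.store, priority, tbl, snapshotVer, nil, nil,
		func(h kv.Handle, rowKey kv.Key, rawRecord []byte) (bool, error) {
			startKey = rowKey
			// Returning false means no more rows are needed, so iteration
			// stops after the first record.
			return false, nil
		})
	return startKey, err
}
```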
6 changes: 3 additions & 3 deletions ddl/reorg_test.go
@@ -143,7 +143,7 @@ func TestReorg(t *testing.T) {
require.NoError(t, err)

m = meta.NewMeta(txn)
info, err1 := getReorgInfo(&jobContext{}, d.ddlCtx, m, job, mockTbl, nil)
info, err1 := getReorgInfo(&JobContext{}, d.ddlCtx, m, job, mockTbl, nil)
require.NoError(t, err1)
require.Equal(t, info.StartKey, kv.Key(handle.Encoded()))
require.Equal(t, info.currElement, e)
@@ -174,7 +174,7 @@ func TestReorg(t *testing.T) {
err = kv.RunInNewTxn(context.Background(), d.store, false, func(ctx context.Context, txn kv.Transaction) error {
m := meta.NewMeta(txn)
var err1 error
_, err1 = getReorgInfo(&jobContext{}, d.ddlCtx, m, job, mockTbl, []*meta.Element{element})
_, err1 = getReorgInfo(&JobContext{}, d.ddlCtx, m, job, mockTbl, []*meta.Element{element})
require.True(t, meta.ErrDDLReorgElementNotExist.Equal(err1))
require.Equal(t, job.SnapshotVer, uint64(0))
return nil
require.NoError(t, err)
err = kv.RunInNewTxn(context.Background(), d.store, false, func(ctx context.Context, txn kv.Transaction) error {
m := meta.NewMeta(txn)
info1, err1 := getReorgInfo(&jobContext{}, d.ddlCtx, m, job, mockTbl, []*meta.Element{element})
info1, err1 := getReorgInfo(&JobContext{}, d.ddlCtx, m, job, mockTbl, []*meta.Element{element})
require.NoError(t, err1)
require.Equal(t, info1.currElement, info.currElement)
require.Equal(t, info1.StartKey, info.StartKey)
66 changes: 0 additions & 66 deletions ddl/testutil/testutil.go
@@ -16,7 +16,6 @@ package testutil

import (
"context"
"runtime"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/domain"
@@ -26,7 +25,6 @@
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/types"
"github.com/tikv/client-go/v2/tikvrpc"
)

// SessionExecInGoroutine export for testing.
@@ -83,67 +81,3 @@ func ExtractAllTableHandles(se session.Session, dbName, tbName string) ([]int64,
})
return allHandles, err
}

// GetReqStartKey gets start key of the request.
func GetReqStartKey(req *tikvrpc.Request) ([]byte, error) {
switch req.Type {
case tikvrpc.CmdGet:
request := req.Get()
return request.Key, nil
case tikvrpc.CmdScan:
request := req.Scan()
return request.StartKey, nil
case tikvrpc.CmdPrewrite:
request := req.Prewrite()
return request.Mutations[0].Key, nil
case tikvrpc.CmdCommit:
request := req.Commit()
return request.Keys[0], nil
case tikvrpc.CmdCleanup:
request := req.Cleanup()
return request.Key, nil
case tikvrpc.CmdBatchGet:
request := req.BatchGet()
return request.Keys[0], nil
case tikvrpc.CmdBatchRollback:
request := req.BatchRollback()
return request.Keys[0], nil
case tikvrpc.CmdScanLock:
request := req.ScanLock()
return request.StartKey, nil
case tikvrpc.CmdPessimisticLock:
request := req.PessimisticLock()
return request.PrimaryLock, nil
case tikvrpc.CmdCheckSecondaryLocks:
request := req.CheckSecondaryLocks()
return request.Keys[0], nil
case tikvrpc.CmdCop, tikvrpc.CmdCopStream:
request := req.Cop()
return request.Ranges[0].Start, nil
case tikvrpc.CmdGC, tikvrpc.CmdDeleteRange, tikvrpc.CmdTxnHeartBeat, tikvrpc.CmdRawGet,
tikvrpc.CmdRawBatchGet, tikvrpc.CmdRawPut, tikvrpc.CmdRawBatchPut, tikvrpc.CmdRawDelete, tikvrpc.CmdRawBatchDelete, tikvrpc.CmdRawDeleteRange,
tikvrpc.CmdRawScan, tikvrpc.CmdGetKeyTTL, tikvrpc.CmdRawCompareAndSwap, tikvrpc.CmdUnsafeDestroyRange, tikvrpc.CmdRegisterLockObserver,
tikvrpc.CmdCheckLockObserver, tikvrpc.CmdRemoveLockObserver, tikvrpc.CmdPhysicalScanLock, tikvrpc.CmdStoreSafeTS,
tikvrpc.CmdLockWaitInfo, tikvrpc.CmdMvccGetByKey, tikvrpc.CmdMvccGetByStartTs, tikvrpc.CmdSplitRegion,
tikvrpc.CmdDebugGetRegionProperties, tikvrpc.CmdEmpty:
// Ignore those requests since now, since it is no business with TopSQL.
return nil, nil
case tikvrpc.CmdBatchCop, tikvrpc.CmdMPPTask, tikvrpc.CmdMPPConn, tikvrpc.CmdMPPCancel, tikvrpc.CmdMPPAlive:
// Ignore mpp requests.
return nil, nil
case tikvrpc.CmdResolveLock, tikvrpc.CmdCheckTxnStatus, tikvrpc.CmdPessimisticRollback:
// TODO: add resource tag for those request. https://github.com/pingcap/tidb/issues/33621
return nil, nil
default:
return nil, errors.New("unknown request, check the new type RPC request here")
}
}

// GetStack gets the stacktrace.
func GetStack() []byte {
const size = 1024 * 64
buf := make([]byte, size)
stackSize := runtime.Stack(buf, false)
buf = buf[:stackSize]
return buf
}
2 changes: 1 addition & 1 deletion server/main_test.go
@@ -22,10 +22,10 @@ import (

"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/metrics"
topsqlstate "github.com/pingcap/tidb/util/topsql/state"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/store/mockstore/unistore"
"github.com/pingcap/tidb/util/testbridge"
topsqlstate "github.com/pingcap/tidb/util/topsql/state"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/goleak"
)
33 changes: 0 additions & 33 deletions store/mockstore/unistore/rpc.go
@@ -15,7 +15,6 @@
package unistore

import (
"fmt"
"io"
"math"
"os"
@@ -32,12 +31,9 @@
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/mpp"
"github.com/pingcap/tidb/ddl/testutil"
"github.com/pingcap/tidb/parser/terror"
us "github.com/pingcap/tidb/store/mockstore/unistore/tikv"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/codec"
topsqlstate "github.com/pingcap/tidb/util/topsql/state"
"github.com/tikv/client-go/v2/tikvrpc"
"golang.org/x/net/context"
"google.golang.org/grpc/metadata"
@@ -424,35 +420,6 @@ func (c *RPCClient) CloseAddr(addr string) error {
return nil
}

func checkResourceTagForTopSQL(req *tikvrpc.Request) error {
if !topsqlstate.TopSQLEnabled() {
return nil
}
tag := req.GetResourceGroupTag()
if len(tag) > 0 {
return nil
}

startKey, err := testutil.GetReqStartKey(req)
if err != nil {
return err
}
var tid int64
if tablecodec.IsRecordKey(startKey) {
tid, _, _ = tablecodec.DecodeRecordKey(startKey)
}
if tablecodec.IsIndexKey(startKey) {
tid, _, _, _ = tablecodec.DecodeIndexKey(startKey)
}
// since the error maybe "invalid record key", should just ignore check resource tag for this request.
if tid > 0 {
stack := testutil.GetStack()
return fmt.Errorf("%v req does not set the resource tag, tid: %v, stack: %v",
req.Type.String(), tid, string(stack))
}
return nil
}

type mockClientStream struct{}

// Header implements grpc.ClientStream interface