Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: trace the execution of the insert operation #11667

Merged
merged 7 commits into from Aug 9, 2019
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion ddl/column_test.go
Expand Up @@ -280,7 +280,7 @@ func (s *testColumnSuite) checkColumnKVExist(ctx sessionctx.Context, t table.Tab
if err != nil {
return errors.Trace(err)
}
data, err := txn.Get(key)
data, err := txn.Get(context.TODO(), key)
if !isExist {
if terror.ErrorEqual(err, kv.ErrNotExist) {
return nil
Expand Down
2 changes: 1 addition & 1 deletion ddl/index.go
Expand Up @@ -764,7 +764,7 @@ func (w *addIndexWorker) batchCheckUniqueKey(txn kv.Transaction, idxRecords []*i
w.distinctCheckFlags = append(w.distinctCheckFlags, distinct)
}

batchVals, err := txn.BatchGet(w.batchCheckKeys)
batchVals, err := txn.BatchGet(context.Background(), w.batchCheckKeys)
if err != nil {
return errors.Trace(err)
}
Expand Down
7 changes: 7 additions & 0 deletions distsql/distsql.go
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"fmt"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
Expand All @@ -33,6 +34,12 @@ const (
// Select sends a DAG request, returns SelectResult.
// In kvReq, KeyRanges is required, Concurrency/KeepOrder/Desc/IsolationLevel/Priority are optional.
func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fieldTypes []*types.FieldType, fb *statistics.QueryFeedback) (SelectResult, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("distsql.Select", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}

// For testing purpose.
if hook := ctx.Value("CheckSelectRequestHook"); hook != nil {
hook.(func(*kv.Request))(kvReq)
Expand Down
4 changes: 2 additions & 2 deletions executor/admin.go
Expand Up @@ -377,7 +377,7 @@ func (e *RecoverIndexExec) batchMarkDup(txn kv.Transaction, rows []recoverRows)
distinctFlags[i] = distinct
}

values, err := txn.BatchGet(e.batchKeys)
values, err := txn.BatchGet(context.Background(), e.batchKeys)
if err != nil {
return err
}
Expand Down Expand Up @@ -502,7 +502,7 @@ func (e *CleanupIndexExec) batchGetRecord(txn kv.Transaction) (map[string][]byte
for handle := range e.idxValues {
e.batchKeys = append(e.batchKeys, e.table.RecordKey(handle))
}
values, err := txn.BatchGet(e.batchKeys)
values, err := txn.BatchGet(context.Background(), e.batchKeys)
if err != nil {
return nil, err
}
Expand Down
38 changes: 23 additions & 15 deletions executor/batch_checker.go
Expand Up @@ -14,6 +14,9 @@
package executor

import (
"context"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/expression"
Expand Down Expand Up @@ -53,12 +56,12 @@ type batchChecker struct {
}

// batchGetOldValues gets the values of storage in batch.
func (b *batchChecker) batchGetOldValues(ctx sessionctx.Context, batchKeys []kv.Key) error {
txn, err := ctx.Txn(true)
func (b *batchChecker) batchGetOldValues(ctx context.Context, sctx sessionctx.Context, batchKeys []kv.Key) error {
txn, err := sctx.Txn(true)
if err != nil {
return err
}
values, err := txn.BatchGet(batchKeys)
values, err := txn.BatchGet(ctx, batchKeys)
if err != nil {
return err
}
Expand Down Expand Up @@ -87,7 +90,7 @@ func (b *batchChecker) encodeNewRow(ctx sessionctx.Context, t table.Table, row [

// getKeysNeedCheck gets keys converted from to-be-insert rows to record keys and unique index keys,
// which need to be checked whether they are duplicate keys.
func (b *batchChecker) getKeysNeedCheck(ctx sessionctx.Context, t table.Table, rows [][]types.Datum) ([]toBeCheckedRow, error) {
func (b *batchChecker) getKeysNeedCheck(ctx context.Context, sctx sessionctx.Context, t table.Table, rows [][]types.Datum) ([]toBeCheckedRow, error) {
nUnique := 0
for _, v := range t.WritableIndices() {
if v.Meta().Unique {
Expand All @@ -109,7 +112,7 @@ func (b *batchChecker) getKeysNeedCheck(ctx sessionctx.Context, t table.Table, r

var err error
for _, row := range rows {
toBeCheckRows, err = b.getKeysNeedCheckOneRow(ctx, t, row, nUnique, handleCol, toBeCheckRows)
toBeCheckRows, err = b.getKeysNeedCheckOneRow(sctx, t, row, nUnique, handleCol, toBeCheckRows)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -187,9 +190,9 @@ func (b *batchChecker) getKeysNeedCheckOneRow(ctx sessionctx.Context, t table.Ta
}

// batchGetInsertKeys uses batch-get to fetch all key-value pairs to be checked for ignore or duplicate key update.
func (b *batchChecker) batchGetInsertKeys(ctx sessionctx.Context, t table.Table, newRows [][]types.Datum) (err error) {
func (b *batchChecker) batchGetInsertKeys(ctx context.Context, sctx sessionctx.Context, t table.Table, newRows [][]types.Datum) (err error) {
// Get keys need to be checked.
b.toBeCheckedRows, err = b.getKeysNeedCheck(ctx, t, newRows)
b.toBeCheckedRows, err = b.getKeysNeedCheck(ctx, sctx, t, newRows)
if err != nil {
return err
}
Expand All @@ -211,11 +214,11 @@ func (b *batchChecker) batchGetInsertKeys(ctx sessionctx.Context, t table.Table,
batchKeys = append(batchKeys, k.newKV.key)
}
}
txn, err := ctx.Txn(true)
txn, err := sctx.Txn(true)
if err != nil {
return err
}
b.dupKVs, err = txn.BatchGet(batchKeys)
b.dupKVs, err = txn.BatchGet(ctx, batchKeys)
return err
}

Expand All @@ -231,7 +234,7 @@ func (b *batchChecker) initDupOldRowFromHandleKey() {
}
}

func (b *batchChecker) initDupOldRowFromUniqueKey(ctx sessionctx.Context, newRows [][]types.Datum) error {
func (b *batchChecker) initDupOldRowFromUniqueKey(ctx context.Context, sctx sessionctx.Context, newRows [][]types.Datum) error {
batchKeys := make([]kv.Key, 0, len(newRows))
for _, r := range b.toBeCheckedRows {
for _, uk := range r.uniqueKeys {
Expand All @@ -244,14 +247,19 @@ func (b *batchChecker) initDupOldRowFromUniqueKey(ctx sessionctx.Context, newRow
}
}
}
return b.batchGetOldValues(ctx, batchKeys)
return b.batchGetOldValues(ctx, sctx, batchKeys)
}

// initDupOldRowValue initializes dupOldRowValues which contain the to-be-updated rows from storage.
func (b *batchChecker) initDupOldRowValue(ctx sessionctx.Context, t table.Table, newRows [][]types.Datum) error {
func (b *batchChecker) initDupOldRowValue(ctx context.Context, sctx sessionctx.Context, t table.Table, newRows [][]types.Datum) error {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("batchCheck.initDupOldRowValue", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}
b.dupOldRowValues = make(map[string][]byte, len(newRows))
b.initDupOldRowFromHandleKey()
return b.initDupOldRowFromUniqueKey(ctx, newRows)
return b.initDupOldRowFromUniqueKey(ctx, sctx, newRows)
}

// fillBackKeys fills the updated key-value pair to the dupKeyValues for further check.
Expand All @@ -268,8 +276,8 @@ func (b *batchChecker) fillBackKeys(t table.Table, row toBeCheckedRow, handle in
}

// deleteDupKeys picks primary/unique key-value pairs from rows and remove them from the dupKVs
func (b *batchChecker) deleteDupKeys(ctx sessionctx.Context, t table.Table, rows [][]types.Datum) error {
cleanupRows, err := b.getKeysNeedCheck(ctx, t, rows)
func (b *batchChecker) deleteDupKeys(ctx context.Context, sctx sessionctx.Context, t table.Table, rows [][]types.Datum) error {
cleanupRows, err := b.getKeysNeedCheck(ctx, sctx, t, rows)
if err != nil {
return err
}
Expand Down
42 changes: 25 additions & 17 deletions executor/insert.go
Expand Up @@ -16,6 +16,8 @@ package executor
import (
"context"
"fmt"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -57,18 +59,18 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error {
// If `ON DUPLICATE KEY UPDATE` is specified, and no `IGNORE` keyword,
// the to-be-insert rows will be check on duplicate keys and update to the new rows.
if len(e.OnDuplicate) > 0 {
err := e.batchUpdateDupRows(rows)
err := e.batchUpdateDupRows(ctx, rows)
if err != nil {
return err
}
} else if ignoreErr {
err := e.batchCheckAndInsert(rows, e.addRecord)
err := e.batchCheckAndInsert(ctx, rows, e.addRecord)
if err != nil {
return err
}
} else {
for _, row := range rows {
if _, err := e.addRecord(row); err != nil {
if _, err := e.addRecord(ctx, row); err != nil {
return err
}
}
Expand All @@ -77,14 +79,14 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error {
}

// batchUpdateDupRows updates multi-rows in batch if they are duplicate with rows in table.
func (e *InsertExec) batchUpdateDupRows(newRows [][]types.Datum) error {
err := e.batchGetInsertKeys(e.ctx, e.Table, newRows)
func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.Datum) error {
err := e.batchGetInsertKeys(ctx, e.ctx, e.Table, newRows)
if err != nil {
return err
}

// Batch get the to-be-updated rows in storage.
err = e.initDupOldRowValue(e.ctx, e.Table, newRows)
err = e.initDupOldRowValue(ctx, e.ctx, e.Table, newRows)
if err != nil {
return err
}
Expand All @@ -96,7 +98,7 @@ func (e *InsertExec) batchUpdateDupRows(newRows [][]types.Datum) error {
if err != nil {
return err
}
err = e.updateDupRow(r, handle, e.OnDuplicate)
err = e.updateDupRow(ctx, r, handle, e.OnDuplicate)
if err != nil {
return err
}
Expand All @@ -109,7 +111,7 @@ func (e *InsertExec) batchUpdateDupRows(newRows [][]types.Datum) error {
if err != nil {
return err
}
err = e.updateDupRow(r, handle, e.OnDuplicate)
err = e.updateDupRow(ctx, r, handle, e.OnDuplicate)
if err != nil {
return err
}
Expand All @@ -122,7 +124,7 @@ func (e *InsertExec) batchUpdateDupRows(newRows [][]types.Datum) error {
// and key-values should be filled back to dupOldRowValues for the further row check,
// due to there may be duplicate keys inside the insert statement.
if newRows[i] != nil {
newHandle, err := e.addRecord(newRows[i])
newHandle, err := e.addRecord(ctx, newRows[i])
if err != nil {
return err
}
Expand Down Expand Up @@ -161,26 +163,32 @@ func (e *InsertExec) Open(ctx context.Context) error {
}

// updateDupRow updates a duplicate row to a new row.
func (e *InsertExec) updateDupRow(row toBeCheckedRow, handle int64, onDuplicate []*expression.Assignment) error {
func (e *InsertExec) updateDupRow(ctx context.Context, row toBeCheckedRow, handle int64, onDuplicate []*expression.Assignment) error {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("InsertExec.updateDupRow", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}

oldRow, err := e.getOldRow(e.ctx, row.t, handle, e.GenExprs)
if err != nil {
logutil.BgLogger().Error("get old row failed when insert on dup", zap.Int64("handle", handle), zap.String("toBeInsertedRow", types.DatumsToStrNoErr(row.row)))
return err
}
// Do update row.
updatedRow, handleChanged, newHandle, err := e.doDupRowUpdate(handle, oldRow, row.row, onDuplicate)
updatedRow, handleChanged, newHandle, err := e.doDupRowUpdate(ctx, handle, oldRow, row.row, onDuplicate)
if e.ctx.GetSessionVars().StmtCtx.DupKeyAsWarning && kv.ErrKeyExists.Equal(err) {
e.ctx.GetSessionVars().StmtCtx.AppendWarning(err)
return nil
}
if err != nil {
return err
}
return e.updateDupKeyValues(handle, newHandle, handleChanged, oldRow, updatedRow)
return e.updateDupKeyValues(ctx, handle, newHandle, handleChanged, oldRow, updatedRow)
}

// doDupRowUpdate updates the duplicate row.
func (e *InsertExec) doDupRowUpdate(handle int64, oldRow []types.Datum, newRow []types.Datum,
func (e *InsertExec) doDupRowUpdate(ctx context.Context, handle int64, oldRow []types.Datum, newRow []types.Datum,
cols []*expression.Assignment) ([]types.Datum, bool, int64, error) {
assignFlag := make([]bool, len(e.Table.WritableCols()))
// See http://dev.mysql.com/doc/refman/5.7/en/miscellaneous-functions.html#function_values
Expand All @@ -204,23 +212,23 @@ func (e *InsertExec) doDupRowUpdate(handle int64, oldRow []types.Datum, newRow [
}

newData := row4Update[:len(oldRow)]
_, handleChanged, newHandle, err := updateRecord(e.ctx, handle, oldRow, newData, assignFlag, e.Table, true)
_, handleChanged, newHandle, err := updateRecord(ctx, e.ctx, handle, oldRow, newData, assignFlag, e.Table, true)
if err != nil {
return nil, false, 0, err
}
return newData, handleChanged, newHandle, nil
}

// updateDupKeyValues updates the dupKeyValues for further duplicate key check.
func (e *InsertExec) updateDupKeyValues(oldHandle int64, newHandle int64,
func (e *InsertExec) updateDupKeyValues(ctx context.Context, oldHandle int64, newHandle int64,
handleChanged bool, oldRow []types.Datum, updatedRow []types.Datum) error {
// There is only one row per update.
fillBackKeysInRows, err := e.getKeysNeedCheck(e.ctx, e.Table, [][]types.Datum{updatedRow})
fillBackKeysInRows, err := e.getKeysNeedCheck(ctx, e.ctx, e.Table, [][]types.Datum{updatedRow})
if err != nil {
return err
}
// Delete old keys and fill back new key-values of the updated row.
err = e.deleteDupKeys(e.ctx, e.Table, [][]types.Datum{oldRow})
err = e.deleteDupKeys(ctx, e.ctx, e.Table, [][]types.Datum{oldRow})
if err != nil {
return err
}
Expand Down
10 changes: 5 additions & 5 deletions executor/insert_common.go
Expand Up @@ -582,10 +582,10 @@ func (e *InsertValues) handleWarning(err error) {

// batchCheckAndInsert checks rows with duplicate errors.
// All duplicate rows will be ignored and appended as duplicate warnings.
func (e *InsertValues) batchCheckAndInsert(rows [][]types.Datum, addRecord func(row []types.Datum) (int64, error)) error {
func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.Datum, addRecord func(ctx context.Context, row []types.Datum) (int64, error)) error {
// all the rows will be checked, so it is safe to set BatchCheck = true
e.ctx.GetSessionVars().StmtCtx.BatchCheck = true
err := e.batchGetInsertKeys(e.ctx, e.Table, rows)
err := e.batchGetInsertKeys(ctx, e.ctx, e.Table, rows)
if err != nil {
return err
}
Expand All @@ -611,7 +611,7 @@ func (e *InsertValues) batchCheckAndInsert(rows [][]types.Datum, addRecord func(
// There may be duplicate keys inside the insert statement.
if !skip {
e.ctx.GetSessionVars().StmtCtx.AddCopiedRows(1)
_, err = addRecord(rows[i])
_, err = addRecord(ctx, rows[i])
if err != nil {
return err
}
Expand All @@ -626,15 +626,15 @@ func (e *InsertValues) batchCheckAndInsert(rows [][]types.Datum, addRecord func(
return nil
}

func (e *InsertValues) addRecord(row []types.Datum) (int64, error) {
func (e *InsertValues) addRecord(ctx context.Context, row []types.Datum) (int64, error) {
txn, err := e.ctx.Txn(true)
if err != nil {
return 0, err
}
if !e.ctx.GetSessionVars().ConstraintCheckInPlace {
txn.SetOption(kv.PresumeKeyNotExists, nil)
}
h, err := e.Table.AddRecord(e.ctx, row)
h, err := e.Table.AddRecord(e.ctx, row, table.WithCtx(ctx))
txn.DelOption(kv.PresumeKeyNotExists)
if err != nil {
return 0, err
Expand Down
6 changes: 3 additions & 3 deletions executor/load_data.go
Expand Up @@ -284,7 +284,7 @@ func (e *LoadDataInfo) CheckAndInsertOneBatch() error {
return err
}
e.ctx.GetSessionVars().StmtCtx.AddRecordRows(e.curBatchCnt)
err = e.batchCheckAndInsert(e.rows[0:e.curBatchCnt], e.addRecordLD)
err = e.batchCheckAndInsert(context.TODO(), e.rows[0:e.curBatchCnt], e.addRecordLD)
if err != nil {
return err
}
Expand Down Expand Up @@ -332,11 +332,11 @@ func (e *LoadDataInfo) colsToRow(ctx context.Context, cols []field) []types.Datu
return row
}

func (e *LoadDataInfo) addRecordLD(row []types.Datum) (int64, error) {
func (e *LoadDataInfo) addRecordLD(ctx context.Context, row []types.Datum) (int64, error) {
if row == nil {
return 0, nil
}
h, err := e.addRecord(row)
h, err := e.addRecord(ctx, row)
if err != nil {
e.handleWarning(err)
}
Expand Down