Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: improve wide table insert & update performance #7935

Merged
merged 12 commits into from
Oct 23, 2018
13 changes: 8 additions & 5 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,11 @@ func (b *executorBuilder) buildInsert(v *plannercore.Insert) Executor {
hasRefCols: v.NeedFillDefaultValue,
SelectExec: selectExec,
}
err := ivs.initInsertColumns()
if err != nil {
b.err = err
return nil
}

if v.IsReplace {
return b.buildReplace(ivs)
Expand All @@ -572,25 +577,23 @@ func (b *executorBuilder) buildLoadData(v *plannercore.LoadData) Executor {
GenColumns: v.GenCols.Columns,
GenExprs: v.GenCols.Exprs,
}
tableCols := tbl.Cols()
columns, err := insertVal.getColumns(tableCols)
err := insertVal.initInsertColumns()
if err != nil {
b.err = errors.Trace(err)
b.err = err
return nil
}
loadDataExec := &LoadDataExec{
baseExecutor: newBaseExecutor(b.ctx, nil, v.ExplainID()),
IsLocal: v.IsLocal,
loadDataInfo: &LoadDataInfo{
row: make([]types.Datum, len(columns)),
row: make([]types.Datum, len(insertVal.insertColumns)),
InsertValues: insertVal,
Path: v.Path,
Table: tbl,
FieldsInfo: v.FieldsInfo,
LinesInfo: v.LinesInfo,
IgnoreLines: v.IgnoreLines,
Ctx: b.ctx,
columns: columns,
},
}

Expand Down
10 changes: 3 additions & 7 deletions executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,15 +129,10 @@ func (e *InsertExec) batchUpdateDupRows(newRows [][]types.Datum) error {
// Next implements Exec Next interface.
func (e *InsertExec) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
cols, err := e.getColumns(e.Table.Cols())
if err != nil {
return errors.Trace(err)
}

if len(e.children) > 0 && e.children[0] != nil {
return errors.Trace(e.insertRowsFromSelect(ctx, cols, e.exec))
return e.insertRowsFromSelect(ctx, e.exec)
}
return errors.Trace(e.insertRows(cols, e.exec))
return e.insertRows(e.exec)
}

// Close implements the Executor Close interface.
Expand All @@ -154,6 +149,7 @@ func (e *InsertExec) Open(ctx context.Context) error {
if e.SelectExec != nil {
return e.SelectExec.Open(ctx)
}
e.initEvalBuffer()
return nil
}

Expand Down
67 changes: 45 additions & 22 deletions executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,13 @@ type InsertValues struct {
GenColumns []*ast.ColumnName
GenExprs []expression.Expression

insertColumns []*table.Column

// colDefaultVals is used to store casted default value.
// Because not every insert statement needs colDefaultVals, so we will init the buffer lazily.
colDefaultVals []defaultVal
colDefaultVals []defaultVal
evalBuffer chunk.MutRow
evalBufferTypes []*types.FieldType
}

type defaultVal struct {
Expand All @@ -61,16 +65,18 @@ type defaultVal struct {
valid bool
}

// getColumns gets the explicitly specified columns of an insert statement. There are three cases:
// initInsertColumns sets the explicitly specified columns of an insert statement. There are three cases:
// There are three types of insert statements:
// 1 insert ... values(...) --> name type column
// 2 insert ... set x=y... --> set type column
// 3 insert ... (select ..) --> name type column
// See https://dev.mysql.com/doc/refman/5.7/en/insert.html
func (e *InsertValues) getColumns(tableCols []*table.Column) ([]*table.Column, error) {
func (e *InsertValues) initInsertColumns() error {
var cols []*table.Column
var err error

tableCols := e.Table.Cols()

if len(e.SetList) > 0 {
// Process `set` type column.
columns := make([]string, 0, len(e.SetList))
Expand All @@ -82,10 +88,10 @@ func (e *InsertValues) getColumns(tableCols []*table.Column) ([]*table.Column, e
}
cols, err = table.FindCols(tableCols, columns, e.Table.Meta().PKIsHandle)
if err != nil {
return nil, errors.Errorf("INSERT INTO %s: %s", e.Table.Meta().Name.O, err)
return errors.Errorf("INSERT INTO %s: %s", e.Table.Meta().Name.O, err)
}
if len(cols) == 0 {
return nil, errors.Errorf("INSERT INTO %s: empty column", e.Table.Meta().Name.O)
return errors.Errorf("INSERT INTO %s: empty column", e.Table.Meta().Name.O)
}
} else if len(e.Columns) > 0 {
// Process `name` type column.
Expand All @@ -98,7 +104,7 @@ func (e *InsertValues) getColumns(tableCols []*table.Column) ([]*table.Column, e
}
cols, err = table.FindCols(tableCols, columns, e.Table.Meta().PKIsHandle)
if err != nil {
return nil, errors.Errorf("INSERT INTO %s: %s", e.Table.Meta().Name.O, err)
return errors.Errorf("INSERT INTO %s: %s", e.Table.Meta().Name.O, err)
}
} else {
// If e.Columns are empty, use all columns instead.
Expand All @@ -114,10 +120,25 @@ func (e *InsertValues) getColumns(tableCols []*table.Column) ([]*table.Column, e
// Check column whether is specified only once.
err = table.CheckOnce(cols)
if err != nil {
return nil, errors.Trace(err)
return err
}
e.insertColumns = cols
return nil
}

return cols, nil
func (e *InsertValues) initEvalBuffer() {
numCols := len(e.Table.Cols())
if e.hasExtraHandle {
numCols++
}
e.evalBufferTypes = make([]*types.FieldType, numCols)
for i, col := range e.Table.Cols() {
e.evalBufferTypes[i] = &col.FieldType
}
if e.hasExtraHandle {
e.evalBufferTypes[len(e.evalBufferTypes)-1] = types.NewFieldType(mysql.TypeLonglong)
}
e.evalBuffer = chunk.MutRowFromTypes(e.evalBufferTypes)
}

func (e *InsertValues) lazilyInitColDefaultValBuf() (ok bool) {
Expand Down Expand Up @@ -150,7 +171,7 @@ func (e *InsertValues) processSetList() error {
}

// insertRows processes `insert|replace into values ()` or `insert|replace into set x=y`
func (e *InsertValues) insertRows(cols []*table.Column, exec func(rows [][]types.Datum) error) (err error) {
func (e *InsertValues) insertRows(exec func(rows [][]types.Datum) error) (err error) {
// For `insert|replace into set x=y`, process the set list here.
if err = e.processSetList(); err != nil {
return errors.Trace(err)
Expand All @@ -159,7 +180,7 @@ func (e *InsertValues) insertRows(cols []*table.Column, exec func(rows [][]types
rows := make([][]types.Datum, 0, len(e.Lists))
for i, list := range e.Lists {
e.rowCount++
row, err := e.evalRow(cols, list, i)
row, err := e.evalRow(list, i)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -189,7 +210,7 @@ func (e *InsertValues) handleErr(col *table.Column, val *types.Datum, rowIdx int

// evalRow evaluates a to-be-inserted row. The value of the column may base on another column,
// so we use setValueForRefColumn to fill the empty row some default values when needFillDefaultValues is true.
func (e *InsertValues) evalRow(cols []*table.Column, list []expression.Expression, rowIdx int) ([]types.Datum, error) {
func (e *InsertValues) evalRow(list []expression.Expression, rowIdx int) ([]types.Datum, error) {
rowLen := len(e.Table.Cols())
if e.hasExtraHandle {
rowLen++
Expand All @@ -204,18 +225,20 @@ func (e *InsertValues) evalRow(cols []*table.Column, list []expression.Expressio
}
}

e.evalBuffer.SetDatums(row...)
for i, expr := range list {
val, err := expr.Eval(chunk.MutRowFromDatums(row).ToRow())
if err = e.handleErr(cols[i], &val, rowIdx, err); err != nil {
val, err := expr.Eval(e.evalBuffer.ToRow())
if err = e.handleErr(e.insertColumns[i], &val, rowIdx, err); err != nil {
return nil, errors.Trace(err)
}
val1, err := table.CastValue(e.ctx, val, cols[i].ToInfo())
if err = e.handleErr(cols[i], &val, rowIdx, err); err != nil {
val1, err := table.CastValue(e.ctx, val, e.insertColumns[i].ToInfo())
if err = e.handleErr(e.insertColumns[i], &val, rowIdx, err); err != nil {
return nil, errors.Trace(err)
}

offset := cols[i].Offset
row[offset], hasValue[offset] = val1, true
offset := e.insertColumns[i].Offset
row[offset], hasValue[offset] = *val1.Copy(), true
e.evalBuffer.SetDatum(offset, val1)
}

return e.fillRow(row, hasValue)
Expand Down Expand Up @@ -251,7 +274,7 @@ func (e *InsertValues) setValueForRefColumn(row []types.Datum, hasValue []bool)
return nil
}

func (e *InsertValues) insertRowsFromSelect(ctx context.Context, cols []*table.Column, exec func(rows [][]types.Datum) error) error {
func (e *InsertValues) insertRowsFromSelect(ctx context.Context, exec func(rows [][]types.Datum) error) error {
// process `insert|replace into ... select ... from ...`
selectExec := e.children[0]
fields := selectExec.retTypes()
Expand All @@ -275,7 +298,7 @@ func (e *InsertValues) insertRowsFromSelect(ctx context.Context, cols []*table.C
for innerChunkRow := iter.Begin(); innerChunkRow != iter.End(); innerChunkRow = iter.Next() {
innerRow := types.CopyRow(innerChunkRow.GetDatumRow(fields))
e.rowCount++
row, err := e.getRow(cols, innerRow)
row, err := e.getRow(innerRow)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -305,16 +328,16 @@ func (e *InsertValues) insertRowsFromSelect(ctx context.Context, cols []*table.C
// getRow gets the row which from `insert into select from` or `load data`.
// The input values from these two statements are datums instead of
// expressions which are used in `insert into set x=y`.
func (e *InsertValues) getRow(cols []*table.Column, vals []types.Datum) ([]types.Datum, error) {
func (e *InsertValues) getRow(vals []types.Datum) ([]types.Datum, error) {
row := make([]types.Datum, len(e.Table.Cols()))
hasValue := make([]bool, len(e.Table.Cols()))
for i, v := range vals {
casted, err := table.CastValue(e.ctx, v, cols[i].ToInfo())
casted, err := table.CastValue(e.ctx, v, e.insertColumns[i].ToInfo())
if e.filterErr(err) != nil {
return nil, errors.Trace(err)
}

offset := cols[i].Offset
offset := e.insertColumns[i].Offset
row[offset] = casted
hasValue[offset] = true
}
Expand Down
7 changes: 4 additions & 3 deletions executor/load_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ func NewLoadDataInfo(ctx sessionctx.Context, row []types.Datum, tbl table.Table,
InsertValues: insertVal,
Table: tbl,
Ctx: ctx,
columns: cols,
}
}

Expand Down Expand Up @@ -81,6 +80,9 @@ func (e *LoadDataExec) Close() error {

// Open implements the Executor Open interface.
func (e *LoadDataExec) Open(ctx context.Context) error {
if e.loadDataInfo.insertColumns != nil {
e.loadDataInfo.initEvalBuffer()
}
return nil
}

Expand All @@ -95,7 +97,6 @@ type LoadDataInfo struct {
LinesInfo *ast.LinesClause
IgnoreLines uint64
Ctx sessionctx.Context
columns []*table.Column
}

// SetMaxRowsInBatch sets the max number of rows to insert in a batch.
Expand Down Expand Up @@ -274,7 +275,7 @@ func (e *LoadDataInfo) colsToRow(cols []field) []types.Datum {
e.row[i].SetString(string(cols[i].str))
}
}
row, err := e.getRow(e.columns, e.row)
row, err := e.getRow(e.row)
if err != nil {
e.handleWarning(err,
fmt.Sprintf("Load Data: insert data:%v failed:%v", e.row, errors.ErrorStack(err)))
Expand Down
10 changes: 3 additions & 7 deletions executor/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func (e *ReplaceExec) Open(ctx context.Context) error {
if e.SelectExec != nil {
return e.SelectExec.Open(ctx)
}
e.initEvalBuffer()
return nil
}

Expand Down Expand Up @@ -178,13 +179,8 @@ func (e *ReplaceExec) exec(newRows [][]types.Datum) error {
// Next implements the Executor Next interface.
func (e *ReplaceExec) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
cols, err := e.getColumns(e.Table.Cols())
if err != nil {
return errors.Trace(err)
}

if len(e.children) > 0 && e.children[0] != nil {
return errors.Trace(e.insertRowsFromSelect(ctx, cols, e.exec))
return e.insertRowsFromSelect(ctx, e.exec)
}
return errors.Trace(e.insertRows(cols, e.exec))
return e.insertRows(e.exec)
}
10 changes: 7 additions & 3 deletions executor/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type UpdateExec struct {
// columns2Handle stores relationship between column ordinal to its table handle.
// the columns ordinals is present in ordinal range format, @see executor.cols2Handle
columns2Handle cols2HandleSlice
evalBuffer chunk.MutRow
}

func (e *UpdateExec) exec(schema *expression.Schema) ([]types.Datum, error) {
Expand Down Expand Up @@ -141,6 +142,7 @@ func (e *UpdateExec) fetchChunkRows(ctx context.Context) error {
fields := e.children[0].retTypes()
globalRowIdx := 0
chk := e.children[0].newFirstChunk()
e.evalBuffer = chunk.MutRowFromTypes(fields)
for {
err := e.children[0].Next(ctx, chk)
if err != nil {
Expand Down Expand Up @@ -181,17 +183,19 @@ func (e *UpdateExec) handleErr(colName model.CIStr, rowIdx int, err error) error

func (e *UpdateExec) composeNewRow(rowIdx int, oldRow []types.Datum) ([]types.Datum, error) {
newRowData := types.CopyRow(oldRow)
e.evalBuffer.SetDatums(newRowData...)
for _, assign := range e.OrderedList {
handleIdx, handleFound := e.columns2Handle.findHandle(int32(assign.Col.Index))
if handleFound && e.canNotUpdate(oldRow[handleIdx]) {
continue
}
val, err := assign.Expr.Eval(chunk.MutRowFromDatums(newRowData).ToRow())
val, err := assign.Expr.Eval(e.evalBuffer.ToRow())

if err1 := e.handleErr(assign.Col.ColName, rowIdx, err); err1 != nil {
return nil, errors.Trace(err1)
return nil, err1
}
newRowData[assign.Col.Index] = val
newRowData[assign.Col.Index] = *val.Copy()
e.evalBuffer.SetDatum(assign.Col.Index, val)
}
return newRowData, nil
}
Expand Down