Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[perf] improve delete from table without where & limit #13120

Merged
merged 25 commits into from
Dec 10, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
0ba1555
[perf] optimizing unconditional delete through truncate
noorall Nov 30, 2023
688bee7
[fix] the issue of being unable to delete special tables
noorall Nov 30, 2023
682d31f
[mod] remove useless codes
noorall Nov 30, 2023
3855d65
[refactor] truncate table operation
noorall Nov 30, 2023
05f9f2f
[refactor] move truncate to deletion operator
noorall Dec 1, 2023
9e13b11
[mod] revert dml.go
noorall Dec 1, 2023
9fcc82a
[mod] add License
noorall Dec 1, 2023
4096cda
[mod] remove useless code
noorall Dec 3, 2023
2fca1dd
Merge branch 'main' into teach-12845
noorall Dec 3, 2023
f761fc4
Revert "[mod] remove useless code"
noorall Dec 5, 2023
8d70fb7
Revert "[mod] add License"
noorall Dec 5, 2023
873ffdf
Revert "[mod] revert dml.go"
noorall Dec 5, 2023
5038a6c
Revert "[refactor] move truncate to deletion operator"
noorall Dec 5, 2023
5751f4b
Revert "[refactor] truncate table operation"
noorall Dec 5, 2023
6937cec
Revert "[mod] remove useless codes"
noorall Dec 5, 2023
bd06f24
Revert "[fix] the issue of being unable to delete special tables"
noorall Dec 5, 2023
46b6731
Revert "[perf] optimizing unconditional delete through truncate"
noorall Dec 5, 2023
42eae4a
[perf] optimizing unconditional delete through truncate
noorall Dec 5, 2023
8a2348c
[fix] explain error case by delete by truncate node
noorall Dec 5, 2023
bb6e9c6
[mod] remove delete by truncate node
noorall Dec 7, 2023
5199899
[mod] add truncate table for deleteCtx
noorall Dec 7, 2023
988f324
[mod] remove useless codes
noorall Dec 7, 2023
50b6a6b
[mod] add judgments for truncate type
noorall Dec 7, 2023
b5198b3
Merge branch 'main' into teach-12845
ouyuanning Dec 7, 2023
e3703f6
Merge branch 'main' into teach-12845
mergify[bot] Dec 10, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
38 changes: 37 additions & 1 deletion pkg/sql/colexec/deletion/deletion.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package deletion

import (
"bytes"
"github.com/matrixorigin/matrixone/pkg/defines"
"github.com/matrixorigin/matrixone/pkg/vm/engine"
"sync/atomic"

"github.com/matrixorigin/matrixone/pkg/catalog"
Expand Down Expand Up @@ -160,6 +162,41 @@ func (arg *Argument) remote_delete(proc *process.Process) (vm.CallResult, error)
}

func (arg *Argument) normal_delete(proc *process.Process) (vm.CallResult, error) {
delCtx := arg.DeleteCtx

if arg.DeleteCtx.CanTruncate {
dbName := delCtx.Ref.SchemaName
tableName := delCtx.Ref.ObjName
tableId := uint64(delCtx.Ref.Obj)
result := vm.NewCallResult()
affectRows, err := delCtx.Source.Rows(proc.Ctx)
eng := proc.Ctx.Value(defines.EngineKey{}).(engine.Engine)
if err != nil {
return result, err
}

err = TruncateTable(
proc.Ctx,
eng,
proc,
dbName,
tableName,
tableId,
delCtx.PartitionTableNames,
delCtx.IndexTableNames,
delCtx.ForeignTbl,
true,
)

if err != nil {
return result, err
}
if delCtx.AddAffectedRows {
atomic.AddUint64(&arg.affectedRows, uint64(affectRows))
}
return result, nil
}

result, err := arg.children[0].Call(proc)
if err != nil {
return result, err
Expand All @@ -170,7 +207,6 @@ func (arg *Argument) normal_delete(proc *process.Process) (vm.CallResult, error)
bat := result.Batch

var affectedRows uint64
delCtx := arg.DeleteCtx

if len(delCtx.PartitionTableIDs) > 0 {
delBatches, err := colexec.GroupByPartitionForDelete(proc, bat, delCtx.RowIdIdx, delCtx.PartitionIndexInBatch,
Expand Down
8 changes: 5 additions & 3 deletions pkg/sql/colexec/deletion/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,11 @@ func (arg *Argument) AppendChild(child vm.Operator) {

type DeleteCtx struct {
CanTruncate bool
RowIdIdx int // The array index position of the rowid column
PartitionTableIDs []uint64 // Align array index with the partition number
PartitionTableNames []string // Align array index with the partition number
RowIdIdx int // The array index position of the rowid column
PartitionTableIDs []uint64 // Align array index with the partition number
PartitionTableNames []string // Align array index with the partition number
IndexTableNames []string
ForeignTbl []uint64
PartitionIndexInBatch int // The array index position of the partition expression column
PartitionSources []engine.Relation // Align array index with the partition number
Source engine.Relation
Expand Down
208 changes: 208 additions & 0 deletions pkg/sql/colexec/deletion/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
// Copyright 2021 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package deletion

import (
"context"
"fmt"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
moruntime "github.com/matrixorigin/matrixone/pkg/common/runtime"
"github.com/matrixorigin/matrixone/pkg/defines"
"github.com/matrixorigin/matrixone/pkg/incrservice"
"github.com/matrixorigin/matrixone/pkg/pb/lock"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/lockop"
"github.com/matrixorigin/matrixone/pkg/util/executor"
"github.com/matrixorigin/matrixone/pkg/vm/engine"
"github.com/matrixorigin/matrixone/pkg/vm/process"
)

func runSql(proc *process.Process, dbName string, sql string) error {
if sql == "" {
return nil
}
v, ok := moruntime.ProcessLevelRuntime().GetGlobalVariables(moruntime.InternalSQLExecutor)
if !ok {
panic("missing lock service")
}
exec := v.(executor.SQLExecutor)
opts := executor.Options{}.
// All runSql and runSqlWithResult is a part of input sql, can not incr statement.
// All these sub-sql's need to be rolled back and retried en masse when they conflict in pessimistic mode
WithDisableIncrStatement().
WithTxn(proc.TxnOperator).
WithDatabase(dbName).
WithTimeZone(proc.SessionInfo.TimeZone)
res, err := exec.Exec(proc.Ctx, sql, opts)
if err != nil {
return err
}
res.Close()
return nil
}

func TruncateTable(
ctx context.Context,
eng engine.Engine,
proc *process.Process,
dbName string,
tableName string,
tableId uint64,
partitionTableNames []string,
indexTableNames []string,
foreignTbl []uint64,
keepAutoIncrement bool,
) error {
var dbSource engine.Database
var rel engine.Relation
var err error
var isTemp bool
var newTblId uint64
dbSource, err = eng.Database(ctx, dbName, proc.TxnOperator)
if err != nil {
return err
}

if rel, err = dbSource.Relation(ctx, tableName, nil); err != nil {
var e error // avoid contamination of error messages
dbSource, e = eng.Database(ctx, defines.TEMPORARY_DBNAME, proc.TxnOperator)
if e != nil {
return err
}
rel, e = dbSource.Relation(ctx, engine.GetTempTableName(dbName, tableName), nil)
if e != nil {
return err
}
isTemp = true
}

if !isTemp && proc.TxnOperator.Txn().IsPessimistic() {
var err error
if e := lockop.LockMoTable(ctx, eng, proc, dbName, tableName, lock.LockMode_Shared); e != nil {
if !moerr.IsMoErrCode(e, moerr.ErrTxnNeedRetry) &&
!moerr.IsMoErrCode(err, moerr.ErrTxnNeedRetryWithDefChanged) {
return e
}
err = e
}
// before dropping table, lock it.
if e := lockop.LockTable(ctx, eng, proc, rel, dbName, partitionTableNames, false); e != nil {
if !moerr.IsMoErrCode(e, moerr.ErrTxnNeedRetry) &&
!moerr.IsMoErrCode(err, moerr.ErrTxnNeedRetryWithDefChanged) {
return e
}
err = e
}
if err != nil {
return err
}
}

if isTemp {
// memory engine truncate always return 0, so for temporary table, just use origin tableId as newTblId
_, err = dbSource.Truncate(ctx, engine.GetTempTableName(dbName, tableName))
newTblId = rel.GetTableID(ctx)
} else {
newTblId, err = dbSource.Truncate(ctx, tableName)
}

if err != nil {
return err
}

// Truncate Index Tables if needed
for _, name := range indexTableNames {
var err error
if isTemp {
_, err = dbSource.Truncate(ctx, engine.GetTempTableName(dbName, name))
} else {
_, err = dbSource.Truncate(ctx, name)
}
if err != nil {
return err
}
}

//Truncate Partition subTable if needed
for _, name := range partitionTableNames {
var err error
if isTemp {
_, err = dbSource.Truncate(ctx, engine.GetTempTableName(dbName, name))
} else {
_, err = dbSource.Truncate(ctx, name)
}
if err != nil {
return err
}
}

// update tableDef of foreign key's table with new table id
for _, fTblId := range foreignTbl {
_, _, fkRelation, err := eng.GetRelationById(ctx, proc.TxnOperator, fTblId)
if err != nil {
return err
}
fkTableDef, err := fkRelation.TableDefs(ctx)
if err != nil {
return err
}
var oldCt *engine.ConstraintDef
for _, def := range fkTableDef {
if ct, ok := def.(*engine.ConstraintDef); ok {
oldCt = ct
break
}
}
for _, ct := range oldCt.Cts {
if def, ok := ct.(*engine.RefChildTableDef); ok {
for idx, refTable := range def.Tables {
if refTable == tableId {
def.Tables[idx] = newTblId
break
}
}
break
}
}
if err != nil {
return err
}
err = fkRelation.UpdateConstraint(ctx, oldCt)
if err != nil {
return err
}

}

if isTemp {
tableId = rel.GetTableID(ctx)
}
err = incrservice.GetAutoIncrementService(ctx).Reset(
ctx,
tableId,
newTblId,
keepAutoIncrement,
proc.TxnOperator)
if err != nil {
return err
}

// update index information in mo_catalog.mo_indexes
updateSql := fmt.Sprintf(`update mo_catalog.mo_indexes set table_id = %v where table_id = %v`, newTblId, tableId)
err = runSql(proc, dbName, updateSql)
if err != nil {
return err
}
return nil
}
8 changes: 4 additions & 4 deletions pkg/sql/colexec/lockop/lock_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,9 +269,9 @@ func performLock(
return nil
}

// LockTable lock table, all rows in the table will be locked, and wait current txn
// lockTable lock table, all rows in the table will be locked, and wait current txn
// closed.
func LockTable(
func lockTable(
eng engine.Engine,
proc *process.Process,
tableID uint64,
Expand Down Expand Up @@ -629,7 +629,7 @@ func (arg *Argument) AddLockTargetWithMode(
return arg
}

// LockTable lock all table, used for delete, truncate and drop table
// lockTable lock all table, used for delete, truncate and drop table
func (arg *Argument) LockTable(
tableID uint64,
changeDef bool) *Argument {
Expand All @@ -639,7 +639,7 @@ func (arg *Argument) LockTable(
changeDef)
}

// LockTableWithMode is similar to LockTable, but with specify
// LockTableWithMode is similar to lockTable, but with specify
// lock mode
func (arg *Argument) LockTableWithMode(
tableID uint64,
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/lockop/lock_op_no_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ var (
internalProcesses = map[string]*process.Process{}
)

// LockTableWithUniqueID is similar to LockTable, but used to lock a table directly based on
// LockTableWithUniqueID is similar to lockTable, but used to lock a table directly based on
// a unique identifier, without using an external transaction.
func LockTableWithUniqueID(
ctx context.Context,
Expand Down