Skip to content

Commit

Permalink
[perf] optimizing unconditional delete through truncate
Browse files Browse the repository at this point in the history
  • Loading branch information
noorall committed Nov 30, 2023
1 parent 3458fee commit 0ba1555
Show file tree
Hide file tree
Showing 5 changed files with 199 additions and 39 deletions.
8 changes: 5 additions & 3 deletions pkg/sql/colexec/deletion/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,11 @@ func (arg *Argument) AppendChild(child vm.Operator) {

type DeleteCtx struct {
CanTruncate bool
RowIdIdx int // The array index position of the rowid column
PartitionTableIDs []uint64 // Align array index with the partition number
PartitionTableNames []string // Align array index with the partition number
RowIdIdx int // The array index position of the rowid column
PartitionTableIDs []uint64 // Align array index with the partition number
PartitionTableNames []string // Align array index with the partition number
IndexTableNames []string
ForeignTbl []uint64
PartitionIndexInBatch int // The array index position of the partition expression column
PartitionSources []engine.Relation // Align array index with the partition number
Source engine.Relation
Expand Down
30 changes: 30 additions & 0 deletions pkg/sql/compile/compile.go
Original file line number Diff line number Diff line change
Expand Up @@ -1131,6 +1131,23 @@ func (c *Compile) compilePlanScope(ctx context.Context, step int32, curNodeIdx i
return c.compileSort(n, c.compileUnionAll(left, right)), nil
case plan.Node_DELETE:
c.appendMetaTables(n.DeleteCtx.Ref)
if n.DeleteCtx.CanTruncate {
arg, err := constructDeletion(n, c.e, c.proc)
if err != nil {
return nil, err
}
ss := []*Scope{{
Magic: Deletion,
Plan: c.pn,
Instructions: vm.Instructions{
{
Op: vm.Deletion,
Arg: arg,
},
},
}}
return ss, nil
}
curr := c.anal.curr
c.setAnalyzeCurrent(nil, int(n.Children[0]))

Expand Down Expand Up @@ -2794,6 +2811,19 @@ func (c *Compile) newDeleteMergeScope(arg *deletion.Argument, ss []*Scope) *Scop
return c.newMergeScope(rs)
}

func (c *Compile) newDeleteScope(arg *deletion.Argument, ss []*Scope) []*Scope {
deleteIns := &vm.Instruction{
Op: vm.Deletion,
Arg: arg,
}
for i := range ss {
ss[i].Instructions = append(
ss[i].Instructions,
dupInstruction(deleteIns, nil, 0))
}
return ss
}

func (c *Compile) newMergeScope(ss []*Scope) *Scope {
rs := &Scope{
PreScopes: ss,
Expand Down
144 changes: 131 additions & 13 deletions pkg/sql/compile/dml.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,45 +15,157 @@
package compile

import (
"fmt"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/defines"
"github.com/matrixorigin/matrixone/pkg/incrservice"
"github.com/matrixorigin/matrixone/pkg/pb/lock"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/deletion"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/insert"
"github.com/matrixorigin/matrixone/pkg/vm/engine"
)

func (s *Scope) Delete(c *Compile) (uint64, error) {
s.Magic = Merge
arg := s.Instructions[len(s.Instructions)-1].Arg.(*deletion.Argument)

if arg.DeleteCtx.CanTruncate {
var dbSource engine.Database
var rel engine.Relation
var err error
var affectRows int64
var isTemp bool
var newId uint64

delCtx := arg.DeleteCtx

err = delCtx.Source.UpdateObjectInfos(c.ctx)
dbName := delCtx.Ref.SchemaName
tblName := delCtx.Ref.ObjName
oldId := uint64(delCtx.Ref.Obj)
affectRows, err := delCtx.Source.Rows(c.ctx)

if err != nil {
return 0, err
}
affectRow, err := delCtx.Source.Rows(s.Proc.Ctx)

dbSource, err = c.e.Database(c.ctx, dbName, c.proc.TxnOperator)
if err != nil {
return 0, err
}
affectRows = affectRows + affectRow

dbName := delCtx.Ref.SchemaName
tblName := delCtx.Ref.ObjName
oldId := uint64(delCtx.Ref.Obj)
dbSource, err := c.e.Database(c.ctx, dbName, c.proc.TxnOperator)
if err != nil {
return 0, err
if rel, err = dbSource.Relation(c.ctx, tblName, nil); err != nil {
var e error // avoid contamination of error messages
dbSource, e = c.e.Database(c.ctx, defines.TEMPORARY_DBNAME, c.proc.TxnOperator)
if e != nil {
return 0, err
}
rel, e = dbSource.Relation(c.ctx, engine.GetTempTableName(dbName, tblName), nil)
if e != nil {
return 0, err
}
isTemp = true
}

if !isTemp && c.proc.TxnOperator.Txn().IsPessimistic() {
var err error
if e := lockMoTable(c, dbName, tblName, lock.LockMode_Shared); e != nil {
if !moerr.IsMoErrCode(e, moerr.ErrTxnNeedRetry) &&
!moerr.IsMoErrCode(err, moerr.ErrTxnNeedRetryWithDefChanged) {
return 0, e
}
err = e
}
// before dropping table, lock it.
if e := lockTable(c.ctx, c.e, c.proc, rel, dbName, delCtx.PartitionTableNames, false); e != nil {
if !moerr.IsMoErrCode(e, moerr.ErrTxnNeedRetry) &&
!moerr.IsMoErrCode(err, moerr.ErrTxnNeedRetryWithDefChanged) {
return 0, e
}
err = e
}
if err != nil {
return 0, err
}
}

if isTemp {
// memoryengine truncate always return 0, so for temporary table, just use origin tableId as newId
_, err = dbSource.Truncate(c.ctx, engine.GetTempTableName(dbName, tblName))
newId = rel.GetTableID(c.ctx)
} else {
newId, err = dbSource.Truncate(c.ctx, tblName)
}

// truncate origin table
newId, err := dbSource.Truncate(c.ctx, tblName)
if err != nil {
return 0, err
}

// keep old offset.
// Truncate Index Tables if needed
for _, name := range delCtx.IndexTableNames {
var err error
if isTemp {
_, err = dbSource.Truncate(c.ctx, engine.GetTempTableName(dbName, name))
} else {
_, err = dbSource.Truncate(c.ctx, name)
}
if err != nil {
return 0, err
}
}

//Truncate Partition subtable if needed
for _, name := range delCtx.PartitionTableNames {
var err error
if isTemp {
dbSource.Truncate(c.ctx, engine.GetTempTableName(dbName, name))
} else {
_, err = dbSource.Truncate(c.ctx, name)
}
if err != nil {
return 0, err
}
}

// update tableDef of foreign key's table with new table id
for _, fTblId := range delCtx.ForeignTbl {
_, _, fkRelation, err := c.e.GetRelationById(c.ctx, c.proc.TxnOperator, fTblId)
if err != nil {
return 0, err
}
fkTableDef, err := fkRelation.TableDefs(c.ctx)
if err != nil {
return 0, err
}
var oldCt *engine.ConstraintDef
for _, def := range fkTableDef {
if ct, ok := def.(*engine.ConstraintDef); ok {
oldCt = ct
break
}
}
for _, ct := range oldCt.Cts {
if def, ok := ct.(*engine.RefChildTableDef); ok {
for idx, refTable := range def.Tables {
if refTable == oldId {
def.Tables[idx] = newId
break
}
}
break
}
}
if err != nil {
return 0, err
}
err = fkRelation.UpdateConstraint(c.ctx, oldCt)
if err != nil {
return 0, err
}

}

if isTemp {
oldId = rel.GetTableID(c.ctx)
}
err = incrservice.GetAutoIncrementService(c.ctx).Reset(
c.ctx,
oldId,
Expand All @@ -64,6 +176,12 @@ func (s *Scope) Delete(c *Compile) (uint64, error) {
return 0, err
}

// update index information in mo_catalog.mo_indexes
updateSql := fmt.Sprintf(updateMoIndexesTruncateTableFormat, newId, oldId)
err = c.runSql(updateSql)
if err != nil {
return 0, err
}
return uint64(affectRows), nil
}

Expand Down
17 changes: 17 additions & 0 deletions pkg/sql/compile/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,23 @@ func constructDeletion(n *plan.Node, eg engine.Engine, proc *process.Process) (*
}
}

if delCtx.CanTruncate {
tableDef := delCtx.Source.GetTableDef(proc.Ctx)
delCtx.IndexTableNames = make([]string, 0)
if tableDef.Indexes != nil {
for _, indexDef := range tableDef.Indexes {
if indexDef.TableExist {
delCtx.IndexTableNames = append(delCtx.IndexTableNames, indexDef.IndexTableName)
}
}
}
if tableDef.Fkeys != nil {
for _, fk := range tableDef.Fkeys {
delCtx.ForeignTbl = append(delCtx.ForeignTbl, fk.ForeignTbl)
}
}
}

return &deletion.Argument{
DeleteCtx: delCtx,
}, nil
Expand Down
39 changes: 16 additions & 23 deletions pkg/sql/plan/build_dml_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,8 @@ func buildDeletePlans(ctx CompilerContext, builder *QueryBuilder, bindCtx *BindC
// both UK and SK. To handle SK case, we will have flags to indicate if it's UK or SK.
hasUniqueKey := haveUniqueKey(delCtx.tableDef)
hasSecondaryKey := haveSecondaryKey(delCtx.tableDef)
if hasUniqueKey || hasSecondaryKey {
if (hasUniqueKey || hasSecondaryKey) && !delCtx.isDeleteWithoutFilters {
uniqueDeleteIdx := len(delCtx.tableDef.Cols) + delCtx.updateColLength
typMap := make(map[string]*plan.Type)
posMap := make(map[string]int)
for idx, col := range delCtx.tableDef.Cols {
Expand Down Expand Up @@ -302,26 +303,17 @@ func buildDeletePlans(ctx CompilerContext, builder *QueryBuilder, bindCtx *BindC
if uniqueTableDef == nil {
return moerr.NewNoSuchTable(builder.GetContext(), delCtx.objRef.SchemaName, indexdef.IndexTableName)
}
var lastNodeId int32
var err error
var uniqueDeleteIdx int
var uniqueTblPkPos int
var uniqueTblPkTyp *Type

if delCtx.isDeleteWithoutFilters {
lastNodeId, err = appendDeleteUniqueTablePlanWithoutFilters(builder, bindCtx, uniqueObjRef, uniqueTableDef)
uniqueDeleteIdx = getRowIdPos(uniqueTableDef)
uniqueTblPkPos, uniqueTblPkTyp = getPkPos(uniqueTableDef, false)
} else {
lastNodeId = appendSinkScanNode(builder, bindCtx, delCtx.sourceStep)
lastNodeId, err = appendDeleteUniqueTablePlan(builder, bindCtx, uniqueObjRef, uniqueTableDef, indexdef, typMap, posMap, lastNodeId, isUk)
uniqueDeleteIdx = len(delCtx.tableDef.Cols) + delCtx.updateColLength
uniqueTblPkPos = uniqueDeleteIdx + 1
uniqueTblPkTyp = uniqueTableDef.Cols[0].Typ
}
lastNodeId := appendSinkScanNode(builder, bindCtx, delCtx.sourceStep)

lastNodeId, err := appendDeleteUniqueTablePlan(builder, bindCtx, uniqueObjRef, uniqueTableDef, indexdef, typMap, posMap, lastNodeId, isUk)

if err != nil {
return err
}

uniqueTblPkPos := uniqueDeleteIdx + 1
uniqueTblPkTyp := uniqueTableDef.Cols[0].Typ

if isUpdate {
// do it like simple update
lastNodeId = appendSinkNode(builder, bindCtx, lastNodeId)
Expand All @@ -331,7 +323,7 @@ func buildDeletePlans(ctx CompilerContext, builder *QueryBuilder, bindCtx *BindC
//sink_scan -> lock -> delete
lastNodeId = appendSinkScanNode(builder, bindCtx, newSourceStep)
delNodeInfo := makeDeleteNodeInfo(builder.compCtx, uniqueObjRef, uniqueTableDef, uniqueDeleteIdx, -1, false, uniqueTblPkPos, uniqueTblPkTyp, delCtx.lockTable, delCtx.partitionInfos)
lastNodeId, err = makeOneDeletePlan(builder, bindCtx, lastNodeId, delNodeInfo, isUk, isSK)
lastNodeId, err = makeOneDeletePlan(builder, bindCtx, lastNodeId, delNodeInfo, isUk, isSK, false)
putDeleteNodeInfo(delNodeInfo)
if err != nil {
return err
Expand Down Expand Up @@ -381,7 +373,7 @@ func buildDeletePlans(ctx CompilerContext, builder *QueryBuilder, bindCtx *BindC
} else {
// it's more simple for delete hidden unique table .so we append nodes after the plan. not recursive call buildDeletePlans
delNodeInfo := makeDeleteNodeInfo(builder.compCtx, uniqueObjRef, uniqueTableDef, uniqueDeleteIdx, -1, false, uniqueTblPkPos, uniqueTblPkTyp, delCtx.lockTable, delCtx.partitionInfos)
lastNodeId, err = makeOneDeletePlan(builder, bindCtx, lastNodeId, delNodeInfo, isUk, isSK)
lastNodeId, err = makeOneDeletePlan(builder, bindCtx, lastNodeId, delNodeInfo, isUk, isSK, delCtx.isDeleteWithoutFilters)
putDeleteNodeInfo(delNodeInfo)
if err != nil {
return err
Expand All @@ -401,7 +393,7 @@ func buildDeletePlans(ctx CompilerContext, builder *QueryBuilder, bindCtx *BindC
}
pkPos, pkTyp := getPkPos(delCtx.tableDef, false)
delNodeInfo := makeDeleteNodeInfo(ctx, delCtx.objRef, delCtx.tableDef, delCtx.rowIdPos, partExprIdx, true, pkPos, pkTyp, delCtx.lockTable, delCtx.partitionInfos)
lastNodeId, err := makeOneDeletePlan(builder, bindCtx, lastNodeId, delNodeInfo, false, false)
lastNodeId, err := makeOneDeletePlan(builder, bindCtx, lastNodeId, delNodeInfo, false, false, delCtx.isDeleteWithoutFilters)
putDeleteNodeInfo(delNodeInfo)
if err != nil {
return err
Expand Down Expand Up @@ -1488,8 +1480,9 @@ func makeOneDeletePlan(
bindCtx *BindContext,
lastNodeId int32,
delNodeInfo *deleteNodeInfo,
isUK bool, // is delete unique key hidden table
isUK bool,
isSK bool,
canTruncate bool,
) (int32, error) {
if isUK || isSK {
// append lock
Expand Down Expand Up @@ -1521,7 +1514,7 @@ func makeOneDeletePlan(
DeleteCtx: &plan.DeleteCtx{
RowIdIdx: int32(delNodeInfo.deleteIndex),
Ref: delNodeInfo.objRef,
CanTruncate: false,
CanTruncate: canTruncate,
AddAffectedRows: delNodeInfo.addAffectedRows,
IsClusterTable: delNodeInfo.IsClusterTable,
PartitionTableIds: delNodeInfo.partTableIDs,
Expand Down

0 comments on commit 0ba1555

Please sign in to comment.