Skip to content

Commit

Permalink
[refactor] truncate table operation
Browse files Browse the repository at this point in the history
  • Loading branch information
noorall committed Nov 30, 2023
1 parent 682d31f commit 3855d65
Showing 1 changed file with 46 additions and 30 deletions.
76 changes: 46 additions & 30 deletions pkg/sql/compile/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1238,31 +1238,33 @@ func (s *Scope) removeRefChildTbl(c *Compile, fkTblId uint64, tblId uint64) erro
return fkRelation.UpdateConstraint(c.ctx, oldCt)
}

// Truncation operations cannot be performed if the session holds an active table lock.
func (s *Scope) TruncateTable(c *Compile) error {
func doTruncateTable(
c *Compile,
dbName string,
tableName string,
tableId uint64,
partitionTableNames []string,
indexTableNames []string,
foreignTbl []uint64,
keepAutoIncrement bool,
) error {
var dbSource engine.Database
var rel engine.Relation
var err error
var isTemp bool
var newId uint64

tqry := s.Plan.GetDdl().GetTruncateTable()
dbName := tqry.GetDatabase()
tblName := tqry.GetTable()
oldId := tqry.GetTableId()

var newTblId uint64
dbSource, err = c.e.Database(c.ctx, dbName, c.proc.TxnOperator)
if err != nil {
return err
}

if rel, err = dbSource.Relation(c.ctx, tblName, nil); err != nil {
if rel, err = dbSource.Relation(c.ctx, tableName, nil); err != nil {
var e error // avoid contamination of error messages
dbSource, e = c.e.Database(c.ctx, defines.TEMPORARY_DBNAME, c.proc.TxnOperator)
if e != nil {
return err
}
rel, e = dbSource.Relation(c.ctx, engine.GetTempTableName(dbName, tblName), nil)
rel, e = dbSource.Relation(c.ctx, engine.GetTempTableName(dbName, tableName), nil)
if e != nil {
return err
}
Expand All @@ -1271,15 +1273,15 @@ func (s *Scope) TruncateTable(c *Compile) error {

if !isTemp && c.proc.TxnOperator.Txn().IsPessimistic() {
var err error
if e := lockMoTable(c, dbName, tblName, lock.LockMode_Shared); e != nil {
if e := lockMoTable(c, dbName, tableName, lock.LockMode_Shared); e != nil {
if !moerr.IsMoErrCode(e, moerr.ErrTxnNeedRetry) &&
!moerr.IsMoErrCode(err, moerr.ErrTxnNeedRetryWithDefChanged) {
return e
}
err = e
}
// before dropping table, lock it.
if e := lockTable(c.ctx, c.e, c.proc, rel, dbName, tqry.PartitionTableNames, false); e != nil {
if e := lockTable(c.ctx, c.e, c.proc, rel, dbName, partitionTableNames, false); e != nil {
if !moerr.IsMoErrCode(e, moerr.ErrTxnNeedRetry) &&
!moerr.IsMoErrCode(err, moerr.ErrTxnNeedRetryWithDefChanged) {
return e
Expand All @@ -1292,19 +1294,19 @@ func (s *Scope) TruncateTable(c *Compile) error {
}

if isTemp {
// memoryengine truncate always return 0, so for temporary table, just use origin tableId as newId
_, err = dbSource.Truncate(c.ctx, engine.GetTempTableName(dbName, tblName))
newId = rel.GetTableID(c.ctx)
// memory engine truncate always return 0, so for temporary table, just use origin tableId as newTblId
_, err = dbSource.Truncate(c.ctx, engine.GetTempTableName(dbName, tableName))
newTblId = rel.GetTableID(c.ctx)
} else {
newId, err = dbSource.Truncate(c.ctx, tblName)
newTblId, err = dbSource.Truncate(c.ctx, tableName)
}

if err != nil {
return err
}

// Truncate Index Tables if needed
for _, name := range tqry.IndexTableNames {
for _, name := range indexTableNames {
var err error
if isTemp {
_, err = dbSource.Truncate(c.ctx, engine.GetTempTableName(dbName, name))
Expand All @@ -1316,11 +1318,11 @@ func (s *Scope) TruncateTable(c *Compile) error {
}
}

//Truncate Partition subtable if needed
for _, name := range tqry.PartitionTableNames {
//Truncate Partition subTable if needed
for _, name := range partitionTableNames {
var err error
if isTemp {
dbSource.Truncate(c.ctx, engine.GetTempTableName(dbName, name))
_, err = dbSource.Truncate(c.ctx, engine.GetTempTableName(dbName, name))
} else {
_, err = dbSource.Truncate(c.ctx, name)
}
Expand All @@ -1330,8 +1332,8 @@ func (s *Scope) TruncateTable(c *Compile) error {
}

// update tableDef of foreign key's table with new table id
for _, ftblId := range tqry.ForeignTbl {
_, _, fkRelation, err := c.e.GetRelationById(c.ctx, c.proc.TxnOperator, ftblId)
for _, fTblId := range foreignTbl {
_, _, fkRelation, err := c.e.GetRelationById(c.ctx, c.proc.TxnOperator, fTblId)
if err != nil {
return err
}
Expand All @@ -1349,8 +1351,8 @@ func (s *Scope) TruncateTable(c *Compile) error {
for _, ct := range oldCt.Cts {
if def, ok := ct.(*engine.RefChildTableDef); ok {
for idx, refTable := range def.Tables {
if refTable == oldId {
def.Tables[idx] = newId
if refTable == tableId {
def.Tables[idx] = newTblId
break
}
}
Expand All @@ -1368,27 +1370,41 @@ func (s *Scope) TruncateTable(c *Compile) error {
}

if isTemp {
oldId = rel.GetTableID(c.ctx)
tableId = rel.GetTableID(c.ctx)
}
err = incrservice.GetAutoIncrementService(c.ctx).Reset(
c.ctx,
oldId,
newId,
false,
tableId,
newTblId,
keepAutoIncrement,
c.proc.TxnOperator)
if err != nil {
return err
}

// update index information in mo_catalog.mo_indexes
updateSql := fmt.Sprintf(updateMoIndexesTruncateTableFormat, newId, oldId)
updateSql := fmt.Sprintf(updateMoIndexesTruncateTableFormat, newTblId, tableId)
err = c.runSql(updateSql)
if err != nil {
return err
}
return nil
}

// TruncateTable Truncation operations cannot be performed if the session holds an active table lock.
func (s *Scope) TruncateTable(c *Compile) error {
truncateTbl := s.Plan.GetDdl().GetTruncateTable()
return doTruncateTable(
c,
truncateTbl.GetDatabase(),
truncateTbl.GetTable(),
truncateTbl.GetTableId(),
truncateTbl.PartitionTableNames,
truncateTbl.IndexTableNames,
truncateTbl.ForeignTbl,
false)
}

func (s *Scope) DropSequence(c *Compile) error {
qry := s.Plan.GetDdl().GetDropSequence()
dbName := qry.GetDatabase()
Expand Down

0 comments on commit 3855d65

Please sign in to comment.