Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
356 commits
Select commit Hold shift + click to select a range
dc4cbbd
merge fix
cpegeric Jun 11, 2025
3d7be05
better message
cpegeric Jun 11, 2025
ae92606
Merge branch 'main' into cdc_sqlexecutor_cleanup
cpegeric Jun 12, 2025
652783c
fix bvt -- drop pitr
cpegeric Jun 12, 2025
c0f162b
cleanup
cpegeric Jun 12, 2025
4ce4a5d
take timing
cpegeric Jun 13, 2025
ba61a0f
support values as input
cpegeric Jun 19, 2025
67feed9
Comment on composite primary key
cpegeric Jun 19, 2025
1b8735b
comments
cpegeric Jun 19, 2025
a12d40d
add sql writer
cpegeric Jun 19, 2025
acb4387
update
cpegeric Jun 20, 2025
edc33bb
update
cpegeric Jun 20, 2025
0a1f61c
update
cpegeric Jun 20, 2025
c393b7c
update
cpegeric Jun 20, 2025
dabe24b
update
cpegeric Jun 20, 2025
56bc1df
update ivfflat
cpegeric Jun 20, 2025
5e0a2bd
update ivfflat
cpegeric Jun 20, 2025
56b6a28
update
cpegeric Jun 20, 2025
04097a4
add db
cpegeric Jun 20, 2025
9210d80
reset
cpegeric Jun 20, 2025
5aa4712
empty
cpegeric Jun 20, 2025
09de3b1
hnsw sql writer
cpegeric Jun 20, 2025
5a664e6
update hnsw
cpegeric Jun 20, 2025
77037ae
template
cpegeric Jun 20, 2025
5200881
update
cpegeric Jun 20, 2025
fa50232
add test
cpegeric Jun 20, 2025
cfe6a0b
update test
cpegeric Jun 20, 2025
0b04c8f
ivf
cpegeric Jun 20, 2025
da1d2ec
update
cpegeric Jun 20, 2025
581a07d
version
cpegeric Jun 20, 2025
09dc52d
update
cpegeric Jun 20, 2025
e04b63c
update
cpegeric Jun 20, 2025
17057d7
bug fix
cpegeric Jun 20, 2025
d437339
index sinker with IndexSqlWriter
cpegeric Jun 23, 2025
5c648f3
remove comment
cpegeric Jun 23, 2025
53b6438
rename hsnw to index
cpegeric Jun 23, 2025
8069eb0
delete if vector is nil
cpegeric Jun 23, 2025
4afbe23
rename file
cpegeric Jun 23, 2025
4772c2f
support multi-indexes
cpegeric Jun 23, 2025
12429c2
cleanup
cpegeric Jun 23, 2025
b32d846
cleanup
cpegeric Jun 23, 2025
dd93d28
use constant
cpegeric Jun 23, 2025
173589a
rename file
cpegeric Jun 23, 2025
a44aacf
todo
cpegeric Jun 23, 2025
9aa078c
bvt test
cpegeric Jun 23, 2025
01fc88d
cleanup
cpegeric Jun 23, 2025
a39fa82
cleanup
cpegeric Jun 23, 2025
613544e
sca
cpegeric Jun 23, 2025
1a38d8b
add license
cpegeric Jun 23, 2025
be20232
bug fix
cpegeric Jun 23, 2025
622a888
more comments
cpegeric Jun 23, 2025
95a9a5c
delete sql
cpegeric Jun 23, 2025
4f06629
cleanup
cpegeric Jun 23, 2025
3c34919
cleanup
cpegeric Jun 23, 2025
d60145c
bug fix delete row only have 1 column pk
cpegeric Jun 24, 2025
84d945c
bug fix pre-defined column name
cpegeric Jun 24, 2025
789fa58
hardcode composite primary key column to varbinary
cpegeric Jun 24, 2025
504b616
bug fix
cpegeric Jun 24, 2025
e97612c
disable fulltext and ivfflat
cpegeric Jun 24, 2025
3e5ead4
bug fix
cpegeric Jun 24, 2025
f64f6de
Merge branch 'cdc_sqlexecutor_cleanup' into cdc_fulltext
cpegeric Jun 24, 2025
2806c24
only enable hnsw
cpegeric Jun 24, 2025
9ba390e
add async option
cpegeric Jun 25, 2025
ef894ea
skip async with DML
cpegeric Jun 25, 2025
1de75a6
catalog.IsIndexAsync
cpegeric Jun 25, 2025
9f83f62
async
cpegeric Jun 25, 2025
4d9cca5
Merge branch 'cdc_fulltext' into cdc_sqlexecutor_cleanup
cpegeric Jun 25, 2025
4e527c4
update
cpegeric Jun 25, 2025
99ce5a6
fix sca
cpegeric Jun 25, 2025
0f89628
fix merge
cpegeric Jun 25, 2025
14b0e7f
Merge branch 'main' into cdc_fulltext
cpegeric Jun 26, 2025
5143b81
Merge branch 'main' into cdc_fulltext
cpegeric Jun 27, 2025
72231a7
add cdc util
cpegeric Jun 27, 2025
7f7e096
create/delete cdc task
cpegeric Jun 27, 2025
884102e
update
cpegeric Jun 27, 2025
82b76f8
update
cpegeric Jun 27, 2025
959b527
update
cpegeric Jun 27, 2025
662fd57
Merge branch 'cdc_fulltext' into cdc_sqlexecutor_cleanup
cpegeric Jun 27, 2025
c16cf90
truncate table
cpegeric Jun 27, 2025
b423bcc
truncate table
cpegeric Jun 27, 2025
20fb6e5
update
cpegeric Jun 27, 2025
5caab53
cleanup
cpegeric Jun 27, 2025
43e9116
update
cpegeric Jun 27, 2025
a4a753d
update
cpegeric Jun 27, 2025
42a9adc
hnsw disable alter reindex
cpegeric Jun 27, 2025
2ed6051
alter reindex
cpegeric Jun 27, 2025
164693e
sca
cpegeric Jun 27, 2025
7757192
bug fix
cpegeric Jun 27, 2025
38775df
bug fix
cpegeric Jun 27, 2025
852b8c2
update
cpegeric Jun 30, 2025
d7241ae
use pitr_name
cpegeric Jun 30, 2025
b6639d2
add check pitr before create
cpegeric Jun 30, 2025
49a4cef
update
cpegeric Jun 30, 2025
f09eb13
update
cpegeric Jun 30, 2025
22e91b3
update
cpegeric Jun 30, 2025
04393a1
consumer
cpegeric Jun 30, 2025
3400935
license
cpegeric Jun 30, 2025
11c74ff
update
cpegeric Jun 30, 2025
7d106fb
update
cpegeric Jun 30, 2025
d4e19e6
use transaction from DataRetriever
cpegeric Jul 1, 2025
1704875
update watermark
cpegeric Jul 1, 2025
4c4c3a8
update
cpegeric Jul 1, 2025
7b02d81
update
cpegeric Jul 1, 2025
16600d3
statement option
cpegeric Jul 1, 2025
ab06863
statement option
cpegeric Jul 1, 2025
e83058f
snapshot
cpegeric Jul 1, 2025
952b57a
run
cpegeric Jul 1, 2025
06af88a
update
cpegeric Jul 1, 2025
c72197d
move to idxcdc
cpegeric Jul 1, 2025
1d2b203
update
cpegeric Jul 1, 2025
ffa5da7
update
cpegeric Jul 1, 2025
fc4b7d2
update idxcdc
cpegeric Jul 1, 2025
3b0ca17
update
cpegeric Jul 1, 2025
fab81f1
tail use insert, snapshot use upsert
cpegeric Jul 1, 2025
268ab6e
update
cpegeric Jul 1, 2025
d06cb28
update
cpegeric Jul 1, 2025
76b2c62
mock retriever
cpegeric Jul 1, 2025
98439c8
flush at the end
cpegeric Jul 1, 2025
e0aca74
update test
cpegeric Jul 2, 2025
352655d
update
cpegeric Jul 2, 2025
2567793
add test
cpegeric Jul 2, 2025
ed0c94b
merge fix watermarkUpdater
cpegeric Jul 2, 2025
ccf384c
update
cpegeric Jul 2, 2025
9afc4df
Merge branch 'main' into cdc_fulltext
cpegeric Jul 3, 2025
0b6cfd9
add cnUUID
cpegeric Jul 3, 2025
97113c4
remove unneccessary code
cpegeric Jul 3, 2025
7259ddc
update
cpegeric Jul 3, 2025
5aa0926
api
cpegeric Jul 3, 2025
55258af
merge fix
cpegeric Jul 8, 2025
7a6ba7e
bug fix cdc
cpegeric Jul 8, 2025
a5ed284
bvt test
cpegeric Jul 8, 2025
839a494
fix drop index
cpegeric Jul 8, 2025
6ed579a
fix sca
cpegeric Jul 8, 2025
381665e
Merge branch 'main' into cdc_fulltext
cpegeric Jul 9, 2025
346f32c
Merge branch 'main' into cdc_sqlexecutor_cleanup
cpegeric Jul 9, 2025
9edbd11
bug fix thread id
cpegeric Jul 9, 2025
e744672
Merge branch 'cdc_sqlexecutor_cleanup' into cdc_fulltext
cpegeric Jul 9, 2025
f1b1962
rename idxcdc to iscp
cpegeric Jul 22, 2025
51a0044
merge fix function id
cpegeric Jul 22, 2025
44c63ff
fix sca
cpegeric Jul 22, 2025
254f20e
fix sca
cpegeric Jul 23, 2025
cef2cac
merge fix
cpegeric Aug 18, 2025
8db5fe7
merge fix
cpegeric Aug 19, 2025
cb1f323
new index consumer
cpegeric Aug 19, 2025
16c0cd3
uncomment used code
cpegeric Aug 20, 2025
3f9d25e
Merge branch 'main' into cdc_fulltext_merge
cpegeric Aug 20, 2025
718ca4b
merge fix
cpegeric Aug 22, 2025
6e02233
update
cpegeric Aug 22, 2025
19a52a3
rename
cpegeric Aug 22, 2025
5819c2d
update async bvt
cpegeric Aug 22, 2025
f1ee83b
add index consumer
cpegeric Aug 22, 2025
9eb0c4e
remove unused files
cpegeric Aug 22, 2025
bc3ed22
Merge branch 'main' into cdc_fulltext_merge
cpegeric Aug 26, 2025
4b2cc9c
fix account id
cpegeric Aug 26, 2025
82fbd5c
fix data retriever mock
cpegeric Aug 26, 2025
40ccd63
merge fix
cpegeric Aug 26, 2025
9efa664
fix sca
cpegeric Aug 26, 2025
2e815bb
bug fix default with sys account and change to particular id by optio…
cpegeric Aug 27, 2025
55f769d
revert
cpegeric Aug 27, 2025
640008e
system account
cpegeric Aug 27, 2025
dfa6166
fix ivf index name
cpegeric Aug 27, 2025
3fd0782
support secondary index table
cpegeric Aug 28, 2025
4c6d1fb
support secondary index table
cpegeric Aug 28, 2025
8986c1f
Revert "support secondary index table"
cpegeric Aug 28, 2025
b34aa55
fix cast null value with type
cpegeric Aug 28, 2025
266d9bd
Merge branch 'main' into cdc_fulltext_merge
cpegeric Aug 28, 2025
1c08614
Merge branch 'main' into cdc_fulltext_merge
cpegeric Aug 29, 2025
45851c7
better checking with valid cdc async index
cpegeric Aug 29, 2025
bc1218f
bug fix alter table drop index
cpegeric Aug 29, 2025
6262a1e
update
cpegeric Aug 29, 2025
5e55a2d
Merge branch 'main' into cdc_fulltext_merge
cpegeric Aug 29, 2025
7c77985
Merge branch 'main' into cdc_fulltext_merge
cpegeric Sep 2, 2025
89baab0
drop database also remove iscp jobs
cpegeric Sep 2, 2025
bb06ed8
bug fix check tableIDs is empty
cpegeric Sep 2, 2025
2bd1f1b
check data is nil
cpegeric Sep 3, 2025
3ba370c
bug fix hnsw load index not free when error
cpegeric Sep 4, 2025
da6d569
limit max 2k values per insert into to avoid slow gc and OOM
cpegeric Sep 4, 2025
f23c60c
Merge branch 'main' into cdc_fulltext_merge
cpegeric Sep 4, 2025
ed631c7
add bvt tests
cpegeric Sep 4, 2025
48acc18
fix alter table add column and fulltext
cpegeric Sep 5, 2025
1f58a3d
bug fix check dropped
cpegeric Sep 5, 2025
1121aae
merge fix
cpegeric Sep 5, 2025
d407424
Merge branch 'main' into iscp_merge
LeftHandCold Sep 8, 2025
f089a44
Merge branch 'main' into iscp_merge
cpegeric Sep 8, 2025
74aea64
Merge branch 'main' into iscp_merge
cpegeric Sep 8, 2025
dd731b2
merge fix
cpegeric Sep 12, 2025
3d6874b
bug fix async option
cpegeric Sep 12, 2025
1bed417
check error
cpegeric Sep 12, 2025
e8ad5e1
add type to error message
cpegeric Sep 12, 2025
946c284
print value
cpegeric Sep 12, 2025
1a40787
Merge branch 'main' into iscp_merge
cpegeric Sep 15, 2025
adae0b2
add ut tests
cpegeric Sep 15, 2025
04d1b11
license
cpegeric Sep 15, 2025
e53d604
add more ut
cpegeric Sep 15, 2025
73a2ec0
cleanup
cpegeric Sep 15, 2025
104d882
fix sca
cpegeric Sep 15, 2025
0adf3a4
fix sca
cpegeric Sep 15, 2025
af00c96
update tests
cpegeric Sep 15, 2025
3b49c87
add vector type as input argument to hnsw_cdc_sync function
cpegeric Sep 16, 2025
83195d0
hnsw with generic types
cpegeric Sep 16, 2025
52589af
Merge branch 'main' into iscp_hnsw
mergify[bot] Sep 16, 2025
1463552
update usearch
cpegeric Sep 17, 2025
4faa38c
support f64
cpegeric Sep 17, 2025
d33d882
merge fix
cpegeric Sep 17, 2025
1e89800
Merge branch 'iscp_merge' into iscp_hnsw_merge
cpegeric Sep 17, 2025
39093b6
remove invalid comment
cpegeric Sep 17, 2025
401e636
bug fix license and update tests
cpegeric Sep 17, 2025
38c042d
fix sca
cpegeric Sep 17, 2025
86e5eca
hnsw support fp64 sync
cpegeric Sep 17, 2025
5914f6b
add bvt tests
cpegeric Sep 17, 2025
031988b
ut test
cpegeric Sep 17, 2025
2381cef
fix hnsw function test
cpegeric Sep 17, 2025
d7dd439
more ut tests
cpegeric Sep 17, 2025
600d305
rename table
cpegeric Sep 18, 2025
d1d2762
load from memory
cpegeric Sep 18, 2025
afb7755
keepalive
cpegeric Sep 18, 2025
4e8778e
Merge branch 'main' into iscp_hnsw
cpegeric Sep 19, 2025
7f50dd8
new iscp api and remove pitr
cpegeric Sep 19, 2025
5e37460
remove pitr
cpegeric Sep 19, 2025
df49221
add ut tests
cpegeric Sep 19, 2025
629a93e
more tests
cpegeric Sep 19, 2025
39fbbe6
make sure error send
cpegeric Sep 19, 2025
4b7f3d5
rename table
cpegeric Sep 19, 2025
82f24c4
support l2sq
cpegeric Sep 19, 2025
6dbb6ac
merge fix
cpegeric Sep 22, 2025
00ac2c9
add ut test
cpegeric Sep 22, 2025
f70d68f
add ut test
cpegeric Sep 22, 2025
05fcf76
license
cpegeric Sep 22, 2025
8efb51a
more ut test and comment unused code
cpegeric Sep 22, 2025
4b7a537
more ut test and comment unused code
cpegeric Sep 22, 2025
3dd22be
sync ut test
cpegeric Sep 22, 2025
36d5876
update test
cpegeric Sep 22, 2025
5f06b17
update
cpegeric Sep 22, 2025
52e45a6
add ut tests
cpegeric Sep 23, 2025
44c2e31
Merge branch 'main' into iscp_hnsw
cpegeric Sep 23, 2025
560e281
always use REPLACE and hnsw always check key exists in models
cpegeric Sep 24, 2025
0ae25bb
merge fix
cpegeric Sep 24, 2025
38a40ea
Merge branch 'main' into iscp_hnsw
cpegeric Sep 25, 2025
b33f22b
merge fix
cpegeric Sep 29, 2025
674391a
update ut
cpegeric Sep 29, 2025
023573f
add ut test
cpegeric Sep 29, 2025
e6e094e
license
cpegeric Sep 29, 2025
d5896cb
comment unused code
cpegeric Sep 29, 2025
4ca3541
fix sca
cpegeric Sep 29, 2025
5c9c0c6
remove stderr
cpegeric Sep 29, 2025
ef9e30e
update
cpegeric Sep 29, 2025
0ab3101
comment
cpegeric Sep 29, 2025
172552a
rename file
cpegeric Sep 29, 2025
b95eaff
register job at the end
cpegeric Sep 29, 2025
966d692
set size when init VectorIndexCdc
cpegeric Sep 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/iscp/index_sqlwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,8 +339,9 @@ func NewGenericHnswSqlWriter[T types.RealNumbers](algo string, jobID JobID, info

// get the first indexdef as they are the same
idxdef := indexdef[0]
writer_capacity := 8192

w := &HnswSqlWriter[T]{tabledef: tabledef, indexdef: indexdef, jobID: jobID, info: info, cdc: vectorindex.NewVectorIndexCdc[T]()}
w := &HnswSqlWriter[T]{tabledef: tabledef, indexdef: indexdef, jobID: jobID, info: info, cdc: vectorindex.NewVectorIndexCdc[T](writer_capacity)}

paramstr := idxdef.IndexAlgoParams
var meta, storage string
Expand Down
59 changes: 28 additions & 31 deletions pkg/sql/compile/alter.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func convertDBEOB(ctx context.Context, e error, name string) error {
func (s *Scope) AlterTableCopy(c *Compile) error {
qry := s.Plan.GetDdl().GetAlterTable()
dbName := qry.Database

if dbName == "" {
dbName = c.db
}
Expand Down Expand Up @@ -149,25 +150,34 @@ func (s *Scope) AlterTableCopy(c *Compile) error {
if qry.Options.SkipPkDedup || len(qry.Options.SkipUniqueIdxDedup) > 0 {
opt = opt.WithAlterCopyOpt(qry.Options)
}
// 4. copy the original table data to the temporary replica table
err = c.runSqlWithOptions(qry.InsertTmpDataSql, opt)

//4. obtain relation for new tables
newRel, err := dbSource.Relation(c.proc.Ctx, qry.CopyTableDef.Name, nil)
if err != nil {
c.proc.Error(c.proc.Ctx, "insert data to copy table for alter table",
c.proc.Error(c.proc.Ctx, "obtain new relation for copy table for alter table",
zap.String("databaseName", dbName),
zap.String("origin tableName", qry.GetTableDef().Name),
zap.String("copy tableName", qry.CopyTableDef.Name),
zap.String("InsertTmpDataSql", qry.InsertTmpDataSql),
zap.String("copy table name", qry.CopyTableDef.Name),
zap.Error(err))
return err
}

//5. obtain relation for new tables
newRel, err := dbSource.Relation(c.proc.Ctx, qry.CopyTableDef.Name, nil)
//5. ISCP: temp table already created pitr and iscp job with temp table name
// and we don't want iscp to run with temp table so drop pitr and iscp job with the temp table here
newTmpTableDef := newRel.CopyTableDef(c.proc.Ctx)
err = DropAllIndexCdcTasks(c, newTmpTableDef, dbName, qry.CopyTableDef.Name)
if err != nil {
c.proc.Error(c.proc.Ctx, "obtain new relation for copy table for alter table",
return err
}

// 6. copy the original table data to the temporary replica table
err = c.runSqlWithOptions(qry.InsertTmpDataSql, opt)
if err != nil {
c.proc.Error(c.proc.Ctx, "insert data to copy table for alter table",
zap.String("databaseName", dbName),
zap.String("origin tableName", qry.GetTableDef().Name),
zap.String("copy table name", qry.CopyTableDef.Name),
zap.String("copy tableName", qry.CopyTableDef.Name),
zap.String("InsertTmpDataSql", qry.InsertTmpDataSql),
zap.Error(err))
return err
}
Expand All @@ -179,7 +189,8 @@ func (s *Scope) AlterTableCopy(c *Compile) error {
return err
}

// 7. drop original table
// 7. drop original table.
// ISCP: That will also drop ISCP related jobs and pitr of the original table.
dropSql := fmt.Sprintf("drop table `%s`.`%s`", dbName, tblName)
if err := c.runSqlWithOptions(
dropSql,
Expand All @@ -193,26 +204,6 @@ func (s *Scope) AlterTableCopy(c *Compile) error {
return err
}

// 7.1 delete all index objects of the table in mo_catalog.mo_indexes
if qry.Database != catalog.MO_CATALOG && qry.TableDef.Name != catalog.MO_INDEXES {
if qry.GetTableDef().Pkey != nil || len(qry.GetTableDef().Indexes) > 0 {
deleteSql := fmt.Sprintf(
deleteMoIndexesWithTableIdFormat,
qry.GetTableDef().TblId,
)
err = c.runSql(deleteSql)
if err != nil {
c.proc.Error(c.proc.Ctx, "delete all index meta data of origin table in `mo_indexes` for alter table",
zap.String("databaseName", dbName),
zap.String("origin tableName", qry.GetTableDef().Name),
zap.String("delete all index sql", deleteSql),
zap.Error(err))

return err
}
}
}

newId := newRel.GetTableID(c.proc.Ctx)
//-------------------------------------------------------------------------
// 8. rename temporary replica table into the original table(Table Id remains unchanged)
Expand All @@ -237,11 +228,11 @@ func (s *Scope) AlterTableCopy(c *Compile) error {
return err
}

newTableDef := newRel.CopyTableDef(c.proc.Ctx)
//--------------------------------------------------------------------------------------------------------------
{
// 9. invoke reindex for the new table, if it contains ivf index.
multiTableIndexes := make(map[string]*MultiTableIndex)
newTableDef := newRel.CopyTableDef(c.proc.Ctx)
extra := newRel.GetExtraInfo()
id := newRel.GetTableID(c.proc.Ctx)

Expand Down Expand Up @@ -311,6 +302,12 @@ func (s *Scope) AlterTableCopy(c *Compile) error {
}
}

// 10. register ISCP job again
err = CreateAllIndexCdcTasks(c, newTableDef.Indexes, dbName, tblName)
if err != nil {
return err
}

// get and update the change mapping information of table colIds
if err = updateNewTableColId(c, newRel, qry.ChangeTblColIdMap); err != nil {
c.proc.Error(c.proc.Ctx, "get and update the change mapping information of table colIds for alter table",
Expand Down
84 changes: 81 additions & 3 deletions pkg/sql/compile/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

moruntime "github.com/matrixorigin/matrixone/pkg/common/runtime"
"github.com/matrixorigin/matrixone/pkg/config"
"github.com/matrixorigin/matrixone/pkg/iscp"
"github.com/matrixorigin/matrixone/pkg/pb/task"

"github.com/matrixorigin/matrixone/pkg/cdc"
Expand Down Expand Up @@ -239,6 +240,13 @@ func (s *Scope) DropDatabase(c *Compile) error {
return err
}
}

// 5.unregister iscp jobs
err = iscp.UnregisterJobsByDBName(c.proc.Ctx, c.proc.GetService(), c.proc.GetTxnOperator(), dbName)
if err != nil {
return err
}

return err
}

Expand Down Expand Up @@ -589,6 +597,13 @@ func (s *Scope) AlterTableInplace(c *Compile) error {
newIndexes = append(newIndexes, extra.IndexTables[idx])
}
}

// drop index cdc task
err = DropIndexCdcTask(c, oTableDef, dbName, tblName, constraintName)
if err != nil {
return err
}

// Avoid modifying slice directly during iteration
oTableDef.Indexes = notDroppedIndex
extra.IndexTables = newIndexes
Expand Down Expand Up @@ -746,9 +761,8 @@ func (s *Scope) AlterTableInplace(c *Compile) error {
if err != nil {
return err
}

case catalog.MoIndexHnswAlgo.ToString():
// PASS: keep option unchange for incremental update
// PASS
default:
return moerr.NewInternalError(c.proc.Ctx, "invalid index algo type for alter reindex")
}
Expand Down Expand Up @@ -890,6 +904,25 @@ func (s *Scope) AlterTableInplace(c *Compile) error {
return err
}

// post alter table rename -- AlterKind_RenameTable to update iscp job
for _, req := range reqs {
if req.Kind == api.AlterKind_RenameTable {
op, ok := req.Operation.(*api.AlterTableReq_RenameTable)
if ok {
err = iscp.RenameSrcTable(c.proc.Ctx,
c.proc.GetService(),
c.proc.GetTxnOperator(),
req.DbId,
req.TableId,
op.RenameTable.OldName,
op.RenameTable.NewName)
if err != nil {
return err
}
}
}
}

// remove refChildTbls for drop foreign key clause
//remove the child table id -- tblId from the parent table -- fkTblId
for _, fkTblId := range removeRefChildTbls {
Expand Down Expand Up @@ -1468,6 +1501,21 @@ func (s *Scope) CreateTable(c *Compile) error {
)
return err
}

// create iscp jobs for index async update
ct, err := GetConstraintDef(c.proc.Ctx, newRelation)
if err != nil {
return err
}
for _, constraint := range ct.Cts {
if idxdef, ok := constraint.(*engine.IndexDef); ok && len(idxdef.Indexes) > 0 {
err = CreateAllIndexCdcTasks(c, idxdef.Indexes, dbName, tblName)
if err != nil {
return err
}
}
}

}

if c.keepAutoIncrement == 0 {
Expand Down Expand Up @@ -2078,6 +2126,21 @@ func (s *Scope) handleVectorIvfFlatIndex(
return err
}

async, err := catalog.IsIndexAsync(indexDefs[catalog.SystemSI_IVFFLAT_TblType_Metadata].IndexAlgoParams)
if err != nil {
return err
}

// create ISCP job when Async is true
if async {
logutil.Infof("Ivfflat index Async is true")
sinker_type := getSinkerTypeFromAlgo(catalog.MoIndexIvfFlatAlgo.ToString())
err = CreateIndexCdcTask(c, qryDatabase, originalTableDef.Name,
indexDefs[catalog.SystemSI_IVFFLAT_TblType_Metadata].IndexName, sinker_type)
if err != nil {
return err
}
}
return nil

}
Expand All @@ -2102,6 +2165,9 @@ func (s *Scope) DropIndex(c *Compile) error {
return err
}

// old tabledef
oldTableDef := r.GetTableDef(c.proc.Ctx)

//1. build and update constraint def
oldCt, err := GetConstraintDef(c.proc.Ctx, r)
if err != nil {
Expand Down Expand Up @@ -2130,14 +2196,20 @@ func (s *Scope) DropIndex(c *Compile) error {
return err
}

//3. delete iscp job for vector, fulltext index
err = DropIndexCdcTask(c, oldTableDef, qry.Database, qry.Table, qry.IndexName)
if err != nil {
return err
}
}

//3. delete index object from mo_catalog.mo_indexes
//4. delete index object from mo_catalog.mo_indexes
deleteSql := fmt.Sprintf(deleteMoIndexesWithTableIdAndIndexNameFormat, r.GetTableID(c.proc.Ctx), qry.IndexName)
err = c.runSql(deleteSql)
if err != nil {
return err
}

return nil
}

Expand Down Expand Up @@ -2651,6 +2723,12 @@ func (s *Scope) DropTable(c *Compile) error {
}
}

// delete cdc task of the vector and fulltext index here
err = DropAllIndexCdcTasks(c, rel.GetTableDef(c.proc.Ctx), qry.Database, qry.Table)
if err != nil {
return err
}

// delete all index objects record of the table in mo_catalog.mo_indexes
if !qry.IsView && qry.Database != catalog.MO_CATALOG && qry.Table != catalog.MO_INDEXES {
if qry.GetTableDef().Pkey != nil || len(qry.GetTableDef().Indexes) > 0 {
Expand Down
31 changes: 30 additions & 1 deletion pkg/sql/compile/ddl_index_algo.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/pb/api"
"github.com/matrixorigin/matrixone/pkg/pb/plan"
"github.com/matrixorigin/matrixone/pkg/util/executor"
Expand Down Expand Up @@ -140,6 +141,7 @@ func (s *Scope) handleFullTextIndexTable(
return moerr.NewInternalErrorNoCtx("FullText index is not enabled")
}

// create hidden tables
if indexInfo != nil {
if len(indexInfo.GetIndexTables()) != 1 {
return moerr.NewInternalErrorNoCtx("index table count not equal to 1")
Expand All @@ -152,13 +154,33 @@ func (s *Scope) handleFullTextIndexTable(
}
}

insertSQLs := genInsertIndexTableSqlForFullTextIndex(originalTableDef, indexDef, qryDatabase)
insertSQLs, err := genInsertIndexTableSqlForFullTextIndex(originalTableDef, indexDef, qryDatabase)
if err != nil {
return err
}

for _, insertSQL := range insertSQLs {
err = c.runSql(insertSQL)
if err != nil {
return err
}
}

async, err := catalog.IsIndexAsync(indexDef.IndexAlgoParams)
if err != nil {
return err
}
// create ISCP job for Async fulltext index
if async {
logutil.Infof("fulltext index Async is true")
sinker_type := getSinkerTypeFromAlgo(catalog.MOIndexFullTextAlgo.ToString())
err = CreateIndexCdcTask(c, qryDatabase, originalTableDef.Name,
indexDef.IndexName, sinker_type)
if err != nil {
return err
}
}

return nil
}

Expand Down Expand Up @@ -580,5 +602,12 @@ func (s *Scope) handleVectorHnswIndex(
}
}

// 4. register ISCP job for async update
sinker_type := getSinkerTypeFromAlgo(catalog.MoIndexHnswAlgo.ToString())
err = CreateIndexCdcTask(c, qryDatabase, originalTableDef.Name, indexDefs[catalog.Hnsw_TblType_Metadata].IndexName, sinker_type)
if err != nil {
return err
}

return nil
}
Loading
Loading