diff --git a/ace.yaml b/ace.yaml index 04dd9ac..f059b05 100644 --- a/ace.yaml +++ b/ace.yaml @@ -27,6 +27,7 @@ mtree: cdc: slot_name: "ace_mtree_slot" publication_name: "ace_mtree_pub" + schema: "spock" diff: min_block_size: 1000 diff --git a/db/queries/queries.go b/db/queries/queries.go index a4d8486..eb04afa 100644 --- a/db/queries/queries.go +++ b/db/queries/queries.go @@ -2715,3 +2715,20 @@ func UpdateMtreeCounters(ctx context.Context, db DBQuerier, mtreeTable string, i _, err = db.Exec(ctx, sql, args) return err } + +func CreateSchema(ctx context.Context, db DBQuerier, schemaName string) error { + data := map[string]interface{}{ + "SchemaName": schemaName, + } + sql, err := RenderSQL(SQLTemplates.CreateSchema, data) + if err != nil { + return fmt.Errorf("failed to render CreateSchema SQL: %w", err) + } + + _, err = db.Exec(ctx, sql) + if err != nil { + return fmt.Errorf("query to create schema failed: %w", err) + } + + return nil +} diff --git a/db/queries/templates.go b/db/queries/templates.go index 74d7526..a420b31 100644 --- a/db/queries/templates.go +++ b/db/queries/templates.go @@ -110,12 +110,13 @@ type Templates struct { GetReplicationSlotPID *template.Template TerminateBackend *template.Template CheckPIDExists *template.Template + CreateSchema *template.Template } var SQLTemplates = Templates{ // A template isn't needed for this query; just keeping the struct uniform CreateMetadataTable: template.Must(template.New("createMetadataTable").Parse(` - CREATE TABLE IF NOT EXISTS ace_mtree_metadata ( + CREATE TABLE IF NOT EXISTS spock.ace_mtree_metadata ( schema_name text, table_name text, total_rows bigint, @@ -151,7 +152,7 @@ var SQLTemplates = Templates{ UpdateCDCMetadata: template.Must(template.New("updateCdcMetadata").Parse(` INSERT INTO - ace_mtree_cdc_metadata ( + spock.ace_mtree_cdc_metadata ( publication_name, slot_name, start_lsn, @@ -191,7 +192,7 @@ var SQLTemplates = Templates{ SELECT pid FROM pg_stat_activity WHERE pid = $1 `)), DropCDCMetadataTable: template.Must(template.New("dropCDCMetadataTable").Parse(` - DROP TABLE IF EXISTS ace_mtree_cdc_metadata + DROP TABLE IF EXISTS spock.ace_mtree_cdc_metadata `)), GetCDCMetadata: template.Must(template.New("getCDCMetadata").Parse(` @@ -200,7 +201,7 @@ var SQLTemplates = Templates{ start_lsn, tables FROM - ace_mtree_cdc_metadata + spock.ace_mtree_cdc_metadata WHERE publication_name = $1 `)), @@ -288,7 +289,7 @@ var SQLTemplates = Templates{ `)), CreateCDCMetadataTable: template.Must(template.New("createCDCMetadataTable").Parse(` - CREATE TABLE IF NOT EXISTS ace_mtree_cdc_metadata ( + CREATE TABLE IF NOT EXISTS spock.ace_mtree_cdc_metadata ( publication_name text PRIMARY KEY, slot_name text, start_lsn text, @@ -719,7 +720,7 @@ var SQLTemplates = Templates{ `)), CreateXORFunction: template.Must(template.New("createXORFunction").Parse(` CREATE - OR REPLACE FUNCTION bytea_xor(a bytea, b bytea) RETURNS bytea AS $$ + OR REPLACE FUNCTION spock.bytea_xor(a bytea, b bytea) RETURNS bytea AS $$ DECLARE result bytea; len int; @@ -750,7 +751,7 @@ var SQLTemplates = Templates{ CREATE OPERATOR # ( LEFTARG = bytea, RIGHTARG = bytea, - PROCEDURE = bytea_xor + PROCEDURE = spock.bytea_xor ); END IF; END $$; @@ -787,7 +788,7 @@ var SQLTemplates = Templates{ `)), UpdateMetadata: template.Must(template.New("updateMetadata").Parse(` INSERT INTO - ace_mtree_metadata ( + spock.ace_mtree_metadata ( schema_name, table_name, total_rows, @@ -1001,7 +1002,7 @@ var SQLTemplates = Templates{ SELECT total_rows FROM - ace_mtree_metadata + spock.ace_mtree_metadata WHERE schema_name = $1 AND table_name = $2 @@ -1272,7 +1273,7 @@ var SQLTemplates = Templates{ SELECT block_size FROM - ace_mtree_metadata + spock.ace_mtree_metadata WHERE schema_name = $1 AND table_name = $2 @@ -1292,10 +1293,10 @@ var SQLTemplates = Templates{ {{.WhereClause}} `)), DropXORFunction: template.Must(template.New("dropXORFunction").Parse(` - DROP FUNCTION IF EXISTS bytea_xor(bytea, bytea) CASCADE + DROP FUNCTION IF EXISTS spock.bytea_xor(bytea, bytea) CASCADE `)), DropMetadataTable: template.Must(template.New("dropMetadataTable").Parse(` - DROP TABLE IF EXISTS ace_mtree_metadata CASCADE + DROP TABLE IF EXISTS spock.ace_mtree_metadata CASCADE `)), DropMtreeTable: template.Must(template.New("dropMtreeTable").Parse(` DROP TABLE IF EXISTS {{.MtreeTable}} CASCADE @@ -1452,4 +1453,7 @@ var SQLTemplates = Templates{ UpdateNodePositionsWithOffset: template.Must(template.New("updateNodePositionsWithOffset").Parse(` UPDATE {{.MtreeTable}} SET node_position = node_position + $1 WHERE node_level = 0 `)), + CreateSchema: template.Must(template.New("createSchema").Parse(` + CREATE SCHEMA IF NOT EXISTS {{.SchemaName}} + `)), } diff --git a/internal/core/merkle_trees.go b/internal/core/merkle_trees.go index 56344ee..cf7ead1 100644 --- a/internal/core/merkle_trees.go +++ b/internal/core/merkle_trees.go @@ -42,6 +42,10 @@ import ( "github.com/vbauerster/mpb/v8/decor" ) +func aceSchema() string { + return config.Cfg.MTree.Schema +} + const tableAlreadyInPublicationError = "42710" type MerkleTreeTask struct { @@ -327,6 +331,11 @@ func (m *MerkleTreeTask) MtreeInit() error { } defer pool.Close() + err = queries.CreateSchema(context.Background(), pool, aceSchema()) + if err != nil { + return fmt.Errorf("failed to create schema '%s': %w", aceSchema(), err) + } + tx, err := pool.Begin(context.Background()) if err != nil { return fmt.Errorf("failed to begin transaction on node %s: %w", nodeInfo["Name"], err) @@ -818,7 +827,8 @@ func (m *MerkleTreeTask) UpdateMtree(skipAllChecks bool) error { var compositeTypeName string if !m.SimplePrimaryKey { - compositeTypeName = fmt.Sprintf("%s_%s_key_type", m.Schema, m.Table) + compositeTypeIdentifier := pgx.Identifier{aceSchema(), fmt.Sprintf("%s_%s_key_type", m.Schema, m.Table)} + compositeTypeName = compositeTypeIdentifier.Sanitize() dt, err := conn.Conn().LoadType(context.Background(), compositeTypeName) if err != nil { return fmt.Errorf("failed to load composite type %s: %w", compositeTypeName, err) @@ -832,7 +842,8 @@ func (m *MerkleTreeTask) UpdateMtree(skipAllChecks bool) error { } defer tx.Rollback(context.Background()) - mtreeTableName := fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table) + mtreeTableIdentifier := pgx.Identifier{aceSchema(), fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table)} + mtreeTableName := mtreeTableIdentifier.Sanitize() blocksToUpdate, err := queries.GetDirtyAndNewBlocks(context.Background(), tx, mtreeTableName, m.SimplePrimaryKey, m.Key) if err != nil { @@ -936,12 +947,14 @@ func (m *MerkleTreeTask) UpdateMtree(skipAllChecks bool) error { } func (m *MerkleTreeTask) splitBlocks(tx pgx.Tx, blocksToSplit []types.BlockRange) ([]int64, error) { - mtreeTableName := fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table) + mtreeTableIdentifier := pgx.Identifier{aceSchema(), fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table)} + mtreeTableName := mtreeTableIdentifier.Sanitize() isComposite := !m.SimplePrimaryKey ctx := context.Background() var modifiedPositions []int64 - compositeTypeName := fmt.Sprintf("%s_%s_key_type", m.Schema, m.Table) + compositeTypeIdentifier := pgx.Identifier{aceSchema(), fmt.Sprintf("%s_%s_key_type", m.Schema, m.Table)} + compositeTypeName := compositeTypeIdentifier.Sanitize() currentBlocks := make([]types.BlockRange, len(blocksToSplit)) copy(currentBlocks, blocksToSplit) @@ -1057,7 +1070,8 @@ func (m *MerkleTreeTask) splitBlocks(tx pgx.Tx, blocksToSplit []types.BlockRange func (m *MerkleTreeTask) performMerges(tx pgx.Tx) ([]int64, error) { var allModifiedPositions []int64 - mtreeTableName := fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table) + mtreeTableIdentifier := pgx.Identifier{aceSchema(), fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table)} + mtreeTableName := mtreeTableIdentifier.Sanitize() for { blocksToMerge, err := queries.FindBlocksToMerge(context.Background(), tx, mtreeTableName, m.SimplePrimaryKey, m.Schema, m.Table, m.Key, 0.25, []int64{}) @@ -1105,7 +1119,8 @@ func (m *MerkleTreeTask) DiffMtree() error { return fmt.Errorf("failed to update merkle tree before diff: %w", err) } nodePairs := getNodePairs(m.ClusterNodes) - mtreeTableName := fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table) + mtreeTableIdentifier := pgx.Identifier{aceSchema(), fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table)} + mtreeTableName := mtreeTableIdentifier.Sanitize() allNodePairBatches := make(map[string]CompareRangesWorkItem) @@ -1216,7 +1231,8 @@ func (m *MerkleTreeTask) DiffMtree() error { func (m *MerkleTreeTask) findMismatchedLeaves(pool1, pool2 *pgxpool.Pool, parentLevel int, parentPosition int64) (map[int64]bool, error) { mismatched := make(map[int64]bool) - mtreeTableName := fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table) + mtreeTableIdentifier := pgx.Identifier{aceSchema(), fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table)} + mtreeTableName := mtreeTableIdentifier.Sanitize() children1, err := queries.GetNodeChildren(context.Background(), pool1, mtreeTableName, parentLevel, int(parentPosition)) if err != nil { @@ -1279,7 +1295,8 @@ func (m *MerkleTreeTask) findMismatchedLeaves(pool1, pool2 *pgxpool.Pool, parent } func (m *MerkleTreeTask) getPkeyBatches(pool1, pool2 *pgxpool.Pool, mismatchedPositions []int64) ([][2][]any, error) { - mtreeTableName := fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table) + mtreeTableIdentifier := pgx.Identifier{aceSchema(), fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table)} + mtreeTableName := mtreeTableIdentifier.Sanitize() leafRanges1, err := queries.GetLeafRanges(context.Background(), pool1, mtreeTableName, mismatchedPositions, m.SimplePrimaryKey, m.Key) if err != nil { @@ -1359,12 +1376,14 @@ func intervalIntersects(start, end any, allRanges []types.LeafRange) bool { } func (m *MerkleTreeTask) mergeBlocks(tx pgx.Tx, blocksToMerge []types.BlockRange) ([]int64, error) { - mtreeTableName := fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table) + mtreeTableIdentifier := pgx.Identifier{aceSchema(), fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table)} + mtreeTableName := mtreeTableIdentifier.Sanitize() isComposite := !m.SimplePrimaryKey ctx := context.Background() var modifiedPositions []int64 - compositeTypeName := fmt.Sprintf("%s_%s_key_type", m.Schema, m.Table) + compositeTypeIdentifier := pgx.Identifier{aceSchema(), fmt.Sprintf("%s_%s_key_type", m.Schema, m.Table)} + compositeTypeName := compositeTypeIdentifier.Sanitize() if err := queries.DeleteParentNodes(ctx, tx, mtreeTableName); err != nil { return nil, fmt.Errorf("failed to delete parent nodes: %w", err) @@ -1414,7 +1433,8 @@ func (m *MerkleTreeTask) mergeBlocks(tx pgx.Tx, blocksToMerge []types.BlockRange } func (m *MerkleTreeTask) buildParentNodes(conn queries.DBQuerier) error { - mtreeTableName := fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table) + mtreeTableIdentifier := pgx.Identifier{aceSchema(), fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table)} + mtreeTableName := mtreeTableIdentifier.Sanitize() var err error if tx, ok := conn.(pgx.Tx); ok { @@ -1460,7 +1480,8 @@ type LeafHashResult struct { } func (m *MerkleTreeTask) computeLeafHashes(pool *pgxpool.Pool, tx pgx.Tx, ranges []types.BlockRange) error { - mtreeTableName := fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table) + mtreeTableIdentifier := pgx.Identifier{aceSchema(), fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table)} + mtreeTableName := mtreeTableIdentifier.Sanitize() numWorkers := int(float64(runtime.NumCPU()) * m.MaxCpuRatio) if numWorkers < 1 { @@ -1532,15 +1553,14 @@ func (m *MerkleTreeTask) leafHashWorker(wg *sync.WaitGroup, jobs <-chan types.Bl } func (m *MerkleTreeTask) insertBlockRanges(conn queries.DBQuerier, ranges []types.BlockRange) error { - mtreeTableName := fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table) - mtreeTableIdent := pgx.Identifier{mtreeTableName} + mtreeTableIdentifier := pgx.Identifier{aceSchema(), fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table)} if m.SimplePrimaryKey { - if err := queries.InsertBlockRangesBatchSimple(context.Background(), conn, mtreeTableIdent.Sanitize(), ranges); err != nil { + if err := queries.InsertBlockRangesBatchSimple(context.Background(), conn, mtreeTableIdentifier.Sanitize(), ranges); err != nil { return err } } else { - if err := queries.InsertBlockRangesBatchComposite(context.Background(), conn, mtreeTableIdent.Sanitize(), ranges, len(m.Key)); err != nil { + if err := queries.InsertBlockRangesBatchComposite(context.Background(), conn, mtreeTableIdentifier.Sanitize(), ranges, len(m.Key)); err != nil { return err } } @@ -1550,7 +1570,12 @@ func (m *MerkleTreeTask) insertBlockRanges(conn queries.DBQuerier, ranges []type func (m *MerkleTreeTask) createMtreeObjects(tx pgx.Tx, totalRows int64, numBlocks int) error { - err := queries.CreateXORFunction(context.Background(), tx) + err := queries.CreateSchema(context.Background(), tx, aceSchema()) + if err != nil { + return fmt.Errorf("failed to create schema '%s': %w", aceSchema(), err) + } + + err = queries.CreateXORFunction(context.Background(), tx) if err != nil { return fmt.Errorf("failed to create xor function: %w", err) } @@ -1565,7 +1590,8 @@ func (m *MerkleTreeTask) createMtreeObjects(tx pgx.Tx, totalRows int64, numBlock return fmt.Errorf("failed to update metadata: %w", err) } - mtreeTableName := fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table) + mtreeTableIdentifier := pgx.Identifier{aceSchema(), fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table)} + mtreeTableName := mtreeTableIdentifier.Sanitize() err = queries.DropMtreeTable(context.Background(), tx, mtreeTableName) if err != nil { return fmt.Errorf("failed to render drop mtree table sql: %w", err) @@ -1590,7 +1616,8 @@ func (m *MerkleTreeTask) createMtreeObjects(tx pgx.Tx, totalRows int64, numBlock keyTypeColumns[i] = fmt.Sprintf("%s %s", pgx.Identifier{col}.Sanitize(), colType) } - compositeTypeName := fmt.Sprintf("%s_%s_key_type", m.Schema, m.Table) + compositeTypeIdentifier := pgx.Identifier{aceSchema(), fmt.Sprintf("%s_%s_key_type", m.Schema, m.Table)} + compositeTypeName := compositeTypeIdentifier.Sanitize() err = queries.DropCompositeType(context.Background(), tx, compositeTypeName) if err != nil { diff --git a/pkg/config/config.go b/pkg/config/config.go index 33ec12a..d6cf2af 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -54,7 +54,8 @@ type MTreeConfig struct { SlotName string `yaml:"slot_name"` PublicationName string `yaml:"publication_name"` } `yaml:"cdc"` - Diff struct { + Schema string `yaml:"schema"` + Diff struct { MinBlockSize int `yaml:"min_block_size"` BlockSize int `yaml:"block_size"` MaxBlockSize int `yaml:"max_block_size"`