Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ace.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ mtree:
cdc:
slot_name: "ace_mtree_slot"
publication_name: "ace_mtree_pub"
schema: "spock"

diff:
min_block_size: 1000
Expand Down
17 changes: 17 additions & 0 deletions db/queries/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -2715,3 +2715,20 @@ func UpdateMtreeCounters(ctx context.Context, db DBQuerier, mtreeTable string, i
_, err = db.Exec(ctx, sql, args)
return err
}

func CreateSchema(ctx context.Context, db DBQuerier, schemaName string) error {
data := map[string]interface{}{
"SchemaName": schemaName,
}
sql, err := RenderSQL(SQLTemplates.CreateSchema, data)
if err != nil {
return fmt.Errorf("failed to render CreateSchema SQL: %w", err)
}

_, err = db.Exec(ctx, sql)
if err != nil {
return fmt.Errorf("query to create schema failed: %w", err)
}

return nil
}
28 changes: 16 additions & 12 deletions db/queries/templates.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,13 @@ type Templates struct {
GetReplicationSlotPID *template.Template
TerminateBackend *template.Template
CheckPIDExists *template.Template
CreateSchema *template.Template
}

var SQLTemplates = Templates{
// A template isn't needed for this query; just keeping the struct uniform
CreateMetadataTable: template.Must(template.New("createMetadataTable").Parse(`
CREATE TABLE IF NOT EXISTS ace_mtree_metadata (
CREATE TABLE IF NOT EXISTS spock.ace_mtree_metadata (
schema_name text,
table_name text,
total_rows bigint,
Expand Down Expand Up @@ -151,7 +152,7 @@ var SQLTemplates = Templates{

UpdateCDCMetadata: template.Must(template.New("updateCdcMetadata").Parse(`
INSERT INTO
ace_mtree_cdc_metadata (
spock.ace_mtree_cdc_metadata (
publication_name,
slot_name,
start_lsn,
Expand Down Expand Up @@ -191,7 +192,7 @@ var SQLTemplates = Templates{
SELECT pid FROM pg_stat_activity WHERE pid = $1
`)),
DropCDCMetadataTable: template.Must(template.New("dropCDCMetadataTable").Parse(`
DROP TABLE IF EXISTS ace_mtree_cdc_metadata
DROP TABLE IF EXISTS spock.ace_mtree_cdc_metadata
`)),

GetCDCMetadata: template.Must(template.New("getCDCMetadata").Parse(`
Expand All @@ -200,7 +201,7 @@ var SQLTemplates = Templates{
start_lsn,
tables
FROM
ace_mtree_cdc_metadata
spock.ace_mtree_cdc_metadata
WHERE
publication_name = $1
`)),
Expand Down Expand Up @@ -288,7 +289,7 @@ var SQLTemplates = Templates{
`)),

CreateCDCMetadataTable: template.Must(template.New("createCDCMetadataTable").Parse(`
CREATE TABLE IF NOT EXISTS ace_mtree_cdc_metadata (
CREATE TABLE IF NOT EXISTS spock.ace_mtree_cdc_metadata (
publication_name text PRIMARY KEY,
slot_name text,
start_lsn text,
Expand Down Expand Up @@ -719,7 +720,7 @@ var SQLTemplates = Templates{
`)),
CreateXORFunction: template.Must(template.New("createXORFunction").Parse(`
CREATE
OR REPLACE FUNCTION bytea_xor(a bytea, b bytea) RETURNS bytea AS $$
OR REPLACE FUNCTION spock.bytea_xor(a bytea, b bytea) RETURNS bytea AS $$
DECLARE
result bytea;
len int;
Expand Down Expand Up @@ -750,7 +751,7 @@ var SQLTemplates = Templates{
CREATE OPERATOR # (
LEFTARG = bytea,
RIGHTARG = bytea,
PROCEDURE = bytea_xor
PROCEDURE = spock.bytea_xor
);
END IF;
END $$;
Expand Down Expand Up @@ -787,7 +788,7 @@ var SQLTemplates = Templates{
`)),
UpdateMetadata: template.Must(template.New("updateMetadata").Parse(`
INSERT INTO
ace_mtree_metadata (
spock.ace_mtree_metadata (
schema_name,
table_name,
total_rows,
Expand Down Expand Up @@ -1001,7 +1002,7 @@ var SQLTemplates = Templates{
SELECT
total_rows
FROM
ace_mtree_metadata
spock.ace_mtree_metadata
WHERE
schema_name = $1
AND table_name = $2
Expand Down Expand Up @@ -1272,7 +1273,7 @@ var SQLTemplates = Templates{
SELECT
block_size
FROM
ace_mtree_metadata
spock.ace_mtree_metadata
WHERE
schema_name = $1
AND table_name = $2
Expand All @@ -1292,10 +1293,10 @@ var SQLTemplates = Templates{
{{.WhereClause}}
`)),
DropXORFunction: template.Must(template.New("dropXORFunction").Parse(`
DROP FUNCTION IF EXISTS bytea_xor(bytea, bytea) CASCADE
DROP FUNCTION IF EXISTS spock.bytea_xor(bytea, bytea) CASCADE
`)),
DropMetadataTable: template.Must(template.New("dropMetadataTable").Parse(`
DROP TABLE IF EXISTS ace_mtree_metadata CASCADE
DROP TABLE IF EXISTS spock.ace_mtree_metadata CASCADE
`)),
DropMtreeTable: template.Must(template.New("dropMtreeTable").Parse(`
DROP TABLE IF EXISTS {{.MtreeTable}} CASCADE
Expand Down Expand Up @@ -1452,4 +1453,7 @@ var SQLTemplates = Templates{
UpdateNodePositionsWithOffset: template.Must(template.New("updateNodePositionsWithOffset").Parse(`
UPDATE {{.MtreeTable}} SET node_position = node_position + $1 WHERE node_level = 0
`)),
CreateSchema: template.Must(template.New("createSchema").Parse(`
CREATE SCHEMA IF NOT EXISTS {{.SchemaName}}
`)),
}
65 changes: 46 additions & 19 deletions internal/core/merkle_trees.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ import (
"github.com/vbauerster/mpb/v8/decor"
)

func aceSchema() string {
return config.Cfg.MTree.Schema
}

const tableAlreadyInPublicationError = "42710"

type MerkleTreeTask struct {
Expand Down Expand Up @@ -327,6 +331,11 @@ func (m *MerkleTreeTask) MtreeInit() error {
}
defer pool.Close()

err = queries.CreateSchema(context.Background(), pool, aceSchema())
if err != nil {
return fmt.Errorf("failed to create schema '%s': %w", aceSchema(), err)
}

tx, err := pool.Begin(context.Background())
if err != nil {
return fmt.Errorf("failed to begin transaction on node %s: %w", nodeInfo["Name"], err)
Expand Down Expand Up @@ -818,7 +827,8 @@ func (m *MerkleTreeTask) UpdateMtree(skipAllChecks bool) error {
var compositeTypeName string

if !m.SimplePrimaryKey {
compositeTypeName = fmt.Sprintf("%s_%s_key_type", m.Schema, m.Table)
compositeTypeIdentifier := pgx.Identifier{aceSchema(), fmt.Sprintf("%s_%s_key_type", m.Schema, m.Table)}
compositeTypeName = compositeTypeIdentifier.Sanitize()
dt, err := conn.Conn().LoadType(context.Background(), compositeTypeName)
if err != nil {
return fmt.Errorf("failed to load composite type %s: %w", compositeTypeName, err)
Expand All @@ -832,7 +842,8 @@ func (m *MerkleTreeTask) UpdateMtree(skipAllChecks bool) error {
}
defer tx.Rollback(context.Background())

mtreeTableName := fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table)
mtreeTableIdentifier := pgx.Identifier{aceSchema(), fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table)}
mtreeTableName := mtreeTableIdentifier.Sanitize()

blocksToUpdate, err := queries.GetDirtyAndNewBlocks(context.Background(), tx, mtreeTableName, m.SimplePrimaryKey, m.Key)
if err != nil {
Expand Down Expand Up @@ -936,12 +947,14 @@ func (m *MerkleTreeTask) UpdateMtree(skipAllChecks bool) error {
}

func (m *MerkleTreeTask) splitBlocks(tx pgx.Tx, blocksToSplit []types.BlockRange) ([]int64, error) {
mtreeTableName := fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table)
mtreeTableIdentifier := pgx.Identifier{aceSchema(), fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table)}
mtreeTableName := mtreeTableIdentifier.Sanitize()
isComposite := !m.SimplePrimaryKey
ctx := context.Background()
var modifiedPositions []int64

compositeTypeName := fmt.Sprintf("%s_%s_key_type", m.Schema, m.Table)
compositeTypeIdentifier := pgx.Identifier{aceSchema(), fmt.Sprintf("%s_%s_key_type", m.Schema, m.Table)}
compositeTypeName := compositeTypeIdentifier.Sanitize()

currentBlocks := make([]types.BlockRange, len(blocksToSplit))
copy(currentBlocks, blocksToSplit)
Expand Down Expand Up @@ -1057,7 +1070,8 @@ func (m *MerkleTreeTask) splitBlocks(tx pgx.Tx, blocksToSplit []types.BlockRange

func (m *MerkleTreeTask) performMerges(tx pgx.Tx) ([]int64, error) {
var allModifiedPositions []int64
mtreeTableName := fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table)
mtreeTableIdentifier := pgx.Identifier{aceSchema(), fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table)}
mtreeTableName := mtreeTableIdentifier.Sanitize()

for {
blocksToMerge, err := queries.FindBlocksToMerge(context.Background(), tx, mtreeTableName, m.SimplePrimaryKey, m.Schema, m.Table, m.Key, 0.25, []int64{})
Expand Down Expand Up @@ -1105,7 +1119,8 @@ func (m *MerkleTreeTask) DiffMtree() error {
return fmt.Errorf("failed to update merkle tree before diff: %w", err)
}
nodePairs := getNodePairs(m.ClusterNodes)
mtreeTableName := fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table)
mtreeTableIdentifier := pgx.Identifier{aceSchema(), fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table)}
mtreeTableName := mtreeTableIdentifier.Sanitize()

allNodePairBatches := make(map[string]CompareRangesWorkItem)

Expand Down Expand Up @@ -1216,7 +1231,8 @@ func (m *MerkleTreeTask) DiffMtree() error {

func (m *MerkleTreeTask) findMismatchedLeaves(pool1, pool2 *pgxpool.Pool, parentLevel int, parentPosition int64) (map[int64]bool, error) {
mismatched := make(map[int64]bool)
mtreeTableName := fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table)
mtreeTableIdentifier := pgx.Identifier{aceSchema(), fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table)}
mtreeTableName := mtreeTableIdentifier.Sanitize()

children1, err := queries.GetNodeChildren(context.Background(), pool1, mtreeTableName, parentLevel, int(parentPosition))
if err != nil {
Expand Down Expand Up @@ -1279,7 +1295,8 @@ func (m *MerkleTreeTask) findMismatchedLeaves(pool1, pool2 *pgxpool.Pool, parent
}

func (m *MerkleTreeTask) getPkeyBatches(pool1, pool2 *pgxpool.Pool, mismatchedPositions []int64) ([][2][]any, error) {
mtreeTableName := fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table)
mtreeTableIdentifier := pgx.Identifier{aceSchema(), fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table)}
mtreeTableName := mtreeTableIdentifier.Sanitize()

leafRanges1, err := queries.GetLeafRanges(context.Background(), pool1, mtreeTableName, mismatchedPositions, m.SimplePrimaryKey, m.Key)
if err != nil {
Expand Down Expand Up @@ -1359,12 +1376,14 @@ func intervalIntersects(start, end any, allRanges []types.LeafRange) bool {
}

func (m *MerkleTreeTask) mergeBlocks(tx pgx.Tx, blocksToMerge []types.BlockRange) ([]int64, error) {
mtreeTableName := fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table)
mtreeTableIdentifier := pgx.Identifier{aceSchema(), fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table)}
mtreeTableName := mtreeTableIdentifier.Sanitize()
isComposite := !m.SimplePrimaryKey
ctx := context.Background()
var modifiedPositions []int64

compositeTypeName := fmt.Sprintf("%s_%s_key_type", m.Schema, m.Table)
compositeTypeIdentifier := pgx.Identifier{aceSchema(), fmt.Sprintf("%s_%s_key_type", m.Schema, m.Table)}
compositeTypeName := compositeTypeIdentifier.Sanitize()

if err := queries.DeleteParentNodes(ctx, tx, mtreeTableName); err != nil {
return nil, fmt.Errorf("failed to delete parent nodes: %w", err)
Expand Down Expand Up @@ -1414,7 +1433,8 @@ func (m *MerkleTreeTask) mergeBlocks(tx pgx.Tx, blocksToMerge []types.BlockRange
}

func (m *MerkleTreeTask) buildParentNodes(conn queries.DBQuerier) error {
mtreeTableName := fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table)
mtreeTableIdentifier := pgx.Identifier{aceSchema(), fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table)}
mtreeTableName := mtreeTableIdentifier.Sanitize()

var err error
if tx, ok := conn.(pgx.Tx); ok {
Expand Down Expand Up @@ -1460,7 +1480,8 @@ type LeafHashResult struct {
}

func (m *MerkleTreeTask) computeLeafHashes(pool *pgxpool.Pool, tx pgx.Tx, ranges []types.BlockRange) error {
mtreeTableName := fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table)
mtreeTableIdentifier := pgx.Identifier{aceSchema(), fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table)}
mtreeTableName := mtreeTableIdentifier.Sanitize()

numWorkers := int(float64(runtime.NumCPU()) * m.MaxCpuRatio)
if numWorkers < 1 {
Expand Down Expand Up @@ -1532,15 +1553,14 @@ func (m *MerkleTreeTask) leafHashWorker(wg *sync.WaitGroup, jobs <-chan types.Bl
}

func (m *MerkleTreeTask) insertBlockRanges(conn queries.DBQuerier, ranges []types.BlockRange) error {
mtreeTableName := fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table)
mtreeTableIdent := pgx.Identifier{mtreeTableName}
mtreeTableIdentifier := pgx.Identifier{aceSchema(), fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table)}

if m.SimplePrimaryKey {
if err := queries.InsertBlockRangesBatchSimple(context.Background(), conn, mtreeTableIdent.Sanitize(), ranges); err != nil {
if err := queries.InsertBlockRangesBatchSimple(context.Background(), conn, mtreeTableIdentifier.Sanitize(), ranges); err != nil {
return err
}
} else {
if err := queries.InsertBlockRangesBatchComposite(context.Background(), conn, mtreeTableIdent.Sanitize(), ranges, len(m.Key)); err != nil {
if err := queries.InsertBlockRangesBatchComposite(context.Background(), conn, mtreeTableIdentifier.Sanitize(), ranges, len(m.Key)); err != nil {
return err
}
}
Expand All @@ -1550,7 +1570,12 @@ func (m *MerkleTreeTask) insertBlockRanges(conn queries.DBQuerier, ranges []type

func (m *MerkleTreeTask) createMtreeObjects(tx pgx.Tx, totalRows int64, numBlocks int) error {

err := queries.CreateXORFunction(context.Background(), tx)
err := queries.CreateSchema(context.Background(), tx, aceSchema())
if err != nil {
return fmt.Errorf("failed to create schema '%s': %w", aceSchema(), err)
}

err = queries.CreateXORFunction(context.Background(), tx)
if err != nil {
return fmt.Errorf("failed to create xor function: %w", err)
}
Expand All @@ -1565,7 +1590,8 @@ func (m *MerkleTreeTask) createMtreeObjects(tx pgx.Tx, totalRows int64, numBlock
return fmt.Errorf("failed to update metadata: %w", err)
}

mtreeTableName := fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table)
mtreeTableIdentifier := pgx.Identifier{aceSchema(), fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table)}
mtreeTableName := mtreeTableIdentifier.Sanitize()
err = queries.DropMtreeTable(context.Background(), tx, mtreeTableName)
if err != nil {
return fmt.Errorf("failed to render drop mtree table sql: %w", err)
Expand All @@ -1590,7 +1616,8 @@ func (m *MerkleTreeTask) createMtreeObjects(tx pgx.Tx, totalRows int64, numBlock
keyTypeColumns[i] = fmt.Sprintf("%s %s", pgx.Identifier{col}.Sanitize(), colType)
}

compositeTypeName := fmt.Sprintf("%s_%s_key_type", m.Schema, m.Table)
compositeTypeIdentifier := pgx.Identifier{aceSchema(), fmt.Sprintf("%s_%s_key_type", m.Schema, m.Table)}
compositeTypeName := compositeTypeIdentifier.Sanitize()

err = queries.DropCompositeType(context.Background(), tx, compositeTypeName)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ type MTreeConfig struct {
SlotName string `yaml:"slot_name"`
PublicationName string `yaml:"publication_name"`
} `yaml:"cdc"`
Diff struct {
Schema string `yaml:"schema"`
Diff struct {
MinBlockSize int `yaml:"min_block_size"`
BlockSize int `yaml:"block_size"`
MaxBlockSize int `yaml:"max_block_size"`
Expand Down