Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 4 additions & 36 deletions db/helpers/sqlhelpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,40 +122,6 @@ func AvgColumnSize(ctx context.Context, pool *pgxpool.Pool, schema, table, colum
return avgSize, nil
}

// BlockHash computes a hex-encoded SHA-256 digest over the rows of
// schema.table whose primary-key values fall in [start, end). Column
// values are joined with '|' and aggregated in primary-key order; an
// empty range hashes the sentinel string 'EMPTY_BLOCK' instead.
// Every identifier is validated via SanitiseIdentifier before being
// interpolated into the SQL text; row-range bounds are bound parameters.
func BlockHash(ctx context.Context, pool *pgxpool.Pool, schema, table string, cols []string, primaryKey string, start interface{}, end interface{}) (string, error) {
	// Validate all identifiers up front, in the same order the
	// original checks ran: schema, table, primary key, then columns.
	for _, ident := range append([]string{schema, table, primaryKey}, cols...) {
		if err := SanitiseIdentifier(ident); err != nil {
			return "", err
		}
	}

	quote := func(s string) string { return `"` + s + `"` }

	quotedCols := make([]string, len(cols))
	for i, col := range cols {
		quotedCols[i] = quote(col)
	}
	pkQuoted := quote(primaryKey)

	query := fmt.Sprintf(
		`SELECT encode(digest(COALESCE(string_agg(concat_ws('|', %s),'|' ORDER BY %s),'EMPTY_BLOCK'),'sha256'),'hex') FROM %s.%s WHERE %s >= $1 AND %s < $2`,
		strings.Join(quotedCols, ", "), pkQuoted, quote(schema), quote(table), pkQuoted, pkQuoted,
	)

	var hash string
	if err := pool.QueryRow(ctx, query, start, end).Scan(&hash); err != nil {
		return "", fmt.Errorf("BlockHash query failed for %s.%s range %v-%v: %w", schema, table, start, end, err)
	}
	return hash, nil
}

func GeneratePkeyOffsetsQuery(
schema, table string,
keyColumns []string,
Expand Down Expand Up @@ -225,7 +191,6 @@ func GeneratePkeyOffsetsQuery(
return buf.String(), nil
}

// BlockHashSQL returns the SQL string for hashing a block of rows using the given schema, table, columns, and primary key.
func BlockHashSQL(schema, table string, cols []string, primaryKey string) (string, error) {
if err := SanitiseIdentifier(schema); err != nil {
return "", err
Expand All @@ -250,7 +215,10 @@ func BlockHashSQL(schema, table string, cols []string, primaryKey string) (strin
}
colsList := strings.Join(colIdents, ", ")
query := fmt.Sprintf(
`SELECT encode(digest(COALESCE(string_agg(concat_ws('|', %s),'|' ORDER BY %s),'EMPTY_BLOCK'),'sha256'),'hex') FROM %s.%s WHERE %s >= $1 AND %s < $2`,
`SELECT encode(digest(COALESCE(string_agg(concat_ws('|', %s),'|' ORDER BY %s),'EMPTY_BLOCK'),'sha1'),'hex')
FROM %s.%s
WHERE ($1::boolean OR %s >= $2)
AND ($3::boolean OR %s < $4)`,
colsList, primaryIdent, schemaIdent, tableIdent, primaryIdent, primaryIdent,
)
return query, nil
Expand Down
7 changes: 4 additions & 3 deletions internal/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,10 @@ func SetupCLI() *cli.App {
Value: false,
},
&cli.BoolFlag{
Name: "debug",
Usage: "Enable debug logging",
Value: false,
Name: "debug",
Aliases: []string{"v"},
Usage: "Enable debug logging",
Value: false,
},
}
app := &cli.App{
Expand Down
139 changes: 116 additions & 23 deletions internal/core/table_diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -864,32 +864,53 @@ func (t *TableDiffTask) ExecuteTask(debugMode bool) error {
default:
samplePercent = 100
}
ntileCount := int(math.Ceil(float64(maxCount) / float64(t.BlockSize)))
if ntileCount == 0 && maxCount > 0 {
ntileCount = 1
}

querySQL, err := helpers.GeneratePkeyOffsetsQuery(t.Schema, t.Table, t.Key, sampleMethod, samplePercent, ntileCount)
if err != nil {
return fmt.Errorf("failed to generate offsets query: %w", err)
}
pkRangesRows, err := pools[maxNode].Query(ctx, querySQL)
if err != nil {
return fmt.Errorf("offsets query execution failed on %s: %w", maxNode, err)
}
defer pkRangesRows.Close()

var ranges []Range
for pkRangesRows.Next() {
var startVal, endVal any
if err := pkRangesRows.Scan(&startVal, &endVal); err != nil {
return fmt.Errorf("scanning offset row failed: %w", err)
// Determine if we should use direct PKey offset generation
if (maxCount > 0 && maxCount <= 10000) || t.TableFilter != "" {
logger.Info("Using direct primary key offset generation for table %s.%s (maxCount: %d, tableFilter: '%s')",
t.Schema, t.Table, maxCount, t.TableFilter)
r, err := t.getPkeyOffsets(ctx, pools[maxNode])
if err != nil {
return fmt.Errorf("failed to get pkey offsets directly: %w", err)
}
ranges = r
} else {
ntileCount := int(math.Ceil(float64(maxCount) / float64(t.BlockSize)))
if ntileCount == 0 && maxCount > 0 {
ntileCount = 1
}

querySQL, err := helpers.GeneratePkeyOffsetsQuery(t.Schema, t.Table, t.Key, sampleMethod, samplePercent, ntileCount)
logger.Debug("Generated offsets query: %s", querySQL)
if err != nil {
return fmt.Errorf("failed to generate offsets query: %w", err)
}
pkRangesRows, err := pools[maxNode].Query(ctx, querySQL)
if err != nil {
return fmt.Errorf("offsets query execution failed on %s: %w", maxNode, err)
}
defer pkRangesRows.Close()

for pkRangesRows.Next() {
var startVal, endVal any
if err := pkRangesRows.Scan(&startVal, &endVal); err != nil {
return fmt.Errorf("scanning offset row failed: %w", err)
}
ranges = append(ranges, Range{Start: startVal, End: endVal})
}
if err := pkRangesRows.Err(); err != nil {
return fmt.Errorf("offset rows iteration error: %w", err)
}

// Prepend (nil, first_original_start) only if ranges were actually generated and the first doesn't already start with nil.
if len(ranges) > 0 && ranges[0].Start != nil {
firstOriginalStart := ranges[0].Start
newInitialRange := Range{Start: nil, End: firstOriginalStart}
ranges = append([]Range{newInitialRange}, ranges...)
}
ranges = append(ranges, Range{Start: startVal, End: endVal})
}
if err := pkRangesRows.Err(); err != nil {
return fmt.Errorf("offset rows iteration error: %w", err)
}

logger.Info("Created %d initial ranges to compare", len(ranges))
logger.Debug("Ranges: %v", ranges)
t.DiffResult.Summary.InitialRangesCount = len(ranges)
Expand Down Expand Up @@ -1041,10 +1062,19 @@ func (t *TableDiffTask) hashRange(
}
startTime := time.Now()
var hash string
var skipMinCheck bool
var skipMaxCheck bool

if r.Start == nil {
skipMinCheck = true
}
if r.End == nil {
skipMaxCheck = true
}

logger.Debug("[%s] Hashing range: Start=%v, End=%v", node, r.Start, r.End)

err := pool.QueryRow(ctx, t.BlockHashSQL, r.Start, r.End).Scan(&hash)
err := pool.QueryRow(ctx, t.BlockHashSQL, skipMinCheck, r.Start, skipMaxCheck, r.End).Scan(&hash)

if err != nil {
duration := time.Since(startTime)
Expand Down Expand Up @@ -1324,3 +1354,66 @@ func safeCut(s string, n int) string {
}
return s[:n]
}

// getPkeyOffsets builds hash ranges for t.Schema.t.Table by reading every
// primary-key value in key order and slicing the list into blocks of
// t.BlockSize keys. The first range is (nil, firstPK) — open at the low
// end — and the last is (lastBlockStart, nil) — open at the high end —
// so the emitted ranges cover the entire key space.
// Returns an error when no primary key is configured, when t.BlockSize
// is not positive, or when the key query fails.
// TODO: Add support for composite keys.
func (t *TableDiffTask) getPkeyOffsets(ctx context.Context, pool *pgxpool.Pool) ([]Range, error) {
	if len(t.Key) == 0 {
		return nil, fmt.Errorf("primary key not defined for table %s.%s", t.Schema, t.Table)
	}
	// A non-positive block size would never advance the block index
	// below (the original code looped forever in that case), so reject
	// it explicitly.
	if t.BlockSize < 1 {
		return nil, fmt.Errorf("invalid block size %d for table %s.%s", t.BlockSize, t.Schema, t.Table)
	}
	primaryKeyColumn := t.Key[0]

	schemaIdent := sanitise(t.Schema)
	tableIdent := sanitise(t.Table)
	pkIdent := sanitise(primaryKeyColumn)

	querySQL := fmt.Sprintf("SELECT %s FROM %s.%s ORDER BY %s", pkIdent, schemaIdent, tableIdent, pkIdent)

	pgRows, err := pool.Query(ctx, querySQL)
	if err != nil {
		return nil, fmt.Errorf("failed to query primary keys for direct offset generation from %s.%s: %w", t.Schema, t.Table, err)
	}
	defer pgRows.Close()

	var allPks []any
	for pgRows.Next() {
		var pkVal any
		if err := pgRows.Scan(&pkVal); err != nil {
			return nil, fmt.Errorf("failed to scan primary key value from %s.%s: %w", t.Schema, t.Table, err)
		}
		allPks = append(allPks, pkVal)
	}

	if err := pgRows.Err(); err != nil {
		return nil, fmt.Errorf("error iterating over primary key rows from %s.%s: %w", t.Schema, t.Table, err)
	}

	if len(allPks) == 0 {
		logger.Info("[%s.%s] No primary key values found, returning empty ranges for direct generation.", t.Schema, t.Table)
		return []Range{}, nil
	}

	// Pre-size: one leading open range, one range per full block, and a
	// trailing open range.
	ranges := make([]Range, 0, len(allPks)/t.BlockSize+2)

	// As always the case, our first range needs to be (NULL, first_pkey)
	ranges = append(ranges, Range{Start: nil, End: allPks[0]})

	// Each block starts at allPks[i] and ends at the key t.BlockSize rows
	// later. The final block is always emitted with End == nil so the
	// highest keys are covered — including the single-row table, which
	// the previous implementation silently dropped (it produced only the
	// (nil, firstPK) range, whose exclusive upper bound excludes that row).
	for i := 0; i < len(allPks); i += t.BlockSize {
		endIdx := i + t.BlockSize
		if endIdx < len(allPks) {
			ranges = append(ranges, Range{Start: allPks[i], End: allPks[endIdx]})
		} else {
			ranges = append(ranges, Range{Start: allPks[i], End: nil})
			break
		}
	}

	logger.Debug("[%s.%s] Generated %d ranges without sampling from %d pkeys with block_size %d.", t.Schema, t.Table, len(ranges), len(allPks), t.BlockSize)
	return ranges, nil
}
7 changes: 7 additions & 0 deletions internal/core/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,13 @@ func readClusterInfo(t *TableDiffTask) error {
if t.DBName == "" && len(config.PGEdge.Databases) > 0 {
t.DBName = config.PGEdge.Databases[0].DBName
t.Database = config.PGEdge.Databases[0]
} else if t.DBName != "" {
for _, db := range config.PGEdge.Databases {
if db.DBName == t.DBName {
t.Database = db
break
}
}
}

t.ClusterNodes = []map[string]any{}
Expand Down