From 103acb6cf72fd04be02ae051f750e455a6627fad Mon Sep 17 00:00:00 2001
From: Tej Kashi
Date: Wed, 21 May 2025 10:41:45 -0400
Subject: [PATCH] Use unsampled/raw pkey offsets when row_count < 10k

* Add support for choosing dbname during diff
---
 db/helpers/sqlhelpers.go    |  40 ++---------
 internal/cli/cli.go         |   7 +-
 internal/core/table_diff.go | 139 ++++++++++++++++++++++++++++++------
 internal/core/utils.go      |   7 ++
 4 files changed, 131 insertions(+), 62 deletions(-)

diff --git a/db/helpers/sqlhelpers.go b/db/helpers/sqlhelpers.go
index ccece54..eb03ea1 100644
--- a/db/helpers/sqlhelpers.go
+++ b/db/helpers/sqlhelpers.go
@@ -122,40 +122,6 @@ func AvgColumnSize(ctx context.Context, pool *pgxpool.Pool, schema, table, colum
     return avgSize, nil
 }
 
-func BlockHash(ctx context.Context, pool *pgxpool.Pool, schema, table string, cols []string, primaryKey string, start interface{}, end interface{}) (string, error) {
-    if err := SanitiseIdentifier(schema); err != nil {
-        return "", err
-    }
-    if err := SanitiseIdentifier(table); err != nil {
-        return "", err
-    }
-    if err := SanitiseIdentifier(primaryKey); err != nil {
-        return "", err
-    }
-    for _, col := range cols {
-        if err := SanitiseIdentifier(col); err != nil {
-            return "", err
-        }
-    }
-    schemaIdent := fmt.Sprintf("\"%s\"", schema)
-    tableIdent := fmt.Sprintf("\"%s\"", table)
-    primaryIdent := fmt.Sprintf("\"%s\"", primaryKey)
-    var colIdents []string
-    for _, col := range cols {
-        colIdents = append(colIdents, fmt.Sprintf("\"%s\"", col))
-    }
-    colsList := strings.Join(colIdents, ", ")
-    query := fmt.Sprintf(
-        `SELECT encode(digest(COALESCE(string_agg(concat_ws('|', %s),'|' ORDER BY %s),'EMPTY_BLOCK'),'sha256'),'hex') FROM %s.%s WHERE %s >= $1 AND %s < $2`,
-        colsList, primaryIdent, schemaIdent, tableIdent, primaryIdent, primaryIdent,
-    )
-    var hash string
-    if err := pool.QueryRow(ctx, query, start, end).Scan(&hash); err != nil {
-        return "", fmt.Errorf("BlockHash query failed for %s.%s range %v-%v: %w", schema, table, start, end, err)
-    }
-    return hash, nil
-}
-
 func GeneratePkeyOffsetsQuery(
     schema, table string,
     keyColumns []string,
@@ -225,7 +191,6 @@ func GeneratePkeyOffsetsQuery(
     return buf.String(), nil
 }
 
-// BlockHashSQL returns the SQL string for hashing a block of rows using the given schema, table, columns, and primary key.
 func BlockHashSQL(schema, table string, cols []string, primaryKey string) (string, error) {
     if err := SanitiseIdentifier(schema); err != nil {
         return "", err
@@ -250,7 +215,10 @@ func BlockHashSQL(schema, table string, cols []string, primaryKey string) (strin
     }
     colsList := strings.Join(colIdents, ", ")
     query := fmt.Sprintf(
-        `SELECT encode(digest(COALESCE(string_agg(concat_ws('|', %s),'|' ORDER BY %s),'EMPTY_BLOCK'),'sha256'),'hex') FROM %s.%s WHERE %s >= $1 AND %s < $2`,
+        `SELECT encode(digest(COALESCE(string_agg(concat_ws('|', %s),'|' ORDER BY %s),'EMPTY_BLOCK'),'sha1'),'hex')
+         FROM %s.%s
+         WHERE ($1::boolean OR %s >= $2)
+           AND ($3::boolean OR %s < $4)`,
         colsList, primaryIdent, schemaIdent, tableIdent, primaryIdent, primaryIdent,
     )
     return query, nil
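Reviewer note on the BlockHashSQL change above: the statement now takes two boolean "skip" guards in addition to the two bounds, so the same generated SQL can serve closed, half-open, and fully open ranges. A minimal sketch of how the four parameters are expected to be bound; the schema, table, and column names here are made up, and an existing pgxpool.Pool (pool) and context (ctx) are assumed:

    sql, err := helpers.BlockHashSQL("public", "accounts", []string{"id", "balance"}, "id")
    if err != nil {
        return err
    }
    var hash string
    // Closed block [100, 200): both guards false, both bounds supplied.
    err = pool.QueryRow(ctx, sql, false, 100, false, 200).Scan(&hash)
    // Open-ended tail [200, +inf): the upper guard is true, so the upper bound
    // is never evaluated and nil is passed only as a placeholder.
    err = pool.QueryRow(ctx, sql, false, 200, true, nil).Scan(&hash)

The hashRange change further down binds the guards from nil range bounds in the same way.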
diff --git a/internal/cli/cli.go b/internal/cli/cli.go
index 50b9de0..f5d25cc 100644
--- a/internal/cli/cli.go
+++ b/internal/cli/cli.go
@@ -67,9 +67,10 @@ func SetupCLI() *cli.App {
             Value: false,
         },
         &cli.BoolFlag{
-            Name:  "debug",
-            Usage: "Enable debug logging",
-            Value: false,
+            Name:    "debug",
+            Aliases: []string{"v"},
+            Usage:   "Enable debug logging",
+            Value:   false,
         },
     }
     app := &cli.App{
diff --git a/internal/core/table_diff.go b/internal/core/table_diff.go
index c5f0e8f..df8ec0b 100644
--- a/internal/core/table_diff.go
+++ b/internal/core/table_diff.go
@@ -864,32 +864,53 @@ func (t *TableDiffTask) ExecuteTask(debugMode bool) error {
     default:
         samplePercent = 100
     }
-    ntileCount := int(math.Ceil(float64(maxCount) / float64(t.BlockSize)))
-    if ntileCount == 0 && maxCount > 0 {
-        ntileCount = 1
-    }
-
-    querySQL, err := helpers.GeneratePkeyOffsetsQuery(t.Schema, t.Table, t.Key, sampleMethod, samplePercent, ntileCount)
-    if err != nil {
-        return fmt.Errorf("failed to generate offsets query: %w", err)
-    }
-    pkRangesRows, err := pools[maxNode].Query(ctx, querySQL)
-    if err != nil {
-        return fmt.Errorf("offsets query execution failed on %s: %w", maxNode, err)
-    }
-    defer pkRangesRows.Close()
 
     var ranges []Range
-    for pkRangesRows.Next() {
-        var startVal, endVal any
-        if err := pkRangesRows.Scan(&startVal, &endVal); err != nil {
-            return fmt.Errorf("scanning offset row failed: %w", err)
+    // Determine if we should use direct PKey offset generation
+    if (maxCount > 0 && maxCount <= 10000) || t.TableFilter != "" {
+        logger.Info("Using direct primary key offset generation for table %s.%s (maxCount: %d, tableFilter: '%s')",
+            t.Schema, t.Table, maxCount, t.TableFilter)
+        r, err := t.getPkeyOffsets(ctx, pools[maxNode])
+        if err != nil {
+            return fmt.Errorf("failed to get pkey offsets directly: %w", err)
+        }
+        ranges = r
+    } else {
+        ntileCount := int(math.Ceil(float64(maxCount) / float64(t.BlockSize)))
+        if ntileCount == 0 && maxCount > 0 {
+            ntileCount = 1
+        }
+
+        querySQL, err := helpers.GeneratePkeyOffsetsQuery(t.Schema, t.Table, t.Key, sampleMethod, samplePercent, ntileCount)
+        if err != nil {
+            return fmt.Errorf("failed to generate offsets query: %w", err)
+        }
+        logger.Debug("Generated offsets query: %s", querySQL)
+        pkRangesRows, err := pools[maxNode].Query(ctx, querySQL)
+        if err != nil {
+            return fmt.Errorf("offsets query execution failed on %s: %w", maxNode, err)
+        }
+        defer pkRangesRows.Close()
+
+        for pkRangesRows.Next() {
+            var startVal, endVal any
+            if err := pkRangesRows.Scan(&startVal, &endVal); err != nil {
+                return fmt.Errorf("scanning offset row failed: %w", err)
+            }
+            ranges = append(ranges, Range{Start: startVal, End: endVal})
+        }
+        if err := pkRangesRows.Err(); err != nil {
+            return fmt.Errorf("offset rows iteration error: %w", err)
+        }
+
+        // Prepend (nil, first_original_start) only if ranges were actually generated and the first doesn't already start with nil.
+        if len(ranges) > 0 && ranges[0].Start != nil {
+            firstOriginalStart := ranges[0].Start
+            newInitialRange := Range{Start: nil, End: firstOriginalStart}
+            ranges = append([]Range{newInitialRange}, ranges...)
         }
-        ranges = append(ranges, Range{Start: startVal, End: endVal})
-    }
-    if err := pkRangesRows.Err(); err != nil {
-        return fmt.Errorf("offset rows iteration error: %w", err)
     }
+
     logger.Info("Created %d initial ranges to compare", len(ranges))
     logger.Debug("Ranges: %v", ranges)
     t.DiffResult.Summary.InitialRangesCount = len(ranges)
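The net effect of the ExecuteTask hunk is that the sampled branch always ends with an open-start range. A small illustration of the prepend step with made-up key values (Range is the struct already used above; the exact offsets returned by GeneratePkeyOffsetsQuery are not shown in this patch):

    // Offsets as they might come back from the sampled query (values made up).
    ranges := []Range{{Start: 1001, End: 2000}, {Start: 2000, End: 3000}}

    // The prepend step adds an open start so rows below the first sampled
    // offset are still hashed on both nodes.
    if len(ranges) > 0 && ranges[0].Start != nil {
        ranges = append([]Range{{Start: nil, End: ranges[0].Start}}, ranges...)
    }
    // ranges is now {nil, 1001}, {1001, 2000}, {2000, 3000}.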
@@ -1041,10 +1062,19 @@ func (t *TableDiffTask) hashRange(
     }
     startTime := time.Now()
     var hash string
+    var skipMinCheck bool
+    var skipMaxCheck bool
+
+    if r.Start == nil {
+        skipMinCheck = true
+    }
+    if r.End == nil {
+        skipMaxCheck = true
+    }
 
     logger.Debug("[%s] Hashing range: Start=%v, End=%v", node, r.Start, r.End)
 
-    err := pool.QueryRow(ctx, t.BlockHashSQL, r.Start, r.End).Scan(&hash)
+    err := pool.QueryRow(ctx, t.BlockHashSQL, skipMinCheck, r.Start, skipMaxCheck, r.End).Scan(&hash)
 
     if err != nil {
         duration := time.Since(startTime)
@@ -1324,3 +1354,66 @@ func safeCut(s string, n int) string {
     }
     return s[:n]
 }
+
+func (t *TableDiffTask) getPkeyOffsets(ctx context.Context, pool *pgxpool.Pool) ([]Range, error) {
+    // TODO: Add support for composite keys.
+    if len(t.Key) == 0 {
+        return nil, fmt.Errorf("primary key not defined for table %s.%s", t.Schema, t.Table)
+    }
+    primaryKeyColumn := t.Key[0]
+
+    schemaIdent := sanitise(t.Schema)
+    tableIdent := sanitise(t.Table)
+    pkIdent := sanitise(primaryKeyColumn)
+
+    querySQL := fmt.Sprintf("SELECT %s FROM %s.%s ORDER BY %s", pkIdent, schemaIdent, tableIdent, pkIdent)
+
+    pgRows, err := pool.Query(ctx, querySQL)
+    if err != nil {
+        return nil, fmt.Errorf("failed to query primary keys for direct offset generation from %s.%s: %w", t.Schema, t.Table, err)
+    }
+    defer pgRows.Close()
+
+    var allPks []any
+    for pgRows.Next() {
+        var pkVal any
+        if err := pgRows.Scan(&pkVal); err != nil {
+            return nil, fmt.Errorf("failed to scan primary key value from %s.%s: %w", t.Schema, t.Table, err)
+        }
+        allPks = append(allPks, pkVal)
+    }
+
+    if err := pgRows.Err(); err != nil {
+        return nil, fmt.Errorf("error iterating over primary key rows from %s.%s: %w", t.Schema, t.Table, err)
+    }
+
+    if len(allPks) == 0 {
+        logger.Info("[%s.%s] No primary key values found, returning empty ranges for direct generation.", t.Schema, t.Table)
+        return []Range{}, nil
+    }
+
+    var ranges []Range
+    // The first range is always (NULL, first_pkey), so rows below this node's minimum key are still compared.
+    ranges = append(ranges, Range{Start: nil, End: allPks[0]})
+
+    currentPkIndex := 0
+    for currentPkIndex < len(allPks) {
+        currentBlockStartPkey := allPks[currentPkIndex]
+
+        nextPKeyIndexForRangeEnd := currentPkIndex + t.BlockSize
+
+        if nextPKeyIndexForRangeEnd < len(allPks) {
+            rangeEndValue := allPks[nextPKeyIndexForRangeEnd]
+            ranges = append(ranges, Range{Start: currentBlockStartPkey, End: rangeEndValue})
+            currentPkIndex = nextPKeyIndexForRangeEnd
+        } else {
+            // Always emit the trailing open-ended range so the final block
+            // (including a single-row table) is still hashed and compared.
+            ranges = append(ranges, Range{Start: currentBlockStartPkey, End: nil})
+            break
+        }
+    }
+
+    logger.Debug("[%s.%s] Generated %d ranges without sampling from %d pkeys with block_size %d.", t.Schema, t.Table, len(ranges), len(allPks), t.BlockSize)
+    return ranges, nil
+}
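To make the direct path concrete, here is roughly what getPkeyOffsets should produce for a hypothetical table whose primary keys are the integers 1..25 with t.BlockSize = 10 (starts are inclusive and ends exclusive once BlockHashSQL applies the bounds; nil marks an open end):

    want := []Range{
        {Start: nil, End: 1},  // catches rows on the other node below this node's minimum
        {Start: 1, End: 11},   // keys 1..10
        {Start: 11, End: 21},  // keys 11..20
        {Start: 21, End: nil}, // keys 21..25, open-ended tail
    }
    _ = want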
diff --git a/internal/core/utils.go b/internal/core/utils.go
index b4db55e..dcfc2f2 100644
--- a/internal/core/utils.go
+++ b/internal/core/utils.go
@@ -259,6 +259,13 @@ func readClusterInfo(t *TableDiffTask) error {
     if t.DBName == "" && len(config.PGEdge.Databases) > 0 {
         t.DBName = config.PGEdge.Databases[0].DBName
         t.Database = config.PGEdge.Databases[0]
+    } else if t.DBName != "" {
+        for _, db := range config.PGEdge.Databases {
+            if db.DBName == t.DBName {
+                t.Database = db
+                break
+            }
+        }
     }
 
     t.ClusterNodes = []map[string]any{}
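One behavioural note on the readClusterInfo hunk: when t.DBName names a database that is not present in the cluster config, t.Database is silently left at its zero value. If a follow-up wants to fail fast instead, the same lookup could be wrapped in a helper along these lines; this is only a sketch, and DatabaseConfig is a placeholder for whatever element type config.PGEdge.Databases actually holds:

    // Hypothetical helper, not part of this patch.
    func findDatabase(dbs []DatabaseConfig, name string) (DatabaseConfig, error) {
        for _, db := range dbs {
            if db.DBName == name {
                return db, nil
            }
        }
        return DatabaseConfig{}, fmt.Errorf("database %q not found in cluster configuration", name)
    }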