From 6177299bf50b5b7362f153131cfd46a22e6a55c1 Mon Sep 17 00:00:00 2001 From: Tej Kashi Date: Mon, 21 Jul 2025 12:44:48 -0400 Subject: [PATCH 1/3] WIP: Cleaning up logging across modules --- cmd/server/main.go | 8 ++--- internal/cli/cli.go | 37 ++++++++------------- internal/core/table_diff.go | 65 +++++++++++++++++++++++++++---------- internal/core/utils.go | 5 +++ internal/logger/logger.go | 17 ++++++---- 5 files changed, 82 insertions(+), 50 deletions(-) diff --git a/cmd/server/main.go b/cmd/server/main.go index 9f13fc6..16d68a3 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -12,11 +12,11 @@ package main import ( - "log" "os" "path/filepath" "github.com/pgedge/ace/internal/cli" + "github.com/pgedge/ace/internal/logger" "github.com/pgedge/ace/pkg/config" ) @@ -25,18 +25,18 @@ func main() { if _, err := os.Stat(cfgPath); os.IsNotExist(err) { execPath, err := os.Executable() if err != nil { - log.Fatalf("unable to determine executable path: %v", err) + logger.Fatal("unable to determine executable path: %v", err) } root := filepath.Dir(filepath.Dir(execPath)) cfgPath = filepath.Join(root, "ace.yaml") } if err := config.Init(cfgPath); err != nil { - log.Fatalf("loading config (%s): %v", cfgPath, err) + logger.Fatal("loading config (%s): %v", cfgPath, err) } app := cli.SetupCLI() err := app.Run(os.Args) if err != nil { - log.Fatalf("Error: %v", err) + logger.Error("%v", err) } } diff --git a/internal/cli/cli.go b/internal/cli/cli.go index 94bb7e3..ec792a2 100644 --- a/internal/cli/cli.go +++ b/internal/cli/cli.go @@ -309,7 +309,7 @@ func TableDiffCLI(ctx *cli.Context) error { blockSizeStr := ctx.String("block-size") blockSizeInt, err := strconv.ParseInt(blockSizeStr, 10, 64) if err != nil { - return fmt.Errorf("invalid block size '%s': %v", blockSizeStr, err) + return fmt.Errorf("invalid block size '%s': %w", blockSizeStr, err) } task := core.NewTableDiffTask() @@ -326,18 +326,17 @@ func TableDiffCLI(ctx *cli.Context) error { task.OverrideBlockSize = ctx.Bool("override-block-size") if err := task.Validate(); err != nil { - return fmt.Errorf("validation failed: %v", err) + return fmt.Errorf("validation failed: %w", err) } if err := task.RunChecks(true); err != nil { - return fmt.Errorf("checks failed: %v", err) + return fmt.Errorf("checks failed: %w", err) } if err := task.ExecuteTask(); err != nil { - return fmt.Errorf("error during comparison: %v", err) + return fmt.Errorf("error during comparison: %w", err) } - fmt.Println("Table diff completed") return nil } @@ -352,10 +351,9 @@ func TableRerunCLI(ctx *cli.Context) error { task.QuietMode = ctx.Bool("quiet") if err := task.ExecuteRerunTask(); err != nil { - return fmt.Errorf("error during table-rerun: %v", err) + return fmt.Errorf("error during table-rerun: %w", err) } - fmt.Println("Table rerun completed") return nil } @@ -378,14 +376,13 @@ func TableRepairCLI(ctx *cli.Context) error { task.GenerateReport = ctx.Bool("generate-report") if err := task.ValidateAndPrepare(); err != nil { - return fmt.Errorf("validation failed: %v", err) + return fmt.Errorf("validation failed: %w", err) } if err := task.Run(true); err != nil { - return fmt.Errorf("error during table repair: %v", err) + return fmt.Errorf("error during table repair: %w", err) } - fmt.Println("Table repair complete") return nil } @@ -397,18 +394,16 @@ func SpockDiffCLI(ctx *cli.Context) error { task.Output = ctx.String("output") if err := task.Validate(); err != nil { - return fmt.Errorf("validation failed: %v", err) + return fmt.Errorf("validation failed: %w", err) } if err := task.RunChecks(true); err != nil { - return fmt.Errorf("checks failed: %v", err) + return fmt.Errorf("checks failed: %w", err) } if err := task.ExecuteTask(); err != nil { - return fmt.Errorf("error during spock diff: %v", err) + return fmt.Errorf("error during spock diff: %w", err) } - - fmt.Println("Spock diff completed") return nil } @@ -416,7 +411,7 @@ func SchemaDiffCLI(ctx *cli.Context) error { blockSizeStr := ctx.String("block-size") blockSizeInt, err := strconv.ParseInt(blockSizeStr, 10, 64) if err != nil { - return fmt.Errorf("invalid block size '%s': %v", blockSizeStr, err) + return fmt.Errorf("invalid block size '%s': %w", blockSizeStr, err) } task := &core.SchemaDiffCmd{ @@ -437,10 +432,8 @@ func SchemaDiffCLI(ctx *cli.Context) error { task.OverrideBlockSize = ctx.Bool("override-block-size") if err := core.SchemaDiff(task); err != nil { - return fmt.Errorf("error during schema diff: %v", err) + return fmt.Errorf("error during schema diff: %w", err) } - - fmt.Println("Schema diff completed") return nil } @@ -448,7 +441,7 @@ func RepsetDiffCLI(ctx *cli.Context) error { blockSizeStr := ctx.String("block-size") blockSizeInt, err := strconv.ParseInt(blockSizeStr, 10, 64) if err != nil { - return fmt.Errorf("invalid block size '%s': %v", blockSizeStr, err) + return fmt.Errorf("invalid block size '%s': %w", blockSizeStr, err) } task := &core.RepsetDiffCmd{ @@ -468,9 +461,7 @@ func RepsetDiffCLI(ctx *cli.Context) error { task.OverrideBlockSize = ctx.Bool("override-block-size") if err := core.RepsetDiff(task); err != nil { - return fmt.Errorf("error during repset diff: %v", err) + return fmt.Errorf("error during repset diff: %w", err) } - - fmt.Println("Repset diff completed") return nil } diff --git a/internal/core/table_diff.go b/internal/core/table_diff.go index c77dcc9..97ae249 100644 --- a/internal/core/table_diff.go +++ b/internal/core/table_diff.go @@ -969,7 +969,7 @@ func (t *TableDiffTask) ExecuteTask() error { // t.Schema, t.Table, maxCount, t.TableFilter) // r, err := t.getPkeyOffsets(ctx, pools[maxNode]) // if err != nil { - // return fmt.Errorf("failed to get pkey offsets directly: %w", err) + // return logger.Error("failed to get pkey offsets directly: %w", err) // } // ranges = r // } else { @@ -1032,7 +1032,7 @@ func (t *TableDiffTask) ExecuteTask() error { } // } - logger.Info("Created %d initial ranges to compare", len(ranges)) + logger.Debug("Created %d initial ranges to compare", len(ranges)) logger.Debug("Ranges: %v", ranges) t.DiffResult.Summary.InitialRangesCount = len(ranges) @@ -1046,10 +1046,11 @@ func (t *TableDiffTask) ExecuteTask() error { sort.Strings(nodeNames) totalHashTasks := len(nodeNames) * len(ranges) - p := mpb.New() + p := mpb.New(mpb.WithOutput(os.Stderr)) bar := p.AddBar(int64(totalHashTasks), + mpb.BarRemoveOnComplete(), mpb.PrependDecorators( - decor.Name("Hashing ranges: ", decor.WC{W: 18}), + decor.Name("Hashing initial ranges: ", decor.WC{W: 18}), decor.CountersNoUnit("%d / %d", decor.WCSyncWidth), ), mpb.AppendDecorators( @@ -1097,12 +1098,11 @@ func (t *TableDiffTask) ExecuteTask() error { } close(hashTaskQueue) initialHashWg.Wait() - p.Wait() logger.Info("Initial hash calculations complete. Proceeding with comparisons for mismatches...") var diffWg sync.WaitGroup - // var mismatchedRangesCountAtomic int32 + var mismatchedTasks []RecursiveDiffTask for rangeIdx := 0; rangeIdx < len(ranges); rangeIdx++ { currentRange := ranges[rangeIdx] @@ -1121,24 +1121,48 @@ func (t *TableDiffTask) ExecuteTask() error { } if r1.hash != r2.hash { - logger.Debug("✗ Mismatch in initial range %d (%v-%v) for %s vs %s. Hashes: %s... / %s... narrowing down diffs...", - rangeIdx, currentRange.Start, currentRange.End, node1, node2, safeCut(r1.hash, 8), safeCut(r2.hash, 8)) - diffWg.Add(1) - go t.recursiveDiff(ctx, RecursiveDiffTask{ + logger.Debug("%s Mismatch in initial range %d (%v-%v) for %s vs %s. Hashes: %s... / %s... narrowing down diffs...", + CrossMark, rangeIdx, currentRange.Start, currentRange.End, node1, node2, safeCut(r1.hash, 8), safeCut(r2.hash, 8)) + mismatchedTasks = append(mismatchedTasks, RecursiveDiffTask{ Node1Name: node1, Node2Name: node2, CurrentRange: currentRange, CurrentEstimatedBlockSize: t.BlockSize, - }, &diffWg) + }) + } else { - logger.Debug("✓ Match in initial range %d (%v-%v) for %s vs %s", rangeIdx, currentRange.Start, currentRange.End, node1, node2) + logger.Debug("%s Match in initial range %d (%v-%v) for %s vs %s", CheckMark, rangeIdx, currentRange.Start, currentRange.End, node1, node2) } } } } + if len(mismatchedTasks) > 0 { + logger.Debug("Found %d initial mismatched ranges. Narrowing down differences.", len(mismatchedTasks)) + + diffBar := p.AddBar(int64(len(mismatchedTasks)), + mpb.PrependDecorators( + decor.Name("Analysing mismatches: ", decor.WC{W: 18}), + decor.CountersNoUnit("%d / %d", decor.WCSyncWidth), + ), + mpb.AppendDecorators( + decor.Elapsed(decor.ET_STYLE_GO), + decor.Name(" | "), + decor.OnComplete(decor.AverageETA(decor.ET_STYLE_GO), "done"), + ), + ) + + for _, task := range mismatchedTasks { + diffWg.Add(1) + go func(task RecursiveDiffTask) { + defer diffBar.Increment() + t.recursiveDiff(ctx, task, &diffWg) + }(task) + } + } + diffWg.Wait() - // t.DiffResult.Summary.MismatchedRangesCount = int(mismatchedRangesCountAtomic) + p.Wait() logger.Info("Table diff comparison completed for %s", t.QualifiedTableName) @@ -1166,9 +1190,16 @@ func (t *TableDiffTask) ExecuteTask() error { logger.Info("ERROR writing diff output to file %s: %v", outputFileName, err) return fmt.Errorf("failed to write diffs file: %w", err) } + logger.Warn("%s TABLES DO NOT MATCH", CrossMark) + + for key, diffCount := range t.DiffResult.Summary.DiffRowsCount { + logger.Warn("Found %d differences between %s", diffCount, key) + } + logger.Info("Diff report written to %s", outputFileName) + } else { - logger.Info("No differences found. Diff file not created.") + logger.Info("%s TABLES MATCH", CheckMark) } return nil @@ -1561,8 +1592,8 @@ func (t *TableDiffTask) recursiveDiff( } if res1.hash != res2.hash { - logger.Debug("✗ Mismatch in sub-range %v-%v for %s (%s...) vs %s (%s...). Recursing.", - sr.Start, sr.End, node1Name, safeCut(res1.hash, 8), node2Name, safeCut(res2.hash, 8)) + logger.Debug("%s Mismatch in sub-range %v-%v for %s (%s...) vs %s (%s...). Recursing.", + CrossMark, sr.Start, sr.End, node1Name, safeCut(res1.hash, 8), node2Name, safeCut(res2.hash, 8)) wg.Add(1) go t.recursiveDiff(ctx, RecursiveDiffTask{ Node1Name: node1Name, @@ -1571,7 +1602,7 @@ func (t *TableDiffTask) recursiveDiff( CurrentEstimatedBlockSize: newEstimatedBlockSize, }, wg) } else { - logger.Debug("✓ Match in sub-range %v-%v for %s vs %s.", sr.Start, sr.End, node1Name, node2Name) + logger.Debug("%s Match in sub-range %v-%v for %s vs %s.", CheckMark, sr.Start, sr.End, node1Name, node2Name) } } } diff --git a/internal/core/utils.go b/internal/core/utils.go index d3ff302..97213f0 100644 --- a/internal/core/utils.go +++ b/internal/core/utils.go @@ -30,6 +30,11 @@ import ( "github.com/pgedge/ace/pkg/types" ) +const ( + CheckMark = "\u2714" + CrossMark = "\u2718" +) + type ClusterConfigProvider interface { GetClusterName() string GetDBName() string diff --git a/internal/logger/logger.go b/internal/logger/logger.go index 8422cc8..43ddaf1 100644 --- a/internal/logger/logger.go +++ b/internal/logger/logger.go @@ -1,34 +1,39 @@ package logger import ( + "fmt" "os" "github.com/charmbracelet/log" ) var ( - // Log is the global logger. Log = log.NewWithOptions(os.Stderr, log.Options{ ReportTimestamp: true, }) ) -// SetLevel sets the log level for the global logger. func SetLevel(level log.Level) { Log.SetLevel(level) } -// Info logs a formatted string at the info level. func Info(format string, args ...any) { Log.Infof(format, args...) } -// Debug logs a formatted string at the debug level. func Debug(format string, args ...any) { Log.Debugf(format, args...) } -// Error logs a formatted string at the error level. -func Error(format string, args ...any) { +func Warn(format string, args ...any) { + Log.Warnf(format, args...) +} + +func Error(format string, args ...any) error { Log.Errorf(format, args...) + return fmt.Errorf(format, args...) +} + +func Fatal(msg any, args ...any) { + Log.Fatal(msg, args...) } From e2c7cc8bfe2995a3bc662d05d97cce70cf04c2c7 Mon Sep 17 00:00:00 2001 From: Tej Kashi Date: Mon, 21 Jul 2025 13:41:36 -0400 Subject: [PATCH 2/3] Use logger in table_repair.go --- internal/core/table_repair.go | 183 +++++++++++++++++++++------------- 1 file changed, 112 insertions(+), 71 deletions(-) diff --git a/internal/core/table_repair.go b/internal/core/table_repair.go index c2fa577..362d9c5 100644 --- a/internal/core/table_repair.go +++ b/internal/core/table_repair.go @@ -15,7 +15,6 @@ import ( "context" "encoding/json" "fmt" - "log" "maps" "os" "path/filepath" @@ -25,6 +24,7 @@ import ( "github.com/jackc/pgx/v4" "github.com/jackc/pgx/v4/pgxpool" "github.com/pgedge/ace/internal/auth" + "github.com/pgedge/ace/internal/logger" "github.com/pgedge/ace/pkg/types" ) @@ -174,7 +174,7 @@ func (t *TableRepairTask) ValidateAndPrepare() error { port, okPort := nodeInfo["Port"].(string) if !okHostname || !okPublicIP || !okPort { - log.Printf("Warning: Skipping node with incomplete info: %+v", nodeInfo) + logger.Warn("Skipping node with incomplete info: %+v", nodeInfo) continue } t.HostMap[fmt.Sprintf("%s:%s", publicIP, port)] = hostname @@ -245,7 +245,7 @@ func (t *TableRepairTask) ValidateAndPrepare() error { if nodeName == t.SourceOfTruth || involvedNodeNames[nodeName] { connPool, err := auth.GetClusterNodeConnection(nodeInfo, t.ClientRole) if err != nil { - log.Printf("Warning: Failed to connect to node %s: %v. Will attempt to proceed if it's not critical or SoT.", nodeName, err) + logger.Warn("Failed to connect to node %s: %v. Will attempt to proceed if it's not critical or SoT.", nodeName, err) if nodeName == t.SourceOfTruth { return fmt.Errorf("failed to connect to source_of_truth node %s: %w", nodeName, err) } @@ -309,7 +309,7 @@ func (t *TableRepairTask) ValidateAndPrepare() error { return fmt.Errorf("failed to establish a connection to the source_of_truth node: %s", t.SourceOfTruth) } - log.Println("Table repair task validated and prepared successfully.") + logger.Debug("Table repair task validated and prepared successfully.") return nil } @@ -378,7 +378,7 @@ func writeReportToFile(report *RepairReport) error { return fmt.Errorf("failed to write report to file %s: %w", filePath, err) } - log.Printf("Wrote report to %s", filePath) + logger.Info("Wrote report to %s", filePath) return nil } @@ -398,7 +398,7 @@ func (t *TableRepairTask) Run(skipValidation bool) error { if t.GenerateReport && t.report != nil { t.report.RunTimeSeconds = time.Since(startTime).Seconds() if err := writeReportToFile(t.report); err != nil { - log.Printf("Warning: failed to write repair report: %v", err) + logger.Warn("Warning: failed to write repair report: %v", err) } } }() @@ -416,7 +416,7 @@ func (t *TableRepairTask) Run(skipValidation bool) error { for nodeName, pool := range t.Pools { if pool != nil { pool.Close() - log.Printf("Closed connection pool for node: %s", nodeName) + logger.Debug("Closed connection pool for node: %s", nodeName) } } }() @@ -429,7 +429,7 @@ func (t *TableRepairTask) Run(skipValidation bool) error { } func (t *TableRepairTask) runUnidirectionalRepair(startTime time.Time) error { - log.Printf("Starting table repair for %s on cluster %s", t.QualifiedTableName, t.ClusterName) + logger.Info("Starting table repair for %s on cluster %s", t.QualifiedTableName, t.ClusterName) // Core repair logic begins here totalOps := make(map[string]map[string]int) // node -> "upserted"/"deleted" -> count @@ -456,10 +456,10 @@ func (t *TableRepairTask) runUnidirectionalRepair(startTime time.Time) error { */ for nodeName := range divergentNodes { - log.Printf("Processing repairs for divergent node: %s", nodeName) + logger.Info("Processing repairs for divergent node: %s", nodeName) divergentPool, ok := t.Pools[nodeName] if !ok || divergentPool == nil { - log.Printf("Connection pool for divergent node %s not found, attempting to connect.", nodeName) + logger.Debug("Connection pool for divergent node %s not found, attempting to connect.", nodeName) var nodeInfo map[string]any for _, ni := range t.ClusterNodes { if name, _ := ni["Name"].(string); name == nodeName { @@ -469,7 +469,7 @@ func (t *TableRepairTask) runUnidirectionalRepair(startTime time.Time) error { } if nodeInfo == nil { - log.Printf("Error: Could not find node info for %s. Skipping repairs for this node.", nodeName) + logger.Error("Could not find node info for %s. Skipping repairs for this node.", nodeName) repairErrors = append(repairErrors, fmt.Sprintf("no node info for %s", nodeName)) continue } @@ -477,17 +477,17 @@ func (t *TableRepairTask) runUnidirectionalRepair(startTime time.Time) error { var err error divergentPool, err = auth.GetClusterNodeConnection(nodeInfo, t.ClientRole) if err != nil { - log.Printf("Error: Failed to connect to node %s: %v. Skipping repairs for this node.", nodeName, err) + logger.Error("Failed to connect to node %s: %v. Skipping repairs for this node.", nodeName, err) repairErrors = append(repairErrors, fmt.Sprintf("connection failed for %s: %v", nodeName, err)) continue } t.Pools[nodeName] = divergentPool - log.Printf("Successfully connected to node %s and created a new connection pool.", nodeName) + logger.Debug("Successfully connected to node %s and created a new connection pool.", nodeName) } tx, err := divergentPool.Begin(context.Background()) if err != nil { - log.Printf("Error starting transaction on node %s: %v", nodeName, err) + logger.Error("starting transaction on node %s: %v", nodeName, err) repairErrors = append(repairErrors, fmt.Sprintf("tx begin failed for %s: %v", nodeName, err)) continue } @@ -496,12 +496,12 @@ func (t *TableRepairTask) runUnidirectionalRepair(startTime time.Time) error { _, err = tx.Exec(context.Background(), "SELECT spock.repair_mode(true)") if err != nil { tx.Rollback(context.Background()) - log.Printf("Error enabling spock.repair_mode(true) on %s: %v", nodeName, err) + logger.Error("enabling spock.repair_mode(true) on %s: %v", nodeName, err) repairErrors = append(repairErrors, fmt.Sprintf("spock.repair_mode(true) failed for %s: %v", nodeName, err)) continue } spockRepairModeActive = true - log.Printf("spock.repair_mode(true) set on %s", nodeName) + logger.Debug("spock.repair_mode(true) set on %s", nodeName) if t.FireTriggers { _, err = tx.Exec(context.Background(), "SET session_replication_role = 'local'") @@ -510,11 +510,11 @@ func (t *TableRepairTask) runUnidirectionalRepair(startTime time.Time) error { } if err != nil { tx.Rollback(context.Background()) - log.Printf("Error setting session_replication_role on %s: %v", nodeName, err) + logger.Error("setting session_replication_role on %s: %v", nodeName, err) repairErrors = append(repairErrors, fmt.Sprintf("session_replication_role failed for %s: %v", nodeName, err)) continue } - log.Printf("session_replication_role set on %s (fire_triggers: %v)", nodeName, t.FireTriggers) + logger.Debug("session_replication_role set on %s (fire_triggers: %v)", nodeName, t.FireTriggers) // TODO: DROP PRIVILEGES HERE! @@ -525,12 +525,12 @@ func (t *TableRepairTask) runUnidirectionalRepair(startTime time.Time) error { deletedCount, err := executeDeletes(tx, t, nodeDeletes) if err != nil { tx.Rollback(context.Background()) - log.Printf("Error executing deletes on node %s: %v", nodeName, err) + logger.Error("executing deletes on node %s: %v", nodeName, err) repairErrors = append(repairErrors, fmt.Sprintf("delete ops failed for %s: %v", nodeName, err)) continue } totalOps[nodeName]["deleted"] = deletedCount - log.Printf("Executed %d delete operations on %s", deletedCount, nodeName) + logger.Info("Executed %d delete operations on %s", deletedCount, nodeName) if t.report != nil { if _, ok := t.report.Changes[nodeName]; !ok { @@ -558,7 +558,7 @@ func (t *TableRepairTask) runUnidirectionalRepair(startTime time.Time) error { if targetNodeHostPortKey == "" { tx.Rollback(context.Background()) errStr := fmt.Sprintf("could not find host:port key for target node %s to get col types", nodeName) - log.Printf("Error: %s", errStr) + logger.Error("%s", errStr) repairErrors = append(repairErrors, errStr) continue } @@ -566,7 +566,7 @@ func (t *TableRepairTask) runUnidirectionalRepair(startTime time.Time) error { if !ok { tx.Rollback(context.Background()) errStr := fmt.Sprintf("column types for target node '%s' (key: %s) not found for upserts", nodeName, targetNodeHostPortKey) - log.Printf("Error: %s", errStr) + logger.Error("%s", errStr) repairErrors = append(repairErrors, errStr) continue } @@ -574,12 +574,12 @@ func (t *TableRepairTask) runUnidirectionalRepair(startTime time.Time) error { upsertedCount, err := executeUpserts(tx, t, nodeUpserts, targetNodeColTypes) if err != nil { tx.Rollback(context.Background()) - log.Printf("Error executing upserts on node %s: %v", nodeName, err) + logger.Error("executing upserts on node %s: %v", nodeName, err) repairErrors = append(repairErrors, fmt.Sprintf("upsert ops failed for %s: %v", nodeName, err)) continue } totalOps[nodeName]["upserted"] = upsertedCount - log.Printf("Executed %d upsert operations on %s", upsertedCount, nodeName) + logger.Info("Executed %d upsert operations on %s", upsertedCount, nodeName) if t.report != nil { if _, ok := t.report.Changes[nodeName]; !ok { @@ -603,66 +603,87 @@ func (t *TableRepairTask) runUnidirectionalRepair(startTime time.Time) error { _, err = tx.Exec(context.Background(), "SELECT spock.repair_mode(false)") if err != nil { tx.Rollback(context.Background()) - log.Printf("Error disabling spock.repair_mode(false) on %s: %v", nodeName, err) + logger.Error("disabling spock.repair_mode(false) on %s: %v", nodeName, err) repairErrors = append(repairErrors, fmt.Sprintf("spock.repair_mode(false) failed for %s: %v", nodeName, err)) continue } - log.Printf("spock.repair_mode(false) set on %s", nodeName) + logger.Debug("spock.repair_mode(false) set on %s", nodeName) } err = tx.Commit(context.Background()) if err != nil { - log.Printf("Error committing transaction on node %s: %v", nodeName, err) + logger.Error("committing transaction on node %s: %v", nodeName, err) repairErrors = append(repairErrors, fmt.Sprintf("commit failed for %s: %v", nodeName, err)) continue } - log.Printf("Transaction committed successfully on %s", nodeName) + logger.Debug("Transaction committed successfully on %s", nodeName) } + t.FinishedAt = time.Now() + t.TimeTaken = float64(t.FinishedAt.Sub(startTime).Milliseconds()) + runTimeStr := fmt.Sprintf("%.3fs", t.TimeTaken/1000) + if len(repairErrors) > 0 { - log.Printf("Table repair for %s finished with errors: %s", t.QualifiedTableName, strings.Join(repairErrors, "; ")) + logger.Error("Repair of %s failed in %s with errors: %s", t.QualifiedTableName, runTimeStr, strings.Join(repairErrors, "; ")) t.TaskStatus = "FAILED" t.TaskContext = strings.Join(repairErrors, "; ") } else { - log.Printf("Table repair for %s completed successfully.", t.QualifiedTableName) - t.TaskStatus = "COMPLETED" - summary := strings.Builder{} + totalUpserted := 0 + totalDeleted := 0 + repairedNodes := make(map[string]bool) for node, ops := range totalOps { - summary.WriteString(fmt.Sprintf("Node %s: %d upserted, %d deleted. ", node, ops["upserted"], ops["deleted"])) + if ops["upserted"] > 0 || ops["deleted"] > 0 { + repairedNodes[node] = true + } + totalUpserted += ops["upserted"] + totalDeleted += ops["deleted"] } - t.TaskContext = strings.TrimSpace(summary.String()) - } - log.Printf("Total operations: %v", totalOps) + summaryParts := []string{} + if totalUpserted > 0 { + op := "upserted" + if t.InsertOnly { + op = "inserted" + } + summaryParts = append(summaryParts, fmt.Sprintf("%d %s", totalUpserted, op)) + } + if totalDeleted > 0 { + summaryParts = append(summaryParts, fmt.Sprintf("%d deleted", totalDeleted)) + } - log.Println("*** SUMMARY ***") - for nodeName := range divergentNodes { - ops := totalOps[nodeName] - if t.InsertOnly { - log.Printf("%s INSERTED = %d rows", nodeName, ops["upserted"]) + if len(repairedNodes) > 0 { + nodeList := []string{} + for node := range repairedNodes { + nodeList = append(nodeList, node) + } + logger.Info("Repair of %s complete in %s. Nodes %s repaired (%s).", + t.QualifiedTableName, + runTimeStr, + strings.Join(nodeList, ", "), + strings.Join(summaryParts, ", "), + ) } else { - log.Printf("%s UPSERTED = %d rows", nodeName, ops["upserted"]) + logger.Info("Repair of %s complete in %s. No differences found.", + t.QualifiedTableName, + runTimeStr, + ) } - } - fmt.Println() - if !t.UpsertOnly && !t.InsertOnly { - for nodeName := range divergentNodes { - ops := totalOps[nodeName] - log.Printf("%s DELETED = %d rows", nodeName, ops["deleted"]) + + t.TaskStatus = "COMPLETED" + summary := strings.Builder{} + for node, ops := range totalOps { + summary.WriteString(fmt.Sprintf("Node %s: %d upserted, %d deleted. ", node, ops["upserted"], ops["deleted"])) } + t.TaskContext = strings.TrimSpace(summary.String()) } - t.FinishedAt = time.Now() - t.TimeTaken = t.FinishedAt.Sub(startTime).Seconds() - log.Printf("RUN TIME = %.2f seconds", t.TimeTaken) - // TODO: Update task metrics in a local DB return nil } func (t *TableRepairTask) runBidirectionalRepair() error { startTime := time.Now() - log.Printf("Starting bidirectional table repair for %s on cluster %s", t.QualifiedTableName, t.ClusterName) + logger.Info("Starting bidirectional table repair for %s on cluster %s", t.QualifiedTableName, t.ClusterName) totalOps := make(map[string]int) var repairErrors []string @@ -670,11 +691,11 @@ func (t *TableRepairTask) runBidirectionalRepair() error { for nodePairKey, diffs := range t.RawDiffs.NodeDiffs { nodes := strings.Split(nodePairKey, "/") if len(nodes) != 2 { - log.Printf("Warning: Invalid node pair key '%s', skipping", nodePairKey) + logger.Warn("Warning: Invalid node pair key '%s', skipping", nodePairKey) continue } node1Name, node2Name := nodes[0], nodes[1] - log.Printf("Processing node pair: %s/%s", node1Name, node2Name) + logger.Info("Processing node pair: %s/%s", node1Name, node2Name) node1Rows := diffs.Rows[node1Name] node2Rows := diffs.Rows[node2Name] @@ -759,12 +780,41 @@ func (t *TableRepairTask) runBidirectionalRepair() error { } } + t.FinishedAt = time.Now() + t.TimeTaken = float64(t.FinishedAt.Sub(startTime).Milliseconds()) + runTimeStr := fmt.Sprintf("%.3fs", t.TimeTaken/1000) + if len(repairErrors) > 0 { - log.Printf("Bidirectional table repair for %s finished with errors: %s", t.QualifiedTableName, strings.Join(repairErrors, "; ")) + logger.Error("Bidirectional repair of %s failed in %s with errors: %s", t.QualifiedTableName, runTimeStr, strings.Join(repairErrors, "; ")) t.TaskStatus = "FAILED" t.TaskContext = strings.Join(repairErrors, "; ") } else { - log.Printf("Bidirectional table repair for %s completed successfully.", t.QualifiedTableName) + totalInserted := 0 + repairedNodes := make(map[string]bool) + for node, count := range totalOps { + if count > 0 { + repairedNodes[node] = true + } + totalInserted += count + } + + if totalInserted > 0 { + nodeList := []string{} + for node := range repairedNodes { + nodeList = append(nodeList, node) + } + logger.Info("Bidirectional repair of %s complete in %s. Nodes %s repaired (%d rows inserted).", + t.QualifiedTableName, + runTimeStr, + strings.Join(nodeList, ", "), + totalInserted, + ) + } else { + logger.Info("Bidirectional repair of %s complete in %s. No differences found.", + t.QualifiedTableName, + runTimeStr, + ) + } t.TaskStatus = "COMPLETED" summary := strings.Builder{} for node, count := range totalOps { @@ -773,15 +823,6 @@ func (t *TableRepairTask) runBidirectionalRepair() error { t.TaskContext = strings.TrimSpace(summary.String()) } - log.Println("*** SUMMARY ***") - for nodeName, count := range totalOps { - log.Printf("%s INSERTED = %d rows", nodeName, count) - } - - t.FinishedAt = time.Now() - t.TimeTaken = t.FinishedAt.Sub(startTime).Seconds() - log.Printf("RUN TIME = %.2f seconds", t.TimeTaken) - return nil } @@ -801,7 +842,7 @@ func (t *TableRepairTask) performBirectionalInserts(nodeName string, inserts map if err != nil { return 0, fmt.Errorf("failed to enable spock.repair_mode(true) on %s: %w", nodeName, err) } - log.Printf("spock.repair_mode(true) set on %s", nodeName) + logger.Info("spock.repair_mode(true) set on %s", nodeName) if t.FireTriggers { _, err = tx.Exec(context.Background(), "SET session_replication_role = 'local'") @@ -811,7 +852,7 @@ func (t *TableRepairTask) performBirectionalInserts(nodeName string, inserts map if err != nil { return 0, fmt.Errorf("failed to set session_replication_role on %s: %w", nodeName, err) } - log.Printf("session_replication_role set on %s (fire_triggers: %v)", nodeName, t.FireTriggers) + logger.Info("session_replication_role set on %s (fire_triggers: %v)", nodeName, t.FireTriggers) targetNodeHostPortKey := "" for hostPort, mappedName := range t.HostMap { @@ -837,18 +878,18 @@ func (t *TableRepairTask) performBirectionalInserts(nodeName string, inserts map if err != nil { return 0, fmt.Errorf("failed to execute inserts on %s: %w", nodeName, err) } - log.Printf("Executed %d insert operations on %s", insertedCount, nodeName) + logger.Info("Executed %d insert operations on %s", insertedCount, nodeName) _, err = tx.Exec(context.Background(), "SELECT spock.repair_mode(false)") if err != nil { return 0, fmt.Errorf("failed to disable spock.repair_mode(false) on %s: %w", nodeName, err) } - log.Printf("spock.repair_mode(false) set on %s", nodeName) + logger.Info("spock.repair_mode(false) set on %s", nodeName) if err := tx.Commit(context.Background()); err != nil { return 0, fmt.Errorf("failed to commit transaction on %s: %w", nodeName, err) } - log.Printf("Transaction committed successfully on %s", nodeName) + logger.Info("Transaction committed successfully on %s", nodeName) return insertedCount, nil } From 4feb16ed648db35617596e868abf2f3768a3281d Mon Sep 17 00:00:00 2001 From: Tej Kashi Date: Mon, 21 Jul 2025 15:30:42 -0400 Subject: [PATCH 3/3] Use logger in remaining modules * Fix object comparison in table-rerun --- internal/core/repset_diff.go | 12 ++++---- internal/core/schema_diff.go | 14 ++++----- internal/core/spock_diff.go | 58 +++++++++++++++++++----------------- internal/core/table_rerun.go | 29 +++++++++--------- 4 files changed, 57 insertions(+), 56 deletions(-) diff --git a/internal/core/repset_diff.go b/internal/core/repset_diff.go index f2066e0..753e682 100644 --- a/internal/core/repset_diff.go +++ b/internal/core/repset_diff.go @@ -15,7 +15,6 @@ import ( "bufio" "context" "fmt" - "log" "maps" "os" "strconv" @@ -25,6 +24,7 @@ import ( "github.com/jackc/pgx/v4/pgxpool" "github.com/pgedge/ace/db/queries" "github.com/pgedge/ace/internal/auth" + "github.com/pgedge/ace/internal/logger" "github.com/pgedge/ace/pkg/types" ) @@ -164,7 +164,7 @@ func RepsetDiff(task *RepsetDiffCmd) error { for _, skip := range task.skipTablesList { if strings.TrimSpace(skip) == tableName { if !task.Quiet { - fmt.Printf("Skipping table: %s\n", tableName) + logger.Info("Skipping table: %s", tableName) } skipped = true break @@ -175,7 +175,7 @@ func RepsetDiff(task *RepsetDiffCmd) error { } if !task.Quiet { - fmt.Printf("Diffing table: %s\n", tableName) + logger.Info("Diffing table: %s", tableName) } tdTask := NewTableDiffTask() @@ -192,16 +192,16 @@ func RepsetDiff(task *RepsetDiffCmd) error { tdTask.QuietMode = task.Quiet if err := tdTask.Validate(); err != nil { - log.Printf("validation for table %s failed: %v", tableName, err) + logger.Warn("validation for table %s failed: %v", tableName, err) continue } if err := tdTask.RunChecks(true); err != nil { - log.Printf("checks for table %s failed: %v", tableName, err) + logger.Warn("checks for table %s failed: %v", tableName, err) continue } if err := tdTask.ExecuteTask(); err != nil { - log.Printf("error during comparison for table %s: %v", tableName, err) + logger.Warn("error during comparison for table %s: %v", tableName, err) continue } } diff --git a/internal/core/schema_diff.go b/internal/core/schema_diff.go index 1a72b3f..df9480f 100644 --- a/internal/core/schema_diff.go +++ b/internal/core/schema_diff.go @@ -15,7 +15,6 @@ import ( "context" "encoding/json" "fmt" - "log" "maps" "sort" "strconv" @@ -24,6 +23,7 @@ import ( "github.com/jackc/pgx/v4/pgxpool" "github.com/pgedge/ace/db/queries" "github.com/pgedge/ace/internal/auth" + "github.com/pgedge/ace/internal/logger" "github.com/pgedge/ace/pkg/types" ) @@ -273,7 +273,7 @@ func schemaDDLDiff(task *SchemaDiffCmd) error { pool, err := auth.GetClusterNodeConnection(nodeWithDBInfo, "") if err != nil { - log.Printf("could not connect to node %s: %v. Skipping.", nodeName, err) + logger.Warn("could not connect to node %s: %v. Skipping.", nodeName, err) continue } defer pool.Close() @@ -281,7 +281,7 @@ func schemaDDLDiff(task *SchemaDiffCmd) error { db := queries.NewQuerier(pool) objects, err := getObjectsForSchema(db, task.SchemaName) if err != nil { - log.Printf("could not get schema objects for node %s: %v. Skipping.", nodeName, err) + logger.Warn("could not get schema objects for node %s: %v. Skipping.", nodeName, err) continue } @@ -370,7 +370,7 @@ func SchemaDiff(task *SchemaDiffCmd) error { for _, tableName := range task.tableList { qualifiedTableName := fmt.Sprintf("%s.%s", task.SchemaName, tableName) if !task.Quiet { - fmt.Printf("Diffing table: %s\n", qualifiedTableName) + logger.Info("Diffing table: %s", qualifiedTableName) } tdTask := NewTableDiffTask() @@ -387,16 +387,16 @@ func SchemaDiff(task *SchemaDiffCmd) error { tdTask.QuietMode = task.Quiet if err := tdTask.Validate(); err != nil { - log.Printf("validation for table %s failed: %v", qualifiedTableName, err) + logger.Warn("validation for table %s failed: %v", qualifiedTableName, err) continue } if err := tdTask.RunChecks(true); err != nil { - log.Printf("checks for table %s failed: %v", qualifiedTableName, err) + logger.Warn("checks for table %s failed: %v", qualifiedTableName, err) continue } if err := tdTask.ExecuteTask(); err != nil { - log.Printf("error during comparison for table %s: %v", qualifiedTableName, err) + logger.Warn("error during comparison for table %s: %v", qualifiedTableName, err) continue } diff --git a/internal/core/spock_diff.go b/internal/core/spock_diff.go index 187487b..595006e 100644 --- a/internal/core/spock_diff.go +++ b/internal/core/spock_diff.go @@ -15,7 +15,6 @@ import ( "context" "encoding/json" "fmt" - "log" "maps" "os" "reflect" @@ -209,7 +208,7 @@ func (t *SpockDiffTask) ExecuteTask() error { q := queries.NewQuerier(pool) config := SpockNodeConfig{NodeName: nodeName, Hints: []string{}} - log.Printf("Fetching Spock config for node: %s\n", nodeName) + logger.Debug("Fetching Spock config for node: %s", nodeName) // Fetch node and subscription info nodeInfos, err := q.SpockNodeAndSubInfo(ctx) @@ -264,37 +263,37 @@ func (t *SpockDiffTask) ExecuteTask() error { // Pretty print configs for _, nodeName := range nodeNames { config := allNodeConfigs[nodeName] - log.Printf("\n===== Spock Config: %s =====\n", nodeName) + fmt.Printf("\n===== Spock Config: %s =====\n", nodeName) if len(config.Subscriptions) > 0 { - log.Println(" Subscriptions:") + fmt.Println(" Subscriptions:") for _, sub := range config.Subscriptions { if sub.SubName != "" { - log.Printf(" - Name: %s (Enabled: %t)\n", sub.SubName, sub.SubEnabled) - log.Printf(" Replication Sets: %v\n", sub.ReplicationSets) + fmt.Printf(" - Name: %s (Enabled: %t)\n", sub.SubName, sub.SubEnabled) + fmt.Printf(" Replication Sets: %v\n", sub.ReplicationSets) } } } else { - log.Println(" No subscriptions found.") + fmt.Println(" No subscriptions found.") } if len(config.RepSetInfo) > 0 { - log.Println(" Replication Sets:") + fmt.Println(" Replication Sets:") for _, rs := range config.RepSetInfo { if rs.SetName.Status == 2 { - log.Printf(" - %s:\n", rs.SetName.String) + fmt.Printf(" - %s:\n", rs.SetName.String) for _, table := range rs.Relname { - log.Printf(" - %s\n", table) + fmt.Printf(" - %s\n", table) } } } } else { - log.Println(" No replication sets found.") + fmt.Println(" No replication sets found.") } if len(config.Hints) > 0 { - log.Println(" Hints:") + fmt.Println(" Hints:") for _, hint := range config.Hints { - log.Printf(" - %s\n", hint) + fmt.Printf(" - %s\n", hint) } } } @@ -304,7 +303,7 @@ func (t *SpockDiffTask) ExecuteTask() error { t.DiffResult.SpockConfigs[k] = v } - log.Println("\n===== Spock Diff =====") + fmt.Println("\n===== Spock Diff =====") for i := 0; i < len(nodeNames); i++ { for j := i + 1; j < len(nodeNames); j++ { @@ -315,25 +314,26 @@ func (t *SpockDiffTask) ExecuteTask() error { pairKey := fmt.Sprintf("%s/%s", refNodeName, compareNodeName) - log.Printf("\nComparing %s vs %s:\n", refNodeName, compareNodeName) + fmt.Printf("\nComparing %s vs %s:\n", refNodeName, compareNodeName) // Perform detailed diff diff := compareSpockConfigs(refConfig, compareConfig) if !diff.Mismatch { diff.Message = fmt.Sprintf("Replication rules are the same for %s and %s", refNodeName, compareNodeName) - log.Printf(" ✔ No differences found.\n") + fmt.Printf("%s No differences found.\n", CheckMark) } else { diff.Message = fmt.Sprintf("Difference in Replication Rules between %s and %s", refNodeName, compareNodeName) - log.Printf(" ✘ Differences found:\n") + fmt.Printf("%s Differences found:\n", CrossMark) printDiffDetails(diff.Details, refNodeName, compareNodeName) } t.DiffResult.Diffs[pairKey] = diff } } + fmt.Println() + endTime := time.Now() - log.Printf("\nSpock diff completed in %s\n", endTime.Sub(startTime)) if len(t.DiffResult.Diffs) > 0 { outputFileName := fmt.Sprintf("spock_diffs-%s.json", @@ -354,6 +354,8 @@ func (t *SpockDiffTask) ExecuteTask() error { logger.Info("Diff report written to %s", outputFileName) } + logger.Info("Spock diff completed in %.3f seconds", endTime.Sub(startTime).Seconds()) + return nil } @@ -481,26 +483,26 @@ func compareReplicationSets(c1, c2 SpockNodeConfig) types.ReplicationSetDiff { func printDiffDetails(details types.SpockDiffDetail, node1, node2 string) { if len(details.Subscriptions.MissingOnNode1) > 0 { - log.Printf(" Missing reciprocal subscriptions on %s: %v\n", node1, details.Subscriptions.MissingOnNode1) + fmt.Printf(" Missing reciprocal subscriptions on %s: %v\n", node1, details.Subscriptions.MissingOnNode1) } if len(details.Subscriptions.MissingOnNode2) > 0 { - log.Printf(" Missing reciprocal subscriptions on %s: %v\n", node2, details.Subscriptions.MissingOnNode2) + fmt.Printf(" Missing reciprocal subscriptions on %s: %v\n", node2, details.Subscriptions.MissingOnNode2) } if len(details.Subscriptions.Different) > 0 { - log.Println(" Subscriptions with different properties:") + fmt.Println(" Subscriptions with different properties:") for _, d := range details.Subscriptions.Different { - log.Printf(" - Mismatch in settings for subscriptions between %s and %s:\n", node1, node2) - log.Printf(" - On %s (subscription '%s'): Enabled: %t, Repsets: %v\n", node1, d.Node1.SubName, d.Node1.SubEnabled, d.Node1.ReplicationSets) - log.Printf(" - On %s (subscription '%s'): Enabled: %t, Repsets: %v\n", node2, d.Node2.SubName, d.Node2.SubEnabled, d.Node2.ReplicationSets) + fmt.Printf(" - Mismatch in settings for subscriptions between %s and %s:\n", node1, node2) + fmt.Printf(" - On %s (subscription '%s'): Enabled: %t, Repsets: %v\n", node1, d.Node1.SubName, d.Node1.SubEnabled, d.Node1.ReplicationSets) + fmt.Printf(" - On %s (subscription '%s'): Enabled: %t, Repsets: %v\n", node2, d.Node2.SubName, d.Node2.SubEnabled, d.Node2.ReplicationSets) } } if len(details.ReplicationSets.TablePlacementDiffs) > 0 { - log.Println(" Table placement in replication sets differs:") + fmt.Println(" Table placement in replication sets differs:") for _, d := range details.ReplicationSets.TablePlacementDiffs { - log.Printf(" - Table '%s':\n", d.TableName) - log.Printf(" - on %s: in repset '%s'\n", node1, d.Node1RepSet) - log.Printf(" - on %s: in repset '%s'\n", node2, d.Node2RepSet) + fmt.Printf(" - Table '%s':\n", d.TableName) + fmt.Printf(" - on %s: in repset '%s'\n", node1, d.Node1RepSet) + fmt.Printf(" - on %s: in repset '%s'\n", node2, d.Node2RepSet) } } } diff --git a/internal/core/table_rerun.go b/internal/core/table_rerun.go index b16ad38..b9298f8 100644 --- a/internal/core/table_rerun.go +++ b/internal/core/table_rerun.go @@ -148,7 +148,7 @@ func (t *TableDiffTask) ExecuteRerunTask() error { for _, count := range newDiffResult.Summary.DiffRowsCount { totalPersistentDiffs += count } - logger.Info("Found %d persistent differences. Writing new report to %s", totalPersistentDiffs, outputFileName) + logger.Warn("%s Found %d persistent differences. Writing new report to %s", CrossMark, totalPersistentDiffs, outputFileName) jsonData, mErr := json.MarshalIndent(newDiffResult, "", " ") if mErr != nil { return fmt.Errorf("failed to marshal new diff report: %w", mErr) @@ -157,7 +157,7 @@ func (t *TableDiffTask) ExecuteRerunTask() error { return fmt.Errorf("failed to write new diff report: %w", wErr) } } else { - logger.Info("All previously reported differences have been resolved.") + logger.Info("%s All previously reported differences have been resolved.", CheckMark) } return nil @@ -255,11 +255,13 @@ func fetchRowsByPkeys(ctx context.Context, pool *pgxpool.Pool, t *TableDiffTask, } createTempTableSQL := fmt.Sprintf("CREATE TEMPORARY TABLE %s (%s) ON COMMIT PRESERVE ROWS", sanitisedTempTable, strings.Join(pkColDefs, ", ")) + logger.Debug("Creating temporary table for pkeys: %s", createTempTableSQL) _, err = tx.Exec(ctx, createTempTableSQL) if err != nil { return nil, fmt.Errorf("failed to create temporary table: %w", err) } + logger.Debug("Copying %d pkey sets into temporary table %s", len(pkeyVals), tempTableName) _, err = tx.CopyFrom(ctx, pgx.Identifier{tempTableName}, t.Key, pgx.CopyFromRows(pkeyVals)) if err != nil { return nil, fmt.Errorf("failed to copy primary keys to temporary table: %w", err) @@ -282,6 +284,7 @@ func fetchRowsByPkeys(ctx context.Context, pool *pgxpool.Pool, t *TableDiffTask, fetchSQL := fmt.Sprintf("SELECT %s FROM %s t JOIN %s temp ON %s", strings.Join(selectCols, ", "), schemaTable, sanitisedTempTable, strings.Join(joinConditions, " AND ")) + logger.Debug("Fetching rows with pkeys from temporary table: %s", fetchSQL) pgRows, err := tx.Query(ctx, fetchSQL) if err != nil { return nil, fmt.Errorf("failed to query rows using temp table join: %w", err) @@ -366,21 +369,17 @@ func (t *TableDiffTask) reCompareDiffs(fetchedRowsByNode map[string]map[string]m newRow1, nowOnNode1 := fetchedRowsByNode[node1][pkStr] newRow2, nowOnNode2 := fetchedRowsByNode[node2][pkStr] - _, wasOnNode1 := originalNode1Rows[pkStr] - _, wasOnNode2 := originalNode2Rows[pkStr] - isDifferent := false - if wasOnNode1 && wasOnNode2 { - if !nowOnNode1 || !nowOnNode2 || !reflect.DeepEqual(newRow1, newRow2) { - isDifferent = true - } - } else if wasOnNode1 { - if !nowOnNode1 || nowOnNode2 { - isDifferent = true - } + if !nowOnNode1 || !nowOnNode2 { + isDifferent = true } else { - if nowOnNode1 || !nowOnNode2 { - isDifferent = true + for _, colName := range t.Cols { + val1 := newRow1[colName] + val2 := newRow2[colName] + if !reflect.DeepEqual(val1, val2) { + isDifferent = true + break + } } }