diff --git a/internal/verifier/check.go b/internal/verifier/check.go index 836d098b..425a90d2 100644 --- a/internal/verifier/check.go +++ b/internal/verifier/check.go @@ -448,6 +448,8 @@ func (verifier *Verifier) work(ctx context.Context, workerNum int) error { ) } + verifier.workerTracker.Set(workerNum, *task) + switch task.Type { case verificationTaskVerifyCollection: err := verifier.ProcessCollectionVerificationTask(ctx, workerNum, task) diff --git a/internal/verifier/migration_verifier.go b/internal/verifier/migration_verifier.go index 924cada3..9b056a51 100644 --- a/internal/verifier/migration_verifier.go +++ b/internal/verifier/migration_verifier.go @@ -142,6 +142,8 @@ type Verifier struct { pprofInterval time.Duration + workerTracker *WorkerTracker + verificationStatusCheckInterval time.Duration } @@ -206,6 +208,8 @@ func NewVerifier(settings VerifierSettings) *Verifier { // here in case the change streams gets an event before then. eventRecorder: NewEventRecorder(), + workerTracker: NewWorkerTracker(NumWorkers), + verificationStatusCheckInterval: 15 * time.Second, } } @@ -1515,6 +1519,14 @@ func (verifier *Verifier) PrintVerificationSummary(ctx context.Context, genstatu verifier.printChangeEventStatistics(strBuilder) + // Only print the worker status table if debug logging is enabled. + if verifier.logger.Debug().Enabled() { + switch genstatus { + case Gen0MetadataAnalysisComplete, GenerationInProgress: + verifier.printWorkerStatus(strBuilder) + } + } + var statusLine string if hasTasks { diff --git a/internal/verifier/summary.go b/internal/verifier/summary.go index 54e45de1..4c86ae67 100644 --- a/internal/verifier/summary.go +++ b/internal/verifier/summary.go @@ -15,6 +15,7 @@ import ( "github.com/10gen/migration-verifier/internal/reportutils" "github.com/10gen/migration-verifier/internal/types" "github.com/olekukonko/tablewriter" + "go.mongodb.org/mongo-driver/bson/primitive" "golang.org/x/exp/maps" ) @@ -425,3 +426,48 @@ func (verifier *Verifier) printChangeEventStatistics(builder *strings.Builder) { builder.WriteString("\nMost frequently-changing namespaces:\n") table.Render() } + +func (verifier *Verifier) printWorkerStatus(builder *strings.Builder) { + + table := tablewriter.NewWriter(builder) + table.SetHeader([]string{"Thread #", "Namespace", "Task", "Time Elapsed"}) + + wsmap := verifier.workerTracker.Load() + + activeThreadCount := 0 + for w := 0; w <= verifier.numWorkers; w++ { + if wsmap[w].TaskID == nil { + continue + } + + activeThreadCount++ + + var taskIdStr string + + switch id := wsmap[w].TaskID.(type) { + case primitive.ObjectID: + theBytes, _ := id.MarshalText() + + taskIdStr = string(theBytes) + default: + taskIdStr = fmt.Sprintf("%s", wsmap[w].TaskID) + } + + table.Append( + []string{ + strconv.Itoa(w), + wsmap[w].Namespace, + taskIdStr, + reportutils.DurationToHMS(time.Since(wsmap[w].StartTime)), + }, + ) + } + + builder.WriteString(fmt.Sprintf( + "\nActive worker threads (%d of %d):\n", + activeThreadCount, + verifier.numWorkers, + )) + + table.Render() +} diff --git a/internal/verifier/worker_tracker.go b/internal/verifier/worker_tracker.go new file mode 100644 index 00000000..116f8d82 --- /dev/null +++ b/internal/verifier/worker_tracker.go @@ -0,0 +1,72 @@ +package verifier + +import ( + "time" + + "github.com/10gen/migration-verifier/msync" + "golang.org/x/exp/maps" +) + +// WorkerTracker holds certain data points about each worker thread +// in a check generation. It is thread-safe. +type WorkerTracker struct { + guard *msync.DataGuard[WorkerStatusMap] +} + +// WorkerStatusMap represents the status of each worker, +// indexed by worker number (which start at 0). +type WorkerStatusMap = map[int]WorkerStatus + +// WorkerStatus details the work that an individual worker thread +// is doing. +type WorkerStatus struct { + TaskID any + TaskType verificationTaskType + Namespace string + StartTime time.Time +} + +// NewWorkerTracker creates and returns a WorkerTracker. +func NewWorkerTracker(workersCount int) *WorkerTracker { + wsmap := WorkerStatusMap{} + for i := 0; i < workersCount; i++ { + wsmap[i] = WorkerStatus{} + } + return &WorkerTracker{ + guard: msync.NewDataGuard(wsmap), + } +} + +// Set updates the worker’s state in the WorkerTracker. +func (wt *WorkerTracker) Set(workerNum int, task VerificationTask) { + wt.guard.Store(func(m WorkerStatusMap) WorkerStatusMap { + m[workerNum] = WorkerStatus{ + TaskID: task.PrimaryKey, + TaskType: task.Type, + Namespace: task.QueryFilter.Namespace, + StartTime: time.Now(), + } + + return m + }) +} + +// Unset tells the WorkerTracker that the worker is now inactive. +func (wt *WorkerTracker) Unset(workerNum int) { + wt.guard.Store(func(m WorkerStatusMap) WorkerStatusMap { + m[workerNum] = WorkerStatus{} + + return m + }) +} + +// Load duplicates and returns the WorkerTracker’s internal +// state map. +func (wt *WorkerTracker) Load() WorkerStatusMap { + var wtmap WorkerStatusMap + wt.guard.Load(func(m map[int]WorkerStatus) { + wtmap = maps.Clone(m) + }) + + return wtmap +}