From 55f9d72cdc9856b0f7183a0fbf666549e8f55c3d Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Sun, 17 Nov 2024 23:14:25 -0500 Subject: [PATCH 1/4] count active threads --- internal/verifier/check.go | 4 ++ internal/verifier/migration_verifier.go | 12 +++++ internal/verifier/summary.go | 41 +++++++++++++++++ internal/verifier/worker_tracker.go | 60 +++++++++++++++++++++++++ 4 files changed, 117 insertions(+) create mode 100644 internal/verifier/worker_tracker.go diff --git a/internal/verifier/check.go b/internal/verifier/check.go index 5a60bfb5..7c6dafd1 100644 --- a/internal/verifier/check.go +++ b/internal/verifier/check.go @@ -399,6 +399,8 @@ func (verifier *Verifier) Work(ctx context.Context, workerNum int, wg *sync.Wait panic(err) } + verifier.workerTracker.Set(workerNum, *task) + if task.Type == verificationTaskVerifyCollection { verifier.ProcessCollectionVerificationTask(ctx, workerNum, task) if task.Generation == 0 { @@ -410,6 +412,8 @@ func (verifier *Verifier) Work(ctx context.Context, workerNum int, wg *sync.Wait } else { verifier.ProcessVerifyTask(workerNum, task) } + + verifier.workerTracker.Unset(workerNum) } } } diff --git a/internal/verifier/migration_verifier.go b/internal/verifier/migration_verifier.go index 809e4fe3..bbbc2572 100644 --- a/internal/verifier/migration_verifier.go +++ b/internal/verifier/migration_verifier.go @@ -138,6 +138,8 @@ type Verifier struct { globalFilter map[string]any pprofInterval time.Duration + + workerTracker *WorkerTracker } // VerificationStatus holds the Verification Status @@ -201,6 +203,8 @@ func NewVerifier(settings VerifierSettings) *Verifier { // This will get recreated once gen0 starts, but we want it // here in case the change streams gets an event before then. eventRecorder: NewEventRecorder(), + + workerTracker: NewWorkerTracker(NumWorkers), } } @@ -1348,6 +1352,14 @@ func (verifier *Verifier) PrintVerificationSummary(ctx context.Context, genstatu verifier.printChangeEventStatistics(strBuilder) + // Only print the worker status table if debug logging is enabled. + if verifier.logger.Debug().Enabled() { + switch genstatus { + case Gen0MetadataAnalysisComplete, GenerationInProgress: + verifier.printWorkerStatus(strBuilder) + } + } + var statusLine string if hasTasks { diff --git a/internal/verifier/summary.go b/internal/verifier/summary.go index 54e45de1..05487682 100644 --- a/internal/verifier/summary.go +++ b/internal/verifier/summary.go @@ -15,6 +15,7 @@ import ( "github.com/10gen/migration-verifier/internal/reportutils" "github.com/10gen/migration-verifier/internal/types" "github.com/olekukonko/tablewriter" + "go.mongodb.org/mongo-driver/bson/primitive" "golang.org/x/exp/maps" ) @@ -425,3 +426,43 @@ func (verifier *Verifier) printChangeEventStatistics(builder *strings.Builder) { builder.WriteString("\nMost frequently-changing namespaces:\n") table.Render() } + +func (verifier *Verifier) printWorkerStatus(builder *strings.Builder) { + + table := tablewriter.NewWriter(builder) + table.SetHeader([]string{"Thread #", "Namespace", "Task", "Time Elapsed"}) + + wsmap := verifier.workerTracker.Load() + + activeThreadCount := 0 + for w := 0; w <= verifier.numWorkers; w++ { + if wsmap[w].TaskID == nil { + continue + } + + activeThreadCount++ + + var taskIdStr string + + switch id := wsmap[w].TaskID.(type) { + case primitive.ObjectID: + theBytes, _ := id.MarshalText() + + taskIdStr = string(theBytes) + default: + taskIdStr = fmt.Sprintf("%s", wsmap[w].TaskID) + } + + table.Append( + []string{ + strconv.Itoa(w), + wsmap[w].Namespace, + taskIdStr, + reportutils.DurationToHMS(time.Since(wsmap[w].StartTime)), + }, + ) + } + + builder.WriteString(fmt.Sprintf("\nActive worker threads (%d):\n", activeThreadCount)) + table.Render() +} diff --git a/internal/verifier/worker_tracker.go b/internal/verifier/worker_tracker.go new file mode 100644 index 00000000..110c50df --- /dev/null +++ b/internal/verifier/worker_tracker.go @@ -0,0 +1,60 @@ +package verifier + +import ( + "time" + + "github.com/10gen/migration-verifier/msync" +) + +type WorkerTracker struct { + guard *msync.DataGuard[WorkerStatusMap] +} + +type WorkerStatusMap = map[int]WorkerStatus + +type WorkerStatus struct { + TaskID any + TaskType verificationTaskType + Namespace string + StartTime time.Time +} + +func NewWorkerTracker(workersCount int) *WorkerTracker { + wsmap := WorkerStatusMap{} + for i := 0; i < workersCount; i++ { + wsmap[i] = WorkerStatus{} + } + return &WorkerTracker{ + guard: msync.NewDataGuard(wsmap), + } +} + +func (wt *WorkerTracker) Set(workerNum int, task VerificationTask) { + wt.guard.Store(func(m WorkerStatusMap) WorkerStatusMap { + m[workerNum] = WorkerStatus{ + TaskID: task.PrimaryKey, + TaskType: task.Type, + Namespace: task.QueryFilter.Namespace, + StartTime: time.Now(), + } + + return m + }) +} + +func (wt *WorkerTracker) Unset(workerNum int) { + wt.guard.Store(func(m WorkerStatusMap) WorkerStatusMap { + m[workerNum] = WorkerStatus{} + + return m + }) +} + +func (wt *WorkerTracker) Load() WorkerStatusMap { + var wtmap WorkerStatusMap + wt.guard.Load(func(m map[int]WorkerStatus) { + wtmap = m + }) + + return wtmap +} From 77a61ac9487c538cc4dfb30e605df247f7e943d5 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Mon, 18 Nov 2024 09:16:46 -0500 Subject: [PATCH 2/4] save --- internal/verifier/check.go | 6 ------ internal/verifier/summary.go | 7 ++++++- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/internal/verifier/check.go b/internal/verifier/check.go index 7c6dafd1..3c7d2739 100644 --- a/internal/verifier/check.go +++ b/internal/verifier/check.go @@ -387,12 +387,6 @@ func (verifier *Verifier) Work(ctx context.Context, workerNum int, wg *sync.Wait task, err := verifier.FindNextVerifyTaskAndUpdate() if errors.Is(err, mongo.ErrNoDocuments) { duration := verifier.workerSleepDelayMillis * time.Millisecond - - verifier.logger.Debug(). - Int("workerNum", workerNum). - Stringer("duration", duration). - Msg("No tasks found. Sleeping.") - time.Sleep(duration) continue } else if err != nil { diff --git a/internal/verifier/summary.go b/internal/verifier/summary.go index 05487682..4c86ae67 100644 --- a/internal/verifier/summary.go +++ b/internal/verifier/summary.go @@ -463,6 +463,11 @@ func (verifier *Verifier) printWorkerStatus(builder *strings.Builder) { ) } - builder.WriteString(fmt.Sprintf("\nActive worker threads (%d):\n", activeThreadCount)) + builder.WriteString(fmt.Sprintf( + "\nActive worker threads (%d of %d):\n", + activeThreadCount, + verifier.numWorkers, + )) + table.Render() } From b45667915fabd571e7992927545ecb3d38ff547a Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Tue, 26 Nov 2024 20:49:50 -0500 Subject: [PATCH 3/4] godoc --- internal/verifier/worker_tracker.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/internal/verifier/worker_tracker.go b/internal/verifier/worker_tracker.go index 110c50df..24cf40da 100644 --- a/internal/verifier/worker_tracker.go +++ b/internal/verifier/worker_tracker.go @@ -6,12 +6,18 @@ import ( "github.com/10gen/migration-verifier/msync" ) +// WorkerTracker holds certain data points about each worker thread +// in a check generation. It is thread-safe. type WorkerTracker struct { guard *msync.DataGuard[WorkerStatusMap] } +// WorkerStatusMap represents the status of each worker, +// indexed by worker number (which start at 0). type WorkerStatusMap = map[int]WorkerStatus +// WorkerStatus details the work that an individual worker thread +// is doing. type WorkerStatus struct { TaskID any TaskType verificationTaskType @@ -19,6 +25,7 @@ type WorkerStatus struct { StartTime time.Time } +// NewWorkerTracker creates and returns a WorkerTracker. func NewWorkerTracker(workersCount int) *WorkerTracker { wsmap := WorkerStatusMap{} for i := 0; i < workersCount; i++ { @@ -29,6 +36,7 @@ func NewWorkerTracker(workersCount int) *WorkerTracker { } } +// Set updates the worker’s state in the WorkerTracker. func (wt *WorkerTracker) Set(workerNum int, task VerificationTask) { wt.guard.Store(func(m WorkerStatusMap) WorkerStatusMap { m[workerNum] = WorkerStatus{ @@ -42,6 +50,7 @@ func (wt *WorkerTracker) Set(workerNum int, task VerificationTask) { }) } +// Unset tells the WorkerTracker that the worker is now inactive. func (wt *WorkerTracker) Unset(workerNum int) { wt.guard.Store(func(m WorkerStatusMap) WorkerStatusMap { m[workerNum] = WorkerStatus{} @@ -50,6 +59,8 @@ func (wt *WorkerTracker) Unset(workerNum int) { }) } +// Load duplicates and returns the WorkerTracker’s internal +// state map. func (wt *WorkerTracker) Load() WorkerStatusMap { var wtmap WorkerStatusMap wt.guard.Load(func(m map[int]WorkerStatus) { From 46b040cddb4a184b8f8b24b4c97e336f3d10df4a Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Tue, 26 Nov 2024 21:11:38 -0500 Subject: [PATCH 4/4] clone the map on load --- internal/verifier/worker_tracker.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/internal/verifier/worker_tracker.go b/internal/verifier/worker_tracker.go index 24cf40da..116f8d82 100644 --- a/internal/verifier/worker_tracker.go +++ b/internal/verifier/worker_tracker.go @@ -4,6 +4,7 @@ import ( "time" "github.com/10gen/migration-verifier/msync" + "golang.org/x/exp/maps" ) // WorkerTracker holds certain data points about each worker thread @@ -64,7 +65,7 @@ func (wt *WorkerTracker) Unset(workerNum int) { func (wt *WorkerTracker) Load() WorkerStatusMap { var wtmap WorkerStatusMap wt.guard.Load(func(m map[int]WorkerStatus) { - wtmap = m + wtmap = maps.Clone(m) }) return wtmap