Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions internal/verifier/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,8 @@ func (verifier *Verifier) work(ctx context.Context, workerNum int) error {
)
}

verifier.workerTracker.Set(workerNum, *task)

switch task.Type {
case verificationTaskVerifyCollection:
err := verifier.ProcessCollectionVerificationTask(ctx, workerNum, task)
Expand Down
12 changes: 12 additions & 0 deletions internal/verifier/migration_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ type Verifier struct {

pprofInterval time.Duration

workerTracker *WorkerTracker

verificationStatusCheckInterval time.Duration
}

Expand Down Expand Up @@ -206,6 +208,8 @@ func NewVerifier(settings VerifierSettings) *Verifier {
// here in case the change streams gets an event before then.
eventRecorder: NewEventRecorder(),

workerTracker: NewWorkerTracker(NumWorkers),

verificationStatusCheckInterval: 15 * time.Second,
}
}
Expand Down Expand Up @@ -1515,6 +1519,14 @@ func (verifier *Verifier) PrintVerificationSummary(ctx context.Context, genstatu

verifier.printChangeEventStatistics(strBuilder)

// Only print the worker status table if debug logging is enabled.
if verifier.logger.Debug().Enabled() {
switch genstatus {
case Gen0MetadataAnalysisComplete, GenerationInProgress:
verifier.printWorkerStatus(strBuilder)
}
}

var statusLine string

if hasTasks {
Expand Down
46 changes: 46 additions & 0 deletions internal/verifier/summary.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/10gen/migration-verifier/internal/reportutils"
"github.com/10gen/migration-verifier/internal/types"
"github.com/olekukonko/tablewriter"
"go.mongodb.org/mongo-driver/bson/primitive"
"golang.org/x/exp/maps"
)

Expand Down Expand Up @@ -425,3 +426,48 @@ func (verifier *Verifier) printChangeEventStatistics(builder *strings.Builder) {
builder.WriteString("\nMost frequently-changing namespaces:\n")
table.Render()
}

func (verifier *Verifier) printWorkerStatus(builder *strings.Builder) {

table := tablewriter.NewWriter(builder)
table.SetHeader([]string{"Thread #", "Namespace", "Task", "Time Elapsed"})

wsmap := verifier.workerTracker.Load()

activeThreadCount := 0
for w := 0; w <= verifier.numWorkers; w++ {
if wsmap[w].TaskID == nil {
continue
}

activeThreadCount++

var taskIdStr string

switch id := wsmap[w].TaskID.(type) {
case primitive.ObjectID:
theBytes, _ := id.MarshalText()

taskIdStr = string(theBytes)
default:
taskIdStr = fmt.Sprintf("%s", wsmap[w].TaskID)
}

table.Append(
[]string{
strconv.Itoa(w),
wsmap[w].Namespace,
taskIdStr,
reportutils.DurationToHMS(time.Since(wsmap[w].StartTime)),
},
)
}

builder.WriteString(fmt.Sprintf(
"\nActive worker threads (%d of %d):\n",
activeThreadCount,
verifier.numWorkers,
))

table.Render()
}
72 changes: 72 additions & 0 deletions internal/verifier/worker_tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package verifier

import (
"time"

"github.com/10gen/migration-verifier/msync"
"golang.org/x/exp/maps"
)

// WorkerTracker holds certain data points about each worker thread
// in a check generation. It is thread-safe.
type WorkerTracker struct {
guard *msync.DataGuard[WorkerStatusMap]
}

// WorkerStatusMap represents the status of each worker,
// indexed by worker number (which start at 0).
type WorkerStatusMap = map[int]WorkerStatus

// WorkerStatus details the work that an individual worker thread
// is doing.
type WorkerStatus struct {
TaskID any
TaskType verificationTaskType
Namespace string
StartTime time.Time
}

// NewWorkerTracker creates and returns a WorkerTracker.
func NewWorkerTracker(workersCount int) *WorkerTracker {
wsmap := WorkerStatusMap{}
for i := 0; i < workersCount; i++ {
wsmap[i] = WorkerStatus{}
}
return &WorkerTracker{
guard: msync.NewDataGuard(wsmap),
}
}

// Set updates the worker’s state in the WorkerTracker.
func (wt *WorkerTracker) Set(workerNum int, task VerificationTask) {
wt.guard.Store(func(m WorkerStatusMap) WorkerStatusMap {
m[workerNum] = WorkerStatus{
TaskID: task.PrimaryKey,
TaskType: task.Type,
Namespace: task.QueryFilter.Namespace,
StartTime: time.Now(),
}

return m
})
}

// Unset tells the WorkerTracker that the worker is now inactive.
func (wt *WorkerTracker) Unset(workerNum int) {
wt.guard.Store(func(m WorkerStatusMap) WorkerStatusMap {
m[workerNum] = WorkerStatus{}

return m
})
}

// Load duplicates and returns the WorkerTracker’s internal
// state map.
func (wt *WorkerTracker) Load() WorkerStatusMap {
var wtmap WorkerStatusMap
wt.guard.Load(func(m map[int]WorkerStatus) {
wtmap = maps.Clone(m)
})

return wtmap
}