diff --git a/internal/verifier/check.go b/internal/verifier/check.go index be64d92a..b7a1338d 100644 --- a/internal/verifier/check.go +++ b/internal/verifier/check.go @@ -40,6 +40,7 @@ func (verifier *Verifier) Check(ctx context.Context, filter map[string]any) { verifier.logger.Fatal().Err(err).Msgf("Fatal error in generation %d", verifier.generation) } }() + verifier.MaybeStartPeriodicHeapProfileCollection(ctx) } func (verifier *Verifier) waitForChangeStream() error { diff --git a/internal/verifier/migration_verifier.go b/internal/verifier/migration_verifier.go index 711ed9f4..9271634b 100644 --- a/internal/verifier/migration_verifier.go +++ b/internal/verifier/migration_verifier.go @@ -136,6 +136,8 @@ type Verifier struct { // The filter is applied to all namespaces in both initial checking and iterative checking. // The verifier only checks documents within the filter. globalFilter map[string]any + + pprofInterval time.Duration } // VerificationStatus holds the Verification Status @@ -360,6 +362,20 @@ func (verifier *Verifier) SetReadPreference(arg string) error { return err } +func (verifier *Verifier) SetPprofInterval(arg string) error { + if arg == "" { + return nil + } + + interval, err := time.ParseDuration(arg) + if err != nil { + return err + } + + verifier.pprofInterval = interval + return nil +} + // DocumentStats gets various stats (TODO clarify) func DocumentStats(ctx context.Context, client *mongo.Client, namespaces []string) { diff --git a/internal/verifier/pprof.go b/internal/verifier/pprof.go new file mode 100644 index 00000000..1ffc3371 --- /dev/null +++ b/internal/verifier/pprof.go @@ -0,0 +1,45 @@ +package verifier + +import ( + "context" + "fmt" + "os" + "runtime/pprof" + "time" +) + +func (verifier *Verifier) MaybeStartPeriodicHeapProfileCollection(ctx context.Context) { + if verifier.pprofInterval == 0 { + return + } + + go func() { + ticker := time.NewTicker(verifier.pprofInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + collectHeapUsage() + } + } + }() + +} + +func collectHeapUsage() { + heapFileName := fmt.Sprintf("heap-%s.out", time.Now().UTC().Format("20060102T150405Z")) + heapFile, err := os.Create(heapFileName) + defer heapFile.Close() + + if err != nil { + panic(err) + } + + err = pprof.Lookup("heap").WriteTo(heapFile, 0) + if err != nil { + panic(err) + } +} diff --git a/main/migration_verifier.go b/main/migration_verifier.go index 8513b2d5..4289eb5d 100644 --- a/main/migration_verifier.go +++ b/main/migration_verifier.go @@ -40,6 +40,7 @@ const ( failureDisplaySize = "failureDisplaySize" ignoreReadConcernFlag = "ignoreReadConcern" configFileFlag = "configFile" + pprofInterval = "pprofInterval" ) func main() { @@ -146,6 +147,10 @@ func main() { Name: ignoreReadConcernFlag, Usage: "Use connection-default read concerns rather than setting majority read concern. This option may degrade consistency, so only enable it if majority read concern (the default) doesn’t work.", }), + altsrc.NewStringFlag(cli.StringFlag{ + Name: pprofInterval, + Usage: "Interval to periodically collect pprof profiles (e.g. --pprofInterval=\"5m\")", + }), } app := &cli.App{ @@ -207,6 +212,7 @@ func handleArgs(ctx context.Context, cCtx *cli.Context) (*verifier.Verifier, err v.SetNumWorkers(cCtx.Int(numWorkers)) v.SetGenerationPauseDelayMillis(time.Duration(cCtx.Int64(generationPauseDelay))) v.SetWorkerSleepDelayMillis(time.Duration(cCtx.Int64(workerSleepDelay))) + v.SetPprofInterval(cCtx.String(pprofInterval)) partitionSizeMB := cCtx.Uint64(partitionSizeMB) if partitionSizeMB != 0 {