Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions scenarios/throughput_stress.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ const (
// SleepActivityJsonFlag is a JSON string that defines the sleep activity's behavior.
// See throughputstress.SleepActivityConfig for details.
SleepActivityJsonFlag = "sleep-activity-json"
// MinThroughputPerHourFlag is the minimum workflow throughput required (workflows/hour).
// Default is 0, meaning disabled. When set to a positive value, the scenario computes the actual
// throughput after the run and returns an error if it is below this minimum.
MinThroughputPerHourFlag = "min-throughput-per-hour"
)

const (
Expand All @@ -47,6 +50,9 @@ type tpsState struct {
CompletedIterations int `json:"completedIterations"`
// LastCompletedIterationAt is the time when the last iteration was completed. Helpful for debugging.
LastCompletedIterationAt time.Time `json:"lastCompletedIterationAt"`
// AccumulatedDuration is the total execution time across all runs (original + resumes).
// This excludes any downtime between runs. Used for accurate throughput calculation.
AccumulatedDuration time.Duration `json:"accumulatedDuration"`
}

type tpsConfig struct {
Expand All @@ -58,6 +64,7 @@ type tpsConfig struct {
SkipCleanNamespaceCheck bool
SleepActivities *loadgen.SleepActivityConfig
VisibilityVerificationTimeout time.Duration
MinThroughputPerHour float64
ScenarioRunID string
RngSeed int64
}
Expand Down Expand Up @@ -111,11 +118,13 @@ func (t *tpsExecutor) LoadState(loader func(any) error) error {
return nil
}

// Configure initializes tpsConfig. Largely, it reads and validates throughput_stress scenario options.
func (t *tpsExecutor) Configure(info loadgen.ScenarioInfo) error {
config := &tpsConfig{
InternalIterTimeout: info.ScenarioOptionDuration(IterTimeoutFlag, cmp.Or(info.Configuration.Duration+1*time.Minute, 1*time.Minute)),
NexusEndpoint: info.ScenarioOptions[NexusEndpointFlag],
SkipCleanNamespaceCheck: info.ScenarioOptionBool(SkipCleanNamespaceCheckFlag, false),
MinThroughputPerHour: info.ScenarioOptionFloat(MinThroughputPerHourFlag, 0),
ScenarioRunID: info.RunID,
}

Expand Down Expand Up @@ -174,6 +183,9 @@ func (t *tpsExecutor) Run(ctx context.Context, info loadgen.ScenarioInfo) error
}
t.runID = info.RunID

// Track start time of current run
currentRunStartTime := time.Now()

// Add search attribute, if it doesn't exist yet, to query for workflows by run ID.
// Running this on resume, too, in case a previous Omes run crashed before it could add the search attribute.
if err := loadgen.InitSearchAttribute(ctx, info, ThroughputStressScenarioIdSearchAttribute); err != nil {
Expand Down Expand Up @@ -257,6 +269,8 @@ func (t *tpsExecutor) Run(ctx context.Context, info loadgen.ScenarioInfo) error

t.lock.Lock()
completedIterations := t.state.CompletedIterations
t.state.AccumulatedDuration += time.Since(currentRunStartTime)
totalDuration := t.state.AccumulatedDuration
t.lock.Unlock()

completedChildWorkflows := completedIterations * t.config.InternalIterations
Expand Down Expand Up @@ -299,6 +313,24 @@ func (t *tpsExecutor) Run(ctx context.Context, info loadgen.ScenarioInfo) error
return err
}

// Post-scenario: check throughput threshold
if t.config.MinThroughputPerHour > 0 {
actualThroughputPerHour := float64(completedWorkflows) / totalDuration.Hours()

if actualThroughputPerHour < t.config.MinThroughputPerHour {
// Calculate how many workflows the minimum throughput would have required over this duration
// (truncated to a whole number; used only in the error message).
expectedWorkflows := int(totalDuration.Hours() * t.config.MinThroughputPerHour)

return fmt.Errorf("insufficient throughput: %.1f workflows/hour < %.1f required "+
"(completed %d workflows, expected %d in %v)",
actualThroughputPerHour,
t.config.MinThroughputPerHour,
completedWorkflows,
expectedWorkflows,
totalDuration.Round(time.Second))
}
}

// Post-scenario: ensure there are no failed or terminated workflows for this run.
return loadgen.VerifyNoFailedWorkflows(ctx, info, ThroughputStressScenarioIdSearchAttribute, info.RunID)
}
Expand Down