From ec81f9742ebb9348290e5f8d45f341ce0bfb9a9d Mon Sep 17 00:00:00 2001 From: Andrew Wormald Date: Thu, 11 Dec 2025 21:15:05 +0000 Subject: [PATCH 1/5] fix benchmark --- workflow_test.go | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/workflow_test.go b/workflow_test.go index a1d12f5..df4f977 100644 --- a/workflow_test.go +++ b/workflow_test.go @@ -218,15 +218,15 @@ func acceptanceTestWorkflow() *workflow.Builder[MyType, status] { } func BenchmarkWorkflow(b *testing.B) { - b.Run("1", func(b *testing.B) { - benchmarkWorkflow(b, 1) - }) b.Run("5", func(b *testing.B) { benchmarkWorkflow(b, 5) }) b.Run("10", func(b *testing.B) { benchmarkWorkflow(b, 10) }) + b.Run("100", func(b *testing.B) { + benchmarkWorkflow(b, 10) + }) } func benchmarkWorkflow(b *testing.B, numberOfSteps int) { @@ -236,9 +236,11 @@ func benchmarkWorkflow(b *testing.B, numberOfSteps int) { bldr := workflow.NewBuilder[MyType, status]("benchmark") for i := range numberOfSteps { - bldr.AddStep(status(i), func(ctx context.Context, r *workflow.Run[MyType, status]) (status, error) { - return status(i + 1), nil - }, status(i+1)) + from := i + 1 + to := i + 2 + bldr.AddStep(status(from), func(ctx context.Context, r *workflow.Run[MyType, status]) (status, error) { + return status(to), nil + }, status(to)) } recordStore := memrecordstore.New() @@ -266,7 +268,7 @@ func benchmarkWorkflow(b *testing.B, numberOfSteps int) { b.Fatal(err) } - workflow.Require(b, wf, fid, status(numberOfSteps), MyType{ + workflow.Require(b, wf, fid, status(numberOfSteps+1), MyType{ UserID: expectedUserID, }) } From 1c24fdd38a11c091b6142395fa881b8509746e41 Mon Sep 17 00:00:00 2001 From: Andrew Wormald Date: Thu, 11 Dec 2025 21:21:07 +0000 Subject: [PATCH 2/5] pool run objects --- builder.go | 6 ++++++ callback.go | 5 ++++- callback_internal_test.go | 6 ++++++ run.go | 45 +++++++++++++++++++++++++++++++-------- schedule_test.go | 2 +- step.go | 9 +++++++- step_internal_test.go | 8 +++++++ testing.go | 5 ++++- timeout.go | 7 +++++- timeout_internal_test.go | 6 ++++++ workflow.go | 3 +++ 11 files changed, 88 insertions(+), 14 deletions(-) diff --git a/builder.go b/builder.go index fe90adb..36a9489 100644 --- a/builder.go +++ b/builder.go @@ -3,6 +3,7 @@ package workflow import ( "context" "os" + "sync" "time" "k8s.io/utils/clock" @@ -38,6 +39,11 @@ func NewBuilder[Type any, Status StatusType](name string) *Builder[Type, Status] inner: interal_logger.New(os.Stdout), }, runStateChangeHooks: make(map[RunState]RunStateChangeHookFunc[Type, Status]), + runPool: sync.Pool{ + New: func() interface{} { + return &Run[Type, Status]{} + }, + }, }, } } diff --git a/callback.go b/callback.go index 1660a98..0ad378d 100644 --- a/callback.go +++ b/callback.go @@ -63,11 +63,14 @@ func processCallback[Type any, Status StatusType]( return nil } - run, err := buildRun[Type, Status](store, wr) + run, err := buildRun[Type, Status](w.newRunObj(), store, wr) if err != nil { return err } + // Ensure the run is returned to the pool when we're done + defer w.releaseRun(run) + if payload == nil { // Ensure that an empty value implementation of io.Reader is passed in instead of nil to avoid panic and // rather allow an unmarshalling error. diff --git a/callback_internal_test.go b/callback_internal_test.go index 64abc8f..45390ab 100644 --- a/callback_internal_test.go +++ b/callback_internal_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "io" + "sync" "testing" "time" @@ -21,6 +22,11 @@ func TestProcessCallback(t *testing.T) { clock: clock_testing.NewFakeClock(time.Date(2024, time.April, 19, 0, 0, 0, 0, time.UTC)), statusGraph: graph.New(), logger: &logger{}, + runPool: sync.Pool{ + New: func() interface{} { + return &Run[string, testStatus]{} + }, + }, } w.statusGraph.AddTransition(int(statusStart), int(statusEnd)) diff --git a/run.go b/run.go index 7416771..c0154ee 100644 --- a/run.go +++ b/run.go @@ -15,6 +15,9 @@ type Run[Type any, Status StatusType] struct { controller RunStateController } +func (r *Run[Type, Status]) reset() { +} + // Pause is intended to be used inside a workflow process where (Status, error) are the return signature. This allows // the user to simply type "return r.Pause(ctx)" to pause a record from inside a workflow which results in the record // being temporarily left alone and will not be processed until it is resumed. @@ -48,7 +51,12 @@ func (r *Run[Type, Status]) SaveAndRepeat() (Status, error) { return Status(skipTypeSaveAndRepeat), nil } -func buildRun[Type any, Status StatusType](store storeFunc, wr *Record) (*Run[Type, Status], error) { +type ( + runCollector[Type any, Status StatusType] func() *Run[Type, Status] + runReleaser[Type any, Status StatusType] func(*Run[Type, Status]) +) + +func buildRun[Type any, Status StatusType](collector runCollector[Type, Status], store storeFunc, wr *Record) (*Run[Type, Status], error) { var t Type err := Unmarshal(wr.Object, &t) if err != nil { @@ -62,15 +70,34 @@ func buildRun[Type any, Status StatusType](store storeFunc, wr *Record) (*Run[Ty wr.RunState = RunStateRunning } + // Get Run from pool and initialize + run := collector() + + // Reset/initialize the run object controller := NewRunStateController(store, wr) - record := Run[Type, Status]{ - TypedRecord: TypedRecord[Type, Status]{ - Record: *wr, - Status: Status(wr.Status), - Object: &t, - }, - controller: controller, + run.TypedRecord = TypedRecord[Type, Status]{ + Record: *wr, + Status: Status(wr.Status), + Object: &t, } + run.controller = controller + + return run, nil +} + +// newRunObj returns a function that gets a Run from the pool +func (w *Workflow[Type, Status]) newRunObj() runCollector[Type, Status] { + return func() *Run[Type, Status] { + return w.runPool.Get().(*Run[Type, Status]) + } +} + +// releaseRun returns a Run object back to the workflow's pool for reuse +func (w *Workflow[Type, Status]) releaseRun(run *Run[Type, Status]) { + // Clear references to prevent memory leaks + run.Object = nil + run.controller = nil + // Note: We don't clear the Record as it's a value type - return &record, nil + w.runPool.Put(run) } diff --git a/schedule_test.go b/schedule_test.go index 0813b4b..eeaba28 100644 --- a/schedule_test.go +++ b/schedule_test.go @@ -282,7 +282,7 @@ func TestWorkflow_ScheduleWithInitialValue(t *testing.T) { clock.SetTime(expectedTimestamp) // Allow scheduling to take place - time.Sleep(10 * time.Millisecond) + time.Sleep(time.Second) run, err := recordStore.Latest(ctx, workflowName, "test") require.NoError(t, err) diff --git a/step.go b/step.go index aa3536d..d1f2568 100644 --- a/step.go +++ b/step.go @@ -91,6 +91,8 @@ func consumeStepEvents[Type any, Status StatusType]( updater, pauseAfterErrCount, w.errorCounter, + w.newRunObj(), + w.releaseRun, ), w.clock, lag, @@ -111,6 +113,8 @@ func stepConsumer[Type any, Status StatusType]( updater updater[Type, Status], pauseAfterErrCount int, errorCounter ErrorCounter, + runCollector runCollector[Type, Status], + runReleaser runReleaser[Type, Status], ) func(ctx context.Context, e *Event) error { return func(ctx context.Context, e *Event) error { record, err := lookupFn(ctx, e.ForeignID) @@ -160,11 +164,14 @@ func stepConsumer[Type any, Status StatusType]( return nil } - run, err := buildRun[Type, Status](store, record) + run, err := buildRun[Type, Status](runCollector, store, record) if err != nil { return err } + // Ensure the run is returned to the pool when we're done + defer runReleaser(run) + next, err := stepLogic(ctx, run) if err != nil { originalErr := err diff --git a/step_internal_test.go b/step_internal_test.go index 9809276..bd299a7 100644 --- a/step_internal_test.go +++ b/step_internal_test.go @@ -90,6 +90,8 @@ func Test_stepConsumer(t *testing.T) { updater, 0, w.errorCounter, + func() *Run[string, testStatus] { return &Run[string, testStatus]{} }, + func(*Run[string, testStatus]) {}, )(ctx, &Event{}) require.NoError(t, err) @@ -141,6 +143,8 @@ func Test_stepConsumer(t *testing.T) { updater, 0, w.errorCounter, + func() *Run[string, testStatus] { return &Run[string, testStatus]{} }, + func(*Run[string, testStatus]) {}, )(ctx, &Event{}) require.NoError(t, err) @@ -190,6 +194,8 @@ func Test_stepConsumer(t *testing.T) { updater, 0, w.errorCounter, + func() *Run[string, testStatus] { return &Run[string, testStatus]{} }, + func(*Run[string, testStatus]) {}, )(ctx, &Event{}) require.NoError(t, err) @@ -241,6 +247,8 @@ func Test_stepConsumer(t *testing.T) { updater, 3, w.errorCounter, + func() *Run[string, testStatus] { return &Run[string, testStatus]{} }, + func(*Run[string, testStatus]) {}, ) require.NoError(t, err) diff --git a/testing.go b/testing.go index 6779f0e..96bc1c3 100644 --- a/testing.go +++ b/testing.go @@ -150,9 +150,12 @@ func WaitFor[Type any, Status StatusType]( } _, err := waitFor(t, w, foreignID, func(r *Record) (bool, error) { - run, err := buildRun[Type, Status](w.recordStore.Store, r) + run, err := buildRun[Type, Status](w.newRunObj(), w.recordStore.Store, r) require.NoError(t, err) + // Ensure the run is returned to the pool when we're done + defer w.releaseRun(run) + return fn(run) }) require.NoError(t, err) diff --git a/timeout.go b/timeout.go index efad533..69aac28 100644 --- a/timeout.go +++ b/timeout.go @@ -118,11 +118,14 @@ func processTimeout[Type any, Status StatusType]( processName string, pauseAfterErrCount int, ) error { - run, err := buildRun[Type, Status](store, record) + run, err := buildRun[Type, Status](w.newRunObj(), store, record) if err != nil { return err } + // Ensure the run is returned to the pool when we're done + defer w.releaseRun(run) + next, err := config.TimeoutFunc(ctx, run, w.clock.Now()) if err != nil { _, err := maybePause(ctx, pauseAfterErrCount, w.errorCounter, err, processName, run, w.logger) @@ -287,6 +290,8 @@ func timeoutAutoInserterConsumer[Type any, Status StatusType]( updater, pauseAfterErrCount, w.errorCounter, + w.newRunObj(), + w.releaseRun, ), w.clock, 0, diff --git a/timeout_internal_test.go b/timeout_internal_test.go index 79f3491..6ebebef 100644 --- a/timeout_internal_test.go +++ b/timeout_internal_test.go @@ -3,6 +3,7 @@ package workflow import ( "context" "errors" + "sync" "testing" "time" @@ -23,6 +24,11 @@ func TestProcessTimeout(t *testing.T) { clock: clock_testing.NewFakeClock(time.Date(2024, time.April, 19, 0, 0, 0, 0, time.UTC)), errorCounter: counter, logger: &logger{}, + runPool: sync.Pool{ + New: func() interface{} { + return &Run[string, testStatus]{} + }, + }, } value := "data" diff --git a/workflow.go b/workflow.go index 81874b4..155f7a7 100644 --- a/workflow.go +++ b/workflow.go @@ -92,6 +92,9 @@ type Workflow[Type any, Status StatusType] struct { // and block until this transition is complete. launching sync.WaitGroup + // runPool pools Run objects to reduce allocations + runPool sync.Pool + // defaultStartingPoint defines that status that the workflow run will start on when Trigger is called. defaultStartingPoint Status statusGraph *graph.Graph From c9b5a8f628f7962c3c18b2f62e433c638793cc74 Mon Sep 17 00:00:00 2001 From: Andrew Wormald Date: Thu, 11 Dec 2025 21:35:53 +0000 Subject: [PATCH 3/5] pool run objects --- builder.go | 7 +------ callback_internal_test.go | 7 +------ run.go | 17 +++++++++++++++++ schedule_test.go | 2 +- timeout_internal_test.go | 7 +------ workflow.go | 2 +- workflow_test.go | 2 +- 7 files changed, 23 insertions(+), 21 deletions(-) diff --git a/builder.go b/builder.go index 36a9489..2f056c1 100644 --- a/builder.go +++ b/builder.go @@ -3,7 +3,6 @@ package workflow import ( "context" "os" - "sync" "time" "k8s.io/utils/clock" @@ -39,11 +38,7 @@ func NewBuilder[Type any, Status StatusType](name string) *Builder[Type, Status] inner: interal_logger.New(os.Stdout), }, runStateChangeHooks: make(map[RunState]RunStateChangeHookFunc[Type, Status]), - runPool: sync.Pool{ - New: func() interface{} { - return &Run[Type, Status]{} - }, - }, + runPool: newRunPool[Type, Status](), }, } } diff --git a/callback_internal_test.go b/callback_internal_test.go index 45390ab..ca2a941 100644 --- a/callback_internal_test.go +++ b/callback_internal_test.go @@ -4,7 +4,6 @@ import ( "context" "errors" "io" - "sync" "testing" "time" @@ -22,11 +21,7 @@ func TestProcessCallback(t *testing.T) { clock: clock_testing.NewFakeClock(time.Date(2024, time.April, 19, 0, 0, 0, 0, time.UTC)), statusGraph: graph.New(), logger: &logger{}, - runPool: sync.Pool{ - New: func() interface{} { - return &Run[string, testStatus]{} - }, - }, + runPool: newRunPool[string, testStatus](), } w.statusGraph.AddTransition(int(statusStart), int(statusEnd)) diff --git a/run.go b/run.go index c0154ee..5157207 100644 --- a/run.go +++ b/run.go @@ -2,6 +2,7 @@ package workflow import ( "context" + "sync" ) // Run is a representation of a workflow run. It incorporates all the fields from the Record as well as @@ -56,6 +57,22 @@ type ( runReleaser[Type any, Status StatusType] func(*Run[Type, Status]) ) +// newRunPool creates a new sync.Pool for Run objects with 10 pre-allocated instances +func newRunPool[Type any, Status StatusType]() *sync.Pool { + pool := sync.Pool{ + New: func() interface{} { + return &Run[Type, Status]{} + }, + } + + // Pre-allocate 10 Run objects in the pool for better performance + for i := 0; i < 10; i++ { + pool.Put(&Run[Type, Status]{}) + } + + return &pool +} + func buildRun[Type any, Status StatusType](collector runCollector[Type, Status], store storeFunc, wr *Record) (*Run[Type, Status], error) { var t Type err := Unmarshal(wr.Object, &t) diff --git a/schedule_test.go b/schedule_test.go index eeaba28..0813b4b 100644 --- a/schedule_test.go +++ b/schedule_test.go @@ -282,7 +282,7 @@ func TestWorkflow_ScheduleWithInitialValue(t *testing.T) { clock.SetTime(expectedTimestamp) // Allow scheduling to take place - time.Sleep(time.Second) + time.Sleep(10 * time.Millisecond) run, err := recordStore.Latest(ctx, workflowName, "test") require.NoError(t, err) diff --git a/timeout_internal_test.go b/timeout_internal_test.go index 6ebebef..c44a196 100644 --- a/timeout_internal_test.go +++ b/timeout_internal_test.go @@ -3,7 +3,6 @@ package workflow import ( "context" "errors" - "sync" "testing" "time" @@ -24,11 +23,7 @@ func TestProcessTimeout(t *testing.T) { clock: clock_testing.NewFakeClock(time.Date(2024, time.April, 19, 0, 0, 0, 0, time.UTC)), errorCounter: counter, logger: &logger{}, - runPool: sync.Pool{ - New: func() interface{} { - return &Run[string, testStatus]{} - }, - }, + runPool: newRunPool[string, testStatus](), } value := "data" diff --git a/workflow.go b/workflow.go index 155f7a7..c181593 100644 --- a/workflow.go +++ b/workflow.go @@ -93,7 +93,7 @@ type Workflow[Type any, Status StatusType] struct { launching sync.WaitGroup // runPool pools Run objects to reduce allocations - runPool sync.Pool + runPool *sync.Pool // defaultStartingPoint defines that status that the workflow run will start on when Trigger is called. defaultStartingPoint Status diff --git a/workflow_test.go b/workflow_test.go index df4f977..96fbba0 100644 --- a/workflow_test.go +++ b/workflow_test.go @@ -225,7 +225,7 @@ func BenchmarkWorkflow(b *testing.B) { benchmarkWorkflow(b, 10) }) b.Run("100", func(b *testing.B) { - benchmarkWorkflow(b, 10) + benchmarkWorkflow(b, 100) }) } From 1702164cfadfa584f3c7a48db03f783211d9c8f6 Mon Sep 17 00:00:00 2001 From: Andrew Wormald Date: Thu, 11 Dec 2025 21:42:55 +0000 Subject: [PATCH 4/5] pool run objects --- run.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/run.go b/run.go index 5157207..e980504 100644 --- a/run.go +++ b/run.go @@ -16,9 +16,6 @@ type Run[Type any, Status StatusType] struct { controller RunStateController } -func (r *Run[Type, Status]) reset() { -} - // Pause is intended to be used inside a workflow process where (Status, error) are the return signature. This allows // the user to simply type "return r.Pause(ctx)" to pause a record from inside a workflow which results in the record // being temporarily left alone and will not be processed until it is resumed. From 588ade29154061f43476e697240fa1f9680a03d5 Mon Sep 17 00:00:00 2001 From: Andrew Wormald Date: Thu, 11 Dec 2025 22:44:14 +0000 Subject: [PATCH 5/5] clear our whole run --- run.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/run.go b/run.go index e980504..7e0c588 100644 --- a/run.go +++ b/run.go @@ -108,10 +108,7 @@ func (w *Workflow[Type, Status]) newRunObj() runCollector[Type, Status] { // releaseRun returns a Run object back to the workflow's pool for reuse func (w *Workflow[Type, Status]) releaseRun(run *Run[Type, Status]) { - // Clear references to prevent memory leaks - run.Object = nil + run.TypedRecord = TypedRecord[Type, Status]{} run.controller = nil - // Note: We don't clear the Record as it's a value type - w.runPool.Put(run) }