Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func NewBuilder[Type any, Status StatusType](name string) *Builder[Type, Status]
inner: interal_logger.New(os.Stdout),
},
runStateChangeHooks: make(map[RunState]RunStateChangeHookFunc[Type, Status]),
runPool: newRunPool[Type, Status](),
},
}
}
Expand Down
5 changes: 4 additions & 1 deletion callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,14 @@ func processCallback[Type any, Status StatusType](
return nil
}

run, err := buildRun[Type, Status](store, wr)
run, err := buildRun[Type, Status](w.newRunObj(), store, wr)
if err != nil {
return err
}

// Ensure the run is returned to the pool when we're done
defer w.releaseRun(run)

if payload == nil {
// Ensure that an empty value implementation of io.Reader is passed in instead of nil to avoid panic and
// rather allow an unmarshalling error.
Expand Down
1 change: 1 addition & 0 deletions callback_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ func TestProcessCallback(t *testing.T) {
clock: clock_testing.NewFakeClock(time.Date(2024, time.April, 19, 0, 0, 0, 0, time.UTC)),
statusGraph: graph.New(),
logger: &logger{},
runPool: newRunPool[string, testStatus](),
}

w.statusGraph.AddTransition(int(statusStart), int(statusEnd))
Expand Down
56 changes: 47 additions & 9 deletions run.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package workflow

import (
"context"
"sync"
)

// Run is a representation of a workflow run. It incorporates all the fields from the Record as well as
Expand Down Expand Up @@ -48,7 +49,28 @@ func (r *Run[Type, Status]) SaveAndRepeat() (Status, error) {
return Status(skipTypeSaveAndRepeat), nil
}

func buildRun[Type any, Status StatusType](store storeFunc, wr *Record) (*Run[Type, Status], error) {
type (
runCollector[Type any, Status StatusType] func() *Run[Type, Status]
runReleaser[Type any, Status StatusType] func(*Run[Type, Status])
)

// newRunPool creates a new sync.Pool for Run objects with 10 pre-allocated instances
func newRunPool[Type any, Status StatusType]() *sync.Pool {
pool := sync.Pool{
New: func() interface{} {
return &Run[Type, Status]{}
},
}

// Pre-allocate 10 Run objects in the pool for better performance
for i := 0; i < 10; i++ {
pool.Put(&Run[Type, Status]{})
}

return &pool
}

func buildRun[Type any, Status StatusType](collector runCollector[Type, Status], store storeFunc, wr *Record) (*Run[Type, Status], error) {
var t Type
err := Unmarshal(wr.Object, &t)
if err != nil {
Expand All @@ -62,15 +84,31 @@ func buildRun[Type any, Status StatusType](store storeFunc, wr *Record) (*Run[Ty
wr.RunState = RunStateRunning
}

// Get Run from pool and initialize
run := collector()

// Reset/initialize the run object
controller := NewRunStateController(store, wr)
record := Run[Type, Status]{
TypedRecord: TypedRecord[Type, Status]{
Record: *wr,
Status: Status(wr.Status),
Object: &t,
},
controller: controller,
run.TypedRecord = TypedRecord[Type, Status]{
Record: *wr,
Status: Status(wr.Status),
Object: &t,
}
run.controller = controller

return run, nil
}

// newRunObj returns a function that gets a Run from the pool
func (w *Workflow[Type, Status]) newRunObj() runCollector[Type, Status] {
return func() *Run[Type, Status] {
return w.runPool.Get().(*Run[Type, Status])
}
}

return &record, nil
// releaseRun returns a Run object back to the workflow's pool for reuse
func (w *Workflow[Type, Status]) releaseRun(run *Run[Type, Status]) {
run.TypedRecord = TypedRecord[Type, Status]{}
run.controller = nil
w.runPool.Put(run)
}
9 changes: 8 additions & 1 deletion step.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ func consumeStepEvents[Type any, Status StatusType](
updater,
pauseAfterErrCount,
w.errorCounter,
w.newRunObj(),
w.releaseRun,
),
w.clock,
lag,
Expand All @@ -111,6 +113,8 @@ func stepConsumer[Type any, Status StatusType](
updater updater[Type, Status],
pauseAfterErrCount int,
errorCounter ErrorCounter,
runCollector runCollector[Type, Status],
runReleaser runReleaser[Type, Status],
) func(ctx context.Context, e *Event) error {
return func(ctx context.Context, e *Event) error {
record, err := lookupFn(ctx, e.ForeignID)
Expand Down Expand Up @@ -160,11 +164,14 @@ func stepConsumer[Type any, Status StatusType](
return nil
}

run, err := buildRun[Type, Status](store, record)
run, err := buildRun[Type, Status](runCollector, store, record)
if err != nil {
return err
}

// Ensure the run is returned to the pool when we're done
defer runReleaser(run)

next, err := stepLogic(ctx, run)
if err != nil {
originalErr := err
Expand Down
8 changes: 8 additions & 0 deletions step_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ func Test_stepConsumer(t *testing.T) {
updater,
0,
w.errorCounter,
func() *Run[string, testStatus] { return &Run[string, testStatus]{} },
func(*Run[string, testStatus]) {},
)(ctx, &Event{})
require.NoError(t, err)

Expand Down Expand Up @@ -141,6 +143,8 @@ func Test_stepConsumer(t *testing.T) {
updater,
0,
w.errorCounter,
func() *Run[string, testStatus] { return &Run[string, testStatus]{} },
func(*Run[string, testStatus]) {},
)(ctx, &Event{})
require.NoError(t, err)

Expand Down Expand Up @@ -190,6 +194,8 @@ func Test_stepConsumer(t *testing.T) {
updater,
0,
w.errorCounter,
func() *Run[string, testStatus] { return &Run[string, testStatus]{} },
func(*Run[string, testStatus]) {},
)(ctx, &Event{})
require.NoError(t, err)

Expand Down Expand Up @@ -241,6 +247,8 @@ func Test_stepConsumer(t *testing.T) {
updater,
3,
w.errorCounter,
func() *Run[string, testStatus] { return &Run[string, testStatus]{} },
func(*Run[string, testStatus]) {},
)
require.NoError(t, err)

Expand Down
5 changes: 4 additions & 1 deletion testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,12 @@ func WaitFor[Type any, Status StatusType](
}

_, err := waitFor(t, w, foreignID, func(r *Record) (bool, error) {
run, err := buildRun[Type, Status](w.recordStore.Store, r)
run, err := buildRun[Type, Status](w.newRunObj(), w.recordStore.Store, r)
require.NoError(t, err)

// Ensure the run is returned to the pool when we're done
defer w.releaseRun(run)

return fn(run)
})
require.NoError(t, err)
Expand Down
7 changes: 6 additions & 1 deletion timeout.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,14 @@ func processTimeout[Type any, Status StatusType](
processName string,
pauseAfterErrCount int,
) error {
run, err := buildRun[Type, Status](store, record)
run, err := buildRun[Type, Status](w.newRunObj(), store, record)
if err != nil {
return err
}

// Ensure the run is returned to the pool when we're done
defer w.releaseRun(run)

next, err := config.TimeoutFunc(ctx, run, w.clock.Now())
if err != nil {
_, err := maybePause(ctx, pauseAfterErrCount, w.errorCounter, err, processName, run, w.logger)
Expand Down Expand Up @@ -287,6 +290,8 @@ func timeoutAutoInserterConsumer[Type any, Status StatusType](
updater,
pauseAfterErrCount,
w.errorCounter,
w.newRunObj(),
w.releaseRun,
),
w.clock,
0,
Expand Down
1 change: 1 addition & 0 deletions timeout_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ func TestProcessTimeout(t *testing.T) {
clock: clock_testing.NewFakeClock(time.Date(2024, time.April, 19, 0, 0, 0, 0, time.UTC)),
errorCounter: counter,
logger: &logger{},
runPool: newRunPool[string, testStatus](),
}

value := "data"
Expand Down
3 changes: 3 additions & 0 deletions workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ type Workflow[Type any, Status StatusType] struct {
// and block until this transition is complete.
launching sync.WaitGroup

// runPool pools Run objects to reduce allocations
runPool *sync.Pool

// defaultStartingPoint defines that status that the workflow run will start on when Trigger is called.
defaultStartingPoint Status
statusGraph *graph.Graph
Expand Down
16 changes: 9 additions & 7 deletions workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,15 +218,15 @@ func acceptanceTestWorkflow() *workflow.Builder[MyType, status] {
}

func BenchmarkWorkflow(b *testing.B) {
b.Run("1", func(b *testing.B) {
benchmarkWorkflow(b, 1)
})
b.Run("5", func(b *testing.B) {
benchmarkWorkflow(b, 5)
})
b.Run("10", func(b *testing.B) {
benchmarkWorkflow(b, 10)
})
b.Run("100", func(b *testing.B) {
benchmarkWorkflow(b, 100)
})
}

func benchmarkWorkflow(b *testing.B, numberOfSteps int) {
Expand All @@ -236,9 +236,11 @@ func benchmarkWorkflow(b *testing.B, numberOfSteps int) {
bldr := workflow.NewBuilder[MyType, status]("benchmark")

for i := range numberOfSteps {
bldr.AddStep(status(i), func(ctx context.Context, r *workflow.Run[MyType, status]) (status, error) {
return status(i + 1), nil
}, status(i+1))
from := i + 1
to := i + 2
bldr.AddStep(status(from), func(ctx context.Context, r *workflow.Run[MyType, status]) (status, error) {
return status(to), nil
}, status(to))
}

recordStore := memrecordstore.New()
Expand Down Expand Up @@ -266,7 +268,7 @@ func benchmarkWorkflow(b *testing.B, numberOfSteps int) {
b.Fatal(err)
}

workflow.Require(b, wf, fid, status(numberOfSteps), MyType{
workflow.Require(b, wf, fid, status(numberOfSteps+1), MyType{
UserID: expectedUserID,
})
}
Expand Down
Loading