Skip to content

Commit

Permalink
pull out firing mode implementations
Browse files Browse the repository at this point in the history
  • Loading branch information
qmuntal committed Sep 1, 2023
1 parent a570685 commit 8af1ab7
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 79 deletions.
88 changes: 88 additions & 0 deletions modes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package stateless

import (
"context"
"sync"
"sync/atomic"
)

type fireMode interface {
Fire(ctx context.Context, trigger Trigger, args ...any) error
Firing() bool
}

type fireModeImmediate struct {
ops atomic.Uint64
sm *StateMachine
}

func (f *fireModeImmediate) Firing() bool {
return f.ops.Load() > 0
}

func (f *fireModeImmediate) Fire(ctx context.Context, trigger Trigger, args ...any) error {
f.ops.Add(1)
defer f.ops.Add(^uint64(0))
return f.sm.internalFireOne(ctx, trigger, args...)
}

type queuedTrigger struct {
Context context.Context
Trigger Trigger
Args []any
}

type fireModeQueued struct {
firing atomic.Bool
sm *StateMachine

triggers []queuedTrigger
mu sync.Mutex // guards triggers
}

func (f *fireModeQueued) Firing() bool {
return f.firing.Load()
}

func (f *fireModeQueued) Fire(ctx context.Context, trigger Trigger, args ...any) error {
f.enqueue(ctx, trigger, args...)
for {
et, ok := f.fetch()
if !ok {
break
}
err := f.execute(et)
if err != nil {
return err
}
}
return nil
}

func (f *fireModeQueued) enqueue(ctx context.Context, trigger Trigger, args ...any) {
f.mu.Lock()
defer f.mu.Unlock()

f.triggers = append(f.triggers, queuedTrigger{Context: ctx, Trigger: trigger, Args: args})
}

func (f *fireModeQueued) fetch() (et queuedTrigger, ok bool) {
f.mu.Lock()
defer f.mu.Unlock()

if len(f.triggers) == 0 {
return queuedTrigger{}, false
}

if !f.firing.CompareAndSwap(false, true) {
return queuedTrigger{}, false
}

et, f.triggers = f.triggers[0], f.triggers[1:]
return et, true
}

func (f *fireModeQueued) execute(et queuedTrigger) error {
defer f.firing.Swap(false)
return f.sm.internalFireOne(et.Context, et.Trigger, et.Args...)
}
92 changes: 13 additions & 79 deletions statemachine.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"reflect"
"sync"
"sync/atomic"
)

// State is used to to represent the possible machine states.
Expand Down Expand Up @@ -63,25 +62,29 @@ func callEvents(events []TransitionFunc, ctx context.Context, transition Transit
// It is safe to use the StateMachine concurrently, but non of the callbacks (state manipulation, actions, events, ...) are guarded,
// so it is up to the client to protect them against race conditions.
type StateMachine struct {
ops atomic.Uint64
stateConfig map[State]*stateRepresentation
triggerConfig map[Trigger]triggerWithParameters
stateAccessor func(context.Context) (State, error)
stateMutator func(context.Context, State) error
unhandledTriggerAction UnhandledTriggerActionFunc
onTransitioningEvents []TransitionFunc
onTransitionedEvents []TransitionFunc
eventQueue queue
firingMode FiringMode
stateMutex sync.RWMutex
mode fireMode
}

func newStateMachine() *StateMachine {
return &StateMachine{
func newStateMachine(firingMode FiringMode) *StateMachine {
sm := &StateMachine{
stateConfig: make(map[State]*stateRepresentation),
triggerConfig: make(map[Trigger]triggerWithParameters),
unhandledTriggerAction: UnhandledTriggerActionFunc(DefaultUnhandledTriggerAction),
}
if firingMode == FiringImmediate {
sm.mode = &fireModeImmediate{sm: sm}
} else {
sm.mode = &fireModeQueued{sm: sm}
}
return sm
}

// NewStateMachine returns a queued state machine.
Expand All @@ -92,7 +95,7 @@ func NewStateMachine(initialState State) *StateMachine {
// NewStateMachineWithMode returns a state machine with the desired firing mode
func NewStateMachineWithMode(initialState State, firingMode FiringMode) *StateMachine {
var stateMutex sync.Mutex
sm := newStateMachine()
sm := newStateMachine(firingMode)
reference := &struct {
State State
}{State: initialState}
Expand All @@ -107,16 +110,14 @@ func NewStateMachineWithMode(initialState State, firingMode FiringMode) *StateMa
reference.State = state
return nil
}
sm.firingMode = firingMode
return sm
}

// NewStateMachineWithExternalStorage returns a state machine with external state storage.
func NewStateMachineWithExternalStorage(stateAccessor func(context.Context) (State, error), stateMutator func(context.Context, State) error, firingMode FiringMode) *StateMachine {
sm := newStateMachine()
sm := newStateMachine(firingMode)
sm.stateAccessor = stateAccessor
sm.stateMutator = stateMutator
sm.firingMode = firingMode
return sm
}

Expand Down Expand Up @@ -274,7 +275,7 @@ func (sm *StateMachine) Configure(state State) *StateConfiguration {

// Firing returns true when the state machine is processing a trigger.
func (sm *StateMachine) Firing() bool {
return sm.ops.Load() != 0
return sm.mode.Firing()
}

// String returns a human-readable representation of the state machine.
Expand Down Expand Up @@ -319,74 +320,7 @@ func (sm *StateMachine) stateRepresentation(state State) *stateRepresentation {
}

func (sm *StateMachine) internalFire(ctx context.Context, trigger Trigger, args ...any) error {
switch sm.firingMode {
case FiringImmediate:
sm.ops.Add(1)
defer sm.ops.Add(^uint64(0))
return sm.internalFireOne(ctx, trigger, args...)
case FiringQueued:
fallthrough
default:
return sm.internalFireQueued(ctx, trigger, args...)
}
}

type queuedTrigger struct {
Context context.Context
Trigger Trigger
Args []any
}

type queue struct {
triggers []queuedTrigger
mu sync.Mutex
}

func (sm *StateMachine) queueEvent(ctx context.Context, trigger Trigger, args ...any) {
sm.eventQueue.mu.Lock()
defer sm.eventQueue.mu.Unlock()

sm.eventQueue.triggers = append(sm.eventQueue.triggers, queuedTrigger{Context: ctx, Trigger: trigger, Args: args})
}

func (sm *StateMachine) fetchOneEvent() (et queuedTrigger, ok bool) {
sm.eventQueue.mu.Lock()
defer sm.eventQueue.mu.Unlock()

if len(sm.eventQueue.triggers) == 0 {
return queuedTrigger{}, false
}

if !sm.ops.CompareAndSwap(0, 1) {
return queuedTrigger{}, false
}

et, sm.eventQueue.triggers = sm.eventQueue.triggers[0], sm.eventQueue.triggers[1:]
return et, true
}

func (sm *StateMachine) executeOneEvent(et queuedTrigger) error {
defer sm.ops.Add(^uint64(0))

return sm.internalFireOne(et.Context, et.Trigger, et.Args...)
}

func (sm *StateMachine) internalFireQueued(ctx context.Context, trigger Trigger, args ...any) error {
sm.queueEvent(ctx, trigger, args...)

for {
et, ok := sm.fetchOneEvent()
if !ok {
break
}

err := sm.executeOneEvent(et)
if err != nil {
return err
}
}

return nil
return sm.mode.Fire(ctx, trigger, args...)
}

func (sm *StateMachine) internalFireOne(ctx context.Context, trigger Trigger, args ...any) error {
Expand Down

0 comments on commit 8af1ab7

Please sign in to comment.