From 6b79af66ae602fceefc06bb101b8bd0bb2e6fe2b Mon Sep 17 00:00:00 2001 From: qmuntal Date: Thu, 31 Aug 2023 10:56:35 +0200 Subject: [PATCH 1/3] fix race conditions in queued Fire Co-authored-by: qmuntal Co-authored-by: liang-kang --- statemachine.go | 60 ++++++++++++++++++++++++++++++++++--------------- 1 file changed, 42 insertions(+), 18 deletions(-) diff --git a/statemachine.go b/statemachine.go index be3631f..ec61cf1 100644 --- a/statemachine.go +++ b/statemachine.go @@ -1,7 +1,6 @@ package stateless import ( - "container/list" "context" "fmt" "reflect" @@ -72,7 +71,7 @@ type StateMachine struct { unhandledTriggerAction UnhandledTriggerActionFunc onTransitioningEvents []TransitionFunc onTransitionedEvents []TransitionFunc - eventQueue list.List + eventQueue queue firingMode FiringMode firingMutex sync.Mutex stateMutex sync.RWMutex @@ -323,6 +322,8 @@ func (sm *StateMachine) stateRepresentation(state State) *stateRepresentation { func (sm *StateMachine) internalFire(ctx context.Context, trigger Trigger, args ...any) error { switch sm.firingMode { case FiringImmediate: + sm.ops.Add(1) + defer sm.ops.Add(^uint64(0)) return sm.internalFireOne(ctx, trigger, args...) case FiringQueued: fallthrough @@ -337,33 +338,56 @@ type queuedTrigger struct { Args []any } -func (sm *StateMachine) internalFireQueued(ctx context.Context, trigger Trigger, args ...any) error { - sm.firingMutex.Lock() - sm.eventQueue.PushBack(queuedTrigger{Context: ctx, Trigger: trigger, Args: args}) - sm.firingMutex.Unlock() - if sm.Firing() { - return nil +type queue struct { + triggers []queuedTrigger + mu sync.Mutex +} + +func (q *queue) pushBack(ctx context.Context, trigger Trigger, args ...any) { + q.mu.Lock() + defer q.mu.Unlock() + q.triggers = append(q.triggers, queuedTrigger{Context: ctx, Trigger: trigger, Args: args}) +} + +func (q *queue) popFront() (et queuedTrigger, ok bool) { + q.mu.Lock() + defer q.mu.Unlock() + if len(q.triggers) == 0 { + return queuedTrigger{}, false } + et, q.triggers = q.triggers[0], q.triggers[1:] + return et, true +} - for { +func (sm *StateMachine) internalFireQueued(ctx context.Context, trigger Trigger, args ...any) error { + if !sm.Firing() { sm.firingMutex.Lock() - e := sm.eventQueue.Front() - if e == nil { + // Check again, since another goroutine may have added it while we were waiting for the lock. + if !sm.Firing() { + sm.ops.Add(1) // In this mode there is always only one operation in progress sm.firingMutex.Unlock() - break + defer sm.ops.Add(^uint64(0)) + if err := sm.internalFireOne(ctx, trigger, args...); err != nil { + return err + } + for { + et, ok := sm.eventQueue.popFront() + if !ok { + break + } + if err := sm.internalFireOne(et.Context, et.Trigger, et.Args...); err != nil { + return err + } + } + return nil } - et := sm.eventQueue.Remove(e).(queuedTrigger) sm.firingMutex.Unlock() - if err := sm.internalFireOne(et.Context, et.Trigger, et.Args...); err != nil { - return err - } } + sm.eventQueue.pushBack(ctx, trigger, args...) return nil } func (sm *StateMachine) internalFireOne(ctx context.Context, trigger Trigger, args ...any) error { - sm.ops.Add(1) - defer sm.ops.Add(^uint64(0)) var ( config triggerWithParameters ok bool From d3dba3d2b44a04fdad41e1e23803e9e074fa68ec Mon Sep 17 00:00:00 2001 From: Liang Kang Date: Fri, 1 Sep 2023 10:26:33 +0800 Subject: [PATCH 2/3] fix potential issue of statemachine trigger lost there exists a tiny gap between finishing iterations of firing queued event and sm.op decrease If another routine run internalFireQueued in that gap, it will not firing but save trigger in eventqueue and return Meanwhile, origin routine will not check event queue again because interations has finished so such trigger event will be left in queue and wait for later firing trigger to executing. if no trigger again, this trigger event will wait for ever. --- statemachine.go | 66 +++++++++++++++++++++++++------------------------ 1 file changed, 34 insertions(+), 32 deletions(-) diff --git a/statemachine.go b/statemachine.go index ec61cf1..09b2fc9 100644 --- a/statemachine.go +++ b/statemachine.go @@ -73,7 +73,6 @@ type StateMachine struct { onTransitionedEvents []TransitionFunc eventQueue queue firingMode FiringMode - firingMutex sync.Mutex stateMutex sync.RWMutex } @@ -343,47 +342,50 @@ type queue struct { mu sync.Mutex } -func (q *queue) pushBack(ctx context.Context, trigger Trigger, args ...any) { - q.mu.Lock() - defer q.mu.Unlock() - q.triggers = append(q.triggers, queuedTrigger{Context: ctx, Trigger: trigger, Args: args}) +func (sm *StateMachine) queueEvent(ctx context.Context, trigger Trigger, args ...any) { + sm.eventQueue.mu.Lock() + defer sm.eventQueue.mu.Unlock() + + sm.eventQueue.triggers = append(sm.eventQueue.triggers, queuedTrigger{Context: ctx, Trigger: trigger, Args: args}) } -func (q *queue) popFront() (et queuedTrigger, ok bool) { - q.mu.Lock() - defer q.mu.Unlock() - if len(q.triggers) == 0 { +func (sm *StateMachine) fetchOneEvent() (et queuedTrigger, ok bool) { + sm.eventQueue.mu.Lock() + defer sm.eventQueue.mu.Unlock() + + if len(sm.eventQueue.triggers) == 0 { + return queuedTrigger{}, false + } + + if !sm.ops.CompareAndSwap(0, 1) { return queuedTrigger{}, false } - et, q.triggers = q.triggers[0], q.triggers[1:] + + et, sm.eventQueue.triggers = sm.eventQueue.triggers[0], sm.eventQueue.triggers[1:] return et, true } +func (sm *StateMachine) executeOneEvent(et queuedTrigger) error { + defer sm.ops.Add(^uint64(0)) + + return sm.internalFireOne(et.Context, et.Trigger, et.Args...) +} + func (sm *StateMachine) internalFireQueued(ctx context.Context, trigger Trigger, args ...any) error { - if !sm.Firing() { - sm.firingMutex.Lock() - // Check again, since another goroutine may have added it while we were waiting for the lock. - if !sm.Firing() { - sm.ops.Add(1) // In this mode there is always only one operation in progress - sm.firingMutex.Unlock() - defer sm.ops.Add(^uint64(0)) - if err := sm.internalFireOne(ctx, trigger, args...); err != nil { - return err - } - for { - et, ok := sm.eventQueue.popFront() - if !ok { - break - } - if err := sm.internalFireOne(et.Context, et.Trigger, et.Args...); err != nil { - return err - } - } - return nil + sm.queueEvent(ctx, trigger, args...) + + for { + et, ok := sm.fetchOneEvent() + if !ok { + break + } + + err := sm.executeOneEvent(et) + if err != nil { + return err } - sm.firingMutex.Unlock() } - sm.eventQueue.pushBack(ctx, trigger, args...) + return nil } From 8af1ab775bd7401c7c840ffee460e199dabb4927 Mon Sep 17 00:00:00 2001 From: qmuntal Date: Fri, 1 Sep 2023 11:18:14 +0200 Subject: [PATCH 3/3] pull out firing mode implementations --- modes.go | 88 ++++++++++++++++++++++++++++++++++++++++++++++ statemachine.go | 92 +++++++------------------------------------------ 2 files changed, 101 insertions(+), 79 deletions(-) create mode 100644 modes.go diff --git a/modes.go b/modes.go new file mode 100644 index 0000000..3aec556 --- /dev/null +++ b/modes.go @@ -0,0 +1,88 @@ +package stateless + +import ( + "context" + "sync" + "sync/atomic" +) + +type fireMode interface { + Fire(ctx context.Context, trigger Trigger, args ...any) error + Firing() bool +} + +type fireModeImmediate struct { + ops atomic.Uint64 + sm *StateMachine +} + +func (f *fireModeImmediate) Firing() bool { + return f.ops.Load() > 0 +} + +func (f *fireModeImmediate) Fire(ctx context.Context, trigger Trigger, args ...any) error { + f.ops.Add(1) + defer f.ops.Add(^uint64(0)) + return f.sm.internalFireOne(ctx, trigger, args...) +} + +type queuedTrigger struct { + Context context.Context + Trigger Trigger + Args []any +} + +type fireModeQueued struct { + firing atomic.Bool + sm *StateMachine + + triggers []queuedTrigger + mu sync.Mutex // guards triggers +} + +func (f *fireModeQueued) Firing() bool { + return f.firing.Load() +} + +func (f *fireModeQueued) Fire(ctx context.Context, trigger Trigger, args ...any) error { + f.enqueue(ctx, trigger, args...) + for { + et, ok := f.fetch() + if !ok { + break + } + err := f.execute(et) + if err != nil { + return err + } + } + return nil +} + +func (f *fireModeQueued) enqueue(ctx context.Context, trigger Trigger, args ...any) { + f.mu.Lock() + defer f.mu.Unlock() + + f.triggers = append(f.triggers, queuedTrigger{Context: ctx, Trigger: trigger, Args: args}) +} + +func (f *fireModeQueued) fetch() (et queuedTrigger, ok bool) { + f.mu.Lock() + defer f.mu.Unlock() + + if len(f.triggers) == 0 { + return queuedTrigger{}, false + } + + if !f.firing.CompareAndSwap(false, true) { + return queuedTrigger{}, false + } + + et, f.triggers = f.triggers[0], f.triggers[1:] + return et, true +} + +func (f *fireModeQueued) execute(et queuedTrigger) error { + defer f.firing.Swap(false) + return f.sm.internalFireOne(et.Context, et.Trigger, et.Args...) +} diff --git a/statemachine.go b/statemachine.go index 09b2fc9..fcbc2bb 100644 --- a/statemachine.go +++ b/statemachine.go @@ -5,7 +5,6 @@ import ( "fmt" "reflect" "sync" - "sync/atomic" ) // State is used to to represent the possible machine states. @@ -63,7 +62,6 @@ func callEvents(events []TransitionFunc, ctx context.Context, transition Transit // It is safe to use the StateMachine concurrently, but non of the callbacks (state manipulation, actions, events, ...) are guarded, // so it is up to the client to protect them against race conditions. type StateMachine struct { - ops atomic.Uint64 stateConfig map[State]*stateRepresentation triggerConfig map[Trigger]triggerWithParameters stateAccessor func(context.Context) (State, error) @@ -71,17 +69,22 @@ type StateMachine struct { unhandledTriggerAction UnhandledTriggerActionFunc onTransitioningEvents []TransitionFunc onTransitionedEvents []TransitionFunc - eventQueue queue - firingMode FiringMode stateMutex sync.RWMutex + mode fireMode } -func newStateMachine() *StateMachine { - return &StateMachine{ +func newStateMachine(firingMode FiringMode) *StateMachine { + sm := &StateMachine{ stateConfig: make(map[State]*stateRepresentation), triggerConfig: make(map[Trigger]triggerWithParameters), unhandledTriggerAction: UnhandledTriggerActionFunc(DefaultUnhandledTriggerAction), } + if firingMode == FiringImmediate { + sm.mode = &fireModeImmediate{sm: sm} + } else { + sm.mode = &fireModeQueued{sm: sm} + } + return sm } // NewStateMachine returns a queued state machine. @@ -92,7 +95,7 @@ func NewStateMachine(initialState State) *StateMachine { // NewStateMachineWithMode returns a state machine with the desired firing mode func NewStateMachineWithMode(initialState State, firingMode FiringMode) *StateMachine { var stateMutex sync.Mutex - sm := newStateMachine() + sm := newStateMachine(firingMode) reference := &struct { State State }{State: initialState} @@ -107,16 +110,14 @@ func NewStateMachineWithMode(initialState State, firingMode FiringMode) *StateMa reference.State = state return nil } - sm.firingMode = firingMode return sm } // NewStateMachineWithExternalStorage returns a state machine with external state storage. func NewStateMachineWithExternalStorage(stateAccessor func(context.Context) (State, error), stateMutator func(context.Context, State) error, firingMode FiringMode) *StateMachine { - sm := newStateMachine() + sm := newStateMachine(firingMode) sm.stateAccessor = stateAccessor sm.stateMutator = stateMutator - sm.firingMode = firingMode return sm } @@ -274,7 +275,7 @@ func (sm *StateMachine) Configure(state State) *StateConfiguration { // Firing returns true when the state machine is processing a trigger. func (sm *StateMachine) Firing() bool { - return sm.ops.Load() != 0 + return sm.mode.Firing() } // String returns a human-readable representation of the state machine. @@ -319,74 +320,7 @@ func (sm *StateMachine) stateRepresentation(state State) *stateRepresentation { } func (sm *StateMachine) internalFire(ctx context.Context, trigger Trigger, args ...any) error { - switch sm.firingMode { - case FiringImmediate: - sm.ops.Add(1) - defer sm.ops.Add(^uint64(0)) - return sm.internalFireOne(ctx, trigger, args...) - case FiringQueued: - fallthrough - default: - return sm.internalFireQueued(ctx, trigger, args...) - } -} - -type queuedTrigger struct { - Context context.Context - Trigger Trigger - Args []any -} - -type queue struct { - triggers []queuedTrigger - mu sync.Mutex -} - -func (sm *StateMachine) queueEvent(ctx context.Context, trigger Trigger, args ...any) { - sm.eventQueue.mu.Lock() - defer sm.eventQueue.mu.Unlock() - - sm.eventQueue.triggers = append(sm.eventQueue.triggers, queuedTrigger{Context: ctx, Trigger: trigger, Args: args}) -} - -func (sm *StateMachine) fetchOneEvent() (et queuedTrigger, ok bool) { - sm.eventQueue.mu.Lock() - defer sm.eventQueue.mu.Unlock() - - if len(sm.eventQueue.triggers) == 0 { - return queuedTrigger{}, false - } - - if !sm.ops.CompareAndSwap(0, 1) { - return queuedTrigger{}, false - } - - et, sm.eventQueue.triggers = sm.eventQueue.triggers[0], sm.eventQueue.triggers[1:] - return et, true -} - -func (sm *StateMachine) executeOneEvent(et queuedTrigger) error { - defer sm.ops.Add(^uint64(0)) - - return sm.internalFireOne(et.Context, et.Trigger, et.Args...) -} - -func (sm *StateMachine) internalFireQueued(ctx context.Context, trigger Trigger, args ...any) error { - sm.queueEvent(ctx, trigger, args...) - - for { - et, ok := sm.fetchOneEvent() - if !ok { - break - } - - err := sm.executeOneEvent(et) - if err != nil { - return err - } - } - - return nil + return sm.mode.Fire(ctx, trigger, args...) } func (sm *StateMachine) internalFireOne(ctx context.Context, trigger Trigger, args ...any) error {