Skip to content

Commit

Permalink
fix: ctx, linter
Browse files Browse the repository at this point in the history
  • Loading branch information
pancsta committed Jan 18, 2024
1 parent 9e61b13 commit 52f239a
Showing 1 changed file with 48 additions and 32 deletions.
80 changes: 48 additions & 32 deletions pkg/machine/machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,17 +246,13 @@ func (m *Machine) GetRelationsOf(fromState string) []Relation {
// listening on 2 When() channels within the same `select` to GC the 2nd one.
func (m *Machine) When(states []string, ctx context.Context) chan struct{} {
ch := make(chan struct{})
if ctx == nil {
ctx = m.Ctx
}

// if all active, close early
if m.Is(states) {
close(ch)
return ch
}

// TODO deadlock with processWhenBindings()
m.activeStatesLock.Lock()
setMap := stateIsActive{}
matched := 0
Expand All @@ -274,21 +270,24 @@ func (m *Machine) When(states []string, ctx context.Context) chan struct{} {
total: len(states),
matched: matched,
}
// TODO backport to #2
go func() {
// dispose the binding on ctx.Done() and m.Ctx.Done()
select {
case <-ctx.Done():
case <-m.Ctx.Done():
// dispose with context
if ctx != nil {
go func() {
<-ctx.Done()
// GC only if needed
if m.disposed {
return
}
m.activeStatesLock.Lock()
for _, s := range states {
if _, ok := m.indexWhen[s]; ok {
m.indexWhen[s] = lo.Without(m.indexWhen[s], binding)
}
}
m.activeStatesLock.Unlock()
}
}()
}()
}
// insert the binding
for _, s := range states {
if _, ok := m.indexWhen[s]; !ok {
m.indexWhen[s] = []*whenBinding{binding}
Expand All @@ -307,9 +306,6 @@ func (m *Machine) When(states []string, ctx context.Context) chan struct{} {
// listening on 2 WhenNot() channels within the same `select` to GC the 2nd one.
func (m *Machine) WhenNot(states []string, ctx context.Context) chan struct{} {
ch := make(chan struct{})
if ctx == nil {
ctx = m.Ctx
}

// if all active, close early
if m.Not(states) {
Expand All @@ -334,21 +330,24 @@ func (m *Machine) WhenNot(states []string, ctx context.Context) chan struct{} {
total: len(states),
matched: matched,
}
// TODO backport to #2
go func() {
// dispose the binding on ctx.Done() and m.Ctx.Done()
select {
case <-ctx.Done():
case <-m.Ctx.Done():
// dispose with context
if ctx != nil {
go func() {
<-ctx.Done()
// GC only if needed
if m.disposed {
return
}
m.activeStatesLock.Lock()
for _, s := range states {
if _, ok := m.indexWhen[s]; ok {
m.indexWhen[s] = lo.Without(m.indexWhen[s], binding)
}
}
m.activeStatesLock.Unlock()
}
}()
}()
}
// insert the binding
for _, s := range states {
if _, ok := m.indexWhen[s]; !ok {
m.indexWhen[s] = []*whenBinding{binding}
Expand Down Expand Up @@ -565,7 +564,6 @@ func (m *Machine) On(events []string, ctx context.Context) chan *Event {
m.indexEventChLock.Lock()
defer m.indexEventChLock.Unlock()

// emitter := m.newEmitter(fmt.Sprintf("ON %s", j(events)))
if ctx == nil {
ctx = m.Ctx
}
Expand All @@ -576,6 +574,28 @@ func (m *Machine) On(events []string, ctx context.Context) chan *Event {
m.indexEventCh[e] = append(m.indexEventCh[e], ch)
}
}
// dispose with context
if ctx != nil {
go func() {
<-ctx.Done()
// GC only if needed
if m.disposed {
return
}
m.indexEventChLock.Lock()
for _, e := range events {
if _, ok := m.indexEventCh[e]; ok {
if len(m.indexEventCh[e]) == 1 {
// delete the whole map, as theres many possible events
delete(m.indexEventCh, e)
} else {
m.indexEventCh[e] = lo.Without(m.indexEventCh[e], ch)
}
}
}
m.indexEventChLock.Unlock()
}()
}
return ch
}

Expand Down Expand Up @@ -759,9 +779,7 @@ func (m *Machine) processStateCtxBindings() {
m.activeStatesLock.Lock()
var toCancel []context.CancelFunc
for _, s := range deactivated {
for _, cancel := range m.indexStateCtx[s] {
toCancel = append(toCancel, cancel)
}
toCancel = append(toCancel, m.indexStateCtx[s]...)
delete(m.indexStateCtx, s)
}
m.activeStatesLock.Unlock()
Expand Down Expand Up @@ -897,10 +915,7 @@ func (m *Machine) emit(name string, args A, step *TransitionStep) Result {
if step.IsSelf {
self = ":self"
}
stepID := name
if step != nil {
stepID = step.ID[:5]
}
stepID := step.ID[:5]
m.log(LogOps, "[cancel%s:%s] (%s) by %s", self, stepID,
targetStates, name)
// queue-end lacks a transition
Expand Down Expand Up @@ -968,7 +983,8 @@ func (m *Machine) processEmitters(e *Event) Result {
case <-m.Ctx.Done():
break
case <-time.After(m.HandlerTimeout):
m.log(LogOps, "[cancel:%s] (%s) by timeout", stepID, j(m.Transition.TargetStates))
m.log(LogOps, "[cancel:%s] (%s) by timeout", stepID,
j(m.Transition.TargetStates))
break
case ret = <-m.handlerDone:
// ok
Expand Down

0 comments on commit 52f239a

Please sign in to comment.