Skip to content

Commit

Permalink
feat: add telemetry via net/rpc (#19)
Browse files Browse the repository at this point in the history
  • Loading branch information
pancsta committed Feb 22, 2024
1 parent 88ef244 commit d51db0f
Show file tree
Hide file tree
Showing 6 changed files with 269 additions and 59 deletions.
91 changes: 60 additions & 31 deletions pkg/machine/machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ type Machine struct {
StateNames S

emitters []*emitter
clock map[string]uint64
clock Clocks
cancel context.CancelFunc
logLevel LogLevel
logger Logger
Expand All @@ -81,6 +81,8 @@ type Machine struct {
indexEventChLock sync.Mutex
handlerDone chan bool
handlerPanic chan any
logEntriesLock sync.Mutex
logEntries []string
}

// New creates a new Machine instance, bound to context and modified with
Expand Down Expand Up @@ -690,8 +692,7 @@ func (m *Machine) recoverToErr(emitter *emitter, r any) {
m.setActiveStates(t.CalledStates(), activeStates, t.IsAuto())
t.IsCompleted = true
}
m.log(LogOps, "[cancel:%s] (%s) by recover", t.latestStep.ID[:5],
j(t.TargetStates))
m.log(LogOps, "[cancel] (%s) by recover", j(t.TargetStates))
if t.Mutation == nil {
// TODO can this even happen?
panic(fmt.Sprintf("no mutation panic in %s: %s", emitter.id, err))
Expand Down Expand Up @@ -763,6 +764,10 @@ func (m *Machine) setActiveStates(calledStates S, targetStates S,
if !lo.Contains(previous, state) ||
(lo.Contains(calledStates, state) && data.Multi) {
m.clock[state]++
// treat multi states as new states (for logging)
if data.Multi && lo.Contains(previous, state) {
newStates = append(newStates, state)
}
}
}

Expand Down Expand Up @@ -817,7 +822,7 @@ func (m *Machine) processQueue() Result {
item := &m.Queue[0]
m.Queue = m.Queue[1:]
m.queueLock.Unlock()
m.Transition = newTransition(m, item)
newTransition(m, item)
// execute the transition
ret = append(ret, m.Transition.emitEvents())
m.processWhenBindings()
Expand Down Expand Up @@ -937,22 +942,42 @@ func (m *Machine) log(level LogLevel, msg string, args ...any) {
if m.LogID {
msg = "[" + m.ID[:5] + "] " + msg
}
out := fmt.Sprintf(msg, args...)
if m.logger != nil {
m.logger(level, msg, args...)
return
} else {
fmt.Println(out)
}

t := m.Transition
if t != nil {
// append the log msg to the current transition
t.LogEntries = append(t.LogEntries, out)
} else {
// append the log msg the machine and collect at the end of the next
// transition
m.logEntriesLock.Lock()
defer m.logEntriesLock.Unlock()
m.logEntries = append(m.logEntries, out)
}
fmt.Printf(msg+"\n", args...)
}

// SetLogger sets a custom logger function.
func (m *Machine) SetLogger(fn Logger) {
m.logger = fn
}

// GetLogger returns the current custom logger function, or nil.
func (m *Machine) GetLogger() Logger {
return m.logger
}

// emit is a synchronous (blocking) emit with cancellation via a return channel.
// Can block indefinitely if the handler doesn't return or the emitter isn't
// accepting events.
func (m *Machine) emit(name string, args A, step *TransitionStep) Result {
func (m *Machine) emit(
name string, args A, step *TransitionStep,
) (Result, bool) {
e := &Event{
Name: name,
Machine: m,
Expand All @@ -966,44 +991,36 @@ func (m *Machine) emit(name string, args A, step *TransitionStep) Result {
targetStates = j(t.TargetStates)
}
// call the handlers
res := m.processEmitters(e)
res, handlerCalled := m.processEmitters(e)
if m.panicCaught {
res = Canceled
m.panicCaught = false
}
// check if this is an internal event
if step == nil {
return Executed
return Executed, handlerCalled
}
// negotiation support
if !step.IsFinal && res == Canceled {
var self string
if step.IsSelf {
self = ":self"
}
stepID := step.ID[:5]
m.log(LogOps, "[cancel%s:%s] (%s) by %s", self, stepID,
m.log(LogOps, "[cancel%s] (%s) by %s", self,
targetStates, name)
// queue-end lacks a transition
if t != nil {
t.addSteps(step)
}
return Canceled
return Canceled, handlerCalled
}
return Executed
return Executed, handlerCalled
}

func (m *Machine) processEmitters(e *Event) Result {
func (m *Machine) processEmitters(e *Event) (Result, bool) {
var emitter *emitter
handlerCalled := false
for _, emitter = range m.emitters {
if m.Ctx.Err() != nil {
break
}
method := e.Name
stepID := method
if e.step != nil {
stepID = e.step.ID[:5]
}
// internal event
if e.step == nil {
break
Expand All @@ -1014,9 +1031,7 @@ func (m *Machine) processEmitters(e *Event) Result {
emitterID = emitterID[:15]
}
emitterID = padString(strings.ReplaceAll(emitterID, " ", "_"), 15, "_")
m.log(LogEverything, "[emit:%-15s:%s] %s", emitterID,
stepID, method)
m.Transition.addSteps(e.step)
m.log(LogEverything, "[emit:%-15s] %s", emitterID, method)
}
// if no handler, skip
if !emitter.methods.MethodByName(method).IsValid() {
Expand All @@ -1025,6 +1040,7 @@ func (m *Machine) processEmitters(e *Event) Result {
m.log(LogOps, "[handler] %s", method)
// call the handler
var ret bool
handlerCalled = true

go func() {
if m.PanicToException {
Expand All @@ -1050,7 +1066,7 @@ func (m *Machine) processEmitters(e *Event) Result {
case <-m.Ctx.Done():
break
case <-time.After(m.HandlerTimeout):
m.log(LogOps, "[cancel:%s] (%s) by timeout", stepID,
m.log(LogOps, "[cancel] (%s) by timeout",
j(m.Transition.TargetStates))
break
case ret = <-m.handlerDone:
Expand All @@ -1066,14 +1082,14 @@ func (m *Machine) processEmitters(e *Event) Result {
// returns from State and End handlers are ignored
default:
if !ret {
return Canceled
return Canceled, handlerCalled
}
}
}
if m.processEventChs(e) == Canceled {
return Canceled
return Canceled, handlerCalled
}
return Executed
return Executed, handlerCalled
}

// processEventChs sends the event to all On() dynamic handlers.
Expand Down Expand Up @@ -1176,12 +1192,25 @@ func (m *Machine) Has(states S) bool {

// HasStateChanged checks current active states have changed from the passed
// ones.
func (m *Machine) HasStateChanged(before S) bool {
// Optionally also compares clock ticks.
func (m *Machine) HasStateChanged(before S, clocks Clocks) bool {
m.activeStatesLock.RLock()
defer m.activeStatesLock.RUnlock()

lenEqual := len(before) == len(m.ActiveStates)
return !lenEqual || len(DiffStates(before, m.ActiveStates)) > 0
if !lenEqual || len(DiffStates(before, m.ActiveStates)) > 0 {
return true
}
if clocks == nil {
return true
}
// compare clocks
for state, tick := range clocks {
if m.clock[state] != tick {
return true
}
}
return false
}

// String returns a one line representation of the currently active states,
Expand Down
10 changes: 5 additions & 5 deletions pkg/machine/machine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ func TestNegotiationCancel(t *testing.T) {
// DEnter will cancel the transition
return m.Set(S{"D"}, nil)
},
regexp.MustCompile(`\[cancel:\w+?] \(D\) by DEnter`),
regexp.MustCompile(`\[cancel] \(D\) by DEnter`),
},
{
"using add",
Expand All @@ -472,7 +472,7 @@ func TestNegotiationCancel(t *testing.T) {
// DEnter will cancel the transition
return m.Add(S{"D"}, nil)
},
regexp.MustCompile(`\[cancel:\w+?] \(D A\) by DEnter`),
regexp.MustCompile(`\[cancel] \(D A\) by DEnter`),
},
}
for _, test := range tests {
Expand Down Expand Up @@ -541,7 +541,7 @@ func TestNegotiationRemove(t *testing.T) {
// assert
assert.Equal(t, am.Canceled, result, "transition should be canceled")
assertStates(t, m, S{"A"}, "state shouldnt be changed")
assert.Regexp(t, `\[cancel:\w+?] \(\) by AExit`, log,
assert.Regexp(t, `\[cancel] \(\) by AExit`, log,
"log should explain the reason of cancellation")
}

Expand Down Expand Up @@ -803,7 +803,7 @@ func TestPartialNegotiationPanic(t *testing.T) {
assert.Equal(t, am.Canceled, m.Add(S{"B"}, nil))
// assert
assertStates(t, m, S{"A", "Exception"})
assert.Regexp(t, `\[cancel:\w+?] \(B A\) by recover`, log,
assert.Regexp(t, `\[cancel] \(B A\) by recover`, log,
"log contains the target states and handler")
}

Expand Down Expand Up @@ -835,7 +835,7 @@ func TestPartialFinalPanic(t *testing.T) {
"log contains the target states and handler")
assert.Contains(t, log, "[error:trace] goroutine",
"log contains the stack trace")
assert.Regexp(t, `\[cancel:\w+?] \(A B C\) by recover`, log,
assert.Regexp(t, `\[cancel] \(A B C\) by recover`, log,
"log contains the target states and handler")
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/machine/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ type A map[string]any
// T is an ordered list of state clocks.
type T []uint64

// Clocks is a map of state names to their clocks.
type Clocks map[string]uint64

// State defines a single state of a machine, its properties and relations.
type State struct {
Auto bool
Expand Down
6 changes: 3 additions & 3 deletions pkg/machine/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ func (rr *DefaultRelationsResolver) GetTargetStates(
}
m.log(lvl, "[rel:remove] %s by %s", name, j(blockedBy))
if m.Is(S{name}) {
t.addSteps(newStep(name, "", TransitionStepTypeRemove, nil))
t.addSteps(newStep("", name, TransitionStepTypeRemove, nil))
} else {
t.addSteps(newStep(name, "", TransitionStepTypeNoSet, nil))
t.addSteps(newStep("", name, TransitionStepTypeNoSet, nil))
}
return false
})
Expand Down Expand Up @@ -179,7 +179,7 @@ func (rr *DefaultRelationsResolver) stateBlockedBy(
if !lo.Contains(state.Remove, blocked) {
continue
}
t.addSteps(newStep(blocked, blocking, TransitionStepTypeRelation,
t.addSteps(newStep(blocking, blocked, TransitionStepTypeRelation,
RelationRemove))
blockedBy = append(blockedBy, blocking)
}
Expand Down
Loading

0 comments on commit d51db0f

Please sign in to comment.