diff --git a/pkg/machine/machine.go b/pkg/machine/machine.go index fd23096..a554265 100644 --- a/pkg/machine/machine.go +++ b/pkg/machine/machine.go @@ -66,7 +66,7 @@ type Machine struct { StateNames S emitters []*emitter - clock map[string]uint64 + clock Clocks cancel context.CancelFunc logLevel LogLevel logger Logger @@ -81,6 +81,8 @@ type Machine struct { indexEventChLock sync.Mutex handlerDone chan bool handlerPanic chan any + logEntriesLock sync.Mutex + logEntries []string } // New creates a new Machine instance, bound to context and modified with @@ -690,8 +692,7 @@ func (m *Machine) recoverToErr(emitter *emitter, r any) { m.setActiveStates(t.CalledStates(), activeStates, t.IsAuto()) t.IsCompleted = true } - m.log(LogOps, "[cancel:%s] (%s) by recover", t.latestStep.ID[:5], - j(t.TargetStates)) + m.log(LogOps, "[cancel] (%s) by recover", j(t.TargetStates)) if t.Mutation == nil { // TODO can this even happen? panic(fmt.Sprintf("no mutation panic in %s: %s", emitter.id, err)) @@ -763,6 +764,10 @@ func (m *Machine) setActiveStates(calledStates S, targetStates S, if !lo.Contains(previous, state) || (lo.Contains(calledStates, state) && data.Multi) { m.clock[state]++ + // treat multi states as new states (for logging) + if data.Multi && lo.Contains(previous, state) { + newStates = append(newStates, state) + } } } @@ -817,7 +822,7 @@ func (m *Machine) processQueue() Result { item := &m.Queue[0] m.Queue = m.Queue[1:] m.queueLock.Unlock() - m.Transition = newTransition(m, item) + newTransition(m, item) // execute the transition ret = append(ret, m.Transition.emitEvents()) m.processWhenBindings() @@ -937,11 +942,24 @@ func (m *Machine) log(level LogLevel, msg string, args ...any) { if m.LogID { msg = "[" + m.ID[:5] + "] " + msg } + out := fmt.Sprintf(msg, args...) if m.logger != nil { m.logger(level, msg, args...) - return + } else { + fmt.Println(out) + } + + t := m.Transition + if t != nil { + // append the log msg to the current transition + t.LogEntries = append(t.LogEntries, out) + } else { + // append the log msg the machine and collect at the end of the next + // transition + m.logEntriesLock.Lock() + defer m.logEntriesLock.Unlock() + m.logEntries = append(m.logEntries, out) } - fmt.Printf(msg+"\n", args...) } // SetLogger sets a custom logger function. @@ -949,10 +967,17 @@ func (m *Machine) SetLogger(fn Logger) { m.logger = fn } +// GetLogger returns the current custom logger function, or nil. +func (m *Machine) GetLogger() Logger { + return m.logger +} + // emit is a synchronous (blocking) emit with cancellation via a return channel. // Can block indefinitely if the handler doesn't return or the emitter isn't // accepting events. -func (m *Machine) emit(name string, args A, step *TransitionStep) Result { +func (m *Machine) emit( + name string, args A, step *TransitionStep, +) (Result, bool) { e := &Event{ Name: name, Machine: m, @@ -966,14 +991,14 @@ func (m *Machine) emit(name string, args A, step *TransitionStep) Result { targetStates = j(t.TargetStates) } // call the handlers - res := m.processEmitters(e) + res, handlerCalled := m.processEmitters(e) if m.panicCaught { res = Canceled m.panicCaught = false } // check if this is an internal event if step == nil { - return Executed + return Executed, handlerCalled } // negotiation support if !step.IsFinal && res == Canceled { @@ -981,29 +1006,21 @@ func (m *Machine) emit(name string, args A, step *TransitionStep) Result { if step.IsSelf { self = ":self" } - stepID := step.ID[:5] - m.log(LogOps, "[cancel%s:%s] (%s) by %s", self, stepID, + m.log(LogOps, "[cancel%s] (%s) by %s", self, targetStates, name) - // queue-end lacks a transition - if t != nil { - t.addSteps(step) - } - return Canceled + return Canceled, handlerCalled } - return Executed + return Executed, handlerCalled } -func (m *Machine) processEmitters(e *Event) Result { +func (m *Machine) processEmitters(e *Event) (Result, bool) { var emitter *emitter + handlerCalled := false for _, emitter = range m.emitters { if m.Ctx.Err() != nil { break } method := e.Name - stepID := method - if e.step != nil { - stepID = e.step.ID[:5] - } // internal event if e.step == nil { break @@ -1014,9 +1031,7 @@ func (m *Machine) processEmitters(e *Event) Result { emitterID = emitterID[:15] } emitterID = padString(strings.ReplaceAll(emitterID, " ", "_"), 15, "_") - m.log(LogEverything, "[emit:%-15s:%s] %s", emitterID, - stepID, method) - m.Transition.addSteps(e.step) + m.log(LogEverything, "[emit:%-15s] %s", emitterID, method) } // if no handler, skip if !emitter.methods.MethodByName(method).IsValid() { @@ -1025,6 +1040,7 @@ func (m *Machine) processEmitters(e *Event) Result { m.log(LogOps, "[handler] %s", method) // call the handler var ret bool + handlerCalled = true go func() { if m.PanicToException { @@ -1050,7 +1066,7 @@ func (m *Machine) processEmitters(e *Event) Result { case <-m.Ctx.Done(): break case <-time.After(m.HandlerTimeout): - m.log(LogOps, "[cancel:%s] (%s) by timeout", stepID, + m.log(LogOps, "[cancel] (%s) by timeout", j(m.Transition.TargetStates)) break case ret = <-m.handlerDone: @@ -1066,14 +1082,14 @@ func (m *Machine) processEmitters(e *Event) Result { // returns from State and End handlers are ignored default: if !ret { - return Canceled + return Canceled, handlerCalled } } } if m.processEventChs(e) == Canceled { - return Canceled + return Canceled, handlerCalled } - return Executed + return Executed, handlerCalled } // processEventChs sends the event to all On() dynamic handlers. @@ -1176,12 +1192,25 @@ func (m *Machine) Has(states S) bool { // HasStateChanged checks current active states have changed from the passed // ones. -func (m *Machine) HasStateChanged(before S) bool { +// Optionally also compares clock ticks. +func (m *Machine) HasStateChanged(before S, clocks Clocks) bool { m.activeStatesLock.RLock() defer m.activeStatesLock.RUnlock() lenEqual := len(before) == len(m.ActiveStates) - return !lenEqual || len(DiffStates(before, m.ActiveStates)) > 0 + if !lenEqual || len(DiffStates(before, m.ActiveStates)) > 0 { + return true + } + if clocks == nil { + return true + } + // compare clocks + for state, tick := range clocks { + if m.clock[state] != tick { + return true + } + } + return false } // String returns a one line representation of the currently active states, diff --git a/pkg/machine/machine_test.go b/pkg/machine/machine_test.go index 2379a08..9f935d4 100644 --- a/pkg/machine/machine_test.go +++ b/pkg/machine/machine_test.go @@ -463,7 +463,7 @@ func TestNegotiationCancel(t *testing.T) { // DEnter will cancel the transition return m.Set(S{"D"}, nil) }, - regexp.MustCompile(`\[cancel:\w+?] \(D\) by DEnter`), + regexp.MustCompile(`\[cancel] \(D\) by DEnter`), }, { "using add", @@ -472,7 +472,7 @@ func TestNegotiationCancel(t *testing.T) { // DEnter will cancel the transition return m.Add(S{"D"}, nil) }, - regexp.MustCompile(`\[cancel:\w+?] \(D A\) by DEnter`), + regexp.MustCompile(`\[cancel] \(D A\) by DEnter`), }, } for _, test := range tests { @@ -541,7 +541,7 @@ func TestNegotiationRemove(t *testing.T) { // assert assert.Equal(t, am.Canceled, result, "transition should be canceled") assertStates(t, m, S{"A"}, "state shouldnt be changed") - assert.Regexp(t, `\[cancel:\w+?] \(\) by AExit`, log, + assert.Regexp(t, `\[cancel] \(\) by AExit`, log, "log should explain the reason of cancellation") } @@ -803,7 +803,7 @@ func TestPartialNegotiationPanic(t *testing.T) { assert.Equal(t, am.Canceled, m.Add(S{"B"}, nil)) // assert assertStates(t, m, S{"A", "Exception"}) - assert.Regexp(t, `\[cancel:\w+?] \(B A\) by recover`, log, + assert.Regexp(t, `\[cancel] \(B A\) by recover`, log, "log contains the target states and handler") } @@ -835,7 +835,7 @@ func TestPartialFinalPanic(t *testing.T) { "log contains the target states and handler") assert.Contains(t, log, "[error:trace] goroutine", "log contains the stack trace") - assert.Regexp(t, `\[cancel:\w+?] \(A B C\) by recover`, log, + assert.Regexp(t, `\[cancel] \(A B C\) by recover`, log, "log contains the target states and handler") } diff --git a/pkg/machine/misc.go b/pkg/machine/misc.go index 4491ce1..b7bafef 100644 --- a/pkg/machine/misc.go +++ b/pkg/machine/misc.go @@ -18,6 +18,9 @@ type A map[string]any // T is an ordered list of state clocks. type T []uint64 +// Clocks is a map of state names to their clocks. +type Clocks map[string]uint64 + // State defines a single state of a machine, its properties and relations. type State struct { Auto bool diff --git a/pkg/machine/resolver.go b/pkg/machine/resolver.go index 634eeae..1411792 100644 --- a/pkg/machine/resolver.go +++ b/pkg/machine/resolver.go @@ -61,9 +61,9 @@ func (rr *DefaultRelationsResolver) GetTargetStates( } m.log(lvl, "[rel:remove] %s by %s", name, j(blockedBy)) if m.Is(S{name}) { - t.addSteps(newStep(name, "", TransitionStepTypeRemove, nil)) + t.addSteps(newStep("", name, TransitionStepTypeRemove, nil)) } else { - t.addSteps(newStep(name, "", TransitionStepTypeNoSet, nil)) + t.addSteps(newStep("", name, TransitionStepTypeNoSet, nil)) } return false }) @@ -179,7 +179,7 @@ func (rr *DefaultRelationsResolver) stateBlockedBy( if !lo.Contains(state.Remove, blocked) { continue } - t.addSteps(newStep(blocked, blocking, TransitionStepTypeRelation, + t.addSteps(newStep(blocking, blocked, TransitionStepTypeRelation, RelationRemove)) blockedBy = append(blockedBy, blocking) } diff --git a/pkg/machine/transition.go b/pkg/machine/transition.go index 0e262fb..63526c8 100644 --- a/pkg/machine/transition.go +++ b/pkg/machine/transition.go @@ -12,7 +12,6 @@ func newStep(from string, to string, stepType TransitionStepType, data any, ) *TransitionStep { return &TransitionStep{ - ID: uuid.New().String(), FromState: from, ToState: to, Type: stepType, @@ -32,15 +31,15 @@ func newSteps(from string, toStates S, stepType TransitionStepType, // Transition represents processing of a single mutation withing a machine. type Transition struct { + ID string // List of steps taken by this transition (so far). - // TODO []*TransitionStep Steps []*TransitionStep - // Index of added steps. - StepIDs map[string]bool // When true, execution of the transition has been completed. IsCompleted bool // states before the transition StatesBefore S + // clocks of the states from before the transition + ClocksBefore Clocks // States with "enter" handlers to execute Enters S // States with "exit" handlers to executed @@ -53,6 +52,8 @@ type Transition struct { Mutation *Mutation // Parent machine Machine *Machine + // Log entries produced during the transition + LogEntries []string // Latest / current step of the transition latestStep *TransitionStep @@ -63,13 +64,22 @@ func newTransition(m *Machine, item *Mutation) *Transition { m.activeStatesLock.RLock() defer m.activeStatesLock.RUnlock() + clocks := Clocks{} + for _, state := range m.ActiveStates { + clocks[state] = m.clock[state] + } t := &Transition{ + ID: uuid.New().String(), Mutation: item, StatesBefore: m.ActiveStates, + ClocksBefore: clocks, Machine: m, Accepted: true, - StepIDs: map[string]bool{}, } + // set early to catch the logs + m.Transition = t + // set early to catch the logs + m.Transition = t states := t.CalledStates() mutType := t.Type() t.addSteps(newSteps("", states, TransitionStepTypeRequested, nil)...) @@ -133,14 +143,7 @@ func (t *Transition) Type() MutationType { } func (t *Transition) addSteps(steps ...*TransitionStep) { - for _, step := range steps { - // prevent dups from >1 emitter - if _, ok := t.StepIDs[step.ID]; ok { - continue - } - t.StepIDs[step.ID] = true - t.Steps = append(t.Steps, step) - } + t.Steps = append(t.Steps, steps...) } // String representation of the transition and the steps taken so far. @@ -188,6 +191,7 @@ func (t *Transition) setupExitEnter() { func (t *Transition) emitSelfEvents() Result { m := t.Machine ret := Executed + var handlerCalled bool for _, s := range t.CalledStates() { // only the active states if !t.Machine.Is(S{s}) { @@ -196,8 +200,10 @@ func (t *Transition) emitSelfEvents() Result { name := s + s step := newStep(s, "", TransitionStepTypeTransition, name) step.IsSelf = true - t.addSteps(step) - ret = m.emit(name, t.Mutation.Args, step) + ret, handlerCalled = m.emit(name, t.Mutation.Args, step) + if handlerCalled { + t.addSteps(step) + } if ret == Canceled { break } @@ -245,9 +251,12 @@ func (t *Transition) emitExitEvents() Result { func (t *Transition) emitHandler(from, to, event string, args A) Result { step := newStep(from, to, TransitionStepTypeTransition, event) - t.addSteps(step) t.latestStep = step - return t.Machine.emit(event, args, step) + ret, handlerCalled := t.Machine.emit(event, args, step) + if handlerCalled { + t.addSteps(step) + } + return ret } func (t *Transition) emitFinalEvents() { @@ -266,7 +275,10 @@ func (t *Transition) emitFinalEvents() { step.IsFinal = true step.IsEnter = isEnter t.latestStep = step - ret := t.Machine.emit(handler, t.Mutation.Args, step) + ret, handlerCalled := t.Machine.emit(handler, t.Mutation.Args, step) + if handlerCalled { + t.addSteps(step) + } if ret == Canceled { break } @@ -304,7 +316,7 @@ func (t *Transition) emitEvents() Result { m.setActiveStates(t.CalledStates(), t.TargetStates, t.IsAuto()) t.emitFinalEvents() t.IsCompleted = true - hasStateChanged = m.HasStateChanged(t.StatesBefore) + hasStateChanged = m.HasStateChanged(t.StatesBefore, t.ClocksBefore) if hasStateChanged { m.emit("tick", A{"before": t.StatesBefore}, nil) } @@ -322,7 +334,12 @@ func (t *Transition) emitEvents() Result { } } - // stop emitting + // stop emitting, collect previous log entries + m.logEntriesLock.Lock() + // TODO struct type + txArgs["pre_logs"] = m.logEntries + m.logEntries = []string{} + m.logEntriesLock.Unlock() m.emit("transition-end", txArgs, nil) if result == Canceled { diff --git a/pkg/telemetry/telemetry.go b/pkg/telemetry/telemetry.go new file mode 100644 index 0000000..d179df8 --- /dev/null +++ b/pkg/telemetry/telemetry.go @@ -0,0 +1,161 @@ +package telemetry + +import ( + "encoding/gob" + "log" + "net/rpc" + + "github.com/samber/lo" + + am "github.com/pancsta/asyncmachine-go/pkg/machine" +) + +const RpcHost = "localhost:9823" + +type Msg interface { + // Clock returns the state's clock, using the passed index + Clock(statesIndex am.S, state string) uint64 + // Is true if the state is active, using the passed index + Is(statesIndex am.S, states am.S) bool +} + +// MsgStruct contains the state and relations data. +type MsgStruct struct { + // Machine ID + ID string + // state names defining the indexes for diffs + StatesIndex am.S + // all the states with relations + States am.States + // log level of the machine + LogLevel am.LogLevel +} + +func (d *MsgStruct) Clock(_ am.S, _ string) uint64 { + return 0 +} + +func (d *MsgStruct) Is(_ am.S, _ am.S) bool { + return false +} + +// MsgTx contains transaction data. +type MsgTx struct { + // Transition ID + ID string + // map of positions from the index to the active state + StatesActive []bool + // map of positions from the index to state clocks + Clocks []uint64 + // result of the transition + Accepted bool + // all the transition steps + Steps []*am.TransitionStep + // log entries since the last diff + LogEntries []string + // log entries before the transition, which happened after the prev one + PreLogEntries []string + // tx was triggered by an auto state + IsAuto bool +} + +func (d *MsgTx) Clock(statesIndex am.S, state string) uint64 { + idx := lo.IndexOf(statesIndex, state) + return d.Clocks[idx] +} + +func (d *MsgTx) Is(statesIndex am.S, states am.S) bool { + for _, state := range states { + idx := lo.IndexOf(statesIndex, state) //nolint:typecheck + if !d.StatesActive[idx] { + return false + } + } + return true +} + +type rpcClient struct { + client *rpc.Client +} + +func newClient(url string) (*rpcClient, error) { + log.Printf("Connecting to %s", url) + client, err := rpc.Dial("tcp", url) + if err != nil { + return nil, err + } + return &rpcClient{client: client}, nil +} + +func (c *rpcClient) sendMsgTx(msg *MsgTx) error { + var reply string + // TODO use Go() to not block + err := c.client.Call("RPCServer.MsgTx", msg, &reply) + if err != nil { + return err + } + return nil +} + +func (c *rpcClient) sendMsgStruct(msg *MsgStruct) error { + var reply string + // TODO use Go() to not block + err := c.client.Call("RPCServer.MsgStruct", msg, &reply) + if err != nil { + return err + } + return nil +} + +func MonitorTransitions(m *am.Machine, url string) error { + var err error + gob.Register(am.Relation(0)) + client, err := newClient(url) + if err != nil { + return err + } + msg := &MsgStruct{ + ID: m.ID, + StatesIndex: m.StateNames, + States: m.States, + LogLevel: m.LogLevel, + } + // TODO retries + err = client.sendMsgStruct(msg) + if err != nil { + log.Println(err) + } + go func() { + // bind to transitions + txEndCh := m.On([]string{"transition-end"}, nil) + // send incoming transitions + for event := range txEndCh { + tx := event.Args["transition"].(*am.Transition) + preLogs := event.Args["pre_logs"].([]string) + msg := &MsgTx{ + ID: tx.ID, + StatesActive: make([]bool, len(m.StateNames)), + Clocks: make([]uint64, len(m.StateNames)), + Accepted: tx.Accepted, + Steps: lo.Map(tx.Steps, + func(step *am.TransitionStep, _ int) *am.TransitionStep { + return step + }), + LogEntries: tx.LogEntries, + PreLogEntries: preLogs, + IsAuto: tx.IsAuto(), + } + for i, state := range m.StateNames { + msg.StatesActive[i] = m.Is(am.S{state}) + msg.Clocks[i] = m.Clock(state) + } + // TODO retries + err := client.sendMsgTx(msg) + if err != nil { + log.Println(err) + return + } + } + }() + return nil +}