diff --git a/README.md b/README.md index 3696a28..2eb6364 100644 --- a/README.md +++ b/README.md @@ -321,7 +321,7 @@ var States = am.Struct{ - [godoc](https://godoc.org/github.com/pancsta/asyncmachine-go/pkg/machine) - [cookbook](/docs/cookbook.md) -- [manual](/docs/manual.md) +- [manual](/docs/manual.md) ([pdf](/assets/manual.pdf)) - [Machine and States](/docs/manual.md#machine-and-states) - [State Clocks and Context](/docs/manual.md#state-clocks-and-context) - [Auto States](/docs/manual.md#auto-states) @@ -401,13 +401,14 @@ See [`tools/cmd/am-gen`](tools/cmd/am-gen/README.md) for more info. ![TUI Debugger](assets/am-dbg.png) `am-dbg` is a lightweight, multi-client debugger for AM. It easily handles >100 - client machines simultaneously (and potentially many more). Some features include: + client machines simultaneously streaming telemetry data (and potentially many more). Some features include: - states tree - log view - time travel - transition steps - import / export +- filters - matrix view See [`tools/cmd/am-dbg`](tools/cmd/am-dbg/README.md) for more info. @@ -488,7 +489,7 @@ for more info. ### am-dbg -am-dbg is a [tview](https://github.com/rivo/tview/) TUI app with a single machine consisting of: +am-dbg is a [cview](https://code.rocket9labs.com/tslocum/cview) TUI app with a single machine consisting of: - input events (7 states) - external state (11 states) diff --git a/Taskfile.yml b/Taskfile.yml index 84dc9f9..80e4ffb 100644 --- a/Taskfile.yml +++ b/Taskfile.yml @@ -25,32 +25,33 @@ tasks: - task: build-telemetry - task: build-debugger - debugger: + am-dbg: silent: false cmds: - go run tools/cmd/am-dbg/main.go {{.CLI_ARGS}} - debugger-assets: + am-dbg-asset: silent: false cmds: - - | - go run tools/cmd/am-dbg/main.go \ - --import-data assets/am-dbg-dump-sim.gob.bz2 \ + - go run tools/cmd/am-dbg/main.go + --import-data assets/am-dbg-sim.gob.bz2 {{.CLI_ARGS}} - debugger-client: + am-dbg-dump: + silent: false cmds: - - go run tools/cmd/am-dbg/main.go - --log-level 2 - --am-dbg-url localhost:9913 + - go run tools/cmd/am-dbg/main.go + --import-data am-dbg-dump.gob.bz2 + {{.CLI_ARGS}} - debugger-debugger: + am-dbg-dbg: cmds: - go run tools/cmd/am-dbg/main.go --log-file am-dbg-dbg.log --log-level 2 --listen-on localhost:9913 --select-connected + {{.CLI_ARGS}} test: cmds: @@ -169,7 +170,6 @@ tasks: gen-states-file: desc: Generate states file for provided state names, eg "task gen-states-file -- Foo,Bar" - example: task gen-states-file --state-names "state1,state2,state3" cmds: - go run tools/cmd/am-gen/main.go states-file {{.CLI_ARGS}} diff --git a/docs/cookbook.md b/docs/cookbook.md index 729c9d8..5a8f140 100644 --- a/docs/cookbook.md +++ b/docs/cookbook.md @@ -451,7 +451,8 @@ import ( ) func main() { - m := am.New(nil, ss.States, nil) + ctx := context.Background() + m := am.New(ctx, ss.States, nil) err := m.VerifyStates(ss.Names) if err != nil { print(err) @@ -549,7 +550,6 @@ func Msg(msgTx *Msg) { // enqueue queue = append(queue, msgTx) } - ``` ### Switch a state group diff --git a/examples/nfa/nfa_test.go b/examples/nfa/nfa_test.go index bc77856..1b337fc 100644 --- a/examples/nfa/nfa_test.go +++ b/examples/nfa/nfa_test.go @@ -155,10 +155,10 @@ func (t *Regexp) input1(mach *am.Machine) { // example -func TestRegexp(t *testing.T) { +func TestNFA(t *testing.T) { var err error mach := am.New(context.Background(), states, &am.Opts{ - ID: "regexp", + ID: "nfa", DontPanicToException: true, DontLogID: true, LogLevel: am.LogChanges, diff --git a/pkg/history/history.go b/pkg/history/history.go index 110b8f9..b526e3f 100644 --- a/pkg/history/history.go +++ b/pkg/history/history.go @@ -12,6 +12,8 @@ import ( ) type History struct { + *am.NoOpTracer + Entries []Entry // LastActivated is a map of state names to the last time they were activated LastActivated map[string]time.Time @@ -22,27 +24,15 @@ type History struct { maxEntries int } -func (h *History) TransitionInit(transition *am.Transition) {} -func (h *History) HandlerStart(transition *am.Transition, emitter string, - handler string) { -} - -func (h *History) HandlerEnd(transition *am.Transition, emitter string, - handler string) { -} -func (h *History) End() {} -func (h *History) MachineInit(mach *am.Machine) {} -func (h *History) NewSubmachine(parent, mach *am.Machine) {} func (h *History) Inheritable() bool { return false } -func (h *History) MachineDispose(machID string) {} -func (h *History) QueueEnd(mach *am.Machine) {} func (h *History) TransitionEnd(tx *am.Transition) { if !tx.Accepted { return } + mut := tx.Mutation match := false for _, name := range h.States { @@ -54,6 +44,7 @@ func (h *History) TransitionEnd(tx *am.Transition) { if !match { return } + // rotate TODO optimize rotation if len(h.Entries) >= h.maxEntries { cutFrom := len(h.Entries) - h.maxEntries @@ -169,6 +160,7 @@ func Track(mach *am.Machine, states am.S, maxEntries int) *History { States: states, maxEntries: maxEntries, } + // mark active states as activated to reflect the current state for _, name := range states { if mach.Is1(name) { @@ -176,5 +168,6 @@ func Track(mach *am.Machine, states am.S, maxEntries int) *History { } } mach.Tracers = append(mach.Tracers, history) + return history } diff --git a/pkg/machine/machine.go b/pkg/machine/machine.go index 5504b85..8bcaf88 100644 --- a/pkg/machine/machine.go +++ b/pkg/machine/machine.go @@ -1,15 +1,8 @@ // Package machine is a general purpose state machine for managing complex // async workflows in a safe and structured way. // -// It can be used as a lightweight in-memory Temporal [1] alternative, worker -// for Asynq [2], or to write simple consensus engines, stateful firewalls, -// telemetry, bots, etc. -// -// The project itself is a minimal implementation of AsyncMachine in Golang +// It's a dependency-free implementation of AsyncMachine in Golang // using channels and context. It aims at simplicity and speed. -// -// [1]: https://github.com/temporalio/temporal -// [2]: https://github.com/hibiken/asynq package machine // import "github.com/pancsta/asyncmachine-go/pkg/machine" import ( @@ -29,7 +22,7 @@ import ( // Machine represent states, provides mutation methods, helpers methods and // info about the current and scheduled transitions (if any). type Machine struct { - // Unique ID of this machine. Default: random ID. + // Unique ID of this machine. Default: random ID. Read-only. ID string // Time for a handler to execute. Default: time.Second HandlerTimeout time.Duration @@ -400,7 +393,7 @@ func (m *Machine) disposeEmitter(emitter *emitter) { // ctx: optional context defaults to the machine's context. func (m *Machine) WhenErr(ctx context.Context) <-chan struct{} { // handle with a shared channel with broadcast via close - return m.When([]string{"Exception"}, ctx) + return m.When([]string{Exception}, ctx) } // When returns a channel that will be closed when all the passed states @@ -444,36 +437,7 @@ func (m *Machine) When(states S, ctx context.Context) <-chan struct{} { } // dispose with context - // TODO extract - if ctx != nil { - go func() { - select { - case <-ch: - return - case <-m.Ctx.Done(): - return - case <-ctx.Done(): - } - - // GC only if needed - if m.Disposed { - return - } - - m.activeStatesLock.Lock() - defer m.activeStatesLock.Unlock() - - for _, s := range states { - if _, ok := m.indexWhen[s]; ok { - if len(m.indexWhen[s]) == 1 { - delete(m.indexWhen, s) - } else { - m.indexWhen[s] = slicesWithout(m.indexWhen[s], binding) - } - } - } - }() - } + diposeWithCtx(m, ctx, ch, states, binding, &m.activeStatesLock, m.indexWhen) // insert the binding for _, s := range states { @@ -996,7 +960,7 @@ func (m *Machine) queueMutation(mutationType MutationType, states S, args A) { statesParsed := m.MustParseStates(states) // Detect duplicates and avoid queueing them. if len(args) == 0 && m.detectQueueDuplicates(mutationType, statesParsed) { - m.log(LogOps, "[queue:skipped] Duplicate detected for [%s] '%s'", + m.log(LogOps, "[queue:skipped] Duplicate detected for [%s] %s", mutationType, j(statesParsed)) return } @@ -1158,6 +1122,7 @@ func (m *Machine) BindHandlers(handlers any) error { // // It's not supported to nest OnEvent() calls, as it would cause a deadlock. // Using OnEvent is recommended only in special cases, like test assertions. +// The Tracer API is a better way to event feeds. func (m *Machine) OnEvent(events []string, ctx context.Context) chan *Event { ch := make(chan *Event, 50) if m.Disposed { @@ -1855,7 +1820,11 @@ func (m *Machine) processEmitters(e *Event) (Result, bool) { timeout: false, } m.handlerTimer.Reset(m.HandlerTimeout) - m.handlerStart <- handler + select { + case <-m.Ctx.Done(): + break + case m.handlerStart <- handler: + } // wait on the result / timeout / context select { @@ -1942,7 +1911,13 @@ func (m *Machine) handlerLoop() { continue } - m.handlerEnd <- ret + select { + case <-m.Ctx.Done(): + // dispose with context + m.Dispose() + return + case m.handlerEnd <- ret: + } } } } @@ -1994,9 +1969,9 @@ func (m *Machine) processWhenArgs(e *Event) { } } -// detectQueueDuplicates checks for duplicated mutations without params. -// 1. Check if a mutation is scheduled -// 2. Check if a counter mutation isn't scheduled later +// detectQueueDuplicates checks for duplicated mutations +// 1. Check if a mutation is scheduled (without params) +// 2. Check if a counter mutation isn't scheduled later (any params) func (m *Machine) detectQueueDuplicates(mutationType MutationType, parsed S, ) bool { @@ -2332,7 +2307,7 @@ func (m *Machine) CanRemove(states S) bool { panic("CanRemove not implemented; github.com/pancsta/asyncmachine-go/pulls") } -// Clocks returns the map of specified cloks or all clocks if states is nil. +// Clocks returns the map of specified clocks or all clocks if states is nil. func (m *Machine) Clocks(states S) Clocks { if states == nil { return maps.Clone(m.clock) diff --git a/pkg/machine/misc.go b/pkg/machine/misc.go index 0b0fc47..65e8fe0 100644 --- a/pkg/machine/misc.go +++ b/pkg/machine/misc.go @@ -40,9 +40,9 @@ type State struct { // Struct is a map of state names to state definitions. type Struct = map[string]State -// /////////////// -// ///// options -// /////////////// +// ///// ///// ///// +// ///// OPTIONS +// ///// ///// ///// // Opts struct is used to configure a new Machine. type Opts struct { @@ -108,9 +108,9 @@ func OptsWithParentTracers(opts *Opts, parent *Machine) *Opts { return opts } -// /////////////// -// ///// enums -// /////////////// +// ///// ///// ///// +// ///// ENUMS +// ///// ///// ///// // Result enum is the result of a state Transition type Result int @@ -220,6 +220,7 @@ type Mutation struct { // StateWasCalled returns true if the Mutation was called (directly) with the // passed state (in opposite to it coming from an `Add` relation). +// TODO change to CalledIs(), CalledIs1(), CalledAny(), CalledAny1() func (m Mutation) StateWasCalled(state string) bool { return slices.Contains(m.CalledStates, state) } @@ -301,9 +302,9 @@ func (r Relation) String() string { return "" } -// /////////////// -// ///// logging -// /////////////// +// ///// ///// ///// +// ///// LOGGING +// ///// ///// ///// // Logger is a logging function for the machine. type Logger func(level LogLevel, msg string, args ...any) @@ -412,9 +413,9 @@ func (t *NoOpTracer) NewSubmachine(parent, mach *Machine) {} func (t *NoOpTracer) QueueEnd(machine *Machine) {} func (t *NoOpTracer) Inheritable() bool { return false } -// /////////////// -// ///// events, when, emitters -// /////////////// +// ///// ///// ///// +// ///// EVENTS, WHEN, EMITTERS +// ///// ///// ///// // Event struct represents a single event of a Mutation withing a Transition. // One event can have 0-n handlers. @@ -499,9 +500,9 @@ func (e *emitter) dispose() { e.methods = nil } -// /////////////// -// ///// exception support -// /////////////// +// ///// ///// ///// +// ///// EXCEPTION SUPPORT +// ///// ///// ///// const ( // Exception is a name the Exception state. @@ -547,9 +548,9 @@ func (eh *ExceptionHandler) ExceptionState(e *Event) { } } -// /////////////// -// ///// pub utils -// /////////////// +// ///// ///// ///// +// ///// PUB UTILS +// ///// ///// ///// // DiffStates returns the states that are in states1 but not in states2. func DiffStates(states1 S, states2 S) S { @@ -633,9 +634,9 @@ func SMerge(states ...S) S { return slicesUniq(s) } -// /////////////// -// ///// utils -// /////////////// +// ///// ///// ///// +// ///// UTILS +// ///// ///// ///// // j joins state names into a single string func j(states []string) string { diff --git a/pkg/machine/resolver.go b/pkg/machine/resolver.go index e3aa1c1..1e8ee24 100644 --- a/pkg/machine/resolver.go +++ b/pkg/machine/resolver.go @@ -271,8 +271,8 @@ func (rr *DefaultRelationsResolver) getMissingRequires( return ret } -// GetRelationsBetween returns a list of relation types between the given -// states. Not thread safe. +// GetRelationsBetween returns a list of directional relation types between +// the given states. Not thread safe. func (rr *DefaultRelationsResolver) GetRelationsBetween( fromState, toState string, ) ([]Relation, error) { diff --git a/pkg/machine/transition.go b/pkg/machine/transition.go index 161d757..37edaf4 100644 --- a/pkg/machine/transition.go +++ b/pkg/machine/transition.go @@ -40,11 +40,12 @@ type Transition struct { ClocksBefore Clocks // clocks of the states from after the transition // TODO timeAfter, produce Clocks via ClockAfter(), add index diffs + // TODO unify with pkg/telemetry ClocksAfter Clocks TAfter T - // State names with "enter" handlers to execute + // Struct with "enter" handlers to execute Enters S - // State names with "exit" handlers to executed + // Struct with "exit" handlers to executed Exits S // target states after parsing the relations TargetStates S @@ -258,10 +259,6 @@ func (t *Transition) emitSelfEvents() Result { func (t *Transition) emitEnterEvents() Result { for _, toState := range t.Enters { - - // TODO double check removal - // states implied by Add cant cancel the transition - // isCalled := slices.Contains(t.CalledStates(), toState) args := t.Mutation.Args ret := t.emitHandler(Any, toState, Any+toState, args) diff --git a/pkg/telemetry/telemetry.go b/pkg/telemetry/telemetry.go index eef8ac9..0da0a4d 100644 --- a/pkg/telemetry/telemetry.go +++ b/pkg/telemetry/telemetry.go @@ -20,9 +20,9 @@ import ( am "github.com/pancsta/asyncmachine-go/pkg/machine" ) -// //////////////// +// ///// ///// ///// // /// AM-DBG -// //////////////// +// ///// ///// ///// const DbgHost = "localhost:6831" @@ -75,7 +75,7 @@ type DbgMsgTx struct { IsAuto bool // queue length at the start of the transition Queue int - // dont send this over the wire + // don't send this over the wire Time *time.Time } @@ -228,7 +228,7 @@ func TransitionsToDBG(mach *am.Machine, url string) error { // TODO retries err := client.sendMsgTx(msg) if err != nil { - log.Println("failed to send a msg to am-dbg: " + url + err.Error()) + log.Printf("failed to send a msg to am-dbg: %s %s", url, err) return } } @@ -278,9 +278,9 @@ func removeLogPrefix(msg *DbgMsgTx) { } } -// //////////////// +// ///// ///// ///// // /// OPEN TELEMETRY -// //////////////// +// ///// ///// ///// // OtelMachTracer implements machine.Tracer for OpenTelemetry. // Support tracing multiple machines @@ -682,9 +682,11 @@ func (ot *OtelMachTracer) Inheritable() bool { return true } -// //////////////// +func (ot *OtelMachTracer) QueueEnd(*am.Machine) {} + +// ///// ///// ///// // /// UTILS -// //////////////// +// ///// ///// ///// // j joins state names func j(states []string) string {