Skip to content

Commit

Permalink
fix: address misc issues (#84)
Browse files Browse the repository at this point in the history
  • Loading branch information
pancsta committed Jul 10, 2024
1 parent 561ac9f commit 737770f
Show file tree
Hide file tree
Showing 10 changed files with 84 additions and 115 deletions.
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ var States = am.Struct{

- [godoc](https://godoc.org/github.com/pancsta/asyncmachine-go/pkg/machine)
- [cookbook](/docs/cookbook.md)
- [manual](/docs/manual.md)
- [manual](/docs/manual.md) ([pdf](/assets/manual.pdf))
- [Machine and States](/docs/manual.md#machine-and-states)
- [State Clocks and Context](/docs/manual.md#state-clocks-and-context)
- [Auto States](/docs/manual.md#auto-states)
Expand Down Expand Up @@ -401,13 +401,14 @@ See [`tools/cmd/am-gen`](tools/cmd/am-gen/README.md) for more info.
![TUI Debugger](assets/am-dbg.png)

`am-dbg` is a lightweight, multi-client debugger for AM. It easily handles >100
client machines simultaneously (and potentially many more). Some features include:
client machines simultaneously streaming telemetry data (and potentially many more). Some features include:

- states tree
- log view
- time travel
- transition steps
- import / export
- filters
- matrix view

See [`tools/cmd/am-dbg`](tools/cmd/am-dbg/README.md) for more info.
Expand Down Expand Up @@ -488,7 +489,7 @@ for more info.

### am-dbg

am-dbg is a [tview](https://github.com/rivo/tview/) TUI app with a single machine consisting of:
am-dbg is a [cview](https://code.rocket9labs.com/tslocum/cview) TUI app with a single machine consisting of:

- input events (7 states)
- external state (11 states)
Expand Down
22 changes: 11 additions & 11 deletions Taskfile.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,32 +25,33 @@ tasks:
- task: build-telemetry
- task: build-debugger

debugger:
am-dbg:
silent: false
cmds:
- go run tools/cmd/am-dbg/main.go {{.CLI_ARGS}}

debugger-assets:
am-dbg-asset:
silent: false
cmds:
- |
go run tools/cmd/am-dbg/main.go \
--import-data assets/am-dbg-dump-sim.gob.bz2 \
- go run tools/cmd/am-dbg/main.go
--import-data assets/am-dbg-sim.gob.bz2
{{.CLI_ARGS}}

debugger-client:
am-dbg-dump:
silent: false
cmds:
- go run tools/cmd/am-dbg/main.go
--log-level 2
--am-dbg-url localhost:9913
- go run tools/cmd/am-dbg/main.go
--import-data am-dbg-dump.gob.bz2
{{.CLI_ARGS}}

debugger-debugger:
am-dbg-dbg:
cmds:
- go run tools/cmd/am-dbg/main.go
--log-file am-dbg-dbg.log
--log-level 2
--listen-on localhost:9913
--select-connected
{{.CLI_ARGS}}

test:
cmds:
Expand Down Expand Up @@ -169,7 +170,6 @@ tasks:
gen-states-file:
desc: Generate states file for provided state names, eg "task gen-states-file -- Foo,Bar"
example: task gen-states-file --state-names "state1,state2,state3"
cmds:
- go run tools/cmd/am-gen/main.go states-file {{.CLI_ARGS}}

Expand Down
4 changes: 2 additions & 2 deletions docs/cookbook.md
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,8 @@ import (
)

func main() {
m := am.New(nil, ss.States, nil)
ctx := context.Background()
m := am.New(ctx, ss.States, nil)
err := m.VerifyStates(ss.Names)
if err != nil {
print(err)
Expand Down Expand Up @@ -549,7 +550,6 @@ func Msg(msgTx *Msg) {
// enqueue
queue = append(queue, msgTx)
}

```
### Switch a state group
Expand Down
4 changes: 2 additions & 2 deletions examples/nfa/nfa_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,10 @@ func (t *Regexp) input1(mach *am.Machine) {

// example

func TestRegexp(t *testing.T) {
func TestNFA(t *testing.T) {
var err error
mach := am.New(context.Background(), states, &am.Opts{
ID: "regexp",
ID: "nfa",
DontPanicToException: true,
DontLogID: true,
LogLevel: am.LogChanges,
Expand Down
19 changes: 6 additions & 13 deletions pkg/history/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
)

type History struct {
*am.NoOpTracer

Entries []Entry
// LastActivated is a map of state names to the last time they were activated
LastActivated map[string]time.Time
Expand All @@ -22,27 +24,15 @@ type History struct {
maxEntries int
}

func (h *History) TransitionInit(transition *am.Transition) {}
func (h *History) HandlerStart(transition *am.Transition, emitter string,
handler string) {
}

func (h *History) HandlerEnd(transition *am.Transition, emitter string,
handler string) {
}
func (h *History) End() {}
func (h *History) MachineInit(mach *am.Machine) {}
func (h *History) NewSubmachine(parent, mach *am.Machine) {}
func (h *History) Inheritable() bool {
return false
}
func (h *History) MachineDispose(machID string) {}
func (h *History) QueueEnd(mach *am.Machine) {}

func (h *History) TransitionEnd(tx *am.Transition) {
if !tx.Accepted {
return
}

mut := tx.Mutation
match := false
for _, name := range h.States {
Expand All @@ -54,6 +44,7 @@ func (h *History) TransitionEnd(tx *am.Transition) {
if !match {
return
}

// rotate TODO optimize rotation
if len(h.Entries) >= h.maxEntries {
cutFrom := len(h.Entries) - h.maxEntries
Expand Down Expand Up @@ -169,12 +160,14 @@ func Track(mach *am.Machine, states am.S, maxEntries int) *History {
States: states,
maxEntries: maxEntries,
}

// mark active states as activated to reflect the current state
for _, name := range states {
if mach.Is1(name) {
history.LastActivated[name] = time.Now()
}
}
mach.Tracers = append(mach.Tracers, history)

return history
}
69 changes: 22 additions & 47 deletions pkg/machine/machine.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,8 @@
// Package machine is a general purpose state machine for managing complex
// async workflows in a safe and structured way.
//
// It can be used as a lightweight in-memory Temporal [1] alternative, worker
// for Asynq [2], or to write simple consensus engines, stateful firewalls,
// telemetry, bots, etc.
//
// The project itself is a minimal implementation of AsyncMachine in Golang
// It's a dependency-free implementation of AsyncMachine in Golang
// using channels and context. It aims at simplicity and speed.
//
// [1]: https://github.com/temporalio/temporal
// [2]: https://github.com/hibiken/asynq
package machine // import "github.com/pancsta/asyncmachine-go/pkg/machine"

import (
Expand All @@ -29,7 +22,7 @@ import (
// Machine represent states, provides mutation methods, helpers methods and
// info about the current and scheduled transitions (if any).
type Machine struct {
// Unique ID of this machine. Default: random ID.
// Unique ID of this machine. Default: random ID. Read-only.
ID string
// Time for a handler to execute. Default: time.Second
HandlerTimeout time.Duration
Expand Down Expand Up @@ -400,7 +393,7 @@ func (m *Machine) disposeEmitter(emitter *emitter) {
// ctx: optional context defaults to the machine's context.
func (m *Machine) WhenErr(ctx context.Context) <-chan struct{} {
// handle with a shared channel with broadcast via close
return m.When([]string{"Exception"}, ctx)
return m.When([]string{Exception}, ctx)
}

// When returns a channel that will be closed when all the passed states
Expand Down Expand Up @@ -444,36 +437,7 @@ func (m *Machine) When(states S, ctx context.Context) <-chan struct{} {
}

// dispose with context
// TODO extract
if ctx != nil {
go func() {
select {
case <-ch:
return
case <-m.Ctx.Done():
return
case <-ctx.Done():
}

// GC only if needed
if m.Disposed {
return
}

m.activeStatesLock.Lock()
defer m.activeStatesLock.Unlock()

for _, s := range states {
if _, ok := m.indexWhen[s]; ok {
if len(m.indexWhen[s]) == 1 {
delete(m.indexWhen, s)
} else {
m.indexWhen[s] = slicesWithout(m.indexWhen[s], binding)
}
}
}
}()
}
diposeWithCtx(m, ctx, ch, states, binding, &m.activeStatesLock, m.indexWhen)

// insert the binding
for _, s := range states {
Expand Down Expand Up @@ -996,7 +960,7 @@ func (m *Machine) queueMutation(mutationType MutationType, states S, args A) {
statesParsed := m.MustParseStates(states)
// Detect duplicates and avoid queueing them.
if len(args) == 0 && m.detectQueueDuplicates(mutationType, statesParsed) {
m.log(LogOps, "[queue:skipped] Duplicate detected for [%s] '%s'",
m.log(LogOps, "[queue:skipped] Duplicate detected for [%s] %s",
mutationType, j(statesParsed))
return
}
Expand Down Expand Up @@ -1158,6 +1122,7 @@ func (m *Machine) BindHandlers(handlers any) error {
//
// It's not supported to nest OnEvent() calls, as it would cause a deadlock.
// Using OnEvent is recommended only in special cases, like test assertions.
// The Tracer API is a better way to event feeds.
func (m *Machine) OnEvent(events []string, ctx context.Context) chan *Event {
ch := make(chan *Event, 50)
if m.Disposed {
Expand Down Expand Up @@ -1855,7 +1820,11 @@ func (m *Machine) processEmitters(e *Event) (Result, bool) {
timeout: false,
}
m.handlerTimer.Reset(m.HandlerTimeout)
m.handlerStart <- handler
select {
case <-m.Ctx.Done():
break
case m.handlerStart <- handler:
}

// wait on the result / timeout / context
select {
Expand Down Expand Up @@ -1942,7 +1911,13 @@ func (m *Machine) handlerLoop() {
continue
}

m.handlerEnd <- ret
select {
case <-m.Ctx.Done():
// dispose with context
m.Dispose()
return
case m.handlerEnd <- ret:
}
}
}
}
Expand Down Expand Up @@ -1994,9 +1969,9 @@ func (m *Machine) processWhenArgs(e *Event) {
}
}

// detectQueueDuplicates checks for duplicated mutations without params.
// 1. Check if a mutation is scheduled
// 2. Check if a counter mutation isn't scheduled later
// detectQueueDuplicates checks for duplicated mutations
// 1. Check if a mutation is scheduled (without params)
// 2. Check if a counter mutation isn't scheduled later (any params)
func (m *Machine) detectQueueDuplicates(mutationType MutationType,
parsed S,
) bool {
Expand Down Expand Up @@ -2332,7 +2307,7 @@ func (m *Machine) CanRemove(states S) bool {
panic("CanRemove not implemented; github.com/pancsta/asyncmachine-go/pulls")
}

// Clocks returns the map of specified cloks or all clocks if states is nil.
// Clocks returns the map of specified clocks or all clocks if states is nil.
func (m *Machine) Clocks(states S) Clocks {
if states == nil {
return maps.Clone(m.clock)
Expand Down
Loading

0 comments on commit 737770f

Please sign in to comment.