Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: address misc issues #84

Merged
merged 1 commit into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ var States = am.Struct{

- [godoc](https://godoc.org/github.com/pancsta/asyncmachine-go/pkg/machine)
- [cookbook](/docs/cookbook.md)
- [manual](/docs/manual.md)
- [manual](/docs/manual.md) ([pdf](/assets/manual.pdf))
- [Machine and States](/docs/manual.md#machine-and-states)
- [State Clocks and Context](/docs/manual.md#state-clocks-and-context)
- [Auto States](/docs/manual.md#auto-states)
Expand Down Expand Up @@ -401,13 +401,14 @@ See [`tools/cmd/am-gen`](tools/cmd/am-gen/README.md) for more info.
![TUI Debugger](assets/am-dbg.png)

`am-dbg` is a lightweight, multi-client debugger for AM. It easily handles >100
client machines simultaneously (and potentially many more). Some features include:
client machines simultaneously streaming telemetry data (and potentially many more). Some features include:

- states tree
- log view
- time travel
- transition steps
- import / export
- filters
- matrix view

See [`tools/cmd/am-dbg`](tools/cmd/am-dbg/README.md) for more info.
Expand Down Expand Up @@ -488,7 +489,7 @@ for more info.

### am-dbg

am-dbg is a [tview](https://github.com/rivo/tview/) TUI app with a single machine consisting of:
am-dbg is a [cview](https://code.rocket9labs.com/tslocum/cview) TUI app with a single machine consisting of:

- input events (7 states)
- external state (11 states)
Expand Down
22 changes: 11 additions & 11 deletions Taskfile.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,32 +25,33 @@ tasks:
- task: build-telemetry
- task: build-debugger

debugger:
am-dbg:
silent: false
cmds:
- go run tools/cmd/am-dbg/main.go {{.CLI_ARGS}}

debugger-assets:
am-dbg-asset:
silent: false
cmds:
- |
go run tools/cmd/am-dbg/main.go \
--import-data assets/am-dbg-dump-sim.gob.bz2 \
- go run tools/cmd/am-dbg/main.go
--import-data assets/am-dbg-sim.gob.bz2
{{.CLI_ARGS}}

debugger-client:
am-dbg-dump:
silent: false
cmds:
- go run tools/cmd/am-dbg/main.go
--log-level 2
--am-dbg-url localhost:9913
- go run tools/cmd/am-dbg/main.go
--import-data am-dbg-dump.gob.bz2
{{.CLI_ARGS}}

debugger-debugger:
am-dbg-dbg:
cmds:
- go run tools/cmd/am-dbg/main.go
--log-file am-dbg-dbg.log
--log-level 2
--listen-on localhost:9913
--select-connected
{{.CLI_ARGS}}

test:
cmds:
Expand Down Expand Up @@ -169,7 +170,6 @@ tasks:

gen-states-file:
desc: Generate states file for provided state names, eg "task gen-states-file -- Foo,Bar"
example: task gen-states-file --state-names "state1,state2,state3"
cmds:
- go run tools/cmd/am-gen/main.go states-file {{.CLI_ARGS}}

Expand Down
4 changes: 2 additions & 2 deletions docs/cookbook.md
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,8 @@ import (
)

func main() {
m := am.New(nil, ss.States, nil)
ctx := context.Background()
m := am.New(ctx, ss.States, nil)
err := m.VerifyStates(ss.Names)
if err != nil {
print(err)
Expand Down Expand Up @@ -549,7 +550,6 @@ func Msg(msgTx *Msg) {
// enqueue
queue = append(queue, msgTx)
}

```
### Switch a state group
Expand Down
4 changes: 2 additions & 2 deletions examples/nfa/nfa_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,10 @@ func (t *Regexp) input1(mach *am.Machine) {

// example

func TestRegexp(t *testing.T) {
func TestNFA(t *testing.T) {
var err error
mach := am.New(context.Background(), states, &am.Opts{
ID: "regexp",
ID: "nfa",
DontPanicToException: true,
DontLogID: true,
LogLevel: am.LogChanges,
Expand Down
19 changes: 6 additions & 13 deletions pkg/history/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
)

type History struct {
*am.NoOpTracer

Entries []Entry
// LastActivated is a map of state names to the last time they were activated
LastActivated map[string]time.Time
Expand All @@ -22,27 +24,15 @@ type History struct {
maxEntries int
}

func (h *History) TransitionInit(transition *am.Transition) {}
func (h *History) HandlerStart(transition *am.Transition, emitter string,
handler string) {
}

func (h *History) HandlerEnd(transition *am.Transition, emitter string,
handler string) {
}
func (h *History) End() {}
func (h *History) MachineInit(mach *am.Machine) {}
func (h *History) NewSubmachine(parent, mach *am.Machine) {}
func (h *History) Inheritable() bool {
return false
}
func (h *History) MachineDispose(machID string) {}
func (h *History) QueueEnd(mach *am.Machine) {}

func (h *History) TransitionEnd(tx *am.Transition) {
if !tx.Accepted {
return
}

mut := tx.Mutation
match := false
for _, name := range h.States {
Expand All @@ -54,6 +44,7 @@ func (h *History) TransitionEnd(tx *am.Transition) {
if !match {
return
}

// rotate TODO optimize rotation
if len(h.Entries) >= h.maxEntries {
cutFrom := len(h.Entries) - h.maxEntries
Expand Down Expand Up @@ -169,12 +160,14 @@ func Track(mach *am.Machine, states am.S, maxEntries int) *History {
States: states,
maxEntries: maxEntries,
}

// mark active states as activated to reflect the current state
for _, name := range states {
if mach.Is1(name) {
history.LastActivated[name] = time.Now()
}
}
mach.Tracers = append(mach.Tracers, history)

return history
}
69 changes: 22 additions & 47 deletions pkg/machine/machine.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,8 @@
// Package machine is a general purpose state machine for managing complex
// async workflows in a safe and structured way.
//
// It can be used as a lightweight in-memory Temporal [1] alternative, worker
// for Asynq [2], or to write simple consensus engines, stateful firewalls,
// telemetry, bots, etc.
//
// The project itself is a minimal implementation of AsyncMachine in Golang
// It's a dependency-free implementation of AsyncMachine in Golang
// using channels and context. It aims at simplicity and speed.
//
// [1]: https://github.com/temporalio/temporal
// [2]: https://github.com/hibiken/asynq
package machine // import "github.com/pancsta/asyncmachine-go/pkg/machine"

import (
Expand All @@ -29,7 +22,7 @@ import (
// Machine represent states, provides mutation methods, helpers methods and
// info about the current and scheduled transitions (if any).
type Machine struct {
// Unique ID of this machine. Default: random ID.
// Unique ID of this machine. Default: random ID. Read-only.
ID string
// Time for a handler to execute. Default: time.Second
HandlerTimeout time.Duration
Expand Down Expand Up @@ -400,7 +393,7 @@ func (m *Machine) disposeEmitter(emitter *emitter) {
// ctx: optional context defaults to the machine's context.
func (m *Machine) WhenErr(ctx context.Context) <-chan struct{} {
// handle with a shared channel with broadcast via close
return m.When([]string{"Exception"}, ctx)
return m.When([]string{Exception}, ctx)
}

// When returns a channel that will be closed when all the passed states
Expand Down Expand Up @@ -444,36 +437,7 @@ func (m *Machine) When(states S, ctx context.Context) <-chan struct{} {
}

// dispose with context
// TODO extract
if ctx != nil {
go func() {
select {
case <-ch:
return
case <-m.Ctx.Done():
return
case <-ctx.Done():
}

// GC only if needed
if m.Disposed {
return
}

m.activeStatesLock.Lock()
defer m.activeStatesLock.Unlock()

for _, s := range states {
if _, ok := m.indexWhen[s]; ok {
if len(m.indexWhen[s]) == 1 {
delete(m.indexWhen, s)
} else {
m.indexWhen[s] = slicesWithout(m.indexWhen[s], binding)
}
}
}
}()
}
diposeWithCtx(m, ctx, ch, states, binding, &m.activeStatesLock, m.indexWhen)

// insert the binding
for _, s := range states {
Expand Down Expand Up @@ -996,7 +960,7 @@ func (m *Machine) queueMutation(mutationType MutationType, states S, args A) {
statesParsed := m.MustParseStates(states)
// Detect duplicates and avoid queueing them.
if len(args) == 0 && m.detectQueueDuplicates(mutationType, statesParsed) {
m.log(LogOps, "[queue:skipped] Duplicate detected for [%s] '%s'",
m.log(LogOps, "[queue:skipped] Duplicate detected for [%s] %s",
mutationType, j(statesParsed))
return
}
Expand Down Expand Up @@ -1158,6 +1122,7 @@ func (m *Machine) BindHandlers(handlers any) error {
//
// It's not supported to nest OnEvent() calls, as it would cause a deadlock.
// Using OnEvent is recommended only in special cases, like test assertions.
// The Tracer API is a better way to event feeds.
func (m *Machine) OnEvent(events []string, ctx context.Context) chan *Event {
ch := make(chan *Event, 50)
if m.Disposed {
Expand Down Expand Up @@ -1855,7 +1820,11 @@ func (m *Machine) processEmitters(e *Event) (Result, bool) {
timeout: false,
}
m.handlerTimer.Reset(m.HandlerTimeout)
m.handlerStart <- handler
select {
case <-m.Ctx.Done():
break
case m.handlerStart <- handler:
}

// wait on the result / timeout / context
select {
Expand Down Expand Up @@ -1942,7 +1911,13 @@ func (m *Machine) handlerLoop() {
continue
}

m.handlerEnd <- ret
select {
case <-m.Ctx.Done():
// dispose with context
m.Dispose()
return
case m.handlerEnd <- ret:
}
}
}
}
Expand Down Expand Up @@ -1994,9 +1969,9 @@ func (m *Machine) processWhenArgs(e *Event) {
}
}

// detectQueueDuplicates checks for duplicated mutations without params.
// 1. Check if a mutation is scheduled
// 2. Check if a counter mutation isn't scheduled later
// detectQueueDuplicates checks for duplicated mutations
// 1. Check if a mutation is scheduled (without params)
// 2. Check if a counter mutation isn't scheduled later (any params)
func (m *Machine) detectQueueDuplicates(mutationType MutationType,
parsed S,
) bool {
Expand Down Expand Up @@ -2332,7 +2307,7 @@ func (m *Machine) CanRemove(states S) bool {
panic("CanRemove not implemented; github.com/pancsta/asyncmachine-go/pulls")
}

// Clocks returns the map of specified cloks or all clocks if states is nil.
// Clocks returns the map of specified clocks or all clocks if states is nil.
func (m *Machine) Clocks(states S) Clocks {
if states == nil {
return maps.Clone(m.clock)
Expand Down
Loading
Loading