asyncmachine-go

asyncmachine-go is a minimal implementation of AsyncMachine in Go, built on channels and context. It aims for simplicity and speed.

It can be used as a lightweight in-memory alternative to Temporal, as a worker for Asynq, or to write simple consensus engines, stateful firewalls, telemetry, bots, etc.

AsyncMachine is a relational state machine which never blocks.

package main

import (
    "context"
    "fmt"

    am "github.com/pancsta/asyncmachine-go/pkg/machine"
)

func main() {
    ctx := context.Background()
    m := am.New(ctx, am.States{
        "Foo": {
            Add: am.S{"Bar"},
        },
        "Bar": {},
    }, nil)
    m.Add(am.S{"Foo"}, nil)
    fmt.Printf("%s", m) // (Foo:1 Bar:1)
}

Examples

am.States{
    // CreateExpenseActivity
    "CreatingExpense": {
        Remove: am.S{"ExpenseCreated"},
    },
    "ExpenseCreated": {
        Remove: am.S{"CreatingExpense"},
    },
    // WaitForDecisionActivity
    "WaitingForApproval": {
        Auto:    true,
        Require: am.S{"ExpenseCreated"},
        Remove:  am.S{"ApprovalGranted"},
    },
    "ApprovalGranted": {
        Require: am.S{"ExpenseCreated"},
        Remove:  am.S{"WaitingForApproval"},
    },
    // PaymentActivity
    "PaymentInProgress": {
        Auto:    true,
        Require: am.S{"ApprovalGranted"},
        Remove:  am.S{"PaymentCompleted"},
    },
    "PaymentCompleted": {
        Require: am.S{"ExpenseCreated", "ApprovalGranted"},
        Remove:  am.S{"PaymentInProgress"},
    },
}

am.States{
    // DownloadFileActivity
    "DownloadingFile": {
        Remove: am.S{"FileDownloaded"},
    },
    "FileDownloaded": {
        Remove: am.S{"DownloadingFile"},
    },
    // ProcessFileActivity
    "ProcessingFile": {
        Auto:    true,
        Require: am.S{"FileDownloaded"},
        Remove:  am.S{"FileProcessed"},
    },
    "FileProcessed": {
        Remove: am.S{"ProcessingFile"},
    },
    // UploadFileActivity
    "UploadingFile": {
        Auto:    true,
        Require: am.S{"FileProcessed"},
        Remove:  am.S{"FileUploaded"},
    },
    "FileUploaded": {
        Remove: am.S{"UploadingFile"},
    },
}

func HandleFileProcessingTask(ctx context.Context, t *asynq.Task) error {
    var p FileProcessingPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
    }
    log.Printf("Processing file %s", p.Filename)
    // use the FileProcessing workflow ported from Temporal
    machine, err := processor.FileProcessingFlow(ctx, log.Printf, p.Filename)
    if err != nil {
        return err
    }
    // save the machine state as the result
    ret := machine.String()
    if _, err := t.ResultWriter().Write([]byte(ret)); err != nil {
        return fmt.Errorf("failed to write task result: %v", err)
    }
    return nil
}
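
The Auto, Require, and Remove relations in the am.States declarations above can be hard to picture from the schema alone. The following is a deliberately simplified, synchronous toy model written against the standard library only. It is not the library's resolution algorithm: the Rel and Toy types and the rule that an Auto state is skipped while an active state lists it in Remove are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"sort"
)

// Rel holds one state's relations. Hypothetical type for this sketch,
// not the library's state struct.
type Rel struct {
	Auto    bool
	Require []string
	Remove  []string
}

// Toy is a synchronous toy model, not the library's machine.
type Toy struct {
	schema map[string]Rel
	active map[string]bool
}

func NewToy(schema map[string]Rel) *Toy {
	return &Toy{schema: schema, active: map[string]bool{}}
}

// allActive reports whether every named state is active.
func (t *Toy) allActive(names []string) bool {
	for _, n := range names {
		if !t.active[n] {
			return false
		}
	}
	return true
}

// blocked reports whether some active state lists name in its Remove.
func (t *Toy) blocked(name string) bool {
	for other := range t.active {
		for _, r := range t.schema[other].Remove {
			if r == name {
				return true
			}
		}
	}
	return false
}

// activate turns name on and turns off everything in its Remove.
func (t *Toy) activate(name string) {
	for _, r := range t.schema[name].Remove {
		delete(t.active, r)
	}
	t.active[name] = true
}

// Add rejects name if its Require is unmet; otherwise it activates name
// and then makes a single pass over the Auto states.
func (t *Toy) Add(name string) bool {
	if !t.allActive(t.schema[name].Require) {
		return false
	}
	t.activate(name)
	names := make([]string, 0, len(t.schema))
	for n := range t.schema {
		names = append(names, n)
	}
	sort.Strings(names) // fixed order keeps the sketch deterministic
	for _, n := range names {
		rel := t.schema[n]
		if rel.Auto && !t.active[n] && t.allActive(rel.Require) && !t.blocked(n) {
			t.activate(n)
		}
	}
	return true
}

// Active returns the active state names, sorted.
func (t *Toy) Active() []string {
	out := make([]string, 0, len(t.active))
	for n := range t.active {
		out = append(out, n)
	}
	sort.Strings(out)
	return out
}

// fileSchema copies the relations of the file-processing example.
func fileSchema() map[string]Rel {
	return map[string]Rel{
		"DownloadingFile": {Remove: []string{"FileDownloaded"}},
		"FileDownloaded":  {Remove: []string{"DownloadingFile"}},
		"ProcessingFile":  {Auto: true, Require: []string{"FileDownloaded"}, Remove: []string{"FileProcessed"}},
		"FileProcessed":   {Remove: []string{"ProcessingFile"}},
		"UploadingFile":   {Auto: true, Require: []string{"FileProcessed"}, Remove: []string{"FileUploaded"}},
		"FileUploaded":    {Remove: []string{"UploadingFile"}},
	}
}

func main() {
	toy := NewToy(fileSchema())
	for _, s := range []string{"DownloadingFile", "FileDownloaded", "FileProcessed", "FileUploaded"} {
		toy.Add(s)
		fmt.Println(s, "->", toy.Active())
	}
}
```

In this toy, only the four "result" states are added by hand; ProcessingFile and UploadingFile switch on by themselves once their Require is met, and switch off again when the next result state removes them. The real machine queues mutations and negotiates them with handlers, which this sketch leaves out entirely.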

Usage

  • Declarative
// randomly reject the mutation
func (h *Handlers) ProcessingFileEnter(e *am.Event) bool {
    rand.Seed(time.Now().UnixNano())
    randomBool := rand.Intn(2) == 0
    return randomBool
}
// cause side effects if accepted
func (h *Handlers) ProcessingFileState(e *am.Event) {
    mach := e.Machine
    stateCtx := mach.GetStateCtx("ProcessingFile")
    go func() {
        if stateCtx.Err() != nil {
            return
        }
        err := processFile(h.DownloadedName, stateCtx)
        if err != nil {
            mach.AddErr(err)
            return
        }
        mach.Add(am.S{"FileProcessed"}, nil)
    }()
}
  • Imperative
// change the state
mach.Add(am.S{"DownloadingFile"},
    am.A{"filename": filename})
// wait for completed
select {
case <-time.After(5 * time.Second):
    return mach, errors.New("timeout")
case <-mach.WhenErr(nil):
    return mach, mach.Err
case <-mach.When(am.S{"FileUploaded"}, nil):
}
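
The two snippets above share one pattern: the handler returns immediately and does its work in a goroutine bound to a context, while the caller waits on channels with a timeout. Below is a minimal sketch of that pattern using only the standard library. It does not use the machine API: processFile, startWork, waitFor, and run are hypothetical names, and a plain cancellable context stands in for the state context on the assumption that the latter is cancelled when the state is removed.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// processFile is a hypothetical stand-in for real work: it finishes after
// workTime unless ctx is cancelled first.
func processFile(ctx context.Context, workTime time.Duration) error {
	select {
	case <-time.After(workTime):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// startWork mimics a state handler: it returns at once and runs the work
// in a goroutine tied to stateCtx. Closing done or failed plays the role
// of reaching the target state or the error state.
func startWork(stateCtx context.Context, workTime time.Duration) (done, failed <-chan struct{}) {
	d, f := make(chan struct{}), make(chan struct{})
	go func() {
		if err := processFile(stateCtx, workTime); err != nil {
			close(f)
			return
		}
		close(d)
	}()
	return d, f
}

// waitFor mirrors the imperative select: timeout, error, or completion.
func waitFor(done, failed <-chan struct{}, timeout time.Duration) error {
	select {
	case <-time.After(timeout):
		return errors.New("timeout")
	case <-failed:
		return errors.New("failed")
	case <-done:
		return nil
	}
}

// run wires both halves together. removeState cancels the context up
// front, as if the state had been removed before the work could start.
func run(workMs, timeoutMs int, removeState bool) error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	if removeState {
		cancel()
	}
	done, failed := startWork(ctx, time.Duration(workMs)*time.Millisecond)
	return waitFor(done, failed, time.Duration(timeoutMs)*time.Millisecond)
}

func main() {
	fmt.Println(run(10, 1000, false)) // <nil>
	fmt.Println(run(10, 1000, true))  // failed
	fmt.Println(run(1000, 10, false)) // timeout
}
```

Closing a channel instead of sending on it lets any number of waiters observe the same event, which is why the sketch signals completion that way.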

Documentation

TUI Debugger

am-dbg is a simple yet effective tool for debugging your machines. It shows:

  • states with relations
  • time travel
  • transition steps
  • logs

Installing am-dbg

Download a release binary or use go install:

go install github.com/pancsta/asyncmachine-go/tools/am-dbg@latest

Using am-dbg

Set up telemetry:

import (
    "github.com/pancsta/asyncmachine-go/pkg/telemetry"
)
// ...
err := telemetry.MonitorTransitions(mach, telemetry.RpcHost)

Run am-dbg:

Usage:
  am-dbg [flags]

Flags:
      --am-dbg-url string   Debug this instance of am-dbg with another one
      --enable-mouse        Enable mouse support
  -h, --help                help for am-dbg
      --log-file string     Log file path (default "am-dbg.log")
      --log-level int       Log level, 0-5 (silent-everything)
      --log-machine-id      Include machine ID in log messages (default true)
      --server-url string   Host and port for the server to listen on (default "localhost:9823")

Changelog

Latest release: v0.3.1

See CHANGELOG.md for details.

Status

Beta - although the ideas behind AsyncMachine have been proven to work, the Go implementation is fairly fresh and may suffer from bugs, memory leaks, race conditions, and even panics. Please report bugs.

TODO

See issues.