
actor

Minimal single-threaded actor model for Go. No locks required. Under development.

go get github.com/xcono/actor

Quick Start

func Counter(ctx context.Context, queue *sqlq.SQLQueue) error {
    var state int

    r := actor.Routes(
        actor.On("increment", Increment{}),
        actor.On("decrement", Decrement{}),
    )
    r.Use(actor.WithMutator(&state))

    for msg := range queue.Subscribe(ctx, 0) {
        r.Dispatch(ctx, msg)
    }
    return nil
}

type Increment struct{ Amount int `json:"amount"` }
type Decrement struct{ Amount int `json:"amount"` }

func (e *Increment) Mutation(s *int) error { *s += e.Amount; return nil }
func (e *Decrement) Mutation(s *int) error { *s -= e.Amount; return nil }

That's it. State is a local variable. No mutexes. No framework.

Why This Design?

func MyActor(ctx context.Context, sub Subscriber) error {
    var state MyState          // 1. State is local — cannot escape
    r := Routes(...)           // 2. Router is local — stays here
    
    for msg := range sub.Subscribe(ctx, 0) {  // 3. Single goroutine
        r.Dispatch(ctx, msg)                   //    processes messages
    }
    return nil
}

The single-threaded guarantee is structural, not conventional. You can't accidentally share state because it's a local variable.

Actor Modes

Subscribe-Only (Projection)

Read events and build a view. No output.

func UserCountProjection(ctx context.Context, events Subscriber, replay Sourcing) error {
    var count int
    cursor, _ := actor.Restore(ctx, &count, nil, replay)

    r := actor.Routes(actor.On("user.created", UserCreated{}))
    r.Use(actor.WithMutator(&count))

    for msg := range events.Subscribe(ctx, cursor) {
        r.Dispatch(ctx, msg)
    }
    return nil
}

type UserCreated struct{}
func (e *UserCreated) Mutation(count *int) error { *count++; return nil }

Publish-Only (Producer)

Generate events. No input subscription.

func Heartbeat(ctx context.Context, pub Publisher) error {
    ticker := time.NewTicker(time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return nil
        case <-ticker.C:
            pub.Publish(ctx, actor.Message{Name: "heartbeat"})
        }
    }
}

Pub/Sub (Event Sourcing / CDC)

Subscribe to commands, publish events. Full CQRS.

func OrderAggregate(ctx context.Context, commands, events *sqlq.SQLQueue) error {
    var state Order
    cursor, _ := actor.Restore(ctx, &state, nil, events)

    r := actor.Routes(
        actor.On("command.place_order", PlaceOrder{}),
        actor.On("event.order_placed", OrderPlaced{}),
    )

    // Transform command → event
    r.Use(actor.Transform(func(ctx context.Context, cmd *PlaceOrder) ([]actor.Message, error) {
        return []actor.Message{
            {Name: "event.order_placed", Event: &OrderPlaced{OrderID: cmd.OrderID}},
        }, nil
    }))

    r.Use(actor.WithMutator(&state))
    r.Use(actor.WithSourcing(events))

    for msg := range commands.Subscribe(ctx, cursor) {
        r.Dispatch(ctx, msg)
    }
    return nil
}

Middleware

Middleware processes messages in order. Chain stops on error or empty result.

r := actor.Routes(
    actor.On("command.create", CreateCommand{}),
    actor.On("event.created", CreatedEvent{}),
)

// 1. Transform: command → event
r.Use(actor.Transform(func(ctx context.Context, cmd *CreateCommand) ([]actor.Message, error) {
    return []actor.Message{{Name: "event.created", Event: &CreatedEvent{}}}, nil
}))

// 2. Mutate state
r.Use(actor.WithMutator(&state))

// 3. Persist to event store
r.Use(actor.WithSourcing(eventStore))

// 4. Custom middleware
r.Use(func(ctx context.Context, msgs ...actor.Message) ([]actor.Message, error) {
    log.Printf("processed: %s", msgs[0].Name)
    return msgs, nil
})

Built-in Middleware

Middleware              Purpose
WithMutator(&state)     Apply mutations via the StateMutator interface
WithSourcing(pub)       Publish messages to a Publisher
Transform[T](fn)        Convert messages of type T into other messages

Periodic Callbacks

Method                          Purpose
r.EveryN(n, fn)                 Call fn every N messages (no middleware overhead)
SnapshotFn(id, &state, store)   Helper for creating snapshots

Event Sourcing

Restore state from snapshots and replay events:

func MyActor(ctx context.Context, sub Subscriber, pub Publisher, snap SnapshotReader, replay Sourcing) error {
    var state MyState

    // Restore from snapshot + replay events
    cursor, err := actor.Restore(ctx, &state, snap, replay)
    if err != nil {
        return err
    }

    r := actor.Routes(...)
    r.Use(actor.WithMutator(&state))
    r.Use(actor.WithSourcing(pub))

    // Continue from last processed message
    for msg := range sub.Subscribe(ctx, cursor) {
        r.Dispatch(ctx, msg)
    }
    return nil
}

Periodic Snapshots

Create snapshots every N messages for faster restores:

r.Use(actor.WithMutator(&state))
r.Use(actor.WithSourcing(events))
r.EveryN(100, actor.SnapshotFn("order-123", &state, snapshots))

EveryN runs directly in Dispatch — no middleware overhead:

// Log every 1000 messages
r.EveryN(1000, func(ctx context.Context, msg actor.Message) {
    slog.Info("checkpoint", "id", msg.ID)
})

Abstract Interfaces

Subscriber and Publisher are not just queues:

Interface    Can Be
Subscriber   SQL queue, WebSocket client, gRPC stream, file watcher, timer
Publisher    SQL queue, HTTP API, WebSocket server, gRPC stream, file writer

Bridge different systems:

// Kafka → HTTP API
func Bridge(ctx context.Context, kafka Subscriber, api Publisher) error {
    for msg := range kafka.Subscribe(ctx, 0) {
        api.Publish(ctx, transform(msg))
    }
    return nil
}

Storage

SQL Queue

import "github.com/xcono/actor/store/sqlq"

db, _ := sql.Open("mysql", dsn)
queue := sqlq.NewSQLQueue(db, "events")
queue.Init(ctx, "mysql")

// Use as Subscriber, Publisher, and Sourcing
for msg := range queue.Subscribe(ctx, 0) { ... }
queue.Publish(ctx, msg)
for msg, err := range queue.Replay(ctx, 0) { ... }

In-Memory (Testing)

queue, _ := sqlq.NewSQLiteQueue(ctx, "test_events")
// Closes automatically when ctx is cancelled

Snapshots

import "github.com/xcono/actor/store"
import "github.com/xcono/actor/store/sqlq"

index := sqlq.NewSQLSnapshotStore(db, "snapshots", "my-actor")
kv := store.NewFileKV("/var/snapshots")
snapshots := store.NewSnapshotStore(index, kv)
snapshots.Init(ctx, "mysql")

Lifecycle

This package does not manage lifecycle. Your infrastructure does:

Environment    Lifecycle Manager
Docker         --restart=always
Kubernetes     Deployment with restart policy
systemd        Restart=always
Nomad          restart stanza

The actor runs until context cancellation or error. That's it.
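For instance, a systemd unit that supervises an actor binary might look like this (the binary path and names are illustrative, not part of this project):

```ini
[Unit]
Description=order aggregate actor

[Service]
ExecStart=/usr/local/bin/order-actor
Restart=always
RestartSec=2

[Install]
WantedBy=multi-user.target
```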

Design Principles

  1. State is a local variable — cannot escape the function
  2. Router is a local variable — stays within the function
  3. Processing loop is explicit — single-threaded nature is visible
  4. No mutexes required — structure guarantees single-threaded access
  5. Infrastructure handles lifecycle — Docker/k8s manages restarts

This is not the traditional actor model (Erlang, Akka). It's closer to Cloudflare Durable Objects: a single-threaded execution context with durable state.

License

MIT
