Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions cluster/wire.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package cluster

import (
"context"
"encoding/json"
"fmt"

"github.com/stablekernel/crucible/state"
)

// WireEndpoint is a node's receive side as a network transport sees it: it accepts
// an operation whose payload arrived as JSON over the wire and applies it to the
// node's local actor system, decoding the payload into the node's own concrete
// types. A *System satisfies it, so a network transport serving a node just holds
// that node's System behind this type-erased interface.
//
// It is the network counterpart to the in-process Endpoint: where InMemoryTransport
// passes Go values between same-process systems unchanged, a network transport
// serializes the event/input on the sending node and hands the bytes to the owning
// node's WireEndpoint, which decodes them into its event type E (or input map) and
// delivers locally. Decoding on the owning node is what lets the transport stay
// type-erased while the kernel keeps its concrete, typed events.
type WireEndpoint interface {
// DeliverWire decodes eventJSON into the node's event type and delivers it to
// the local actor named by ref, reporting whether the actor accepted it.
DeliverWire(ctx context.Context, ref state.ActorRef, eventJSON []byte) (bool, error)
// SpawnWire decodes inputJSON into the spawn input map and spawns an actor with
// the given id from src in the local system, returning a ref to it.
SpawnWire(ctx context.Context, src, id string, inputJSON []byte) (state.ActorRef, error)
}

// DeliverWire decodes a JSON-encoded event into this system's event type E and
// delivers it to the local actor named by ref. It is the receive half a network
// transport calls on the actor's owning node; the sending node produced eventJSON
// with json.Marshal of the original event. An empty or null payload decodes to E's
// zero value.
func (s *System[S, E, C]) DeliverWire(ctx context.Context, ref state.ActorRef, eventJSON []byte) (bool, error) {
var event E
if len(eventJSON) > 0 {
if err := json.Unmarshal(eventJSON, &event); err != nil {
return false, fmt.Errorf("cluster: decode wire event for %q: %w", ref.ID, err)
}
}
return s.local.Deliver(ctx, ref, event), nil
}

// SpawnWire decodes a JSON-encoded input map and spawns an actor with the given id
// from src in this node's local system, returning a ref stamped with this node. It
// is the receive half of a network transport's Spawn. An empty payload spawns with
// a nil input.
func (s *System[S, E, C]) SpawnWire(ctx context.Context, src, id string, inputJSON []byte) (state.ActorRef, error) {
var input map[string]any
if len(inputJSON) > 0 {
if err := json.Unmarshal(inputJSON, &input); err != nil {
return state.ActorRef{}, fmt.Errorf("cluster: decode wire input for %q: %w", id, err)
}
}
return s.SpawnLocal(ctx, src, id, input)
}
88 changes: 88 additions & 0 deletions cluster/wire_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package cluster_test

import (
"context"
"encoding/json"
"testing"

"github.com/stablekernel/crucible/cluster"
)

// TestWire_DeliverDecodesEvent confirms DeliverWire decodes a JSON event into the
// system's event type and delivers it — the receive half of a network transport.
func TestWire_DeliverDecodesEvent(t *testing.T) {
ctx := context.Background()
sys, ref := spawnedSystem(t, "node-a")

// A sending node would marshal the event; here it is the string "finish".
eventJSON, err := json.Marshal("finish")
if err != nil {
t.Fatalf("marshal event: %v", err)
}
delivered, err := sys.DeliverWire(ctx, ref, eventJSON)
if err != nil {
t.Fatalf("DeliverWire: %v", err)
}
if !delivered {
t.Fatal("DeliverWire = false, want true")
}
if sys.Running() != 0 {
t.Fatalf("Running() after wire finish = %d, want 0", sys.Running())
}
}

// TestWire_SpawnDecodesInput confirms SpawnWire decodes a JSON input map and spawns
// a local actor stamped with the node.
func TestWire_SpawnDecodesInput(t *testing.T) {
ctx := context.Background()
sys := registeredSystem("node-b")

inputJSON, err := json.Marshal(map[string]any{"k": "v"})
if err != nil {
t.Fatalf("marshal input: %v", err)
}
ref, err := sys.SpawnWire(ctx, "child", "w-wire", inputJSON)
if err != nil {
t.Fatalf("SpawnWire: %v", err)
}
if ref.ID != "w-wire" || ref.Node != "node-b" {
t.Fatalf("ref = %+v, want ID=w-wire Node=node-b", ref)
}
if sys.Running() != 1 {
t.Fatalf("Running() = %d, want 1", sys.Running())
}
}

// TestWire_EmptyPayloads confirms empty/nil wire payloads decode to the zero event
// and a nil input rather than erroring.
func TestWire_EmptyPayloads(t *testing.T) {
ctx := context.Background()
sys := registeredSystem("node-a")
if _, err := sys.SpawnWire(ctx, "child", "w-empty", nil); err != nil {
t.Fatalf("SpawnWire(nil input): %v", err)
}
ref, _ := sys.Ref("w-empty")
// The empty event decodes to "" and is a no-op the actor ignores.
if _, err := sys.DeliverWire(ctx, ref, nil); err != nil {
t.Fatalf("DeliverWire(nil event): %v", err)
}
}

// TestWire_BadPayloadErrors confirms malformed JSON surfaces a decode error rather
// than being silently dropped.
func TestWire_BadPayloadErrors(t *testing.T) {
ctx := context.Background()
sys, ref := spawnedSystem(t, "node-a")
if _, err := sys.DeliverWire(ctx, ref, []byte("{not json")); err == nil {
t.Fatal("DeliverWire(bad json) = nil error, want a decode error")
}
if _, err := sys.SpawnWire(ctx, "child", "x", []byte("{not json")); err == nil {
t.Fatal("SpawnWire(bad json) = nil error, want a decode error")
}
}

// TestWire_SystemSatisfiesWireEndpoint is a compile-time check that *System is a
// WireEndpoint, so a network transport can hold it type-erased.
func TestWire_SystemSatisfiesWireEndpoint(t *testing.T) {
var _ cluster.WireEndpoint = cluster.NewSystem[string, string, *parentEnt]("n", nil)
}
Loading