From 3f3437bb29c515c0eed6a3dd869f5f59f603a84a Mon Sep 17 00:00:00 2001 From: Joshua Temple Date: Sun, 31 May 2026 13:08:09 -0400 Subject: [PATCH] feat: add a wire-codec endpoint for network transports Signed-off-by: Joshua Temple --- cluster/wire.go | 59 +++++++++++++++++++++++++++++ cluster/wire_test.go | 88 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 147 insertions(+) create mode 100644 cluster/wire.go create mode 100644 cluster/wire_test.go diff --git a/cluster/wire.go b/cluster/wire.go new file mode 100644 index 0000000..c3fd91a --- /dev/null +++ b/cluster/wire.go @@ -0,0 +1,59 @@ +package cluster + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/stablekernel/crucible/state" +) + +// WireEndpoint is a node's receive side as a network transport sees it: it accepts +// an operation whose payload arrived as JSON over the wire and applies it to the +// node's local actor system, decoding the payload into the node's own concrete +// types. A *System satisfies it, so a network transport serving a node just holds +// that node's System behind this type-erased interface. +// +// It is the network counterpart to the in-process Endpoint: where InMemoryTransport +// passes Go values between same-process systems unchanged, a network transport +// serializes the event/input on the sending node and hands the bytes to the owning +// node's WireEndpoint, which decodes them into its event type E (or input map) and +// delivers locally. Decoding on the owning node is what lets the transport stay +// type-erased while the kernel keeps its concrete, typed events. +type WireEndpoint interface { + // DeliverWire decodes eventJSON into the node's event type and delivers it to + // the local actor named by ref, reporting whether the actor accepted it. + DeliverWire(ctx context.Context, ref state.ActorRef, eventJSON []byte) (bool, error) + // SpawnWire decodes inputJSON into the spawn input map and spawns an actor with + // the given id from src in the local system, returning a ref to it. + SpawnWire(ctx context.Context, src, id string, inputJSON []byte) (state.ActorRef, error) +} + +// DeliverWire decodes a JSON-encoded event into this system's event type E and +// delivers it to the local actor named by ref. It is the receive half a network +// transport calls on the actor's owning node; the sending node produced eventJSON +// with json.Marshal of the original event. An empty or null payload decodes to E's +// zero value. +func (s *System[S, E, C]) DeliverWire(ctx context.Context, ref state.ActorRef, eventJSON []byte) (bool, error) { + var event E + if len(eventJSON) > 0 { + if err := json.Unmarshal(eventJSON, &event); err != nil { + return false, fmt.Errorf("cluster: decode wire event for %q: %w", ref.ID, err) + } + } + return s.local.Deliver(ctx, ref, event), nil +} + +// SpawnWire decodes a JSON-encoded input map and spawns an actor with the given id +// from src in this node's local system, returning a ref stamped with this node. It +// is the receive half of a network transport's Spawn. An empty payload spawns with +// a nil input. +func (s *System[S, E, C]) SpawnWire(ctx context.Context, src, id string, inputJSON []byte) (state.ActorRef, error) { + var input map[string]any + if len(inputJSON) > 0 { + if err := json.Unmarshal(inputJSON, &input); err != nil { + return state.ActorRef{}, fmt.Errorf("cluster: decode wire input for %q: %w", id, err) + } + } + return s.SpawnLocal(ctx, src, id, input) +} diff --git a/cluster/wire_test.go b/cluster/wire_test.go new file mode 100644 index 0000000..66b9f4c --- /dev/null +++ b/cluster/wire_test.go @@ -0,0 +1,88 @@ +package cluster_test + +import ( + "context" + "encoding/json" + "testing" + + "github.com/stablekernel/crucible/cluster" +) + +// TestWire_DeliverDecodesEvent confirms DeliverWire decodes a JSON event into the +// system's event type and delivers it — the receive half of a network transport. +func TestWire_DeliverDecodesEvent(t *testing.T) { + ctx := context.Background() + sys, ref := spawnedSystem(t, "node-a") + + // A sending node would marshal the event; here it is the string "finish". + eventJSON, err := json.Marshal("finish") + if err != nil { + t.Fatalf("marshal event: %v", err) + } + delivered, err := sys.DeliverWire(ctx, ref, eventJSON) + if err != nil { + t.Fatalf("DeliverWire: %v", err) + } + if !delivered { + t.Fatal("DeliverWire = false, want true") + } + if sys.Running() != 0 { + t.Fatalf("Running() after wire finish = %d, want 0", sys.Running()) + } +} + +// TestWire_SpawnDecodesInput confirms SpawnWire decodes a JSON input map and spawns +// a local actor stamped with the node. +func TestWire_SpawnDecodesInput(t *testing.T) { + ctx := context.Background() + sys := registeredSystem("node-b") + + inputJSON, err := json.Marshal(map[string]any{"k": "v"}) + if err != nil { + t.Fatalf("marshal input: %v", err) + } + ref, err := sys.SpawnWire(ctx, "child", "w-wire", inputJSON) + if err != nil { + t.Fatalf("SpawnWire: %v", err) + } + if ref.ID != "w-wire" || ref.Node != "node-b" { + t.Fatalf("ref = %+v, want ID=w-wire Node=node-b", ref) + } + if sys.Running() != 1 { + t.Fatalf("Running() = %d, want 1", sys.Running()) + } +} + +// TestWire_EmptyPayloads confirms empty/nil wire payloads decode to the zero event +// and a nil input rather than erroring. +func TestWire_EmptyPayloads(t *testing.T) { + ctx := context.Background() + sys := registeredSystem("node-a") + if _, err := sys.SpawnWire(ctx, "child", "w-empty", nil); err != nil { + t.Fatalf("SpawnWire(nil input): %v", err) + } + ref, _ := sys.Ref("w-empty") + // The empty event decodes to "" and is a no-op the actor ignores. + if _, err := sys.DeliverWire(ctx, ref, nil); err != nil { + t.Fatalf("DeliverWire(nil event): %v", err) + } +} + +// TestWire_BadPayloadErrors confirms malformed JSON surfaces a decode error rather +// than being silently dropped. +func TestWire_BadPayloadErrors(t *testing.T) { + ctx := context.Background() + sys, ref := spawnedSystem(t, "node-a") + if _, err := sys.DeliverWire(ctx, ref, []byte("{not json")); err == nil { + t.Fatal("DeliverWire(bad json) = nil error, want a decode error") + } + if _, err := sys.SpawnWire(ctx, "child", "x", []byte("{not json")); err == nil { + t.Fatal("SpawnWire(bad json) = nil error, want a decode error") + } +} + +// TestWire_SystemSatisfiesWireEndpoint is a compile-time check that *System is a +// WireEndpoint, so a network transport can hold it type-erased. +func TestWire_SystemSatisfiesWireEndpoint(t *testing.T) { + var _ cluster.WireEndpoint = cluster.NewSystem[string, string, *parentEnt]("n", nil) +}