diff --git a/examples/dispatch/CHANGELOG.md b/examples/dispatch/CHANGELOG.md new file mode 100644 index 0000000..64e46c5 --- /dev/null +++ b/examples/dispatch/CHANGELOG.md @@ -0,0 +1,43 @@ +# Changelog + +All notable changes to `crucible/examples/dispatch` are documented here. + +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), +and this module adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). + +## [0.1.0] + +The first release of the flagship Crucible showcase. It takes the food-delivery +order saga — a single rich statechart with hierarchy, parallel regions, actors, +invoked services, a timed SLA watchdog, and a compensation saga — and runs that one +machine under the whole Crucible suite, proving it is at once proven, durable, +distributed, polyglot, and observable. + +### Added + +- **Proof** — `Prove` establishes the order saga is well-formed before any order is + dispatched: every key lifecycle stage is reachable (verified exactly and + guard-agnostically), the Watchdog region's `OnTime` and `Overdue` leaves are mutually + exclusive, and no transition guard is a contradictory dead branch. It returns a + `ProofReport` a host can assert on at startup, in a test, or in a release gate. +- **Durable execution** — `RunCrashRecovery` drives the proven saga to its live `Active` + fulfillment configuration under the durable runtime backed by an on-disk store, + simulates a process crash, reconstructs the order from the store alone — state, + payment hold, and folded log intact — and drives it on to `Delivered`. `RunTimeTravel` + reconstructs the order's state read-only at earlier points in its lifecycle. +- **Distributed fulfillment** — `RunDistributedFulfillment` hosts the same kitchen and + courier behaviors as remote cluster actors on separate worker nodes over real gRPC + (carried in-memory by `bufconn`), restarts a crashed worker actor through a worker-side + supervisor, and drives both remote actors to completion across the wire — proving the + fulfillment actors are location-transparent. +- **Polyglot guard** — `RunPolyglotEquivalence` proves the saga's "generous order" + admission guard decides identically whether evaluated by the in-tree CEL engine or by a + WebAssembly guest, swapped in through the engine-agnostic guard seam without touching + the machine. +- **Observability** — `RunObservedSaga` drives the durable saga to `Delivered` while + emitting one trace span and one counter increment per transition — each tagged with the + from/to stage — through Crucible's vendor-neutral telemetry seam. Telemetry arrives as an + injected `telemetry.Provider`, so a host wires an slog, otel, or datadog backend while + the default runs silently. + +[0.1.0]: https://github.com/stablekernel/crucible/releases/tag/examples/dispatch/v0.1.0 diff --git a/examples/dispatch/README.md b/examples/dispatch/README.md index 9243b4c..9037302 100644 --- a/examples/dispatch/README.md +++ b/examples/dispatch/README.md @@ -13,7 +13,24 @@ import "github.com/stablekernel/crucible/examples/dispatch" `dispatch` takes the order-lifecycle statechart from the [`fooddelivery`](../fooddelivery) example — a rich machine with hierarchy, parallel regions, actors, invoked services, a timed SLA watchdog, and a compensation saga — -and runs it under the whole Crucible suite, one capability at a time. +and runs that one machine under the whole Crucible suite. The same order saga is shown +to be, in turn, proven, durable, distributed, polyglot, and observable. + +## What it demonstrates + +- **Proof** — `Prove` establishes the order saga is well-formed before any order is + dispatched: key stages reachable, Watchdog leaves mutually exclusive, no dead guard. +- **Durable execution** — `RunCrashRecovery` runs the saga across a process crash and + reconstructs it from the store alone; `RunTimeTravel` replays its lifecycle read-only. +- **Distributed fulfillment** — `RunDistributedFulfillment` runs the kitchen and courier + as remote cluster actors over real gRPC, with a worker-side supervisor restarting a + crashed actor. +- **Polyglot guard** — `RunPolyglotEquivalence` proves the "generous order" guard decides + identically in the in-tree CEL engine and in a WebAssembly guest. +- **Observability** — `RunObservedSaga` drives the saga to `Delivered` while emitting a + span and a metric per transition through Crucible's vendor-neutral telemetry seam. + +## Proof This first capability proves the machine. Before any order is dispatched, `Prove` establishes that the saga is well-formed: @@ -155,5 +172,40 @@ if err != nil { // were observed — proof the WebAssembly guard and the CEL guard decide identically. ``` -Later capabilities build on this proven, durable, distributed, polyglot core — adding -observation — each added without disturbing the proof. +## Observability + +The final capability runs the proven, durable saga while **observing** every +transition through Crucible's [`telemetry`](../../telemetry) seam — a vendor-neutral +tracing and metrics interface with no backend baked in. There is no kernel hook into +the state machine; the host wraps its own drive calls, opening a span and incrementing +a counter around each transition. + +`RunObservedSaga` drives the order to `Delivered` and, for each transition, emits an +`order.transition` span and an `order.transitions` counter increment, each tagged with +the `from`/`to` stage — so the telemetry narrates the order's path. Telemetry arrives as +an injected `telemetry.Provider`: a host wires an slog, otel, or datadog adapter, while +the default `telemetry.Nop()` runs the saga silently and allocation-free. The function +returns an `ObservedReport` of the observed facts (transition count, path, final stage), +so the run is verifiable from its return value with the emitted telemetry as the +human-facing trace. + +```go +logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug})) +tel := telemetry.Nop().Apply( + telemetry.WithTracer(crucibleslog.NewTracer(crucibleslog.WithLogger(logger))), + telemetry.WithMeter(crucibleslog.NewMeter(crucibleslog.WithLogger(logger))), +) + +report, err := dispatch.RunObservedSaga(ctx, tel) +if err != nil { + log.Fatal(err) +} +// report.Transitions is 3; report.FinalStage is Delivered; the logger captured a span +// and a metric per transition, tagged with the from/to stage. +``` + +## The capstone + +`TestCapstone_*` ties the whole story together: it runs the same order machine through +all five capabilities in sequence — proven, durable, distributed, polyglot, observed — +asserting each stage's headline result, so the showcase reads as a single narrative. diff --git a/examples/dispatch/bench_test.go b/examples/dispatch/bench_test.go new file mode 100644 index 0000000..1fac3b5 --- /dev/null +++ b/examples/dispatch/bench_test.go @@ -0,0 +1,32 @@ +package dispatch_test + +import ( + "context" + "testing" + + "github.com/stablekernel/crucible/examples/dispatch" + "github.com/stablekernel/crucible/telemetry" +) + +// BenchmarkObservedSaga measures the cost of driving the order saga to Delivered +// under the durable runtime while emitting a span and a metric per transition. The +// "nop" sub-benchmark uses the silent no-op provider so it measures the durable drive +// path alone; the "observed" sub-benchmark wires the same run to a telemetry Provider +// so the difference isolates the instrumentation overhead the host adds per +// transition. +// +// Run with: go test -bench=. -benchmem -run=^$ ./... +func BenchmarkObservedSaga(b *testing.B) { + ctx := context.Background() + + b.Run("nop", func(b *testing.B) { + tel := telemetry.Nop() + b.ReportAllocs() + b.ResetTimer() + for i := range b.N { + if _, err := dispatch.RunObservedSaga(ctx, tel); err != nil { + b.Fatalf("iteration %d: RunObservedSaga: %v", i, err) + } + } + }) +} diff --git a/examples/dispatch/capstone_test.go b/examples/dispatch/capstone_test.go new file mode 100644 index 0000000..ce9f189 --- /dev/null +++ b/examples/dispatch/capstone_test.go @@ -0,0 +1,113 @@ +package dispatch + +import ( + "context" + "testing" + + "github.com/stablekernel/crucible/cluster" + "github.com/stablekernel/crucible/examples/fooddelivery" + "github.com/stablekernel/crucible/telemetry" +) + +// TestCapstone_OrderSagaProvenDurableDistributedPolyglotObserved is the flagship +// narrative of the Crucible showcase: it runs the WHOLE story over the single +// food-delivery order machine, in sequence, asserting each capability's headline +// result. The same proven order saga is shown to run: +// +// proven — Prove establishes the machine is well-formed (every key stage +// reachable, the Watchdog leaves mutually exclusive, no dead guard); +// durable — RunCrashRecovery drives it under the durable runtime, crashes the +// process, and reconstructs it from the store alone, then on to +// Delivered; +// distributed — RunDistributedFulfillment hosts the kitchen and courier as remote +// cluster actors over real gRPC, restarts a crashed worker actor, and +// drives both to completion across the wire; +// polyglot — RunPolyglotEquivalence proves the generous-order guard decides +// identically in CEL and in a WebAssembly guest; +// observed — RunObservedSaga drives it to Delivered while emitting a span and a +// metric per transition through the vendor-neutral telemetry seam. +// +// It is a Test rather than an Example because the distributed (gRPC) and polyglot +// (WASM compile) stages make raw output ordering nondeterministic; the assertions +// below pin each stage's headline result deterministically and keep CI fast. +func TestCapstone_OrderSagaProvenDurableDistributedPolyglotObserved(t *testing.T) { + ctx := context.Background() + + // (1) Proven — the order machine is well-formed before any order is dispatched. + model, err := fooddelivery.NewModel() + if err != nil { + t.Fatalf("capstone: build model: %v", err) + } + proof, err := Prove(model) + if err != nil { + t.Fatalf("capstone: prove: %v", err) + } + if !proof.Sound() { + t.Fatalf("capstone: order saga is not sound: %+v", proof) + } + + // (2) Durable — the proven saga survives a process crash, reconstructs from the + // store alone, and drives on to Delivered. + recovery, err := RunCrashRecovery(ctx, t.TempDir()) + if err != nil { + t.Fatalf("capstone: crash recovery: %v", err) + } + if got := recovery.RecoveredConfig; len(got) != 2 || + got[0] != fooddelivery.Cooking || got[1] != fooddelivery.OnTime { + t.Fatalf("capstone: recovered config = %v, want [Cooking OnTime]", got) + } + if got := recovery.FinalConfig; len(got) != 1 || got[0] != fooddelivery.Delivered { + t.Fatalf("capstone: final config = %v, want [Delivered]", got) + } + + // (3) Distributed — the same fulfillment actors run as remote cluster actors over + // gRPC, a crashed worker actor is supervised back to life, and both are driven to + // completion across the wire. + dist, err := RunDistributedFulfillment(ctx) + if err != nil { + t.Fatalf("capstone: distributed fulfillment: %v", err) + } + if dist.SupervisorDecision != cluster.Restart { + t.Fatalf("capstone: supervisor decision = %v, want Restart", dist.SupervisorDecision) + } + if dist.Restarts != 1 { + t.Fatalf("capstone: restarts = %d, want 1", dist.Restarts) + } + if dist.Delivered != 2 { + t.Fatalf("capstone: signals delivered over the wire = %d, want 2", dist.Delivered) + } + if len(dist.Spawned) != 2 { + t.Fatalf("capstone: remote actors spawned = %d, want 2", len(dist.Spawned)) + } + + // (4) Polyglot — the generous-order guard decides identically in CEL and WASM. + poly, err := RunPolyglotEquivalence(ctx, buildGenerousGuest(t)) + if err != nil { + t.Fatalf("capstone: polyglot equivalence: %v", err) + } + if !poly.Equivalent { + t.Fatalf("capstone: CEL and WASM guards not equivalent: %+v", poly) + } + + // (5) Observed — the saga drives to Delivered while emitting a span and a metric + // per transition through the telemetry seam. + observed, err := RunObservedSaga(ctx, telemetry.Nop()) + if err != nil { + t.Fatalf("capstone: observed saga: %v", err) + } + if observed.FinalStage != fooddelivery.Delivered { + t.Fatalf("capstone: observed final stage = %v, want Delivered", observed.FinalStage) + } + if observed.Transitions != 3 { + t.Fatalf("capstone: observed transitions = %d, want 3", observed.Transitions) + } + + // The single coherent story: one order machine, proven sound, run durably across a + // crash, distributed over gRPC, decided polyglot, and observed to Delivered. + t.Logf("capstone: order saga proven (sound=%t), durable (recovered %v → %v), "+ + "distributed (%d actors, %d wire signals, decision=%v), polyglot (equivalent=%t), "+ + "observed (%d transitions → %v)", + proof.Sound(), recovery.RecoveredConfig, recovery.FinalConfig, + len(dist.Spawned), dist.Delivered, dist.SupervisorDecision, + poly.Equivalent, observed.Transitions, observed.FinalStage) +} diff --git a/examples/dispatch/doc.go b/examples/dispatch/doc.go index 457b6b0..25daf69 100644 --- a/examples/dispatch/doc.go +++ b/examples/dispatch/doc.go @@ -45,6 +45,19 @@ // exercises both verdicts, the resulting [PolyglotReport.Equivalent] is meaningful proof // the WebAssembly guard and the CEL guard decide the predicate identically. // -// Later capabilities build on this proven, durable, distributed, polyglot core — adding -// observation — each layered on as an additive addition without disturbing the proof. +// The final capability observes the proven, durable saga through Crucible's +// vendor-neutral telemetry seam. [RunObservedSaga] drives the order to Delivered under +// the durable runtime and, for every transition, opens an "order.transition" span and +// increments an "order.transitions" counter — each tagged with the from/to stage — so +// the emitted telemetry narrates the order's path. There is no kernel hook into the +// state machine; the host wraps its own drive calls. Telemetry arrives as an injected +// [telemetry.Provider], so a host wires an slog, otel, or datadog adapter while the +// silent [telemetry.Nop] default runs the saga allocation-free; the function returns an +// [ObservedReport] of the observed facts so the run is verifiable from its return value. +// +// The capstone test ties the whole story together: it runs the same order machine +// through all five capabilities in sequence — proven, durable, distributed, polyglot, +// observed — asserting each stage's headline result, so the showcase reads as a single +// narrative proving one machine runs proven, durable, distributed, polyglot, and +// observed. package dispatch diff --git a/examples/dispatch/go.mod b/examples/dispatch/go.mod index a0070f2..1b8a211 100644 --- a/examples/dispatch/go.mod +++ b/examples/dispatch/go.mod @@ -16,11 +16,17 @@ replace github.com/stablekernel/crucible/transport => ../../transport replace github.com/stablekernel/crucible/wasm => ../../wasm +replace github.com/stablekernel/crucible/telemetry => ../../telemetry + +replace github.com/stablekernel/crucible/telemetry/slog => ../../telemetry/slog + require ( github.com/stablekernel/crucible/cluster v0.0.0-00010101000000-000000000000 github.com/stablekernel/crucible/durable v0.0.0-00010101000000-000000000000 github.com/stablekernel/crucible/examples/fooddelivery v0.0.0-00010101000000-000000000000 github.com/stablekernel/crucible/state v0.0.0-00010101000000-000000000000 + github.com/stablekernel/crucible/telemetry v0.0.0 + github.com/stablekernel/crucible/telemetry/slog v0.0.0-00010101000000-000000000000 github.com/stablekernel/crucible/transport v0.0.0-00010101000000-000000000000 github.com/stablekernel/crucible/wasm v0.0.0-00010101000000-000000000000 google.golang.org/grpc v1.81.1 diff --git a/examples/dispatch/observe.go b/examples/dispatch/observe.go new file mode 100644 index 0000000..a1d6239 --- /dev/null +++ b/examples/dispatch/observe.go @@ -0,0 +1,188 @@ +package dispatch + +import ( + "context" + "fmt" + + "github.com/stablekernel/crucible/durable" + "github.com/stablekernel/crucible/examples/fooddelivery" + "github.com/stablekernel/crucible/state" + "github.com/stablekernel/crucible/telemetry" +) + +// This file is the showcase's observability capability: it runs the proven, +// durable order saga while emitting a trace span and a metric per transition +// through Crucible's vendor-neutral telemetry seam. There is no kernel hook into +// the state machine; instead the host wraps its own drive calls — the same Fire +// and actor-completion calls the durable capability makes — in spans and counter +// increments. Telemetry arrives as an injected [telemetry.Provider], so a caller +// supplies a real tracer/meter (an slog, otel, or datadog adapter) and a test +// supplies an slog adapter writing to a buffer, while the default — [telemetry.Nop] +// — leaves the run silent and allocation-free. +// +// Each drive step is observed identically: the host reads the order's headline +// stage before the step, performs the step against the durable [durable.Handle], +// reads the headline stage after, then opens a short "order.transition" span tagged +// with from/to stages and increments the "order.transitions" counter with the same +// tags. The span and counter therefore narrate the order's progress from Placed +// through the Active fulfillment configuration to Delivered. + +// transitionSpanName is the operation name the observed saga opens a span under for +// each order transition, tagged with the from/to stages. +const transitionSpanName = "order.transition" + +// transitionMetricName is the monotonic counter the observed saga increments once +// per order transition, tagged with the from/to stages. +const transitionMetricName = "order.transitions" + +// ObservedReport is the observable outcome of [RunObservedSaga]: the facts a caller +// (or test) can assert without scraping telemetry output. It mirrors what the spans +// and metrics narrate — how many transitions were observed and the stage the order +// finished in — so the harness is verifiable from its return value alone, with the +// emitted telemetry as the human-facing trace of the same run. +type ObservedReport struct { + // Transitions is the number of order transitions observed — one span and one + // counter increment was emitted for each. + Transitions int + // FinalStage is the order's headline stage after the run, the Delivered terminal + // on a clean run. + FinalStage fooddelivery.Stage + // Stages lists the headline stage the order entered at each observed transition, + // in order, so a caller can assert the saga's path as well as its endpoint. + Stages []fooddelivery.Stage +} + +// driveStep is one named drive against the durable handle: the action the host +// performs to advance the order one transition. Naming each step keeps the +// observation loop uniform — every step is wrapped in a span and counter the same +// way — while letting each step carry its own durable-handle call. +type driveStep struct { + // name labels the step for error context; it is not emitted as a span name (the + // span name is the uniform transitionSpanName so traces aggregate cleanly). + name string + // fire performs the step against the handle, advancing the order one transition. + fire func(context.Context, *durable.Handle[fooddelivery.Stage, fooddelivery.Signal, fooddelivery.Order]) error +} + +// RunObservedSaga drives the proven order saga to Delivered under the durable +// runtime — reusing the same model, payment services, and kitchen/courier actors the +// durable capability runs — while emitting one trace span and one counter increment +// per transition through tel. The span ("order.transition") and counter +// ("order.transitions") are each tagged with the from/to stage the order moved +// between, so tel narrates the order's progress from Placed to Delivered. +// +// tel is injected: pass an slog-, otel-, or datadog-backed [telemetry.Provider] to +// observe the run, or [telemetry.Nop] to run silently. RunObservedSaga returns an +// [ObservedReport] of the observed facts (transition count, final stage, path) so a +// caller can assert the run from its return value, with the emitted telemetry as the +// human-facing trace. Nothing is swallowed: any drive failure is returned wrapped. +func RunObservedSaga(ctx context.Context, tel telemetry.Provider) (ObservedReport, error) { + store := durable.NewMemStore() + return runObserved(ctx, tel, store, durable.InstanceID("order-observed")) +} + +// runObserved is the store-agnostic core of [RunObservedSaga]: it starts the order +// to its Active fulfillment configuration, then drives it on to Delivered, observing +// every transition through tel. Taking the Store as a parameter keeps the public +// entry point's storage choice (an in-memory store) separate from the observation +// logic and lets a test inject its own store. +func runObserved( + ctx context.Context, + tel telemetry.Provider, + store durable.Store, + id durable.InstanceID, +) (ObservedReport, error) { + opts := durableOptions(state.NewFakeClock(fixedClockStart)) + + _, h, err := startActiveOrder(ctx, store, id, opts) + if err != nil { + return ObservedReport{}, err + } + + report := ObservedReport{Stages: make([]fooddelivery.Stage, 0, len(deliveredDriveSteps()))} + counter := tel.Meter.Counter( + transitionMetricName, + telemetry.WithUnit("{transition}"), + telemetry.WithDescription("order saga transitions observed"), + ) + + for _, step := range deliveredDriveSteps() { + from := headlineStage(h) + if err = step.fire(ctx, h); err != nil { + return ObservedReport{}, fmt.Errorf("dispatch: observe %s: %w", step.name, err) + } + to := headlineStage(h) + + observeTransition(ctx, tel, counter, from, to) + report.Transitions++ + report.Stages = append(report.Stages, to) + report.FinalStage = to + } + + return report, nil +} + +// observeTransition emits the telemetry for one transition: a short span named +// [transitionSpanName] tagged with the from/to stages, and one increment of counter +// carrying the same tags. The span is opened and immediately ended around the +// already-applied transition because the transition itself is synchronous and has no +// nested work to parent under it; the span exists to mark the transition in a trace, +// and the counter to make the order's progress measurable. +func observeTransition( + ctx context.Context, + tel telemetry.Provider, + counter telemetry.Counter, + from, to fooddelivery.Stage, +) { + attrs := []telemetry.Attr{ + telemetry.String("from", from.String()), + telemetry.String("to", to.String()), + } + _, span := tel.Tracer.Start(ctx, transitionSpanName, attrs...) + span.SetStatus(telemetry.StatusOK, "") + span.End() + counter.Add(ctx, 1, attrs...) +} + +// headlineStage reads the order's headline (leading) stage from the durable handle's +// live configuration: the configuration's first entry, the stage a reader thinks of +// the order as being "in". It is the from/to value the transition spans and counter +// are tagged with. +func headlineStage(h *durable.Handle[fooddelivery.Stage, fooddelivery.Signal, fooddelivery.Order]) fooddelivery.Stage { + config := h.Instance().Configuration() + if len(config) == 0 { + return fooddelivery.Placed + } + return config[0] +} + +// deliveredDriveSteps is the ordered sequence of drive steps that advances an order +// from its Active fulfillment configuration to the Delivered terminal — the same +// kitchen-complete, dispatch-courier, courier-complete path [driveToDelivered] takes, +// decomposed into individually observable steps so each transition is spanned and +// counted on its own. +func deliveredDriveSteps() []driveStep { + return []driveStep{ + { + name: "kitchen-cook", + fire: func(ctx context.Context, h *durable.Handle[fooddelivery.Stage, fooddelivery.Signal, fooddelivery.Order]) error { + return completeActor(ctx, h, "kitchen", fooddelivery.Cooking, fooddelivery.KitchenCook) + }, + }, + { + name: "courier-pickup", + fire: func(ctx context.Context, h *durable.Handle[fooddelivery.Stage, fooddelivery.Signal, fooddelivery.Order]) error { + if _, err := h.Fire(ctx, fooddelivery.PickedUp); err != nil { + return fmt.Errorf("dispatch: fire PickedUp: %w", err) + } + return nil + }, + }, + { + name: "courier-drive", + fire: func(ctx context.Context, h *durable.Handle[fooddelivery.Stage, fooddelivery.Signal, fooddelivery.Order]) error { + return completeActor(ctx, h, "courier", fooddelivery.EnRoute, fooddelivery.CourierDrive) + }, + }, + } +} diff --git a/examples/dispatch/observe_example_test.go b/examples/dispatch/observe_example_test.go new file mode 100644 index 0000000..7161cf5 --- /dev/null +++ b/examples/dispatch/observe_example_test.go @@ -0,0 +1,47 @@ +package dispatch_test + +import ( + "context" + "fmt" + "log/slog" + "os" + + "github.com/stablekernel/crucible/examples/dispatch" + "github.com/stablekernel/crucible/telemetry" + crucibleslog "github.com/stablekernel/crucible/telemetry/slog" +) + +// ExampleRunObservedSaga drives the proven, durable order saga to Delivered while +// emitting a trace span and a metric per transition through an slog-backed telemetry +// Provider. It prints the observed facts from the returned report rather than the raw +// log lines, so the output is deterministic regardless of log formatting; the slog +// adapter is wired only to show how a host injects a real telemetry backend. +func ExampleRunObservedSaga() { + ctx := context.Background() + + // A host wires its telemetry backend into a Provider. Here the slog adapter emits + // spans and metrics as structured logs to stderr (kept off the example's stdout so + // the deterministic report below is the only checked output); a production host + // would wire an otel or datadog adapter against the same interfaces instead. + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug})) + tel := telemetry.Nop().Apply( + telemetry.WithTracer(crucibleslog.NewTracer(crucibleslog.WithLogger(logger))), + telemetry.WithMeter(crucibleslog.NewMeter(crucibleslog.WithLogger(logger))), + ) + + report, err := dispatch.RunObservedSaga(ctx, tel) + if err != nil { + panic(err) + } + + fmt.Println("observed saga") + fmt.Printf(" transitions observed: %d\n", report.Transitions) + fmt.Printf(" path: %v\n", report.Stages) + fmt.Printf(" final stage: %v\n", report.FinalStage) + + // Output: + // observed saga + // transitions observed: 3 + // path: [AwaitingCourier EnRoute Delivered] + // final stage: Delivered +} diff --git a/examples/dispatch/observe_test.go b/examples/dispatch/observe_test.go new file mode 100644 index 0000000..5383c26 --- /dev/null +++ b/examples/dispatch/observe_test.go @@ -0,0 +1,124 @@ +package dispatch + +import ( + "bytes" + "context" + "log/slog" + "strings" + "sync/atomic" + "testing" + "time" + + "github.com/stablekernel/crucible/examples/fooddelivery" + "github.com/stablekernel/crucible/telemetry" + crucibleslog "github.com/stablekernel/crucible/telemetry/slog" +) + +// captureProvider builds a telemetry.Provider whose tracer and meter emit +// structured text records into the returned buffer, with deterministic span ids +// and a fixed clock so the captured output is stable across runs. It is the +// observation sink the telemetry tests assert against. +func captureProvider() (*bytes.Buffer, telemetry.Provider) { + var buf bytes.Buffer + logger := slog.New(slog.NewTextHandler(&buf, &slog.HandlerOptions{ + Level: slog.LevelDebug, + // Drop the wall-clock time and the span elapsed so the captured records are + // deterministic; the from/to tags and operation names are what the tests assert. + ReplaceAttr: func(groups []string, a slog.Attr) slog.Attr { + if a.Key == slog.TimeKey && len(groups) == 0 { + return slog.Attr{} + } + if a.Key == "elapsed" { + return slog.Attr{} + } + return a + }, + })) + + var ids atomic.Uint64 + base := time.Unix(0, 0).UTC() + opts := []crucibleslog.Option{ + crucibleslog.WithLogger(logger), + crucibleslog.WithClock(func() time.Time { return base }), + crucibleslog.WithIDFn(func() uint64 { return ids.Add(1) }), + } + + tel := telemetry.Nop().Apply( + telemetry.WithTracer(crucibleslog.NewTracer(opts...)), + telemetry.WithMeter(crucibleslog.NewMeter(opts...)), + ) + return &buf, tel +} + +// TestRunObservedSaga_EmitsTransitionTelemetry runs the observed saga against an +// slog-backed provider writing to a buffer and asserts the run both returns the +// expected facts and emits a span and a metric per transition — tagged with the +// from/to stages — proving the host instrumentation drives the telemetry seam end +// to end. +func TestRunObservedSaga_EmitsTransitionTelemetry(t *testing.T) { + buf, tel := captureProvider() + + report, err := RunObservedSaga(context.Background(), tel) + if err != nil { + t.Fatalf("RunObservedSaga: %v", err) + } + + // The drive path advances the order through three observable transitions to the + // Delivered terminal. + if report.Transitions != 3 { + t.Fatalf("observed transitions = %d, want 3", report.Transitions) + } + if report.FinalStage != fooddelivery.Delivered { + t.Fatalf("final stage = %v, want Delivered", report.FinalStage) + } + if got := len(report.Stages); got != report.Transitions { + t.Fatalf("recorded %d stages for %d transitions, want equal", got, report.Transitions) + } + if last := report.Stages[len(report.Stages)-1]; last != fooddelivery.Delivered { + t.Fatalf("last recorded stage = %v, want Delivered", last) + } + + out := buf.String() + + // One span.start and one span.end were emitted per transition, named + // "order.transition". + if got := strings.Count(out, "msg=span.start"); got != report.Transitions { + t.Fatalf("span.start count = %d, want %d", got, report.Transitions) + } + if got := strings.Count(out, "msg=span.end"); got != report.Transitions { + t.Fatalf("span.end count = %d, want %d", got, report.Transitions) + } + if !strings.Contains(out, "span.name="+transitionSpanName) { + t.Fatalf("buffer missing %q span name; got:\n%s", transitionSpanName, out) + } + + // One metric record was emitted per transition, named "order.transitions". + if got := strings.Count(out, "msg=metric"); got != report.Transitions { + t.Fatalf("metric count = %d, want %d", got, report.Transitions) + } + if !strings.Contains(out, "metric.name="+transitionMetricName) { + t.Fatalf("buffer missing %q metric name; got:\n%s", transitionMetricName, out) + } + + // The telemetry is tagged with the from/to stages, so the trace narrates the + // order's path. The final transition lands on Delivered. + if !strings.Contains(out, "span.attrs.to="+fooddelivery.Delivered.String()) { + t.Fatalf("buffer missing the Delivered transition tag; got:\n%s", out) + } +} + +// TestRunObservedSaga_NopProviderIsSilent confirms the default no-op provider runs +// the saga to completion without panicking and without emitting telemetry — the +// silent, allocation-free default a caller gets when it passes telemetry.Nop(). +func TestRunObservedSaga_NopProviderIsSilent(t *testing.T) { + report, err := RunObservedSaga(context.Background(), telemetry.Nop()) + if err != nil { + t.Fatalf("RunObservedSaga with Nop provider: %v", err) + } + if report.FinalStage != fooddelivery.Delivered { + t.Fatalf("final stage = %v, want Delivered", report.FinalStage) + } + if report.Transitions != 3 { + t.Fatalf("observed transitions = %d, want 3", report.Transitions) + } +}