diff --git a/api/context/resolver.go b/api/context/resolver.go new file mode 100644 index 000000000..6900ac9cd --- /dev/null +++ b/api/context/resolver.go @@ -0,0 +1,218 @@ +// SPDX-License-Identifier: MPL-2.0 + +package context + +import ( + "context" + "errors" + "fmt" + "sort" + "sync" + "sync/atomic" + + "github.com/wippyai/runtime/api/attrs" +) + +// frameResolversCtx keys the FrameResolvers registry on the AppContext. +var frameResolversCtx = &Key{Name: "frame.resolvers"} + +// ErrFrameResolverNotRegistered is returned when an options bag contains a key +// claimed by a frame-context resolver, but no registered resolver covers it. +var ErrFrameResolverNotRegistered = errors.New("frame resolver not registered") + +var ( + frameClaimsMu sync.Mutex + frameClaims atomic.Pointer[map[string]FrameResolverClaim] +) + +// FrameResolver maps a call's merged options to frame-context pairs applied to +// a newly spawned task or process frame. Resolvers are pure and stateless: they +// read ctx and options and emit pairs. This lets frame-decorating options (the +// network overlay, filesystem root, ...) be registered once at boot instead of +// hand-wired into every dispatcher. +type FrameResolver func(ctx context.Context, options attrs.Attributes) ([]Pair, error) + +// FrameResolverClaim reports whether a frame-context selection is active for a +// call. It lets Resolve fail closed when a subsystem option such as the network +// overlay was selected but the corresponding resolver was not boot-registered. +type FrameResolverClaim func(ctx context.Context, options attrs.Attributes) bool + +type frameResolverEntry struct { + fn FrameResolver + name string + claims []string + order int +} + +// FrameResolvers is an ordered set of FrameResolver functions. Registration +// happens once at boot and rebuilds an immutable snapshot; Resolve reads that +// snapshot atomically with no lock, so the spawn path pays only an atomic load. +// A nil *FrameResolvers is a valid empty registry for ordinary options, but +// Resolve still fails closed when the options bag contains a globally claimed +// frame option that no registered resolver covers. +type FrameResolvers struct { + snapshot atomic.Pointer[[]frameResolverEntry] + mu sync.Mutex // guards Register's copy-on-write +} + +// NewFrameResolvers returns an empty registry. +func NewFrameResolvers() *FrameResolvers { return &FrameResolvers{} } + +// RegisterFrameResolverClaim claims a frame-context selection. If selected +// returns true during dispatch but no resolver registered for this name, Resolve +// returns ErrFrameResolverNotRegistered instead of silently ignoring it. +// +// Packages that define frame-context selections should call this from init, +// then pass the same claim name to FrameResolvers.Register when their boot +// component wires the resolver. Duplicate claim names panic: claims are +// process-global, and silently replacing one could make missing-resolver +// validation fail open. +func RegisterFrameResolverClaim(name string, selected FrameResolverClaim) { + if name == "" { + panic("frame resolver claim name cannot be empty") + } + if selected == nil { + panic("frame resolver claim cannot be nil") + } + frameClaimsMu.Lock() + defer frameClaimsMu.Unlock() + + next := map[string]FrameResolverClaim{name: selected} + if cur := frameClaims.Load(); cur != nil { + next = make(map[string]FrameResolverClaim, len(*cur)+1) + for k, v := range *cur { + if k == name { + panic(fmt.Sprintf("frame resolver claim %q already registered", name)) + } + next[k] = v + } + next[name] = selected + } + frameClaims.Store(&next) +} + +// Register adds a resolver under a unique name with an explicit apply order +// (ascending; ties broken by name). Returns an error on a nil function or a +// duplicate name. claims names the claimed frame selections this resolver +// covers; Resolve fails closed if such a selection is active but the resolver +// was not registered. Intended to be called at boot only; it rebuilds the snapshot +// copy-on-write so Resolve never observes a partial update. +func (r *FrameResolvers) Register(name string, order int, fn FrameResolver, claims ...string) error { + if fn == nil { + return fmt.Errorf("frame resolver %q: nil function", name) + } + r.mu.Lock() + defer r.mu.Unlock() + + var entries []frameResolverEntry + if cur := r.snapshot.Load(); cur != nil { + entries = make([]frameResolverEntry, len(*cur), len(*cur)+1) + copy(entries, *cur) + } + for _, e := range entries { + if e.name == name { + return fmt.Errorf("frame resolver %q already registered", name) + } + } + entries = append(entries, frameResolverEntry{ + fn: fn, + name: name, + order: order, + claims: append([]string(nil), claims...), + }) + sort.Slice(entries, func(i, j int) bool { + if entries[i].order != entries[j].order { + return entries[i].order < entries[j].order + } + return entries[i].name < entries[j].name + }) + r.snapshot.Store(&entries) + return nil +} + +// Resolve applies every registered resolver in order and appends the pairs each +// produces to pairs, returning the extended slice. It stops at the first +// resolver error, wrapping it with the resolver name (the cause is preserved +// for errors.Is). A nil receiver, or one with no resolvers, returns pairs +// unchanged unless a claimed frame selection is active and no resolver covers +// it. This is lock-free and allocation-free: it reads immutable snapshots +// atomically. +func (r *FrameResolvers) Resolve(ctx context.Context, options attrs.Attributes, pairs []Pair) ([]Pair, error) { + if r == nil { + return pairs, validateFrameResolverClaims(ctx, options, nil) + } + cur := r.snapshot.Load() + if cur == nil { + return pairs, validateFrameResolverClaims(ctx, options, nil) + } + if err := validateFrameResolverClaims(ctx, options, *cur); err != nil { + return nil, err + } + for _, e := range *cur { + got, err := e.fn(ctx, options) + if err != nil { + return nil, fmt.Errorf("frame resolver %q: %w", e.name, err) + } + pairs = append(pairs, got...) + } + return pairs, nil +} + +func validateFrameResolverClaims(ctx context.Context, options attrs.Attributes, entries []frameResolverEntry) error { + claims := frameClaims.Load() + if claims == nil { + return nil + } + for name, selected := range *claims { + if frameResolverClaimCovered(entries, name) { + continue + } + if !selected(ctx, options) { + continue + } + return fmt.Errorf("%w for %q", ErrFrameResolverNotRegistered, name) + } + return nil +} + +func frameResolverClaimCovered(entries []frameResolverEntry, name string) bool { + for _, e := range entries { + if e.name == name { + return true + } + for _, claim := range e.claims { + if claim == name { + return true + } + } + } + return false +} + +// WithFrameResolvers stores the registry on the AppContext (write-once, boot +// time). No-op when the AppContext is absent or already holds a registry. +func WithFrameResolvers(ctx context.Context, resolvers *FrameResolvers) context.Context { + ac := AppFromContext(ctx) + if ac == nil { + return ctx + } + if ac.Get(frameResolversCtx) == nil { + ac.With(frameResolversCtx, resolvers) + } + return ctx +} + +// FrameResolversFrom retrieves the registry from the AppContext, or nil when +// none is wired. Calling Resolve on nil is allowed, but still rejects selected +// frame resolver claims that were globally registered and are not covered by a +// registered resolver. +func FrameResolversFrom(ctx context.Context) *FrameResolvers { + ac := AppFromContext(ctx) + if ac == nil { + return nil + } + if v, ok := ac.Get(frameResolversCtx).(*FrameResolvers); ok { + return v + } + return nil +} diff --git a/api/context/resolver_test.go b/api/context/resolver_test.go new file mode 100644 index 000000000..6a6738b5b --- /dev/null +++ b/api/context/resolver_test.go @@ -0,0 +1,235 @@ +// SPDX-License-Identifier: MPL-2.0 + +package context + +import ( + "context" + "errors" + "strconv" + "strings" + "sync" + "sync/atomic" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/wippyai/runtime/api/attrs" +) + +var ( + benchmarkCoveredClaimOnce sync.Once + testFrameClaimSeq atomic.Uint64 +) + +func testFrameResolverClaimName(t *testing.T) string { + t.Helper() + return "test.claim." + strings.NewReplacer("/", "_", " ", "_").Replace(t.Name()) + "." + strconv.FormatUint(testFrameClaimSeq.Add(1), 10) +} + +func pairResolver(order int, key *Key, value string) (string, FrameResolver) { + return value, func(_ context.Context, _ attrs.Attributes) ([]Pair, error) { + return []Pair{{Key: key, Value: value}}, nil + } +} + +func TestFrameResolvers_ResolveAppliesInOrder(t *testing.T) { + r := NewFrameResolvers() + key := &Key{Name: "test.key"} + + // Register out of order; Resolve must apply ascending by order. + _, third := pairResolver(30, key, "c") + _, first := pairResolver(10, key, "a") + _, second := pairResolver(20, key, "b") + require.NoError(t, r.Register("third", 30, third)) + require.NoError(t, r.Register("first", 10, first)) + require.NoError(t, r.Register("second", 20, second)) + + out, err := r.Resolve(context.Background(), nil, nil) + require.NoError(t, err) + require.Len(t, out, 3) + assert.Equal(t, "a", out[0].Value) + assert.Equal(t, "b", out[1].Value) + assert.Equal(t, "c", out[2].Value) +} + +func TestFrameResolvers_ResolveAppendsToInput(t *testing.T) { + r := NewFrameResolvers() + key := &Key{Name: "test.key"} + _, fn := pairResolver(10, key, "added") + require.NoError(t, r.Register("one", 10, fn)) + + existing := []Pair{{Key: &Key{Name: "pre"}, Value: "keep"}} + out, err := r.Resolve(context.Background(), nil, existing) + require.NoError(t, err) + require.Len(t, out, 2) + assert.Equal(t, "keep", out[0].Value, "input pairs must be preserved") + assert.Equal(t, "added", out[1].Value) +} + +func TestFrameResolvers_NilReceiverIsNoOp(t *testing.T) { + var r *FrameResolvers + existing := []Pair{{Key: &Key{Name: "pre"}, Value: "keep"}} + out, err := r.Resolve(context.Background(), nil, existing) + require.NoError(t, err) + assert.Equal(t, existing, out) +} + +func TestFrameResolvers_MissingClaimedResolverFailsClosed(t *testing.T) { + claim := testFrameResolverClaimName(t) + RegisterFrameResolverClaim(claim, func(_ context.Context, options attrs.Attributes) bool { + return options != nil && options.GetString(claim, "") != "" + }) + + var r *FrameResolvers + options := attrs.NewBag() + options.Set(claim, "selected") + + out, err := r.Resolve(context.Background(), options, nil) + require.Error(t, err) + assert.Nil(t, out) + assert.True(t, errors.Is(err, ErrFrameResolverNotRegistered)) + assert.Contains(t, err.Error(), claim) +} + +func TestFrameResolvers_RegisteredClaimAllowsResolve(t *testing.T) { + claim := testFrameResolverClaimName(t) + RegisterFrameResolverClaim(claim, func(_ context.Context, options attrs.Attributes) bool { + return options != nil && options.GetString(claim, "") != "" + }) + + r := NewFrameResolvers() + key := &Key{Name: "covered.key"} + _, fn := pairResolver(10, key, "ok") + require.NoError(t, r.Register("covered", 10, fn, claim)) + + options := attrs.NewBag() + options.Set(claim, "selected") + + out, err := r.Resolve(context.Background(), options, nil) + require.NoError(t, err) + require.Len(t, out, 1) + assert.Equal(t, "ok", out[0].Value) +} + +func TestRegisterFrameResolverClaim_DuplicatePanics(t *testing.T) { + claim := testFrameResolverClaimName(t) + RegisterFrameResolverClaim(claim, func(context.Context, attrs.Attributes) bool { + return false + }) + assert.Panics(t, func() { + RegisterFrameResolverClaim(claim, func(context.Context, attrs.Attributes) bool { + return true + }) + }) +} + +func TestFrameResolvers_ResolverNameCoversSameNamedClaim(t *testing.T) { + claim := testFrameResolverClaimName(t) + RegisterFrameResolverClaim(claim, func(_ context.Context, options attrs.Attributes) bool { + return options != nil && options.GetBool(claim, false) + }) + + r := NewFrameResolvers() + key := &Key{Name: "name.covered.key"} + _, fn := pairResolver(10, key, "ok") + require.NoError(t, r.Register(claim, 10, fn)) + + options := attrs.NewBag() + options.Set(claim, true) + + out, err := r.Resolve(context.Background(), options, nil) + require.NoError(t, err) + require.Len(t, out, 1) + assert.Equal(t, "ok", out[0].Value) +} + +func TestFrameResolvers_FirstErrorStopsAndWraps(t *testing.T) { + sentinel := errors.New("boom") + r := NewFrameResolvers() + key := &Key{Name: "test.key"} + _, ok := pairResolver(10, key, "a") + require.NoError(t, r.Register("ok", 10, ok)) + require.NoError(t, r.Register("bad", 20, func(_ context.Context, _ attrs.Attributes) ([]Pair, error) { + return nil, sentinel + })) + require.NoError(t, r.Register("after", 30, func(_ context.Context, _ attrs.Attributes) ([]Pair, error) { + t.Fatal("resolver after a failing one must not run") + return nil, nil + })) + + out, err := r.Resolve(context.Background(), nil, nil) + require.Error(t, err) + assert.Nil(t, out) + assert.True(t, errors.Is(err, sentinel), "cause must be preserved for errors.Is") + assert.Contains(t, err.Error(), "bad", "error must name the failing resolver") +} + +func TestFrameResolvers_RegisterRejectsDuplicateAndNil(t *testing.T) { + r := NewFrameResolvers() + _, fn := pairResolver(10, &Key{Name: "k"}, "v") + require.NoError(t, r.Register("dup", 10, fn)) + require.Error(t, r.Register("dup", 20, fn), "duplicate name must be rejected") + require.Error(t, r.Register("nilfn", 30, nil), "nil function must be rejected") +} + +func TestFrameResolvers_ContextRoundTrip(t *testing.T) { + assert.Nil(t, FrameResolversFrom(context.Background()), "no registry on a bare ctx") + + reg := NewFrameResolvers() + ctx := WithFrameResolvers(NewRootContext(), reg) + assert.Same(t, reg, FrameResolversFrom(ctx)) +} + +func BenchmarkFrameResolvers_Resolve(b *testing.B) { + r := NewFrameResolvers() + key := &Key{Name: "bench"} + for i, name := range []string{"a", "b"} { + _, fn := pairResolver(i*10, key, name) + _ = r.Register(name, i*10, fn) + } + ctx := context.Background() + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + out, err := r.Resolve(ctx, nil, nil) + if err != nil || len(out) != 2 { + b.Fatal(err) + } + } +} + +func BenchmarkFrameResolvers_ResolveEmpty(b *testing.B) { + var r *FrameResolvers // nil registry — the common no-overlay case + ctx := context.Background() + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, _ = r.Resolve(ctx, nil, nil) + } +} + +func BenchmarkFrameResolvers_ResolveCoveredClaim(b *testing.B) { + const claim = "bench.claim.covered" + benchmarkCoveredClaimOnce.Do(func() { + RegisterFrameResolverClaim(claim, func(_ context.Context, options attrs.Attributes) bool { + return options != nil && options.GetBool("bench", false) + }) + }) + + r := NewFrameResolvers() + require.NoError(b, r.Register("bench", 10, func(context.Context, attrs.Attributes) ([]Pair, error) { + return nil, nil + }, claim)) + options := attrs.NewBag() + options.Set("bench", true) + + ctx := context.Background() + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + out, err := r.Resolve(ctx, options, nil) + if err != nil || len(out) != 0 { + b.Fatal(err) + } + } +} diff --git a/api/net/default_context.go b/api/net/default_context.go index e165fcbbf..4130dea94 100644 --- a/api/net/default_context.go +++ b/api/net/default_context.go @@ -29,8 +29,9 @@ var appDefaultNetworkKey = &ctxapi.Key{Name: "net.app_default_network"} // 3. app-level – network_service.default_network in .wippy.yaml // // Tiers 1 and 2 share the same options bag and are already merged by the -// caller; the runtime dispatchers (process, lua, wasm managers) fall back to -// tier 3 via AppDefaultNetwork when the merged bag has no "network" key. +// caller; OverlayResolver (applied by the function registry executor and the +// process manager) falls back to tier 3 via AppDefaultNetwork when the merged +// bag has no "network" key. // The resulting ID is written onto the new FrameContext via DefaultNetworkPair // and inherited through every fork. @@ -88,8 +89,8 @@ func AppDefaultNetwork(ctx context.Context) string { // ResolveOverlayID picks the overlay network ID to apply to a newly-spawned // task or process. Precedence: per-call/per-entry options bag ("network" // key) first, then the app-wide boot default, then empty ("clearnet"). -// Prefer ApplyOverlayPair for spawn sites — ResolveOverlayID is exported for -// callers that need the raw ID. +// ResolveOverlayID is exported for callers that need the raw ID; spawn sites +// go through OverlayResolver on the frame-resolver registry. func ResolveOverlayID(ctx context.Context, options attrs.Attributes) string { if options != nil { if id := options.GetString(OptionKeyNetwork, ""); id != "" { @@ -99,20 +100,36 @@ func ResolveOverlayID(ctx context.Context, options attrs.Attributes) string { return AppDefaultNetwork(ctx) } -// ApplyOverlayPair is the single entry point spawn sites use to decorate a -// new task/process context with its overlay network selection. It resolves -// the ID (per-call options, then app default), verifies the network is -// registered, and appends DefaultNetworkPair to pairs. When no overlay is -// selected the input is returned unchanged. Returns ErrNetworkNotFound when -// a selector names an unregistered network. -func ApplyOverlayPair(ctx context.Context, options attrs.Attributes, pairs []ctxapi.Pair) ([]ctxapi.Pair, error) { - networkID := ResolveOverlayID(ctx, options) - if networkID == "" { - return pairs, nil +// OverlayResolver is the frame-context resolver that decorates a new +// task/process frame with its overlay network selection. It resolves the ID +// (per-call options, then app default), verifies the network is registered, +// and emits DefaultNetworkPair. It emits nothing when no overlay is selected +// and returns ErrNetworkNotFound when a selector names an unregistered network. +// Registered on the frame-resolver registry at boot; the dispatchers apply it +// generically. +func OverlayResolver() ctxapi.FrameResolver { + return func(ctx context.Context, options attrs.Attributes) ([]ctxapi.Pair, error) { + networkID := ResolveOverlayID(ctx, options) + if networkID == "" { + return nil, nil + } + reg := GetNetworkRegistry(ctx) + if reg == nil || !reg.HasNetwork(registry.ParseID(networkID)) { + return nil, ErrNetworkNotFound + } + return []ctxapi.Pair{DefaultNetworkPair(networkID)}, nil } - reg := GetNetworkRegistry(ctx) - if reg == nil || !reg.HasNetwork(registry.ParseID(networkID)) { - return nil, ErrNetworkNotFound +} + +// ApplyOverlayPair resolves the overlay selection and appends its pair to pairs. +// +// Deprecated: spawn sites now apply the overlay through the frame-resolver +// registry (OverlayResolver, registered at boot). Retained for external Go +// callers; new code should register a FrameResolver instead. +func ApplyOverlayPair(ctx context.Context, options attrs.Attributes, pairs []ctxapi.Pair) ([]ctxapi.Pair, error) { + got, err := OverlayResolver()(ctx, options) + if err != nil { + return nil, err } - return append(pairs, DefaultNetworkPair(networkID)), nil + return append(pairs, got...), nil } diff --git a/api/net/default_context_test.go b/api/net/default_context_test.go index 3a89a265a..656f58be8 100644 --- a/api/net/default_context_test.go +++ b/api/net/default_context_test.go @@ -133,7 +133,7 @@ func TestWithAppDefaultNetwork_AppDefaultDoesNotReachFrame(t *testing.T) { assert.Equal(t, "app.net:socks5", AppDefaultNetwork(ctx)) } -// --- ApplyOverlayPair (centralized spawn-site helper) --- +// --- OverlayResolver (network frame-context resolver) --- // stubRegistry implements NetworkRegistry with an in-memory set of IDs. type stubRegistry struct { @@ -178,80 +178,83 @@ func optsWithNetwork(id string) attrs.Attributes { return b } -func TestApplyOverlayPair_NoSelection_Passthrough(t *testing.T) { +func TestOverlayResolver_NoSelection_Empty(t *testing.T) { ctx := ctxWithRegistry(newStubRegistry()) - pairs := []ctxapi.Pair{} - out, err := ApplyOverlayPair(ctx, nil, pairs) + out, err := OverlayResolver()(ctx, nil) require.NoError(t, err) - assert.Empty(t, out, "no selection must leave pairs unchanged") + assert.Empty(t, out, "no selection must emit no pair") } -func TestApplyOverlayPair_EmptyOptions_Passthrough(t *testing.T) { +func TestOverlayResolver_EmptyOptions_Empty(t *testing.T) { ctx := ctxWithRegistry(newStubRegistry()) - empty := attrs.NewBag() - out, err := ApplyOverlayPair(ctx, empty, nil) + out, err := OverlayResolver()(ctx, attrs.NewBag()) require.NoError(t, err) - assert.Nil(t, out, "empty options with nil pairs must round-trip nil") + assert.Empty(t, out, "empty options must emit no pair") } -func TestApplyOverlayPair_OptionsSelection_Appends(t *testing.T) { +func TestOverlayResolver_OptionsSelection(t *testing.T) { ctx := ctxWithRegistry(newStubRegistry("app.net:socks5")) - out, err := ApplyOverlayPair(ctx, optsWithNetwork("app.net:socks5"), nil) + out, err := OverlayResolver()(ctx, optsWithNetwork("app.net:socks5")) require.NoError(t, err) require.Len(t, out, 1) assert.Equal(t, DefaultNetworkPair("app.net:socks5"), out[0]) } -func TestApplyOverlayPair_AppDefaultFallback_Appends(t *testing.T) { +func TestOverlayResolver_AppDefaultFallback(t *testing.T) { ctx := ctxWithRegistry(newStubRegistry("app.net:socks5")) ctx = WithAppDefaultNetwork(ctx, "app.net:socks5") - out, err := ApplyOverlayPair(ctx, nil, nil) + out, err := OverlayResolver()(ctx, nil) require.NoError(t, err) require.Len(t, out, 1) assert.Equal(t, "app.net:socks5", out[0].Value) } -func TestApplyOverlayPair_OptionsBeatsAppDefault(t *testing.T) { +func TestOverlayResolver_OptionsBeatsAppDefault(t *testing.T) { ctx := ctxWithRegistry(newStubRegistry("app.net:tailscale")) ctx = WithAppDefaultNetwork(ctx, "app.net:socks5") - out, err := ApplyOverlayPair(ctx, optsWithNetwork("app.net:tailscale"), nil) + out, err := OverlayResolver()(ctx, optsWithNetwork("app.net:tailscale")) require.NoError(t, err) require.Len(t, out, 1) assert.Equal(t, "app.net:tailscale", out[0].Value, "per-call options must override app-level default") } -func TestApplyOverlayPair_UnknownNetwork_Errors(t *testing.T) { +func TestOverlayResolver_UnknownNetwork_Errors(t *testing.T) { ctx := ctxWithRegistry(newStubRegistry("app.net:other")) - out, err := ApplyOverlayPair(ctx, optsWithNetwork("app.net:missing"), nil) + out, err := OverlayResolver()(ctx, optsWithNetwork("app.net:missing")) assert.True(t, errors.Is(err, ErrNetworkNotFound)) assert.Nil(t, out) } -func TestApplyOverlayPair_NoRegistry_Errors(t *testing.T) { +func TestOverlayResolver_NoRegistry_Errors(t *testing.T) { // Selection present, but no registry on the AppContext to verify it. ctx := ctxapi.NewRootContext() - out, err := ApplyOverlayPair(ctx, optsWithNetwork("app.net:socks5"), nil) + out, err := OverlayResolver()(ctx, optsWithNetwork("app.net:socks5")) assert.True(t, errors.Is(err, ErrNetworkNotFound)) assert.Nil(t, out) } -func TestApplyOverlayPair_PreservesExistingPairs(t *testing.T) { +// ApplyOverlayPair is deprecated but retained for external callers; it must +// still append the resolved overlay pair to the supplied slice. +func TestApplyOverlayPair_DeprecatedWrapper(t *testing.T) { ctx := ctxWithRegistry(newStubRegistry("app.net:socks5")) - marker := &ctxapi.Key{Name: "test.marker"} - existing := []ctxapi.Pair{{Key: marker, Value: "sentinel"}} + existing := []ctxapi.Pair{{Key: marker, Value: "keep"}} out, err := ApplyOverlayPair(ctx, optsWithNetwork("app.net:socks5"), existing) require.NoError(t, err) require.Len(t, out, 2) - assert.Equal(t, "sentinel", out[0].Value, "prior pairs must be preserved") - assert.Equal(t, "app.net:socks5", out[1].Value, "overlay pair must be appended") + assert.Equal(t, "keep", out[0].Value, "prior pairs preserved") + assert.Equal(t, "app.net:socks5", out[1].Value, "overlay pair appended") + + // Unknown network still errors through the wrapper. + _, err = ApplyOverlayPair(ctx, optsWithNetwork("app.net:missing"), nil) + assert.True(t, errors.Is(err, ErrNetworkNotFound)) } diff --git a/api/net/network.go b/api/net/network.go index 739aecd71..a6f4ccf5f 100644 --- a/api/net/network.go +++ b/api/net/network.go @@ -3,7 +3,10 @@ package net import ( + "context" + "github.com/wippyai/runtime/api/attrs" + ctxapi "github.com/wippyai/runtime/api/context" "github.com/wippyai/runtime/api/registry" ) @@ -17,10 +20,23 @@ const ( KindTailscale registry.Kind = "network.tailscale" ) -// OptionKeyNetwork is the key under task/start Options bag that selects -// the overlay network to route outbound traffic through. Its value is -// a registry ID string such as "app.net:socks5". -const OptionKeyNetwork = "network" +const ( + // OptionKeyNetwork is the key under task/start Options bag that selects + // the overlay network to route outbound traffic through. Its value is + // a registry ID string such as "app.net:socks5". + OptionKeyNetwork = "network" + + // FrameResolverClaimNetwork is the global frame-resolver claim name for + // overlay network selection. Resolver registrations named this value, or + // explicitly covering it, satisfy the fail-closed network claim. + FrameResolverClaimNetwork = "network" +) + +func init() { + ctxapi.RegisterFrameResolverClaim(FrameResolverClaimNetwork, func(ctx context.Context, options attrs.Attributes) bool { + return ResolveOverlayID(ctx, options) != "" + }) +} // NetworkConfig holds common configuration for all network entries. type NetworkConfig struct { diff --git a/boot/components/system/all.go b/boot/components/system/all.go index 2ad1e55fd..ed389fcd0 100644 --- a/boot/components/system/all.go +++ b/boot/components/system/all.go @@ -17,6 +17,7 @@ func All() []boot.Component { Lifecycle(), Filesystem(), Environment(), + FrameResolvers(), Network(), SocketDispatcher(), Resources(), diff --git a/boot/components/system/constants.go b/boot/components/system/constants.go index 3310d6269..8c7c436bc 100644 --- a/boot/components/system/constants.go +++ b/boot/components/system/constants.go @@ -6,19 +6,20 @@ import "github.com/wippyai/runtime/api/boot" const ( // FilesystemName is a System component name - FilesystemName boot.Name = "filesystem" - EnvironmentName boot.Name = "env" - NetworkName boot.Name = "network" - ResourcesName boot.Name = "resources" - InterceptorName boot.Name = "interceptor" - FunctionsName boot.Name = "functions" - ContractsName boot.Name = "contracts" - ClusterName boot.Name = "cluster" - RaftName boot.Name = "raft" - GlobalRegName boot.Name = "globalreg" - EventualRegName boot.Name = "eventualreg" - KVCRDTName boot.Name = "kv.crdt" - PGName boot.Name = "pg" + FilesystemName boot.Name = "filesystem" + EnvironmentName boot.Name = "env" + NetworkName boot.Name = "network" + ResourcesName boot.Name = "resources" + InterceptorName boot.Name = "interceptor" + FrameResolversName boot.Name = "frame_resolvers" + FunctionsName boot.Name = "functions" + ContractsName boot.Name = "contracts" + ClusterName boot.Name = "cluster" + RaftName boot.Name = "raft" + GlobalRegName boot.Name = "globalreg" + EventualRegName boot.Name = "eventualreg" + KVCRDTName boot.Name = "kv.crdt" + PGName boot.Name = "pg" // ClusterEnabled is a Cluster configuration key ClusterEnabled boot.Name = "enabled" diff --git a/boot/components/system/errors.go b/boot/components/system/errors.go index d15bc1fcd..31ad676df 100644 --- a/boot/components/system/errors.go +++ b/boot/components/system/errors.go @@ -18,6 +18,7 @@ var ( ErrRouterNotAvailable = apierror.New(apierror.Internal, "router not available in context").WithRetryable(apierror.False) ErrTopologyNotAvailable = apierror.New(apierror.Internal, "topology not available in context").WithRetryable(apierror.False) ErrHandlerRegistryNotAvailable = apierror.New(apierror.Internal, "handler registry not available in context").WithRetryable(apierror.False) + ErrFrameResolversMissing = apierror.New(apierror.Internal, "frame resolver registry not available in context").WithRetryable(apierror.False) ) func NewHostnameError(cause error) apierror.Error { diff --git a/boot/components/system/frameresolvers.go b/boot/components/system/frameresolvers.go new file mode 100644 index 000000000..c330ca450 --- /dev/null +++ b/boot/components/system/frameresolvers.go @@ -0,0 +1,29 @@ +// SPDX-License-Identifier: MPL-2.0 + +package system + +import ( + "context" + + "github.com/wippyai/runtime/api/boot" + ctxapi "github.com/wippyai/runtime/api/context" +) + +// FrameResolverOrderNetwork is the apply order of the network overlay resolver. +// Lower orders run first; the resulting pairs are applied to the frame in that +// order, so a later resolver's key wins a collision. +const FrameResolverOrderNetwork = 200 + +// FrameResolvers creates the frame-context resolver registry. Frame-decorating +// options register a resolver here at boot; the function and process +// dispatchers apply the whole set generically, so no dispatcher depends on a +// specific subsystem. +func FrameResolvers() boot.Component { + return boot.New(boot.P{ + Name: FrameResolversName, + DependsOn: []boot.Name{}, + Load: func(ctx context.Context) (context.Context, error) { + return ctxapi.WithFrameResolvers(ctx, ctxapi.NewFrameResolvers()), nil + }, + }) +} diff --git a/boot/components/system/network.go b/boot/components/system/network.go index 336b67244..122321e2c 100644 --- a/boot/components/system/network.go +++ b/boot/components/system/network.go @@ -6,6 +6,7 @@ import ( "context" "github.com/wippyai/runtime/api/boot" + ctxapi "github.com/wippyai/runtime/api/context" logapi "github.com/wippyai/runtime/api/logs" netapi "github.com/wippyai/runtime/api/net" netsystem "github.com/wippyai/runtime/system/net" @@ -17,7 +18,8 @@ import ( // registered at the service layer. func Network() boot.Component { return boot.New(boot.P{ - Name: NetworkName, + Name: NetworkName, + DependsOn: []boot.Name{FrameResolversName}, Load: func(ctx context.Context) (context.Context, error) { logger := logapi.GetLogger(ctx) @@ -29,6 +31,18 @@ func Network() boot.Component { reg := netsystem.NewRegistry(logger.Named("network")) ctx = netapi.WithNetworkRegistry(ctx, reg) + // Apply the overlay selection generically at spawn time. The + // registry is a hard dependency (DependsOn FrameResolversName); a + // missing one is a wiring bug, and failing loud here keeps the + // overlay from silently degrading to clearnet. + resolvers := ctxapi.FrameResolversFrom(ctx) + if resolvers == nil { + return nil, ErrFrameResolversMissing + } + if err := resolvers.Register(netapi.FrameResolverClaimNetwork, FrameResolverOrderNetwork, netapi.OverlayResolver()); err != nil { + return nil, err + } + return ctx, nil }, }) diff --git a/runtime/lua/component/function/lifecycle.go b/runtime/lua/component/function/lifecycle.go index ce39bc532..5048f7136 100644 --- a/runtime/lua/component/function/lifecycle.go +++ b/runtime/lua/component/function/lifecycle.go @@ -6,8 +6,6 @@ import ( "context" "errors" - ctxapi "github.com/wippyai/runtime/api/context" - netapi "github.com/wippyai/runtime/api/net" "github.com/wippyai/runtime/api/registry" "github.com/wippyai/runtime/api/runtime" api "github.com/wippyai/runtime/api/runtime/lua" @@ -99,19 +97,8 @@ func (m *Manager) Execute(ctx context.Context, task runtime.Task) (*runtime.Resu } defer entry.release() - var err error - if task.Context, err = netapi.ApplyOverlayPair(ctx, task.Options, task.Context); err != nil { - return nil, err - } - - if len(task.Context) > 0 { - fc := ctxapi.FrameFromContext(ctx) - if fc != nil { - if err := fc.SetMultiple(task.Context...); err != nil { - return nil, runtimelua.NewOperationError("set task context", err) - } - } - } + // task.Context is already applied to this forked frame by the function + // registry executor before this handler runs, so it is not re-set here. if entry.hostID != "" { if framePID, ok := runtime.GetFramePID(ctx); ok { diff --git a/runtime/wasm/component/function/lifecycle.go b/runtime/wasm/component/function/lifecycle.go index 9a1b359ce..56af4a49b 100644 --- a/runtime/wasm/component/function/lifecycle.go +++ b/runtime/wasm/component/function/lifecycle.go @@ -6,8 +6,6 @@ import ( "context" "fmt" - ctxapi "github.com/wippyai/runtime/api/context" - netapi "github.com/wippyai/runtime/api/net" "github.com/wippyai/runtime/api/registry" "github.com/wippyai/runtime/api/runtime" api "github.com/wippyai/runtime/api/runtime/wasm" @@ -76,19 +74,8 @@ func (m *Manager) Execute(ctx context.Context, task runtime.Task) (*runtime.Resu } defer entry.release() - var err error - if task.Context, err = netapi.ApplyOverlayPair(ctx, task.Options, task.Context); err != nil { - return nil, err - } - - if len(task.Context) > 0 { - fc := ctxapi.FrameFromContext(ctx) - if fc != nil { - if err := fc.SetMultiple(task.Context...); err != nil { - return nil, fmt.Errorf("set task context: %w", err) - } - } - } + // task.Context is already applied to this forked frame by the function + // registry executor before this handler runs, so it is not re-set here. if entry.hostID != "" { if framePID, ok := runtime.GetFramePID(ctx); ok { diff --git a/system/function/functions.go b/system/function/functions.go index 670bf4ddb..fb160520b 100644 --- a/system/function/functions.go +++ b/system/function/functions.go @@ -221,6 +221,13 @@ func (f *Registry) executor(ctx context.Context, handler function.Func, task run ) pairs = append(pairs, task.Context...) + // Apply the registered frame-context resolvers (e.g. the network overlay) + // generically, so this dispatcher stays agnostic of any specific subsystem. + pairs, err := ctxapi.FrameResolversFrom(ctx).Resolve(ctx, task.Options, pairs) + if err != nil { + return nil, err + } + if err := fc.SetMultiple(pairs...); err != nil { return nil, NewFrameContextError(err) } diff --git a/system/process/manager.go b/system/process/manager.go index 08a76255d..87677a93c 100644 --- a/system/process/manager.go +++ b/system/process/manager.go @@ -5,7 +5,7 @@ package process import ( "context" - netapi "github.com/wippyai/runtime/api/net" + ctxapi "github.com/wippyai/runtime/api/context" "github.com/wippyai/runtime/api/pid" api "github.com/wippyai/runtime/api/process" "github.com/wippyai/runtime/api/relay" @@ -47,8 +47,10 @@ func (m *Manager) Start(ctx context.Context, start *api.Start) (pid.PID, error) return pid.PID{}, NewInvalidHostError(start.HostID) } + // Apply the registered frame-context resolvers (e.g. the network overlay) + // generically; the manager stays agnostic of any specific subsystem. var err error - if start.Context, err = netapi.ApplyOverlayPair(ctx, start.Options, start.Context); err != nil { + if start.Context, err = ctxapi.FrameResolversFrom(ctx).Resolve(ctx, start.Options, start.Context); err != nil { return pid.PID{}, err } diff --git a/system/process/manager_network_test.go b/system/process/manager_network_test.go index dce1f7ac3..67bc9cdb4 100644 --- a/system/process/manager_network_test.go +++ b/system/process/manager_network_test.go @@ -47,8 +47,21 @@ func (m *mockNetworkRegistry) NetworkKind(_ registry.ID) registry.Kind { } // ctxWithNetworkRegistry returns a context carrying the given NetworkRegistry -// on a fresh AppContext. +// and a frame-resolver registry with the network overlay resolver, matching how +// boot wires them — the manager applies the resolver generically. func ctxWithNetworkRegistry(reg netapi.NetworkRegistry) context.Context { + ctx := ctxapi.NewRootContext() + if reg != nil { + ctx = netapi.WithNetworkRegistry(ctx, reg) + } + resolvers := ctxapi.NewFrameResolvers() + if err := resolvers.Register(netapi.FrameResolverClaimNetwork, 200, netapi.OverlayResolver()); err != nil { + panic(err) + } + return ctxapi.WithFrameResolvers(ctx, resolvers) +} + +func ctxWithNetworkRegistryOnly(reg netapi.NetworkRegistry) context.Context { ctx := ctxapi.NewRootContext() if reg != nil { ctx = netapi.WithNetworkRegistry(ctx, reg) @@ -162,6 +175,50 @@ func TestManager_Start_NetworkOption_NoRegistry_ReturnsErrNetworkNotFound(t *tes assert.False(t, host.runCalled, "host.Run must not be called when registry is unavailable") } +func TestManager_Start_NetworkOption_NoResolverFailsClosed(t *testing.T) { + node := newMockNode() + host := &mockHost{} + _ = node.RegisterHost("test-host", host) + + mgr := NewManager(node, zap.NewNop()) + + opts := attrs.NewBag() + opts.Set(netapi.OptionKeyNetwork, "app.net:socks5") + + start := &process.Start{ + HostID: "test-host", + Source: registry.NewID("test", "source"), + Options: opts, + } + + _, err := mgr.Start(ctxWithNetworkRegistryOnly(newMockNetworkRegistry("app.net:socks5")), start) + require.Error(t, err) + assert.True(t, errors.Is(err, ctxapi.ErrFrameResolverNotRegistered)) + assert.False(t, host.runCalled, "host.Run must not be called when network resolver is missing") +} + +func TestManager_Start_AppDefaultNetwork_NoResolverFailsClosed(t *testing.T) { + node := newMockNode() + host := &mockHost{} + _ = node.RegisterHost("test-host", host) + + mgr := NewManager(node, zap.NewNop()) + + ctx := ctxWithNetworkRegistryOnly(newMockNetworkRegistry("app.net:socks5")) + ctx = netapi.WithAppDefaultNetwork(ctx, "app.net:socks5") + + start := &process.Start{ + HostID: "test-host", + Source: registry.NewID("test", "source"), + Options: attrs.NewBag(), + } + + _, err := mgr.Start(ctx, start) + require.Error(t, err) + assert.True(t, errors.Is(err, ctxapi.ErrFrameResolverNotRegistered)) + assert.False(t, host.runCalled, "host.Run must not be called when app-default network resolver is missing") +} + func TestManager_Start_NetworkOption_AppendsToExistingContext(t *testing.T) { node := newMockNode() host := &mockHost{}