
Pool discovery protocols shared across proxies on the same worker — Closes #55 (#117)

Merged
conradbzura merged 5 commits into wool-labs:main from conradbzura:55-pool-discovery-protocols
Mar 25, 2026

Conversation

@conradbzura conradbzura commented Mar 24, 2026

Summary

Rewrite the subscriber pooling layer so that proxies sharing the same discovery protocol reuse a single underlying subscription with demand-driven multicast. Introduce a Fanout utility that wraps any async generator and distributes each item to multiple independent consumers via a leader/follower pull model — no background task required. Late-joining consumers receive a replay of all currently known workers so they start with a consistent view.

All class-level state uses WeakKeyDictionary so that entries cascade away automatically when their keys are garbage-collected — no manual cleanup needed beyond closing the async generator on shutdown.
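
The cascading-cleanup behavior relies only on the standard library's `weakref.WeakKeyDictionary`; a minimal demonstration of why no manual cleanup is needed:

```python
# Entries in a WeakKeyDictionary vanish automatically when their key is
# garbage-collected -- no explicit removal required.
import gc
import weakref


class Key:
    pass


registry = weakref.WeakKeyDictionary()
k = Key()
registry[k] = "state"
assert len(registry) == 1

del k          # drop the only strong reference to the key
gc.collect()   # CPython frees immediately; collect() for other runtimes
assert len(registry) == 0
```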

Closes #55

Proposed changes

Fanout multicast utility

Add a generic Fanout[T] container that wraps a single AsyncGenerator[T] source and multicasts each pulled item to every registered FanoutConsumer. The first consumer whose queue is empty acquires a shared asyncio.Lock, pulls one item from the source via anext, distributes it to all other consumers' queues, and returns. Consumers are tracked in a WeakSet; when a consumer goes out of scope the weak reference expires and it is silently removed from the fan-out set. cleanup() closes the source generator and pushes a sentinel to all remaining consumer queues.

Demand-driven _SharedSubscription

Add a _SharedSubscription wrapper that bridges a discovery subscriber and a Fanout. Each __aiter__ call enters a ResourcePool resource, lazily wraps the raw subscriber in a shared Fanout, and iterates an independent FanoutConsumer. Worker state is tracked as a dict[uid, WorkerMetadata] per subscriber — only the latest metadata is stored, and entries are removed on worker-dropped. When a new consumer registers, it receives a replay of worker-added events for all currently known workers.
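
The worker-state tracking and late-joiner replay reduce to a small amount of bookkeeping. The event tuples and `dict`-valued metadata below are illustrative assumptions, not the PR's exact types:

```python
from typing import Any, Iterator


class WorkerState:
    """Tracks only the latest metadata per worker uid."""

    def __init__(self) -> None:
        self.workers: dict[str, dict[str, Any]] = {}

    def observe(self, event: tuple[str, str, dict[str, Any]]) -> None:
        kind, uid, metadata = event
        if kind in ("worker-added", "worker-updated"):
            self.workers[uid] = metadata   # keep latest metadata only
        elif kind == "worker-dropped":
            self.workers.pop(uid, None)    # forget dropped workers

    def replay(self) -> Iterator[tuple[str, str, dict[str, Any]]]:
        # A late-joining consumer first receives worker-added for every
        # currently known worker, giving it a consistent starting view.
        for uid, metadata in self.workers.items():
            yield ("worker-added", uid, metadata)
```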

SubscriberMeta via __new__

Switch from __call__ to __new__ on the metaclass. The metaclass's __new__ (called at class definition time) injects a custom __new__ onto the subscriber class that caches the raw subscriber in the ResourcePool and returns a _SharedSubscription wrapper. Since the returned object is not an instance of the subscriber class, type.__call__ skips __init__ — no double-initialization.

Pickle support

Add __reduce__ to _SharedSubscription (delegates to the raw subscriber's __reduce__, so unpickling goes through the metaclass and re-wraps automatically) and to afilter (pickles predicate + inner subscriber).
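
The delegation pattern works because reconstruction goes back through the class call, which re-wraps. A self-contained sketch with illustrative names (`Raw`, `Wrapper`), using a `__new__` that wraps in place of the metaclass:

```python
import pickle


class Wrapper:
    """Stands in for _SharedSubscription."""

    def __init__(self, raw):
        self.raw = raw

    def __reduce__(self):
        # Delegate reconstruction to the subscriber class; because its
        # __new__ returns a Wrapper (as the PR's metaclass arranges),
        # unpickling yields a fresh Wrapper automatically.
        return (Raw, (self.raw.key,))


class Raw:
    """Stands in for a raw discovery subscriber."""

    def __new__(cls, key):
        raw = object.__new__(cls)
        raw.key = key
        return Wrapper(raw)  # non-instance return: __init__ is skipped


w = Raw("k")                       # already wrapped at construction
w2 = pickle.loads(pickle.dumps(w))
assert isinstance(w2, Wrapper)
assert w2.raw.key == "k"
```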

Subscriber simplification

Remove the _consume() method from both LanDiscovery.Subscriber and LocalDiscovery.Subscriber. __aiter__ now returns _event_stream() directly. _shutdown becomes async and delegates to _SharedSubscription.cleanup.

Integration test: DURABLE_SHARED pool mode

Add a new pool arrangement where two WorkerPool instances share the same LocalDiscovery. The second pool discovers existing workers via late-joiner replay. Include pairwise filter and Hypothesis strategy updates so the new mode is covered across all dimension combinations.

Test cases

| Test Suite | # | Given | When | Then | Coverage Target |
|---|---|---|---|---|---|
| `TestSubscriberMeta` | SM-001 | A subscriber class using `SubscriberMeta` | The class is instantiated | It should return a `_SharedSubscription` wrapping the cached raw subscriber | `__new__` wrapping |
| `TestSubscriberMeta` | SM-002 | Two construction calls with the same arguments | Instantiated twice | Both should wrap the same underlying raw subscriber | Singleton caching |
| `TestSubscriberMeta` | SM-003 | Two construction calls with different arguments | Instantiated with different keys | Underlying raw subscribers should be different objects | Key isolation |
| `TestSubscriberMeta` | SM-004 | The `subscriber_pool` ContextVar is None | A subscriber is constructed | The ContextVar should be set to a `ResourcePool` | Lazy pool init |
| `TestSharedSubscription` | SS-001 | A `_SharedSubscription` wrapping a source that yields one event | `anext` is called on the consumer | It should return the event from the source | Demand-driven pull |
| `TestSharedSubscription` | SS-002 | Two registered consumers sharing the same source | One consumer triggers a pull via `anext` | Both consumers should receive the event | Fan-out to multiple consumers |
| `TestSharedSubscription` | SS-003 | A `_SharedSubscription` wrapping a subscriber | `__aiter__` is called twice | It should return two distinct `_SharedSubscription` instances | Independent consumers |
| `TestSharedSubscription` | SS-004 | A registered consumer against a subscriber | `cleanup` is called | Iterator should be removed and sentinel pushed to consumer queues | Cleanup and shutdown |
| `TestSharedSubscription` | SS-005 | A `_SharedSubscription` wrapping a stub subscriber with `__reduce__` | Pickled and unpickled via cloudpickle | It should produce a `_SharedSubscription` wrapping a subscriber with the same key | Pickle roundtrip |
| `TestSharedSubscription` | SS-006 | Two consumers sharing a finite source of 2 events | Both iterate to exhaustion | First consumer should get both events; second should get replay | Source exhaustion + late-joiner replay |
| `TestSharedSubscription` | SS-007 | A consumer registered against a cleaned-up subscriber | `anext` is called | It should raise `StopAsyncIteration` | Post-cleanup termination |
| `Testafilter` | AF-001 | A source yielding one matching worker-added event | `afilter` is iterated with an accepting predicate | It should yield the worker-added event | Single matching event |
| `Testafilter` | AF-002 | A source yielding worker-added then worker-updated | `afilter` is iterated with an accept-all predicate | It should yield both events in order | Unfiltered passthrough |
| `Testafilter` | AF-003 | A source yielding a worker-added event | `afilter` is iterated with a rejecting predicate | It should yield no events | Filter suppression |
| `Testafilter` | AF-004 | A source yielding worker-added (fails) then worker-updated (passes) | `afilter` is iterated with a tag filter | It should yield worker-added for the now-matching worker | Filter entry transition |
| `Testafilter` | AF-005 | A source yielding worker-added (passes) then worker-updated (fails) | `afilter` is iterated with a tag filter | It should yield worker-added then worker-dropped with prior metadata | Filter exit transition |
| `Testafilter` | AF-006 | A source yielding worker-added then worker-dropped | `afilter` is iterated with accept-all | It should yield both events | Tracked worker drop |
| `Testafilter` | AF-007 | A source yielding worker-added (fails) then worker-dropped | `afilter` is iterated with rejecting filter | It should yield no events | Untracked worker drop suppression |
| `Testafilter` | AF-008 | An `afilter` wrapping a repeatable source | `__aiter__` is called twice sequentially | Each iteration should produce independent results | Independent iteration state |
| `Testafilter` | AF-009 | An `afilter` wrapping a picklable subscriber | Pickled and unpickled via cloudpickle | It should produce an `afilter` instance | Pickle roundtrip |
| `TestPoolComposition` | IC-001 | Two `WorkerPool` instances sharing the same `LocalDiscovery` | Both pools entered, coroutine dispatched through primary | It should discover the worker and return the correct result | Shared subscriber end-to-end |

@conradbzura conradbzura force-pushed the 55-pool-discovery-protocols branch 4 times, most recently from c430e3e to bc950c0 Compare March 24, 2026 15:36
@conradbzura conradbzura self-assigned this Mar 24, 2026
@conradbzura conradbzura marked this pull request as ready for review March 24, 2026 15:40
@conradbzura conradbzura force-pushed the 55-pool-discovery-protocols branch 4 times, most recently from ee94403 to c1b34cd Compare March 25, 2026 00:58
```python
original_init = cls.__init__  # type: ignore[misc]

def _subscriber_new(cls_arg: type, *args: Any, **kwargs: Any) -> Any:
    key = cls_arg._cache_key(*args, **kwargs)  # type: ignore[attr-defined]
```

@conradbzura (Contributor, Author): This should be a kwarg to `__new__` so that usage can be `FooSub(metaclass=SubscriberMeta, key=...)`.
Comment thread wool/src/wool/runtime/discovery/pool.py Outdated
@conradbzura conradbzura force-pushed the 55-pool-discovery-protocols branch from 874031a to 5c41926 Compare March 25, 2026 05:46
…ol.get_or_create_sync

Infrastructure for pooling discovery protocol subscriptions across
proxies sharing the same worker. Adds an async filter class that
wraps discovery event streams with transition tracking, a ContextVar
for managing subscriber singletons, and a synchronous cache accessor
on ResourcePool for use by metaclass construction.

Extracts the demand-driven multicast logic from _SharedSubscription
into a reusable Fanout/FanoutConsumer pair in wool.utilities.fanout.
A single async iterable source is fanned out to independent consumers
on demand with no background task — the first consumer whose queue is
empty acquires a lock, pulls one item, and distributes it.
@conradbzura conradbzura force-pushed the 55-pool-discovery-protocols branch from be408de to 781d4c2 Compare March 25, 2026 06:28
Introduce a SubscriberMeta metaclass that caches discovery
subscriber instances as singletons in a global ResourcePool,
keyed by protocol configuration. Each subscriber is wrapped in
a SharedSubscription that uses Fanout to multicast a single
event source to multiple concurrent consumers via a
leader/follower pull model — no background task required.

Late-joining consumers receive a replay of current workers so
they start with a consistent view. All class-level state uses
WeakKeyDictionary so entries cascade away when their keys are
garbage-collected.

Decouple filtering from discovery by using the afilter utility
in subscribe() instead of passing predicates into the event
stream. Simplify the Zeroconf and shared-memory listeners to
emit all events unfiltered.
…nout

Add tests for SubscriberMeta singleton caching and lazy pool
initialization, SharedSubscription fan-out and late-join replay,
afilter transition tracking with independent iteration state, and
ResourcePool.get_or_create_sync. Add pool subscription, afilter, and
discovery tests covering the new demand-driven multicast model.

Add DURABLE_SHARED pool arrangement where two WorkerPool instances
share the same LocalDiscovery subscriber, exercising SubscriberMeta
caching and SharedSubscription fan-out end-to-end with late-joiner replay.

Rewrite pool tests to exercise the new Fanout-backed iteration model
instead of the removed queue/lock internals. Add coverage for the
pool-absent RuntimeError guard, worker-dropped replay tracking, and
concurrent fan-out scenarios.

Update lan/local subscriber tests to use DiscoverySubscriberLike
isinstance checks. Clear the subscriber pool ContextVar between tests.
@conradbzura conradbzura force-pushed the 55-pool-discovery-protocols branch from 781d4c2 to 43c6759 Compare March 25, 2026 06:32
@conradbzura conradbzura force-pushed the 55-pool-discovery-protocols branch from 43c6759 to 3d16d90 Compare March 25, 2026 06:34
@conradbzura conradbzura merged commit 4e600b4 into wool-labs:main Mar 25, 2026
4 checks passed