diff --git a/tests/test_async.py b/tests/test_async.py
index bc1ee7e9..f145e3cb 100644
--- a/tests/test_async.py
+++ b/tests/test_async.py
@@ -1,3 +1,6 @@
+import time
+import threading
+
 import anyio
 from pytest import mark, raises
 
@@ -12,13 +15,19 @@ class GPUPromise(BaseGPUPromise):
     # Subclass with each own set of unresolved promise instances
     _UNRESOLVED = set()
 
+    def _sync_wait(self):
+        # Same implementation as the wgpu_native backend.
+        # If we have a test that has no polling thread, and sync_wait() is called
+        # when the promise is still pending, this will hang.
+        self._thread_event.wait()
+
 
 class SillyLoop:
     def __init__(self):
         self._pending_calls = []
         self.errors = []
 
-    def call_soon(self, f, *args):
+    def call_soon_threadsafe(self, f, *args):
         self._pending_calls.append((f, args))
 
     def process_events(self):
@@ -73,23 +82,18 @@ def test_promise_basics():
 # %%%%% Promise using sync_wait
 
 
-def test_promise_sync_need_poll():
-    promise = GPUPromise("test", None)
-
-    with raises(RuntimeError):  # cannot poll without poll function
-        promise.sync_wait()
+def run_in_thread(callable):
+    t = threading.Thread(target=callable)
+    t.start()
 
 
 def test_promise_sync_simple():
-    count = 0
-
+    @run_in_thread
     def poller():
-        nonlocal count
-        count += 1
-        if count > 5:
-            promise._wgpu_set_input(42)
+        time.sleep(0.1)
+        promise._wgpu_set_input(42)
 
-    promise = GPUPromise("test", None, poller=poller)
+    promise = GPUPromise("test", None)
     result = promise.sync_wait()
     assert result == 42
 
@@ -99,15 +103,12 @@ def test_promise_sync_normal():
     def handler(input):
         return input * 2
 
-    count = 0
-
+    @run_in_thread
     def poller():
-        nonlocal count
-        count += 1
-        if count > 5:
-            promise._wgpu_set_input(42)
+        time.sleep(0.1)
+        promise._wgpu_set_input(42)
 
-    promise = GPUPromise("test", handler, poller=poller)
+    promise = GPUPromise("test", handler)
     result = promise.sync_wait()
     assert result == 84
 
@@ -117,15 +118,12 @@ def test_promise_sync_fail1():
     def handler(input):
         return input * 2
 
-    count = 0
-
+    @run_in_thread
     def poller():
-        nonlocal count
-        count += 1
-        if count > 5:
-            promise._wgpu_set_error(ZeroDivisionError())
+        time.sleep(0.1)
+        promise._wgpu_set_error(ZeroDivisionError())
 
-    promise = GPUPromise("test", handler, poller=poller)
+    promise = GPUPromise("test", handler)
     with raises(ZeroDivisionError):
         promise.sync_wait()
 
@@ -135,15 +133,12 @@ def poller():
     def handler(input):
         return input / 0
 
-    count = 0
-
+    @run_in_thread
     def poller():
-        nonlocal count
-        count += 1
-        if count > 5:
-            promise._wgpu_set_input(42)
+        time.sleep(0.1)
+        promise._wgpu_set_input(42)
 
-    promise = GPUPromise("test", handler, poller=poller)
+    promise = GPUPromise("test", handler)
     with raises(ZeroDivisionError):
         promise.sync_wait()
 
@@ -152,25 +147,14 @@ def poller():
 # %% Promise using await with poll and loop
 
 
-@mark.anyio
-async def test_promise_async_need_poll_or_loop():
-    promise = GPUPromise("test", None)
-
-    with raises(RuntimeError):  # cannot poll without poll function
-        await promise
-
-
 @mark.anyio
 async def test_promise_async_poll_simple():
-    count = 0
-
+    @run_in_thread
     def poller():
-        nonlocal count
-        count += 1
-        if count > 5:
-            promise._wgpu_set_input(42)
+        time.sleep(0.1)
+        promise._wgpu_set_input(42)
 
-    promise = GPUPromise("test", None, poller=poller)
+    promise = GPUPromise("test", None)
     result = await promise
     assert result == 42
 
@@ -181,15 +165,12 @@ async def test_promise_async_poll_normal():
     def handler(input):
         return input * 2
 
-    count = 0
-
+    @run_in_thread
     def poller():
-        nonlocal count
-        count += 1
-        if count > 5:
-            promise._wgpu_set_input(42)
+        time.sleep(0.1)
+        promise._wgpu_set_input(42)
 
-    promise = GPUPromise("test", handler, poller=poller)
+    promise = GPUPromise("test", handler)
     result = await promise
     assert result == 84
 
@@ -200,15 +181,12 @@ async def test_promise_async_poll_fail1():
     def handler(input):
         return input * 2
 
-    count = 0
-
+    @run_in_thread
     def poller():
-        nonlocal count
-        count += 1
-        if count > 5:
-            promise._wgpu_set_error(ZeroDivisionError())
+        time.sleep(0.1)
+        promise._wgpu_set_error(ZeroDivisionError())
 
-    promise = GPUPromise("test", handler, poller=poller)
+    promise = GPUPromise("test", handler)
     with raises(ZeroDivisionError):
         await promise
 
@@ -219,15 +197,12 @@ async def test_promise_async_poll_fail2():
     def handler(input):
         return input / 0
 
-    count = 0
-
+    @run_in_thread
     def poller():
-        nonlocal count
-        count += 1
-        if count > 5:
-            promise._wgpu_set_input(42)
+        time.sleep(0.1)
+        promise._wgpu_set_input(42)
 
-    promise = GPUPromise("test", handler, poller=poller)
+    promise = GPUPromise("test", handler)
     with raises(ZeroDivisionError):
         await promise
 
diff --git a/tests/test_wgpu_native_poller.py b/tests/test_wgpu_native_poller.py
new file mode 100644
index 00000000..b6403a6c
--- /dev/null
+++ b/tests/test_wgpu_native_poller.py
@@ -0,0 +1,188 @@
+import gc
+import time
+import queue
+
+import wgpu
+from wgpu.backends.wgpu_native._poller import PollThread, PollToken
+
+from testutils import can_use_wgpu_lib, run_tests, is_pypy
+from pytest import mark
+
+
+def test_poll_thread():
+    # A timeout to give the polling thread time to progress. The GIL switches
+    # threads about every 5ms, but in this case likely faster, because it also switches
+    # when it goes to sleep on a blocking call. So 50ms seems plenty.
+    timeout = 0.05
+
+    count = 0
+    gpu_work_done_queue = queue.SimpleQueue()
+
+    def reset():
+        nonlocal count
+        ref_count = count
+        # Make sure the poller is not waiting in poll_func
+        gpu_work_done_queue.put(None)
+        gpu_work_done_queue.put(None)
+        # Give it time
+        time.sleep(timeout)
+        # Check that it did not enter again, i.e. is waiting for tokens
+        assert count == ref_count, "Looks like a token is still active"
+        # Reset
+        count = 0
+        while True:
+            try:
+                gpu_work_done_queue.get(False)
+            except queue.Empty:
+                break
+
+    def finish_tokens(*tokens):
+        # This mimics the GPU finishing an async task, and invoking its
+        # callback that sets the token to done.
+        gpu_work_done_queue.put(None)
+        for token in tokens:
+            assert not token.is_done()
+            token.set_done()
+
+    def poll_func(block):
+        # This mimics wgpuDevicePoll.
+        nonlocal count
+        count += 1
+        if block:
+            gpu_work_done_queue.get()  # blocking
+        else:
+            try:
+                gpu_work_done_queue.get(False)
+            except queue.Empty:
+                pass
+
+    # Start the poller
+    t = PollThread(poll_func)
+    t.start()
+
+    reset()
+
+    # == Normal behavior
+
+    token = t.get_token()
+    assert isinstance(token, PollToken)
+    time.sleep(timeout)
+    assert count == 2
+
+    finish_tokens(token)
+
+    time.sleep(timeout)
+    assert count == 2
+
+    reset()
+
+    # == Always at least one poll
+
+    token = t.get_token()
+    token.set_done()
+    time.sleep(timeout)
+    assert count in (1, 2)  # typically 1, but can sometimes be 2
+
+    reset()
+
+    # == Mark done through deletion
+
+    token = t.get_token()
+    time.sleep(timeout)
+    assert count == 2
+
+    finish_tokens()
+
+    time.sleep(timeout)
+    assert count == 3
+
+    finish_tokens()
+
+    time.sleep(timeout)
+    assert count == 4
+
+    del token
+    gc.collect()
+    gc.collect()
+
+    finish_tokens()
+
+    time.sleep(timeout)
+    assert count == 4
+
+    reset()
+
+    # More tasks
+
+    token1 = t.get_token()
+    time.sleep(timeout)
+    assert count == 2
+
+    token2 = t.get_token()
+    time.sleep(timeout)
+    assert count == 2
+
+    token3 = t.get_token()
+    token4 = t.get_token()
+    time.sleep(timeout)
+    assert count == 2
+
+    finish_tokens(token1)
+    time.sleep(timeout)
+    assert count == 3
+
+    finish_tokens(token2, token3)
+    time.sleep(timeout)
+    assert count == 4
+
+    finish_tokens()  # unrelated work can also bump the count
+    finish_tokens()
+    time.sleep(timeout)
+    assert count == 6
+
+    token5 = t.get_token()
+    finish_tokens(token4)
+    time.sleep(timeout)
+    assert count == 7
+
+    finish_tokens(token5)
+    time.sleep(timeout)
+    assert count == 8
+
+    reset()
+
+    # Shut it down
+
+    t.stop()
+    time.sleep(0.1)
+    assert not t.is_alive()
+
+
+@mark.skipif(not can_use_wgpu_lib, reason="Needs wgpu lib")
+def test_poller_stops_when_device_gone():
+    device = wgpu.gpu.request_adapter_sync().request_device_sync()
+
+    t = device._poller
+    assert t.is_alive()
+    device.__del__()
+    time.sleep(0.1)
+
+    assert not t.is_alive()
+
+    device = wgpu.gpu.request_adapter_sync().request_device_sync()
+
+    t = device._poller
+    assert t.is_alive()
+    del device
+    gc.collect()
+    gc.collect()
+    if is_pypy:
+        gc.collect()
+        gc.collect()
+    time.sleep(0.1)
+
+    assert not t.is_alive()
+
+
+if __name__ == "__main__":
+    run_tests(globals())
diff --git a/wgpu/_async.py b/wgpu/_async.py
index 20dff7c3..81e08f29 100644
--- a/wgpu/_async.py
+++ b/wgpu/_async.py
@@ -3,7 +3,6 @@ from __future__ import annotations
 
 import sys
-import time
 import logging
 import threading
 from typing import Callable, Awaitable, Generator, Generic, TypeVar
 
@@ -81,7 +80,7 @@ class GPUPromise(Awaitable[AwaitedType], Generic[AwaitedType]):
     * "rejected": meaning that the operation failed.
     """
 
-    # We keep a set of unresolved promises, because whith using .then, noone else holds a ref to the promise
+    # We keep a set of unresolved promises, because when using .then, nothing else holds a ref to the promise
    _UNRESOLVED = set()
 
     def __init__(
@@ -90,7 +89,6 @@ def __init__(
         handler: Callable | None,
         *,
         loop: LoopInterface | None = None,
-        poller: Callable | None = None,
         keepalive: object = None,
     ):
         """
@@ -99,24 +97,22 @@ def __init__(
             handler (callable, optional): The function to turn promise input into the result.
                 If None, the result will simply be the input.
             loop (LoopInterface, optional): A loop object that at least has a ``call_soon()`` method.
-                If not given, this promise does not support .then() or pronise-chaining.
-            poller (callable, optional): A function to call on a regular interval to poll internal systems
-                (most likely the wgpu backend).
+                If not given, this promise does not support .then() or promise-chaining.
             keepalive (object, optional): Pass any data via this arg who's lifetime must be bound to the
-                resolving of this prommise.
+                resolving of this promise.
         """
 
         self._title = str(title)  # title for debugging
         self._handler = handler  # function to turn input into the result
         self._loop = loop  # Event loop instance, can be None
-        self._poller = poller  # call to poll (process events)
         self._keepalive = keepalive  # just to keep something alive
 
         self._state = "pending"  # "pending", "pending-rejected", "pending-fulfilled", "rejected", "fulfilled"
         self._value = None  # The incoming value, final value, or error
-        self._event = None  # AsyncEvent for __await__
         self._lock = threading.RLock()  # Allow threads to set the value
+        self._async_event = None  # AsyncEvent for __await__
+        self._thread_event = threading.Event()
 
         self._done_callbacks = []
         self._error_callbacks = []
         self._UNRESOLVED.add(self)
 
@@ -181,20 +177,26 @@ def _set_error(self, error: str | Exception, *, resolve_now=True) -> None:
 
     def _set_pending_resolved(self, *, resolve_now=False):
         """The promise received its input (or error), and now we need to handle it, then call callbacks etc."""
+        # This may be called from a different thread. If resolve_now is True, it should be the main/reference thread.
+        # We can now drop the reference.
         self._UNRESOLVED.discard(self)
+        # Mark as not pending for threads
+        self._thread_event.set()
         # Do or schedule a call to resolve.
         if resolve_now:
             self._resolve_callback()
+            if self._async_event is not None:
+                self._async_event.set()
         elif self._loop is not None:
-            self._loop.call_soon(self._resolve_callback)
-        # Allow tasks that await this promise to continue. Do this last, since
-        # it allows any waiting tasks to continue. These taks are assumed to be
-        # on the 'reference' thread, but *this* may be a different thread.
-        if self._event is not None:
-            self._event.set()
+            self._loop.call_soon_threadsafe(self._resolve_callback)
 
     def _resolve_callback(self):
+        # This should only be called in the main/reference thread.
+
+        # Allow tasks that await this promise to continue.
+        if self._async_event is not None:
+            self._async_event.set()
         # The callback may already be resolved
         if self._state.startswith("pending-"):
             self._resolve()
 
@@ -216,18 +218,17 @@ def _resolve(self):
         if self._state.endswith("rejected"):
             error = self._value
             for cb in self._error_callbacks:
-                self._loop.call_soon(cb, error)
+                self._loop.call_soon_threadsafe(cb, error)
         elif self._state.endswith("fulfilled"):
             result = self._value
             for cb in self._done_callbacks:
-                self._loop.call_soon(cb, result)
+                self._loop.call_soon_threadsafe(cb, result)
         # New state
         self._state = self._state.replace("pending-", "")
         # Clean up
         self._error_callbacks = []
         self._done_callbacks = []
         self._handler = None
-        self._poller = None
         self._keepalive = None
         # Resolve to the caller
         if self._state == "rejected":
@@ -247,20 +248,16 @@ def sync_wait(self) -> AwaitedType:
         portable.
         """
         if self._state == "pending":
-            if self._poller is None:
-                raise RuntimeError(
-                    "Cannot GPUPromise.sync_wait(), if the polling function is not set."
-                )
-            # Do small incremental sync naps. Other threads can run.
-            # Note that time.sleep is accurate (does not suffer from the inaccuracy issue on Windows).
-            sleep_gen = get_backoff_time_generator()
-            self._poller()
-            while self._state == "pending":
-                time.sleep(next(sleep_gen))
-                self._poller()
-
+            self._sync_wait()
         return self._resolve()  # returns result if fulfilled or raise error if rejected
 
+    def _sync_wait(self):
+        # Each subclass may implement this in its own way. E.g. it may wait for
+        # the _thread_event, it may poll the device in a loop while checking the
+        # status, and Pyodide may use its special logic to sync wait the JS
+        # promise.
+        raise NotImplementedError()
+
     def _chain(self, to_promise: GPUPromise):
         with self._lock:
             self._done_callbacks.append(to_promise._set_input)
 
@@ -296,9 +293,7 @@ def then(
         title = self._title + " -> " + callback_name
 
         # Create new promise
-        new_promise = self.__class__(
-            title, callback, loop=self._loop, poller=self._poller
-        )
+        new_promise = self.__class__(title, callback, loop=self._loop)
         self._chain(new_promise)
 
         if error_callback is not None:
@@ -322,9 +317,7 @@ def catch(self, callback: Callable[[Exception], None] | None):
         title = "Catcher for " + self._title
 
         # Create new promise
-        new_promise = self.__class__(
-            title, callback, loop=self._loop, poller=self._poller
-        )
+        new_promise = self.__class__(title, callback, loop=self._loop)
 
         # Custom chain
         with self._lock:
@@ -339,30 +332,26 @@ def __await__(self):
             # An async busy loop
             async def awaiter():
                 if self._state == "pending":
-                    # backoff_time_generator = self._get_backoff_time_generator()
-                    if self._poller is None:
-                        raise RuntimeError(
-                            "Cannot await a GPUPromise if neither the loop nor the poller are set."
-                        )
                     # Do small incremental async naps. Other tasks and threads can run.
                     # Note that async sleep, with sleep_time > 0, is inaccurate on Windows.
                     sleep_gen = get_backoff_time_generator()
-                    self._poller()
                     while self._state == "pending":
                         await async_sleep(next(sleep_gen))
-                        self._poller()
                 return self._resolve()
 
         else:
-            # Using a signal
+            # Using an async Event.
+            # When using a thread to poll, that thread will wake as soon as the GPU is done,
+            # and will then (via a call_soon_threadsafe) set the event; this is a very fast
+            # path with no busy-looping whatsoever.
             with self._lock:
-                if self._event is None:
-                    self._event = AsyncEvent()
+                if self._async_event is None:
+                    self._async_event = AsyncEvent()
                 if self._state != "pending":
-                    self._event.set()
+                    self._async_event.set()
 
             async def awaiter():
-                await self._event.wait()
+                await self._async_event.wait()
                 return self._resolve()
 
         return (yield from awaiter().__await__())
diff --git a/wgpu/backends/wgpu_native/_api.py b/wgpu/backends/wgpu_native/_api.py
index 94f24725..c9602089 100644
--- a/wgpu/backends/wgpu_native/_api.py
+++ b/wgpu/backends/wgpu_native/_api.py
@@ -29,6 +29,7 @@
 
 from ._ffi import ffi, lib
 from ._mappings import cstructfield2enum, enummap, enum_str2int, enum_int2str
+from ._poller import PollThread
 from ._helpers import (
     get_wgpu_instance,
     get_surface_id_from_info,
@@ -564,24 +565,25 @@ def request_adapter_callback(status, result, c_message, _userdata1, _userdata2):
         def handler(adapter_id):
             return self._create_adapter(adapter_id, loop)
 
-        instance = get_wgpu_instance()
-
-        def poller():
-            # H: void f(WGPUInstance instance)
-            libf.wgpuInstanceProcessEvents(instance)
-
-        # Note that although we claim this is an asynchronous method, the callback
-        # happens within libf.wgpuInstanceRequestAdapter
         promise = GPUPromise(
             "request_adapter",
             handler,
             loop=loop,
-            poller=poller,
             keepalive=request_adapter_callback,
         )
 
+        instance = get_wgpu_instance()
+
         # H: WGPUFuture f(WGPUInstance instance, WGPURequestAdapterOptions const * options, WGPURequestAdapterCallbackInfo callbackInfo)
-        libf.wgpuInstanceRequestAdapter(get_wgpu_instance(), struct, callback_info)
+        libf.wgpuInstanceRequestAdapter(instance, struct, callback_info)
+
+        # Note that although we claim this is an asynchronous method, the callback
+        # happens within libf.wgpuInstanceRequestAdapter.
+        # To be sure though, we tickle the instance and double-check that the promise is set.
+
+        # H: void f(WGPUInstance instance)
+        libf.wgpuInstanceProcessEvents(instance)
+        assert promise._state != "pending"
 
         return promise
 
@@ -690,7 +692,10 @@ def get_canvas_context(self, present_info: dict) -> GPUCanvasContext:
 
 
 class GPUPromise(classes.GPUPromise):
-    pass
+    def _sync_wait(self):
+        # In the wgpu-native backend, we do the polling in a per-device thread.
+        # The base class already sets a threading.Event, we can just use that here.
+        self._thread_event.wait()
 
 
 class GPUCanvasContext(classes.GPUCanvasContext):
@@ -1380,23 +1385,24 @@ def handler(device_id):
             return device
 
-        instance = get_wgpu_instance()
-
-        def poller():
-            # H: void f(WGPUInstance instance)
-            libf.wgpuInstanceProcessEvents(instance)
-
         promise = GPUPromise(
             "request_device",
             handler,
             loop=self._loop,
-            poller=poller,
             keepalive=request_device_callback,
         )
 
         # H: WGPUFuture f(WGPUAdapter adapter, WGPUDeviceDescriptor const * descriptor, WGPURequestDeviceCallbackInfo callbackInfo)
         libf.wgpuAdapterRequestDevice(self._internal, struct, callback_info)
 
+        # Note that although we claim this is an asynchronous method, the callback
+        # happens within libf.wgpuAdapterRequestDevice.
+        # To be sure though, we tickle the instance and double-check that the promise is set.
+
+        # H: void f(WGPUInstance instance)
+        libf.wgpuInstanceProcessEvents(get_wgpu_instance())
+        assert promise._state != "pending"
+
         return promise
 
     def _release(self):
@@ -1415,11 +1421,33 @@ class GPUDevice(classes.GPUDevice, GPUObjectBase):
     # they now exist in the header, but are still unimplemented: https://github.com/gfx-rs/wgpu-native/blob/f29ebee88362934f8f9fab530f3ccb7fde2d49a9/src/unimplemented.rs#L66-L82
     _CREATE_PIPELINE_ASYNC_IS_IMPLEMENTED = False
 
-    def _poll(self):
+    def __init__(self, label, internal, adapter, features, limits, queue):
+        super().__init__(label, internal, adapter, features, limits, queue)
+
+        # Create a polling thread for this device. An alternative would be to
+        # have a single global thread for the instance, but this is currently
+        # not feasible because wgpuInstanceProcessEvents() has no arg to make it
+        # blocking. A per-device thread is also likely better, otherwise a
+        # long-running GPU task in one device can prevent the resolving of
+        # promises in other devices. This is probably why a per-device thread is
+        # mentioned as a possible improvement in the PR that adds a similar
+        # mechanism to the Servo browser.
+
+        internal = self._internal  # just an int
+
+        def poll_func(block):
+            # This function has no direct or indirect refs to the device object, to avoid circular references
+            # H: WGPUBool f(WGPUDevice device, WGPUBool wait, WGPUSubmissionIndex const * submissionIndex)
+            libf.wgpuDevicePoll(internal, block, ffi.NULL)
+
+        self._poller = PollThread(poll_func)
+        self._poller.start()
+
+    def _poll(self, block=False):
         # Internal function
         if self._internal:
             # H: WGPUBool f(WGPUDevice device, WGPUBool wait, WGPUSubmissionIndex const * submissionIndex)
-            libf.wgpuDevicePoll(self._internal, False, ffi.NULL)
+            libf.wgpuDevicePoll(self._internal, block, ffi.NULL)
 
     def _poll_wait(self):
         if self._internal:
@@ -1940,6 +1968,7 @@ def create_compute_pipeline_async(
             "void(WGPUCreatePipelineAsyncStatus, WGPUComputePipeline, char *, void *, void *)"
         )
         def callback(status, result, c_message, _userdata1, _userdata2):
+            token.set_done()
             if status != lib.WGPUCreatePipelineAsyncStatus_Success:
                 msg = from_c_string_view(c_message)
                 promise._wgpu_set_error(
@@ -1965,10 +1994,11 @@ def handler(id):
             "create_compute_pipeline",
             handler,
             loop=self._loop,
-            poller=self._device._poll,
             keepalive=callback,
         )
 
+        token = self._device._poller.get_token()
+
         # H: WGPUFuture f(WGPUDevice device, WGPUComputePipelineDescriptor const * descriptor, WGPUCreateComputePipelineAsyncCallbackInfo callbackInfo)
         libf.wgpuDeviceCreateComputePipelineAsync(
             self._internal, descriptor, callback_info
         )
@@ -2065,6 +2095,7 @@ def create_render_pipeline_async(
             "void(WGPUCreatePipelineAsyncStatus, WGPURenderPipeline, WGPUStringView, void *, void *)"
         )
         def callback(status, result, c_message, _userdata1, _userdata2):
+            token.set_done()
             if status != lib.WGPUCreatePipelineAsyncStatus_Success:
                 msg = from_c_string_view(c_message)
                 promise._wgpu_set_error(
@@ -2090,10 +2121,11 @@ def handler(id):
             "create_render_pipeline",
             handler,
             loop=self._loop,
-            poller=self._device._poll,
             keepalive=callback,
         )
 
+        token = self._device._poller.get_token()
+
         # H: WGPUFuture f(WGPUDevice device, WGPURenderPipelineDescriptor const * descriptor, WGPUCreateRenderPipelineAsyncCallbackInfo callbackInfo)
         libf.wgpuDeviceCreateRenderPipelineAsync(
             self._internal,
@@ -2450,6 +2482,9 @@ def _release(self):
         if self._queue is not None:
             queue, self._queue = self._queue, None
             queue._release()
+        if self._poller is not None:
+            self._poller.stop()
+            self._poller = None
         super()._release()
 
@@ -2538,6 +2573,7 @@ def map_async(
 
         @ffi.callback("void(WGPUMapAsyncStatus, WGPUStringView, void *, void *)")
         def buffer_map_callback(status, c_message, _userdata1, _userdata2):
+            token.set_done()
             if status != lib.WGPUMapAsyncStatus_Success:
                 msg = from_c_string_view(c_message)
                 promise._wgpu_set_error(
@@ -2565,10 +2601,12 @@ def handler(_status):
             "buffer.map",
             handler,
             loop=self._device._loop,
-            poller=self._device._poll,
             keepalive=buffer_map_callback,
         )
 
+        token = self._device._poller.get_token()
+
         # Map it
         self._map_state = enums.BufferMapState.pending
         # H: WGPUFuture f(WGPUBuffer buffer, WGPUMapMode mode, size_t offset, size_t size, WGPUBufferMapCallbackInfo callbackInfo)
@@ -4109,6 +4146,7 @@ def read_texture(
     def on_submitted_work_done_async(self) -> GPUPromise[None]:
         @ffi.callback("void(WGPUQueueWorkDoneStatus, void *, void *)")
         def work_done_callback(status, _userdata1, _userdata2):
+            token.set_done()
             if status == lib.WGPUQueueWorkDoneStatus_Success:
                 promise._wgpu_set_input(True)
             else:
@@ -4138,10 +4176,11 @@ def handler(_value):
             "on_submitted_work_done",
             handler,
             loop=self._device._loop,
-            poller=self._device._poll_wait,
             keepalive=work_done_callback,
         )
 
+        token = self._device._poller.get_token()
+
         # H: WGPUFuture f(WGPUQueue queue, WGPUQueueWorkDoneCallbackInfo callbackInfo)
         libf.wgpuQueueOnSubmittedWorkDone(self._internal, work_done_callback_info)
diff --git a/wgpu/backends/wgpu_native/_poller.py b/wgpu/backends/wgpu_native/_poller.py
new file mode 100644
index 00000000..6b1beb00
--- /dev/null
+++ b/wgpu/backends/wgpu_native/_poller.py
@@ -0,0 +1,94 @@
+import threading
+
+
+class PollToken:
+    """A token for the poller, to be obtained via PollThread.get_token().
+
+    For as long as the token is active (alive and set_done() is not called),
+    the poll thread will keep polling.
+    """
+
+    def __init__(self, id, ids):
+        self._id = id
+        self._ids = ids
+
+    def set_done(self):
+        """Mark the token as done to relieve the poll thread of its polling work for this token."""
+        self._ids.discard(self._id)
+
+    def is_done(self):
+        return self._id not in self._ids
+
+    def __del__(self):
+        self.set_done()
+
+
+class PollThread(threading.Thread):
+    """A thread to poll the device, but only when needed.
+
+    The thread actively polls when there is work waiting. Compared to polling
+    periodically, this results in less battery drain, while having faster
+    responses. Inspired by the wgpu polling implementation in the Servo browser.
+
+    Relevant links:
+
+    * https://github.com/sagudev/servo/blob/main/components/webgpu/poll_thread.rs
+    * https://github.com/servo/servo/pull/32266
+    * https://bugzilla.mozilla.org/show_bug.cgi?id=1870699
+
+    """
+
+    def __init__(self, poll_func):
+        super().__init__()
+        self._poll_func = poll_func
+        self._token_ids = set()  # note that add and discard are atomic under the GIL
+        self._token_count = 0
+        self._token_id_lock = threading.Lock()
+        self._event = threading.Event()
+        self._shutdown = False
+        self.daemon = True  # don't let this thread prevent shutdown
+
+    def get_token(self):
+        """Wake the poll thread and get a PollToken.
+
+        The thread will keep polling the device until the token's ``set_done()``
+        method is called, or it is deleted by the garbage collector.
+        """
+        if self._shutdown:
+            raise RuntimeError("Cannot use PollThread because it has stopped.")
+
+        with self._token_id_lock:
+            self._token_count += 1
+            token_id = self._token_count
+
+        # First add the token id to the set, then wake the thread. The thread
+        # will now keep on polling (with block) until the token is done (removed
+        # from token_ids).
+        self._token_ids.add(token_id)
+        token = PollToken(token_id, self._token_ids)
+        self._event.set()
+
+        return token
+
+    def stop(self):
+        self._shutdown = True
+        self._poll_func = lambda _: None
+        self._token_ids.clear()
+        self._event.set()
+
+    def run(self):
+        """The thread logic."""
+        # No sleeps, just block waiting for the GPU to finish something, or waiting for a token to be created.
+
+        event = self._event
+        token_ids = self._token_ids
+
+        while not self._shutdown:
+            # Wait for a token to be created
+            event.wait()  # blocking
+            event.clear()
+            # Do one non-blocking call
+            self._poll_func(False)
+            # Keep polling until the tokens are all done
+            while token_ids:
+                self._poll_func(True)  # blocking
diff --git a/wgpu/resources/codegen_report.md b/wgpu/resources/codegen_report.md
index f87fb411..fc8ee5c9 100644
--- a/wgpu/resources/codegen_report.md
+++ b/wgpu/resources/codegen_report.md
@@ -21,7 +21,7 @@
 * Diffs for GPUQueue: add read_buffer, add read_texture, hide copy_external_image_to_texture
 * Validated 38 classes, 120 methods, 50 properties
 ### Patching API for backends/wgpu_native/_api.py
-* Validated 38 classes, 115 methods, 0 properties
+* Validated 38 classes, 117 methods, 0 properties
 ## Validating backends/wgpu_native/_api.py
 * Enum field FeatureName.core-features-and-limits missing in webgpu.h/wgpu.h
 * Enum field FeatureName.subgroups missing in webgpu.h/wgpu.h
@@ -40,6 +40,6 @@
 * Enum CanvasAlphaMode missing in webgpu.h/wgpu.h
 * Enum CanvasToneMappingMode missing in webgpu.h/wgpu.h
 * Wrote 255 enum mappings and 47 struct-field mappings to wgpu_native/_mappings.py
-* Validated 153 C function calls
+* Validated 154 C function calls
 * Not using 68 C functions
 * Validated 96 C structs