Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion plugboard/_zmq/zmq_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@
import zmq.asyncio


try:
from uvloop import run as _asyncio_run
except ImportError: # pragma: no cover
from asyncio import run as _asyncio_run

zmq_sockopts_t: _t.TypeAlias = list[tuple[int, int | bytes | str]]
ZMQ_ADDR: str = r"tcp://127.0.0.1"

Expand Down Expand Up @@ -162,7 +167,7 @@ def _start_proxy(
def _run_process(self) -> None:
"""Entry point for the child process."""
try:
asyncio.run(self._run())
_asyncio_run(self._run())
finally: # pragma: no cover
self._close()

Expand Down
10 changes: 8 additions & 2 deletions plugboard/utils/async_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@
import typing as _t


try:
from uvloop import run as _asyncio_run
except ImportError: # pragma: no cover
from asyncio import run as _asyncio_run


async def gather_except(*coros: _t.Coroutine) -> list[_t.Any]:
"""Attempts to gather the given coroutines, raising any exceptions."""
results = await asyncio.gather(*coros, return_exceptions=True)
Expand All @@ -16,7 +22,7 @@ async def gather_except(*coros: _t.Coroutine) -> list[_t.Any]:

def _run_coro_in_thread(coro: _t.Coroutine, timeout: _t.Optional[float] = None) -> _t.Any:
def _target() -> _t.Any:
return asyncio.run(coro)
return _asyncio_run(coro)

with ThreadPoolExecutor() as pool:
future = pool.submit(_target)
Expand All @@ -34,7 +40,7 @@ def run_coro_sync(coro: _t.Coroutine, timeout: _t.Optional[float] = None) -> _t.
loop = asyncio.get_running_loop()
except RuntimeError: # pragma: no cover
# No loop running in current thread
return asyncio.run(coro)
return _asyncio_run(coro)

if loop.is_running():
# Run coroutine in new thread with dedicated event loop.
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ dependencies = [
"structlog>=25.1.0,<26",
"that-depends>=3.4.1,<4",
"typer>=0.12,<1",
"uvloop>=0.21.0,<1 ; platform_system != 'Windows'",
]

[project.optional-dependencies]
Expand Down
8 changes: 8 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Configuration for the test suite."""

from abc import ABC, abstractmethod
from asyncio.events import BaseDefaultEventLoopPolicy
import multiprocessing
import os
import typing as _t
Expand All @@ -11,6 +12,7 @@
import pytest_cases
import ray
from that_depends import ContextScopes, container_context
import uvloop

from plugboard.component import Component, IOController as IO
from plugboard.component.io_controller import IOStreamClosedError
Expand All @@ -19,6 +21,12 @@
from plugboard.utils.settings import Settings


@pytest.fixture(scope="session")
def event_loop_policy() -> BaseDefaultEventLoopPolicy:
"""Set uvloop as the event loop policy for the test session."""
return uvloop.EventLoopPolicy()


@pytest.fixture(scope="session", autouse=True)
def mp_set_start_method() -> None:
"""Set the start method for multiprocessing to 'spawn'."""
Expand Down
6 changes: 3 additions & 3 deletions tests/integration/test_state_backend_multiprocess.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
"""Tests for the `StateBackend` classes that permit multiprocessing."""

import asyncio
import typing as _t

import pytest
import pytest_asyncio
from ray.util.multiprocessing import Pool
import uvloop

from plugboard.component import Component, IOController
from plugboard.connector import Connector, ZMQConnector
Expand Down Expand Up @@ -139,7 +139,7 @@ async def _inner() -> None:
await comp.init()
print("Component initialised.")

asyncio.run(_inner())
uvloop.run(_inner())

# At the end of `Component.init` the component upserts itself into the state
# backend, so we expect the state backend to have up to date component data afterwards
Expand All @@ -161,7 +161,7 @@ def upsert_connector(conn: Connector) -> None:
async def _inner() -> None:
await state_backend.upsert_connector(conn)

asyncio.run(_inner())
uvloop.run(_inner())

mp_processes = []
with Pool(2) as pool:
Expand Down
24 changes: 23 additions & 1 deletion uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading