diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 08dbe489..b87c2005 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -41,12 +41,10 @@ repos: entry: yamllint language: python types: [file, yaml] - args: ['-d', "{\ - extends: default,\ - rules: {\ - colons: { max-spaces-after: -1 }\ - }\ - }"] + args: [ + '-d', + "{ extends: default, rules: { colons: { max-spaces-after: -1 } } }", + ] - repo: https://github.com/rhysd/actionlint rev: v1.7.8 hooks: diff --git a/CHANGELOG.txt b/CHANGELOG.txt index 3906d9b5..51611e1c 100644 --- a/CHANGELOG.txt +++ b/CHANGELOG.txt @@ -1,5 +1,7 @@ -- Add argument to Session/Target send_command with_perf to return - timing information about browser write/read. +v1.3.0 +- Change to process group for better killing of multi-process chrome +- Add argument to Session/Target `send_command(..., *, with_perf: bool)` to + return timing information about browser write/read. - Update default chrome from 135.0.7011.0/1418433 to 144.0.7527.0/1544685 - Fix: New chrome takes longer/doesn't populate targets right away, so add a retry loop to populate targets diff --git a/pyproject.toml b/pyproject.toml index f01224a9..0cff85d9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -55,6 +55,7 @@ dev = [ "pytest-asyncio; python_version < '3.14'", "pytest-asyncio>=1.2.0; python_version >= '3.14'", "pytest-xdist", + "typing-extensions>=4.13.2", ] # uv doens't allow dependency groups to have separate python requirements diff --git a/src/choreographer/_brokers/_async.py b/src/choreographer/_brokers/_async.py index 66db45d7..bd2cd7b6 100644 --- a/src/choreographer/_brokers/_async.py +++ b/src/choreographer/_brokers/_async.py @@ -1,7 +1,6 @@ from __future__ import annotations import asyncio -import time import warnings from functools import partial from typing import TYPE_CHECKING @@ -147,7 +146,7 @@ def check_read_loop_error(result: asyncio.Future[Any]) -> None: async def read_loop() -> None: # noqa: PLR0912, PLR0915, C901 
loop = asyncio.get_running_loop() fn = partial(self._channel.read_jsons, blocking=True) - responses = await loop.run_in_executor( + responses, perf = await loop.run_in_executor( executor=self._executor, func=fn, ) @@ -222,6 +221,7 @@ async def read_loop() -> None: # noqa: PLR0912, PLR0915, C901 event_session.unsubscribe(query) elif key: + self.read_perfs[key] = perf _logger.debug(f"Have a response with key {key}") if key in self.futures: _logger.debug(f"Found future for key {key}") @@ -232,7 +232,6 @@ async def read_loop() -> None: # noqa: PLR0912, PLR0915, C901 raise RuntimeError(f"Couldn't find a future for key: {key}") if not future.done(): future.set_result(response) - self.read_perfs[key] = time.perf_counter() if len(self.write_perfs) > PERFS_MAX: self.write_perfs = dict( list(self.write_perfs.items())[TRIM_SIZE:], @@ -281,15 +280,14 @@ async def write_json( self.futures[key] = future _logger.debug(f"Created future: {key} {future}") try: - perf_start = time.perf_counter() async with self._write_lock: # this should be a queue not a lock loop = asyncio.get_running_loop() - await loop.run_in_executor( + perf = await loop.run_in_executor( self._executor, self._channel.write_json, obj, ) - self.write_perfs[key] = (perf_start, time.perf_counter()) + self.write_perfs[key] = perf except (_manual_thread_pool.ExecutorClosedError, asyncio.CancelledError) as e: if not future.cancel() or not future.cancelled(): await future # it wasn't canceled, so listen to it before raising diff --git a/src/choreographer/browser_async.py b/src/choreographer/browser_async.py index 8c141ed9..8330f1a6 100644 --- a/src/choreographer/browser_async.py +++ b/src/choreographer/browser_async.py @@ -1,5 +1,7 @@ """Provides the async api: `Browser`, `Tab`.""" +# hello, thank you for visiting + from __future__ import annotations import asyncio @@ -30,7 +32,8 @@ from .browsers._interface_type import BrowserImplInterface from .channels._interface_type import ChannelInterface -_N = MAX_POPULATE_LOOPS = 
20 + +MAX_POPULATE_LOOPS = 40 if "CI" in os.environ else 20 _logger = logistro.getLogger(__name__) @@ -172,7 +175,7 @@ def run() -> subprocess.Popen[bytes] | subprocess.Popen[str]: # depends on args await self.populate_targets() await asyncio.sleep(0.1) counter += 1 - if counter == MAX_POPULATE_LOOPS: + if counter >= MAX_POPULATE_LOOPS: break except (BrowserClosedError, BrowserFailedError, asyncio.CancelledError) as e: raise BrowserFailedError( diff --git a/src/choreographer/browsers/chromium.py b/src/choreographer/browsers/chromium.py index 447b3b1d..d3ff25e6 100644 --- a/src/choreographer/browsers/chromium.py +++ b/src/choreographer/browsers/chromium.py @@ -204,6 +204,8 @@ def get_popen_args(self) -> Mapping[str, Any]: if isinstance(self._channel, Pipe): args["stdin"] = self._channel.from_choreo_to_external args["stdout"] = self._channel.from_external_to_choreo + args["start_new_session"] = True + _logger.debug(f"Returning args: {args}") return args diff --git a/src/choreographer/channels/_interface_type.py b/src/choreographer/channels/_interface_type.py index c7482254..ad2dc4c4 100644 --- a/src/choreographer/channels/_interface_type.py +++ b/src/choreographer/channels/_interface_type.py @@ -14,7 +14,7 @@ class ChannelInterface(Protocol): """Defines the basic interface of a channel.""" # Not sure I like the obj type - def write_json(self, obj: Mapping[str, Any]) -> None: + def write_json(self, obj: Mapping[str, Any]) -> tuple[float, float]: ... # """ # Accept an object and send it doesnt the channel serialized. @@ -24,7 +24,11 @@ def write_json(self, obj: Mapping[str, Any]) -> None: # # """ - def read_jsons(self, *, blocking: bool = True) -> Sequence[BrowserResponse]: + def read_jsons( + self, + *, + blocking: bool = True, + ) -> tuple[Sequence[BrowserResponse], float]: ... # """ # Read all available jsons in the channel and returns a list of complete ones. 
diff --git a/src/choreographer/channels/pipe.py b/src/choreographer/channels/pipe.py index c31751cb..f7f22cf8 100644 --- a/src/choreographer/channels/pipe.py +++ b/src/choreographer/channels/pipe.py @@ -5,6 +5,7 @@ import os import platform import sys +import time import warnings from threading import Lock from typing import TYPE_CHECKING @@ -78,7 +79,7 @@ def open(self) -> None: if not self._open_lock.acquire(blocking=False): raise RuntimeError("Cannot open same pipe twice.") - def write_json(self, obj: Mapping[str, Any]) -> None: + def write_json(self, obj: Mapping[str, Any]) -> tuple[float, float]: """ Send one json down the pipe. @@ -97,6 +98,7 @@ def write_json(self, obj: Mapping[str, Any]) -> None: f"size: {len(encoded_message)}.", ) _logger.debug2(f"Full Message: {encoded_message!r}") + start = time.perf_counter() try: ret = os.write(self._write_to_browser, encoded_message) _logger.debug( @@ -109,12 +111,13 @@ def write_json(self, obj: Mapping[str, Any]) -> None: except OSError as e: self.close() raise ChannelClosedError from e + return (start, time.perf_counter()) def read_jsons( # noqa: PLR0912, PLR0915, C901 branches, complexity self, *, blocking: bool = True, - ) -> Sequence[BrowserResponse]: + ) -> tuple[Sequence[BrowserResponse], float]: """ Read from the pipe and return one or more jsons in a list. 
@@ -168,7 +171,7 @@ def read_jsons( # noqa: PLR0912, PLR0915, C901 branches, complexity raw_buffer += os.read(self._read_from_browser, 10000) except BlockingIOError: _logger.debug("BlockingIOError") - return jsons + return jsons, time.perf_counter() except OSError as e: _logger.debug("OSError") self.close() @@ -182,7 +185,7 @@ def read_jsons( # noqa: PLR0912, PLR0915, C901 branches, complexity ) _logger.debug2(f"Whole buffer: {raw_buffer!r}") if raw_buffer is None: - return jsons + return jsons, time.perf_counter() decoded_buffer = raw_buffer.decode("utf-8") raw_messages = decoded_buffer.split("\0") _logger.debug(f"Received {len(raw_messages)} raw_messages.") @@ -195,7 +198,7 @@ def read_jsons( # noqa: PLR0912, PLR0915, C901 branches, complexity except: _logger.exception("Error in trying to decode JSON off our read.") raise - return jsons + return jsons, time.perf_counter() def _unblock_fd(self, fd: int) -> None: try: diff --git a/src/choreographer/cli/_cli_utils_no_qa.py b/src/choreographer/cli/_cli_utils_no_qa.py index 0e83427d..c8058d07 100644 --- a/src/choreographer/cli/_cli_utils_no_qa.py +++ b/src/choreographer/cli/_cli_utils_no_qa.py @@ -73,7 +73,7 @@ def diagnose() -> None: b._browser_impl.pre_open() cli = b._browser_impl.get_cli() env = b._browser_impl.get_env() # noqa: F841 - args = b._browser_impl.get_popen_args() + pargs = b._browser_impl.get_popen_args() b._browser_impl.clean() del b print("*** cli:") @@ -86,7 +86,7 @@ def diagnose() -> None: # print(" " * 8 + f"{k}:{v}") print("*** Popen args:") - for k, v in args.items(): + for k, v in pargs.items(): print(" " * 8 + f"{k}:{v}") print("*".center(50, "*")) print("VERSION INFO:".center(50, "*")) diff --git a/src/choreographer/protocol/devtools_async.py b/src/choreographer/protocol/devtools_async.py index 96688a1f..dd2d3a55 100644 --- a/src/choreographer/protocol/devtools_async.py +++ b/src/choreographer/protocol/devtools_async.py @@ -274,7 +274,12 @@ async def send_command( if not 
self.sessions.values(): raise RuntimeError("Cannot send_command without at least one valid session") session = self.get_session() - return await session.send_command(command, params, with_perf=with_perf) + # mypy does not treat `bool` as `Literal[True, False]`, so pass each literal explicitly + # the branch is redundant at runtime, but it keeps the type checker quiet + if with_perf: + return await session.send_command(command, params, with_perf=True) + else: + return await session.send_command(command, params, with_perf=False) async def create_session(self) -> Session: """Create a new session on this target.""" diff --git a/src/choreographer/protocol/devtools_async_helpers.py b/src/choreographer/protocol/devtools_async_helpers.py index a85ac8f0..a40e6b04 100644 --- a/src/choreographer/protocol/devtools_async_helpers.py +++ b/src/choreographer/protocol/devtools_async_helpers.py @@ -76,7 +76,7 @@ async def navigate_and_wait( load_future = temp_session.subscribe_once("Page.loadEventFired") try: - async def _freezers(): + async def _freezers() -> None: # If no resolve, will freeze await temp_session.send_command("Page.navigate", params={"url": url}) # Can freeze if resolve bad diff --git a/src/choreographer/pyrightconfig.json b/src/choreographer/pyrightconfig.json index 0102dcd9..263583bc 100644 --- a/src/choreographer/pyrightconfig.json +++ b/src/choreographer/pyrightconfig.json @@ -1,3 +1,3 @@ { - "typeCheckingMode": "strict" + "typeCheckingMode": "strict", } diff --git a/src/choreographer/utils/_kill.py b/src/choreographer/utils/_kill.py index 17d4cfc7..67512d5b 100644 --- a/src/choreographer/utils/_kill.py +++ b/src/choreographer/utils/_kill.py @@ -1,26 +1,43 @@ from __future__ import annotations +import os import platform import subprocess import logistro +if (_system := platform.system()) != "Windows": + import signal + _logger = logistro.getLogger(__name__) def kill(process: subprocess.Popen[bytes] | subprocess.Popen[str]) -> None: - if platform.system() == "Windows": + if _system == "Windows": subprocess.call( # noqa: S603, 
false positive, input fine ["taskkill", "/F", "/T", "/PID", str(process.pid)], # noqa: S607 windows full path... stderr=subprocess.DEVNULL, stdout=subprocess.DEVNULL, timeout=6, ) - else: + return + + try: + os.killpg( + process.pid, + signal.SIGTERM, # type: ignore[reportPossiblyUnboundVariable] + ) + except ProcessLookupError: process.terminate() - _logger.debug("Called terminate (a light kill).") + _logger.debug("Called terminate (a light kill).") + try: + process.wait(timeout=6) + except subprocess.TimeoutExpired: + _logger.debug("Calling kill (a heavy kill).") try: - process.wait(timeout=6) - except subprocess.TimeoutExpired: - _logger.debug("Calling kill (a heavy kill).") + os.killpg( + process.pid, + signal.SIGKILL, # type: ignore[reportPossiblyUnboundVariable] + ) + except ProcessLookupError: process.kill() diff --git a/src/choreographer/utils/_manual_thread_pool.py b/src/choreographer/utils/_manual_thread_pool.py index 90fd35b0..0b71f738 100644 --- a/src/choreographer/utils/_manual_thread_pool.py +++ b/src/choreographer/utils/_manual_thread_pool.py @@ -1,9 +1,24 @@ +from __future__ import annotations + import queue import threading from concurrent.futures import Executor, Future +from typing import TYPE_CHECKING import logistro +if TYPE_CHECKING: + from typing import Any, Callable, TypeVar + + try: + from typing import ParamSpec + except ImportError: + from typing_extensions import ParamSpec + + _P = ParamSpec("_P") # Runtime special generic that gives you access to fn sig + _T = TypeVar("_T") + + _logger = logistro.getLogger(__name__) @@ -12,8 +27,22 @@ class ExecutorClosedError(RuntimeError): class ManualThreadExecutor(Executor): - def __init__(self, *, max_workers=2, daemon=True, name="manual-exec"): - self._q = queue.Queue() + def __init__( + self, + *, + max_workers: int = 2, + daemon: bool = True, + name: str = "manual-exec", + ) -> None: + self._q: queue.Queue[ + tuple[ # could be typed more specifically if singleton @ submit() + Callable[..., Any], 
+ Any, + Any, + Future[Any], + ] + | None + ] = queue.Queue() self._stop = False self._threads = [] self.name = name @@ -26,7 +55,7 @@ def __init__(self, *, max_workers=2, daemon=True, name="manual-exec"): t.start() self._threads.append(t) - def _worker(self): + def _worker(self) -> None: while True: item = self._q.get() if item is None: # sentinel @@ -41,22 +70,36 @@ def _worker(self): fut.set_result(res) self._q.task_done() - def submit(self, fn, *args, **kwargs): - fut = Future() + # _P and _T are generic so the returned Future carries fn's return type + def submit( + self, + fn: Callable[_P, _T], + /, + *args: _P.args, + **kwargs: _P.kwargs, + ) -> Future[_T]: + fut: Future[_T] = Future() if self._stop: fut.set_exception(ExecutorClosedError("Cannot submit tasks.")) return fut self._q.put((fn, args, kwargs, fut)) return fut - def shutdown(self, wait=True, *, cancel_futures=False): # noqa: FBT002 overriding, can't change args + def shutdown( + self, + wait: bool = True, # noqa: FBT001, FBT002 overriding, can't change args + *, + cancel_futures: bool = False, + ) -> None: self._stop = True if cancel_futures: # Drain queue and cancel pending try: while True: - _, _, _, fut = self._q.get_nowait() - fut.cancel() + full = self._q.get_nowait() + if full is not None: + _, _, _, fut = full + fut.cancel() self._q.task_done() except queue.Empty: pass diff --git a/uv.lock b/uv.lock index f288e144..453e2b08 100644 --- a/uv.lock +++ b/uv.lock @@ -58,6 +58,8 @@ dev = [ { name = "pytest-xdist", version = "3.8.0", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.9'" }, { name = "types-simplejson", version = "3.19.0.20241221", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.9'" }, { name = "types-simplejson", version = "3.20.0.20250822", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.9'" }, + { name = "typing-extensions", version = "4.13.2", source = { registry = "https://pypi.org/simple" }, marker = 
"python_full_version < '3.9'" }, + { name = "typing-extensions", version = "4.15.0", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.9'" }, ] [package.metadata] @@ -80,6 +82,7 @@ dev = [ { name = "pytest-asyncio", marker = "python_full_version >= '3.14'", specifier = ">=1.2.0" }, { name = "pytest-xdist" }, { name = "types-simplejson", specifier = ">=3.19.0.20241221" }, + { name = "typing-extensions", specifier = ">=4.13.2" }, ] [[package]]