diff --git a/CI_REQUIREMENTS.txt b/CI_REQUIREMENTS.txt index e27efef..2c2374a 100644 --- a/CI_REQUIREMENTS.txt +++ b/CI_REQUIREMENTS.txt @@ -1,3 +1 @@ typing >= 3.6 ; python_version < "3.8" -gevent >= 1.2, <1.3.0 ; platform_python_implementation == "PyPy" -gevent >= 1.2 ; platform_python_implementation != "PyPy" diff --git a/README.rst b/README.rst index 08f02f0..3c7d071 100644 --- a/README.rst +++ b/README.rst @@ -3,6 +3,9 @@ threaded .. image:: https://travis-ci.org/python-useful-helpers/threaded.svg?branch=master :target: https://travis-ci.org/python-useful-helpers/threaded +.. image:: https://dev.azure.com/python-useful-helpers/threaded/_apis/build/status/python-useful-helpers.threaded?branchName=master + :alt: Azure DevOps builds + :target: https://dev.azure.com/python-useful-helpers/threaded/_build?definitionId=3 .. image:: https://coveralls.io/repos/github/python-useful-helpers/threaded/badge.svg?branch=master :target: https://coveralls.io/github/python-useful-helpers/threaded?branch=master .. image:: https://readthedocs.org/projects/threaded/badge/?version=latest @@ -58,9 +61,6 @@ Decorators: * `AsyncIOTask` - wrap in ``asyncio.Task``. Uses the same API, as `ThreadPooled`. * `asynciotask` is alias for `AsyncIOTask`. -* `GThreadPooled` - wrap function in ``gevent.threadpool.ThreadPool``. -* `gthreadpooled` is alias for `GThreadPooled`. - .. note:: gevent is not in default package requirements. @@ -214,32 +214,6 @@ Usage with loop extraction from call arguments: loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait_for(func(loop), timeout)) -GThreadPooled -------------- -Post function to ``gevent.threadpool.ThreadPool``. - -.. code-block:: python - - threaded.GThreadPooled.configure(max_workers=3) - -.. note:: - - By default, if executor is not configured - it configures with default parameters: ``max_workers=CPU_COUNT * 5`` - -.. note:: - - Instead of standard ThreadPoolExecutor, gevent pool is not re-created during re-configuration. - -Basic usage example: - -.. code-block:: python - - @threaded.GThreadPooled - def func(): - pass - - func().wait() - Testing ======= The main test mechanism for the package `threaded` is using `tox`. @@ -253,6 +227,8 @@ For code checking several CI systems is used in parallel: 2. `coveralls: `_ is used for coverage display. +3. `Azure CI: `_ is used for functional tests on Windows. + CD system ========= `Travis CI: `_ is used for package delivery on PyPI. diff --git a/azure-pipelines.yml b/azure-pipelines.yml index a73bb63..7562b31 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -18,7 +18,6 @@ jobs: python -m pip install --upgrade pip pip install -U setuptools pip install -r requirements.txt - pip install -r CI_REQUIREMENTS.txt displayName: 'Install dependencies' - script: | @@ -35,22 +34,22 @@ jobs: - template: .azure_pipelines/run_tests.yml parameters: {name: 'Python_37', python: '3.7', architecture: 'x64', kind: 'native'} -#- template: .azure_pipelines/run_tests.yml -# parameters: {name: 'Python_34', python: '3.4', architecture: 'x64', kind: 'cython'} -#- template: .azure_pipelines/run_tests.yml -# parameters: {name: 'Python_34', python: '3.4', architecture: 'x86', kind: 'cython'} -#- template: .azure_pipelines/run_tests.yml -# parameters: {name: 'Python_35', python: '3.5', architecture: 'x64', kind: 'cython'} -#- template: .azure_pipelines/run_tests.yml -# parameters: {name: 'Python_35', python: '3.5', architecture: 'x86', kind: 'cython'} -#- template: .azure_pipelines/run_tests.yml -# parameters: {name: 'Python_36', python: '3.6', architecture: 'x64', kind: 'cython'} -#- template: .azure_pipelines/run_tests.yml -# parameters: {name: 'Python_36', python: '3.6', architecture: 'x86', kind: 'cython'} -#- template: .azure_pipelines/run_tests.yml -# parameters: {name: 'Python_37', python: '3.7', architecture: 'x64', kind: 'cython'} -#- template: .azure_pipelines/run_tests.yml -# parameters: {name: 'Python_37', python: '3.7', architecture: 'x86', kind: 'cython'} +- template: .azure_pipelines/run_tests.yml + parameters: {name: 'Python_34', python: '3.4', architecture: 'x64', kind: 'cython'} +- template: .azure_pipelines/run_tests.yml + parameters: {name: 'Python_34', python: '3.4', architecture: 'x86', kind: 'cython'} +- template: .azure_pipelines/run_tests.yml + parameters: {name: 'Python_35', python: '3.5', architecture: 'x64', kind: 'cython'} +- template: .azure_pipelines/run_tests.yml + parameters: {name: 'Python_35', python: '3.5', architecture: 'x86', kind: 'cython'} +- template: .azure_pipelines/run_tests.yml + parameters: {name: 'Python_36', python: '3.6', architecture: 'x64', kind: 'cython'} +- template: .azure_pipelines/run_tests.yml + parameters: {name: 'Python_36', python: '3.6', architecture: 'x86', kind: 'cython'} +- template: .azure_pipelines/run_tests.yml + parameters: {name: 'Python_37', python: '3.7', architecture: 'x64', kind: 'cython'} +- template: .azure_pipelines/run_tests.yml + parameters: {name: 'Python_37', python: '3.7', architecture: 'x86', kind: 'cython'} # #- job: 'Build_and_deploy' # dependsOn: diff --git a/doc/source/gthreadpooled.rst b/doc/source/gthreadpooled.rst deleted file mode 100644 index aa94e2c..0000000 --- a/doc/source/gthreadpooled.rst +++ /dev/null @@ -1,57 +0,0 @@ -.. GThreadPooled, gthreadpooled. - -API: Decorators: `GThreadPooled`, `gthreadpooled`. -================================================== - -.. py:module:: pooled -.. py:currentmodule:: pooled - -.. py:class:: GThreadPooled - - Post function to gevent.threadpool.ThreadPool. - - .. py:method:: __init__(func, ) - - :param func: function to wrap - :type func: typing.Optional[typing.Callable[..., typing.Union[typing.Any, typing.Awaitable]]] - - .. note:: Attributes is read-only - - .. py:attribute:: executor - - ``gevent.threadpool.ThreadPool`` instance. Class-wide. - - .. py:attribute:: _func - - ``typing.Optional[typing.Callable[..., typing.Union[typing.Any, typing.Awaitable]]]`` - Wrapped function. Used for inheritance only. - - .. py:classmethod:: configure(max_workers=None, hub=None) - - Pool executor create and configure. - - :param max_workers: Maximum workers - :type max_workers: typing.Optional[int] - :param hub: Event-loop hub - :type hub: typing.Optional[gevent.hub.Hub] - - .. note:: max_workers=None means `CPU_COUNT * 5`, it's default value. - - .. py:classmethod:: shutdown - - Shutdown executor. - - .. py:method:: __call__(*args, **kwargs) - - Decorator entry point. - - :rtype: typing.Union[gevent.event.AsyncResult, typing.Callable[..., gevent.event.AsyncResult]] - - -.. py:function:: gthreadpooled(func, ) - - Post function to gevent.threadpool.ThreadPool. - - :param func: function to wrap - :type func: typing.Optional[typing.Callable[..., typing.Union[typing.Any, typing.Awaitable]]] - :rtype: typing.Union[GThreadPooled, typing.Callable[..., gevent.event.AsyncResult]] diff --git a/doc/source/index.rst b/doc/source/index.rst index 3db7be8..a9d1436 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -13,7 +13,6 @@ Contents: threadpooled threaded asynciotask - gthreadpooled Indices and tables ================== diff --git a/setup.py b/setup.py index 088d78f..fe8635b 100644 --- a/setup.py +++ b/setup.py @@ -32,10 +32,8 @@ # noinspection PyPackageRequirements from Cython.Build import cythonize - # noinspection PyPackageRequirements - import gevent except ImportError: - gevent = cythonize = None + cythonize = None with open(os.path.join(os.path.dirname(__file__), "threaded", "__init__.py")) as f: @@ -63,7 +61,6 @@ def _extension(modpath: str) -> setuptools.Extension: if "win32" != sys.platform: requires_optimization.append(_extension("threaded.__init__")) - requires_optimization.append(_extension("threaded._gthreadpooled")) # noinspection PyCallingNonCallable ext_modules = ( @@ -73,7 +70,7 @@ def _extension(modpath: str) -> setuptools.Extension: always_allow_keywords=True, binding=True, embedsignature=True, overflowcheck=True, language_level=3 ), ) - if cythonize is not None and "win32" != sys.platform + if cythonize is not None else [] ) @@ -216,7 +213,7 @@ def get_simple_vars_from_src( "Programming Language :: Python :: Implementation :: PyPy", ] -keywords = ["pooling", "multithreading", "threading", "asyncio", "gevent", "development"] +keywords = ["pooling", "multithreading", "threading", "asyncio", "development"] setup_args = dict( name="threaded", @@ -245,7 +242,6 @@ def get_simple_vars_from_src( "setuptools_scm", ], use_scm_version=True, - extras_require={"gevent": ["gevent >= 1.2.2"]}, install_requires=required, package_data={"threaded": ["py.typed"]}, ) diff --git a/test/async_syntax/test_gevent_threadpooled_async.py b/test/async_syntax/test_gevent_threadpooled_async.py deleted file mode 100644 index e7f6f5e..0000000 --- a/test/async_syntax/test_gevent_threadpooled_async.py +++ /dev/null @@ -1,38 +0,0 @@ -# Copyright 2017 Alexey Stepanov aka penguinolog -## -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import threading -import unittest - -try: - import gevent - import gevent.threadpool -except ImportError: - gevent = None - -import threaded - - -@unittest.skipIf(gevent is None, "No gevent") -class TestThreadPooled(unittest.TestCase): - def tearDown(self): - threaded.GThreadPooled.shutdown() - - def test_thread_pooled_default_async(self): - @threaded.gthreadpooled - async def test(): - return threading.current_thread().name - - pooled_name = test().wait() - self.assertNotEqual(pooled_name, threading.current_thread().name) diff --git a/test/test_gevent_threadpooled.py b/test/test_gevent_threadpooled.py deleted file mode 100644 index 19914d4..0000000 --- a/test/test_gevent_threadpooled.py +++ /dev/null @@ -1,76 +0,0 @@ -# Copyright 2017 Alexey Stepanov aka penguinolog -## -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import os -import threading -import unittest - -try: - import gevent - import gevent.threadpool -except ImportError: - gevent = None - -import threaded - - -@unittest.skipIf(gevent is None, "No gevent") -class TestGeventThreadPooled(unittest.TestCase): - def tearDown(self): - threaded.GThreadPooled.shutdown() - - def test_thread_pooled_default(self): - @threaded.gthreadpooled - def test(): - return threading.current_thread().name - - pooled_name = test().wait() - self.assertNotEqual(pooled_name, threading.current_thread().name) - - def test_thread_pooled_construct(self): - @threaded.gthreadpooled() - def test(): - return threading.current_thread().name - - pooled_name = test().wait() - self.assertNotEqual(pooled_name, threading.current_thread().name) - - def test_thread_pooled_config(self): - thread_pooled = threaded.gthreadpooled() - thread_pooled.configure() - - self.assertEqual(thread_pooled.executor.maxsize, (os.cpu_count() or 1) * 5) - - thread_pooled.configure(max_workers=2) - - @thread_pooled - def test(): - return threading.current_thread().name - - pooled_name = test().wait() - self.assertNotEqual(pooled_name, threading.current_thread().name) - self.assertEqual(thread_pooled.executor.maxsize, 2) - - def test_reconfigure(self): - """Gevent worker instance is re-used.""" - thread_pooled = threaded.gthreadpooled() - executor = thread_pooled.executor - - thread_pooled.configure(max_workers=executor.maxsize) - self.assertIs(executor, thread_pooled.executor) - - old_size = executor.maxsize - thread_pooled.configure(max_workers=old_size + 1) - self.assertIs(executor, thread_pooled.executor) - self.assertEqual(executor.maxsize, old_size + 1) diff --git a/threaded/__init__.py b/threaded/__init__.py index 96b63c3..c110935 100644 --- a/threaded/__init__.py +++ b/threaded/__init__.py @@ -23,13 +23,6 @@ from ._threaded import Threaded, threaded from ._threadpooled import ThreadPooled, threadpooled - -try: # pragma: no cover - from ._gthreadpooled import GThreadPooled, gthreadpooled -except ImportError: # pragma: no cover - GThreadPooled = gthreadpooled = None # type: ignore -# pylint: enable=no-name-in-module - try: __version__ = pkg_resources.get_distribution(__name__).version except pkg_resources.DistributionNotFound: @@ -51,9 +44,6 @@ "asynciotask", ) # type: typing.Tuple[str, ...] -if GThreadPooled is not None: # pragma: no cover - __all__ += ("GThreadPooled", "gthreadpooled") - __author__ = "Alexey Stepanov" __author_email__ = "penguinolog@gmail.com" __maintainers__ = { diff --git a/threaded/_gthreadpooled.py b/threaded/_gthreadpooled.py deleted file mode 100644 index 88c1ed8..0000000 --- a/threaded/_gthreadpooled.py +++ /dev/null @@ -1,144 +0,0 @@ -# Copyright 2017-2018 Alexey Stepanov aka penguinolog -## -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -"""Python 3 threaded implementation. - -Asyncio is supported -""" - -import functools -import os -import typing - -import gevent.event # type: ignore -import gevent.threadpool # type: ignore - -from . import _base_threaded - -__all__ = ("GThreadPooled", "gthreadpooled") - - -class GThreadPooled(_base_threaded.APIPooled): - """Post function to ThreadPoolExecutor.""" - - __slots__ = () - - __executor = None # type: typing.Optional[gevent.threadpool.ThreadPool] - - @classmethod - def configure( # pylint: disable=arguments-differ - cls: typing.Type["GThreadPooled"], - max_workers: typing.Optional[int] = None, - hub: typing.Optional[gevent.hub.Hub] = None, - ) -> None: - """Pool executor create and configure. - - :param max_workers: Maximum workers - :type max_workers: typing.Optional[int] - :param hub: Event-loop hub - :type hub: typing.Optional[gevent.hub.Hub] - """ - if max_workers is None: - max_workers = os.cpu_count() * 5 # type: ignore - - if isinstance(cls.__executor, gevent.threadpool.ThreadPool): - if hub is None or hub == cls.__executor.hub: - if max_workers == cls.__executor.maxsize: - return # Nothing to change) - cls.__executor.maxsize = max_workers # We can use internals - return - # Hub change. Very special case. - cls.__executor.kill() # pragma: no cover - - cls.__executor = gevent.threadpool.ThreadPool(maxsize=max_workers, hub=hub) - - @classmethod - def shutdown(cls: typing.Type["GThreadPooled"]) -> None: - """Shutdown executor. - - Due to not implemented method, set maxsize to 0 (do not accept new). - """ - if cls.__executor is not None: - cls.__executor.kill() - - @property - def executor(self) -> gevent.threadpool.ThreadPool: - """Executor instance. - - :rtype: gevent.threadpool.ThreadPool - """ - if not isinstance(self.__executor, gevent.threadpool.ThreadPool): - self.configure() - return self.__executor - - def _get_function_wrapper( - self, func: typing.Callable[..., typing.Union["typing.Awaitable", typing.Any]] - ) -> typing.Callable[..., gevent.event.AsyncResult]: - """Here should be constructed and returned real decorator. - - :param func: Wrapped function - :type func: typing.Callable[..., typing.Union[typing.Awaitable, typing.Any] - :return: wrapped coroutine or function - :rtype: typing.Callable[..., gevent.event.AsyncResult] - """ - prepared = self._await_if_required(func) - - # noinspection PyMissingOrEmptyDocstring - @functools.wraps(prepared) # pylint: disable=missing-docstring - def wrapper(*args, **kwargs): # type: (typing.Any, typing.Any) -> gevent.event.AsyncResult - return self.executor.spawn(prepared, *args, **kwargs) - - return wrapper - - def __call__( # pylint: disable=useless-super-delegation - self, - *args: typing.Union[typing.Callable[..., typing.Union["typing.Awaitable", typing.Any]], typing.Any], - **kwargs: typing.Any - ) -> typing.Union[gevent.event.AsyncResult, typing.Callable[..., gevent.event.AsyncResult]]: - """Callable instance.""" - return super(GThreadPooled, self).__call__(*args, **kwargs) - - -# pylint: disable=function-redefined, unused-argument -@typing.overload -def gthreadpooled( - func: typing.Callable[..., typing.Union["typing.Awaitable", typing.Any]] -) -> typing.Callable[..., gevent.event.AsyncResult]: - """Overloaded: func provided.""" - pass # pragma: no cover - - -@typing.overload # noqa: F811 -def gthreadpooled(func: None = None) -> GThreadPooled: - """Overloaded: Func is None.""" - pass # pragma: no cover - - -# pylint: enable=unused-argument -def gthreadpooled( # noqa: F811 - func: typing.Optional[typing.Callable[..., typing.Union["typing.Awaitable", typing.Any]]] = None -) -> typing.Union[GThreadPooled, typing.Callable[..., gevent.event.AsyncResult]]: - """Post function to gevent.threadpool.ThreadPool. - - :param func: function to wrap - :type func: typing.Optional[typing.Callable] - :return: GThreadPooled instance, if called as function or argumented decorator, else callable wrapper - :rtype: typing.Union[GThreadPooled, typing.Callable[..., gevent.event.AsyncResult]] - """ - if func is None: - return GThreadPooled(func=func) - return GThreadPooled(func=None)(func) - - -# pylint: enable=function-redefined