From a9e96f99077865bc2d844ecf0ae174db97ba6d8b Mon Sep 17 00:00:00 2001 From: Andrey Rakhmatullin Date: Sat, 3 Apr 2021 17:40:45 +0500 Subject: [PATCH 1/9] Add typing for middleware and coroutine related code. --- scrapy/core/downloader/middleware.py | 12 +++++++---- scrapy/core/spidermw.py | 31 ++++++++++++++++++---------- scrapy/middleware.py | 11 +++++----- scrapy/utils/asyncgen.py | 5 ++++- scrapy/utils/defer.py | 23 ++++++++++++--------- scrapy/utils/python.py | 7 ++++--- 6 files changed, 55 insertions(+), 34 deletions(-) diff --git a/scrapy/core/downloader/middleware.py b/scrapy/core/downloader/middleware.py index b0e612e43df..441fc9fa6ce 100644 --- a/scrapy/core/downloader/middleware.py +++ b/scrapy/core/downloader/middleware.py @@ -3,8 +3,12 @@ See documentation in docs/topics/downloader-middleware.rst """ +from typing import Callable, Union + from twisted.internet import defer +from twisted.python.failure import Failure +from scrapy import Spider from scrapy.exceptions import _InvalidOutput from scrapy.http import Request, Response from scrapy.middleware import MiddlewareManager @@ -29,9 +33,9 @@ def _add_middleware(self, mw): if hasattr(mw, 'process_exception'): self.methods['process_exception'].appendleft(mw.process_exception) - def download(self, download_func, request, spider): + def download(self, download_func: Callable, request: Request, spider: Spider): @defer.inlineCallbacks - def process_request(request): + def process_request(request: Request): for method in self.methods['process_request']: response = yield deferred_from_coro(method(request=request, spider=spider)) if response is not None and not isinstance(response, (Response, Request)): @@ -45,7 +49,7 @@ def process_request(request): return (yield download_func(request=request, spider=spider)) @defer.inlineCallbacks - def process_response(response): + def process_response(response: Union[Response, Request]): if response is None: raise TypeError("Received None in process_response") elif isinstance(response, Request): @@ -64,7 +68,7 @@ def process_response(response): return response @defer.inlineCallbacks - def process_exception(failure): + def process_exception(failure: Failure): exception = failure.value for method in self.methods['process_exception']: response = yield deferred_from_coro(method(request=request, exception=exception, spider=spider)) diff --git a/scrapy/core/spidermw.py b/scrapy/core/spidermw.py index 289292da7a3..b09adf8e26f 100644 --- a/scrapy/core/spidermw.py +++ b/scrapy/core/spidermw.py @@ -3,25 +3,32 @@ See documentation in docs/topics/spider-middleware.rst """ +from collections.abc import Iterable, AsyncIterable from itertools import islice +from typing import Callable, Union, Any from twisted.python.failure import Failure +from scrapy import Request, Spider from scrapy.exceptions import _InvalidOutput +from scrapy.http import Response from scrapy.middleware import MiddlewareManager from scrapy.utils.conf import build_component_list from scrapy.utils.defer import mustbe_deferred from scrapy.utils.python import MutableChain -def _isiterable(possible_iterator): - return hasattr(possible_iterator, '__iter__') +def _isiterable(o): + return isinstance(o, Iterable) def _fname(f): return f"{f.__self__.__class__.__name__}.{f.__func__.__name__}" +ScrapeFunc = Callable[[Union[Response, Failure], Request, Spider], Any] + + class SpiderMiddlewareManager(MiddlewareManager): component_name = 'spider middleware' @@ -41,7 +48,7 @@ def _add_middleware(self, mw): process_spider_exception = getattr(mw, 'process_spider_exception', None) self.methods['process_spider_exception'].appendleft(process_spider_exception) - def _process_spider_input(self, scrape_func, response, request, spider): + def _process_spider_input(self, scrape_func: ScrapeFunc, response: Response, request: Request, spider: Spider): for method in self.methods['process_spider_input']: try: result = method(response=response, spider=spider) @@ -55,7 +62,8 @@ def _process_spider_input(self, scrape_func, response, request, spider): return scrape_func(Failure(), request, spider) return scrape_func(response, request, spider) - def _evaluate_iterable(self, response, spider, iterable, exception_processor_index, recover_to): + def _evaluate_iterable(self, response: Response, spider: Spider, iterable: Iterable, + exception_processor_index: int, recover_to: MutableChain): try: for r in iterable: yield r @@ -66,7 +74,7 @@ def _evaluate_iterable(self, response, spider, iterable, exception_processor_ind raise recover_to.extend(exception_result) - def _process_spider_exception(self, response, spider, _failure, start_index=0): + def _process_spider_exception(self, response: Response, spider: Spider, _failure: Failure, start_index=0): exception = _failure.value # don't handle _InvalidOutput exception if isinstance(exception, _InvalidOutput): @@ -88,7 +96,8 @@ def _process_spider_exception(self, response, spider, _failure, start_index=0): raise _InvalidOutput(msg) return _failure - def _process_spider_output(self, response, spider, result, start_index=0): + def _process_spider_output(self, response: Response, spider: Spider, + result: Iterable, start_index=0): # items in this iterable do not need to go through the process_spider_output # chain, they went through it already from the process_spider_exception method recovered = MutableChain() @@ -114,21 +123,21 @@ def _process_spider_output(self, response, spider, result, start_index=0): return MutableChain(result, recovered) - def _process_callback_output(self, response, spider, result): + def _process_callback_output(self, response: Response, spider: Spider, result: Iterable): recovered = MutableChain() result = self._evaluate_iterable(response, spider, result, 0, recovered) return MutableChain(self._process_spider_output(response, spider, result), recovered) - def scrape_response(self, scrape_func, response, request, spider): - def process_callback_output(result): + def scrape_response(self, scrape_func: ScrapeFunc, response: Response, request: Request, spider: Spider): + def process_callback_output(result: Iterable): return self._process_callback_output(response, spider, result) - def process_spider_exception(_failure): + def process_spider_exception(_failure: Failure): return self._process_spider_exception(response, spider, _failure) dfd = mustbe_deferred(self._process_spider_input, scrape_func, response, request, spider) dfd.addCallbacks(callback=process_callback_output, errback=process_spider_exception) return dfd - def process_start_requests(self, start_requests, spider): + def process_start_requests(self, start_requests, spider: Spider): return self._process_chain('process_start_requests', start_requests, spider) diff --git a/scrapy/middleware.py b/scrapy/middleware.py index 5040378eaab..c53cfb81459 100644 --- a/scrapy/middleware.py +++ b/scrapy/middleware.py @@ -1,6 +1,7 @@ -from collections import defaultdict, deque import logging import pprint +from collections import defaultdict, deque +from typing import Callable from scrapy.exceptions import NotConfigured from scrapy.utils.misc import create_instance, load_object @@ -16,7 +17,7 @@ class MiddlewareManager: def __init__(self, *middlewares): self.middlewares = middlewares - self.methods = defaultdict(deque) + self.methods: dict[str, deque[Callable]] = defaultdict(deque) for mw in middlewares: self._add_middleware(mw) @@ -58,13 +59,13 @@ def _add_middleware(self, mw): if hasattr(mw, 'close_spider'): self.methods['close_spider'].appendleft(mw.close_spider) - def _process_parallel(self, methodname, obj, *args): + def _process_parallel(self, methodname: str, obj, *args): return process_parallel(self.methods[methodname], obj, *args) - def _process_chain(self, methodname, obj, *args): + def _process_chain(self, methodname: str, obj, *args): return process_chain(self.methods[methodname], obj, *args) - def _process_chain_both(self, cb_methodname, eb_methodname, obj, *args): + def _process_chain_both(self, cb_methodname: str, eb_methodname: str, obj, *args): return process_chain_both(self.methods[cb_methodname], self.methods[eb_methodname], obj, *args) diff --git a/scrapy/utils/asyncgen.py b/scrapy/utils/asyncgen.py index 7f697af5fcc..c290e376ce5 100644 --- a/scrapy/utils/asyncgen.py +++ b/scrapy/utils/asyncgen.py @@ -1,4 +1,7 @@ -async def collect_asyncgen(result): +from collections.abc import AsyncIterable + + +async def collect_asyncgen(result: AsyncIterable): results = [] async for x in result: results.append(x) diff --git a/scrapy/utils/defer.py b/scrapy/utils/defer.py index 6db9cc1177b..c382a00f7e2 100644 --- a/scrapy/utils/defer.py +++ b/scrapy/utils/defer.py @@ -3,16 +3,19 @@ """ import asyncio import inspect +from collections.abc import Coroutine from functools import wraps +from typing import Callable, Iterable, Any from twisted.internet import defer, task from twisted.python import failure +from twisted.python.failure import Failure from scrapy.exceptions import IgnoreRequest from scrapy.utils.reactor import is_asyncio_reactor_installed -def defer_fail(_failure): +def defer_fail(_failure: Failure): """Same as twisted.internet.defer.fail but delay calling errback until next reactor loop @@ -47,7 +50,7 @@ def defer_result(result): return defer_succeed(result) -def mustbe_deferred(f, *args, **kw): +def mustbe_deferred(f: Callable, *args, **kw): """Same as twisted.internet.defer.maybeDeferred, but delay calling callback/errback to next reactor loop """ @@ -64,7 +67,7 @@ def mustbe_deferred(f, *args, **kw): return defer_result(result) -def parallel(iterable, count, callable, *args, **named): +def parallel(iterable: Iterable, count: int, callable: Callable, *args, **named): """Execute a callable over the objects in the given iterable, in parallel, using no more than ``count`` concurrent calls. @@ -75,7 +78,7 @@ def parallel(iterable, count, callable, *args, **named): return defer.DeferredList([coop.coiterate(work) for _ in range(count)]) -def process_chain(callbacks, input, *a, **kw): +def process_chain(callbacks: Iterable[Callable], input, *a, **kw): """Return a Deferred built by chaining the given callbacks""" d = defer.Deferred() for x in callbacks: @@ -84,7 +87,7 @@ def process_chain(callbacks, input, *a, **kw): return d -def process_chain_both(callbacks, errbacks, input, *a, **kw): +def process_chain_both(callbacks: Iterable[Callable], errbacks: Iterable[Callable], input, *a, **kw): """Return a Deferred built by chaining the given callbacks and errbacks""" d = defer.Deferred() for cb, eb in zip(callbacks, errbacks): @@ -100,7 +103,7 @@ def process_chain_both(callbacks, errbacks, input, *a, **kw): return d -def process_parallel(callbacks, input, *a, **kw): +def process_parallel(callbacks: Iterable[Callable], input, *a, **kw): """Return a Deferred with the output of all successful calls to the given callbacks """ @@ -110,7 +113,7 @@ def process_parallel(callbacks, input, *a, **kw): return d -def iter_errback(iterable, errback, *a, **kw): +def iter_errback(iterable: Iterable, errback: Callable, *a, **kw): """Wraps an iterable calling an errback if an error is caught while iterating it. """ @@ -124,7 +127,7 @@ def iter_errback(iterable, errback, *a, **kw): errback(failure.Failure(), *a, **kw) -def deferred_from_coro(o): +def deferred_from_coro(o) -> Any: """Converts a coroutine into a Deferred, or returns the object as is if it isn't a coroutine""" if isinstance(o, defer.Deferred): return o @@ -139,7 +142,7 @@ def deferred_from_coro(o): return o -def deferred_f_from_coro_f(coro_f): +def deferred_f_from_coro_f(coro_f: Callable[..., Coroutine]): """ Converts a coroutine function into a function that returns a Deferred. The coroutine function will be called at the time when the wrapper is called. Wrapper args will be passed to it. @@ -151,7 +154,7 @@ def f(*coro_args, **coro_kwargs): return f -def maybeDeferred_coro(f, *args, **kw): +def maybeDeferred_coro(f: Callable, *args, **kw): """ Copy of defer.maybeDeferred that also converts coroutines to Deferreds. """ try: result = f(*args, **kw) diff --git a/scrapy/utils/python.py b/scrapy/utils/python.py index 5703fd4c3ae..bcc12f24f9c 100644 --- a/scrapy/utils/python.py +++ b/scrapy/utils/python.py @@ -8,6 +8,7 @@ import sys import warnings import weakref +from collections.abc import Iterable from functools import partial, wraps from itertools import chain @@ -335,15 +336,15 @@ def garbage_collect(): gc.collect() -class MutableChain: +class MutableChain(Iterable): """ Thin wrapper around itertools.chain, allowing to add iterables "in-place" """ - def __init__(self, *args): + def __init__(self, *args: Iterable): self.data = chain.from_iterable(args) - def extend(self, *iterables): + def extend(self, *iterables: Iterable): self.data = chain(self.data, chain.from_iterable(iterables)) def __iter__(self): From 414dd1119a7e15c612de7bd0fe560b6ca30b505f Mon Sep 17 00:00:00 2001 From: Andrey Rakhmatullin Date: Sat, 3 Apr 2021 17:54:55 +0500 Subject: [PATCH 2/9] Drop an unused import. --- scrapy/core/spidermw.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scrapy/core/spidermw.py b/scrapy/core/spidermw.py index b09adf8e26f..9a5305376f9 100644 --- a/scrapy/core/spidermw.py +++ b/scrapy/core/spidermw.py @@ -3,7 +3,7 @@ See documentation in docs/topics/spider-middleware.rst """ -from collections.abc import Iterable, AsyncIterable +from collections.abc import Iterable from itertools import islice from typing import Callable, Union, Any From 7dc857668f16e8c52ff44662aceb32f93ae3d80e Mon Sep 17 00:00:00 2001 From: Andrey Rakhmatullin Date: Sun, 4 Apr 2021 16:15:33 +0500 Subject: [PATCH 3/9] Also some typing for Scraper. --- scrapy/core/scraper.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/scrapy/core/scraper.py b/scrapy/core/scraper.py index 0d3e3450f1e..4a3eff8881c 100644 --- a/scrapy/core/scraper.py +++ b/scrapy/core/scraper.py @@ -3,12 +3,14 @@ import logging from collections import deque +from collections.abc import Iterable +from typing import Union from itemadapter import is_item from twisted.internet import defer from twisted.python.failure import Failure -from scrapy import signals +from scrapy import signals, Spider from scrapy.core.spidermw import SpiderMiddlewareManager from scrapy.exceptions import CloseSpider, DropItem, IgnoreRequest from scrapy.http import Request, Response @@ -120,7 +122,7 @@ def _scrape_next(self, spider, slot): response, request, deferred = slot.next_response_request_deferred() self._scrape(response, request, spider).chainDeferred(deferred) - def _scrape(self, result, request, spider): + def _scrape(self, result: Union[Response, Failure], request: Request, spider: Spider): """ Handle the downloaded response or failure through the spider callback/errback """ @@ -131,7 +133,7 @@ def _scrape(self, result, request, spider): dfd.addCallback(self.handle_spider_output, request, result, spider) return dfd - def _scrape2(self, result, request, spider): + def _scrape2(self, result: Union[Response, Failure], request: Request, spider: Spider): """ Handle the different cases of request's result been a Response or a Failure """ @@ -141,7 +143,7 @@ def _scrape2(self, result, request, spider): dfd = self.call_spider(result, request, spider) return dfd.addErrback(self._log_download_errors, result, request, spider) - def call_spider(self, result, request, spider): + def call_spider(self, result: Union[Response, Failure], request: Request, spider: Spider): if isinstance(result, Response): if getattr(result, "request", None) is None: result.request = request @@ -156,7 +158,7 @@ def call_spider(self, result, request, spider): dfd.addErrback(request.errback) return dfd.addCallback(iterate_spider_output) - def handle_spider_error(self, _failure, request, response, spider): + def handle_spider_error(self, _failure: Failure, request: Request, response: Response, spider: Spider): exc = _failure.value if isinstance(exc, CloseSpider): self.crawler.engine.close_spider(spider, exc.reason or 'cancelled') @@ -177,7 +179,7 @@ def handle_spider_error(self, _failure, request, response, spider): spider=spider ) - def handle_spider_output(self, result, request, response, spider): + def handle_spider_output(self, result: Iterable, request: Request, response: Response, spider: Spider): if not result: return defer_succeed(None) it = iter_errback(result, self.handle_spider_error, request, response, spider) From 76fa2257ef0280fc82e123457c791254cc2f185e Mon Sep 17 00:00:00 2001 From: Andrey Rakhmatullin Date: Tue, 13 Apr 2021 20:01:18 +0500 Subject: [PATCH 4/9] Add typing also for return values, other small fixes. --- scrapy/core/spidermw.py | 31 +++++++++++++++++-------------- scrapy/middleware.py | 24 ++++++++++++++---------- scrapy/utils/defer.py | 24 ++++++++++++------------ 3 files changed, 43 insertions(+), 36 deletions(-) diff --git a/scrapy/core/spidermw.py b/scrapy/core/spidermw.py index dc0e5809594..05df8c98835 100644 --- a/scrapy/core/spidermw.py +++ b/scrapy/core/spidermw.py @@ -3,10 +3,10 @@ See documentation in docs/topics/spider-middleware.rst """ -from collections.abc import Iterable from itertools import islice -from typing import Callable, Union, Any +from typing import Callable, Union, Any, Generator, Iterable +from twisted.internet.defer import Deferred from twisted.python.failure import Failure from scrapy import Request, Spider @@ -18,11 +18,11 @@ from scrapy.utils.python import MutableChain -def _isiterable(o): - return isinstance(o, Iterable) +ScrapeFunc = Callable[[Union[Response, Failure], Request, Spider], Any] -ScrapeFunc = Callable[[Union[Response, Failure], Request, Spider], Any] +def _isiterable(o) -> bool: + return isinstance(o, Iterable) class SpiderMiddlewareManager(MiddlewareManager): @@ -44,7 +44,8 @@ def _add_middleware(self, mw): process_spider_exception = getattr(mw, 'process_spider_exception', None) self.methods['process_spider_exception'].appendleft(process_spider_exception) - def _process_spider_input(self, scrape_func: ScrapeFunc, response: Response, request: Request, spider: Spider): + def _process_spider_input(self, scrape_func: ScrapeFunc, response: Response, request: Request, + spider: Spider) -> Any: for method in self.methods['process_spider_input']: try: result = method(response=response, spider=spider) @@ -59,7 +60,7 @@ def _process_spider_input(self, scrape_func: ScrapeFunc, response: Response, req return scrape_func(response, request, spider) def _evaluate_iterable(self, response: Response, spider: Spider, iterable: Iterable, - exception_processor_index: int, recover_to: MutableChain): + exception_processor_index: int, recover_to: MutableChain) -> Generator: try: for r in iterable: yield r @@ -70,7 +71,8 @@ def _evaluate_iterable(self, response: Response, spider: Spider, iterable: Itera raise recover_to.extend(exception_result) - def _process_spider_exception(self, response: Response, spider: Spider, _failure: Failure, start_index=0): + def _process_spider_exception(self, response: Response, spider: Spider, _failure: Failure, + start_index: int = 0) -> Union[Failure, MutableChain]: exception = _failure.value # don't handle _InvalidOutput exception if isinstance(exception, _InvalidOutput): @@ -93,7 +95,7 @@ def _process_spider_exception(self, response: Response, spider: Spider, _failure return _failure def _process_spider_output(self, response: Response, spider: Spider, - result: Iterable, start_index=0): + result: Iterable, start_index: int = 0) -> MutableChain: # items in this iterable do not need to go through the process_spider_output # chain, they went through it already from the process_spider_exception method recovered = MutableChain() @@ -119,21 +121,22 @@ def _process_spider_output(self, response: Response, spider: Spider, return MutableChain(result, recovered) - def _process_callback_output(self, response: Response, spider: Spider, result: Iterable): + def _process_callback_output(self, response: Response, spider: Spider, result: Iterable) -> MutableChain: recovered = MutableChain() result = self._evaluate_iterable(response, spider, result, 0, recovered) return MutableChain(self._process_spider_output(response, spider, result), recovered) - def scrape_response(self, scrape_func: ScrapeFunc, response: Response, request: Request, spider: Spider): - def process_callback_output(result: Iterable): + def scrape_response(self, scrape_func: ScrapeFunc, response: Response, request: Request, + spider: Spider) -> Deferred: + def process_callback_output(result: Iterable) -> MutableChain: return self._process_callback_output(response, spider, result) - def process_spider_exception(_failure: Failure): + def process_spider_exception(_failure: Failure) -> Union[Failure, MutableChain]: return self._process_spider_exception(response, spider, _failure) dfd = mustbe_deferred(self._process_spider_input, scrape_func, response, request, spider) dfd.addCallbacks(callback=process_callback_output, errback=process_spider_exception) return dfd - def process_start_requests(self, start_requests, spider: Spider): + def process_start_requests(self, start_requests, spider: Spider) -> Deferred: return self._process_chain('process_start_requests', start_requests, spider) diff --git a/scrapy/middleware.py b/scrapy/middleware.py index c53cfb81459..3f8c1cbf53a 100644 --- a/scrapy/middleware.py +++ b/scrapy/middleware.py @@ -1,9 +1,13 @@ import logging import pprint from collections import defaultdict, deque -from typing import Callable +from typing import Callable, Dict, Deque +from twisted.internet import defer + +from scrapy import Spider from scrapy.exceptions import NotConfigured +from scrapy.settings import Settings from scrapy.utils.misc import create_instance, load_object from scrapy.utils.defer import process_parallel, process_chain, process_chain_both @@ -17,16 +21,16 @@ class MiddlewareManager: def __init__(self, *middlewares): self.middlewares = middlewares - self.methods: dict[str, deque[Callable]] = defaultdict(deque) + self.methods: Dict[str, Deque[Callable]] = defaultdict(deque) for mw in middlewares: self._add_middleware(mw) @classmethod - def _get_mwlist_from_settings(cls, settings): + def _get_mwlist_from_settings(cls, settings: Settings) -> list: raise NotImplementedError @classmethod - def from_settings(cls, settings, crawler=None): + def from_settings(cls, settings: Settings, crawler=None): mwlist = cls._get_mwlist_from_settings(settings) middlewares = [] enabled = [] @@ -53,24 +57,24 @@ def from_settings(cls, settings, crawler=None): def from_crawler(cls, crawler): return cls.from_settings(crawler.settings, crawler) - def _add_middleware(self, mw): + def _add_middleware(self, mw) -> None: if hasattr(mw, 'open_spider'): self.methods['open_spider'].append(mw.open_spider) if hasattr(mw, 'close_spider'): self.methods['close_spider'].appendleft(mw.close_spider) - def _process_parallel(self, methodname: str, obj, *args): + def _process_parallel(self, methodname: str, obj, *args) -> defer.Deferred: return process_parallel(self.methods[methodname], obj, *args) - def _process_chain(self, methodname: str, obj, *args): + def _process_chain(self, methodname: str, obj, *args) -> defer.Deferred: return process_chain(self.methods[methodname], obj, *args) - def _process_chain_both(self, cb_methodname: str, eb_methodname: str, obj, *args): + def _process_chain_both(self, cb_methodname: str, eb_methodname: str, obj, *args) -> defer.Deferred: return process_chain_both(self.methods[cb_methodname], self.methods[eb_methodname], obj, *args) - def open_spider(self, spider): + def open_spider(self, spider: Spider) -> defer.Deferred: return self._process_parallel('open_spider', spider) - def close_spider(self, spider): + def close_spider(self, spider: Spider) -> defer.Deferred: return self._process_parallel('close_spider', spider) diff --git a/scrapy/utils/defer.py b/scrapy/utils/defer.py index c382a00f7e2..095eae94c3c 100644 --- a/scrapy/utils/defer.py +++ b/scrapy/utils/defer.py @@ -5,7 +5,7 @@ import inspect from collections.abc import Coroutine from functools import wraps -from typing import Callable, Iterable, Any +from typing import Callable, Iterable, Any, Generator from twisted.internet import defer, task from twisted.python import failure @@ -15,7 +15,7 @@ from scrapy.utils.reactor import is_asyncio_reactor_installed -def defer_fail(_failure: Failure): +def defer_fail(_failure: Failure) -> defer.Deferred: """Same as twisted.internet.defer.fail but delay calling errback until next reactor loop @@ -28,7 +28,7 @@ def defer_fail(_failure: Failure): return d -def defer_succeed(result): +def defer_succeed(result) -> defer.Deferred: """Same as twisted.internet.defer.succeed but delay calling callback until next reactor loop @@ -41,7 +41,7 @@ def defer_succeed(result): return d -def defer_result(result): +def defer_result(result) -> defer.Deferred: if isinstance(result, defer.Deferred): return result elif isinstance(result, failure.Failure): @@ -50,7 +50,7 @@ def defer_result(result): return defer_succeed(result) -def mustbe_deferred(f: Callable, *args, **kw): +def mustbe_deferred(f: Callable, *args, **kw) -> defer.Deferred: """Same as twisted.internet.defer.maybeDeferred, but delay calling callback/errback to next reactor loop """ @@ -67,7 +67,7 @@ def mustbe_deferred(f: Callable, *args, **kw): return defer_result(result) -def parallel(iterable: Iterable, count: int, callable: Callable, *args, **named): +def parallel(iterable: Iterable, count: int, callable: Callable, *args, **named) -> defer.DeferredList: """Execute a callable over the objects in the given iterable, in parallel, using no more than ``count`` concurrent calls. @@ -78,7 +78,7 @@ def parallel(iterable: Iterable, count: int, callable: Callable, *args, **named) return defer.DeferredList([coop.coiterate(work) for _ in range(count)]) -def process_chain(callbacks: Iterable[Callable], input, *a, **kw): +def process_chain(callbacks: Iterable[Callable], input, *a, **kw) -> defer.Deferred: """Return a Deferred built by chaining the given callbacks""" d = defer.Deferred() for x in callbacks: @@ -87,7 +87,7 @@ def process_chain(callbacks: Iterable[Callable], input, *a, **kw): return d -def process_chain_both(callbacks: Iterable[Callable], errbacks: Iterable[Callable], input, *a, **kw): +def process_chain_both(callbacks: Iterable[Callable], errbacks: Iterable[Callable], input, *a, **kw) -> defer.Deferred: """Return a Deferred built by chaining the given callbacks and errbacks""" d = defer.Deferred() for cb, eb in zip(callbacks, errbacks): @@ -103,7 +103,7 @@ def process_chain_both(callbacks: Iterable[Callable], errbacks: Iterable[Callabl return d -def process_parallel(callbacks: Iterable[Callable], input, *a, **kw): +def process_parallel(callbacks: Iterable[Callable], input, *a, **kw) -> defer.Deferred: """Return a Deferred with the output of all successful calls to the given callbacks """ @@ -113,7 +113,7 @@ def process_parallel(callbacks: Iterable[Callable], input, *a, **kw): return d -def iter_errback(iterable: Iterable, errback: Callable, *a, **kw): +def iter_errback(iterable: Iterable, errback: Callable, *a, **kw) -> Generator: """Wraps an iterable calling an errback if an error is caught while iterating it. """ @@ -142,7 +142,7 @@ def deferred_from_coro(o) -> Any: return o -def deferred_f_from_coro_f(coro_f: Callable[..., Coroutine]): +def deferred_f_from_coro_f(coro_f: Callable[..., Coroutine]) -> Callable: """ Converts a coroutine function into a function that returns a Deferred. The coroutine function will be called at the time when the wrapper is called. Wrapper args will be passed to it. @@ -154,7 +154,7 @@ def f(*coro_args, **coro_kwargs): return f -def maybeDeferred_coro(f: Callable, *args, **kw): +def maybeDeferred_coro(f: Callable, *args, **kw) -> defer.Deferred: """ Copy of defer.maybeDeferred that also converts coroutines to Deferreds. """ try: result = f(*args, **kw) From 335a25675278543d4c85123bbbed99f228b26416 Mon Sep 17 00:00:00 2001 From: Andrey Rahmatullin Date: Tue, 13 Apr 2021 21:05:20 +0500 Subject: [PATCH 5/9] Update scrapy/core/spidermw.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Adrián Chaves --- scrapy/core/spidermw.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scrapy/core/spidermw.py b/scrapy/core/spidermw.py index 05df8c98835..7e58521acbd 100644 --- a/scrapy/core/spidermw.py +++ b/scrapy/core/spidermw.py @@ -4,7 +4,7 @@ See documentation in docs/topics/spider-middleware.rst """ from itertools import islice -from typing import Callable, Union, Any, Generator, Iterable +from typing import Any, Callable, Generator, Iterable, Union from twisted.internet.defer import Deferred from twisted.python.failure import Failure From b0e75125749ec1dce14614468bf06cd8e8842649 Mon Sep 17 00:00:00 2001 From: Andrey Rahmatullin Date: Tue, 13 Apr 2021 21:05:25 +0500 Subject: [PATCH 6/9] Update scrapy/middleware.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Adrián Chaves --- scrapy/middleware.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scrapy/middleware.py b/scrapy/middleware.py index 3f8c1cbf53a..09768b59df3 100644 --- a/scrapy/middleware.py +++ b/scrapy/middleware.py @@ -1,7 +1,7 @@ import logging import pprint from collections import defaultdict, deque -from typing import Callable, Dict, Deque +from typing import Callable, Deque, Dict from twisted.internet import defer From a8de04c823f5e10d012aaa2dd94a4f5a9f70b119 Mon Sep 17 00:00:00 2001 From: Andrey Rahmatullin Date: Tue, 13 Apr 2021 21:05:30 +0500 Subject: [PATCH 7/9] Update scrapy/utils/defer.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Adrián Chaves --- scrapy/utils/defer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scrapy/utils/defer.py b/scrapy/utils/defer.py index 095eae94c3c..e1139b1d13c 100644 --- a/scrapy/utils/defer.py +++ b/scrapy/utils/defer.py @@ -5,7 +5,7 @@ import inspect from collections.abc import Coroutine from functools import wraps -from typing import Callable, Iterable, Any, Generator +from typing import Any, Callable, Generator, Iterable from twisted.internet import defer, task from twisted.python import failure From cef0a8b3d653d847efe32dfc2850e5992b627408 Mon Sep 17 00:00:00 2001 From: Andrey Rakhmatullin Date: Tue, 13 Apr 2021 21:07:07 +0500 Subject: [PATCH 8/9] Import Deferred directly in scrapy/middleware.py. --- scrapy/middleware.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/scrapy/middleware.py b/scrapy/middleware.py index 09768b59df3..bbec3808675 100644 --- a/scrapy/middleware.py +++ b/scrapy/middleware.py @@ -3,7 +3,7 @@ from collections import defaultdict, deque from typing import Callable, Deque, Dict -from twisted.internet import defer +from twisted.internet.defer import Deferred from scrapy import Spider from scrapy.exceptions import NotConfigured @@ -63,18 +63,18 @@ def _add_middleware(self, mw) -> None: if hasattr(mw, 'close_spider'): self.methods['close_spider'].appendleft(mw.close_spider) - def _process_parallel(self, methodname: str, obj, *args) -> defer.Deferred: + def _process_parallel(self, methodname: str, obj, *args) -> Deferred: return process_parallel(self.methods[methodname], obj, *args) - def _process_chain(self, methodname: str, obj, *args) -> defer.Deferred: + def _process_chain(self, methodname: str, obj, *args) -> Deferred: return process_chain(self.methods[methodname], obj, *args) - def _process_chain_both(self, cb_methodname: str, eb_methodname: str, obj, *args) -> defer.Deferred: + def _process_chain_both(self, cb_methodname: str, eb_methodname: str, obj, *args) -> Deferred: return process_chain_both(self.methods[cb_methodname], self.methods[eb_methodname], obj, *args) - def open_spider(self, spider: Spider) -> defer.Deferred: + def open_spider(self, spider: Spider) -> Deferred: return self._process_parallel('open_spider', spider) - def close_spider(self, spider: Spider) -> defer.Deferred: + def close_spider(self, spider: Spider) -> Deferred: return self._process_parallel('close_spider', spider) From 08e4eaf97369ba6daa4b5d84e00fd4d36b78e00a Mon Sep 17 00:00:00 2001 From: Andrey Rakhmatullin Date: Tue, 13 Apr 2021 22:41:01 +0500 Subject: [PATCH 9/9] Import Deferred directly in scrapy/utils/defer.py. --- scrapy/utils/defer.py | 48 ++++++++++++++++++++++--------------------- 1 file changed, 25 insertions(+), 23 deletions(-) diff --git a/scrapy/utils/defer.py b/scrapy/utils/defer.py index e1139b1d13c..b317c12a346 100644 --- a/scrapy/utils/defer.py +++ b/scrapy/utils/defer.py @@ -7,7 +7,9 @@ from functools import wraps from typing import Any, Callable, Generator, Iterable -from twisted.internet import defer, task +from twisted.internet import defer +from twisted.internet.defer import Deferred, DeferredList, ensureDeferred +from twisted.internet.task import Cooperator from twisted.python import failure from twisted.python.failure import Failure @@ -15,7 +17,7 @@ from scrapy.utils.reactor import is_asyncio_reactor_installed -def defer_fail(_failure: Failure) -> defer.Deferred: +def defer_fail(_failure: Failure) -> Deferred: """Same as twisted.internet.defer.fail but delay calling errback until next reactor loop @@ -23,12 +25,12 @@ def defer_fail(_failure: Failure) -> defer.Deferred: before attending pending delayed calls, so do not set delay to zero. """ from twisted.internet import reactor - d = defer.Deferred() + d = Deferred() reactor.callLater(0.1, d.errback, _failure) return d -def defer_succeed(result) -> defer.Deferred: +def defer_succeed(result) -> Deferred: """Same as twisted.internet.defer.succeed but delay calling callback until next reactor loop @@ -36,13 +38,13 @@ def defer_succeed(result) -> defer.Deferred: before attending pending delayed calls, so do not set delay to zero. """ from twisted.internet import reactor - d = defer.Deferred() + d = Deferred() reactor.callLater(0.1, d.callback, result) return d -def defer_result(result) -> defer.Deferred: - if isinstance(result, defer.Deferred): +def defer_result(result) -> Deferred: + if isinstance(result, Deferred): return result elif isinstance(result, failure.Failure): return defer_fail(result) @@ -50,7 +52,7 @@ def defer_result(result) -> defer.Deferred: return defer_succeed(result) -def mustbe_deferred(f: Callable, *args, **kw) -> defer.Deferred: +def mustbe_deferred(f: Callable, *args, **kw) -> Deferred: """Same as twisted.internet.defer.maybeDeferred, but delay calling callback/errback to next reactor loop """ @@ -67,29 +69,29 @@ def mustbe_deferred(f: Callable, *args, **kw) -> defer.Deferred: return defer_result(result) -def parallel(iterable: Iterable, count: int, callable: Callable, *args, **named) -> defer.DeferredList: +def parallel(iterable: Iterable, count: int, callable: Callable, *args, **named) -> DeferredList: """Execute a callable over the objects in the given iterable, in parallel, using no more than ``count`` concurrent calls. Taken from: https://jcalderone.livejournal.com/24285.html """ - coop = task.Cooperator() + coop = Cooperator() work = (callable(elem, *args, **named) for elem in iterable) - return defer.DeferredList([coop.coiterate(work) for _ in range(count)]) + return DeferredList([coop.coiterate(work) for _ in range(count)]) -def process_chain(callbacks: Iterable[Callable], input, *a, **kw) -> defer.Deferred: +def process_chain(callbacks: Iterable[Callable], input, *a, **kw) -> Deferred: """Return a Deferred built by chaining the given callbacks""" - d = defer.Deferred() + d = Deferred() for x in callbacks: d.addCallback(x, *a, **kw) d.callback(input) return d -def process_chain_both(callbacks: Iterable[Callable], errbacks: Iterable[Callable], input, *a, **kw) -> defer.Deferred: +def process_chain_both(callbacks: Iterable[Callable], errbacks: Iterable[Callable], input, *a, **kw) -> Deferred: """Return a Deferred built by chaining the given callbacks and errbacks""" - d = defer.Deferred() + d = Deferred() for cb, eb in zip(callbacks, errbacks): d.addCallbacks( callback=cb, errback=eb, @@ -103,12 +105,12 @@ def process_chain_both(callbacks: Iterable[Callable], errbacks: Iterable[Callabl return d -def process_parallel(callbacks: Iterable[Callable], input, *a, **kw) -> defer.Deferred: +def process_parallel(callbacks: Iterable[Callable], input, *a, **kw) -> Deferred: """Return a Deferred with the output of all successful calls to the given callbacks """ dfds = [defer.succeed(input).addCallback(x, *a, **kw) for x in callbacks] - d = defer.DeferredList(dfds, fireOnOneErrback=True, consumeErrors=True) + d = DeferredList(dfds, fireOnOneErrback=True, consumeErrors=True) d.addCallbacks(lambda r: [x[1] for x in r], lambda f: f.value.subFailure) return d @@ -129,16 +131,16 @@ def iter_errback(iterable: Iterable, errback: Callable, *a, **kw) -> Generator: def deferred_from_coro(o) -> Any: """Converts a coroutine into a Deferred, or returns the object as is if it isn't a coroutine""" - if isinstance(o, defer.Deferred): + if isinstance(o, Deferred): return o if asyncio.isfuture(o) or inspect.isawaitable(o): if not is_asyncio_reactor_installed(): # wrapping the coroutine directly into a Deferred, this doesn't work correctly with coroutines # that use asyncio, e.g. "await asyncio.sleep(1)" - return defer.ensureDeferred(o) + return ensureDeferred(o) else: # wrapping the coroutine into a Future and then into a Deferred, this requires AsyncioSelectorReactor - return defer.Deferred.fromFuture(asyncio.ensure_future(o)) + return Deferred.fromFuture(asyncio.ensure_future(o)) return o @@ -154,14 +156,14 @@ def f(*coro_args, **coro_kwargs): return f -def maybeDeferred_coro(f: Callable, *args, **kw) -> defer.Deferred: +def maybeDeferred_coro(f: Callable, *args, **kw) -> Deferred: """ Copy of defer.maybeDeferred that also converts coroutines to Deferreds. """ try: result = f(*args, **kw) except: # noqa: E722 - return defer.fail(failure.Failure(captureVars=defer.Deferred.debug)) + return defer.fail(failure.Failure(captureVars=Deferred.debug)) - if isinstance(result, defer.Deferred): + if isinstance(result, Deferred): return result elif asyncio.isfuture(result) or inspect.isawaitable(result): return deferred_from_coro(result)