From 1282ddf8f77299edf613679c2ee0b606e96808ce Mon Sep 17 00:00:00 2001 From: Andrey Rakhmatullin Date: Mon, 10 Jun 2024 13:27:50 +0500 Subject: [PATCH] Add parameters to most Deferred in scrapy/core. (#6395) --- scrapy/core/downloader/__init__.py | 45 ++++++++++---- scrapy/core/downloader/handlers/__init__.py | 38 +++++++++--- scrapy/core/downloader/handlers/ftp.py | 10 ++- scrapy/core/downloader/handlers/http10.py | 3 +- scrapy/core/downloader/handlers/http11.py | 69 ++++++++++++--------- scrapy/core/downloader/handlers/http2.py | 4 +- scrapy/core/downloader/handlers/s3.py | 3 +- scrapy/core/downloader/middleware.py | 23 ++++--- scrapy/core/downloader/webclient.py | 6 +- scrapy/core/engine.py | 55 +++++++++------- scrapy/core/scheduler.py | 8 +-- scrapy/core/scraper.py | 59 +++++++++++------- scrapy/core/spidermw.py | 18 +++--- scrapy/pipelines/__init__.py | 4 +- scrapy/utils/defer.py | 20 +++--- tests/test_downloadermiddleware.py | 2 +- 16 files changed, 236 insertions(+), 131 deletions(-) diff --git a/scrapy/core/downloader/__init__.py b/scrapy/core/downloader/__init__.py index 0ab3bdb779b..41f729ed971 100644 --- a/scrapy/core/downloader/__init__.py +++ b/scrapy/core/downloader/__init__.py @@ -1,9 +1,22 @@ +from __future__ import annotations + import random import warnings from collections import deque from datetime import datetime from time import time -from typing import TYPE_CHECKING, Any, Deque, Dict, Optional, Set, Tuple, cast +from typing import ( + TYPE_CHECKING, + Any, + Deque, + Dict, + Optional, + Set, + Tuple, + TypeVar, + Union, + cast, +) from twisted.internet import task from twisted.internet.defer import Deferred @@ -22,6 +35,8 @@ if TYPE_CHECKING: from scrapy.crawler import Crawler +_T = TypeVar("_T") + class Slot: """Downloader slot""" @@ -40,7 +55,7 @@ def __init__( self.throttle = throttle self.active: Set[Request] = set() - self.queue: Deque[Tuple[Request, Deferred]] = deque() + self.queue: Deque[Tuple[Request, Deferred[Response]]] = deque() self.transferring: Set[Request] = set() self.lastseen: float = 0 self.latercall = None @@ -93,7 +108,7 @@ def _get_concurrency_delay( class Downloader: DOWNLOAD_SLOT = "download_slot" - def __init__(self, crawler: "Crawler"): + def __init__(self, crawler: Crawler): self.settings: BaseSettings = crawler.settings self.signals: SignalManager = crawler.signals self.slots: Dict[str, Slot] = {} @@ -114,13 +129,17 @@ def __init__(self, crawler: "Crawler"): "DOWNLOAD_SLOTS", {} ) - def fetch(self, request: Request, spider: Spider) -> Deferred: - def _deactivate(response: Response) -> Response: + def fetch( + self, request: Request, spider: Spider + ) -> Deferred[Union[Response, Request]]: + def _deactivate(response: _T) -> _T: self.active.remove(request) return response self.active.add(request) - dfd = self.middleware.download(self._enqueue_request, request, spider) + dfd: Deferred[Union[Response, Request]] = self.middleware.download( + self._enqueue_request, request, spider + ) return dfd.addBoth(_deactivate) def needs_backout(self) -> bool: @@ -163,7 +182,7 @@ def _get_slot_key(self, request: Request, spider: Optional[Spider]) -> str: ) return self.get_slot_key(request) - def _enqueue_request(self, request: Request, spider: Spider) -> Deferred: + def _enqueue_request(self, request: Request, spider: Spider) -> Deferred[Response]: key, slot = self._get_slot(request, spider) request.meta[self.DOWNLOAD_SLOT] = key @@ -175,7 +194,7 @@ def _deactivate(response: Response) -> Response: self.signals.send_catch_log( 
signal=signals.request_reached_downloader, request=request, spider=spider ) - deferred: Deferred = Deferred().addBoth(_deactivate) + deferred: Deferred[Response] = Deferred().addBoth(_deactivate) slot.queue.append((request, deferred)) self._process_queue(spider, slot) return deferred @@ -208,11 +227,15 @@ def _process_queue(self, spider: Spider, slot: Slot) -> None: self._process_queue(spider, slot) break - def _download(self, slot: Slot, request: Request, spider: Spider) -> Deferred: + def _download( + self, slot: Slot, request: Request, spider: Spider + ) -> Deferred[Response]: # The order is very important for the following deferreds. Do not change! # 1. Create the download deferred - dfd = mustbe_deferred(self.handlers.download_request, request, spider) + dfd: Deferred[Response] = mustbe_deferred( + self.handlers.download_request, request, spider + ) # 2. Notify response_downloaded listeners about the recent download # before querying queue for next request @@ -233,7 +256,7 @@ def _downloaded(response: Response) -> Response: # middleware itself) slot.transferring.add(request) - def finish_transferring(_: Any) -> Any: + def finish_transferring(_: _T) -> _T: slot.transferring.remove(request) self._process_queue(spider, slot) self.signals.send_catch_log( diff --git a/scrapy/core/downloader/handlers/__init__.py b/scrapy/core/downloader/handlers/__init__.py index 5ec5ef6db1b..ebc4898b56f 100644 --- a/scrapy/core/downloader/handlers/__init__.py +++ b/scrapy/core/downloader/handlers/__init__.py @@ -3,13 +3,25 @@ from __future__ import annotations import logging -from typing import TYPE_CHECKING, Any, Callable, Dict, Generator, Union, cast +from typing import ( + TYPE_CHECKING, + Any, + Callable, + Dict, + Generator, + Optional, + Protocol, + Type, + Union, + cast, +) from twisted.internet import defer from twisted.internet.defer import Deferred from scrapy import Request, Spider, signals from scrapy.exceptions import NotConfigured, NotSupported +from scrapy.http import Response from scrapy.utils.httpobj import urlparse_cached from scrapy.utils.misc import build_from_crawler, load_object from scrapy.utils.python import without_none_values @@ -20,13 +32,21 @@ logger = logging.getLogger(__name__) +class DownloadHandlerProtocol(Protocol): + def download_request( + self, request: Request, spider: Spider + ) -> Deferred[Response]: ... + + class DownloadHandlers: def __init__(self, crawler: Crawler): self._crawler: Crawler = crawler self._schemes: Dict[str, Union[str, Callable[..., Any]]] = ( {} ) # stores acceptable schemes on instancing - self._handlers: Dict[str, Any] = {} # stores instanced handlers for schemes + self._handlers: Dict[str, DownloadHandlerProtocol] = ( + {} + ) # stores instanced handlers for schemes self._notconfigured: Dict[str, str] = {} # remembers failed handlers handlers: Dict[str, Union[str, Callable[..., Any]]] = without_none_values( cast( @@ -40,7 +60,7 @@ def __init__(self, crawler: Crawler): crawler.signals.connect(self._close, signals.engine_stopped) - def _get_handler(self, scheme: str) -> Any: + def _get_handler(self, scheme: str) -> Optional[DownloadHandlerProtocol]: """Lazy-load the downloadhandler for a scheme only on the first request for that scheme. 
""" @@ -54,10 +74,12 @@ def _get_handler(self, scheme: str) -> Any: return self._load_handler(scheme) - def _load_handler(self, scheme: str, skip_lazy: bool = False) -> Any: + def _load_handler( + self, scheme: str, skip_lazy: bool = False + ) -> Optional[DownloadHandlerProtocol]: path = self._schemes[scheme] try: - dhcls = load_object(path) + dhcls: Type[DownloadHandlerProtocol] = load_object(path) if skip_lazy and getattr(dhcls, "lazy", True): return None dh = build_from_crawler( @@ -80,17 +102,17 @@ def _load_handler(self, scheme: str, skip_lazy: bool = False) -> Any: self._handlers[scheme] = dh return dh - def download_request(self, request: Request, spider: Spider) -> Deferred: + def download_request(self, request: Request, spider: Spider) -> Deferred[Response]: scheme = urlparse_cached(request).scheme handler = self._get_handler(scheme) if not handler: raise NotSupported( f"Unsupported URL scheme '{scheme}': {self._notconfigured[scheme]}" ) - return cast(Deferred, handler.download_request(request, spider)) + return handler.download_request(request, spider) @defer.inlineCallbacks - def _close(self, *_a: Any, **_kw: Any) -> Generator[Deferred, Any, None]: + def _close(self, *_a: Any, **_kw: Any) -> Generator[Deferred[Any], Any, None]: for dh in self._handlers.values(): if hasattr(dh, "close"): yield dh.close() diff --git a/scrapy/core/downloader/handlers/ftp.py b/scrapy/core/downloader/handlers/ftp.py index 77dcf3c38aa..724717ffd77 100644 --- a/scrapy/core/downloader/handlers/ftp.py +++ b/scrapy/core/downloader/handlers/ftp.py @@ -91,7 +91,7 @@ def __init__(self, settings: BaseSettings): def from_crawler(cls, crawler: Crawler) -> Self: return cls(crawler.settings) - def download_request(self, request: Request, spider: Spider) -> Deferred: + def download_request(self, request: Request, spider: Spider) -> Deferred[Response]: from twisted.internet import reactor parsed_url = urlparse_cached(request) @@ -103,10 +103,14 @@ def download_request(self, request: Request, spider: Spider) -> Deferred: creator = ClientCreator( reactor, FTPClient, user, password, passive=passive_mode ) - dfd: Deferred = creator.connectTCP(parsed_url.hostname, parsed_url.port or 21) + dfd: Deferred[FTPClient] = creator.connectTCP( + parsed_url.hostname, parsed_url.port or 21 + ) return dfd.addCallback(self.gotClient, request, unquote(parsed_url.path)) - def gotClient(self, client: FTPClient, request: Request, filepath: str) -> Deferred: + def gotClient( + self, client: FTPClient, request: Request, filepath: str + ) -> Deferred[Response]: self.client = client protocol = ReceivedDataProtocol(request.meta.get("ftp_local_filename")) d = client.retrieveFile(filepath, protocol) diff --git a/scrapy/core/downloader/handlers/http10.py b/scrapy/core/downloader/handlers/http10.py index da95595254b..3c4e48abb2c 100644 --- a/scrapy/core/downloader/handlers/http10.py +++ b/scrapy/core/downloader/handlers/http10.py @@ -9,6 +9,7 @@ from scrapy import Request, Spider from scrapy.crawler import Crawler +from scrapy.http import Response from scrapy.settings import BaseSettings from scrapy.utils.misc import build_from_crawler, load_object from scrapy.utils.python import to_unicode @@ -38,7 +39,7 @@ def __init__(self, settings: BaseSettings, crawler: Crawler): def from_crawler(cls, crawler: Crawler) -> Self: return cls(crawler.settings, crawler) - def download_request(self, request: Request, spider: Spider) -> Deferred: + def download_request(self, request: Request, spider: Spider) -> Deferred[Response]: """Return a deferred for the 
HTTP download""" factory = self.HTTPClientFactory(request) self._connect(factory) diff --git a/scrapy/core/downloader/handlers/http11.py b/scrapy/core/downloader/handlers/http11.py index 5e84be6ba51..e2ad8f59a76 100644 --- a/scrapy/core/downloader/handlers/http11.py +++ b/scrapy/core/downloader/handlers/http11.py @@ -8,7 +8,7 @@ from contextlib import suppress from io import BytesIO from time import time -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union, cast +from typing import TYPE_CHECKING, Any, List, Optional, Tuple, TypedDict, TypeVar, Union from urllib.parse import urldefrag, urlunparse from twisted.internet import ssl @@ -38,12 +38,22 @@ from scrapy.utils.python import to_bytes, to_unicode if TYPE_CHECKING: - # typing.Self requires Python 3.11 - from typing_extensions import Self - + # typing.NotRequired and typing.Self require Python 3.11 + from typing_extensions import NotRequired, Self logger = logging.getLogger(__name__) +_T = TypeVar("_T") + + +class _ResultT(TypedDict): + txresponse: TxResponse + body: bytes + flags: Optional[List[str]] + certificate: Optional[ssl.Certificate] + ip_address: Union[ipaddress.IPv4Address, ipaddress.IPv6Address, None] + failure: NotRequired[Optional[Failure]] + class HTTP11DownloadHandler: lazy = False @@ -71,7 +81,7 @@ def __init__(self, settings: BaseSettings, crawler: Crawler): def from_crawler(cls, crawler: Crawler) -> Self: return cls(crawler.settings, crawler) - def download_request(self, request: Request, spider: Spider) -> Deferred: + def download_request(self, request: Request, spider: Spider) -> Deferred[Response]: """Return a deferred for the HTTP download""" agent = ScrapyAgent( contextFactory=self._contextFactory, @@ -83,10 +93,10 @@ def download_request(self, request: Request, spider: Spider) -> Deferred: ) return agent.download_request(request) - def close(self) -> Deferred: + def close(self) -> Deferred[None]: from twisted.internet import reactor - d: Deferred = self._pool.closeCachedConnections() + d: Deferred[None] = self._pool.closeCachedConnections() # closeCachedConnections will hang on network or server issues, so # we'll manually timeout the deferred. # @@ -97,7 +107,7 @@ def close(self) -> Deferred: # issue a callback after `_disconnect_timeout` seconds. 
delayed_call = reactor.callLater(self._disconnect_timeout, d.callback, []) - def cancel_delayed_call(result: Any) -> Any: + def cancel_delayed_call(result: _T) -> _T: if delayed_call.active(): delayed_call.cancel() return result @@ -137,7 +147,7 @@ def __init__( ): proxyHost, proxyPort, self._proxyAuthHeader = proxyConf super().__init__(reactor, proxyHost, proxyPort, timeout, bindAddress) - self._tunnelReadyDeferred: Deferred = Deferred() + self._tunnelReadyDeferred: Deferred[Protocol] = Deferred() self._tunneledHost: str = host self._tunneledPort: int = port self._contextFactory: IPolicyForHTTPS = contextFactory @@ -198,7 +208,7 @@ def connectFailed(self, reason: Failure) -> None: """Propagates the errback to the appropriate deferred.""" self._tunnelReadyDeferred.errback(reason) - def connect(self, protocolFactory: Factory) -> Deferred: + def connect(self, protocolFactory: Factory) -> Deferred[Protocol]: self._protocolFactory = protocolFactory connectDeferred = super().connect(protocolFactory) connectDeferred.addCallback(self.requestTunnel) @@ -271,7 +281,7 @@ def _requestWithEndpoint( headers: Optional[TxHeaders], bodyProducer: Optional[IBodyProducer], requestPath: bytes, - ) -> Deferred: + ) -> Deferred[TxResponse]: # proxy host and port are required for HTTP pool `key` # otherwise, same remote host connection request could reuse # a cached tunneled connection to a different proxy @@ -310,7 +320,7 @@ def request( uri: bytes, headers: Optional[TxHeaders] = None, bodyProducer: Optional[IBodyProducer] = None, - ) -> Deferred: + ) -> Deferred[TxResponse]: """ Issue a new request via the configured proxy. """ @@ -394,7 +404,7 @@ def _get_agent(self, request: Request, timeout: float) -> Agent: pool=self._pool, ) - def download_request(self, request: Request) -> Deferred: + def download_request(self, request: Request) -> Deferred[Response]: from twisted.internet import reactor timeout = request.meta.get("download_timeout") or self._connectTimeout @@ -411,22 +421,20 @@ def download_request(self, request: Request) -> Deferred: else: bodyproducer = None start_time = time() - d: Deferred = agent.request( + d: Deferred[TxResponse] = agent.request( method, to_bytes(url, encoding="ascii"), headers, bodyproducer ) # set download latency d.addCallback(self._cb_latency, request, start_time) # response body is ready to be consumed - d.addCallback(self._cb_bodyready, request) - d.addCallback(self._cb_bodydone, request, url) + d2: Deferred[_ResultT] = d.addCallback(self._cb_bodyready, request) + d3: Deferred[Response] = d2.addCallback(self._cb_bodydone, request, url) # check download timeout - self._timeout_cl = reactor.callLater(timeout, d.cancel) - d.addBoth(self._cb_timeout, request, url, timeout) - return d + self._timeout_cl = reactor.callLater(timeout, d3.cancel) + d3.addBoth(self._cb_timeout, request, url, timeout) + return d3 - def _cb_timeout( - self, result: Any, request: Request, url: str, timeout: float - ) -> Any: + def _cb_timeout(self, result: _T, request: Request, url: str, timeout: float) -> _T: if self._timeout_cl.active(): self._timeout_cl.cancel() return result @@ -437,7 +445,7 @@ def _cb_timeout( raise TimeoutError(f"Getting {url} took longer than {timeout} seconds.") - def _cb_latency(self, result: Any, request: Request, start_time: float) -> Any: + def _cb_latency(self, result: _T, request: Request, start_time: float) -> _T: request.meta["download_latency"] = time() - start_time return result @@ -451,7 +459,7 @@ def _headers_from_twisted_response(response: TxResponse) -> Headers: def 
_cb_bodyready( self, txresponse: TxResponse, request: Request - ) -> Union[Dict[str, Any], Deferred]: + ) -> Union[_ResultT, Deferred[_ResultT]]: headers_received_result = self._crawler.signals.send_catch_log( signal=signals.headers_received, headers=self._headers_from_twisted_response(txresponse), @@ -520,7 +528,7 @@ def _cancel(_: Any) -> None: # Abort connection immediately. txresponse._transport._producer.abortConnection() - d: Deferred = Deferred(_cancel) + d: Deferred[_ResultT] = Deferred(_cancel) txresponse.deliverBody( _ResponseReader( finished=d, @@ -539,7 +547,7 @@ def _cancel(_: Any) -> None: return d def _cb_bodydone( - self, result: Dict[str, Any], request: Request, url: str + self, result: _ResultT, request: Request, url: str ) -> Union[Response, Failure]: headers = self._headers_from_twisted_response(result["txresponse"]) respcls = responsetypes.from_args(headers=headers, url=url, body=result["body"]) @@ -559,8 +567,9 @@ def _cb_bodydone( protocol=protocol, ) if result.get("failure"): + assert result["failure"] result["failure"].value.response = response - return cast(Failure, result["failure"]) + return result["failure"] return response @@ -570,7 +579,7 @@ def __init__(self, body: bytes): self.body = body self.length = len(body) - def startProducing(self, consumer: IConsumer) -> Deferred: + def startProducing(self, consumer: IConsumer) -> Deferred[None]: consumer.write(self.body) return succeed(None) @@ -584,7 +593,7 @@ def stopProducing(self) -> None: class _ResponseReader(Protocol): def __init__( self, - finished: Deferred, + finished: Deferred[_ResultT], txresponse: TxResponse, request: Request, maxsize: int, @@ -592,7 +601,7 @@ def __init__( fail_on_dataloss: bool, crawler: Crawler, ): - self._finished: Deferred = finished + self._finished: Deferred[_ResultT] = finished self._txresponse: TxResponse = txresponse self._request: Request = request self._bodybuf: BytesIO = BytesIO() diff --git a/scrapy/core/downloader/handlers/http2.py b/scrapy/core/downloader/handlers/http2.py index 16fc1e3aea8..2ac4eca861b 100644 --- a/scrapy/core/downloader/handlers/http2.py +++ b/scrapy/core/downloader/handlers/http2.py @@ -37,7 +37,7 @@ def __init__(self, settings: Settings, crawler: Crawler): def from_crawler(cls, crawler: Crawler) -> Self: return cls(crawler.settings, crawler) - def download_request(self, request: Request, spider: Spider) -> Deferred: + def download_request(self, request: Request, spider: Spider) -> Deferred[Response]: agent = ScrapyH2Agent( context_factory=self._context_factory, pool=self._pool, @@ -98,7 +98,7 @@ def _get_agent(self, request: Request, timeout: Optional[float]) -> H2Agent: pool=self._pool, ) - def download_request(self, request: Request, spider: Spider) -> Deferred: + def download_request(self, request: Request, spider: Spider) -> Deferred[Response]: from twisted.internet import reactor timeout = request.meta.get("download_timeout") or self._connect_timeout diff --git a/scrapy/core/downloader/handlers/s3.py b/scrapy/core/downloader/handlers/s3.py index 1a3d36f45cb..0ad340721ce 100644 --- a/scrapy/core/downloader/handlers/s3.py +++ b/scrapy/core/downloader/handlers/s3.py @@ -8,6 +8,7 @@ from scrapy.core.downloader.handlers.http import HTTPDownloadHandler from scrapy.crawler import Crawler from scrapy.exceptions import NotConfigured +from scrapy.http import Response from scrapy.settings import BaseSettings from scrapy.utils.boto import is_botocore_available from scrapy.utils.httpobj import urlparse_cached @@ -76,7 +77,7 @@ def __init__( def 
from_crawler(cls, crawler: Crawler, **kwargs: Any) -> Self: return cls(crawler.settings, crawler=crawler, **kwargs) - def download_request(self, request: Request, spider: Spider) -> Deferred: + def download_request(self, request: Request, spider: Spider) -> Deferred[Response]: p = urlparse_cached(request) scheme = "https" if request.meta.get("is_secure") else "http" bucket = p.hostname diff --git a/scrapy/core/downloader/middleware.py b/scrapy/core/downloader/middleware.py index 52ebe4e22c1..2d8af114f85 100644 --- a/scrapy/core/downloader/middleware.py +++ b/scrapy/core/downloader/middleware.py @@ -4,6 +4,8 @@ See documentation in docs/topics/downloader-middleware.rst """ +from __future__ import annotations + from typing import Any, Callable, Generator, List, Union, cast from twisted.internet.defer import Deferred, inlineCallbacks @@ -34,10 +36,15 @@ def _add_middleware(self, mw: Any) -> None: self.methods["process_exception"].appendleft(mw.process_exception) def download( - self, download_func: Callable, request: Request, spider: Spider - ) -> Deferred: + self, + download_func: Callable[[Request, Spider], Deferred[Response]], + request: Request, + spider: Spider, + ) -> Deferred[Union[Response, Request]]: @inlineCallbacks - def process_request(request: Request) -> Generator[Deferred, Any, Any]: + def process_request( + request: Request, + ) -> Generator[Deferred[Any], Any, Union[Response, Request]]: for method in self.methods["process_request"]: method = cast(Callable, method) response = yield deferred_from_coro( @@ -52,12 +59,12 @@ def process_request(request: Request) -> Generator[Deferred, Any, Any]: ) if response: return response - return (yield download_func(request=request, spider=spider)) + return (yield download_func(request, spider)) @inlineCallbacks def process_response( response: Union[Response, Request] - ) -> Generator[Deferred, Any, Union[Response, Request]]: + ) -> Generator[Deferred[Any], Any, Union[Response, Request]]: if response is None: raise TypeError("Received None in process_response") elif isinstance(response, Request): @@ -80,7 +87,7 @@ def process_response( @inlineCallbacks def process_exception( failure: Failure, - ) -> Generator[Deferred, Any, Union[Failure, Response, Request]]: + ) -> Generator[Deferred[Any], Any, Union[Failure, Response, Request]]: exception = failure.value for method in self.methods["process_exception"]: method = cast(Callable, method) @@ -98,7 +105,9 @@ def process_exception( return response return failure - deferred = mustbe_deferred(process_request, request) + deferred: Deferred[Union[Response, Request]] = mustbe_deferred( + process_request, request + ) deferred.addErrback(process_exception) deferred.addCallback(process_response) return deferred diff --git a/scrapy/core/downloader/webclient.py b/scrapy/core/downloader/webclient.py index bb1f7380588..08a1d7c717a 100644 --- a/scrapy/core/downloader/webclient.py +++ b/scrapy/core/downloader/webclient.py @@ -8,7 +8,7 @@ from twisted.web.http import HTTPClient from scrapy import Request -from scrapy.http import Headers +from scrapy.http import Headers, Response from scrapy.responsetypes import responsetypes from scrapy.utils.httpobj import urlparse_cached from scrapy.utils.python import to_bytes, to_unicode @@ -145,7 +145,7 @@ def __init__(self, request: Request, timeout: float = 180): self.response_headers: Optional[Headers] = None self.timeout: float = request.meta.get("download_timeout") or timeout self.start_time: float = time() - self.deferred: defer.Deferred = 
defer.Deferred().addCallback( + self.deferred: defer.Deferred[Response] = defer.Deferred().addCallback( self._build_response, request ) @@ -155,7 +155,7 @@ def __init__(self, request: Request, timeout: float = 180): # needed to add the callback _waitForDisconnect. # Specifically this avoids the AttributeError exception when # clientConnectionFailed method is called. - self._disconnectedDeferred: defer.Deferred = defer.Deferred() + self._disconnectedDeferred: defer.Deferred[None] = defer.Deferred() self._set_connection_attributes(request) diff --git a/scrapy/core/engine.py b/scrapy/core/engine.py index dededf99dcb..4ffec78b94f 100644 --- a/scrapy/core/engine.py +++ b/scrapy/core/engine.py @@ -19,6 +19,7 @@ Optional, Set, Type, + TypeVar, Union, cast, ) @@ -43,10 +44,13 @@ if TYPE_CHECKING: from scrapy.core.scheduler import BaseScheduler + from scrapy.core.scraper import _HandleOutputDeferred from scrapy.crawler import Crawler logger = logging.getLogger(__name__) +_T = TypeVar("_T") + class Slot: def __init__( @@ -56,7 +60,7 @@ def __init__( nextcall: CallLaterOnce[None], scheduler: BaseScheduler, ) -> None: - self.closing: Optional[Deferred] = None + self.closing: Optional[Deferred[None]] = None self.inprogress: Set[Request] = set() self.start_requests: Optional[Iterator[Request]] = iter(start_requests) self.close_if_idle: bool = close_if_idle @@ -71,7 +75,7 @@ def remove_request(self, request: Request) -> None: self.inprogress.remove(request) self._maybe_fire_closing() - def close(self) -> Deferred: + def close(self) -> Deferred[None]: self.closing = Deferred() self._maybe_fire_closing() return self.closing @@ -123,20 +127,20 @@ def _get_scheduler_class(self, settings: BaseSettings) -> Type[BaseScheduler]: return scheduler_cls @inlineCallbacks - def start(self) -> Generator[Deferred, Any, None]: + def start(self) -> Generator[Deferred[Any], Any, None]: if self.running: raise RuntimeError("Engine already running") self.start_time = time() yield self.signals.send_catch_log_deferred(signal=signals.engine_started) self.running = True - self._closewait: Deferred = Deferred() + self._closewait: Deferred[None] = Deferred() yield self._closewait - def stop(self) -> Deferred: + def stop(self) -> Deferred[None]: """Gracefully stop the execution engine""" @inlineCallbacks - def _finish_stopping_engine(_: Any) -> Generator[Deferred, Any, None]: + def _finish_stopping_engine(_: Any) -> Generator[Deferred[Any], Any, None]: yield self.signals.send_catch_log_deferred(signal=signals.engine_stopped) self._closewait.callback(None) @@ -151,7 +155,7 @@ def _finish_stopping_engine(_: Any) -> Generator[Deferred, Any, None]: ) return dfd.addBoth(_finish_stopping_engine) - def close(self) -> Deferred: + def close(self) -> Deferred[None]: """ Gracefully close the execution engine. If it has already been started, stop it. In all cases, close the spider and the downloader. 
@@ -214,7 +218,7 @@ def _needs_backout(self) -> bool: or self.scraper.slot.needs_backout() ) - def _next_request_from_scheduler(self) -> Optional[Deferred]: + def _next_request_from_scheduler(self) -> Optional[Deferred[None]]: assert self.slot is not None # typing assert self.spider is not None # typing @@ -222,7 +226,7 @@ def _next_request_from_scheduler(self) -> Optional[Deferred]: if request is None: return None - d = self._download(request) + d: Deferred[Union[Response, Request]] = self._download(request) d.addBoth(self._handle_downloader_output, request) d.addErrback( lambda f: logger.info( @@ -236,8 +240,8 @@ def _remove_request(_: Any) -> None: assert self.slot self.slot.remove_request(request) - d.addBoth(_remove_request) - d.addErrback( + d2: Deferred[None] = d.addBoth(_remove_request) + d2.addErrback( lambda f: logger.info( "Error while removing request from slot", exc_info=failure_to_exc_info(f), @@ -245,19 +249,19 @@ def _remove_request(_: Any) -> None: ) ) slot = self.slot - d.addBoth(lambda _: slot.nextcall.schedule()) - d.addErrback( + d2.addBoth(lambda _: slot.nextcall.schedule()) + d2.addErrback( lambda f: logger.info( "Error while scheduling new request", exc_info=failure_to_exc_info(f), extra={"spider": self.spider}, ) ) - return d + return d2 def _handle_downloader_output( self, result: Union[Request, Response, Failure], request: Request - ) -> Optional[Deferred]: + ) -> Optional[_HandleOutputDeferred]: assert self.spider is not None # typing if not isinstance(result, (Request, Response, Failure)): @@ -319,20 +323,23 @@ def _schedule_request(self, request: Request, spider: Spider) -> None: signals.request_dropped, request=request, spider=spider ) - def download(self, request: Request) -> Deferred: + def download(self, request: Request) -> Deferred[Response]: """Return a Deferred which fires with a Response as result, only downloader middlewares are applied""" if self.spider is None: raise RuntimeError(f"No open spider to crawl: {request}") - return self._download(request).addBoth(self._downloaded, request) + d: Deferred[Union[Response, Request]] = self._download(request) + # Deferred.addBoth() overloads don't seem to support a Union[_T, Deferred[_T]] return type + d2: Deferred[Response] = d.addBoth(self._downloaded, request) # type: ignore[arg-type] + return d2 def _downloaded( self, result: Union[Response, Request, Failure], request: Request - ) -> Union[Deferred, Response, Failure]: + ) -> Union[Deferred[Response], Response, Failure]: assert self.slot is not None # typing self.slot.remove_request(request) return self.download(result) if isinstance(result, Request) else result - def _download(self, request: Request) -> Deferred: + def _download(self, request: Request) -> Deferred[Union[Response, Request]]: assert self.slot is not None # typing self.slot.add_request(request) @@ -359,13 +366,15 @@ def _on_success(result: Union[Response, Request]) -> Union[Response, Request]: ) return result - def _on_complete(_: Any) -> Any: + def _on_complete(_: _T) -> _T: assert self.slot is not None self.slot.nextcall.schedule() return _ assert self.spider is not None - dwld = self.downloader.fetch(request, self.spider) + dwld: Deferred[Union[Response, Request]] = self.downloader.fetch( + request, self.spider + ) dwld.addCallback(_on_success) dwld.addBoth(_on_complete) return dwld @@ -376,7 +385,7 @@ def open_spider( spider: Spider, start_requests: Iterable[Request] = (), close_if_idle: bool = True, - ) -> Generator[Deferred, Any, None]: + ) -> Generator[Deferred[Any], Any, None]: if 
self.slot is not None: raise RuntimeError(f"No free spider slot when opening {spider.name!r}") logger.info("Spider opened", extra={"spider": spider}) @@ -422,7 +431,7 @@ def _spider_idle(self) -> None: assert isinstance(ex, CloseSpider) # typing self.close_spider(self.spider, reason=ex.reason) - def close_spider(self, spider: Spider, reason: str = "cancelled") -> Deferred: + def close_spider(self, spider: Spider, reason: str = "cancelled") -> Deferred[None]: """Close (cancel) spider and clear all its outstanding requests""" if self.slot is None: raise RuntimeError("Engine slot not assigned") diff --git a/scrapy/core/scheduler.py b/scrapy/core/scheduler.py index e3b95e977c3..1e586c53ac4 100644 --- a/scrapy/core/scheduler.py +++ b/scrapy/core/scheduler.py @@ -71,7 +71,7 @@ def from_crawler(cls, crawler: Crawler) -> Self: """ return cls() - def open(self, spider: Spider) -> Optional[Deferred]: + def open(self, spider: Spider) -> Optional[Deferred[None]]: """ Called when the spider is opened by the engine. It receives the spider instance as argument and it's useful to execute initialization code. @@ -81,7 +81,7 @@ def open(self, spider: Spider) -> Optional[Deferred]: """ pass - def close(self, reason: str) -> Optional[Deferred]: + def close(self, reason: str) -> Optional[Deferred[None]]: """ Called when the spider is closed by the engine. It receives the reason why the crawl finished as argument and it's useful to execute cleaning code. @@ -216,7 +216,7 @@ def from_crawler(cls, crawler: Crawler) -> Self: def has_pending_requests(self) -> bool: return len(self) > 0 - def open(self, spider: Spider) -> Optional[Deferred]: + def open(self, spider: Spider) -> Optional[Deferred[None]]: """ (1) initialize the memory queue (2) initialize the disk queue if the ``jobdir`` attribute is a valid directory @@ -227,7 +227,7 @@ def open(self, spider: Spider) -> Optional[Deferred]: self.dqs: Optional[ScrapyPriorityQueue] = self._dq() if self.dqdir else None return self.df.open() - def close(self, reason: str) -> Optional[Deferred]: + def close(self, reason: str) -> Optional[Deferred[None]]: """ (1) dump pending requests to disk if there is a disk queue (2) return the result of the dupefilter's ``close`` method diff --git a/scrapy/core/scraper.py b/scrapy/core/scraper.py index 3b7492838e7..8a9e8f68771 100644 --- a/scrapy/core/scraper.py +++ b/scrapy/core/scraper.py @@ -12,6 +12,7 @@ Deque, Generator, Iterable, + Iterator, Optional, Set, Tuple, @@ -33,6 +34,7 @@ from scrapy.pipelines import ItemPipelineManager from scrapy.signalmanager import SignalManager from scrapy.utils.defer import ( + DeferredListResultListT, aiter_errback, defer_fail, defer_succeed, @@ -48,11 +50,16 @@ from scrapy.crawler import Crawler -_T = TypeVar("_T") -QueueTuple = Tuple[Union[Response, Failure], Request, Deferred] +logger = logging.getLogger(__name__) -logger = logging.getLogger(__name__) +_T = TypeVar("_T") +_ParallelResult = DeferredListResultListT[Iterator[Any]] + +if TYPE_CHECKING: + # parameterized Deferreds require Twisted 21.7.0 + _HandleOutputDeferred = Deferred[Union[_ParallelResult, None]] + QueueTuple = Tuple[Union[Response, Failure], Request, _HandleOutputDeferred] class Slot: @@ -66,12 +73,12 @@ def __init__(self, max_active_size: int = 5000000): self.active: Set[Request] = set() self.active_size: int = 0 self.itemproc_size: int = 0 - self.closing: Optional[Deferred] = None + self.closing: Optional[Deferred[Spider]] = None def add_response_request( self, result: Union[Response, Failure], request: Request - ) -> 
Deferred: - deferred: Deferred = Deferred() + ) -> _HandleOutputDeferred: + deferred: _HandleOutputDeferred = Deferred() self.queue.append((result, request, deferred)) if isinstance(result, Response): self.active_size += max(len(result.body), self.MIN_RESPONSE_SIZE) @@ -117,12 +124,12 @@ def __init__(self, crawler: Crawler) -> None: self.logformatter: LogFormatter = crawler.logformatter @inlineCallbacks - def open_spider(self, spider: Spider) -> Generator[Deferred, Any, None]: + def open_spider(self, spider: Spider) -> Generator[Deferred[Any], Any, None]: """Open the given spider for scraping and allocate resources for it""" self.slot = Slot(self.crawler.settings.getint("SCRAPER_SLOT_MAX_ACTIVE_SIZE")) yield self.itemproc.open_spider(spider) - def close_spider(self, spider: Spider) -> Deferred: + def close_spider(self, spider: Spider) -> Deferred[Spider]: """Close a spider being scraped and release its resources""" if self.slot is None: raise RuntimeError("Scraper slot not assigned") @@ -142,12 +149,12 @@ def _check_if_closing(self, spider: Spider) -> None: def enqueue_scrape( self, result: Union[Response, Failure], request: Request, spider: Spider - ) -> Deferred: + ) -> _HandleOutputDeferred: if self.slot is None: raise RuntimeError("Scraper slot not assigned") dfd = self.slot.add_response_request(result, request) - def finish_scraping(_: Any) -> Any: + def finish_scraping(_: _T) -> _T: assert self.slot is not None self.slot.finish_response(result, request) self._check_if_closing(spider) @@ -174,7 +181,7 @@ def _scrape_next(self, spider: Spider) -> None: def _scrape( self, result: Union[Response, Failure], request: Request, spider: Spider - ) -> Deferred: + ) -> _HandleOutputDeferred: """ Handle the downloaded response or failure through the spider callback/errback """ @@ -182,32 +189,35 @@ def _scrape( raise TypeError( f"Incorrect type: expected Response or Failure, got {type(result)}: {result!r}" ) - dfd = self._scrape2( + dfd: Deferred[Union[Iterable[Any], AsyncIterable[Any]]] = self._scrape2( result, request, spider ) # returns spider's processed output dfd.addErrback(self.handle_spider_error, request, result, spider) - dfd.addCallback( + dfd2: _HandleOutputDeferred = dfd.addCallback( self.handle_spider_output, request, cast(Response, result), spider ) - return dfd + return dfd2 def _scrape2( self, result: Union[Response, Failure], request: Request, spider: Spider - ) -> Deferred: + ) -> Deferred[Union[Iterable[Any], AsyncIterable[Any]]]: """ Handle the different cases of request's result been a Response or a Failure """ if isinstance(result, Response): - return self.spidermw.scrape_response( + # Deferreds are invariant so Mutable*Chain isn't matched to *Iterable + return self.spidermw.scrape_response( # type: ignore[return-value] self.call_spider, result, request, spider ) # else result is a Failure dfd = self.call_spider(result, request, spider) - return dfd.addErrback(self._log_download_errors, result, request, spider) + dfd.addErrback(self._log_download_errors, result, request, spider) + return dfd def call_spider( self, result: Union[Response, Failure], request: Request, spider: Spider - ) -> Deferred: + ) -> Deferred[Union[Iterable[Any], AsyncIterable[Any]]]: + dfd: Deferred[Any] if isinstance(result, Response): if getattr(result, "request", None) is None: result.request = request @@ -225,7 +235,10 @@ def call_spider( if request.errback: warn_on_generator_with_return_value(spider, request.errback) dfd.addErrback(request.errback) - return dfd.addCallback(iterate_spider_output) + 
dfd2: Deferred[Union[Iterable[Any], AsyncIterable[Any]]] = dfd.addCallback( + iterate_spider_output + ) + return dfd2 def handle_spider_error( self, @@ -262,10 +275,11 @@ def handle_spider_output( request: Request, response: Response, spider: Spider, - ) -> Deferred: + ) -> _HandleOutputDeferred: if not result: return defer_succeed(None) it: Union[Iterable[_T], AsyncIterable[_T]] + dfd: Deferred[_ParallelResult] if isinstance(result, AsyncIterable): it = aiter_errback( result, self.handle_spider_error, request, response, spider @@ -290,11 +304,12 @@ def handle_spider_output( response, spider, ) - return dfd + # returning Deferred[_ParallelResult] instead of Deferred[Union[_ParallelResult, None]] + return dfd # type: ignore[return-value] def _process_spidermw_output( self, output: Any, request: Request, response: Response, spider: Spider - ) -> Optional[Deferred]: + ) -> Optional[Deferred[Any]]: """Process each Request/Item (given in the output parameter) returned from the given spider """ diff --git a/scrapy/core/spidermw.py b/scrapy/core/spidermw.py index 58873f0d971..e792f8ca76c 100644 --- a/scrapy/core/spidermw.py +++ b/scrapy/core/spidermw.py @@ -45,7 +45,9 @@ _T = TypeVar("_T") -ScrapeFunc = Callable[[Union[Response, Failure], Request, Spider], Any] +ScrapeFunc = Callable[ + [Union[Response, Failure], Request, Spider], Union[Iterable[_T], AsyncIterable[_T]] +] def _isiterable(o: Any) -> bool: @@ -80,7 +82,7 @@ def _process_spider_input( response: Response, request: Request, spider: Spider, - ) -> Any: + ) -> Union[Iterable[_T], AsyncIterable[_T]]: for method in self.methods["process_spider_input"]: method = cast(Callable, method) try: @@ -311,7 +313,7 @@ def scrape_response( response: Response, request: Request, spider: Spider, - ) -> Deferred: + ) -> Deferred[Union[MutableChain[_T], MutableAsyncChain[_T]]]: async def process_callback_output( result: Union[Iterable[_T], AsyncIterable[_T]] ) -> Union[MutableChain[_T], MutableAsyncChain[_T]]: @@ -322,12 +324,14 @@ def process_spider_exception( ) -> Union[Failure, MutableChain[_T], MutableAsyncChain[_T]]: return self._process_spider_exception(response, spider, _failure) - dfd: Deferred = mustbe_deferred( + dfd: Deferred[Union[Iterable[_T], AsyncIterable[_T]]] = mustbe_deferred( self._process_spider_input, scrape_func, response, request, spider ) - dfd.addCallback(deferred_f_from_coro_f(process_callback_output)) - dfd.addErrback(process_spider_exception) - return dfd + dfd2: Deferred[Union[MutableChain[_T], MutableAsyncChain[_T]]] = ( + dfd.addCallback(deferred_f_from_coro_f(process_callback_output)) + ) + dfd2.addErrback(process_spider_exception) + return dfd2 def process_start_requests( self, start_requests: Iterable[Request], spider: Spider diff --git a/scrapy/pipelines/__init__.py b/scrapy/pipelines/__init__.py index 0cfbc156f82..21d649e3c8e 100644 --- a/scrapy/pipelines/__init__.py +++ b/scrapy/pipelines/__init__.py @@ -4,6 +4,8 @@ See documentation in docs/item-pipeline.rst """ +from __future__ import annotations + from typing import Any, List from twisted.internet.defer import Deferred @@ -29,5 +31,5 @@ def _add_middleware(self, pipe: Any) -> None: deferred_f_from_coro_f(pipe.process_item) ) - def process_item(self, item: Any, spider: Spider) -> Deferred: + def process_item(self, item: Any, spider: Spider) -> Deferred[Any]: return self._process_chain("process_item", item, spider) diff --git a/scrapy/utils/defer.py b/scrapy/utils/defer.py index f60b7dde839..ddb68c86b66 100644 --- a/scrapy/utils/defer.py +++ b/scrapy/utils/defer.py 
@@ -46,6 +46,12 @@ _P = ParamSpec("_P") _T = TypeVar("_T") +_T2 = TypeVar("_T2") + +# copied from twisted.internet.defer +_SelfResultT = TypeVar("_SelfResultT") +_DeferredListResultItemT = Tuple[bool, _SelfResultT] +DeferredListResultListT = List[_DeferredListResultItemT[_SelfResultT]] def defer_fail(_failure: Failure) -> Deferred: @@ -62,7 +68,7 @@ def defer_fail(_failure: Failure) -> Deferred: return d -def defer_succeed(result: Any) -> Deferred: +def defer_succeed(result: _T) -> Deferred[_T]: """Same as twisted.internet.defer.succeed but delay calling callback until next reactor loop @@ -128,10 +134,10 @@ def mustbe_deferred( def parallel( iterable: Iterable[_T], count: int, - callable: Callable[Concatenate[_T, _P], Any], + callable: Callable[Concatenate[_T, _P], _T2], *args: _P.args, **named: _P.kwargs, -) -> Deferred: +) -> Deferred[DeferredListResultListT[Iterator[_T2]]]: """Execute a callable over the objects in the given iterable, in parallel, using no more than ``count`` concurrent calls. @@ -191,12 +197,12 @@ class _AsyncCooperatorAdapter(Iterator[Deferred]): def __init__( self, aiterable: AsyncIterable[_T], - callable: Callable[Concatenate[_T, _P], Any], + callable: Callable[Concatenate[_T, _P], _T2], *callable_args: _P.args, **callable_kwargs: _P.kwargs, ): self.aiterator: AsyncIterator[_T] = aiterable.__aiter__() - self.callable: Callable[Concatenate[_T, _P], Any] = callable + self.callable: Callable[Concatenate[_T, _P], _T2] = callable self.callable_args: Tuple[Any, ...] = callable_args self.callable_kwargs: Dict[str, Any] = callable_kwargs self.finished: bool = False @@ -249,10 +255,10 @@ def __next__(self) -> Deferred: def parallel_async( async_iterable: AsyncIterable[_T], count: int, - callable: Callable[Concatenate[_T, _P], Any], + callable: Callable[Concatenate[_T, _P], _T2], *args: _P.args, **named: _P.kwargs, -) -> Deferred: +) -> Deferred[DeferredListResultListT[Iterator[_T2]]]: """Like parallel but for async iterators""" coop = Cooperator() work = _AsyncCooperatorAdapter(async_iterable, callable, *args, **named) diff --git a/tests/test_downloadermiddleware.py b/tests/test_downloadermiddleware.py index 0155c62eb3e..dd3f8ceb9cb 100644 --- a/tests/test_downloadermiddleware.py +++ b/tests/test_downloadermiddleware.py @@ -36,7 +36,7 @@ def _download(self, request, response=None): if not response: response = Response(request.url) - def download_func(**kwargs): + def download_func(request, spider): return response dfd = self.mwman.download(download_func, request, self.spider)
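
Note on the typing pattern applied throughout this patch: every bare `Deferred` annotation becomes `Deferred[T]`, so a type checker can follow each callback chain and flag callbacks whose argument or return type does not match. The sketch below is illustration only, not code from this change. It assumes Twisted 21.7.0 or later (the first release with a parameterized Deferred, as the comment added in scrapy/core/scraper.py notes); the passthrough() and body_length() helpers and the example URL are made up for the sketch.

# Illustration only -- not part of the patch.
# Assumes Twisted >= 21.7.0, where Deferred is generic.
from __future__ import annotations

from typing import TypeVar

from twisted.internet.defer import Deferred

from scrapy.http import Response

_T = TypeVar("_T")


def passthrough(result: _T) -> _T:
    # Same shape as _deactivate() and finish_transferring() above: a TypeVar
    # tells the checker that addBoth() leaves the result type unchanged.
    return result


def body_length(response: Response) -> int:
    # A callback that narrows the chain from Deferred[Response] to Deferred[int].
    return len(response.body)


d: Deferred[Response] = Deferred()
d.addBoth(passthrough)                          # still Deferred[Response]
d2: Deferred[int] = d.addCallback(body_length)  # now Deferred[int]
d2.addCallback(lambda n: print(f"body is {n} bytes"))
d.callback(Response("https://example.com", body=b"hello"))  # prints "body is 5 bytes"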
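
Two further typing constructs introduced here may be easier to read in isolation: handlers/__init__.py gains a structural `DownloadHandlerProtocol` so the `_handlers` dict no longer needs `Any`, and http11.py replaces a `Dict[str, Any]` result with a `_ResultT` TypedDict whose `failure` key is `NotRequired`. The sketch below is illustration only and not part of this change; `DummyHandler`, the reduced TypedDict, and the dict literal are hypothetical, and `typing_extensions` is imported directly here (the patch itself only imports it under TYPE_CHECKING).

# Illustration only -- not part of the patch.
# Assumes Twisted >= 21.7.0 so Deferred is subscriptable at runtime, and that
# typing_extensions is installed (on Python >= 3.11 the same names live in typing).
from typing import Optional, Protocol

from twisted.internet.defer import Deferred
from twisted.python.failure import Failure
from typing_extensions import NotRequired, TypedDict

from scrapy import Request, Spider
from scrapy.http import Response


class DownloadHandlerProtocol(Protocol):
    # Structural typing: any object with a matching download_request() method
    # satisfies the protocol; concrete handlers never have to inherit from it.
    def download_request(
        self, request: Request, spider: Spider
    ) -> Deferred[Response]: ...


class DummyHandler:
    # Does not subclass the protocol, yet type-checks as a DownloadHandlerProtocol.
    def download_request(
        self, request: Request, spider: Spider
    ) -> Deferred[Response]:
        d: Deferred[Response] = Deferred()
        d.callback(Response(request.url))
        return d


class _ResultT(TypedDict):
    # Reduced version of the TypedDict in http11.py: the "failure" key may be
    # absent from the dict entirely, which NotRequired expresses and a plain
    # Optional value cannot.
    body: bytes
    failure: NotRequired[Optional[Failure]]


handler: DownloadHandlerProtocol = DummyHandler()  # accepted by a type checker
result: _ResultT = {"body": b"payload"}            # "failure" omitted: still valid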