Add parameters to most Deferred in scrapy/core. (#6395)
wRAR committed Jun 10, 2024
1 parent ddc98fe commit 1282ddf
Showing 16 changed files with 236 additions and 131 deletions.
45 changes: 34 additions & 11 deletions scrapy/core/downloader/__init__.py

@@ -1,9 +1,22 @@
 from __future__ import annotations
 
 import random
 import warnings
 from collections import deque
 from datetime import datetime
 from time import time
-from typing import TYPE_CHECKING, Any, Deque, Dict, Optional, Set, Tuple, cast
+from typing import (
+    TYPE_CHECKING,
+    Any,
+    Deque,
+    Dict,
+    Optional,
+    Set,
+    Tuple,
+    TypeVar,
+    Union,
+    cast,
+)
 
 from twisted.internet import task
 from twisted.internet.defer import Deferred
@@ -22,6 +35,8 @@
 if TYPE_CHECKING:
     from scrapy.crawler import Crawler
 
+_T = TypeVar("_T")
+
 
 class Slot:
     """Downloader slot"""
@@ -40,7 +55,7 @@ def __init__(
         self.throttle = throttle
 
         self.active: Set[Request] = set()
-        self.queue: Deque[Tuple[Request, Deferred]] = deque()
+        self.queue: Deque[Tuple[Request, Deferred[Response]]] = deque()
         self.transferring: Set[Request] = set()
         self.lastseen: float = 0
         self.latercall = None
@@ -93,7 +108,7 @@ def _get_concurrency_delay(
 class Downloader:
     DOWNLOAD_SLOT = "download_slot"
 
-    def __init__(self, crawler: "Crawler"):
+    def __init__(self, crawler: Crawler):
         self.settings: BaseSettings = crawler.settings
         self.signals: SignalManager = crawler.signals
         self.slots: Dict[str, Slot] = {}
@@ -114,13 +129,17 @@ def __init__(self, crawler: "Crawler"):
             "DOWNLOAD_SLOTS", {}
         )
 
-    def fetch(self, request: Request, spider: Spider) -> Deferred:
-        def _deactivate(response: Response) -> Response:
+    def fetch(
+        self, request: Request, spider: Spider
+    ) -> Deferred[Union[Response, Request]]:
+        def _deactivate(response: _T) -> _T:
             self.active.remove(request)
             return response
 
         self.active.add(request)
-        dfd = self.middleware.download(self._enqueue_request, request, spider)
+        dfd: Deferred[Union[Response, Request]] = self.middleware.download(
+            self._enqueue_request, request, spider
+        )
         return dfd.addBoth(_deactivate)
 
     def needs_backout(self) -> bool:
@@ -163,7 +182,7 @@ def _get_slot_key(self, request: Request, spider: Optional[Spider]) -> str:
         )
         return self.get_slot_key(request)
 
-    def _enqueue_request(self, request: Request, spider: Spider) -> Deferred:
+    def _enqueue_request(self, request: Request, spider: Spider) -> Deferred[Response]:
         key, slot = self._get_slot(request, spider)
         request.meta[self.DOWNLOAD_SLOT] = key
 
@@ -175,7 +194,7 @@ def _deactivate(response: Response) -> Response:
         self.signals.send_catch_log(
             signal=signals.request_reached_downloader, request=request, spider=spider
         )
-        deferred: Deferred = Deferred().addBoth(_deactivate)
+        deferred: Deferred[Response] = Deferred().addBoth(_deactivate)
         slot.queue.append((request, deferred))
         self._process_queue(spider, slot)
         return deferred
@@ -208,11 +227,15 @@ def _process_queue(self, spider: Spider, slot: Slot) -> None:
                 self._process_queue(spider, slot)
                 break
 
-    def _download(self, slot: Slot, request: Request, spider: Spider) -> Deferred:
+    def _download(
+        self, slot: Slot, request: Request, spider: Spider
+    ) -> Deferred[Response]:
        # The order is very important for the following deferreds. Do not change!
 
        # 1. Create the download deferred
-        dfd = mustbe_deferred(self.handlers.download_request, request, spider)
+        dfd: Deferred[Response] = mustbe_deferred(
+            self.handlers.download_request, request, spider
+        )
 
        # 2. Notify response_downloaded listeners about the recent download
        # before querying queue for next request
@@ -233,7 +256,7 @@ def _downloaded(response: Response) -> Response:
         # middleware itself)
         slot.transferring.add(request)
 
-        def finish_transferring(_: _T) -> _T:
+        def finish_transferring(_: _T) -> _T:
             slot.transferring.remove(request)
             self._process_queue(spider, slot)
             self.signals.send_catch_log(
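
Note: the recurring pattern in this file is replacing bare Deferred with Deferred[Response] (or Deferred[Union[Response, Request]] for fetch, since downloader middleware may substitute a new Request) and typing identity callbacks with a TypeVar so that addBoth() does not erase the tracked result type. A minimal standalone sketch of that pattern, assuming Twisted >= 21.7 for a generic Deferred; the Response stand-in and _passthrough helper are illustrative, not Scrapy code:

from __future__ import annotations

from typing import TypeVar

from twisted.internet.defer import Deferred

_T = TypeVar("_T")


class Response:
    """Stand-in for scrapy.http.Response."""


def _passthrough(result: _T) -> _T:
    # Side-effect bookkeeping would go here (e.g. removing the request
    # from an "active" set); returning _T keeps the chained type unchanged.
    return result


d: Deferred[Response] = Deferred()
d.addBoth(_passthrough)  # still inferred as firing with Response
d.callback(Response())
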
38 changes: 30 additions & 8 deletions scrapy/core/downloader/handlers/__init__.py

@@ -3,13 +3,25 @@
 from __future__ import annotations
 
 import logging
-from typing import TYPE_CHECKING, Any, Callable, Dict, Generator, Union, cast
+from typing import (
+    TYPE_CHECKING,
+    Any,
+    Callable,
+    Dict,
+    Generator,
+    Optional,
+    Protocol,
+    Type,
+    Union,
+    cast,
+)
 
 from twisted.internet import defer
 from twisted.internet.defer import Deferred
 
 from scrapy import Request, Spider, signals
 from scrapy.exceptions import NotConfigured, NotSupported
+from scrapy.http import Response
 from scrapy.utils.httpobj import urlparse_cached
 from scrapy.utils.misc import build_from_crawler, load_object
 from scrapy.utils.python import without_none_values
@@ -20,13 +32,21 @@
 logger = logging.getLogger(__name__)
 
 
+class DownloadHandlerProtocol(Protocol):
+    def download_request(
+        self, request: Request, spider: Spider
+    ) -> Deferred[Response]: ...
+
+
 class DownloadHandlers:
     def __init__(self, crawler: Crawler):
         self._crawler: Crawler = crawler
         self._schemes: Dict[str, Union[str, Callable[..., Any]]] = (
             {}
         )  # stores acceptable schemes on instancing
-        self._handlers: Dict[str, Any] = {}  # stores instanced handlers for schemes
+        self._handlers: Dict[str, DownloadHandlerProtocol] = (
+            {}
+        )  # stores instanced handlers for schemes
         self._notconfigured: Dict[str, str] = {}  # remembers failed handlers
         handlers: Dict[str, Union[str, Callable[..., Any]]] = without_none_values(
             cast(
@@ -40,7 +60,7 @@ def __init__(self, crawler: Crawler):
 
         crawler.signals.connect(self._close, signals.engine_stopped)
 
-    def _get_handler(self, scheme: str) -> Any:
+    def _get_handler(self, scheme: str) -> Optional[DownloadHandlerProtocol]:
         """Lazy-load the downloadhandler for a scheme
         only on the first request for that scheme.
         """
@@ -54,10 +74,12 @@ def _get_handler(self, scheme: str) -> Any:
 
         return self._load_handler(scheme)
 
-    def _load_handler(self, scheme: str, skip_lazy: bool = False) -> Any:
+    def _load_handler(
+        self, scheme: str, skip_lazy: bool = False
+    ) -> Optional[DownloadHandlerProtocol]:
         path = self._schemes[scheme]
         try:
-            dhcls = load_object(path)
+            dhcls: Type[DownloadHandlerProtocol] = load_object(path)
             if skip_lazy and getattr(dhcls, "lazy", True):
                 return None
             dh = build_from_crawler(
@@ -80,17 +102,17 @@ def _load_handler(self, scheme: str, skip_lazy: bool = False) -> Any:
             self._handlers[scheme] = dh
             return dh
 
-    def download_request(self, request: Request, spider: Spider) -> Deferred:
+    def download_request(self, request: Request, spider: Spider) -> Deferred[Response]:
         scheme = urlparse_cached(request).scheme
         handler = self._get_handler(scheme)
         if not handler:
             raise NotSupported(
                 f"Unsupported URL scheme '{scheme}': {self._notconfigured[scheme]}"
             )
-        return cast(Deferred, handler.download_request(request, spider))
+        return handler.download_request(request, spider)
 
     @defer.inlineCallbacks
-    def _close(self, *_a: Any, **_kw: Any) -> Generator[Deferred, Any, None]:
+    def _close(self, *_a: Any, **_kw: Any) -> Generator[Deferred[Any], Any, None]:
         for dh in self._handlers.values():
             if hasattr(dh, "close"):
                 yield dh.close()
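
Note: DownloadHandlerProtocol is a structural type (typing.Protocol): any object exposing a matching download_request() satisfies it without subclassing, which is why the cast() in download_request can be dropped. A hedged sketch of how a checker treats such a protocol; TextResponse, HandlerProtocol, and InMemoryHandler are invented names, not Scrapy API:

from __future__ import annotations

from typing import Protocol

from twisted.internet.defer import Deferred


class TextResponse:
    def __init__(self, body: str) -> None:
        self.body = body


class HandlerProtocol(Protocol):
    def download_request(self, url: str) -> Deferred[TextResponse]: ...


class InMemoryHandler:
    # No inheritance from HandlerProtocol: the matching method is enough.
    def download_request(self, url: str) -> Deferred[TextResponse]:
        d: Deferred[TextResponse] = Deferred()
        d.callback(TextResponse(f"canned body for {url}"))
        return d


def fetch(handler: HandlerProtocol, url: str) -> Deferred[TextResponse]:
    # A type checker accepts InMemoryHandler here structurally.
    return handler.download_request(url)


fetch(InMemoryHandler(), "data:,hello")
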
10 changes: 7 additions & 3 deletions scrapy/core/downloader/handlers/ftp.py

@@ -91,7 +91,7 @@ def __init__(self, settings: BaseSettings):
     def from_crawler(cls, crawler: Crawler) -> Self:
         return cls(crawler.settings)
 
-    def download_request(self, request: Request, spider: Spider) -> Deferred:
+    def download_request(self, request: Request, spider: Spider) -> Deferred[Response]:
         from twisted.internet import reactor
 
         parsed_url = urlparse_cached(request)
@@ -103,10 +103,14 @@ def download_request(self, request: Request, spider: Spider) -> Deferred:
         creator = ClientCreator(
             reactor, FTPClient, user, password, passive=passive_mode
         )
-        dfd: Deferred = creator.connectTCP(parsed_url.hostname, parsed_url.port or 21)
+        dfd: Deferred[FTPClient] = creator.connectTCP(
+            parsed_url.hostname, parsed_url.port or 21
+        )
         return dfd.addCallback(self.gotClient, request, unquote(parsed_url.path))
 
-    def gotClient(self, client: FTPClient, request: Request, filepath: str) -> Deferred:
+    def gotClient(
+        self, client: FTPClient, request: Request, filepath: str
+    ) -> Deferred[Response]:
         self.client = client
         protocol = ReceivedDataProtocol(request.meta.get("ftp_local_filename"))
         d = client.retrieveFile(filepath, protocol)
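
Note: here the chain starts as Deferred[FTPClient] (what connectTCP yields) and ends as Deferred[Response] once gotClient has run: each addCallback re-parameterizes the Deferred with its callback's return type. A toy sketch of the same shift, assuming Twisted's bundled annotations track addCallback this way; the names are illustrative:

from __future__ import annotations

from twisted.internet.defer import Deferred


def describe(port: int) -> str:
    # Callback returns str, so the chained Deferred fires with str.
    return f"connected on port {port}"


d: Deferred[int] = Deferred()
d2: Deferred[str] = d.addCallback(describe)  # type parameter follows the callback
d.callback(21)
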
3 changes: 2 additions & 1 deletion scrapy/core/downloader/handlers/http10.py

@@ -9,6 +9,7 @@
 
 from scrapy import Request, Spider
 from scrapy.crawler import Crawler
+from scrapy.http import Response
 from scrapy.settings import BaseSettings
 from scrapy.utils.misc import build_from_crawler, load_object
 from scrapy.utils.python import to_unicode
@@ -38,7 +39,7 @@ def __init__(self, settings: BaseSettings, crawler: Crawler):
     def from_crawler(cls, crawler: Crawler) -> Self:
         return cls(crawler.settings, crawler)
 
-    def download_request(self, request: Request, spider: Spider) -> Deferred:
+    def download_request(self, request: Request, spider: Spider) -> Deferred[Response]:
         """Return a deferred for the HTTP download"""
         factory = self.HTTPClientFactory(request)
         self._connect(factory)
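
Note: the HTTP/1.0 handler hands back a Deferred owned by its client factory, so the declared Deferred[Response] return type is only as precise as the factory attribute's own annotation. A minimal sketch of that ownership pattern; FakeFactory and its methods are invented for illustration, not Scrapy or Twisted API:

from __future__ import annotations

from twisted.internet.defer import Deferred


class Response:
    """Stand-in for scrapy.http.Response."""


class FakeFactory:
    def __init__(self) -> None:
        self.deferred: Deferred[Response] = Deferred()

    def _done(self) -> None:
        # Fired when the transfer finishes.
        self.deferred.callback(Response())


def download_request() -> Deferred[Response]:
    factory = FakeFactory()
    # ... connection setup would go here ...
    return factory.deferred  # matches the annotated attribute
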
