Add type parameters to most Deferreds in scrapy/core #6395

Merged (3 commits) on Jun 10, 2024
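This PR narrows bare `Deferred` annotations across `scrapy/core` to parameterized ones such as `Deferred[Response]`, so type checkers know what each callback chain carries. As a rough sketch of what that buys (illustrative code, not part of the diff; it assumes a Twisted release whose `Deferred` is generic, i.e. 21.7.0 or later, or deferred annotation evaluation as used below):

```python
from __future__ import annotations

from twisted.internet.defer import Deferred

from scrapy.http import Response


def make_response_deferred() -> Deferred[Response]:
    # With a parameterized annotation, type checkers know what the
    # callbacks attached to this Deferred will eventually receive.
    d: Deferred[Response] = Deferred()
    return d


def log_url(response: Response) -> Response:
    print(response.url)  # mypy can verify this attribute access
    return response


d = make_response_deferred()
d.addCallback(log_url)  # OK: Response -> Response fits Deferred[Response]
```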
45 changes: 34 additions & 11 deletions scrapy/core/downloader/__init__.py
@@ -1,9 +1,22 @@
from __future__ import annotations

import random
import warnings
from collections import deque
from datetime import datetime
from time import time
-from typing import TYPE_CHECKING, Any, Deque, Dict, Optional, Set, Tuple, cast
+from typing import (
+    TYPE_CHECKING,
+    Any,
+    Deque,
+    Dict,
+    Optional,
+    Set,
+    Tuple,
+    TypeVar,
+    Union,
+    cast,
+)

from twisted.internet import task
from twisted.internet.defer import Deferred
@@ -22,6 +35,8 @@
if TYPE_CHECKING:
from scrapy.crawler import Crawler

+_T = TypeVar("_T")
+

class Slot:
"""Downloader slot"""
@@ -40,7 +55,7 @@ def __init__(
self.throttle = throttle

self.active: Set[Request] = set()
-        self.queue: Deque[Tuple[Request, Deferred]] = deque()
+        self.queue: Deque[Tuple[Request, Deferred[Response]]] = deque()
self.transferring: Set[Request] = set()
self.lastseen: float = 0
self.latercall = None
@@ -93,7 +108,7 @@ def _get_concurrency_delay(
class Downloader:
DOWNLOAD_SLOT = "download_slot"

-    def __init__(self, crawler: "Crawler"):
+    def __init__(self, crawler: Crawler):
self.settings: BaseSettings = crawler.settings
self.signals: SignalManager = crawler.signals
self.slots: Dict[str, Slot] = {}
@@ -114,13 +129,17 @@ def __init__(self, crawler: "Crawler"):
"DOWNLOAD_SLOTS", {}
)

-    def fetch(self, request: Request, spider: Spider) -> Deferred:
-        def _deactivate(response: Response) -> Response:
+    def fetch(
+        self, request: Request, spider: Spider
+    ) -> Deferred[Union[Response, Request]]:
+        def _deactivate(response: _T) -> _T:
self.active.remove(request)
return response

self.active.add(request)
-        dfd = self.middleware.download(self._enqueue_request, request, spider)
+        dfd: Deferred[Union[Response, Request]] = self.middleware.download(
+            self._enqueue_request, request, spider
+        )
return dfd.addBoth(_deactivate)

def needs_backout(self) -> bool:
@@ -163,7 +182,7 @@ def _get_slot_key(self, request: Request, spider: Optional[Spider]) -> str:
)
return self.get_slot_key(request)

-    def _enqueue_request(self, request: Request, spider: Spider) -> Deferred:
+    def _enqueue_request(self, request: Request, spider: Spider) -> Deferred[Response]:
key, slot = self._get_slot(request, spider)
request.meta[self.DOWNLOAD_SLOT] = key

@@ -175,7 +194,7 @@ def _deactivate(response: Response) -> Response:
self.signals.send_catch_log(
signal=signals.request_reached_downloader, request=request, spider=spider
)
-        deferred: Deferred = Deferred().addBoth(_deactivate)
+        deferred: Deferred[Response] = Deferred().addBoth(_deactivate)
slot.queue.append((request, deferred))
self._process_queue(spider, slot)
return deferred
@@ -208,11 +227,15 @@ def _process_queue(self, spider: Spider, slot: Slot) -> None:
self._process_queue(spider, slot)
break

-    def _download(self, slot: Slot, request: Request, spider: Spider) -> Deferred:
+    def _download(
+        self, slot: Slot, request: Request, spider: Spider
+    ) -> Deferred[Response]:
# The order is very important for the following deferreds. Do not change!

# 1. Create the download deferred
-        dfd = mustbe_deferred(self.handlers.download_request, request, spider)
+        dfd: Deferred[Response] = mustbe_deferred(
+            self.handlers.download_request, request, spider
+        )

# 2. Notify response_downloaded listeners about the recent download
# before querying queue for next request
@@ -233,7 +256,7 @@ def _downloaded(response: Response) -> Response:
# middleware itself)
slot.transferring.add(request)

-        def finish_transferring(_: Any) -> Any:
+        def finish_transferring(_: _T) -> _T:
slot.transferring.remove(request)
self._process_queue(spider, slot)
self.signals.send_catch_log(
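A note on the `_T = TypeVar("_T")` introduced above: callbacks such as `_deactivate` and `finish_transferring` are attached with `addBoth`, so they run for success and failure alike; typing them as identity functions (`_T -> _T`) lets them pass through whatever flows down the chain without disturbing the Deferred's parameter. A minimal standalone sketch of the pattern (names invented for illustration):

```python
from __future__ import annotations

from typing import TypeVar

from twisted.internet.defer import Deferred

_T = TypeVar("_T")

in_flight: set[str] = set()


def track(d: Deferred[_T], key: str) -> Deferred[_T]:
    """Run a bookkeeping side effect when `d` fires, preserving its type."""
    in_flight.add(key)

    def _done(result: _T) -> _T:
        # Passthrough callback: returns its argument unchanged, so
        # Deferred[_T].addBoth(_done) is still a Deferred[_T].
        in_flight.discard(key)
        return result

    return d.addBoth(_done)
```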
38 changes: 30 additions & 8 deletions scrapy/core/downloader/handlers/__init__.py
@@ -3,13 +3,25 @@
from __future__ import annotations

import logging
-from typing import TYPE_CHECKING, Any, Callable, Dict, Generator, Union, cast
+from typing import (
+    TYPE_CHECKING,
+    Any,
+    Callable,
+    Dict,
+    Generator,
+    Optional,
+    Protocol,
+    Type,
+    Union,
+    cast,
+)

from twisted.internet import defer
from twisted.internet.defer import Deferred

from scrapy import Request, Spider, signals
from scrapy.exceptions import NotConfigured, NotSupported
+from scrapy.http import Response
from scrapy.utils.httpobj import urlparse_cached
from scrapy.utils.misc import build_from_crawler, load_object
from scrapy.utils.python import without_none_values
@@ -20,13 +32,21 @@
logger = logging.getLogger(__name__)


+class DownloadHandlerProtocol(Protocol):
+    def download_request(
+        self, request: Request, spider: Spider
+    ) -> Deferred[Response]: ...
+
+
class DownloadHandlers:
def __init__(self, crawler: Crawler):
self._crawler: Crawler = crawler
self._schemes: Dict[str, Union[str, Callable[..., Any]]] = (
{}
) # stores acceptable schemes on instancing
-        self._handlers: Dict[str, Any] = {}  # stores instanced handlers for schemes
+        self._handlers: Dict[str, DownloadHandlerProtocol] = (
+            {}
+        )  # stores instanced handlers for schemes
self._notconfigured: Dict[str, str] = {} # remembers failed handlers
handlers: Dict[str, Union[str, Callable[..., Any]]] = without_none_values(
cast(
@@ -40,7 +60,7 @@ def __init__(self, crawler: Crawler):

crawler.signals.connect(self._close, signals.engine_stopped)

-    def _get_handler(self, scheme: str) -> Any:
+    def _get_handler(self, scheme: str) -> Optional[DownloadHandlerProtocol]:
"""Lazy-load the downloadhandler for a scheme
only on the first request for that scheme.
"""
@@ -54,10 +74,12 @@ def _get_handler(self, scheme: str) -> Any:

return self._load_handler(scheme)

-    def _load_handler(self, scheme: str, skip_lazy: bool = False) -> Any:
+    def _load_handler(
+        self, scheme: str, skip_lazy: bool = False
+    ) -> Optional[DownloadHandlerProtocol]:
path = self._schemes[scheme]
try:
-            dhcls = load_object(path)
+            dhcls: Type[DownloadHandlerProtocol] = load_object(path)
if skip_lazy and getattr(dhcls, "lazy", True):
return None
dh = build_from_crawler(
@@ -80,17 +102,17 @@ def _load_handler(self, scheme: str, skip_lazy: bool = False) -> Any:
self._handlers[scheme] = dh
return dh

-    def download_request(self, request: Request, spider: Spider) -> Deferred:
+    def download_request(self, request: Request, spider: Spider) -> Deferred[Response]:
scheme = urlparse_cached(request).scheme
handler = self._get_handler(scheme)
if not handler:
raise NotSupported(
f"Unsupported URL scheme '{scheme}': {self._notconfigured[scheme]}"
)
-        return cast(Deferred, handler.download_request(request, spider))
+        return handler.download_request(request, spider)

@defer.inlineCallbacks
-    def _close(self, *_a: Any, **_kw: Any) -> Generator[Deferred, Any, None]:
+    def _close(self, *_a: Any, **_kw: Any) -> Generator[Deferred[Any], Any, None]:
for dh in self._handlers.values():
if hasattr(dh, "close"):
yield dh.close()
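The new `DownloadHandlerProtocol` replaces `Any` for handler instances, and the `cast` in `download_request` becomes unnecessary. Because it is a `typing.Protocol`, third-party handlers keep working unchanged: any object with a matching `download_request` method satisfies it structurally, no inheritance required. A toy illustration (`DummyHandler` is invented for the example):

```python
from __future__ import annotations

from typing import Protocol

from twisted.internet.defer import Deferred, succeed

from scrapy import Request, Spider
from scrapy.http import Response


class DownloadHandlerProtocol(Protocol):
    def download_request(
        self, request: Request, spider: Spider
    ) -> Deferred[Response]: ...


class DummyHandler:
    # Does not inherit from the Protocol; the matching method signature
    # alone makes it an acceptable DownloadHandlerProtocol.
    def download_request(
        self, request: Request, spider: Spider
    ) -> Deferred[Response]:
        return succeed(Response(url=request.url))


handler: DownloadHandlerProtocol = DummyHandler()  # passes a type check
```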
10 changes: 7 additions & 3 deletions scrapy/core/downloader/handlers/ftp.py
@@ -91,7 +91,7 @@ def __init__(self, settings: BaseSettings):
def from_crawler(cls, crawler: Crawler) -> Self:
return cls(crawler.settings)

-    def download_request(self, request: Request, spider: Spider) -> Deferred:
+    def download_request(self, request: Request, spider: Spider) -> Deferred[Response]:
from twisted.internet import reactor

parsed_url = urlparse_cached(request)
@@ -103,10 +103,14 @@ def download_request(self, request: Request, spider: Spider) -> Deferred:
creator = ClientCreator(
reactor, FTPClient, user, password, passive=passive_mode
)
-        dfd: Deferred = creator.connectTCP(parsed_url.hostname, parsed_url.port or 21)
+        dfd: Deferred[FTPClient] = creator.connectTCP(
+            parsed_url.hostname, parsed_url.port or 21
+        )
return dfd.addCallback(self.gotClient, request, unquote(parsed_url.path))

-    def gotClient(self, client: FTPClient, request: Request, filepath: str) -> Deferred:
+    def gotClient(
+        self, client: FTPClient, request: Request, filepath: str
+    ) -> Deferred[Response]:
self.client = client
protocol = ReceivedDataProtocol(request.meta.get("ftp_local_filename"))
d = client.retrieveFile(filepath, protocol)
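Worth noting in the FTP handler: the chain starts as `Deferred[FTPClient]` from `connectTCP` and ends up a `Deferred[Response]` because `gotClient` maps the client to a response. A small self-contained sketch of how a callback re-parameterizes a Deferred:

```python
from __future__ import annotations

from twisted.internet.defer import Deferred


def length_of(d: Deferred[str]) -> Deferred[int]:
    def measure(text: str) -> int:
        # The callback's return type drives the new parameter:
        # attaching str -> int turns Deferred[str] into Deferred[int].
        return len(text)

    return d.addCallback(measure)
```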
3 changes: 2 additions & 1 deletion scrapy/core/downloader/handlers/http10.py
@@ -9,6 +9,7 @@

from scrapy import Request, Spider
from scrapy.crawler import Crawler
+from scrapy.http import Response
from scrapy.settings import BaseSettings
from scrapy.utils.misc import build_from_crawler, load_object
from scrapy.utils.python import to_unicode
@@ -38,7 +39,7 @@ def __init__(self, settings: BaseSettings, crawler: Crawler):
def from_crawler(cls, crawler: Crawler) -> Self:
return cls(crawler.settings, crawler)

-    def download_request(self, request: Request, spider: Spider) -> Deferred:
+    def download_request(self, request: Request, spider: Spider) -> Deferred[Response]:
"""Return a deferred for the HTTP download"""
factory = self.HTTPClientFactory(request)
self._connect(factory)
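Taken together, callers of `download_request` and related methods now get checkable types end to end. For instance, under the new annotations a type checker should reject a callback whose parameter type does not match the upstream result (illustrative snippet, not from the diff):

```python
from __future__ import annotations

from twisted.internet.defer import Deferred

from scrapy.http import Response


def status_code(response: Response) -> int:
    return response.status


def negate(flag: bool) -> bool:
    return not flag


def wire(d: Deferred[Response]) -> None:
    # Deferred[Response] -> Deferred[int] is fine; chaining `negate`
    # after it mismatches (int is not bool), so a checker such as mypy
    # should flag the second addCallback.
    d.addCallback(status_code).addCallback(negate)
```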