diff --git a/scrapy/core/scraper.py b/scrapy/core/scraper.py
index c760a4155e8..96aa536867e 100644
--- a/scrapy/core/scraper.py
+++ b/scrapy/core/scraper.py
@@ -3,11 +3,10 @@
 import logging
 from collections import deque
-from collections.abc import Iterable
-from typing import Union
+from typing import Any, Deque, Iterable, Optional, Set, Tuple, Union
 
 from itemadapter import is_item
-from twisted.internet import defer
+from twisted.internet.defer import Deferred, inlineCallbacks
 from twisted.python.failure import Failure
 
 from scrapy import signals, Spider
@@ -20,6 +19,9 @@
 from scrapy.utils.spider import iterate_spider_output
 
+QueueTuple = Tuple[Union[Response, Failure], Request, Deferred]
+
+
 logger = logging.getLogger(__name__)
 
 
@@ -28,46 +30,46 @@ class Slot:
 
     MIN_RESPONSE_SIZE = 1024
 
-    def __init__(self, max_active_size=5000000):
+    def __init__(self, max_active_size: int = 5000000):
         self.max_active_size = max_active_size
-        self.queue = deque()
-        self.active = set()
-        self.active_size = 0
-        self.itemproc_size = 0
-        self.closing = None
-
-    def add_response_request(self, response, request):
-        deferred = defer.Deferred()
-        self.queue.append((response, request, deferred))
-        if isinstance(response, Response):
-            self.active_size += max(len(response.body), self.MIN_RESPONSE_SIZE)
+        self.queue: Deque[QueueTuple] = deque()
+        self.active: Set[Request] = set()
+        self.active_size: int = 0
+        self.itemproc_size: int = 0
+        self.closing: Optional[Deferred] = None
+
+    def add_response_request(self, result: Union[Response, Failure], request: Request) -> Deferred:
+        deferred = Deferred()
+        self.queue.append((result, request, deferred))
+        if isinstance(result, Response):
+            self.active_size += max(len(result.body), self.MIN_RESPONSE_SIZE)
         else:
             self.active_size += self.MIN_RESPONSE_SIZE
         return deferred
 
-    def next_response_request_deferred(self):
+    def next_response_request_deferred(self) -> QueueTuple:
         response, request, deferred = self.queue.popleft()
         self.active.add(request)
         return response, request, deferred
 
-    def finish_response(self, response, request):
+    def finish_response(self, result: Union[Response, Failure], request: Request) -> None:
         self.active.remove(request)
-        if isinstance(response, Response):
-            self.active_size -= max(len(response.body), self.MIN_RESPONSE_SIZE)
+        if isinstance(result, Response):
+            self.active_size -= max(len(result.body), self.MIN_RESPONSE_SIZE)
         else:
             self.active_size -= self.MIN_RESPONSE_SIZE
 
-    def is_idle(self):
+    def is_idle(self) -> bool:
         return not (self.queue or self.active)
 
-    def needs_backout(self):
+    def needs_backout(self) -> bool:
         return self.active_size > self.max_active_size
 
 
 class Scraper:
 
     def __init__(self, crawler):
-        self.slot = None
+        self.slot: Optional[Slot] = None
         self.spidermw = SpiderMiddlewareManager.from_crawler(crawler)
         itemproc_cls = load_object(crawler.settings['ITEM_PROCESSOR'])
         self.itemproc = itemproc_cls.from_crawler(crawler)
@@ -76,32 +78,37 @@ def __init__(self, crawler):
         self.signals = crawler.signals
         self.logformatter = crawler.logformatter
 
-    @defer.inlineCallbacks
-    def open_spider(self, spider):
+    @inlineCallbacks
+    def open_spider(self, spider: Spider):
         """Open the given spider for scraping and allocate resources for it"""
         self.slot = Slot(self.crawler.settings.getint('SCRAPER_SLOT_MAX_ACTIVE_SIZE'))
         yield self.itemproc.open_spider(spider)
 
-    def close_spider(self, spider):
+    def close_spider(self, spider: Spider) -> Deferred:
         """Close a spider being scraped and release its resources"""
-        self.slot.closing = defer.Deferred()
+        if self.slot is None:
+            raise RuntimeError("Scraper slot not assigned")
+        self.slot.closing = Deferred()
         self.slot.closing.addCallback(self.itemproc.close_spider)
         self._check_if_closing(spider)
         return self.slot.closing
 
-    def is_idle(self):
+    def is_idle(self) -> bool:
         """Return True if there isn't any more spiders to process"""
         return not self.slot
 
-    def _check_if_closing(self, spider):
+    def _check_if_closing(self, spider: Spider) -> None:
+        assert self.slot is not None  # typing
         if self.slot.closing and self.slot.is_idle():
             self.slot.closing.callback(spider)
 
-    def enqueue_scrape(self, response, request, spider):
-        dfd = self.slot.add_response_request(response, request)
+    def enqueue_scrape(self, result: Union[Response, Failure], request: Request, spider: Spider) -> Deferred:
+        if self.slot is None:
+            raise RuntimeError("Scraper slot not assigned")
+        dfd = self.slot.add_response_request(result, request)
 
         def finish_scraping(_):
-            self.slot.finish_response(response, request)
+            self.slot.finish_response(result, request)
             self._check_if_closing(spider)
             self._scrape_next(spider)
             return _
@@ -115,12 +122,13 @@ def finish_scraping(_):
         self._scrape_next(spider)
         return dfd
 
-    def _scrape_next(self, spider):
+    def _scrape_next(self, spider: Spider) -> None:
+        assert self.slot is not None  # typing
         while self.slot.queue:
             response, request, deferred = self.slot.next_response_request_deferred()
             self._scrape(response, request, spider).chainDeferred(deferred)
 
-    def _scrape(self, result: Union[Response, Failure], request: Request, spider: Spider):
+    def _scrape(self, result: Union[Response, Failure], request: Request, spider: Spider) -> Deferred:
         """
         Handle the downloaded response or failure through the spider callback/errback
         """
@@ -131,7 +139,7 @@ def _scrape(self, result: Union[Response, Failure], request: Request, spider: Sp
         dfd.addCallback(self.handle_spider_output, request, result, spider)
         return dfd
 
-    def _scrape2(self, result: Union[Response, Failure], request: Request, spider: Spider):
+    def _scrape2(self, result: Union[Response, Failure], request: Request, spider: Spider) -> Deferred:
         """
         Handle the different cases of request's result been a Response or a Failure
         """
@@ -141,7 +149,7 @@ def _scrape2(self, result: Union[Response, Failure], request: Request, spider: S
         dfd = self.call_spider(result, request, spider)
         return dfd.addErrback(self._log_download_errors, result, request, spider)
 
-    def call_spider(self, result: Union[Response, Failure], request: Request, spider: Spider):
+    def call_spider(self, result: Union[Response, Failure], request: Request, spider: Spider) -> Deferred:
         if isinstance(result, Response):
             if getattr(result, "request", None) is None:
                 result.request = request
@@ -156,7 +164,7 @@ def call_spider(self, result: Union[Response, Failure], request: Request, spider
             dfd.addErrback(request.errback)
         return dfd.addCallback(iterate_spider_output)
 
-    def handle_spider_error(self, _failure: Failure, request: Request, response: Response, spider: Spider):
+    def handle_spider_error(self, _failure: Failure, request: Request, response: Response, spider: Spider) -> None:
         exc = _failure.value
         if isinstance(exc, CloseSpider):
             self.crawler.engine.close_spider(spider, exc.reason or 'cancelled')
@@ -177,7 +185,7 @@ def handle_spider_error(self, _failure: Failure, request: Request, response: Res
             spider=spider
         )
 
-    def handle_spider_output(self, result: Iterable, request: Request, response: Response, spider: Spider):
+    def handle_spider_output(self, result: Iterable, request: Request, response: Response, spider: Spider) -> Deferred:
         if not result:
             return defer_succeed(None)
         it = iter_errback(result, self.handle_spider_error, request, response, spider)
@@ -185,10 +193,12 @@ def handle_spider_output(self, result: Iterable, request: Request, response: Res
                                       request, response, spider)
         return dfd
 
-    def _process_spidermw_output(self, output, request, response, spider):
+    def _process_spidermw_output(self, output: Any, request: Request, response: Response,
+                                 spider: Spider) -> Optional[Deferred]:
         """Process each Request/Item (given in the output parameter) returned
         from the given spider
         """
+        assert self.slot is not None  # typing
         if isinstance(output, Request):
             self.crawler.engine.crawl(request=output, spider=spider)
         elif is_item(output):
@@ -205,12 +215,18 @@ def _process_spidermw_output(self, output, request, response, spider):
                 {'request': request, 'typename': typename},
                 extra={'spider': spider},
             )
+        return None
 
-    def _log_download_errors(self, spider_failure, download_failure, request, spider):
+    def _log_download_errors(self, spider_failure: Failure, download_failure: Failure, request: Request,
+                             spider: Spider) -> Union[Failure, None]:
         """Log and silence errors that come from the engine (typically download
-        errors that got propagated thru here)
+        errors that got propagated thru here).
+
+        spider_failure: the value passed into the errback of self.call_spider()
+        download_failure: the value passed into _scrape2() from
+        ExecutionEngine._handle_downloader_output() as "result"
         """
-        if isinstance(download_failure, Failure) and not download_failure.check(IgnoreRequest):
+        if not download_failure.check(IgnoreRequest):
            if download_failure.frames:
                 logkws = self.logformatter.download_error(download_failure, request, spider)
                 logger.log(
@@ -230,10 +246,12 @@ def _log_download_errors(self, spider_failure, download_failure, request, spider
 
         if spider_failure is not download_failure:
             return spider_failure
+        return None
 
-    def _itemproc_finished(self, output, item, response, spider):
+    def _itemproc_finished(self, output: Any, item: Any, response: Response, spider: Spider) -> None:
        """ItemProcessor finished for the given ``item`` and returned ``output``
        """
+        assert self.slot is not None  # typing
         self.slot.itemproc_size -= 1
         if isinstance(output, Failure):
             ex = output.value
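
Reviewer note: the new `Optional[Slot]` annotation is what drives both the
`raise RuntimeError("Scraper slot not assigned")` guards and the
`assert self.slot is not None  # typing` lines above. A minimal sketch of how
mypy narrows the attribute under each style; the class bodies here are
stand-ins, not the real Scrapy types:

    from typing import Optional

    class Slot:
        def is_idle(self) -> bool:
            return True

    class Scraper:
        def __init__(self) -> None:
            # Unset until open_spider() runs, so mypy types it Optional[Slot].
            self.slot: Optional[Slot] = None

        def close_spider(self) -> bool:
            # Externally callable path: misuse should fail loudly at runtime.
            if self.slot is None:
                raise RuntimeError("Scraper slot not assigned")
            return self.slot.is_idle()  # narrowed to Slot from here on

        def _check_if_closing(self) -> bool:
            # Internal helper: callers guarantee the slot exists, so a bare
            # assert is enough to narrow Optional[Slot] to Slot for mypy.
            assert self.slot is not None  # typing
            return self.slot.is_idle()

The patch splits along the same line: `raise` on the paths the engine calls
from outside (`close_spider()`, `enqueue_scrape()`), `assert` on the internal
helpers that can only run while a slot is open.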