Merge pull request #5100 from wRAR/more-scraper-typing
Additional typing for scraper and a small code change.
wRAR committed Apr 14, 2021
2 parents 9bf9ab7 + 309a637 commit 06f3d12
Showing 1 changed file with 60 additions and 42 deletions.
scrapy/core/scraper.py (102 changes: 60 additions & 42 deletions)
@@ -3,11 +3,10 @@

 import logging
 from collections import deque
-from collections.abc import Iterable
-from typing import Union
+from typing import Any, Deque, Iterable, Optional, Set, Tuple, Union

 from itemadapter import is_item
-from twisted.internet import defer
+from twisted.internet.defer import Deferred, inlineCallbacks
 from twisted.python.failure import Failure

 from scrapy import signals, Spider
@@ -20,6 +19,9 @@
 from scrapy.utils.spider import iterate_spider_output


+QueueTuple = Tuple[Union[Response, Failure], Request, Deferred]
+
+
 logger = logging.getLogger(__name__)

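The new QueueTuple alias names the (result, request, deferred) triples that flow through Slot.queue, so the Deque annotation below stays readable. A minimal standalone sketch of the same pattern; the stub classes are stand-ins for Scrapy's Response/Request and Twisted's Failure/Deferred:

    from collections import deque
    from typing import Deque, Tuple, Union

    # Stand-ins for scrapy.http.Response/Request and Twisted's Failure/Deferred.
    class Response: ...
    class Failure: ...
    class Request: ...
    class Deferred: ...

    QueueTuple = Tuple[Union[Response, Failure], Request, Deferred]

    queue: Deque[QueueTuple] = deque()
    queue.append((Response(), Request(), Deferred()))
    result, request, deferred = queue.popleft()  # each element gets a precise type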
@@ -28,46 +30,46 @@ class Slot:

     MIN_RESPONSE_SIZE = 1024

-    def __init__(self, max_active_size=5000000):
+    def __init__(self, max_active_size: int = 5000000):
         self.max_active_size = max_active_size
-        self.queue = deque()
-        self.active = set()
-        self.active_size = 0
-        self.itemproc_size = 0
-        self.closing = None
-
-    def add_response_request(self, response, request):
-        deferred = defer.Deferred()
-        self.queue.append((response, request, deferred))
-        if isinstance(response, Response):
-            self.active_size += max(len(response.body), self.MIN_RESPONSE_SIZE)
+        self.queue: Deque[QueueTuple] = deque()
+        self.active: Set[Request] = set()
+        self.active_size: int = 0
+        self.itemproc_size: int = 0
+        self.closing: Optional[Deferred] = None
+
+    def add_response_request(self, result: Union[Response, Failure], request: Request) -> Deferred:
+        deferred = Deferred()
+        self.queue.append((result, request, deferred))
+        if isinstance(result, Response):
+            self.active_size += max(len(result.body), self.MIN_RESPONSE_SIZE)
         else:
             self.active_size += self.MIN_RESPONSE_SIZE
         return deferred

-    def next_response_request_deferred(self):
+    def next_response_request_deferred(self) -> QueueTuple:
         response, request, deferred = self.queue.popleft()
         self.active.add(request)
         return response, request, deferred

-    def finish_response(self, response, request):
+    def finish_response(self, result: Union[Response, Failure], request: Request) -> None:
         self.active.remove(request)
-        if isinstance(response, Response):
-            self.active_size -= max(len(response.body), self.MIN_RESPONSE_SIZE)
+        if isinstance(result, Response):
+            self.active_size -= max(len(result.body), self.MIN_RESPONSE_SIZE)
         else:
             self.active_size -= self.MIN_RESPONSE_SIZE

-    def is_idle(self):
+    def is_idle(self) -> bool:
         return not (self.queue or self.active)

-    def needs_backout(self):
+    def needs_backout(self) -> bool:
         return self.active_size > self.max_active_size


 class Scraper:

     def __init__(self, crawler):
-        self.slot = None
+        self.slot: Optional[Slot] = None
         self.spidermw = SpiderMiddlewareManager.from_crawler(crawler)
         itemproc_cls = load_object(crawler.settings['ITEM_PROCESSOR'])
         self.itemproc = itemproc_cls.from_crawler(crawler)
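Slot's accounting logic is unchanged by this diff, only annotated: every queued result is charged at least MIN_RESPONSE_SIZE bytes against max_active_size, with Failures (which have no body) charged exactly that floor. A small standalone sketch of the rule, using the same constants as above:

    MIN_RESPONSE_SIZE = 1024
    MAX_ACTIVE_SIZE = 5000000  # default SCRAPER_SLOT_MAX_ACTIVE_SIZE

    def charged_size(body: bytes = b"") -> int:
        # Responses cost max(len(body), floor); Failures cost the floor itself.
        return max(len(body), MIN_RESPONSE_SIZE)

    active_size = 0
    active_size += charged_size(b"x" * 10)    # tiny body still costs 1024
    active_size += charged_size(b"x" * 4096)  # large body costs its real length
    assert active_size == 1024 + 4096
    assert not active_size > MAX_ACTIVE_SIZE  # i.e. needs_backout() would be False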
@@ -76,32 +78,37 @@ def __init__(self, crawler):
         self.signals = crawler.signals
         self.logformatter = crawler.logformatter

-    @defer.inlineCallbacks
-    def open_spider(self, spider):
+    @inlineCallbacks
+    def open_spider(self, spider: Spider):
         """Open the given spider for scraping and allocate resources for it"""
         self.slot = Slot(self.crawler.settings.getint('SCRAPER_SLOT_MAX_ACTIVE_SIZE'))
         yield self.itemproc.open_spider(spider)

-    def close_spider(self, spider):
+    def close_spider(self, spider: Spider) -> Deferred:
         """Close a spider being scraped and release its resources"""
-        self.slot.closing = defer.Deferred()
+        if self.slot is None:
+            raise RuntimeError("Scraper slot not assigned")
+        self.slot.closing = Deferred()
         self.slot.closing.addCallback(self.itemproc.close_spider)
         self._check_if_closing(spider)
         return self.slot.closing

-    def is_idle(self):
+    def is_idle(self) -> bool:
         """Return True if there isn't any more spiders to process"""
         return not self.slot

-    def _check_if_closing(self, spider):
+    def _check_if_closing(self, spider: Spider) -> None:
+        assert self.slot is not None  # typing
         if self.slot.closing and self.slot.is_idle():
             self.slot.closing.callback(spider)

-    def enqueue_scrape(self, response, request, spider):
-        dfd = self.slot.add_response_request(response, request)
+    def enqueue_scrape(self, result: Union[Response, Failure], request: Request, spider: Spider) -> Deferred:
+        if self.slot is None:
+            raise RuntimeError("Scraper slot not assigned")
+        dfd = self.slot.add_response_request(result, request)

         def finish_scraping(_):
-            self.slot.finish_response(response, request)
+            self.slot.finish_response(result, request)
             self._check_if_closing(spider)
             self._scrape_next(spider)
             return _
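This hunk also carries the PR's "small code change": close_spider and enqueue_scrape, which outside code calls, now fail loudly with RuntimeError when no slot is assigned, while _check_if_closing narrows Optional[Slot] with a bare assert. A hedged sketch of that split (the names here are illustrative, not Scrapy API):

    from typing import Optional

    class Slot:
        def is_idle(self) -> bool:
            return True

    class Owner:
        def __init__(self) -> None:
            self.slot: Optional[Slot] = None

        def public_entry(self) -> bool:
            # Entry point reachable from outside: raise a real, descriptive error.
            if self.slot is None:
                raise RuntimeError("slot not assigned")
            return self.slot.is_idle()  # type checker: self.slot is Slot here

        def _internal(self) -> bool:
            # Only called after public_entry set things up; assert narrows the type.
            assert self.slot is not None  # typing
            return self.slot.is_idle()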
@@ -115,12 +122,13 @@ def finish_scraping(_):
         self._scrape_next(spider)
         return dfd

-    def _scrape_next(self, spider):
+    def _scrape_next(self, spider: Spider) -> None:
+        assert self.slot is not None  # typing
         while self.slot.queue:
             response, request, deferred = self.slot.next_response_request_deferred()
             self._scrape(response, request, spider).chainDeferred(deferred)

-    def _scrape(self, result: Union[Response, Failure], request: Request, spider: Spider):
+    def _scrape(self, result: Union[Response, Failure], request: Request, spider: Spider) -> Deferred:
         """
         Handle the downloaded response or failure through the spider callback/errback
         """
@@ -131,7 +139,7 @@ def _scrape(self, result: Union[Response, Failure], request: Request, spider: Spider):
         dfd.addCallback(self.handle_spider_output, request, result, spider)
         return dfd

-    def _scrape2(self, result: Union[Response, Failure], request: Request, spider: Spider):
+    def _scrape2(self, result: Union[Response, Failure], request: Request, spider: Spider) -> Deferred:
         """
         Handle the different cases of request's result been a Response or a Failure
         """
@@ -141,7 +149,7 @@ def _scrape2(self, result: Union[Response, Failure], request: Request, spider: Spider):
         dfd = self.call_spider(result, request, spider)
         return dfd.addErrback(self._log_download_errors, result, request, spider)

-    def call_spider(self, result: Union[Response, Failure], request: Request, spider: Spider):
+    def call_spider(self, result: Union[Response, Failure], request: Request, spider: Spider) -> Deferred:
         if isinstance(result, Response):
             if getattr(result, "request", None) is None:
                 result.request = request
@@ -156,7 +164,7 @@ def call_spider(self, result: Union[Response, Failure], request: Request, spider: Spider):
             dfd.addErrback(request.errback)
         return dfd.addCallback(iterate_spider_output)

-    def handle_spider_error(self, _failure: Failure, request: Request, response: Response, spider: Spider):
+    def handle_spider_error(self, _failure: Failure, request: Request, response: Response, spider: Spider) -> None:
         exc = _failure.value
         if isinstance(exc, CloseSpider):
             self.crawler.engine.close_spider(spider, exc.reason or 'cancelled')
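call_spider (only its return annotation changes here) dispatches a Response to the request callback through a pre-fired successful Deferred, and a Failure to the errback through a pre-fired failed one. A reduced sketch of that split, assuming plain twisted.internet.defer.succeed/fail in place of Scrapy's defer_succeed/defer_fail helpers:

    from twisted.internet.defer import Deferred, fail, succeed
    from twisted.python.failure import Failure

    def dispatch(result, callback, errback) -> Deferred:
        # Failure -> errback chain; anything else -> callback chain.
        if isinstance(result, Failure):
            return fail(result).addErrback(errback)
        return succeed(result).addCallback(callback)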
@@ -177,18 +185,20 @@ def handle_spider_error(self, _failure: Failure, request: Request, response: Response, spider: Spider):
                 spider=spider
             )

-    def handle_spider_output(self, result: Iterable, request: Request, response: Response, spider: Spider):
+    def handle_spider_output(self, result: Iterable, request: Request, response: Response, spider: Spider) -> Deferred:
         if not result:
             return defer_succeed(None)
         it = iter_errback(result, self.handle_spider_error, request, response, spider)
         dfd = parallel(it, self.concurrent_items, self._process_spidermw_output,
                        request, response, spider)
         return dfd

-    def _process_spidermw_output(self, output, request, response, spider):
+    def _process_spidermw_output(self, output: Any, request: Request, response: Response,
+                                 spider: Spider) -> Optional[Deferred]:
         """Process each Request/Item (given in the output parameter) returned
         from the given spider
         """
+        assert self.slot is not None  # typing
         if isinstance(output, Request):
             self.crawler.engine.crawl(request=output, spider=spider)
         elif is_item(output):
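handle_spider_output wraps the spider's iterable in iter_errback, so exceptions raised while iterating are routed to handle_spider_error, and consumes it with scrapy.utils.defer.parallel, bounded by the CONCURRENT_ITEMS setting. In the spirit of that helper, a hedged sketch of bounded parallel consumption using Twisted's Cooperator:

    from twisted.internet.defer import Deferred, DeferredList
    from twisted.internet.task import Cooperator

    def parallel_sketch(iterable, count, func, *args) -> Deferred:
        # `count` cooperative tasks pull from one shared generator, so at most
        # `count` elements are being processed at any moment.
        coop = Cooperator()
        work = (func(elem, *args) for elem in iterable)
        return DeferredList([coop.coiterate(work) for _ in range(count)])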
@@ -205,12 +215,18 @@ def _process_spidermw_output(self, output, request, response, spider):
                 {'request': request, 'typename': typename},
                 extra={'spider': spider},
             )
+        return None

-    def _log_download_errors(self, spider_failure, download_failure, request, spider):
+    def _log_download_errors(self, spider_failure: Failure, download_failure: Failure, request: Request,
+                             spider: Spider) -> Union[Failure, None]:
         """Log and silence errors that come from the engine (typically download
-        errors that got propagated thru here)
+        errors that got propagated thru here).
+        spider_failure: the value passed into the errback of self.call_spider()
+        download_failure: the value passed into _scrape2() from
+        ExecutionEngine._handle_downloader_output() as "result"
         """
-        if isinstance(download_failure, Failure) and not download_failure.check(IgnoreRequest):
+        if not download_failure.check(IgnoreRequest):
             if download_failure.frames:
                 logkws = self.logformatter.download_error(download_failure, request, spider)
                 logger.log(
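The second half of the "small code change": with download_failure now declared as Failure, the defensive isinstance test is dropped and only Failure.check remains. check() returns the first matching exception class when the wrapped exception is an instance of one of its arguments, else None; a short demonstration (requires Twisted and Scrapy):

    from twisted.python.failure import Failure
    from scrapy.exceptions import IgnoreRequest

    try:
        raise IgnoreRequest("filtered out")
    except IgnoreRequest:
        f = Failure()  # Failure() with no args captures the active exception

    assert f.check(IgnoreRequest) is IgnoreRequest  # match: returns the class
    assert f.check(ValueError) is None              # no match: returns None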
@@ -230,10 +246,12 @@ def _log_download_errors(self, spider_failure, download_failure, request, spider):

         if spider_failure is not download_failure:
             return spider_failure
+        return None

-    def _itemproc_finished(self, output, item, response, spider):
+    def _itemproc_finished(self, output: Any, item: Any, response: Response, spider: Spider) -> None:
         """ItemProcessor finished for the given ``item`` and returned ``output``
         """
+        assert self.slot is not None  # typing
         self.slot.itemproc_size -= 1
         if isinstance(output, Failure):
             ex = output.value