Additional typing for scraper and a small code change. #5100

Merged
merged 2 commits on Apr 14, 2021
Changes from 1 commit
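
Before the diff itself, a quick illustration of the annotation pattern this change applies: a module-level tuple alias for queue entries plus container attributes typed with Deque, Set and Optional. The sketch below is hypothetical and not part of the pull request; the names QueueTuple, SlotSketch and add are illustrative stand-ins for the diff's QUEUE_TUPLE, Slot and add_response_request, and it assumes Scrapy and Twisted are installed.

```python
# Hypothetical sketch (not the actual scraper code): how the new annotations compose.
from collections import deque
from typing import Deque, Optional, Set, Tuple, Union

from scrapy import Request
from scrapy.http import Response
from twisted.internet.defer import Deferred
from twisted.python.failure import Failure

# One queued scrape job: the downloader result, the originating request,
# and the Deferred that fires once scraping of that result finishes.
QueueTuple = Tuple[Union[Response, Failure], Request, Deferred]


class SlotSketch:
    """Typed container mirroring the shape of Slot in the diff below."""

    def __init__(self, max_active_size: int = 5_000_000) -> None:
        self.max_active_size = max_active_size
        self.queue: Deque[QueueTuple] = deque()
        self.active: Set[Request] = set()
        self.active_size: int = 0
        self.closing: Optional[Deferred] = None

    def add(self, result: Union[Response, Failure], request: Request) -> Deferred:
        # Each enqueued result gets its own Deferred so callers can chain
        # callbacks that run when this particular response has been scraped.
        deferred: Deferred = Deferred()
        self.queue.append((result, request, deferred))
        return deferred


# Minimal usage of the sketch:
slot = SlotSketch()
dfd = slot.add(Response(url="https://example.com", body=b"ok"),
               Request("https://example.com"))
```
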
102 changes: 60 additions & 42 deletions scrapy/core/scraper.py
@@ -3,11 +3,10 @@

import logging
from collections import deque
from collections.abc import Iterable
from typing import Union
from typing import Union, Optional, Tuple, Set, Deque, Any, Iterable

from itemadapter import is_item
from twisted.internet import defer
from twisted.internet.defer import Deferred, inlineCallbacks
from twisted.python.failure import Failure

from scrapy import signals, Spider
@@ -20,6 +19,9 @@
from scrapy.utils.spider import iterate_spider_output


QUEUE_TUPLE = Tuple[Union[Response, Failure], Request, Deferred]


logger = logging.getLogger(__name__)


@@ -28,46 +30,46 @@ class Slot:

MIN_RESPONSE_SIZE = 1024

def __init__(self, max_active_size=5000000):
def __init__(self, max_active_size: int = 5000000):
self.max_active_size = max_active_size
self.queue = deque()
self.active = set()
self.active_size = 0
self.itemproc_size = 0
self.closing = None

def add_response_request(self, response, request):
deferred = defer.Deferred()
self.queue.append((response, request, deferred))
if isinstance(response, Response):
self.active_size += max(len(response.body), self.MIN_RESPONSE_SIZE)
self.queue: Deque[QUEUE_TUPLE] = deque()
self.active: Set[Request] = set()
self.active_size: int = 0
self.itemproc_size: int = 0
self.closing: Optional[Deferred] = None

def add_response_request(self, result: Union[Response, Failure], request: Request) -> Deferred:
deferred = Deferred()
self.queue.append((result, request, deferred))
if isinstance(result, Response):
self.active_size += max(len(result.body), self.MIN_RESPONSE_SIZE)
else:
self.active_size += self.MIN_RESPONSE_SIZE
return deferred

def next_response_request_deferred(self):
def next_response_request_deferred(self) -> QUEUE_TUPLE:
response, request, deferred = self.queue.popleft()
self.active.add(request)
return response, request, deferred

def finish_response(self, response, request):
def finish_response(self, result: Union[Response, Failure], request: Request) -> None:
self.active.remove(request)
if isinstance(response, Response):
self.active_size -= max(len(response.body), self.MIN_RESPONSE_SIZE)
if isinstance(result, Response):
self.active_size -= max(len(result.body), self.MIN_RESPONSE_SIZE)
else:
self.active_size -= self.MIN_RESPONSE_SIZE

def is_idle(self):
def is_idle(self) -> bool:
return not (self.queue or self.active)

def needs_backout(self):
def needs_backout(self) -> bool:
return self.active_size > self.max_active_size


class Scraper:

def __init__(self, crawler):
self.slot = None
self.slot: Optional[Slot] = None
self.spidermw = SpiderMiddlewareManager.from_crawler(crawler)
itemproc_cls = load_object(crawler.settings['ITEM_PROCESSOR'])
self.itemproc = itemproc_cls.from_crawler(crawler)
@@ -76,32 +78,37 @@ def __init__(self, crawler):
self.signals = crawler.signals
self.logformatter = crawler.logformatter

@defer.inlineCallbacks
def open_spider(self, spider):
@inlineCallbacks
def open_spider(self, spider: Spider):
"""Open the given spider for scraping and allocate resources for it"""
self.slot = Slot(self.crawler.settings.getint('SCRAPER_SLOT_MAX_ACTIVE_SIZE'))
yield self.itemproc.open_spider(spider)

def close_spider(self, spider):
def close_spider(self, spider: Spider) -> Deferred:
"""Close a spider being scraped and release its resources"""
self.slot.closing = defer.Deferred()
if self.slot is None:
raise RuntimeError("Scraper slot not assigned")
self.slot.closing = Deferred()
self.slot.closing.addCallback(self.itemproc.close_spider)
self._check_if_closing(spider)
return self.slot.closing

def is_idle(self):
def is_idle(self) -> bool:
"""Return True if there isn't any more spiders to process"""
return not self.slot

def _check_if_closing(self, spider):
def _check_if_closing(self, spider: Spider) -> None:
assert self.slot is not None # typing
if self.slot.closing and self.slot.is_idle():
self.slot.closing.callback(spider)

def enqueue_scrape(self, response, request, spider):
dfd = self.slot.add_response_request(response, request)
def enqueue_scrape(self, result: Union[Response, Failure], request: Request, spider: Spider) -> Deferred:
if self.slot is None:
raise RuntimeError("Scraper slot not assigned")
dfd = self.slot.add_response_request(result, request)

def finish_scraping(_):
self.slot.finish_response(response, request)
self.slot.finish_response(result, request)
self._check_if_closing(spider)
self._scrape_next(spider)
return _
@@ -115,12 +122,13 @@ def finish_scraping(_):
self._scrape_next(spider)
return dfd

def _scrape_next(self, spider):
def _scrape_next(self, spider: Spider) -> None:
assert self.slot is not None # typing
while self.slot.queue:
response, request, deferred = self.slot.next_response_request_deferred()
self._scrape(response, request, spider).chainDeferred(deferred)

def _scrape(self, result: Union[Response, Failure], request: Request, spider: Spider):
def _scrape(self, result: Union[Response, Failure], request: Request, spider: Spider) -> Deferred:
"""
Handle the downloaded response or failure through the spider callback/errback
"""
@@ -131,7 +139,7 @@ def _scrape(self, result: Union[Response, Failure], request: Request, spider: Sp
dfd.addCallback(self.handle_spider_output, request, result, spider)
return dfd

def _scrape2(self, result: Union[Response, Failure], request: Request, spider: Spider):
def _scrape2(self, result: Union[Response, Failure], request: Request, spider: Spider) -> Deferred:
"""
Handle the different cases of request's result being a Response or a Failure
"""
@@ -141,7 +149,7 @@ def _scrape2(self, result: Union[Response, Failure], request: Request, spider: S
dfd = self.call_spider(result, request, spider)
return dfd.addErrback(self._log_download_errors, result, request, spider)

def call_spider(self, result: Union[Response, Failure], request: Request, spider: Spider):
def call_spider(self, result: Union[Response, Failure], request: Request, spider: Spider) -> Deferred:
if isinstance(result, Response):
if getattr(result, "request", None) is None:
result.request = request
@@ -156,7 +164,7 @@ def call_spider(self, result: Union[Response, Failure], request: Request, spider
dfd.addErrback(request.errback)
return dfd.addCallback(iterate_spider_output)

def handle_spider_error(self, _failure: Failure, request: Request, response: Response, spider: Spider):
def handle_spider_error(self, _failure: Failure, request: Request, response: Response, spider: Spider) -> None:
exc = _failure.value
if isinstance(exc, CloseSpider):
self.crawler.engine.close_spider(spider, exc.reason or 'cancelled')
@@ -177,18 +185,20 @@ def handle_spider_error(self, _failure: Failure, request: Request, response: Res
spider=spider
)

def handle_spider_output(self, result: Iterable, request: Request, response: Response, spider: Spider):
def handle_spider_output(self, result: Iterable, request: Request, response: Response, spider: Spider) -> Deferred:
if not result:
return defer_succeed(None)
it = iter_errback(result, self.handle_spider_error, request, response, spider)
dfd = parallel(it, self.concurrent_items, self._process_spidermw_output,
request, response, spider)
return dfd

def _process_spidermw_output(self, output, request, response, spider):
def _process_spidermw_output(self, output: Any, request: Request, response: Response,
spider: Spider) -> Optional[Deferred]:
"""Process each Request/Item (given in the output parameter) returned
from the given spider
"""
assert self.slot is not None # typing
if isinstance(output, Request):
self.crawler.engine.crawl(request=output, spider=spider)
elif is_item(output):
@@ -205,12 +215,18 @@ def _process_spidermw_output(self, output, request, response, spider):
{'request': request, 'typename': typename},
extra={'spider': spider},
)
return None

def _log_download_errors(self, spider_failure, download_failure, request, spider):
def _log_download_errors(self, spider_failure: Failure, download_failure: Failure, request: Request,
spider: Spider) -> Union[Failure, None]:
"""Log and silence errors that come from the engine (typically download
errors that got propagated thru here)
errors that got propagated thru here).

spider_failure: the value passed into the errback of self.call_spider()
download_failure: the value passed into _scrape2() from
ExecutionEngine._handle_downloader_output() as "result"
"""
if isinstance(download_failure, Failure) and not download_failure.check(IgnoreRequest):
if not download_failure.check(IgnoreRequest):
if download_failure.frames:
logkws = self.logformatter.download_error(download_failure, request, spider)
logger.log(
@@ -230,10 +246,12 @@ def _log_download_errors(self, spider_failure, download_failure, request, spider

if spider_failure is not download_failure:
return spider_failure
return None

def _itemproc_finished(self, output, item, response, spider):
def _itemproc_finished(self, output: Any, item: Any, response: Response, spider: Spider) -> None:
Member:

I don’t think it needs to be done in this pull request, but I’m thinking that it may be good to have a type alias for all itemadapter-supported item types, and use it in scenarios like this one instead of Any. It probably makes sense to implement that type alias in itemadapter, though.

Member Author:

Yeah, assuming it's possible to express.
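
Purely as a hypothetical sketch of the alias discussed here (neither this pull request nor itemadapter defines it), user code could approximate it like this; the names ItemLike, process and Product are illustrative only, and Scrapy is assumed to be installed:

```python
# Hypothetical illustration of an "itemadapter-supported item" alias.
from dataclasses import dataclass
from typing import Any, Union

from scrapy import Item

# dict and scrapy.Item subclasses have obvious static types; attrs classes,
# dataclasses and pydantic models (also supported by itemadapter) share no
# common base class, so Any remains part of the alias.
ItemLike = Union[dict, Item, Any]


def process(output: ItemLike) -> None:
    # Placeholder consumer showing the alias in a signature.
    print(type(output).__name__)


@dataclass
class Product:
    name: str


process({"name": "a plain dict item"})
process(Product(name="a dataclass item"))
```

The Any member is what makes the alias imprecise: without a common static base (or a Protocol), it collapses toward Any, which is the difficulty the reply above alludes to.
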

"""ItemProcessor finished for the given ``item`` and returned ``output``
"""
assert self.slot is not None # typing
self.slot.itemproc_size -= 1
if isinstance(output, Failure):
ex = output.value