diff --git a/scrapy/commands/parse.py b/scrapy/commands/parse.py index 9c3fc86d48c..ac937e46495 100644 --- a/scrapy/commands/parse.py +++ b/scrapy/commands/parse.py @@ -1,3 +1,4 @@ +import inspect import json import logging from typing import Dict @@ -10,7 +11,11 @@ from scrapy.exceptions import UsageError from scrapy.http import Request from scrapy.utils import display -from scrapy.utils.spider import iterate_spider_output, spidercls_for_request +from scrapy.utils.asyncgen import collect_asyncgen +from scrapy.utils.defer import aiter_errback, deferred_from_coro +from scrapy.utils.log import failure_to_exc_info +from scrapy.utils.misc import arg_to_iter +from scrapy.utils.spider import spidercls_for_request logger = logging.getLogger(__name__) @@ -108,6 +113,25 @@ def max_level(self): max_requests = max(self.requests) return max(max_items, max_requests) + def handle_exception(self, _failure): + logger.error( + "An error is caught while iterating the async iterable", + exc_info=failure_to_exc_info(_failure), + ) + + def iterate_spider_output(self, result): + if inspect.isasyncgen(result): + d = deferred_from_coro( + collect_asyncgen(aiter_errback(result, self.handle_exception)) + ) + d.addCallback(self.iterate_spider_output) + return d + if inspect.iscoroutine(result): + d = deferred_from_coro(result) + d.addCallback(self.iterate_spider_output) + return d + return arg_to_iter(deferred_from_coro(result)) + def add_items(self, lvl, new_items): old_items = self.items.get(lvl, []) self.items[lvl] = old_items + new_items @@ -165,7 +189,7 @@ def _get_items_and_requests(self, spider_output, opts, depth, spider, callback): def run_callback(self, response, callback, cb_kwargs=None): cb_kwargs = cb_kwargs or {} - d = maybeDeferred(iterate_spider_output, callback(response, **cb_kwargs)) + d = maybeDeferred(self.iterate_spider_output, callback(response, **cb_kwargs)) return d def get_callback_from_rules(self, spider, response): diff --git a/tests/test_command_parse.py b/tests/test_command_parse.py index b0fb978e952..037333c03af 100644 --- a/tests/test_command_parse.py +++ b/tests/test_command_parse.py @@ -30,14 +30,53 @@ def setUp(self): from scrapy.linkextractors import LinkExtractor from scrapy.spiders import CrawlSpider, Rule from scrapy.utils.test import get_from_asyncio_queue +import asyncio -class AsyncDefAsyncioSpider(scrapy.Spider): - name = 'asyncdef{self.spider_name}' +class AsyncDefAsyncioReturnSpider(scrapy.Spider): + name = "asyncdef_asyncio_return" async def parse(self, response): + await asyncio.sleep(0.2) status = await get_from_asyncio_queue(response.status) - return [scrapy.Item(), dict(foo='bar')] + self.logger.info(f"Got response {{status}}") + return [{{'id': 1}}, {{'id': 2}}] + +class AsyncDefAsyncioReturnSingleElementSpider(scrapy.Spider): + name = "asyncdef_asyncio_return_single_element" + + async def parse(self, response): + await asyncio.sleep(0.1) + status = await get_from_asyncio_queue(response.status) + self.logger.info(f"Got response {{status}}") + return {{'foo': 42}} + +class AsyncDefAsyncioGenLoopSpider(scrapy.Spider): + name = "asyncdef_asyncio_gen_loop" + + async def parse(self, response): + for i in range(10): + await asyncio.sleep(0.1) + yield {{'foo': i}} + self.logger.info(f"Got response {{response.status}}") + +class AsyncDefAsyncioSpider(scrapy.Spider): + name = "asyncdef_asyncio" + + async def parse(self, response): + await asyncio.sleep(0.2) + status = await get_from_asyncio_queue(response.status) + self.logger.debug(f"Got response {{status}}") + +class AsyncDefAsyncioGenExcSpider(scrapy.Spider): + name = "asyncdef_asyncio_gen_exc" + + async def parse(self, response): + for i in range(10): + await asyncio.sleep(0.1) + yield {{'foo': i}} + if i > 5: + raise ValueError("Stopping the processing") class MySpider(scrapy.Spider): name = '{self.spider_name}' @@ -213,17 +252,76 @@ def test_pipelines(self): self.assertIn("INFO: It Works!", _textmode(stderr)) @defer.inlineCallbacks - def test_asyncio_parse_items(self): + def test_async_def_asyncio_parse_items_list(self): status, out, stderr = yield self.execute( [ "--spider", - "asyncdef" + self.spider_name, + "asyncdef_asyncio_return", "-c", "parse", self.url("/html"), ] ) - self.assertIn("""[{}, {'foo': 'bar'}]""", _textmode(out)) + self.assertIn("INFO: Got response 200", _textmode(stderr)) + self.assertIn("{'id': 1}", _textmode(out)) + self.assertIn("{'id': 2}", _textmode(out)) + + @defer.inlineCallbacks + def test_async_def_asyncio_parse_items_single_element(self): + status, out, stderr = yield self.execute( + [ + "--spider", + "asyncdef_asyncio_return_single_element", + "-c", + "parse", + self.url("/html"), + ] + ) + self.assertIn("INFO: Got response 200", _textmode(stderr)) + self.assertIn("{'foo': 42}", _textmode(out)) + + @defer.inlineCallbacks + def test_async_def_asyncgen_parse_loop(self): + status, out, stderr = yield self.execute( + [ + "--spider", + "asyncdef_asyncio_gen_loop", + "-c", + "parse", + self.url("/html"), + ] + ) + self.assertIn("INFO: Got response 200", _textmode(stderr)) + for i in range(10): + self.assertIn(f"{{'foo': {i}}}", _textmode(out)) + + @defer.inlineCallbacks + def test_async_def_asyncgen_parse_exc(self): + status, out, stderr = yield self.execute( + [ + "--spider", + "asyncdef_asyncio_gen_exc", + "-c", + "parse", + self.url("/html"), + ] + ) + self.assertIn("ValueError", _textmode(stderr)) + for i in range(7): + self.assertIn(f"{{'foo': {i}}}", _textmode(out)) + + @defer.inlineCallbacks + def test_async_def_asyncio_parse(self): + _, _, stderr = yield self.execute( + [ + "--spider", + "asyncdef_asyncio", + "-c", + "parse", + self.url("/html"), + ] + ) + self.assertIn("DEBUG: Got response 200", _textmode(stderr)) @defer.inlineCallbacks def test_parse_items(self):