Fixed #5819 -- Bug of parse command #5824

Merged: 6 commits, Mar 10, 2023
28 changes: 26 additions & 2 deletions scrapy/commands/parse.py
@@ -1,3 +1,4 @@
+import inspect
 import json
 import logging
 from typing import Dict
@@ -10,7 +11,11 @@
 from scrapy.exceptions import UsageError
 from scrapy.http import Request
 from scrapy.utils import display
-from scrapy.utils.spider import iterate_spider_output, spidercls_for_request
+from scrapy.utils.asyncgen import collect_asyncgen
+from scrapy.utils.defer import aiter_errback, deferred_from_coro
+from scrapy.utils.log import failure_to_exc_info
+from scrapy.utils.misc import arg_to_iter
+from scrapy.utils.spider import spidercls_for_request

 logger = logging.getLogger(__name__)

@@ -108,6 +113,25 @@ def max_level(self):
         max_requests = max(self.requests)
         return max(max_items, max_requests)

+    def handle_exception(self, _failure):
+        logger.error(
+            "An error is caught while iterating the async iterable",
+            exc_info=failure_to_exc_info(_failure),
+        )
+
+    def iterate_spider_output(self, result):
+        if inspect.isasyncgen(result):
+            d = deferred_from_coro(
+                collect_asyncgen(aiter_errback(result, self.handle_exception))
+            )
+            d.addCallback(self.iterate_spider_output)
+            return d
+        if inspect.iscoroutine(result):
+            d = deferred_from_coro(result)
+            d.addCallback(self.iterate_spider_output)
+            return d
+        return arg_to_iter(deferred_from_coro(result))
+
     def add_items(self, lvl, new_items):
         old_items = self.items.get(lvl, [])
         self.items[lvl] = old_items + new_items
@@ -165,7 +189,7 @@ def _get_items_and_requests(self, spider_output, opts, depth, spider, callback):

     def run_callback(self, response, callback, cb_kwargs=None):
         cb_kwargs = cb_kwargs or {}
-        d = maybeDeferred(iterate_spider_output, callback(response, **cb_kwargs))
+        d = maybeDeferred(self.iterate_spider_output, callback(response, **cb_kwargs))
         return d

     def get_callback_from_rules(self, spider, response):
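For orientation (not part of the diff): the new Command.iterate_spider_output keeps unwrapping the callback result until it reaches a plain iterable — async generators are drained via collect_asyncgen (wrapped in aiter_errback so exceptions are logged through handle_exception instead of aborting), coroutines become Deferreds, and anything else falls through to arg_to_iter. The sketch below illustrates the same recursion with plain asyncio rather than Scrapy's Twisted helpers; collect_items, iterate_spider_output_sketch, and fake_parse are hypothetical names, not Scrapy APIs.

import asyncio
import inspect

async def collect_items(result):
    # Simplified stand-in for scrapy.utils.asyncgen.collect_asyncgen:
    # drain an async generator into a list.
    return [item async for item in result]

async def iterate_spider_output_sketch(result):
    # Recursively unwrap async generators and coroutines; finally treat the
    # value as an iterable (roughly what arg_to_iter does in the real code).
    if inspect.isasyncgen(result):
        return await iterate_spider_output_sketch(await collect_items(result))
    if inspect.iscoroutine(result):
        return await iterate_spider_output_sketch(await result)
    return list(result) if isinstance(result, (list, tuple)) else [result]

async def fake_parse(response):
    # A made-up async-generator callback, similar to the spiders added in the tests below.
    for i in range(3):
        yield {"foo": i}

print(asyncio.run(iterate_spider_output_sketch(fake_parse(None))))
# -> [{'foo': 0}, {'foo': 1}, {'foo': 2}]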
110 changes: 104 additions & 6 deletions tests/test_command_parse.py
@@ -30,14 +30,53 @@ def setUp(self):
 from scrapy.linkextractors import LinkExtractor
 from scrapy.spiders import CrawlSpider, Rule
 from scrapy.utils.test import get_from_asyncio_queue
+import asyncio

-class AsyncDefAsyncioSpider(scrapy.Spider):
-    name = 'asyncdef{self.spider_name}'
+
+class AsyncDefAsyncioReturnSpider(scrapy.Spider):
+    name = "asyncdef_asyncio_return"

     async def parse(self, response):
         await asyncio.sleep(0.2)
         status = await get_from_asyncio_queue(response.status)
-        return [scrapy.Item(), dict(foo='bar')]
+        self.logger.info(f"Got response {{status}}")
+        return [{{'id': 1}}, {{'id': 2}}]
+
+class AsyncDefAsyncioReturnSingleElementSpider(scrapy.Spider):
+    name = "asyncdef_asyncio_return_single_element"
+
+    async def parse(self, response):
+        await asyncio.sleep(0.1)
+        status = await get_from_asyncio_queue(response.status)
+        self.logger.info(f"Got response {{status}}")
+        return {{'foo': 42}}
+
+class AsyncDefAsyncioGenLoopSpider(scrapy.Spider):
+    name = "asyncdef_asyncio_gen_loop"
+
+    async def parse(self, response):
+        for i in range(10):
+            await asyncio.sleep(0.1)
+            yield {{'foo': i}}
+        self.logger.info(f"Got response {{response.status}}")
+
+class AsyncDefAsyncioSpider(scrapy.Spider):
+    name = "asyncdef_asyncio"
+
+    async def parse(self, response):
+        await asyncio.sleep(0.2)
+        status = await get_from_asyncio_queue(response.status)
+        self.logger.debug(f"Got response {{status}}")
+
+class AsyncDefAsyncioGenExcSpider(scrapy.Spider):
+    name = "asyncdef_asyncio_gen_exc"
+
+    async def parse(self, response):
+        for i in range(10):
+            await asyncio.sleep(0.1)
+            yield {{'foo': i}}
+            if i > 5:
+                raise ValueError("Stopping the processing")

 class MySpider(scrapy.Spider):
     name = '{self.spider_name}'
@@ -213,17 +252,76 @@ def test_pipelines(self):
         self.assertIn("INFO: It Works!", _textmode(stderr))

     @defer.inlineCallbacks
-    def test_asyncio_parse_items(self):
+    def test_async_def_asyncio_parse_items_list(self):
         status, out, stderr = yield self.execute(
             [
                 "--spider",
-                "asyncdef" + self.spider_name,
+                "asyncdef_asyncio_return",
                 "-c",
                 "parse",
                 self.url("/html"),
             ]
         )
-        self.assertIn("""[{}, {'foo': 'bar'}]""", _textmode(out))
+        self.assertIn("INFO: Got response 200", _textmode(stderr))
+        self.assertIn("{'id': 1}", _textmode(out))
+        self.assertIn("{'id': 2}", _textmode(out))
+
+    @defer.inlineCallbacks
+    def test_async_def_asyncio_parse_items_single_element(self):
+        status, out, stderr = yield self.execute(
+            [
+                "--spider",
+                "asyncdef_asyncio_return_single_element",
+                "-c",
+                "parse",
+                self.url("/html"),
+            ]
+        )
+        self.assertIn("INFO: Got response 200", _textmode(stderr))
+        self.assertIn("{'foo': 42}", _textmode(out))
+
+    @defer.inlineCallbacks
+    def test_async_def_asyncgen_parse_loop(self):
+        status, out, stderr = yield self.execute(
+            [
+                "--spider",
+                "asyncdef_asyncio_gen_loop",
+                "-c",
+                "parse",
+                self.url("/html"),
+            ]
+        )
+        self.assertIn("INFO: Got response 200", _textmode(stderr))
+        for i in range(10):
+            self.assertIn(f"{{'foo': {i}}}", _textmode(out))
+
+    @defer.inlineCallbacks
+    def test_async_def_asyncgen_parse_exc(self):
+        status, out, stderr = yield self.execute(
+            [
+                "--spider",
+                "asyncdef_asyncio_gen_exc",
+                "-c",
+                "parse",
+                self.url("/html"),
+            ]
+        )
+        self.assertIn("ValueError", _textmode(stderr))
+        for i in range(7):
+            self.assertIn(f"{{'foo': {i}}}", _textmode(out))
+
+    @defer.inlineCallbacks
+    def test_async_def_asyncio_parse(self):
+        _, _, stderr = yield self.execute(
+            [
+                "--spider",
+                "asyncdef_asyncio",
+                "-c",
+                "parse",
+                self.url("/html"),
+            ]
+        )
+        self.assertIn("DEBUG: Got response 200", _textmode(stderr))

     @defer.inlineCallbacks
     def test_parse_items(self):
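A note on test_async_def_asyncgen_parse_exc above: it passes because the command iterates the async generator through an errback wrapper, so the items yielded before the ValueError still reach the output while the exception is only logged. The snippet below sketches that behaviour with plain asyncio; aiter_errback_sketch and failing_gen are illustrative names that only mirror the intent of scrapy.utils.defer.aiter_errback (which works with Twisted Failures), not its implementation.

import asyncio

async def aiter_errback_sketch(aiterable, errback):
    # Yield items until the wrapped async iterable raises, then hand the
    # exception to the errback instead of propagating it.
    try:
        async for item in aiterable:
            yield item
    except Exception as exc:
        errback(exc)

async def failing_gen():
    # Mirrors AsyncDefAsyncioGenExcSpider.parse from the test fixture above.
    for i in range(10):
        await asyncio.sleep(0)
        yield {"foo": i}
        if i > 5:
            raise ValueError("Stopping the processing")

async def main():
    return [item async for item in aiter_errback_sketch(failing_gen(), print)]

print(asyncio.run(main()))
# -> collects {'foo': 0} through {'foo': 6}; the ValueError is printed by the errback rather than raised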