async def support in pipelines #4270

Merged: 6 commits, Jan 15, 2020
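
This PR lets an item pipeline define process_item as a coroutine (async def); the pipeline manager wraps such methods so that they still return Deferreds. A minimal sketch of the usage this enables (the pipeline class and item key are illustrative, not taken from the diff):

    # Hypothetical pipeline relying on this PR: process_item is a coroutine,
    # and Scrapy wraps it into a Deferred behind the scenes.
    class MarkItemPipeline:
        async def process_item(self, item, spider):
            item['seen'] = True  # any awaitable work could also happen here
            return item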
1 change: 1 addition & 0 deletions pytest.ini
@@ -95,6 +95,7 @@ flake8-ignore =
    scrapy/loader/__init__.py E501 E128
    scrapy/loader/processors.py E501
    # scrapy/pipelines
+    scrapy/pipelines/__init__.py E501
    scrapy/pipelines/files.py E116 E501 E266
    scrapy/pipelines/images.py E265 E501
    scrapy/pipelines/media.py E125 E501 E266
3 changes: 2 additions & 1 deletion scrapy/pipelines/__init__.py
@@ -6,6 +6,7 @@

from scrapy.middleware import MiddlewareManager
from scrapy.utils.conf import build_component_list
+from scrapy.utils.defer import deferred_f_from_coro_f


class ItemPipelineManager(MiddlewareManager):
@@ -19,7 +20,7 @@ def _get_mwlist_from_settings(cls, settings):
    def _add_middleware(self, pipe):
        super(ItemPipelineManager, self)._add_middleware(pipe)
        if hasattr(pipe, 'process_item'):
-            self.methods['process_item'].append(pipe.process_item)
+            self.methods['process_item'].append(deferred_f_from_coro_f(pipe.process_item))

    def process_item(self, item, spider):
        return self._process_chain('process_item', item, spider)
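
Note that the wrapper is applied to every process_item unconditionally: for a plain (non-coroutine) method, deferred_from_coro returns the result unchanged (see its final return o below), so existing pipelines keep working. A minimal sketch of both paths (function names illustrative; the coroutine branch assumes a setup where deferred_from_coro can wrap it, per the defer.py changes below):

    from scrapy.utils.defer import deferred_f_from_coro_f

    def sync_process_item(item, spider):
        return item  # plain return value passes through unchanged

    async def async_process_item(item, spider):
        return item  # the coroutine gets wrapped into a Deferred

    deferred_f_from_coro_f(sync_process_item)({}, None)   # -> {}
    deferred_f_from_coro_f(async_process_item)({}, None)  # -> Deferred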
13 changes: 13 additions & 0 deletions scrapy/utils/defer.py
@@ -2,6 +2,7 @@
Helper functions for dealing with Twisted deferreds
"""
import asyncio
+from functools import wraps
Review comment (Gallaecio, Member): 💄 Below import inspect?

Review comment (Member): I like @Gallaecio's style proposal a bit more than what's in the code. Merging PR anyways, this is not a blocker.

import inspect

from twisted.internet import defer, task
@@ -140,3 +141,15 @@ def deferred_from_coro(o):
        # wrapping the coroutine into a Future and then into a Deferred, this requires AsyncioSelectorReactor
        return defer.Deferred.fromFuture(asyncio.ensure_future(o))
    return o


+def deferred_f_from_coro_f(coro_f):
+    """ Converts a coroutine function into a function that returns a Deferred.
+
+    The coroutine function will be called at the time when the wrapper is called. Wrapper args will be passed to it.
+    This is useful for callback chains, as callback functions are called with the previous callback result.
+    """
+    @wraps(coro_f)
+    def f(*coro_args, **coro_kwargs):
+        return deferred_from_coro(coro_f(*coro_args, **coro_kwargs))
+    return f
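
As the docstring notes, the wrapper fits Deferred callback chains, since the previous callback's result becomes the coroutine's argument. A minimal sketch of that use case (names illustrative; assumes an environment where deferred_from_coro can wrap the coroutine, e.g. the asyncio reactor):

    from twisted.internet import defer

    async def enrich(result):
        # called with the previous callback's result, like any Deferred callback
        return {**result, 'enriched': True}

    d = defer.succeed({'raw': True})
    d.addCallback(deferred_f_from_coro_f(enrich))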
32 changes: 31 additions & 1 deletion tests/test_pipelines.py
@@ -1,9 +1,12 @@
+import asyncio

+from pytest import mark
from twisted.internet import defer
from twisted.internet.defer import Deferred
from twisted.trial import unittest

from scrapy import Spider, signals, Request
-from scrapy.utils.test import get_crawler
+from scrapy.utils.test import get_crawler, get_from_asyncio_queue

from tests.mockserver import MockServer

@@ -26,6 +29,20 @@ def process_item(self, item, spider):
        return d


+class AsyncDefPipeline:
+    async def process_item(self, item, spider):
+        await defer.succeed(42)
+        item['pipeline_passed'] = True
+        return item
+
+
+class AsyncDefAsyncioPipeline:
+    async def process_item(self, item, spider):
+        await asyncio.sleep(0.2)
+        item['pipeline_passed'] = await get_from_asyncio_queue(True)
+        return item
+
+
class ItemSpider(Spider):
    name = 'itemspider'

@@ -69,3 +86,16 @@ def test_deferred_pipeline(self):
        crawler = self._create_crawler(DeferredPipeline)
        yield crawler.crawl(mockserver=self.mockserver)
        self.assertEqual(len(self.items), 1)
+
+    @defer.inlineCallbacks
+    def test_asyncdef_pipeline(self):
+        crawler = self._create_crawler(AsyncDefPipeline)
+        yield crawler.crawl(mockserver=self.mockserver)
+        self.assertEqual(len(self.items), 1)
+
+    @mark.only_asyncio()
+    @defer.inlineCallbacks
+    def test_asyncdef_asyncio_pipeline(self):
+        crawler = self._create_crawler(AsyncDefAsyncioPipeline)
+        yield crawler.crawl(mockserver=self.mockserver)
+        self.assertEqual(len(self.items), 1)
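
The @mark.only_asyncio() test runs only when the suite uses the asyncio reactor, which Deferred.fromFuture requires. In a project, the equivalent is installing AsyncioSelectorReactor; a hedged sketch, assuming a Scrapy version that supports the TWISTED_REACTOR setting (the pipeline path is hypothetical):

    # settings.py (hypothetical project)
    TWISTED_REACTOR = 'twisted.internet.asyncioreactor.AsyncioSelectorReactor'

    ITEM_PIPELINES = {
        'myproject.pipelines.AsyncDefAsyncioPipeline': 300,
    }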