Merge pull request #4270 from wRAR/asyncio-pipelines
async def support in pipelines
kmike committed Jan 15, 2020
2 parents ce618fb + 7d85984 commit 50310fc
Showing 4 changed files with 47 additions and 2 deletions.
1 change: 1 addition & 0 deletions pytest.ini
@@ -95,6 +95,7 @@ flake8-ignore =
    scrapy/loader/__init__.py E501 E128
    scrapy/loader/processors.py E501
    # scrapy/pipelines
+    scrapy/pipelines/__init__.py E501
    scrapy/pipelines/files.py E116 E501 E266
    scrapy/pipelines/images.py E265 E501
    scrapy/pipelines/media.py E125 E501 E266
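
(The new pytest.ini entry presumably exempts the long deferred_f_from_coro_f(...) call added to scrapy/pipelines/__init__.py below, which exceeds flake8's E501 line-length limit.)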
3 changes: 2 additions & 1 deletion scrapy/pipelines/__init__.py
@@ -6,6 +6,7 @@

from scrapy.middleware import MiddlewareManager
from scrapy.utils.conf import build_component_list
+from scrapy.utils.defer import deferred_f_from_coro_f


class ItemPipelineManager(MiddlewareManager):
@@ -19,7 +20,7 @@ def _get_mwlist_from_settings(cls, settings):
    def _add_middleware(self, pipe):
        super(ItemPipelineManager, self)._add_middleware(pipe)
        if hasattr(pipe, 'process_item'):
-            self.methods['process_item'].append(pipe.process_item)
+            self.methods['process_item'].append(deferred_f_from_coro_f(pipe.process_item))

    def process_item(self, item, spider):
        return self._process_chain('process_item', item, spider)
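
With this change, each pipeline's process_item is wrapped with deferred_f_from_coro_f as it is registered, so plain methods, Deferred-returning methods, and async def coroutines all feed into the same callback chain. A minimal sketch of the kind of pipeline this enables (hypothetical example, not part of this commit):

class UppercaseNamePipeline:
    # process_item may now be a coroutine; the manager wraps it into a
    # Deferred-returning callable, so it chains like any other pipeline.
    async def process_item(self, item, spider):
        item['name'] = item['name'].upper()
        return item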
13 changes: 13 additions & 0 deletions scrapy/utils/defer.py
@@ -2,6 +2,7 @@
Helper functions for dealing with Twisted deferreds
"""
import asyncio
+from functools import wraps
import inspect

from twisted.internet import defer, task
@@ -140,3 +141,15 @@ def deferred_from_coro(o):
            # wrapping the coroutine into a Future and then into a Deferred, this requires AsyncioSelectorReactor
            return defer.Deferred.fromFuture(asyncio.ensure_future(o))
    return o
+
+
+def deferred_f_from_coro_f(coro_f):
+    """ Converts a coroutine function into a function that returns a Deferred.
+
+    The coroutine function will be called at the time when the wrapper is called. Wrapper args will be passed to it.
+    This is useful for callback chains, as callback functions are called with the previous callback result.
+    """
+    @wraps(coro_f)
+    def f(*coro_args, **coro_kwargs):
+        return deferred_from_coro(coro_f(*coro_args, **coro_kwargs))
+    return f
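
The wrapper does not call the coroutine function up front; coro_f runs only when f is invoked, with whatever arguments f receives, which is exactly what a Deferred callback chain expects. A usage sketch (hypothetical function names; a coroutine that awaits reactor-driven work also needs a running reactor):

from scrapy.utils.defer import deferred_f_from_coro_f

async def double(x):
    return x * 2

wrapped = deferred_f_from_coro_f(double)
d = wrapped(21)        # a twisted.internet.defer.Deferred, not a coroutine
d.addCallback(print)   # prints 42 once the coroutine completes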
32 changes: 31 additions & 1 deletion tests/test_pipelines.py
@@ -1,9 +1,12 @@
+import asyncio
+
+from pytest import mark
from twisted.internet import defer
from twisted.internet.defer import Deferred
from twisted.trial import unittest

from scrapy import Spider, signals, Request
-from scrapy.utils.test import get_crawler
+from scrapy.utils.test import get_crawler, get_from_asyncio_queue

from tests.mockserver import MockServer

@@ -26,6 +29,20 @@ def process_item(self, item, spider):
        return d


+class AsyncDefPipeline:
+    async def process_item(self, item, spider):
+        await defer.succeed(42)
+        item['pipeline_passed'] = True
+        return item
+
+
+class AsyncDefAsyncioPipeline:
+    async def process_item(self, item, spider):
+        await asyncio.sleep(0.2)
+        item['pipeline_passed'] = await get_from_asyncio_queue(True)
+        return item
+
+
class ItemSpider(Spider):
    name = 'itemspider'

@@ -69,3 +86,16 @@ def test_deferred_pipeline(self):
        crawler = self._create_crawler(DeferredPipeline)
        yield crawler.crawl(mockserver=self.mockserver)
        self.assertEqual(len(self.items), 1)
+
+    @defer.inlineCallbacks
+    def test_asyncdef_pipeline(self):
+        crawler = self._create_crawler(AsyncDefPipeline)
+        yield crawler.crawl(mockserver=self.mockserver)
+        self.assertEqual(len(self.items), 1)
+
+    @mark.only_asyncio()
+    @defer.inlineCallbacks
+    def test_asyncdef_asyncio_pipeline(self):
+        crawler = self._create_crawler(AsyncDefAsyncioPipeline)
+        yield crawler.crawl(mockserver=self.mockserver)
+        self.assertEqual(len(self.items), 1)
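
The second new test carries mark.only_asyncio() because awaiting asyncio code (asyncio.sleep, an asyncio queue) only works when Twisted runs on the asyncio reactor, whereas a plain coroutine such as AsyncDefPipeline's works under any reactor. In a project, the asyncio reactor is selected with the TWISTED_REACTOR setting (as introduced in Scrapy 2.0; shown here as a hedged sketch):

# settings.py
TWISTED_REACTOR = 'twisted.internet.asyncioreactor.AsyncioSelectorReactor'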
