Add source parameter to bytes_received signal
elacuesta committed Jan 29, 2020
1 parent a499f38 commit 6f02a8d
Showing 5 changed files with 29 additions and 10 deletions.
12 changes: 8 additions & 4 deletions docs/topics/signals.rst
@@ -162,11 +162,11 @@ bytes_received
--------------

.. signal:: bytes_received
-.. function:: bytes_received(data, request, spider)
+.. function:: bytes_received(data, request, spider, source)

-Sent by the HTTP 1.1 download handler when a group of bytes is
-received for a specific request. This signal might be fired
-multiple times for the same request.
+Sent by the HTTP 1.1 and S3 download handlers when a group of bytes is
+received for a specific request. This signal might be fired multiple
+times for the same request, with partial data each time.

This signal does not support returning deferreds from its handlers.

@@ -179,6 +179,10 @@ bytes_received
:param spider: the spider associated with the response
:type spider: :class:`~scrapy.spiders.Spider` object

+:param source: a string to identify which handler sent the signal
+(current values could be "http11" or "s3")
+:type source: :class:`str` object

spider_closed
-------------

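For context, here is an illustrative extension (not part of this commit) that subscribes to the signal documented above and uses the new source argument. The class name and log message are hypothetical; it assumes the signal object is exposed as scrapy.signals.bytes_received, as the documentation indicates, and that the extension is enabled through the EXTENSIONS setting.

# Hypothetical sketch of a bytes_received subscriber; not part of this diff.
import logging

from scrapy import signals

logger = logging.getLogger(__name__)


class BytesReceivedLogger:
    """Log how many bytes each download handler delivers per request."""

    @classmethod
    def from_crawler(cls, crawler):
        ext = cls()
        # Connect the handler to the bytes_received signal.
        crawler.signals.connect(ext.bytes_received, signal=signals.bytes_received)
        return ext

    def bytes_received(self, data, request, spider, source):
        # ``source`` is "http11" or "s3", depending on which handler fired the signal.
        logger.debug("%d bytes for %s via %s", len(data), request.url, source)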
18 changes: 13 additions & 5 deletions scrapy/core/downloader/handlers/http11.py
@@ -32,8 +32,9 @@
class HTTP11DownloadHandler:
lazy = False

-def __init__(self, settings, crawler=None):
+def __init__(self, settings, crawler=None, source="http11"):
self.crawler = crawler
+self.source = source
self._pool = HTTPConnectionPool(reactor, persistent=True)
self._pool.maxPersistentPerHost = settings.getint('CONCURRENT_REQUESTS_PER_DOMAIN')
self._pool._factory.noisy = False
@@ -67,8 +68,8 @@ def __init__(self, settings, crawler=None):
self._disconnect_timeout = 1

@classmethod
-def from_crawler(cls, crawler):
-return cls(crawler.settings, crawler)
+def from_crawler(cls, crawler, **kwargs):
+return cls(crawler.settings, crawler, **kwargs)

def download_request(self, request, spider):
"""Return a deferred for the HTTP download"""
@@ -79,6 +80,7 @@ def download_request(self, request, spider):
warnsize=getattr(spider, 'download_warnsize', self._default_warnsize),
fail_on_dataloss=self._fail_on_dataloss,
crawler=self.crawler,
+source=self.source,
)
return agent.download_request(request)

@@ -275,7 +277,7 @@ class ScrapyAgent:
_TunnelingAgent = TunnelingAgent

def __init__(self, contextFactory=None, connectTimeout=10, bindAddress=None, pool=None,
-maxsize=0, warnsize=0, fail_on_dataloss=True, crawler=None):
+maxsize=0, warnsize=0, fail_on_dataloss=True, crawler=None, source=None):
self._contextFactory = contextFactory
self._connectTimeout = connectTimeout
self._bindAddress = bindAddress
@@ -285,6 +287,7 @@ def __init__(self, contextFactory=None, connectTimeout=10, bindAddress=None, pool=None,
self._fail_on_dataloss = fail_on_dataloss
self._txresponse = None
self._crawler = crawler
+self._source = source

def _get_agent(self, request, timeout):
bindaddress = request.meta.get('bindaddress') or self._bindAddress
@@ -421,6 +424,7 @@ def _cancel(_):
warnsize,
fail_on_dataloss,
self._crawler,
+self._source,
)
)

@@ -457,7 +461,9 @@ def stopProducing(self):

class _ResponseReader(protocol.Protocol):

-def __init__(self, finished, txresponse, request, maxsize, warnsize, fail_on_dataloss, crawler):
+def __init__(
+self, finished, txresponse, request, maxsize, warnsize, fail_on_dataloss, crawler, source
+):
self._finished = finished
self._txresponse = txresponse
self._request = request
@@ -469,6 +475,7 @@ def __init__(self, finished, txresponse, request, maxsize, warnsize, fail_on_dataloss, crawler):
self._reached_warnsize = False
self._bytes_received = 0
self._crawler = crawler
+self._source = source

def dataReceived(self, bodyBytes):
# This maybe called several times after cancel was called with buffered data.
@@ -483,6 +490,7 @@ def dataReceived(self, bodyBytes):
data=bodyBytes,
request=self._request,
spider=self._crawler.spider,
+source=self._source,
)

if self._maxsize and self._bytes_received > self._maxsize:
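Beyond this diff, the **kwargs pass-through added to from_crawler above suggests how a handler built on top of HTTP11DownloadHandler could report its own source label. The subclass below is a hypothetical sketch, not part of this commit; the class name and label are illustrative.

# Hypothetical sketch: reuse HTTP11DownloadHandler but label its traffic.
from scrapy.core.downloader.handlers.http11 import HTTP11DownloadHandler


class LabelledDownloadHandler(HTTP11DownloadHandler):

    @classmethod
    def from_crawler(cls, crawler, **kwargs):
        # Forwarded through from_crawler -> __init__(settings, crawler, source=...),
        # so bytes_received handlers see this value as ``source``.
        kwargs.setdefault("source", "labelled-http11")
        return super().from_crawler(crawler, **kwargs)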
1 change: 1 addition & 0 deletions scrapy/core/downloader/handlers/s3.py
@@ -73,6 +73,7 @@ def __init__(self, settings, *,
objcls=httpdownloadhandler,
settings=settings,
crawler=crawler,
source="s3",
)
self._download_http = _http_handler.download_request

3 changes: 3 additions & 0 deletions tests/test_downloader_handlers.py
@@ -730,6 +730,9 @@ def test_download_with_proxy_https_timeout(self):

class HttpDownloadHandlerMock:

+def __init__(self, *args, **kwargs):
+pass

def download_request(self, request, spider):
return request

5 changes: 4 additions & 1 deletion tests/test_engine.py
@@ -112,6 +112,7 @@ def __init__(self, spider_class):
self.itemerror = []
self.itemresp = []
self.bytes = defaultdict(lambda: list())
+self.bytes_source = set()
self.signals_caught = {}
self.spider_class = spider_class

@@ -165,8 +166,9 @@ def item_error(self, item, response, spider, failure):
def item_scraped(self, item, spider, response):
self.itemresp.append((item, response))

-def bytes_received(self, data, request, spider):
+def bytes_received(self, data, request, spider, source):
self.bytes[request].append(data)
+self.bytes_source.add(source)

def request_scheduled(self, request, spider):
self.reqplug.append((request, spider))
@@ -279,6 +281,7 @@ def _assert_scraped_items(self):

def _assert_bytes_received(self):
self.assertEqual(9, len(self.run.bytes))
+self.assertEqual(self.run.bytes_source, set(["http11"]))
for request, data in self.run.bytes.items():
joined_data = b"".join(data)
if self.run.getpath(request.url) == "/":
