diff --git a/docs/cli.rst b/docs/cli.rst
index 403343f01..137ac35c4 100644
--- a/docs/cli.rst
+++ b/docs/cli.rst
@@ -13,6 +13,8 @@ Plucks one data value per publisher. It writes a CSV file with the results, and
 - ``--package-pointer=STR`` (``-p``): The JSON Pointer to the value in the package.
 - ``--release-pointer=STR`` (``-r``): The JSON Pointer to the value in the release.
 - ``--truncate=NUM`` (``-t``): Truncate the value to this number of characters.
+- ``--max-bytes=NUM``: Stop downloading an OCDS file after reading at least this many bytes.
+- ``spider``: Run specific spiders. Omit to run all spiders.
 
 Get each publisher's publication policy
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
diff --git a/kingfisher_scrapy/base_spider.py b/kingfisher_scrapy/base_spider.py
index 31d084e19..38b626c08 100644
--- a/kingfisher_scrapy/base_spider.py
+++ b/kingfisher_scrapy/base_spider.py
@@ -333,42 +333,45 @@ def parse(self, response):
         else:
             raise UnknownArchiveFormatError(response.request.meta['file_name'])
 
-        with cls(BytesIO(response.body)) as archive_file:
-            number = 1
-            for file_info in archive_file.infolist():
-                # Avoid reading the rest of a large file, since the rest of the items will be dropped.
-                if self.sample and number > self.sample:
-                    break
-
-                filename = file_info.filename
-                basename = os.path.basename(filename)
-                if self.file_name_must_contain not in basename:
-                    continue
-                if archive_format == 'rar' and file_info.isdir():
-                    continue
-                if archive_format == 'zip' and file_info.is_dir():
-                    continue
-                if not basename.endswith('.json'):
-                    basename += '.json'
-
-                compressed_file = archive_file.open(filename)
-
-                # If `resize_package = True`, then we need to open the file twice: once to extract the package metadata
-                # and then to extract the releases themselves.
-                if self.resize_package:
-                    data = {'data': compressed_file, 'package': archive_file.open(filename)}
-                else:
-                    data = compressed_file
-
-                yield File({
-                    'file_name': basename,
-                    'data': data,
-                    'data_type': self.data_type,
-                    'url': response.request.url,
-                    'encoding': self.encoding
-                })
-
-                number += 1
+        # If we use a context manager here, the archive file might close before the item pipeline reads from the file
+        # handles of the compressed files.
+        archive_file = cls(BytesIO(response.body))
+
+        number = 1
+        for file_info in archive_file.infolist():
+            # Avoid reading the rest of a large file, since the rest of the items will be dropped.
+            if self.sample and number > self.sample:
+                break
+
+            filename = file_info.filename
+            basename = os.path.basename(filename)
+            if self.file_name_must_contain not in basename:
+                continue
+            if archive_format == 'rar' and file_info.isdir():
+                continue
+            if archive_format == 'zip' and file_info.is_dir():
+                continue
+            if not basename.endswith('.json'):
+                basename += '.json'
+
+            compressed_file = archive_file.open(filename)
+
+            # If `resize_package = True`, then we need to open the file twice: once to extract the package metadata and
+            # then to extract the releases themselves.
+            if self.resize_package:
+                data = {'data': compressed_file, 'package': archive_file.open(filename)}
+            else:
+                data = compressed_file
+
+            yield File({
+                'file_name': basename,
+                'data': data,
+                'data_type': self.data_type,
+                'url': response.request.url,
+                'encoding': self.encoding
+            })
+
+            number += 1
 
 
 class LinksSpider(SimpleSpider):
diff --git a/kingfisher_scrapy/commands/pluck.py b/kingfisher_scrapy/commands/pluck.py
index e8e07fb5a..fbe36bbb9 100644
--- a/kingfisher_scrapy/commands/pluck.py
+++ b/kingfisher_scrapy/commands/pluck.py
@@ -13,6 +13,9 @@
 
 
 class Pluck(ScrapyCommand):
+    def syntax(self):
+        return '[options] [spider ...]'
+
     def short_desc(self):
         return 'Pluck one data value per publisher'
 
@@ -21,6 +24,7 @@ def add_options(self, parser):
         parser.add_option('-p', '--package-pointer', help='The JSON Pointer to the value in the package')
         parser.add_option('-r', '--release-pointer', help='The JSON Pointer to the value in the release')
         parser.add_option('-t', '--truncate', type=int, help='Truncate the value to this number of characters')
+        parser.add_option('--max-bytes', type=int, help='Stop downloading an OCDS file after reading this many bytes')
 
     def run(self, args, opts):
         if not (bool(opts.package_pointer) ^ bool(opts.release_pointer)):
@@ -35,6 +39,8 @@ def run(self, args, opts):
             'scrapy.extensions.telnet.TelnetConsole': None,
             'kingfisher_scrapy.extensions.KingfisherPluck': 1,
         })
+        if opts.max_bytes:
+            self.settings.set('KINGFISHER_PLUCK_MAX_BYTES', opts.max_bytes)
 
         filename = _pluck_filename(opts)
         if os.path.isfile(filename):
@@ -44,7 +50,7 @@ def run(self, args, opts):
         skipped = defaultdict(list)
         running = []
         for spider_name in self.crawler_process.spider_loader.list():
-            if spider_name != 'fail':
+            if (not args and spider_name != 'fail') or spider_name in args:
                 spidercls = self.crawler_process.spider_loader.load(spider_name)
                 if hasattr(spidercls, 'skip_pluck'):
                     skipped[spidercls.skip_pluck].append(spider_name)
diff --git a/kingfisher_scrapy/extensions.py b/kingfisher_scrapy/extensions.py
index 1d2a5fcae..74e20cf28 100644
--- a/kingfisher_scrapy/extensions.py
+++ b/kingfisher_scrapy/extensions.py
@@ -2,10 +2,11 @@
 import json
 import os
+from collections import defaultdict
 
 import sentry_sdk
 from scrapy import signals
-from scrapy.exceptions import NotConfigured
+from scrapy.exceptions import NotConfigured, StopDownload
 
 from kingfisher_scrapy import util
 from kingfisher_scrapy.items import File, FileError, FileItem, PluckedItem
 
@@ -15,30 +16,46 @@
 
 # https://docs.scrapy.org/en/latest/topics/extensions.html#writing-your-own-extension
 class KingfisherPluck:
-    def __init__(self, directory):
+    def __init__(self, directory, max_bytes):
         self.directory = directory
-        self.spiders_seen = set()
+        self.max_bytes = max_bytes
+
+        self.bytes_received_per_spider = defaultdict(int)
+        self.item_scraped_per_spider = set()
 
     @classmethod
     def from_crawler(cls, crawler):
         directory = crawler.settings['KINGFISHER_PLUCK_PATH']
+        max_bytes = crawler.settings['KINGFISHER_PLUCK_MAX_BYTES']
 
-        extension = cls(directory=directory)
+        extension = cls(directory=directory, max_bytes=max_bytes)
 
         crawler.signals.connect(extension.item_scraped, signal=signals.item_scraped)
         crawler.signals.connect(extension.spider_closed, signal=signals.spider_closed)
+        if max_bytes:
+            crawler.signals.connect(extension.bytes_received, signal=signals.bytes_received)
 
         return extension
 
+    def bytes_received(self, data, request, spider):
+        # We only limit the bytes received for final requests (i.e. where the callback is the default `parse` method).
+        if not spider.pluck or request.callback is not None or request.meta['file_name'].endswith(('.rar', '.zip')):
+            return
+
+        # Scrapy typically downloads in 16,384-byte chunks.
+        self.bytes_received_per_spider[spider.name] += len(data)
+        if self.bytes_received_per_spider[spider.name] >= self.max_bytes:
+            raise StopDownload(fail=False)
+
     def item_scraped(self, item, spider):
-        if not spider.pluck or spider.name in self.spiders_seen or not isinstance(item, PluckedItem):
+        if not spider.pluck or spider.name in self.item_scraped_per_spider or not isinstance(item, PluckedItem):
             return
 
-        self.spiders_seen.add(spider.name)
+        self.item_scraped_per_spider.add(spider.name)
         self._write(spider, item['value'])
 
     def spider_closed(self, spider, reason):
-        if not spider.pluck or spider.name in self.spiders_seen:
+        if not spider.pluck or spider.name in self.item_scraped_per_spider:
             return
 
         self._write(spider, f'closed: {reason}')
diff --git a/kingfisher_scrapy/pipelines.py b/kingfisher_scrapy/pipelines.py
index 54dd72c4f..5424d8389 100644
--- a/kingfisher_scrapy/pipelines.py
+++ b/kingfisher_scrapy/pipelines.py
@@ -92,6 +92,9 @@ def process_item(self, item, spider):
                 value = next(ijson.items(item['data'], pointer.replace('/', '.')[1:]))
             except StopIteration:
                 value = f'error: {pointer} not found'
+            # The JSON text can be truncated by a `bytes_received` handler.
+            except ijson.common.IncompleteJSONError:
+                value = f'error: {pointer} not found within initial bytes'
         else:  # spider.release_pointer
             if isinstance(item['data'], dict):
                 data = item['data']
diff --git a/kingfisher_scrapy/settings.py b/kingfisher_scrapy/settings.py
index 783b92a9e..8c55400b0 100644
--- a/kingfisher_scrapy/settings.py
+++ b/kingfisher_scrapy/settings.py
@@ -120,6 +120,7 @@
 KINGFISHER_OPENOPPS_PASSWORD = os.getenv('KINGFISHER_OPENOPPS_PASSWORD')
 
 KINGFISHER_PLUCK_PATH = os.getenv('KINGFISHER_PLUCK_PATH', '')
+KINGFISHER_PLUCK_MAX_BYTES = None
 
 # Enable and configure the AutoThrottle extension (disabled by default)
 # See https://docs.scrapy.org/en/latest/topics/autothrottle.html
diff --git a/kingfisher_scrapy/spiders/digiwhist_base.py b/kingfisher_scrapy/spiders/digiwhist_base.py
index b205722db..4a5964a78 100644
--- a/kingfisher_scrapy/spiders/digiwhist_base.py
+++ b/kingfisher_scrapy/spiders/digiwhist_base.py
@@ -27,6 +27,6 @@ def parse(self, response):
 
         # Load a line at the time, pass it to API
         with tarfile.open(fileobj=BytesIO(response.body), mode="r:gz") as tar:
-            with tar.extractfile(tar.getnames()[0]) as readfp:
-                yield self.build_file_from_response(data=readfp, response=response, file_name='data.json',
+            with tar.extractfile(tar.next()) as f:
+                yield self.build_file_from_response(data=f, response=response, file_name='data.json',
                                                     data_type='release_package')
diff --git a/tests/extensions/test_kingfisher_pluck.py b/tests/extensions/test_kingfisher_pluck.py
index 13a92d1be..b96e5715a 100644
--- a/tests/extensions/test_kingfisher_pluck.py
+++ b/tests/extensions/test_kingfisher_pluck.py
@@ -1,7 +1,12 @@
+
 import os
 from glob import glob
 from tempfile import TemporaryDirectory
 
+import pytest
+from scrapy import Request
+from scrapy.exceptions import StopDownload
+
 from kingfisher_scrapy.extensions import KingfisherPluck
 from kingfisher_scrapy.items import PluckedItem
 from tests import spider_with_crawler
@@ -67,3 +72,40 @@ def test_spider_closed_without_items():
 
         with open(os.path.join(tmpdirname, 'pluck-release-date.csv')) as f:
             assert 'closed: itemcount,test\n' == f.read()
+
+
+def test_bytes_received_stop_download():
+    with TemporaryDirectory() as tmpdirname:
+        spider = spider_with_crawler(settings={'KINGFISHER_PLUCK_PATH': tmpdirname,
+                                               'KINGFISHER_PLUCK_MAX_BYTES': 1}, release_pointer='/date')
+        extension = KingfisherPluck.from_crawler(spider.crawler)
+        request = Request('http://example.com', meta={'file_name': 'test.json'})
+
+        with pytest.raises(StopDownload):
+            extension.bytes_received(data={'test': 'test'}, spider=spider, request=request)
+
+        assert extension.max_bytes == 1
+
+
+def test_bytes_received_dont_stop_download():
+    with TemporaryDirectory() as tmpdirname:
+        spider = spider_with_crawler(settings={'KINGFISHER_PLUCK_PATH': tmpdirname,
+                                               'KINGFISHER_PLUCK_MAX_BYTES': 10}, release_pointer='/date')
+        extension = KingfisherPluck.from_crawler(spider.crawler)
+        request = Request('http://example.com', meta={'file_name': 'test.json'})
+
+        extension.bytes_received(data={'test': 'test'}, spider=spider, request=request)
+        assert extension.bytes_received_per_spider[spider.name] == 1
+
+        assert extension.max_bytes == 10
+
+
+def test_bytes_received_for_compressed_files():
+    with TemporaryDirectory() as tmpdirname:
+        spider = spider_with_crawler(settings={'KINGFISHER_PLUCK_PATH': tmpdirname,
+                                               'KINGFISHER_PLUCK_MAX_BYTES': 10}, release_pointer='/date')
+        extension = KingfisherPluck.from_crawler(spider.crawler)
+        request = Request('http://example.com', meta={'file_name': 'test.zip'})
+
+        extension.bytes_received(data={'test': 'test'}, spider=spider, request=request)
+        assert extension.bytes_received_per_spider[spider.name] == 0