diff --git a/kingfisher_scrapy/base_spider.py b/kingfisher_scrapy/base_spider.py
index 4036afb23..c0eda20df 100644
--- a/kingfisher_scrapy/base_spider.py
+++ b/kingfisher_scrapy/base_spider.py
@@ -5,8 +5,10 @@
 from io import BytesIO
 from zipfile import ZipFile
 
+import ijson
 import scrapy
 
+from kingfisher_scrapy import util
 from kingfisher_scrapy.exceptions import SpiderArgumentError
@@ -36,6 +38,10 @@ class BaseSpider(scrapy.Spider):
         scrapy crawl spider_name -a note='Started by NAME.'
     """
+
+    MAX_SAMPLE = 10
+    MAX_RELEASES_PER_PACKAGE = 100
+
     def __init__(self, sample=None, note=None, from_date=None, until_date=None, *args, **kwargs):
         super().__init__(*args, **kwargs)
@@ -152,18 +158,46 @@ def _get_crawl_path(self):
             name += '_sample'
         return os.path.join(name, self.get_start_time('%Y%m%d_%H%M%S'))
 
+    def _build_file_item(self, number, line, data_type, url, encoding):
+        return {
+            'success': True,
+            'number': number,
+            'file_name': 'data.json',
+            'data': line,
+            'data_type': data_type,
+            'url': url,
+            'encoding': encoding,
+        }
+
     def parse_json_lines(self, f, data_type, url, encoding='utf-8'):
         for number, line in enumerate(f, 1):
-            yield {
-                'success': True,
-                'number': number,
-                'file_name': 'data.json',
-                'data': line,
-                'data_type': data_type,
-                'url': url,
-                'encoding': encoding,
-            }
-            if self.sample and number > 9:
+            if self.sample and number > self.MAX_SAMPLE:
                 break
+            if isinstance(line, bytes):
+                line = line.decode()
+            yield self._build_file_item(number, line, data_type, url, encoding)
+
+    def get_package(self, f, array_name):
+        """
+        Returns the package data (all fields except the ``array_name`` list) from an ``array_name``_package object.
+        """
+        package = {}
+        for item in util.items(ijson.parse(f), '', array_name=array_name):
+            package.update(item)
+        return package
+
+    def parse_json_array(self, f_package, f_list, data_type, url, encoding='utf-8', array_field_name='releases'):
+        if self.sample:
+            size = self.MAX_SAMPLE
+        else:
+            size = self.MAX_RELEASES_PER_PACKAGE
+
+        package = self.get_package(f_package, array_field_name)
+
+        for number, items in enumerate(util.grouper(ijson.items(f_list, '{}.item'.format(array_field_name)), size), 1):
+            package[array_field_name] = [item for item in items if item is not None]
+            yield self._build_file_item(number, json.dumps(package, default=util.default), data_type, url, encoding)
+            if self.sample:
+                break
@@ -172,15 +206,18 @@ def parse_zipfile(self, response, data_type, file_format=None, encoding='utf-8')
         """
         Handling response with JSON data in ZIP files
 
-        :param str file_format: The zipped files format. If this is set to 'json_lines', then the zipped file will be
-                                slitted by lines before send it to kingfisher-process and only the zip file will be
-                                stored as file.
+        :param str file_format: The zipped file's format. If this is set to "json_lines", then each line of the zipped
+                                file will be yielded separately. If this is set to "release_package", then the releases
+                                will be re-packaged in groups of
+                                :const:`~kingfisher_scrapy.base_spider.BaseSpider.MAX_RELEASES_PER_PACKAGE` and
+                                yielded. In both cases, only the zipped file will be saved to disk. If this is not
+                                set, the file will be yielded and saved to disk.
         :param response response: the response that contains the zip file.
         :param str data_type: the zipped files data_type
         :param str encoding: the zipped files encoding. Default to utf-8
         """
         if response.status == 200:
-            if file_format == 'json_lines':
+            if file_format:
                 self.save_response_to_disk(response, 'file.zip')
             zip_file = ZipFile(BytesIO(response.body))
             for finfo in zip_file.infolist():
@@ -190,6 +227,10 @@ def parse_zipfile(self, response, data_type, file_format=None, encoding='utf-8')
                 data = zip_file.open(finfo.filename)
                 if file_format == 'json_lines':
                     yield from self.parse_json_lines(data, data_type, response.request.url, encoding=encoding)
+                elif file_format == 'release_package':
+                    package = zip_file.open(finfo.filename)
+                    yield from self.parse_json_array(package, data, data_type, response.request.url,
+                                                     encoding=encoding)
                 else:
                     yield self.save_data_to_disk(data.read(), filename, data_type=data_type,
                                                  url=response.request.url, encoding=encoding)
diff --git a/kingfisher_scrapy/spiders/argentina_buenos_aires.py b/kingfisher_scrapy/spiders/argentina_buenos_aires.py
index 6a4ae297d..fb9ccff2c 100644
--- a/kingfisher_scrapy/spiders/argentina_buenos_aires.py
+++ b/kingfisher_scrapy/spiders/argentina_buenos_aires.py
@@ -1,14 +1,20 @@
-import hashlib
 import json
-from io import BytesIO
-from zipfile import ZipFile
 
 import scrapy
 
-from kingfisher_scrapy.base_spider import BaseSpider
+from kingfisher_scrapy.base_spider import ZipSpider
 
 
-class ArgentinaBuenosAires(BaseSpider):
+class ArgentinaBuenosAires(ZipSpider):
+    """
+    Bulk download documentation
+      https://data.buenosaires.gob.ar/dataset/buenos-aires-compras/archivo/2a3d077c-71b6-4ba7-8924-f3e38cf1b8fc
+    API documentation
+      https://data.buenosaires.gob.ar/acerca/ckan
+    Spider arguments
+      sample
+        Downloads the zip file and sends 10 releases to Kingfisher Process.
+    """
     name = 'argentina_buenos_aires'
     start_urls = ['https://data.buenosaires.gob.ar/api/3/action/package_show?id=buenos-aires-compras']
     # the data list service takes too long to be downloaded, so we increase the download timeout
@@ -17,29 +23,22 @@ class ArgentinaBuenosAires(BaseSpider):
     def start_requests(self):
         yield scrapy.Request(
             url='https://data.buenosaires.gob.ar/api/3/action/package_show?id=buenos-aires-compras',
-            meta={'type': 'meta'}
+            callback=self.parse_list
         )
 
-    def parse(self, response):
+    def parse_list(self, response):
         if response.status == 200:
-            if response.request.meta['type'] == 'meta':
-                data = json.loads(response.text)
-                for resource in data['result']['resources']:
-                    if resource['format'].upper() == 'JSON':
-                        yield scrapy.Request(
-                            url=resource['url'],
-                            meta={'type': 'data'}
-                        )
-            else:
-                zip_file = ZipFile(BytesIO(response.body))
-                for finfo in zip_file.infolist():
-                    data = zip_file.open(finfo.filename).read()
-                    yield self.save_data_to_disk(data, finfo.filename, url=response.request.url,
-                                                 data_type='release_package')
+            data = json.loads(response.text)
+            for resource in data['result']['resources']:
+                if resource['format'].upper() == 'JSON':
+                    yield scrapy.Request(url=resource['url'])
         else:
             yield {
                 'success': False,
-                'file_name': hashlib.md5(response.request.url.encode('utf-8')).hexdigest() + '.json',
+                'file_name': 'list.json',
                 'url': response.request.url,
                 'errors': {'http_code': response.status}
             }
+
+    def parse(self, response):
+        yield from self.parse_zipfile(response, 'release_package', file_format='release_package')
diff --git a/kingfisher_scrapy/spiders/portugal.py b/kingfisher_scrapy/spiders/portugal.py
index 7b998d0c4..67c054db6 100644
--- a/kingfisher_scrapy/spiders/portugal.py
+++ b/kingfisher_scrapy/spiders/portugal.py
@@ -43,5 +43,5 @@ def parse_list(self, response):
             }
 
     def parse(self, response):
-        yield from self.parse_zipfile(response, data_type='record_package_json_lines',
+        yield from self.parse_zipfile(response, data_type='record_package', file_format='json_lines',
                                       encoding='iso-8859-1')
diff --git a/kingfisher_scrapy/util.py b/kingfisher_scrapy/util.py
new file mode 100644
index 000000000..7317da914
--- /dev/null
+++ b/kingfisher_scrapy/util.py
@@ -0,0 +1,60 @@
+import itertools
+import json
+from decimal import Decimal
+
+from ijson import ObjectBuilder, utils
+
+
+@utils.coroutine
+def items_basecoro(target, prefix, map_type=None, array_name=None):
+    """
+    This is copied from ``ijson/common.py``. An ``array_name`` argument is added. If the ``array_name`` is in the
+    current path, the current event is skipped. Otherwise, the function is identical.
+    """
+    while True:
+        current, event, value = (yield)
+        if array_name and array_name in current:
+            continue
+        if current == prefix:
+            if event in ('start_map', 'start_array'):
+                builder = ObjectBuilder(map_type=map_type)
+                end_event = event.replace('start', 'end')
+                while (current, event) != (prefix, end_event):
+                    builder.event(event, value)
+                    current, event, value = (yield)
+                del builder.containers[:]
+                target.send(builder.value)
+            else:
+                target.send(value)
+
+
+def items(events, prefix, map_type=None, array_name=None):
+    """
+    This is copied from ``ijson/common.py``. An ``array_name`` argument is added, which is passed as a keyword
+    argument to :func:`~kingfisher_scrapy.util.items_basecoro`. Otherwise, the function is identical.
+    """
+    return utils.coros2gen(events,
+                           (items_basecoro, (prefix,), {'map_type': map_type, 'array_name': array_name})
+                           )
+
+
+def default(obj):
+    """
+    Converts ``Decimal`` objects to floats and other iterables to lists so that they can be serialized, for use as
+    the ``default`` argument to ``json.dumps``.
+    """
+    if isinstance(obj, Decimal):
+        return float(obj)
+    try:
+        iterable = iter(obj)
+    except TypeError:
+        pass
+    else:
+        return list(iterable)
+    return json.JSONEncoder().default(obj)
+
+
+# See `grouper` recipe: https://docs.python.org/3.8/library/itertools.html#recipes
+def grouper(iterable, n, fillvalue=None):
+    args = [iter(iterable)] * n
+    return itertools.zip_longest(*args, fillvalue=fillvalue)
diff --git a/requirements.in b/requirements.in
index ab64af98c..34c50ac3d 100644
--- a/requirements.in
+++ b/requirements.in
@@ -5,3 +5,4 @@ rarfile
 requests
 Scrapy
 scrapyd-client
+ijson>=3
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
index 6541b8237..669121e80 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -14,6 +14,7 @@ cryptography==2.8         # via pyopenssl, scrapy, service-identity
 cssselect==1.1.0          # via parsel, scrapy
 hyperlink==19.0.0         # via twisted
 idna==2.8                 # via hyperlink, requests
+ijson==3.0.3
 incremental==17.5.0       # via twisted
 lxml==4.4.2               # via parsel, scrapy
 parsel==1.5.2             # via scrapy
diff --git a/requirements_dev.txt b/requirements_dev.txt
index 3d2cb46d6..1ebbde865 100644
--- a/requirements_dev.txt
+++ b/requirements_dev.txt
@@ -20,6 +20,7 @@ entrypoints==0.3          # via flake8
 flake8==3.7.9
 hyperlink==19.0.0
 idna==2.8
+ijson==3.0.3
 importlib-metadata==1.3.0  # via pluggy, pytest
 incremental==17.5.0
 isort==4.3.21
diff --git a/tests/test_base_spider.py b/tests/test_base_spider.py
index 01ee92b26..dbfdab5d8 100644
--- a/tests/test_base_spider.py
+++ b/tests/test_base_spider.py
@@ -223,9 +223,8 @@ def test_parse_zipfile_json_lines():
     tmp = os.path.join(files_store, 'test/20010203_040506')
     os.makedirs(tmp)
     with open(tmp + "test.json", 'w') as f:
-        f.write('{"key": "value"}\n{"key": "value"}\n{"key": "value"}\n{"key": "value"}\n{"key": "value"}'
-                '\n{"key": "value"}\n{"key": "value"}\n{"key": "value"}\n{"key": "value"}\n{"key": "value"}'
-                '\n{"key": "value"}')
"value"}\n{"key": "value"}\n{"key": "value"}\n{"key": "value"}' - '\n{"key": "value"}') + for i in range(10): + f.write('{"key": "value"}\n') with ZipFile(tmp + '/test.zip', 'w') as z: z.write(tmp + "test.json") with open(tmp + '/test.zip', 'rb') as z: @@ -242,6 +241,44 @@ def test_parse_zipfile_json_lines(): assert total == 10 +def test_parse_zipfile_release_package(): + response = text.TextResponse('test') + response.status = 200 + response.request = Mock() + response.request.meta = {'kf_filename': 'test.json'} + response.request.url = 'url' + with TemporaryDirectory() as tmpdirname: + files_store = os.path.join(tmpdirname, 'data') + tmp = os.path.join(files_store, 'test/20010203_040506') + os.makedirs(tmp) + with open(tmp + "test.json", 'w') as f: + release = {'releases': [], 'publisher': {'name': 'test'}, + 'extensions': ['a', 'b'], 'license': 'test', 'extra': 1.1} + for i in range(110): + release['releases'].append({'key': 'value'}) + json.dump(release, f) + with ZipFile(tmp + '/test.zip', 'w') as z: + z.write(tmp + "test.json") + with open(tmp + '/test.zip', 'rb') as z: + response = response.replace(body=z.read()) + spider = spider_with_crawler(spider_class=ZipSpider) + spider.crawler.settings['FILES_STORE'] = files_store + actual = spider.parse_zipfile(response, None, file_format='release_package').__next__() + data = json.loads(actual['data']) + assert actual['success'] is True and actual['number'] == 1 + assert data['publisher']['name'] == 'test' + assert data['extensions'] == ['a', 'b'] + assert len(data['releases']) == spider.MAX_RELEASES_PER_PACKAGE + spider.sample = True + total = 0 + for item in spider.parse_zipfile(response, None, file_format='release_package'): + total = total + 1 + data = json.loads(item['data']) + assert item['success'] is True and item['number'] == total + assert len(data['releases']) == spider.MAX_SAMPLE + assert total == 1 + + def test_date_arguments(): test_date = '2000-01-01' error_message = "time data 'test' does not match format '%Y-%m-%d'"