
[WIP] Split large files with ijson #366

Merged 17 commits on Apr 30, 2020.
Changes from 9 commits.
90 changes: 77 additions & 13 deletions kingfisher_scrapy/base_spider.py
@@ -2,9 +2,11 @@
import json
import os
from datetime import datetime
from decimal import Decimal
from io import BytesIO
from zipfile import ZipFile

import ijson
import scrapy

from kingfisher_scrapy.exceptions import SpiderArgumentError
@@ -36,6 +38,10 @@ class BaseSpider(scrapy.Spider):

scrapy crawl spider_name -a note='Started by NAME.'
"""

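    # Limits used by the parse_json_* helpers below: sample crawls stop after MAX_SAMPLE
    # items, and split packages carry at most MAX_RELEASES_PER_PACKAGE releases each.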
MAX_SAMPLE = 10
MAX_RELEASES_PER_PACKAGE = 100

def __init__(self, sample=None, note=None, from_date=None, until_date=None, *args, **kwargs):
super().__init__(*args, **kwargs)

@@ -152,35 +158,89 @@ def _get_crawl_path(self):
name += '_sample'
return os.path.join(name, self.get_start_time('%Y%m%d_%H%M%S'))

    @staticmethod
    def _parse_json_item(number, line, data_type, url, encoding):
        yield {
            'success': True,
            'number': number,
            'file_name': 'data.json',
            'data': line,
            'data_type': data_type,
            'url': url,
            'encoding': encoding,
        }

    def parse_json_lines(self, f, data_type, url, encoding='utf-8'):
        for number, line in enumerate(f, 1):
-            yield {
-                'success': True,
-                'number': number,
-                'file_name': 'data.json',
-                'data': line,
-                'data_type': data_type,
-                'url': url,
-                'encoding': encoding,
-            }
-            if self.sample and number > 9:
+            if self.sample and number > self.MAX_SAMPLE:
                break
+            yield from self._parse_json_item(number, line, data_type, url, encoding)

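For example (not part of the diff), feeding parse_json_lines a file-like object of newline-delimited JSON, a hypothetical sketch assuming a spider instance `spider`:

    from io import BytesIO

    items = list(spider.parse_json_lines(BytesIO(b'{"a": 1}\n{"a": 2}\n'), 'release_package', 'http://example.com'))
    # -> two items with number 1 and 2, each item's 'data' holding one raw line.
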
    @staticmethod
    def get_package(f, array_name):
        """
        Returns a package's metadata from a file object, skipping the array named `array_name`.
        """
        package = {'extensions': []}
        for prefix, event, value in ijson.parse(f):
            if prefix and 'map' not in event and array_name not in prefix:
                if 'extensions' in prefix:
                    if value:
                        package['extensions'].append(value)
                elif '.' in prefix:
                    object_name, object_field = prefix.split('.', 1)
                    if object_name not in package:
                        package[object_name] = {}
                    package[object_name][object_field] = value
                else:
                    package[prefix] = value
        if not package['extensions']:
            del package['extensions']
        return package
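
For illustration (not part of the diff): get_package() streams ijson parse events and keeps everything outside the named array, so a small release package behaves as below. Note the filter is a substring match, so any prefix containing the array name is skipped.

    from io import BytesIO

    raw = b'{"uri": "http://example.com", "publisher": {"name": "test"}, "extensions": ["a"], "releases": [{"id": "1"}]}'
    BaseSpider.get_package(BytesIO(raw), 'releases')
    # -> {'extensions': ['a'], 'uri': 'http://example.com', 'publisher': {'name': 'test'}}
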

    def parse_json_array(self, f_package, f_list, data_type, url, encoding='utf-8', array_field_name='releases'):
        packages = 0
        package = self.get_package(f_package, array_field_name)
        package[array_field_name] = []
        for number, item in enumerate(ijson.items(f_list, '{}.item'.format(array_field_name)), 1):
            if self.sample and number > self.MAX_SAMPLE:
                break
            if len(package[array_field_name]) < self.MAX_RELEASES_PER_PACKAGE:
                package[array_field_name].append(item)
            else:
                packages += 1
                yield from self._parse_json_item(packages, self.json_dumps(package), data_type, url, encoding)
                package[array_field_name].clear()
                # Keep the item that triggered the flush, so it is not dropped.
                package[array_field_name].append(item)
        if package[array_field_name]:
            yield from self._parse_json_item(packages + 1, self.json_dumps(package), data_type, url, encoding)

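As a sketch of the chunking (hypothetical values, not part of the diff): two file objects are needed because get_package() consumes one stream for the metadata while ijson.items() consumes another for the array; this is why parse_zipfile() below opens the zip entry twice. A package with 250 releases comes out as three packages, assuming a spider instance `spider` with sample disabled:

    from io import BytesIO

    body = BaseSpider.json_dumps({'publisher': {'name': 'test'},
                                  'releases': [{'id': str(i)} for i in range(250)]}).encode()
    items = list(spider.parse_json_array(BytesIO(body), BytesIO(body), 'release_package', 'http://example.com'))
    # -> three items whose 'data' fields are JSON strings carrying the original
    #    metadata plus 100, 100 and 50 releases respectively.
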
    @staticmethod
    def json_dumps(data):
        """
        Returns the data as JSON. Adapted from ocdskit; converts Decimal values (as produced by ijson) to float.
        """
        def default(obj):
            if isinstance(obj, Decimal):
                return float(obj)
            raise TypeError('%r is not JSON serializable' % obj)

        return json.dumps(data, default=default)

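A quick illustration (not part of the diff) of why the hook is needed: ijson parses JSON numbers as decimal.Decimal, which the standard json module refuses to serialize.

    from decimal import Decimal

    BaseSpider.json_dumps({'amount': Decimal('1.5')})  # -> '{"amount": 1.5}'
    # Without the default hook, json.dumps({'amount': Decimal('1.5')}) raises TypeError.
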

class ZipSpider(BaseSpider):

    def parse_zipfile(self, response, data_type, file_format=None, encoding='utf-8'):
        """
        Handles a response with JSON data in a ZIP file.

        :param str file_format: the format of the zipped files. If this is set to 'json_lines', the zipped file is
                                split into lines before being sent to kingfisher-process, and only the ZIP file itself
                                is stored to disk. If it is set to 'release_package', the zipped file is split into
                                smaller release packages.
        :param response response: the response that contains the ZIP file.
        :param str data_type: the data_type of the zipped files.
        :param str encoding: the encoding of the zipped files. Defaults to 'utf-8'.
        """
        if response.status == 200:
-            if file_format == 'json_lines':
+            if file_format:
                self.save_response_to_disk(response, 'file.zip')
            zip_file = ZipFile(BytesIO(response.body))
            for finfo in zip_file.infolist():
@@ -190,6 +250,10 @@ def parse_zipfile(self, response, data_type, file_format=None, encoding='utf-8')
data = zip_file.open(finfo.filename)
if file_format == 'json_lines':
yield from self.parse_json_lines(data, data_type, response.request.url, encoding=encoding)
elif file_format == 'release_package':
data_package = zip_file.open(finfo.filename)
yield from self.parse_json_array(data_package, data, data_type, response.request.url,
encoding=encoding)
else:
yield self.save_data_to_disk(data.read(), filename, data_type, response.request.url,
encoding=encoding)
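
A hypothetical subclass (not part of the diff) showing how a spider wires this up, in the same pattern as portugal.py below:

    class ExampleZipSpider(ZipSpider):
        name = 'example_zip'
        start_urls = ['http://example.com/releases.zip']

        def parse(self, response):
            yield from self.parse_zipfile(response, data_type='release_package',
                                          file_format='release_package')
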
37 changes: 15 additions & 22 deletions kingfisher_scrapy/spiders/argentina_buenos_aires.py
@@ -1,14 +1,11 @@
-import hashlib
import json
-from io import BytesIO
-from zipfile import ZipFile

import scrapy

-from kingfisher_scrapy.base_spider import BaseSpider
+from kingfisher_scrapy.base_spider import ZipSpider


-class ArgentinaBuenosAires(BaseSpider):
+class ArgentinaBuenosAires(ZipSpider):
name = 'argentina_buenos_aires'
start_urls = ['https://data.buenosaires.gob.ar/api/3/action/package_show?id=buenos-aires-compras']
    # The data list service takes too long to download, so we increase the download timeout.
@@ -17,29 +14,25 @@ class ArgentinaBuenosAires(BaseSpider):
    def start_requests(self):
        yield scrapy.Request(
            url='https://data.buenosaires.gob.ar/api/3/action/package_show?id=buenos-aires-compras',
-            meta={'type': 'meta'}
+            callback=self.parse_list
        )

-    def parse(self, response):
+    @staticmethod
+    def parse_list(response):
        if response.status == 200:
-            if response.request.meta['type'] == 'meta':
-                data = json.loads(response.text)
-                for resource in data['result']['resources']:
-                    if resource['format'].upper() == 'JSON':
-                        yield scrapy.Request(
-                            url=resource['url'],
-                            meta={'type': 'data'}
-                        )
-            else:
-                zip_file = ZipFile(BytesIO(response.body))
-                for finfo in zip_file.infolist():
-                    data = zip_file.open(finfo.filename).read()
-                    yield self.save_data_to_disk(data, finfo.filename, url=response.request.url,
-                                                 data_type='release_package')
+            data = json.loads(response.text)
+            for resource in data['result']['resources']:
+                if resource['format'].upper() == 'JSON':
+                    yield scrapy.Request(
+                        url=resource['url']
+                    )
        else:
            yield {
                'success': False,
-                'file_name': hashlib.md5(response.request.url.encode('utf-8')).hexdigest() + '.json',
+                'file_name': 'list.json',
                'url': response.request.url,
                'errors': {'http_code': response.status}
            }

def parse(self, response):
yield from self.parse_zipfile(response, 'release_package', file_format='release_package')
2 changes: 1 addition & 1 deletion kingfisher_scrapy/spiders/portugal.py
@@ -43,5 +43,5 @@ def parse_list(self, response):
}

    def parse(self, response):
-        yield from self.parse_zipfile(response, data_type='record_package_json_lines',
+        yield from self.parse_zipfile(response, data_type='record_package',
                                      file_format='json_lines', encoding='iso-8859-1')
37 changes: 34 additions & 3 deletions tests/test_base_spider.py
@@ -223,9 +223,8 @@ def test_parse_zipfile_json_lines():
        tmp = os.path.join(files_store, 'test/20010203_040506')
        os.makedirs(tmp)
        with open(tmp + "test.json", 'w') as f:
-            f.write('{"key": "value"}\n{"key": "value"}\n{"key": "value"}\n{"key": "value"}\n{"key": "value"}'
-                    '\n{"key": "value"}\n{"key": "value"}\n{"key": "value"}\n{"key": "value"}\n{"key": "value"}'
-                    '\n{"key": "value"}')
+            for i in range(10):
+                f.write('{"key": "value"}\n')
        with ZipFile(tmp + '/test.zip', 'w') as z:
            z.write(tmp + "test.json")
        with open(tmp + '/test.zip', 'rb') as z:
@@ -242,6 +241,38 @@ def test_parse_zipfile_json_lines():
assert total == 10


def test_parse_zipfile_release_package():
response = text.TextResponse('test')
response.status = 200
response.request = Mock()
response.request.meta = {'kf_filename': 'test.json'}
response.request.url = 'url'
with TemporaryDirectory() as tmpdirname:
files_store = os.path.join(tmpdirname, 'data')
tmp = os.path.join(files_store, 'test/20010203_040506')
os.makedirs(tmp)
with open(tmp + "test.json", 'w') as f:
release = {'releases': [], 'publisher': {'name': 'test'},
'extensions': ['a', 'b'], 'license': 'test'}
for i in range(110):
release['releases'].append({'key': 'value'})
json.dump(release, f)
with ZipFile(tmp + '/test.zip', 'w') as z:
z.write(tmp + "test.json")
with open(tmp + '/test.zip', 'rb') as z:
response = response.replace(body=z.read())
spider = spider_with_crawler(spider_class=ZipSpider)
spider.crawler.settings['FILES_STORE'] = files_store
for number, actual in enumerate(spider.parse_zipfile(response, None, file_format='release_package'), 1):
assert actual['success'] is True and actual['number'] == number
spider.sample = True
total = 0
for item in spider.parse_zipfile(response, None, file_format='release_package'):
total = total + 1
assert item['success'] is True and item['number'] == total
assert total == 1


def test_date_arguments():
test_date = '2000-01-01'
error_message = "time data 'test' does not match format '%Y-%m-%d'"