Commit 79273cc

Merge 317250b into f077936
yolile committed Apr 30, 2020
2 parents f077936 + 317250b
Showing 5 changed files with 177 additions and 40 deletions.
69 changes: 55 additions & 14 deletions kingfisher_scrapy/base_spider.py
@@ -5,8 +5,10 @@
from io import BytesIO
from zipfile import ZipFile

import ijson
import scrapy

from kingfisher_scrapy import util
from kingfisher_scrapy.exceptions import SpiderArgumentError


@@ -36,6 +38,10 @@ class BaseSpider(scrapy.Spider):
scrapy crawl spider_name -a note='Started by NAME.'
"""

MAX_SAMPLE = 10
MAX_RELEASES_PER_PACKAGE = 100

def __init__(self, sample=None, note=None, from_date=None, until_date=None, *args, **kwargs):
super().__init__(*args, **kwargs)

@@ -152,18 +158,46 @@ def _get_crawl_path(self):
name += '_sample'
return os.path.join(name, self.get_start_time('%Y%m%d_%H%M%S'))

def _build_file_item(self, number, line, data_type, url, encoding):
return {
'success': True,
'number': number,
'file_name': 'data.json',
'data': line,
'data_type': data_type,
'url': url,
'encoding': encoding,
}

def parse_json_lines(self, f, data_type, url, encoding='utf-8'):
for number, line in enumerate(f, 1):
- yield {
- 'success': True,
- 'number': number,
- 'file_name': 'data.json',
- 'data': line,
- 'data_type': data_type,
- 'url': url,
- 'encoding': encoding,
- }
- if self.sample and number > 9:
+ if self.sample and number > self.MAX_SAMPLE:
break
+ if isinstance(line, bytes):
+ line = line.decode()
+ yield self._build_file_item(number, line, data_type, url, encoding)

def get_package(self, f, array_name):
"""
Returns the package data from an ``array_name_package`` object (e.g. a release package)
"""
package = {}
for item in util.items(ijson.parse(f), '', array_name=array_name):
package.update(item)
return package

def parse_json_array(self, f_package, f_list, data_type, url, encoding='utf-8', array_field_name='releases'):
if self.sample:
size = self.MAX_SAMPLE
else:
size = self.MAX_RELEASES_PER_PACKAGE

package = self.get_package(f_package, array_field_name)

for number, items in enumerate(util.grouper(ijson.items(f_list, '{}.item'.format(array_field_name)), size), 1):
package[array_field_name] = [item for item in items if item is not None]
yield self._build_file_item(number, json.dumps(package, default=util.default), data_type, url, encoding)
if self.sample:
break

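To make the new streaming behaviour concrete, here is a minimal sketch of the re-packaging loop at the heart of parse_json_array, run against an in-memory release package. The sample data and the group size of 3 are illustrative; the spiders group by MAX_RELEASES_PER_PACKAGE (100) and collect the package metadata with get_package from a second file handle.

import json
from io import BytesIO

import ijson

from kingfisher_scrapy import util

# An in-memory release package with 7 releases (illustrative data).
raw = json.dumps({
    'publisher': {'name': 'example'},
    'releases': [{'ocid': 'ocds-213czf-%03d' % i} for i in range(7)],
}).encode()

# The package metadata, as get_package would collect it.
package = {'publisher': {'name': 'example'}}

# Stream the releases and re-package them in groups of 3: two packages of
# three releases, then a final package of one.
for number, group in enumerate(util.grouper(ijson.items(BytesIO(raw), 'releases.item'), 3), 1):
    package['releases'] = [item for item in group if item is not None]
    print(number, json.dumps(package, default=util.default))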

@@ -172,15 +206,18 @@ def parse_zipfile(response, data_type, file_format=None, encoding='utf-8')
"""
Handling response with JSON data in ZIP files
- :param str file_format: The zipped files format. If this is set to 'json_lines', then the zipped file will be
- slitted by lines before send it to kingfisher-process and only the zip file will be
- stored as file.
+ :param str file_format: The zipped file's format. If this is set to "json_lines", then each line of the zipped
+ file will be yielded separately. If this is set to "release_package", then the releases
+ will be re-packaged in groups of :const:`~kingfisher_scrapy.base_spider.BaseSpider.MAX_RELEASES_PER_PACKAGE`
+ and yielded. In both cases, only the zipped file will be saved to disk. If this is not set, the file
+ will be yielded and saved to disk.
:param response response: the response that contains the zip file.
:param str data_type: the zipped files data_type
:param str encoding: the zipped files encoding. Default to utf-8
"""
if response.status == 200:
- if file_format == 'json_lines':
+ if file_format:
self.save_response_to_disk(response, 'file.zip')
zip_file = ZipFile(BytesIO(response.body))
for finfo in zip_file.infolist():
@@ -190,6 +227,10 @@ def parse_zipfile(response, data_type, file_format=None, encoding='utf-8')
data = zip_file.open(finfo.filename)
if file_format == 'json_lines':
yield from self.parse_json_lines(data, data_type, response.request.url, encoding=encoding)
elif file_format == 'release_package':
package = zip_file.open(finfo.filename)
yield from self.parse_json_array(package, data, data_type, response.request.url,
encoding=encoding)
else:
yield self.save_data_to_disk(data.read(), filename, data_type=data_type, url=response.request.url,
encoding=encoding)
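For reference, a spider built on these helpers wires them together roughly as follows. This is a sketch modeled on the spiders changed below; the class name and URL are illustrative.

import scrapy

from kingfisher_scrapy.base_spider import ZipSpider


class ExampleZipSpider(ZipSpider):
    name = 'example_zip'

    def start_requests(self):
        # Illustrative URL; real spiders discover theirs from an API or registry.
        yield scrapy.Request('http://example.com/releases.zip')

    def parse(self, response):
        # 'release_package' re-packages the zipped releases in groups of
        # MAX_RELEASES_PER_PACKAGE; 'json_lines' would yield one item per line.
        yield from self.parse_zipfile(response, 'release_package', file_format='release_package')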
43 changes: 21 additions & 22 deletions kingfisher_scrapy/spiders/argentina_buenos_aires.py
@@ -1,14 +1,20 @@
- import hashlib
import json
- from io import BytesIO
- from zipfile import ZipFile

import scrapy

- from kingfisher_scrapy.base_spider import BaseSpider
+ from kingfisher_scrapy.base_spider import ZipSpider


- class ArgentinaBuenosAires(BaseSpider):
+ class ArgentinaBuenosAires(ZipSpider):
"""
Bulk download documentation
https://data.buenosaires.gob.ar/dataset/buenos-aires-compras/archivo/2a3d077c-71b6-4ba7-8924-f3e38cf1b8fc
API documentation
https://data.buenosaires.gob.ar/acerca/ckan
Spider arguments
sample
Downloads the zip file and sends 10 releases to kingfisher process.
"""
name = 'argentina_buenos_aires'
start_urls = ['https://data.buenosaires.gob.ar/api/3/action/package_show?id=buenos-aires-compras']
# the data list service takes too long to be downloaded, so we increase the download timeout
@@ -17,29 +23,22 @@ class ArgentinaBuenosAires(BaseSpider):
def start_requests(self):
yield scrapy.Request(
url='https://data.buenosaires.gob.ar/api/3/action/package_show?id=buenos-aires-compras',
- meta={'type': 'meta'}
+ callback=self.parse_list
)

- def parse(self, response):
+ def parse_list(self, response):
if response.status == 200:
- if response.request.meta['type'] == 'meta':
- data = json.loads(response.text)
- for resource in data['result']['resources']:
- if resource['format'].upper() == 'JSON':
- yield scrapy.Request(
- url=resource['url'],
- meta={'type': 'data'}
- )
- else:
- zip_file = ZipFile(BytesIO(response.body))
- for finfo in zip_file.infolist():
- data = zip_file.open(finfo.filename).read()
- yield self.save_data_to_disk(data, finfo.filename, url=response.request.url,
- data_type='release_package')
+ data = json.loads(response.text)
+ for resource in data['result']['resources']:
+ if resource['format'].upper() == 'JSON':
+ yield scrapy.Request(url=resource['url'])
else:
yield {
'success': False,
- 'file_name': hashlib.md5(response.request.url.encode('utf-8')).hexdigest() + '.json',
+ 'file_name': 'list.json',
'url': response.request.url,
'errors': {'http_code': response.status}
}

+ def parse(self, response):
+ yield from self.parse_zipfile(response, 'release_package', file_format='release_package')
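The new parse_list callback assumes the usual CKAN package_show envelope. Schematically, with illustrative values and only the fields the spider reads:

data = {
    'result': {
        'resources': [
            # Only resources whose format is JSON are requested.
            {'format': 'JSON', 'url': 'https://example.com/compras.zip'},
            {'format': 'CSV', 'url': 'https://example.com/compras.csv'},
        ],
    },
}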
2 changes: 1 addition & 1 deletion kingfisher_scrapy/spiders/portugal.py
@@ -43,5 +43,5 @@ def parse_list(self, response):
}

def parse(self, response):
- yield from self.parse_zipfile(response, data_type='record_package_json_lines',
+ yield from self.parse_zipfile(response, data_type='record_package',
file_format='json_lines', encoding='iso-8859-1')
60 changes: 60 additions & 0 deletions kingfisher_scrapy/util.py
@@ -0,0 +1,60 @@
import itertools
import json
from decimal import Decimal

from ijson import ObjectBuilder, utils


@utils.coroutine
def items_basecoro(target, prefix, map_type=None, array_name=None):
"""
This is copied from ``ijson/common.py``. An ``array_name`` argument is added. If the ``array_name`` is in the
current path, the current event is skipped. Otherwise, the method is identical.
"""
while True:
current, event, value = (yield)
if array_name and array_name in current:
continue
if current == prefix:
if event in ('start_map', 'start_array'):
builder = ObjectBuilder(map_type=map_type)
end_event = event.replace('start', 'end')
while (current, event) != (prefix, end_event):
builder.event(event, value)
current, event, value = (yield)
del builder.containers[:]
target.send(builder.value)
else:
target.send(value)


def items(events, prefix, map_type=None, array_name=None):
"""
This is copied from ``ijson/common.py``. An ``array_name`` argument is added, which is passed as a keyword argument
to :meth:`~kingfisher_scrapy.util.items_basecoro`. Otherwise, the method is identical.
"""
return utils.coros2gen(events,
(items_basecoro, (prefix,), {'map_type': map_type, 'array_name': array_name})
)


def default(obj):
"""
Dumps JSON to a string, and returns it.
"""
if isinstance(obj, Decimal):
return float(obj)
try:
iterable = iter(obj)
except TypeError:
pass
else:
return list(iterable)
return json.JSONEncoder().default(obj)


# See `grouper` recipe: https://docs.python.org/3.8/library/itertools.html#recipes
def grouper(iterable, n, fillvalue=None):
args = [iter(iterable)] * n
return itertools.zip_longest(*args, fillvalue=fillvalue)
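A quick illustration of these two helpers as parse_json_array uses them: grouper pads the last group with None (which parse_json_array filters out), and default lets json.dumps cope with the Decimal values that ijson produces for JSON numbers.

import json
from decimal import Decimal

from kingfisher_scrapy.util import default, grouper

print(list(grouper('ABCDE', 2)))
# [('A', 'B'), ('C', 'D'), ('E', None)]

print(json.dumps({'amount': Decimal('1.1')}, default=default))
# {"amount": 1.1}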
43 changes: 40 additions & 3 deletions tests/test_base_spider.py
@@ -223,9 +223,8 @@ def test_parse_zipfile_json_lines():
tmp = os.path.join(files_store, 'test/20010203_040506')
os.makedirs(tmp)
with open(tmp + "test.json", 'w') as f:
- f.write('{"key": "value"}\n{"key": "value"}\n{"key": "value"}\n{"key": "value"}\n{"key": "value"}'
- '\n{"key": "value"}\n{"key": "value"}\n{"key": "value"}\n{"key": "value"}\n{"key": "value"}'
- '\n{"key": "value"}')
+ for i in range(10):
+ f.write('{"key": "value"}\n')
with ZipFile(tmp + '/test.zip', 'w') as z:
z.write(tmp + "test.json")
with open(tmp + '/test.zip', 'rb') as z:
@@ -242,6 +241,44 @@ def test_parse_zipfile_json_lines():
assert total == 10


def test_parse_zipfile_release_package():
response = text.TextResponse('test')
response.status = 200
response.request = Mock()
response.request.meta = {'kf_filename': 'test.json'}
response.request.url = 'url'
with TemporaryDirectory() as tmpdirname:
files_store = os.path.join(tmpdirname, 'data')
tmp = os.path.join(files_store, 'test/20010203_040506')
os.makedirs(tmp)
with open(tmp + "test.json", 'w') as f:
release = {'releases': [], 'publisher': {'name': 'test'},
'extensions': ['a', 'b'], 'license': 'test', 'extra': 1.1}
for i in range(110):
release['releases'].append({'key': 'value'})
json.dump(release, f)
with ZipFile(tmp + '/test.zip', 'w') as z:
z.write(tmp + "test.json")
with open(tmp + '/test.zip', 'rb') as z:
response = response.replace(body=z.read())
spider = spider_with_crawler(spider_class=ZipSpider)
spider.crawler.settings['FILES_STORE'] = files_store
actual = spider.parse_zipfile(response, None, file_format='release_package').__next__()
data = json.loads(actual['data'])
assert actual['success'] is True and actual['number'] == 1
assert data['publisher']['name'] == 'test'
assert data['extensions'] == ['a', 'b']
assert len(data['releases']) == spider.MAX_RELEASES_PER_PACKAGE
spider.sample = True
total = 0
for item in spider.parse_zipfile(response, None, file_format='release_package'):
total = total + 1
data = json.loads(item['data'])
assert item['success'] is True and item['number'] == total
assert len(data['releases']) == spider.MAX_SAMPLE
assert total == 1


def test_date_arguments():
test_date = '2000-01-01'
error_message = "time data 'test' does not match format '%Y-%m-%d'"
