Skip to content

Commit

Permalink
Merge pull request #1029 from open-contracting/rootpathmiddleware-limit
Browse files Browse the repository at this point in the history
RootPathMiddleware: Combine into multiple packages
  • Loading branch information
yolile committed Oct 5, 2023
2 parents f7fab75 + 9c66ea5 commit 9fa02e3
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 74 deletions.
126 changes: 73 additions & 53 deletions kingfisher_scrapy/spidermiddlewares.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import copy
import itertools
import json
from zipfile import BadZipFile

Expand All @@ -6,6 +8,19 @@
from kingfisher_scrapy import util
from kingfisher_scrapy.items import File, FileItem

MAX_GROUP_SIZE = 100


def sample_filled(spider, number):
    """
    Return whether ``number`` exceeds the spider's sample size.

    Callers use this to avoid reading the rest of a large file, since the rest of the items will be dropped.

    :param spider: an object with a ``sample`` attribute, which is falsy if no sample is set
    :param number: the position of the current item or group, as counted by the caller
    :returns: ``True`` if a sample is set and ``number`` is greater than it, ``False`` otherwise
    """
    # Coerce to bool, so that a falsy ``spider.sample`` (like ``None`` or ``0``) isn't returned as-is.
    return bool(spider.sample) and number > spider.sample


def group_size(spider):
    """
    Return the number of items to put in each group.

    If the spider has a sample size, the group size is the smaller of the sample size and ``MAX_GROUP_SIZE``.
    Otherwise, it is ``MAX_GROUP_SIZE``.
    """
    sample = spider.sample
    return min(sample, MAX_GROUP_SIZE) if sample else MAX_GROUP_SIZE


class ConcatenatedJSONMiddleware:
"""
Expand All @@ -26,8 +41,7 @@ def process_spider_output(self, response, result, spider):

# ijson can read from bytes or a file-like object.
for number, obj in enumerate(util.transcode(spider, ijson.items, data, '', multiple_values=True), 1):
# Avoid reading the rest of a large file, since the rest of the items will be dropped.
if spider.sample and number > spider.sample:
if sample_filled(spider, number):
return

yield spider.build_file_item(number, obj, item)
Expand All @@ -49,14 +63,12 @@ def process_spider_output(self, response, result, spider):
continue

data = item['data']

# Data can be bytes or a file-like object.
if isinstance(data, bytes):
data = data.splitlines(True)

for number, line in enumerate(data, 1):
# Avoid reading the rest of a large file, since the rest of the items will be dropped.
if spider.sample and number > spider.sample:
if sample_filled(spider, number):
return

yield spider.build_file_item(number, line, item)
Expand All @@ -65,8 +77,8 @@ def process_spider_output(self, response, result, spider):
class RootPathMiddleware:
"""
If the spider's ``root_path`` class attribute is non-empty, replaces the item's ``data`` with the objects at that
prefix; if there are multiple releases, records or packages at that prefix, combines them into a single package,
and updates the item's ``data_type`` if needed. Otherwise, yields the original item.
prefix; if there are multiple releases, records or packages at that prefix, combines them into packages in groups
of 100, and updates the item's ``data_type`` if needed. Otherwise, yields the original item.
"""

def process_spider_output(self, response, result, spider):
Expand All @@ -79,26 +91,13 @@ def process_spider_output(self, response, result, spider):
continue

data = item['data']
is_multiple = 'item' in spider.root_path.split('.')
is_package = 'package' in item['data_type']

# Re-encode the data, to traverse the JSON using only ijson, instead of either ijson or Python.
if isinstance(data, (dict, list)):
data = util.json_dumps(data).encode()

if 'release' in item['data_type']:
key = 'releases'
data_type = 'release_package'
else:
key = 'records'
data_type = 'record_package'

package = {key: [], 'version': spider.ocds_version}

for number, obj in enumerate(util.transcode(spider, ijson.items, data, spider.root_path), 1):
# Avoid reading the rest of a large file, since the rest of the items will be dropped.
if spider.sample and number > spider.sample:
break
iterable = util.transcode(spider, ijson.items, data, spider.root_path)

if 'item' in spider.root_path.split('.'):
# Two common issues in OCDS data are:
#
# - Multiple releases or records, without a package
Expand All @@ -107,25 +106,50 @@ def process_spider_output(self, response, result, spider):
# Yielding each release, record or package creates a lot of overhead in terms of the number of files
# written, the number of messages in RabbitMQ and the number of rows in PostgreSQL.
#
# We fix the packaging to reduce the overhead.
if is_multiple:
# Assume that the `extensions` are the same for all packages.
if number == 1 and is_package:
package = obj.copy()
package[key] = []

if is_package:
package[key].extend(obj[key])
else:
package[key].append(obj)
# We re-package in groups of 100 to reduce the overhead.

is_package = 'package' in item['data_type']

if 'release' in item['data_type']:
key = 'releases'
item['data_type'] = 'release_package'
else:
item['data'] = obj
key = 'records'
item['data_type'] = 'record_package'

try:
head = next(iterable)
except StopIteration:
# Always yield an item, even if the root_path points to an empty object.
# https://github.com/open-contracting/kingfisher-collect/pull/944#issuecomment-1149156552
item['data'] = {'version': spider.ocds_version, key: []}
yield item
else:
iterable = itertools.chain([head], iterable)
for number, items in enumerate(util.grouper(iterable, group_size(spider)), 1):
if sample_filled(spider, number):
return

# Omit the None values returned by `grouper(*, fillvalue=None)`.
items = filter(None, items)

if is_package:
# Assume that the `extensions` are the same for all packages.
package = next(items)
for other in items:
package[key].extend(other[key])
else:
package = {'version': spider.ocds_version, key: list(items)}

yield spider.build_file_item(number, package, item)
else:
# Iterates at most once.
for number, obj in enumerate(iterable, 1):
if sample_filled(spider, number):
return

if is_multiple:
item['data'] = package
item['data_type'] = data_type
yield item
item['data'] = obj
yield item


class AddPackageMiddleware:
Expand Down Expand Up @@ -156,7 +180,7 @@ def process_spider_output(self, response, result, spider):
else:
key = 'records'

item['data'] = {key: [data], 'version': spider.ocds_version}
item['data'] = {'version': spider.ocds_version, key: [data]}
item['data_type'] += '_package'

yield item
Expand All @@ -181,27 +205,23 @@ def process_spider_output(self, response, result, spider):

data = item['data']

if spider.sample:
size = spider.sample
else:
size = 100
if spider.data_type == 'release_package':
if item['data_type'] == 'release_package':
key = 'releases'
else:
key = 'records'

package = self._get_package_metadata(spider, data['package'], key, item['data_type'])
template = self._get_package_metadata(spider, data['package'], key, item['data_type'])
iterable = util.transcode(spider, ijson.items, data['data'], f'{key}.item')
# We yield packages containing a maximum of 100 releases or records.
for number, items in enumerate(util.grouper(iterable, size), 1):
# Avoid reading the rest of a large file, since the rest of the items will be dropped.
if spider.sample and number > spider.sample:

for number, items in enumerate(util.grouper(iterable, group_size(spider)), 1):
if sample_filled(spider, number):
return

package[key] = filter(None, items)
data = util.json_dumps(package).encode()
package = copy.deepcopy(template)
# Omit the None values returned by `grouper(*, fillvalue=None)`.
package[key] = list(filter(None, items))

yield spider.build_file_item(number, data, item)
yield spider.build_file_item(number, package, item)

def _get_package_metadata(self, spider, data, skip_key, data_type):
"""
Expand Down
74 changes: 53 additions & 21 deletions tests/test_spidermiddlewares.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,12 @@ def test_passthrough(middleware_class, item):


@pytest.mark.parametrize('middleware_class,attribute,value,override', [
(ConcatenatedJSONMiddleware, 'concatenated_json', True, {'data': {'a': [{'b': 'c'}]}, 'number': 1}),
(LineDelimitedMiddleware, 'line_delimited', True, {'data': b'{"a":[{"b": "c"}]}', 'number': 1}),
(ConcatenatedJSONMiddleware, 'concatenated_json', True,
{'data': {'a': [{'b': 'c'}]}, 'number': 1}),
(LineDelimitedMiddleware, 'line_delimited', True,
{'data': b'{"a":[{"b": "c"}]}', 'number': 1}),
(RootPathMiddleware, 'root_path', 'a.item',
{'data': {'releases': [{'b': 'c'}], 'version': '1.1'}, 'data_type': 'release_package'}),
{'data': {'releases': [{'b': 'c'}], 'version': '1.1'}, 'data_type': 'release_package', 'number': 1}),
(AddPackageMiddleware, 'data_type', 'release',
{'data': {'releases': [{'a': [{'b': 'c'}]}], 'version': '1.1'}, 'data_type': 'release_package'}),
# ResizePackageMiddleware is only used with CompressedFileSpider and BigFileSpider.
Expand Down Expand Up @@ -178,6 +180,8 @@ def test_add_package_middleware(data_type, data, root_path):
'file_name': 'test.json',
'url': 'http://test.com',
}
if 'item' in root_path:
expected['number'] = 1

if 'package' in data_type:
expected['data'] = {f"{data_type[:-8]}s": [{"ocid": "abc"}], "uri": "test"}
Expand All @@ -189,7 +193,7 @@ def test_add_package_middleware(data_type, data, root_path):
assert item == expected


@pytest.mark.parametrize('sample,len_items,len_releases', [(None, 2, 100), (5, 5, 5)])
@pytest.mark.parametrize('sample,len_items,len_releases', [(None, 2, 100), (5, 5, 5), (200, 2, 100)])
@pytest.mark.parametrize('encoding,character', [('utf-8', b'\xc3\x9a'), ('iso-8859-1', b'\xda')])
@pytest.mark.parametrize('data_type, key', [('record_package', 'records'), ('release_package', 'releases')])
def test_resize_package_middleware(sample, len_items, len_releases, encoding, character, data_type, key):
Expand Down Expand Up @@ -223,8 +227,7 @@ def test_resize_package_middleware(sample, len_items, len_releases, encoding, ch
assert item['file_name'] == 'archive-test.json'
assert item['url'] == 'http://example.com'
assert item['number'] == i
assert isinstance(item['data'], bytes)
assert len(json.loads(item['data'])[key]) == len_releases
assert len(item['data'][key]) == len_releases
assert item['data_type'] == data_type


Expand Down Expand Up @@ -396,18 +399,52 @@ def test_retry_data_error_middleware(exception):
list(generator)


@pytest.mark.parametrize('root_path,data_type,sample,data,expected_data,expected_data_type', [
@pytest.mark.parametrize('root_path,data_type,data,expected_data,expected_data_type', [
# Empty root path.
('', 'my_data_type', None,
('', 'my_data_type',
{'a': 'b'},
{'a': 'b'}, 'my_data_type'),
# Root path without "item".
('x', 'my_data_type', None,
('x', 'my_data_type',
{'x': {'a': 'b'}},
{'a': 'b'}, 'my_data_type'),
# Root paths with "item" ...
# ... with an empty array, for data_type = "release".
('item', 'release',
[],
{'releases': [], 'version': '1.1'}, 'release_package'),
# ... with an empty array, for data_type = "record_package".
('item', 'record_package',
[],
{'records': [], 'version': '1.1'}, 'record_package'),
])
@pytest.mark.parametrize('klass', [File, FileItem])
def test_root_path_middleware(root_path, data_type, data, expected_data, expected_data_type, klass):
    """
    Check that ``RootPathMiddleware`` yields exactly one item of the same class as the input, with the expected
    ``data`` and ``data_type``, and with the ``file_name`` and ``url`` left unchanged.
    """
    spider = spider_with_crawler()
    spider.data_type = data_type
    spider.root_path = root_path

    fields = {
        'file_name': 'test.json',
        'data': data,
        'data_type': data_type,
        'url': 'http://test.com',
    }
    source_item = klass(fields)

    results = list(RootPathMiddleware().process_spider_output(None, [source_item], spider))

    assert len(results) == 1
    for result in results:
        assert isinstance(result, klass)
        assert result['file_name'] == 'test.json'
        assert result['data'] == expected_data
        assert result['data_type'] == expected_data_type
        assert result['url'] == 'http://test.com'


@pytest.mark.parametrize('root_path,data_type,sample,data,expected_data,expected_data_type', [
# ... for data_type = "release".
('item', 'release', None,
[{'a': 'b'}, {'c': 'd'}],
Expand All @@ -428,17 +465,9 @@ def test_retry_data_error_middleware(exception):
('item', 'release_package', None,
[{'releases': [{'a': 'b'}, {'c': 'd'}], 'x': 'y'}, {'releases': [{'e': 'f'}, {'g': 'h'}]}],
{'releases': [{'a': 'b'}, {'c': 'd'}, {'e': 'f'}, {'g': 'h'}], 'x': 'y'}, 'release_package'),
# ... with an empty object, for data_type = "release".
('item', 'release', None,
[],
{'releases': [], 'version': '1.1'}, 'release_package'),
# ... with an empty object, for data_type = "record_package".
('item', 'record_package', None,
[],
{'records': [], 'version': '1.1'}, 'record_package'),
])
@pytest.mark.parametrize('klass', [File, FileItem])
def test_root_path_middleware(root_path, data_type, sample, data, expected_data, expected_data_type, klass):
def test_root_path_middleware_item(root_path, data_type, sample, data, expected_data, expected_data_type, klass):
spider = spider_with_crawler()
middleware = RootPathMiddleware()
spider.data_type = data_type
Expand All @@ -457,6 +486,9 @@ def test_root_path_middleware(root_path, data_type, sample, data, expected_data,

assert len(transformed_items) == 1
for transformed_item in transformed_items:
assert isinstance(transformed_item, klass)
assert isinstance(transformed_item, FileItem)
assert transformed_item['number'] == 1
assert transformed_item['file_name'] == 'test.json'
assert transformed_item['data'] == expected_data
assert transformed_item['data_type'] == expected_data_type
assert transformed_item['url'] == 'http://test.com'

0 comments on commit 9fa02e3

Please sign in to comment.