Commit

Merge 107fbe8 into 7ee3b99

jpmckinney committed Feb 25, 2021
2 parents 7ee3b99 + 107fbe8 commit b3c4dce
Showing 9 changed files with 138 additions and 46 deletions.
2 changes: 2 additions & 0 deletions docs/cli.rst
@@ -13,6 +13,8 @@ Plucks one data value per publisher. It writes a CSV file with the results, and
- ``--package-pointer=STR`` (``-p``): The JSON Pointer to the value in the package.
- ``--release-pointer=STR`` (``-r``): The JSON Pointer to the value in the release.
- ``--truncate=NUM`` (``-t``): Truncate the value to this number of characters.
- ``--max-bytes=NUM``: Stop downloading an OCDS file after reading at least this many bytes.
- ``spider``: Run specific spiders. Omit to run all spiders.
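
For example, a hypothetical invocation might be ``scrapy pluck --package-pointer=/publishedDate --max-bytes=10000 spider_name``, which plucks the published date from only the named spider while reading at most 10,000 bytes of each downloaded file (``spider_name`` stands in for a real spider's name).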

Get each publisher's publication policy
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
75 changes: 39 additions & 36 deletions kingfisher_scrapy/base_spider.py
@@ -333,42 +333,45 @@ def parse(self, response):
        else:
            raise UnknownArchiveFormatError(response.request.meta['file_name'])

        with cls(BytesIO(response.body)) as archive_file:
            number = 1
            for file_info in archive_file.infolist():
                # Avoid reading the rest of a large file, since the rest of the items will be dropped.
                if self.sample and number > self.sample:
                    break

                filename = file_info.filename
                basename = os.path.basename(filename)
                if self.file_name_must_contain not in basename:
                    continue
                if archive_format == 'rar' and file_info.isdir():
                    continue
                if archive_format == 'zip' and file_info.is_dir():
                    continue
                if not basename.endswith('.json'):
                    basename += '.json'

                compressed_file = archive_file.open(filename)

                # If `resize_package = True`, then we need to open the file twice: once to extract the package metadata
                # and then to extract the releases themselves.
                if self.resize_package:
                    data = {'data': compressed_file, 'package': archive_file.open(filename)}
                else:
                    data = compressed_file

                yield File({
                    'file_name': basename,
                    'data': data,
                    'data_type': self.data_type,
                    'url': response.request.url,
                    'encoding': self.encoding
                })

                number += 1
        # If we use a context manager here, the archive file might close before the item pipeline reads from the file
        # handles of the compressed files.
        archive_file = cls(BytesIO(response.body))

        number = 1
        for file_info in archive_file.infolist():
            # Avoid reading the rest of a large file, since the rest of the items will be dropped.
            if self.sample and number > self.sample:
                break

            filename = file_info.filename
            basename = os.path.basename(filename)
            if self.file_name_must_contain not in basename:
                continue
            if archive_format == 'rar' and file_info.isdir():
                continue
            if archive_format == 'zip' and file_info.is_dir():
                continue
            if not basename.endswith('.json'):
                basename += '.json'

            compressed_file = archive_file.open(filename)

            # If `resize_package = True`, then we need to open the file twice: once to extract the package metadata and
            # then to extract the releases themselves.
            if self.resize_package:
                data = {'data': compressed_file, 'package': archive_file.open(filename)}
            else:
                data = compressed_file

            yield File({
                'file_name': basename,
                'data': data,
                'data_type': self.data_type,
                'url': response.request.url,
                'encoding': self.encoding
            })

            number += 1


class LinksSpider(SimpleSpider):
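
A minimal sketch (not part of this commit) of the pattern the rewritten parse() above relies on: the archive object is deliberately left open so that the lazily-read member handles carried by the yielded items remain usable downstream. The helper name iter_members and the plain dict items are illustrative only.

import zipfile
from io import BytesIO


def iter_members(body: bytes):
    # Kept open on purpose: closing the archive before the yielded member
    # handles are consumed could invalidate them (the concern noted in the
    # comment above).
    archive = zipfile.ZipFile(BytesIO(body))
    for info in archive.infolist():
        if info.is_dir():
            continue
        # archive.open() returns a file-like object that is read lazily.
        yield {'file_name': info.filename, 'data': archive.open(info.filename)}


# A downstream consumer (e.g. an item pipeline) reads each member only when it
# needs it:
# for item in iter_members(zip_bytes):
#     first_chunk = item['data'].read(1024)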
8 changes: 7 additions & 1 deletion kingfisher_scrapy/commands/pluck.py
@@ -13,6 +13,9 @@


class Pluck(ScrapyCommand):
    def syntax(self):
        return '[options] [spider ...]'

    def short_desc(self):
        return 'Pluck one data value per publisher'

@@ -21,6 +24,7 @@ def add_options(self, parser):
        parser.add_option('-p', '--package-pointer', help='The JSON Pointer to the value in the package')
        parser.add_option('-r', '--release-pointer', help='The JSON Pointer to the value in the release')
        parser.add_option('-t', '--truncate', type=int, help='Truncate the value to this number of characters')
        parser.add_option('--max-bytes', type=int, help='Stop downloading an OCDS file after reading this many bytes')

    def run(self, args, opts):
        if not (bool(opts.package_pointer) ^ bool(opts.release_pointer)):
@@ -35,6 +39,8 @@ def run(self, args, opts):
            'scrapy.extensions.telnet.TelnetConsole': None,
            'kingfisher_scrapy.extensions.KingfisherPluck': 1,
        })
        if opts.max_bytes:
            self.settings.set('KINGFISHER_PLUCK_MAX_BYTES', opts.max_bytes)

        filename = _pluck_filename(opts)
        if os.path.isfile(filename):
@@ -44,7 +50,7 @@
        skipped = defaultdict(list)
        running = []
        for spider_name in self.crawler_process.spider_loader.list():
            if spider_name != 'fail':
            if not args and spider_name != 'fail' or spider_name in args:
                spidercls = self.crawler_process.spider_loader.load(spider_name)
                if hasattr(spidercls, 'skip_pluck'):
                    skipped[spidercls.skip_pluck].append(spider_name)
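
For clarity, an illustrative restatement of the selection test above (not code from this commit): since `and` binds more tightly than `or` in Python, the condition runs every spider except 'fail' when no spider names are given, and otherwise runs only the spiders named on the command line.

def should_run(spider_name, args):
    # Fully parenthesized equivalent of the condition in run().
    return (not args and spider_name != 'fail') or spider_name in args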
31 changes: 24 additions & 7 deletions kingfisher_scrapy/extensions.py
@@ -2,10 +2,11 @@

import json
import os
from collections import defaultdict

import sentry_sdk
from scrapy import signals
from scrapy.exceptions import NotConfigured
from scrapy.exceptions import NotConfigured, StopDownload

from kingfisher_scrapy import util
from kingfisher_scrapy.items import File, FileError, FileItem, PluckedItem
@@ -15,30 +16,46 @@

# https://docs.scrapy.org/en/latest/topics/extensions.html#writing-your-own-extension
class KingfisherPluck:
    def __init__(self, directory):
    def __init__(self, directory, max_bytes):
        self.directory = directory
        self.spiders_seen = set()
        self.max_bytes = max_bytes

        self.bytes_received_per_spider = defaultdict(int)
        self.item_scraped_per_spider = set()

    @classmethod
    def from_crawler(cls, crawler):
        directory = crawler.settings['KINGFISHER_PLUCK_PATH']
        max_bytes = crawler.settings['KINGFISHER_PLUCK_MAX_BYTES']

        extension = cls(directory=directory)
        extension = cls(directory=directory, max_bytes=max_bytes)
        crawler.signals.connect(extension.item_scraped, signal=signals.item_scraped)
        crawler.signals.connect(extension.spider_closed, signal=signals.spider_closed)
        if max_bytes:
            crawler.signals.connect(extension.bytes_received, signal=signals.bytes_received)

        return extension

    def bytes_received(self, data, request, spider):
        # We only limit the bytes received for final requests (i.e. where the callback is the default `parse` method).
        if not spider.pluck or request.callback is not None or request.meta['file_name'].endswith(('.rar', '.zip')):
            return

        # Scrapy typically downloads data in 16,384-byte chunks.
        self.bytes_received_per_spider[spider.name] += len(data)
        if self.bytes_received_per_spider[spider.name] >= self.max_bytes:
            raise StopDownload(fail=False)

    def item_scraped(self, item, spider):
        if not spider.pluck or spider.name in self.spiders_seen or not isinstance(item, PluckedItem):
        if not spider.pluck or spider.name in self.item_scraped_per_spider or not isinstance(item, PluckedItem):
            return

        self.spiders_seen.add(spider.name)
        self.item_scraped_per_spider.add(spider.name)

        self._write(spider, item['value'])

    def spider_closed(self, spider, reason):
        if not spider.pluck or spider.name in self.spiders_seen:
        if not spider.pluck or spider.name in self.item_scraped_per_spider:
            return

        self._write(spider, f'closed: {reason}')
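
As a usage note (illustrative, not part of this commit): Scrapy emits the `bytes_received` signal (available since Scrapy 2.2) for each chunk the downloader receives, and raising `StopDownload(fail=False)` from a handler stops the transfer while still passing the partially downloaded response to the request's callback. A minimal self-contained sketch, with the spider name, URL and 10,000-byte cap as assumptions:

from scrapy import Spider, signals
from scrapy.exceptions import StopDownload


class CappedSpider(Spider):
    name = 'capped_example'  # hypothetical spider
    start_urls = ['http://example.com']

    @classmethod
    def from_crawler(cls, crawler, *args, **kwargs):
        spider = super().from_crawler(crawler, *args, **kwargs)
        spider.received = 0
        crawler.signals.connect(spider.on_bytes_received, signal=signals.bytes_received)
        return spider

    def on_bytes_received(self, data, request, spider):
        self.received += len(data)
        if self.received >= 10_000:  # hypothetical cap
            raise StopDownload(fail=False)

    def parse(self, response):
        # With fail=False, parse() still runs, with whatever bytes were received.
        self.logger.info('downloaded %d bytes before stopping', len(response.body))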
3 changes: 3 additions & 0 deletions kingfisher_scrapy/pipelines.py
@@ -92,6 +92,9 @@ def process_item(self, item, spider):
                value = next(ijson.items(item['data'], pointer.replace('/', '.')[1:]))
            except StopIteration:
                value = f'error: {pointer} not found'
            # The JSON text can be truncated by a `bytes_received` handler.
            except ijson.common.IncompleteJSONError:
                value = f'error: {pointer} not found within initial bytes'
        else:  # spider.release_pointer
            if isinstance(item['data'], dict):
                data = item['data']
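
An illustrative sketch (not part of this commit, assuming ijson 3) of the two behaviours handled above: a JSON Pointer such as /publishedDate is translated to the ijson prefix 'publishedDate', and a truncated JSON text raises IncompleteJSONError rather than StopIteration:

import ijson

pointer = '/publishedDate'
prefix = pointer.replace('/', '.')[1:]  # '/publishedDate' -> 'publishedDate'

# Complete document: the value at the pointer is found.
assert next(ijson.items(b'{"publishedDate": "2021-01-01"}', prefix)) == '2021-01-01'

# Truncated document (e.g. cut short by a `bytes_received` handler): the parser
# fails before it can yield anything.
try:
    next(ijson.items(b'{"key": "value"', prefix))
except ijson.common.IncompleteJSONError:
    print('incomplete JSON text')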
1 change: 1 addition & 0 deletions kingfisher_scrapy/settings.py
@@ -120,6 +120,7 @@
KINGFISHER_OPENOPPS_PASSWORD = os.getenv('KINGFISHER_OPENOPPS_PASSWORD')

KINGFISHER_PLUCK_PATH = os.getenv('KINGFISHER_PLUCK_PATH', '')
KINGFISHER_PLUCK_MAX_BYTES = None

# Enable and configure the AutoThrottle extension (disabled by default)
# See https://docs.scrapy.org/en/latest/topics/autothrottle.html
4 changes: 2 additions & 2 deletions kingfisher_scrapy/spiders/digiwhist_base.py
@@ -27,6 +27,6 @@ def parse(self, response):

        # Load a line at a time, and pass it to the API.
        with tarfile.open(fileobj=BytesIO(response.body), mode="r:gz") as tar:
            with tar.extractfile(tar.getnames()[0]) as readfp:
                yield self.build_file_from_response(data=readfp, response=response, file_name='data.json',
            with tar.extractfile(tar.next()) as f:
                yield self.build_file_from_response(data=f, response=response, file_name='data.json',
                                                    data_type='release_package')
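
An illustrative note on the change above (not from the commit itself): TarFile.getnames() scans the whole archive to build the member list, whereas TarFile.next() reads only the next header, so the first member can be reached without walking a potentially large archive. A minimal sketch:

import tarfile
from io import BytesIO


def read_first_member(body: bytes) -> bytes:
    with tarfile.open(fileobj=BytesIO(body), mode='r:gz') as tar:
        # next() returns the first member's TarInfo without scanning further;
        # getnames()/getmembers() would read through the entire archive first.
        with tar.extractfile(tar.next()) as f:
            return f.read()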
46 changes: 46 additions & 0 deletions tests/extensions/test_kingfisher_pluck.py
@@ -1,7 +1,12 @@

import os
from glob import glob
from tempfile import TemporaryDirectory

import pytest
from scrapy import Request
from scrapy.exceptions import StopDownload

from kingfisher_scrapy.extensions import KingfisherPluck
from kingfisher_scrapy.items import PluckedItem
from tests import spider_with_crawler
@@ -67,3 +72,44 @@ def test_spider_closed_without_items():

        with open(os.path.join(tmpdirname, 'pluck-release-date.csv')) as f:
            assert 'closed: itemcount,test\n' == f.read()


def test_bytes_received_stop_download():
    with TemporaryDirectory() as tmpdirname:
        spider = spider_with_crawler(settings={'KINGFISHER_PLUCK_PATH': tmpdirname,
                                               'KINGFISHER_PLUCK_MAX_BYTES': 1}, release_pointer='/date')
        extension = KingfisherPluck.from_crawler(spider.crawler)
        request = Request('http://example.com', meta={'file_name': 'test.json'})

        with pytest.raises(StopDownload):
            extension.bytes_received(data={'test': 'test'}, spider=spider, request=request)

        assert extension.max_bytes == 1


def test_bytes_received_dont_stop_download():
    with TemporaryDirectory() as tmpdirname:
        spider = spider_with_crawler(settings={'KINGFISHER_PLUCK_PATH': tmpdirname,
                                               'KINGFISHER_PLUCK_MAX_BYTES': 10}, release_pointer='/date')
        extension = KingfisherPluck.from_crawler(spider.crawler)
        request = Request('http://example.com', meta={'file_name': 'test.json'})

        extension.bytes_received(data={'test': 'test'}, spider=spider, request=request)
        assert extension.bytes_received_per_spider[spider.name] == 1

        assert extension.max_bytes == 10


@pytest.mark.parametrize('test_request', [
    Request('http://example.com', meta={'file_name': 'test.zip'}),
    Request('http://example.com', meta={'file_name': 'test.rar'}),
    Request('http://example.com', callback=lambda item: item, meta={'file_name': 'test.json'})
])
def test_bytes_received_ignored_requests(test_request):
    with TemporaryDirectory() as tmpdirname:
        spider = spider_with_crawler(settings={'KINGFISHER_PLUCK_PATH': tmpdirname,
                                               'KINGFISHER_PLUCK_MAX_BYTES': 10}, release_pointer='/date')
        extension = KingfisherPluck.from_crawler(spider.crawler)

        extension.bytes_received(data={'test': 'test'}, spider=spider, request=test_request)
        assert extension.bytes_received_per_spider[spider.name] == 0
14 changes: 14 additions & 0 deletions tests/pipelines/test_pluck.py
@@ -113,3 +113,17 @@ def test_process_item_non_package_data_type():
    })

    assert pipeline.process_item(item, spider) == PluckedItem({'value': 'error: /publishedDate not found'})


def test_process_item_incomplete_json():
    spider = spider_with_crawler(package_pointer='/publishedDate')

    pipeline = Pluck()
    item = File({
        'file_name': 'test',
        'data': b'{"key": "value"',
        'data_type': 'release_package',
        'url': 'http://test.com',
    })

    assert pipeline.process_item(item, spider) == {'value': 'error: /publishedDate not found within initial bytes'}
