Commit

Merge 61157e6 into 01eefcb
yolile committed Jan 31, 2021
2 parents 01eefcb + 61157e6 commit c5fd589
Showing 35 changed files with 588 additions and 280 deletions.
133 changes: 35 additions & 98 deletions kingfisher_scrapy/base_spider.py
@@ -1,19 +1,17 @@
import json
import os
from abc import abstractmethod
from datetime import datetime
from io import BytesIO
from math import ceil
from zipfile import ZipFile

import ijson
import scrapy
from jsonpointer import resolve_pointer
from rarfile import RarFile

from kingfisher_scrapy import util
from kingfisher_scrapy.exceptions import MissingNextLinkError, SpiderArgumentError
from kingfisher_scrapy.items import File, FileError, FileItem
from kingfisher_scrapy.exceptions import MissingNextLinkError, SpiderArgumentError, UnknownArchiveFormatError
from kingfisher_scrapy.items import File, FileError
from kingfisher_scrapy.util import add_query_string, handle_http_error

browser_user_agent = 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36' # noqa: E501
@@ -28,20 +26,25 @@ class BaseSpider(scrapy.Spider):
- If a spider requires date parameters to be set, add a ``date_required = True`` class attribute, and set the
``default_from_date`` class attribute to a date string.
- If the spider doesn't work with the ``pluck`` command, set a ``skip_pluck`` class attribute to the reason.
- If a spider collects data as CSV or XLSX files, set the class attribute ``unflatten = True`` to convert each
- If a spider collects data as CSV or XLSX files, add an ``unflatten = True`` class attribute to convert each
item to JSON files in the Unflatten pipeline class using the ``unflatten`` command from Flatten Tool.
If you need to set more arguments for the unflatten command, set an ``unflatten_args`` dict with them.
- If the data is not formatted as OCDS (record, release, record package or release package), set the ``root_path``
class attribute to the path to the OCDS data.
- If the data is line-delimited JSON, add a ``line_delimited = True`` class attribute.
If ``date_required`` is ``True``, or if either the ``from_date`` or ``until_date`` spider arguments are set, then
``from_date`` defaults to the ``default_from_date`` class attribute, and ``until_date`` defaults to the
``get_default_until_date()`` return value (which is the current time, by default).
"""
MAX_RELEASES_PER_PACKAGE = 100
VALID_DATE_FORMATS = {'date': '%Y-%m-%d', 'datetime': '%Y-%m-%dT%H:%M:%S'}

ocds_version = '1.1'
date_format = 'date'
date_required = False
unflatten = False
root_path = ''
line_delimited = False

unflatten_args = {}

def __init__(self, sample=None, note=None, from_date=None, until_date=None, crawl_time=None,
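
As a rough illustration (not part of this commit), a hypothetical spider using the class attributes documented above might look like the following sketch; the spider name, URL, file name and dates are invented for the example.

import scrapy

from kingfisher_scrapy.base_spider import BaseSpider
from kingfisher_scrapy.util import handle_http_error


class ExampleSpider(BaseSpider):
    # Hypothetical spider illustrating the class attributes documented above.
    name = 'example'

    # Require from_date/until_date spider arguments, defaulting from_date to this value.
    date_required = True
    default_from_date = '2015-01-01'

    # Each line of the response is a separate release package.
    line_delimited = True

    def start_requests(self):
        yield scrapy.Request('https://example.com/releases.jsonl')

    @handle_http_error
    def parse(self, response):
        yield self.build_file_from_response(response, file_name='releases.jsonl',
                                            data_type='release_package')
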
@@ -205,7 +208,7 @@ def build_file_from_response(self, response, **kwargs):
kwargs.setdefault('data', response.body)
return self.build_file(**kwargs)

def build_file(self, *, file_name=None, url=None, data=None, data_type=None, encoding='utf-8', post_to_api=True):
def build_file(self, *, file_name=None, url=None, data=None, data_type=None, encoding='utf-8'):
"""
Returns a File item to yield.
"""
@@ -215,20 +218,6 @@ def build_file(self, *, file_name=None, url=None, data=None, data_type=None, enc
'data_type': data_type,
'url': url,
'encoding': encoding,
'post_to_api': post_to_api,
})

def build_file_item(self, *, number=None, file_name=None, url=None, data=None, data_type=None, encoding='utf-8'):
"""
Returns a FileItem item to yield.
"""
return FileItem({
'number': number,
'file_name': file_name,
'data': data,
'data_type': data_type,
'url': url,
'encoding': encoding,
})

def build_file_error_from_response(self, response, **kwargs):
@@ -244,46 +233,6 @@ def build_file_error_from_response(self, response, **kwargs):
item.update(kwargs)
return item

def _get_package_metadata(self, f, skip_key):
"""
Returns the package metadata from a file object.
:param f: a file object
:param str skip_key: the key to skip
:returns: the package metadata
:rtype: dict
"""
package = {}
for item in util.items(ijson.parse(f), '', skip_key=skip_key):
package.update(item)
return package

def parse_json_lines(self, f, *, file_name='data.json', url=None, data_type=None, encoding='utf-8'):
for number, line in enumerate(f, 1):
if self.sample and number > self.sample:
break
if isinstance(line, bytes):
line = line.decode(encoding=encoding)
yield self.build_file_item(number=number, file_name=file_name, url=url, data=line, data_type=data_type,
encoding=encoding)

def parse_json_array(self, f_package, f_list, *, file_name='data.json', url=None, data_type=None, encoding='utf-8',
array_field_name='releases'):
if self.sample:
size = self.sample
else:
size = self.MAX_RELEASES_PER_PACKAGE

package = self._get_package_metadata(f_package, array_field_name)

for number, items in enumerate(util.grouper(ijson.items(f_list, f'{array_field_name}.item'), size), 1):
package[array_field_name] = filter(None, items)
data = json.dumps(package, default=util.default)
yield self.build_file_item(number=number, file_name=file_name, url=url, data=data, data_type=data_type,
encoding=encoding)
if self.sample:
break

@classmethod
def get_default_until_date(cls, spider):
"""
@@ -299,7 +248,6 @@ class SimpleSpider(BaseSpider):
#. Inherit from ``SimpleSpider``
#. Set a ``data_type`` class attribute to the data type of the responses
#. Optionally, set an ``encoding`` class attribute to the encoding of the responses (default UTF-8)
#. Optionally, set a ``data_pointer`` class attribute to the JSON Pointer for OCDS data (default "")
#. Write a ``start_requests`` method (and any intermediate callbacks) to send requests
.. code-block:: python
@@ -317,37 +265,21 @@ def start_requests(self):
"""

encoding = 'utf-8'
data_pointer = ''

@handle_http_error
def parse(self, response):
kwargs = {}
if self.data_pointer:
kwargs['data'] = json.dumps(resolve_pointer(response.json(), self.data_pointer)).encode()

yield self.build_file_from_response(response, data_type=self.data_type, encoding=self.encoding, **kwargs)
yield self.build_file_from_response(response, data_type=self.data_type, encoding=self.encoding)
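
The ``.. code-block:: python`` example in the SimpleSpider docstring is collapsed in this view. As a hypothetical sketch, a subclass following the steps above could look like this (the name and URL are invented):

import scrapy

from kingfisher_scrapy.base_spider import SimpleSpider


class ExampleSimpleSpider(SimpleSpider):
    # Hypothetical spider: the inherited parse() builds a File item from each response.
    name = 'example_simple'
    data_type = 'release_package'

    def start_requests(self):
        # Setting file_name in meta is the usual way to name the saved response downstream.
        yield scrapy.Request('https://example.com/releases.json',
                             meta={'file_name': 'releases.json'})
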


class CompressedFileSpider(BaseSpider):
"""
This class makes it easy to collect data from ZIP or RAR files. It assumes all files have the same data type.
Each compressed file is saved to disk. The archive file is *not* saved to disk.
#. Inherit from ``CompressedFileSpider``
#. Set a ``data_type`` class attribute to the data type of the compressed files
#. Optionally, set an ``encoding`` class attribute to the encoding of the compressed files (default UTF-8)
#. Optionally, set a ``compressed_file_format`` class attribute to the format of the compressed files
``json_lines``
Yields each line of each compressed file.
The archive file is saved to disk. The compressed files are *not* saved to disk.
``release_package``
Re-packages the releases in the compressed files in groups of
:const:`~kingfisher_scrapy.base_spider.BaseSpider.MAX_RELEASES_PER_PACKAGE`, and yields the packages.
The archive file is saved to disk. The compressed files are *not* saved to disk.
``None``
Yields each compressed file.
Each compressed file is saved to disk. The archive file is *not* saved to disk.
#. Optionally, add a ``resize_package = True`` class attribute to split large packages (e.g. greater than 100MB)
#. Write a ``start_requests`` method to request the archive files
.. code-block:: python
@@ -361,23 +293,28 @@ class MySpider(CompressedFileSpider):
def start_requests(self):
yield self.build_request('https://example.com/api/packages.zip', formatter=components(-1))
.. note::
``resize_package = True`` is not compatible with ``line_delimited = True`` or ``root_path``.
"""

encoding = 'utf-8'
skip_pluck = 'Archive files are not supported'
compressed_file_format = None
resize_package = False
file_name_must_contain = ''

@handle_http_error
def parse(self, response):
archive_name, archive_format = os.path.splitext(response.request.meta['file_name'])
archive_format = archive_format[1:].lower()
if self.compressed_file_format:
yield self.build_file_from_response(response, data_type=archive_format, post_to_api=False)

if archive_format == 'zip':
cls = ZipFile
else:
elif archive_format == 'rar':
cls = RarFile
else:
raise UnknownArchiveFormatError(response.request.meta['file_name'])

archive_file = cls(BytesIO(response.body))
for file_info in archive_file.infolist():
filename = file_info.filename
@@ -391,21 +328,21 @@ def parse(self, response):
if not basename.endswith('.json'):
basename += '.json'

data = archive_file.open(filename)
compressed_file = archive_file.open(filename)
# If `resize_package = True`, then we need to open the file twice: once to extract the package metadata and
# then to extract the releases themselves.
if self.resize_package:
data = {'data': compressed_file, 'package': archive_file.open(filename)}
else:
data = compressed_file

kwargs = {
yield File({
'file_name': basename,
'url': response.request.url,
'data': data,
'data_type': self.data_type,
'encoding': self.encoding,
}
if self.compressed_file_format == 'json_lines':
yield from self.parse_json_lines(data, **kwargs)
elif self.compressed_file_format == 'release_package':
package = archive_file.open(filename)
yield from self.parse_json_array(package, data, **kwargs)
else:
yield self.build_file(data=data.read(), **kwargs)
'url': response.request.url,
'encoding': self.encoding
})
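
To summarise the new dispatch in isolation (a sketch of the logic added above, not additional code in this commit):

import os
from io import BytesIO
from zipfile import ZipFile

from rarfile import RarFile

from kingfisher_scrapy.exceptions import UnknownArchiveFormatError


def open_archive(file_name, body):
    # Mirror of the parse() logic above: choose the archive reader from the file extension.
    archive_format = os.path.splitext(file_name)[1][1:].lower()
    if archive_format == 'zip':
        cls = ZipFile
    elif archive_format == 'rar':
        cls = RarFile
    else:
        # New in this commit: unknown extensions fail loudly instead of defaulting to RAR.
        raise UnknownArchiveFormatError(file_name)
    return cls(BytesIO(body))
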


class LinksSpider(SimpleSpider):
3 changes: 1 addition & 2 deletions kingfisher_scrapy/commands/crawlall.py
@@ -1,7 +1,7 @@
from scrapy.commands import ScrapyCommand
from scrapy.exceptions import UsageError

from kingfisher_scrapy.base_spider import BaseSpider, CompressedFileSpider
from kingfisher_scrapy.base_spider import CompressedFileSpider

EXCEPTIONS = {
'fail',
@@ -44,7 +44,6 @@ def run(self, args, opts):
if opts.sample:
kwargs['sample'] = opts.sample

BaseSpider.parse_json_lines = yield_nothing
CompressedFileSpider.parse = yield_nothing

# Stop after one item or error.
4 changes: 4 additions & 0 deletions kingfisher_scrapy/exceptions.py
@@ -16,3 +16,7 @@ class AccessTokenError(KingfisherScrapyError):

class MissingNextLinkError(KingfisherScrapyError):
"""Raised when a next link is not found on the first page of results"""


class UnknownArchiveFormatError(KingfisherScrapyError):
"""Raised if the archive format of a file can't be determined from the filename"""
13 changes: 9 additions & 4 deletions kingfisher_scrapy/extensions.py
@@ -7,6 +7,7 @@
from scrapy import signals
from scrapy.exceptions import NotConfigured

from kingfisher_scrapy import util
from kingfisher_scrapy.items import File, FileError, FileItem, PluckedItem
from kingfisher_scrapy.kingfisher_process import Client
from kingfisher_scrapy.util import _pluck_filename
@@ -69,14 +70,18 @@ def item_scraped(self, item, spider):
Returns a dict with the metadata.
"""
if not isinstance(item, File):
if not isinstance(item, (File, FileItem)):
return

# The crawl's relative directory, in the format `<spider_name>[_sample]/<YYYYMMDD_HHMMSS>`.
name = spider.name
if spider.sample:
name += '_sample'
path = os.path.join(name, spider.get_start_time('%Y%m%d_%H%M%S'), item['file_name'])
if isinstance(item, File):
file_name = item['file_name']
elif isinstance(item, FileItem):
file_name = f"{item['number']}-{item['file_name']}"
path = os.path.join(name, spider.get_start_time('%Y%m%d_%H%M%S'), file_name)

self._write_file(path, item['data'])
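
For concreteness, with a hypothetical spider named ``example`` run in sample mode and started at 2021-01-31 12:00:00, the resulting relative paths would be:

import os

name = 'example_sample'    # spider name, plus '_sample' because spider.sample is set
start = '20210131_120000'  # spider.get_start_time('%Y%m%d_%H%M%S')

os.path.join(name, start, 'data.json')    # File:     example_sample/20210131_120000/data.json
os.path.join(name, start, '3-data.json')  # FileItem: example_sample/20210131_120000/3-data.json
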

@@ -96,7 +101,7 @@ def _write_file(self, path, data):
if isinstance(data, (bytes, str)):
f.write(data)
else:
json.dump(data, f)
json.dump(data, f, default=util.default)
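
``util.default`` itself is not shown in this diff. As an assumption, a ``default`` callback for ``json.dump`` typically converts otherwise unserializable values and raises ``TypeError`` for the rest, along these lines:

import json
from datetime import date, datetime
from decimal import Decimal


def default(obj):
    # Hypothetical fallback for json.dump: handle common non-serializable types.
    if isinstance(obj, (date, datetime)):
        return obj.isoformat()
    if isinstance(obj, Decimal):
        return float(obj)
    raise TypeError(f'{obj!r} is not JSON serializable')


# json.dumps({'when': datetime(2021, 1, 31)}, default=default) == '{"when": "2021-01-31T00:00:00"}'
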


class KingfisherItemCount:
@@ -173,7 +178,7 @@ def item_scraped(self, item, spider):
Sends an API request to store the file, file item or file error in Kingfisher Process.
"""
# https://docs.scrapy.org/en/latest/topics/signals.html#scrapy.signals.item_scraped
if not item.get('post_to_api', True) or isinstance(item, PluckedItem):
if isinstance(item, PluckedItem):
return

data = self._build_data_to_send(spider, item['file_name'], item['url'])
3 changes: 0 additions & 3 deletions kingfisher_scrapy/item_schema/File.json
@@ -7,9 +7,6 @@
],
"type": "object",
"properties": {
"post_to_api": {
"type": "boolean"
},
"path": {
"type": "string"
},
11 changes: 1 addition & 10 deletions kingfisher_scrapy/item_schema/item.json
@@ -32,17 +32,8 @@
"enum": [
"record",
"release",
"release_list",
"record_package",
"release_package",
"record_package_list",
"release_package_list",
"record_package_list_in_results",
"release_package_list_in_results",
"release_in_Release",
"zip",
"rar",
"tar.gz"
"release_package"
]
},
"encoding": {
7 changes: 4 additions & 3 deletions kingfisher_scrapy/items.py
@@ -14,9 +14,6 @@ class File(KingfisherItem):
data_type = scrapy.Field()
encoding = scrapy.Field()

# If a file is split into file items, the file is stored to disk, but not sent to Kingfisher Process.
post_to_api = scrapy.Field()

# Added by the KingfisherFilesStore extension, for the KingfisherProcessAPI extension to read the file.
path = scrapy.Field()
files_store = scrapy.Field()
@@ -28,6 +25,10 @@ class FileItem(KingfisherItem):
data_type = scrapy.Field()
encoding = scrapy.Field()

# Added by the KingfisherFilesStore extension, for the KingfisherProcessAPI extension to read the file.
path = scrapy.Field()
files_store = scrapy.Field()


class FileError(KingfisherItem):
errors = scrapy.Field()