Merge 6f1aceb into 92f090a
yolile committed May 26, 2020
2 parents 92f090a + 6f1aceb commit 2d45aab
Showing 9 changed files with 355 additions and 292 deletions.
162 changes: 68 additions & 94 deletions kingfisher_scrapy/base_spider.py
@@ -1,6 +1,5 @@
import hashlib
import json
-import os
from datetime import datetime
from io import BytesIO
from zipfile import ZipFile
@@ -49,9 +48,9 @@ def __init__(self, sample=None, note=None, from_date=None, until_date=None,

        # https://docs.scrapy.org/en/latest/topics/spiders.html#spider-arguments
        self.sample = sample == 'true'
+        self.note = note
        self.from_date = from_date
        self.until_date = until_date
-        self.note = note
        self.date_format = self.VALID_DATE_FORMATS[date_format]

        spider_arguments = {
@@ -70,10 +69,10 @@ def from_crawler(cls, crawler, *args, **kwargs):
        # Checks Spider date ranges arguments
        if spider.from_date or spider.until_date:
            if not spider.from_date:
-                # 'from_date' defaults to 'default_from_date' spider class attribute
+                # Default to `default_from_date` class attribute.
                spider.from_date = spider.default_from_date
            if not spider.until_date:
-                # 'until_date' defaults to today
+                # Default to today.
                spider.until_date = datetime.now().strftime(spider.date_format)
            try:
                spider.from_date = datetime.strptime(spider.from_date, spider.date_format)
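
To make the defaulting above concrete, here is a small standalone sketch (not part of the commit) of what from_crawler() does with these arguments; the date format and default_from_date value are hypothetical.

from datetime import datetime

date_format = '%Y-%m-%d'          # assumed to be one of BaseSpider.VALID_DATE_FORMATS
default_from_date = '2008-01-01'  # hypothetical spider class attribute

from_date = None                  # e.g. the user passed only until_date
until_date = '2020-05-26'

if from_date or until_date:
    if not from_date:
        # Default to `default_from_date` class attribute.
        from_date = default_from_date
    if not until_date:
        # Default to today.
        until_date = datetime.now().strftime(date_format)
    from_date = datetime.strptime(from_date, date_format)
    until_date = datetime.strptime(until_date, date_format)

print(from_date, until_date)  # 2008-01-01 00:00:00 2020-05-26 00:00:00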
@@ -86,150 +85,125 @@ def from_crawler(cls, crawler, *args, **kwargs):

        return spider

-    def get_local_file_path_including_filestore(self, filename):
-        """
-        Prepends Scrapy's storage directory and the crawl's relative directory to the filename.
-        """
-        return os.path.join(self.crawler.settings['FILES_STORE'], self._get_crawl_path(), filename)
-
-    def get_local_file_path_excluding_filestore(self, filename):
-        """
-        Prepends the crawl's relative directory to the filename.
-        """
-        return os.path.join(self._get_crawl_path(), filename)
-
-    def save_response_to_disk(self, response, filename, data_type=None, encoding='utf-8'):
-        """
-        Writes the response's body to the filename in the crawl's directory.
-
-        Writes a ``<filename>.fileinfo`` metadata file in the crawl's directory, and returns a dict with the metadata.
-        """
-        return self._save_response_to_disk(response.body, filename, response.request.url, data_type, encoding)
-
-    def save_data_to_disk(self, data, filename, url=None, data_type=None, encoding='utf-8'):
-        """
-        Writes the data to the filename in the crawl's directory.
-
-        Writes a ``<filename>.fileinfo`` metadata file in the crawl's directory, and returns a dict with the metadata.
-        """
-        return self._save_response_to_disk(data, filename, url, data_type, encoding)
-
-    def get_start_time(self, format):
-        """
-        Returns the formatted start time of the crawl.
-        """
-        return self.crawler.stats.get_value('start_time').strftime(format)
-
-    def _save_response_to_disk(self, data, filename, url, data_type, encoding):
-        self._write_file(filename, data)
-
-        metadata = {
-            'url': url,
-            'data_type': data_type,
-            'encoding': encoding,
-        }
-
-        self._write_file(filename + '.fileinfo', metadata)
-
-        metadata['success'] = True
-        metadata['file_name'] = filename
-
-        return metadata
-
-    def _write_file(self, filename, data):
-        path = self.get_local_file_path_including_filestore(filename)
-        os.makedirs(os.path.dirname(path), exist_ok=True)
-
-        if isinstance(data, bytes):
-            mode = 'wb'
-        else:
-            mode = 'w'
-
-        with open(path, mode) as f:
-            if isinstance(data, (bytes, str)):
-                f.write(data)
-            else:
-                json.dump(data, f)
-
-    def _get_crawl_path(self):
-        name = self.name
-        if self.sample:
-            name += '_sample'
-        return os.path.join(name, self.get_start_time('%Y%m%d_%H%M%S'))
+    def get_start_time(self, format):
+        """
+        Returns the formatted start time of the crawl.
+        """
+        return self.crawler.stats.get_value('start_time').strftime(format)
+
+    def save_response_to_disk(self, response, filename, data_type=None, encoding='utf-8', post_to_api=True):
+        """
+        Returns an item to yield, based on the response to a request.
+        """
+        return self.save_data_to_disk(response.body, filename, response.request.url, data_type, encoding,
+                                      post_to_api)
+
+    def save_data_to_disk(self, data, filename, url=None, data_type=None, encoding='utf-8', post_to_api=True):
+        """
+        Returns an item to yield.
+        """
+        return {
+            'success': True,
+            'file_name': filename,
+            'data': data,
+            'data_type': data_type,
+            'url': url,
+            'encoding': encoding,
+            'post_to_api': post_to_api,
+        }
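
To illustrate the new flow (this example is not part of the commit), a spider now yields the returned item instead of relying on the base class to write the file; the spider name and URL below are hypothetical, and the actual writing happens in the KingfisherFilesStore extension added in extensions.py further down.

import scrapy

from kingfisher_scrapy.base_spider import BaseSpider


class ExampleSpider(BaseSpider):
    # Hypothetical spider, for illustration only.
    name = 'example'

    def start_requests(self):
        # Hypothetical URL.
        yield scrapy.Request('https://example.com/releases.json')

    def parse(self, response):
        # save_response_to_disk() no longer touches the filesystem; it returns an item dict
        # ('file_name', 'data', 'data_type', 'url', 'encoding', 'post_to_api') that the
        # KingfisherFilesStore extension writes to disk and KingfisherAPI then posts to
        # Kingfisher Process.
        yield self.save_response_to_disk(response, 'all.json', data_type='release_package')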

-    def _build_file_item(self, number, line, data_type, url, encoding):
+    def _build_file_item(self, number, data, data_type, url, encoding, file_name):
        return {
            'success': True,
            'number': number,
-            'file_name': 'data.json',
-            'data': line,
+            'file_name': file_name,
+            'data': data,
            'data_type': data_type,
            'url': url,
            'encoding': encoding,
+            'post_to_api': True,
        }

-    def parse_json_lines(self, f, data_type, url, encoding='utf-8'):
-        for number, line in enumerate(f, 1):
-            if self.sample and number > self.MAX_SAMPLE:
-                break
-            if isinstance(line, bytes):
-                line = line.decode(encoding=encoding)
-            yield self._build_file_item(number, line, data_type, url, encoding)
-
-    def get_package(self, f, array_name):
+    def _get_package_metadata(self, f, skip_key):
        """
-        Returns the package data from a array_name_package object
+        Returns the package metadata from a file object.
+        :param f: a file object
+        :param str skip_key: the key to skip
+        :returns: the package metadata
+        :rtype: dict
        """
        package = {}
-        for item in util.items(ijson.parse(f), '', array_name=array_name):
+        for item in util.items(ijson.parse(f), '', skip_key=skip_key):
            package.update(item)
        return package

-    def parse_json_array(self, f_package, f_list, data_type, url, encoding='utf-8', array_field_name='releases'):
+    def parse_json_lines(self, f, data_type, url, encoding='utf-8', file_name='data.json'):
+        for number, line in enumerate(f, 1):
+            if self.sample and number > self.MAX_SAMPLE:
+                break
+            if isinstance(line, bytes):
+                line = line.decode(encoding=encoding)
+            yield self._build_file_item(number, line, data_type, url, encoding, file_name)
+
+    def parse_json_array(self, f_package, f_list, data_type, url, encoding='utf-8', array_field_name='releases',
+                         file_name='data.json'):
        if self.sample:
            size = self.MAX_SAMPLE
        else:
            size = self.MAX_RELEASES_PER_PACKAGE

-        package = self.get_package(f_package, array_field_name)
+        package = self._get_package_metadata(f_package, array_field_name)

        for number, items in enumerate(util.grouper(ijson.items(f_list, '{}.item'.format(array_field_name)), size), 1):
            package[array_field_name] = filter(None, items)
-            yield self._build_file_item(number, json.dumps(package, default=util.default), data_type, url, encoding)
+            data = json.dumps(package, default=util.default)
+            yield self._build_file_item(number, data, data_type, url, encoding, file_name)
            if self.sample:
                break
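
For readers less familiar with util.grouper, the re-packaging above can be sketched with plain itertools (hypothetical metadata, release count and package size; this is an illustration, not the project's util module):

import itertools
import json

MAX_RELEASES_PER_PACKAGE = 100  # assumed value, for illustration

def grouper(iterable, n):
    # The itertools "grouper" recipe: fixed-size chunks, padded with None.
    args = [iter(iterable)] * n
    return itertools.zip_longest(*args)

package_metadata = {'uri': 'https://example.com/package.json', 'publishedDate': '2020-05-26'}
releases = [{'ocid': 'ocds-213czf-{:03d}'.format(i)} for i in range(250)]

for number, items in enumerate(grouper(releases, MAX_RELEASES_PER_PACKAGE), 1):
    package = dict(package_metadata, releases=[item for item in items if item is not None])
    data = json.dumps(package)
    print(number, len(package['releases']))  # 1 100, 2 100, 3 50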


class ZipSpider(BaseSpider):
    def parse_zipfile(self, response, data_type, file_format=None, encoding='utf-8'):
        """
-        Handling response with JSON data in ZIP files
-        :param str file_format: The zipped file's format. If this is set to "json_lines", then each line of the zipped
-        file will be yielded separately. If this is set to "release_package", then the releases will be re-packaged
-        in groups of :const:`~kingfisher_scrapy.base_spider.BaseSpider.MAX_RELEASES_PER_PACKAGE` and yielded. In
-        both cases, only the zipped file will be saved to disk. If this is not set, the file will be yielded and
-        saved to disk.
-        :param response response: the response that contains the zip file.
-        :param str data_type: the zipped files data_type
-        :param str encoding: the zipped files encoding. Default to utf-8
+        Handles a response that is a ZIP file.
+        :param response response: the response
+        :param str data_type: the compressed files' ``data_type``
+        :param str file_format: The compressed files' format
+            ``json_lines``
+              Yields each line of the compressed files.
+              The ZIP file is saved to disk.
+            ``release_package``
+              Re-packages the releases in the compressed files in groups of
+              :const:`~kingfisher_scrapy.base_spider.BaseSpider.MAX_RELEASES_PER_PACKAGE`, and yields the packages.
+              The ZIP file is saved to disk.
+            ``None``
+              Yields each compressed file.
+              Each compressed file is saved to disk.
+        :param str encoding: the compressed files' encoding
        """
        if response.status == 200:
            if file_format:
-                self.save_response_to_disk(response, '{}.zip'.format(hashlib.md5(response.url.encode('utf-8'))
-                                                                     .hexdigest()))
+                filename = '{}.zip'.format(hashlib.md5(response.url.encode('utf-8')).hexdigest())
+                self.save_response_to_disk(response, filename, post_to_api=False)

            zip_file = ZipFile(BytesIO(response.body))
            for finfo in zip_file.infolist():
                filename = finfo.filename
                if not filename.endswith('.json'):
                    filename += '.json'

                data = zip_file.open(finfo.filename)

                if file_format == 'json_lines':
-                    yield from self.parse_json_lines(data, data_type, response.request.url, encoding=encoding)
+                    yield from self.parse_json_lines(data, data_type, response.request.url, encoding=encoding,
+                                                     file_name=filename)
                elif file_format == 'release_package':
                    package = zip_file.open(finfo.filename)
                    yield from self.parse_json_array(package, data, data_type, response.request.url,
-                                                     encoding=encoding)
+                                                     encoding=encoding, file_name=filename)
                else:
                    yield self.save_data_to_disk(data.read(), filename, data_type=data_type, url=response.request.url,
                                                 encoding=encoding)
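
A hedged usage sketch (not part of the commit): a spider built on ZipSpider might call parse_zipfile() like this, with a hypothetical name and URL.

import scrapy

from kingfisher_scrapy.base_spider import ZipSpider


class ExampleZipSpider(ZipSpider):
    # Hypothetical spider, for illustration only.
    name = 'example_zip'

    def start_requests(self):
        # Hypothetical URL serving a ZIP of release packages.
        yield scrapy.Request('https://example.com/releases.zip')

    def parse(self, response):
        # Saves the ZIP itself to disk without posting it to the API (post_to_api=False above),
        # then re-packages the releases in groups of MAX_RELEASES_PER_PACKAGE and yields the
        # resulting packages as file items.
        yield from self.parse_zipfile(response, data_type='release_package',
                                      file_format='release_package')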
71 changes: 69 additions & 2 deletions kingfisher_scrapy/extensions.py
@@ -8,6 +8,70 @@


# https://docs.scrapy.org/en/latest/topics/extensions.html#writing-your-own-extension
+class KingfisherFilesStore:
+    def __init__(self, directory):
+        self.directory = directory
+
+    @classmethod
+    def from_crawler(cls, crawler):
+        directory = crawler.settings['FILES_STORE']
+        extension = cls(directory)
+        crawler.signals.connect(extension.item_scraped, signal=signals.item_scraped)
+        return extension
+
+    def item_scraped(self, item, spider):
+        """
+        Writes the item's data to the filename in the crawl's directory.
+        Writes a ``<filename>.fileinfo`` metadata file in the crawl's directory, and returns a dict with the metadata.
+        """
+        if 'number' not in item:
+            self._write_file(item['file_name'], item['data'], spider)
+            metadata = {
+                'url': item['url'],
+                'data_type': item['data_type'],
+                'encoding': item['encoding'],
+            }
+            self._write_file(item['file_name'] + '.fileinfo', metadata, spider)
+            item['path_including_file_store'] = self.get_local_file_path_including_filestore(item['file_name'],
+                                                                                             spider)
+            item['path_excluding_file_store'] = self.get_local_file_path_excluding_filestore(item['file_name'],
+                                                                                             spider)
+
+    def _write_file(self, filename, data, spider):
+        path = self.get_local_file_path_including_filestore(filename, spider)
+        os.makedirs(os.path.dirname(path), exist_ok=True)
+
+        if isinstance(data, bytes):
+            mode = 'wb'
+        else:
+            mode = 'w'
+
+        with open(path, mode) as f:
+            if isinstance(data, (bytes, str)):
+                f.write(data)
+            else:
+                json.dump(data, f)
+
+    def get_local_file_path_including_filestore(self, filename, spider):
+        """
+        Prepends Scrapy's storage directory and the crawl's relative directory to the filename.
+        """
+        return os.path.join(self.directory, self._get_crawl_path(spider), filename)
+
+    def get_local_file_path_excluding_filestore(self, filename, spider):
+        """
+        Prepends the crawl's relative directory to the filename.
+        """
+        return os.path.join(self._get_crawl_path(spider), filename)
+
+    def _get_crawl_path(self, spider):
+        name = spider.name
+        if spider.sample:
+            name += '_sample'
+        return os.path.join(name, spider.get_start_time('%Y%m%d_%H%M%S'))
+
+
class KingfisherAPI:
    def __init__(self, url, key, directory=None):
        """
@@ -54,6 +118,9 @@ def item_scraped(self, item, spider):
        If the Scrapy item indicates success, sends a Kingfisher Process API request to create either a Kingfisher
        Process file or file item. Otherwise, sends an API request to create a file error.
        """
+        if not item.get('post_to_api', True):
+            return
+
        data = {
            'collection_source': spider.name,
            'collection_data_version': spider.get_start_time('%Y-%m-%d %H:%M:%S'),
@@ -78,11 +145,11 @@ def item_scraped(self, item, spider):
            # File
            else:
                if self.directory:
-                    path = spider.get_local_file_path_excluding_filestore(item['file_name'])
+                    path = item['path_excluding_file_store']
                    data['local_file_name'] = os.path.join(self.directory, path)
                    files = {}
                else:
-                    path = spider.get_local_file_path_including_filestore(item['file_name'])
+                    path = item['path_including_file_store']
                    f = open(path, 'rb')
                    files = {'file': (item['file_name'], f, 'application/json')}
12 changes: 12 additions & 0 deletions kingfisher_scrapy/log_formatter.py
@@ -0,0 +1,12 @@
+from scrapy import logformatter
+
+
+class KingfisherLogFormatter(logformatter.LogFormatter):
+    # https://docs.scrapy.org/en/latest/_modules/scrapy/logformatter.html#LogFormatter.scraped
+    def scraped(self, item, response, spider):
+        """
+        Omits an item's `data` value from the log message.
+        """
+        item = item.copy()
+        item.pop('data', None)
+        return super().scraped(item, response, spider)
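
The formatter's effect, sketched with a hypothetical item: the copy-and-pop keeps the potentially large 'data' payload out of the "Scraped from ..." log lines while leaving the real item untouched.

item = {'file_name': 'all.json', 'data': b'...large response body...', 'url': 'https://example.com/releases.json'}

logged = item.copy()   # what KingfisherLogFormatter.scraped() does before delegating to Scrapy
logged.pop('data', None)
print(logged)          # {'file_name': 'all.json', 'url': 'https://example.com/releases.json'}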
7 changes: 6 additions & 1 deletion kingfisher_scrapy/settings.py
@@ -67,7 +67,10 @@
#    'scrapy.extensions.telnet.TelnetConsole': None,
#}
EXTENSIONS = {
-    'kingfisher_scrapy.extensions.KingfisherAPI': 0,
+    # `KingfisherFilesStore` must run before `KingfisherAPI`, because the file needs to be written before the request
+    # is sent to Kingfisher Process.
+    'kingfisher_scrapy.extensions.KingfisherFilesStore': 100,
+    'kingfisher_scrapy.extensions.KingfisherAPI': 500,
}
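
The priority values control the order in which Scrapy loads the extensions, and hence, as the comment above relies on, the order in which their item_scraped handlers run. As a side note (an assumption, not part of this commit), the API extension could be disabled locally while keeping the file store, since Scrapy treats a None value as "disabled":

EXTENSIONS = {
    'kingfisher_scrapy.extensions.KingfisherFilesStore': 100,
    'kingfisher_scrapy.extensions.KingfisherAPI': None,  # disable posting to Kingfisher Process
}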

# Configure item pipelines
@@ -85,6 +88,8 @@
# instead of files to Kingfisher Process' API. To enable that, set this to the absolute path to the `FILES_STORE`.
KINGFISHER_API_LOCAL_DIRECTORY = os.getenv('KINGFISHER_API_LOCAL_DIRECTORY')

+LOG_FORMATTER = 'kingfisher_scrapy.log_formatter.KingfisherLogFormatter'
+
KINGFISHER_PARAGUAY_HACIENDA_REQUEST_TOKEN = os.getenv('KINGFISHER_PARAGUAY_HACIENDA_REQUEST_TOKEN')
KINGFISHER_PARAGUAY_HACIENDA_CLIENT_SECRET = os.getenv('KINGFISHER_PARAGUAY_HACIENDA_CLIENT_SECRET')

