Merge f05e63b into 92f090a

yolile committed May 21, 2020
2 parents 92f090a + f05e63b commit 077beb4
Showing 8 changed files with 257 additions and 246 deletions.
98 changes: 29 additions & 69 deletions kingfisher_scrapy/base_spider.py
@@ -1,6 +1,5 @@
import hashlib
import json
import os
from datetime import datetime
from io import BytesIO
from zipfile import ZipFile
@@ -86,95 +85,52 @@ def from_crawler(cls, crawler, *args, **kwargs):

return spider

def get_local_file_path_including_filestore(self, filename):
def save_response_to_disk(self, response, filename, data_type=None, encoding='utf-8', post_to_api=True):
"""
Prepends Scrapy's storage directory and the crawl's relative directory to the filename.
Returns an item to yield, based on the response to a request.
"""
return os.path.join(self.crawler.settings['FILES_STORE'], self._get_crawl_path(), filename)
return self.save_data_to_disk(response.body, filename, response.request.url, data_type, encoding,
post_to_api)

def get_local_file_path_excluding_filestore(self, filename):
def save_data_to_disk(self, data, filename, url=None, data_type=None, encoding='utf-8', post_to_api=True):
"""
Prepends the crawl's relative directory to the filename.
Returns an item to yield, based on the provided data.
"""
return os.path.join(self._get_crawl_path(), filename)

def save_response_to_disk(self, response, filename, data_type=None, encoding='utf-8'):
"""
Writes the response's body to the filename in the crawl's directory.
Writes a ``<filename>.fileinfo`` metadata file in the crawl's directory, and returns a dict with the metadata.
"""
return self._save_response_to_disk(response.body, filename, response.request.url, data_type, encoding)

def save_data_to_disk(self, data, filename, url=None, data_type=None, encoding='utf-8'):
"""
Writes the data to the filename in the crawl's directory.
Writes a ``<filename>.fileinfo`` metadata file in the crawl's directory, and returns a dict with the metadata.
"""
return self._save_response_to_disk(data, filename, url, data_type, encoding)
return {
'data': data,
'file_name': filename,
'url': url,
'data_type': data_type,
'encoding': encoding,
'success': True,
'post_to_api': post_to_api,
}

def get_start_time(self, format):
"""
Returns the formatted start time of the crawl.
"""
return self.crawler.stats.get_value('start_time').strftime(format)

def _save_response_to_disk(self, data, filename, url, data_type, encoding):
self._write_file(filename, data)

metadata = {
'url': url,
'data_type': data_type,
'encoding': encoding,
}

self._write_file(filename + '.fileinfo', metadata)

metadata['success'] = True
metadata['file_name'] = filename

return metadata

def _write_file(self, filename, data):
path = self.get_local_file_path_including_filestore(filename)
os.makedirs(os.path.dirname(path), exist_ok=True)

if isinstance(data, bytes):
mode = 'wb'
else:
mode = 'w'

with open(path, mode) as f:
if isinstance(data, (bytes, str)):
f.write(data)
else:
json.dump(data, f)

def _get_crawl_path(self):
name = self.name
if self.sample:
name += '_sample'
return os.path.join(name, self.get_start_time('%Y%m%d_%H%M%S'))

def _build_file_item(self, number, line, data_type, url, encoding):
def _build_file_item(self, number, line, data_type, url, encoding, file_name):
return {
'success': True,
'number': number,
'file_name': 'data.json',
'file_name': file_name,
'data': line,
'data_type': data_type,
'url': url,
'encoding': encoding,
'post_to_api': True
}

def parse_json_lines(self, f, data_type, url, encoding='utf-8'):
def parse_json_lines(self, f, data_type, url, encoding='utf-8', file_name='data.json'):
for number, line in enumerate(f, 1):
if self.sample and number > self.MAX_SAMPLE:
break
if isinstance(line, bytes):
line = line.decode(encoding=encoding)
yield self._build_file_item(number, line, data_type, url, encoding)
yield self._build_file_item(number, line, data_type, url, encoding, file_name)

def get_package(self, f, array_name):
"""
@@ -185,7 +141,8 @@ def get_package(self, f, array_name):
package.update(item)
return package

def parse_json_array(self, f_package, f_list, data_type, url, encoding='utf-8', array_field_name='releases'):
def parse_json_array(self, f_package, f_list, data_type, url, encoding='utf-8',
array_field_name='releases', file_name='data.json'):
if self.sample:
size = self.MAX_SAMPLE
else:
@@ -195,7 +152,8 @@ def parse_json_array(self, f_package, f_list, data_type, url, encoding='utf-8',

for number, items in enumerate(util.grouper(ijson.items(f_list, '{}.item'.format(array_field_name)), size), 1):
package[array_field_name] = filter(None, items)
yield self._build_file_item(number, json.dumps(package, default=util.default), data_type, url, encoding)
yield self._build_file_item(number, json.dumps(package, default=util.default), data_type, url, encoding,
file_name)
if self.sample:
break

@@ -217,19 +175,21 @@ def parse_zipfile(self, response, data_type, file_format=None, encoding='utf-8')
if response.status == 200:
if file_format:
self.save_response_to_disk(response, '{}.zip'.format(hashlib.md5(response.url.encode('utf-8'))
.hexdigest()))
.hexdigest()),
post_to_api=False)
zip_file = ZipFile(BytesIO(response.body))
for finfo in zip_file.infolist():
filename = finfo.filename
if not filename.endswith('.json'):
filename += '.json'
data = zip_file.open(finfo.filename)
if file_format == 'json_lines':
yield from self.parse_json_lines(data, data_type, response.request.url, encoding=encoding)
yield from self.parse_json_lines(data, data_type, response.request.url, encoding=encoding,
file_name=filename)
elif file_format == 'release_package':
package = zip_file.open(finfo.filename)
yield from self.parse_json_array(package, data, data_type, response.request.url,
encoding=encoding)
encoding=encoding, file_name=filename)
else:
yield self.save_data_to_disk(data.read(), filename, data_type=data_type, url=response.request.url,
encoding=encoding)
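
For illustration (not part of this commit), a minimal sketch of how a spider uses the reworked helpers: save_response_to_disk now only builds the item dict shown above, and the new extensions take care of writing it to disk and posting it to Kingfisher Process. The spider name and URL below are assumptions.

import scrapy

from kingfisher_scrapy.base_spider import BaseSpider


class ExampleSpider(BaseSpider):
    # Hypothetical spider, for illustration only.
    name = 'example'

    def start_requests(self):
        # Assumed endpoint returning a release package.
        yield scrapy.Request('https://example.com/releases.json')

    def parse(self, response):
        # Builds the item dict; KingfisherFilesStore writes it to disk and
        # KingfisherAPI posts it, since post_to_api defaults to True.
        yield self.save_response_to_disk(response, 'all.json', data_type='release_package')

Passing post_to_api=False, as digiwhist_base.py does below for its raw archive, still writes the file but skips the API request.
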
71 changes: 69 additions & 2 deletions kingfisher_scrapy/extensions.py
@@ -8,6 +8,70 @@


# https://docs.scrapy.org/en/latest/topics/extensions.html#writing-your-own-extension
class KingfisherFilesStore:
def __init__(self, directory):
self.directory = directory

@classmethod
def from_crawler(cls, crawler):
directory = crawler.settings['FILES_STORE']
extension = cls(directory)
crawler.signals.connect(extension.item_scraped, signal=signals.item_scraped)
return extension

def item_scraped(self, item, spider):
"""
Writes the item's data to the filename in the crawl's directory.
Writes a ``<filename>.fileinfo`` metadata file in the crawl's directory, and adds the file's local paths to the item.
"""
if 'number' not in item:
self._write_file(item['file_name'], item['data'], spider)
metadata = {
'url': item['url'],
'data_type': item['data_type'],
'encoding': item['encoding'],
}
self._write_file(item['file_name'] + '.fileinfo', metadata, spider)
item['path_including_file_store'] = self.get_local_file_path_including_filestore(item['file_name'],
spider)
item['path_excluding_file_store'] = self.get_local_file_path_excluding_filestore(item['file_name'],
spider)

def _write_file(self, filename, data, spider):
path = self.get_local_file_path_including_filestore(filename, spider)
os.makedirs(os.path.dirname(path), exist_ok=True)

if isinstance(data, bytes):
mode = 'wb'
else:
mode = 'w'

with open(path, mode) as f:
if isinstance(data, (bytes, str)):
f.write(data)
else:
json.dump(data, f)

def get_local_file_path_including_filestore(self, filename, spider):
"""
Prepends Scrapy's storage directory and the crawl's relative directory to the filename.
"""
return os.path.join(self.directory, self._get_crawl_path(spider), filename)

def get_local_file_path_excluding_filestore(self, filename, spider):
"""
Prepends the crawl's relative directory to the filename.
"""
return os.path.join(self._get_crawl_path(spider), filename)

def _get_crawl_path(self, spider):
name = spider.name
if spider.sample:
name += '_sample'
return os.path.join(name, spider.get_start_time('%Y%m%d_%H%M%S'))


class KingfisherAPI:
def __init__(self, url, key, directory=None):
"""
@@ -54,6 +118,9 @@ def item_scraped(self, item, spider):
If the Scrapy item indicates success, sends a Kingfisher Process API request to create either a Kingfisher
Process file or file item. Otherwise, sends an API request to create a file error.
"""
if not item.get('post_to_api', True):
return

data = {
'collection_source': spider.name,
'collection_data_version': spider.get_start_time('%Y-%m-%d %H:%M:%S'),
@@ -78,11 +145,11 @@ def item_scraped(self, item, spider):
# File
else:
if self.directory:
path = spider.get_local_file_path_excluding_filestore(item['file_name'])
path = item['path_excluding_file_store']
data['local_file_name'] = os.path.join(self.directory, path)
files = {}
else:
path = spider.get_local_file_path_including_filestore(item['file_name'])
path = item['path_including_file_store']
f = open(path, 'rb')
files = {'file': (item['file_name'], f, 'application/json')}

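A hedged worked example of the paths KingfisherFilesStore builds (spider name, start time and FILES_STORE value are all assumptions, not taken from this commit):

import os

# Reconstructing _get_crawl_path() by hand: <spider name>[_sample]/<crawl start time>.
files_store = 'data'                                             # FILES_STORE setting (assumed)
crawl_path = os.path.join('example_sample', '20200521_103000')   # sample run started 2020-05-21 10:30:00

path_including_file_store = os.path.join(files_store, crawl_path, 'all.json')
path_excluding_file_store = os.path.join(crawl_path, 'all.json')

print(path_including_file_store)  # data/example_sample/20200521_103000/all.json
print(path_excluding_file_store)  # example_sample/20200521_103000/all.json
# The metadata sits alongside the file as all.json.fileinfo in the same directory.

KingfisherAPI then uses path_excluding_file_store (joined to KINGFISHER_API_LOCAL_DIRECTORY) when that setting is set, and opens path_including_file_store to upload the file otherwise; items with post_to_api set to False are skipped entirely.
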
22 changes: 22 additions & 0 deletions kingfisher_scrapy/log_formatter.py
@@ -0,0 +1,22 @@
import logging

from scrapy import logformatter
from twisted.python.failure import Failure


class KingfisherCustomLoggerFormatter(logformatter.LogFormatter):
# from https://docs.scrapy.org/en/latest/_modules/scrapy/logformatter.html#LogFormatter.scraped
def scraped(self, item, response, spider):
"""Logs a message when an item is scraped by a spider."""
if isinstance(response, Failure):
src = response.getErrorMessage()
else:
src = response
return {
'level': logging.DEBUG,
# we don't log the item content, just the source
'msg': "Scraped from %(src)s",
'args': {
'src': src
}
}
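
A quick, illustrative check of the formatter's behaviour (the item and response stand-ins are made up; assumes Scrapy is installed and the project is importable): the msg/args it returns never contain the item, so large OCDS payloads stay out of the crawl log.

from types import SimpleNamespace

from kingfisher_scrapy.log_formatter import KingfisherCustomLoggerFormatter

formatter = KingfisherCustomLoggerFormatter()
item = {'file_name': 'data.json', 'data': '{"releases": []}'}       # hypothetical item
response = SimpleNamespace(url='http://example.com/releases.json')  # stand-in for a Response
entry = formatter.scraped(item, response, spider=None)
print(entry['msg'] % entry['args'])  # only the response source is logged, never the item
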
8 changes: 7 additions & 1 deletion kingfisher_scrapy/settings.py
@@ -67,7 +67,10 @@
# 'scrapy.extensions.telnet.TelnetConsole': None,
#}
EXTENSIONS = {
'kingfisher_scrapy.extensions.KingfisherAPI': 0,
'kingfisher_scrapy.extensions.KingfisherFilesStore': 100,
# KingfisherAPI must run after KingfisherFilesStore (hence the higher order value), because the file needs to be
# written before the request is sent to Kingfisher Process.
'kingfisher_scrapy.extensions.KingfisherAPI': 500,
}

# Configure item pipelines
@@ -85,6 +88,9 @@
# instead of files to Kingfisher Process' API. To enable that, set this to the absolute path to the `FILES_STORE`.
KINGFISHER_API_LOCAL_DIRECTORY = os.getenv('KINGFISHER_API_LOCAL_DIRECTORY')

# We use a custom formatter to avoid logging an item's content
LOG_FORMATTER = 'kingfisher_scrapy.log_formatter.KingfisherCustomLoggerFormatter'

KINGFISHER_PARAGUAY_HACIENDA_REQUEST_TOKEN = os.getenv('KINGFISHER_PARAGUAY_HACIENDA_REQUEST_TOKEN')
KINGFISHER_PARAGUAY_HACIENDA_CLIENT_SECRET = os.getenv('KINGFISHER_PARAGUAY_HACIENDA_CLIENT_SECRET')

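As a hedged example of the KINGFISHER_API_LOCAL_DIRECTORY shortcut described above (the path is an assumption): when Kingfisher Process can read the crawl directory directly, point the variable at the absolute path of FILES_STORE, and KingfisherAPI will send local file names instead of uploading files.

import os

# Illustrative only: settings.py reads this with os.getenv() at import time, so set it
# in the environment (e.g. in the shell) before the crawl starts. The value must be the
# absolute path to FILES_STORE as seen by Kingfisher Process.
os.environ.setdefault('KINGFISHER_API_LOCAL_DIRECTORY', '/absolute/path/to/files_store')
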
21 changes: 3 additions & 18 deletions kingfisher_scrapy/spiders/digiwhist_base.py
@@ -1,6 +1,5 @@
import json
import os
import tarfile
from io import BytesIO

from kingfisher_scrapy.base_spider import BaseSpider

@@ -10,24 +9,10 @@ class DigiwhistBase(BaseSpider):
def parse(self, response):
if response.status == 200:

save_file_name = self.get_local_file_path_including_filestore('file.tar.gz')

# Create folder for data
os.makedirs(os.path.dirname(save_file_name), exist_ok=True)

# Save original file
with open(save_file_name, "wb") as fp:
fp.write(response.body)

# Save some extra info alongside that file
with open(save_file_name + '.fileinfo', 'w') as f:
f.write(json.dumps({
'url': response.request.url,
'data_type': 'release_package_json_lines',
}))
yield self.save_response_to_disk(response, 'file.tar.gz', post_to_api=False)

# Load one line at a time, pass it to the API
with tarfile.open(save_file_name, "r:gz") as tar:
with tarfile.open(fileobj=BytesIO(response.body), mode="r:gz") as tar:
with tar.extractfile(tar.getnames()[0]) as readfp:
yield from self.parse_json_lines(readfp, 'release_package', self.start_urls[0])
else:
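
A standalone sketch of the new in-memory approach above (the helper name is an assumption; like the spider, it assumes the archive's first member is the JSON Lines file): the gzipped tarball is read straight from the response body, so the spider itself writes nothing to disk, and post_to_api=False keeps the raw archive out of Kingfisher Process while KingfisherFilesStore still archives it.

import tarfile
from io import BytesIO


def iter_json_lines_from_targz(body):
    """Yields decoded lines from the first member of a gzipped tarball held in memory."""
    with tarfile.open(fileobj=BytesIO(body), mode='r:gz') as tar:
        with tar.extractfile(tar.getnames()[0]) as f:
            for line in f:
                yield line.decode('utf-8')
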
32 changes: 5 additions & 27 deletions kingfisher_scrapy/spiders/georgia_opendata.py
@@ -1,11 +1,9 @@
from zipfile import ZipFile

import scrapy

from kingfisher_scrapy.base_spider import BaseSpider
from kingfisher_scrapy.base_spider import ZipSpider


class GeorgiaOpenData(BaseSpider):
class GeorgiaOpenData(ZipSpider):
name = 'georgia_opendata'
custom_settings = {
# This has to download a 400MB file so .....
@@ -14,32 +12,12 @@ class GeorgiaOpenData(BaseSpider):

def start_requests(self):
yield scrapy.Request(
url='http://opendata.spa.ge/json/allTenders.zip',
callback=self.parse_zip
url='http://opendata.spa.ge/json/allTenders.zip'
)

def parse_zip(self, response):
def parse(self, response):
if response.status == 200:

# Save original file
save_file_name = self.get_local_file_path_including_filestore('allTenders.zip')
with open(save_file_name, "wb") as fp:
fp.write(response.body)

# Now extract each file one at a time, save to disk and pass to pipelines for processing
zip_file = ZipFile(save_file_name)
for finfo in zip_file.infolist():
if finfo.filename.endswith('.json'):
data = zip_file.open(finfo.filename).read()
yield self.save_data_to_disk(
data,
finfo.filename.replace('/', '-'),
data_type='release_package',
url=response.request.url
)
if self.sample:
return

yield from self.parse_zipfile(response, 'release_package', file_format='release_package')
else:
yield {
'success': False,