
Commit

Merge 98f339d into df85f71
jakubkrafka committed Mar 16, 2021
2 parents df85f71 + 98f339d commit 899b325
Showing 8 changed files with 276 additions and 17 deletions.
1 change: 1 addition & 0 deletions .flake8
@@ -1,3 +1,4 @@
[flake8]
per-file-ignores =
    kingfisher_scrapy/settings.py:E265
max-line-length = 119
120 changes: 120 additions & 0 deletions kingfisher_scrapy/extensions.py
@@ -3,7 +3,9 @@
import json
import os

import requests
import sentry_sdk
import treq
from scrapy import signals
from scrapy.exceptions import NotConfigured, StopDownload

@@ -291,3 +293,121 @@ def from_crawler(cls, crawler):
        extension = cls()
        sentry_sdk.init(sentry_dsn)
        return extension


class KingfisherProcessNGAPI:
"""
If the ``KINGFISHER_NG_API_URL`` environment variable or configuration setting is set,
then messages are sent to a Kingfisher Process API for the ``item_scraped`` and ``spider_closed`` signals.
"""
    def __init__(self, url, username, password):
        self.url = url
        self.username = username
        self.password = password

    @classmethod
    def from_crawler(cls, crawler):
        url = crawler.settings['KINGFISHER_NG_API_URL']
        username = crawler.settings['KINGFISHER_NG_API_USERNAME']
        password = crawler.settings['KINGFISHER_NG_API_PASSWORD']

        if not url:
            raise NotConfigured('KINGFISHER_NG_API_URL is not set.')

        if (username and not password) or (password and not username):
            raise NotConfigured('Both KINGFISHER_NG_API_USERNAME and KINGFISHER_NG_API_PASSWORD must be set.')

        extension = cls(url, username, password)
        crawler.signals.connect(extension.spider_opened, signal=signals.spider_opened)
        crawler.signals.connect(extension.item_scraped, signal=signals.item_scraped)
        crawler.signals.connect(extension.spider_closed, signal=signals.spider_closed)

        return extension

    def spider_opened(self, spider):
        """
        Sends an API request to start the collection in Kingfisher Process.
        """
        data = {
            "source_id": spider.name,
            "data_version": spider.get_start_time('%Y-%m-%d %H:%M:%S'),
            "note": spider.note,
            "sample": spider.sample,
            "compile": True,
            "upgrade": spider.ocds_version == '1.0',
            "check": True,
        }

        response = self._post_sync("api/v1/create_collection", data)

        if not response.ok:
            spider.logger.warning(
                'Failed to POST create_collection. API status code: {}'.format(response.status_code))
        else:
            response_data = response.json()
            self.collection_id = response_data["collection_id"]

    def spider_closed(self, spider, reason):
        """
        Sends an API request to close the collection.
        """
        # https://docs.scrapy.org/en/latest/topics/signals.html#spider-closed
        if spider.pluck or spider.keep_collection_open:
            return

        data = {
            "collection_id": self.collection_id,
            "reason": reason,
        }

        def response_callback(response):
            if not response.code == 200:
                spider.logger.warning(
                    'Failed to POST close_collection. API status code: {}'.format(response.code))

        self._post_async("api/v1/close_collection", data, response_callback)

    def item_scraped(self, item, spider):
        """
        Sends an API request to store the file in Kingfisher Process.
        """
        if isinstance(item, PluckedItem):
            return

        data = {
            "collection_id": self.collection_id,
            "path": os.path.join(item.get("files_store", None), item.get("path", None)),
            "url": item.get('url', None),
        }

        if isinstance(item, FileError):
            # In case of an error, send information about it to the API.
            data['errors'] = json.dumps(item.get("errors", None))

        def response_callback(response):
            if not response.code == 200:
                spider.logger.warning("Failed to POST create_collection_file. API status code: {}".format(
                    response.code))

        self._post_async("api/v1/create_collection_file", data, response_callback)

    def _post_sync(self, url, data):
        """
        Posts synchronous requests to Kingfisher Process' API, adding authentication if needed.
        """
        kwargs = {}
        if self.username and self.password:
            kwargs['auth'] = (self.username, self.password)

        return requests.post("{}/{}".format(self.url, url), json=data, **kwargs)

    def _post_async(self, url, data, callback):
        """
        Posts asynchronous requests to Kingfisher Process' API, adding authentication if needed.
        """
        kwargs = {}
        if self.username and self.password:
            kwargs['auth'] = (self.username, self.password)

        request = treq.post("{}/{}".format(self.url, url), json=data, **kwargs)
        request.addCallback(callback)
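
A note on the split between `_post_sync` and `_post_async` above: the opening request blocks on `requests`, presumably because the `collection_id` returned by `create_collection` must be stored before any `item_scraped` signal fires, while the per-item and closing requests go through `treq` so they don't block Twisted's reactor. As committed, `_post_async` attaches only a success callback, so a transport-level failure (DNS error, refused connection) would end up as an unhandled error in the Deferred. A minimal sketch of what an errback could look like; this is illustrative, not part of the commit:

# Illustrative sketch only: pair treq's Deferred with an errback so that
# transport failures are logged instead of left unhandled.
import logging

import treq

logger = logging.getLogger(__name__)


def post_async(base_url, path, data, callback):
    deferred = treq.post('{}/{}'.format(base_url, path), json=data)
    deferred.addCallback(callback)
    # The argument here is a twisted.python.failure.Failure wrapping the error.
    deferred.addErrback(lambda failure: logger.warning(
        'Failed to POST {}: {}'.format(path, failure.getErrorMessage())))
    return deferred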
5 changes: 5 additions & 0 deletions kingfisher_scrapy/settings.py
@@ -84,6 +84,7 @@
    'kingfisher_scrapy.extensions.KingfisherFilesStore': 100,
    'kingfisher_scrapy.extensions.KingfisherProcessAPI': 500,
    'kingfisher_scrapy.extensions.KingfisherItemCount': 600,
    'kingfisher_scrapy.extensions.KingfisherProcessNGAPI': 700,
}

# Configure item pipelines
@@ -103,6 +104,10 @@
# https://kingfisher-collect.readthedocs.io/en/latest/kingfisher_process.html
KINGFISHER_API_URI = os.getenv('KINGFISHER_API_URI')
KINGFISHER_API_KEY = os.getenv('KINGFISHER_API_KEY')
KINGFISHER_NG_API_URL = os.getenv('KINGFISHER_NG_API_URL')
KINGFISHER_NG_API_USERNAME = os.getenv('KINGFISHER_NG_API_USERNAME')
KINGFISHER_NG_API_PASSWORD = os.getenv('KINGFISHER_NG_API_PASSWORD')

# If Kingfisher Process can read Kingfisher Collect's `FILES_STORE`, then Kingfisher Collect can send file paths
# instead of files to Kingfisher Process' API. To enable that, set this to the absolute path to the `FILES_STORE`.
KINGFISHER_API_LOCAL_DIRECTORY = os.getenv('KINGFISHER_API_LOCAL_DIRECTORY')
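
For reference, Scrapy treats `NotConfigured` raised in `from_crawler` as "skip this component", so leaving `KINGFISHER_NG_API_URL` unset cleanly disables the new extension. A minimal way to exercise it locally; the host and credentials below are placeholders, assuming a Kingfisher Process instance listening on localhost:

# Placeholder values for local testing only, set before Scrapy starts.
import os

os.environ['KINGFISHER_NG_API_URL'] = 'http://localhost:8000'
os.environ['KINGFISHER_NG_API_USERNAME'] = 'user'  # optional, but must be set
os.environ['KINGFISHER_NG_API_PASSWORD'] = 'pass'  # together with the username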
1 change: 1 addition & 0 deletions requirements.in
@@ -11,3 +11,4 @@ scrapyd-client
sentry-sdk
treq
twisted
requests
7 changes: 2 additions & 5 deletions requirements.txt
@@ -2,7 +2,7 @@
# This file is autogenerated by pip-compile
# To update, run:
#
# pip-compile
# pip-compile --output-file=requirements.txt requirements.in
#
attrs==19.3.0
    # via
@@ -51,8 +51,6 @@ idna==2.8
    #   twisted
ijson==3.1.1
    # via -r requirements.in
importlib-metadata==1.6.1
    # via jsonschema
incremental==17.5.0
    # via
    #   treq
@@ -118,6 +116,7 @@ rarfile==3.1
    # via -r requirements.in
requests==2.22.0
    # via
    #   -r requirements.in
    #   ocdsmerge
    #   treq
rfc3987==1.3.8
@@ -171,8 +170,6 @@ w3lib==1.21.0
    #   scrapy
xmltodict==0.12.0
    # via flattentool
zipp==3.1.0
    # via importlib-metadata
zope.interface==4.7.1
    # via
    #   scrapy
2 changes: 1 addition & 1 deletion requirements.txt.sha256
@@ -1 +1 @@
aece853089689129b5daad83c9b608f78bf74471b6b67ffd7eb64cdca7530587 requirements.txt
e0b193ad7af55858f4e13d72eef4e400103c9812 requirements.txt
12 changes: 1 addition & 11 deletions requirements_dev.txt
@@ -2,7 +2,7 @@
# This file is autogenerated by pip-compile
# To update, run:
#
# pip-compile requirements_dev.in
# pip-compile --output-file=requirements_dev.txt requirements_dev.in
#
attrs==19.3.0
    # via
@@ -90,12 +90,6 @@ idna==2.8
    #   twisted
ijson==3.1.1
    # via -r requirements.txt
importlib-metadata==1.6.1
    # via
    #   -r requirements.txt
    #   jsonschema
    #   pluggy
    #   pytest
incremental==17.5.0
    # via
    #   -r requirements.txt
@@ -293,10 +287,6 @@ xmltodict==0.12.0
    # via
    #   -r requirements.txt
    #   flattentool
zipp==3.1.0
    # via
    #   -r requirements.txt
    #   importlib-metadata
zope.interface==4.7.1
    # via
    #   -r requirements.txt
145 changes: 145 additions & 0 deletions tests/extensions/test_kingfisher_process_api_NG.py
@@ -0,0 +1,145 @@
import os
from unittest.mock import MagicMock

import pytest
from scrapy.exceptions import NotConfigured

from kingfisher_scrapy.extensions import KingfisherFilesStore, KingfisherProcessNGAPI
from tests import spider_with_crawler, spider_with_files_store


class Response(object):
    def json(self):
        return self.json_response


def test_from_crawler():
    spider = spider_with_crawler(settings={
        'KINGFISHER_NG_API_URL': 'http://httpbin.org/anything',
        'KINGFISHER_NG_API_USERNAME': 'xxx',
        'KINGFISHER_NG_API_PASSWORD': 'password',
    })

    extension = KingfisherProcessNGAPI.from_crawler(spider.crawler)

    assert extension.username == 'xxx'
    assert extension.password == 'password'
    assert extension.url == 'http://httpbin.org/anything'


def test_from_crawler_missing_uri():
    spider = spider_with_crawler(settings={
        "KINGFISHER_API_URL": "missing",
        "KINGFISHER_NG_API_USERNAME": "aaa",
    })

    with pytest.raises(NotConfigured) as excinfo:
        KingfisherProcessNGAPI.from_crawler(spider.crawler)

    assert str(excinfo.value) == "KINGFISHER_NG_API_URL is not set."


def test_from_crawler_missing_password():
    spider = spider_with_crawler(settings={
        "KINGFISHER_NG_API_URL": "/some/uti",
        "KINGFISHER_NG_API_USERNAME": "aaa",
    })

    with pytest.raises(NotConfigured) as excinfo:
        KingfisherProcessNGAPI.from_crawler(spider.crawler)

    assert str(excinfo.value) == "Both KINGFISHER_NG_API_USERNAME and KINGFISHER_NG_API_PASSWORD must be set."


def mocked_spider_open(cls, *args, **kwargs):
    # Your custom testing override
    return 1


def test_spider_opened(tmpdir):
    spider = spider_with_files_store(tmpdir, settings={
        'KINGFISHER_NG_API_URL': 'anything',
        'KINGFISHER_NG_API_USERNAME': 'xxx',
        'KINGFISHER_NG_API_PASSWORD': 'password',
    })

    extension = KingfisherProcessNGAPI.from_crawler(spider.crawler)

    response = Response()
    response.ok = True
    response.json_response = {"collection_id": "1"}
    extension._post_sync = MagicMock(return_value=response)
    extension.spider_opened(spider)

    extension._post_sync.assert_called_with('api/v1/create_collection', {
        'source_id': 'test',
        'data_version': '2001-02-03 04:05:06',
        'note': None,
        'sample': None,
        'compile': True,
        'upgrade': False,
        'check': True,
    })


def test_spider_closed(tmpdir):
    spider = spider_with_files_store(tmpdir, settings={
        'KINGFISHER_NG_API_URL': 'anything',
        'KINGFISHER_NG_API_USERNAME': 'xxx',
        'KINGFISHER_NG_API_PASSWORD': 'password',
    })

    extension = KingfisherProcessNGAPI.from_crawler(spider.crawler)
    extension.collection_id = 1

    response = Response()
    response.ok = True
    response.text = '{"collection_id":"1"}'

    extension._post_async = MagicMock(return_value=response)
    extension.spider_closed(spider, "done")

    call_args = extension._post_async.call_args
    call = call_args[0]

    assert call[0] == "api/v1/close_collection"
    assert call[1] == {'collection_id': 1, 'reason': 'done'}


def test_item_scraped(tmpdir):
    settings = {
        "KINGFISHER_API_LOCAL_DIRECTORY": str(tmpdir.join('xxx')),
        'KINGFISHER_NG_API_URL': 'anything',
        'KINGFISHER_NG_API_USERNAME': 'xxx',
        'KINGFISHER_NG_API_PASSWORD': 'password',
    }

    spider = spider_with_files_store(tmpdir, settings=settings)
    extension = KingfisherProcessNGAPI.from_crawler(spider.crawler)
    extension.collection_id = 1

    item = spider.build_file(
        file_name='file.json',
        url='https://example.com/remote.json',
        data=b'{"key": "value"}',
        data_type='release_package',
    )

    store_extension = KingfisherFilesStore.from_crawler(spider.crawler)
    store_extension.item_scraped(item, spider)

    response = Response()
    response.ok = True
    response.text = '{"collection_id":"1"}'

    extension._post_async = MagicMock(return_value=response)
    extension.item_scraped(item, spider)

    call_args = extension._post_async.call_args
    call = call_args[0]
    assert call[0] == "api/v1/create_collection_file"
    assert call[1] == {
        'collection_id': 1,
        'path': os.path.join(item['files_store'], item['path']),
        'url': 'https://example.com/remote.json',
    }
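
The tests above never touch the network: `MagicMock` replaces `_post_sync`/`_post_async`, and `call_args[0]` is the tuple of positional arguments from the most recent call. Since `_post_async` also receives a locally defined `response_callback` that can't be compared directly, a one-step version of the two assertions would need `mock.ANY` for that argument; a sketch, equivalent to the checks in `test_spider_closed`:

# Sketch: ANY stands in for the response_callback closure.
from unittest.mock import ANY

extension._post_async.assert_called_once_with(
    'api/v1/close_collection', {'collection_id': 1, 'reason': 'done'}, ANY)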
