-
Notifications
You must be signed in to change notification settings - Fork 4
/
plugin.py
96 lines (71 loc) · 2.91 KB
/
plugin.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import json
import requests
import rdflib
from pylons import config
import ckan.plugins as p
from ckanext.dcat.interfaces import IDCATRDFHarvester
from ckanext.sweden.dcat import template_helpers
# Default endpoint of the external DCAT validation service.
VALIDATION_SERVICE = 'https://validator.dcat-editor.com/service'

# Treat remote files served with these media types as RDF/XML by mapping
# each of them to rdflib's RDF/XML parser.
for _media_type in ('application/octet-stream', 'application/rss+xml'):
    rdflib.plugin.register(
        _media_type, rdflib.parser.Parser,
        'rdflib.plugins.parsers.rdfxml', 'RDFXMLParser')
# Do not leave the loop variable behind in the module namespace.
del _media_type
class SwedenDCATRDFHarvester(p.SingletonPlugin):
    """CKAN plugin that validates harvested DCAT content.

    Implements ``IDCATRDFHarvester`` to send downloaded content to an
    external validation service, ``IConfigurer`` to register this
    extension's template directory and ``ITemplateHelpers`` to expose a
    ``json_loads`` template helper.
    """

    p.implements(IDCATRDFHarvester, inherit=True)
    p.implements(p.IConfigurer)
    p.implements(p.ITemplateHelpers)

    # IDCATRDFHarvester

    def after_download(self, content, harvest_job):
        """Validate downloaded content against the validation service.

        Configuration options read:

        * ``ckanext.sweden.harvest.use_validation`` (default ``True``):
          when false, validation is skipped entirely.
        * ``ckanext.sweden.harvest.validation_service``: URL the content
          is POSTed to (defaults to ``VALIDATION_SERVICE``).
        * ``ckanext.sweden.harvest.stop_on_validation_errors`` (default
          ``False``): when true, any reported problem makes this method
          return ``None`` instead of the content.

        :param content: the downloaded file contents, sent as the request
            body to the validation service
        :param harvest_job: the current harvest job (not used here)
        :returns: a ``(content, errors)`` tuple. ``errors`` is a list of
            messages (empty when validation is disabled or the service
            reports nothing). ``content`` is replaced by ``None`` when
            there are errors and ``stop_on_validation_errors`` is set.
        """
        if not p.toolkit.asbool(
                config.get('ckanext.sweden.harvest.use_validation', True)):
            return content, []

        validation_service = config.get(
            'ckanext.sweden.harvest.validation_service', VALIDATION_SERVICE)
        stop_on_errors = p.toolkit.asbool(config.get(
            'ckanext.sweden.harvest.stop_on_validation_errors', False))

        def _finish(errors):
            # Single place deciding whether problems abort the harvest.
            if stop_on_errors:
                return None, errors
            return content, errors

        # NOTE(review): no timeout is passed to requests, so an unresponsive
        # validation service can block this call indefinitely. Consider
        # adding a (configurable) timeout.
        try:
            r = requests.post(validation_service, data=content)
        except requests.exceptions.RequestException as e:
            # Translate the template first and interpolate afterwards so the
            # message id matches the translation catalogue.
            return _finish([
                p.toolkit._(
                    'Error contacting the validation service: {0}'
                ).format(str(e))
            ])

        if r.status_code != 200:
            return _finish([
                p.toolkit._(
                    'The validation service returned an error: {0}'
                ).format(r.status_code)
            ])

        try:
            response = r.json()
        except ValueError:
            # The body was not valid JSON.
            response = None
        if not isinstance(response, dict):
            # Report an unusable body like any other service failure instead
            # of letting the exception escape from the hook.
            return _finish([
                p.toolkit._(
                    'The validation service returned an invalid response')
            ])

        if not any([response.get('rdfError'),
                    response.get('errors'),
                    response.get('warnings')]):
            # All clear
            return content, []

        errors = []
        if response.get('rdfError'):
            # An RDF-level error is reported on its own; the per-class and
            # per-resource details are only collected when it is absent.
            errors.append(response.get('rdfError'))
        else:
            for _class in response.get('mandatoryError') or []:
                errors.append(
                    p.toolkit._('Mandatory class {0} missing').format(_class))
            for resource in response.get('resources', []):
                # Each resource report is stored as a JSON string.
                errors.append(json.dumps(resource))

        return _finish(errors)

    # IConfigurer

    def update_config(self, config):
        """Register this extension's ``templates`` directory with CKAN."""
        p.toolkit.add_template_directory(config, 'templates')

    # ITemplateHelpers

    def get_helpers(self):
        """Return the template helpers provided by this plugin."""
        return {
            'json_loads': template_helpers.json_loads,
        }