"""
Feed Exports extension
See documentation in docs/topics/feed-exports.rst
"""

import os
import sys
import logging
import posixpath
from tempfile import NamedTemporaryFile
from datetime import datetime

import six
from six.moves.urllib.parse import urlparse
from ftplib import FTP

from zope.interface import Interface, implementer
from twisted.internet import defer, threads
from w3lib.url import file_uri_to_path

from scrapy import signals
from scrapy.utils.ftp import ftp_makedirs_cwd
from scrapy.exceptions import NotConfigured
from scrapy.utils.misc import load_object
from scrapy.utils.log import failure_to_exc_info
from scrapy.utils.python import without_none_values
from scrapy.utils.boto import is_botocore

logger = logging.getLogger(__name__)


class IFeedStorage(Interface):
    """Interface that all Feed Storages must implement"""

    def __init__(uri):
        """Initialize the storage with the parameters given in the URI"""

    def open(spider):
        """Open the storage for the given spider. It must return a file-like
        object that will be used for the exporters"""

    def store(file):
        """Store the given file stream"""


@implementer(IFeedStorage)
class BlockingFeedStorage(object):

    def open(self, spider):
        path = spider.crawler.settings['FEED_TEMPDIR']
        if path and not os.path.isdir(path):
            raise OSError('Not a Directory: ' + str(path))
        return NamedTemporaryFile(prefix='feed-', dir=path)

    def store(self, file):
        return threads.deferToThread(self._store_in_thread, file)

    def _store_in_thread(self, file):
        raise NotImplementedError
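

# The point of BlockingFeedStorage is that store() pushes the blocking
# transfer into a thread via threads.deferToThread(), so subclasses such as
# S3FeedStorage and FTPFeedStorage below only implement _store_in_thread()
# and never block the Twisted reactor.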


@implementer(IFeedStorage)
class StdoutFeedStorage(object):

    def __init__(self, uri, _stdout=None):
        if not _stdout:
            _stdout = sys.stdout if six.PY2 else sys.stdout.buffer
        self._stdout = _stdout

    def open(self, spider):
        return self._stdout

    def store(self, file):
        pass


@implementer(IFeedStorage)
class FileFeedStorage(object):

    def __init__(self, uri):
        self.path = file_uri_to_path(uri)

    def open(self, spider):
        dirname = os.path.dirname(self.path)
        if dirname and not os.path.exists(dirname):
            os.makedirs(dirname)
        return open(self.path, 'ab')

    def store(self, file):
        file.close()
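

# Note the 'ab' mode above: consecutive crawls append to the same file
# rather than overwrite it, and store() only needs to close the handle
# because the exporter already wrote the bytes straight to disk.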


class S3FeedStorage(BlockingFeedStorage):

    def __init__(self, uri):
        from scrapy.conf import settings
        u = urlparse(uri)
        self.bucketname = u.hostname
        self.access_key = u.username or settings['AWS_ACCESS_KEY_ID']
        self.secret_key = u.password or settings['AWS_SECRET_ACCESS_KEY']
        self.is_botocore = is_botocore()
        self.keyname = u.path[1:]  # remove first "/"
        if self.is_botocore:
            import botocore.session
            session = botocore.session.get_session()
            self.s3_client = session.create_client(
                's3', aws_access_key_id=self.access_key,
                aws_secret_access_key=self.secret_key)
        else:
            import boto
            self.connect_s3 = boto.connect_s3

    def _store_in_thread(self, file):
        file.seek(0)
        if self.is_botocore:
            self.s3_client.put_object(
                Bucket=self.bucketname, Key=self.keyname, Body=file)
        else:
            conn = self.connect_s3(self.access_key, self.secret_key)
            bucket = conn.get_bucket(self.bucketname, validate=False)
            key = bucket.new_key(self.keyname)
            key.set_contents_from_file(file)
            key.close()
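

# Credentials may ride in the URI itself or come from the project settings
# read in __init__ above (AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY).
# Bucket, key and credentials below are hypothetical:
#
#     FEED_URI = 's3://mybucket/exports/%(name)s-%(time)s.json'
#     FEED_URI = 's3://ACCESS_KEY:SECRET_KEY@mybucket/exports/items.json'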


class FTPFeedStorage(BlockingFeedStorage):

    def __init__(self, uri):
        u = urlparse(uri)
        self.host = u.hostname
        self.port = int(u.port or '21')
        self.username = u.username
        self.password = u.password
        self.path = u.path

    def _store_in_thread(self, file):
        file.seek(0)
        ftp = FTP()
        ftp.connect(self.host, self.port)
        ftp.login(self.username, self.password)
        dirname, filename = posixpath.split(self.path)
        ftp_makedirs_cwd(ftp, dirname)
        ftp.storbinary('STOR %s' % filename, file)
        ftp.quit()
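

# An illustrative FTP target (host, credentials and path are made up); the
# port defaults to 21 when the URI omits one, as parsed in __init__:
#
#     FEED_URI = 'ftp://user:password@ftp.example.com/exports/%(name)s.csv'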


class SpiderSlot(object):

    def __init__(self, file, exporter, storage, uri):
        self.file = file
        self.exporter = exporter
        self.storage = storage
        self.uri = uri
        self.itemcount = 0


class FeedExporter(object):

    def __init__(self, settings):
        self.settings = settings
        self.urifmt = settings['FEED_URI']
        if not self.urifmt:
            raise NotConfigured
        self.format = settings['FEED_FORMAT'].lower()
        self.storages = self._load_components('FEED_STORAGES')
        self.exporters = self._load_components('FEED_EXPORTERS')
        if not self._storage_supported(self.urifmt):
            raise NotConfigured
        if not self._exporter_supported(self.format):
            raise NotConfigured
        self.store_empty = settings.getbool('FEED_STORE_EMPTY')
        self.export_fields = settings.getlist('FEED_EXPORT_FIELDS') or None
        uripar = settings['FEED_URI_PARAMS']
        self._uripar = load_object(uripar) if uripar else lambda x, y: None

    @classmethod
    def from_crawler(cls, crawler):
        o = cls(crawler.settings)
        crawler.signals.connect(o.open_spider, signals.spider_opened)
        crawler.signals.connect(o.close_spider, signals.spider_closed)
        crawler.signals.connect(o.item_scraped, signals.item_scraped)
        return o

    def open_spider(self, spider):
        uri = self.urifmt % self._get_uri_params(spider)
        storage = self._get_storage(uri)
        file = storage.open(spider)
        exporter = self._get_exporter(file, fields_to_export=self.export_fields)
        exporter.start_exporting()
        self.slot = SpiderSlot(file, exporter, storage, uri)
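
    # One export runs per spider: open_spider interpolates the final URI
    # (spider attributes plus the 'time' placeholder from _get_uri_params),
    # and the resulting file/exporter/storage trio lives in a single
    # SpiderSlot until close_spider stores it.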

    def close_spider(self, spider):
        slot = self.slot
        if not slot.itemcount and not self.store_empty:
            return
        slot.exporter.finish_exporting()
        logfmt = "%s %%(format)s feed (%%(itemcount)d items) in: %%(uri)s"
        log_args = {'format': self.format,
                    'itemcount': slot.itemcount,
                    'uri': slot.uri}
        d = defer.maybeDeferred(slot.storage.store, slot.file)
        d.addCallback(lambda _: logger.info(logfmt % "Stored", log_args,
                                            extra={'spider': spider}))
        d.addErrback(lambda f: logger.error(logfmt % "Error storing", log_args,
                                            exc_info=failure_to_exc_info(f),
                                            extra={'spider': spider}))
        return d

    def item_scraped(self, item, spider):
        slot = self.slot
        slot.exporter.export_item(item)
        slot.itemcount += 1
        return item

    def _load_components(self, setting_prefix):
        conf = without_none_values(self.settings.getwithbase(setting_prefix))
        d = {}
        for k, v in conf.items():
            try:
                d[k] = load_object(v)
            except NotConfigured:
                pass
        return d

    def _exporter_supported(self, format):
        if format in self.exporters:
            return True
        logger.error("Unknown feed format: %(format)s", {'format': format})

    def _storage_supported(self, uri):
        scheme = urlparse(uri).scheme
        if scheme in self.storages:
            try:
                self._get_storage(uri)
                return True
            except NotConfigured:
                logger.error("Disabled feed storage scheme: %(scheme)s",
                             {'scheme': scheme})
        else:
            logger.error("Unknown feed storage scheme: %(scheme)s",
                         {'scheme': scheme})

    def _get_exporter(self, *args, **kwargs):
        return self.exporters[self.format](*args, **kwargs)

    def _get_storage(self, uri):
        return self.storages[urlparse(uri).scheme](uri)

    def _get_uri_params(self, spider):
        params = {}
        for k in dir(spider):
            params[k] = getattr(spider, k)
        ts = datetime.utcnow().replace(microsecond=0).isoformat().replace(':', '-')
        params['time'] = ts
        self._uripar(params, spider)
        return params
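

# FEED_URI_PARAMS may name a callable that mutates the params dict in place
# before the URI is interpolated (it is invoked as self._uripar(params,
# spider) above; its return value is ignored).  A hypothetical example that
# adds a '%(day)s' placeholder:
#
#     # myproject/utils.py -- module path made up for illustration
#     def uri_params(params, spider):
#         params['day'] = datetime.utcnow().strftime('%Y-%m-%d')
#
#     # settings.py
#     FEED_URI_PARAMS = 'myproject.utils.uri_params'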