/
test.py
357 lines (276 loc) · 10.9 KB
/
test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
import os
import sys
import glob
import time
import socket
import tempfile
from multiprocessing import Process
# Select the unittest implementation: the standard-library module on
# Python 2.7 and later, otherwise the unittest2 package.
# The (major, minor) pair is compared as a whole; checking the minor number
# alone would treat e.g. Python 3.0-3.6 as "older than 2.7".
if sys.version_info[:2] < (2, 7):
    try:
        import unittest2 as unittest
    except ImportError:
        # The parenthesized single-argument form prints the same text on
        # Python 2 and also parses on Python 3.
        print('Test Suite requires Python 2.7 or unittest2')
        sys.exit(1)
else:
    import unittest
import mock
import flask
import httplib2
import scrapelib
# Flask application that run_server() serves and the test cases query.
app = flask.Flask(__name__)
# Toggle flags for the /shaky and /shaky404 routes: each request to one of
# those routes flips its flag and fails when the flipped flag is true.
app.config.shaky_fail = False
app.config.shaky_404_fail = False
@app.route('/')
def index():
    """Serve the root page, whose body is the fixed text ``Hello world!``."""
    return app.make_response("Hello world!")
@app.route('/ua')
def ua():
    """Echo the request's User-Agent header back as the response body.

    The response carries ``cache-control: no-cache``.
    """
    agent = flask.request.headers['user-agent']
    response = app.make_response(agent)
    response.headers['cache-control'] = 'no-cache'
    return response
@app.route('/p/s.html')
def secret():
    """Serve a fixed body from a URL under ``/p/``."""
    body = "secret"
    return body
@app.route('/redirect')
def redirect():
    """Redirect the client to the URL of the ``index`` view."""
    target = flask.url_for('index')
    return flask.redirect(target)
@app.route('/500')
def fivehundred():
    """Always fail by aborting the request with HTTP status 500."""
    status = 500
    flask.abort(status)
@app.route('/robots.txt')
def robots():
    """Serve the robots.txt body: every user agent is disallowed ``/p/``."""
    return """
User-agent: *
Disallow: /p/
Allow: /
"""
@app.route('/shaky')
def shaky():
    """Alternate between an HTTP 500 failure and a successful response.

    Every request flips ``app.config.shaky_fail``; the request fails when
    the flipped flag is true and succeeds otherwise.
    """
    failing = not app.config.shaky_fail
    app.config.shaky_fail = failing
    if not failing:
        return "shaky success!"
    flask.abort(500)
@app.route('/shaky404')
def shaky404():
    """Alternate between an HTTP 404 failure and a successful response.

    Every request flips ``app.config.shaky_404_fail``; the request fails
    when the flipped flag is true and succeeds otherwise.
    """
    failing = not app.config.shaky_404_fail
    app.config.shaky_404_fail = failing
    if not failing:
        return "shaky404 success!"
    flask.abort(404)
def run_server():
    """Run the Flask test app in the current process with output discarded.

    Rebinds ``sys.stdout`` and ``sys.stderr`` for the whole process and then
    blocks in ``app.run()``; the script entry point uses it as the target of
    a child process.
    """
    # Use a real file object on the null device rather than a stub that only
    # implements write(): writers that also call flush() on the stream would
    # fail with AttributeError on such a stub. The file is deliberately left
    # open for as long as the server runs.
    devnull = open(os.devnull, 'w')
    sys.stdout = devnull
    sys.stderr = devnull
    app.run()
class HeaderTest(unittest.TestCase):
    """Tests for the ``scrapelib.Headers`` mapping."""

    def test_keys(self):
        """Header names are looked up, replaced and deleted ignoring case."""
        h = scrapelib.Headers()
        h['A'] = '1'

        self.assertEqual(h['A'], '1')

        self.assertEqual(h.getallmatchingheaders('A'), ["A: 1"])
        self.assertEqual(h.getallmatchingheaders('b'), [])
        self.assertEqual(h.getheaders('A'), ['1'])
        self.assertEqual(h.getheaders('b'), [])

        # should be case-insensitive
        self.assertEqual(h['a'], '1')
        h['a'] = '2'
        self.assertEqual(h['A'], '2')

        # assertIn/assertNotIn replace the deprecated assert_ alias and
        # report both operands on failure.
        self.assertIn('a', h)
        self.assertIn('A', h)

        del h['A']

        self.assertNotIn('a', h)
        self.assertNotIn('A', h)

    def test_equality(self):
        """Two Headers compare equal when names match ignoring case."""
        h1 = scrapelib.Headers()
        h1['Accept-Encoding'] = '*'

        h2 = scrapelib.Headers()
        self.assertNotEqual(h1, h2)

        h2['accept-encoding'] = 'not'
        self.assertNotEqual(h1, h2)

        h2['accept-encoding'] = '*'
        self.assertEqual(h1, h2)
class ScraperTest(unittest.TestCase):
    """Tests for ``scrapelib.Scraper`` run against the local Flask app.

    Every URL used here points at ``localhost:5000``, so the server started
    by the script entry point must be running.
    """

    def setUp(self):
        """Create a scraper with its own temporary cache and error dirs."""
        self.cache_dir = tempfile.mkdtemp()
        self.error_dir = tempfile.mkdtemp()
        self.s = scrapelib.Scraper(requests_per_minute=0,
                                   error_dir=self.error_dir,
                                   cache_dir=self.cache_dir,
                                   use_cache_first=True)

    def tearDown(self):
        """Delete the files in the temporary directories, then the dirs."""
        for path in glob.iglob(os.path.join(self.cache_dir, "*")):
            os.remove(path)
        os.rmdir(self.cache_dir)

        for path in glob.iglob(os.path.join(self.error_dir, "*")):
            os.remove(path)
        os.rmdir(self.error_dir)

    def test_get(self):
        self.assertEqual('Hello world!',
                         self.s.urlopen("http://localhost:5000/"))

    def test_request_throttling(self):
        requests = 0
        s = scrapelib.Scraper(requests_per_minute=30)
        self.assertEqual(s.requests_per_minute, 30)

        # Count how many requests complete within one second at each rate.
        begin = time.time()
        while time.time() <= (begin + 1):
            s.urlopen("http://localhost:5000/")
            requests += 1
        self.assertLessEqual(requests, 2)

        s.requests_per_minute = 500
        requests = 0
        begin = time.time()
        while time.time() <= (begin + 1):
            s.urlopen("http://localhost:5000/")
            requests += 1
        self.assertGreater(requests, 5)

    def test_user_agent(self):
        resp = self.s.urlopen("http://localhost:5000/ua")
        self.assertEqual(resp, scrapelib._user_agent)

        self.s.user_agent = 'a different agent'
        resp = self.s.urlopen("http://localhost:5000/ua")
        self.assertEqual(resp, 'a different agent')

    def test_default_to_http(self):
        self.assertEqual('Hello world!',
                         self.s.urlopen("localhost:5000/"))

    def test_follow_robots(self):
        self.assertRaises(scrapelib.RobotExclusionError, self.s.urlopen,
                          "http://localhost:5000/p/s.html")
        self.assertRaises(scrapelib.RobotExclusionError, self.s.urlopen,
                          "http://localhost:5000/p/a/t/h/")

        self.s.follow_robots = False
        self.assertEqual("secret",
                         self.s.urlopen("http://localhost:5000/p/s.html"))
        self.assertRaises(scrapelib.HTTPError, self.s.urlopen,
                          "http://localhost:5000/p/a/t/h/")

    def test_error_context(self):
        def raises():
            with self.s.urlopen("http://localhost:5000/"):
                raise Exception('test')

        self.assertRaises(Exception, raises)
        self.assertTrue(os.path.isfile(os.path.join(
            self.error_dir, "http:,,localhost:5000,")))

    def test_404(self):
        self.assertRaises(scrapelib.HTTPError, self.s.urlopen,
                          "http://localhost:5000/does/not/exist")

        self.s.raise_errors = False
        resp = self.s.urlopen("http://localhost:5000/does/not/exist")
        self.assertEqual(404, resp.response.code)

    def test_500(self):
        self.assertRaises(scrapelib.HTTPError, self.s.urlopen,
                          "http://localhost:5000/500")

        self.s.raise_errors = False
        resp = self.s.urlopen("http://localhost:5000/500")
        self.assertEqual(resp.response.code, 500)

    def test_follow_redirect(self):
        resp = self.s.urlopen("http://localhost:5000/redirect")
        self.assertEqual("http://localhost:5000/", resp.response.url)
        self.assertEqual("http://localhost:5000/redirect",
                         resp.response.requested_url)
        self.assertEqual(200, resp.response.code)

        self.s.follow_redirects = False
        resp = self.s.urlopen("http://localhost:5000/redirect")
        self.assertEqual("http://localhost:5000/redirect",
                         resp.response.url)
        self.assertEqual("http://localhost:5000/redirect",
                         resp.response.requested_url)
        self.assertEqual(302, resp.response.code)

    def test_caching(self):
        resp = self.s.urlopen("http://localhost:5000/")
        self.assertFalse(resp.response.fromcache)
        resp = self.s.urlopen("http://localhost:5000/")
        self.assertTrue(resp.response.fromcache)

        self.s.use_cache_first = False
        resp = self.s.urlopen("http://localhost:5000/")
        self.assertFalse(resp.response.fromcache)

    def test_urlretrieve(self):
        fname, resp = self.s.urlretrieve("http://localhost:5000/")
        with open(fname) as f:
            self.assertEqual(f.read(), "Hello world!")
            self.assertEqual(200, resp.code)
        os.remove(fname)

        (fh, set_fname) = tempfile.mkstemp()
        # mkstemp() returns an open OS-level descriptor; only the path is
        # needed here, so close it right away instead of leaking it.
        os.close(fh)
        fname, resp = self.s.urlretrieve("http://localhost:5000/",
                                         set_fname)
        self.assertEqual(fname, set_fname)
        with open(set_fname) as f:
            self.assertEqual(f.read(), "Hello world!")
            self.assertEqual(200, resp.code)
        os.remove(set_fname)

    # TODO: on these retry tests it'd be nice to ensure that it tries
    # 3 times for 500 and once for 404

    def test_retry_httplib2(self):
        s = scrapelib.Scraper(retry_attempts=3, retry_wait_seconds=0.1)

        # one failure, then success
        resp, content = s._do_request('http://localhost:5000/shaky',
                                      'GET', None, {})
        self.assertEqual(content, 'shaky success!')

        # 500 always
        resp, content = s._do_request('http://localhost:5000/500',
                                      'GET', None, {})
        self.assertEqual(resp.code, 500)

    def test_retry_httplib2_404(self):
        s = scrapelib.Scraper(retry_attempts=3, retry_wait_seconds=0.1)

        # like shaky but raises a 404
        resp, content = s._do_request('http://localhost:5000/shaky404',
                                      'GET', None, {}, retry_on_404=True)
        self.assertEqual(content, 'shaky404 success!')

        # 404
        resp, content = s._do_request('http://localhost:5000/404',
                                      'GET', None, {})
        self.assertEqual(resp.code, 404)

    def test_socket_retry(self):
        orig_request = httplib2.Http().request
        count = []

        # On the first call raise socket.timeout
        # On subsequent calls pass through to httplib2.Http.request
        def side_effect(*args, **kwargs):
            if count:
                return orig_request(*args, **kwargs)
            count.append(1)
            raise socket.timeout('timed out :(')

        mock_request = mock.Mock(side_effect=side_effect)

        with mock.patch.object(httplib2.Http, 'request', mock_request):
            s = scrapelib.Scraper(retry_attempts=0, retry_wait_seconds=0.1)
            # Exercise the scraper configured just above (no retries), not
            # the shared fixture self.s.
            self.assertRaises(socket.timeout, s.urlopen,
                              "http://localhost:5000/")

        mock_request.reset_mock()
        # Rebinding is seen by side_effect, so the next call times out again.
        count = []
        with mock.patch.object(httplib2.Http, 'request', mock_request):
            s = scrapelib.Scraper(retry_attempts=2, retry_wait_seconds=0.1)
            resp = s.urlopen("http://localhost:5000/")
            self.assertEqual(resp, "Hello world!")

    def test_disable_compression(self):
        s = scrapelib.Scraper(disable_compression=True)

        headers = s._make_headers("http://google.com")
        self.assertEqual(headers['accept-encoding'], 'text/*')

        # A supplied Accept-Encoding headers overrides the
        # disable_compression option
        s.headers['accept-encoding'] = '*'
        headers = s._make_headers('http://google.com')
        self.assertEqual(headers['accept-encoding'], '*')

    def test_callable_headers(self):
        s = scrapelib.Scraper(headers=lambda url: {'URL': url})

        headers = s._make_headers('http://google.com')
        self.assertEqual(headers['url'], 'http://google.com')

        # Make sure it gets called freshly each time
        headers = s._make_headers('example.com')
        self.assertEqual(headers['url'], 'example.com')

    def test_ftp_method_restrictions(self):
        # only http(s) supports non-'GET' requests
        self.assertRaises(scrapelib.HTTPMethodUnavailableError,
                          lambda: self.s.urlopen("ftp://google.com",
                                                 method='POST'))
if __name__ == '__main__':
    # Serve the Flask app from a child process while the tests run here.
    process = Process(target=run_server)
    process.start()
    try:
        # Poll until the server accepts a connection on the port the tests
        # use, instead of sleeping for a fixed 0.1 s: a fixed delay races
        # against server start-up. After 5 s give up waiting and let the
        # tests themselves report the connection failure.
        deadline = time.time() + 5
        while time.time() < deadline:
            try:
                socket.create_connection(('localhost', 5000), 0.1).close()
            except socket.error:
                time.sleep(0.05)
            else:
                break
        unittest.main()
    finally:
        # unittest.main() exits via SystemExit; always stop and reap the
        # server process on the way out.
        process.terminate()
        process.join()