-
Notifications
You must be signed in to change notification settings - Fork 31
/
srunit.py
499 lines (420 loc) · 15.6 KB
/
srunit.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
# -*- coding: utf-8 -*-
u"""Support for unit tests
:copyright: Copyright (c) 2016 RadiaSoft LLC. All Rights Reserved.
:license: http://www.apache.org/licenses/LICENSE-2.0.html
"""
from __future__ import absolute_import, division, print_function
import flask
import flask.testing
import json
import re
from pykern.pkcollections import PKDict
#: Default simulation type ("app") used when a caller does not name one
MYAPP = 'myapp'
#: The `sirepo.server` module; None until `flask_client` imports it lazily
server = None
#: Flask app returned by `server.init()`; None until `flask_client` creates it
app = None
#: Captures the target URI of a JavaScript redirect page (matches javascript-redirect.html)
_JAVASCRIPT_REDIRECT_RE = re.compile(r'window.location = "([^"]+)"')
#: Simulation types to enable when a caller passes none; set by conftest.py
CONFTEST_ALL_CODES = None
def flask_client(cfg=None, sim_types=None):
    """Return the shared FlaskClient, initializing the server on first use.

    The client is cached as an attribute of the Flask app. Once the
    module-level `server` and that attribute exist, later calls return
    the cached client and their `cfg`/`sim_types` are not applied to the
    already-running server.

    The client provides convenience methods such as `sr_post` and `sr_get`.

    Args:
        cfg (dict): extra configuration for reset_state_for_testing;
            note that keys are added to this dict in place
        sim_types (str): value for SIREPO_FEATURE_CONFIG_SIM_TYPES [CONFTEST_ALL_CODES];
            a tuple or list is joined with ':'
    Returns:
        FlaskClient: for local requests to Flask server
    """
    global server, app

    client_attr = 'srunit_flask_client'
    if not cfg:
        cfg = PKDict()
    codes = sim_types or CONFTEST_ALL_CODES
    if codes:
        if isinstance(codes, (tuple, list)):
            codes = ':'.join(codes)
        cfg['SIREPO_FEATURE_CONFIG_SIM_TYPES'] = codes
    if server and hasattr(app, client_attr):
        # already initialized: reuse the cached client
        return getattr(app, client_attr)

    from pykern import pkconfig

    # initialize pkdebug with correct values
    pkconfig.reset_state_for_testing(cfg)

    from pykern import pkunit

    with pkunit.save_chdir_work() as work_dir:
        from pykern import pkio

        # the database root lives under the test work directory, so the
        # configuration has to be reset a second time with that path
        cfg['SIREPO_SRDB_ROOT'] = str(pkio.mkdir_parent(work_dir.join('db')))
        pkconfig.reset_state_for_testing(cfg)

        # import only after configuration is in place
        from sirepo import server as sirepo_server

        server = sirepo_server
        app = server.init()
        app.config['TESTING'] = True
        app.test_client_class = _TestClient
        setattr(app, client_attr, app.test_client())
    return getattr(app, client_attr)
def init_auth_db():
    """Log in as a guest with only myapp enabled and list its simulations.

    The guest login plus the `listSimulations` request force a user to
    be created in the db.

    Returns:
        tuple: (flask client, reply from `listSimulations`)
    """
    client = flask_client(sim_types=MYAPP)
    client.sr_login_as_guest()
    listing = client.sr_post(
        'listSimulations',
        {'simulationType': client.sr_sim_type},
    )
    return client, listing
def sim_data(sim_name=None, sim_type=None, sim_types=CONFTEST_ALL_CODES, cfg=None):
    """Log in as a guest and fetch the data of one simulation

    Args:
        sim_name (str): full name of simulation
        sim_type (str): app [defaults to myapp]
        sim_types (str): `SIREPO_FEATURE_CONFIG_SIM_TYPES` value; when
            falsy, only `sim_type` (or myapp) is enabled. The default is
            the value `CONFTEST_ALL_CODES` had when this function was
            defined.
        cfg (dict): extra configuration passed to `flask_client`
    Returns:
        PKDict: simulation data
        object: flask client
    """
    if not sim_types:
        sim_types = [sim_type or MYAPP]
    client = flask_client(sim_types=sim_types, cfg=cfg)
    client.sr_login_as_guest()
    data = client.sr_sim_data(sim_name=sim_name, sim_type=sim_type)
    return data, client
def test_in_request(op, cfg=None, before_request=None, headers=None, want_cookie=True, want_user=True, **kwargs):
    """Issue a GET to the srunit route with `op` registered on the server app

    `op`, `want_cookie` and `want_user` are stored in a PKDict on
    `server._app` under the attribute named by
    `server.SRUNIT_TEST_IN_REQUEST` for the duration of the request; the
    attribute is removed afterwards. How the server uses these values is
    not visible here (presumably it calls `op` inside the request).

    Args:
        op (callable): operation to register for the request
        cfg (dict): passed to `flask_client`
        before_request (callable): if set, called with the flask client
            before the request is made
        headers (object): passed to the client's `get`
        want_cookie (bool): stored with `op` for the server
        want_user (bool): stored with `op` for the server
        kwargs (dict): passed to `flask_client`
    Returns:
        flask.Response: reply, already checked to have status 200 and
        a JSON body with state 'ok'
    """
    fc = flask_client(cfg, **kwargs)
    try:
        from pykern import pkunit
        from pykern import pkcollections

        if before_request:
            before_request(fc)
        setattr(
            server._app,
            server.SRUNIT_TEST_IN_REQUEST,
            PKDict(op=op, want_cookie=want_cookie, want_user=want_user),
        )
        from sirepo import uri_router

        r = fc.get(
            uri_router.srunit_uri,
            headers=headers,
        )
        pkunit.pkeq(200, r.status_code, 'FAIL: status={}', r.status)
        if r.mimetype == 'text/html':
            # The pattern is a str, so search decoded text: on Python 3
            # the raw body is bytes and would raise TypeError.
            m = _JAVASCRIPT_REDIRECT_RE.search(r.get_data(as_text=True))
            if m:
                pkunit.pkfail('redirect={}', m.group(1))
            pkunit.pkfail('other html response={}', r.data)
        d = pkcollections.json_load_any(r.data)
        pkunit.pkeq('ok', d.get('state'), 'FAIL: data={}', d)
    finally:
        # best effort: the attribute may not have been set yet
        try:
            delattr(server._app, server.SRUNIT_TEST_IN_REQUEST)
        except AttributeError:
            pass
    return r
def wrap_in_request(*args, **kwargs):
    """Decorator for calling functions in `test_in_request`

    Examples:
        # note that the parens are required
        @srunit.wrap_in_request()
        def test_simple():
            inside request context

        @srunit.wrap_in_request(cfg={'SIREPO_AUTH_METHODS': 'github:guest'})
        def test_myapp():
            inside a request context here

    Args:
        args (tuple): ignored
        kwargs (dict): passed to test_in_request
    Returns:
        callable: decorator that replaces the function with one that
        ignores its own arguments and runs the function (with no
        arguments) through `test_in_request`
    """
    import functools

    def _decorator(func):
        @functools.wraps(func)
        def _wrapper(*ignore_args, **ignore_kwargs):
            try:
                return test_in_request(lambda: func(), **kwargs)
            finally:
                # `flask.g` is a proxy that always exists as a module
                # attribute but can only be used inside an application
                # context, so test for the context itself. The object
                # stores values as attributes, hence delattr rather
                # than item deletion.
                if flask.has_app_context() and 'sirepo_cookie' in flask.g:
                    delattr(flask.g, 'sirepo_cookie')
        return _wrapper
    return _decorator
class _TestClient(flask.testing.FlaskClient):
    """Flask test client extended with Sirepo request helpers (``sr_*``)

    Attributes:
        sr_uid (str): uid returned by the last guest login, else None
        sr_sim_type (str): current simulation type; None until first set
    """

    #: simulation type used when none is given and none is already set
    SR_SIM_TYPE_DEFAULT = MYAPP

    def __init__(self, *args, **kwargs):
        super(_TestClient, self).__init__(*args, **kwargs)
        self.sr_uid = None
        self.sr_sim_type = None

    def sr_auth_state(self, **kwargs):
        """Gets authState and parses it

        Args:
            kwargs (dict): expected values; each key is compared with
                the parsed state using `pkunit.pkeq`
        Returns:
            dict: parsed auth_state
        """
        from pykern import pkunit
        import pykern.pkcollections

        # Decode first: the pattern is a str and the raw body is bytes
        # on Python 3.
        t = self.sr_get('authState').get_data(as_text=True)
        m = re.search(r'(\{.*\})', t)
        if not m:
            pkunit.pkfail('no JSON object in authState reply={}', t)
        s = pykern.pkcollections.json_load_any(m.group(1))
        for k, v in kwargs.items():
            pkunit.pkeq(
                v,
                s[k],
                'key={} expected={} != actual={}: auth_state={}',
                k,
                v,
                s[k],
                s,
            )
        return s

    def sr_get(self, route_or_uri, params=None, query=None, **kwargs):
        """Sends a GET to route_or_uri and returns the raw reply

        Args:
            route_or_uri (str): string name of route or uri if contains '/' (http:// or '/foo')
            params (dict): optional params to route_or_uri
            query (dict): optional query passed to `sirepo.uri.server_route`
            kwargs (dict): passed to the request helper (e.g. `redirect`)
        Returns:
            flask.Response: reply object
        """
        return self.__req(route_or_uri, params, query, self.get, raw_response=True, **kwargs)

    def sr_get_json(self, route_or_uri, params=None, query=None, headers=None, **kwargs):
        """Sends a GET to route_or_uri and parses the reply as JSON

        Args:
            route_or_uri (str): identifies route in schema-common.json
            params (dict): optional params to route_or_uri
            query (dict): optional query passed to `sirepo.uri.server_route`
            headers (object): passed to the client's `get`
            kwargs (dict): passed to the request helper (e.g. `redirect`)
        Returns:
            object: Parsed JSON result
        """
        return self.__req(
            route_or_uri,
            params,
            query,
            lambda r: self.get(r, headers=headers),
            raw_response=False,
            **kwargs
        )

    def sr_get_root(self, sim_type=None, **kwargs):
        """Gets root app for sim_type

        Args:
            sim_type (str): app name ['myapp' or default type]
            kwargs (dict): passed to the request helper (e.g. `redirect`)
        Returns:
            flask.Response: reply object
        """
        self.sr_sim_type_set(sim_type)
        return self.__req(
            'root',
            {'simulation_type': self.sr_sim_type},
            None,
            self.get,
            raw_response=True,
            **kwargs
        )

    def sr_login_as_guest(self, sim_type=None):
        """Sets up a guest login

        Clears the cookie jar first, so any previous session is dropped.

        Args:
            sim_type (str): simulation type ['myapp']
        Returns:
            str: new user id
        """
        self.sr_sim_type_set(sim_type)
        self.cookie_jar.clear()
        # Get a cookie
        self.sr_get('authState')
        self.sr_get('authGuestLogin', {'simulation_type': self.sr_sim_type})
        self.sr_uid = self.sr_auth_state(needCompleteRegistration=False, isLoggedIn=True).uid
        return self.sr_uid

    def sr_logout(self):
        """Logout but leave cookie in place

        Returns:
            object: self
        """
        self.sr_uid = None
        self.sr_get('authLogout', PKDict(simulation_type=self.sr_sim_type))
        return self

    def sr_post(self, route_or_uri, data, params=None, raw_response=False, **kwargs):
        """Posts JSON data to route_or_uri to server

        Args:
            route_or_uri (str): string name of route or uri if contains '/' (http:// or '/foo')
            data (object): will be formatted as JSON
            params (dict): optional params to route_or_uri
            raw_response (bool): if True, return the reply object unparsed
            kwargs (dict): passed to the request helper (e.g. `redirect`)
        Returns:
            object: Parsed JSON result
        """
        op = lambda r: self.post(r, data=json.dumps(data), content_type='application/json')
        return self.__req(route_or_uri, params, {}, op, raw_response=raw_response, **kwargs)

    def sr_post_form(self, route_or_uri, data, params=None, raw_response=False, file=None, **kwargs):
        """Posts form data to route_or_uri to server with data

        Args:
            route_or_uri (str): identifies route in schema-common.json
            data (dict): will be formatted as form data; when `file` is
                given, `data.file` is set on it, so it must allow
                attribute assignment (e.g. PKDict)
            params (dict): optional params to route_or_uri
            raw_response (bool): if True, return the reply object unparsed
            file (object): if str, will look in data_dir, else assumed py.path
            kwargs (dict): passed to the request helper (e.g. `redirect`)
        Returns:
            object: Parsed JSON result
        """
        from pykern import pkunit, pkconfig

        if file:
            p = file
            if isinstance(p, pkconfig.STRING_TYPES):
                p = pkunit.data_dir().join(p)
            # (stream, filename) pair; the open handle is handed to the
            # test client together with the other form fields
            data.file = (open(str(p), 'rb'), p.basename)
        return self.__req(
            route_or_uri,
            params,
            PKDict(),
            lambda r: self.post(r, data=data),
            raw_response=raw_response,
            **kwargs
        )

    def sr_run_sim(self, data, model, expect_completed=True, timeout=7, **post_args):
        """Posts runSimulation and polls runStatus until done or timeout

        If the run is still pending when this method exits, runCancel is
        posted. In every case the process list is then checked for
        "mpiexec" and an AssertionError is raised if one is found.

        Args:
            data (PKDict): simulation data; `models`, `simulationType`
                and `models.simulation.simulationId` are sent
            model (str): sent as `report`
            expect_completed (bool): if True, fail unless the final
                state is 'completed'
            timeout (int): maximum number of polling rounds, each
                followed by a one second sleep
            post_args (dict): extra values merged into the runSimulation
                request
        Returns:
            object: last reply from runSimulation or runStatus
        """
        from pykern import pkunit
        from pykern.pkdebug import pkdlog
        import time

        cancel = None
        try:
            r = self.sr_post(
                'runSimulation',
                PKDict(
                    models=data.models,
                    report=model,
                    simulationId=data.models.simulation.simulationId,
                    simulationType=data.simulationType,
                ).pkupdate(**post_args),
            )
            if r.state == 'completed':
                return r
            pkunit.pkeq('pending', r.state, 'not pending, run={}', r)
            cancel = r.nextRequest
            for _ in range(timeout):
                if r.state in ('completed', 'error'):
                    pkdlog(r.state)
                    # finished on its own: nothing to cancel
                    cancel = None
                    break
                r = self.sr_post('runStatus', r.nextRequest)
                time.sleep(1)
            else:
                pkunit.pkok(not expect_completed, 'did not complete: runStatus={}', r)
            if expect_completed:
                pkunit.pkeq('completed', r.state)
            return r
        finally:
            if cancel:
                pkdlog('runCancel')
                self.sr_post('runCancel', cancel)
            import subprocess

            # Text mode so the output can be split and joined as str on
            # Python 3 as well (it is bytes by default there).
            o = subprocess.check_output(
                ['ps', 'axww'],
                stderr=subprocess.STDOUT,
                universal_newlines=True,
            )
            # A list, not filter(): a filter object is always truthy on
            # Python 3, which would make this check fire every time.
            o = [x for x in o.split('\n') if 'mpiexec' in x]
            if o:
                pkdlog('found "mpiexec" after cancel in ps={}', '\n'.join(o))
                # raised from finally: supersedes the pending return
                # value or exception of the try block
                raise AssertionError('cancel failed')

    def sr_sim_data(self, sim_name=None, sim_type=None):
        """Return simulation data by name

        Args:
            sim_name (str): case sensitive name ['Scooby Doo']
            sim_type (str): app ['myapp']
        Returns:
            dict: data
        """
        from pykern import pkunit

        self.sr_sim_type_set(sim_type)
        if not sim_name:
            sim_name = 'Scooby Doo'
        d = self.sr_post(
            'listSimulations',
            PKDict(
                simulationType=self.sr_sim_type,
                search=PKDict({'simulation.name': sim_name}),
            )
        )
        # exactly one simulation must match the name
        assert 1 == len(d), \
            'listSimulations name={} returned count={}'.format(sim_name, len(d))
        d = d[0].simulation
        res = self.sr_get_json(
            'simulationData',
            PKDict(
                simulation_type=self.sr_sim_type,
                pretty='0',
                simulation_id=d.simulationId,
            ),
        )
        pkunit.pkeq(sim_name, res.models.simulation.name)
        return res

    def sr_sim_type_set(self, sim_type=None):
        """Set `sr_sim_type`

        Precedence: `sim_type` argument, then the current value, then
        `SR_SIM_TYPE_DEFAULT`.

        Args:
            sim_type (str): app name
        Returns:
            object: self
        """
        self.sr_sim_type = sim_type or self.sr_sim_type or self.SR_SIM_TYPE_DEFAULT
        return self

    def __req(self, route_or_uri, params, query, op, raw_response, **kwargs):
        """Make request and parse result

        Follows JavaScript redirect pages and HTTP redirects (at most
        five requests deep) unless `redirect=False` is passed.

        Args:
            route_or_uri (str): string name of route or uri if contains '/' (http:// or '/foo')
            params (dict): parameters to apply to route
            query (dict): query passed to `sirepo.uri.server_route`
            op (func): how to request; called with the uri
            raw_response (bool): if True, return the reply object
                instead of parsed JSON
            kwargs (dict): `redirect` (bool) follow redirects [True];
                `__redirects` (int) internal depth counter
        Returns:
            object: parsed JSON result, or a response object when
            `raw_response` is set or a redirect is not followed
        Raises:
            sirepo.util.Error: a JavaScript redirect points to '#/error'
            sirepo.util.SRException: the JSON reply has the srException state
        """
        from pykern.pkdebug import pkdlog, pkdexc, pkdc
        import pykern.pkjson
        import sirepo.http_reply
        import sirepo.uri
        import sirepo.util

        # guard against redirect loops
        redirects = kwargs.setdefault('__redirects', 0) + 1
        assert redirects <= 5
        kwargs['__redirects'] = redirects

        u = None
        r = None
        try:
            u = sirepo.uri.server_route(route_or_uri, params, query)
            pkdc('uri={}', u)
            r = op(u)
            pkdc('status={} data={}', r.status_code, r.data)
            # Emulate code in sirepo.js to deal with redirects
            if r.status_code == 200 and r.mimetype == 'text/html':
                # Decode first: the pattern is a str and the raw body is
                # bytes on Python 3.
                m = _JAVASCRIPT_REDIRECT_RE.search(r.get_data(as_text=True))
                if m:
                    if m.group(1).endswith('#/error'):
                        raise sirepo.util.Error(
                            PKDict(error='server error uri={}'.format(m.group(1))),
                        )
                    if kwargs.get('redirect', True):
                        # Execute the redirect
                        return self.__req(
                            m.group(1),
                            None,
                            None,
                            self.get,
                            raw_response,
                            __redirects=redirects,
                        )
                    # not following: hand back an equivalent HTTP redirect
                    return flask.redirect(m.group(1))
            if r.status_code in (301, 302, 303, 305, 307, 308):
                if kwargs.get('redirect', True):
                    # Execute the redirect
                    return self.__req(
                        r.headers['Location'],
                        None,
                        None,
                        self.get,
                        raw_response,
                        __redirects=redirects,
                    )
                return r
            if raw_response:
                return r
            # Treat SRException as a real exception (so we don't ignore them)
            d = pykern.pkjson.load_any(r.data)
            if (
                isinstance(d, dict)
                and d.get('state') == sirepo.http_reply.SR_EXCEPTION_STATE
            ):
                raise sirepo.util.SRException(
                    d.srException.routeName,
                    d.srException.params,
                )
            return d
        except Exception as e:
            # Reply exceptions are not logged; everything else is logged
            # with request details before being re-raised
            if not isinstance(e, (sirepo.util.Reply)):
                pkdlog(
                    'Exception: {}: msg={} uri={} status={} data={} stack={}',
                    type(e),
                    e,
                    u,
                    r and r.status_code,
                    r and r.data,
                    pkdexc(),
                )
            raise