-
Notifications
You must be signed in to change notification settings - Fork 149
/
test_filesystem.py
70 lines (59 loc) · 2.5 KB
/
test_filesystem.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
from XRootD import client
from XRootD.client import AsyncResponseHandler
from XRootD.enums import OpenFlags, QueryCode, MkDirFlags, AccessMode, \
DirListFlags, PrepareFlags
import pytest
def test_filesystem():
    """Run every filesystem request synchronously, then asynchronously.

    Each entry of ``funcspecs`` is a ``(bound_method, positional_args,
    has_return_object)`` triple. The whole list is pushed through ``sync``
    first; ``/tmp/spam`` is then re-created and the same list is pushed
    through the asynchronous helper.
    """
    c = client.Client("root://localhost")

    # NOTE(review): the locate/deeplocate specs were already disabled in the
    # original test; the reason is not recorded in this file.
    funcspecs = [
        # (c.locate, ('/tmp', OpenFlags.REFRESH), True),
        # (c.deeplocate, ('/tmp', OpenFlags.REFRESH), True),
        (c.query, (QueryCode.SPACE, '/tmp'), True),
        (c.truncate, ('/tmp/spam', 1000), False),
        (c.mv, ('/tmp/spam', '/tmp/ham'), False),
        (c.chmod, ('/tmp/ham', AccessMode.UR | AccessMode.UW), False),
        (c.rm, ('/tmp/ham',), False),
        (c.mkdir, ('/tmp/somedir', MkDirFlags.MAKEPATH), False),
        (c.rmdir, ('/tmp/somedir',), False),
        (c.ping, (), False),
        (c.stat, ('/tmp',), True),
        (c.statvfs, ('/tmp',), True),
        (c.protocol, (), True),
        (c.dirlist, ('/tmp', DirListFlags.STAT), True),
        (c.sendinfo, ('important info',), False),
        (c.prepare, (['/tmp/foo'], PrepareFlags.STAGE), True),
    ]

    for func, args, hasReturnObject in funcspecs:
        sync(func, args, hasReturnObject)

    # The synchronous pass moved /tmp/spam to /tmp/ham and removed it, so the
    # file has to be created again before the second pass touches it.
    f = client.File()
    status, _ = f.open('root://localhost//tmp/spam', OpenFlags.NEW)
    assert status.ok
    # Only the file's existence matters from here on; release the handle
    # instead of leaking it until the test function returns.
    f.close()

    # ``async`` is a reserved word from Python 3.7 on, so the helper cannot
    # be written as a direct call; fetch it from the module namespace by name.
    run_async = globals()["async"]
    for func, args, hasReturnObject in funcspecs:
        run_async(func, args, hasReturnObject)
def sync(func, args, hasReturnObject):
    """Invoke ``func(*args)`` synchronously and verify its result.

    Args:
        func: Callable returning a ``(status, response)`` pair.
        args: Tuple of positional arguments forwarded to ``func``.
        hasReturnObject: When true, the response must also be truthy.

    Raises:
        AssertionError: If ``status.ok`` is false, or if a response was
            required and the one returned is falsy.
    """
    status, response = func(*args)
    # Single-argument print(...) behaves the same on Python 2 and Python 3.
    print(status)
    assert status.ok
    if hasReturnObject:
        print(response)
        assert response
def async_(func, args, hasReturnObject):
    """Invoke ``func`` with a callback handler and verify the delivered result.

    Args:
        func: Callable accepting ``*args`` plus a ``callback`` keyword and
            returning the status of the submission.
        args: Tuple of positional arguments forwarded to ``func``.
        hasReturnObject: When true, the response handed to the callback
            must be truthy.

    Raises:
        AssertionError: If the submission status or the delivered status is
            not ``ok``, or if a response was required and is falsy.
    """
    handler = AsyncResponseHandler()
    status = func(*args, callback=handler)
    print(status)
    assert status.ok

    # Block until the handler has received the final result.
    status, response, hostlist = handler.waitFor()
    assert status.ok
    if response:
        print(response)
    # Mirror sync(): a response is mandatory only when the spec says so.
    if hasReturnObject:
        assert response
    # hostlist is received but deliberately not checked here.


# ``async`` is a reserved word from Python 3.7 on, so the helper can no longer
# be defined under that name. Keep the historical name reachable for callers.
globals()["async"] = async_
def test_args():
    """Check keyword-argument handling of ``Client.locate``.

    A call passing ``path``, ``flags`` and ``timeout`` by keyword must
    succeed. Calls that leave out ``flags``, misname ``path`` or add an
    unknown keyword are expected to raise ``TypeError``.
    """
    c = client.Client("root://localhost")

    status, response = c.locate(path="/tmp", flags=0, timeout=1)
    assert status.ok
    assert response

    # The string form pytest.raises(Exc, "code") was removed in pytest 4.0;
    # the context-manager form is the supported way to expect an exception.
    with pytest.raises(TypeError):
        c.locate(path="/tmp")
    with pytest.raises(TypeError):
        c.locate(path="/tmp", foo=1)
    with pytest.raises(TypeError):
        c.locate(foo="/tmp")
    with pytest.raises(TypeError):
        c.locate(path="/tmp", flags=1, foo=0)