-
Notifications
You must be signed in to change notification settings - Fork 224
/
test_txnode.py
172 lines (153 loc) · 6 KB
/
test_txnode.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
import py
import execnet
from xdist.txnode import TXNode
# Resolve the standard-library queue module under either of its names
# ("queue" or "Queue").  Presumably the first importable name wins --
# confirm against py.builtin._tryimport.
queue = py.builtin._tryimport("queue", "Queue")
# Module-level shorthand for the queue class used by the helpers below.
Queue = queue.Queue
class EventQueue:
    """Wait for named events delivered as ``(name, args, kwargs)`` tuples.

    On construction the instance registers itself with ``registry`` and
    remembers the queue it will read events from.
    """

    def __init__(self, registry, queue=None):
        """Remember the event queue and register this object.

        :param registry: object with a ``register()`` method; it is called
            with this instance.
        :param queue: queue to read events from; a new ``Queue`` is created
            when ``None``.
        """
        # Inside this method ``queue`` is the parameter, not the module.
        self.queue = Queue() if queue is None else queue
        registry.register(self)

    def geteventargs(self, eventname, timeout=10.0):
        """Return the payload of the next event named ``eventname``.

        Events with other names are consumed and skipped.  For the matching
        event the positional ``args`` are returned when non-empty, otherwise
        the ``kwargs`` dict.

        :param eventname: name of the event to wait for.
        :param timeout: seconds to wait for *each* queue item.
        :raises IOError: if the queue stays empty for ``timeout`` seconds;
            the names of the skipped events are printed first.
        """
        skipped = []
        while True:
            try:
                call = self.queue.get(timeout=timeout)
            except queue.Empty:
                # Show what did arrive to make the failure easier to diagnose.
                py.builtin.print_("seen events", skipped)
                raise IOError("did not see %r events" % (eventname))

            name, args, kwargs = call
            assert isinstance(name, str)
            if name != eventname:
                skipped.append(name)
                if name == "pytest_internalerror":
                    # Surface internal errors even though they are skipped.
                    py.builtin.print_(str(kwargs["excrepr"]))
                continue
            return args if args else kwargs
class MySetup:
    """Test helper that creates ``TXNode`` objects and reads their events.

    ``makenode()`` must be called before ``geteventargs()``, because the
    latter reads ``self.config`` and ``self.queue`` which only ``makenode()``
    sets.
    """

    def __init__(self, request):
        """Keep the funcarg ``request`` and start the gateway counter at 0."""
        self.request = request
        self.id = 0

    def geteventargs(self, eventname, timeout=10.0):
        """Wait for ``eventname`` on the current node's queue.

        A new ``EventQueue`` is created (and registered with the config's
        plugin manager) on every call; the wait itself is delegated to it.
        """
        listener = EventQueue(self.config.pluginmanager, self.queue)
        return listener.geteventargs(eventname, timeout=timeout)

    def makenode(self, config=None, xspec="popen"):
        """Create a gateway from ``xspec`` and wrap it in a new ``TXNode``.

        :param config: config to hand to the node; when ``None`` one is
            obtained from the ``testdir`` funcarg via ``reparseconfig([])``.
        :param xspec: execnet gateway specification string.
        :returns: the new node, also stored as ``self.node``.

        Each call replaces ``self.config``, ``self.queue``, ``self.xspec``,
        ``self.gateway`` and ``self.node`` and bumps ``self.id``.
        """
        if config is None:
            config = self.request.getfuncargvalue("testdir").reparseconfig([])
        self.config = config
        self.queue = Queue()
        self.xspec = execnet.XSpec(xspec)
        self.gateway = execnet.makegateway(self.xspec)
        # Give every gateway a distinct, increasing string id.
        self.id += 1
        self.gateway.id = str(self.id)
        self.nodemanager = None
        node = TXNode(self.nodemanager, self.gateway, self.config,
                      putevent=self.queue.put)
        self.node = node
        assert not node.channel.isclosed()
        return node

    def xfinalize(self):
        """Exit the gateway of the most recently created node, if any."""
        if not hasattr(self, 'node'):
            return
        gateway = self.node.gateway
        py.builtin.print_("exiting:", gateway)
        gateway.exit()
def pytest_funcarg__mysetup(request):
    """Provide the ``mysetup`` argument: a new ``MySetup`` bound to ``request``."""
    # NOTE: no finalizer is registered; the former registration is disabled:
    #   pyfuncitem.addfinalizer(mysetup.finalize)
    return MySetup(request)
def test_node_hash_equality(mysetup):
    """Separately created nodes compare unequal; a node equals itself."""
    first = mysetup.makenode()
    second = mysetup.makenode()
    assert first != second
    # Exercise both ``==`` and ``!=`` on the same object.
    assert first == first
    assert not (first != first)
class TestMasterSlaveConnection:
    """Tests of a ``TXNode`` talking to its remote side.

    Every test creates a node through the ``mysetup`` helper and observes the
    events the node puts on the helper's queue.
    """

    def test_crash_invalid_item(self, mysetup):
        """Sending something that is not a test item brings the node down."""
        node = mysetup.makenode()
        node.send(123)  # invalid item
        kwargs = mysetup.geteventargs("pytest_testnodedown")
        assert kwargs['node'] is node
        # NOTE: a stricter check that kwargs['error'] is an
        # execnet.RemoteError is currently disabled.

    def test_crash_killed(self, testdir, mysetup):
        """A remote side that kills itself is reported as not terminated."""
        if not hasattr(py.std.os, 'kill'):
            py.test.skip("no os.kill")
        item = testdir.getitem("""
            def test_func():
                import os
                os.kill(os.getpid(), 9)
        """)
        node = mysetup.makenode(item.config)
        node.send(item)
        kwargs = mysetup.geteventargs("pytest_testnodedown")
        assert kwargs['node'] is node
        assert "Not properly terminated" in str(kwargs['error'])

    def test_node_down(self, mysetup):
        """A clean shutdown yields exactly one error-free node-down event."""
        node = mysetup.makenode()
        node.shutdown()
        kwargs = mysetup.geteventargs("pytest_testnodedown")
        assert kwargs['node'] is node
        assert not kwargs['error']
        # Feeding the end marker afterwards must not produce a second
        # node-down event.  Wait for the real event name (the same one used
        # above); waiting for a name that is never emitted would make this
        # check pass unconditionally.
        node.callback(node.ENDMARK)
        py.test.raises(IOError, mysetup.geteventargs,
                       'pytest_testnodedown', timeout=0.01)

    def test_send_on_closed_channel(self, testdir, mysetup):
        """Sending on an already closed channel raises ``IOError``."""
        item = testdir.getitem("def test_func(): pass")
        node = mysetup.makenode(item.config)
        node.channel.close()
        py.test.raises(IOError, node.send, item)
        # NOTE: checking for a pytest_internalerror event carrying the
        # IOError is currently disabled.

    def test_send_one(self, testdir, mysetup):
        """A single passing item comes back as a passed report for it."""
        item = testdir.getitem("def test_func(): pass")
        node = mysetup.makenode(item.config)
        node.send(item)
        kwargs = mysetup.geteventargs("pytest_runtest_logreport")
        rep = kwargs['report']
        assert rep.passed
        py.builtin.print_(rep)
        assert rep.item == item

    def test_send_some(self, testdir, mysetup):
        """Reports arrive in send order, via both ``send`` and ``sendlist``."""
        items = testdir.getitems("""
            def test_pass():
                pass
            def test_fail():
                assert 0
            def test_skip():
                import py
                py.test.skip("x")
        """)
        node = mysetup.makenode(items[0].config)
        outcomes = "passed failed skipped".split()

        # First round: items sent one by one.
        for item in items:
            node.send(item)
        for outcome in outcomes:
            kwargs = mysetup.geteventargs("pytest_runtest_logreport")
            report = kwargs['report']
            assert getattr(report, outcome)

        # Second round: the same items sent as one list.
        node.sendlist(items)
        for outcome in outcomes:
            rep = mysetup.geteventargs("pytest_runtest_logreport")['report']
            assert getattr(rep, outcome)

    def test_send_one_with_env(self, testdir, mysetup, monkeypatch):
        """Environment variables from the xspec are visible remotely."""
        if execnet.XSpec("popen").env is None:
            py.test.skip("requires execnet 1.0.7 or above")
        # Make sure ENV1/ENV2 can only come from the xspec, not from us.
        monkeypatch.delenv('ENV1', raising=False)
        monkeypatch.delenv('ENV2', raising=False)
        monkeypatch.setenv('ENV3', 'var3')
        item = testdir.getitem("""
            def test_func():
                import os
                # ENV1, ENV2 set by xspec; ENV3 inherited from parent process
                assert os.getenv('ENV2') == 'var2'
                assert os.getenv('ENV1') == 'var1'
                assert os.getenv('ENV3') == 'var3'
        """)
        node = mysetup.makenode(item.config,
                                xspec="popen//env:ENV1=var1//env:ENV2=var2")
        node.send(item)
        kwargs = mysetup.geteventargs("pytest_runtest_logreport")
        rep = kwargs['report']
        assert rep.passed