# tests.py
# Unit tests for the untwisted package, written with the standard unittest module.
from untwisted.event import ACCEPT, CLOSE, CONNECT, \
CONNECT_ERR, READ, LOAD, DUMPED, DONE, SSL_ACCEPT, SSL_CONNECT, SSL_CONNECT_ERR, SSL_ACCEPT_ERR
from untwisted.network import SuperSocket, SuperSocketSSL
from untwisted.timer import Timer
from threading import Thread
from untwisted.job import Job
from untwisted.waker import waker
from untwisted.splits import Terminator, AccUntil
from untwisted.sock_writer import SockWriter
from untwisted.sock_reader import SockReader
from untwisted.client import Client, create_client, lose, ClientSSL
from untwisted.dispatcher import Dispatcher, Stop, Erase
from untwisted.server import create_server, create_server_ssl
from untwisted.core import die
from untwisted import core
import unittest
import time
from socket import socket, AF_INET, SOCK_STREAM
import ssl
class TestExpect(unittest.TestCase):
    """Placeholder test case: it defines an empty setUp and no test methods."""

    def setUp(self):
        """Nothing to prepare."""
class TestDispatcher(unittest.TestCase):
    """Exercise Dispatcher.add_map, Dispatcher.once and Dispatcher.drive.

    The handler assertions expect the arguments given to ``drive`` to arrive
    first, followed by the extra arguments given to ``add_map``.
    """

    def setUp(self):
        """Create a dispatcher and bind one handler per event."""
        self.dispatcher = Dispatcher()

        # (event, handler, extra positional arguments bound at add_map time)
        bindings = (
            ('event0', self.handle_event0, (False, True)),
            ('event1', self.handle_event1, (False,)),
            ('event2', self.handle_event2, (False,)),
            ('event3', self.handle_event3, (False,)),
            ('event4', self.handle_event4, ()),
        )
        for event, handler, extra in bindings:
            self.dispatcher.add_map(event, handler, *extra)

        # Keep whatever once() returns so handle_event5 can look for it.
        self.wrapper = self.dispatcher.once('event5', self.handle_event5)

    def handle_event0(self, dispatcher, value0, value1):
        """Driven with no arguments; both values come from add_map."""
        self.assertEqual(value1, True)
        self.assertEqual(value0, False)

    def handle_event1(self, dispatcher, value0, value1, value2):
        """Driven with (True, True); the trailing False comes from add_map."""
        self.assertEqual(value2, False)
        self.assertEqual(value1, True)
        self.assertEqual(value0, True)

    def handle_event2(self, dispatcher, value0, value1):
        """Driven with (True,); the trailing False comes from add_map."""
        self.assertEqual(value1, False)
        self.assertEqual(value0, True)

    def handle_event3(self, dispatcher, value):
        """Driven with no arguments; the single False comes from add_map."""
        self.assertEqual(value, False)

    def handle_event4(self, dispatcher):
        """Raise Stop from inside a handler."""
        raise Stop

    def handle_event5(self, dispatcher):
        """Check that the once() entry is no longer mapped to 'event5'."""
        self.assertNotIn((self.wrapper, ()), dispatcher.base['event5'])

    def test_drive(self):
        """Fire event0..event4 with the arguments the handlers expect."""
        firings = (
            ('event0', ()),
            ('event1', (True, True)),
            ('event2', (True,)),
            ('event3', ()),
            ('event4', ()),
        )
        for event, args in firings:
            self.dispatcher.drive(event, *args)

    def test_once(self):
        """Fire the event that was registered through once()."""
        self.dispatcher.drive('event5')
class TestJob(unittest.TestCase):
    """Run a callable through Job and check the value delivered with DONE."""

    def setUp(self):
        """Reset the stored result and start a Job wrapping do_job."""
        self.retval = False
        Job(self.do_job).add_map(DONE, self.handle_done)

    def handle_done(self, job, retval):
        """Record the job's return value, then stop the main loop."""
        self.retval = retval
        die()

    def do_job(self):
        """The work handed to the Job: wait one second and return True."""
        time.sleep(1)
        return True

    def test_job(self):
        """Run the reactor until handle_done stops it, then check the result."""
        core.gear.mainloop()
        self.assertEqual(self.retval, True)
class TestTask(unittest.TestCase):
    """Placeholder test case: it defines an empty setUp and no test methods."""

    def setUp(self):
        """Nothing to prepare."""
class TestClient(unittest.TestCase):
    """Connect a client to a local server and stop once CONNECT fires."""

    def setUp(self):
        """Start a server on port 1236 and a client pointed at it."""
        host, port = '0.0.0.0', 1236
        self.server = create_server(host, port, 5)
        self.client = create_client(host, port)
        self.client.add_map(CONNECT, self.handle_connect)

    def handle_connect(self, client):
        """Close the client and the server, then stop the main loop."""
        print('Connected!', client)
        for sock in (client, self.server):
            lose(sock)
        die()

    def test_accept(self):
        """Run the reactor until handle_connect stops it."""
        core.gear.mainloop()
class TestLose(unittest.TestCase):
    """Close a connected client and wait for CLOSE on the accepted socket."""

    def setUp(self):
        """Start a server on port 1235 and connect a hand-built SuperSocket."""
        address = ('0.0.0.0', 1235)
        self.server = create_server(*address, 5)

        # Build the client manually instead of going through create_client.
        self.client = SuperSocket()
        self.client.connect_ex(address)
        Client(self.client)

        self.client.add_map(CONNECT, self.handle_connect)
        self.server.add_map(ACCEPT, self.handle_accept)

    def handle_accept(self, server, ssock):
        """Remember the accepted socket and stop the loop when it closes."""
        def stop_on_close(ssock, err):
            return die()

        self.ssock = ssock
        ssock.add_map(CLOSE, stop_on_close)

    def handle_connect(self, client):
        """Close the client and the listening server."""
        for sock in (client, self.server):
            lose(sock)

    def test_accept(self):
        """Run the reactor until the CLOSE handler stops it."""
        core.gear.mainloop()
class TestServer(unittest.TestCase):
    """Accept one connection on a local server and stop once ACCEPT fires."""

    def setUp(self):
        """Start a server on port 1237 and a client pointed at it."""
        host, port = '0.0.0.0', 1237
        self.server = create_server(host, port, 5)
        self.server.add_map(ACCEPT, self.handle_accept)
        self.client = create_client(host, port)

    def handle_accept(self, server, ssock):
        """Close the accepted socket and the server, then stop the loop."""
        print('Accepted:', ssock)
        for sock in (ssock, self.server):
            lose(sock)
        die()

    def test_accept(self):
        """Run the reactor until handle_accept stops it."""
        core.gear.mainloop()
class TestSockWriter(unittest.TestCase):
    """Dump bytes from the client and check what the accepted socket loads."""

    def setUp(self):
        """Reset the receive state, then start a server/client pair on 1238."""
        self.sent = b''
        self.ssock = None

        host, port = '0.0.0.0', 1238
        self.server = create_server(host, port, 5)
        self.server.add_map(ACCEPT, self.handle_accept)
        self.client = create_client(host, port)
        self.client.add_map(CONNECT, self.handle_connect)

    def handle_load(self, ssock, data):
        """Append each received chunk to the running buffer."""
        self.sent += data

    def handle_accept(self, server, ssock):
        """Remember the accepted socket and start collecting its data."""
        self.ssock = ssock
        ssock.add_map(LOAD, self.handle_load)

    def handle_done(self, ssock, data):
        """Close the accepted socket and the server, then stop the loop."""
        print('Received bytes from:', ssock)
        for sock in (ssock, self.server):
            lose(sock)
        die()

    def handle_connect(self, client):
        """Queue the payload and ask to be told when it has been dumped."""
        payload = b'abc' * 100
        client.dump(payload)
        client.add_map(DUMPED, self.handle_dumped)

    def handle_dumped(self, client):
        """Arrange for the next LOAD on the accepted socket to end the test."""
        print('Sent from:', client)
        # Relies on handle_accept having stored the accepted socket already.
        self.ssock.add_map(LOAD, self.handle_done)

    def test_accept(self):
        """Run the reactor, then compare the received bytes with the payload."""
        core.gear.mainloop()
        self.assertEqual(self.sent, b'abc' * 100)
class TestSockReader(unittest.TestCase):
    """Dump bytes from the accepted socket and check what the client loads."""

    def setUp(self):
        """Start a server/client pair on port 1239 and reset receive state."""
        self.server = create_server('0.0.0.0', 1239, 5)
        self.server.add_map(ACCEPT, self.handle_accept)
        self.client = create_client('0.0.0.0', 1239)
        self.client.add_map(LOAD, self.handle_load)
        self.sent = b''
        self.ssock = None

    def handle_load(self, client, data):
        """Append each chunk received by the client to the running buffer."""
        self.sent = self.sent + data

    def handle_accept(self, server, ssock):
        """Remember the accepted socket and send the payload through it."""
        self.ssock = ssock
        ssock.dump(b'abc' * 100)
        ssock.add_map(DUMPED, self.handle_dumped)

    def handle_dumped(self, ssock):
        """Arrange for the next LOAD on the client to end the test."""
        print('Sent from:', ssock)
        self.client.add_map(LOAD, self.handle_done)

    def handle_done(self, client, data):
        """Close the client and the server, then stop the main loop."""
        print('Received bytes from:', self.client)
        lose(client)
        lose(self.server)
        die()

    def test_accept(self):
        """Run the reactor, then compare the received bytes with the payload."""
        core.gear.mainloop()
        # The buffer was collected but never checked, so the test could not
        # detect wrong or missing data; mirror the check in TestSockWriter.
        self.assertEqual(self.sent, b'abc' * 100)
class TestSockAccUntil(unittest.TestCase):
    """Feed AccUntil a payload containing b'\\r\\n\\r\\n' and check both parts."""

    def setUp(self):
        """Start a server/client pair on port 1241."""
        host, port = '0.0.0.0', 1241
        self.server = create_server(host, port, 5)
        self.server.add_map(ACCEPT, self.handle_accept)
        self.client = create_client(host, port)
        self.client.add_map(CONNECT, self.handle_connect)

    def handle_done(self, client, a, b):
        """Check the two chunks delivered with AccUntil.DONE, then shut down."""
        self.assertEqual(a, b'abc' * 100)
        self.assertEqual(b, b'efg' * 100)
        for sock in (client, self.server):
            lose(sock)
        die()

    def handle_accept(self, server, ssock):
        """Send two blocks of bytes separated by b'\\r\\n\\r\\n'."""
        payload = b'\r\n\r\n'.join((b'abc' * 100, b'efg' * 100))
        ssock.dump(payload)

    def handle_connect(self, client):
        """Start an AccUntil on the client and listen for its DONE event."""
        AccUntil(client).start()
        client.add_map(AccUntil.DONE, self.handle_done)

    def test_accept(self):
        """Run the reactor until handle_done stops it."""
        core.gear.mainloop()
class TestTerminator(unittest.TestCase):
    """Send 100 b'abc\\r\\n' lines and collect what Terminator.FOUND delivers."""

    def setUp(self):
        """Reset the receive state, then start a server/client pair on 1240."""
        self.sent = b''
        self.ssock = None

        host, port = '0.0.0.0', 1240
        self.server = create_server(host, port, 5)
        self.server.add_map(ACCEPT, self.handle_accept)
        self.client = create_client(host, port)
        self.client.add_map(CONNECT, self.handle_connect)

    def handle_found(self, client, data):
        """Accumulate each FOUND chunk; finish once all 100 have arrived."""
        self.sent += data
        if self.sent != b'abc' * 100:
            return
        self.terminate()

    def terminate(self):
        """Close the accepted socket and the server, then stop the loop."""
        for sock in (self.ssock, self.server):
            lose(sock)
        die()

    def handle_accept(self, server, ssock):
        """Remember the accepted socket and send the delimited lines."""
        self.ssock = ssock
        ssock.dump(b'abc\r\n' * 100)

    def handle_connect(self, client):
        """Install a Terminator on the client and listen for FOUND."""
        Terminator(client)
        client.add_map(Terminator.FOUND, self.handle_found)

    def test_accept(self):
        """Run the reactor until terminate() stops it."""
        core.gear.mainloop()
class TestTmpFile(unittest.TestCase):
    """Placeholder test case: it defines an empty setUp and no test methods."""

    def setUp(self):
        """Nothing to prepare."""
class HandleWakeup:
    """Object placed in core.gear.pool by TestWaker; its update() stops the loop."""

    def update(self):
        """Stop the main loop."""
        die()
class TestWaker(unittest.TestCase):
    """Wake the main loop from another thread through waker.wake_up()."""

    def setUp(self):
        """Register a HandleWakeup in the pool and start the waking thread."""
        self.handlewakeup = HandleWakeup()
        core.gear.pool.append(self.handlewakeup)
        thread = Thread(target=self.do_job)
        thread.start()

    def do_job(self):
        """Thread body: wait one second, then wake the reactor."""
        time.sleep(1)
        waker.wake_up()

    def test_wakeup(self):
        """Run the reactor until HandleWakeup.update stops it."""
        try:
            core.gear.mainloop()
        finally:
            # Remove the object off the pool otherwise it will
            # just break the mainloop in the other test cases.
            # Done in ``finally`` so it also happens if mainloop() raises.
            core.gear.pool.remove(self.handlewakeup)
class TestFileReader(unittest.TestCase):
    """Placeholder test case: it defines an empty setUp and no test methods."""

    def setUp(self):
        """Nothing to prepare."""
class TestFileWriter(unittest.TestCase):
    """Placeholder test case: it defines an empty setUp and no test methods."""

    def setUp(self):
        """Nothing to prepare."""
class TestTimer(unittest.TestCase):
    """Schedule 100 two-second timers and count how many fire."""

    def setUp(self):
        """Create the counting timers plus a three-second timer calling die."""
        self.count = 0
        for index in range(100):
            Timer(2, self.handle_timeout, index)
        # Scheduled one second after the counting timers.
        Timer(3, die)

    def handle_timeout(self, value):
        """Count one expired timer; the value passed in is not used."""
        self.count += 1

    def test_timer(self):
        """Run the reactor until die is called, then check all timers fired."""
        core.gear.mainloop()
        self.assertEqual(self.count, 100)
class TestSched(unittest.TestCase):
    """Placeholder test case: it defines an empty setUp and no test methods."""

    def setUp(self):
        """Nothing to prepare."""
class TestSockWriterSSL(unittest.TestCase):
    """Placeholder test case: it defines an empty setUp and no test methods."""

    def setUp(self):
        """Nothing to prepare."""
class TestSockReaderSSL(unittest.TestCase):
    """Placeholder test case: it defines an empty setUp and no test methods.

    This class used to be declared twice in a row with identical bodies; the
    second declaration shadowed the first, so only one is kept.
    """
    # NOTE(review): the duplicate may have been meant for a different SSL
    # test case under another name -- confirm with the author.

    def setUp(self):
        """Nothing to prepare."""
class TestSuperSocket(unittest.TestCase):
    """Placeholder test case: it defines an empty setUp and no test methods."""

    def setUp(self):
        """Nothing to prepare."""
class TestDevice(unittest.TestCase):
    """Placeholder test case: it defines an empty setUp and no test methods."""

    def setUp(self):
        """Nothing to prepare."""
# Run every test case in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()