-
Notifications
You must be signed in to change notification settings - Fork 44
/
test_streaming_tcp.py
88 lines (71 loc) · 2.53 KB
/
test_streaming_tcp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
from __future__ import print_function
from collections import Counter
from contextlib import closing
import struct
import tornado.gen
import tornado.tcpclient
import tornado.testing
import pysparkling
class TCPTextTest(tornado.testing.AsyncTestCase):
    """Streaming test for newline-terminated text lines sent over TCP."""

    @tornado.gen.coroutine
    def client(self):
        """Send 20 text lines to 127.0.0.1:8123, one connection per line.

        Each line has the form ``a = <v>`` for ``v`` in ``range(20)`` and
        is UTF-8 encoded before being written.
        """
        client = tornado.tcpclient.TCPClient()
        try:
            for v in range(20):
                stream = yield client.connect('127.0.0.1', 8123)
                with closing(stream):
                    stream.write('a = {}\n'.format(v).encode('utf8'))
        finally:
            # Release the client even when a connect or write fails.
            client.close()

    def test_connect(self):
        """Check that the letter ``a`` is counted once per line sent."""
        sc = pysparkling.Context()
        ssc = pysparkling.streaming.StreamingContext(sc, 0.1)

        counter = Counter()

        def count_characters(rdd):
            # Collect once per batch. An empty batch joins to '' and
            # Counter.update('') adds nothing, so no emptiness check
            # (and no second collect) is needed.
            counter.update(''.join(rdd.collect()))

        ssc.socketTextStream('127.0.0.1', 8123).foreachRDD(count_characters)

        # Start the sender coroutine; its return value is not used here.
        self.client()

        ssc.start()
        ssc.awaitTermination(timeout=0.3)

        # Every line sent by client() contains exactly one 'a'.
        self.assertEqual(counter['a'], 20)
class TCPBinaryFixedLengthTest(tornado.testing.AsyncTestCase):
    """Streaming test for a binary socket stream created with ``length=5``."""

    @tornado.gen.coroutine
    def client(self):
        """Connect to 127.0.0.1:8124, write ``b'hello'`` and disconnect."""
        tcp_client = tornado.tcpclient.TCPClient()
        connection = yield tcp_client.connect('127.0.0.1', 8124)
        try:
            connection.write(b'hello')
        finally:
            # Same effect as ``with closing(connection)``.
            connection.close()
        tcp_client.close()

    def test_main(self):
        """Expect exactly one ``b'hello'`` record to be counted."""
        context = pysparkling.Context()
        streaming = pysparkling.streaming.StreamingContext(context, 0.1)
        seen = Counter()

        def tally(rdd):
            # Count every record collected from this batch.
            seen.update(rdd.collect())

        # NOTE(review): length=5 presumably means fixed 5-byte records;
        # confirm against pysparkling's socketBinaryStream.
        records = streaming.socketBinaryStream('127.0.0.1', 8124, length=5)
        records.foreachRDD(tally)

        # Start the sender coroutine; its return value is not used here.
        self.client()

        streaming.start()
        streaming.awaitTermination(timeout=0.3)
        self.assertEqual(seen[b'hello'], 1)
class TCPBinaryUIntLengthTest(tornado.testing.AsyncTestCase):
    """Streaming test for a binary socket stream created with ``length='<I'``."""

    @tornado.gen.coroutine
    def client(self):
        """Send one frame to 127.0.0.1:8125.

        The frame is the value 10 packed with ``struct`` format ``'<I'``
        followed by the ten bytes ``b'hellohello'``.
        """
        sender = tornado.tcpclient.TCPClient()
        conn = yield sender.connect('127.0.0.1', 8125)
        with closing(conn):
            header = struct.pack('<I', 10)
            frame = header + b'hellohello'
            conn.write(frame)
        sender.close()

    def test_main(self):
        """Expect exactly one ``b'hellohello'`` record to be counted."""
        spark = pysparkling.Context()
        stream_ctx = pysparkling.streaming.StreamingContext(spark, 0.1)
        tallies = Counter()

        def record_batch(rdd):
            # Add each collected record of this batch to the tally.
            tallies.update(rdd.collect())

        # NOTE(review): length='<I' presumably tells the stream to read a
        # struct-packed length prefix before each record; confirm against
        # pysparkling's socketBinaryStream.
        incoming = stream_ctx.socketBinaryStream(
            '127.0.0.1', 8125, length='<I')
        incoming.foreachRDD(record_batch)

        # Start the sender coroutine; its return value is not used here.
        self.client()

        stream_ctx.start()
        stream_ctx.awaitTermination(timeout=0.3)
        self.assertEqual(tallies[b'hellohello'], 1)