-
Notifications
You must be signed in to change notification settings - Fork 44
/
test_streaming_files.py
74 lines (56 loc) · 1.91 KB
/
test_streaming_files.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import tornado.testing
import pysparkling
class TextFile(tornado.testing.AsyncTestCase):
    """Streaming tests built on ``StreamingContext.textFileStream``."""

    def test_connect(self):
        # Collects the first element of every counted RDD handed to the
        # foreachRDD callback; the totals are summed for the final check.
        counts = []

        def record_count(rdd):
            counts.append(rdd.collect()[0])

        context = pysparkling.Context()
        # NOTE(review): 0.1 is presumably the batch duration in seconds.
        stream_ctx = pysparkling.streaming.StreamingContext(context, 0.1)
        lines = stream_ctx.textFileStream('LICENS*', process_all=True)
        lines.count().foreachRDD(record_count)

        stream_ctx.start()
        stream_ctx.awaitTermination(timeout=0.3)

        # NOTE(review): 44 is presumably the number of lines in the file(s)
        # matched by 'LICENS*' -- confirm against the repository.
        self.assertEqual(sum(counts), 44)

    def test_save(self):
        # Only assembles the pipeline: the streaming context is never
        # started in this test and nothing is asserted.
        context = pysparkling.Context()
        stream_ctx = pysparkling.streaming.StreamingContext(context, 0.1)
        line_counts = stream_ctx.textFileStream('LICENS*').count()
        line_counts.saveAsTextFiles('tests/textout/')

    def test_save_gz(self):
        # Same as test_save, but passes suffix='.gz' to saveAsTextFiles.
        # The streaming context is never started and nothing is asserted.
        context = pysparkling.Context()
        stream_ctx = pysparkling.streaming.StreamingContext(context, 0.1)
        line_counts = stream_ctx.textFileStream('LICENS*').count()
        line_counts.saveAsTextFiles('tests/textout/', suffix='.gz')
class BinaryFile(tornado.testing.AsyncTestCase):
    """Streaming tests built on ``StreamingContext.fileBinaryStream``."""

    def test_read_file(self):
        # Collects the first element of every counted RDD handed to the
        # foreachRDD callback; the totals are summed for the final check.
        counts = []

        def record_count(rdd):
            counts.append(rdd.collect()[0])

        context = pysparkling.Context()
        # NOTE(review): 0.1 is presumably the batch duration in seconds.
        stream_ctx = pysparkling.streaming.StreamingContext(context, 0.1)
        blobs = stream_ctx.fileBinaryStream('LICENS*', process_all=True)
        blobs.count().foreachRDD(record_count)

        stream_ctx.start()
        stream_ctx.awaitTermination(timeout=0.3)

        # NOTE(review): a total of 1 suggests that, without recordLength,
        # each matched file yields a single record -- confirm.
        self.assertEqual(sum(counts), 1)

    def test_read_chunks(self):
        # Same flow as test_read_file, but with recordLength=40 passed to
        # fileBinaryStream.
        counts = []

        def record_count(rdd):
            counts.append(rdd.collect()[0])

        context = pysparkling.Context()
        stream_ctx = pysparkling.streaming.StreamingContext(context, 0.1)
        chunks = stream_ctx.fileBinaryStream(
            'LICENS*', recordLength=40, process_all=True,
        )
        chunks.count().foreachRDD(record_count)

        stream_ctx.start()
        stream_ctx.awaitTermination(timeout=0.3)

        # NOTE(review): 54 is presumably the number of 40-byte records in
        # the file(s) matched by 'LICENS*' -- confirm against the repository.
        self.assertEqual(sum(counts), 54)