-
Notifications
You must be signed in to change notification settings - Fork 44
/
test_write.py
128 lines (112 loc) · 4.25 KB
/
test_write.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
import datetime
import os
import shutil
from unittest import TestCase
from dateutil.tz import tzlocal
from pysparkling import Context, Row
from pysparkling.sql.session import SparkSession
from pysparkling.sql.utils import AnalysisException
# Module-level session shared by every test in this file.
spark = SparkSession(Context())
def get_folder_content(folder_path):
    """Map every file under *folder_path* (recursively) to its list of lines.

    Keys are paths relative to *folder_path* (e.g. ``part-00000.csv`` or
    ``sub/part-00000.csv``); values are the file's lines as returned by
    ``readlines`` (trailing newlines preserved).
    """
    folder_content = {}
    for root, _, files in os.walk(folder_path):
        # os.path.relpath avoids the leading-separator keys that the naive
        # slice root[len(folder_path):] produced for nested directories.
        relative_path = os.path.relpath(root, folder_path)
        if relative_path == os.curdir:
            # Files directly inside folder_path keep a bare filename key.
            relative_path = ""
        for file in files:
            file_path = os.path.join(root, file)
            with open(file_path, 'r') as file_content:
                folder_content[os.path.join(relative_path, file)] = file_content.readlines()
    return folder_content
class DataFrameWriterTests(TestCase):
    """Checks the on-disk files produced by ``DataFrame.write`` (CSV and JSON)."""
    maxDiff = None

    @staticmethod
    def clean():
        # Drop the scratch output directory so every test starts from nothing.
        if os.path.exists(".tmp"):
            shutil.rmtree(".tmp")

    def setUp(self):
        self.clean()

    def tearDown(self):
        self.clean()

    def test_write_to_csv(self):
        """Default CSV write: one part file plus an empty _SUCCESS marker."""
        frame = spark.createDataFrame([
            Row(age=2, name='Alice', time=datetime.datetime(2017, 1, 1, tzinfo=tzlocal())),
            Row(age=5, name='Bob', time=datetime.datetime(2014, 3, 2, tzinfo=tzlocal())),
        ])
        frame.write.csv(".tmp/wonderland/")
        # NOTE(review): expected timestamps hard-code a +01:00 offset, so this
        # presumably only passes with a CET-like local timezone — confirm.
        expected = {
            '_SUCCESS': [],
            'part-00000-8447389540241120843.csv': [
                '2,Alice,2017-01-01T00:00:00.000+01:00\n',
                '5,Bob,2014-03-02T00:00:00.000+01:00\n',
            ],
        }
        self.assertDictEqual(get_folder_content(".tmp/wonderland"), expected)

    def test_write_to_csv_with_custom_options(self):
        """CSV write honours the sep/emptyValue/nullValue/header options."""
        frame = spark.createDataFrame([
            Row(age=2, name='Alice', occupation=None),
            Row(age=5, name='Bob', occupation=""),
        ])
        frame.write.csv(".tmp/wonderland/", sep="^", emptyValue="", nullValue="null", header=True)
        expected = {
            '_SUCCESS': [],
            'part-00000-4061950540148431296.csv': [
                'age^name^occupation\n',
                '2^Alice^null\n',
                '5^Bob^\n',
            ],
        }
        self.assertDictEqual(get_folder_content(".tmp/wonderland"), expected)

    def test_write_to_csv_fail_when_overwrite(self):
        """A second write to the same path raises and leaves the first output intact."""
        frame = spark.createDataFrame([
            Row(age=2, name='Alice'),
            Row(age=5, name='Bob'),
        ])
        frame.write.csv(".tmp/wonderland/")
        with self.assertRaises(AnalysisException) as raised:
            frame.write.csv(".tmp/wonderland/")
        self.assertEqual(raised.exception.args[0], 'path .tmp/wonderland already exists.;')
        expected = {
            '_SUCCESS': [],
            'part-00000-3434325560268771971.csv': [
                '2,Alice\n',
                '5,Bob\n',
            ],
        }
        self.assertDictEqual(get_folder_content(".tmp/wonderland"), expected)

    def test_write_to_json(self):
        """Default JSON write: one line-delimited JSON object per row."""
        frame = spark.createDataFrame([
            Row(age=2, name='Alice', time=datetime.datetime(2017, 1, 1, tzinfo=tzlocal())),
            Row(age=5, name='Bob', time=datetime.datetime(2014, 3, 2, tzinfo=tzlocal())),
        ])
        frame.write.json(".tmp/wonderland/")
        expected = {
            '_SUCCESS': [],
            'part-00000-8447389540241120843.json': [
                '{"age":2,"name":"Alice","time":"2017-01-01T00:00:00.000+01:00"}\n',
                '{"age":5,"name":"Bob","time":"2014-03-02T00:00:00.000+01:00"}\n',
            ],
        }
        self.assertDictEqual(get_folder_content(".tmp/wonderland"), expected)

    def test_write_nested_rows_to_json(self):
        """Rows holding lists of Rows serialize as nested JSON arrays/objects."""
        frame = spark.createDataFrame([
            Row(age=2, name='Alice', animals=[
                Row(name="Chessur", type="cat"),
                Row(name="The White Rabbit", type="Rabbit"),
            ]),
            Row(age=5, name='Bob', animals=[]),
        ])
        frame.write.json(".tmp/wonderland/")
        expected = {
            '_SUCCESS': [],
            'part-00000-2819354714706678872.json': [
                '{"age":2,"animals":['
                '{"name":"Chessur","type":"cat"},'
                '{"name":"The White Rabbit","type":"Rabbit"}'
                '],"name":"Alice"}\n',
                '{"age":5,"animals":[],"name":"Bob"}\n',
            ],
        }
        self.assertDictEqual(get_folder_content(".tmp/wonderland"), expected)