forked from wireservice/csvkit
/
test_cleanup.py
87 lines (78 loc) · 3.59 KB
/
test_cleanup.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import unittest
from csvkit.cleanup import *
from csvkit.exceptions import LengthMismatchError
class TestCleanup(unittest.TestCase):
    """Tests for ``join_rows``, ``fix_length_errors`` and
    ``extract_joinable_row_errors`` as exposed by ``csvkit.cleanup``.
    """

    def test_fix_rows(self):
        """Test to ensure that row merging yields correct results"""
        start = [
            ['1', '2', '3'],
            [''],
            ['abc'],
            ['4', '5'],
        ]
        fixed = join_rows(start)
        self.assertEqual(4, len(fixed))
        self.assertEqual(start[0][0], fixed[0])
        self.assertEqual(start[0][1], fixed[1])
        # The last cell of the first row, both single-cell rows and the first
        # cell of the final row are expected to collapse into one
        # space-separated cell.
        merged = " ".join([start[0][-1], start[1][0], start[2][0], start[3][0]])
        self.assertEqual(merged, fixed[2])
        self.assertEqual(start[3][1], fixed[3])

    def test_fix_length_errors_basic(self):
        """Two short rows reported as errors are repaired into one full row."""
        expected_length = 4
        errs = [
            LengthMismatchError(1, ['alpha', 'beta', 'gam'], expected_length),
            LengthMismatchError(2, ['ma', 'delta'], expected_length),
        ]
        fixed = fix_length_errors(errs, expected_length)
        self.assertEqual(1, len(fixed))
        fixed = fixed[0]
        self.assertEqual('alpha', fixed[0])
        self.assertEqual('beta', fixed[1])
        # 'gam' (end of first row) and 'ma' (start of second) join with a space.
        self.assertEqual('gam ma', fixed[2])
        self.assertEqual('delta', fixed[3])

    def test_extract_joinable_row_errors(self):
        """Three length-mismatch errors numbered 1, 2, 3 are all returned,
        in order, as the very same objects.
        """
        e1 = LengthMismatchError(1, ['foo', 'bar', 'baz'], 10)
        e2 = LengthMismatchError(2, ['foo', 'bar', 'baz'], 10)
        e3 = LengthMismatchError(3, ['foo', 'bar', 'baz'], 10)
        errs = [e1, e2, e3]
        joinable = extract_joinable_row_errors(errs)
        self.assertEqual(3, len(joinable))
        for e, j in zip(errs, joinable):
            self.assertIs(e, j)

    def test_extract_joinable_row_errors_2(self):
        """A different error type in the middle leaves only the last error
        joinable.
        """
        e1 = LengthMismatchError(1, ['foo', 'bar', 'baz'], 10)
        e2 = CSVTestException(2, ['foo', 'bar', 'baz'], "A throwaway message.")
        e3 = LengthMismatchError(3, ['foo', 'bar', 'baz'], 10)
        errs = [e1, e2, e3]
        joinable = extract_joinable_row_errors(errs)
        self.assertEqual(1, len(joinable))
        # Builtin next() instead of the Python 2-only ``.next()`` method.
        self.assertIs(next(iter(joinable)), e3)

    def test_extract_joinable_row_errors_3(self):
        """A different error type first, then two length-mismatch errors
        numbered 2 and 3: both trailing errors are returned.
        """
        e1 = CSVTestException(1, ['foo', 'bar', 'baz'], "A throwaway message.")
        e2 = LengthMismatchError(2, ['foo', 'bar', 'baz'], 10)
        e3 = LengthMismatchError(3, ['foo', 'bar', 'baz'], 10)
        errs = [e1, e2, e3]
        joinable = extract_joinable_row_errors(errs)
        self.assertEqual(2, len(joinable))
        joinable = list(joinable)
        self.assertIs(joinable[0], e2)
        self.assertIs(joinable[1], e3)

    def test_extract_joinable_row_errors_4(self):
        """Length-mismatch errors numbered 2 and 4 (a gap): only the last one
        is returned.
        """
        e1 = CSVTestException(1, ['foo', 'bar', 'baz'], "A throwaway message.")
        e2 = LengthMismatchError(2, ['foo', 'bar', 'baz'], 10)
        # First argument skips from 2 to 4 (presumably the source line number).
        e3 = LengthMismatchError(4, ['foo', 'bar', 'baz'], 10)
        errs = [e1, e2, e3]
        joinable = extract_joinable_row_errors(errs)
        self.assertEqual(1, len(joinable))
        # Builtin next() instead of the Python 2-only ``.next()`` method.
        self.assertIs(next(iter(joinable)), e3)

    def test_real_world_join_fail(self):
        """Three fragments of one record are joined into a single 7-cell row."""
        start = [
            ['168772', '1102', '$0.23 TO $0.72', 'HOUR', '1.5%'],
            ['GROSS', '1.5% '],
            ['GROSS', '430938'],
        ]
        fixed = join_rows(start)
        self.assertEqual(7, len(fixed))
        self.assertEqual(start[0][0], fixed[0])
        self.assertEqual(start[0][1], fixed[1])
        self.assertEqual(start[0][2], fixed[2])
        self.assertEqual(start[0][3], fixed[3])
        # Each row boundary merges the trailing cell of one fragment with the
        # leading cell of the next, separated by a single space.
        expected4 = " ".join([start[0][-1], start[1][0]])
        self.assertEqual(expected4, fixed[4])
        expected5 = " ".join([start[1][1], start[2][0]])
        self.assertEqual(expected5, fixed[5])
        self.assertEqual(start[2][1], fixed[6])