/
test_source.py
114 lines (91 loc) · 3.25 KB
/
test_source.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function, division
import sys
import os
from petl.compat import PY3
from petl.test.helpers import ieq, eq_
from petl.io.avro import fromavro, toavro
from petl.io.csv import fromcsv, tocsv
from petl.util.vis import look
from petl.io.source.smb import _parse_smb_url
# region Codec test cases
try:
    # Imported only to find out whether the optional S3 backend is installed.
    import s3fs
except ImportError as err:
    # No s3fs: report the reason on stderr and leave the S3 test undefined.
    print('SKIP S3 helper tests: %s' % err, file=sys.stderr)
else:
    def test_helper_s3():
        # The target location comes from this environment variable; the hint
        # is what gets shown to the user when the variable is not set.
        env_var_name = 'PETL_S3_URL'
        usage_hint = "export PETL_S3_URL='s3://mybucket/path/folder'"
        _write_read_from_url(env_var_name, usage_hint)
try:
    # Imported only to find out whether the optional SMB backend is installed.
    import smbclient
except ImportError as err:
    # No smbclient: report the reason on stderr and leave the SMB test undefined.
    print('SKIP SMB helper tests: %s' % err, file=sys.stderr)
else:
    def test_helper_smb():
        # The target location comes from this environment variable; the hint
        # is what gets shown to the user when the variable is not set.
        env_var_name = 'PETL_SMB_URL'
        usage_hint = "export PETL_SMB_URL='smb://DOMAIN;myuserID:mypassword@host/share'"
        _write_read_from_url(env_var_name, usage_hint)
def test_helper_smb_url_parse():
    # A URL that carries every component the parser returns: domain, user,
    # password, host, explicit port and a path below the share.
    url = r'smb://workgroup;user:password@server:444/share/folder/file.csv'
    parsed = _parse_smb_url(url)
    domain, host, port, user, passwd, server_path = parsed
    print("Parsed:", domain, host, port, user, passwd, server_path)

    # (parsed value, expected value) pairs, checked in the order they are parsed.
    checks = (
        (domain, r'workgroup'),
        (host, r'server'),
        (port, 444),
        (user, r'user'),
        (passwd, r'password'),
        # The path is expected in backslash form, prefixed with the host name.
        (server_path, "\\\\server\\share\\folder\\file.csv"),
    )
    for parsed_value, expected_value in checks:
        eq_(parsed_value, expected_value)
# endregion
# region Execution
def _write_read_from_url(env_var_name, example):
    """Write a sample table below a base URL and check that it reads back.

    The base URL is read from the environment variable ``env_var_name``.
    The expected table is always printed. When the variable is not set, a
    hint built from ``example`` is printed and the function returns before
    any file is written or read.

    CSV is always exercised. The gzip-compressed variants are only exercised
    when ``PY3`` is true, and the Avro variants only when ``fastavro`` can be
    imported.

    :param env_var_name: name of the environment variable holding the base URL
    :param example: example shell command shown when the variable is not set
    """
    # URLs always use forward slashes. ``os.path.join`` would insert
    # backslashes on Windows, so join with ``posixpath`` on every platform.
    import posixpath

    base_url = os.getenv(env_var_name, 'skip')

    _table = ((u'name', u'friends', u'age'),
              (u'Bob', '42', '33'),
              (u'Jim', '13', '69'),
              (u'Joe', '86', '17'),
              (u'Ted', '23', '51'))

    _show__rows_from("Expected:", _table)

    has_avro = _test_avro_too()

    # 'skip' is the sentinel default used when the variable is not defined.
    if base_url == 'skip':
        m = "# Skipping test because env var '{}' is not defined. Try this:\n$ {}"
        msg = m.format(env_var_name, example)
        print(msg)
        return

    csv_url = posixpath.join(base_url, 'filename1.csv')
    gzc_url = posixpath.join(base_url, 'filename3.csv.gz')
    gza_url = posixpath.join(base_url, 'filename4.avro.gz')
    avr_url = posixpath.join(base_url, 'filename2.avro')

    def _assert_same_twice(actual):
        # Compare twice to verify the table can be iterated more than once.
        ieq(_table, actual)
        ieq(_table, actual)

    # Write every variant first ...
    tocsv(_table, csv_url, encoding='ascii', lineterminator='\n')
    if PY3:
        tocsv(_table, gzc_url, encoding='ascii', lineterminator='\n')
    if has_avro:
        toavro(_table, avr_url)
        if PY3:
            toavro(_table, gza_url)

    # ... then open each of them for reading.
    csv_actual = fromcsv(csv_url, encoding='ascii')
    if PY3:
        gzp_actual = fromcsv(gzc_url, encoding='ascii')
    if has_avro:
        avr_actual = fromavro(avr_url)
        if PY3:
            gza_actual = fromavro(gza_url)

    # Show the rows that were read back, not the URL string itself.
    _show__rows_from("Actual:", csv_actual)

    _assert_same_twice(csv_actual)
    if PY3:
        _assert_same_twice(gzp_actual)
    if has_avro:
        _assert_same_twice(avr_actual)
        if PY3:
            _assert_same_twice(gza_actual)
def _show__rows_from(label, test_rows, limit=0):
    """Print ``label`` followed by a ``look`` rendering of ``test_rows``.

    :param label: heading line printed before the rows
    :param test_rows: the rows to display, handed to ``look`` unchanged
    :param limit: forwarded to ``look`` as its ``limit`` argument
    """
    print(label)
    preview = look(test_rows, limit=limit)
    print(preview)
def _test_avro_too():
    """Report whether the optional ``fastavro`` package can be imported.

    :return: ``True`` when ``import fastavro`` succeeds, ``False`` when it
        raises ``ImportError``
    """
    try:
        # Imported only to probe availability; the module itself is not used.
        import fastavro  # noqa: F401
    except ImportError:
        # Only a failed import means "no Avro". Anything else (for example
        # KeyboardInterrupt) is allowed to propagate.
        return False
    return True
# endregion