/
target_csv.py
executable file
·113 lines (90 loc) · 3.54 KB
/
target_csv.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
#!/usr/bin/env python3
import argparse
import io
import os
import sys
import json
import csv
from jsonschema import validate
import singer
# Module-wide logger from the singer library; every debug/error message in
# this file (state emission, parse failures, exit notice) goes through it.
logger = singer.get_logger()
def emit_state(state):
    """Serialize *state* as JSON and write it to stdout as a single line.

    Does nothing at all when *state* is ``None``.
    """
    if state is None:
        return
    serialized = json.dumps(state)
    logger.debug('Emitting state ' + serialized)
    sys.stdout.write(serialized + "\n")
    # Flush right away so whoever reads our stdout gets the state line
    # without waiting for the buffer to fill or the process to exit.
    sys.stdout.flush()
def _read_header(filename, delimiter, quotechar):
    """Return the first CSV row of *filename*, or None if that row is missing/empty."""
    # newline='' is required by the csv module so quoted newlines and line
    # terminators are handled by the reader, not by text-mode translation.
    with open(filename, 'r', newline='', encoding='utf-8') as csvfile:
        reader = csv.reader(csvfile, delimiter=delimiter, quotechar=quotechar)
        first_row = next(reader, None)
    return first_row or None


def _append_record(stream, record, headers, delimiter, quotechar):
    """Append *record* as one row of ``<stream>.csv``, writing a header row if needed.

    *headers* caches the column list per stream.  It is filled once per stream
    (from the existing file's header row, or else from the first record's
    keys) and then reused, so every appended row lines up with the header
    row actually present in the file.  Keys missing from a record are written
    as empty cells; keys not in the header make ``csv.DictWriter`` raise
    ``ValueError`` instead of silently shifting columns.
    """
    # NOTE(review): the stream name is used verbatim as a file name in the
    # current working directory.
    filename = stream + '.csv'
    file_is_empty = (not os.path.isfile(filename)) or os.stat(filename).st_size == 0

    if stream not in headers:
        existing = None if file_is_empty else _read_header(filename, delimiter, quotechar)
        headers[stream] = existing if existing else list(record.keys())

    with open(filename, 'a', newline='', encoding='utf-8') as csvfile:
        writer = csv.DictWriter(csvfile,
                                headers[stream],
                                delimiter=delimiter,
                                quotechar=quotechar)
        if file_is_empty:
            writer.writeheader()
        writer.writerow(record)


def persist_lines(delimiter, quotechar, lines):
    """Consume JSON messages from *lines*, appending RECORDs to per-stream CSV files.

    Each line must be a JSON object whose ``type`` is one of:

    * ``SCHEMA`` -- registers ``schema`` for ``stream``; ``key_properties``
      must be present.
    * ``RECORD`` -- ``record`` is validated against its stream's schema and
      appended to ``<stream>.csv`` (header row written if the file is empty
      or missing; an existing header row is reused otherwise).
    * ``STATE`` -- remembers ``value`` as the current state.

    Args:
        delimiter: CSV field delimiter.
        quotechar: CSV quote character.
        lines: iterable of JSON-encoded message strings.

    Returns:
        The ``value`` of the last STATE message, or ``None`` if there was no
        STATE message or a RECORD arrived after the last one.

    Raises:
        json.decoder.JSONDecodeError: a line is not valid JSON.
        ValueError: a record has a key not present in its stream's header.
        Exception: a message is missing a required key, a RECORD precedes its
            stream's SCHEMA, or the message type is unknown.  Schema
            violations propagate from ``validate``.
    """
    state = None
    schemas = {}
    headers = {}

    for line in lines:
        try:
            o = json.loads(line)
        except json.decoder.JSONDecodeError:
            logger.error("Unable to parse:\n{}".format(line))
            raise

        if 'type' not in o:
            raise Exception("Line is missing required key 'type': {}".format(line))
        t = o['type']

        if t == 'RECORD':
            if 'stream' not in o:
                raise Exception("Line is missing required key 'stream': {}".format(line))
            stream = o['stream']
            if stream not in schemas:
                raise Exception("A record for stream {} was encountered before a corresponding schema".format(stream))
            if 'record' not in o:
                raise Exception("Line is missing required key 'record': {}".format(line))
            record = o['record']
            validate(record, schemas[stream])
            _append_record(stream, record, headers, delimiter, quotechar)
            # Forget earlier STATE: only a STATE seen after the last RECORD
            # is returned to the caller.
            state = None
        elif t == 'STATE':
            logger.debug('Setting state to {}'.format(o['value']))
            state = o['value']
        elif t == 'SCHEMA':
            if 'stream' not in o:
                raise Exception("Line is missing required key 'stream': {}".format(line))
            schemas[o['stream']] = o['schema']
            if 'key_properties' not in o:
                raise Exception("key_properties field is required")
        else:
            raise Exception("Unknown message type {} in message {}"
                            .format(o['type'], o))

    return state
def main():
    """Command-line entry point: read JSON messages from stdin into CSV files.

    Accepts an optional ``-c/--config`` JSON file; its ``delimiter`` and
    ``quotechar`` keys default to ``,`` and ``"``.  Stdin is decoded as
    UTF-8 and passed to ``persist_lines``; the resulting state is then
    written to stdout via ``emit_state``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-c', '--config', help='Config file')
    args = parser.parse_args()

    if args.config:
        with open(args.config, encoding='utf-8') as config_file:
            config = json.load(config_file)
    else:
        config = {}

    # Decode the raw byte stream explicitly so input is read as UTF-8
    # regardless of the process locale.
    input_messages = io.TextIOWrapper(sys.stdin.buffer, encoding='utf-8')
    state = persist_lines(config.get('delimiter', ','),
                          config.get('quotechar', '"'),
                          input_messages)
    emit_state(state)
    logger.debug("Exiting normally")


if __name__ == '__main__':
    main()