-
Notifications
You must be signed in to change notification settings - Fork 70
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix issue #2. Add singer message support #10
Changes from 3 commits
2cae0f5
b822dd9
2e3fe87
522d499
0dcb9fb
9cac4c2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -88,4 +88,7 @@ ENV/ | |
# Rope project settings | ||
.ropeproject | ||
|
||
*~ | ||
*~ | ||
|
||
# VS Code project settings | ||
.vscode/ |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,8 +12,16 @@ like Excel or simply storing a backup of the source data set. | |
### Install and Run | ||
|
||
First, make sure Python 3 is installed on your system or follow these | ||
installation instructions for [Mac](python-mac) or | ||
[Ubuntu](python-ubuntu). | ||
installation instructions for [Mac] or | ||
[Ubuntu]. | ||
|
||
|
||
It's recommended to use a virtualenv: | ||
|
||
```bash | ||
> virtualenv -p python3 venv | ||
> source venv/bin/activate | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's do;
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note that those are without the leading |
||
``` | ||
|
||
`target-csv` can be run with any [Singer Tap], but we'll use | ||
[`tap-fixerio`][Fixerio] - which pulls currency exchange rate data | ||
|
@@ -62,5 +70,5 @@ Copyright © 2017 Stitch | |
[Freshdesk]: https://github.com/singer-io/tap-freshdesk | ||
[Hubspot]: https://github.com/singer-io/tap-hubspot | ||
[Fixerio]: https://github.com/singer-io/tap-fixerio | ||
[python-mac]: http://docs.python-guide.org/en/latest/starting/install3/osx/ | ||
[python-ubuntu]: https://www.digitalocean.com/community/tutorials/how-to-install-python-3-and-set-up-a-local-programming-environment-on-ubuntu-16-04 | ||
[Mac]: http://docs.python-guide.org/en/latest/starting/install3/osx/ | ||
[Ubuntu]: https://www.digitalocean.com/community/tutorials/how-to-install-python-3-and-set-up-a-local-programming-environment-on-ubuntu-16-04 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,3 @@ | ||
#!/usr/bin/env python3 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Need to keep the hashbang since the file's executable, although it could be changed to |
||
|
||
import argparse | ||
import io | ||
import os | ||
|
@@ -11,8 +9,8 @@ | |
import urllib | ||
from datetime import datetime | ||
import collections | ||
|
||
import pkg_resources | ||
|
||
from jsonschema.validators import Draft4Validator | ||
import singer | ||
|
||
|
@@ -35,40 +33,34 @@ def flatten(d, parent_key='', sep='__'): | |
items.append((new_key, str(v) if type(v) is list else v)) | ||
return dict(items) | ||
|
||
def persist_lines(delimiter, quotechar, lines): | ||
def persist_messages(delimiter, quotechar, messages): | ||
state = None | ||
schemas = {} | ||
key_properties = {} | ||
headers = {} | ||
validators = {} | ||
|
||
now = datetime.now().strftime('%Y%m%dT%H%M%S') | ||
|
||
for line in lines: | ||
for message in messages: | ||
try: | ||
o = json.loads(line) | ||
o = singer.parse_message(message).asdict() | ||
except json.decoder.JSONDecodeError: | ||
logger.error("Unable to parse:\n{}".format(line)) | ||
logger.error("Unable to parse:\n{}".format(message)) | ||
raise | ||
|
||
if 'type' not in o: | ||
raise Exception("Line is missing required key 'type': {}".format(line)) | ||
t = o['type'] | ||
|
||
if t == 'RECORD': | ||
if 'stream' not in o: | ||
raise Exception("Line is missing required key 'stream': {}".format(line)) | ||
message_type = o['type'] | ||
if message_type == 'RECORD': | ||
if o['stream'] not in schemas: | ||
raise Exception("A record for stream {} was encountered before a corresponding schema".format(o['stream'])) | ||
raise Exception("A record for stream {}" | ||
"was encountered before a corresponding schema".format(o['stream'])) | ||
|
||
schema = schemas[o['stream']] | ||
validators[o['stream']].validate(o['record']) | ||
|
||
filename = o['stream'] + '-' + now + '.csv' | ||
file_is_empty = (not os.path.isfile(filename)) or os.stat(filename).st_size == 0 | ||
|
||
flattened_record = flatten(o['record']) | ||
|
||
if o['stream'] not in headers and not file_is_empty: | ||
with open(filename, 'r') as csvfile: | ||
reader = csv.reader(csvfile, | ||
|
@@ -78,35 +70,31 @@ def persist_lines(delimiter, quotechar, lines): | |
headers[o['stream']] = first_line if first_line else flattened_record.keys() | ||
else: | ||
headers[o['stream']] = flattened_record.keys() | ||
|
||
with open(filename, 'a') as csvfile: | ||
writer = csv.DictWriter(csvfile, | ||
writer = csv.DictWriter(csvfile, | ||
headers[o['stream']], | ||
extrasaction='ignore', | ||
delimiter=delimiter, | ||
quotechar=quotechar) | ||
if file_is_empty: | ||
writer.writeheader() | ||
writer.writerow(flattened_record) | ||
|
||
writer.writerow(flattened_record) | ||
|
||
state = None | ||
elif t == 'STATE': | ||
elif message_type == 'STATE': | ||
logger.debug('Setting state to {}'.format(o['value'])) | ||
state = o['value'] | ||
elif t == 'SCHEMA': | ||
if 'stream' not in o: | ||
raise Exception("Line is missing required key 'stream': {}".format(line)) | ||
elif message_type == 'SCHEMA': | ||
stream = o['stream'] | ||
schemas[stream] = o['schema'] | ||
validators[stream] = Draft4Validator(o['schema']) | ||
if 'key_properties' not in o: | ||
raise Exception("key_properties field is required") | ||
key_properties[stream] = o['key_properties'] | ||
else: | ||
raise Exception("Unknown message type {} in message {}" | ||
.format(o['type'], o)) | ||
|
||
return state | ||
|
||
|
||
|
@@ -135,8 +123,8 @@ def main(): | |
args = parser.parse_args() | ||
|
||
if args.config: | ||
with open(args.config) as input: | ||
config = json.load(input) | ||
with open(args.config) as input_json: | ||
config = json.load(input_json) | ||
else: | ||
config = {} | ||
|
||
|
@@ -146,11 +134,11 @@ def main(): | |
'the config parameter "disable_collection" to true') | ||
threading.Thread(target=send_usage_stats).start() | ||
|
||
input = io.TextIOWrapper(sys.stdin.buffer, encoding='utf-8') | ||
state = persist_lines(config.get('delimiter', ','), | ||
config.get('quotechar', '"'), | ||
input) | ||
input_messages = io.TextIOWrapper(sys.stdin.buffer, encoding='utf-8') | ||
state = persist_messages(config.get('delimiter', ','), | ||
config.get('quotechar', '"'), | ||
input_messages) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you provide a gist showing that the output hasn't changed between this and There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually if you install target-csv before a new tap, like tap-exchangeratesapi, it crashes, because target-csv now requires a specific outdated version of singer-python to work. I changed the code in setup.py to fix the problem. What do you think? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah. This comes up fairly regularly and I'm not sure exactly how to make it clearer. As is outlined in the Running and Developing guide, the intended way to run taps and targets is in separate virtual environments. This avoids issues exactly like you're describing. Our official stance on loose version requirements (as official as a comment on an issue can get!) shows our Java roots but we think ultimately yields a more sane development experience. :) So my recommendation would be to reverse the setup.py change and maybe link to that getting started guide? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, done! I think the development experience may be a bit more sane, but for newcomers to the library is very difficult to use. I mean, a lot of python users don't even know how to use a single virtualenv, imagine using two, one for each library. The end user just suffer more than it is necessary in my opinion. Just food for thought. Do you think it is necessary to show a gist? |
||
|
||
emit_state(state) | ||
logger.debug("Exiting normally") | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Whoa. Was not aware of this Markdown feature. Is this GFM or Markdown proper? (Linking to a named ref without the trailing
[]
).There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually is just proper Markdown. Check this out https://github.com/adam-p/markdown-here/wiki/Markdown-Here-Cheatsheet#links