-
Notifications
You must be signed in to change notification settings - Fork 77
/
runpipe
executable file
·88 lines (70 loc) · 2.31 KB
/
runpipe
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import (
absolute_import, division, print_function, unicode_literals)
import requests
import sys
sys.path.append('../riko')
from os import path as p
from importlib import import_module
try:
# python 3.3+
from importlib.machinery import SourceFileLoader
except ImportError:
try:
# python 3.4+
from importlib.util import spec_from_file_location, module_from_spec
except ImportError:
# python 2.7-
from imp import load_source as load_file
io_error = IOError
else:
io_error = FileNotFoundError
def load_file(name, src):
location = 'examples/%s.py' % src
spec = spec_from_file_location(name, location)
module = module_from_spec(spec)
spec.loader.exec_module(module)
return module
else:
io_error = FileNotFoundError
load_file = lambda name, src: SourceFileLoader(name, src).load_module()
from argparse import RawTextHelpFormatter, ArgumentParser
from riko.bado import react
parser = ArgumentParser(
description='description: Runs a riko pipe', prog='runpipe',
usage='%(prog)s [pipeid]', formatter_class=RawTextHelpFormatter)
parser.add_argument(
dest='pipeid', nargs='?', default=sys.stdin,
help='The pipe to run (default: reads from stdin).')
parser.add_argument(
'-a', '--async', dest='isasync', action='store_true', default=False,
help="Load async pipe.\n\n")
parser.add_argument(
'-t', '--test', action='store_true', default=False,
help="Run in test mode (uses default inputs).\n\n")
args = parser.parse_args()
def file2name(path):
return p.splitext(p.basename(path))[0]
def run():
"""CLI runner"""
try:
pipeid = args.pipeid.read()
except AttributeError:
pipeid = args.pipeid
try:
name = file2name('%s.py' % pipeid)
module = load_file(name, pipeid)
except io_error:
try:
module = import_module('examples.%s' % pipeid)
except ImportError:
exit('Pipe examples.%s not found!' % pipeid)
if args.isasync:
pipeline = getattr(module, 'async_pipe')
react(pipeline, [args.test])
else:
pipeline = getattr(module, 'pipe')
pipeline(test=args.test)
if __name__ == "__main__":
run()