-
Notifications
You must be signed in to change notification settings - Fork 1
/
dv.py
executable file
·144 lines (124 loc) · 4.23 KB
/
dv.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
#!/usr/bin/env python3.4
from multiprocessing import Process, Queue
from threading import Thread
import asyncio
import signal
from types import GeneratorType
from os import path
import logging
from traceback import extract_tb
from server import start_agent_server
from job import JobEngine
from scheduler import Scheduler
from entity import entity, action, cmd
from visitor import visitor, join, spawn
from local import Local
from lsf import LSF
from gcfengine import GCFEngine
from utils import require, get_ns
from option import args_parse
from logger import logger
import event
from test import run_test, Test
server_p = None
def cleanup():
if server_p:
server_p.terminate()
JobEngine.cleanup()
logging.shutdown()
def handler(signum, frame):
cleanup()
exit(-1)
#signal.signal(signal.SIGINT, handler)
#signal.signal(signal.SIGTERM, handler)
def main():
global server_p
# parsing arguments
(opts, args) = args_parse()
in_q = Queue()
out_q = Queue()
logger.info('running dv.py')
# start agent server
#loop = asyncio.get_event_loop()
server_p = Process(target=start_agent_server, args=(in_q, out_q, path.abspath(opts.out_dir), opts.verbose,))
#server_p = Thread(target=start_agent_server, args=(loop, in_q, out_q,))
server_p.start()
try:
# waiting for server started
host, port = in_q.get()
#logger.info("agent server started on {}:{}".format(host, port))
# set gcf engine
if opts.gcf == 'local':
GCFEngine.set_imp(Local(host, port, path.abspath(opts.out_dir), opts.verbose))
else:
if opts.gcf == 'lsf':
GCFEngine.set_imp(LSF(host, port, path.abspath(opts.out_dir), opts.verbose))
else:
raise Exception('unsupported gcf engine {}'.format(opts.gcf))
# config job engine
JobEngine.connect(in_q, out_q)
JobEngine.out_dir = path.abspath(opts.out_dir)
logger.info('max agents = {}'.format(opts.max_agents))
JobEngine.max_cmds = opts.max_agents
# load files
require('loader')
if opts.patchfile:
for f in opts.patchfile:
require(f)
# evaluate experssions
@visitor
def top():
@join
def body(self):
if opts.expr:
for e in opts.expr:
@spawn(self)
def body(ee=e):
res = eval(ee, get_ns(), get_ns())
if type(res) == GeneratorType:
yield from res
return res
if opts.test:
@spawn(self)
def body():
res = run_test(*opts.test, action=opts.action, where=opts.where)
if type(res) == GeneratorType:
yield from res
return res
yield from body()
# run
while True:
JobEngine.run()
Scheduler.run()
if JobEngine.is_waiting() or Scheduler.is_waiting():
next
else:
break
for t in Test.test_status:
if Test.test_status[t] == 'passed':
logger.info("*** test '{}' passed".format(t))
else:
logger.error("*** test '{}' failed".format(t))
if top.exception:
def print_exception(e, indent=0):
if isinstance(e, Exception):
for l in extract_tb(e.__traceback__):
logger.debug((" "*indent)+str(l))
if not isinstance(e, Exception):
logger.error((" "*indent)+str(e))
return
for i in e.args:
if not isinstance(i, list):
i = [i]
for j in i:
print_exception(j, indent+2)
print_exception(top.exception)
logger.error('dv.py failed')
#raise top.exception
else:
logger.info('dv.py passed')
finally:
event.notify('dvpy_done')
cleanup()
if __name__ == '__main__':
main()