/
torque.py
129 lines (113 loc) · 4.01 KB
/
torque.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
try:
import xml.etree.cElementTree as et
except:
import xml.etree.ElementTree as et
try:
from galaxy.model import Job
job_states = Job.states
except ImportError:
# Not in Galaxy, map Galaxy job states to LWR ones.
from galaxy.util import enum
job_states = enum(RUNNING='running', OK='complete', QUEUED='queued')
from ..job import BaseJobExec
from ...job_script import job_script
from ...env import env_to_statement
__all__ = ('Torque',)
from logging import getLogger
log = getLogger(__name__)
argmap = {'destination': '-q',
'Execution_Time': '-a',
'Account_Name': '-A',
'Checkpoint': '-c',
'Error_Path': '-e',
'Group_List': '-g',
'Hold_Types': '-h',
'Join_Paths': '-j',
'Keep_Files': '-k',
'Resource_List': '-l',
'Mail_Points': '-m',
'Mail_Users': '-M',
'Job_Name': '-N',
'Output_Path': '-o',
'Priority': '-p',
'Rerunable': '-r',
'Shell_Path_List': '-S',
'job_array_request': '-t',
'User_List': '-u',
'Variable_List': '-v'}
class Torque(BaseJobExec):
def __init__(self, **params):
self.params = {}
for k, v in params.items():
self.params[k] = v
def get_job_template(self, ofile, efile, job_name, working_directory, command_line, ecfile, env=[]):
pbsargs = {'-o': ofile,
'-e': efile,
'-N': job_name}
for k, v in self.params.items():
if k == 'plugin':
continue
try:
if not k.startswith('-'):
k = argmap[k]
pbsargs[k] = v
except:
log.warning('Unrecognized long argument passed to Torque CLI plugin: %s' % k)
template_pbsargs = ''
for k, v in pbsargs.items():
template_pbsargs += '#PBS %s %s\n' % (k, v)
template_env = {
'headers': template_pbsargs,
'working_directory': working_directory,
'env_setup_commands': map(env_to_statement, env),
'exit_code_path': ecfile,
'command': command_line,
}
return job_script(**template_env)
def submit(self, script_file):
return 'qsub %s' % script_file
def delete(self, job_id):
return 'qdel %s' % job_id
def get_status(self, job_ids=None):
return 'qstat -x'
def get_single_status(self, job_id):
return 'qstat -f %s' % job_id
def parse_status(self, status, job_ids):
# in case there's noise in the output, find the big blob 'o xml
tree = None
rval = {}
for line in status.strip().splitlines():
try:
tree = et.fromstring(line.strip())
assert tree.tag == 'Data'
break
except Exception:
tree = None
if tree is None:
log.warning('No valid qstat XML return from `qstat -x`, got the following: %s' % status)
return None
else:
for job in tree.findall('Job'):
id = job.find('Job_Id').text
if id in job_ids:
state = job.find('job_state').text
# map PBS job states to Galaxy job states.
rval[id] = self.__get_job_state(state)
return rval
def parse_single_status(self, status, job_id):
for line in status.splitlines():
line = line.split(' = ')
if line[0] == 'job_state':
return self.__get_job_state(line[1].strip())
# no state found, job has exited
return job_states.OK
def __get_job_state(self, state):
try:
return {
'E': job_states.RUNNING,
'R': job_states.RUNNING,
'Q': job_states.QUEUED,
'C': job_states.OK
}.get(state)
except KeyError:
raise KeyError("Failed to map torque status code [%s] to job state." % state)