2525 import flux .job
2626 from flux .job import JobspecV1
2727except ImportError :
28- error = " no flux Python bindings found"
28+ error = ' no flux Python bindings found'
2929else :
3030 error = None
3131
32- waiting_states = [ " QUEUED" , " HELD" , " WAITING" , " PENDING" ]
32+ WAITING_STATES = ( ' QUEUED' , ' HELD' , ' WAITING' , ' PENDING' )
3333
3434
3535class _FluxJob (Job ):
3636 def __init__ (self , * args , ** kwargs ):
37- super ().__init__ (* args , ** kwargs )
38-
39- self ._cancelled = False
37+ '''Create the flux job (and future) to watch.'''
4038
41- # This is set by the scheduler when both the job's state is
42- # 'COMPLETED' and the job's stdout and stderr are written back
43- self ._completed = False
44- self .create_job ()
39+ super ().__init__ (* args , ** kwargs )
4540
46- def create_job (self ):
47- """
48- Create the flux job (and future) to watch.
49- """
5041 # Generate the flux job
5142 self .fluxjob = JobspecV1 .from_command (
52- command = [" /bin/bash" , self .script_filename ],
43+ command = [' /bin/bash' , self .script_filename ],
5344 num_tasks = self .num_tasks_per_core or 1 ,
5445 cores_per_task = self .num_cpus_per_task or 1 ,
5546 )
@@ -65,21 +56,17 @@ def create_job(self):
6556 self .fluxjob .cwd = os .path .abspath (self .workdir )
6657 self .fluxjob .environment = dict (os .environ )
6758
68- @property
69- def cancelled (self ):
70- return self ._cancelled
71-
7259 @property
7360 def completed (self ):
74- return self ._completed
61+ return not self .state in WAITING_STATES
7562
7663
77- @register_scheduler (" flux" , error = error )
64+ @register_scheduler (' flux' , error = error )
7865class FluxJobScheduler (JobScheduler ):
7966 def __init__ (self ):
8067 self ._fexecutor = flux .job .FluxExecutor ()
8168 self ._submit_timeout = rt .runtime ().get_option (
82- f" schedulers/@{ self .registered_name } /job_submit_timeout"
69+ f' schedulers/@{ self .registered_name } /job_submit_timeout'
8370 )
8471
8572 def emit_preamble (self , job ):
@@ -90,36 +77,24 @@ def make_job(self, *args, **kwargs):
9077 return _FluxJob (* args , ** kwargs )
9178
9279 def submit (self , job ):
93- """
94- Submit a job to the flux executor.
95- """
80+ '''Submit a job to the flux executor.'''
81+
9682 flux_future = self ._fexecutor .submit (job .fluxjob )
9783 job ._jobid = str (flux_future .jobid ())
9884 job ._submit_time = time .time ()
9985 job ._flux_future = flux_future
10086
10187 def cancel (self , job ):
102- """
103- Cancel a running Flux job.
104- """
88+ '''Cancel a running Flux job.'''
89+
10590 # Job future cannot cancel once running or completed
10691 if not job ._flux_future .cancel ():
10792 # This will raise JobException with event=cancel (on poll)
10893 flux .job .cancel (flux .Flux (), job ._flux_future .jobid ())
10994
110- # In testing, cancel is too "soft" - or often the pid is still remaining
111- # after. We thus do a cancel in addition to a kill.
112- try :
113- flux .job .kill (flux .Flux (), job ._flux_future .jobid ())
114-
115- # Job is inactive
116- except EnvironmentError :
117- pass
118-
11995 def poll (self , * jobs ):
120- """
121- Poll running Flux jobs for updated states.
122- """
96+ '''Poll running Flux jobs for updated states.'''
97+
12398 if jobs :
12499 # filter out non-jobs
125100 jobs = [job for job in jobs if job is not None ]
@@ -135,41 +110,43 @@ def poll(self, *jobs):
135110 exit_code = job ._flux_future .result (0 )
136111 except flux .job .JobException :
137112 # Currently the only state we see is cancelled here
138- self .log (f"Job { job .jobid } was likely cancelled." )
139- job ._state = "CANCELLED"
140- job ._cancelled = True
113+ self .log (f'Job { job .jobid } was likely cancelled.' )
114+ job ._state = 'CANCELLED'
141115 except RuntimeError :
142116 # Assume some runtime issue (suspended)
143- self .log (f" Job { job .jobid } was likely suspended." )
144- job ._state = " SUSPENDED"
117+ self .log (f' Job { job .jobid } was likely suspended.' )
118+ job ._state = ' SUSPENDED'
145119 else :
146120 # the job finished (but possibly with nonzero exit code)
147121 if exit_code != 0 :
148- self .log (f"Job { job .jobid } did not finish successfully" )
122+ self .log (
123+ f'Job { job .jobid } did not finish successfully'
124+ )
149125
150- job ._state = " COMPLETED"
126+ job ._state = ' COMPLETED'
151127
152128 job ._completed = True
153- elif job .state in waiting_states and job .max_pending_time :
129+ elif job .state in WAITING_STATES and job .max_pending_time :
154130 if time .time () - job .submit_time >= job .max_pending_time :
155131 self .cancel (job )
156132 job ._exception = JobError (
157- " maximum pending time exceeded" , job .jobid
133+ ' maximum pending time exceeded' , job .jobid
158134 )
159135 else :
160136 # Otherwise, we are still running
161- job ._state = " RUNNING"
137+ job ._state = ' RUNNING'
162138
163139 def allnodes (self ):
164- raise NotImplementedError (" flux backend does not support node listing" )
140+ raise NotImplementedError (' flux backend does not support node listing' )
165141
166142 def filternodes (self , job , nodes ):
167- raise NotImplementedError ("flux backend does not support node filtering" )
143+ raise NotImplementedError (
144+ 'flux backend does not support node filtering'
145+ )
168146
169147 def wait (self , job ):
170- """
171- Wait until a job is finished.
172- """
148+ '''Wait until a job is finished.'''
149+
173150 intervals = itertools .cycle ([1 , 2 , 3 ])
174151 while not self .finished (job ):
175152 self .poll (job )
@@ -178,4 +155,5 @@ def wait(self, job):
178155 def finished (self , job ):
179156 if job .exception :
180157 raise job .exception
158+
181159 return job .completed
0 commit comments