Skip to content

Commit

Permalink
Fix execution hang when native task fail to submit #2060
Browse files Browse the repository at this point in the history
  • Loading branch information
pditommaso committed Apr 25, 2021
1 parent e1eebdc commit 1253485
Showing 1 changed file with 13 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,11 @@ class TaskPollingMonitor implements TaskMonitor {
// check all running tasks for termination
checkAllTasks(tasks)

if( (session.isTerminated() && runningQueue.size()==0 && pendingQueue.size()==0) || session.isAborted() ) {
final shouldBreak
= (session.isTerminated() && runningQueue.size()==0 && pendingQueue.size()==0)
|| (session.isCancelled() && runningQueue.size()==0) // cancel is set when error 'finish' is set, therefore tasks in the pending queue should not be taken in consideration
|| session.isAborted()
if( shouldBreak ) {
break
}

Expand Down Expand Up @@ -546,26 +550,24 @@ class TaskPollingMonitor implements TaskMonitor {

int count = 0
def itr = pendingQueue.iterator()
while( itr.hasNext() ) {
while( itr.hasNext() && session.isSuccess() ) {
final handler = itr.next()
submitRateLimit?.acquire()
try {
submitRateLimit?.acquire()

if( !canSubmit(handler) )
continue

if( session.isSuccess() ) {
itr.remove(); count++ // <-- remove the task in all cases
handler.incProcessForks()
submit(handler)
}
else
break
count++
handler.incProcessForks()
submit(handler)
}
catch ( Throwable e ) {
handleException(handler, e)
session.notifyTaskComplete(handler)
}
// remove processed handler either on successful submit or failed one (managed by catch section)
// when `canSubmit` return false the handler should be retained to be tried in a following iteration
itr.remove()
}

return count
Expand Down

0 comments on commit 1253485

Please sign in to comment.