Skip to content

Commit

Permalink
Run task finalisation asynchronously (#4890)
Browse files Browse the repository at this point in the history
This change introduces the ability to run the task finalisation phase asynchronously via a dedicated 
thread pool to avoid large output files traversal stuck the polling monitor queue. 

This feature needs to be enabled by using the environment variable 

```
NXF_ENABLE_ASYNC_FINALIZER=true
```


Signed-off-by: Ben Sherman <bentshermann@gmail.com>
Signed-off-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com>
Co-authored-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com>
  • Loading branch information
bentsherman and pditommaso committed May 1, 2024
1 parent 361cef8 commit e0e9422
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 19 deletions.
27 changes: 21 additions & 6 deletions modules/nextflow/src/main/groovy/nextflow/Session.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package nextflow

import static nextflow.Const.*

import java.nio.file.Files
import java.nio.file.Path
Expand Down Expand Up @@ -48,8 +47,6 @@ import nextflow.executor.ExecutorFactory
import nextflow.extension.CH
import nextflow.file.FileHelper
import nextflow.file.FilePorter
import nextflow.util.Threads
import nextflow.util.ThreadPoolManager
import nextflow.plugin.Plugins
import nextflow.processor.ErrorStrategy
import nextflow.processor.TaskFault
Expand All @@ -74,6 +71,8 @@ import nextflow.util.ConfigHelper
import nextflow.util.Duration
import nextflow.util.HistoryFile
import nextflow.util.NameGenerator
import nextflow.util.ThreadPoolManager
import nextflow.util.Threads
import nextflow.util.VersionNumber
import org.apache.commons.lang.exception.ExceptionUtils
import sun.misc.Signal
Expand Down Expand Up @@ -666,8 +665,9 @@ class Session implements ISession {
void destroy() {
try {
log.trace "Session > destroying"
// shutdown publish dir executor
publishPoolManager.shutdown(aborted)
// shutdown thread pools
finalizePoolManager?.shutdown(aborted)
publishPoolManager?.shutdown(aborted)
// invoke shutdown callbacks
shutdown0()
log.trace "Session > after cleanup"
Expand Down Expand Up @@ -1431,10 +1431,25 @@ class Session implements ISession {
ansiLogObserver ? ansiLogObserver.appendInfo(file.text) : Files.copy(file, System.out)
}

private ThreadPoolManager publishPoolManager = new ThreadPoolManager('PublishDir')
private volatile ThreadPoolManager finalizePoolManager

@Memoized
synchronized ExecutorService finalizeTaskExecutorService() {
finalizePoolManager = new ThreadPoolManager('FinalizeTask')
return finalizePoolManager
.withConfig(config)
.withShutdownMessage(
"Waiting for remaining tasks to complete (%d tasks)",
"Exiting before some tasks were completed"
)
.create()
}

private volatile ThreadPoolManager publishPoolManager

@Memoized
synchronized ExecutorService publishDirExecutorService() {
publishPoolManager = new ThreadPoolManager('PublishDir')
return publishPoolManager
.withConfig(config)
.create()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package nextflow.processor

import java.util.concurrent.ExecutorService
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.Condition
Expand All @@ -26,6 +27,7 @@ import com.google.common.util.concurrent.RateLimiter
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.Session
import nextflow.SysEnv
import nextflow.exception.ProcessSubmitTimeoutException
import nextflow.executor.BatchCleanup
import nextflow.executor.GridTaskHandler
Expand Down Expand Up @@ -111,6 +113,11 @@ class TaskPollingMonitor implements TaskMonitor {
*/
private RateLimiter submitRateLimit

@Lazy
private ExecutorService finalizerPool = { session.finalizeTaskExecutorService() }()

private boolean enableAsyncFinalizer = SysEnv.get('NXF_ENABLE_ASYNC_FINALIZER','false') as boolean

/**
* Create the task polling monitor with the provided named parameters object.
* <p>
Expand Down Expand Up @@ -627,18 +634,27 @@ class TaskPollingMonitor implements TaskMonitor {
handler.task.error = new ProcessSubmitTimeoutException("Task '${handler.task.lazyName()}' could not be submitted within specified 'maxAwait' time: ${handler.task.config.getMaxSubmitAwait()}")
}

// finalize the tasks execution
final fault = handler.task.processor.finalizeTask(handler.task)

// notify task completion
session.notifyTaskComplete(handler)

// abort the execution in case of task failure
if (fault instanceof TaskFault) {
session.fault(fault, handler)
// finalize the task asynchronously
if( enableAsyncFinalizer ) {
finalizerPool.submit( ()-> finalizeTask(handler) )
}
else {
finalizeTask(handler)
}
}
}

protected void finalizeTask( TaskHandler handler ) {
// finalize the task execution
final fault = handler.task.processor.finalizeTask(handler.task)

// notify task completion
session.notifyTaskComplete(handler)

// abort the execution in case of task failure
if( fault instanceof TaskFault ) {
session.fault(fault, handler)
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ class ThreadPoolManager {
private Boolean allowThreadTimeout
private Duration maxAwait = DEFAULT_MAX_AWAIT
private ExecutorService executorService
final private String name
private String name
private String waitMsg = "Waiting for file transfers to complete (%d files)"
private String exitMsg = "Exiting before file transfers were completed -- Some files may be lost"

ThreadPoolManager(String name) {
this.name = name
Expand All @@ -72,6 +74,12 @@ class ThreadPoolManager {
return this
}

ThreadPoolManager withShutdownMessage(String waitMsg, String exitMsg) {
this.waitMsg = waitMsg
this.exitMsg = exitMsg
return this
}

ExecutorService create() {
if( minThreads>maxThreads ) {
log.debug("Thread pool '$name' minThreads ($minThreads) cannot be greater than maxThreads ($maxThreads) - Setting minThreads to $maxThreads")
Expand Down Expand Up @@ -116,9 +124,7 @@ class ThreadPoolManager {
}

executorService.shutdown()
// wait for ongoing file transfer to complete
final waitMsg = "Waiting for file transfers to complete (%d files)"
final exitMsg = "Exiting before FileTransfer thread pool complete -- Some files may be lost"
// wait for remaining threads to complete
ThreadPoolHelper.await(executorService, maxAwait, waitMsg, exitMsg)
log.debug "Thread pool '$name' shutdown completed (hard=$hard)"
}
Expand Down
8 changes: 8 additions & 0 deletions validation/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ if [[ $TEST_MODE == 'test_integration' ]]; then
$NXF_CMD run nextflow-io/rnaseq-nf -with-docker $OPTS
$NXF_CMD run nextflow-io/rnaseq-nf -with-docker $OPTS -resume

#
# RNASEQ-NF with async finalizer
#
echo nextflow-io/rnaseq-nf async finalizer
[[ $TOWER_ACCESS_TOKEN ]] && OPTS='-with-tower' || OPTS=''
NXF_ENABLE_ASYNC_FINALIZER=true $NXF_CMD run nextflow-io/rnaseq-nf -with-docker
NXF_ENABLE_ASYNC_FINALIZER=true $NXF_CMD run nextflow-io/rnaseq-nf -with-docker -resume

exit 0
fi

Expand Down

0 comments on commit e0e9422

Please sign in to comment.