Skip to content

Commit

Permalink
Improve FileTransfer await termination log #1206
Browse files Browse the repository at this point in the history
This implements a better logging when nextflow stops to await
the completion of ongoing file transters.

It prints a message in the console every 10 minutes reporting
the number of uploads to complete and a message in the log file
every minute with the same information.
  • Loading branch information
pditommaso committed Jul 9, 2019
1 parent b710e21 commit cb968da
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 8 deletions.
42 changes: 36 additions & 6 deletions modules/nextflow/src/main/groovy/nextflow/Session.groovy
Expand Up @@ -16,13 +16,16 @@

package nextflow

import static nextflow.Const.*

import java.lang.reflect.Method
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.Paths
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

import com.google.common.hash.HashCode
Expand Down Expand Up @@ -74,7 +77,6 @@ import nextflow.util.HistoryFile
import nextflow.util.NameGenerator
import sun.misc.Signal
import sun.misc.SignalHandler
import static nextflow.Const.S3_UPLOADER_CLASS
/**
* Holds the information on the current execution
*
Expand Down Expand Up @@ -1304,16 +1306,44 @@ class Session implements ISession {
@Memoized // <-- this guarantees that the same executor is used across different publish dir in the same session
@CompileStatic
synchronized ExecutorService getFileTransferThreadPool() {
final result = new BlockingThreadExecutorFactory()
final factory = new BlockingThreadExecutorFactory()
.withName('FileTransfer')
.withMaxThreads( Runtime.runtime.availableProcessors()*3 )
.create()
final pool = factory.create()

this.onShutdown {
result.shutdown()
result.awaitTermination(36, TimeUnit.HOURS)
final max = factory.maxAwait.millis
final t0 = System.currentTimeMillis()
// start shutdown process
pool.shutdown()
// wait for ongoing file transfer to complete
int count=0
while( true ) {
final terminated = pool.awaitTermination(5, TimeUnit.SECONDS)
if( terminated )
break

final delta = System.currentTimeMillis()-t0
if( delta > max ) {
log.warn "Exiting before FileTransfer thread pool complete -- Some files maybe lost"
break
}

final p1 = ((ThreadPoolExecutor)pool)
final pending = p1.getTaskCount() - p1.getCompletedTaskCount()
// log to console every 10 minutes (120 * 5 sec)
if( count % 120 == 0 ) {
log.info1 "Waiting files transfer to complete (${pending} files)"
}
// log to the debug file every minute (12 * 5 sec)
else if( count % 12 == 0 ) {
log.debug "Waiting files transfer to complete (${pending} files)"
}
// increment the count
count++
}
}

return result
return pool
}
}
Expand Up @@ -44,11 +44,13 @@ class BlockingThreadExecutorFactory {
private static String DEFAULT_NAME = 'BlockingThreadExecutor'
private static int DEFAULT_POOL_SIZE = Runtime.runtime.availableProcessors()
private static Duration DEFAULT_KEEP_ALIVE = Duration.of('60sec')
private static Duration DEFAULT_MAX_AWAIT = Duration.of('36 hour')

String name
Integer maxThreads
Integer maxQueueSize
Duration keepAlive
Duration maxAwait

static ExecutorService create(String name) {
new BlockingThreadExecutorFactory().withName(name).create()
Expand Down Expand Up @@ -80,6 +82,7 @@ class BlockingThreadExecutorFactory {
maxQueueSize = session.config.navigate("threadPool.${name}.maxQueueSize") as Integer
keepAlive = session.config.navigate("threadPool.${name}.keepAlive") as Duration
maxThreads = session.config.navigate("threadPool.${name}.maxThreads") as Integer
maxAwait = session.config.navigate("threadPool.${name}.maxAwait") as Duration
if( !maxThreads )
maxThreads = (session.config.poolSize as Integer ?: 0) *3
}
Expand All @@ -90,6 +93,8 @@ class BlockingThreadExecutorFactory {
maxQueueSize = maxThreads *3
if( keepAlive == null )
keepAlive = DEFAULT_KEEP_ALIVE
if( maxAwait == null )
maxAwait = DEFAULT_MAX_AWAIT

log.debug "Thread pool name=$name; maxThreads=$maxThreads; maxQueueSize=$maxQueueSize; keepAlive=$keepAlive"
create0()
Expand Down
Expand Up @@ -32,7 +32,7 @@ import spock.lang.Specification
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
*/
@Slf4j
class BlockingTreadExecutorTest extends Specification {
class BlockingTreadExecutorFactoryTest extends Specification {

def 'should create thread pool with default config' () {
given:
Expand All @@ -45,6 +45,7 @@ class BlockingTreadExecutorTest extends Specification {
pool.maxThreads == CPUS
pool.maxQueueSize == CPUS * 3
pool.keepAlive == Duration.of('60 sec')
pool.maxAwait == Duration.of('36 hours')

}

Expand All @@ -68,7 +69,8 @@ class BlockingTreadExecutorTest extends Specification {
new Session([threadPool: [foo:[
maxThreads: 5,
maxQueueSize: 10,
keepAlive: '15 sec'
keepAlive: '15 sec',
maxAwait: '5 min'
]]])
def pool = new BlockingThreadExecutorFactory().withName('foo')

Expand All @@ -78,6 +80,7 @@ class BlockingTreadExecutorTest extends Specification {
pool.maxThreads == 5
pool.maxQueueSize == 10
pool.keepAlive == Duration.of('15 sec')
pool.maxAwait == Duration.of('5 min')
}

def 'should execute tasks' () {
Expand Down

0 comments on commit cb968da

Please sign in to comment.