Skip to content

Commit

Permalink
Fix Remote input file download slowdown submission rate #1206
Browse files Browse the repository at this point in the history
  • Loading branch information
pditommaso committed Jul 9, 2019
1 parent 1477485 commit b710e21
Show file tree
Hide file tree
Showing 18 changed files with 335 additions and 396 deletions.
27 changes: 22 additions & 5 deletions modules/nextflow/src/main/groovy/nextflow/Session.groovy
Expand Up @@ -23,6 +23,7 @@ import java.nio.file.Paths
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

import com.google.common.hash.HashCode
import com.upplication.s3fs.S3OutputStream
Expand Down Expand Up @@ -66,6 +67,7 @@ import nextflow.trace.TraceRecord
import nextflow.trace.WebLogObserver
import nextflow.trace.WorkflowStats
import nextflow.util.Barrier
import nextflow.util.BlockingThreadExecutorFactory
import nextflow.util.ConfigHelper
import nextflow.util.Duration
import nextflow.util.HistoryFile
Expand Down Expand Up @@ -606,7 +608,7 @@ class Session implements ISession {
}

protected Map<String,Path> findBinEntries(Path path) {
def result = new HashMap<String,Path>()
def result = new LinkedHashMap(10)
path
.listFiles { file -> Files.isExecutable(file) }
.each { Path file -> result.put(file.name,file) }
Expand Down Expand Up @@ -1220,7 +1222,7 @@ class Session implements ISession {
* @return The value of tasks to handle in parallel
*/
@Memoized
public int getQueueSize( String execName, int defValue ) {
int getQueueSize( String execName, int defValue ) {
getExecConfigProp(execName, 'queueSize', defValue) as int
}

Expand All @@ -1232,7 +1234,7 @@ class Session implements ISession {
* @return A {@code Duration} object. Default '1 second'
*/
@Memoized
public Duration getPollInterval( String execName, Duration defValue = Duration.of('1sec') ) {
Duration getPollInterval( String execName, Duration defValue = Duration.of('1sec') ) {
getExecConfigProp( execName, 'pollInterval', defValue ) as Duration
}

Expand All @@ -1245,7 +1247,7 @@ class Session implements ISession {
* @return A {@code Duration} object. Default '90 seconds'
*/
@Memoized
public Duration getExitReadTimeout( String execName, Duration defValue = Duration.of('90sec') ) {
Duration getExitReadTimeout( String execName, Duration defValue = Duration.of('90sec') ) {
getExecConfigProp( execName, 'exitReadTimeout', defValue ) as Duration
}

Expand All @@ -1269,7 +1271,7 @@ class Session implements ISession {
* @return A {@code Duration} object. Default '1 minute'
*/
@Memoized
public Duration getQueueStatInterval( String execName, Duration defValue = Duration.of('1min') ) {
Duration getQueueStatInterval( String execName, Duration defValue = Duration.of('1min') ) {
getExecConfigProp(execName, 'queueStatInterval', defValue) as Duration
}

Expand Down Expand Up @@ -1299,4 +1301,19 @@ class Session implements ISession {
ansiLogObserver ? ansiLogObserver.appendInfo(file.text) : Files.copy(file, System.out)
}

/**
 * Provides the thread pool used to run file transfers.
 *
 * The method is memoized so that every invocation returns the same executor,
 * which guarantees that a single pool is shared across the different publish
 * dirs of the same session. The pool is named 'FileTransfer' and is capped at
 * three times the number of available processors.
 *
 * A shutdown callback is registered on the session: it shuts the pool down and
 * then waits up to 36 hours for the submitted transfers to terminate.
 *
 * @return The {@code ExecutorService} created for file transfers
 */
@Memoized
@CompileStatic
synchronized ExecutorService getFileTransferThreadPool() {
    // upper bound on the number of concurrent transfer threads
    final int maxThreads = Runtime.runtime.availableProcessors() * 3

    final pool = new BlockingThreadExecutorFactory().withName('FileTransfer').withMaxThreads(maxThreads).create()

    // when the session shuts down, stop the pool and wait for pending transfers
    this.onShutdown {
        pool.shutdown()
        pool.awaitTermination(36, TimeUnit.HOURS)
    }

    return pool
}
}
Expand Up @@ -20,7 +20,6 @@ import java.nio.file.Files
import java.nio.file.Path

import groovy.transform.CompileStatic
import groovy.transform.Memoized
import groovy.transform.PackageScope
import groovy.util.logging.Slf4j
import nextflow.container.ContainerBuilder
Expand Down Expand Up @@ -140,12 +139,6 @@ class BashWrapperBuilder {
systemOsName.startsWith('Mac')
}

// Resolves the task input files by delegating to the copy strategy's `resolveForeignFiles`.
// The result is memoized to avoid resolving the same input files multiple times.
@Memoized
protected Map<String,Path> getResolvedInputs() {
copyStrategy.resolveForeignFiles(inputFiles)
}

@PackageScope String buildNew0() {
final template = BashWrapperBuilder.class.getResourceAsStream('command-run.txt')
try {
Expand Down Expand Up @@ -228,7 +221,7 @@ class BashWrapperBuilder {
/*
* staging input files when required
*/
final stagingScript = copyStrategy.getStageInputFilesScript(resolvedInputs)
final stagingScript = copyStrategy.getStageInputFilesScript(inputFiles)
binding.stage_inputs = stagingScript ? "# stage input files\n${stagingScript}" : null

binding.stdout_file = TaskRun.CMD_OUTFILE
Expand Down Expand Up @@ -432,7 +425,7 @@ class BashWrapperBuilder {
*/
// do not mount inputs when they are copied in the task work dir -- see #1105
if( stageInMode != 'copy' )
builder.addMountForInputs(resolvedInputs)
builder.addMountForInputs(inputFiles)

builder.addMount(binDir)

Expand Down
Expand Up @@ -28,16 +28,6 @@ interface ScriptFileCopyStrategy {
*/
String getBeforeStartScript()

/**
* Resolve all foreign files by storing them in the pipeline working directory
*
* @param inputFiles A map of <file name, path location> of files. It can hold both local and foreign files
* @return
* A map of <file name, path location> of files in which all foreign files have been replaced
* by local copies in the working directory
*/
Map<String,Path> resolveForeignFiles(Map<String,Path> inputFiles)

/**
* @param inputFiles All the input files as a map of <stage name, store path> pairs
* @return A BASH snippet included in the wrapper script that stages the task input files
Expand Down
Expand Up @@ -93,14 +93,14 @@ class SimpleFileCopyStrategy implements ScriptFileCopyStrategy {
* @return
* A map containing remote file paths resolved as local paths
*/
@Override
Map<String,Path> resolveForeignFiles(Map<String,Path> files) {
    if( files ) {
        // lazily fetch the file porter from the current session on first use
        if( !porter ) {
            porter = ((Session)Global.session).getFilePorter()
        }
        // delegate the staging of foreign files to the porter, targeting the staging dir
        return porter.stageForeignFiles(files, getStagingDir())
    }
    // a null or empty map has nothing to stage: hand it back unchanged
    return files
}
// @Override
// Map<String,Path> resolveForeignFiles(Map<String,Path> files) {
// if( !files )
// return files
// if( !porter )
// porter = ((Session)Global.session).getFilePorter()
// porter.stageForeignFiles(files, getStagingDir())
// }

protected Path getStagingDir() {
final result = workDir.parent?.parent?.resolve('stage')
Expand Down

0 comments on commit b710e21

Please sign in to comment.