Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ import nextflow.util.Escape

import static java.nio.file.StandardOpenOption.*

import nextflow.util.MemoryUnit

/**
* Builder to create the Bash script which is used to
* wrap and launch the user task
Expand All @@ -48,10 +50,12 @@ import static java.nio.file.StandardOpenOption.*
@CompileStatic
class BashWrapperBuilder {

// default size of the input staging script from which it is moved out of the wrapper
// into a separate file (see stageCommand)
private static MemoryUnit DEFAULT_STAGE_FILE_THRESHOLD = MemoryUnit.of('1 MB')
// NOTE(review): the three defaults below look like retry/back-off settings for the
// wrapper file writes -- their usage is not visible in this excerpt, confirm
private static int DEFAULT_WRITE_BACK_OFF_BASE = 3
private static int DEFAULT_WRITE_BACK_OFF_DELAY = 250
private static int DEFAULT_WRITE_MAX_ATTEMPTS = 5

// effective settings: each one is read from the matching NXF_WRAPPER_* environment variable
// via SysEnv and falls back to the default above when that value is missing or evaluates to false
private MemoryUnit stageFileThreshold = SysEnv.get('NXF_WRAPPER_STAGE_FILE_THRESHOLD') as MemoryUnit ?: DEFAULT_STAGE_FILE_THRESHOLD
private int writeBackOffBase = SysEnv.get('NXF_WRAPPER_BACK_OFF_BASE') as Integer ?: DEFAULT_WRITE_BACK_OFF_BASE
private int writeBackOffDelay = SysEnv.get('NXF_WRAPPER_BACK_OFF_DELAY') as Integer ?: DEFAULT_WRITE_BACK_OFF_DELAY
private int writeMaxAttempts = SysEnv.get('NXF_WRAPPER_MAX_ATTEMPTS') as Integer ?: DEFAULT_WRITE_MAX_ATTEMPTS
Expand Down Expand Up @@ -107,6 +111,10 @@ class BashWrapperBuilder {

private Path wrapperFile

// path of the external input staging script; resolved against the work directory in makeBinding
private Path stageFile

// staging commands to be written to the stage file; assigned by stageCommand when the
// staging script reaches the size threshold, otherwise left unset
private String stageScript

private BashTemplateEngine engine = new BashTemplateEngine()

BashWrapperBuilder( TaskRun task ) {
Expand Down Expand Up @@ -184,7 +192,20 @@ class BashWrapperBuilder {
}
result.toString()
}


/**
 * Build the snippet that stages the task input files from the wrapper script.
 *
 * Short staging scripts are embedded in the wrapper as they are. When the script
 * size reaches {@code stageFileThreshold} the commands are kept aside in
 * {@code stageScript}, to be written to {@code stageFile}, and the wrapper only
 * runs that file with {@code bash}.
 *
 * @param stagingScript the commands staging the input files; may be null or empty
 * @return the text to bind as {@code stage_inputs}, or {@code null} when there is nothing to stage
 */
protected String stageCommand(String stagingScript) {
    if( !stagingScript )
        return null

    final header = "# stage input files\n"
    // `bash <path>` can only execute a file that lives on the local file system: when the
    // stage file belongs to a non-default (remote) file system keep the script inline
    final isLocalFS = stageFile?.getFileSystem() == FileSystems.default
    if( isLocalFS && stagingScript.size() >= stageFileThreshold.bytes ) {
        // remember the commands so that they get written to the stage file
        stageScript = stagingScript
        return header + "bash ${stageFile}"
    }

    return header + stagingScript
}

protected Map<String,String> makeBinding() {
/*
* initialise command files
Expand All @@ -194,6 +215,7 @@ class BashWrapperBuilder {
startedFile = workDir.resolve(TaskRun.CMD_START)
exitedFile = workDir.resolve(TaskRun.CMD_EXIT)
wrapperFile = workDir.resolve(TaskRun.CMD_RUN)
stageFile = workDir.resolve(TaskRun.CMD_STAGE)

// set true when running with through a container engine
runWithContainer = containerEnabled && !containerNative
Expand Down Expand Up @@ -275,7 +297,7 @@ class BashWrapperBuilder {
* staging input files when required
*/
final stagingScript = copyStrategy.getStageInputFilesScript(inputFiles)
binding.stage_inputs = stagingScript ? "# stage input files\n${stagingScript}" : null
binding.stage_inputs = stageCommand(stagingScript)

binding.stdout_file = TaskRun.CMD_OUTFILE
binding.stderr_file = TaskRun.CMD_ERRFILE
Expand Down Expand Up @@ -340,6 +362,8 @@ class BashWrapperBuilder {
write0(targetScriptFile(), script)
if( input != null )
write0(targetInputFile(), input.toString())
if( stageScript != null )
write0(targetStageFile(), stageScript)
return result
}

Expand All @@ -349,11 +373,16 @@ class BashWrapperBuilder {

/**
 * @return the path the task input data is written to
 */
protected Path targetInputFile() {
    return inputFile
}

/**
 * @return the path the external input staging script is written to
 */
protected Path targetStageFile() {
    return stageFile
}

private Path write0(Path path, String data) {
int attempt=0
while( true ) {
try {
return Files.write(path, data.getBytes(), CREATE,WRITE,TRUNCATE_EXISTING)
try (BufferedWriter writer=Files.newBufferedWriter(path, CREATE,WRITE,TRUNCATE_EXISTING)) {
writer.write(data)
}
return path
}
catch (FileSystemException | SocketException | RuntimeException e) {
final isLocalFS = path.getFileSystem()==FileSystems.default
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,7 @@ class TaskRun implements Cloneable {
// names of the command files resolved against the task work directory
static final public String CMD_EXIT = '.exitcode'
static final public String CMD_START = '.command.begin'
static final public String CMD_RUN = '.command.run'
// file holding the input staging script when it is not inlined in the wrapper
static final public String CMD_STAGE = '.command.stage'
static final public String CMD_TRACE = '.command.trace'
static final public String CMD_ENV = '.command.env'

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.nio.file.Path
import java.nio.file.Paths

import nextflow.Session
import nextflow.SysEnv
import nextflow.container.ContainerConfig
import nextflow.container.DockerBuilder
import nextflow.container.SingularityBuilder
Expand Down Expand Up @@ -48,6 +49,8 @@ class BashWrapperBuilderTest extends Specification {
bean.workDir = Paths.get('/work/dir')
if( !bean.script )
bean.script = 'echo Hello world!'
if( !bean.containsKey('inputFiles') )
bean.inputFiles = [:]
new BashWrapperBuilder(bean as TaskBean) {
@Override
protected String getSecretsEnv() {
Expand Down Expand Up @@ -362,7 +365,17 @@ class BashWrapperBuilderTest extends Specification {

given:
def folder = Paths.get('/work/dir')
def inputs = ['sample_1.fq':Paths.get('/some/data/sample_1.fq'), 'sample_2.fq':Paths.get('/some/data/sample_2.fq'), ]
def inputs = [
'sample_1.fq': Paths.get('/some/data/sample_1.fq'),
'sample_2.fq': Paths.get('/some/data/sample_2.fq'),
]
def stageScript = '''\
# stage input files
rm -f sample_1.fq
rm -f sample_2.fq
ln -s /some/data/sample_1.fq sample_1.fq
ln -s /some/data/sample_2.fq sample_2.fq
'''.stripIndent().rightTrim()

when:
def binding = newBashWrapperBuilder([
Expand All @@ -371,15 +384,44 @@ class BashWrapperBuilderTest extends Specification {
inputFiles: inputs ]).makeBinding()

then:
binding.stage_inputs == '''\
# stage input files
binding.stage_inputs == stageScript
}

// Verifies that a staging script at or above the configured threshold is moved out of
// the wrapper: the binding only invokes `.command.stage`, whose content is the script.
def 'should stage inputs to external file' () {
    given:
    // lower the threshold so that even this short staging script is externalised
    SysEnv.push([NXF_WRAPPER_STAGE_FILE_THRESHOLD: '100'])
    def tempDir = Files.createTempDirectory('test')
    def sources = [
            'sample_1.fq': Paths.get('/some/data/sample_1.fq'),
            'sample_2.fq': Paths.get('/some/data/sample_2.fq'),
    ]
    def expected = '''\
rm -f sample_1.fq
rm -f sample_2.fq
ln -s /some/data/sample_1.fq sample_1.fq
ln -s /some/data/sample_2.fq sample_2.fq
'''.stripIndent().rightTrim()
    def wrapper = newBashWrapperBuilder([
            workDir: tempDir,
            targetDir: tempDir,
            inputFiles: sources ])

    when:
    def stageInputs = wrapper.makeBinding().stage_inputs
    then:
    // the wrapper only references the external stage file
    stageInputs == "# stage input files\nbash ${tempDir}/.command.stage"

    when:
    wrapper.build()
    then:
    // the staging commands end up in the stage file, without the header comment
    tempDir.resolve('.command.stage').text == expected

    cleanup:
    SysEnv.pop()
    tempDir?.deleteDir()
}

def 'should unstage outputs' () {
Expand Down Expand Up @@ -996,7 +1038,9 @@ class BashWrapperBuilderTest extends Specification {
def 'should include fix ownership command' () {

given:
def bean = Mock(TaskBean)
def bean = Mock(TaskBean) {
inputFiles >> [:]
}
def copy = Mock(ScriptFileCopyStrategy)
bean.workDir >> Paths.get('/work/dir')
and:
Expand Down
23 changes: 23 additions & 0 deletions tests/checks/stage-file.nf/.checks
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
set -e

# Locate the work directory of the `bar` task from the last run and
# assert that its external staging script has the expected line count.
check_stage_file() {
    TASK_DIR=$($NXF_CMD log last -F "process == 'bar'")
    [[ $(cat $TASK_DIR/.command.stage | wc -l) -eq 19 ]] || false
}

#
# run normal mode
#
echo ''
NXF_WRAPPER_STAGE_FILE_THRESHOLD='100' $NXF_RUN | tee stdout
# both processes must have been submitted
[[ $(grep 'INFO' .nextflow.log | grep -c 'Submitted process') == 2 ]] || false

check_stage_file


#
# RESUME mode
#
echo ''
NXF_WRAPPER_STAGE_FILE_THRESHOLD='100' $NXF_RUN -resume | tee stdout
# both processes must have been restored from the cache
[[ $(grep 'INFO' .nextflow.log | grep -c 'Cached process') == 2 ]] || false

check_stage_file

35 changes: 35 additions & 0 deletions tests/stage-file.nf
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@

// Creates `n_files` empty files named sample_<i>.fastq with `touch`
// and emits every file matching *.fastq as output.
process foo {
input:
val n_files

output:
path '*.fastq'

script:
"""
for i in `seq 1 ${n_files}`; do
touch sample_\${i}.fastq
done
"""
}

// Takes the files as a `path` input declared with the '*.fastq' name pattern,
// prints each file name followed by its content, and emits the standard output.
process bar {
input:
path '*.fastq'

output:
stdout

script:
"""
for f in `ls *.fastq`; do
echo \$f
cat \$f
done
"""
}

// Runs foo asking for 10 files and pipes its output into bar.
workflow {
foo(10) | bar
}