Skip to content

Commit

Permalink
Add retry strategy on Bash wrapper write logic
Browse files Browse the repository at this point in the history
Signed-off-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com>
  • Loading branch information
pditommaso committed Mar 2, 2023
1 parent 025ff9d commit 1e38274
Showing 1 changed file with 29 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@

package nextflow.executor

import nextflow.SysEnv

import java.nio.file.FileSystemException
import java.nio.file.FileSystems
import java.nio.file.Files
import java.nio.file.Path

import groovy.transform.CompileStatic
import groovy.transform.PackageScope
import groovy.util.logging.Slf4j
import nextflow.SysEnv
import nextflow.container.ContainerBuilder
import nextflow.container.DockerBuilder
import nextflow.container.SingularityBuilder
Expand All @@ -36,6 +36,9 @@ import nextflow.processor.TaskProcessor
import nextflow.processor.TaskRun
import nextflow.secret.SecretsLoader
import nextflow.util.Escape

import static java.nio.file.StandardOpenOption.*

/**
* Builder to create the Bash script which is used to
* wrap and launch the user task
Expand All @@ -46,6 +49,14 @@ import nextflow.util.Escape
@CompileStatic
class BashWrapperBuilder {

// Built-in defaults for the retry policy applied when writing wrapper files (see write0).
// Declared final: these are constants and must not be reassigned at runtime.
private static final int DEFAULT_WRITE_BACK_OFF_BASE = 3
private static final int DEFAULT_WRITE_BACK_OFF_DELAY = 250
private static final int DEFAULT_WRITE_MAX_ATTEMPTS = 5

// Effective retry settings for this builder instance. Each one is read from the
// environment via SysEnv and falls back to the matching default when the variable
// is unset. NOTE: the Elvis operator follows Groovy truth, so a value of `0` also
// falls back to the default; a non-numeric value makes the `as Integer` coercion fail.
//
// Base of the exponential back-off: the delay multiplier is `base ^ attempt`
private int writeBackOffBase = SysEnv.get('NXF_WRAPPER_BACK_OFF_BASE') as Integer ?: DEFAULT_WRITE_BACK_OFF_BASE
// Delay unit multiplied by `base ^ attempt`; the product is passed to Thread.sleep (milliseconds)
private int writeBackOffDelay = SysEnv.get('NXF_WRAPPER_BACK_OFF_DELAY') as Integer ?: DEFAULT_WRITE_BACK_OFF_DELAY
// Number of failed write attempts after which the error is propagated to the caller
private int writeMaxAttempts = SysEnv.get('NXF_WRAPPER_MAX_ATTEMPTS') as Integer ?: DEFAULT_WRITE_MAX_ATTEMPTS

static final public KILL_CMD = '[[ "$pid" ]] && nxf_kill $pid'

static final private ENDL = '\n'
Expand Down Expand Up @@ -340,13 +351,22 @@ class BashWrapperBuilder {
/**
 * @return the value of {@code inputFile}; declared protected so subclasses can override it
 */
protected Path targetInputFile() {
    return inputFile
}

/**
 * Write the given text to the given path, creating the file when missing and
 * truncating it when it already exists.
 *
 * A failed write is retried with an exponential back-off, but only when the
 * target is NOT on the default (local) file system; a failure on the local
 * file system is reported immediately.
 *
 * @param path The file to be written
 * @param data The text content to write; encoded with the platform default charset
 * @return The path returned by {@code Files.write}
 * @throws ProcessException when the write fails on the local file system, or when
 *      {@code writeMaxAttempts} failed attempts have been made. The original error
 *      is attached as the cause.
 */
private Path write0(Path path, String data) {
    int attempt=0
    while( true ) {
        try {
            return Files.write(path, data.getBytes(), CREATE,WRITE,TRUNCATE_EXISTING)
        }
        catch (FileSystemException | SocketException | RuntimeException e) {
            final isLocalFS = path.getFileSystem()==FileSystems.default
            // the retry logic is needed for non-local file system such as S3.
            // when the file is local fail without retrying
            if( isLocalFS || ++attempt>=writeMaxAttempts )
                throw new ProcessException("Unable to create file ${path.toUriString()}", e)
            // use an exponential delay before making another attempt:
            // delay = writeBackOffBase^attempt * writeBackOffDelay (milliseconds)
            final delay = (Math.pow(writeBackOffBase, attempt) as long) * writeBackOffDelay
            log.debug "Unexpected error writing '${path.toUriString()}'; attempt: $attempt - cause: ${e.message}"
            Thread.sleep(delay)
        }
    }
}

Expand Down

0 comments on commit 1e38274

Please sign in to comment.