Skip to content

Commit

Permalink
Add retry strategy for publishing (#4839)
Browse files Browse the repository at this point in the history

Signed-off-by: Ben Sherman <bentshermann@gmail.com>
Signed-off-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com>
Co-authored-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com>
  • Loading branch information
bentsherman and pditommaso committed Mar 26, 2024
1 parent 2ee4d11 commit c9c7032
Show file tree
Hide file tree
Showing 6 changed files with 169 additions and 7 deletions.
26 changes: 26 additions & 0 deletions docs/config.md
Expand Up @@ -1224,6 +1224,32 @@ manifest {

Read the {ref}`sharing-page` page to learn how to publish your pipeline to GitHub, BitBucket or GitLab.

(config-nextflow)=

### Scope `nextflow`

The `nextflow` scope provides configuration options for the Nextflow runtime.

`nextflow.publish.retryPolicy.delay`
: :::{versionadded} 24.03.0-edge
:::
: Delay when retrying a failed publish operation (default: `350ms`).

`nextflow.publish.retryPolicy.jitter`
: :::{versionadded} 24.03.0-edge
:::
: Jitter value when retrying a failed publish operation (default: `0.25`).

`nextflow.publish.retryPolicy.maxAttempt`
: :::{versionadded} 24.03.0-edge
:::
: Max attempts when retrying a failed publish operation (default: `5`).

`nextflow.publish.retryPolicy.maxDelay`
: :::{versionadded} 24.03.0-edge
:::
: Max delay when retrying a failed publish operation (default: `90s`).

(config-notification)=

### Scope `notification`
Expand Down
5 changes: 4 additions & 1 deletion docs/process.md
Expand Up @@ -2219,7 +2219,10 @@ Available options:
: Enable or disable the publish rule depending on the boolean value specified (default: `true`).

`failOnError`
: When `true` abort the execution if some file can't be published to the specified target directory or bucket for any cause (default: `false`)
: :::{versionchanged} 24.03.0-edge
The default value was change from `false` to `true`
:::
: When `true` abort the execution if some file can't be published to the specified target directory or bucket for any cause (default: `true`)

`mode`
: The file publishing method. Can be one of the following values:
Expand Down
Expand Up @@ -24,9 +24,14 @@ import java.nio.file.LinkOption
import java.nio.file.NoSuchFileException
import java.nio.file.Path
import java.nio.file.PathMatcher
import java.time.temporal.ChronoUnit
import java.util.concurrent.ExecutorService
import java.util.regex.Pattern

import dev.failsafe.Failsafe
import dev.failsafe.RetryPolicy
import dev.failsafe.event.EventListener
import dev.failsafe.event.ExecutionAttemptedEvent
import groovy.transform.CompileDynamic
import groovy.transform.CompileStatic
import groovy.transform.EqualsAndHashCode
Expand Down Expand Up @@ -95,7 +100,7 @@ class PublishDir {
/**
* Trow an exception in case publish fails
*/
boolean failOnError = false
boolean failOnError = true

/**
* Tags to be associated to the target file
Expand All @@ -114,6 +119,8 @@ class PublishDir {
*/
private String storageClass

private PublishRetryConfig retryConfig

private PathMatcher matcher

private FileSystem sourceFileSystem
Expand Down Expand Up @@ -220,6 +227,9 @@ class PublishDir {
protected void apply0(Set<Path> files) {
assert path

final retryOpts = session.config.navigate('nextflow.publish.retryPolicy') as Map ?: Collections.emptyMap()
this.retryConfig = new PublishRetryConfig(retryOpts)

createPublishDir()
validatePublishMode()

Expand Down Expand Up @@ -359,14 +369,37 @@ class PublishDir {

protected void safeProcessFile(Path source, Path target) {
try {
processFile(source, target)
retryableProcessFile(source, target)
}
catch( Throwable e ) {
log.warn "Failed to publish file: ${source.toUriString()}; to: ${target.toUriString()} [${mode.toString().toLowerCase()}] -- See log file for details", e
if( NF.strictMode || failOnError){
final msg = "Failed to publish file: ${source.toUriString()}; to: ${target.toUriString()} [${mode.toString().toLowerCase()}] -- See log file for details"
final shouldFail = NF.strictMode || failOnError
if( shouldFail ) {
log.error(msg,e)
session?.abort(e)
}
else
log.warn(msg,e)
}
}

protected void retryableProcessFile(Path source, Path target) {
final listener = new EventListener<ExecutionAttemptedEvent>() {
@Override
void accept(ExecutionAttemptedEvent event) throws Throwable {
log.debug "Failed to publish file: ${source.toUriString()}; to: ${target.toUriString()} [${mode.toString().toLowerCase()}] -- attempt: ${event.attemptCount}; reason: ${event.lastFailure.message}"
}
}
final retryPolicy = RetryPolicy.builder()
.handle(Exception)
.withBackoff(retryConfig.delay.toMillis(), retryConfig.maxDelay.toMillis(), ChronoUnit.MILLIS)
.withMaxAttempts(retryConfig.maxAttempts)
.withJitter(retryConfig.jitter)
.onRetry(listener)
.build()
Failsafe
.with( retryPolicy )
.get({it-> processFile(source, target)})
}

protected void processFile( Path source, Path destination ) {
Expand Down
@@ -0,0 +1,53 @@
/*
* Copyright 2013-2024, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package nextflow.processor

import groovy.transform.CompileStatic
import groovy.transform.EqualsAndHashCode
import groovy.transform.ToString
import nextflow.util.Duration

/**
* Models retry policy configuration for publishing outputs
*
* @author Ben Sherman <bentshermann@gmail.com>
*/
@ToString(includePackage = false, includeNames = true)
@EqualsAndHashCode
@CompileStatic
class PublishRetryConfig {
Duration delay = Duration.of('350ms')
Duration maxDelay = Duration.of('90s')
int maxAttempts = 5
double jitter = 0.25

PublishRetryConfig() {
this(Collections.emptyMap())
}

PublishRetryConfig(Map config) {
if( config.delay )
delay = config.delay as Duration
if( config.maxDelay )
maxDelay = config.maxDelay as Duration
if( config.maxAttempts )
maxAttempts = config.maxAttempts as int
if( config.jitter )
jitter = config.jitter as double
}
}
Expand Up @@ -30,6 +30,10 @@ import test.TestHelper
*/
class PublishDirTest extends Specification {

def setup() {
Global.session = Mock(Session) { getConfig()>>[:] }
}

def 'should create a publish dir obj'() {

PublishDir publish
Expand Down Expand Up @@ -116,8 +120,6 @@ class PublishDirTest extends Specification {

def 'should create symlinks for output files' () {
given:
Global.session = Mock(Session) { getConfig()>>[:] }
and:
def folder = Files.createTempDirectory('nxf')
folder.resolve('work-dir').mkdir()
folder.resolve('work-dir/file1.txt').text = 'aaa'
Expand Down
@@ -0,0 +1,45 @@
/*
* Copyright 2013-2024, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package nextflow.processor

import nextflow.util.Duration
import spock.lang.Specification

/**
*
* @author Ben Sherman <bentshermann@gmail.com>
*/
class PublishRetryConfigTest extends Specification {

def 'should create retry config' () {

expect:
new PublishRetryConfig().delay == Duration.of('350ms')
new PublishRetryConfig().maxDelay == Duration.of('90s')
new PublishRetryConfig().maxAttempts == 5
new PublishRetryConfig().jitter == 0.25d

and:
new PublishRetryConfig([maxAttempts: 20]).maxAttempts == 20
new PublishRetryConfig([delay: '1s']).delay == Duration.of('1s')
new PublishRetryConfig([maxDelay: '1m']).maxDelay == Duration.of('1m')
new PublishRetryConfig([jitter: '0.5']).jitter == 0.5d

}

}

0 comments on commit c9c7032

Please sign in to comment.