From c9c7032c2e34132cf721ffabfea09d893adf3761 Mon Sep 17 00:00:00 2001 From: Ben Sherman Date: Tue, 26 Mar 2024 14:53:43 -0500 Subject: [PATCH] Add retry strategy for publishing (#4839) Signed-off-by: Ben Sherman Signed-off-by: Paolo Di Tommaso Co-authored-by: Paolo Di Tommaso --- docs/config.md | 26 +++++++++ docs/process.md | 5 +- .../nextflow/processor/PublishDir.groovy | 41 ++++++++++++-- .../processor/PublishRetryConfig.groovy | 53 +++++++++++++++++++ .../nextflow/processor/PublishDirTest.groovy | 6 ++- .../processor/PublishRetryConfigTest.groovy | 45 ++++++++++++++++ 6 files changed, 169 insertions(+), 7 deletions(-) create mode 100644 modules/nextflow/src/main/groovy/nextflow/processor/PublishRetryConfig.groovy create mode 100644 modules/nextflow/src/test/groovy/nextflow/processor/PublishRetryConfigTest.groovy diff --git a/docs/config.md b/docs/config.md index c46cfbd85a..fd254b953e 100644 --- a/docs/config.md +++ b/docs/config.md @@ -1224,6 +1224,32 @@ manifest { Read the {ref}`sharing-page` page to learn how to publish your pipeline to GitHub, BitBucket or GitLab. +(config-nextflow)= + +### Scope `nextflow` + +The `nextflow` scope provides configuration options for the Nextflow runtime. + +`nextflow.publish.retryPolicy.delay` +: :::{versionadded} 24.03.0-edge + ::: +: Delay when retrying a failed publish operation (default: `350ms`). + +`nextflow.publish.retryPolicy.jitter` +: :::{versionadded} 24.03.0-edge + ::: +: Jitter value when retrying a failed publish operation (default: `0.25`). + +`nextflow.publish.retryPolicy.maxAttempt` +: :::{versionadded} 24.03.0-edge + ::: +: Max attempts when retrying a failed publish operation (default: `5`). + +`nextflow.publish.retryPolicy.maxDelay` +: :::{versionadded} 24.03.0-edge + ::: +: Max delay when retrying a failed publish operation (default: `90s`). + (config-notification)= ### Scope `notification` diff --git a/docs/process.md b/docs/process.md index d619a713b3..c700706e9d 100644 --- a/docs/process.md +++ b/docs/process.md @@ -2219,7 +2219,10 @@ Available options: : Enable or disable the publish rule depending on the boolean value specified (default: `true`). `failOnError` -: When `true` abort the execution if some file can't be published to the specified target directory or bucket for any cause (default: `false`) +: :::{versionchanged} 24.03.0-edge + The default value was change from `false` to `true` + ::: +: When `true` abort the execution if some file can't be published to the specified target directory or bucket for any cause (default: `true`) `mode` : The file publishing method. Can be one of the following values: diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy index 6fe0bdd7a3..6c7aed3735 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy @@ -24,9 +24,14 @@ import java.nio.file.LinkOption import java.nio.file.NoSuchFileException import java.nio.file.Path import java.nio.file.PathMatcher +import java.time.temporal.ChronoUnit import java.util.concurrent.ExecutorService import java.util.regex.Pattern +import dev.failsafe.Failsafe +import dev.failsafe.RetryPolicy +import dev.failsafe.event.EventListener +import dev.failsafe.event.ExecutionAttemptedEvent import groovy.transform.CompileDynamic import groovy.transform.CompileStatic import groovy.transform.EqualsAndHashCode @@ -95,7 +100,7 @@ class PublishDir { /** * Trow an exception in case publish fails */ - boolean failOnError = false + boolean failOnError = true /** * Tags to be associated to the target file @@ -114,6 +119,8 @@ class PublishDir { */ private String storageClass + private PublishRetryConfig retryConfig + private PathMatcher matcher private FileSystem sourceFileSystem @@ -220,6 +227,9 @@ class PublishDir { protected void apply0(Set files) { assert path + final retryOpts = session.config.navigate('nextflow.publish.retryPolicy') as Map ?: Collections.emptyMap() + this.retryConfig = new PublishRetryConfig(retryOpts) + createPublishDir() validatePublishMode() @@ -359,14 +369,37 @@ class PublishDir { protected void safeProcessFile(Path source, Path target) { try { - processFile(source, target) + retryableProcessFile(source, target) } catch( Throwable e ) { - log.warn "Failed to publish file: ${source.toUriString()}; to: ${target.toUriString()} [${mode.toString().toLowerCase()}] -- See log file for details", e - if( NF.strictMode || failOnError){ + final msg = "Failed to publish file: ${source.toUriString()}; to: ${target.toUriString()} [${mode.toString().toLowerCase()}] -- See log file for details" + final shouldFail = NF.strictMode || failOnError + if( shouldFail ) { + log.error(msg,e) session?.abort(e) } + else + log.warn(msg,e) + } + } + + protected void retryableProcessFile(Path source, Path target) { + final listener = new EventListener() { + @Override + void accept(ExecutionAttemptedEvent event) throws Throwable { + log.debug "Failed to publish file: ${source.toUriString()}; to: ${target.toUriString()} [${mode.toString().toLowerCase()}] -- attempt: ${event.attemptCount}; reason: ${event.lastFailure.message}" + } } + final retryPolicy = RetryPolicy.builder() + .handle(Exception) + .withBackoff(retryConfig.delay.toMillis(), retryConfig.maxDelay.toMillis(), ChronoUnit.MILLIS) + .withMaxAttempts(retryConfig.maxAttempts) + .withJitter(retryConfig.jitter) + .onRetry(listener) + .build() + Failsafe + .with( retryPolicy ) + .get({it-> processFile(source, target)}) } protected void processFile( Path source, Path destination ) { diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/PublishRetryConfig.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/PublishRetryConfig.groovy new file mode 100644 index 0000000000..860a849678 --- /dev/null +++ b/modules/nextflow/src/main/groovy/nextflow/processor/PublishRetryConfig.groovy @@ -0,0 +1,53 @@ +/* + * Copyright 2013-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package nextflow.processor + +import groovy.transform.CompileStatic +import groovy.transform.EqualsAndHashCode +import groovy.transform.ToString +import nextflow.util.Duration + +/** + * Models retry policy configuration for publishing outputs + * + * @author Ben Sherman + */ +@ToString(includePackage = false, includeNames = true) +@EqualsAndHashCode +@CompileStatic +class PublishRetryConfig { + Duration delay = Duration.of('350ms') + Duration maxDelay = Duration.of('90s') + int maxAttempts = 5 + double jitter = 0.25 + + PublishRetryConfig() { + this(Collections.emptyMap()) + } + + PublishRetryConfig(Map config) { + if( config.delay ) + delay = config.delay as Duration + if( config.maxDelay ) + maxDelay = config.maxDelay as Duration + if( config.maxAttempts ) + maxAttempts = config.maxAttempts as int + if( config.jitter ) + jitter = config.jitter as double + } +} diff --git a/modules/nextflow/src/test/groovy/nextflow/processor/PublishDirTest.groovy b/modules/nextflow/src/test/groovy/nextflow/processor/PublishDirTest.groovy index 92d6d4f778..74ba70740f 100644 --- a/modules/nextflow/src/test/groovy/nextflow/processor/PublishDirTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/processor/PublishDirTest.groovy @@ -30,6 +30,10 @@ import test.TestHelper */ class PublishDirTest extends Specification { + def setup() { + Global.session = Mock(Session) { getConfig()>>[:] } + } + def 'should create a publish dir obj'() { PublishDir publish @@ -116,8 +120,6 @@ class PublishDirTest extends Specification { def 'should create symlinks for output files' () { given: - Global.session = Mock(Session) { getConfig()>>[:] } - and: def folder = Files.createTempDirectory('nxf') folder.resolve('work-dir').mkdir() folder.resolve('work-dir/file1.txt').text = 'aaa' diff --git a/modules/nextflow/src/test/groovy/nextflow/processor/PublishRetryConfigTest.groovy b/modules/nextflow/src/test/groovy/nextflow/processor/PublishRetryConfigTest.groovy new file mode 100644 index 0000000000..bac3fec500 --- /dev/null +++ b/modules/nextflow/src/test/groovy/nextflow/processor/PublishRetryConfigTest.groovy @@ -0,0 +1,45 @@ +/* + * Copyright 2013-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package nextflow.processor + +import nextflow.util.Duration +import spock.lang.Specification + +/** + * + * @author Ben Sherman + */ +class PublishRetryConfigTest extends Specification { + + def 'should create retry config' () { + + expect: + new PublishRetryConfig().delay == Duration.of('350ms') + new PublishRetryConfig().maxDelay == Duration.of('90s') + new PublishRetryConfig().maxAttempts == 5 + new PublishRetryConfig().jitter == 0.25d + + and: + new PublishRetryConfig([maxAttempts: 20]).maxAttempts == 20 + new PublishRetryConfig([delay: '1s']).delay == Duration.of('1s') + new PublishRetryConfig([maxDelay: '1m']).maxDelay == Duration.of('1m') + new PublishRetryConfig([jitter: '0.5']).jitter == 0.5d + + } + +}