From 29d370838459492788b1e3b1159529d041129d39 Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Sat, 19 Sep 2020 23:14:07 +0200 Subject: [PATCH] Deduplicate GS file + parallel up/downloads --- .../GoogleLifeSciencesConfig.groovy | 18 ++- .../GoogleLifeSciencesExecutor.groovy | 2 - .../GoogleLifeSciencesFileCopyStrategy.groovy | 146 +++++++++++------- ...gleLifeSciencesFileCopyStrategyTest.groovy | 95 +++++------- .../google/lifesciences/bash-wrapper-gcp.txt | 90 ++++++++++- 5 files changed, 235 insertions(+), 116 deletions(-) diff --git a/modules/nf-google/src/main/nextflow/cloud/google/lifesciences/GoogleLifeSciencesConfig.groovy b/modules/nf-google/src/main/nextflow/cloud/google/lifesciences/GoogleLifeSciencesConfig.groovy index 2731e80c8f..6460725c13 100644 --- a/modules/nf-google/src/main/nextflow/cloud/google/lifesciences/GoogleLifeSciencesConfig.groovy +++ b/modules/nf-google/src/main/nextflow/cloud/google/lifesciences/GoogleLifeSciencesConfig.groovy @@ -25,7 +25,9 @@ import groovy.transform.Memoized import groovy.transform.ToString import groovy.util.logging.Slf4j import nextflow.Session +import nextflow.cloud.CloudTransferOptions import nextflow.exception.AbortOperationException +import nextflow.util.Duration import nextflow.util.MemoryUnit /** @@ -36,7 +38,7 @@ import nextflow.util.MemoryUnit @Slf4j @ToString(includePackage = false, includeNames = true) @CompileStatic -class GoogleLifeSciencesConfig { +class GoogleLifeSciencesConfig implements CloudTransferOptions { public final static String DEFAULT_COPY_IMAGE = 'google/cloud-sdk:alpine' @@ -60,6 +62,10 @@ class GoogleLifeSciencesConfig { boolean usePrivateAddress boolean enableRequesterPaysBuckets + int maxParallelTransfers = MAX_TRANSFER + int maxTransferAttempts = MAX_TRANSFER_ATTEMPTS + Duration delayBetweenAttempts = DEFAULT_DELAY_BETWEEN_ATTEMPTS + @Deprecated GoogleLifeSciencesConfig(String project, List zone, List region, Path remoteBinDir = null, boolean preemptible = false) { this.project = project @@ -117,6 +123,10 @@ class GoogleLifeSciencesConfig { final debugMode = config.navigate('google.lifeSciences.debug', System.getenv('NXF_DEBUG')) final privateAddr = config.navigate('google.lifeSciences.usePrivateAddress') as boolean final requesterPays = config.navigate('google.enableRequesterPaysBuckets') as boolean + // + final maxParallelTransfers = config.navigate('aws.batch.maxParallelTransfers', MAX_TRANSFER) as int + final maxTransferAttempts = config.navigate('aws.batch.maxTransferAttempts', MAX_TRANSFER_ATTEMPTS) as int + final delayBetweenAttempts = config.navigate('aws.batch.delayBetweenAttempts', DEFAULT_DELAY_BETWEEN_ATTEMPTS) as Duration def zones = (config.navigate("google.zone") as String)?.split(",")?.toList() ?: Collections.emptyList() def regions = (config.navigate("google.region") as String)?.split(",")?.toList() ?: Collections.emptyList() @@ -136,7 +146,11 @@ class GoogleLifeSciencesConfig { sshDaemon: sshDaemon, sshImage: sshImage, usePrivateAddress: privateAddr, - enableRequesterPaysBuckets: requesterPays) + enableRequesterPaysBuckets: requesterPays, + maxParallelTransfers: maxParallelTransfers, + maxTransferAttempts: maxTransferAttempts, + delayBetweenAttempts: delayBetweenAttempts + ) } static private Integer debugMode0(value) { diff --git a/modules/nf-google/src/main/nextflow/cloud/google/lifesciences/GoogleLifeSciencesExecutor.groovy b/modules/nf-google/src/main/nextflow/cloud/google/lifesciences/GoogleLifeSciencesExecutor.groovy index c6b75383f7..dcda450184 100644 --- a/modules/nf-google/src/main/nextflow/cloud/google/lifesciences/GoogleLifeSciencesExecutor.groovy +++ b/modules/nf-google/src/main/nextflow/cloud/google/lifesciences/GoogleLifeSciencesExecutor.groovy @@ -109,6 +109,4 @@ class GoogleLifeSciencesExecutor extends Executor { protected GoogleLifeSciencesConfig getConfig() { return config } - - } diff --git a/modules/nf-google/src/main/nextflow/cloud/google/lifesciences/GoogleLifeSciencesFileCopyStrategy.groovy b/modules/nf-google/src/main/nextflow/cloud/google/lifesciences/GoogleLifeSciencesFileCopyStrategy.groovy index af435d96c5..2382bddc73 100644 --- a/modules/nf-google/src/main/nextflow/cloud/google/lifesciences/GoogleLifeSciencesFileCopyStrategy.groovy +++ b/modules/nf-google/src/main/nextflow/cloud/google/lifesciences/GoogleLifeSciencesFileCopyStrategy.groovy @@ -17,22 +17,22 @@ package nextflow.cloud.google.lifesciences +import static nextflow.cloud.google.lifesciences.GoogleLifeSciencesHelper.* + import java.nio.file.Path -import java.nio.file.Paths import groovy.transform.CompileStatic import groovy.util.logging.Slf4j +import nextflow.cloud.CloudTransferOptions +import nextflow.executor.BashFunLib import nextflow.executor.SimpleFileCopyStrategy import nextflow.processor.TaskBean import nextflow.processor.TaskRun import nextflow.util.Escape -import static nextflow.cloud.google.lifesciences.GoogleLifeSciencesHelper.getLocalTaskDir -import static nextflow.cloud.google.lifesciences.GoogleLifeSciencesHelper.getRemoteTaskDir - - /** * Defines the file/script copy strategies for Google Pipelines. * + * @author Paolo Di Tommaso * @author Ólafur Haukur Flygenring */ @Slf4j @@ -50,59 +50,82 @@ class GoogleLifeSciencesFileCopyStrategy extends SimpleFileCopyStrategy { this.task = bean } + String getBeforeStartScript() { + def maxConnect = config.maxParallelTransfers ?: CloudTransferOptions.MAX_TRANSFER + def attempts = config.maxTransferAttempts ?: CloudTransferOptions.MAX_TRANSFER_ATTEMPTS + def delayBetweenAttempts = config.delayBetweenAttempts ?: CloudTransferOptions.DEFAULT_DELAY_BETWEEN_ATTEMPTS + + BashFunLib.body(maxConnect, attempts, delayBetweenAttempts) + + + ''' + # google storage helper + nxf_gs_download() { + local source=$1 + local target=$2 + local project=$3 + local basedir=$(dirname $2) + local ret + local opts="-m -q" + local opts + if [[ $project ]]; then + opts=('-q' '-m' '-u' "$project") + else + opts=('-q' '-m') + fi + + ## download assuming it's a file download + mkdir -p $basedir + ret=$(gsutil ${opts[@]} cp "$source" "$target" 2>&1) || { + ## if fails check if it was trying to download a directory + mkdir $target + gsutil ${opts[@]} cp -R "$source/*" "$target" || { + rm -rf $target + >&2 echo "Unable to download path: $source" + exit 1 + } + } + } + + nxf_gs_upload() { + local name=$1 + local target=$2 + gsutil -m -q cp -R "$name" "$target/$name" + } + '''.stripIndent() + } + @Override String getStageInputFilesScript(Map inputFiles) { - final localTaskDir = getLocalTaskDir(workDir) final remoteTaskDir = getRemoteTaskDir(workDir) - final createDirectories = [] final stagingCommands = [] - final gsutilPrefix = new StringBuilder() - gsutilPrefix.append("gsutil -m -q") + final reqPay = config.enableRequesterPaysBuckets ? config.project : '' - if(config.enableRequesterPaysBuckets) { - gsutilPrefix.append(" -u ${config.project}") - } + for( String target : inputFiles.keySet() ) { + final sourcePath = inputFiles.get(target) - for( String stageName : inputFiles.keySet() ) { - final storePath = inputFiles.get(stageName) - final storePathIsDir = storePath.isDirectory() - final stagePath = Paths.get(stageName) - final parent = stagePath.parent - final escapedStoreUri = Escape.uriPath(storePath) - final escapedStageName = Escape.path(stageName) - - //check if we need to create parent dirs for staging file since gsutil doesn't create them for us - if(parent) { - createDirectories << "mkdir -p $localTaskDir/${Escape.path(parent)}".toString() - } + final cmd = config.maxTransferAttempts > 1 + ? "downloads+=(\"nxf_cp_retry nxf_gs_download ${Escape.uriPath(sourcePath)} ${Escape.path(target)} ${reqPay}\")" + : "downloads+=(\"nxf_gs_download ${Escape.uriPath(sourcePath)} ${Escape.path(target)} ${reqPay}\")" - if(storePathIsDir) { - stagingCommands << "$gsutilPrefix cp -R $escapedStoreUri $localTaskDir".toString() - //check if we need to move the directory (gsutil doesn't support renaming directories on copy) - if(parent || !storePath.name.endsWith(stageName)) { - stagingCommands << "mv $localTaskDir/${Escape.path(storePath.name)} $localTaskDir/$escapedStageName".toString() - } - } else { - stagingCommands << "$gsutilPrefix cp $escapedStoreUri $localTaskDir/$escapedStageName".toString() - } + stagingCommands.add(cmd) } final result = new StringBuilder() // touch result.append("echo start | gsutil -q cp -c - ${remoteTaskDir}/${TaskRun.CMD_START}").append('\n') - // create directories - if( createDirectories ) - result.append(createDirectories.join('\n')) .append('\n') - // stage files - if( stagingCommands ) - result.append(stagingCommands.join('\n')) .append('\n') + if( stagingCommands ) { + result.append('downloads=()\n') + result.append(stagingCommands.join('\n')).append('\n') + result.append('nxf_parallel "${downloads[@]}"\n') + } // copy the remoteBinDir if it is defined if(config.remoteBinDir) { + final localTaskDir = getLocalTaskDir(workDir) result .append("mkdir -p $localTaskDir/nextflow-bin").append('\n') .append("gsutil -m -q cp -P -r ${Escape.uriPath(config.remoteBinDir)}/* $localTaskDir/nextflow-bin").append('\n') @@ -111,18 +134,39 @@ class GoogleLifeSciencesFileCopyStrategy extends SimpleFileCopyStrategy { result.toString() } + /** + * Trim-trailing-slash + * @return + */ + private String tts(String loc) { + while( log && loc.size()>1 && loc.endsWith('/') ) + loc = loc.substring(0,loc.length()-1) + return loc + } + @Override String getUnstageOutputFilesScript(List outputFiles, Path targetDir) { - final result = new StringBuilder() - - for( String it : outputFiles ) { - result - .append(copyMany(it, targetDir)) - .append('\n') - } - - return result.toString() + final patterns = normalizeGlobStarPaths(outputFiles) + // create a bash script that will copy the out file to the working directory + log.trace "[GLS] Unstaging file path: $patterns" + + if( !patterns ) + return null + + final escape = new ArrayList(outputFiles.size()) + for( String it : patterns ) + escape.add(tts(Escape.path(it))) + + """\ + uploads=() + IFS=\$'\\n' + for name in \$(eval "ls -1d ${escape.join(' ')}" | sort | uniq); do + uploads+=("nxf_gs_upload '\$name' ${Escape.uriPath(targetDir)}") + done + unset IFS + nxf_parallel "\${uploads[@]}" + """.stripIndent() } @@ -136,12 +180,6 @@ class GoogleLifeSciencesFileCopyStrategy extends SimpleFileCopyStrategy { "gsutil -m -q cp -R ${Escape.path(local)} ${Escape.uriPath(target)}" } - String copyMany(String local, Path target) { - if ( local.endsWith("/") ) - local = local.substring(0,local.length()-1) - "IFS=\$'\\n'; for name in \$(eval \"ls -1d ${Escape.path(local)}\" 2>/dev/null);do gsutil -m -q cp -R \$name ${Escape.uriPath(target)}/\$name; done; unset IFS" - } - /** * {@inheritDoc} */ diff --git a/modules/nf-google/src/test/nextflow/cloud/google/lifesciences/GoogleLifeSciencesFileCopyStrategyTest.groovy b/modules/nf-google/src/test/nextflow/cloud/google/lifesciences/GoogleLifeSciencesFileCopyStrategyTest.groovy index 1a50b36190..31fbe1bb9d 100644 --- a/modules/nf-google/src/test/nextflow/cloud/google/lifesciences/GoogleLifeSciencesFileCopyStrategyTest.groovy +++ b/modules/nf-google/src/test/nextflow/cloud/google/lifesciences/GoogleLifeSciencesFileCopyStrategyTest.groovy @@ -47,7 +47,9 @@ class GoogleLifeSciencesFileCopyStrategyTest extends GoogleSpecification { then: result == '''\ echo start | gsutil -q cp -c - gs://my-bucket/work/xx/yy/.command.begin - gsutil -m -q cp gs://my-bucket/bar/foo.txt /work/xx/yy/foo.txt + downloads=() + downloads+=("nxf_gs_download gs://my-bucket/bar/foo.txt foo.txt ") + nxf_parallel "${downloads[@]}" '''.stripIndent() // file with a different name @@ -57,7 +59,9 @@ class GoogleLifeSciencesFileCopyStrategyTest extends GoogleSpecification { then: result == '''\ echo start | gsutil -q cp -c - gs://my-bucket/work/xx/yy/.command.begin - gsutil -m -q cp gs://my-bucket/bar/foo.txt /work/xx/yy/hola.txt + downloads=() + downloads+=("nxf_gs_download gs://my-bucket/bar/foo.txt hola.txt ") + nxf_parallel "${downloads[@]}" '''.stripIndent() // file name with blanks @@ -67,7 +71,9 @@ class GoogleLifeSciencesFileCopyStrategyTest extends GoogleSpecification { then: result == '''\ echo start | gsutil -q cp -c - gs://my-bucket/work/xx/yy/.command.begin - gsutil -m -q cp gs://my-bucket/bar/f\\ o\\ o.txt /work/xx/yy/f\\ o\\ o.txt + downloads=() + downloads+=("nxf_gs_download gs://my-bucket/bar/f\\ o\\ o.txt f\\ o\\ o.txt ") + nxf_parallel "${downloads[@]}" '''.stripIndent() // file in a sub-directory @@ -77,30 +83,11 @@ class GoogleLifeSciencesFileCopyStrategyTest extends GoogleSpecification { then: result == '''\ echo start | gsutil -q cp -c - gs://my-bucket/work/xx/yy/.command.begin - mkdir -p /work/xx/yy/subdir - gsutil -m -q cp gs://my-bucket/bar/foo.txt /work/xx/yy/subdir/foo.txt + downloads=() + downloads+=("nxf_gs_download gs://my-bucket/bar/foo.txt subdir/foo.txt ") + nxf_parallel "${downloads[@]}" '''.stripIndent() - // file a directory name - when: - inputs = ['dir1': mockGsPath('gs://my-bucket/foo/dir1', true)] - result = strategy.getStageInputFilesScript(inputs) - then: - result == '''\ - echo start | gsutil -q cp -c - gs://my-bucket/work/xx/yy/.command.begin - gsutil -m -q cp -R gs://my-bucket/foo/dir1 /work/xx/yy - '''.stripIndent() - - // stage file is a directory with a different name - when: - inputs = ['dir2': mockGsPath('gs://my-bucket/foo/dir1', true)] - result = strategy.getStageInputFilesScript(inputs) - then: - result == '''\ - echo start | gsutil -q cp -c - gs://my-bucket/work/xx/yy/.command.begin - gsutil -m -q cp -R gs://my-bucket/foo/dir1 /work/xx/yy - mv /work/xx/yy/dir1 /work/xx/yy/dir2 - '''.stripIndent() } def 'create stage files using Requester Pays' () { @@ -123,7 +110,9 @@ class GoogleLifeSciencesFileCopyStrategyTest extends GoogleSpecification { then: result == '''\ echo start | gsutil -q cp -c - gs://my-bucket/work/xx/yy/.command.begin - gsutil -m -q -u foo cp gs://my-bucket/bar/foo.txt /work/xx/yy/foo.txt + downloads=() + downloads+=("nxf_gs_download gs://my-bucket/bar/foo.txt foo.txt foo") + nxf_parallel "${downloads[@]}" '''.stripIndent() // file a directory name @@ -133,7 +122,9 @@ class GoogleLifeSciencesFileCopyStrategyTest extends GoogleSpecification { then: result == '''\ echo start | gsutil -q cp -c - gs://my-bucket/work/xx/yy/.command.begin - gsutil -m -q -u foo cp -R gs://my-bucket/foo/dir1 /work/xx/yy + downloads=() + downloads+=("nxf_gs_download gs://my-bucket/foo/dir1 dir1 foo") + nxf_parallel "${downloads[@]}" '''.stripIndent() } @@ -153,18 +144,32 @@ class GoogleLifeSciencesFileCopyStrategyTest extends GoogleSpecification { def strategy = new GoogleLifeSciencesFileCopyStrategy(bean, handler) when: - def result = strategy.getUnstageOutputFilesScript(['foo.txt'], gsPath('gs://other/dir')) + def result = strategy.getUnstageOutputFilesScript(['foo.txt', 'dirx/'], gsPath('gs://other/dir')) then: - result == '''\ - IFS=$'\\n'; for name in $(eval "ls -1d foo.txt" 2>/dev/null);do gsutil -m -q cp -R $name gs://other/dir/\$name; done; unset IFS + result == + '''\ + uploads=() + IFS=$'\\n' + for name in $(eval "ls -1d foo.txt dirx" | sort | uniq); do + uploads+=("nxf_gs_upload '$name' gs://other/dir") + done + unset IFS + nxf_parallel "${uploads[@]}" ''' .stripIndent() when: result = strategy.getUnstageOutputFilesScript(['fo o.txt'], gsPath('gs://other/dir x/')) then: - result == '''\ - IFS=$'\\n'; for name in $(eval "ls -1d fo\\ o.txt" 2>/dev/null);do gsutil -m -q cp -R $name gs://other/dir\\ x/\$name; done; unset IFS + result == + '''\ + uploads=() + IFS=$'\\n' + for name in $(eval "ls -1d fo\\ o.txt" | sort | uniq); do + uploads+=("nxf_gs_upload '$name' gs://other/dir\\ x") + done + unset IFS + nxf_parallel "${uploads[@]}" ''' .stripIndent() @@ -230,29 +235,5 @@ class GoogleLifeSciencesFileCopyStrategyTest extends GoogleSpecification { '''.stripIndent() } - - def 'should validate copy many' () { - given: - def handler = Mock(GoogleLifeSciencesTaskHandler) { - getExecutor() >> Mock(GoogleLifeSciencesExecutor) { getConfig() >> Mock(GoogleLifeSciencesConfig) } - } - def strategy = new GoogleLifeSciencesFileCopyStrategy(Mock(TaskBean), handler) - - when: - def ret1 = strategy.copyMany('file.txt', mockGsPath('gs://foo/bar')) - then: - ret1 == 'IFS=$\'\\n\'; for name in $(eval "ls -1d file.txt" 2>/dev/null);do gsutil -m -q cp -R $name gs://foo/bar/$name; done; unset IFS' - - when: - def ret2 = strategy.copyMany('file name', mockGsPath('gs://foo/bar')) - then: - ret2 == 'IFS=$\'\\n\'; for name in $(eval "ls -1d file\\ name" 2>/dev/null);do gsutil -m -q cp -R $name gs://foo/bar/$name; done; unset IFS' - - when: - def ret3 = strategy.copyMany('dir-name/', mockGsPath('gs://foo/bar')) - then: - ret3 == 'IFS=$\'\\n\'; for name in $(eval "ls -1d dir-name" 2>/dev/null);do gsutil -m -q cp -R $name gs://foo/bar/$name; done; unset IFS' - - - } + } diff --git a/modules/nf-google/src/test/nextflow/cloud/google/lifesciences/bash-wrapper-gcp.txt b/modules/nf-google/src/test/nextflow/cloud/google/lifesciences/bash-wrapper-gcp.txt index e61df68044..7596ff99d3 100644 --- a/modules/nf-google/src/test/nextflow/cloud/google/lifesciences/bash-wrapper-gcp.txt +++ b/modules/nf-google/src/test/nextflow/cloud/google/lifesciences/bash-wrapper-gcp.txt @@ -5,6 +5,92 @@ set -u NXF_DEBUG=${NXF_DEBUG:=0}; [[ $NXF_DEBUG > 1 ]] && set -x NXF_ENTRY=${1:-nxf_main} +# bash helper functions +nxf_cp_retry() { + local max_attempts=1 + local timeout=10 + local attempt=0 + local exitCode=0 + while (( $attempt < $max_attempts )) + do + if "$@" + then + return 0 + else + exitCode=$? + fi + if [[ $exitCode == 0 ]] + then + break + fi + sleep $timeout + attempt=$(( attempt + 1 )) + timeout=$(( timeout * 2 )) + done +} + +nxf_parallel() { + IFS=$'\n' + local cmd=("$@") + local cpus=$(nproc 2>/dev/null || < /proc/cpuinfo grep '^process' -c) + local max=$(if (( cpus>16 )); then echo 16; else echo $cpus; fi) + local i=0 + local pid=() + ( + set +u + while ((i<${#cmd[@]})); do + local copy=() + for x in "${pid[@]}"; do + [[ -e /proc/$x ]] && copy+=($x) + done + pid=("${copy[@]}") + + if ((${#pid[@]}>=$max)); then + sleep 1 + else + eval "${cmd[$i]}" & + pid+=($!) + ((i+=1)) + fi + done + ((${#pid[@]}>0)) && wait ${pid[@]} + ) + unset IFS +} + +# google storage helper +nxf_gs_download() { + local source=$1 + local target=$2 + local project=$3 + local basedir=$(dirname $2) + local ret + local opts="-m -q" + local opts + if [[ $project ]]; then + opts=('-q' '-m' '-u' "$project") + else + opts=('-q' '-m') + fi + + ## download assuming it's a file download + mkdir -p $basedir + ret=$(gsutil ${opts[@]} cp "$source" "$target" 2>&1) || { + ## if fails check if it was trying to download a directory + mkdir $target + gsutil ${opts[@]} cp -R "$source/*" "$target" || { + rm -rf $target + >&2 echo "Unable to download path: $source" + exit 1 + } + } +} + +nxf_gs_upload() { + local name=$1 + local target=$2 + gsutil -m -q cp -R "$name" "$target/$name" +} nxf_date() { local ts=$(date +%s%3N); @@ -66,7 +152,9 @@ nxf_stage() { true # stage input files echo start | gsutil -q cp -c - gs://bucket/work/dir/.command.begin - gsutil -m -q cp gs://bucket/work/dir/.command.sh /work/dir/.command.sh + downloads=() + downloads+=("nxf_gs_download gs://bucket/work/dir/.command.sh .command.sh ") + nxf_parallel "${downloads[@]}" } nxf_unstage() {