Skip to content

Commit

Permalink
Deduplicate GS file + parallel up/downloads
Browse files Browse the repository at this point in the history
  • Loading branch information
pditommaso committed Sep 20, 2020
1 parent 0b65d61 commit 29d3708
Show file tree
Hide file tree
Showing 5 changed files with 235 additions and 116 deletions.
Expand Up @@ -25,7 +25,9 @@ import groovy.transform.Memoized
import groovy.transform.ToString
import groovy.util.logging.Slf4j
import nextflow.Session
import nextflow.cloud.CloudTransferOptions
import nextflow.exception.AbortOperationException
import nextflow.util.Duration
import nextflow.util.MemoryUnit

/**
Expand All @@ -36,7 +38,7 @@ import nextflow.util.MemoryUnit
@Slf4j
@ToString(includePackage = false, includeNames = true)
@CompileStatic
class GoogleLifeSciencesConfig {
class GoogleLifeSciencesConfig implements CloudTransferOptions {

public final static String DEFAULT_COPY_IMAGE = 'google/cloud-sdk:alpine'

Expand All @@ -60,6 +62,10 @@ class GoogleLifeSciencesConfig {
boolean usePrivateAddress
boolean enableRequesterPaysBuckets

int maxParallelTransfers = MAX_TRANSFER
int maxTransferAttempts = MAX_TRANSFER_ATTEMPTS
Duration delayBetweenAttempts = DEFAULT_DELAY_BETWEEN_ATTEMPTS

@Deprecated
GoogleLifeSciencesConfig(String project, List<String> zone, List<String> region, Path remoteBinDir = null, boolean preemptible = false) {
this.project = project
Expand Down Expand Up @@ -117,6 +123,10 @@ class GoogleLifeSciencesConfig {
final debugMode = config.navigate('google.lifeSciences.debug', System.getenv('NXF_DEBUG'))
final privateAddr = config.navigate('google.lifeSciences.usePrivateAddress') as boolean
final requesterPays = config.navigate('google.enableRequesterPaysBuckets') as boolean
// Transfer tuning options; each falls back to the constant given as the second argument of `navigate`.
// NOTE(review): these three keys are read from the `aws.batch` scope, whereas the other settings
// read around here come from `google.*` scopes -- this looks like a copy/paste leftover; confirm the intended scope.
final maxParallelTransfers = config.navigate('aws.batch.maxParallelTransfers', MAX_TRANSFER) as int
final maxTransferAttempts = config.navigate('aws.batch.maxTransferAttempts', MAX_TRANSFER_ATTEMPTS) as int
final delayBetweenAttempts = config.navigate('aws.batch.delayBetweenAttempts', DEFAULT_DELAY_BETWEEN_ATTEMPTS) as Duration

def zones = (config.navigate("google.zone") as String)?.split(",")?.toList() ?: Collections.<String>emptyList()
def regions = (config.navigate("google.region") as String)?.split(",")?.toList() ?: Collections.<String>emptyList()
Expand All @@ -136,7 +146,11 @@ class GoogleLifeSciencesConfig {
sshDaemon: sshDaemon,
sshImage: sshImage,
usePrivateAddress: privateAddr,
enableRequesterPaysBuckets: requesterPays)
enableRequesterPaysBuckets: requesterPays,
maxParallelTransfers: maxParallelTransfers,
maxTransferAttempts: maxTransferAttempts,
delayBetweenAttempts: delayBetweenAttempts
)
}

static private Integer debugMode0(value) {
Expand Down
Expand Up @@ -109,6 +109,4 @@ class GoogleLifeSciencesExecutor extends Executor {

/**
 * Accessor for the Google Life Sciences configuration held by this executor.
 *
 * @return the current value of the {@code config} member
 */
protected GoogleLifeSciencesConfig getConfig() {
    return config
}



}
Expand Up @@ -17,22 +17,22 @@

package nextflow.cloud.google.lifesciences

import static nextflow.cloud.google.lifesciences.GoogleLifeSciencesHelper.*

import java.nio.file.Path
import java.nio.file.Paths

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.cloud.CloudTransferOptions
import nextflow.executor.BashFunLib
import nextflow.executor.SimpleFileCopyStrategy
import nextflow.processor.TaskBean
import nextflow.processor.TaskRun
import nextflow.util.Escape
import static nextflow.cloud.google.lifesciences.GoogleLifeSciencesHelper.getLocalTaskDir
import static nextflow.cloud.google.lifesciences.GoogleLifeSciencesHelper.getRemoteTaskDir


/**
* Defines the file/script copy strategies for Google Pipelines.
*
* @author Paolo Di Tommaso
* @author Ólafur Haukur Flygenring <olafurh@wuxinextcode.com>
*/
@Slf4j
Expand All @@ -50,59 +50,82 @@ class GoogleLifeSciencesFileCopyStrategy extends SimpleFileCopyStrategy {
this.task = bean
}

/**
 * Build the Bash preamble for a task: the helpers produced by
 * {@code BashFunLib.body(...)} followed by the Google Storage helper functions
 * {@code nxf_gs_download} and {@code nxf_gs_upload}.
 *
 * Each transfer setting falls back to the matching {@link CloudTransferOptions}
 * constant when the configured value is falsy (Elvis operator).
 *
 * Shell notes:
 * - {@code nxf_gs_download <source> <target> [project]} first attempts a plain file copy;
 *   when that fails it retries as a recursive directory copy and exits with status 1
 *   if the second attempt fails as well. When {@code project} is given it is passed
 *   to gsutil through the {@code -u} option.
 * - every path and array expansion is double-quoted so that names containing
 *   whitespace or glob characters are passed to the commands as a single word.
 * - {@code opts} is declared once; it is always assigned as an array right after.
 *
 * @return the Bash snippet defining the helper functions
 */
String getBeforeStartScript() {
    def maxConnect = config.maxParallelTransfers ?: CloudTransferOptions.MAX_TRANSFER
    def attempts = config.maxTransferAttempts ?: CloudTransferOptions.MAX_TRANSFER_ATTEMPTS
    def delayBetweenAttempts = config.delayBetweenAttempts ?: CloudTransferOptions.DEFAULT_DELAY_BETWEEN_ATTEMPTS

    BashFunLib.body(maxConnect, attempts, delayBetweenAttempts) +

    '''
    # google storage helper
    nxf_gs_download() {
        local source=$1
        local target=$2
        local project=$3
        local basedir=$(dirname "$2")
        local ret
        local opts
        if [[ $project ]]; then
          opts=('-q' '-m' '-u' "$project")
        else
          opts=('-q' '-m')
        fi
        ## download assuming it's a file download
        mkdir -p "$basedir"
        ret=$(gsutil "${opts[@]}" cp "$source" "$target" 2>&1) || {
            ## if fails check if it was trying to download a directory
            mkdir "$target"
            gsutil "${opts[@]}" cp -R "$source/*" "$target" || {
                rm -rf "$target"
                >&2 echo "Unable to download path: $source"
                exit 1
            }
        }
    }
    nxf_gs_upload() {
        local name=$1
        local target=$2
        gsutil -m -q cp -R "$name" "$target/$name"
    }
    '''.stripIndent()
}

@Override
String getStageInputFilesScript(Map<String, Path> inputFiles) {

final localTaskDir = getLocalTaskDir(workDir)
final remoteTaskDir = getRemoteTaskDir(workDir)
final createDirectories = []
final stagingCommands = []

final gsutilPrefix = new StringBuilder()
gsutilPrefix.append("gsutil -m -q")
final reqPay = config.enableRequesterPaysBuckets ? config.project : ''

if(config.enableRequesterPaysBuckets) {
gsutilPrefix.append(" -u ${config.project}")
}
for( String target : inputFiles.keySet() ) {
final sourcePath = inputFiles.get(target)

for( String stageName : inputFiles.keySet() ) {
final storePath = inputFiles.get(stageName)
final storePathIsDir = storePath.isDirectory()
final stagePath = Paths.get(stageName)
final parent = stagePath.parent
final escapedStoreUri = Escape.uriPath(storePath)
final escapedStageName = Escape.path(stageName)

//check if we need to create parent dirs for staging file since gsutil doesn't create them for us
if(parent) {
createDirectories << "mkdir -p $localTaskDir/${Escape.path(parent)}".toString()
}
final cmd = config.maxTransferAttempts > 1
? "downloads+=(\"nxf_cp_retry nxf_gs_download ${Escape.uriPath(sourcePath)} ${Escape.path(target)} ${reqPay}\")"
: "downloads+=(\"nxf_gs_download ${Escape.uriPath(sourcePath)} ${Escape.path(target)} ${reqPay}\")"

if(storePathIsDir) {
stagingCommands << "$gsutilPrefix cp -R $escapedStoreUri $localTaskDir".toString()
//check if we need to move the directory (gsutil doesn't support renaming directories on copy)
if(parent || !storePath.name.endsWith(stageName)) {
stagingCommands << "mv $localTaskDir/${Escape.path(storePath.name)} $localTaskDir/$escapedStageName".toString()
}
} else {
stagingCommands << "$gsutilPrefix cp $escapedStoreUri $localTaskDir/$escapedStageName".toString()
}
stagingCommands.add(cmd)
}

final result = new StringBuilder()
// touch
result.append("echo start | gsutil -q cp -c - ${remoteTaskDir}/${TaskRun.CMD_START}").append('\n')

// create directories
if( createDirectories )
result.append(createDirectories.join('\n')) .append('\n')

// stage files
if( stagingCommands )
result.append(stagingCommands.join('\n')) .append('\n')
if( stagingCommands ) {
result.append('downloads=()\n')
result.append(stagingCommands.join('\n')).append('\n')
result.append('nxf_parallel "${downloads[@]}"\n')
}

// copy the remoteBinDir if it is defined
if(config.remoteBinDir) {
final localTaskDir = getLocalTaskDir(workDir)
result
.append("mkdir -p $localTaskDir/nextflow-bin").append('\n')
.append("gsutil -m -q cp -P -r ${Escape.uriPath(config.remoteBinDir)}/* $localTaskDir/nextflow-bin").append('\n')
Expand All @@ -111,18 +134,39 @@ class GoogleLifeSciencesFileCopyStrategy extends SimpleFileCopyStrategy {
result.toString()
}

/**
 * Trim trailing slashes ("tts") from a location string.
 *
 * A string made only of slashes is reduced to a single {@code /} rather than
 * to the empty string, because trimming stops once one character is left.
 *
 * @param loc the location to normalise; may be {@code null} or empty
 * @return {@code loc} without trailing {@code /} characters, or {@code loc}
 *         itself when it is {@code null} or empty
 */
private String tts(String loc) {
    // guard on `loc` itself: the logger reference is always truthy, so testing it
    // gave no protection and a null argument ended in a NullPointerException
    while( loc && loc.size()>1 && loc.endsWith('/') )
        loc = loc.substring(0, loc.length()-1)
    return loc
}

/**
 * Build the Bash fragment that copies the task output files to {@code targetDir}.
 *
 * NOTE(review): as listed here the body holds two implementations back to back and
 * the second one sits after an unconditional {@code return}. This text appears to come
 * from a diff view that shows removed and added lines together -- confirm which
 * variant is the intended one before relying on this method.
 *
 * @param outputFiles the output file names/patterns to unstage
 * @param targetDir the destination directory
 * @return the Bash fragment; the second variant returns {@code null} when
 *         {@code normalizeGlobStarPaths} yields no patterns
 */
@Override
String getUnstageOutputFilesScript(List<String> outputFiles, Path targetDir) {

// first variant: one `copyMany` command line per entry, each terminated by a newline
final result = new StringBuilder()

for( String it : outputFiles ) {
result
.append(copyMany(it, targetDir))
.append('\n')
}

return result.toString()
// second variant: collect the uploads in a Bash array and hand them to `nxf_parallel`
final patterns = normalizeGlobStarPaths(outputFiles)
// create a bash script that will copy the output files to the target directory
log.trace "[GLS] Unstaging file path: $patterns"

// nothing to upload
if( !patterns )
return null

// escape each pattern and drop its trailing slashes
// NOTE(review): the list is pre-sized from `outputFiles` although it is filled from `patterns`
final escape = new ArrayList(outputFiles.size())
for( String it : patterns )
escape.add(tts(Escape.path(it)))

// the generated script lists the matching paths one per line (IFS is set to newline),
// removes duplicates via `sort | uniq`, queues one `nxf_gs_upload` command per path
// and finally passes the queued commands to `nxf_parallel`
"""\
uploads=()
IFS=\$'\\n'
for name in \$(eval "ls -1d ${escape.join(' ')}" | sort | uniq); do
uploads+=("nxf_gs_upload '\$name' ${Escape.uriPath(targetDir)}")
done
unset IFS
nxf_parallel "\${uploads[@]}"
""".stripIndent()
}


Expand All @@ -136,12 +180,6 @@ class GoogleLifeSciencesFileCopyStrategy extends SimpleFileCopyStrategy {
"gsutil -m -q cp -R ${Escape.path(local)} ${Escape.uriPath(target)}"
}

/**
 * Compose a one-line Bash loop that expands {@code local} with {@code ls -1d}
 * and copies every resulting entry below {@code target} using
 * {@code gsutil -m -q cp -R}.
 *
 * A single trailing slash on {@code local} is removed before it is escaped.
 *
 * @param local the local file name or pattern to copy
 * @param target the remote destination directory
 * @return the Bash command line
 */
String copyMany(String local, Path target) {
    final source = local.endsWith("/") ? local.substring(0, local.length()-1) : local
    final escapedSource = Escape.path(source)
    final escapedTarget = Escape.uriPath(target)
    return "IFS=\$'\\n'; for name in \$(eval \"ls -1d ${escapedSource}\" 2>/dev/null);do gsutil -m -q cp -R \$name ${escapedTarget}/\$name; done; unset IFS"
}

/**
* {@inheritDoc}
*/
Expand Down

0 comments on commit 29d3708

Please sign in to comment.