Skip to content

Commit

Permalink
Enhanced Google pipelines file copy strategy
Browse files Browse the repository at this point in the history
Signed-off-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com>
  • Loading branch information
pditommaso committed Jan 26, 2019
1 parent 4a167e7 commit 678405b
Show file tree
Hide file tree
Showing 6 changed files with 151 additions and 51 deletions.
11 changes: 8 additions & 3 deletions modules/nf-commons/src/main/nextflow/extension/FilesEx.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/

package nextflow.extension

import org.codehaus.groovy.runtime.InvokerHelper
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING

import java.nio.ByteBuffer
Expand Down Expand Up @@ -1480,14 +1482,17 @@ class FilesEx {
}

/**
 * Render a path as a URI-like string, using scheme-specific formatting.
 *
 * @param path The path to convert; may be {@code null}
 * @return {@code null} when {@code path} is {@code null}; the plain path string for the
 *      {@code file} scheme; {@code s3:/<path>} for the {@code s3} scheme;
 *      {@code gs://<bucket><path>} for the {@code gs} scheme; otherwise the string
 *      form of {@code path.toUri()}
 */
static String toUriString( Path path ) {
    // explicit null comparison: only a missing path short-circuits to null
    if( path == null )
        return null

    final scheme = getScheme(path)
    if( scheme == 'file' )
        return path.toString()

    if( scheme == 's3' )
        return "$scheme:/$path".toString()

    // independent check (not chained to the `s3` test) so that it is actually reachable
    if( scheme == 'gs' ) {
        // `bucket()` is resolved dynamically on the path instance
        // NOTE(review): presumably to avoid a compile-time reference to the Google path class -- confirm
        final bucket = InvokerHelper.invokeMethod(path, 'bucket', InvokerHelper.EMPTY_ARGS)
        // assumes the path string starts with '/' so that bucket and path do not run together -- TODO confirm
        return "$scheme://$bucket$path".toString()
    }

    return path.toUri().toString()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ class GooglePipelinesFileCopyStrategy extends SimpleFileCopyStrategy {
def stagePath = Paths.get(stageName)
def parent = stagePath.parent
//we can't simply use the handy ToUriString function since it also URL encodes spaces and gsutil doesn't like that
//TODO: Find a cleaner way of doing this
def escapedStoreUri = Escape.path(absStorePath.getFileSystem().toString()+absStorePath.toString())
def escapedStoreUri = "${absStorePath.getFileSystem()}${Escape.path(absStorePath)}"
def escapedStageName = Escape.path(stageName)

//check if we need to create parent dirs for staging file since gsutil doesn't create them for us
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright 2013-2019, Centre for Genomic Regulation (CRG)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package nextflow.cloud.google

import spock.lang.Specification

import java.nio.file.FileSystem
import java.nio.file.Path
import java.nio.file.attribute.BasicFileAttributes
import java.nio.file.spi.FileSystemProvider

/**
 * Base Spock specification for the Google cloud tests. It provides a helper
 * that builds mocked {@code gs://} paths.
 *
 * @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
 */
abstract class GoogleSpecification extends Specification {

    /**
     * Build a mocked {@link Path} standing in for a Google Storage object.
     *
     * @param path A URI string; it must start with {@code gs://}
     * @param isDir Whether the mocked file attributes report a directory (default: {@code false})
     * @return A mocked path whose {@code toString()} is the object path without the bucket
     *      and whose {@code toUriString()} is the given {@code path}
     */
    protected Path mockGsPath(String path, boolean isDir=false) {
        assert path.startsWith('gs://')

        // e.g. 'gs://bucket/a/b' tokenizes to ['gs:', 'bucket', 'a', 'b']
        def parts = path.tokenize('/')
        def bucketName = parts[1]
        def objectPath = '/' + parts[2..-1].join('/')

        // the URI only needs to render back to the original string
        def mockUri = GroovyMock(URI)
        mockUri.toString() >> path

        // file attributes: either a directory or a regular file, never a symlink
        def fileAttrs = Mock(BasicFileAttributes)
        fileAttrs.isSymbolicLink() >> false
        fileAttrs.isRegularFile() >> !isDir
        fileAttrs.isDirectory() >> isDir

        // provider reporting the `gs` scheme and serving the attributes above
        def mockProvider = Mock(FileSystemProvider)
        mockProvider.readAttributes(_, _, _) >> fileAttrs
        mockProvider.getScheme() >> 'gs'

        def mockFs = Mock(FileSystem)
        mockFs.provider() >> mockProvider
        // NOTE(review): single slash here, while the accepted input prefix is 'gs://' -- confirm this is intended
        mockFs.toString() >> ('gs:/' + bucketName)

        def mockedPath = GroovyMock(Path)
        mockedPath.getFileSystem() >> mockFs
        mockedPath.bucket() >> bucketName
        mockedPath.toString() >> objectPath
        mockedPath.toUriString() >> path
        mockedPath.toUri() >> mockUri
        mockedPath.toAbsolutePath() >> mockedPath
        // resolving a child yields another mocked path rooted at this one
        mockedPath.resolve(_) >> { args -> mockGsPath("$path/${args[0]}") }
        mockedPath.asBoolean() >> true

        return mockedPath
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,21 @@

package nextflow.cloud.google.pipelines

import spock.lang.Specification

import java.nio.file.Path
import java.nio.file.Paths

import nextflow.cloud.google.GoogleSpecification
import nextflow.processor.TaskBean
/**
*
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
*/
class GooglePipelinesScriptLauncherTest extends Specification {
class GooglePipelinesScriptLauncherTest extends GoogleSpecification {


def 'should create task env' () {

given:
def bucket = Paths.get('/bucket/work')
def config = new GooglePipelinesConfiguration(remoteBinDir: 'gs://bucket/bin' as Path)
def bucket = mockGsPath('gs://bucket/work')
def binDir = mockGsPath('gs://bucket/bin')
def config = new GooglePipelinesConfiguration(remoteBinDir: binDir)
def handler = [:] as GooglePipelinesTaskHandler
handler.pipelineConfiguration = config
def bean = [
Expand All @@ -46,8 +44,8 @@ class GooglePipelinesScriptLauncherTest extends Specification {
def binding = new GooglePipelinesScriptLauncher(bean, handler).makeBinding()
then:
binding.task_env == '''\
chmod +x /bucket/work/nextflow-bin/*
export PATH=/bucket/work/nextflow-bin:$PATH
chmod +x /work/nextflow-bin/*
export PATH=/work/nextflow-bin:$PATH
export FOO="xxx"
'''.stripIndent()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,21 @@
package nextflow.cloud.google.pipelines

import spock.lang.Shared
import spock.lang.Specification

import java.nio.file.FileSystem
import java.nio.file.Path
import java.nio.file.Paths
import java.nio.file.spi.FileSystemProvider

import com.google.api.services.genomics.v2alpha1.model.Event
import com.google.api.services.genomics.v2alpha1.model.Metadata
import com.google.api.services.genomics.v2alpha1.model.Operation
import nextflow.Session
import nextflow.cloud.google.GoogleSpecification
import nextflow.exception.ProcessUnrecoverableException
import nextflow.processor.TaskId
import nextflow.processor.TaskRun
import nextflow.processor.TaskStatus
import nextflow.util.CacheHelper

class GooglePipelinesTaskHandlerTest extends Specification {
class GooglePipelinesTaskHandlerTest extends GoogleSpecification {

@Shared
GooglePipelinesConfiguration pipeConfig = new GooglePipelinesConfiguration("testProject",["testZone"],["testRegion"])
Expand Down Expand Up @@ -129,30 +126,13 @@ class GooglePipelinesTaskHandlerTest extends Specification {
op == operation
}

/**
 * Create a mocked {@code gs} path for the given path string.
 *
 * @param path The string returned by the mock's {@code toString()}
 * @return A mocked {@link Path} whose URI renderings are {@code 'gs:/'} followed by {@code path}
 */
private Path mockPath(String path) {
    // both the URI mock and `toUriString()` answer with the same rendering
    def gsUri = 'gs:/' + path

    def mockUri = GroovyMock(URI)
    mockUri.toString() >> gsUri

    // file system whose provider reports the `gs` scheme
    def mockProvider = Mock(FileSystemProvider)
    mockProvider.getScheme() >> 'gs'
    def mockFs = Mock(FileSystem)
    mockFs.provider() >> mockProvider

    def mockedPath = Mock(Path)
    mockedPath.getFileSystem() >> mockFs
    mockedPath.toUri() >> mockUri
    mockedPath.toUriString() >> gsUri
    mockedPath.toString() >> path

    return mockedPath
}

def 'should create pipeline request' () {
given:

def helper = Mock(GooglePipelinesHelper)
def executor = new GooglePipelinesExecutor(helper: helper)
def config = Mock(GooglePipelinesConfiguration)
def workDir = mockPath('/work/dir')
def workDir = mockGsPath('gs://my-bucket/work/dir')

def task = Mock(TaskRun)
task.getName() >> 'foo'
Expand Down Expand Up @@ -191,12 +171,12 @@ class GooglePipelinesTaskHandlerTest extends Specification {
req.mainScript == 'cd /work/dir; bash .command.run 2>&1 | tee -a .command.log'
// check unstaging script
req.unstagingScript.tokenize(';')[0] == 'cd /work/dir'
req.unstagingScript.tokenize(';')[1] == ' [[ $GOOGLE_PIPELINE_FAILED == 1 || $NXF_DEBUG ]] && gsutil -m -q cp -R /google/ gs://work/dir || true'
req.unstagingScript.tokenize(';')[2] == ' [[ -f .command.trace ]] && gsutil -m -q cp -R .command.trace gs://work/dir || true'
req.unstagingScript.tokenize(';')[3] == ' gsutil -m -q cp -R .command.err gs://work/dir || true'
req.unstagingScript.tokenize(';')[4] == ' gsutil -m -q cp -R .command.out gs://work/dir || true'
req.unstagingScript.tokenize(';')[5] == ' gsutil -m -q cp -R .command.log gs://work/dir || true'
req.unstagingScript.tokenize(';')[6] == ' gsutil -m -q cp -R .exitcode gs://work/dir || true'
req.unstagingScript.tokenize(';')[1] == ' [[ $GOOGLE_PIPELINE_FAILED == 1 || $NXF_DEBUG ]] && gsutil -m -q cp -R /google/ gs://my-bucket/work/dir || true'
req.unstagingScript.tokenize(';')[2] == ' [[ -f .command.trace ]] && gsutil -m -q cp -R .command.trace gs://my-bucket/work/dir || true'
req.unstagingScript.tokenize(';')[3] == ' gsutil -m -q cp -R .command.err gs://my-bucket/work/dir || true'
req.unstagingScript.tokenize(';')[4] == ' gsutil -m -q cp -R .command.out gs://my-bucket/work/dir || true'
req.unstagingScript.tokenize(';')[5] == ' gsutil -m -q cp -R .command.log gs://my-bucket/work/dir || true'
req.unstagingScript.tokenize(';')[6] == ' gsutil -m -q cp -R .exitcode gs://my-bucket/work/dir || true'
req.unstagingScript.tokenize(';').size() == 7
}

Expand All @@ -206,7 +186,7 @@ class GooglePipelinesTaskHandlerTest extends Specification {
def helper = Mock(GooglePipelinesHelper)
def executor = new GooglePipelinesExecutor(helper: helper)
def config = Mock(GooglePipelinesConfiguration)
def workDir = mockPath('/work/dir')
def workDir = mockGsPath('gs://my-bucket/work/dir')

def task = Mock(TaskRun)
task.getName() >> 'foo'
Expand Down Expand Up @@ -237,14 +217,14 @@ class GooglePipelinesTaskHandlerTest extends Specification {
req.mainScript == 'cd /work/dir; bash .command.run 2>&1 | tee -a .command.log'
// check unstaging script
req.unstagingScript.tokenize(';')[0] == 'cd /work/dir'
req.unstagingScript.tokenize(';')[1] == ' [[ $GOOGLE_PIPELINE_FAILED == 1 || $NXF_DEBUG ]] && gsutil -m -q cp -R /google/ gs://work/dir || true'
req.unstagingScript.tokenize(';')[1] == ' [[ $GOOGLE_PIPELINE_FAILED == 1 || $NXF_DEBUG ]] && gsutil -m -q cp -R /google/ gs://my-bucket/work/dir || true'
req.unstagingScript.tokenize(';')[2] == ' foo'
req.unstagingScript.tokenize(';')[3] == ' bar'
req.unstagingScript.tokenize(';')[4] == ' [[ -f .command.trace ]] && gsutil -m -q cp -R .command.trace gs://work/dir || true'
req.unstagingScript.tokenize(';')[5] == ' gsutil -m -q cp -R .command.err gs://work/dir || true'
req.unstagingScript.tokenize(';')[6] == ' gsutil -m -q cp -R .command.out gs://work/dir || true'
req.unstagingScript.tokenize(';')[7] == ' gsutil -m -q cp -R .command.log gs://work/dir || true'
req.unstagingScript.tokenize(';')[8] == ' gsutil -m -q cp -R .exitcode gs://work/dir || true'
req.unstagingScript.tokenize(';')[4] == ' [[ -f .command.trace ]] && gsutil -m -q cp -R .command.trace gs://my-bucket/work/dir || true'
req.unstagingScript.tokenize(';')[5] == ' gsutil -m -q cp -R .command.err gs://my-bucket/work/dir || true'
req.unstagingScript.tokenize(';')[6] == ' gsutil -m -q cp -R .command.out gs://my-bucket/work/dir || true'
req.unstagingScript.tokenize(';')[7] == ' gsutil -m -q cp -R .command.log gs://my-bucket/work/dir || true'
req.unstagingScript.tokenize(';')[8] == ' gsutil -m -q cp -R .exitcode gs://my-bucket/work/dir || true'

req.unstagingScript.tokenize(';').size() == 9
}
Expand Down
49 changes: 49 additions & 0 deletions modules/nf-google/src/test/nextflow/extension/FilesExTest2.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2013-2019, Centre for Genomic Regulation (CRG)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package nextflow.extension

import spock.lang.Specification
import spock.lang.Unroll

import java.nio.file.Path

import com.google.cloud.storage.contrib.nio.CloudStoragePath

/**
 * Checks {@code FilesEx.toUriString} against {@code gs://} paths.
 *
 * @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
 */
class FilesExTest2 extends Specification {

    @Unroll
    def 'should return uri string for #PATH' () {

        given:
        // the string is coerced to a path; the first condition below pins the resulting type
        def gsPath = PATH as Path

        expect:
        gsPath instanceof CloudStoragePath
        // the rendered URI string must be identical to the input string
        FilesEx.toUriString(gsPath) == PATH

        where:
        PATH << [
                'gs://foo/bar',
                'gs://foo',
                'gs://foo/',
                'gs://foo/bar/baz',
                'gs://foo/bar/baz/'
        ]
    }
}

0 comments on commit 678405b

Please sign in to comment.