diff --git a/modules/nextflow/src/main/groovy/nextflow/Session.groovy b/modules/nextflow/src/main/groovy/nextflow/Session.groovy index 10e4563aeb..a332ee9321 100644 --- a/modules/nextflow/src/main/groovy/nextflow/Session.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/Session.groovy @@ -23,6 +23,7 @@ import java.nio.file.Paths import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.ExecutorService import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit import com.google.common.hash.HashCode import com.upplication.s3fs.S3OutputStream @@ -66,6 +67,7 @@ import nextflow.trace.TraceRecord import nextflow.trace.WebLogObserver import nextflow.trace.WorkflowStats import nextflow.util.Barrier +import nextflow.util.BlockingThreadExecutorFactory import nextflow.util.ConfigHelper import nextflow.util.Duration import nextflow.util.HistoryFile @@ -606,7 +608,7 @@ class Session implements ISession { } protected Map findBinEntries(Path path) { - def result = new HashMap() + def result = new LinkedHashMap(10) path .listFiles { file -> Files.isExecutable(file) } .each { Path file -> result.put(file.name,file) } @@ -1220,7 +1222,7 @@ class Session implements ISession { * @return The value of tasks to handle in parallel */ @Memoized - public int getQueueSize( String execName, int defValue ) { + int getQueueSize( String execName, int defValue ) { getExecConfigProp(execName, 'queueSize', defValue) as int } @@ -1232,7 +1234,7 @@ class Session implements ISession { * @return A {@code Duration} object. Default '1 second' */ @Memoized - public Duration getPollInterval( String execName, Duration defValue = Duration.of('1sec') ) { + Duration getPollInterval( String execName, Duration defValue = Duration.of('1sec') ) { getExecConfigProp( execName, 'pollInterval', defValue ) as Duration } @@ -1245,7 +1247,7 @@ class Session implements ISession { * @return A {@code Duration} object. Default '90 second' */ @Memoized - public Duration getExitReadTimeout( String execName, Duration defValue = Duration.of('90sec') ) { + Duration getExitReadTimeout( String execName, Duration defValue = Duration.of('90sec') ) { getExecConfigProp( execName, 'exitReadTimeout', defValue ) as Duration } @@ -1269,7 +1271,7 @@ class Session implements ISession { * @return A {@code Duration} object. Default '1 minute' */ @Memoized - public Duration getQueueStatInterval( String execName, Duration defValue = Duration.of('1min') ) { + Duration getQueueStatInterval( String execName, Duration defValue = Duration.of('1min') ) { getExecConfigProp(execName, 'queueStatInterval', defValue) as Duration } @@ -1299,4 +1301,19 @@ class Session implements ISession { ansiLogObserver ? ansiLogObserver.appendInfo(file.text) : Files.copy(file, System.out) } + @Memoized // <-- this guarantees that the same executor is used across different publish dir in the same session + @CompileStatic + synchronized ExecutorService getFileTransferThreadPool() { + final result = new BlockingThreadExecutorFactory() + .withName('FileTransfer') + .withMaxThreads( Runtime.runtime.availableProcessors()*3 ) + .create() + + this.onShutdown { + result.shutdown() + result.awaitTermination(36, TimeUnit.HOURS) + } + + return result + } } diff --git a/modules/nextflow/src/main/groovy/nextflow/executor/BashWrapperBuilder.groovy b/modules/nextflow/src/main/groovy/nextflow/executor/BashWrapperBuilder.groovy index 731a7ae079..753fb16d35 100644 --- a/modules/nextflow/src/main/groovy/nextflow/executor/BashWrapperBuilder.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/executor/BashWrapperBuilder.groovy @@ -20,7 +20,6 @@ import java.nio.file.Files import java.nio.file.Path import groovy.transform.CompileStatic -import groovy.transform.Memoized import groovy.transform.PackageScope import groovy.util.logging.Slf4j import nextflow.container.ContainerBuilder @@ -140,12 +139,6 @@ class BashWrapperBuilder { systemOsName.startsWith('Mac') } - // memoize the result to avoid to multiple time the same input files - @Memoized - protected Map getResolvedInputs() { - copyStrategy.resolveForeignFiles(inputFiles) - } - @PackageScope String buildNew0() { final template = BashWrapperBuilder.class.getResourceAsStream('command-run.txt') try { @@ -228,7 +221,7 @@ class BashWrapperBuilder { /* * staging input files when required */ - final stagingScript = copyStrategy.getStageInputFilesScript(resolvedInputs) + final stagingScript = copyStrategy.getStageInputFilesScript(inputFiles) binding.stage_inputs = stagingScript ? "# stage input files\n${stagingScript}" : null binding.stdout_file = TaskRun.CMD_OUTFILE @@ -432,7 +425,7 @@ class BashWrapperBuilder { */ // do not mount inputs when they are copied in the task work dir -- see #1105 if( stageInMode != 'copy' ) - builder.addMountForInputs(resolvedInputs) + builder.addMountForInputs(inputFiles) builder.addMount(binDir) diff --git a/modules/nextflow/src/main/groovy/nextflow/executor/ScriptFileCopyStrategy.groovy b/modules/nextflow/src/main/groovy/nextflow/executor/ScriptFileCopyStrategy.groovy index 5d47dfeea3..49b4f3a87c 100644 --- a/modules/nextflow/src/main/groovy/nextflow/executor/ScriptFileCopyStrategy.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/executor/ScriptFileCopyStrategy.groovy @@ -28,16 +28,6 @@ interface ScriptFileCopyStrategy { */ String getBeforeStartScript() - /** - * Resolve all foreign file to storing then in the pipeline working directory - * - * @param inputFiles A map of of files. It can hold both local and foreign files - * @return - * A map of files in which all foreign files have been replaced - * by local copies in the working directory - */ - Map resolveForeignFiles(Map inputFiles) - /** * @param inputFiles All the input files as a map of pairs * @return A BASH snippet included in the wrapper script that stages the task input files diff --git a/modules/nextflow/src/main/groovy/nextflow/executor/SimpleFileCopyStrategy.groovy b/modules/nextflow/src/main/groovy/nextflow/executor/SimpleFileCopyStrategy.groovy index 6a218a211f..6da7bc1042 100644 --- a/modules/nextflow/src/main/groovy/nextflow/executor/SimpleFileCopyStrategy.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/executor/SimpleFileCopyStrategy.groovy @@ -93,14 +93,14 @@ class SimpleFileCopyStrategy implements ScriptFileCopyStrategy { * @return * A map containing remote file paths resolved as local paths */ - @Override - Map resolveForeignFiles(Map files) { - if( !files ) - return files - if( !porter ) - porter = ((Session)Global.session).getFilePorter() - porter.stageForeignFiles(files, getStagingDir()) - } +// @Override +// Map resolveForeignFiles(Map files) { +// if( !files ) +// return files +// if( !porter ) +// porter = ((Session)Global.session).getFilePorter() +// porter.stageForeignFiles(files, getStagingDir()) +// } protected Path getStagingDir() { final result = workDir.parent?.parent?.resolve('stage') diff --git a/modules/nextflow/src/main/groovy/nextflow/file/FilePorter.groovy b/modules/nextflow/src/main/groovy/nextflow/file/FilePorter.groovy index 37c0d5924d..a926b529f1 100644 --- a/modules/nextflow/src/main/groovy/nextflow/file/FilePorter.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/file/FilePorter.groovy @@ -18,22 +18,15 @@ package nextflow.file import java.nio.file.NoSuchFileException import java.nio.file.Path -import java.util.concurrent.Callable import java.util.concurrent.ExecutionException import java.util.concurrent.ExecutorService import java.util.concurrent.Future -import java.util.concurrent.LinkedBlockingQueue -import java.util.concurrent.ThreadFactory -import java.util.concurrent.ThreadPoolExecutor import java.util.concurrent.TimeUnit import java.util.concurrent.TimeoutException -import java.util.concurrent.atomic.AtomicInteger import groovy.transform.CompileStatic -import groovy.transform.EqualsAndHashCode import groovy.transform.PackageScope import groovy.transform.ToString -import groovy.transform.TupleConstructor import groovy.util.logging.Slf4j import nextflow.Session import nextflow.exception.ProcessStageException @@ -46,7 +39,7 @@ import nextflow.util.Duration * @author Paolo Di Tommaso */ @Slf4j -@ToString(includeFields = true, includeNames = true, includePackage = false, includes = 'coreThreads,maxThreads,maxRetries,keepAlive,pollTimeout') +@ToString(includeFields = true, includeNames = true, includePackage = false, includes = 'maxRetries,pollTimeout') @CompileStatic class FilePorter { @@ -54,111 +47,97 @@ class FilePorter { static final private int MAX_RETRIES = 3 - static final private int CORE_THREADS = Runtime.runtime.availableProcessors() - - static final private int MAX_THREADS = 2 * CORE_THREADS - - static final private Duration KEEP_ALIVE = Duration.of('60sec') - static final private Duration POLL_TIMEOUT = Duration.of('2sec') - final Map> stagingFutures = new WeakHashMap<>() + final Map stagingTransfers = new HashMap<>() - @Lazy private ExecutorService stagingExecutor = createExecutor() - - private Duration keepAlive + @Lazy private ExecutorService threadPool = session.getFileTransferThreadPool() private Duration pollTimeout - final int coreThreads - - final int maxThreads - final int maxRetries final Session session FilePorter( Session session ) { this.session = session - keepAlive = session.config.navigate('filePorter.keepAlive') as Duration ?: KEEP_ALIVE - pollTimeout = session.config.navigate('filePorter.pollTimeout') as Duration ?: POLL_TIMEOUT - coreThreads = session.config.navigate('filePorter.coreThreads') as Integer ?: CORE_THREADS - maxThreads = session.config.navigate('filePorter.maxThreads') as Integer ?: MAX_THREADS maxRetries = session.config.navigate('filePorter.maxRetries') as Integer ?: MAX_RETRIES + pollTimeout = session.config.navigate('filePorter.pollTimeout') as Duration ?: POLL_TIMEOUT } - /** - * Given a map of files, copies all the ones stored in a foreign file system - * and store them in the current working directory - * - * @param filesMap A map of files - * @return A new files map in which all foreign {@link Path} are replaced with local paths - */ - Map stageForeignFiles(Map filesMap, Path stageDir) { - final List actions = [] - final List paths = [] - final scheme = stageDir.scheme - - // check for foreign file to copy - for( Map.Entry entry : filesMap ) { - def name = entry.getKey() - def path = entry.getValue() - if( path.scheme == scheme ) - continue - // copy the path with a thread pool - actions.add( new FileStageAction(name, path, stageDir, maxRetries) ) - paths.add(path) + Batch newBatch(Path stageDir) { new Batch(stageDir) } + + void transfer(Batch batch) { + if( batch.size() ) { + log.trace "Stage foreign files: $batch" + submitStagingActions(batch.foreignPaths, batch.stageDir) + log.trace "Stage foreign files completed: $batch" } + } - // no foreign file to copy, just return the original map - if( !actions ) - return filesMap + protected FileTransfer createFileTransfer(Path source, Path stageDir) { + final stagePath = getCachePathFor(source,stageDir) + return new FileTransfer(source, stagePath, maxRetries) + } + + protected Future submitForExecution(FileTransfer transfer) { + threadPool.submit(transfer) + } - log.trace "Stage foreign files: $paths" - final result = new HashMap(filesMap) + protected FileTransfer getOrSubmit(Path source, Path stageDir) { + synchronized (stagingTransfers) { + FileTransfer transfer = stagingTransfers.get(source) + if( transfer == null ) { + transfer = createFileTransfer(source, stageDir) + transfer.refCount = 1 + transfer.result = submitForExecution(transfer) + stagingTransfers.put(source, transfer) + } + else + transfer.refCount ++ - final futures = submitStagingActions(actions) - log.trace "Stage foreign files completed: $paths" + return transfer + } + } - for( Future fut : futures ) { - final pair = fut.get() - result.put( pair.name, pair.path ) + protected void decOrRemove(FileTransfer action) { + synchronized (stagingTransfers) { + final key = action.source + assert stagingTransfers.containsKey(key) + if( --action.refCount == 0 ) { + stagingTransfers.remove(key) + } } - return result } - protected List> submitStagingActions(List actions) { + protected List submitStagingActions(List paths, Path stageDir) { - String msg = null - final result = new ArrayList>(actions.size()) - for ( def action : actions ) { + final result = new ArrayList(paths.size()) + for ( def file : paths ) { // here's the magic: use a Map to check if a future for the staging action already exist // - if exists take it to wait to the submit action termination // - otherwise create a new future submitting the action operation - final future = stagingFutures.getOrCreate(action) { - msg = "Staging foreign file: ${action.path.toUriString()}" - return stagingExecutor.submit(action) - } - result.add(future) + result << getOrSubmit(file,stageDir) } // wait for staging actions completion // note: make a copy of the list to avoid a ConcurrentModificationException - final futures = new ArrayList>(result) + final futures = new ArrayList(result) while( futures.size() ) { final itr = futures.iterator() while( itr.hasNext() ) { + final action = itr.next() try { - final fut = itr.next() - fut.get(pollTimeout.millis, TimeUnit.MILLISECONDS) + action.result.get(pollTimeout.millis, TimeUnit.MILLISECONDS) itr.remove() + decOrRemove(action) } catch( TimeoutException e ) { // the timeout event is used to report an info message that // a download is taking place only the very first time + final msg = action.getMessageAndClear() if( msg ) { log.info((String)msg) - msg = null } } catch( ExecutionException e ) { @@ -170,22 +149,100 @@ class FilePorter { return result } + /** + * Models a batch (collection) of foreign files that need to be transferred to + * the process staging are co-located with the work directory + */ + static class Batch { + + /** + * Holds the list of foreign files to be transferred + */ + private List foreignPaths = new ArrayList<>() + + /** + * The *local* directory where against where files need to be staged. + */ + private Path stageDir + + /** + * The stage directory scheme. A file is considered *foreign* when its scheme is different from + * the stage directory scheme. + */ + private String stageScheme + + Batch(Path stageDir) { + this.stageDir = stageDir + this.stageScheme = stageDir.scheme + } + + /** + * Add the specified path to list of foreign files when + * the path scheme is different from the stage directory scheme + * + * @param path + * The path to include in the foreign files batch + * @return + * + */ + Path addToForeign(Path path) { + if( path.scheme == stageScheme ) + return path + + // copy the path with a thread pool + foreignPaths << path + return getCachePathFor(path, stageDir) + } + + /** + * @return The number of foreign files in the batch + */ + int size() { foreignPaths.size() } + + /** + * @return {@code true} when it contains one or more files, {@code false} otherwise + */ + boolean asBoolean() { foreignPaths.size()>0 } + + } + + /** + * Model a foreign file transfer + */ @ToString @CompileStatic - @EqualsAndHashCode @PackageScope - @TupleConstructor - static class FileStageAction implements Callable { + static class FileTransfer implements Runnable { + + /** + * The source file (foreign) to be transfer + */ + final Path source - final String name - final Path path - final Path stageDir + /** + * The target path where the file need to be copied + */ + final Path target + + /** + * Max number of retries in case of error + */ final int maxRetries + volatile int refCount + volatile Future result + private String message + + FileTransfer(Path foreignPath, Path stagePath, int maxRetries=0) { + this.source = foreignPath + this.target = stagePath + this.maxRetries = maxRetries + this.message = "Staging foreign file: ${source.toUriString()}" + } + @Override - NamePathPair call() throws Exception { - final stagedPath = stageForeignFile(path,stageDir) - return new NamePathPair(name, stagedPath) + void run() throws Exception { + stageForeignFile(source, target) } /** @@ -194,15 +251,12 @@ class FilePorter { * @param filePath The {@link Path} of the remote file to copy * @return The path of a local copy of the remote file */ - protected Path stageForeignFile(Path filePath, Path stageDir) { - // create cache directory - final hash = CacheHelper.hasher([filePath, stageDir]).hash().toString() - final target = getCacheDir(stageDir, hash).resolve(filePath.getName()) + protected Path stageForeignFile(Path filePath, Path stagePath) { int count = 0 while( true ) { try { - return stageForeignFile0(filePath, target) + return stageForeignFile0(filePath, stagePath) } catch( IOException e ) { if( count++ < maxRetries && !(e instanceof NoSuchFileException )) { @@ -236,74 +290,30 @@ class FilePorter { return FileHelper.copyPath(source, target) } - private Path getCacheDir(Path workDir, String hash) { - def bucket = hash.substring(0,2) - def result = workDir.resolve( "$bucket/${hash.substring(2)}") - - if( !FilesEx.mkdirs(result) ) { - throw new IOException("Unable to create cache directory: $result -- Verify file system access permissions or if a file having the same name exists") - } - return result.toAbsolutePath() + synchronized String getMessageAndClear() { + def result = message + message = null + return result } - } - @ToString - @EqualsAndHashCode - @TupleConstructor - @PackageScope - static class NamePathPair { - String name - Path path - - String toString() { - "name=$name,path=$path" - } + static protected Path getCachePathFor(Path filePath, Path stageDir) { + final dirPath = stageDir.toUriString() // <-- use a string to avoid changes in the dir to alter the hashing + final hash = CacheHelper.hasher([filePath, dirPath]).hash().toString() + final result = getCacheDir0(stageDir, hash).resolve(filePath.getName()) + return result } - private ExecutorService createExecutor() { - log.debug "Creating executor service for $this" - - final result = new ThreadPoolExecutor( - coreThreads, - maxThreads, - keepAlive.millis, - TimeUnit.MILLISECONDS, - new LinkedBlockingQueue(), - new DownloaderThreadFactory() ) - - result.allowCoreThreadTimeOut(true); + static private Path getCacheDir0(Path workDir, String hash) { + def bucket = hash.substring(0,2) + def result = workDir.resolve( "$bucket/${hash.substring(2)}") - // register the shutdown on termination - session.onShutdown { - result.shutdown() - result.awaitTermination(1, TimeUnit.MINUTES) + if( !FilesEx.mkdirs(result) ) { + throw new IOException("Unable to create cache directory: $result -- Verify file system access permissions or if a file having the same name exists") } - - return result + return result.toAbsolutePath() } - /** - * Custom thread factory - */ - static private class DownloaderThreadFactory implements ThreadFactory { - private final ThreadGroup group; - private final AtomicInteger threadNumber = new AtomicInteger(1); - private final String namePrefix = "FilePorter-" - - DownloaderThreadFactory() { - SecurityManager s = System.getSecurityManager(); - group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); - } - Thread newThread(Runnable r) { - Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); - if (t.isDaemon()) - t.setDaemon(false); - if (t.getPriority() != Thread.NORM_PRIORITY) - t.setPriority(Thread.NORM_PRIORITY); - return t; - } - } } diff --git a/modules/nextflow/src/main/groovy/nextflow/k8s/K8sTaskHandler.groovy b/modules/nextflow/src/main/groovy/nextflow/k8s/K8sTaskHandler.groovy index d0c340d49a..b0b1efbfb0 100644 --- a/modules/nextflow/src/main/groovy/nextflow/k8s/K8sTaskHandler.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/k8s/K8sTaskHandler.groovy @@ -106,7 +106,7 @@ class K8sTaskHandler extends TaskHandler { } // get input files paths - final paths = DockerBuilder.inputFilesToPaths(builder.getResolvedInputs()) + final paths = DockerBuilder.inputFilesToPaths(builder.getInputFiles()) final binDir = builder.binDir final workDir = builder.workDir // add standard paths diff --git a/modules/nextflow/src/main/groovy/nextflow/k8s/K8sWrapperBuilder.groovy b/modules/nextflow/src/main/groovy/nextflow/k8s/K8sWrapperBuilder.groovy index de62adb003..76846754d4 100644 --- a/modules/nextflow/src/main/groovy/nextflow/k8s/K8sWrapperBuilder.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/k8s/K8sWrapperBuilder.groovy @@ -15,7 +15,7 @@ */ package nextflow.k8s -import java.nio.file.Path + import groovy.transform.CompileStatic import nextflow.executor.BashWrapperBuilder @@ -42,10 +42,4 @@ class K8sWrapperBuilder extends BashWrapperBuilder { containerConfig.fixOwnership } - @Override - Map getResolvedInputs() { - super.getResolvedInputs() - } - - } diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy index 98b3c7c2f9..a46215d53a 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy @@ -24,11 +24,9 @@ import java.nio.file.LinkOption import java.nio.file.Path import java.nio.file.PathMatcher import java.util.concurrent.ExecutorService -import java.util.concurrent.TimeUnit import groovy.transform.CompileStatic import groovy.transform.EqualsAndHashCode -import groovy.transform.Memoized import groovy.transform.PackageScope import groovy.transform.ToString import groovy.util.logging.Slf4j @@ -36,7 +34,6 @@ import nextflow.Global import nextflow.Session import nextflow.extension.FilesEx import nextflow.file.FileHelper -import nextflow.util.BlockingThreadExecutorFactory /** * Implements the {@code publishDir} directory. It create links or copies the output * files of a given task to a user specified directory. @@ -89,6 +86,9 @@ class PublishDir { private boolean nullPathWarn + @Lazy + private ExecutorService threadPool = (Global.session as Session).getFileTransferThreadPool() + void setPath( Closure obj ) { setPath( obj.call() as Path ) } @@ -210,7 +210,7 @@ class PublishDir { safeProcessFile(source, destination) } else { - executor.submit({ safeProcessFile(source, destination) } as Runnable) + threadPool.submit({ safeProcessFile(source, destination) } as Runnable) } } @@ -334,24 +334,5 @@ class PublishDir { } } - @Memoized // <-- this guarantees that the same executor is used across different publish dir in the same session - @CompileStatic - static synchronized ExecutorService createExecutor(Session session) { - final result = new BlockingThreadExecutorFactory() - .withName('PublishDirExecutor') - .withMaxThreads( Runtime.runtime.availableProcessors()*3 ) - .create() - - session?.onShutdown { - result.shutdown() - result.awaitTermination(36,TimeUnit.HOURS) - } - - return result - } - @PackageScope - static ExecutorService getExecutor() { - createExecutor(Global.session as Session) - } } diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy index 7c32fb0f16..eff672826a 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy @@ -15,6 +15,8 @@ */ package nextflow.processor +import static nextflow.processor.ErrorStrategy.* + import java.nio.file.LinkOption import java.nio.file.NoSuchFileException import java.nio.file.Path @@ -62,6 +64,7 @@ import nextflow.extension.DataflowHelper import nextflow.file.FileHelper import nextflow.file.FileHolder import nextflow.file.FilePatternSplitter +import nextflow.file.FilePorter import nextflow.script.BaseScript import nextflow.script.params.BasicMode import nextflow.script.params.EachInParam @@ -85,10 +88,6 @@ import nextflow.util.ArrayBag import nextflow.util.BlankSeparatedList import nextflow.util.CacheHelper import nextflow.util.CollectionHelper -import static nextflow.processor.ErrorStrategy.FINISH -import static nextflow.processor.ErrorStrategy.IGNORE -import static nextflow.processor.ErrorStrategy.RETRY -import static nextflow.processor.ErrorStrategy.TERMINATE /** * Implement nextflow process execution logic * @@ -1551,7 +1550,7 @@ class TaskProcessor { } - protected List normalizeInputToFiles( Object obj, int count ) { + protected List normalizeInputToFiles( Object obj, int count, FilePorter.Batch batch ) { Collection allItems = obj instanceof Collection ? obj : [obj] def len = allItems.size() @@ -1559,7 +1558,16 @@ class TaskProcessor { // use a bag so that cache hash key is not affected by file entries order def files = new ArrayBag(len) for( def item : allItems ) { - files << normalizeInputToFile(item, "input.${++count}") + + if( item instanceof Path ) { + def path = batch.addToForeign(item) + def holder = new FileHolder(path) + files << holder + } + else { + files << normalizeInputToFile(item, "input.${++count}") + } + } return files @@ -1753,18 +1761,26 @@ class TaskProcessor { return count } + protected Path getStageDir() { + return executor.getWorkDir().resolve('stage') + } + + final protected void makeTaskContextStage2( TaskRun task, Map secondPass, int count ) { final ctx = task.context final allNames = new HashMap() + final FilePorter.Batch batch = session.filePorter.newBatch(getStageDir()) + // -- all file parameters are processed in a second pass // so that we can use resolve the variables that eventually are in the file name - secondPass.each { FileInParam param, val -> - - def fileParam = param as FileInParam - def normalized = normalizeInputToFiles(val,count) - def resolved = expandWildcards( fileParam.getFilePattern(ctx), normalized ) + for( Map.Entry entry : secondPass.entrySet() ) { + final param = entry.getKey() + final val = entry.getValue() + final fileParam = param as FileInParam + final normalized = normalizeInputToFiles(val,count, batch) + final resolved = expandWildcards( fileParam.getFilePattern(ctx), normalized ) ctx.put( param.name, singleItemOrList(resolved, task.type) ) count += resolved.size() for( FileHolder item : resolved ) { @@ -1787,6 +1803,9 @@ class TaskProcessor { def message = "Process `$name` input file name collision -- There are multiple input files for each of the following file names: ${conflicts.keySet().join(', ')}" throw new ProcessUnrecoverableException(message) } + + // -- download foreign files + session.filePorter.transfer(batch) } final protected void makeTaskContextStage3( TaskRun task, HashCode hash, Path folder ) { diff --git a/modules/nextflow/src/test/groovy/nextflow/executor/BashWrapperBuilderTest.groovy b/modules/nextflow/src/test/groovy/nextflow/executor/BashWrapperBuilderTest.groovy index 0a1292f1b5..f3c7cab044 100644 --- a/modules/nextflow/src/test/groovy/nextflow/executor/BashWrapperBuilderTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/executor/BashWrapperBuilderTest.groovy @@ -170,30 +170,6 @@ class BashWrapperBuilderTest extends Specification { folder?.deleteDir() } - def 'should resolve foreign files' () { - - given: - def INPUTS = [ - 'foo.txt': Paths.get('/some/foo.txt'), - 'bar.txt': Paths.get('/some/bar.txt'), - ] - - def RESOLVED = [ - 'foo.txt': Paths.get('/some/foo.txt'), - 'BAR.txt': Paths.get('/some/BAR.txt'), - ] - def bean = Mock(TaskBean) - bean.getInputFiles() >> INPUTS - def copy = Mock(SimpleFileCopyStrategy) - def bash = Spy(BashWrapperBuilder, constructorArgs:[bean, copy]) - - when: - def files = bash.getResolvedInputs() - then: - 1 * copy.resolveForeignFiles(INPUTS) >> RESOLVED - files == RESOLVED - - } def 'should create module command' () { given: @@ -220,7 +196,7 @@ class BashWrapperBuilderTest extends Specification { bash.getStatsEnabled() >> false bash.getStageInMode() >> 'symlink' - bash.getResolvedInputs() >> [:] + bash.getInputFiles() >> [:] bash.getContainerConfig() >> [engine: 'singularity', envWhitelist: 'FOO,BAR'] bash.getContainerImage() >> 'foo/bar' bash.getContainerMount() >> null @@ -247,7 +223,7 @@ class BashWrapperBuilderTest extends Specification { bash.createContainerBuilder(null) then: bash.createContainerBuilder0('docker') >> BUILDER - bash.getResolvedInputs() >> INPUTS + bash.getInputFiles() >> INPUTS bash.getStageInMode() >> null 1 * BUILDER.addMountForInputs(INPUTS) >> null diff --git a/modules/nextflow/src/test/groovy/nextflow/executor/SimpleFileCopyStrategyTest.groovy b/modules/nextflow/src/test/groovy/nextflow/executor/SimpleFileCopyStrategyTest.groovy index 48d0819ccc..97a97a0a4f 100644 --- a/modules/nextflow/src/test/groovy/nextflow/executor/SimpleFileCopyStrategyTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/executor/SimpleFileCopyStrategyTest.groovy @@ -16,16 +16,12 @@ package nextflow.executor -import spock.lang.Specification -import spock.lang.Unroll - -import java.nio.file.Files import java.nio.file.Path import java.nio.file.Paths -import nextflow.Session import nextflow.processor.TaskBean -import test.TestHelper +import spock.lang.Specification +import spock.lang.Unroll /** * * @author Paolo Di Tommaso @@ -321,30 +317,6 @@ class SimpleFileCopyStrategyTest extends Specification { } - def 'should resolve foreign files' () { - given: - def folder = Files.createTempDirectory('test') - def session = new Session(workDir: folder) - - def INPUTS = [ - 'foo.txt': Paths.get('/some/foo.txt'), - 'bar.txt': Paths.get('/some/bar.txt'), - 'hello.txt': TestHelper.createInMemTempFile('any.name','Hello world') - ] - def strategy = new SimpleFileCopyStrategy(workDir: folder.resolve('xx/yy')) - - when: - def result = strategy.resolveForeignFiles(INPUTS) - then: - result.size() == 3 - result['foo.txt'] == Paths.get('/some/foo.txt') - result['bar.txt'] == Paths.get('/some/bar.txt') - result['hello.txt'].text == 'Hello world' - result['hello.txt'].toString().startsWith(folder.resolve('stage').toString()) - - cleanup: - folder.deleteDir() - } def 'should return cp script to unstage output files to S3' () { diff --git a/modules/nextflow/src/test/groovy/nextflow/file/FilePorterTest.groovy b/modules/nextflow/src/test/groovy/nextflow/file/FilePorterTest.groovy index 4abd6c4f16..77c829e973 100644 --- a/modules/nextflow/src/test/groovy/nextflow/file/FilePorterTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/file/FilePorterTest.groovy @@ -16,21 +16,15 @@ package nextflow.file -import spock.lang.Ignore -import spock.lang.Specification - import java.nio.file.FileSystems import java.nio.file.Files import java.nio.file.Path import java.nio.file.Paths -import java.util.concurrent.Callable -import java.util.concurrent.Executors -import java.util.concurrent.TimeUnit -import java.util.concurrent.TimeoutException import groovy.util.logging.Slf4j import nextflow.Session import nextflow.exception.ProcessStageException +import spock.lang.Specification import test.TestHelper /** * @@ -39,37 +33,7 @@ import test.TestHelper @Slf4j class FilePorterTest extends Specification { - def 'should get the core threads value' () { - - when: - def session = new Session() - def porter = new FilePorter(session) - then: - porter.coreThreads == Runtime.getRuntime().availableProcessors() - - when: - session = new Session([filePorter: [coreThreads: 10]]) - porter = new FilePorter(session) - then: - porter.coreThreads == 10 - - } - - def 'should get the max threads value' () { - - when: - def session = new Session() - def porter = new FilePorter(session) - then: - porter.maxThreads == 2 * Runtime.getRuntime().availableProcessors() - - when: - session = new Session([filePorter: [maxThreads: 99]]) - porter = new FilePorter(session) - then: - porter.maxThreads == 99 - } def 'should get the max retries value' () { @@ -97,46 +61,47 @@ class FilePorterTest extends Specification { def foreign1 = TestHelper.createInMemTempFile('hola.txt', 'hola mundo!') def foreign2 = TestHelper.createInMemTempFile('ciao.txt', 'ciao mondo!') def local = Paths.get('local.txt') - def files = [foo: local, bar: foreign1, baz: foreign2] when: def porter = new FilePorter(session) - def result = porter.stageForeignFiles(files, folder) + def batch = porter.newBatch(folder) + def file0 = batch.addToForeign(local) + def file1 = batch.addToForeign(foreign1) + def file2 = batch.addToForeign(foreign2) then: - result.foo == Paths.get('local.txt') - - result.bar.name == 'hola.txt' - result.bar.text == 'hola mundo!' - result.bar.fileSystem == FileSystems.default + file0 == local + and: + file1 != foreign1 + file1.name == foreign1.name + and: + file2 != foreign2 + file2.name == foreign2.name - result.baz.name == 'ciao.txt' - result.baz.text == 'ciao mondo!' - result.baz.fileSystem == FileSystems.default + when: + porter.transfer(batch) + then: + file1.name == 'hola.txt' + file1.text == 'hola mundo!' + file1.fileSystem == FileSystems.default + and: + file2.name == 'ciao.txt' + file2.text == 'ciao mondo!' + file2.fileSystem == FileSystems.default cleanup: folder?.deleteDir() } - static class DummyStage extends FilePorter.FileStageAction { - int secs - DummyStage(String name, Path file, int secs ) { - super(name,file,Paths.get('/work/dir'),0) - this.secs = secs - } - @Override - FilePorter.NamePathPair call() throws Exception { - log.debug "Dummy staging $path" - sleep secs - return new FilePorter.NamePathPair(name, path.resolve(name)) - } - } + static class ErrorStage extends FilePorter.FileTransfer { - static class ErrorStage extends FilePorter.FileStageAction { + ErrorStage(Path path, Path stagePath, int maxRetries) { + super(path, stagePath, maxRetries) + } @Override - FilePorter.NamePathPair call() throws Exception { + void run() throws Exception { throw new ProcessStageException('Cannot stage gile') } } @@ -144,24 +109,23 @@ class FilePorterTest extends Specification { def 'should submit actions' () { given: + def foreign1 = TestHelper.createInMemTempFile('hola.txt', 'hola mundo!') + def foreign2 = TestHelper.createInMemTempFile('ciao.txt', 'ciao mondo!') + def folder = Files.createTempDirectory('test') def session = new Session(workDir: folder) - def porter = new FilePorter(session) + FilePorter porter = Spy(FilePorter, constructorArgs:[session]) + def files = [ foreign1, foreign2 ] when: - def actions = [ - new DummyStage('foo', Paths.get('/data/foo'), 500), - new DummyStage('bar', Paths.get('/data/bat'), 3000) - ] - - def futures = porter.submitStagingActions(actions) + def result = porter.submitStagingActions(files, folder) then: - futures.size() == 2 - futures[0].isDone() - futures[1].isDone() + result.size() == 2 + result[0].result.done + result[1].result.done when: - porter.submitStagingActions([ new ErrorStage() ]) + porter.submitStagingActions([Paths.get('/missing/file')], folder) then: thrown(ProcessStageException) @@ -170,35 +134,67 @@ class FilePorterTest extends Specification { } - @Ignore - def 'should access future' () { + + def 'should check batch size and truth' () { given: - def exec = Executors.newFixedThreadPool(2) + def STAGE = Files.createTempDirectory('test') + def FTP_FILE = 'ftp://host.com/file.txt' as Path + def LOC_FILE = '/local/data.txt' as Path + def session = Mock(Session) + session.config >> [:] + def porter = new FilePorter(session) + when: - def fut = exec.submit( { log.info 'start'; sleep 5000; log.info 'done'; return 'Hello' } as Callable ) - - def wait = { - while( true ) { - try { - def str = fut.get(1, TimeUnit.SECONDS) - log.info "message => $str" - break - } - catch( TimeoutException e ) { - // - log.info('timeout') - } - } - } + def batch = porter.newBatch(STAGE) + then: + !batch + batch.size() == 0 - def t1 = Thread.start(wait) - def t2 = Thread.start(wait) + when: + batch.addToForeign(LOC_FILE) + then: + batch.size() == 0 + !batch - t1.join() - t2.join() + when: + def result = batch.addToForeign(FTP_FILE) + then: + result.scheme == 'file' + batch.size() == 1 + batch + cleanup: + STAGE?.deleteDir() + + } + + def 'should return stage path' () { + + given: + def STAGE = Files.createTempDirectory('test') + def FTP_FILE1 = 'ftp://host.com/file1.txt' as Path + def FTP_FILE2 = 'ftp://host.com/file2.txt' as Path + + when: + def stage1 = FilePorter.getCachePathFor(FTP_FILE1, STAGE) then: - noExceptionThrown() + stage1.toString().startsWith( STAGE.toString() ) + + when: + def stage2 = FilePorter.getCachePathFor(FTP_FILE2, STAGE) + then: + stage2.toString().startsWith( STAGE.toString() ) + stage1 != stage2 + + when: + STAGE.resolve('foo.txt').text = 'ciao' // <-- add a file to alter the content of the dir + and: + def newStage1 = FilePorter.getCachePathFor(FTP_FILE1, STAGE) + then: + stage1 == newStage1 + + cleanup: + STAGE?.deleteDir() } } diff --git a/modules/nextflow/src/test/groovy/nextflow/k8s/K8sTaskHandlerTest.groovy b/modules/nextflow/src/test/groovy/nextflow/k8s/K8sTaskHandlerTest.groovy index 5c7e26e28f..4827706321 100644 --- a/modules/nextflow/src/test/groovy/nextflow/k8s/K8sTaskHandlerTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/k8s/K8sTaskHandlerTest.groovy @@ -559,7 +559,7 @@ class K8sTaskHandlerTest extends Specification { mounts = handler.getContainerMounts() then: 1 * k8sConfig.getAutoMountHostPaths() >> true - 1 * wrapper.getResolvedInputs() >> ['foo': Paths.get('/base_path/foo.txt'), 'bar': Paths.get('/base_path/bar.txt')] + 1 * wrapper.getInputFiles() >> ['foo': Paths.get('/base_path/foo.txt'), 'bar': Paths.get('/base_path/bar.txt')] 1 * wrapper.getBinDir() >> Paths.get('/user/bin') 1 * wrapper.getWorkDir() >> Paths.get('/work/dir') mounts == ['/base_path', '/user/bin', '/work/dir'] diff --git a/modules/nextflow/src/test/groovy/nextflow/processor/PublishDirTest.groovy b/modules/nextflow/src/test/groovy/nextflow/processor/PublishDirTest.groovy index d43f4652a1..67af601dcc 100644 --- a/modules/nextflow/src/test/groovy/nextflow/processor/PublishDirTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/processor/PublishDirTest.groovy @@ -21,11 +21,10 @@ import java.nio.file.Files import java.nio.file.Paths import java.util.concurrent.TimeUnit -import nextflow.Global +import nextflow.Session import nextflow.file.FileHelper import spock.lang.Specification import test.TestHelper - /** * * @author Paolo Di Tommaso @@ -136,7 +135,7 @@ class PublishDirTest extends Specification { def 'should copy output files' () { given: - Global.session = null + def session = new Session() def folder = Files.createTempDirectory('nxf') folder.resolve('work-dir').mkdir() folder.resolve('work-dir/file1.txt').text = 'aaa' @@ -163,8 +162,8 @@ class PublishDirTest extends Specification { def publisher = new PublishDir(path: publishDir, mode: 'copy') publisher.apply( outputs, task ) - PublishDir.executor.shutdown() - PublishDir.executor.awaitTermination(5, TimeUnit.SECONDS) + session.fileTransferThreadPool.shutdown() + session.fileTransferThreadPool.awaitTermination(5, TimeUnit.SECONDS) then: publishDir.resolve('file1.txt').text == 'aaa' diff --git a/modules/nextflow/src/test/groovy/nextflow/processor/TaskProcessorTest.groovy b/modules/nextflow/src/test/groovy/nextflow/processor/TaskProcessorTest.groovy index 95e4c3c58b..729af1cd9c 100644 --- a/modules/nextflow/src/test/groovy/nextflow/processor/TaskProcessorTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/processor/TaskProcessorTest.groovy @@ -28,6 +28,7 @@ import nextflow.ISession import nextflow.Session import nextflow.exception.ProcessException import nextflow.exception.ProcessUnrecoverableException +import nextflow.executor.Executor import nextflow.executor.NopeExecutor import nextflow.file.FileHolder import nextflow.script.BaseScript @@ -787,4 +788,15 @@ class TaskProcessorTest extends Specification { } + def 'should return stage dir' () { + given: + def WORK_DIR = Paths.get('/the/work/dir') + def processor = new TaskProcessor() + processor.executor = Mock(Executor) { getWorkDir() >> WORK_DIR } + + expect: + processor.getStageDir() == WORK_DIR.resolve('stage') + } + + } diff --git a/modules/nf-ga4gh/src/main/nextflow/ga4gh/tes/executor/TesFileCopyStrategy.groovy b/modules/nf-ga4gh/src/main/nextflow/ga4gh/tes/executor/TesFileCopyStrategy.groovy index 92171987d5..e42add1b18 100644 --- a/modules/nf-ga4gh/src/main/nextflow/ga4gh/tes/executor/TesFileCopyStrategy.groovy +++ b/modules/nf-ga4gh/src/main/nextflow/ga4gh/tes/executor/TesFileCopyStrategy.groovy @@ -39,14 +39,6 @@ class TesFileCopyStrategy implements ScriptFileCopyStrategy { return null } - /** - * {@inheritDoc} - */ - @Override - Map resolveForeignFiles(Map inputFile) { - return inputFile - } - /** * {@inheritDoc} */ diff --git a/modules/nf-ignite/src/main/nextflow/executor/IgScriptStagingStrategy.groovy b/modules/nf-ignite/src/main/nextflow/executor/IgScriptStagingStrategy.groovy index 92af512756..47f6c34d87 100644 --- a/modules/nf-ignite/src/main/nextflow/executor/IgScriptStagingStrategy.groovy +++ b/modules/nf-ignite/src/main/nextflow/executor/IgScriptStagingStrategy.groovy @@ -90,17 +90,6 @@ class IgScriptStagingStrategy extends IgFileStagingStrategy implements ScriptFil return null } - /** - * Turn off remote file download - * - * @param inputFiles - * @return - */ - @Override - Map resolveForeignFiles(Map inputFiles) { - return inputFiles - } - /** * Turn off file unstage from the wrapper script returning a null string * diff --git a/modules/nf-ignite/src/test/nextflow/executor/IgScriptStagingStrategyTest.groovy b/modules/nf-ignite/src/test/nextflow/executor/IgScriptStagingStrategyTest.groovy index f6767186d8..8572846fa6 100644 --- a/modules/nf-ignite/src/test/nextflow/executor/IgScriptStagingStrategyTest.groovy +++ b/modules/nf-ignite/src/test/nextflow/executor/IgScriptStagingStrategyTest.groovy @@ -34,7 +34,6 @@ class IgScriptStagingStrategyTest extends Specification { strategy.getBeforeStartScript() == null strategy.getStageInputFilesScript(['foo.txt': PATH]) == null strategy.getUnstageOutputFilesScript(['foo.txt'], PATH) == null - strategy.resolveForeignFiles(['foo.txt': PATH]) == ['foo.txt': PATH] strategy.touchFile(PATH) == delegate.touchFile(PATH) strategy.fileStr(PATH) == delegate.fileStr(PATH)