
Automatic task cleanup #3849

Draft · wants to merge 40 commits into base: master (showing changes from 18 of 40 commits)
47d0168
Add initial task graph and metadata json file
bentsherman Mar 27, 2023
ae67027
Add task inputs and outputs to concrete DAG
bentsherman Mar 28, 2023
8f95cd6
Fix failing tests
bentsherman Mar 28, 2023
9f11e4b
Use path-based APIs to get file metadata
bentsherman Mar 29, 2023
db6aed1
Merge branch 'master' into ben-task-graph
bentsherman Mar 29, 2023
8456892
Use buffer to compute checksum
bentsherman Mar 30, 2023
77f2cdc
Add support for temporary output paths
bentsherman Mar 31, 2023
3e55ad5
Fix failing test
bentsherman Mar 31, 2023
e307f75
Add caveat about overlapping output channels [ci skip]
bentsherman Mar 31, 2023
08881b0
Delete files instead of emptying them (now supports directories and r…
bentsherman Apr 1, 2023
0cf07ec
Add `eager' cleanup option
bentsherman Apr 10, 2023
73b2f3b
Fix concurrency issues [ci fast]
bentsherman Apr 10, 2023
0dd98d6
Merge branch 'master' into ben-task-graph-pull
bentsherman Apr 21, 2023
0f505d3
Merge branch 'master' into ben-task-graph-pull
bentsherman Apr 26, 2023
e81e584
Replace synchronized with lock
bentsherman Apr 26, 2023
ba2e7a6
Merge branch 'ben-task-graph-pull' into 452-eager-cleanup
bentsherman Apr 28, 2023
f7bcfa8
Remove dependency on task graph branch
bentsherman Apr 28, 2023
4d90e27
Use downstream tasks to determine lifetime for task cleanup
bentsherman Apr 28, 2023
dd23b2a
Rename TemporaryFileObserver to TaskCleanupObserver
bentsherman Jul 7, 2023
6a34be6
Merge branch 'master' into 452-eager-cleanup
bentsherman Jul 7, 2023
ff08984
Wait for output files to be published
bentsherman Jul 7, 2023
9b343b6
Log warning if eager cleanup is used with incompatible publish modes
bentsherman Jul 7, 2023
6b5a820
Add eager cleanup for individual output files
bentsherman Jul 7, 2023
c42f249
Add warning about includeInputs with eager cleanup
bentsherman Jul 10, 2023
8b66c39
Add resumability
bentsherman Jul 11, 2023
43c939d
Upgrade kryo to 5.4.0 (#3562)
bentsherman Jul 11, 2023
5a78cf9
Add serializer for HashCode
bentsherman Jul 11, 2023
7b02b8e
Fix failing tests
bentsherman Jul 11, 2023
c727c05
Revert Kryo upgrade
bentsherman Jul 12, 2023
d37ee1f
Add kryo instantiator for HashCode
bentsherman Jul 12, 2023
859e96b
Remove HashCode serializer
bentsherman Jul 12, 2023
c89811c
Add log messages, fix infinite hang
bentsherman Jul 12, 2023
a034f60
Add support for includeInputs
bentsherman Jul 12, 2023
9741b1c
Update log messages
bentsherman Jul 12, 2023
3aae1af
Add lazy, eager, aggressive cleanup strategies
bentsherman Jul 12, 2023
08342dd
Merge branch 'master' into 452-eager-cleanup
bentsherman Sep 13, 2023
3498c5d
Add thread pool for task cleanup
bentsherman Sep 14, 2023
c421617
Minor improvement to checkCachedOutput()
bentsherman Sep 14, 2023
5b05aad
Merge branch 'master' into 452-eager-cleanup
bentsherman Jan 31, 2024
9ce10dc
minor edits
bentsherman Feb 1, 2024
12 changes: 12 additions & 0 deletions docs/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -1377,6 +1377,18 @@ There are additional variables that can be defined within a configuration file t
The use of the `cleanup` option will prevent the use of the *resume* feature on subsequent executions of that pipeline run. Also, be aware that deleting all scratch files can take a lot of time, especially when using a shared file system or remote cloud storage.
:::

*New in version `23.10.0`:* an "eager" cleanup strategy can be enabled by setting `cleanup = 'eager'`. With this strategy, each task directory is deleted as soon as it is no longer needed by downstream tasks, rather than at the end of the workflow run, which frees disk storage while the run is still in progress.

The lifetime of a task is determined by the downstream tasks that use the task's output files. When all of these tasks finish, the upstream task can be deleted.

The following caveats apply when using `cleanup = 'eager'`:

- Eager cleanup breaks the resumability of your pipeline. If a workflow run fails, you must restart it from the beginning, whereas with `cleanup = true` the cleanup happens only after a successful run, so failed runs remain resumable. Use eager cleanup only when you are confident that the workflow run will not fail.

- Output files should not be published via symlink when using eager cleanup, because the symlinks will be invalidated when the original task directory is deleted. Avoid using the following publish modes: `copyNoFollow`, `rellink`, `symlink`.

- Eager cleanup currently does not work properly with processes that forward input files with the `includeInputs` option. In this case, the forwarded input files will be deleted prematurely, and any process that consumes the forwarded output channel may fail or produce incorrect output.
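Putting the caveats above together, a minimal configuration sketch compatible with eager cleanup might look like the following. The process name and file names are hypothetical; the key points are the `cleanup = 'eager'` setting and a publish mode (`copy`) that does not rely on symlinks into the task work directory:

```groovy
// nextflow.config -- hypothetical example
cleanup = 'eager'   // delete each task directory once all downstream tasks finish

// main.nf -- publish outputs as real copies, so published files survive
// deletion of the originating task directory (avoid symlink/rellink/copyNoFollow)
process FOO {
    publishDir 'results', mode: 'copy'

    output:
    path 'out.txt'

    script:
    """
    echo hello > out.txt
    """
}
```

Note that this is a configuration sketch, not a complete pipeline; any process consuming `FOO`'s output should also avoid forwarding its input files with `includeInputs`.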

(config-profiles)=

## Config profiles
Expand Down
13 changes: 12 additions & 1 deletion modules/nextflow/src/main/groovy/nextflow/Session.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -935,6 +935,17 @@ class Session implements ISession {
}
}

void notifyProcessClose(TaskProcessor process) {
observers.each { observer ->
try {
observer.onProcessClose(process)
}
catch( Exception e ) {
log.debug(e.getMessage(), e)
}
}
}

void notifyProcessTerminate(TaskProcessor process) {
for( int i=0; i<observers.size(); i++ ) {
final observer = observers.get(i)
Expand Down Expand Up @@ -1120,7 +1131,7 @@ class Session implements ISession {
* Delete the workflow work directory from tasks temporary files
*/
void cleanup() {
if( !workDir || !config.cleanup )
if( !workDir || config.cleanup != true )
return

if( aborted || cancelled || error )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2298,6 +2298,10 @@ class TaskProcessor {
state.update { StateObj it -> it.incCompleted() }
}

protected void closeProcess() {
session.notifyProcessClose(this)
}

protected void terminateProcess() {
log.trace "<${name}> Sending poison pills and terminating process"
sendPoisonPill()
Expand Down Expand Up @@ -2440,6 +2444,7 @@ class TaskProcessor {
@Override
void afterStop(final DataflowProcessor processor) {
log.trace "<${name}> After stop"
closeProcess()
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class DefaultObserverFactory implements TraceObserverFactory {
createDagObserver(result)
createWebLogObserver(result)
createAnsiLogObserver(result)
createTemporaryFileObserver(result)
return result
}

Expand Down Expand Up @@ -116,4 +117,9 @@ class DefaultObserverFactory implements TraceObserverFactory {
result << observer
}

protected void createTemporaryFileObserver(Collection<TraceObserver> result) {
if( session.config.cleanup == 'eager' )
result << new TemporaryFileObserver()
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
/*
* Copyright 2013-2023, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package nextflow.trace

import java.nio.file.Path
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.locks.Lock
import java.util.concurrent.locks.ReentrantLock

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.Session
import nextflow.dag.DAG
import nextflow.file.FileHelper
import nextflow.processor.TaskHandler
import nextflow.processor.TaskProcessor
import nextflow.processor.TaskRun
import nextflow.script.params.FileOutParam
/**
* Delete task directories once they are no longer needed.
*
* @author Ben Sherman <bentshermann@gmail.com>
*/
@Slf4j
@CompileStatic
class TemporaryFileObserver implements TraceObserver {

private DAG dag

private Map<String,ProcessState> processes = new HashMap<>()

private Map<TaskRun,TaskState> tasks = new HashMap<>()

private Map<Path,TaskRun> taskLookup = new HashMap<>()

private Lock sync = new ReentrantLock()

@Override
void onFlowCreate(Session session) {
this.dag = session.dag
}

/**
* When the workflow begins, determine the consumers of each process
* in the DAG.
*/
@Override
void onFlowBegin() {

for( def processNode : dag.vertices ) {
// skip nodes that are not processes
if( !processNode.process )
continue

// find all downstream processes in the abstract dag
def processName = processNode.process.name
def consumers = [] as Set
def queue = [ processNode ]

while( !queue.isEmpty() ) {
// remove a node from the search queue
final sourceNode = queue.remove(0)

// search each outgoing edge from the source node
for( def edge : dag.edges ) {
if( edge.from != sourceNode )
continue

def node = edge.to

// skip if process is terminal
if( !node )
continue

// add process nodes to the list of consumers
if( node.process != null )
consumers.add(node.process.name)
// add operator nodes to the queue to keep searching
else
queue.add(node)
}
}

log.trace "Process `${processName}` is consumed by the following processes: ${consumers.collect({ "`${it}`" }).join(', ')}"

processes[processName] = new ProcessState(consumers ?: [processName] as Set)
}
}

/**
* When a task is created, add it to the state map and add it as a consumer
* of any task whose output it takes as input.
*
* @param handler
* @param trace
*/
@Override
void onProcessPending(TaskHandler handler, TraceRecord trace) {
// query task input files
final task = handler.task
final inputs = task.getInputFilesMap().values()

sync.lock()
try {
// add task to the task state map
tasks[task] = new TaskState()

// add task as consumer to each task whose output it takes as input
for( Path path : inputs )
if( path in taskLookup )
tasks[taskLookup[path]].consumers.add(task)
}
finally {
sync.unlock()
}
}

/**
* When a task is completed, track any temporary output files
* for automatic cleanup.
*
* @param handler
* @param trace
*/
@Override
void onProcessComplete(TaskHandler handler, TraceRecord trace) {
// query task output files
final task = handler.task
final outputs = task
.getOutputsByType(FileOutParam)
.values()
.flatten() as List<Path>

sync.lock()
try {
// mark task as completed
tasks[task].completed = true

// scan tasks for cleanup
cleanup0()

// add new output files to task lookup
for( Path path : outputs )
taskLookup[path] = task
}
finally {
sync.unlock()
}
}

/**
* When a process is closed (all tasks of the process have been created),
* mark the process as closed and scan tasks for cleanup.
*
* @param process
*/
@Override
void onProcessClose(TaskProcessor process) {
sync.lock()
try {
processes[process.name].closed = true
cleanup0()
}
finally {
sync.unlock()
}
}

/**
* Delete any task directories that have no more barriers.
*/
private void cleanup0() {
for( TaskRun task : tasks.keySet() ) {
final taskState = tasks[task]
if( taskState.completed && !taskState.deleted && canDelete(task) ) {
log.trace "Deleting task directory: ${task.workDir.toUriString()}"
FileHelper.deletePath(task.workDir)
taskState.deleted = true
}
}
}

/**
* Determine whether a task directory can be deleted.
*
* A task directory can be deleted if all of its process consumers
* are closed and all of its task consumers are completed.
*/
private boolean canDelete(TaskRun task) {
final taskState = tasks[task]
final processConsumers = processes[task.processor.name].consumers
final taskConsumers = taskState.consumers
processConsumers.every { p -> processes[p].closed } && taskConsumers.every { t -> tasks[t].completed }
}

static private class ProcessState {
Set<String> consumers
boolean closed = false

ProcessState(Set<String> consumers) {
this.consumers = consumers
}
}

static private class TaskState {
Set<TaskRun> consumers = [] as Set
boolean completed = false
boolean deleted = false
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ trait TraceObserver {
*/
void onProcessCreate( TaskProcessor process ){}

/**
* Invoked when the process is closed (all tasks have been created).
*/
void onProcessClose( TaskProcessor process ){}

/*
* Invoked when all tasks have been executed and the process ends.
*/
Expand Down
22 changes: 14 additions & 8 deletions modules/nextflow/src/test/groovy/nextflow/SessionTest.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ import nextflow.container.ContainerConfig
import nextflow.exception.AbortOperationException
import nextflow.script.ScriptFile
import nextflow.script.WorkflowMetadata
import nextflow.trace.TemporaryFileObserver
import nextflow.trace.TraceFileObserver
import nextflow.trace.TraceHelper
import nextflow.trace.WorkflowStatsObserver
import nextflow.trace.TraceFileObserver
import nextflow.util.Duration
import nextflow.util.VersionNumber
import spock.lang.Specification
Expand Down Expand Up @@ -248,26 +249,32 @@ class SessionTest extends Specification {
session = [:] as Session
result = session.createObservers()
then:
result.size()==1
result.size() == 1
result.any { it instanceof WorkflowStatsObserver }

when:
session = [:] as Session
session.config = [trace: [enabled: true, file:'name.txt']]
session.config = [cleanup: 'eager']
result = session.createObservers()
observer = result[1] as TraceFileObserver
then:
result.size() == 2
result.any { it instanceof TemporaryFileObserver }

when:
session = [:] as Session
session.config = [trace: [enabled: true, file:'name.txt']]
result = session.createObservers()
observer = result.find { it instanceof TraceFileObserver }
then:
observer.tracePath == Paths.get('name.txt').complete()
observer.separator == '\t'

when:
session = [:] as Session
session.config = [trace: [enabled: true, sep: 'x', fields: 'task_id,name,exit', file: 'alpha.txt']]
result = session.createObservers()
observer = result[1] as TraceFileObserver
observer = result.find { it instanceof TraceFileObserver }
then:
result.size() == 2
observer.tracePath == Paths.get('alpha.txt').complete()
observer.separator == 'x'
observer.fields == ['task_id','name','exit']
Expand All @@ -283,9 +290,8 @@ class SessionTest extends Specification {
session = [:] as Session
session.config = [trace: [enabled: true, fields: 'task_id,name,exit,vmem']]
result = session.createObservers()
observer = result[1] as TraceFileObserver
observer = result.find { it instanceof TraceFileObserver }
then:
result.size() == 2
observer.tracePath == Paths.get('trace-20221001.txt').complete()
observer.separator == '\t'
observer.fields == ['task_id','name','exit','vmem']
Expand Down