Permalink
Browse files

fix dependencies not inferred in some cases due to absolute paths not being recognised

This affects cleanup and causes unnecessary re-execution on retry.
  • Loading branch information...
ssadedin committed Sep 8, 2018
1 parent d9ba2e8 commit 7830ca4dc82510dfd92f9f0c64bb93359221f9c1
@@ -29,6 +29,8 @@ import groovy.util.logging.Log;
import groovyx.gpars.GParsPool
import groovy.time.TimeCategory;
import groovy.transform.CompileStatic;
import java.nio.file.Files
import java.nio.file.Path
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -65,6 +67,7 @@ class GraphEntry implements Serializable {
List<GraphEntry> children = []
/**
* An optional index to speed up lookups by canonical path - not populated by default,
* but can be populated by using index()
@@ -249,19 +252,21 @@ class GraphEntry implements Serializable {
}
/**
* Divide the outputs into groups depending on their
* inputs.
* @return
* Divide the outputs into groups depending on their inputs.
*
* @return Map keyed on sorted inputs separated by commas (as string),
* with values being the list of outputs having those inputs
*/
def groupOutputs(List<OutputMetaData> outputs = null) {
def outputGroups = [:]
@CompileStatic
Map<String,List> groupOutputs(List<OutputMetaData> outputs = null) {
Map<String,List> outputGroups = [:]
if(outputs == null)
outputs = this.values
for(def o in outputs) {
for(OutputMetaData o in outputs) {
def key = o.inputs.sort().join(',')
if(!outputGroups.containsKey(key)) {
outputGroups[key] = [o]
outputGroups[key] = [o]
}
else {
outputGroups[key] << o
@@ -272,10 +277,10 @@ class GraphEntry implements Serializable {
String dump() {
def inputs = children*.values*.inputs.flatten()
def inputs = children*.values*.inputs.flatten().unique()
if(!inputs)
inputs = ["<no inputs>"]
String inputValue = inputs.join('\n') + ' => \n'
return inputValue + dumpChildren(Math.min(20,inputs.collect {it.size()}.max()))
@@ -316,6 +321,20 @@ class GraphEntry implements Serializable {
return me + filteredChildren*.dumpChildren(indent+Math.min((names.collect{it.size()}.max()?:0), maxIndent)).join('\n')
}
/**
 * Find which outputs produced by this entry's parents the given output depends on.
 *
 * @param out   the output whose upstream dependencies should be resolved
 * @return      every parent output whose canonical path appears among the inputs of {@code out}
 */
@CompileStatic
List<OutputMetaData> getParentDependencies(OutputMetaData out) {
    // Gather all outputs produced by any parent entry, then keep only those
    // that the given output actually lists as one of its (canonical) inputs
    List candidateOutputs = (List)parents*.values.flatten()
    List<OutputMetaData> dependencies = []
    for(Object candidate in candidateOutputs) {
        OutputMetaData parentOutput = (OutputMetaData)candidate
        if(out.hasInput(parentOutput.canonicalPath))
            dependencies.add(parentOutput)
    }
    return dependencies
}
/**
 * Human readable summary of this graph entry (outputs, inputs, child count) for logging.
 */
String toString() {
    "GraphEntry [outputs=${values*.outputPath}, inputs=${values*.inputs} with ${children.size()} children]"
}
@@ -677,6 +696,17 @@ class Dependencies {
*/
void cleanup(List arguments) {
CliBuilder cli = new CliBuilder(usage: 'cleanup [-v] <files...>')
cli.with {
v 'Use verbose logging'
}
OptionAccessor opts = cli.parse(arguments)
if(opts.v) {
Utils.configureVerboseLogging()
}
arguments = opts.arguments()
log.info "Executing cleanup with arguments $arguments"
List<OutputMetaData> outputs = scanOutputFolder()
@@ -702,11 +732,13 @@ class Dependencies {
// log.info "\nOutput graph is: \n\n" + graph.dump()
// Identify the leaf nodes
List leaves = findLeaves(graph)
List<GraphEntry> leaves = findLeaves(graph)
// Filter out leaf nodes from this list if they are explicitly specified as intermediate files
leaves.removeAll { it.values.every { it.intermediate } }
log.info "Leaf nodes are: " + leaves.collect { GraphEntry e -> e.values*.outputFile.join(',') }.join(',')
// Find all the nodes that exist and match the users specs (or, if no specs, treat as wildcard)
List internalNodes = (outputs - leaves*.values.flatten()).grep { p ->
if(!p.outputFile.exists()) {
@@ -879,10 +911,15 @@ class Dependencies {
// the whole graph from the original inputs through to the final outputs
List allInputs = Utils.time("Calculate unique inputs") { outputs*.inputs.flatten().unique() }
List allOutputs = outputs*.outputPath
// Find all entries with inputs that are not outputs of any other entry
def outputsWithExternalInputs = outputs.grep { p -> ! p.inputs.any { allOutputs.contains(it) } }
Set allOutputs = outputs*.canonicalPath as Set
// Find outputs with "external" inputs - these are the top layer of outputs which
// are fed purely by external inputs.
//
// That is: find all entries with inputs that are not outputs of any other entry
def outputsWithExternalInputs = outputs.grep { OutputMetaData p ->
! p.canonicalInputs.any { ip -> allOutputs.contains(ip) }
}
// NOTE: turning this log on can be expensive for large numbers of inputs and outputs
// log.info "External inputs: " + outputsWithExternalInputs*.inputs + " for outputs " + outputsWithExternalInputs*.outputPath
@@ -891,11 +928,10 @@ class Dependencies {
// find groups of outputs (ie. ones that belong in the same branch of the tree)
// If there is no tree to attach to, make one
def entries = []
def outputGroups = [:]
def createdEntries = [:]
List entries = []
Map<String,List> outputGroups = [:]
Map createdEntries = [:]
if(rootTree == null) {
rootTree = new GraphEntry()
outputGroups = rootTree.groupOutputs(outputsWithExternalInputs)
outputGroups.each { key,outputGroup ->
@@ -918,14 +954,14 @@ class Dependencies {
for(OutputMetaData out in outputsWithExternalInputs) {
GraphEntry entry = new GraphEntry(values: [out])
log.info "New entry for ${out.outputPath}"
// log.info "New entry for ${out.outputPath}"
entries << entry
createdEntries[out.outputPath] = entry
outputGroups[out.outputPath] = entry.values
// find all nodes in the tree which this output depends on
out.inputs.each { inp ->
GraphEntry parentEntry = topRoot.findBy { it.outputPath == inp }
out.canonicalInputs.each { String inp ->
GraphEntry parentEntry = topRoot.findBy { OutputMetaData o -> o.canonicalPath == inp }
if(parentEntry) {
if(!(entry in parentEntry.children))
parentEntry.children << entry
@@ -935,7 +971,7 @@ class Dependencies {
log.info "Dependency $inp for $out.outputPath is external root input"
}
def dependenciesOnParents = entry.parents*.values.flatten().grep { out.inputs.contains(it.outputPath) }
List<OutputMetaData> dependenciesOnParents = entry.getParentDependencies(out)
out.maxTimestamp = (dependenciesOnParents*.maxTimestamp.flatten() + out.timestamp).max()
log.info "Maxtimestamp for $out.outputFile = $out.maxTimestamp"
@@ -956,7 +992,6 @@ class Dependencies {
Set outputsWithExternalInputsSet = outputsWithExternalInputs as Set
List remainingOutputs = outputs.grep { !outputsWithExternalInputsSet.contains(it) }
// remainingOutputs.removeAll(outputsWithExternalInputs)
List remainingWithoutExternal = new ArrayList(remainingOutputs.size())
@@ -1120,6 +1155,7 @@ class Dependencies {
def result = []
def outputs = [] as Set
graph.depthFirst { GraphEntry e ->
// log.info "$e.values has children? " + !e.children.isEmpty()
if(e.children.isEmpty() && e.values*.outputPath.every { !outputs.contains(it) } ) {
result << e
outputs += e.values*.outputPath
@@ -1149,4 +1185,6 @@ class Dependencies {
overrideTimestamps[f.absolutePath] = f.lastModified()
}
}
}
@@ -26,6 +26,7 @@
package bpipe
import groovy.transform.CompileStatic;
import groovy.transform.Memoized
import groovy.util.logging.Log
import java.nio.file.Files;
@@ -222,12 +223,7 @@ class OutputMetaData implements Serializable {
// The properties file may have a cached version of the "canonical path" to the
// output file. However this is an absolute path, so we can only use it if the
// base directory is still the same as when this property file was saved.
if(!p.containsKey("basePath") || (p["basePath"] != Runner.runDirectory)) {
this.canonicalPath = null
}
else {
this.canonicalPath = p.canonicalPath
}
initializeCanonicalPath(p)
basePath = p.basePath
@@ -259,6 +255,16 @@ class OutputMetaData implements Serializable {
accompanies = p.accompanies
command = p.command
}
/**
 * Initialize the canonical path for this output from a saved properties file.
 *
 * The cached canonical path stored in the properties is absolute, so it can
 * only be trusted when the recorded base directory matches the current run
 * directory; otherwise the canonical path is recomputed from the output file.
 *
 * @param p properties loaded from the saved output metadata file
 */
@CompileStatic
private void initializeCanonicalPath(Properties p) {
    boolean cachedPathValid = p.containsKey("basePath") && (p["basePath"] == Runner.runDirectory)
    if(cachedPathValid) {
        this.canonicalPath = p.canonicalPath
    }
    else {
        this.canonicalPath = Utils.canonicalFileFor(this.outputFile.path)
    }
}
@CompileStatic
String getBranchPath() {
@@ -329,4 +335,18 @@ class OutputMetaData implements Serializable {
/**
 * Render as "outputPath <= input1,input2,..." for logs and debugging.
 */
String toString() {
    return "$outputPath <= ${this.inputs.join(',')}"
}
/**
 * Return true if the given canonical path is one of the inputs to this output.
 *
 * @param inp a canonical file path (as produced by Utils.canonicalFileFor)
 * @return true if this output lists the path among its canonical inputs
 */
@CompileStatic
boolean hasInput(String inp) {
    // Go through the getter rather than the field: inside the declaring class,
    // 'this.canonicalInputs' is direct field access in Groovy and bypasses the
    // lazy initialization in getCanonicalInputs(), causing an NPE if this
    // method is called before the list has been computed.
    return getCanonicalInputs().contains(inp)
}
// Lazily computed cache of this output's inputs converted to canonical paths
List<String> canonicalInputs = null

/**
 * The inputs of this output as canonical file paths, computed on first
 * access and cached in {@link #canonicalInputs} thereafter.
 *
 * @return list of canonical paths corresponding to this output's inputs
 */
@CompileStatic
List<String> getCanonicalInputs() {
    List<String> cached = this.canonicalInputs
    if(cached == null) {
        cached = this.inputs.collect { Utils.canonicalFileFor(it).path }
        this.canonicalInputs = cached
    }
    return cached
}
}
@@ -25,6 +25,7 @@
package bpipe
import java.util.logging.Level;
import java.util.logging.Logger
import java.util.regex.Pattern
import groovy.transform.CompileStatic
import groovy.util.logging.Log;
@@ -40,9 +41,10 @@ import groovy.util.logging.Log;
*
* @author simon.sadedin@mcri.edu.au
*/
@Log
class PipelineInput {
private static Logger log = Logger.getLogger('PipelineInput')
/**
* Support implicit cast to String when creating File objects
*/
@@ -108,10 +110,11 @@ class PipelineInput {
this.aliases = aliases
}
@CompileStatic
String toString() {
if(parentError)
throw parentError
List boxed = Utils.box(input)
Collection boxed = Utils.box(input)
if(defaultValueIndex>=boxed.size())
throw new PipelineError("Expected ${defaultValueIndex+1} or more inputs but fewer provided")
@@ -343,6 +346,7 @@ class PipelineInput {
* in the pipeline, and includes the original inputs as the last stage. This "stack" of inputs
* provides an appropriate order for searching for inputs to a pipeline stage.
*/
// @CompileStatic
List<List<String>> computeOutputStack() {
List relatedThreads = [Thread.currentThread().id, Pipeline.rootThreadId]
@@ -364,26 +368,26 @@ class PipelineInput {
// !this.is(it.context.@inputWrapper) && ( this.parent == null || !this.parent.is(it.context.@inputWrapper) )
}.collect { PipelineStage stage ->
List outputs = Utils.box(stage.context.@output)
List outputs = Utils.box(stage.context.@output) as List
log.info "Outputs in search from $stage.stageName will be $outputs"
if(outputs.isEmpty() && stage.context.nextInputs != null)
outputs = Utils.box(stage.context.nextInputs)
outputs = Utils.box(stage.context.nextInputs) as List
return outputs
}
// Add a final stage that represents the original inputs (bit of a hack)
// You can think of it as the initial inputs being the output of some previous stage
// that we know nothing about
reverseOutputs.add(Utils.box(stages[0].context.@input))
reverseOutputs.add(Utils.box(stages[0].context.@input) as List)
// Consider not just the actual inputs to the stage, but also the *original* unmodified inputs
if(stages[0].originalInputs)
reverseOutputs.add(Utils.box(stages[0].originalInputs))
reverseOutputs.add(Utils.box(stages[0].originalInputs) as List)
// Add an initial stage that represents the current input to this stage. This way
// if the from() spec is used and matches the actual inputs then it will go with those
// rather than searching backwards for a previous match
reverseOutputs.add(0,Utils.box(this.@input))
reverseOutputs.add(0,Utils.box(this.@input) as List)
return reverseOutputs
}
@@ -61,27 +61,29 @@ class Utils {
* @param inputs a single string or collection of strings
* @return
*/
static List findOlder(def outputs, def inputs) {
static List findOlder(def outputsToCheck, def inputs) {
// Some pipeline stages don't expect any outputs
// Fixing issue 44 - https://code.google.com/p/bpipe/issues/detail?id=44
if( !outputs || !inputs )
if( !outputsToCheck || !inputs )
return []
// Remove any directories appearing as inputs - their timestamps change whenever
// any file in the dir changes
inputs = inputs.grep { (it != null) && !(new File(it).isDirectory())}
// Box into a collection for simplicity
outputs = box(outputs)
List<File> outputs = box(outputsToCheck)
def inputFiles = inputs.collect { new File(it) }
List<File> inputFiles = inputs.collect { new File(it) }
def inputFileTimestamps = inputFiles.collectEntries { [ it, it.lastModified() ] }
Map<File, Long> inputFileTimestamps = inputFiles.collectEntries { [ it, it.lastModified() ] }
long maxInputTimestamp = (inputFileTimestamps.max { it.value }?.value)?:0
final long maxInputTimestamp = (inputFileTimestamps.max { it.value }?.value)?:0
outputs.collect { new File(it) }.grep { outFile ->
final boolean logTimestamps = inputFiles.size()*outputs.size() < 5000 // 5k lines in the log from 1 loop is too much
outputs.collect { new File(it) }.grep { File outFile ->
long outputTimestamp = outFile.lastModified()
@@ -113,8 +115,6 @@ class Utils {
else
log.info "Checking $outFile against ${inputs.size()} inputs"
boolean logTimestamps = inputFiles.size()*outputs.size() < 5000 // 5k lines in the log from 1 loop is too much
return inputFiles.any { File inFile ->
long inputFileTimestamp = inputFileTimestamps[inFile]
if(logTimestamps)
@@ -751,6 +751,7 @@ class Utils {
static HashMap<String,File> canonicalFiles = new ConcurrentHashMap<String, File>(1000)
@CompileStatic
static File canonicalFileFor(String path) {
File result = canonicalFiles[path]
if(result != null)

0 comments on commit 7830ca4

Please sign in to comment.