diff --git a/baseprocess.go b/baseprocess.go index af6bda2..5b4ff2c 100644 --- a/baseprocess.go +++ b/baseprocess.go @@ -40,7 +40,7 @@ func (p *BaseProcess) Workflow() *Workflow { // InPort returns the in-port with name portName func (p *BaseProcess) InPort(portName string) *InPort { - if p.inPorts[portName] == nil { + if _, ok := p.inPorts[portName]; !ok { Failf("No such in-port ('%s') for process '%s'. Please check your workflow code!\n", portName, p.name) } return p.inPorts[portName] @@ -48,7 +48,7 @@ func (p *BaseProcess) InPort(portName string) *InPort { // InitInPort adds the in-port port to the process, with name portName func (p *BaseProcess) InitInPort(proc WorkflowProcess, portName string) { - if p.inPorts[portName] != nil { + if _, ok := p.inPorts[portName]; ok { Failf("Such an in-port ('%s') already exists for process '%s'. Please check your workflow code!\n", portName, p.name) } ipt := NewInPort(portName) diff --git a/examples/filegen/filegen.go b/examples/filegen/filegen.go index 9f5c263..68841be 100644 --- a/examples/filegen/filegen.go +++ b/examples/filegen/filegen.go @@ -11,7 +11,7 @@ func main() { fq := spc.NewFileSource(wf, "file_src", "hej1.txt", "hej2.txt", "hej3.txt") fw := sp.NewProc(wf, "filewriter", "echo {i:in} > {o:out}") - fw.SetPathPattern("out", "{i:in}") + fw.SetOut("out", "{i:in}") fw.In("in").From(fq.Out()) wf.Run() diff --git a/examples/param_channels/params.go b/examples/param_channels/params.go index 3faf83a..05bc75f 100644 --- a/examples/param_channels/params.go +++ b/examples/param_channels/params.go @@ -15,7 +15,7 @@ func main() { // An abc file printer abc := wf.NewProc("abc", "echo {p:a} {p:b} {p:c} > {o:out}; sleep 1") abc.Spawn = true - abc.SetPathPattern("out", "{p:a}_{p:b}_{p:c}.txt") + abc.SetOut("out", "{p:a}_{p:b}_{p:c}.txt") // A printer task prt := wf.NewProc("printer", "cat {i:in} >> log.txt") diff --git a/examples/resequencing/resequencing.go b/examples/resequencing/resequencing.go index a24d1e7..f451a19 100644 --- a/examples/resequencing/resequencing.go +++ b/examples/resequencing/resequencing.go @@ -15,20 +15,20 @@ package main import ( "fmt" - . "github.com/scipipe/scipipe" + sp "github.com/scipipe/scipipe" ) // ------------------------------------------------------------------------------------ // Set up static stuff like paths // ------------------------------------------------------------------------------------ const ( - fastq_base_url = "http://bioinfo.perdanauniversity.edu.my/tein4ngs/ngspractice/" - fastq_file_pat = "%s.ILLUMINA.low_coverage.4p_%s.fq" - ref_base_url = "http://ftp.ensembl.org/pub/release-75/fasta/homo_sapiens/dna/" - ref_file = "Homo_sapiens.GRCh37.75.dna.chromosome.17.fa" - ref_file_gz = "Homo_sapiens.GRCh37.75.dna.chromosome.17.fa.gz" - vcf_base_url = "http://ftp.1000genomes.ebi.ac.uk/vol1/ftp/phase1/analysis_results/integrated_call_sets/" - vcf_file = "ALL.chr17.integrated_phase1_v3.20101123.snps_indels_svs.genotypes.vcf.gz" + fastqBaseUrl = "http://bioinfo.perdanauniversity.edu.my/tein4ngs/ngspractice/" + fastQFilePtn = "%s.ILLUMINA.low_coverage.4p_%s.fq" + refBaseUrl = "http://ftp.ensembl.org/pub/release-75/fasta/homo_sapiens/dna/" + refFile = "Homo_sapiens.GRCh37.75.dna.chromosome.17.fa" + refFileGz = "Homo_sapiens.GRCh37.75.dna.chromosome.17.fa.gz" + vcfBaseUrl = "http://ftp.1000genomes.ebi.ac.uk/vol1/ftp/phase1/analysis_results/integrated_call_sets/" + vcfFile = "ALL.chr17.integrated_phase1_v3.20101123.snps_indels_svs.genotypes.vcf.gz" ) // ------------------------------------------------------------------------------------ @@ -43,77 +43,72 @@ func main() { // -------------------------------------------------------------------------------- // Initialize Workflow // -------------------------------------------------------------------------------- - wf := NewWorkflow("resequencing_wf", 4) + wf := sp.NewWorkflow("resequencing_workflow", 4) // -------------------------------------------------------------------------------- // Download Reference Genome // -------------------------------------------------------------------------------- - downloadRefCmd := "wget -O {o:outfile} " + ref_base_url + ref_file_gz - downloadRef := wf.NewProc("download_ref", downloadRefCmd) - downloadRef.SetPathStatic("outfile", ref_file_gz) + dlRef := wf.NewProc("download_ref", "wget -O {o:outfile} "+refBaseUrl+refFileGz) + dlRef.SetOut("outfile", refFileGz) // -------------------------------------------------------------------------------- // Unzip ref file // -------------------------------------------------------------------------------- - ungzipRefCmd := "gunzip -c {i:in} > {o:out}" - ungzipRef := wf.NewProc("ugzip_ref", ungzipRefCmd) - ungzipRef.SetPathReplace("in", "out", ".gz", "") - ungzipRef.In("in").From(downloadRef.Out("outfile")) + ungzipRef := wf.NewProc("ugzip_ref", "gunzip -c {i:in} > {o:out}") + ungzipRef.In("in").From(dlRef.Out("outfile")) + ungzipRef.SetOut("out", "{i:in|s/.gz//}") // -------------------------------------------------------------------------------- // Index Reference Genome // -------------------------------------------------------------------------------- indexRef := wf.NewProc("index_ref", "bwa index -a bwtsw {i:index}; echo done > {o:done}") - indexRef.SetPathExtend("index", "done", ".indexed") indexRef.In("index").From(ungzipRef.Out("out")) + indexRef.SetOut("done", "{i:index}.indexed") // Create (multi-level) maps where we can gather outports from processes // for each for loop iteration and access them in the merge step later - outPorts := map[string]map[string]map[string]*OutPort{} - for _, indv := range individuals { - outPorts[indv] = map[string]map[string]*OutPort{} - for _, smpl := range samples { - outPorts[indv][smpl] = map[string]*OutPort{} - indv_smpl := "_" + indv + "_" + smpl + outs := map[string]map[string]map[string]*sp.OutPort{} + for _, ind := range individuals { + outs[ind] = map[string]map[string]*sp.OutPort{} + for _, spl := range samples { + outs[ind][spl] = map[string]*sp.OutPort{} + indSpl := "_" + ind + "_" + spl // ------------------------------------------------------------------------ // Download FastQ component // ------------------------------------------------------------------------ - file_name := fmt.Sprintf(fastq_file_pat, indv, smpl) - downloadFastQCmd := "wget -O {o:fastq} " + fastq_base_url + file_name - downloadFastQ := wf.NewProc("download_fastq"+indv_smpl, downloadFastQCmd) - downloadFastQ.SetPathStatic("fastq", file_name) + fastqFile := fmt.Sprintf(fastQFilePtn, ind, spl) + dlFastq := wf.NewProc("download_fastq"+indSpl, "wget -O {o:fastq} "+fastqBaseUrl+fastqFile) + dlFastq.SetOut("fastq", fastqFile) // Save outPorts for later use - outPorts[indv][smpl]["fastq"] = downloadFastQ.Out("fastq") + outs[ind][spl]["fastq"] = dlFastq.Out("fastq") // ------------------------------------------------------------------------ // BWA Align // ------------------------------------------------------------------------ - bwaAlignCmd := "bwa aln {i:ref} {i:fastq} > {o:sai} # {i:idxdone}" - bwaAlign := wf.NewProc("bwa_aln"+indv_smpl, bwaAlignCmd) - bwaAlign.SetPathExtend("fastq", "sai", ".sai") - bwaAlign.In("ref").From(ungzipRef.Out("out")) - bwaAlign.In("idxdone").From(indexRef.Out("done")) - bwaAlign.In("fastq").From(downloadFastQ.Out("fastq")) + align := wf.NewProc("bwa_aln"+indSpl, "bwa aln {i:ref} {i:fastq} > {o:sai} # {i:idxdone}") + align.In("ref").From(ungzipRef.Out("out")) + align.In("idxdone").From(indexRef.Out("done")) + align.In("fastq").From(dlFastq.Out("fastq")) + align.SetOut("sai", "{i:fastq}.sai") // Save outPorts for later use - outPorts[indv][smpl]["sai"] = bwaAlign.Out("sai") + outs[ind][spl]["sai"] = align.Out("sai") } // --------------------------------------------------------------------------- // Merge // --------------------------------------------------------------------------- - bwaMergeCmd := "bwa sampe {i:ref} {i:sai1} {i:sai2} {i:fq1} {i:fq2} > {o:merged} # {i:refdone} {p:indv}" - bwaMerge := wf.NewProc("merge_"+indv, bwaMergeCmd) - bwaMerge.SetPathPattern("merged", "{p:indv}.merged.sam") - bwaMerge.InParamPort("indv").FromStr(indv) - bwaMerge.In("ref").From(ungzipRef.Out("out")) - bwaMerge.In("refdone").From(indexRef.Out("done")) - bwaMerge.In("sai1").From(outPorts[indv]["1"]["sai"]) - bwaMerge.In("sai2").From(outPorts[indv]["2"]["sai"]) - bwaMerge.In("fq1").From(outPorts[indv]["1"]["fastq"]) - bwaMerge.In("fq2").From(outPorts[indv]["2"]["fastq"]) + merge := wf.NewProc("merge_"+ind, "bwa sampe {i:ref} {i:sai1} {i:sai2} {i:fq1} {i:fq2} > {o:merged} # {i:refdone} {p:ind}") + merge.InParam("ind").FromStr(ind) + merge.In("ref").From(ungzipRef.Out("out")) + merge.In("refdone").From(indexRef.Out("done")) + merge.In("sai1").From(outs[ind]["1"]["sai"]) + merge.In("sai2").From(outs[ind]["2"]["sai"]) + merge.In("fq1").From(outs[ind]["1"]["fastq"]) + merge.In("fq2").From(outs[ind]["2"]["fastq"]) + merge.SetOut("merged", "{p:ind}.merged.sam") } // ------------------------------------------------------------------------------- diff --git a/ip.go b/ip.go index cb0014b..6417960 100644 --- a/ip.go +++ b/ip.go @@ -112,14 +112,21 @@ func (ip *FileIP) Path() string { // TempDir returns the path to a temporary directory where outputs are written func (ip *FileIP) TempDir() string { - return ip.path + ".tmp" + return filepath.Dir(ip.TempPath()) } // TempPath returns the temporary path of the physical file func (ip *FileIP) TempPath() string { - return ip.TempDir() + "/" + filepath.Base(ip.path) + if ip.path[0] == '/' { + return AbsPathPlaceholder + ip.path + } + return ip.path } +// AbsPathPlaceholder is a string to use instead of an initial '/', to indicate +// a path that belongs to the absolute root +const AbsPathPlaceholder = "__abs" + // FifoPath returns the path to use when a FIFO file is used instead of a // normal file func (ip *FileIP) FifoPath() string { diff --git a/ip_test.go b/ip_test.go index d004a4e..e3123ce 100644 --- a/ip_test.go +++ b/ip_test.go @@ -1,7 +1,6 @@ package scipipe import ( - "path/filepath" "testing" ) @@ -12,7 +11,7 @@ const ( func TestIPPaths(t *testing.T) { ip := NewFileIP(TESTPATH) assertPathsEqual(t, ip.Path(), TESTPATH) - assertPathsEqual(t, ip.TempPath(), TESTPATH+".tmp/"+filepath.Base(TESTPATH)) + assertPathsEqual(t, ip.TempPath(), TESTPATH) assertPathsEqual(t, ip.FifoPath(), TESTPATH+".fifo") } diff --git a/process.go b/process.go index a16666f..58e267d 100644 --- a/process.go +++ b/process.go @@ -58,29 +58,33 @@ func (p *Process) initPortsFromCmdPattern(cmd string, params map[string]string) // Find in/out port names and params and set up ports r := getShellCommandPlaceHolderRegex() ms := r.FindAllStringSubmatch(cmd, -1) - if len(ms) == 0 { - Fail("No placeholders found in command: " + cmd) - } + + portInfos := map[string]map[string]string{} for _, m := range ms { portType := m[1] portName := m[2] - if portType == "o" || portType == "os" { - p.outPorts[portName] = NewOutPort(portName) - p.outPorts[portName].process = p - if portType == "os" { + portInfos[portName] = map[string]string{} + portInfos[portName]["type"] = portType + if len(m) > 3 { + portInfos[portName]["extension"] = m[4] + } + } + + for portName, portInfo := range portInfos { + if portInfo["type"] == "o" || portInfo["type"] == "os" { + p.InitOutPort(p, portName) + if portInfo["type"] == "os" { p.OutPortsDoStream[portName] = true } - } else if portType == "i" { - p.inPorts[portName] = NewInPort(portName) - p.inPorts[portName].process = p - } else if portType == "p" { + } else if portInfo["type"] == "i" { + p.InitInPort(p, portName) + } else if portInfo["type"] == "p" { if params == nil || params[portName] == "" { - p.inParamPorts[portName] = NewInParamPort(portName) - p.inParamPorts[portName].process = p + p.InitInParamPort(p, portName) } } - if len(m) > 3 { - p.OutFileExtensions[portName] = m[4] + if ext, ok := portInfo["extension"]; ok { + p.OutFileExtensions[portName] = ext } } } @@ -197,13 +201,19 @@ func (p *Process) SetPathReplace(inPortName string, outPortName string, old stri } } -// SetPathPattern allows setting the path of outputs using a pattern similar to the +// SetOut allows setting the path of outputs using a pattern similar to the // command pattern used to create new processes. Available patterns to use are: // {i:inport} // {p:paramname} // {t:tagname} // An example might be: {i:foo}.replace_with_{p:replacement}.txt -func (p *Process) SetPathPattern(outPortName string, pathPattern string) { +// If an out-port with the specified name does not exist, it will be created. +// This allows to create out-ports for filenames that are created without explicitly +// stating a filename on the commandline, such as when only submitting a prefix. +func (p *Process) SetOut(outPortName string, pathPattern string) { + if _, ok := p.outPorts[outPortName]; !ok { + p.InitOutPort(p, outPortName) + } p.SetPathCustom(outPortName, func(t *Task) string { path := pathPattern // Avoiding reusing the same variable in multiple instances of this func r := getShellCommandPlaceHolderRegex() diff --git a/process_test.go b/process_test.go index 5c70cb6..cda24bf 100644 --- a/process_test.go +++ b/process_test.go @@ -1,6 +1,8 @@ package scipipe import ( + "io/ioutil" + "os" "testing" ) @@ -60,13 +62,13 @@ func TestSetPathPattern(t *testing.T) { wf := NewWorkflow("test_wf", 16) p := wf.NewProc("cat_foo", "cat {i:foo} > {o:bar} # {p:p1}") p.InParam("p1").FromStr("p1val") - p.SetPathPattern("bar", "{i:foo}.bar.{p:p1}.txt") + p.SetOut("bar", "{i:foo}.bar.{p:p1}.txt") mockTask := NewTask(wf, p, "echo_foo_task", "", map[string]*FileIP{"foo": NewFileIP("foo.txt")}, nil, nil, map[string]string{"p1": "p1val"}, nil, "", nil, 1) expected := "foo.txt.bar.p1val.txt" if p.PathFormatters["bar"](mockTask) != expected { - t.Errorf(`Did not get expected path in TestSetPathPattern. Got:%v Expected:%v`, p.PathFormatters["bar"](mockTask), expected) + t.Errorf(`Did not get expected path in SetOut. Got:%v Expected:%v`, p.PathFormatters["bar"](mockTask), expected) } } @@ -80,6 +82,34 @@ func TestDefaultPattern(t *testing.T) { // We expact a filename on the form: input filename . procname . paramname _ val . outport . extension expected := "foo.txt.cat_foo.p1_p1val.bar.txt" if p.PathFormatters["bar"](mockTask) != expected { - t.Errorf(`Did not get expected path in TestSetPathPattern. Got:%v Expected:%v`, p.PathFormatters["bar"](mockTask), expected) + t.Errorf(`Did not get expected path in SetOut. Got:%v Expected:%v`, p.PathFormatters["bar"](mockTask), expected) } } + +func TestDontCreatePortInShellCommand(t *testing.T) { + wf := NewWorkflow("test_wf", 4) + ef := wf.NewProc("echo_foo", "echo foo > /tmp/foo.txt") + ef.SetOut("foo", "/tmp/foo.txt") + + cf := wf.NewProc("cat_foo", "cat {i:foo} > {o:footoo}") + cf.In("foo").From(ef.Out("foo")) + cf.SetOut("footoo", "/tmp/footoo.txt") + + wf.Run() + + fileName := "/tmp/footoo.txt" + f, openErr := os.Open(fileName) + if openErr != nil { + t.Errorf("Could not open file: %s\n", fileName) + } + b, readErr := ioutil.ReadAll(f) + if readErr != nil { + t.Errorf("Could not read file: %s\n", fileName) + } + expected := "foo\n" + if string(b) != expected { + t.Errorf("File %s did not contain %s as expected, but %s\n", fileName, expected, string(b)) + } + + cleanFiles("/tmp/foo.txt", "/tmp/footoo.txt") +} diff --git a/sci_test.go b/sci_test.go index 9d5c054..cda419e 100644 --- a/sci_test.go +++ b/sci_test.go @@ -6,6 +6,7 @@ import ( "io/ioutil" "os" "os/exec" + "path/filepath" "reflect" "sync" "testing" @@ -79,36 +80,33 @@ func TestConnectBackwards(t *testing.T) { func TestParameterCommand(t *testing.T) { initTestLogs() - wf := NewWorkflow("TestParameterCommandWf", 16) + + wf := NewWorkflow("TestParameterCommandWf", 4) cmb := NewCombinatoricsProcess("cmb") wf.AddProc(cmb) // An abc file printer abc := wf.NewProc("abc", "echo {p:a} {p:b} {p:c} > {o:out}") - abc.SetPathCustom("out", func(task *Task) string { - return fmt.Sprintf( - "/tmp/%s_%s_%s.txt", - task.Param("a"), - task.Param("b"), - task.Param("c"), - ) - }) + abc.SetOut("out", "/tmp/abc_{p:a}_{p:b}_{p:c}.txt") abc.InParam("a").From(cmb.A) abc.InParam("b").From(cmb.B) abc.InParam("c").From(cmb.C) // A printer process - prt := wf.NewProc("prt", "cat {i:in} >> /tmp/log.txt; rm {i:in} {i:in}.audit.json") + prt := wf.NewProc("prt", "cat {i:in} >> /tmp/log.txt") prt.In("in").From(abc.Out("out")) wf.Run() // Run tests - _, err := os.Stat("/tmp/log.txt") - assertNil(t, err) + filePath := "/tmp/log.txt" + if _, err := os.Stat(filePath); os.IsNotExist(err) { + t.Errorf("File does not exist: %s\n", filePath) + } - cleanFiles("/tmp/log.txt") + cleanFiles(filePath) + cleanFilePatterns("/tmp/abc_*_*_*.txt*") } func TestDontOverWriteExistingOutputs(t *testing.T) { @@ -389,6 +387,20 @@ func cleanFiles(fileNames ...string) { } } +func cleanFilePatterns(filePatterns ...string) { + for _, pattern := range filePatterns { + if matches, err := filepath.Glob(pattern); err == nil { + for _, file := range matches { + if err := os.Remove(file); err != nil { + Failf("Could not remove file: %s\nError: %v\n", file, err) + } + } + } else { + Failf("Could not glob pattern: %s\nError: %v\n", pattern, err) + } + } +} + func assertIsType(t *testing.T, expected interface{}, actual interface{}) { if !reflect.DeepEqual(reflect.TypeOf(expected), reflect.TypeOf(actual)) { t.Errorf("Types do not match! (%s) and (%s)\n", reflect.TypeOf(expected).String(), reflect.TypeOf(actual).String()) diff --git a/task.go b/task.go index 247f265..b6606cc 100644 --- a/task.go +++ b/task.go @@ -1,6 +1,8 @@ package scipipe import ( + "crypto/sha1" + "encoding/hex" "fmt" "os" "os/exec" @@ -62,15 +64,24 @@ func NewTask(workflow *Workflow, process *Process, name string, cmdPat string, i func formatCommand(cmd string, inIPs map[string]*FileIP, outIPs map[string]*FileIP, params map[string]string, tags map[string]string, prepend string) string { r := getShellCommandPlaceHolderRegex() placeHolderMatches := r.FindAllStringSubmatch(cmd, -1) + portInfos := map[string]map[string]string{} for _, placeHolderMatch := range placeHolderMatches { - var filePath string - - placeHolderStr := placeHolderMatch[0] - portType := placeHolderMatch[1] portName := placeHolderMatch[2] - sep := " " // Default + portInfos[portName] = map[string]string{} + portInfos[portName]["match"] = placeHolderMatch[0] + portInfos[portName]["port_type"] = placeHolderMatch[1] + portInfos[portName]["port_name"] = portName + portInfos[portName]["reduce_inputs"] = "false" + // Identify if the place holder represents a reduce-type in-port + if len(placeHolderMatch) > 5 { + portInfos[portName]["reduce_inputs"] = "true" + portInfos[portName]["sep"] = placeHolderMatch[7] + } + } - switch portType { + for portName, portInfo := range portInfos { + var filePath string + switch portInfo["port_type"] { case "o": if outIPs[portName] == nil { Fail("Missing outpath for outport '", portName, "' for command '", cmd, "'") @@ -85,13 +96,7 @@ func formatCommand(cmd string, inIPs map[string]*FileIP, outIPs map[string]*File if inIPs[portName] == nil { Fail("Missing in-IP for inport '", portName, "' for command '", cmd, "'") } - // Identify if the place holder represents a reduce-type in-port - reduceInputs := false - if len(placeHolderMatch) > 5 { - sep = placeHolderMatch[7] - reduceInputs = true - } - if reduceInputs && inIPs[portName].Path() == "" { + if portInfo["reduce_inputs"] == "true" && inIPs[portName].Path() == "" { // Merge multiple input paths from a substream on the IP, into one string ips := []*FileIP{} for ip := range inIPs[portName].SubStream.Chan { @@ -99,17 +104,17 @@ func formatCommand(cmd string, inIPs map[string]*FileIP, outIPs map[string]*File } paths := []string{} for _, ip := range ips { - paths = append(paths, ip.Path()) + paths = append(paths, parentDirPath(ip.Path())) } - filePath = strings.Join(paths, sep) + filePath = strings.Join(paths, portInfo["sep"]) } else { if inIPs[portName].Path() == "" { Fail("Missing inpath for inport '", portName, "', and no substream, for command '", cmd, "'") } if inIPs[portName].doStream { - filePath = inIPs[portName].FifoPath() + filePath = parentDirPath(inIPs[portName].FifoPath()) } else { - filePath = inIPs[portName].Path() + filePath = parentDirPath(inIPs[portName].Path()) } } case "p": @@ -129,7 +134,7 @@ func formatCommand(cmd string, inIPs map[string]*FileIP, outIPs map[string]*File default: Fail("Replace failed for port ", portName, " for command '", cmd, "'") } - cmd = strings.Replace(cmd, placeHolderStr, filePath, -1) + cmd = strings.Replace(cmd, portInfo["match"], filePath, -1) } // Add prepend string to the command @@ -267,10 +272,13 @@ func (t *Task) anyOutputsExist() (anyFileExists bool) { // createDirs creates directories for out-IPs of the task func (t *Task) createDirs() { + os.MkdirAll(t.TempDir(), 0777) for _, oip := range t.OutIPs { oipDir := oip.TempDir() // This will create all out dirs, including the temp dir if oip.doStream { // Temp dirs are not created for fifo files oipDir = filepath.Dir(oip.FifoPath()) + } else { + oipDir = t.TempDir() + "/" + oipDir } err := os.MkdirAll(oipDir, 0777) CheckWithMsg(err, "Could not create directory: "+oipDir) @@ -280,7 +288,7 @@ func (t *Task) createDirs() { // executeCommand executes the shell command cmd via bash func (t *Task) executeCommand(cmd string) { - out, err := exec.Command("bash", "-c", cmd).CombinedOutput() + out, err := exec.Command("bash", "-c", "cd "+t.TempDir()+" && "+cmd+" && cd ..").CombinedOutput() if err != nil { Failf("Command failed!\nCommand:\n%s\n\nOutput:\n%s\nOriginal error:%s\n", cmd, string(out), err.Error()) } @@ -311,11 +319,60 @@ func (t *Task) writeAuditLogs(startTime time.Time, finishTime time.Time) { } } -// Rename temporary output files to their proper file names +// atomizeIPs renames temporary output files/directories to their proper paths func (t *Task) atomizeIPs() { for _, oip := range t.OutIPs { + // Move paths for ports, to final destinations if !oip.doStream { - oip.Atomize() + os.Rename(t.TempDir()+"/"+oip.TempPath(), oip.Path()) + } + } + // For remaining paths in temporary execution dir, just move out of it + filepath.Walk(t.TempDir(), func(tempPath string, fileInfo os.FileInfo, err error) error { + if !fileInfo.IsDir() { + newPath := strings.Replace(tempPath, t.TempDir()+"/", "", 1) + newPath = strings.Replace(newPath, AbsPathPlaceholder+"/", "/", 1) + newPathDir := filepath.Dir(newPath) + if _, err := os.Stat(newPathDir); os.IsNotExist(err) { + os.MkdirAll(newPathDir, 0777) + } + Debug.Println("Moving: ", tempPath, " -> ", newPath) + renameErr := os.Rename(tempPath, newPath) + CheckWithMsg(renameErr, "Could not rename file "+tempPath+" to "+newPath) } + return err + }) + // Remove temporary execution dir + remErr := os.RemoveAll(t.TempDir()) + CheckWithMsg(remErr, "Could not remove temp dir: "+t.TempDir()) +} + +// TempDir returns a string that is unique to a task, suitable for use +// in file paths. It is built up by merging all input filenames and parameter +// values that a task takes as input, joined with dots. +func (t *Task) TempDir() string { + pathPcs := []string{"tmp." + t.Name} + for _, ipName := range sortedFileIPMapKeys(t.InIPs) { + pathPcs = append(pathPcs, filepath.Base(t.InIP(ipName).Path())) + } + for _, paramName := range sortedStringMapKeys(t.Params) { + pathPcs = append(pathPcs, paramName+"_"+t.Param(paramName)) + } + for _, tagName := range sortedStringMapKeys(t.Tags) { + pathPcs = append(pathPcs, tagName+"_"+t.Tag(tagName)) + } + pathSegment := strings.Join(pathPcs, ".") + if len(pathSegment) > 255 { + sha1sum := sha1.Sum([]byte(pathSegment)) + pathSegment = t.Name + "." + hex.EncodeToString(sha1sum[:]) + } + return pathSegment +} + +func parentDirPath(path string) string { + if path[0] == '/' { + return path } + // For relative paths, add ".." to get out of current dir + return "../" + path } diff --git a/task_test.go b/task_test.go new file mode 100644 index 0000000..a623fd6 --- /dev/null +++ b/task_test.go @@ -0,0 +1,26 @@ +package scipipe + +import ( + "testing" +) + +func TestTempDir(t *testing.T) { + tsk := NewTask(nil, nil, "test_task", "echo foo", map[string]*FileIP{"in1": NewFileIP("infile.txt"), "in2": NewFileIP("infile2.txt")}, nil, nil, map[string]string{"p1": "p1val", "p2": "p2val"}, nil, "", nil, 4) + + expected := "tmp.test_task.infile.txt.infile2.txt.p1_p1val.p2_p2val" + actual := tsk.TempDir() + if actual != expected { + t.Errorf("TempDir() was %s Expected: %s", actual, expected) + } +} + +func TestTempDirNotOver255(t *testing.T) { + longFileName := "very_long_filename_______________________________50_______________________________________________100_______________________________________________150_______________________________________________200_______________________________________________250__255_____" + tsk := NewTask(nil, nil, "test_task", "echo foo", map[string]*FileIP{"in1": NewFileIP(longFileName), "in2": NewFileIP("infile2.txt")}, nil, nil, map[string]string{"p1": "p1val", "p2": "p2val"}, nil, "", nil, 4) + + actual := len(tsk.TempDir()) + maxLen := 255 + if actual > 256 { + t.Errorf("TempDir() generated too long a string: %d chars, should be max %d chars\nString was: %s", actual, maxLen, tsk.TempDir()) + } +}