Skip to content


Use temp dir instead of temp file ext, for atomizing, to simplify han…
Browse files Browse the repository at this point in the history
…dling extraneous files
  • Loading branch information
samuell committed Jun 18, 2018
1 parent 54621c3 commit 9f757a2
Show file tree
Hide file tree
Showing 11 changed files with 243 additions and 107 deletions.
4 changes: 2 additions & 2 deletions baseprocess.go
Expand Up @@ -40,15 +40,15 @@ func (p *BaseProcess) Workflow() *Workflow {

// InPort returns the in-port with name portName
func (p *BaseProcess) InPort(portName string) *InPort {
if p.inPorts[portName] == nil {
if _, ok := p.inPorts[portName]; !ok {
Failf("No such in-port ('%s') for process '%s'. Please check your workflow code!\n", portName,
return p.inPorts[portName]

// InitInPort adds the in-port port to the process, with name portName
func (p *BaseProcess) InitInPort(proc WorkflowProcess, portName string) {
if p.inPorts[portName] != nil {
if _, ok := p.inPorts[portName]; ok {
Failf("Such an in-port ('%s') already exists for process '%s'. Please check your workflow code!\n", portName,
ipt := NewInPort(portName)
Expand Down
2 changes: 1 addition & 1 deletion examples/filegen/filegen.go
Expand Up @@ -11,7 +11,7 @@ func main() {
fq := spc.NewFileSource(wf, "file_src", "hej1.txt", "hej2.txt", "hej3.txt")

fw := sp.NewProc(wf, "filewriter", "echo {i:in} > {o:out}")
fw.SetPathPattern("out", "{i:in}")
fw.SetOut("out", "{i:in}")

Expand Down
2 changes: 1 addition & 1 deletion examples/param_channels/params.go
Expand Up @@ -15,7 +15,7 @@ func main() {
// An abc file printer
abc := wf.NewProc("abc", "echo {p:a} {p:b} {p:c} > {o:out}; sleep 1")
abc.Spawn = true
abc.SetPathPattern("out", "{p:a}_{p:b}_{p:c}.txt")
abc.SetOut("out", "{p:a}_{p:b}_{p:c}.txt")

// A printer task
prt := wf.NewProc("printer", "cat {i:in} >> log.txt")
Expand Down
85 changes: 40 additions & 45 deletions examples/resequencing/resequencing.go
Expand Up @@ -15,20 +15,20 @@ package main
import (

. ""
sp ""

// ------------------------------------------------------------------------------------
// Set up static stuff like paths
// ------------------------------------------------------------------------------------
const (
fastq_base_url = ""
fastq_file_pat = "%s.ILLUMINA.low_coverage.4p_%s.fq"
ref_base_url = ""
ref_file = "Homo_sapiens.GRCh37.75.dna.chromosome.17.fa"
ref_file_gz = "Homo_sapiens.GRCh37.75.dna.chromosome.17.fa.gz"
vcf_base_url = ""
vcf_file = "ALL.chr17.integrated_phase1_v3.20101123.snps_indels_svs.genotypes.vcf.gz"
fastqBaseUrl = ""
fastQFilePtn = "%s.ILLUMINA.low_coverage.4p_%s.fq"
refBaseUrl = ""
refFile = "Homo_sapiens.GRCh37.75.dna.chromosome.17.fa"
refFileGz = "Homo_sapiens.GRCh37.75.dna.chromosome.17.fa.gz"
vcfBaseUrl = ""
vcfFile = "ALL.chr17.integrated_phase1_v3.20101123.snps_indels_svs.genotypes.vcf.gz"

// ------------------------------------------------------------------------------------
Expand All @@ -43,77 +43,72 @@ func main() {
// --------------------------------------------------------------------------------
// Initialize Workflow
// --------------------------------------------------------------------------------
wf := NewWorkflow("resequencing_wf", 4)
wf := sp.NewWorkflow("resequencing_workflow", 4)

// --------------------------------------------------------------------------------
// Download Reference Genome
// --------------------------------------------------------------------------------
downloadRefCmd := "wget -O {o:outfile} " + ref_base_url + ref_file_gz
downloadRef := wf.NewProc("download_ref", downloadRefCmd)
downloadRef.SetPathStatic("outfile", ref_file_gz)
dlRef := wf.NewProc("download_ref", "wget -O {o:outfile} "+refBaseUrl+refFileGz)
dlRef.SetOut("outfile", refFileGz)

// --------------------------------------------------------------------------------
// Unzip ref file
// --------------------------------------------------------------------------------
ungzipRefCmd := "gunzip -c {i:in} > {o:out}"
ungzipRef := wf.NewProc("ugzip_ref", ungzipRefCmd)
ungzipRef.SetPathReplace("in", "out", ".gz", "")
ungzipRef := wf.NewProc("ugzip_ref", "gunzip -c {i:in} > {o:out}")
ungzipRef.SetOut("out", "{i:in|s/.gz//}")

// --------------------------------------------------------------------------------
// Index Reference Genome
// --------------------------------------------------------------------------------
indexRef := wf.NewProc("index_ref", "bwa index -a bwtsw {i:index}; echo done > {o:done}")
indexRef.SetPathExtend("index", "done", ".indexed")
indexRef.SetOut("done", "{i:index}.indexed")

// Create (multi-level) maps where we can gather outports from processes
// for each for loop iteration and access them in the merge step later
outPorts := map[string]map[string]map[string]*OutPort{}
for _, indv := range individuals {
outPorts[indv] = map[string]map[string]*OutPort{}
for _, smpl := range samples {
outPorts[indv][smpl] = map[string]*OutPort{}
indv_smpl := "_" + indv + "_" + smpl
outs := map[string]map[string]map[string]*sp.OutPort{}
for _, ind := range individuals {
outs[ind] = map[string]map[string]*sp.OutPort{}
for _, spl := range samples {
outs[ind][spl] = map[string]*sp.OutPort{}
indSpl := "_" + ind + "_" + spl

// ------------------------------------------------------------------------
// Download FastQ component
// ------------------------------------------------------------------------
file_name := fmt.Sprintf(fastq_file_pat, indv, smpl)
downloadFastQCmd := "wget -O {o:fastq} " + fastq_base_url + file_name
downloadFastQ := wf.NewProc("download_fastq"+indv_smpl, downloadFastQCmd)
downloadFastQ.SetPathStatic("fastq", file_name)
fastqFile := fmt.Sprintf(fastQFilePtn, ind, spl)
dlFastq := wf.NewProc("download_fastq"+indSpl, "wget -O {o:fastq} "+fastqBaseUrl+fastqFile)
dlFastq.SetOut("fastq", fastqFile)

// Save outPorts for later use
outPorts[indv][smpl]["fastq"] = downloadFastQ.Out("fastq")
outs[ind][spl]["fastq"] = dlFastq.Out("fastq")

// ------------------------------------------------------------------------
// BWA Align
// ------------------------------------------------------------------------
bwaAlignCmd := "bwa aln {i:ref} {i:fastq} > {o:sai} # {i:idxdone}"
bwaAlign := wf.NewProc("bwa_aln"+indv_smpl, bwaAlignCmd)
bwaAlign.SetPathExtend("fastq", "sai", ".sai")
align := wf.NewProc("bwa_aln"+indSpl, "bwa aln {i:ref} {i:fastq} > {o:sai} # {i:idxdone}")
align.SetOut("sai", "{i:fastq}.sai")

// Save outPorts for later use
outPorts[indv][smpl]["sai"] = bwaAlign.Out("sai")
outs[ind][spl]["sai"] = align.Out("sai")

// ---------------------------------------------------------------------------
// Merge
// ---------------------------------------------------------------------------
bwaMergeCmd := "bwa sampe {i:ref} {i:sai1} {i:sai2} {i:fq1} {i:fq2} > {o:merged} # {i:refdone} {p:indv}"
bwaMerge := wf.NewProc("merge_"+indv, bwaMergeCmd)
bwaMerge.SetPathPattern("merged", "{p:indv}.merged.sam")
merge := wf.NewProc("merge_"+ind, "bwa sampe {i:ref} {i:sai1} {i:sai2} {i:fq1} {i:fq2} > {o:merged} # {i:refdone} {p:ind}")
merge.SetOut("merged", "{p:ind}.merged.sam")

// -------------------------------------------------------------------------------
Expand Down
11 changes: 9 additions & 2 deletions ip.go
Expand Up @@ -112,14 +112,21 @@ func (ip *FileIP) Path() string {

// TempDir returns the path to a temporary directory where outputs are written
func (ip *FileIP) TempDir() string {
return ip.path + ".tmp"
return filepath.Dir(ip.TempPath())

// TempPath returns the temporary path of the physical file
func (ip *FileIP) TempPath() string {
return ip.TempDir() + "/" + filepath.Base(ip.path)
if ip.path[0] == '/' {
return AbsPathPlaceholder + ip.path
return ip.path

// AbsPathPlaceholder is a string to use instead of an initial '/', to indicate
// a path that belongs to the absolute root
const AbsPathPlaceholder = "__abs"

// FifoPath returns the path to use when a FIFO file is used instead of a
// normal file
func (ip *FileIP) FifoPath() string {
Expand Down
3 changes: 1 addition & 2 deletions ip_test.go
@@ -1,7 +1,6 @@
package scipipe

import (

Expand All @@ -12,7 +11,7 @@ const (
func TestIPPaths(t *testing.T) {
ip := NewFileIP(TESTPATH)
assertPathsEqual(t, ip.Path(), TESTPATH)
assertPathsEqual(t, ip.TempPath(), TESTPATH+".tmp/"+filepath.Base(TESTPATH))
assertPathsEqual(t, ip.TempPath(), TESTPATH)
assertPathsEqual(t, ip.FifoPath(), TESTPATH+".fifo")

Expand Down
44 changes: 27 additions & 17 deletions process.go
Expand Up @@ -58,29 +58,33 @@ func (p *Process) initPortsFromCmdPattern(cmd string, params map[string]string)
// Find in/out port names and params and set up ports
r := getShellCommandPlaceHolderRegex()
ms := r.FindAllStringSubmatch(cmd, -1)
if len(ms) == 0 {
Fail("No placeholders found in command: " + cmd)

portInfos := map[string]map[string]string{}
for _, m := range ms {
portType := m[1]
portName := m[2]
if portType == "o" || portType == "os" {
p.outPorts[portName] = NewOutPort(portName)
p.outPorts[portName].process = p
if portType == "os" {
portInfos[portName] = map[string]string{}
portInfos[portName]["type"] = portType
if len(m) > 3 {
portInfos[portName]["extension"] = m[4]

for portName, portInfo := range portInfos {
if portInfo["type"] == "o" || portInfo["type"] == "os" {
p.InitOutPort(p, portName)
if portInfo["type"] == "os" {
p.OutPortsDoStream[portName] = true
} else if portType == "i" {
p.inPorts[portName] = NewInPort(portName)
p.inPorts[portName].process = p
} else if portType == "p" {
} else if portInfo["type"] == "i" {
p.InitInPort(p, portName)
} else if portInfo["type"] == "p" {
if params == nil || params[portName] == "" {
p.inParamPorts[portName] = NewInParamPort(portName)
p.inParamPorts[portName].process = p
p.InitInParamPort(p, portName)
if len(m) > 3 {
p.OutFileExtensions[portName] = m[4]
if ext, ok := portInfo["extension"]; ok {
p.OutFileExtensions[portName] = ext
Expand Down Expand Up @@ -197,13 +201,19 @@ func (p *Process) SetPathReplace(inPortName string, outPortName string, old stri

// SetPathPattern allows setting the path of outputs using a pattern similar to the
// SetOut allows setting the path of outputs using a pattern similar to the
// command pattern used to create new processes. Available patterns to use are:
// {i:inport}
// {p:paramname}
// {t:tagname}
// An example might be: {i:foo}.replace_with_{p:replacement}.txt
func (p *Process) SetPathPattern(outPortName string, pathPattern string) {
// If an out-port with the specified name does not exist, it will be created.
// This allows to create out-ports for filenames that are created without explicitly
// stating a filename on the commandline, such as when only submitting a prefix.
func (p *Process) SetOut(outPortName string, pathPattern string) {
if _, ok := p.outPorts[outPortName]; !ok {
p.InitOutPort(p, outPortName)
p.SetPathCustom(outPortName, func(t *Task) string {
path := pathPattern // Avoiding reusing the same variable in multiple instances of this func
r := getShellCommandPlaceHolderRegex()
Expand Down
36 changes: 33 additions & 3 deletions process_test.go
@@ -1,6 +1,8 @@
package scipipe

import (

Expand Down Expand Up @@ -60,13 +62,13 @@ func TestSetPathPattern(t *testing.T) {
wf := NewWorkflow("test_wf", 16)
p := wf.NewProc("cat_foo", "cat {i:foo} > {o:bar} # {p:p1}")
p.SetPathPattern("bar", "{i:foo}.bar.{p:p1}.txt")
p.SetOut("bar", "{i:foo}.bar.{p:p1}.txt")

mockTask := NewTask(wf, p, "echo_foo_task", "", map[string]*FileIP{"foo": NewFileIP("foo.txt")}, nil, nil, map[string]string{"p1": "p1val"}, nil, "", nil, 1)

expected := ""
if p.PathFormatters["bar"](mockTask) != expected {
t.Errorf(`Did not get expected path in TestSetPathPattern. Got:%v Expected:%v`, p.PathFormatters["bar"](mockTask), expected)
t.Errorf(`Did not get expected path in SetOut. Got:%v Expected:%v`, p.PathFormatters["bar"](mockTask), expected)

Expand All @@ -80,6 +82,34 @@ func TestDefaultPattern(t *testing.T) {
// We expact a filename on the form: input filename . procname . paramname _ val . outport . extension
expected := ""
if p.PathFormatters["bar"](mockTask) != expected {
t.Errorf(`Did not get expected path in TestSetPathPattern. Got:%v Expected:%v`, p.PathFormatters["bar"](mockTask), expected)
t.Errorf(`Did not get expected path in SetOut. Got:%v Expected:%v`, p.PathFormatters["bar"](mockTask), expected)

func TestDontCreatePortInShellCommand(t *testing.T) {
wf := NewWorkflow("test_wf", 4)
ef := wf.NewProc("echo_foo", "echo foo > /tmp/foo.txt")
ef.SetOut("foo", "/tmp/foo.txt")

cf := wf.NewProc("cat_foo", "cat {i:foo} > {o:footoo}")
cf.SetOut("footoo", "/tmp/footoo.txt")


fileName := "/tmp/footoo.txt"
f, openErr := os.Open(fileName)
if openErr != nil {
t.Errorf("Could not open file: %s\n", fileName)
b, readErr := ioutil.ReadAll(f)
if readErr != nil {
t.Errorf("Could not read file: %s\n", fileName)
expected := "foo\n"
if string(b) != expected {
t.Errorf("File %s did not contain %s as expected, but %s\n", fileName, expected, string(b))

cleanFiles("/tmp/foo.txt", "/tmp/footoo.txt")

0 comments on commit 9f757a2

Please sign in to comment.